Files
sudo/plugins/python/pyhelpers.c
Robert Manner d67c64bb37 plugins/python/pyhelpers: have a default sudo_printf function
Adapted the default sudo_printf from the sudoers plugin so that errors can be
printed before the plugin's open() gets called. (This is used by the multiple I/O
plugin loading code to display an error when too many plugins are loaded.)

Since this makes us always have a sudo_log, I have removed the logic about
whether it is available or not.
2020-01-23 12:46:14 -07:00

462 lines
13 KiB
C

/*
* SPDX-License-Identifier: ISC
*
* Copyright (c) 2019 Robert Manner <robert.manner@oneidentity.com>
*
* Permission to use, copy, modify, and distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
* ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
* ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
* OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
/*
* This is an open source non-commercial project. Dear PVS-Studio, please check it.
* PVS-Studio Static Code Analyzer for C, C++ and C#: http://www.viva64.com
*/
#include "pyhelpers.h"
#include <pwd.h>
#include <signal.h>
#include "pathnames.h"
/*
 * Built-in printf-style logger used as the initial value of py_ctx.sudo_log.
 *
 * The low byte of msg_type selects the destination: SUDO_CONV_ERROR_MSG goes
 * to stderr, SUDO_CONV_INFO_MSG to stdout.  When SUDO_CONV_PREFER_TTY is set
 * and _PATH_TTY can be opened for writing, the tty is used instead.
 *
 * Returns the vfprintf() result, or -1 with errno set to EINVAL when the
 * message type is neither of the two supported ones.
 */
static int
_sudo_printf_default(int msg_type, const char *fmt, ...)
{
    FILE *tty = NULL;
    int written = -1;

    /* Try writing to /dev/tty first. */
    if (ISSET(msg_type, SUDO_CONV_PREFER_TTY))
        tty = fopen(_PATH_TTY, "w");

    const int kind = msg_type & 0xff;
    if (kind == SUDO_CONV_ERROR_MSG || kind == SUDO_CONV_INFO_MSG) {
        FILE *out = (kind == SUDO_CONV_ERROR_MSG) ? stderr : stdout;
        va_list args;

        /* An opened tty takes precedence over the standard stream. */
        if (tty != NULL)
            out = tty;

        va_start(args, fmt);
        written = vfprintf(out, fmt, args);
        va_end(args);
    } else {
        errno = EINVAL;
    }

    if (tty != NULL)
        fclose(tty);

    return written;
}
/*
 * Global plugin context used by the helpers in this file.
 *
 * The first initializer installs the built-in printf fallback; presumably it
 * lands in the sudo_log member, which py_ctx_reset() also resets to the same
 * function.
 * NOTE(review): the meaning of the remaining positional initializers depends
 * on the member order of struct PythonContext, which is declared outside this
 * file -- confirm against the header.
 */
struct PythonContext py_ctx = {
&_sudo_printf_default,
NULL,
0,
NULL
};
/*
 * Tells whether py_ctx.sudo_log points to something other than the built-in
 * fallback logger.
 *
 * Returns non-zero when a different logging function is installed, 0 when
 * py_ctx.sudo_log still is _sudo_printf_default.
 */
int
py_is_sudo_log_available(void)
{
    debug_decl(py_is_sudo_log_available, PYTHON_DEBUG_INTERNAL);
    const int has_custom_logger = (py_ctx.sudo_log != &_sudo_printf_default);
    debug_return_int(has_custom_logger);
}
/*
 * Joins the items of py_str_list with the given separator by calling the
 * "join" method of a Python string built from separator.
 *
 * Returns a strdup()-allocated copy of the joined text which the caller must
 * free(), or NULL if any step (creating the separator, joining, UTF-8
 * conversion, allocation) fails.
 */
char *
py_join_str_list(PyObject *py_str_list, const char *separator)
{
    debug_decl(py_join_str_list, PYTHON_DEBUG_INTERNAL);

    char *joined = NULL;
    PyObject *py_joined = NULL;
    PyObject *py_sep = PyUnicode_FromString(separator);

    if (py_sep != NULL) {
        py_joined = PyObject_CallMethod(py_sep, "join", "(O)", py_str_list);
        if (py_joined != NULL) {
            // the UTF-8 buffer belongs to py_joined, so copy it before the DECREF
            const char *utf8 = PyUnicode_AsUTF8(py_joined);
            if (utf8 != NULL)
                joined = strdup(utf8);
        }
    }

    Py_XDECREF(py_joined);
    Py_XDECREF(py_sep);
    debug_return_str(joined);
}
/*
 * Formats a Python traceback object into a single C string by calling
 * "format_tb" on py_ctx.py_traceback_module and concatenating the resulting
 * lines.
 *
 * Returns a heap-allocated string the caller must free().  An empty string is
 * returned when py_traceback is NULL, when the traceback module is not set,
 * or when formatting fails (the result is NULL only if strdup() itself
 * fails).
 */
char *
py_create_traceback_string(PyObject *py_traceback)
{
    debug_decl(py_create_traceback_string, PYTHON_DEBUG_INTERNAL);

    char *formatted = NULL;

    if (py_traceback != NULL && py_ctx.py_traceback_module != NULL) {
        PyObject *py_lines = PyObject_CallMethod(py_ctx.py_traceback_module, "format_tb", "(O)", py_traceback);
        if (py_lines != NULL) {
            formatted = py_join_str_list(py_lines, "");
            Py_DECREF(py_lines);
        }
    }

    // fall back to an empty string so callers always get something printable
    if (formatted == NULL)
        formatted = strdup("");

    debug_return_str(formatted);
}
/*
 * Logs the pending Python exception (if any) through py_sudo_log and clears
 * it.
 *
 * context_message: optional text describing what was being attempted; may be
 *                  NULL or empty.
 *
 * Without a pending exception only context_message is logged (nothing is
 * logged when it is NULL).  With a pending exception the error line has the
 * form "<context>: (<type name>) <message>" and, if a traceback is present,
 * it is logged afterwards as an info message.  The exception is fetched with
 * PyErr_Fetch() and its references are released here.
 */
void
py_log_last_error(const char *context_message)
{
    debug_decl(py_log_last_error, PYTHON_DEBUG_INTERNAL);

    if (!PyErr_Occurred()) {
        // never hand a NULL pointer to "%s"
        if (context_message != NULL)
            py_sudo_log(SUDO_CONV_ERROR_MSG, "%s\n", context_message);
        debug_return;
    }

    PyObject *py_type = NULL, *py_message = NULL, *py_traceback = NULL;
    PyErr_Fetch(&py_type, &py_message, &py_traceback);

    // "(NULL)" means there is no exception value, "?" means it could not be
    // converted to text; literals are used so the fallback cannot itself fail
    char *message = py_message ? py_create_string_rep(py_message) : NULL;
    const char *shown_message = message;
    if (shown_message == NULL)
        shown_message = py_message ? "?" : "(NULL)";

    py_sudo_log(SUDO_CONV_ERROR_MSG, "%s%s(%s) %s\n",
                context_message ? context_message : "",
                context_message && *context_message ? ": " : "",
                py_type ? ((PyTypeObject *)py_type)->tp_name : "None",
                shown_message);
    free(message);

    if (py_traceback != NULL) {
        char *traceback = py_create_traceback_string(py_traceback);
        // traceback is NULL only if allocation failed; still print the header
        py_sudo_log(SUDO_CONV_INFO_MSG, "Traceback:\n%s\n",
                    traceback ? traceback : "");
        free(traceback);
    }

    Py_XDECREF(py_type);
    Py_XDECREF(py_message);
    Py_XDECREF(py_traceback);
    debug_return;
}
/*
 * Builds a Python tuple of str objects from the first count entries of
 * strings.
 *
 * count:   number of entries to convert.
 * strings: array holding at least count C strings; it is not accessed when
 *          count is 0.
 *
 * Returns the new tuple, or NULL if the tuple could not be created or any
 * element failed to convert (the partially filled tuple is released).
 */
PyObject *
py_str_array_to_tuple_with_count(Py_ssize_t count, char * const strings[])
{
    debug_decl(py_str_array_to_tuple_with_count, PYTHON_DEBUG_INTERNAL);

    PyObject *py_argv = PyTuple_New(count);
    if (py_argv == NULL)
        debug_return_ptr(NULL);

    // the index has the same type as count so it cannot overflow before
    // reaching it
    for (Py_ssize_t i = 0; i < count; ++i) {
        PyObject *py_arg = PyUnicode_FromString(strings[i]);
        // PyTuple_SetItem takes over the reference to py_arg (per the Python
        // C API), so py_arg must not be released here on either outcome
        if (py_arg == NULL || PyTuple_SetItem(py_argv, i, py_arg) != 0) {
            Py_CLEAR(py_argv);
            break;
        }
    }

    debug_return_ptr(py_argv);
}
/*
 * Converts a NULL-terminated array of C strings into a Python tuple.
 *
 * A NULL array is treated as an empty one.  The conversion itself is done by
 * py_str_array_to_tuple_with_count(), whose result is returned unchanged.
 */
PyObject *
py_str_array_to_tuple(char * const strings[])
{
    debug_decl(py_str_array_to_tuple, PYTHON_DEBUG_INTERNAL);

    // count the entries in front of the terminating NULL
    Py_ssize_t length = 0;
    for (char * const *cursor = strings; cursor != NULL && *cursor != NULL; ++cursor)
        ++length;

    debug_return_ptr(py_str_array_to_tuple_with_count(length, strings));
}
/*
 * Converts a Python tuple into a newly allocated, NULL-terminated array of C
 * strings.  Each element is converted with py_create_string_rep(), so it does
 * not have to be a str.
 *
 * Returns the array (release it with str_array_free()), or NULL on failure.
 * A ValueError is set when py_tuple is not a tuple and a MemoryError when the
 * array cannot be allocated.
 */
char **
py_str_array_from_tuple(PyObject *py_tuple)
{
    debug_decl(py_str_array_from_tuple, PYTHON_DEBUG_INTERNAL);

    if (!PyTuple_Check(py_tuple)) {
        PyErr_Format(PyExc_ValueError, "%s: value error, argument should be a tuple but it is '%s'",
                     __PRETTY_FUNCTION__, Py_TYPENAME(py_tuple));
        debug_return_ptr(NULL);
    }

    Py_ssize_t tuple_size = PyTuple_Size(py_tuple);

    // we need an extra 0 at the end
    char **result = calloc(Py_SSIZE2SIZE(tuple_size) + 1, sizeof(char*));
    if (result == NULL) {
        // report the failure instead of dereferencing NULL below
        (void) PyErr_NoMemory();
        debug_return_ptr(NULL);
    }

    for (Py_ssize_t i = 0; i < tuple_size; ++i) {
        PyObject *py_value = PyTuple_GetItem(py_tuple, i);
        if (py_value == NULL) {
            str_array_free(&result);
            debug_return_ptr(NULL);
        }

        // Note that it can be an "int" or something else as well
        char *value = py_create_string_rep(py_value);
        if (value == NULL) {
            // the zero-filled tail keeps the partial array NULL-terminated,
            // so it can be released as a whole
            str_array_free(&result);
            debug_return_ptr(NULL);
        }
        result[i] = value;
    }

    debug_return_ptr(result);
}
/*
 * Fetches element index of py_tuple and verifies its type.
 *
 * py_tuple:      tuple to read from.
 * index:         position of the element.
 * expected_type: type the element must be an instance of (checked with
 *                PyObject_TypeCheck).
 *
 * Returns the item as handed back by PyTuple_GetItem() (a borrowed reference
 * per the Python C API), or NULL when the lookup fails or the type does not
 * match; a ValueError is set in the latter case.
 */
PyObject *
py_tuple_get(PyObject *py_tuple, Py_ssize_t index, PyTypeObject *expected_type)
{
    debug_decl(py_tuple_get, PYTHON_DEBUG_INTERNAL);

    PyObject *py_item = PyTuple_GetItem(py_tuple, index);
    if (py_item == NULL) {
        debug_return_ptr(NULL);
    }

    if (!PyObject_TypeCheck(py_item, expected_type)) {
        // index is a Py_ssize_t, so it has to be formatted with %zd
        PyErr_Format(PyExc_ValueError, "Value error: tuple element %zd should "
                     "be a '%s' (but it is '%s')",
                     index, expected_type->tp_name, Py_TYPENAME(py_item));
        debug_return_ptr(NULL);
    }

    debug_return_ptr(py_item);
}
/*
 * Creates a Python string of the form "<major>.<minor>" from a packed sudo
 * API version number.
 *
 * Returns the result of PyUnicode_FromFormat(), i.e. NULL on failure.
 */
PyObject *
py_create_version(unsigned int version)
{
    debug_decl(py_create_version, PYTHON_DEBUG_INTERNAL);

    PyObject *py_version = PyUnicode_FromFormat("%d.%d",
        SUDO_API_VERSION_GET_MAJOR(version),
        SUDO_API_VERSION_GET_MINOR(version));

    debug_return_ptr(py_version);
}
/*
 * Converts a passwd entry into a Python tuple of
 * (name, passwd, uid, gid, gecos, dir, shell).
 *
 * Returns Python None when pwd is NULL, otherwise the result of
 * Py_BuildValue() (NULL on failure).
 */
PyObject *
py_from_passwd(const struct passwd *pwd)
{
    debug_decl(py_from_passwd, PYTHON_DEBUG_INTERNAL);

    if (pwd != NULL) {
        // Create a tuple similar and convertible to python "struct_passwd" of "pwd" module
        PyObject *py_pwd = Py_BuildValue("(zziizzz)",
            pwd->pw_name, pwd->pw_passwd,
            pwd->pw_uid, pwd->pw_gid,
            pwd->pw_gecos, pwd->pw_dir, pwd->pw_shell);
        debug_return_ptr(py_pwd);
    }

    debug_return_ptr_pynone;
}
/*
 * Produces the str() representation of a Python object as a C string.
 *
 * Returns a strdup()-allocated copy the caller must free(), or NULL when
 * py_object is NULL or when PyObject_Str(), the UTF-8 conversion or the
 * allocation fails.
 */
char *
py_create_string_rep(PyObject *py_object)
{
    debug_decl(py_create_string_rep, PYTHON_DEBUG_INTERNAL);

    if (py_object == NULL)
        debug_return_ptr(NULL);

    PyObject *py_text = PyObject_Str(py_object);
    if (py_text == NULL)
        debug_return_ptr(NULL);

    // copy the text before dropping the object that owns the UTF-8 buffer
    const char *utf8 = PyUnicode_AsUTF8(py_text);
    char *copy = (utf8 != NULL) ? strdup(utf8) : NULL;
    Py_DECREF(py_text);

    debug_return_ptr(copy);
}
/*
 * Writes a SUDO_DEBUG_DIAG line of the form
 * "<class>.<function> <message>: <args> <kwargs>" for the given debug
 * subsystem.
 *
 * py_args / py_kwargs may be NULL; arguments that are missing or cannot be
 * converted are shown as "()" and keyword arguments as an empty string.
 * Nothing is converted unless the DIAG level is enabled.
 */
static void
_py_debug_python_function(const char *class_name, const char *function_name, const char *message,
                          PyObject *py_args, PyObject *py_kwargs, int subsystem_id)
{
    debug_decl_vars(_py_debug_python_function, subsystem_id);

    if (sudo_debug_needed(SUDO_DEBUG_DIAG)) {
        char *args_str = py_args ? py_create_string_rep(py_args) : NULL;
        char *kwargs_str = py_kwargs ? py_create_string_rep(py_kwargs) : NULL;

        // literal fallbacks need no allocation and can never be NULL
        sudo_debug_printf(SUDO_DEBUG_DIAG, "%s.%s %s: %s %s\n", class_name,
                          function_name, message,
                          args_str ? args_str : "()",
                          kwargs_str ? kwargs_str : "");

        free(args_str);
        free(kwargs_str);
    }
}
/*
 * Debug-logs a call to class_name.function_name together with its arguments.
 *
 * For the PYTHON_DEBUG_C_CALLS subsystem at SUDO_DEBUG_INFO level, the Python
 * location (function, file, line) that makes the call is logged first, if it
 * can be determined.  The arguments are then logged at DIAG level by
 * _py_debug_python_function().
 */
void
py_debug_python_call(const char *class_name, const char *function_name,
                     PyObject *py_args, PyObject *py_kwargs, int subsystem_id)
{
    debug_decl_vars(py_debug_python_call, subsystem_id);

    if (subsystem_id == PYTHON_DEBUG_C_CALLS && sudo_debug_needed(SUDO_DEBUG_INFO)) {
        // at this level we also output the calling python script
        char *caller_func_name = NULL, *caller_file_name = NULL;
        long caller_line_number = -1;

        if (py_get_current_execution_frame(&caller_file_name, &caller_line_number, &caller_func_name) == SUDO_RC_OK) {
            sudo_debug_printf(SUDO_DEBUG_INFO, "%s @ %s:%ld calls C function:\n",
                              caller_func_name, caller_file_name, caller_line_number);
        }

        // the strings may be set even when the lookup reports an error
        free(caller_func_name);
        free(caller_file_name);
    }

    _py_debug_python_function(class_name, function_name, "was called with arguments",
                              py_args, py_kwargs, subsystem_id);
}
/*
 * Debug-logs the outcome of a call to class_name.function_name.
 *
 * A NULL py_result is reported as a failed call at SUDO_DEBUG_ERROR level;
 * otherwise the result is logged through _py_debug_python_function().
 */
void
py_debug_python_result(const char *class_name, const char *function_name,
                       PyObject *py_result, int subsystem_id)
{
    if (py_result == NULL) {
        debug_decl_vars(py_debug_python_result, subsystem_id);
        // sudo_debug_printf() takes a SUDO_DEBUG_* priority, not a
        // SUDO_CONV_* conversation message type
        sudo_debug_printf(SUDO_DEBUG_ERROR, "%s.%s call failed\n",
                          class_name, function_name);
    } else {
        _py_debug_python_function(class_name, function_name, "returned result",
                                  py_result, NULL, subsystem_id);
    }
}
/*
 * Releases a NULL-terminated array of heap-allocated strings and resets the
 * caller's pointer to NULL.
 *
 * array: address of the array pointer; *array may be NULL, in which case
 *        nothing happens.
 */
void
str_array_free(char ***array)
{
    debug_decl(str_array_free, PYTHON_DEBUG_INTERNAL);

    char **items = *array;
    if (items != NULL) {
        // free every string up to the terminator, then the array itself
        for (size_t idx = 0; items[idx] != NULL; ++idx)
            free(items[idx]);
        free(items);
        *array = NULL;
    }

    debug_return;
}
/*
 * Looks up the currently executing Python frame via sys._getframe(0) and
 * reports its file name, line number and function name.
 *
 * file_name, function_name: receive strdup()-allocated strings the caller
 *     must free(); set to NULL when unavailable.  Either one may be set even
 *     when SUDO_RC_ERROR is returned.
 * line_number: receives the frame's f_lineno, or -1 when unavailable.
 *
 * Any Python error that was pending on entry is saved and restored, so errors
 * raised inside this function are not left visible to the caller.
 *
 * Returns SUDO_RC_OK when all three values were obtained, SUDO_RC_ERROR
 * otherwise.
 */
int
py_get_current_execution_frame(char **file_name, long *line_number, char **function_name)
{
    *file_name = NULL;
    *line_number = (long)-1;
    *function_name = NULL;

    PyObject *py_err_type = NULL, *py_err_value = NULL, *py_err_traceback = NULL;
    PyErr_Fetch(&py_err_type, &py_err_value, &py_err_traceback);

    PyObject *py_frame = NULL, *py_lineno = NULL, *py_f_code = NULL,
             *py_filename = NULL, *py_function_name = NULL;

    // py_getframe is not released below; PySys_GetObject hands out a borrowed
    // reference per the Python C API
    PyObject *py_getframe = PySys_GetObject("_getframe");
    if (py_getframe == NULL)
        goto cleanup;

    py_frame = PyObject_CallFunction(py_getframe, "i", 0);
    if (py_frame == NULL)
        goto cleanup;

    py_lineno = PyObject_GetAttrString(py_frame, "f_lineno");
    if (py_lineno != NULL) {
        *line_number = PyLong_AsLong(py_lineno);
    }

    py_f_code = PyObject_GetAttrString(py_frame, "f_code");
    if (py_f_code != NULL) {
        py_filename = PyObject_GetAttrString(py_f_code, "co_filename");
        if (py_filename != NULL) {
            // PyUnicode_AsUTF8 can fail; never pass NULL to strdup()
            const char *utf8 = PyUnicode_AsUTF8(py_filename);
            if (utf8 != NULL)
                *file_name = strdup(utf8);
        }

        py_function_name = PyObject_GetAttrString(py_f_code, "co_name");
        if (py_function_name != NULL) {
            const char *utf8 = PyUnicode_AsUTF8(py_function_name);
            if (utf8 != NULL)
                *function_name = strdup(utf8);
        }
    }

cleanup:
    Py_CLEAR(py_frame);
    Py_CLEAR(py_lineno);
    Py_CLEAR(py_f_code);
    Py_CLEAR(py_filename);
    Py_CLEAR(py_function_name);

    // we hide every error happening inside this function
    PyErr_Restore(py_err_type, py_err_value, py_err_traceback);

    return (*file_name && *function_name && (*line_number >= 0)) ?
        SUDO_RC_OK : SUDO_RC_ERROR;
}
/*
 * Resets the global py_ctx: every member is zeroed and sudo_log is pointed
 * back at the built-in fallback logger, so logging keeps working afterwards.
 */
void
py_ctx_reset(void)
{
    memset(&py_ctx, 0, sizeof(py_ctx));
    py_ctx.sudo_log = &_sudo_printf_default;
}
int
py_sudo_conv(int num_msgs, const struct sudo_conv_message msgs[], struct sudo_conv_reply replies[], struct sudo_conv_callback *callback)
{
/* Enable suspend during password entry. */
struct sigaction sa, saved_sigtstp;
sigemptyset(&sa.sa_mask);
sa.sa_flags = SA_RESTART;
sa.sa_handler = SIG_DFL;
(void) sigaction(SIGTSTP, &sa, &saved_sigtstp);
int rc = SUDO_RC_ERROR;
if (py_ctx.sudo_conv != NULL)
rc = py_ctx.sudo_conv((int)num_msgs, msgs, replies, callback);
/* Restore signal handlers and signal mask. */
(void) sigaction(SIGTSTP, &saved_sigtstp, NULL);
return rc;
}