| /* |
| * interface to SPI functions |
| * |
| * src/pl/plpython/plpy_spi.c |
| */ |
| |
| #include "postgres.h" |
| |
| #include <limits.h> |
| |
| #include "access/htup_details.h" |
| #include "access/xact.h" |
| #include "catalog/pg_type.h" |
| #include "executor/spi.h" |
| #include "mb/pg_wchar.h" |
| #include "parser/parse_type.h" |
| #include "plpy_elog.h" |
| #include "plpy_main.h" |
| #include "plpy_planobject.h" |
| #include "plpy_plpymodule.h" |
| #include "plpy_procedure.h" |
| #include "plpy_resultobject.h" |
| #include "plpy_spi.h" |
| #include "plpython.h" |
| #include "utils/faultinjector.h" |
| #include "utils/memutils.h" |
| #include "utils/syscache.h" |
| |
| static PyObject *PLy_spi_execute_query(char *query, long limit); |
| static PyObject *PLy_spi_execute_fetch_result(SPITupleTable *tuptable, |
| uint64 rows, int status); |
| static void PLy_spi_exception_set(PyObject *excclass, ErrorData *edata); |
| |
| |
| /* prepare(query="select * from foo") |
| * prepare(query="select * from foo where bar = $1", params=["text"]) |
| * prepare(query="select * from foo where bar = $1", params=["text"], limit=5) |
| */ |
| PyObject * |
| PLy_spi_prepare(PyObject *self, PyObject *args) |
| { |
| PLyPlanObject *plan; |
| PyObject *list = NULL; |
| PyObject *volatile optr = NULL; |
| char *query; |
| PLyExecutionContext *exec_ctx = PLy_current_execution_context(); |
| volatile MemoryContext oldcontext; |
| volatile ResourceOwner oldowner; |
| volatile int nargs; |
| |
| PLy_enter_python_intepreter = false; |
| |
| if (!PyArg_ParseTuple(args, "s|O:prepare", &query, &list)) |
| return NULL; |
| |
| if (list && (!PySequence_Check(list))) |
| { |
| PLy_exception_set(PyExc_TypeError, |
| "second argument of plpy.prepare must be a sequence"); |
| PLy_enter_python_intepreter = true; |
| return NULL; |
| } |
| |
| if ((plan = (PLyPlanObject *) PLy_plan_new()) == NULL) |
| { |
| PLy_enter_python_intepreter = true; |
| return NULL; |
| } |
| |
| plan->mcxt = AllocSetContextCreate(TopMemoryContext, |
| "PL/Python plan context", |
| ALLOCSET_DEFAULT_SIZES); |
| oldcontext = MemoryContextSwitchTo(plan->mcxt); |
| |
| nargs = list ? PySequence_Length(list) : 0; |
| |
| plan->nargs = nargs; |
| plan->types = nargs ? palloc0(sizeof(Oid) * nargs) : NULL; |
| plan->values = nargs ? palloc0(sizeof(Datum) * nargs) : NULL; |
| plan->args = nargs ? palloc0(sizeof(PLyObToDatum) * nargs) : NULL; |
| |
| MemoryContextSwitchTo(oldcontext); |
| |
| oldcontext = CurrentMemoryContext; |
| oldowner = CurrentResourceOwner; |
| |
| if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) |
| return NULL; |
| |
| PG_TRY(); |
| { |
| int i; |
| |
| for (i = 0; i < nargs; i++) |
| { |
| char *sptr; |
| Oid typeId; |
| int32 typmod; |
| |
| optr = PySequence_GetItem(list, i); |
| if (PyString_Check(optr)) |
| sptr = PyString_AsString(optr); |
| else if (PyUnicode_Check(optr)) |
| sptr = PLyUnicode_AsString(optr); |
| else |
| { |
| ereport(ERROR, |
| (errmsg("plpy.prepare: type name at ordinal position %d is not a string", i))); |
| sptr = NULL; /* keep compiler quiet */ |
| } |
| |
| /******************************************************** |
| * Resolve argument type names and then look them up by |
| * oid in the system cache, and remember the required |
| *information for input conversion. |
| ********************************************************/ |
| |
| parseTypeString(sptr, &typeId, &typmod, false); |
| |
| Py_DECREF(optr); |
| |
| /* |
| * set optr to NULL, so we won't try to unref it again in case of |
| * an error |
| */ |
| optr = NULL; |
| |
| plan->types[i] = typeId; |
| PLy_output_setup_func(&plan->args[i], plan->mcxt, |
| typeId, typmod, |
| exec_ctx->curr_proc); |
| } |
| |
| pg_verifymbstr(query, strlen(query), false); |
| plan->plan = SPI_prepare(query, plan->nargs, plan->types); |
| if (plan->plan == NULL) |
| elog(ERROR, "SPI_prepare failed: %s", |
| SPI_result_code_string(SPI_result)); |
| |
| /* transfer plan from procCxt to topCxt */ |
| if (SPI_keepplan(plan->plan)) |
| elog(ERROR, "SPI_keepplan failed"); |
| |
| PLy_spi_subtransaction_commit(oldcontext, oldowner); |
| } |
| PG_CATCH(); |
| { |
| Py_DECREF(plan); |
| Py_XDECREF(optr); |
| |
| PLy_spi_subtransaction_abort(oldcontext, oldowner); |
| return NULL; |
| } |
| PG_END_TRY(); |
| |
| Assert(plan->plan != NULL); |
| PLy_enter_python_intepreter = true; |
| return (PyObject *) plan; |
| } |
| |
| /* execute(query="select * from foo", limit=5) |
| * execute(plan=plan, values=(foo, bar), limit=5) |
| */ |
| PyObject * |
| PLy_spi_execute(PyObject *self, PyObject *args) |
| { |
| char *query; |
| PyObject *plan; |
| PyObject *list = NULL; |
| long limit = 0; |
| PyObject *ret; |
| PLy_enter_python_intepreter = false; |
| |
| if (PyArg_ParseTuple(args, "s|l", &query, &limit)) |
| { |
| ret = PLy_spi_execute_query(query, (int64)limit); |
| PLy_enter_python_intepreter = true; |
| return ret; |
| } |
| |
| PyErr_Clear(); |
| |
| if (PyArg_ParseTuple(args, "O|Ol", &plan, &list, &limit) && |
| is_PLyPlanObject(plan)) |
| { |
| ret = PLy_spi_execute_plan(plan, list, (int64)limit); |
| PLy_enter_python_intepreter = true; |
| return ret; |
| } |
| |
| PLy_exception_set(PLy_exc_error, "plpy.execute expected a query or a plan"); |
| PLy_enter_python_intepreter = true; |
| return NULL; |
| } |
| |
| PyObject * |
| PLy_spi_execute_plan(PyObject *ob, PyObject *list, int64 limit) |
| { |
| volatile int nargs; |
| int i, |
| rv; |
| PLyPlanObject *plan; |
| volatile MemoryContext oldcontext; |
| volatile ResourceOwner oldowner; |
| PyObject *ret; |
| |
| if (list != NULL) |
| { |
| if (!PySequence_Check(list) || PyString_Check(list) || PyUnicode_Check(list)) |
| { |
| PLy_exception_set(PyExc_TypeError, "plpy.execute takes a sequence as its second argument"); |
| return NULL; |
| } |
| nargs = PySequence_Length(list); |
| } |
| else |
| nargs = 0; |
| |
| plan = (PLyPlanObject *) ob; |
| |
| if (nargs != plan->nargs) |
| { |
| char *sv; |
| PyObject *so = PyObject_Str(list); |
| |
| if (!so) |
| PLy_elog(ERROR, "could not execute plan"); |
| sv = PyString_AsString(so); |
| PLy_exception_set_plural(PyExc_TypeError, |
| "Expected sequence of %d argument, got %d: %s", |
| "Expected sequence of %d arguments, got %d: %s", |
| plan->nargs, |
| plan->nargs, nargs, sv); |
| Py_DECREF(so); |
| |
| return NULL; |
| } |
| |
| oldcontext = CurrentMemoryContext; |
| oldowner = CurrentResourceOwner; |
| |
| if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) |
| return NULL; |
| |
| PG_TRY(); |
| { |
| PLyExecutionContext *exec_ctx = PLy_current_execution_context(); |
| char *volatile nulls; |
| volatile int j; |
| |
| if (nargs > 0) |
| nulls = palloc(nargs * sizeof(char)); |
| else |
| nulls = NULL; |
| |
| for (j = 0; j < nargs; j++) |
| { |
| PLyObToDatum *arg = &plan->args[j]; |
| PyObject *elem; |
| |
| elem = PySequence_GetItem(list, j); |
| PG_TRY(); |
| { |
| bool isnull; |
| |
| plan->values[j] = PLy_output_convert(arg, elem, &isnull); |
| nulls[j] = isnull ? 'n' : ' '; |
| } |
| PG_FINALLY(); |
| { |
| Py_DECREF(elem); |
| } |
| PG_END_TRY(); |
| } |
| |
| rv = SPI_execute_plan(plan->plan, plan->values, nulls, |
| exec_ctx->curr_proc->fn_readonly, limit); |
| ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); |
| |
| if (nargs > 0) |
| pfree(nulls); |
| |
| PLy_spi_subtransaction_commit(oldcontext, oldowner); |
| } |
| PG_CATCH(); |
| { |
| int k; |
| |
| /* |
| * cleanup plan->values array |
| */ |
| for (k = 0; k < nargs; k++) |
| { |
| if (!plan->args[k].typbyval && |
| (plan->values[k] != PointerGetDatum(NULL))) |
| { |
| pfree(DatumGetPointer(plan->values[k])); |
| plan->values[k] = PointerGetDatum(NULL); |
| } |
| } |
| |
| PLy_spi_subtransaction_abort(oldcontext, oldowner); |
| return NULL; |
| } |
| PG_END_TRY(); |
| |
| for (i = 0; i < nargs; i++) |
| { |
| if (!plan->args[i].typbyval && |
| (plan->values[i] != PointerGetDatum(NULL))) |
| { |
| pfree(DatumGetPointer(plan->values[i])); |
| plan->values[i] = PointerGetDatum(NULL); |
| } |
| } |
| |
| if (rv < 0) |
| { |
| PLy_exception_set(PLy_exc_spi_error, |
| "SPI_execute_plan failed: %s", |
| SPI_result_code_string(rv)); |
| return NULL; |
| } |
| |
| return ret; |
| } |
| |
| static PyObject * |
| PLy_spi_execute_query(char *query, int64 limit) |
| { |
| int rv; |
| volatile MemoryContext oldcontext; |
| volatile ResourceOwner oldowner; |
| PyObject *ret = NULL; |
| |
| oldcontext = CurrentMemoryContext; |
| oldowner = CurrentResourceOwner; |
| |
| if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) |
| return NULL; |
| |
| PG_TRY(); |
| { |
| PLyExecutionContext *exec_ctx = PLy_current_execution_context(); |
| |
| pg_verifymbstr(query, strlen(query), false); |
| rv = SPI_execute(query, exec_ctx->curr_proc->fn_readonly, limit); |
| ret = PLy_spi_execute_fetch_result(SPI_tuptable, SPI_processed, rv); |
| |
| PLy_spi_subtransaction_commit(oldcontext, oldowner); |
| } |
| PG_CATCH(); |
| { |
| PLy_spi_subtransaction_abort(oldcontext, oldowner); |
| return NULL; |
| } |
| PG_END_TRY(); |
| |
| if (rv < 0) |
| { |
| Py_XDECREF(ret); |
| PLy_exception_set(PLy_exc_spi_error, |
| "SPI_execute failed: %s", |
| SPI_result_code_string(rv)); |
| return NULL; |
| } |
| |
| return ret; |
| } |
| |
| static PyObject * |
| PLy_spi_execute_fetch_result(SPITupleTable *tuptable, uint64 rows, int status) |
| { |
| PLyResultObject *result; |
| PLyExecutionContext *exec_ctx = PLy_current_execution_context(); |
| volatile MemoryContext oldcontext; |
| |
| |
| #ifdef FAULT_INJECTOR |
| if (rows >= 10000 && rows <= 1000000) |
| { |
| if (FaultInjector_InjectFaultIfSet("executor_run_high_processed", |
| DDLNotSpecified, |
| "" /* databaseName */, |
| "" /* tableName */) == FaultInjectorTypeSkip) |
| { |
| /* |
| * For testing purposes, pretend that we have already processed |
| * almost 2^32 rows. |
| */ |
| rows = UINT_MAX - 10; |
| } |
| } |
| #endif /* FAULT_INJECTOR */ |
| |
| result = (PLyResultObject *) PLy_result_new(); |
| if (!result) |
| { |
| SPI_freetuptable(tuptable); |
| return NULL; |
| } |
| Py_DECREF(result->status); |
| result->status = PyInt_FromLong(status); |
| |
| if (status > 0 && tuptable == NULL) |
| { |
| Py_DECREF(result->nrows); |
| result->nrows = PyLong_FromUnsignedLongLong(rows); |
| } |
| else if (status > 0 && tuptable != NULL) |
| { |
| PLyDatumToOb ininfo; |
| MemoryContext cxt; |
| |
| Py_DECREF(result->nrows); |
| result->nrows = PyLong_FromUnsignedLongLong(rows); |
| |
| cxt = AllocSetContextCreate(CurrentMemoryContext, |
| "PL/Python temp context", |
| ALLOCSET_DEFAULT_SIZES); |
| |
| /* Initialize for converting result tuples to Python */ |
| PLy_input_setup_func(&ininfo, cxt, RECORDOID, -1, |
| exec_ctx->curr_proc); |
| |
| oldcontext = CurrentMemoryContext; |
| PG_TRY(); |
| { |
| MemoryContext oldcontext2; |
| |
| if (rows) |
| { |
| uint64 i; |
| |
| /* |
| * PyList_New() and PyList_SetItem() use Py_ssize_t for list |
| * size and list indices; so we cannot support a result larger |
| * than PY_SSIZE_T_MAX. |
| */ |
| if (rows > (uint64) PY_SSIZE_T_MAX) |
| ereport(ERROR, |
| (errcode(ERRCODE_PROGRAM_LIMIT_EXCEEDED), |
| errmsg("query result has too many rows to fit in a Python list"))); |
| |
| Py_DECREF(result->rows); |
| result->rows = PyList_New(rows); |
| if (result->rows) |
| { |
| PLy_input_setup_tuple(&ininfo, tuptable->tupdesc, |
| exec_ctx->curr_proc); |
| |
| for (i = 0; i < rows; i++) |
| { |
| PyObject *row = PLy_input_from_tuple(&ininfo, |
| tuptable->vals[i], |
| tuptable->tupdesc, |
| true); |
| |
| PyList_SetItem(result->rows, i, row); |
| } |
| } |
| } |
| |
| /* |
| * Save tuple descriptor for later use by result set metadata |
| * functions. Save it in TopMemoryContext so that it survives |
| * outside of an SPI context. We trust that PLy_result_dealloc() |
| * will clean it up when the time is right. (Do this as late as |
| * possible, to minimize the number of ways the tupdesc could get |
| * leaked due to errors.) |
| */ |
| oldcontext2 = MemoryContextSwitchTo(TopMemoryContext); |
| result->tupdesc = CreateTupleDescCopy(tuptable->tupdesc); |
| MemoryContextSwitchTo(oldcontext2); |
| } |
| PG_CATCH(); |
| { |
| MemoryContextSwitchTo(oldcontext); |
| MemoryContextDelete(cxt); |
| Py_DECREF(result); |
| PG_RE_THROW(); |
| } |
| PG_END_TRY(); |
| |
| MemoryContextDelete(cxt); |
| SPI_freetuptable(tuptable); |
| |
| /* in case PyList_New() failed above */ |
| if (!result->rows) |
| { |
| Py_DECREF(result); |
| result = NULL; |
| } |
| } |
| |
| return (PyObject *) result; |
| } |
| |
| PyObject * |
| PLy_commit(PyObject *self, PyObject *args) |
| { |
| MemoryContext oldcontext = CurrentMemoryContext; |
| PLyExecutionContext *exec_ctx = PLy_current_execution_context(); |
| |
| PG_TRY(); |
| { |
| SPI_commit(); |
| |
| /* was cleared at transaction end, reset pointer */ |
| exec_ctx->scratch_ctx = NULL; |
| } |
| PG_CATCH(); |
| { |
| ErrorData *edata; |
| PLyExceptionEntry *entry; |
| PyObject *exc; |
| |
| /* Save error info */ |
| MemoryContextSwitchTo(oldcontext); |
| edata = CopyErrorData(); |
| FlushErrorState(); |
| |
| /* was cleared at transaction end, reset pointer */ |
| exec_ctx->scratch_ctx = NULL; |
| |
| /* Look up the correct exception */ |
| entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), |
| HASH_FIND, NULL); |
| |
| /* |
| * This could be a custom error code, if that's the case fallback to |
| * SPIError |
| */ |
| exc = entry ? entry->exc : PLy_exc_spi_error; |
| /* Make Python raise the exception */ |
| PLy_spi_exception_set(exc, edata); |
| FreeErrorData(edata); |
| |
| return NULL; |
| } |
| PG_END_TRY(); |
| |
| Py_RETURN_NONE; |
| } |
| |
| PyObject * |
| PLy_rollback(PyObject *self, PyObject *args) |
| { |
| MemoryContext oldcontext = CurrentMemoryContext; |
| PLyExecutionContext *exec_ctx = PLy_current_execution_context(); |
| |
| PG_TRY(); |
| { |
| SPI_rollback(); |
| |
| /* was cleared at transaction end, reset pointer */ |
| exec_ctx->scratch_ctx = NULL; |
| } |
| PG_CATCH(); |
| { |
| ErrorData *edata; |
| PLyExceptionEntry *entry; |
| PyObject *exc; |
| |
| /* Save error info */ |
| MemoryContextSwitchTo(oldcontext); |
| edata = CopyErrorData(); |
| FlushErrorState(); |
| |
| /* was cleared at transaction end, reset pointer */ |
| exec_ctx->scratch_ctx = NULL; |
| |
| /* Look up the correct exception */ |
| entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), |
| HASH_FIND, NULL); |
| |
| /* |
| * This could be a custom error code, if that's the case fallback to |
| * SPIError |
| */ |
| exc = entry ? entry->exc : PLy_exc_spi_error; |
| /* Make Python raise the exception */ |
| PLy_spi_exception_set(exc, edata); |
| FreeErrorData(edata); |
| |
| return NULL; |
| } |
| PG_END_TRY(); |
| |
| Py_RETURN_NONE; |
| } |
| |
| /* |
| * Utilities for running SPI functions in subtransactions. |
| * |
| * Usage: |
| * |
| * MemoryContext oldcontext = CurrentMemoryContext; |
| * ResourceOwner oldowner = CurrentResourceOwner; |
| * |
| * if(!PLy_spi_subtransaction_begin(oldcontext, oldowner)) |
| * return NULL; |
| * |
| * PG_TRY(); |
| * { |
| * <call SPI functions> |
| * PLy_spi_subtransaction_commit(oldcontext, oldowner); |
| * } |
| * PG_CATCH(); |
| * { |
| * <do cleanup> |
| * PLy_spi_subtransaction_abort(oldcontext, oldowner); |
| * return NULL; |
| * } |
| * PG_END_TRY(); |
| * |
| * These utilities take care of restoring connection to the SPI manager and |
| * setting a Python exception in case of an abort. |
| */ |
| bool |
| PLy_spi_subtransaction_begin(MemoryContext oldcontext, ResourceOwner oldowner) |
| { |
| PG_TRY(); |
| { |
| /* Start subtransaction (could fail) */ |
| BeginInternalSubTransaction(NULL); |
| /* Want to run inside function's memory context */ |
| MemoryContextSwitchTo(oldcontext); |
| } |
| PG_CATCH(); |
| { |
| ErrorData *edata; |
| PLyExceptionEntry *entry; |
| PyObject *exc; |
| |
| /* Ensure we restore original context and owner */ |
| MemoryContextSwitchTo(oldcontext); |
| CurrentResourceOwner = oldowner; |
| |
| /* Save error info */ |
| edata = CopyErrorData(); |
| FlushErrorState(); |
| |
| /* Look up the correct exception */ |
| entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), |
| HASH_FIND, NULL); |
| |
| /* |
| * This could be a custom error code, if that's the case fallback to |
| * SPIError |
| */ |
| exc = entry ? entry->exc : PLy_exc_spi_error; |
| /* Make Python raise the exception */ |
| PLy_spi_exception_set(exc, edata); |
| FreeErrorData(edata); |
| |
| return false; |
| } |
| PG_END_TRY(); |
| |
| return true; |
| } |
| |
| void |
| PLy_spi_subtransaction_commit(MemoryContext oldcontext, ResourceOwner oldowner) |
| { |
| /* Commit the inner transaction, return to outer xact context */ |
| ReleaseCurrentSubTransaction(); |
| MemoryContextSwitchTo(oldcontext); |
| CurrentResourceOwner = oldowner; |
| } |
| |
| void |
| PLy_spi_subtransaction_abort(MemoryContext oldcontext, ResourceOwner oldowner) |
| { |
| ErrorData *edata; |
| PLyExceptionEntry *entry; |
| PyObject *exc; |
| |
| /* Save error info */ |
| MemoryContextSwitchTo(oldcontext); |
| edata = CopyErrorData(); |
| FlushErrorState(); |
| |
| /* Abort the inner transaction */ |
| RollbackAndReleaseCurrentSubTransaction(); |
| MemoryContextSwitchTo(oldcontext); |
| CurrentResourceOwner = oldowner; |
| |
| /* Look up the correct exception */ |
| entry = hash_search(PLy_spi_exceptions, &(edata->sqlerrcode), |
| HASH_FIND, NULL); |
| |
| /* |
| * This could be a custom error code, if that's the case fallback to |
| * SPIError |
| */ |
| exc = entry ? entry->exc : PLy_exc_spi_error; |
| /* Make Python raise the exception */ |
| PLy_spi_exception_set(exc, edata); |
| FreeErrorData(edata); |
| } |
| |
| /* |
| * Raise a SPIError, passing in it more error details, like the |
| * internal query and error position. |
| */ |
| static void |
| PLy_spi_exception_set(PyObject *excclass, ErrorData *edata) |
| { |
| PyObject *args = NULL; |
| PyObject *spierror = NULL; |
| PyObject *spidata = NULL; |
| |
| args = Py_BuildValue("(s)", edata->message); |
| if (!args) |
| goto failure; |
| |
| /* create a new SPI exception with the error message as the parameter */ |
| spierror = PyObject_CallObject(excclass, args); |
| if (!spierror) |
| goto failure; |
| |
| spidata = Py_BuildValue("(izzzizzzzz)", edata->sqlerrcode, edata->detail, edata->hint, |
| edata->internalquery, edata->internalpos, |
| edata->schema_name, edata->table_name, edata->column_name, |
| edata->datatype_name, edata->constraint_name); |
| if (!spidata) |
| goto failure; |
| |
| if (PyObject_SetAttrString(spierror, "spidata", spidata) == -1) |
| goto failure; |
| |
| PyErr_SetObject(excclass, spierror); |
| |
| Py_DECREF(args); |
| Py_DECREF(spierror); |
| Py_DECREF(spidata); |
| return; |
| |
| failure: |
| Py_XDECREF(args); |
| Py_XDECREF(spierror); |
| Py_XDECREF(spidata); |
| elog(ERROR, "could not convert SPI error to Python exception"); |
| } |