| /*------------------------------------------------------------------------- |
| * |
| * nodeTableFunction.c |
| * Support routines for scans of enhanced table functions. |
| * |
| * DESCRIPTION |
| * |
| * This code is distinct from ExecFunctionScan due to the nature of |
| * the plans. A plain table function will be called without an input |
| * subquery, whereas the enhanced table function framework allows |
| * table functions operating over table input. |
| * |
| * Normal Table Function Enhanced Table Function |
| * |
| * (out) (out) |
| * | | |
| * (FunctionScan) (TableFunctionScan) |
| * | |
| * (SubqueryScan) |
| * |
| * INTERFACE ROUTINES |
| * ExecTableFunctionScan sequentially scans a relation. |
| * ExecTableFunctionNext retrieve next tuple in sequential order. |
| * ExecInitTableFunctionScan creates and initializes a externalscan node. |
| * ExecEndTableFunctionScan releases any storage allocated. |
| * ExecStopTableFunctionScan closes external resources before EOD. |
| * ExecTableFunctionReScan rescans the relation |
| * |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| #include "funcapi.h" |
| #include "tablefuncapi.h" |
| |
| #include "cdb/cdbvars.h" |
| #include "executor/executor.h" |
| #include "executor/nodeTableFunction.h" |
| #include "parser/parsetree.h" |
| #include "utils/lsyscache.h" |
| |
| |
| static void setupFunctionArguments(TableFunctionState *node); |
| static TupleTableSlot *TableFunctionNext(TableFunctionState *node); |
| static gpmon_packet_t *GpmonPktFromTableFunctionState(TableFunctionState *node); |
| static void initGpmonPktForTableFunction(Plan *planNode, |
| gpmon_packet_t *gpmon_pkt, |
| EState *estate); |
| |
| /* Private structure forward declared in tablefuncapi.h */ |
| typedef struct AnyTableData |
| { |
| ExprContext *econtext; |
| PlanState *subplan; /* subplan node */ |
| TupleDesc subdesc; /* tuple descriptor of subplan */ |
| JunkFilter *junkfilter; /* for projection of subplan tuple */ |
| } AnyTableData; |
| |
| /* |
| * setupFunctionArguments |
| */ |
| static void |
| setupFunctionArguments(TableFunctionState *node) |
| { |
| ExprContext *econtext = node->ss.ps.ps_ExprContext; |
| int count = 0; |
| int i = 0; |
| ListCell *arg = NULL; |
| bool argDone; |
| |
| /* Evaluate the static function args */ |
| argDone = ExecEvalFuncArgs(&node->fcinfo, |
| node->fcache->args, |
| econtext); |
| |
| /* |
| * We don't allow sets in the arguments of the table function (except for |
| * specific anytable values generated by TableValueExpressions). |
| */ |
| if (argDone != ExprSingleResult) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), |
| errmsg("set-valued function called in context that cannot " |
| "accept a set"))); |
| } |
| |
| /* |
| * Identify the AnyTable Arguement of the function and swap it with the |
| * AnyTableData for our subplan. |
| */ |
| foreach(arg, node->fcache->args) |
| { |
| ExprState *argstate = (ExprState *) lfirst(arg); |
| |
| if (IsA(argstate->expr, TableValueExpr)) |
| { |
| node->fcinfo.arg[i] = AnyTableGetDatum(node->inputscan); |
| node->fcinfo.argnull[i] = false; |
| count++; |
| } |
| i++; |
| } |
| |
| /* |
| * Currently we don't allow table functions with more than one |
| * table value expression arguments, and if they don't have at |
| * least one they will be planned as nodeFunctionScan instead |
| * of nodeTableFunctionScan. Therefore we should have found |
| * exactly 1 TableValueExpr above. |
| */ |
| if (count != 1) |
| { |
| elog(ERROR, |
| "table functions over multiple TABLE value expressions " |
| "not yet supported"); /* not feasible */ |
| } |
| } |
| |
| /* |
| * TableFunctionNext - ExecScan callback function for table funciton scans |
| */ |
| static TupleTableSlot * |
| TableFunctionNext(TableFunctionState *node) |
| { |
| MemoryContext oldcontext = NULL; |
| TupleTableSlot *slot = NULL; |
| ExprContext *econtext = node->ss.ps.ps_ExprContext; |
| FuncExprState *fcache = node->fcache; |
| bool returns_set = fcache->func.fn_retset; |
| HeapTuple tuple = NULL; |
| TupleDesc resultdesc = node->resultdesc; |
| Datum user_result; |
| |
| /* Clean up any per-tuple memory */ |
| ResetExprContext(econtext); |
| |
| /* |
| * If all results have been returned by the callback function then |
| * we are done. |
| */ |
| if (node->rsinfo.isDone == ExprEndResult) |
| return slot; /* empty slot */ |
| |
| /* Invoke the user supplied function */ |
| node->fcinfo.isnull = false; |
| oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| user_result = FunctionCallInvoke(&node->fcinfo); |
| MemoryContextSwitchTo(oldcontext); |
| if (node->rsinfo.returnMode != SFRM_ValuePerCall) |
| { |
| /* FIXME: should support both protocols */ |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_SRF_PROTOCOL_VIOLATED), |
| errmsg("table functions must use SFRM_ValuePerCall protocol"))); |
| } |
| if (node->rsinfo.isDone == ExprEndResult) |
| return slot; /* empty slot */ |
| |
| /* Mark this the last value if the func doesn't return a set */ |
| if (!returns_set || node->rsinfo.isDone == ExprSingleResult) |
| node->rsinfo.isDone = ExprEndResult; |
| |
| /* This would only error if the user violated the SRF calling convensions */ |
| if (returns_set && node->fcinfo.isnull) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED), |
| errmsg("function returning set of rows cannot return null value"))); |
| } |
| |
| /* Convert the Datum into tuple and store it into the scan slot */ |
| if (node->is_rowtype) |
| { |
| HeapTupleHeader th; |
| |
| if (node->fcinfo.isnull) |
| { |
| int i; |
| Datum values[MaxTupleAttributeNumber]; |
| bool nulls[MaxTupleAttributeNumber]; |
| |
| Insist(!returns_set); /* checked above */ |
| Insist(resultdesc->natts <= MaxTupleAttributeNumber); |
| for (i = 0; i < resultdesc->natts; i++) |
| nulls[i] = true; |
| |
| /* |
| * If we get a clean solution to the tuple allocation below we |
| * can use it here as well. This is less an issue because there |
| * is only a single tuple in this case, so the overhead of a |
| * single palloc is not a big deal. |
| */ |
| oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| tuple = heap_form_tuple(resultdesc, values, nulls); |
| MemoryContextSwitchTo(oldcontext); |
| } |
| else |
| { |
| |
| /* Convert returned HeapTupleHeader into a HeapTuple */ |
| th = DatumGetHeapTupleHeader(user_result); |
| tuple = &node->tuple; |
| |
| ItemPointerSetInvalid(&(tuple->t_self)); |
| tuple->t_len = HeapTupleHeaderGetDatumLength(th); |
| tuple->t_data = th; |
| |
| /* Double check that this tuple is of the expected form */ |
| if (resultdesc->tdtypeid != HeapTupleHeaderGetTypeId(th)) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_E_R_I_E_SRF_PROTOCOL_VIOLATED), |
| errmsg("invalid tuple returned from table function"), |
| errdetail("Returned tuple does not match output " |
| "tuple descriptor."))); |
| } |
| } |
| } |
| else |
| { |
| /* |
| * TODO: This will allocate memory for each tuple, we should be able |
| * to get away with fewer pallocs. |
| */ |
| oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| tuple = heap_form_tuple(resultdesc, |
| &user_result, |
| &node->fcinfo.isnull); |
| MemoryContextSwitchTo(oldcontext); |
| } |
| |
| /* |
| * Store the tuple into the scan slot. |
| * |
| * Note: Tuple should be allocated in the per-row memory context, so they |
| * will be freed automatically when the context is freed, we cannot free |
| * them again here. |
| */ |
| Assert(tuple); |
| slot = ExecStoreHeapTuple(tuple, |
| node->ss.ss_ScanTupleSlot, |
| InvalidBuffer, |
| false /* shouldFree */); |
| Assert(!TupIsNull(slot)); |
| |
| node->ss.ss_ScanTupleSlot = slot; |
| |
| /* Update gpmon statistics */ |
| if (!TupIsNull(slot)) |
| { |
| Gpmon_M_Incr_Rows_Out(GpmonPktFromTableFunctionState(node)); |
| CheckSendPlanStateGpmonPkt(&node->ss.ps); |
| } |
| |
| return slot; |
| } |
| |
| /* |
| * ExecTableFunction - wrapper around TableFunctionNext |
| */ |
| TupleTableSlot * |
| ExecTableFunction(TableFunctionState *node) |
| { |
| /* Setup arguments on the first call */ |
| if (node->is_firstcall) |
| { |
| setupFunctionArguments(node); |
| node->is_firstcall = false; |
| } |
| |
| return ExecScan(&node->ss, (ExecScanAccessMtd) TableFunctionNext); |
| } |
| |
| |
| /* |
| * ExecInitTableFunction - Setup the table function executor |
| */ |
| TableFunctionState * |
| ExecInitTableFunction(TableFunctionScan *node, EState *estate, int eflags) |
| { |
| TableFunctionState *scanstate; |
| PlanState *subplan; |
| RangeTblEntry *rte; |
| Oid funcrettype; |
| TypeFuncClass functypclass; |
| FuncExpr *func; |
| ExprContext *econtext; |
| TupleDesc inputdesc = NULL; |
| TupleDesc resultdesc = NULL; |
| |
| /* Inner plan is not used, outer plan must be present */ |
| Assert(innerPlan(node) == NULL); |
| Assert(outerPlan(node) != NULL); |
| |
| /* Forward scan only */ |
| Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_BACKWARD))); |
| |
| /* |
| * Create state structure. |
| */ |
| scanstate = makeNode(TableFunctionState); |
| scanstate->ss.ps.plan = (Plan *)node; |
| scanstate->ss.ps.state = estate; |
| scanstate->inputscan = palloc0(sizeof(AnyTableData)); |
| scanstate->is_firstcall = true; |
| |
| /* Create expression context for the node. */ |
| ExecAssignExprContext(estate, &scanstate->ss.ps); |
| ExecInitResultTupleSlot(estate, &scanstate->ss.ps); |
| ExecInitScanTupleSlot(estate, &scanstate->ss); |
| econtext = scanstate->ss.ps.ps_ExprContext; |
| |
| /* Initialize child expressions */ |
| scanstate->ss.ps.targetlist = (List *) |
| ExecInitExpr((Expr *)node->scan.plan.targetlist, |
| (PlanState *)scanstate); |
| scanstate->ss.ps.qual = (List *) |
| ExecInitExpr((Expr *)node->scan.plan.qual, |
| (PlanState *)scanstate); |
| |
| /* Initialize child nodes */ |
| outerPlanState(scanstate) = ExecInitNode(outerPlan(node), estate, eflags); |
| subplan = outerPlanState(scanstate); |
| inputdesc = CreateTupleDescCopy(ExecGetResultType(subplan)); |
| |
| /* get info about the function */ |
| rte = rt_fetch(node->scan.scanrelid, estate->es_range_table); |
| Insist(rte->rtekind == RTE_TABLEFUNCTION); |
| |
| /* |
| * The funcexpr must be a function call. This check is to verify that |
| * the planner didn't try to perform constant folding or other inlining |
| * on a function invoked as a table function. |
| */ |
| if (!rte->funcexpr || !IsA(rte->funcexpr, FuncExpr)) |
| { |
| /* should not be possible */ |
| elog(ERROR, "table function expression is not a function expression"); |
| } |
| func = (FuncExpr *) rte->funcexpr; |
| functypclass = get_expr_result_type((Node*) func, &funcrettype, &resultdesc); |
| |
| switch (functypclass) |
| { |
| case TYPEFUNC_COMPOSITE: |
| { |
| /* Composite data type: copy the typcache entry for safety */ |
| Assert(resultdesc); |
| resultdesc = CreateTupleDescCopy(resultdesc); |
| scanstate->is_rowtype = true; |
| break; |
| } |
| |
| case TYPEFUNC_RECORD: |
| { |
| /* Record data type: Construct tuple desc based on rangeTable */ |
| resultdesc = BuildDescFromLists(rte->eref->colnames, |
| rte->funccoltypes, |
| rte->funccoltypmods); |
| scanstate->is_rowtype = true; |
| break; |
| } |
| |
| case TYPEFUNC_SCALAR: |
| { |
| /* Scalar data type: Construct a tuple descriptor manually */ |
| char *attname = strVal(linitial(rte->eref->colnames)); |
| |
| resultdesc = CreateTemplateTupleDesc(1, false); |
| TupleDescInitEntry(resultdesc, |
| (AttrNumber) 1, |
| attname, |
| funcrettype, |
| -1, |
| 0); |
| scanstate->is_rowtype = false; |
| break; |
| } |
| |
| default: |
| { |
| /* This should not be possible, it should be caught by parser. */ |
| elog(ERROR, "table function has unsupported return type"); |
| } |
| } |
| |
| /* |
| * For RECORD results, make sure a typmod has been assigned. (The |
| * function should do this for itself, but let's cover things in case it |
| * doesn't.) |
| */ |
| BlessTupleDesc(resultdesc); |
| scanstate->resultdesc = resultdesc; |
| ExecAssignScanType(&scanstate->ss, resultdesc); |
| |
| /* Other node-specific setup */ |
| scanstate->fcache = (FuncExprState*) |
| ExecInitExpr((Expr *) rte->funcexpr, (PlanState *) scanstate); |
| Assert(scanstate->fcache && IsA(scanstate->fcache, FuncExprState)); |
| |
| scanstate->rsinfo.type = (fmNodeTag) T_ReturnSetInfo; |
| scanstate->rsinfo.econtext = econtext; |
| scanstate->rsinfo.expectedDesc = resultdesc; |
| scanstate->rsinfo.allowedModes = (int) (SFRM_ValuePerCall); |
| scanstate->rsinfo.returnMode = (int) (SFRM_ValuePerCall); |
| scanstate->rsinfo.isDone = ExprSingleResult; |
| scanstate->rsinfo.setResult = NULL; |
| scanstate->rsinfo.setDesc = NULL; |
| |
| scanstate->userdata = rte->funcuserdata; |
| /* Initialize a function cache for the function expression */ |
| init_fcache(func->funcid, scanstate->fcache, |
| econtext->ecxt_per_query_memory, |
| true); |
| |
| /* Initialize the function call info */ |
| InitFunctionCallInfoData(scanstate->fcinfo, /* Fcinfo */ |
| &(scanstate->fcache->func), /* Flinfo */ |
| 0, /* Nargs */ |
| (Node*) scanstate, /* Context */ |
| (Node*) &(scanstate->rsinfo)); /* ResultInfo */ |
| |
| /* setup the AnyTable input */ |
| scanstate->inputscan->econtext = econtext; |
| scanstate->inputscan->subplan = subplan; |
| scanstate->inputscan->subdesc = inputdesc; |
| |
| /* Determine projection information for subplan */ |
| TupleDesc cleanTupType = ExecCleanTypeFromTL(subplan->plan->targetlist, |
| false /* hasoid */); |
| |
| scanstate->inputscan->junkfilter = |
| ExecInitJunkFilter(subplan->plan->targetlist, |
| cleanTupType, |
| NULL /* slot */); |
| BlessTupleDesc(scanstate->inputscan->junkfilter->jf_cleanTupType); |
| |
| /* Initialize result tuple type and projection info */ |
| ExecAssignResultTypeFromTL(&scanstate->ss.ps); |
| ExecAssignScanProjectionInfo(&scanstate->ss); |
| |
| initGpmonPktForTableFunction((Plan *)node, |
| &scanstate->ss.ps.gpmon_pkt, estate); |
| |
| return scanstate; |
| } |
| |
| int |
| ExecCountSlotsTableFunction(TableFunctionScan *node) |
| { |
| return ExecCountSlotsNode(outerPlan(node)) + 2; |
| } |
| |
| void |
| ExecEndTableFunction(TableFunctionState *node) |
| { |
| /* Free the ExprContext */ |
| ExecFreeExprContext(&node->ss.ps); |
| |
| /* Clean out the tuple table */ |
| ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); |
| |
| /* End the subplans */ |
| ExecEndNode(outerPlanState(node)); |
| |
| EndPlanStateGpmonPkt(&node->ss.ps); |
| } |
| |
| void |
| ExecReScanTableFunction(TableFunctionState *node, ExprContext *exprCtxt) |
| { |
| /* TableFunction Planner marks TableFunction nodes as not rescannable */ |
| elog(ERROR, "invalid rescan of TableFunctionScan"); |
| } |
| |
| |
| void |
| initGpmonPktForTableFunction(Plan *planNode, |
| gpmon_packet_t *gpmon_pkt, |
| EState *estate) |
| { |
| Assert(planNode != NULL); |
| Assert(gpmon_pkt != NULL); |
| Assert(IsA(planNode, TableFunctionScan)); |
| |
| InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, PMNT_TableFunctionScan, |
| (int64) planNode->plan_rows, NULL); |
| } |
| |
| gpmon_packet_t * |
| GpmonPktFromTableFunctionState(TableFunctionState *node) |
| { |
| return &node->ss.ps.gpmon_pkt; |
| } |
| |
| |
| /* Callback functions exposed to the user */ |
| TupleDesc |
| AnyTable_GetTupleDesc(AnyTable t) |
| { |
| if (t == NULL) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("invalid null value for anytable type"))); |
| } |
| Insist(t->junkfilter && IsA(t->junkfilter, JunkFilter)); |
| |
| /* Return the projected tuple descriptor */ |
| return t->junkfilter->jf_cleanTupType; |
| } |
| |
| HeapTuple |
| AnyTable_GetNextTuple(AnyTable t) |
| { |
| MemoryContext oldcontext; |
| |
| if (t == NULL) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INVALID_PARAMETER_VALUE), |
| errmsg("invalid null value for anytable type"))); |
| } |
| |
| /* Fetch the next tuple into the tuple slot */ |
| oldcontext = MemoryContextSwitchTo(t->econtext->ecxt_per_query_memory); |
| t->econtext->ecxt_outertuple = ExecProcNode(t->subplan); |
| MemoryContextSwitchTo(oldcontext); |
| if (TupIsNull(t->econtext->ecxt_outertuple)) |
| { |
| return (HeapTuple) NULL; |
| } |
| |
| /* ---------------------------------------- |
| * 1) Fetch the tuple from the tuple slot |
| * 2) apply resjunk filtering |
| * 3) copy result into a HeapTuple |
| * ---------------------------------------- |
| */ |
| return ExecRemoveJunk(t->junkfilter, t->econtext->ecxt_outertuple); |
| } |
| |
| /* |
| * tf_set_userdata_internal |
| * This is the entity of TF_SET_USERDATA() API. Sets bytea datum to |
| * RangeTblEntry, which is transported to project function via serialized |
| * plan tree. |
| */ |
| void |
| tf_set_userdata_internal(FunctionCallInfo fcinfo, bytea *userdata) |
| { |
| if (!fcinfo->context || !IsA(fcinfo->context, RangeTblEntry)) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("expected RangeTblEntry node, found %d", |
| fcinfo->context ? nodeTag(fcinfo->context) : 0))); |
| |
| /* Make sure it gets detoasted, but packed is allowed */ |
| ((RangeTblEntry *) fcinfo->context)->funcuserdata = |
| userdata ? pg_detoast_datum_packed(userdata) : NULL; |
| } |
| |
| /* |
| * tf_get_userdata_internal |
| * This is the entity of TF_GET_USERDATA() API. Extracts userdata from |
| * its scan node which was transported via serialized plan tree. |
| */ |
| bytea * |
| tf_get_userdata_internal(FunctionCallInfo fcinfo) |
| { |
| bytea *userdata; |
| |
| if (!fcinfo->context || !IsA(fcinfo->context, TableFunctionState)) |
| ereport(ERROR, |
| (errcode(ERRCODE_WRONG_OBJECT_TYPE), |
| errmsg("expected TableFunctionState node, found %d", |
| fcinfo->context ? nodeTag(fcinfo->context) : 0))); |
| |
| userdata = ((TableFunctionState *) fcinfo->context)->userdata; |
| if (!userdata) |
| return NULL; |
| |
| /* unpack, just in case */ |
| return pg_detoast_datum(userdata); |
| } |