blob: a53dba21c1648585044f572989f1d40b13f84c67 [file] [log] [blame]
/*-------------------------------------------------------------------------
*
* nodeTableFunction.c
* Support routines for scans of enhanced table functions.
*
* DESCRIPTION
*
* This code is distinct from ExecFunctionScan due to the nature of
* the plans. A plain table function will be called without an input
* subquery, whereas the enhanced table function framework allows
* table functions operating over table input.
*
* Normal Table Function Enhanced Table Function
*
* (out) (out)
* | |
* (FunctionScan) (TableFunctionScan)
* |
* (SubqueryScan)
*
* INTERFACE ROUTINES
* ExecTableFunctionScan sequentially scans a relation.
* ExecTableFunctionNext retrieve next tuple in sequential order.
* ExecInitTableFunctionScan creates and initializes a externalscan node.
* ExecEndTableFunctionScan releases any storage allocated.
* ExecStopTableFunctionScan closes external resources before EOD.
* ExecTableFunctionReScan rescans the relation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "funcapi.h"
#include "tablefuncapi.h"
#include "cdb/cdbvars.h"
#include "executor/executor.h"
#include "executor/nodeTableFunction.h"
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
static void setupFunctionArguments(TableFunctionState *node);
static TupleTableSlot *TableFunctionNext(TableFunctionState *node);
static gpmon_packet_t *GpmonPktFromTableFunctionState(TableFunctionState *node);
static void initGpmonPktForTableFunction(Plan *planNode,
gpmon_packet_t *gpmon_pkt,
EState *estate);
/* Private structure forward declared in tablefuncapi.h */
typedef struct AnyTableData
{
ExprContext *econtext;
PlanState *subplan; /* subplan node */
TupleDesc subdesc; /* tuple descriptor of subplan */
JunkFilter *junkfilter; /* for projection of subplan tuple */
} AnyTableData;
/*
* setupFunctionArguments
*/
static void
setupFunctionArguments(TableFunctionState *node)
{
ExprContext *econtext = node->ss.ps.ps_ExprContext;
int count = 0;
int i = 0;
ListCell *arg = NULL;
bool argDone;
/* Evaluate the static function args */
argDone = ExecEvalFuncArgs(&node->fcinfo,
node->fcache->args,
econtext);
/*
* We don't allow sets in the arguments of the table function (except for
* specific anytable values generated by TableValueExpressions).
*/
if (argDone != ExprSingleResult)
{
ereport(ERROR,
(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
errmsg("set-valued function called in context that cannot "
"accept a set")));
}
/*
* Identify the AnyTable Arguement of the function and swap it with the
* AnyTableData for our subplan.
*/
foreach(arg, node->fcache->args)
{
ExprState *argstate = (ExprState *) lfirst(arg);
if (IsA(argstate->expr, TableValueExpr))
{
node->fcinfo.arg[i] = AnyTableGetDatum(node->inputscan);
node->fcinfo.argnull[i] = false;
count++;
}
i++;
}
/*
* Currently we don't allow table functions with more than one
* table value expression arguments, and if they don't have at
* least one they will be planned as nodeFunctionScan instead
* of nodeTableFunctionScan. Therefore we should have found
* exactly 1 TableValueExpr above.
*/
if (count != 1)
{
elog(ERROR,
"table functions over multiple TABLE value expressions "
"not yet supported"); /* not feasible */
}
}
/*
* TableFunctionNext - ExecScan callback function for table funciton scans
*/
static TupleTableSlot *
TableFunctionNext(TableFunctionState *node)
{
MemoryContext oldcontext = NULL;
TupleTableSlot *slot = NULL;
ExprContext *econtext = node->ss.ps.ps_ExprContext;
FuncExprState *fcache = node->fcache;
bool returns_set = fcache->func.fn_retset;
HeapTuple tuple = NULL;
TupleDesc resultdesc = node->resultdesc;
Datum user_result;
/* Clean up any per-tuple memory */
ResetExprContext(econtext);
/*
* If all results have been returned by the callback function then
* we are done.
*/
if (node->rsinfo.isDone == ExprEndResult)
return slot; /* empty slot */
/* Invoke the user supplied function */
node->fcinfo.isnull = false;
oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
user_result = FunctionCallInvoke(&node->fcinfo);
MemoryContextSwitchTo(oldcontext);
if (node->rsinfo.returnMode != SFRM_ValuePerCall)
{
/* FIXME: should support both protocols */
ereport(ERROR,
(errcode(ERRCODE_E_R_I_E_SRF_PROTOCOL_VIOLATED),
errmsg("table functions must use SFRM_ValuePerCall protocol")));
}
if (node->rsinfo.isDone == ExprEndResult)
return slot; /* empty slot */
/* Mark this the last value if the func doesn't return a set */
if (!returns_set || node->rsinfo.isDone == ExprSingleResult)
node->rsinfo.isDone = ExprEndResult;
/* This would only error if the user violated the SRF calling convensions */
if (returns_set && node->fcinfo.isnull)
{
ereport(ERROR,
(errcode(ERRCODE_NULL_VALUE_NOT_ALLOWED),
errmsg("function returning set of rows cannot return null value")));
}
/* Convert the Datum into tuple and store it into the scan slot */
if (node->is_rowtype)
{
HeapTupleHeader th;
if (node->fcinfo.isnull)
{
int i;
Datum values[MaxTupleAttributeNumber];
bool nulls[MaxTupleAttributeNumber];
Insist(!returns_set); /* checked above */
Insist(resultdesc->natts <= MaxTupleAttributeNumber);
for (i = 0; i < resultdesc->natts; i++)
nulls[i] = true;
/*
* If we get a clean solution to the tuple allocation below we
* can use it here as well. This is less an issue because there
* is only a single tuple in this case, so the overhead of a
* single palloc is not a big deal.
*/
oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
tuple = heap_form_tuple(resultdesc, values, nulls);
MemoryContextSwitchTo(oldcontext);
}
else
{
/* Convert returned HeapTupleHeader into a HeapTuple */
th = DatumGetHeapTupleHeader(user_result);
tuple = &node->tuple;
ItemPointerSetInvalid(&(tuple->t_self));
tuple->t_len = HeapTupleHeaderGetDatumLength(th);
tuple->t_data = th;
/* Double check that this tuple is of the expected form */
if (resultdesc->tdtypeid != HeapTupleHeaderGetTypeId(th))
{
ereport(ERROR,
(errcode(ERRCODE_E_R_I_E_SRF_PROTOCOL_VIOLATED),
errmsg("invalid tuple returned from table function"),
errdetail("Returned tuple does not match output "
"tuple descriptor.")));
}
}
}
else
{
/*
* TODO: This will allocate memory for each tuple, we should be able
* to get away with fewer pallocs.
*/
oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);
tuple = heap_form_tuple(resultdesc,
&user_result,
&node->fcinfo.isnull);
MemoryContextSwitchTo(oldcontext);
}
/*
* Store the tuple into the scan slot.
*
* Note: Tuple should be allocated in the per-row memory context, so they
* will be freed automatically when the context is freed, we cannot free
* them again here.
*/
Assert(tuple);
slot = ExecStoreHeapTuple(tuple,
node->ss.ss_ScanTupleSlot,
InvalidBuffer,
false /* shouldFree */);
Assert(!TupIsNull(slot));
node->ss.ss_ScanTupleSlot = slot;
/* Update gpmon statistics */
if (!TupIsNull(slot))
{
Gpmon_M_Incr_Rows_Out(GpmonPktFromTableFunctionState(node));
CheckSendPlanStateGpmonPkt(&node->ss.ps);
}
return slot;
}
/*
* ExecTableFunction - wrapper around TableFunctionNext
*/
TupleTableSlot *
ExecTableFunction(TableFunctionState *node)
{
/* Setup arguments on the first call */
if (node->is_firstcall)
{
setupFunctionArguments(node);
node->is_firstcall = false;
}
return ExecScan(&node->ss, (ExecScanAccessMtd) TableFunctionNext);
}
/*
* ExecInitTableFunction - Setup the table function executor
*/
TableFunctionState *
ExecInitTableFunction(TableFunctionScan *node, EState *estate, int eflags)
{
TableFunctionState *scanstate;
PlanState *subplan;
RangeTblEntry *rte;
Oid funcrettype;
TypeFuncClass functypclass;
FuncExpr *func;
ExprContext *econtext;
TupleDesc inputdesc = NULL;
TupleDesc resultdesc = NULL;
/* Inner plan is not used, outer plan must be present */
Assert(innerPlan(node) == NULL);
Assert(outerPlan(node) != NULL);
/* Forward scan only */
Assert(!(eflags & (EXEC_FLAG_MARK | EXEC_FLAG_BACKWARD)));
/*
* Create state structure.
*/
scanstate = makeNode(TableFunctionState);
scanstate->ss.ps.plan = (Plan *)node;
scanstate->ss.ps.state = estate;
scanstate->inputscan = palloc0(sizeof(AnyTableData));
scanstate->is_firstcall = true;
/* Create expression context for the node. */
ExecAssignExprContext(estate, &scanstate->ss.ps);
ExecInitResultTupleSlot(estate, &scanstate->ss.ps);
ExecInitScanTupleSlot(estate, &scanstate->ss);
econtext = scanstate->ss.ps.ps_ExprContext;
/* Initialize child expressions */
scanstate->ss.ps.targetlist = (List *)
ExecInitExpr((Expr *)node->scan.plan.targetlist,
(PlanState *)scanstate);
scanstate->ss.ps.qual = (List *)
ExecInitExpr((Expr *)node->scan.plan.qual,
(PlanState *)scanstate);
/* Initialize child nodes */
outerPlanState(scanstate) = ExecInitNode(outerPlan(node), estate, eflags);
subplan = outerPlanState(scanstate);
inputdesc = CreateTupleDescCopy(ExecGetResultType(subplan));
/* get info about the function */
rte = rt_fetch(node->scan.scanrelid, estate->es_range_table);
Insist(rte->rtekind == RTE_TABLEFUNCTION);
/*
* The funcexpr must be a function call. This check is to verify that
* the planner didn't try to perform constant folding or other inlining
* on a function invoked as a table function.
*/
if (!rte->funcexpr || !IsA(rte->funcexpr, FuncExpr))
{
/* should not be possible */
elog(ERROR, "table function expression is not a function expression");
}
func = (FuncExpr *) rte->funcexpr;
functypclass = get_expr_result_type((Node*) func, &funcrettype, &resultdesc);
switch (functypclass)
{
case TYPEFUNC_COMPOSITE:
{
/* Composite data type: copy the typcache entry for safety */
Assert(resultdesc);
resultdesc = CreateTupleDescCopy(resultdesc);
scanstate->is_rowtype = true;
break;
}
case TYPEFUNC_RECORD:
{
/* Record data type: Construct tuple desc based on rangeTable */
resultdesc = BuildDescFromLists(rte->eref->colnames,
rte->funccoltypes,
rte->funccoltypmods);
scanstate->is_rowtype = true;
break;
}
case TYPEFUNC_SCALAR:
{
/* Scalar data type: Construct a tuple descriptor manually */
char *attname = strVal(linitial(rte->eref->colnames));
resultdesc = CreateTemplateTupleDesc(1, false);
TupleDescInitEntry(resultdesc,
(AttrNumber) 1,
attname,
funcrettype,
-1,
0);
scanstate->is_rowtype = false;
break;
}
default:
{
/* This should not be possible, it should be caught by parser. */
elog(ERROR, "table function has unsupported return type");
}
}
/*
* For RECORD results, make sure a typmod has been assigned. (The
* function should do this for itself, but let's cover things in case it
* doesn't.)
*/
BlessTupleDesc(resultdesc);
scanstate->resultdesc = resultdesc;
ExecAssignScanType(&scanstate->ss, resultdesc);
/* Other node-specific setup */
scanstate->fcache = (FuncExprState*)
ExecInitExpr((Expr *) rte->funcexpr, (PlanState *) scanstate);
Assert(scanstate->fcache && IsA(scanstate->fcache, FuncExprState));
scanstate->rsinfo.type = (fmNodeTag) T_ReturnSetInfo;
scanstate->rsinfo.econtext = econtext;
scanstate->rsinfo.expectedDesc = resultdesc;
scanstate->rsinfo.allowedModes = (int) (SFRM_ValuePerCall);
scanstate->rsinfo.returnMode = (int) (SFRM_ValuePerCall);
scanstate->rsinfo.isDone = ExprSingleResult;
scanstate->rsinfo.setResult = NULL;
scanstate->rsinfo.setDesc = NULL;
scanstate->userdata = rte->funcuserdata;
/* Initialize a function cache for the function expression */
init_fcache(func->funcid, scanstate->fcache,
econtext->ecxt_per_query_memory,
true);
/* Initialize the function call info */
InitFunctionCallInfoData(scanstate->fcinfo, /* Fcinfo */
&(scanstate->fcache->func), /* Flinfo */
0, /* Nargs */
(Node*) scanstate, /* Context */
(Node*) &(scanstate->rsinfo)); /* ResultInfo */
/* setup the AnyTable input */
scanstate->inputscan->econtext = econtext;
scanstate->inputscan->subplan = subplan;
scanstate->inputscan->subdesc = inputdesc;
/* Determine projection information for subplan */
TupleDesc cleanTupType = ExecCleanTypeFromTL(subplan->plan->targetlist,
false /* hasoid */);
scanstate->inputscan->junkfilter =
ExecInitJunkFilter(subplan->plan->targetlist,
cleanTupType,
NULL /* slot */);
BlessTupleDesc(scanstate->inputscan->junkfilter->jf_cleanTupType);
/* Initialize result tuple type and projection info */
ExecAssignResultTypeFromTL(&scanstate->ss.ps);
ExecAssignScanProjectionInfo(&scanstate->ss);
initGpmonPktForTableFunction((Plan *)node,
&scanstate->ss.ps.gpmon_pkt, estate);
return scanstate;
}
int
ExecCountSlotsTableFunction(TableFunctionScan *node)
{
return ExecCountSlotsNode(outerPlan(node)) + 2;
}
void
ExecEndTableFunction(TableFunctionState *node)
{
/* Free the ExprContext */
ExecFreeExprContext(&node->ss.ps);
/* Clean out the tuple table */
ExecClearTuple(node->ss.ps.ps_ResultTupleSlot);
/* End the subplans */
ExecEndNode(outerPlanState(node));
EndPlanStateGpmonPkt(&node->ss.ps);
}
void
ExecReScanTableFunction(TableFunctionState *node, ExprContext *exprCtxt)
{
/* TableFunction Planner marks TableFunction nodes as not rescannable */
elog(ERROR, "invalid rescan of TableFunctionScan");
}
void
initGpmonPktForTableFunction(Plan *planNode,
gpmon_packet_t *gpmon_pkt,
EState *estate)
{
Assert(planNode != NULL);
Assert(gpmon_pkt != NULL);
Assert(IsA(planNode, TableFunctionScan));
InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate, PMNT_TableFunctionScan,
(int64) planNode->plan_rows, NULL);
}
gpmon_packet_t *
GpmonPktFromTableFunctionState(TableFunctionState *node)
{
return &node->ss.ps.gpmon_pkt;
}
/* Callback functions exposed to the user */
TupleDesc
AnyTable_GetTupleDesc(AnyTable t)
{
if (t == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid null value for anytable type")));
}
Insist(t->junkfilter && IsA(t->junkfilter, JunkFilter));
/* Return the projected tuple descriptor */
return t->junkfilter->jf_cleanTupType;
}
HeapTuple
AnyTable_GetNextTuple(AnyTable t)
{
MemoryContext oldcontext;
if (t == NULL)
{
ereport(ERROR,
(errcode(ERRCODE_INVALID_PARAMETER_VALUE),
errmsg("invalid null value for anytable type")));
}
/* Fetch the next tuple into the tuple slot */
oldcontext = MemoryContextSwitchTo(t->econtext->ecxt_per_query_memory);
t->econtext->ecxt_outertuple = ExecProcNode(t->subplan);
MemoryContextSwitchTo(oldcontext);
if (TupIsNull(t->econtext->ecxt_outertuple))
{
return (HeapTuple) NULL;
}
/* ----------------------------------------
* 1) Fetch the tuple from the tuple slot
* 2) apply resjunk filtering
* 3) copy result into a HeapTuple
* ----------------------------------------
*/
return ExecRemoveJunk(t->junkfilter, t->econtext->ecxt_outertuple);
}
/*
* tf_set_userdata_internal
* This is the entity of TF_SET_USERDATA() API. Sets bytea datum to
* RangeTblEntry, which is transported to project function via serialized
* plan tree.
*/
void
tf_set_userdata_internal(FunctionCallInfo fcinfo, bytea *userdata)
{
if (!fcinfo->context || !IsA(fcinfo->context, RangeTblEntry))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("expected RangeTblEntry node, found %d",
fcinfo->context ? nodeTag(fcinfo->context) : 0)));
/* Make sure it gets detoasted, but packed is allowed */
((RangeTblEntry *) fcinfo->context)->funcuserdata =
userdata ? pg_detoast_datum_packed(userdata) : NULL;
}
/*
* tf_get_userdata_internal
* This is the entity of TF_GET_USERDATA() API. Extracts userdata from
* its scan node which was transported via serialized plan tree.
*/
bytea *
tf_get_userdata_internal(FunctionCallInfo fcinfo)
{
bytea *userdata;
if (!fcinfo->context || !IsA(fcinfo->context, TableFunctionState))
ereport(ERROR,
(errcode(ERRCODE_WRONG_OBJECT_TYPE),
errmsg("expected TableFunctionState node, found %d",
fcinfo->context ? nodeTag(fcinfo->context) : 0)));
userdata = ((TableFunctionState *) fcinfo->context)->userdata;
if (!userdata)
return NULL;
/* unpack, just in case */
return pg_detoast_datum(userdata);
}