/*-------------------------------------------------------------------------
 *
 * nodeForeignscan.c
 *	  Routines to support scans of foreign tables
 *
 * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
 * Portions Copyright (c) 1994, Regents of the University of California
 *
 *
 * IDENTIFICATION
 *	  src/backend/executor/nodeForeignscan.c
 *
 *-------------------------------------------------------------------------
 */
/*
 * INTERFACE ROUTINES
 *
 *		ExecForeignScan			scans a foreign table.
 *		ExecInitForeignScan		creates and initializes state info.
 *		ExecReScanForeignScan	rescans the foreign relation.
 *		ExecEndForeignScan		releases any resources allocated.
 */
| #include "postgres.h" |
| |
| #include "executor/executor.h" |
| #include "executor/nodeForeignscan.h" |
| #include "foreign/fdwapi.h" |
| #include "utils/memutils.h" |
| #include "utils/rel.h" |
| |
| static TupleTableSlot *ForeignNext(ForeignScanState *node); |
| static bool ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot); |
| |
| |
| /* ---------------------------------------------------------------- |
| * ForeignNext |
| * |
| * This is a workhorse for ExecForeignScan |
| * ---------------------------------------------------------------- |
| */ |
| static TupleTableSlot * |
| ForeignNext(ForeignScanState *node) |
| { |
| TupleTableSlot *slot; |
| ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
| ExprContext *econtext = node->ss.ps.ps_ExprContext; |
| MemoryContext oldcontext; |
| |
| /* Call the Iterate function in short-lived context */ |
| oldcontext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| if (plan->operation != CMD_SELECT) |
| { |
| /* |
| * direct modifications cannot be re-evaluated, so shouldn't get here |
| * during EvalPlanQual processing |
| */ |
| Assert(node->ss.ps.state->es_epq_active == NULL); |
| |
| slot = node->fdwroutine->IterateDirectModify(node); |
| } |
| else |
| slot = node->fdwroutine->IterateForeignScan(node); |
| MemoryContextSwitchTo(oldcontext); |
| |
| /* |
| * Insert valid value into tableoid, the only actually-useful system |
| * column. |
| */ |
| if (plan->fsSystemCol && !TupIsNull(slot)) |
| slot->tts_tableOid = RelationGetRelid(node->ss.ss_currentRelation); |
| |
| return slot; |
| } |
| |
| /* |
| * ForeignRecheck -- access method routine to recheck a tuple in EvalPlanQual |
| */ |
| static bool |
| ForeignRecheck(ForeignScanState *node, TupleTableSlot *slot) |
| { |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| ExprContext *econtext; |
| |
| /* |
| * extract necessary information from foreign scan node |
| */ |
| econtext = node->ss.ps.ps_ExprContext; |
| |
| /* Does the tuple meet the remote qual condition? */ |
| econtext->ecxt_scantuple = slot; |
| |
| ResetExprContext(econtext); |
| |
| /* |
| * If an outer join is pushed down, RecheckForeignScan may need to store a |
| * different tuple in the slot, because a different set of columns may go |
| * to NULL upon recheck. Otherwise, it shouldn't need to change the slot |
| * contents, just return true or false to indicate whether the quals still |
| * pass. For simple cases, setting fdw_recheck_quals may be easier than |
| * providing this callback. |
| */ |
| if (fdwroutine->RecheckForeignScan && |
| !fdwroutine->RecheckForeignScan(node, slot)) |
| return false; |
| |
| return ExecQual(node->fdw_recheck_quals, econtext); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecForeignScan(node) |
| * |
| * Fetches the next tuple from the FDW, checks local quals, and |
| * returns it. |
| * We call the ExecScan() routine and pass it the appropriate |
| * access method functions. |
| * ---------------------------------------------------------------- |
| */ |
| static TupleTableSlot * |
| ExecForeignScan(PlanState *pstate) |
| { |
| ForeignScanState *node = castNode(ForeignScanState, pstate); |
| ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
| EState *estate = node->ss.ps.state; |
| |
| /* |
| * Ignore direct modifications when EvalPlanQual is active --- they are |
| * irrelevant for EvalPlanQual rechecking |
| */ |
| if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT) |
| return NULL; |
| |
| return ExecScan(&node->ss, |
| (ExecScanAccessMtd) ForeignNext, |
| (ExecScanRecheckMtd) ForeignRecheck); |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * ExecInitForeignScan |
| * ---------------------------------------------------------------- |
| */ |
| ForeignScanState * |
| ExecInitForeignScan(ForeignScan *node, EState *estate, int eflags) |
| { |
| Relation currentRelation = NULL; |
| |
| /* |
| * get the relation object id from the relid'th entry in the range table, |
| * open that relation and acquire appropriate lock on it. |
| */ |
| if (node->scan.scanrelid > 0) |
| currentRelation = ExecOpenScanRelation(estate, node->scan.scanrelid, eflags); |
| |
| return ExecInitForeignScanForPartition(node, estate, eflags, currentRelation); |
| } |
| |
| ForeignScanState * |
| ExecInitForeignScanForPartition(ForeignScan *node, EState *estate, int eflags, Relation currentRelation) |
| { |
| ForeignScanState *scanstate; |
| Index scanrelid = node->scan.scanrelid; |
| int tlistvarno; |
| FdwRoutine *fdwroutine; |
| |
| /* check for unsupported flags */ |
| Assert(!(eflags & (EXEC_FLAG_BACKWARD | EXEC_FLAG_MARK))); |
| |
| /* |
| * create state structure |
| */ |
| scanstate = makeNode(ForeignScanState); |
| scanstate->ss.ps.plan = (Plan *) node; |
| scanstate->ss.ps.state = estate; |
| scanstate->ss.ps.ExecProcNode = ExecForeignScan; |
| |
| /* |
| * Miscellaneous initialization |
| * |
| * create expression context for node |
| */ |
| ExecAssignExprContext(estate, &scanstate->ss.ps); |
| |
| /* |
| * open the scan relation, if any; also acquire function pointers from the |
| * FDW's handler |
| */ |
| if (scanrelid > 0) |
| { |
| scanstate->ss.ss_currentRelation = currentRelation; |
| fdwroutine = GetFdwRoutineForRelation(currentRelation, true); |
| } |
| else |
| { |
| /* We can't use the relcache, so get fdwroutine the hard way */ |
| fdwroutine = GetFdwRoutineByServerId(node->fs_server); |
| } |
| |
| /* |
| * Determine the scan tuple type. If the FDW provided a targetlist |
| * describing the scan tuples, use that; else use base relation's rowtype. |
| */ |
| if (node->fdw_scan_tlist != NIL || currentRelation == NULL) |
| { |
| TupleDesc scan_tupdesc; |
| |
| scan_tupdesc = ExecTypeFromTL(node->fdw_scan_tlist); |
| ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, |
| &TTSOpsHeapTuple); |
| /* Node's targetlist will contain Vars with varno = INDEX_VAR */ |
| tlistvarno = INDEX_VAR; |
| } |
| else |
| { |
| TupleDesc scan_tupdesc; |
| |
| /* don't trust FDWs to return tuples fulfilling NOT NULL constraints */ |
| scan_tupdesc = CreateTupleDescCopy(RelationGetDescr(currentRelation)); |
| ExecInitScanTupleSlot(estate, &scanstate->ss, scan_tupdesc, |
| &TTSOpsHeapTuple); |
| /* Node's targetlist will contain Vars with varno = scanrelid */ |
| tlistvarno = scanrelid; |
| } |
| |
| /* Don't know what an FDW might return */ |
| scanstate->ss.ps.scanopsfixed = false; |
| scanstate->ss.ps.scanopsset = true; |
| |
| /* |
| * Initialize result slot, type and projection. |
| */ |
| ExecInitResultTypeTL(&scanstate->ss.ps); |
| ExecAssignScanProjectionInfoWithVarno(&scanstate->ss, tlistvarno); |
| |
| /* |
| * initialize child expressions |
| */ |
| scanstate->ss.ps.qual = |
| ExecInitQual(node->scan.plan.qual, (PlanState *) scanstate); |
| scanstate->fdw_recheck_quals = |
| ExecInitQual(node->fdw_recheck_quals, (PlanState *) scanstate); |
| |
| /* |
| * Determine whether to scan the foreign relation asynchronously or not; |
| * this has to be kept in sync with the code in ExecInitAppend(). |
| */ |
| scanstate->ss.ps.async_capable = (((Plan *) node)->async_capable && |
| estate->es_epq_active == NULL); |
| |
| /* |
| * Initialize FDW-related state. |
| */ |
| scanstate->fdwroutine = fdwroutine; |
| scanstate->fdw_state = NULL; |
| |
| /* |
| * For the FDW's convenience, look up the modification target relation's |
| * ResultRelInfo. The ModifyTable node should have initialized it for us, |
| * see ExecInitModifyTable. |
| * |
| * Don't try to look up the ResultRelInfo when EvalPlanQual is active, |
| * though. Direct modifications cannot be re-evaluated as part of |
| * EvalPlanQual. The lookup wouldn't work anyway because during |
| * EvalPlanQual processing, EvalPlanQual only initializes the subtree |
| * under the ModifyTable, and doesn't run ExecInitModifyTable. |
| */ |
| if (node->resultRelation > 0 && estate->es_epq_active == NULL) |
| { |
| if (estate->es_result_relations == NULL || |
| estate->es_result_relations[node->resultRelation - 1] == NULL) |
| { |
| elog(ERROR, "result relation not initialized"); |
| } |
| scanstate->resultRelInfo = estate->es_result_relations[node->resultRelation - 1]; |
| } |
| |
| /* Initialize any outer plan. */ |
| if (outerPlan(node)) |
| outerPlanState(scanstate) = |
| ExecInitNode(outerPlan(node), estate, eflags); |
| |
| /* |
| * Tell the FDW to initialize the scan. |
| */ |
| if (node->operation != CMD_SELECT) |
| { |
| /* |
| * Direct modifications cannot be re-evaluated by EvalPlanQual, so |
| * don't bother preparing the FDW. |
| * |
| * In case of an inherited UPDATE/DELETE with foreign targets there |
| * can be direct-modify ForeignScan nodes in the EvalPlanQual subtree, |
| * so we need to ignore such ForeignScan nodes during EvalPlanQual |
| * processing. See also ExecForeignScan/ExecReScanForeignScan. |
| */ |
| if (estate->es_epq_active == NULL) |
| fdwroutine->BeginDirectModify(scanstate, eflags); |
| } |
| else |
| fdwroutine->BeginForeignScan(scanstate, eflags); |
| |
| return scanstate; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecEndForeignScan |
| * |
| * frees any storage allocated through C routines. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecEndForeignScan(ForeignScanState *node) |
| { |
| ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
| EState *estate = node->ss.ps.state; |
| |
| /* Let the FDW shut down */ |
| if (plan->operation != CMD_SELECT) |
| { |
| if (estate->es_epq_active == NULL) |
| node->fdwroutine->EndDirectModify(node); |
| } |
| else |
| node->fdwroutine->EndForeignScan(node); |
| |
| /* Shut down any outer plan. */ |
| if (outerPlanState(node)) |
| ExecEndNode(outerPlanState(node)); |
| |
| /* Free the exprcontext */ |
| ExecFreeExprContext(&node->ss.ps); |
| |
| /* clean out the tuple table */ |
| if (node->ss.ps.ps_ResultTupleSlot) |
| ExecClearTuple(node->ss.ps.ps_ResultTupleSlot); |
| ExecClearTuple(node->ss.ss_ScanTupleSlot); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecReScanForeignScan |
| * |
| * Rescans the relation. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecReScanForeignScan(ForeignScanState *node) |
| { |
| ForeignScan *plan = (ForeignScan *) node->ss.ps.plan; |
| EState *estate = node->ss.ps.state; |
| PlanState *outerPlan = outerPlanState(node); |
| |
| /* |
| * Ignore direct modifications when EvalPlanQual is active --- they are |
| * irrelevant for EvalPlanQual rechecking |
| */ |
| if (estate->es_epq_active != NULL && plan->operation != CMD_SELECT) |
| return; |
| |
| node->fdwroutine->ReScanForeignScan(node); |
| |
| /* |
| * If chgParam of subnode is not null then plan will be re-scanned by |
| * first ExecProcNode. outerPlan may also be NULL, in which case there is |
| * nothing to rescan at all. |
| */ |
| if (outerPlan != NULL && outerPlan->chgParam == NULL) |
| ExecReScan(outerPlan); |
| |
| ExecScanReScan(&node->ss); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecForeignScanEstimate |
| * |
| * Informs size of the parallel coordination information, if any |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecForeignScanEstimate(ForeignScanState *node, ParallelContext *pcxt) |
| { |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| |
| if (fdwroutine->EstimateDSMForeignScan) |
| { |
| node->pscan_len = fdwroutine->EstimateDSMForeignScan(node, pcxt); |
| shm_toc_estimate_chunk(&pcxt->estimator, node->pscan_len); |
| shm_toc_estimate_keys(&pcxt->estimator, 1); |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecForeignScanInitializeDSM |
| * |
| * Initialize the parallel coordination information |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecForeignScanInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) |
| { |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| |
| if (fdwroutine->InitializeDSMForeignScan) |
| { |
| int plan_node_id = node->ss.ps.plan->plan_node_id; |
| void *coordinate; |
| |
| coordinate = shm_toc_allocate(pcxt->toc, node->pscan_len); |
| fdwroutine->InitializeDSMForeignScan(node, pcxt, coordinate); |
| shm_toc_insert(pcxt->toc, plan_node_id, coordinate); |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecForeignScanReInitializeDSM |
| * |
| * Reset shared state before beginning a fresh scan. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecForeignScanReInitializeDSM(ForeignScanState *node, ParallelContext *pcxt) |
| { |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| |
| if (fdwroutine->ReInitializeDSMForeignScan) |
| { |
| int plan_node_id = node->ss.ps.plan->plan_node_id; |
| void *coordinate; |
| |
| coordinate = shm_toc_lookup(pcxt->toc, plan_node_id, false); |
| fdwroutine->ReInitializeDSMForeignScan(node, pcxt, coordinate); |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecForeignScanInitializeWorker |
| * |
| * Initialization according to the parallel coordination information |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecForeignScanInitializeWorker(ForeignScanState *node, |
| ParallelWorkerContext *pwcxt) |
| { |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| |
| if (fdwroutine->InitializeWorkerForeignScan) |
| { |
| int plan_node_id = node->ss.ps.plan->plan_node_id; |
| void *coordinate; |
| |
| coordinate = shm_toc_lookup(pwcxt->toc, plan_node_id, false); |
| fdwroutine->InitializeWorkerForeignScan(node, pwcxt->toc, coordinate); |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecShutdownForeignScan |
| * |
| * Gives FDW chance to stop asynchronous resource consumption |
| * and release any resources still held. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecShutdownForeignScan(ForeignScanState *node) |
| { |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| |
| if (fdwroutine->ShutdownForeignScan) |
| fdwroutine->ShutdownForeignScan(node); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAsyncForeignScanRequest |
| * |
| * Asynchronously request a tuple from a designed async-capable node |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecAsyncForeignScanRequest(AsyncRequest *areq) |
| { |
| ForeignScanState *node = (ForeignScanState *) areq->requestee; |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| |
| Assert(fdwroutine->ForeignAsyncRequest != NULL); |
| fdwroutine->ForeignAsyncRequest(areq); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAsyncForeignScanConfigureWait |
| * |
| * In async mode, configure for a wait |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecAsyncForeignScanConfigureWait(AsyncRequest *areq) |
| { |
| ForeignScanState *node = (ForeignScanState *) areq->requestee; |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| |
| Assert(fdwroutine->ForeignAsyncConfigureWait != NULL); |
| fdwroutine->ForeignAsyncConfigureWait(areq); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAsyncForeignScanNotify |
| * |
| * Callback invoked when a relevant event has occurred |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecAsyncForeignScanNotify(AsyncRequest *areq) |
| { |
| ForeignScanState *node = (ForeignScanState *) areq->requestee; |
| FdwRoutine *fdwroutine = node->fdwroutine; |
| |
| Assert(fdwroutine->ForeignAsyncNotify != NULL); |
| fdwroutine->ForeignAsyncNotify(areq); |
| } |