| /*------------------------------------------------------------------------- |
| * |
| * nodeAppend.c |
| * routines to handle append nodes. |
| * |
| * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group |
| * Portions Copyright (c) 1994, Regents of the University of California |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/executor/nodeAppend.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| /* INTERFACE ROUTINES |
| * ExecInitAppend - initialize the append node |
| * ExecAppend - retrieve the next tuple from the node |
| * ExecEndAppend - shut down the append node |
| * ExecReScanAppend - rescan the append node |
| * |
| * NOTES |
| * Each append node contains a list of one or more subplans which |
| * must be iteratively processed (forwards or backwards). |
| * Tuples are retrieved by executing the 'whichplan'th subplan |
| * until the subplan stops returning tuples, at which point that |
| * plan is shut down and the next started up. |
| * |
| * Append nodes don't make use of their left and right |
| * subtrees, rather they maintain a list of subplans so |
| * a typical append node looks like this in the plan tree: |
| * |
| * ... |
| * / |
| * Append -------+------+------+--- nil |
| * / \ | | | |
| * nil nil ... ... ... |
| * subplans |
| * |
| * Append nodes are currently used for unions, and to support |
| * inheritance queries, where several relations need to be scanned. |
| * For example, in our standard person/student/employee/student-emp |
| * example, where student and employee inherit from person |
| * and student-emp inherits from student and employee, the |
| * query: |
| * |
| * select name from person |
| * |
| * generates the plan: |
| * |
| * | |
| * Append -------+-------+--------+--------+ |
| * / \ | | | | |
| * nil nil Scan Scan Scan Scan |
| * | | | | |
| * person employee student student-emp |
| */ |
| |
| #include "postgres.h" |
| |
| #include "cdb/cdbvars.h" |
| #include "executor/execAsync.h" |
| #include "executor/execdebug.h" |
| #include "executor/execPartition.h" |
| #include "executor/nodeAppend.h" |
| #include "miscadmin.h" |
| #include "pgstat.h" |
| #include "storage/latch.h" |
| |
| /* Shared state for parallel-aware Append. */ |
| struct ParallelAppendState |
| { |
| LWLock pa_lock; /* mutual exclusion to choose next subplan */ |
| int pa_next_plan; /* next plan to choose by any worker */ |
| |
| /* |
| * pa_finished[i] should be true if no more workers should select subplan |
| * i.  For a non-partial plan, this should be set to true as soon as a |
| * worker selects the plan; for a partial plan, it remains false until |
| * some worker executes the plan to completion. |
| */ |
| bool pa_finished[FLEXIBLE_ARRAY_MEMBER]; |
| }; |
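| |
| /* |
| * ParallelAppendState is variable-length: pa_finished has one entry per |
| * subplan, so the chunk reserved for it in the parallel DSM is sized as |
| * offsetof(ParallelAppendState, pa_finished) + sizeof(bool) * as_nplans |
| * (see ExecAppendEstimate below). |
| */ |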
| |
| #define INVALID_SUBPLAN_INDEX -1 |
| #define EVENT_BUFFER_SIZE 16 |
| |
| static TupleTableSlot *ExecAppend(PlanState *pstate); |
| static bool choose_next_subplan_locally(AppendState *node); |
| static bool choose_next_subplan_for_leader(AppendState *node); |
| static bool choose_next_subplan_for_worker(AppendState *node); |
| static void mark_invalid_subplans_as_finished(AppendState *node); |
| static void ExecAppendAsyncBegin(AppendState *node); |
| static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result); |
| static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result); |
| static void ExecAppendAsyncEventWait(AppendState *node); |
| static void classify_matching_subplans(AppendState *node); |
| |
| /* ---------------------------------------------------------------- |
| * ExecInitAppend |
| * |
| * Begin all of the subscans of the append node. |
| * |
| * (This is potentially wasteful, since the entire result of the |
| * append node may not be scanned, but this way all of the |
| * structures get allocated in the executor's top level memory |
| * block instead of that of the call to ExecAppend.) |
| * ---------------------------------------------------------------- |
| */ |
| AppendState * |
| ExecInitAppend(Append *node, EState *estate, int eflags) |
| { |
| AppendState *appendstate = makeNode(AppendState); |
| PlanState **appendplanstates; |
| Bitmapset *validsubplans; |
| Bitmapset *asyncplans; |
| int nplans; |
| int nasyncplans; |
| int firstvalid; |
| int i, |
| j; |
| |
| /* check for unsupported flags */ |
| Assert(!(eflags & EXEC_FLAG_MARK)); |
| |
| /* |
| * create new AppendState for our append node |
| */ |
| appendstate->ps.plan = (Plan *) node; |
| appendstate->ps.state = estate; |
| appendstate->ps.ExecProcNode = ExecAppend; |
| |
| /* Let choose_next_subplan_* function handle setting the first subplan */ |
| appendstate->as_whichplan = INVALID_SUBPLAN_INDEX; |
| appendstate->as_syncdone = false; |
| appendstate->as_begun = false; |
| |
| /* If run-time partition pruning is enabled, then set that up now */ |
| if (node->part_prune_info != NULL) |
| { |
| PartitionPruneState *prunestate; |
| |
| /* We may need an expression context to evaluate partition exprs */ |
| ExecAssignExprContext(estate, &appendstate->ps); |
| |
| /* Create the working data structure for pruning. */ |
| prunestate = ExecCreatePartitionPruneState(&appendstate->ps, |
| node->part_prune_info); |
| appendstate->as_prune_state = prunestate; |
| |
| /* Perform an initial partition prune, if required. */ |
| if (prunestate->do_initial_prune) |
| { |
| /* Determine which subplans survive initial pruning */ |
| validsubplans = ExecFindInitialMatchingSubPlans(prunestate, |
| list_length(node->appendplans)); |
| |
| nplans = bms_num_members(validsubplans); |
| } |
| else |
| { |
| /* We'll need to initialize all subplans */ |
| nplans = list_length(node->appendplans); |
| Assert(nplans > 0); |
| validsubplans = bms_add_range(NULL, 0, nplans - 1); |
| } |
| |
| /* |
| * When no run-time pruning is required and there's at least one |
| * subplan, we can fill as_valid_subplans immediately, preventing |
| * later calls to ExecFindMatchingSubPlans. |
| */ |
| if (!prunestate->do_exec_prune && nplans > 0) |
| appendstate->as_valid_subplans = bms_add_range(NULL, 0, nplans - 1); |
| } |
| else |
| { |
| nplans = list_length(node->appendplans); |
| |
| /* |
| * When run-time partition pruning is not enabled we can just mark all |
| * subplans as valid; they must also all be initialized. |
| */ |
| Assert(nplans > 0); |
| appendstate->as_valid_subplans = validsubplans = |
| bms_add_range(NULL, 0, nplans - 1); |
| appendstate->as_prune_state = NULL; |
| |
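| /* |
| * If there are join pruning parameters, the set of valid subplans cannot |
| * be determined until execution time; leave as_valid_subplans unset and |
| * let ExecFindMatchingSubPlans compute it later. |
| */ |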
| if (node->join_prune_paramids) |
| appendstate->as_valid_subplans = NULL; |
| } |
| |
| /* |
| * Initialize result tuple type and slot. |
| */ |
| ExecInitResultTupleSlotTL(&appendstate->ps, &TTSOpsVirtual); |
| |
| /* node returns slots from each of its subnodes, therefore not fixed */ |
| appendstate->ps.resultopsset = true; |
| appendstate->ps.resultopsfixed = false; |
| |
| appendplanstates = (PlanState **) palloc(nplans * |
| sizeof(PlanState *)); |
| |
| /* |
| * call ExecInitNode on each of the valid plans to be executed and save |
| * the results into the appendplanstates array. |
| * |
| * While at it, find out the first valid partial plan. |
| */ |
| j = 0; |
| asyncplans = NULL; |
| nasyncplans = 0; |
| firstvalid = nplans; |
| i = -1; |
| while ((i = bms_next_member(validsubplans, i)) >= 0) |
| { |
| Plan *initNode = (Plan *) list_nth(node->appendplans, i); |
| |
| /* |
| * Record async subplans. When executing EvalPlanQual, we treat them |
| * as sync ones; don't do this when initializing an EvalPlanQual plan |
| * tree. |
| */ |
| if (initNode->async_capable && estate->es_epq_active == NULL) |
| { |
| asyncplans = bms_add_member(asyncplans, j); |
| nasyncplans++; |
| } |
| |
| /* |
| * Record the lowest appendplans index which is a valid partial plan. |
| */ |
| if (i >= node->first_partial_plan && j < firstvalid) |
| firstvalid = j; |
| |
| appendplanstates[j++] = ExecInitNode(initNode, estate, eflags); |
| } |
| |
| appendstate->as_first_partial_plan = firstvalid; |
| appendstate->appendplans = appendplanstates; |
| appendstate->as_nplans = nplans; |
| |
| /* Initialize async state */ |
| appendstate->as_asyncplans = asyncplans; |
| appendstate->as_nasyncplans = nasyncplans; |
| appendstate->as_asyncrequests = NULL; |
| appendstate->as_asyncresults = NULL; |
| appendstate->as_nasyncresults = 0; |
| appendstate->as_nasyncremain = 0; |
| appendstate->as_needrequest = NULL; |
| appendstate->as_eventset = NULL; |
| appendstate->as_valid_asyncplans = NULL; |
| |
| if (nasyncplans > 0) |
| { |
| appendstate->as_asyncrequests = (AsyncRequest **) |
| palloc0(nplans * sizeof(AsyncRequest *)); |
| |
| i = -1; |
| while ((i = bms_next_member(asyncplans, i)) >= 0) |
| { |
| AsyncRequest *areq; |
| |
| areq = palloc(sizeof(AsyncRequest)); |
| areq->requestor = (PlanState *) appendstate; |
| areq->requestee = appendplanstates[i]; |
| areq->request_index = i; |
| areq->callback_pending = false; |
| areq->request_complete = false; |
| areq->result = NULL; |
| |
| appendstate->as_asyncrequests[i] = areq; |
| } |
| |
| appendstate->as_asyncresults = (TupleTableSlot **) |
| palloc0(nasyncplans * sizeof(TupleTableSlot *)); |
| |
| if (appendstate->as_valid_subplans != NULL) |
| classify_matching_subplans(appendstate); |
| } |
| |
| /* |
| * Miscellaneous initialization |
| */ |
| |
| appendstate->ps.ps_ProjInfo = NULL; |
| |
| /* For parallel query, this will be overridden later. */ |
| appendstate->choose_next_subplan = choose_next_subplan_locally; |
| |
| return appendstate; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAppend |
| * |
| * Handles iteration over multiple subplans. |
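| * |
| * Roughly: first return any results already produced by async subplans, |
| * then pull from the current sync subplan; when that subplan is |
| * exhausted, wait or poll for async events and advance to the next sync |
| * subplan, until both the sync and async subplans are exhausted. |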
| * ---------------------------------------------------------------- |
| */ |
| static TupleTableSlot * |
| ExecAppend(PlanState *pstate) |
| { |
| AppendState *node = castNode(AppendState, pstate); |
| TupleTableSlot *result; |
| |
| /* |
| * If this is the first call after Init or ReScan, we need to do the |
| * initialization work. |
| */ |
| if (!node->as_begun) |
| { |
| Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX); |
| Assert(!node->as_syncdone); |
| |
| /* Nothing to do if there are no subplans */ |
| if (node->as_nplans == 0) |
| return ExecClearTuple(node->ps.ps_ResultTupleSlot); |
| |
| /* If there are any async subplans, begin executing them. */ |
| if (node->as_nasyncplans > 0) |
| ExecAppendAsyncBegin(node); |
| |
| /* |
| * If no sync subplan has been chosen, we must choose one before |
| * proceeding. |
| */ |
| if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) |
| return ExecClearTuple(node->ps.ps_ResultTupleSlot); |
| |
| Assert(node->as_syncdone || |
| (node->as_whichplan >= 0 && |
| node->as_whichplan < node->as_nplans)); |
| |
| /* And we're initialized. */ |
| node->as_begun = true; |
| } |
| |
| for (;;) |
| { |
| PlanState *subnode; |
| |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* |
| * try to get a tuple from an async subplan if any |
| */ |
| if (node->as_syncdone || !bms_is_empty(node->as_needrequest)) |
| { |
| if (ExecAppendAsyncGetNext(node, &result)) |
| return result; |
| Assert(!node->as_syncdone); |
| Assert(bms_is_empty(node->as_needrequest)); |
| } |
| |
| /* |
| * figure out which sync subplan we are currently processing |
| */ |
| Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans); |
| subnode = node->appendplans[node->as_whichplan]; |
| |
| /* |
| * get a tuple from the subplan |
| */ |
| result = ExecProcNode(subnode); |
| |
| if (!TupIsNull(result)) |
| { |
| /* |
| * If the subplan gave us something then return it as-is. We do |
| * NOT make use of the result slot that was set up in |
| * ExecInitAppend; there's no need for it. |
| */ |
| return result; |
| } |
| |
| /* |
| * wait or poll for async events if any. We do this before checking |
| * for the end of iteration, because it might drain the remaining |
| * async subplans. |
| */ |
| if (node->as_nasyncremain > 0) |
| ExecAppendAsyncEventWait(node); |
| |
| /* choose new sync subplan; if no sync/async subplans, we're done */ |
| if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0) |
| return ExecClearTuple(node->ps.ps_ResultTupleSlot); |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecEndAppend |
| * |
| * Shuts down the subscans of the append node. |
| * |
| * Returns nothing of interest. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecEndAppend(AppendState *node) |
| { |
| PlanState **appendplans; |
| int nplans; |
| int i; |
| |
| /* |
| * get information from the node |
| */ |
| appendplans = node->appendplans; |
| nplans = node->as_nplans; |
| |
| /* |
| * shut down each of the subscans |
| */ |
| for (i = nplans - 1; i >= 0; --i) |
| { |
| if (appendplans[i]) |
| ExecEndNode(appendplans[i]); |
| } |
| } |
| |
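| /* ---------------------------------------------------------------- |
| * ExecReScanAppend |
| * |
| * Prepare the append node and all of its subplans for rescan. |
| * ---------------------------------------------------------------- |
| */ |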
| void |
| ExecReScanAppend(AppendState *node) |
| { |
| int nasyncplans = node->as_nasyncplans; |
| int i; |
| |
| /* |
| * If any PARAM_EXEC Params used in pruning expressions have changed, then |
| * we'd better unset the valid subplans so that they are reselected for |
| * the new parameter values. |
| */ |
| if (node->as_prune_state && |
| bms_overlap(node->ps.chgParam, |
| node->as_prune_state->execparamids)) |
| { |
| bms_free(node->as_valid_subplans); |
| node->as_valid_subplans = NULL; |
| if (nasyncplans > 0) |
| { |
| bms_free(node->as_valid_asyncplans); |
| node->as_valid_asyncplans = NULL; |
| } |
| } |
| |
| for (i = 0; i < node->as_nplans; i++) |
| { |
| PlanState *subnode = node->appendplans[i]; |
| |
| /* |
| * ExecReScan doesn't know about my subplans, so I have to do |
| * changed-parameter signaling myself. |
| */ |
| if (node->ps.chgParam != NULL) |
| UpdateChangedParamSet(subnode, node->ps.chgParam); |
| |
| /* |
| * If chgParam of subnode is not null then plan will be re-scanned by |
| * first ExecProcNode or by first ExecAsyncRequest. |
| */ |
| if (subnode->chgParam == NULL) |
| ExecReScan(subnode); |
| } |
| |
| /* Reset async state */ |
| if (nasyncplans > 0) |
| { |
| i = -1; |
| while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) |
| { |
| AsyncRequest *areq = node->as_asyncrequests[i]; |
| |
| areq->callback_pending = false; |
| areq->request_complete = false; |
| areq->result = NULL; |
| } |
| |
| node->as_nasyncresults = 0; |
| node->as_nasyncremain = 0; |
| bms_free(node->as_needrequest); |
| node->as_needrequest = NULL; |
| } |
| |
| /* Let choose_next_subplan_* function handle setting the first subplan */ |
| node->as_whichplan = INVALID_SUBPLAN_INDEX; |
| node->as_syncdone = false; |
| node->as_begun = false; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * Parallel Append Support |
| * ---------------------------------------------------------------- |
| */ |
| |
| /* ---------------------------------------------------------------- |
| * ExecAppendEstimate |
| * |
| * Compute the amount of space we'll need in the parallel |
| * query DSM, and inform pcxt->estimator about our needs. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecAppendEstimate(AppendState *node, |
| ParallelContext *pcxt) |
| { |
| node->pstate_len = |
| add_size(offsetof(ParallelAppendState, pa_finished), |
| sizeof(bool) * node->as_nplans); |
| |
| shm_toc_estimate_chunk(&pcxt->estimator, node->pstate_len); |
| shm_toc_estimate_keys(&pcxt->estimator, 1); |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * ExecAppendInitializeDSM |
| * |
| * Set up shared state for Parallel Append. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecAppendInitializeDSM(AppendState *node, |
| ParallelContext *pcxt) |
| { |
| ParallelAppendState *pstate; |
| |
| pstate = shm_toc_allocate(pcxt->toc, node->pstate_len); |
| memset(pstate, 0, node->pstate_len); |
| LWLockInitialize(&pstate->pa_lock, LWTRANCHE_PARALLEL_APPEND); |
| shm_toc_insert(pcxt->toc, node->ps.plan->plan_node_id, pstate); |
| |
| node->as_pstate = pstate; |
| node->choose_next_subplan = choose_next_subplan_for_leader; |
| } |
| |
| |
| /* ---------------------------------------------------------------- |
| * ExecAppendReInitializeDSM |
| * |
| * Reset shared state before beginning a fresh scan. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt) |
| { |
| ParallelAppendState *pstate = node->as_pstate; |
| |
| pstate->pa_next_plan = 0; |
| memset(pstate->pa_finished, 0, sizeof(bool) * node->as_nplans); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAppendInitializeWorker |
| * |
| * Copy relevant information from TOC into planstate, and initialize |
| * whatever is required to choose and execute the optimal subplan. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt) |
| { |
| node->as_pstate = shm_toc_lookup(pwcxt->toc, node->ps.plan->plan_node_id, false); |
| node->choose_next_subplan = choose_next_subplan_for_worker; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * choose_next_subplan_locally |
| * |
| * Choose next sync subplan for a non-parallel-aware Append, |
| * returning false if there are no more. |
| * ---------------------------------------------------------------- |
| */ |
| static bool |
| choose_next_subplan_locally(AppendState *node) |
| { |
| int whichplan = node->as_whichplan; |
| int nextplan; |
| |
| /* We should never be called when there are no subplans */ |
| Assert(node->as_nplans > 0); |
| |
| /* Nothing to do if syncdone */ |
| if (node->as_syncdone) |
| return false; |
| |
| /* |
| * If first call then have the bms member function choose the first valid |
| * sync subplan by initializing whichplan to -1. If there happen to be no |
| * valid sync subplans then the bms member function will handle that by |
| * returning a negative number which will allow us to exit returning a |
| * false value. |
| */ |
| if (whichplan == INVALID_SUBPLAN_INDEX) |
| { |
| if (node->as_nasyncplans > 0) |
| { |
| /* We'd have filled as_valid_subplans already */ |
| Assert(node->as_valid_subplans); |
| } |
| else if (node->as_valid_subplans == NULL) |
| { |
| Append *plan = (Append *) node->ps.plan; |
| |
| node->as_valid_subplans = |
| ExecFindMatchingSubPlans(node->as_prune_state, |
| node->ps.state, |
| list_length(plan->appendplans), |
| plan->join_prune_paramids); |
| } |
| |
| whichplan = -1; |
| } |
| |
| /* Ensure whichplan is within the expected range */ |
| Assert(whichplan >= -1 && whichplan <= node->as_nplans); |
| |
| if (ScanDirectionIsForward(node->ps.state->es_direction)) |
| nextplan = bms_next_member(node->as_valid_subplans, whichplan); |
| else |
| nextplan = bms_prev_member(node->as_valid_subplans, whichplan); |
| |
| if (nextplan < 0) |
| { |
| /* Set as_syncdone if in async mode */ |
| if (node->as_nasyncplans > 0) |
| node->as_syncdone = true; |
| return false; |
| } |
| |
| node->as_whichplan = nextplan; |
| |
| return true; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * choose_next_subplan_for_leader |
| * |
| * Try to pick a plan which doesn't commit us to doing much |
| * work locally, so that as much work as possible is done in |
| * the workers. Cheapest subplans are at the end. |
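| * |
| * Hence the leader starts from the last (cheapest) subplan and works |
| * backwards, skipping any subplan already marked finished. |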
| * ---------------------------------------------------------------- |
| */ |
| static bool |
| choose_next_subplan_for_leader(AppendState *node) |
| { |
| ParallelAppendState *pstate = node->as_pstate; |
| |
| /* Backward scan is not supported by parallel-aware plans */ |
| Assert(ScanDirectionIsForward(node->ps.state->es_direction)); |
| |
| /* We should never be called when there are no subplans */ |
| Assert(node->as_nplans > 0); |
| |
| LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); |
| |
| if (node->as_whichplan != INVALID_SUBPLAN_INDEX) |
| { |
| /* Mark just-completed subplan as finished. */ |
| node->as_pstate->pa_finished[node->as_whichplan] = true; |
| } |
| else |
| { |
| /* Start with last subplan. */ |
| node->as_whichplan = node->as_nplans - 1; |
| |
| /* |
| * If we've yet to determine the valid subplans then do so now. If |
| * run-time pruning is disabled then the valid subplans will always be |
| * set to all subplans. |
| */ |
| if (node->as_valid_subplans == NULL) |
| { |
| Append *plan = (Append *) node->ps.plan; |
| |
| node->as_valid_subplans = |
| ExecFindMatchingSubPlans(node->as_prune_state, |
| node->ps.state, |
| list_length(plan->appendplans), |
| plan->join_prune_paramids); |
| |
| /* |
| * Mark each invalid plan as finished to allow the loop below to |
| * select the first valid subplan. |
| */ |
| mark_invalid_subplans_as_finished(node); |
| } |
| } |
| |
| /* Loop until we find a subplan to execute. */ |
| while (pstate->pa_finished[node->as_whichplan]) |
| { |
| if (node->as_whichplan == 0) |
| { |
| pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; |
| node->as_whichplan = INVALID_SUBPLAN_INDEX; |
| LWLockRelease(&pstate->pa_lock); |
| return false; |
| } |
| |
| /* |
| * We needn't pay attention to as_valid_subplans here as all invalid |
| * plans have been marked as finished. |
| */ |
| node->as_whichplan--; |
| } |
| |
| /* If non-partial, immediately mark as finished. */ |
| if (node->as_whichplan < node->as_first_partial_plan) |
| node->as_pstate->pa_finished[node->as_whichplan] = true; |
| |
| LWLockRelease(&pstate->pa_lock); |
| |
| return true; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * choose_next_subplan_for_worker |
| * |
| * Choose next subplan for a parallel-aware Append, returning |
| * false if there are no more. |
| * |
| * We start from the first plan and advance through the list; |
| * when we get back to the end, we loop back to the first |
| * partial plan. This assigns the non-partial plans first in |
| * order of descending cost and then spreads out the workers |
| * as evenly as possible across the remaining partial plans. |
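| * |
| * For example (illustrative only): with five subplans and |
| * as_first_partial_plan = 3, the non-partial subplans 0, 1 and 2 are |
| * each handed to exactly one worker (and marked finished as soon as |
| * they are chosen), after which workers are assigned round-robin to |
| * the partial subplans 3 and 4 until both have been run to completion. |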
| * ---------------------------------------------------------------- |
| */ |
| static bool |
| choose_next_subplan_for_worker(AppendState *node) |
| { |
| ParallelAppendState *pstate = node->as_pstate; |
| |
| /* Backward scan is not supported by parallel-aware plans */ |
| Assert(ScanDirectionIsForward(node->ps.state->es_direction)); |
| |
| /* We should never be called when there are no subplans */ |
| Assert(node->as_nplans > 0); |
| |
| LWLockAcquire(&pstate->pa_lock, LW_EXCLUSIVE); |
| |
| /* Mark just-completed subplan as finished. */ |
| if (node->as_whichplan != INVALID_SUBPLAN_INDEX) |
| node->as_pstate->pa_finished[node->as_whichplan] = true; |
| |
| /* |
| * If we've yet to determine the valid subplans then do so now. If |
| * run-time pruning is disabled then the valid subplans will always be set |
| * to all subplans. |
| */ |
| else if (node->as_valid_subplans == NULL) |
| { |
| Append *plan = (Append *) node->ps.plan; |
| |
| node->as_valid_subplans = |
| ExecFindMatchingSubPlans(node->as_prune_state, |
| node->ps.state, |
| list_length(plan->appendplans), |
| plan->join_prune_paramids); |
| mark_invalid_subplans_as_finished(node); |
| } |
| |
| /* If all the plans are already done, we have nothing to do */ |
| if (pstate->pa_next_plan == INVALID_SUBPLAN_INDEX) |
| { |
| LWLockRelease(&pstate->pa_lock); |
| return false; |
| } |
| |
| /* Save the plan from which we are starting the search. */ |
| node->as_whichplan = pstate->pa_next_plan; |
| |
| /* Loop until we find a valid subplan to execute. */ |
| while (pstate->pa_finished[pstate->pa_next_plan]) |
| { |
| int nextplan; |
| |
| nextplan = bms_next_member(node->as_valid_subplans, |
| pstate->pa_next_plan); |
| if (nextplan >= 0) |
| { |
| /* Advance to the next valid plan. */ |
| pstate->pa_next_plan = nextplan; |
| } |
| else if (node->as_whichplan > node->as_first_partial_plan) |
| { |
| /* |
| * Try looping back to the first valid partial plan, if there is |
| * one. If there isn't, arrange to bail out below. |
| */ |
| nextplan = bms_next_member(node->as_valid_subplans, |
| node->as_first_partial_plan - 1); |
| pstate->pa_next_plan = |
| nextplan < 0 ? node->as_whichplan : nextplan; |
| } |
| else |
| { |
| /* |
| * At last plan, and either there are no partial plans or we've |
| * tried them all. Arrange to bail out. |
| */ |
| pstate->pa_next_plan = node->as_whichplan; |
| } |
| |
| if (pstate->pa_next_plan == node->as_whichplan) |
| { |
| /* We've tried everything! */ |
| pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; |
| LWLockRelease(&pstate->pa_lock); |
| return false; |
| } |
| } |
| |
| /* Pick the plan we found, and advance pa_next_plan one more time. */ |
| node->as_whichplan = pstate->pa_next_plan; |
| pstate->pa_next_plan = bms_next_member(node->as_valid_subplans, |
| pstate->pa_next_plan); |
| |
| /* |
| * If there are no more valid plans then try setting the next plan to the |
| * first valid partial plan. |
| */ |
| if (pstate->pa_next_plan < 0) |
| { |
| int nextplan = bms_next_member(node->as_valid_subplans, |
| node->as_first_partial_plan - 1); |
| |
| if (nextplan >= 0) |
| pstate->pa_next_plan = nextplan; |
| else |
| { |
| /* |
| * There are no valid partial plans, and we already chose the last |
| * non-partial plan; so flag that there's nothing more for our |
| * fellow workers to do. |
| */ |
| pstate->pa_next_plan = INVALID_SUBPLAN_INDEX; |
| } |
| } |
| |
| /* If non-partial, immediately mark as finished. */ |
| if (node->as_whichplan < node->as_first_partial_plan) |
| node->as_pstate->pa_finished[node->as_whichplan] = true; |
| |
| LWLockRelease(&pstate->pa_lock); |
| |
| return true; |
| } |
| |
| /* |
| * mark_invalid_subplans_as_finished |
| * Marks the ParallelAppendState's pa_finished as true for each invalid |
| * subplan. |
| * |
| * This function should only be called for parallel Append with run-time |
| * pruning enabled. |
| */ |
| static void |
| mark_invalid_subplans_as_finished(AppendState *node) |
| { |
| int i; |
| |
| /* Only valid to call this while in parallel Append mode */ |
| Assert(node->as_pstate); |
| |
| /* |
| * NB: Upstream asserts here that node->as_prune_state is set. However, |
| * since the pg12 merge we allow as_valid_subplans and as_prune_state to |
| * both be empty while node->join_prune_paramids is non-empty, and this |
| * function is still called in that case, so the assertion had to be |
| * removed. |
| */ |
| #if 0 |
| /* Shouldn't have been called when run-time pruning is not enabled */ |
| Assert(node->as_prune_state); |
| #endif |
| |
| /* Nothing to do if all plans are valid */ |
| if (bms_num_members(node->as_valid_subplans) == node->as_nplans) |
| return; |
| |
| /* Mark all non-valid plans as finished */ |
| for (i = 0; i < node->as_nplans; i++) |
| { |
| if (!bms_is_member(i, node->as_valid_subplans)) |
| node->as_pstate->pa_finished[i] = true; |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * Asynchronous Append Support |
| * ---------------------------------------------------------------- |
| */ |
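| |
| /* |
| * Overview of the machinery below: ExecAppendAsyncBegin issues an initial |
| * ExecAsyncRequest for every valid async subplan. Results arrive through |
| * ExecAsyncAppendResponse, which stashes them in as_asyncresults and adds |
| * the producing subplan to as_needrequest so that ExecAppendAsyncRequest |
| * can ask it for another tuple later. ExecAppendAsyncEventWait waits (or |
| * polls) for file-descriptor events from subplans whose requests are |
| * still pending and fires their callbacks via ExecAsyncNotify. |
| */ |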
| |
| /* ---------------------------------------------------------------- |
| * ExecAppendAsyncBegin |
| * |
| * Begin executing the designated async-capable subplans. |
| * ---------------------------------------------------------------- |
| */ |
| static void |
| ExecAppendAsyncBegin(AppendState *node) |
| { |
| int i; |
| |
| /* Backward scan is not supported by async-aware Appends. */ |
| Assert(ScanDirectionIsForward(node->ps.state->es_direction)); |
| |
| /* We should never be called when there are no subplans */ |
| Assert(node->as_nplans > 0); |
| |
| /* We should never be called when there are no async subplans. */ |
| Assert(node->as_nasyncplans > 0); |
| |
| /* If we've yet to determine the valid subplans then do so now. */ |
| if (node->as_valid_subplans == NULL) |
| { |
| Append *plan = (Append *) node->ps.plan; |
| |
| node->as_valid_subplans = |
| ExecFindMatchingSubPlans(node->as_prune_state, |
| node->ps.state, |
| list_length(plan->appendplans), |
| plan->join_prune_paramids); |
| |
| classify_matching_subplans(node); |
| } |
| |
| /* Initialize state variables. */ |
| node->as_syncdone = bms_is_empty(node->as_valid_subplans); |
| node->as_nasyncremain = bms_num_members(node->as_valid_asyncplans); |
| |
| /* Nothing to do if there are no valid async subplans. */ |
| if (node->as_nasyncremain == 0) |
| return; |
| |
| /* Make a request for each of the valid async subplans. */ |
| i = -1; |
| while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0) |
| { |
| AsyncRequest *areq = node->as_asyncrequests[i]; |
| |
| Assert(areq->request_index == i); |
| Assert(!areq->callback_pending); |
| |
| /* Do the actual work. */ |
| ExecAsyncRequest(areq); |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAppendAsyncGetNext |
| * |
| * Get the next tuple from any of the asynchronous subplans. |
| * ---------------------------------------------------------------- |
| */ |
| static bool |
| ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result) |
| { |
| *result = NULL; |
| |
| /* We should never be called when there are no valid async subplans. */ |
| Assert(node->as_nasyncremain > 0); |
| |
| /* Request a tuple asynchronously. */ |
| if (ExecAppendAsyncRequest(node, result)) |
| return true; |
| |
| while (node->as_nasyncremain > 0) |
| { |
| CHECK_FOR_INTERRUPTS(); |
| |
| /* Wait or poll for async events. */ |
| ExecAppendAsyncEventWait(node); |
| |
| /* Request a tuple asynchronously. */ |
| if (ExecAppendAsyncRequest(node, result)) |
| return true; |
| |
| /* Break from loop if there's any sync subplan that isn't complete. */ |
| if (!node->as_syncdone) |
| break; |
| } |
| |
| /* |
| * If all sync subplans are complete, we're totally done scanning the |
| * given node. Otherwise, we're done with the asynchronous stuff but must |
| * continue scanning the sync subplans. |
| */ |
| if (node->as_syncdone) |
| { |
| Assert(node->as_nasyncremain == 0); |
| *result = ExecClearTuple(node->ps.ps_ResultTupleSlot); |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAppendAsyncRequest |
| * |
| * Request a tuple asynchronously. |
| * ---------------------------------------------------------------- |
| */ |
| static bool |
| ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result) |
| { |
| Bitmapset *needrequest; |
| int i; |
| |
| /* Nothing to do if there are no async subplans needing a new request. */ |
| if (bms_is_empty(node->as_needrequest)) |
| { |
| Assert(node->as_nasyncresults == 0); |
| return false; |
| } |
| |
| /* |
| * If there are any asynchronously-generated results that have not yet |
| * been returned, we have nothing to do; just return one of them. |
| */ |
| if (node->as_nasyncresults > 0) |
| { |
| --node->as_nasyncresults; |
| *result = node->as_asyncresults[node->as_nasyncresults]; |
| return true; |
| } |
| |
| /* Make a new request for each of the async subplans that need it. */ |
| needrequest = node->as_needrequest; |
| node->as_needrequest = NULL; |
| i = -1; |
| while ((i = bms_next_member(needrequest, i)) >= 0) |
| { |
| AsyncRequest *areq = node->as_asyncrequests[i]; |
| |
| /* Do the actual work. */ |
| ExecAsyncRequest(areq); |
| } |
| bms_free(needrequest); |
| |
| /* Return one of the asynchronously-generated results if any. */ |
| if (node->as_nasyncresults > 0) |
| { |
| --node->as_nasyncresults; |
| *result = node->as_asyncresults[node->as_nasyncresults]; |
| return true; |
| } |
| |
| return false; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAppendAsyncEventWait |
| * |
| * Wait or poll for file descriptor events and fire callbacks. |
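| * |
| * We block (timeout -1) once the sync subplans are exhausted; otherwise |
| * we only poll (timeout 0) so that the sync subplans can keep making |
| * progress. |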
| * ---------------------------------------------------------------- |
| */ |
| static void |
| ExecAppendAsyncEventWait(AppendState *node) |
| { |
| int nevents = node->as_nasyncplans + 1; /* one extra for postmaster death */ |
| long timeout = node->as_syncdone ? -1 : 0; |
| WaitEvent occurred_event[EVENT_BUFFER_SIZE]; |
| int noccurred; |
| int i; |
| |
| /* We should never be called when there are no valid async subplans. */ |
| Assert(node->as_nasyncremain > 0); |
| |
| node->as_eventset = CreateWaitEventSet(CurrentMemoryContext, nevents); |
| AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET, |
| NULL, NULL); |
| |
| /* Give each waiting subplan a chance to add an event. */ |
| i = -1; |
| while ((i = bms_next_member(node->as_asyncplans, i)) >= 0) |
| { |
| AsyncRequest *areq = node->as_asyncrequests[i]; |
| |
| if (areq->callback_pending) |
| ExecAsyncConfigureWait(areq); |
| } |
| |
| /* |
| * No need for further processing if there are no configured events other |
| * than the postmaster death event. |
| */ |
| if (GetNumRegisteredWaitEvents(node->as_eventset) == 1) |
| { |
| FreeWaitEventSet(node->as_eventset); |
| node->as_eventset = NULL; |
| return; |
| } |
| |
| /* We wait on at most EVENT_BUFFER_SIZE events. */ |
| if (nevents > EVENT_BUFFER_SIZE) |
| nevents = EVENT_BUFFER_SIZE; |
| |
| /* |
| * If the timeout is -1, wait until at least one event occurs. If the |
| * timeout is 0, poll for events, but do not wait at all. |
| */ |
| noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event, |
| nevents, WAIT_EVENT_APPEND_READY); |
| FreeWaitEventSet(node->as_eventset); |
| node->as_eventset = NULL; |
| if (noccurred == 0) |
| return; |
| |
| /* Deliver notifications. */ |
| for (i = 0; i < noccurred; i++) |
| { |
| WaitEvent *w = &occurred_event[i]; |
| |
| /* |
| * Each waiting subplan should have registered its wait event with |
| * user_data pointing back to its AsyncRequest. |
| */ |
| if ((w->events & WL_SOCKET_READABLE) != 0) |
| { |
| AsyncRequest *areq = (AsyncRequest *) w->user_data; |
| |
| if (areq->callback_pending) |
| { |
| /* |
| * Mark it as no longer needing a callback. We must do this |
| * before dispatching the callback in case the callback resets |
| * the flag. |
| */ |
| areq->callback_pending = false; |
| |
| /* Do the actual work. */ |
| ExecAsyncNotify(areq); |
| } |
| } |
| } |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecAsyncAppendResponse |
| * |
| * Receive a response from an asynchronous request we made. |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecAsyncAppendResponse(AsyncRequest *areq) |
| { |
| AppendState *node = (AppendState *) areq->requestor; |
| TupleTableSlot *slot = areq->result; |
| |
| /* The result should be a TupleTableSlot or NULL. */ |
| Assert(slot == NULL || IsA(slot, TupleTableSlot)); |
| |
| /* Nothing to do if the request is pending. */ |
| if (!areq->request_complete) |
| { |
| /* The request would have been pending for a callback. */ |
| Assert(areq->callback_pending); |
| return; |
| } |
| |
| /* If the result is NULL or an empty slot, there's nothing more to do. */ |
| if (TupIsNull(slot)) |
| { |
| /* The ending subplan wouldn't have been pending for a callback. */ |
| Assert(!areq->callback_pending); |
| --node->as_nasyncremain; |
| return; |
| } |
| |
| /* Save result so we can return it. */ |
| Assert(node->as_nasyncresults < node->as_nasyncplans); |
| node->as_asyncresults[node->as_nasyncresults++] = slot; |
| |
| /* |
| * Mark the subplan that returned a result as ready for a new request. We |
| * don't launch another one here immediately because it might complete. |
| */ |
| node->as_needrequest = bms_add_member(node->as_needrequest, |
| areq->request_index); |
| } |
| |
| /* ---------------------------------------------------------------- |
| * classify_matching_subplans |
| * |
| * Classify the node's as_valid_subplans into sync ones and |
| * async ones, adjust it to contain sync ones only, and save |
| * async ones in the node's as_valid_asyncplans. |
| * ---------------------------------------------------------------- |
| */ |
| static void |
| classify_matching_subplans(AppendState *node) |
| { |
| Bitmapset *valid_asyncplans; |
| |
| Assert(node->as_valid_asyncplans == NULL); |
| |
| /* Nothing to do if there are no valid subplans. */ |
| if (bms_is_empty(node->as_valid_subplans)) |
| { |
| node->as_syncdone = true; |
| node->as_nasyncremain = 0; |
| return; |
| } |
| |
| /* Nothing to do if there are no valid async subplans. */ |
| if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans)) |
| { |
| node->as_nasyncremain = 0; |
| return; |
| } |
| |
| /* Get valid async subplans. */ |
| valid_asyncplans = bms_copy(node->as_asyncplans); |
| valid_asyncplans = bms_int_members(valid_asyncplans, |
| node->as_valid_subplans); |
| |
| /* Adjust the valid subplans to contain sync subplans only. */ |
| node->as_valid_subplans = bms_del_members(node->as_valid_subplans, |
| valid_asyncplans); |
| |
| /* Save valid async subplans. */ |
| node->as_valid_asyncplans = valid_asyncplans; |
| } |
| |
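| /* ---------------------------------------------------------------- |
| * ExecSquelchAppend |
| * |
| * Propagate a squelch request to all of the node's subplans, |
| * marking this node as squelched. |
| * ---------------------------------------------------------------- |
| */ |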
| void |
| ExecSquelchAppend(AppendState *node, bool force) |
| { |
| int i; |
| |
| node->ps.squelched = true; |
| for (i = 0; i < node->as_nplans; i++) |
| ExecSquelchNode(node->appendplans[i], force); |
| } |