| /*------------------------------------------------------------------------- |
| * |
| * nodeMotion.c |
| * Routines to handle moving tuples around in Apache Cloudberry. |
| * |
| * Portions Copyright (c) 2005-2008, Greenplum inc |
| * Portions Copyright (c) 2012-Present VMware, Inc. or its affiliates. |
| * |
| * |
| * IDENTIFICATION |
| * src/backend/executor/nodeMotion.c |
| * |
| *------------------------------------------------------------------------- |
| */ |
| |
| #include "postgres.h" |
| |
| #include "access/heapam.h" |
| #include "access/parallel.h" |
| #include "nodes/execnodes.h" /* Slice, SliceTable */ |
| #include "cdb/cdbmotion.h" |
| #include "cdb/cdbutil.h" |
| #include "cdb/cdbvars.h" |
| #include "cdb/cdbhash.h" |
| #include "executor/executor.h" |
| #include "executor/execdebug.h" |
| #include "executor/execUtils.h" |
| #include "executor/nodeMotion.h" |
| #include "lib/binaryheap.h" |
| #include "utils/tuplesort.h" |
| #include "utils/wait_event.h" |
| #include "miscadmin.h" |
| #include "utils/memutils.h" |
| |
| |
| /* #define MEASURE_MOTION_TIME */ |
| |
| #ifdef MEASURE_MOTION_TIME |
#include <sys/time.h>			/* gettimeofday */
| #endif |
| |
| /* #define CDB_MOTION_DEBUG */ |
| |
| #ifdef CDB_MOTION_DEBUG |
| #include "utils/lsyscache.h" /* getTypeOutputInfo */ |
| #include "lib/stringinfo.h" /* StringInfo */ |
| #endif |
| |
| |
| /*========================================================================= |
 * FUNCTION PROTOTYPES
| */ |
| static TupleTableSlot *execMotionSender(MotionState *node); |
| static TupleTableSlot *execMotionUnsortedReceiver(MotionState *node); |
| static TupleTableSlot *execMotionSortedReceiver(MotionState *node); |
| |
| static int CdbMergeComparator(Datum lhs, Datum rhs, void *context); |
| static uint32 evalHashKey(ExprContext *econtext, List *hashkeys, CdbHash *h); |
| |
| static void doSendEndOfStream(Motion *motion, MotionState *node); |
| static void doSendTuple(Motion *motion, MotionState *node, TupleTableSlot *outerTupleSlot); |
| |
| |
| /*========================================================================= |
| */ |
| |
| #ifdef CDB_MOTION_DEBUG |
| static void |
| formatTuple(StringInfo buf, TupleTableSlot *slot, Oid *outputFunArray) |
| { |
| TupleDesc tupdesc = slot->tts_tupleDescriptor; |
| int i; |
| |
| for (i = 0; i < tupdesc->natts; i++) |
| { |
| bool isnull; |
| Datum d = slot_getattr(slot, i + 1, &isnull); |
| |
| if (d && !isnull) |
| { |
| char *s = OidOutputFunctionCall(outputFunArray[i], d); |
| char *name = NameStr(tupdesc->attrs[i].attname); |
| |
| if (name && *name) |
| appendStringInfo(buf, " %s=\"%.30s\"", name, s); |
| else |
| appendStringInfo(buf, " \"%.30s\"", s); |
| pfree(s); |
| } |
| } |
| appendStringInfoChar(buf, '\n'); |
| } |
| #endif |
| |
| /* ---------------------------------------------------------------- |
| * ExecMotion |
| * ---------------------------------------------------------------- |
| */ |
| static TupleTableSlot * |
| ExecMotion(PlanState *pstate) |
| { |
| MotionState *node = castNode(MotionState, pstate); |
| Motion *motion = (Motion *) node->ps.plan; |
| |
| /* |
| * Check for interrupts. Without this we've seen the scenario before that |
| * it could be quite slow to cancel a query that selects all the tuples |
| * from a big distributed table because the motion node on QD has no chance |
| * of checking the cancel signal. |
| */ |
| CHECK_FOR_INTERRUPTS(); |
| |
	/* sanity check */
	if (node->stopRequested)
		ereport(ERROR,
				(errcode(ERRCODE_INTERNAL_ERROR),
				 errmsg("unexpected internal error"),
				 errdetail("Motion node that was already stopped was executed again; data would be lost."),
				 errhint("The motion node was probably squelched too early.")));
| |
| /* |
| * at the top here we basically decide: -- SENDER vs. RECEIVER and -- |
| * SORTED vs. UNSORTED |
| */ |
| if (node->mstype == MOTIONSTATE_RECV) |
| { |
| TupleTableSlot *tuple; |
| #ifdef MEASURE_MOTION_TIME |
| struct timeval startTime; |
| struct timeval stopTime; |
| |
| gettimeofday(&startTime, NULL); |
| #endif |
| |
| if (node->ps.state->active_recv_id >= 0) |
| { |
| if (node->ps.state->active_recv_id != motion->motionID) |
| { |
| /* |
| * See motion_sanity_walker() for details on how a deadlock |
| * may occur. |
| */ |
| elog(LOG, "DEADLOCK HAZARD: Updating active_motion_id from %d to %d", |
| node->ps.state->active_recv_id, motion->motionID); |
| node->ps.state->active_recv_id = motion->motionID; |
| } |
| } |
| else |
| node->ps.state->active_recv_id = motion->motionID; |
| |
| if (motion->sendSorted) |
| tuple = execMotionSortedReceiver(node); |
| else |
| tuple = execMotionUnsortedReceiver(node); |
| |
| /* |
| * We tell the upper node as if this was the end of tuple stream if |
| * query-finish is requested. Unlike other nodes, we skipped this |
| * check in ExecProc because this node in sender mode should send EoS |
| * to the receiver side, but the receiver side can simply stop |
| * processing the stream. The sender side of this stream could still |
| * be sending more tuples, but this slice will eventually clean up the |
| * executor and eventually Stop message will be delivered to the |
| * sender side. |
| */ |
| if (QueryFinishPending) |
| tuple = NULL; |
| |
| if (tuple == NULL) |
| node->ps.state->active_recv_id = -1; |
| #ifdef MEASURE_MOTION_TIME |
| gettimeofday(&stopTime, NULL); |
| |
| node->motionTime.tv_sec += stopTime.tv_sec - startTime.tv_sec; |
| node->motionTime.tv_usec += stopTime.tv_usec - startTime.tv_usec; |
| |
| while (node->motionTime.tv_usec < 0) |
| { |
| node->motionTime.tv_usec += 1000000; |
| node->motionTime.tv_sec--; |
| } |
| |
| while (node->motionTime.tv_usec >= 1000000) |
| { |
| node->motionTime.tv_usec -= 1000000; |
| node->motionTime.tv_sec++; |
| } |
| #endif |
| return tuple; |
| } |
| else if (node->mstype == MOTIONSTATE_SEND) |
| { |
| return execMotionSender(node); |
| } |
| else |
| { |
| elog(ERROR, "cannot execute inactive Motion"); |
| return NULL; |
| } |
| } |
| |
| static TupleTableSlot * |
| execMotionSender(MotionState *node) |
| { |
| /* SENDER LOGIC */ |
| TupleTableSlot *outerTupleSlot; |
| PlanState *outerNode; |
| Motion *motion = (Motion *) node->ps.plan; |
| bool done = false; |
| |
| #ifdef MEASURE_MOTION_TIME |
| struct timeval time1; |
| struct timeval time2; |
| |
| gettimeofday(&time1, NULL); |
| #endif |
| |
| AssertState(motion->motionType == MOTIONTYPE_GATHER || |
| motion->motionType == MOTIONTYPE_GATHER_SINGLE || |
| motion->motionType == MOTIONTYPE_HASH || |
| motion->motionType == MOTIONTYPE_BROADCAST || |
| motion->motionType == MOTIONTYPE_BROADCAST_WORKERS || |
| (motion->motionType == MOTIONTYPE_EXPLICIT && motion->segidColIdx > 0)); |
| Assert(node->ps.state->interconnect_context); |
| |
| while (!done) |
| { |
| /* grab TupleTableSlot from our child. */ |
| outerNode = outerPlanState(node); |
| outerTupleSlot = ExecProcNode(outerNode); |
| |
| #ifdef MEASURE_MOTION_TIME |
| gettimeofday(&time2, NULL); |
| |
| node->otherTime.tv_sec += time2.tv_sec - time1.tv_sec; |
| node->otherTime.tv_usec += time2.tv_usec - time1.tv_usec; |
| |
| while (node->otherTime.tv_usec < 0) |
| { |
| node->otherTime.tv_usec += 1000000; |
| node->otherTime.tv_sec--; |
| } |
| |
| while (node->otherTime.tv_usec >= 1000000) |
| { |
| node->otherTime.tv_usec -= 1000000; |
| node->otherTime.tv_sec++; |
| } |
| #endif |
| |
| if (done || TupIsNull(outerTupleSlot)) |
| { |
| doSendEndOfStream(motion, node); |
| done = true; |
| } |
| else if (motion->motionType == MOTIONTYPE_GATHER_SINGLE && |
| GpIdentity.segindex != (gp_session_id % node->numInputSegs)) |
| { |
| /* |
| * For explicit gather motion, receiver gets data from one |
| * segment only. The others execute the subplan normally, but |
| * throw away the resulting tuples. |
| */ |
| } |
| else |
| { |
| doSendTuple(motion, node, outerTupleSlot); |
| /* doSendTuple() may have set node->stopRequested as a side-effect */ |
| |
| if (node->stopRequested) |
| { |
| elog(gp_workfile_caching_loglevel, "Motion calling Squelch on child node"); |
| /* propagate stop notification to our children */ |
| ExecSquelchNode(outerNode, true); |
| done = true; |
| } |
| } |
| #ifdef MEASURE_MOTION_TIME |
| gettimeofday(&time1, NULL); |
| |
| node->motionTime.tv_sec += time1.tv_sec - time2.tv_sec; |
| node->motionTime.tv_usec += time1.tv_usec - time2.tv_usec; |
| |
| while (node->motionTime.tv_usec < 0) |
| { |
| node->motionTime.tv_usec += 1000000; |
| node->motionTime.tv_sec--; |
| } |
| |
| while (node->motionTime.tv_usec >= 1000000) |
| { |
| node->motionTime.tv_usec -= 1000000; |
| node->motionTime.tv_sec++; |
| } |
| #endif |
| } |
| |
| Assert(node->stopRequested || node->numTuplesFromChild == node->numTuplesToAMS); |
| |
| /* nothing else to send out, so we return NULL up the tree. */ |
| return NULL; |
| } |
| |
| |
| static TupleTableSlot * |
| execMotionUnsortedReceiver(MotionState *node) |
| { |
| /* RECEIVER LOGIC */ |
| TupleTableSlot *slot; |
| MinimalTuple tuple; |
| Motion *motion = (Motion *) node->ps.plan; |
| EState *estate = node->ps.state; |
| |
| AssertState(motion->motionType == MOTIONTYPE_GATHER || |
| motion->motionType == MOTIONTYPE_GATHER_SINGLE || |
| motion->motionType == MOTIONTYPE_HASH || |
| motion->motionType == MOTIONTYPE_BROADCAST || |
| motion->motionType == MOTIONTYPE_BROADCAST_WORKERS || |
| (motion->motionType == MOTIONTYPE_EXPLICIT && motion->segidColIdx > 0)); |
| |
| Assert(node->ps.state->motionlayer_context); |
| |
| if (node->stopRequested) |
| { |
| SendStopMessage(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID); |
| return NULL; |
| } |
| |
| if (estate->interconnect_context == NULL) |
| { |
| if (!estate->es_interconnect_is_setup && estate->dispatcherState && |
| !estate->es_got_eos) |
| { |
| /* |
| * We could only possibly get here in the following scenario: |
| * 1. We are QD gracefully aborting a transaction. |
| * 2. We have torn down the interconnect of the current slice. |
| * 3. Since an error has happened, we no longer need to finish fetching |
| * all the tuples, hence squelching the executor subtree. |
| * 4. We are in the process of ExecSquelchShareInputScan(), and the |
| * Shared Scan has this Motion below it. |
| * |
| * NB: if you need to change this, see also execMotionSortedReceiver() |
| */ |
| ereport(NOTICE, |
| (errmsg("An ERROR must have happened. Stopping a Shared Scan."))); |
| return NULL; |
| } |
| else |
| ereport(ERROR, (errmsg("Interconnect is down unexpectedly."))); |
| } |
| |
| tuple = RecvTupleFrom(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID, ANY_ROUTE); |
| |
| if (!tuple) |
| { |
| #ifdef CDB_MOTION_DEBUG |
| if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG) |
| elog(DEBUG4, "motionID=%d saw end of stream", motion->motionID); |
| #endif |
| Assert(node->numTuplesFromAMS == node->numTuplesToParent); |
| Assert(node->numTuplesFromChild == 0); |
| Assert(node->numTuplesToAMS == 0); |
| return NULL; |
| } |
| |
| node->numTuplesFromAMS++; |
| node->numTuplesToParent++; |
| |
	/* store it in our result slot and return it. */
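	/* shouldFree = true: the slot takes ownership of the tuple and frees it. */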
| slot = node->ps.ps_ResultTupleSlot; |
| |
| slot = ExecStoreMinimalTuple(tuple, slot, true /* shouldFree */ ); |
| |
| #ifdef CDB_MOTION_DEBUG |
| if (node->numTuplesToParent <= 20) |
| { |
| StringInfoData buf; |
| |
| initStringInfo(&buf); |
| appendStringInfo(&buf, " motion%-3d rcv %5d.", |
| motion->motionID, |
| node->numTuplesToParent); |
| formatTuple(&buf, slot, node->outputFunArray); |
| elog(DEBUG3, "%s", buf.data); |
| pfree(buf.data); |
| } |
| #endif |
| |
| return slot; |
| } |
| |
| |
| |
| /* |
| * General background on Sorted Motion: |
| * ----------------------------------- |
 * NOTE: This function is only used for order-preserving motion. There are
 * only 2 types of motion for which order-preserving makes sense: FIXED and
 * BROADCAST (HASH does not make sense). So we have:
| * |
| * CASE 1: broadcast order-preserving fixed motion. This should only be |
| * called for SENDERs. |
| * |
| * CASE 2: single-destination order-preserving fixed motion. The SENDER |
| * side will act like Unsorted motion and won't call this. So only |
| * the RECEIVER should be called for this case. |
| * |
| * |
| * Sorted Receive Notes: |
| * -------------------- |
| * |
 * The 1st time we execute, we need to pull a tuple from each of our sources
 * and store them in our tupleheap. Once that is done, we can pick the lowest
 * (or whatever the criterion is) value from amongst all the sources. This
 * works since each stream is itself sorted.
 *
 * We keep track of which source was selected; its slot is the one we will
 * need to refill during the next call.
 *
 * Subsequent calls to this function (after the 1st time) start by receiving
 * a tuple for the slot that was emptied on the previous call. Then we again
 * select the lowest value and return that tuple.
| */ |
| |
| /* Sorted receiver using binary heap */ |
| static TupleTableSlot * |
| execMotionSortedReceiver(MotionState *node) |
| { |
| TupleTableSlot *slot; |
| binaryheap *hp = node->tupleheap; |
| MinimalTuple inputTuple; |
| Motion *motion = (Motion *) node->ps.plan; |
| EState *estate = node->ps.state; |
| |
| AssertState(motion->motionType == MOTIONTYPE_GATHER && |
| motion->sendSorted && |
| hp != NULL); |
| |
| /* Notify senders and return EOS if caller doesn't want any more data. */ |
| if (node->stopRequested) |
| { |
| |
| SendStopMessage(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID); |
| return NULL; |
| } |
| |
| if (estate->interconnect_context == NULL) |
| { |
| if (!estate->es_interconnect_is_setup && estate->dispatcherState && |
| !estate->es_got_eos) |
| { |
| /* |
| * We could only possibly get here in the following scenario: |
| * 1. We are QD gracefully aborting a transaction. |
| * 2. We have torn down the interconnect of the current slice. |
| * 3. Since an error has happened, we no longer need to finish fetching |
| * all the tuples, hence squelching the executor subtree. |
| * 4. We are in the process of ExecSquelchShareInputScan(), and the |
| * Shared Scan has this Motion below it. |
| * |
| * NB: if you need to change this, see also execMotionUnsortedReceiver() |
| */ |
| ereport(NOTICE, |
| (errmsg("An ERROR must have happened. Stopping a Shared Scan."))); |
| return NULL; |
| } |
| else |
| ereport(ERROR, (errmsg("Interconnect is down unexpectedly."))); |
| } |
| |
| /* |
| * On first call, fill the priority queue with each sender's first tuple. |
| */ |
	if (!node->tupleheapReady)
	{
		int			iSegIdx;
		ListCell   *lcProcess;
		ExecSlice  *sendSlice = &node->ps.state->es_sliceTable->slices[motion->motionID];
| |
| Assert(sendSlice->sliceIndex == motion->motionID); |
| |
| foreach_with_count(lcProcess, sendSlice->primaryProcesses, iSegIdx) |
| { |
| MemoryContext oldcxt; |
| |
| if (lfirst(lcProcess) == NULL) |
| continue; /* skip this one: we are not receiving from it */ |
| |
| inputTuple = RecvTupleFrom(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID, iSegIdx); |
| |
| if (!inputTuple) |
| continue; /* skip this one: received nothing */ |
| |
| /* |
| * Make a slot to hold this tuple. We will reuse it to hold any |
| * future tuples from the same sender. We initialized the result |
| * tuple slot with the correct type earlier, so make the new slot |
| * have the same type. |
| */ |
| oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); |
| node->slots[iSegIdx] = MakeTupleTableSlot(node->ps.ps_ResultTupleSlot->tts_tupleDescriptor, |
| &TTSOpsMinimalTuple); |
| MemoryContextSwitchTo(oldcxt); |
| |
| /* |
| * Store the tuple in the slot, and add it to the heap. |
| * |
| * Use slot_getsomeattrs() to materialize the columns we need for |
| * the comparisons in the tts_values/isnull arrays. The comparator |
| * can then peek directly into the arrays, which is cheaper than |
| * calling slot_getattr() all the time. |
| */ |
| ExecStoreMinimalTuple(inputTuple, node->slots[iSegIdx], true); |
| slot_getsomeattrs(node->slots[iSegIdx], node->lastSortColIdx); |
| binaryheap_add_unordered(hp, iSegIdx); |
| |
| node->numTuplesFromAMS++; |
| |
| #ifdef CDB_MOTION_DEBUG |
| if (node->numTuplesFromAMS <= 20) |
| { |
| StringInfoData buf; |
| |
| initStringInfo(&buf); |
| appendStringInfo(&buf, " motion%-3d rcv<-%-3d %5d.", |
| motion->motionID, |
| iSegIdx, |
| node->numTuplesFromAMS); |
| formatTuple(&buf, node->slots[iSegIdx], node->outputFunArray); |
| elog(DEBUG3, "%s", buf.data); |
| pfree(buf.data); |
| } |
| #endif |
| } |
| Assert(iSegIdx == node->numInputSegs); |
| |
| /* |
| * Done adding the elements, now arrange the heap to satisfy the heap |
| * property. This is quicker than inserting the initial elements one by |
| * one. |
| */ |
| binaryheap_build(hp); |
| |
| node->tupleheapReady = true; |
| } |
| |
| /* |
| * Delete from the priority queue the element that we fetched last time. |
| * Receive and insert the next tuple from that same sender. |
| */ |
| else |
| { |
| /* sanity check */ |
| if (binaryheap_empty(hp)) |
| elog(ERROR, "sorted Gather Motion called again after already receiving all data"); |
| |
| /* Old element is still at the head of the pq. */ |
| Assert(DatumGetInt32(binaryheap_first(hp)) == node->routeIdNext); |
| |
| /* Receive the successor of the tuple that we returned last time. */ |
| inputTuple = RecvTupleFrom(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID, |
| node->routeIdNext); |
| |
| /* Substitute it in the pq for its predecessor. */ |
| if (inputTuple) |
| { |
| ExecStoreMinimalTuple(inputTuple, node->slots[node->routeIdNext], true); |
| slot_getsomeattrs(node->slots[node->routeIdNext], node->lastSortColIdx); |
| binaryheap_replace_first(hp, Int32GetDatum(node->routeIdNext)); |
| |
| node->numTuplesFromAMS++; |
| |
| #ifdef CDB_MOTION_DEBUG |
| if (node->numTuplesFromAMS <= 20) |
| { |
| StringInfoData buf; |
| |
| initStringInfo(&buf); |
| appendStringInfo(&buf, " motion%-3d rcv<-%-3d %5d.", |
| motion->motionID, |
| node->routeIdNext, |
| node->numTuplesFromAMS); |
| formatTuple(&buf, node->slots[node->routeIdNext], node->outputFunArray); |
| elog(DEBUG3, "%s", buf.data); |
| pfree(buf.data); |
| } |
| #endif |
| } |
| else |
| { |
| /* At EOS, drop this sender from the priority queue. */ |
| binaryheap_remove_first(hp); |
| } |
| } |
| |
| /* Finished if all senders have returned EOS. */ |
| if (binaryheap_empty(hp)) |
| { |
| Assert(node->numTuplesFromAMS == node->numTuplesToParent); |
| Assert(node->numTuplesFromChild == 0); |
| Assert(node->numTuplesToAMS == 0); |
| return NULL; |
| } |
| |
| /* |
| * Our next result tuple, with lowest key among all senders, is now at the |
| * head of the priority queue. Get it from there. |
| * |
| * We transfer ownership of the tuple from the pq element to our caller, |
| * but the pq element itself will remain in place until the next time we |
| * are called, to avoid an unnecessary rearrangement of the priority |
| * queue. |
| */ |
	node->routeIdNext = DatumGetInt32(binaryheap_first(hp));
| slot = node->slots[node->routeIdNext]; |
| |
| /* Update counters. */ |
| node->numTuplesToParent++; |
| |
| #ifdef CDB_MOTION_DEBUG |
| if (node->numTuplesToParent <= 20) |
| { |
| StringInfoData buf; |
| |
| initStringInfo(&buf); |
| appendStringInfo(&buf, " motion%-3d mrg<-%-3d %5d.", |
| motion->motionID, |
| node->routeIdNext, |
| node->numTuplesToParent); |
| formatTuple(&buf, slot, node->outputFunArray); |
| elog(DEBUG3, "%s", buf.data); |
| pfree(buf.data); |
| } |
| #endif |
| |
| /* Return result slot. */ |
| return slot; |
| } /* execMotionSortedReceiver */ |
| |
| /* ---------------------------------------------------------------- |
| * ExecInitMotion |
| * |
 * NOTE: be a bit careful here: estate->es_cur_slice_idx does not yet hold
 * the final value it will have on the QE; that is set after this call, in
 * mppexec.c. This is OK since we don't need it here, but be aware of that
 * before you try to use it.
| * ---------------------------------------------------------------- |
| */ |
| |
| MotionState * |
| ExecInitMotion(Motion *node, EState *estate, int eflags) |
| { |
| MotionState *motionstate = NULL; |
| TupleDesc tupDesc; |
| ExecSlice *sendSlice; |
| ExecSlice *recvSlice; |
| SliceTable *sliceTable = estate->es_sliceTable; |
| PlanState *outerPlan; |
| int parentIndex; |
| |
| /* |
| * If GDD is enabled, the lock of table may downgrade to RowExclusiveLock, |
| * (see CdbTryOpenRelation function), then EPQ would be triggered, EPQ will |
| * execute the subplan in the executor, so it will create a new EState, |
| * but there are no slice tables in the new EState and we can not AssignGangs |
| * on the QE. In this case, we raise an error. |
| */ |
| if (estate->es_epq_active) |
| ereport(ERROR, |
| (errcode(ERRCODE_T_R_SERIALIZATION_FAILURE), |
| errmsg("EvalPlanQual can not handle subPlan with Motion node"))); |
| |
| Assert(node->motionID > 0); |
| AssertImply(node->senderSliceInfo && node->senderSliceInfo->parallel_workers <= 1, |
| node->motionID < sliceTable->numSlices); |
| AssertImply(node->motionType == MOTIONTYPE_HASH, node->numHashSegments > 0); |
| |
| parentIndex = estate->currentSliceId; |
| estate->currentSliceId = node->motionID; |
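
	/*
	 * While this Motion and its subtree are initialized, the "current" slice
	 * is the sending slice; the parent slice id is restored at the bottom of
	 * this function.
	 */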
| |
| /* |
| * create state structure |
| */ |
| motionstate = makeNode(MotionState); |
| motionstate->ps.plan = (Plan *) node; |
| motionstate->ps.state = estate; |
| motionstate->ps.ExecProcNode = ExecMotion; |
| motionstate->mstype = MOTIONSTATE_NONE; |
| motionstate->stopRequested = false; |
| motionstate->hashExprs = NIL; |
| motionstate->cdbhash = NULL; |
| motionstate->cdbhashworkers = NULL; |
| |
| /* Look up the sending and receiving gang's slice table entries. */ |
| sendSlice = &sliceTable->slices[node->motionID]; |
| Assert(sendSlice->sliceIndex == node->motionID); |
| recvSlice = &sliceTable->slices[parentIndex]; |
| Assert(parentIndex == sendSlice->parentIndex); |
| |
| /* QD must fill in the global slice table. */ |
| if (Gp_role == GP_ROLE_DISPATCH) |
| { |
| MemoryContext oldcxt = MemoryContextSwitchTo(estate->es_query_cxt); |
| |
| if (node->motionType == MOTIONTYPE_GATHER || |
| node->motionType == MOTIONTYPE_GATHER_SINGLE) |
| { |
| /* Sending to a single receiving process on the entry db? */ |
| /* Is receiving slice a root slice that runs here in the qDisp? */ |
| if (recvSlice->sliceIndex == recvSlice->rootIndex) |
| { |
| motionstate->mstype = MOTIONSTATE_RECV; |
				/*
				 * For a parallel retrieve cursor, the motion's gang type may
				 * be set to GANGTYPE_ENTRYDB_READER explicitly.
				 */
| Assert(recvSlice->gangType == GANGTYPE_UNALLOCATED || |
| recvSlice->gangType == GANGTYPE_ENTRYDB_READER || |
| recvSlice->gangType == GANGTYPE_PRIMARY_WRITER || |
| recvSlice->gangType == GANGTYPE_PRIMARY_READER); |
| } |
| else |
| { |
| /* sanity checks */ |
| if (list_length(recvSlice->segments) != 1) |
| elog(ERROR, "unexpected gang size: %d", list_length(recvSlice->segments)); |
| } |
| } |
| |
| MemoryContextSwitchTo(oldcxt); |
| } |
| |
| /* QE must fill in map from motionID to MotionState node. */ |
| else |
| { |
| Assert(Gp_role == GP_ROLE_EXECUTE); |
| |
| if (LocallyExecutingSliceIndex(estate) == recvSlice->sliceIndex) |
| { |
| /* this is recv */ |
| motionstate->mstype = MOTIONSTATE_RECV; |
| } |
| else if (LocallyExecutingSliceIndex(estate) == sendSlice->sliceIndex) |
| { |
| /* this is send */ |
| motionstate->mstype = MOTIONSTATE_SEND; |
| } |
| /* TODO: If neither sending nor receiving, don't bother to initialize. */ |
| } |
| |
| motionstate->tupleheapReady = false; |
| motionstate->sentEndOfStream = false; |
| |
| motionstate->otherTime.tv_sec = 0; |
| motionstate->otherTime.tv_usec = 0; |
| motionstate->motionTime.tv_sec = 0; |
| motionstate->motionTime.tv_usec = 0; |
| |
| motionstate->numTuplesFromChild = 0; |
| motionstate->numTuplesToAMS = 0; |
| motionstate->numTuplesFromAMS = 0; |
| motionstate->numTuplesToParent = 0; |
| |
| motionstate->stopRequested = false; |
| motionstate->numInputSegs = list_length(sendSlice->segments); |
| |
| /* It should have been set to 1 in FillSliceGangInfo if parallel_workers == 0 */ |
| Assert(recvSlice->parallel_workers); |
| |
| motionstate->parallel_workers = recvSlice->parallel_workers; |
| |
| /* |
| * Miscellaneous initialization |
| * |
| * create expression context for node |
| */ |
| ExecAssignExprContext(estate, &motionstate->ps); |
| |
| /* |
| * Initializes child nodes. If alien elimination is on, we skip children |
| * of receiver motion. |
| */ |
| if (!estate->eliminateAliens || motionstate->mstype == MOTIONSTATE_SEND) |
| { |
| outerPlanState(motionstate) = ExecInitNode(outerPlan(node), estate, eflags); |
| } |
| |
| /* |
| * initialize tuple type. no need to initialize projection info because |
| * this node doesn't do projections. |
| */ |
| outerPlan = outerPlanState(motionstate); |
| |
| /* |
| * Initialize result type and slot |
| */ |
| ExecInitResultTupleSlotTL(&motionstate->ps, &TTSOpsMinimalTuple); |
| tupDesc = ExecGetResultType(&motionstate->ps); |
| |
| motionstate->ps.ps_ProjInfo = NULL; |
	/* numHashSegments is the target locus.numsegments, without parallel_workers */
| motionstate->numHashSegments = node->numHashSegments; |
| |
| /* Set up motion send data structures */ |
| if (motionstate->mstype == MOTIONSTATE_SEND && node->motionType == MOTIONTYPE_HASH) |
| { |
| int nkeys; |
| |
| Assert(node->numHashSegments > 0); |
| AssertImply(node->senderSliceInfo && node->senderSliceInfo->parallel_workers <= 1, |
| node->numHashSegments <= recvSlice->planNumSegments); |
| nkeys = list_length(node->hashExprs); |
| |
| if (nkeys > 0) |
| motionstate->hashExprs = ExecInitExprList(node->hashExprs, |
| (PlanState *) motionstate); |
| |
| /* |
| * Create hash API reference |
| */ |
| motionstate->cdbhash = makeCdbHash( |
| motionstate->numHashSegments, |
| nkeys, |
| node->hashFuncs); |
| |
| /* |
| * Create hash API reference for workers. |
| * During redistribute motion; we'll mod segments to decide to send to which |
| * segment and then mod workers to decide send to which segments. |
| */ |
| if (motionstate->parallel_workers >= 2) |
| { |
| motionstate->cdbhashworkers = makeCdbHash( |
| motionstate->numHashSegments * motionstate->parallel_workers, |
| nkeys, |
| node->hashFuncs); |
| } |
| } |
| |
| /* |
| * Merge Receive: Set up the key comparator and priority queue. |
| * |
| * This is very similar to a Merge Append. |
| */ |
| if (node->sendSorted && motionstate->mstype == MOTIONSTATE_RECV) |
| { |
| int numInputSegs = motionstate->numInputSegs; |
| int lastSortColIdx = 0; |
| |
		/* Allocate an array of slots to hold the next tuple from each sender */
| motionstate->slots = palloc0(numInputSegs * sizeof(TupleTableSlot *)); |
| |
| /* Prepare SortSupport data for each column */ |
| motionstate->numSortCols = node->numSortCols; |
| motionstate->sortKeys = (SortSupport) palloc0(node->numSortCols * sizeof(SortSupportData)); |
| |
| for (int i = 0; i < node->numSortCols; i++) |
| { |
| SortSupport sortKey = &motionstate->sortKeys[i]; |
| |
| AssertArg(node->sortColIdx[i] != 0); |
| AssertArg(node->sortOperators[i] != 0); |
| |
| sortKey->ssup_cxt = CurrentMemoryContext; |
| sortKey->ssup_collation = node->collations[i]; |
| sortKey->ssup_nulls_first = node->nullsFirst[i]; |
| sortKey->ssup_attno = node->sortColIdx[i]; |
| |
| PrepareSortSupportFromOrderingOp(node->sortOperators[i], sortKey); |
| |
| /* Also make note of the last column used in the sort key */ |
| if (node->sortColIdx[i] > lastSortColIdx) |
| lastSortColIdx = node->sortColIdx[i]; |
| } |
| motionstate->lastSortColIdx = lastSortColIdx; |
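
		/*
		 * The heap entries are sender route indexes (stored as int32
		 * Datums); the tuples themselves live in motionstate->slots[],
		 * where CdbMergeComparator looks them up.
		 */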
| motionstate->tupleheap = |
| binaryheap_allocate(motionstate->numInputSegs, |
| CdbMergeComparator, |
| motionstate); |
| } |
| |
| /* |
| * Perform per-node initialization in the motion layer. |
| */ |
| UpdateMotionLayerNode(motionstate->ps.state->motionlayer_context, |
| node->motionID, |
| node->sendSorted, |
| tupDesc); |
| |
| |
| #ifdef CDB_MOTION_DEBUG |
| motionstate->outputFunArray = (Oid *) palloc(tupDesc->natts * sizeof(Oid)); |
| for (int i = 0; i < tupDesc->natts; i++) |
| { |
| bool typisvarlena; |
| |
| getTypeOutputInfo(tupDesc->attrs[i].atttypid, |
| &motionstate->outputFunArray[i], |
| &typisvarlena); |
| } |
| #endif |
| |
| estate->currentSliceId = parentIndex; |
| |
| return motionstate; |
| } |
| |
| /* ---------------------------------------------------------------- |
| * ExecEndMotion(node) |
| * ---------------------------------------------------------------- |
| */ |
| void |
| ExecEndMotion(MotionState *node) |
| { |
| Motion *motion = (Motion *) node->ps.plan; |
| #ifdef MEASURE_MOTION_TIME |
| double otherTimeSec; |
| double motionTimeSec; |
| #endif |
| |
| ExecFreeExprContext(&node->ps); |
| |
| /* |
| * clean out the tuple table |
| */ |
| ExecClearTuple(node->ps.ps_ResultTupleSlot); |
| |
| /* |
| * Set the slice no for the nodes under this motion. |
| */ |
| Assert(node->ps.state != NULL); |
| |
| /* |
| * shut down the subplan |
| */ |
| ExecEndNode(outerPlanState(node)); |
| |
| #ifdef MEASURE_MOTION_TIME |
| motionTimeSec = (double) node->motionTime.tv_sec + (double) node->motionTime.tv_usec / 1000000.0; |
| |
| if (node->mstype == MOTIONSTATE_RECV) |
| { |
| elog(DEBUG1, |
| "Motion Node %d (RECEIVER) Statistics:\n" |
| "Timing: \n" |
| "\t Time receiving the tuple: %f sec\n" |
| "Counters: \n" |
| "\tnumTuplesFromChild: %d\n" |
| "\tnumTuplesFromAMS: %d\n" |
| "\tnumTuplesToAMS: %d\n" |
| "\tnumTuplesToParent: %d\n", |
			 motion->motionID,
| motionTimeSec, |
| node->numTuplesFromChild, |
| node->numTuplesFromAMS, |
| node->numTuplesToAMS, |
| node->numTuplesToParent |
| ); |
| } |
| else if (node->mstype == MOTIONSTATE_SEND) |
| { |
| otherTimeSec = (double) node->otherTime.tv_sec + (double) node->otherTime.tv_usec / 1000000.0; |
| elog(DEBUG1, |
| "Motion Node %d (SENDER) Statistics:\n" |
| "Timing: \n" |
| "\t Time getting next tuple to send: %f sec \n" |
| "\t Time sending the tuple: %f sec\n" |
| "\t Percentage of time sending: %2.2f%% \n" |
| "Counters: \n" |
| "\tnumTuplesFromChild: %d\n" |
| "\tnumTuplesToAMS: %d\n", |
			 motion->motionID,
| otherTimeSec, |
| motionTimeSec, |
| (double) (motionTimeSec / (otherTimeSec + motionTimeSec)) * 100, |
| node->numTuplesFromChild, |
| node->numTuplesToAMS |
| ); |
| } |
| #endif /* MEASURE_MOTION_TIME */ |
| |
| /* Merge Receive: Free the priority queue and associated structures. */ |
| if (node->tupleheap != NULL) |
| { |
| binaryheap_free(node->tupleheap); |
| node->tupleheap = NULL; |
| } |
| |
	/* Free the CdbHash structures */
| if (node->cdbhash != NULL) |
| { |
| pfree(node->cdbhash); |
| node->cdbhash = NULL; |
| } |
| if (node->cdbhashworkers != NULL) |
| { |
| pfree(node->cdbhashworkers); |
| node->cdbhashworkers = NULL; |
| } |
| |
| /* |
| * Free up this motion node's resources in the Motion Layer. |
| * |
| * TODO: For now, we don't flush the comm-layer. NO ERRORS DURING AMS!!! |
| */ |
| EndMotionLayerNode(node->ps.state->motionlayer_context, motion->motionID, |
| /* flush-comm-layer */ false); |
| |
| #ifdef CDB_MOTION_DEBUG |
| if (node->outputFunArray) |
| pfree(node->outputFunArray); |
| #endif |
| } |
| |
| |
| |
| /*========================================================================= |
| * HELPER FUNCTIONS |
| */ |
| |
| /* |
| * CdbMergeComparator: |
| * Used to compare tuples for a sorted motion node. |
| */ |
| static int |
| CdbMergeComparator(Datum lhs, Datum rhs, void *context) |
| { |
| MotionState *node = (MotionState *) context; |
| int lSegIdx = DatumGetInt32(lhs); |
| int rSegIdx = DatumGetInt32(rhs); |
| |
| TupleTableSlot *lslot = node->slots[lSegIdx]; |
| TupleTableSlot *rslot = node->slots[rSegIdx]; |
| SortSupport sortKeys = node->sortKeys; |
| int nkey; |
| int compare; |
| |
| Assert(lslot && rslot); |
| |
| for (nkey = 0; nkey < node->numSortCols; nkey++) |
| { |
| SortSupport ssup = &sortKeys[nkey]; |
| AttrNumber attno = ssup->ssup_attno; |
| Datum datum1, |
| datum2; |
| bool isnull1, |
| isnull2; |
| |
| /* |
| * The caller has called slot_getsomeattrs() to ensure |
| * that all the columns we need are available directly in |
| * the values/isnull arrays. |
| */ |
| datum1 = lslot->tts_values[attno - 1]; |
| isnull1 = lslot->tts_isnull[attno - 1]; |
| datum2 = rslot->tts_values[attno - 1]; |
| isnull2 = rslot->tts_isnull[attno - 1]; |
| |
| compare = ApplySortComparator(datum1, isnull1, |
| datum2, isnull2, |
| ssup); |
| if (compare != 0) |
| { |
| INVERT_COMPARE_RESULT(compare); |
| return compare; |
| } |
| } |
| return 0; |
| } /* CdbMergeComparator */ |
| |
| /* |
 * Experimental code that will be replaced later with a new hashing mechanism
| */ |
static uint32
evalHashKey(ExprContext *econtext, List *hashkeys, CdbHash *h)
| { |
| ListCell *hk; |
| MemoryContext oldContext; |
| unsigned int target_seg; |
| |
| ResetExprContext(econtext); |
| |
| oldContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory); |
| |
| /* |
| * If we have 1 or more distribution keys for this relation, hash them. |
| * However, If this happens to be a relation with an empty policy |
| * (partitioning policy with a NULL distribution key list) then we have no |
| * hash key value to feed in, so use cdbhashrandomseg() to pick a segment |
| * at random. |
| */ |
| if (list_length(hashkeys) > 0) |
| { |
| int i; |
| |
| cdbhashinit(h); |
| |
| i = 0; |
| foreach(hk, hashkeys) |
| { |
| ExprState *keyexpr = (ExprState *) lfirst(hk); |
| Datum keyval; |
| bool isNull; |
| |
| /* |
| * Get the attribute value of the tuple |
| */ |
| keyval = ExecEvalExpr(keyexpr, econtext, &isNull); |
| |
| /* |
| * Compute the hash function |
| */ |
| cdbhash(h, i + 1, keyval, isNull); |
| i++; |
| } |
| target_seg = cdbhashreduce(h); |
| } |
| else |
| { |
| target_seg = cdbhashrandomseg(h->numsegs); |
| } |
| |
| MemoryContextSwitchTo(oldContext); |
| |
| return target_seg; |
| } |
| |
| |
static void
doSendEndOfStream(Motion *motion, MotionState *node)
| { |
| /* |
| * We have no more child tuples, but we have not successfully sent an |
| * End-of-Stream token yet. |
| */ |
| SendEndOfStream(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID); |
| node->sentEndOfStream = true; |
| } |
| |
| /* |
| * A crufty confusing part of the current code is how contentId is used within |
| * the motion structures and then how that gets translated to targetRoutes by |
| * this motion nodes. |
| * |
| * WARNING: There are ALOT of assumptions in here about how the motion node |
| * instructions are encoded into motion and stuff. |
| * |
| * There are 3 types of sending that can happen here: |
| * |
| * FIXED - sending to a single process. the value in node->fixedSegIdxMask[0] |
| * is the contentId of who to send to. But we can actually ignore that |
| * since now with slice tables, we should only have a single CdbProcess |
| * that we could send to for this motion node. |
| * |
| * |
| * BROADCAST - actually a subcase of FIXED, but handling is simple. send to all |
| * of our routes. |
| * |
 * HASH - maps hash values to segids. This mapping is 1->1, such that a hash
 *        value of 2 maps to contentid 2 (for now). Since we can never use
 *        HASH to send to the QD, the QD's contentid of -1 is not an issue.
 *        Also, the contentid maps directly to the routeid.
| * |
| */ |
static void
doSendTuple(Motion *motion, MotionState *node, TupleTableSlot *outerTupleSlot)
| { |
| int16 targetRoute = 0; |
| SendReturnCode sendRC = STOP_SENDING; |
| ExprContext *econtext = node->ps.ps_ExprContext; |
| int parallel_workers; |
| int i; |
| |
| int parentIndex = node->ps.state->currentSliceId; |
| ExecSlice *recvSlice = &node->ps.state->es_sliceTable->slices[parentIndex]; |
| parallel_workers = recvSlice->parallel_workers; |
| Assert(parallel_workers != 0); |
| |
| /* We got a tuple from the child-plan. */ |
| node->numTuplesFromChild++; |
| |
| if (motion->motionType == MOTIONTYPE_GATHER || |
| motion->motionType == MOTIONTYPE_GATHER_SINGLE) |
| { |
| /* |
| * Actually, since we can only send to a single output segment |
| * here, we are guaranteed that we only have a single targetRoute |
| * setup that we could possibly send to. So we can cheat and just |
| * fix the targetRoute to 0 (the 1st route). |
| */ |
| targetRoute = 0; |
| |
| } |
| else if (motion->motionType == MOTIONTYPE_BROADCAST) |
| { |
| targetRoute = BROADCAST_SEGIDX; |
| } |
| else if (motion->motionType == MOTIONTYPE_BROADCAST_WORKERS) |
| { |
| int numSegments = recvSlice->planNumSegments; |
| |
| Assert(numSegments != 0); |
| Assert(numSegments % parallel_workers == 0); |
| |
| for (i = 0; i < numSegments / parallel_workers; i++) |
| { |
| targetRoute = i * parallel_workers + random() % parallel_workers; |
| |
| CheckAndSendRecordCache(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID, |
| targetRoute); |
| sendRC = SendTuple(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID, |
| outerTupleSlot, |
| targetRoute); |
| |
| Assert(sendRC == SEND_COMPLETE || sendRC == STOP_SENDING); |
| |
| if (sendRC == STOP_SENDING) |
| break; |
| |
| #ifdef CDB_MOTION_DEBUG |
| if (sendRC == SEND_COMPLETE && node->numTuplesToAMS <= 20) |
| { |
| StringInfoData buf; |
| |
| initStringInfo(&buf); |
| appendStringInfo(&buf, " motion%-3d snd->%-3d, %5d.", |
| motion->motionID, |
| targetRoute, |
| node->numTuplesToAMS); |
| formatTuple(&buf, outerTupleSlot, node->outputFunArray); |
| elog(DEBUG3, "%s", buf.data); |
| pfree(buf.data); |
| } |
| #endif |
| } |
| |
| Assert(sendRC == SEND_COMPLETE || sendRC == STOP_SENDING); |
| |
| if (sendRC == SEND_COMPLETE) |
| node->numTuplesToAMS++; |
| else |
| node->stopRequested = true; |
| |
| return; |
| } |
| else if (motion->motionType == MOTIONTYPE_HASH) /* Redistribute */ |
| { |
| uint32 segIdx = 0; |
| uint32 workerIdx = 0; |
| |
| econtext->ecxt_outertuple = outerTupleSlot; |
| segIdx = evalHashKey(econtext, node->hashExprs, node->cdbhash); |
| |
| if (parallel_workers >= 2) |
| { |
| workerIdx = evalHashKey(econtext, node->hashExprs, node->cdbhashworkers) / node->numHashSegments; |
| } |
| |
| #ifdef USE_ASSERT_CHECKING |
| Assert(segIdx < node->numHashSegments && |
| "redistribute destination outside segment array"); |
| #endif /* USE_ASSERT_CHECKING */ |
| |
| if (parallel_workers >= 2) |
| targetRoute = segIdx * parallel_workers + workerIdx; |
| else |
| targetRoute = segIdx; |
| |
| /* |
| * see MPP-2099, let's not run into this one again! NOTE: the |
| * definition of BROADCAST_SEGIDX is key here, it *cannot* be a valid |
| * route which our map (above) will *ever* return. |
| * |
| * Note the "mapping" is generated at *planning* time in |
| * makeDefaultSegIdxArray() in cdbmutate.c (it is the trivial map, and |
| * is passed around our system a fair amount!). |
| */ |
| Assert(targetRoute != BROADCAST_SEGIDX); |
| } |
	else if (motion->motionType == MOTIONTYPE_EXPLICIT)
	{
		Datum		segidColIdxDatum;
		bool		is_null = false;

		Assert(motion->segidColIdx > 0 &&
			   motion->segidColIdx <= list_length((motion->plan).targetlist));

		segidColIdxDatum = slot_getattr(outerTupleSlot, motion->segidColIdx, &is_null);
		targetRoute = DatumGetInt32(segidColIdxDatum);
		Assert(!is_null);
	}
| else |
| elog(ERROR, "unknown motion type %d", motion->motionType); |
| |
| CheckAndSendRecordCache(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID, |
| targetRoute); |
| |
| /* send the tuple out. */ |
| sendRC = SendTuple(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID, |
| outerTupleSlot, |
| targetRoute); |
| |
| Assert(sendRC == SEND_COMPLETE || sendRC == STOP_SENDING); |
| if (sendRC == SEND_COMPLETE) |
| node->numTuplesToAMS++; |
| else |
| node->stopRequested = true; |
| |
| #ifdef CDB_MOTION_DEBUG |
| if (sendRC == SEND_COMPLETE && node->numTuplesToAMS <= 20) |
| { |
| StringInfoData buf; |
| |
| initStringInfo(&buf); |
| appendStringInfo(&buf, " motion%-3d snd->%-3d, %5d.", |
| motion->motionID, |
| targetRoute, |
| node->numTuplesToAMS); |
| formatTuple(&buf, outerTupleSlot, node->outputFunArray); |
| elog(DEBUG3, "%s", buf.data); |
| pfree(buf.data); |
| } |
| #endif |
| } |
| |
| |
| /* |
| * ExecReScanMotion |
| * |
| * Motion nodes do not allow rescan after a tuple has been fetched. |
| * |
| * When the planner knows that a NestLoop cannot have more than one outer |
| * tuple, it can omit the usual Materialize operator atop the inner subplan, |
| * which can lead to invocation of ExecReScanMotion before the motion node's |
| * first tuple is fetched. Rescan can be implemented as a no-op in this case. |
| * (After ExecNestLoop fetches an outer tuple, it invokes rescan on the inner |
| * subplan before fetching the first inner tuple. That doesn't bother us, |
| * provided there is only one outer tuple.) |
| */ |
| void |
| ExecReScanMotion(MotionState *node) |
| { |
| if (node->mstype != MOTIONSTATE_RECV || |
| node->numTuplesToParent != 0) |
| { |
| ereport(ERROR, |
| (errcode(ERRCODE_INTERNAL_ERROR), |
| errmsg("illegal rescan of motion node: invalid plan"), |
| errhint("Likely caused by bad NL-join, try setting enable_nestloop to off"))); |
| } |
| return; |
| } |
| |
| |
| /* |
| * Mark this node as "stopped." When ExecProcNode() is called on a |
| * stopped motion node it should behave as if there are no tuples |
| * available. |
| * |
| * ExecProcNode() on a stopped motion node should also notify the |
| * "other end" of the motion node of the stoppage. |
| * |
| * Note: once this is called, it is possible that the motion node will |
| * never be called again, so we *must* send the stop message now. |
| */ |
| void |
| ExecSquelchMotion(MotionState *node, bool force) |
| { |
| Motion *motion; |
| |
| AssertArg(node != NULL); |
| if (node->ps.squelched) |
| return; |
| motion = (Motion *) node->ps.plan; |
| node->stopRequested = true; |
| node->ps.state->active_recv_id = -1; |
| |
| /* pass down */ |
| SendStopMessage(node->ps.state->motionlayer_context, |
| node->ps.state->interconnect_context, |
| motion->motionID); |
| node->ps.squelched = true; |
| } |