blob: bb525d04d5560d974d95b6153d1e08676823af9a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/*-------------------------------------------------------------------------
*
* nodeMotion.c
* Routines to handle moving tuples around in Greenplum Database.
*
*-------------------------------------------------------------------------
*/
#include "postgres.h"
#include "access/heapam.h"
#include "nodes/execnodes.h" /* Slice, SliceTable */
#include "cdb/cdbheap.h"
#include "cdb/cdblink.h"
#include "cdb/cdbmotion.h"
#include "cdb/cdbvars.h"
#include "cdb/cdbhash.h"
#include "executor/executor.h"
#include "executor/execdebug.h"
#include "executor/nodeMotion.h"
#include "optimizer/clauses.h"
#include "parser/parse_oper.h"
#include "parser/parsetree.h"
#include "utils/lsyscache.h"
#include "utils/tuplesort.h"
#include "utils/tuplesort_mk.h"
#include "miscadmin.h"
#include "nodes/makefuncs.h"
#include "utils/memutils.h"
#include "utils/debugbreak.h"
/* #define MEASURE_MOTION_TIME */
#ifdef MEASURE_MOTION_TIME
#include <unistd.h> /* gettimeofday */
#endif
/* #define CDB_MOTION_DEBUG */
#ifdef CDB_MOTION_DEBUG
#include "lib/stringinfo.h" /* StringInfo */
#endif
/*
* CdbTupleHeapInfo
*
* A priority queue element holding the next tuple of the
* sorted tuple stream received from a particular sender.
* Used by sorted receiver (Merge Receive).
*/
typedef struct CdbTupleHeapInfo
{
/*
* Next tuple from this sender. execMotionSortedReceiver() resets this to
* NULL once the tuple has been handed to the result slot, while leaving
* the element itself at the head of the priority queue.
*/
HeapTuple tuple;
/*
* Which sender did this tuple come from? This value is passed back to
* RecvTupleFrom() as the route to read that sender's next tuple.
*/
int sourceRouteId;
} CdbTupleHeapInfo;
/*
* CdbMergeComparatorContext
*
* This contains the information necessary to compare
* two tuples (other than the tuples themselves).
* It includes :
* 1) the number and array of indexes into the tuples columns
* that are the basis for the ordering
* (numSortCols, sortColIdx)
* 2) the kind and FmgrInfo of the compare function
* for each column being ordered
* (sortFnKinds, sortFunctions)
* 3) the tuple desc
* (tupDesc)
* Used by sorted receiver (Merge Receive). It is passed as the
* context argument to the key comparator.
*/
typedef struct CdbMergeComparatorContext
{
/* FmgrInfo of the compare function, one entry per sort column */
FmgrInfo *sortFunctions;
/* Kind of compare function, one entry per sort column */
SortFunctionKind *sortFnKinds;
/* Number of entries in the per-column arrays */
int numSortCols;
/* Indexes into the tuple's columns that are the basis for the ordering */
AttrNumber *sortColIdx;
/* Descriptor of the tuples being compared */
TupleDesc tupDesc;
/* NOTE(review): presumably the memtuple binding matching tupDesc - confirm */
MemTupleBinding *mt_bind;
} CdbMergeComparatorContext;
/*
* Constructor and destructor for the key-comparator context handed to the
* CdbHeap priority queue used by the sorted receiver (Merge Receive).
*/
static CdbMergeComparatorContext *
CdbMergeComparator_CreateContext(TupleDesc tupDesc,
int numSortCols,
AttrNumber *sortColIdx,
Oid *sortOperators);
static void
CdbMergeComparator_DestroyContext(CdbMergeComparatorContext *ctx);
/*=========================================================================
* FUNCTION PROTOTYPES
*/
/* Sender side: pulls tuples from the child plan and sends them out. */
static TupleTableSlot *execMotionSender(MotionState * node);
/* Receiver side: one variant per receive strategy, selected in ExecMotion(). */
static TupleTableSlot *execMotionUnsortedReceiver(MotionState * node);
static TupleTableSlot *execMotionSortedReceiver(MotionState * node);
static TupleTableSlot *execMotionSortedReceiver_mk(MotionState * node);
/* Primes the CdbHeap priority queue with the first tuple of every sender. */
static void execMotionSortedReceiverFirstTime(MotionState * node);
/* Key comparator passed to CdbHeap_Create(); context is a CdbMergeComparatorContext. */
static int
CdbMergeComparator(void *lhs, void *rhs, void *context);
static uint32 evalHashKey(ExprContext *econtext, List *hashkeys, List *hashtypes, CdbHash * h);
static void doSendTuple(Motion * motion, MotionState * node, TupleTableSlot *outerTupleSlot);
/*=========================================================================
*/
#ifdef CDB_MOTION_DEBUG
/*
 * formatTuple
 *
 * Debug helper (compiled only with CDB_MOTION_DEBUG): appends a one-line
 * rendering of 'tup' to 'buf', terminated by a newline.
 *
 * buf            - destination string buffer
 * tup            - tuple to render
 * tupdesc        - descriptor of 'tup'
 * outputFunArray - output function OID for each attribute, indexed from 0
 *
 * Each non-null attribute is printed as name="value" (or just "value" when
 * the attribute has no name), with the value truncated to 30 characters.
 * NULL attributes are omitted.
 */
static void
formatTuple(StringInfo buf, HeapTuple tup, TupleDesc tupdesc, Oid *outputFunArray)
{
    int         i;

    for (i = 0; i < tupdesc->natts; i++)
    {
        bool        isnull;
        Datum       d = heap_getattr(tup, i + 1, tupdesc, &isnull);
        Datum       ds;
        char       *s;
        char       *name;

        /*
         * Only the null flag decides whether a value exists.  Testing the
         * Datum itself for zero would wrongly hide legitimate pass-by-value
         * values such as integer 0 or boolean false.
         */
        if (isnull)
            continue;

        ds = OidFunctionCall1(outputFunArray[i], d);
        s = DatumGetCString(ds);
        name = NameStr(tupdesc->attrs[i]->attname);

        if (name && *name)
            appendStringInfo(buf, "  %s=\"%.30s\"", name, s);
        else
            appendStringInfo(buf, "  \"%.30s\"", s);

        pfree(s);
    }
    appendStringInfoChar(buf, '\n');
}
#endif
/**
* Is it a hash distribution motion ?
*/
bool isMotionRedistribute(const Motion *m)
{
return (m->motionType == MOTIONTYPE_HASH);
}
/**
* Is it a gather motion?
*/
bool isMotionGather(const Motion *m)
{
return (m->motionType == MOTIONTYPE_FIXED
&& m->numOutputSegs == 1);
}
/**
* Is it a gather motion to master?
*/
bool isMotionGatherToMaster(const Motion *m)
{
return (m->motionType == MOTIONTYPE_FIXED
&& m->numOutputSegs == 1
&& m->outputSegIdx[0] == -1);
}
/**
* Is it a gather motion to segment?
*/
bool isMotionGatherToSegment(const Motion *m)
{
return (m->motionType == MOTIONTYPE_FIXED
&& m->numOutputSegs == 1
&& m->outputSegIdx[0] > 0);
}
/**
* Is it a redistribute from master?
*/
bool isMotionRedistributeFromMaster(const Motion *m)
{
    /*
     * The motion is not inspected; the answer is unconditionally true.
     * NOTE(review): confirm that this stub behavior is intentional.
     */
    (void) m;

    return true;
}
/*
* Set the statistic info in gpmon packet.
*/
void
setMotionStatsForGpmon(MotionState *node)
{
    EState     *estate = node->ps.state;
    MotionLayerState *mlStates = (MotionLayerState *) estate->motionlayer_context;
    ChunkTransportState *transportStates = estate->interconnect_context;
    int         motionId = ((Motion *) node->ps.plan)->motionID;
    MotionNodeEntry *mlEntry;
    ChunkTransportStateEntry *transportEntry = NULL;
    uint64      avgAckTime;

    /* Look up the motion-layer and transport-layer entries for this motion. */
    mlEntry = getMotionNodeEntry(mlStates, motionId, "setMotionStatsForGpmon");
    getChunkTransportState(transportStates, motionId, &transportEntry);

    /* Average ack time is only defined once at least one ack was counted. */
    avgAckTime = (transportEntry->stat_count_acks > 0)
        ? transportEntry->stat_total_ack_time / transportEntry->stat_count_acks
        : 0;

    /* Byte counters come from the motion layer. */
    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_BYTES_SENT,
                mlEntry->stat_total_bytes_sent);

    /* Ack timing and resend counters come from the transport layer. */
    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_TOTAL_ACK_TIME,
                transportEntry->stat_total_ack_time);
    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_AVG_ACK_TIME,
                avgAckTime);
    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_MAX_ACK_TIME,
                transportEntry->stat_max_ack_time);
    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_MIN_ACK_TIME,
                transportEntry->stat_min_ack_time);
    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_COUNT_RESENT,
                transportEntry->stat_count_resent);
    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_MAX_RESENT,
                transportEntry->stat_max_resent);

    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_BYTES_RECEIVED,
                mlEntry->stat_total_bytes_recvd);
    Gpmon_M_Set(GpmonPktFromMotionState(node), GPMON_MOTION_COUNT_DROPPED,
                transportEntry->stat_count_dropped);
}
/* ----------------------------------------------------------------
* ExecMotion
* ----------------------------------------------------------------
*/
TupleTableSlot *
ExecMotion(MotionState * node)
{
    Motion         *motion = (Motion *) node->ps.plan;
    EState         *estate = node->ps.state;
    TupleTableSlot *result;
#ifdef MEASURE_MOTION_TIME
    struct timeval  startTime;
    struct timeval  stopTime;
#endif

    /* Sender side: everything happens in the send loop. */
    if (node->mstype == MOTIONSTATE_SEND)
        return execMotionSender(node);

    /* A motion that is neither sender nor receiver must never be executed. */
    if (node->mstype != MOTIONSTATE_RECV)
    {
        Assert(!"Non-active motion is executed");
        return NULL;
    }

    /* Receiver side from here on: choose SORTED vs. UNSORTED below. */
#ifdef MEASURE_MOTION_TIME
    gettimeofday(&startTime, NULL);
#endif

    /*
     * Record this motion as the active receiver.  Log when a different
     * motion was still marked active.
     */
    if (estate->active_recv_id >= 0 &&
        estate->active_recv_id != motion->motionID)
    {
        elog(LOG, "DEADLOCK HAZARD: Updating active_motion_id from %d to %d",
             estate->active_recv_id, motion->motionID);
    }
    estate->active_recv_id = motion->motionID;

    /* Running in diagnostic mode ? */
    if (Gp_interconnect_type == INTERCONNECT_TYPE_NIL)
    {
        estate->active_recv_id = -1;
        return NULL;
    }

    if (!motion->sendSorted)
        result = execMotionUnsortedReceiver(node);
    else if (gp_enable_motion_mk_sort)
        result = execMotionSortedReceiver_mk(node);
    else
        result = execMotionSortedReceiver(node);

    if (result != NULL)
    {
        Gpmon_M_Incr(GpmonPktFromMotionState(node), GPMON_QEXEC_M_ROWSIN);
        Gpmon_M_Incr_Rows_Out(GpmonPktFromMotionState(node));
        setMotionStatsForGpmon(node);
    }
    else
    {
        /* Stream exhausted: this motion is no longer the active receiver. */
        estate->active_recv_id = -1;
    }

#ifdef MEASURE_MOTION_TIME
    gettimeofday(&stopTime, NULL);

    node->motionTime.tv_sec += stopTime.tv_sec - startTime.tv_sec;
    node->motionTime.tv_usec += stopTime.tv_usec - startTime.tv_usec;

    /* Normalize tv_usec into [0, 1000000). */
    while (node->motionTime.tv_usec < 0)
    {
        node->motionTime.tv_usec += 1000000;
        node->motionTime.tv_sec--;
    }
    while (node->motionTime.tv_usec >= 1000000)
    {
        node->motionTime.tv_usec -= 1000000;
        node->motionTime.tv_sec++;
    }
#endif

    CheckSendPlanStateGpmonPkt(&node->ps);
    return result;
}
/*
 * execMotionSender
 *
 * Sender side of a Motion node.  Repeatedly pulls a tuple from the child
 * plan and sends it, until the child is exhausted (end-of-stream is then
 * sent) or a stop was requested (the child is then squelched).
 * Always returns NULL: a sender produces no tuples for its parent.
 */
static TupleTableSlot *
execMotionSender(MotionState * node)
{
    Motion         *motion = (Motion *) node->ps.plan;
    PlanState      *childState;
    TupleTableSlot *childSlot;
    bool            finished = false;
#ifdef MEASURE_MOTION_TIME
    struct timeval  time1;
    struct timeval  time2;

    gettimeofday(&time1, NULL);
#endif

    AssertState(motion->motionType == MOTIONTYPE_HASH ||
                (motion->motionType == MOTIONTYPE_EXPLICIT && motion->segidColIdx > 0) ||
                (motion->motionType == MOTIONTYPE_FIXED && motion->numOutputSegs <= 1));
    Assert(node->ps.state->interconnect_context);

    while (!finished)
    {
        /* Fetch the next tuple from our child. */
        childState = outerPlanState(node);
        childSlot = ExecProcNode(childState);

#ifdef MEASURE_MOTION_TIME
        gettimeofday(&time2, NULL);

        node->otherTime.tv_sec += time2.tv_sec - time1.tv_sec;
        node->otherTime.tv_usec += time2.tv_usec - time1.tv_usec;

        while (node->otherTime.tv_usec < 0)
        {
            node->otherTime.tv_usec += 1000000;
            node->otherTime.tv_sec--;
        }
        while (node->otherTime.tv_usec >= 1000000)
        {
            node->otherTime.tv_usec -= 1000000;
            node->otherTime.tv_sec++;
        }
#endif

        /* In diagnostic mode every tuple is simply dropped. */
        if (Gp_interconnect_type == INTERCONNECT_TYPE_NIL)
        {
            if (TupIsNull(childSlot))
                return NULL;
            continue;
        }

        if (TupIsNull(childSlot))
        {
            /* Child exhausted: tell the receivers and leave the loop. */
            doSendEndOfStream(motion, node);
            finished = true;
        }
        else
        {
            doSendTuple(motion, node, childSlot);

            Gpmon_M_Incr_Rows_Out(GpmonPktFromMotionState(node));
            setMotionStatsForGpmon(node);
            CheckSendPlanStateGpmonPkt(&node->ps);

            /* doSendTuple() may have set node->stopRequested as a side-effect */
            if (node->stopRequested)
            {
                elog(gp_workfile_caching_loglevel, "Motion initiating Squelch walker");
                /* Pass the stop notification down to our children. */
                ExecSquelchNode(childState);
                finished = true;
            }
        }

#ifdef MEASURE_MOTION_TIME
        gettimeofday(&time1, NULL);

        node->motionTime.tv_sec += time1.tv_sec - time2.tv_sec;
        node->motionTime.tv_usec += time1.tv_usec - time2.tv_usec;

        while (node->motionTime.tv_usec < 0)
        {
            node->motionTime.tv_usec += 1000000;
            node->motionTime.tv_sec--;
        }
        while (node->motionTime.tv_usec >= 1000000)
        {
            node->motionTime.tv_usec -= 1000000;
            node->motionTime.tv_sec++;
        }
#endif
    }

    Assert(node->stopRequested || node->numTuplesFromChild == node->numTuplesToAMS);

    /* Nothing is ever returned up the tree from a sender. */
    return NULL;
}
/*
 * execMotionUnsortedReceiver
 *
 * Receiver side of an order-agnostic Motion node.  Receives the next tuple
 * from any sender (ANY_ROUTE) and returns it in this node's result slot.
 *
 * Returns NULL when a stop was requested (after sending a stop message to
 * the senders) or when RecvTupleFrom() reports END_OF_STREAM.  Any other
 * return code is treated as a received tuple.
 */
static TupleTableSlot *
execMotionUnsortedReceiver(MotionState * node)
{
    /* RECEIVER LOGIC */
    TupleTableSlot *slot;
    HeapTuple   tuple;
    Motion     *motion = (Motion *) node->ps.plan;
    ReceiveReturnCode recvRC;

    AssertState(motion->motionType == MOTIONTYPE_HASH ||
                (motion->motionType == MOTIONTYPE_EXPLICIT && motion->segidColIdx > 0) ||
                (motion->motionType == MOTIONTYPE_FIXED && motion->numOutputSegs <= 1));

    Assert(node->ps.state->motionlayer_context);
    Assert(node->ps.state->interconnect_context);

    /* Notify senders and report end of data if the caller wants no more. */
    if (node->stopRequested)
    {
        SendStopMessage(node->ps.state->motionlayer_context,
                        node->ps.state->interconnect_context,
                        motion->motionID);
        return NULL;
    }

    recvRC = RecvTupleFrom(node->ps.state->motionlayer_context,
                           node->ps.state->interconnect_context,
                           motion->motionID, &tuple, ANY_ROUTE);

    if (recvRC == END_OF_STREAM)
    {
#ifdef CDB_MOTION_DEBUG
        if (gp_log_interconnect >= GPVARS_VERBOSITY_DEBUG)
            elog(DEBUG4, "motionID=%d saw end of stream", motion->motionID);
#endif
        Assert(node->numTuplesFromAMS == node->numTuplesToParent);
        Assert(node->numTuplesFromChild == 0);
        Assert(node->numTuplesToAMS == 0);
        return NULL;
    }

    node->numTuplesFromAMS++;
    node->numTuplesToParent++;

    /* store it in our result slot and return this. */
    slot = node->ps.ps_ResultTupleSlot;
    slot = ExecStoreGenericTuple(tuple, slot, true /* shouldFree */);

#ifdef CDB_MOTION_DEBUG
    if (node->numTuplesToParent <= 20)
    {
        StringInfoData buf;

        initStringInfo(&buf);
        appendStringInfo(&buf, "   motion%-3d rcv      %5d.",
                         motion->motionID,
                         node->numTuplesToParent);
        formatTuple(&buf, tuple, ExecGetResultType(&node->ps),
                    node->outputFunArray);
        /*
         * The buffer holds tuple data, so it must be passed as an argument
         * and never as the format string itself.
         */
        elog(DEBUG3, "%s", buf.data);
        pfree(buf.data);
    }
#endif

    return slot;
}
/*
* General background on Sorted Motion:
* -----------------------------------
* NOTE: This function is only used for order-preserving motion. There are
* only 2 types of motion that order-preserving makes sense for: FIXED and
* BROADCAST (HASH does not make sense). so we have:
*
* CASE 1: broadcast order-preserving fixed motion. This should only be
* called for SENDERs.
*
* CASE 2: single-destination order-preserving fixed motion. The SENDER
* side will act like Unsorted motion and won't call this. So only
* the RECEIVER should be called for this case.
*
*
* Sorted Receive Notes:
* --------------------
*
* The 1st time we execute, we need to pull a tuple from each of our sources
* and store them in our tupleheap; this is what
* execMotionSortedReceiverFirstTime() does. Once that is done, we can pick
* the lowest (or whatever the criterion is) value from amongst all the
* sources. This works since each stream is itself sorted.
*
* We keep track of which one was selected; this will be the slot we need
* to fill during the next call.
*
* Subsequent calls to this function (after the 1st time) will start by
* trying to receive a tuple for the slot that was emptied by the previous
* call. Then we again select the lowest value and return that tuple.
*
*/
/* Sorted receiver using mk heap */
/*
* Per-sender reader state handed to motion_mkhp_read() as its context.
*/
typedef struct MotionMKHeapReaderContext
{
/* Motion node this reader belongs to */
MotionState *node;
/*
* Route to receive from. Created as -1 and filled in with the real route
* by motion_mkhp_read() the first time a reader is invoked.
*/
int srcRoute;
} MotionMKHeapReaderContext;
/*
* State of the mk-heap based sorted receiver; stored in MotionState.tupleheap.
*/
typedef struct MotionMKHeapContext
{
MKHeapReader *readers; /* Readers, one per sender */
MKHeap *heap; /* The mkheap; built lazily on the first receive call */
MKContext mkctxt; /* compare context */
} MotionMKHeapContext;
/*
 * motion_mkhp_read
 *
 * Reader callback for the mk heap: fetches the next tuple from the sender
 * that 'vpctxt' (a MotionMKHeapReaderContext) refers to.
 *
 * On success stores the tuple in a->ptr and returns true; returns false
 * when RecvTupleFrom() does not report GOT_TUPLE.  '*a' is zeroed first in
 * either case.
 */
static bool motion_mkhp_read(void *vpctxt, MKEntry *a)
{
    MotionMKHeapReaderContext *reader = (MotionMKHeapReaderContext *) vpctxt;
    MotionState *node = reader->node;
    Motion     *motion = (Motion *) node->ps.plan;
    HeapTuple   received = NULL;

    if (reader->srcRoute < 0)
    {
        /*
         * Routes have not been assigned yet.  Assign them for ALL readers of
         * this node now, walking the sending slice's process list and giving
         * each non-NULL process the next reader.
         */
        MotionMKHeapContext *heapCtxt = (MotionMKHeapContext *) node->tupleheap;
        Slice      *sendSlice = (Slice *) list_nth(node->ps.state->es_sliceTable->slices,
                                                   motion->motionID);
        ListCell   *lcProcess;
        int         routeIndex;
        int         nextReader = 0;

        Assert(sendSlice->sliceIndex == motion->motionID);

        foreach_with_count(lcProcess, sendSlice->primaryProcesses, routeIndex)
        {
            MotionMKHeapReaderContext *target;

            if (lfirst(lcProcess) == NULL)
                continue;

            Assert(nextReader < node->numInputSegs);

            target = (MotionMKHeapReaderContext *) heapCtxt->readers[nextReader].mkhr_ctxt;
            target->srcRoute = routeIndex;
            nextReader++;
        }

        Assert(nextReader == node->numInputSegs);
    }

    MemSet(a, 0, sizeof(MKEntry));

    /* Receive the successor of the tuple that we returned last time. */
    if (RecvTupleFrom(node->ps.state->motionlayer_context,
                      node->ps.state->interconnect_context,
                      motion->motionID,
                      &received,
                      reader->srcRoute) != GOT_TUPLE)
        return false;

    a->ptr = received;
    return true;
}
/*
 * tupsort_fetch_datum_motion
 *
 * Datum-fetch callback for the mk sort context: returns attribute
 * lvctxt->attno of the tuple held in 'a', setting *isNullOut accordingly.
 * Handles both memtuple and heap tuple representations.
 */
static Datum tupsort_fetch_datum_motion(MKEntry *a, MKContext *mkctxt, MKLvContext *lvctxt, bool *isNullOut)
{
    if (is_heaptuple_memtuple(a->ptr))
        return memtuple_getattr((MemTuple) a->ptr, mkctxt->mt_bind, lvctxt->attno, isNullOut);

    return heap_getattr((HeapTuple) a->ptr, lvctxt->attno, mkctxt->tupdesc, isNullOut);
}
/*
 * tupsort_free_datum_motion
 *
 * Free callback for the mk sort context: releases the tuple held by the
 * entry and clears the entry's pointer.
 */
static void tupsort_free_datum_motion(MKEntry *e)
{
    void       *victim = e->ptr;

    e->ptr = NULL;
    pfree(victim);
}
/*
 * create_motion_mk_heap
 *
 * Sets up the mk-heap based sorted receiver for 'node': builds the compare
 * context from the Motion's sort columns and operators, and allocates one
 * reader (with its own reader context) per input segment.  The heap itself
 * is not built here; ctxt->heap stays NULL until the first receive call.
 * The result is stored in node->tupleheap.
 */
static void create_motion_mk_heap(MotionState *node)
{
    MotionMKHeapContext *ctxt = palloc0(sizeof(MotionMKHeapContext));
    Motion     *motion = (Motion *) node->ps.plan;
    int         nreader = node->numInputSegs;
    int         i = 0;

    Assert(nreader >= 1);

    create_mksort_context(
        &ctxt->mkctxt,
        motion->numSortCols,
        tupsort_fetch_datum_motion,
        tupsort_free_datum_motion,
        ExecGetResultType(&node->ps), false, 0, /* dummy does not matter */
        motion->sortOperators,
        motion->sortColIdx,
        NULL
        );

    ctxt->readers = palloc0(sizeof(MKHeapReader) * nreader);

    for (i = 0; i < nreader; ++i)
    {
        /*
         * Size the allocation from the pointer's own type.  The previous
         * code used sizeof(MotionMKHeapContext), i.e. the size of a
         * different struct.
         */
        MotionMKHeapReaderContext *hrctxt = palloc(sizeof(*hrctxt));

        hrctxt->node = node;
        hrctxt->srcRoute = -1;  /* negative: real route not assigned yet */

        ctxt->readers[i].reader = motion_mkhp_read;
        ctxt->readers[i].mkhr_ctxt = hrctxt;
    }

    node->tupleheap = (void *) ctxt;
}
/*
 * destroy_motion_mk_heap
 *
 * Intentionally a no-op: the mk heap's memory is allocated from the query
 * execution context, and this is only called at the end of a query's life.
 */
static void destroy_motion_mk_heap(MotionState *node)
{
    (void) node;
}
/*
 * execMotionSortedReceiver_mk
 *
 * Sorted (merge) receiver implemented on top of the mk heap.  Builds the
 * heap from the per-sender readers on the first call, then returns one
 * tuple per call in this node's result slot.
 *
 * Returns NULL when a stop was requested (after sending a stop message) or
 * when the heap yields an empty entry.
 */
static TupleTableSlot *
execMotionSortedReceiver_mk(MotionState * node)
{
    Motion     *motion = (Motion *) node->ps.plan;
    MotionMKHeapContext *heapCtxt = (MotionMKHeapContext *) node->tupleheap;
    MKEntry     next;

    Assert(motion->motionType == MOTIONTYPE_FIXED &&
           motion->numOutputSegs <= 1 &&
           motion->sendSorted &&
           heapCtxt
        );

    /* Caller wants no more data: tell the senders and stop. */
    if (node->stopRequested)
    {
        SendStopMessage(node->ps.state->motionlayer_context,
                        node->ps.state->interconnect_context,
                        motion->motionID);
        return NULL;
    }

    /* First call: build the heap from the readers. */
    if (!node->tupleheapReady)
    {
        Assert(heapCtxt->readers);
        Assert(!heapCtxt->heap);

        heapCtxt->heap = mkheap_from_reader(heapCtxt->readers,
                                            node->numInputSegs,
                                            &heapCtxt->mkctxt);
        node->tupleheapReady = true;
    }

    mke_set_empty(&next);
    mkheap_putAndGet(heapCtxt->heap, &next);

    if (mke_is_empty(&next))
        return NULL;

    return ExecStoreGenericTuple(next.ptr, node->ps.ps_ResultTupleSlot, true);
}
/* Sorted receiver using CdbHeap */
/*
 * execMotionSortedReceiver
 *
 * Sorted (merge) receiver implemented on top of CdbHeap.  Returns, in this
 * node's result slot, the tuple currently at the head of the priority
 * queue across all senders.
 *
 * Returns NULL when a stop was requested (after sending a stop message) or
 * once the priority queue is empty, i.e. every sender reported end of
 * stream.
 */
static TupleTableSlot *
execMotionSortedReceiver(MotionState * node)
{
    TupleTableSlot *slot;
    CdbHeap    *hp = (CdbHeap *) node->tupleheap;
    HeapTuple   tuple,
                inputTuple;
    Motion     *motion = (Motion *) node->ps.plan;
    ReceiveReturnCode recvRC;
    CdbTupleHeapInfo *tupHeapInfo;

    AssertState(motion->motionType == MOTIONTYPE_FIXED &&
                motion->numOutputSegs <= 1 &&
                motion->sendSorted &&
                hp != NULL);

    /* Notify senders and return EOS if caller doesn't want any more data. */
    if (node->stopRequested)
    {
        SendStopMessage(node->ps.state->motionlayer_context,
                        node->ps.state->interconnect_context,
                        motion->motionID);
        return NULL;
    }

    /* On first call, fill the priority queue with each sender's first tuple. */
    if (!node->tupleheapReady)
    {
        execMotionSortedReceiverFirstTime(node);
    }

    /*
     * Delete from the priority queue the element that we fetched last
     * time.  Receive and insert the next tuple from that same sender.
     */
    else
    {
        /* Old element is still at the head of the pq. */
        AssertState(NULL != (tupHeapInfo = CdbHeap_Min(CdbTupleHeapInfo, hp)) &&
                    tupHeapInfo->tuple == NULL &&
                    tupHeapInfo->sourceRouteId == node->routeIdNext);

        /* Receive the successor of the tuple that we returned last time. */
        recvRC = RecvTupleFrom(node->ps.state->motionlayer_context,
                               node->ps.state->interconnect_context,
                               motion->motionID,
                               &inputTuple,
                               node->routeIdNext);

        /* Substitute it in the pq for its predecessor. */
        if (recvRC == GOT_TUPLE)
        {
            CdbTupleHeapInfo info;

            info.tuple = inputTuple;
            info.sourceRouteId = node->routeIdNext;

            CdbHeap_DeleteMinAndInsert(hp, &info);

            node->numTuplesFromAMS++;

#ifdef CDB_MOTION_DEBUG
            if (node->numTuplesFromAMS <= 20)
            {
                StringInfoData buf;

                initStringInfo(&buf);
                appendStringInfo(&buf, "   motion%-3d rcv<-%-3d %5d.",
                                 motion->motionID,
                                 node->routeIdNext,
                                 node->numTuplesFromAMS);
                formatTuple(&buf, inputTuple, ExecGetResultType(&node->ps),
                            node->outputFunArray);
                /* Tuple text is data, never a format string. */
                elog(DEBUG3, "%s", buf.data);
                pfree(buf.data);
            }
#endif
        }

        /* At EOS, drop this sender from the priority queue. */
        else if (!CdbHeap_IsEmpty(hp))
            CdbHeap_DeleteMin(hp);
    }

    /* Finished if all senders have returned EOS. */
    if (CdbHeap_IsEmpty(hp))
    {
        Assert(node->numTuplesFromAMS == node->numTuplesToParent);
        Assert(node->numTuplesFromChild == 0);
        Assert(node->numTuplesToAMS == 0);
        return NULL;
    }

    /*
     * Our next result tuple, with lowest key among all senders, is now
     * at the head of the priority queue.  Get it from there.
     *
     * We transfer ownership of the tuple from the pq element to
     * our caller, but the pq element itself will remain in place
     * until the next time we are called, to avoid an unnecessary
     * rearrangement of the priority queue.
     */
    tupHeapInfo = CdbHeap_Min(CdbTupleHeapInfo, hp);
    tuple = tupHeapInfo->tuple;
    node->routeIdNext = tupHeapInfo->sourceRouteId;

    /* Zap dangling tuple ptr for safety. PQ element doesn't own it anymore. */
    tupHeapInfo->tuple = NULL;

    /* Update counters. */
    node->numTuplesToParent++;

    /*
     * Store the tuple in this Motion node's own result slot, exactly as the
     * unsorted and mk-heap receivers do.  The child's result slot belongs
     * to the child plan node and must not be used for received tuples.
     */
    slot = node->ps.ps_ResultTupleSlot;
    slot = ExecStoreGenericTuple(tuple, slot, true /* shouldFree */);

#ifdef CDB_MOTION_DEBUG
    if (node->numTuplesToParent <= 20)
    {
        StringInfoData buf;

        initStringInfo(&buf);
        appendStringInfo(&buf, "   motion%-3d mrg<-%-3d %5d.",
                         motion->motionID,
                         node->routeIdNext,
                         node->numTuplesToParent);
        formatTuple(&buf, tuple, ExecGetResultType(&node->ps),
                    node->outputFunArray);
        /* Tuple text is data, never a format string. */
        elog(DEBUG3, "%s", buf.data);
        pfree(buf.data);
    }
#endif

    /* Return result slot. */
    return slot;
}                               /* execMotionSortedReceiver */
/*
 * execMotionSortedReceiverFirstTime
 *
 * Primes the CdbHeap priority queue: receives one tuple from every sending
 * process of this motion's sending slice, places each received tuple in the
 * heap's slot array, heapifies the array, and marks the heap as ready.
 * Senders that do not deliver a tuple contribute no heap element.
 */
static void
execMotionSortedReceiverFirstTime(MotionState * node)
{
    HeapTuple   inputTuple;
    CdbHeap    *hp = (CdbHeap *) node->tupleheap;
    Motion     *motion = (Motion *) node->ps.plan;
    int         iSegIdx;
    int         n = 0;
    ListCell   *lcProcess;
    ReceiveReturnCode recvRC;

    Slice      *sendSlice = (Slice *) list_nth(node->ps.state->es_sliceTable->slices,
                                               motion->motionID);

    Assert(sendSlice->sliceIndex == motion->motionID);

    /*
     * We need to get a tuple from every sender, and stick it into the heap.
     */
    foreach_with_count(lcProcess, sendSlice->primaryProcesses, iSegIdx)
    {
        if (lfirst(lcProcess) == NULL)
            continue;           /* skip this one: we are not receiving from it */

        /*
         * another place where we are mapping segid space to routeid space. so
         * route[x] = inputSegIdx[x] now.
         */
        recvRC = RecvTupleFrom(node->ps.state->motionlayer_context,
                               node->ps.state->interconnect_context,
                               motion->motionID, &inputTuple, iSegIdx);

        if (recvRC == GOT_TUPLE)
        {
            CdbTupleHeapInfo *infoArray = (CdbTupleHeapInfo *) hp->slotArray;
            CdbTupleHeapInfo *info = &infoArray[n];

            n++;

            info->tuple = inputTuple;
            info->sourceRouteId = iSegIdx;

            node->numTuplesFromAMS++;

#ifdef CDB_MOTION_DEBUG
            if (node->numTuplesFromAMS <= 20)
            {
                StringInfoData buf;

                initStringInfo(&buf);
                appendStringInfo(&buf, "   motion%-3d rcv<-%-3d %5d.",
                                 motion->motionID,
                                 iSegIdx,
                                 node->numTuplesFromAMS);
                formatTuple(&buf, inputTuple, ExecGetResultType(&node->ps),
                            node->outputFunArray);
                /* Tuple text is data, never a format string. */
                elog(DEBUG3, "%s", buf.data);
                pfree(buf.data);
            }
#endif
        }
    }

    /*
     * NOTE(review): iSegIdx counts every list entry, including the NULL ones
     * skipped above, whereas motion_mkhp_read() compares numInputSegs with
     * the count of non-NULL entries.  Confirm which is intended.
     */
    Assert(iSegIdx == node->numInputSegs);

    /*
     * Rearrange the infoArray to satisfy the heap property.
     * This is quicker than inserting the initial elements one by one.
     */
    CdbHeap_Heapify(hp, n);

    node->tupleheapReady = true;
}                               /* execMotionSortedReceiverFirstTime */
/* ----------------------------------------------------------------
* ExecInitMotion
*
* NOTE: have to be a bit careful, estate->es_cur_slice_idx is not the
* ultimate correct value that it should be on the QE. this happens
* after this call in mppexec.c. This is ok since we don't need it,
* but just be aware before you try and use it here.
* ----------------------------------------------------------------
*/
MotionState *
ExecInitMotion(Motion * node, EState *estate, int eflags)
{
MotionState *motionstate = NULL;
TupleDesc tupDesc;
Slice *sendSlice = NULL;
Slice *recvSlice = NULL;
SliceTable *sliceTable = estate->es_sliceTable;
/* Slice that contains this Motion node, i.e. the receiving side. */
int parentSliceIndex = estate->currentSliceIdInPlan;
#ifdef CDB_MOTION_DEBUG
int i;
#endif
Assert(node->motionID > 0);
Assert(node->motionID <= sliceTable->nMotions);
/*
* Everything below this Motion belongs to the sending slice, whose index
* equals the motion ID. The previous executing-slice id is saved so it
* can be put back temporarily near the end of this function;
* currentSliceIdInPlan is not restored here.
*/
estate->currentSliceIdInPlan = node->motionID;
int parentExecutingSliceId = estate->currentExecutingSliceId;
estate->currentExecutingSliceId = node->motionID;
/*
* create state structure
*/
motionstate = makeNode(MotionState);
motionstate->ps.plan = (Plan *) node;
motionstate->ps.state = estate;
/* Role stays NONE unless one of the branches below decides otherwise. */
motionstate->mstype = MOTIONSTATE_NONE;
motionstate->stopRequested = false;
motionstate->hashExpr = NULL;
motionstate->cdbhash = NULL;
/* Look up the sending gang's slice table entry. */
sendSlice = (Slice *)list_nth(sliceTable->slices, node->motionID);
Assert(IsA(sendSlice, Slice));
Assert(sendSlice->sliceIndex == node->motionID);
/* QD must fill in the global slice table. */
if (Gp_role == GP_ROLE_DISPATCH)
{
MemoryContext oldcxt = MemoryContextSwitchTo(estate->es_query_cxt);
Flow *sendFlow;
/* Top node of subplan should have a Flow node. */
Insist(node->plan.lefttree && node->plan.lefttree->flow);
sendFlow = node->plan.lefttree->flow;
/* Sending slice table entry hasn't been filled in yet. */
Assert(sendSlice->rootIndex == -1);
Assert(sendSlice->gangSize == 0);
/* Look up the receiving (parent) gang's slice table entry. */
recvSlice = (Slice *)list_nth(sliceTable->slices, parentSliceIndex);
Assert(IsA(recvSlice, Slice));
Assert(recvSlice->sliceIndex == parentSliceIndex);
Assert(recvSlice->rootIndex == 0 ||
(recvSlice->rootIndex > sliceTable->nMotions &&
recvSlice->rootIndex < list_length(sliceTable->slices)));
/* The sending slice becomes a child of the receiving slice. */
recvSlice->children = lappend_int(recvSlice->children, sendSlice->sliceIndex);
sendSlice->parentIndex = parentSliceIndex;
sendSlice->rootIndex = recvSlice->rootIndex;
/* The gang beneath a Motion will be a reader. */
sendSlice->gangType = GANGTYPE_PRIMARY_READER;
/* How many sending processes in the dispatcher array? Note that targeted dispatch may reduce this number in practice */
sendSlice->gangSize = 1;
if (sendFlow->flotype != FLOW_SINGLETON)
{
sendSlice->gangSize = GetQEGangNum();
}
/* Does sending slice need 1-gang with read-only access to entry db? */
if (sendFlow->flotype == FLOW_SINGLETON &&
sendFlow->segindex == -1)
{
sendSlice->gangType = GANGTYPE_ENTRYDB_READER;
}
sendSlice->numGangMembersToBeActive = sliceCalculateNumSendingProcesses(sendSlice, GetQEGangNum());
if (node->motionType == MOTIONTYPE_FIXED && node->numOutputSegs == 1)
{
/* Sending to a single receiving process on the entry db? */
/* Is receiving slice a root slice that runs here in the qDisp? */
if (recvSlice->sliceIndex == recvSlice->rootIndex)
{
motionstate->mstype = MOTIONSTATE_RECV;
Assert(recvSlice->gangType == GANGTYPE_UNALLOCATED);
}
else
{
/* Receiver runs elsewhere; only sanity-check its gang here. */
Assert(recvSlice->gangSize == 1);
Assert(node->outputSegIdx[0] >= 0
? (recvSlice->gangType == GANGTYPE_PRIMARY_READER ||
recvSlice->gangType == GANGTYPE_ENTRYDB_READER)
: recvSlice->gangType == GANGTYPE_ENTRYDB_READER);
}
}
MemoryContextSwitchTo(oldcxt);
}
/* QE must fill in map from motionID to MotionState node. */
else
{
Insist(Gp_role == GP_ROLE_EXECUTE);
recvSlice = (Slice *)list_nth(sliceTable->slices, sendSlice->parentIndex);
/* Role is decided by which slice this process is executing locally. */
if (LocallyExecutingSliceIndex(estate) == recvSlice->sliceIndex)
{
/* this is recv */
motionstate->mstype = MOTIONSTATE_RECV;
}
else if (LocallyExecutingSliceIndex(estate) == sendSlice->sliceIndex)
{
/* this is send */
motionstate->mstype = MOTIONSTATE_SEND;
}
/* TODO: If neither sending nor receiving, don't bother to initialize. */
}
/* Reset run-time flags, timers and tuple counters. */
motionstate->tupleheapReady = false;
motionstate->sentEndOfStream = false;
motionstate->otherTime.tv_sec = 0;
motionstate->otherTime.tv_usec = 0;
motionstate->motionTime.tv_sec = 0;
motionstate->motionTime.tv_usec = 0;
motionstate->numTuplesFromChild = 0;
motionstate->numTuplesToAMS = 0;
motionstate->numTuplesFromAMS = 0;
motionstate->numTuplesToParent = 0;
motionstate->stopRequested = false;
/* Number of senders the sorted receivers expect to read from. */
motionstate->numInputSegs = sendSlice->numGangMembersToBeActive;
/*
* Miscellaneous initialization
*
* create expression context for node
*/
ExecAssignExprContext(estate, &motionstate->ps);
/*
* tuple table initialization
*/
ExecInitResultTupleSlot(estate, &motionstate->ps);
/*
* initializes child nodes.
*/
outerPlanState(motionstate) = ExecInitNode(outerPlan(node), estate, eflags);
/*
* initialize tuple type. no need to initialize projection info
* because this node doesn't do projections.
*/
ExecAssignResultTypeFromTL(&motionstate->ps);
motionstate->ps.ps_ProjInfo = NULL;
tupDesc = ExecGetResultType(&motionstate->ps);
/* Set up motion send data structures */
if (motionstate->mstype == MOTIONSTATE_SEND && node->motionType == MOTIONTYPE_HASH)
{
int nkeys;
Assert(node->numOutputSegs > 0);
nkeys = list_length(node->hashDataTypes);
/* Hash expressions are only initialized when there are hash keys. */
if (nkeys > 0)
motionstate->hashExpr = (List *) ExecInitExpr((Expr *) node->hashExpr,
(PlanState *) motionstate);
/*
* Create hash API reference
*/
motionstate->cdbhash = makeCdbHash(node->numOutputSegs, HASH_FNV_1);
#ifdef MEASURE_MOTION_TIME
/*
* Create buckets to hold counts of tuples hashing to each
* destination segindex (for debugging purposes)
*/
motionstate->numTuplesByHashSegIdx = (int *) palloc0(node->numHashSegs * sizeof(int));
#endif
}
/* Merge Receive: Set up the key comparator and priority queue. */
if (node->sendSorted && motionstate->mstype == MOTIONSTATE_RECV)
{
/* The same GUC selects the matching receive function in ExecMotion(). */
if (gp_enable_motion_mk_sort)
create_motion_mk_heap(motionstate);
else
{
CdbMergeComparatorContext *mcContext;
/* Allocate context object for the key comparator. */
mcContext = CdbMergeComparator_CreateContext(tupDesc,
node->numSortCols,
node->sortColIdx,
node->sortOperators);
/* Create the priority queue structure. */
motionstate->tupleheap = CdbHeap_Create(CdbMergeComparator,
mcContext,
motionstate->numInputSegs,
sizeof(CdbTupleHeapInfo),
NULL);
}
}
/*
* Perform per-node initialization in the motion layer.
*/
UpdateMotionLayerNode(motionstate->ps.state->motionlayer_context,
node->motionID,
node->sendSorted,
tupDesc,
PlanStateOperatorMemKB((PlanState *) motionstate));
#ifdef CDB_MOTION_DEBUG
/* Cache each column's output function for the debug tuple dumps. */
motionstate->outputFunArray = (Oid *)palloc(tupDesc->natts * sizeof(Oid));
for (i = 0; i < tupDesc->natts; i++)
{
bool typisvarlena;
getTypeOutputInfo(tupDesc->attrs[i]->atttypid,
&motionstate->outputFunArray[i],
&typisvarlena);
}
#endif
/*
* Temporarily set currentExecutingSliceId to the parent value, since
* this motion might be in the top slice of an InitPlan.
*/
estate->currentExecutingSliceId = parentExecutingSliceId;
initGpmonPktForMotion((Plan *)node, &motionstate->ps.gpmon_pkt, estate);
estate->currentExecutingSliceId = node->motionID;
return motionstate;
}
/* Tuple table slots counted for a Motion node itself, excluding its children. */
#define MOTION_NSLOTS 1
/* ----------------------------------------------------------------
* ExecCountSlotsMotion
* ----------------------------------------------------------------
*/
int
ExecCountSlotsMotion(Motion * node)
{
    Plan       *plan = (Plan *) node;
    int         nslots = MOTION_NSLOTS;

    /* Add the slots needed by both subtrees to our own. */
    nslots += ExecCountSlotsNode(outerPlan(plan));
    nslots += ExecCountSlotsNode(innerPlan(plan));

    return nslots;
}
/* ----------------------------------------------------------------
 *		ExecEndMotion(node)
 *
 *		Shuts down a Motion node.  In order, this:
 *		  - frees the node's expression context and clears its
 *			result tuple slot;
 *		  - sets the EState's slice ids to this motion's id and ends
 *			the outer subplan;
 *		  - (MEASURE_MOTION_TIME builds only) logs timing and tuple
 *			counters at DEBUG1;
 *		  - frees the merge-receive heap and the cdbhash object;
 *		  - ends this node in the motion layer;
 *		  - closes out the node's gpmon packet.
 *
 *		Every pointer freed here is reset to NULL so that nothing is
 *		left dangling in the MotionState.
 * ----------------------------------------------------------------
 */
void
ExecEndMotion(MotionState * node)
{
	Motion	   *motion = (Motion *) node->ps.plan;
	uint16		motNodeID = motion->motionID;
	int			parentExecutingSliceId;
#ifdef MEASURE_MOTION_TIME
	double		otherTimeSec;
	double		motionTimeSec;
#endif

	ExecFreeExprContext(&node->ps);

	/*
	 * clean out the tuple table
	 */
	ExecClearTuple(node->ps.ps_ResultTupleSlot);

	/*
	 * Set the slice no for the nodes under this motion.  The previous
	 * executing slice id is remembered so it can be used again further down.
	 */
	Assert(node->ps.state != NULL);
	node->ps.state->currentSliceIdInPlan = motNodeID;
	parentExecutingSliceId = node->ps.state->currentExecutingSliceId;
	node->ps.state->currentExecutingSliceId = motNodeID;

	/*
	 * shut down the subplan
	 */
	ExecEndNode(outerPlanState(node));

#ifdef MEASURE_MOTION_TIME
	motionTimeSec = (double) node->motionTime.tv_sec + (double) node->motionTime.tv_usec / 1000000.0;

	if (node->mstype == MOTIONSTATE_RECV)
	{
		elog(DEBUG1,
			 "Motion Node %d (RECEIVER) Statistics:\n"
			 "Timing: \n"
			 "\t Time receiving the tuple: %f sec\n"
			 "Counters: \n"
			 "\tnumTuplesFromChild: %d\n"
			 "\tnumTuplesFromAMS: %d\n"
			 "\tnumTuplesToAMS: %d\n"
			 "\tnumTuplesToParent: %d\n",
			 motNodeID,
			 motionTimeSec,
			 node->numTuplesFromChild,
			 node->numTuplesFromAMS,
			 node->numTuplesToAMS,
			 node->numTuplesToParent
			);
	}
	else if (node->mstype == MOTIONSTATE_SEND)
	{
		double		totalTimeSec;
		double		sendPct;

		otherTimeSec = (double) node->otherTime.tv_sec + (double) node->otherTime.tv_usec / 1000000.0;

		/* Avoid 0/0 (which would print "nan") when no time was recorded. */
		totalTimeSec = otherTimeSec + motionTimeSec;
		sendPct = (totalTimeSec > 0.0) ? (motionTimeSec / totalTimeSec) * 100 : 0.0;

		elog(DEBUG1,
			 "Motion Node %d (SENDER) Statistics:\n"
			 "Timing: \n"
			 "\t Time getting next tuple to send: %f sec \n"
			 "\t Time sending the tuple: %f sec\n"
			 "\t Percentage of time sending: %2.2f%% \n"
			 "Counters: \n"
			 "\tnumTuplesFromChild: %d\n"
			 "\tnumTuplesToAMS: %d\n",
			 motNodeID,
			 otherTimeSec,
			 motionTimeSec,
			 sendPct,
			 node->numTuplesFromChild,
			 node->numTuplesToAMS
			);
	}

	if (node->numTuplesByHashSegIdx != NULL)
	{
		int			i;

		for (i = 0; i < motion->numHashSegs; i++)
		{
			elog(DEBUG1, "Motion Node %3d Hash Bucket %3d: %10d tuples",
				 motNodeID, i, node->numTuplesByHashSegIdx[i]);
		}
		pfree(node->numTuplesByHashSegIdx);
		node->numTuplesByHashSegIdx = NULL;
	}
#endif   /* MEASURE_MOTION_TIME */

	/* Merge Receive: Free the priority queue and associated structures. */
	if (node->tupleheap != NULL)
	{
		if (gp_enable_motion_mk_sort)
			destroy_motion_mk_heap(node);
		else
		{
			CdbHeap    *hp = (CdbHeap *) node->tupleheap;

			CdbMergeComparator_DestroyContext(hp->comparatorContext);
			CdbHeap_Destroy(hp);
		}
		node->tupleheap = NULL;
	}

	/* Free the slices and routes */
	if (node->cdbhash != NULL)
	{
		pfree(node->cdbhash);
		node->cdbhash = NULL;
	}

	/*
	 * Free up this motion node's resources in the Motion Layer.
	 *
	 * TODO: For now, we don't flush the comm-layer.  NO ERRORS DURING AMS!!!
	 */
	EndMotionLayerNode(node->ps.state->motionlayer_context, motNodeID, /* flush-comm-layer */ false);

#ifdef CDB_MOTION_DEBUG
	if (node->outputFunArray)
	{
		pfree(node->outputFunArray);
		/* Reset like the other freed members so no stale pointer remains. */
		node->outputFunArray = NULL;
	}
#endif

	/*
	 * Temporarily set currentExecutingSliceId to the parent value, since
	 * this motion might be in the top slice of an InitPlan.
	 */
	node->ps.state->currentExecutingSliceId = parentExecutingSliceId;
	EndPlanStateGpmonPkt(&node->ps);
	node->ps.state->currentExecutingSliceId = motNodeID;
}
/*=========================================================================
* HELPER FUNCTIONS
*/
/*
* CdbMergeComparator:
* Used to compare tuples for a sorted motion node.
*/
int
CdbMergeComparator(void *lhs, void *rhs, void *context)
{
CdbMergeComparatorContext *ctx = (CdbMergeComparatorContext *)context;
CdbTupleHeapInfo *linfo = (CdbTupleHeapInfo *) lhs;
CdbTupleHeapInfo *rinfo = (CdbTupleHeapInfo *) rhs;
HeapTuple ltup = linfo->tuple;
HeapTuple rtup = rinfo->tuple;
FmgrInfo *sortFunctions;
SortFunctionKind *sortFnKinds;
int numSortCols;
AttrNumber *sortColIdx;
TupleDesc tupDesc;
int nkey;
Assert(ltup && rtup);
sortFunctions = ctx->sortFunctions;
sortFnKinds = ctx->sortFnKinds;
numSortCols = ctx->numSortCols;
sortColIdx = ctx->sortColIdx;
tupDesc = ctx->tupDesc;
for (nkey = 0; nkey < numSortCols; nkey++)
{
AttrNumber attno = sortColIdx[nkey];
Datum datum1,
datum2;
bool isnull1,
isnull2;
int32 compare;
if(is_heaptuple_memtuple(ltup))
datum1 = memtuple_getattr((MemTuple) ltup, ctx->mt_bind, attno, &isnull1);
else
datum1 = heap_getattr(ltup, attno, tupDesc, &isnull1);
if(is_heaptuple_memtuple(rtup))
datum2 = memtuple_getattr((MemTuple) rtup, ctx->mt_bind, attno, &isnull2);
else
datum2 = heap_getattr(rtup, attno, tupDesc, &isnull2);
compare = ApplySortFunction(&sortFunctions[nkey],
sortFnKinds[nkey],
datum1, isnull1,
datum2, isnull2);
if (compare != 0)
return compare;
}
return 0;
}
/* CdbMergeComparator */
/*
 * CdbMergeComparator_CreateContext
 *
 * Create the context object for use by CdbMergeComparator.
 *
 * The returned object records tupDesc, numSortCols and sortColIdx (the
 * sortColIdx pointer is stored, not copied), holds a memtuple binding
 * created from tupDesc, and carries one sort function and sort-function
 * kind per entry of sortOperators, as selected by SelectSortFunction().
 */
CdbMergeComparatorContext *
CdbMergeComparator_CreateContext(TupleDesc tupDesc,
								 int numSortCols,
								 AttrNumber *sortColIdx,
								 Oid *sortOperators)
{
	CdbMergeComparatorContext *result;
	int			keyno;

	Assert(tupDesc &&
		   numSortCols > 0 &&
		   sortColIdx &&
		   sortOperators);

	/* Zero-filled context object; plain fields first, then the binding. */
	result = (CdbMergeComparatorContext *) palloc0(sizeof(CdbMergeComparatorContext));
	result->numSortCols = numSortCols;
	result->sortColIdx = sortColIdx;
	result->tupDesc = tupDesc;
	result->mt_bind = create_memtuple_binding(tupDesc);

	/* One FmgrInfo and one SortFunctionKind per sort column. */
	result->sortFunctions = (FmgrInfo *) palloc0(numSortCols * sizeof(FmgrInfo));
	result->sortFnKinds = (SortFunctionKind *) palloc0(numSortCols * sizeof(SortFunctionKind));

	for (keyno = 0; keyno < numSortCols; keyno++)
	{
		RegProcedure sortProc;

		Assert(sortOperators[keyno] && sortColIdx[keyno]);

		/* Pick the function implementing this sort operator and load it. */
		SelectSortFunction(sortOperators[keyno],
						   &sortProc,
						   &result->sortFnKinds[keyno]);
		fmgr_info(sortProc, &result->sortFunctions[keyno]);
	}

	return result;
}	/* CdbMergeComparator_CreateContext */
/*
 * CdbMergeComparator_DestroyContext
 *
 * Release the sort-function arrays owned by a CdbMergeComparatorContext.
 * A NULL ctx is accepted and ignored.
 *
 * The freed members are reset to NULL, so calling this again on the same
 * context is harmless and no stale pointers remain in it.
 *
 * NOTE(review): the context struct itself and ctx->mt_bind, both allocated
 * by CdbMergeComparator_CreateContext, are not released here.  Presumably
 * they are reclaimed when the owning memory context goes away -- confirm
 * before adding an explicit free.
 */
void
CdbMergeComparator_DestroyContext(CdbMergeComparatorContext *ctx)
{
	if (ctx == NULL)
		return;

	if (ctx->sortFnKinds != NULL)
	{
		pfree(ctx->sortFnKinds);
		ctx->sortFnKinds = NULL;
	}

	if (ctx->sortFunctions != NULL)
	{
		pfree(ctx->sortFunctions);
		ctx->sortFunctions = NULL;
	}
}	/* CdbMergeComparator_DestroyContext */
/*
 * evalHashKey
 *
 * Experimental code that will be replaced later with new hashing mechanism.
 *
 * Resets econtext, then, while running in its per-tuple memory context,
 * re-initializes the CdbHash h and feeds it the value of every expression
 * in hashkeys (paired with the type OID at the same position in hashtypes).
 * The caller's memory context is restored before returning.
 *
 * Returns the value produced by cdbhashreduce(h).
 */
uint32
evalHashKey(ExprContext *econtext, List *hashkeys, List *hashtypes, CdbHash * h)
{
	MemoryContext callerContext;

	ResetExprContext(econtext);
	callerContext = MemoryContextSwitchTo(econtext->ecxt_per_tuple_memory);

	cdbhashinit(h);

	if (list_length(hashkeys) == 0)
	{
		/*
		 * A relation with an empty policy (partitioning policy with a NULL
		 * distribution key list) gives us no hash key value to feed in, so
		 * let cdbhashnokey() assign a hash value for us.
		 */
		cdbhashnokey(h);
	}
	else
	{
		ListCell   *keyCell;
		ListCell   *typeCell;

		/* One or more distribution keys: hash each of them in turn. */
		forboth(keyCell, hashkeys, typeCell, hashtypes)
		{
			ExprState  *keyExpr = (ExprState *) lfirst(keyCell);
			bool		keyIsNull;
			Datum		keyDatum;

			/* Get the attribute value of the tuple. */
			keyDatum = ExecEvalExpr(keyExpr, econtext, &keyIsNull, NULL);

			/* Fold it into the hash; treat nulls as having hash key 0. */
			if (keyIsNull)
				cdbhashnull(h);
			else
				cdbhash(h, keyDatum, lfirst_oid(typeCell));
		}
	}

	MemoryContextSwitchTo(callerContext);

	return cdbhashreduce(h);
}
/*
 * doSendEndOfStream
 *
 * Called when there are no more child tuples but an End-of-Stream token has
 * not yet been sent successfully.  Sends the token for this motion node and
 * then records in the MotionState that it has been sent.
 */
void
doSendEndOfStream(Motion * motion, MotionState * node)
{
	SendEndOfStream(node->ps.state->motionlayer_context,
					node->ps.state->interconnect_context,
					motion->motionID);

	/* Only reached once SendEndOfStream() has returned. */
	node->sentEndOfStream = true;
}
/*
 * doSendTuple
 *
 * Send one tuple, fetched from the child plan, to the route(s) selected by
 * the motion type.  Updates the node's tuple counters and sets
 * node->stopRequested when the motion layer answers STOP_SENDING.
 *
 * A crufty, confusing part of the current code is how contentId is used
 * within the motion structures and then how that gets translated to
 * targetRoutes by this motion node.
 *
 * WARNING: There are a lot of assumptions in here about how the motion node
 *			instructions are encoded into motion and stuff.
 *
 * There are 3 types of sending that can happen here:
 *
 *	FIXED - sending to a single process.  The value in
 *		node->fixedSegIdxMask[0] is the contentId of who to send to.  But we
 *		can actually ignore that since now with slice tables, we should only
 *		have a single CdbProcess that we could send to for this motion node.
 *
 *	BROADCAST - actually a subcase of FIXED (numOutputSegs == 0), but
 *		handling is simple: send to all of our routes.
 *
 *	HASH - maps hash values to segid.  This mapping is 1->1 such that a hash
 *		value of 2 maps to contentid of 2 (for now).  Since we can't ever
 *		use Hash to send to the QD, the QD's contentid of -1 is not an issue.
 *		Also, the contentid maps directly to the routeid.
 *
 * Any other motion type is handled as an explicit redistribute: the target
 * route is read from column motion->segidColIdx of the tuple itself.
 */
void
doSendTuple(Motion * motion, MotionState * node, TupleTableSlot *outerTupleSlot)
{
	int16		targetRoute;
	HeapTuple	tuple;
	SendReturnCode sendRC;
	ExprContext *econtext = node->ps.ps_ExprContext;

	/* We got a tuple from the child-plan. */
	node->numTuplesFromChild++;

	if (motion->motionType == MOTIONTYPE_FIXED)
	{
		if (motion->numOutputSegs == 0) /* Broadcast */
		{
			targetRoute = BROADCAST_SEGIDX;
		}
		else	/* Fixed Motion. */
		{
			Assert(motion->numOutputSegs == 1);

			/*
			 * Actually, since we can only send to a single output segment
			 * here, we are guaranteed that we only have a single targetRoute
			 * setup that we could possibly send to.  So we can cheat and
			 * just fix the targetRoute to 0 (the 1st route).
			 */
			targetRoute = 0;
		}
	}
	else if (motion->motionType == MOTIONTYPE_HASH)		/* Redistribute */
	{
		uint32		hval = 0;

		Assert(motion->numOutputSegs > 0);
		Assert(motion->outputSegIdx != NULL);

		econtext->ecxt_outertuple = outerTupleSlot;

		Assert(node->cdbhash->numsegs == motion->numOutputSegs);

		hval = evalHashKey(econtext, node->hashExpr,
						   motion->hashDataTypes, node->cdbhash);

		Assert(hval < GetQEGangNum() && "redistribute destination outside segment array");

		/*
		 * hashSegIdx takes our uint32 and maps it to an int, and here we
		 * assign it to an int16.  See below.
		 */
		targetRoute = motion->outputSegIdx[hval];

		/*
		 * see MPP-2099, let's not run into this one again! NOTE: the
		 * definition of BROADCAST_SEGIDX is key here, it *cannot* be a valid
		 * route which our map (above) will *ever* return.
		 *
		 * Note the "mapping" is generated at *planning* time in
		 * makeDefaultSegIdxArray() in cdbmutate.c (it is the trivial map,
		 * and is passed around our system a fair amount!).
		 */
		Assert(targetRoute != BROADCAST_SEGIDX);
	}
	else	/* ExplicitRedistribute */
	{
		Datum		segidColIdxDatum;
		bool		is_null = false;

		Assert(motion->segidColIdx > 0 && motion->segidColIdx <= list_length((motion->plan).targetlist));

		segidColIdxDatum = slot_getattr(outerTupleSlot, motion->segidColIdx, &is_null);

		/* Validate the fetched value before it is interpreted. */
		Assert(!is_null);

		/* Datum -> int32 is DatumGetInt32; targetRoute keeps the low 16 bits. */
		targetRoute = (int16) DatumGetInt32(segidColIdxDatum);
	}

	tuple = ExecFetchSlotGenericTuple(outerTupleSlot, true);

	/* send the tuple out. */
	sendRC = SendTuple(node->ps.state->motionlayer_context,
					   node->ps.state->interconnect_context,
					   motion->motionID,
					   tuple,
					   targetRoute);

	Assert(sendRC == SEND_COMPLETE || sendRC == STOP_SENDING);
	if (sendRC == SEND_COMPLETE)
		node->numTuplesToAMS++;
	else
		node->stopRequested = true;

#ifdef CDB_MOTION_DEBUG
	/* Trace only the first 20 tuples that were sent successfully. */
	if (sendRC == SEND_COMPLETE && node->numTuplesToAMS <= 20)
	{
		StringInfoData buf;

		initStringInfo(&buf);
		appendStringInfo(&buf, " motion%-3d snd->%-3d, %5d.",
						 motion->motionID,
						 targetRoute,
						 node->numTuplesToAMS);
		formatTuple(&buf, tuple, ExecGetResultType(&node->ps),
					node->outputFunArray);

		/* buf holds tuple data: never use it as the format string itself. */
		elog(DEBUG3, "%s", buf.data);
		pfree(buf.data);
	}
#endif
}
/*
 * ExecReScanMotion
 *
 * Motion nodes do not allow rescan after a tuple has been fetched.
 *
 * When the planner knows that a NestLoop cannot have more than one outer
 * tuple, it can omit the usual Materialize operator atop the inner subplan,
 * which can lead to invocation of ExecReScanMotion before the motion node's
 * first tuple is fetched.  Rescan can be implemented as a no-op in this case.
 * (After ExecNestLoop fetches an outer tuple, it invokes rescan on the inner
 * subplan before fetching the first inner tuple.  That doesn't bother us,
 * provided there is only one outer tuple.)
 *
 * So this is a no-op when no parameters have changed, or when the node is a
 * receiver that has not yet returned a tuple to its parent; any other
 * combination raises an error.
 */
void
ExecReScanMotion(MotionState *node, ExprContext *exprCtxt)
{
	/* No changed parameters: nothing to do. */
	if (node->ps.chgParam == NULL)
		return;

	/* A receiver that has handed no tuple to its parent can rescan freely. */
	if (node->mstype == MOTIONSTATE_RECV && node->numTuplesToParent == 0)
		return;

	ereport(ERROR,
			(errcode(ERRCODE_INTERNAL_ERROR),
			 errmsg("Illegal rescan of motion node: invalid plan."),
			 errhint("likely caused by bad NL-join, try setting enable_nestloop off")));
}
/*
 * ExecStopMotion
 *
 * Mark this node as "stopped."  When ExecProcNode() is called on a
 * stopped motion node it should behave as if there are no tuples
 * available.
 *
 * ExecProcNode() on a stopped motion node should also notify the
 * "other end" of the motion node of the stoppage.
 *
 * Note: once this is called, it is possible that the motion node will
 * never be called again, so we *must* send the stop message now.  The
 * message is skipped only when the interconnect type is
 * INTERCONNECT_TYPE_NIL.  (An earlier check of es_interconnect_is_setup
 * was disabled in the original code and is not performed.)
 */
void
ExecStopMotion(MotionState * node)
{
	AssertArg(node != NULL);

	node->stopRequested = true;
	node->ps.state->active_recv_id = -1;

	if (Gp_interconnect_type != INTERCONNECT_TYPE_NIL)
	{
		/* pass down */
		SendStopMessage(node->ps.state->motionlayer_context,
						node->ps.state->interconnect_context,
						((Motion *) node->ps.plan)->motionID);
	}
}
/*
 * initGpmonPktForMotion
 *
 * Initialize the gpmon packet for a Motion plan node.  The perfmon node
 * type passed to InitPlanNodeGpmonPkt() is chosen as follows:
 *
 *	MOTIONTYPE_HASH						-> PMNT_RedistributeMotion
 *	MOTIONTYPE_EXPLICIT					-> PMNT_ExplicitRedistributeMotion
 *	otherwise, numOutputSegs == 0		-> PMNT_BroadcastMotion
 *	otherwise							-> PMNT_GatherMotion
 *
 * planNode must be a non-NULL Motion node and gpmon_pkt must be non-NULL.
 */
void
initGpmonPktForMotion(Plan *planNode, gpmon_packet_t *gpmon_pkt, EState *estate)
{
	Motion	   *motion;
	PerfmonNodeType nodeType = PMNT_Invalid;

	Assert(planNode != NULL && gpmon_pkt != NULL && IsA(planNode, Motion));

	motion = (Motion *) planNode;

	switch (motion->motionType)
	{
		case MOTIONTYPE_HASH:
			nodeType = PMNT_RedistributeMotion;
			break;

		case MOTIONTYPE_EXPLICIT:
			nodeType = PMNT_ExplicitRedistributeMotion;
			break;

		default:
			/* Remaining motion types: broadcast has no output segments. */
			nodeType = (motion->numOutputSegs == 0)
				? PMNT_BroadcastMotion
				: PMNT_GatherMotion;
			break;
	}

	Assert(GPMON_MOTION_TOTAL <= (int) GPMON_QEXEC_M_COUNT);

	InitPlanNodeGpmonPkt(planNode, gpmon_pkt, estate,
						 nodeType,
						 (int64) planNode->plan_rows, NULL);
}