blob: ee3434aaa4af3043a4e8f78285454c08861b672b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "nodeVMotion.h"
#include "tuplebatch.h"
#include "execVQual.h"
/*=========================================================================
* FUNCTIONS PROTOTYPES
*/
/* Sender path: pulls batches from the child plan and ships each one out. */
static TupleTableSlot *execVMotionSender(MotionState * node);
/* Receiver path: fetches one serialized batch and rebuilds it in the result slot. */
static TupleTableSlot *execVMotionUnsortedReceiver(MotionState * node);
/* Dispatches to the sender or the receiver path according to node->mstype. */
TupleTableSlot *ExecVMotion(MotionState * node);
/* Chooses the target route for one batch and hands it to SendTupleBatch(). */
static void doSendTupleBatch(Motion * motion, MotionState * node, TupleTableSlot *outerTupleSlot);
/* Serializes a tuple batch and sends it for motion node motNodeID on targetRoute. */
static SendReturnCode
SendTupleBatch(MotionLayerState *mlStates, ChunkTransportState *transportStates,
int16 motNodeID, TupleBatch tuplebatch, int16 targetRoute);
/*=========================================================================
*/
/*
 * ExecVMotionVirtualLayer
 *
 * Wrapper around ExecVMotion() that runs the receiver side through
 * VirtualNodeProc().
 *
 * For a receiving motion, VirtualNodeProc() is tried on the node's result
 * slot first; whenever it reports failure a fresh slot is fetched with
 * ExecVMotion() and the attempt is repeated.  The loop ends as soon as
 * VirtualNodeProc() succeeds or ExecVMotion() returns an empty slot.
 * NOTE(review): presumably VirtualNodeProc() yields the next row out of the
 * batch currently held by the slot -- confirm against its definition.
 *
 * Every other motion state is handed directly to ExecVMotion(): a sender is
 * executed there, and an inactive node trips ExecVMotion()'s assertion and
 * yields NULL instead of falling off the end of this function.
 *
 * Returns the slot produced by the last step, or an empty/NULL slot when
 * there is nothing more to return.
 */
TupleTableSlot *
ExecVMotionVirtualLayer(MotionState *node)
{
	TupleTableSlot *slot;

	if (node->mstype != MOTIONSTATE_RECV)
		return ExecVMotion(node);

	slot = node->ps.ps_ResultTupleSlot;

	/* Refill from the interconnect until the slot can be processed. */
	while (!VirtualNodeProc(slot))
	{
		slot = ExecVMotion(node);
		if (TupIsNull(slot))
			break;
	}

	return slot;
}
/* ----------------------------------------------------------------
 *		ExecVMotion
 *
 *		Executes one step of a vectorized Motion node.
 *
 *		Receiver (MOTIONSTATE_RECV): records this motion as the active
 *		receiver in the executor state, fetches one batch through
 *		execVMotionUnsortedReceiver() and returns the slot holding it.
 *		NULL is returned when the receiver yields nothing or when the
 *		interconnect runs in the NIL diagnostic mode; in both cases
 *		active_recv_id is reset to -1.  Sorted receive is not supported
 *		(asserted).
 *
 *		Sender (MOTIONSTATE_SEND): delegates to execVMotionSender().
 *
 *		Any other state is a caller error: an assertion fires and NULL
 *		is returned.
 * ----------------------------------------------------------------
 */
TupleTableSlot *
ExecVMotion(MotionState * node)
{
	Motion	   *motion = (Motion *) node->ps.plan;

	/*
	 * at the top here we basically decide: -- SENDER vs. RECEIVER and --
	 * SORTED vs. UNSORTED
	 */
	if (node->mstype == MOTIONSTATE_RECV)
	{
		TupleTableSlot *tuple;

#ifdef MEASURE_MOTION_TIME
		/* Both stamps must exist before the accounting code below uses them. */
		struct timeval startTime;
		struct timeval stopTime;

		gettimeofday(&startTime, NULL);
#endif

		if (node->ps.state->active_recv_id >= 0)
		{
			if (node->ps.state->active_recv_id != motion->motionID)
			{
				/* Another receiver was still marked active; log and take over. */
				elog(LOG, "DEADLOCK HAZARD: Updating active_motion_id from %d to %d",
					 node->ps.state->active_recv_id, motion->motionID);
				node->ps.state->active_recv_id = motion->motionID;
			}
		}
		else
			node->ps.state->active_recv_id = motion->motionID;

		/* Running in diagnostic mode ? */
		if (Gp_interconnect_type == INTERCONNECT_TYPE_NIL)
		{
			node->ps.state->active_recv_id = -1;
			return NULL;
		}

		Assert(!motion->sendSorted);

		tuple = execVMotionUnsortedReceiver(node);

		if (tuple == NULL)
			node->ps.state->active_recv_id = -1;
		else
		{
			Gpmon_M_Incr(GpmonPktFromMotionState(node), GPMON_QEXEC_M_ROWSIN);
			Gpmon_M_Incr_Rows_Out(GpmonPktFromMotionState(node));
			setMotionStatsForGpmon(node);
		}

#ifdef MEASURE_MOTION_TIME
		gettimeofday(&stopTime, NULL);

		node->motionTime.tv_sec += stopTime.tv_sec - startTime.tv_sec;
		node->motionTime.tv_usec += stopTime.tv_usec - startTime.tv_usec;

		/* Normalize tv_usec back into [0, 1000000). */
		while (node->motionTime.tv_usec < 0)
		{
			node->motionTime.tv_usec += 1000000;
			node->motionTime.tv_sec--;
		}

		while (node->motionTime.tv_usec >= 1000000)
		{
			node->motionTime.tv_usec -= 1000000;
			node->motionTime.tv_sec++;
		}
#endif

		CheckSendPlanStateGpmonPkt(&node->ps);
		return tuple;
	}
	else if (node->mstype == MOTIONSTATE_SEND)
	{
		return execVMotionSender(node);
	}

	Assert(!"Non-active motion is executed");
	return NULL;
}
/*
 * execVMotionSender
 *
 * Sender side of a vectorized Motion node.  Repeatedly pulls a slot from the
 * outer (child) plan and forwards it with doSendTupleBatch() until the child
 * is exhausted, at which point end-of-stream is sent, or until the send path
 * sets node->stopRequested, in which case the child subtree is squelched.
 *
 * When the interconnect runs in the NIL diagnostic mode every slot from the
 * child is discarded and nothing is sent.
 *
 * Always returns NULL: a sender never passes tuples up the plan tree.
 */
static TupleTableSlot *
execVMotionSender(MotionState * node)
{
	/* SENDER LOGIC */
	TupleTableSlot *outerTupleSlot;
	PlanState  *outerNode;
	Motion	   *motion = (Motion *) node->ps.plan;
	bool		done = false;

#ifdef MEASURE_MOTION_TIME
	struct timeval time1;
	struct timeval time2;

	gettimeofday(&time1, NULL);
#endif

	AssertState(motion->motionType == MOTIONTYPE_HASH ||
				(motion->motionType == MOTIONTYPE_EXPLICIT && motion->segidColIdx > 0) ||
				(motion->motionType == MOTIONTYPE_FIXED && motion->numOutputSegs <= 1));
	Assert(node->ps.state->interconnect_context);

	while (!done)
	{
		/* grab TupleTableSlot from our child. */
		outerNode = outerPlanState(node);
		outerTupleSlot = ExecProcNode(outerNode);

#ifdef MEASURE_MOTION_TIME
		/*
		 * Stamp the moment the child returned, so that only the time spent
		 * below (in the send path) is charged to motionTime.
		 */
		gettimeofday(&time2, NULL);
#endif

		/* Running in diagnostic mode, we just drop all tuples. */
		if (Gp_interconnect_type == INTERCONNECT_TYPE_NIL)
		{
			if (!TupIsNull(outerTupleSlot))
				continue;

			return NULL;
		}

		if (TupIsNull(outerTupleSlot))
		{
			/* Child is exhausted: tell the receivers and leave the loop. */
			doSendEndOfStream(motion, node);
			done = true;
		}
		else
		{
			doSendTupleBatch(motion, node, outerTupleSlot);

			Gpmon_M_Incr_Rows_Out(GpmonPktFromMotionState(node));
			setMotionStatsForGpmon(node);
			CheckSendPlanStateGpmonPkt(&node->ps);

			if (node->stopRequested)
			{
				elog(gp_workfile_caching_loglevel, "Motion initiating Squelch walker");
				/* propagate stop notification to our children */
				ExecSquelchNode(outerNode);
				done = true;
			}
		}

#ifdef MEASURE_MOTION_TIME
		gettimeofday(&time1, NULL);

		node->motionTime.tv_sec += time1.tv_sec - time2.tv_sec;
		node->motionTime.tv_usec += time1.tv_usec - time2.tv_usec;

		/* Normalize tv_usec back into [0, 1000000). */
		while (node->motionTime.tv_usec < 0)
		{
			node->motionTime.tv_usec += 1000000;
			node->motionTime.tv_sec--;
		}

		while (node->motionTime.tv_usec >= 1000000)
		{
			node->motionTime.tv_usec -= 1000000;
			node->motionTime.tv_sec++;
		}
#endif
	}

	Assert(node->stopRequested || node->numTuplesFromChild == node->numTuplesToAMS);

	/* nothing else to send out, so we return NULL up the tree. */
	return NULL;
}
/*
 * doSendTupleBatch
 *
 * Sends the tuple batch carried by outerTupleSlot for the given motion.
 *
 * The target route is chosen from the motion type:
 *	 - MOTIONTYPE_FIXED with numOutputSegs == 0: broadcast;
 *	 - MOTIONTYPE_FIXED with numOutputSegs == 1: route 0;
 *	 - MOTIONTYPE_HASH: not implemented for batches, reported as an error;
 *	 - otherwise (explicit redistribute): the value of column segidColIdx
 *	   read from the slot.
 *
 * Updates node->numTuplesFromChild, and then either node->numTuplesToAMS
 * (send completed) or node->stopRequested (the send path asked us to stop).
 */
static void
doSendTupleBatch(Motion * motion, MotionState * node, TupleTableSlot *outerTupleSlot)
{
	int16		targetRoute;
	SendReturnCode sendRC;

	/* We got a tuple from the child-plan. */
	node->numTuplesFromChild++;

	if (motion->motionType == MOTIONTYPE_FIXED)
	{
		if (motion->numOutputSegs == 0) /* Broadcast */
		{
			targetRoute = BROADCAST_SEGIDX;
		}
		else	/* Fixed Motion. */
		{
			Assert(motion->numOutputSegs == 1);

			/*
			 * Actually, since we can only send to a single output segment
			 * here, we are guaranteed that we only have a single targetRoute
			 * setup that we could possibly send to.  So we can cheat and just
			 * fix the targetRoute to 0 (the 1st route).
			 */
			targetRoute = 0;
		}
	}
	else if (motion->motionType == MOTIONTYPE_HASH) /* Redistribute */
	{
		/*
		 * TODO: hash redistribution of a whole batch is not implemented.
		 * Fail loudly rather than send on an uninitialized route.
		 */
		elog(ERROR, "hash redistribute motion is not supported for tuple batches");
		targetRoute = 0;		/* not reached; keeps the compiler quiet */
	}
	else	/* ExplicitRedistribute */
	{
		Datum		segidColIdxDatum;
		bool		is_null = false;

		Assert(motion->segidColIdx > 0 && motion->segidColIdx <= list_length((motion->plan).targetlist));

		segidColIdxDatum = slot_getattr(outerTupleSlot, motion->segidColIdx, &is_null);

		/* The segment id column must hold a value before it is interpreted. */
		Assert(!is_null);
		targetRoute = DatumGetInt32(segidColIdxDatum);
	}

	/* send the tuple out. */
	sendRC = SendTupleBatch(node->ps.state->motionlayer_context,
							node->ps.state->interconnect_context,
							motion->motionID,
							outerTupleSlot->PRIVATE_tb,
							targetRoute);

	Assert(sendRC == SEND_COMPLETE || sendRC == STOP_SENDING);

	if (sendRC == SEND_COMPLETE)
		node->numTuplesToAMS++;
	else
		node->stopRequested = true;
}
/*
 * execVMotionUnsortedReceiver
 *
 * Receiver side of a vectorized Motion node (unsorted case).
 *
 * If a stop was requested, a stop message is sent for this motion and NULL
 * is returned.  Otherwise one tuple is received from any route; at end of
 * stream NULL is returned.  A received tuple is deserialized into the tuple
 * batch of the node's result slot, the slot is marked as a virtual tuple
 * with the batch's column count valid, and that slot is returned.
 *
 * Raises ERROR when deserialization fails.
 */
TupleTableSlot *
execVMotionUnsortedReceiver(MotionState * node)
{
	/* RECEIVER LOGIC */
	Motion	   *motionPlan = (Motion *) node->ps.plan;
	TupleTableSlot *resultSlot;
	HeapTuple	received;
	ReceiveReturnCode rc;

	AssertState(motionPlan->motionType == MOTIONTYPE_HASH ||
				(motionPlan->motionType == MOTIONTYPE_EXPLICIT && motionPlan->segidColIdx > 0) ||
				(motionPlan->motionType == MOTIONTYPE_FIXED && motionPlan->numOutputSegs <= 1));
	Assert(node->ps.state->motionlayer_context);
	Assert(node->ps.state->interconnect_context);

	/* A pending stop request is forwarded and ends the receive immediately. */
	if (node->stopRequested)
	{
		SendStopMessage(node->ps.state->motionlayer_context,
						node->ps.state->interconnect_context,
						motionPlan->motionID);
		return NULL;
	}

	rc = RecvTupleFrom(node->ps.state->motionlayer_context,
					   node->ps.state->interconnect_context,
					   motionPlan->motionID,
					   &received,
					   ANY_ROUTE);

	if (rc == END_OF_STREAM)
	{
		/* A receiver never counts sender-side tuples. */
		Assert(node->numTuplesFromAMS == node->numTuplesToParent);
		Assert(node->numTuplesFromChild == 0);
		Assert(node->numTuplesToAMS == 0);
		return NULL;
	}

	node->numTuplesFromAMS++;
	node->numTuplesToParent++;

	/* Rebuild the batch inside our result slot and hand that slot back. */
	resultSlot = node->ps.ps_ResultTupleSlot;

	if (!tbDeserialization(((MemTuple) received)->PRIVATE_mt_bits, &resultSlot->PRIVATE_tb))
		elog(ERROR,"Deserialization process Failed");

	TupSetVirtualTupleNValid(resultSlot, ((TupleBatch) resultSlot->PRIVATE_tb)->ncols);

	return resultSlot;
}
/*
 * Function:  SendTupleBatch - Sends a batch of tuples to the AMS layer.
 *
 * The batch is first serialized into a single tuple with tbSerialization().
 * For a non-broadcast route the tuple is written straight into the
 * transport's direct buffer when one with enough room is available;
 * otherwise it is split into chunks (allocated in the motion layer memory
 * context) and sent with SendTupleChunkToAMS().  The serialized tuple is
 * released on both paths once its bytes have been handed over.
 *
 * Parameters:
 *	 mlStates		 - motion layer state
 *	 transportStates - interconnect transport state
 *	 motNodeID		 - id of the motion node doing the send
 *	 tuplebatch		 - batch to serialize and send
 *	 targetRoute	 - destination route, or BROADCAST_SEGIDX
 *
 * Returns SEND_COMPLETE when the batch was sent (or when this slice is
 * masked out by gp_motion_slice_noop), and STOP_SENDING when the chunked
 * send was refused; in that case the motion node entry is marked stopped.
 */
static SendReturnCode
SendTupleBatch(MotionLayerState *mlStates,
			   ChunkTransportState *transportStates,
			   int16 motNodeID,
			   TupleBatch tuplebatch,
			   int16 targetRoute)
{
	MotionNodeEntry *pMNEntry;
	TupleChunkListData tcList;
	MemoryContext oldCtxt;
	SendReturnCode rc;
	MemTuple	tuple;

	/*
	 * Analyze tools.  Do not send any thing if this slice is in the bit mask
	 */
	if (gp_motion_slice_noop != 0 && (gp_motion_slice_noop & (1 << currentSliceId)) != 0)
		return SEND_COMPLETE;

	/*
	 * Pull up the motion node entry with the node's details.  This includes
	 * details that affect sending, such as whether the motion node needs to
	 * include backup segment-dbs.
	 */
	pMNEntry = getMotionNodeEntry(mlStates, motNodeID, "SendTuple");

	tuple = tbSerialization(tuplebatch);

	if (targetRoute != BROADCAST_SEGIDX)
	{
		struct directTransportBuffer b;

		getTransportDirectBuffer(transportStates, motNodeID, targetRoute, &b);

		if (b.pri != NULL && b.prilen > TUPLE_CHUNK_HEADER_SIZE)
		{
			int			sent = SerializeTupleDirect(tuple, &pMNEntry->ser_tup_info, &b);

			if (sent > 0)
			{
				putTransportDirectBuffer(transportStates, motNodeID, targetRoute, sent);

				/*
				 * The bytes now live in the transport buffer; release the
				 * serialized tuple here as the chunked path below does.
				 */
				pfree(tuple);

				tcList.num_chunks = 1;
				tcList.serialized_data_length = sent;

				statSendTuple(mlStates, pMNEntry, &tcList);

				return SEND_COMPLETE;
			}
		}
	}

	/* Create and store the serialized form, and some stats about it. */
	oldCtxt = MemoryContextSwitchTo(mlStates->motion_layer_mctx);

	SerializeTupleIntoChunks(tuple, &pMNEntry->ser_tup_info, &tcList);
	pfree(tuple);

	MemoryContextSwitchTo(oldCtxt);

	/* do the send. */
	if (!SendTupleChunkToAMS(mlStates, transportStates, motNodeID, targetRoute, tcList.p_first))
	{
		pMNEntry->stopped = true;
		rc = STOP_SENDING;
	}
	else
	{
		/* update stats */
		statSendTuple(mlStates, pMNEntry, &tcList);
		rc = SEND_COMPLETE;
	}

	/* cleanup */
	clearTCList(&pMNEntry->ser_tup_info.chunkCache, &tcList);

	return rc;
}