blob: c4a1ec951f958bdadca6f7ba0d4e35c13ad83315 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: ex_send_top.C
* Description: Send top node (client part of client-server connection)
*
*
* Created: 12/11/95
* Language: C++
*
*
*
*
*****************************************************************************
*/
// -----------------------------------------------------------------------
#include "BaseTypes.h"
#include "ex_stdh.h"
#include "ex_exe_stmt_globals.h"
#include "ComTdb.h"
#include "ex_tcb.h"
#include "ex_expr.h"
#include "str.h"
#include "ex_send_top.h"
#include "ex_send_bottom.h"
#include "ex_split_bottom.h"
#include "ex_io_control.h"
#include "ex_frag_rt.h"
#include "Ex_esp_msg.h"
#include "ComDiags.h"
#include "ExStats.h"
#include "ComTdb.h"
#include "logmxevent.h"
#include "ExCextdecs.h"
#include "Context.h"
#include "seabed/int/opts.h"
static THREAD_P short sv_max_parallel_opens = 0;
#include "ExSMTrace.h"
#include "SMConnection.h"
#include "ExSMQueue.h"
// Assertion that also takes down the process at the other end of the
// connection. When assert_test evaluates to false, the connection (if one
// exists) is asked to dump and stop its peer, and then the local assertion
// fails with assert_msg. The macro relies on a member named connection_
// being in scope at the point of use.
//
// The body is wrapped in do { } while (0) so that the macro expands to
// exactly one statement: it can be used as the unbraced body of an if/else
// without capturing the caller's "else". Invocations must end with ';'.
#define ex_assert_both_sides( assert_test, assert_msg )                   \
  do                                                                      \
    {                                                                     \
      if (!(assert_test))                                                 \
        {                                                                 \
          if (connection_)                                                \
            connection_->dumpAndStopOtherEnd(true, false);                \
          ex_assert(0, assert_msg);                                       \
        }                                                                 \
    } while (0)
// -----------------------------------------------------------------------
// Methods for class ex_send_top_tdb
// -----------------------------------------------------------------------
// Generic build entry point. It is reached only when no split top node sits
// above this send top, a configuration that is not used at this time, so
// the method asserts. The code after the assert builds a single instance
// (instance 0 talking to child instance 0).
ex_tcb * ex_send_top_tdb::build(ex_globals * glob)
{
  ex_assert(0,"send top w/o split top not used at this time");

  ExExeStmtGlobals * stmtGlob = glob->castToExExeStmtGlobals();
  const Lng32 onlyInstance = 0;

  return buildInstance(stmtGlob, onlyInstance, onlyInstance);
}
// Build the send top TCB for one (send top instance, child instance) pair
// and register its subtasks with the scheduler.
//
//   glob             - statement globals; its space is used for allocation
//   myInstanceNum    - instance number of this send top
//   childInstanceNum - instance number of the child (send bottom) fragment
//
// Returns the newly allocated TCB.
ex_tcb * ex_send_top_tdb::buildInstance(ExExeStmtGlobals * glob,
                                        Lng32 myInstanceNum,
                                        Lng32 childInstanceNum)
{
  // Determine which process hosts the corresponding send bottom node; the
  // kind of connection needed later depends on it.
  IpcProcessId sendBottomProcId;

  if (!getExtractConsumerFlag())
    {
      // normal case: look up the process id of the child fragment instance
      sendBottomProcId =
        glob->getInstanceProcessId(childFragId_, childInstanceNum);
    }
  else
    {
      // extract consumer: create a process id from the ESP phandle that is
      // stored in the TDB
      sendBottomProcId =
        *(new (glob->getSpace()) IpcProcessId(getExtractEsp()));
    }

  ex_tcb * sendTopTcb =
    new(glob->getSpace()) ex_send_top_tcb(*this,
                                          glob,
                                          sendBottomProcId,
                                          myInstanceNum,
                                          childInstanceNum);

  sendTopTcb->registerSubtasks();

  return sendTopTcb;
}
// -----------------------------------------------------------------------
// Methods for class ex_send_top_tcb
// -----------------------------------------------------------------------
/////////////////////////////////////////////////////////////////////////////
// Constructor
// Construct a send top TCB: allocates the parent queues, the work ATP and
// the message stream used to talk to the send bottom, and (in an ESP)
// registers this TCB with the ESP statement globals. The connection itself
// is not created here.
ex_send_top_tcb::ex_send_top_tcb(const ex_send_top_tdb& sendTopTdb,
                                 ExExeStmtGlobals* glob,
                                 const IpcProcessId& sendBottomProcId,
                                 Lng32 myInstanceNum,
                                 Lng32 childInstanceNum)
  : ex_tcb(sendTopTdb,1,glob),
    ipcBroken_(FALSE),
    workAtp_(NULL),
    myInstanceNum_(myInstanceNum),
    childInstanceNum_(childInstanceNum),
    currentBufferNumber_(1),
    currentSendBuffer_(NULL),
    currentReceiveBuffer_(NULL),
    ioSubtask_(NULL),
    ioCancelSubtask_(NULL),
    connection_(NULL),
    msgStream_(NULL),
    cancelMessageStream_(NULL),
    mySendTopTcbIndex_(NULL_COLL_INDEX),
    childFragHandle_(NullFragInstanceHandle),
    bottomProcId_(sendBottomProcId)
{
  CollHeap * space = glob->getSpace();
  ExMasterStmtGlobals *masterGlob = glob->castToExMasterStmtGlobals();
  ExEspStmtGlobals *espGlob = glob->castToExEspStmtGlobals();

  // The SeaMonster target structure cannot be built until fixup: in the
  // master executor the target process ID and tag are not yet known.
  // Zero out the structure for now; fixup() populates it later.
  smTarget_.node = 0;
  smTarget_.pid = 0;
  smTarget_.tag = 0;
  smTarget_.id = 0;

  // For non-master ESPs, the trace cannot be initialized until fixup
  // if tracing is turned on using set session default. If the trace file
  // and level are set in the ms.env file, tracing is possible from this
  // point on.

  // A parallel extract producer query running in the master executor
  // needs to register the top-level ESP.
  if (masterGlob != NULL && sendTopTdb.getExtractProducerFlag())
    masterGlob->insertExtractEsp(sendBottomProcId);

  // Down queue from the parent
  qParent_.down = new(space) ex_queue(ex_queue::DOWN_QUEUE,
                                      sendTopTdb.queueSizeDown_,
                                      sendTopTdb.criDescDown_,
                                      space);

  // Give every down queue entry its private state; the object created here
  // is handed to allocatePstate() and deleted right afterwards.
  ex_send_top_private_state *pstateTemplate =
    new(space) ex_send_top_private_state(this);
  qParent_.down->allocatePstate(pstateTemplate, this);
  delete pstateTemplate;

  // Up queue to the parent
  qParent_.up = new(space) ex_queue(ex_queue::UP_QUEUE,
                                    sendTopTdb.queueSizeUp_,
                                    sendTopTdb.criDescUp_,
                                    space);

  workAtp_ = allocateAtp(sendTopTdb.workCriDesc_, space);

  // fixup expressions
  if (moveInputValues())
    (void) moveInputValues()->fixup(0, getExpressionMode(), this,
                                    space, glob->getDefaultHeap(),
                                    FALSE, glob);

  nextToSendDown_ = qParent_.down->getHeadIndex();

  // Initialize the state trace buffer: one INVALID entry per trace element,
  // followed by the real initial state.
  stTidx_ = 0;
  for (int slot = NumSendTopTraceElements; slot > 0; slot--)
    {
      setStState(INVALID, __LINE__);
    }
  setStState(NOT_OPENED, __LINE__);

  sendBufferSize_ = sendTopTdb.getSendBufferSize();
  receiveBufferSize_ = sendTopTdb.getRecvBufferSize();

  // The stream buffer must hold the larger of the send and receive buffers
  // plus the message headers that travel with them.
  IpcMessageObjSize maxBufSize = receiveBufferSize_;
  if (sendBufferSize_ > receiveBufferSize_)
    maxBufSize = sendBufferSize_;

  maxBufSize += (sizeof(TupMsgBuffer) + sizeof(ExEspInputDataReqHeader) +
                 sizeof(ExEspOpenReqHeader));

  // temporary: ??? Larry Schumacher ???
  // Old class IpcMessageStream cannot route multi-buffer messages
  // originating from new class IpcBufferedMsgStream, so padding is added to
  // ensure that the message fits in one buffer. Once routing message
  // streams have been updated to class IpcBufferedMsgStream the pad can be
  // removed. The message sent to send bottom has a fixed length except for
  // a variable portion consisting of ComDiagsArea entries. Hopefully they
  // won't exceed the amount of padding added (1000 bytes)! Response
  // messages from send bottom are not routed through an old
  // IpcMessageStream, so they do not need padding (unless ComDiagsArea
  // responses become normal traffic, in which case the buffer size should
  // be expanded to include them).
  // Note also that a few bytes of padding are needed because the SqlBuffer
  // inside the TupMsgBuffer gets aligned on an 8 byte boundary. If class
  // TupMsgBuffer has a size that is not a multiple of 8, then the size of
  // that space is 8 - (sizeof(TupMsgBuffer) mod 8).
  maxBufSize += 1000;

  // message stream used to talk to send_bottom
  msgStream_ = new(space) ExSendTopMsgStream(glob,
                                             getNumSendBuffers(),
                                             getNumRecvBuffers(),
                                             maxBufSize,
                                             this);

  if (sendTopTdb.getExchangeUsesSM())
    msgStream_->setSMContinueProtocol(TRUE);

  // Do the send bottom and send top nodes live in the same process?
  if (masterGlob && masterGlob->getRtFragTable()->isLocal(sendBottomProcId))
    // || espGlob && $$$$ TBD: check for ESP whether it's local
    {
      // Never-implemented sketch of the local case, kept for reference:
      // get child tdb from global fragment instance map
      //ex_tdb *childTdb = (ex_tdb *)glob->getInstanceMap()->childPtr_;
      // build the child (for now $$$)
      //guiChildTcb_ = childTdb->build(glob);
      // get the send bottom node
      // (assume only one local send bottom node $$$$)
      //localChild_ = (ex_local_send_bottom_tcb *)
      //  ((ex_split_bottom_tcb *)guiChildTcb_)->getSendNode(0);
      //msgStream_->setRecipient(localChild_->msgStream_);
      setStState(OPEN_COMPLETE, __LINE__);
      ABORT("local send bottom not implemented yet!");
    }

  if (espGlob)
    {
      // the ESP keeps track of all send tops in a given fragment instance
      // for deadlock detection related to cancels
      mySendTopTcbIndex_ = espGlob->registerSendTopTcb(this);
    }
}
/////////////////////////////////////////////////////////////////////////////
// destructor
// Destructor: all cleanup is delegated to freeResources().
ex_send_top_tcb::~ex_send_top_tcb()
{
  this->freeResources();
}
/////////////////////////////////////////////////////////////////////////////
// free all resources
// Release everything this TCB owns: both parent queues, the work ATP, the
// connection and the two message streams. Every member is reset to NULL
// after it is released, so a second call (the destructor always calls this
// method) is a harmless no-op instead of a double delete, and code that
// tests these members for NULL never sees a dangling pointer.
void ex_send_top_tcb::freeResources()
{
  delete qParent_.up;
  qParent_.up = NULL;

  delete qParent_.down;
  qParent_.down = NULL;

  if (workAtp_)
    {
      deallocateAtp(workAtp_, getSpace());
      workAtp_ = NULL;
    }

  // We must delete the connection before deleting the message
  // stream. This will guarantee that any outstanding I/O
  // associated with the message stream over the connection
  // will be cleaned up. If we do it in the reverse order,
  // and there is an I/O outstanding, all sorts of dangling
  // pointer scenarios are possible, resulting in abends or
  // other unpredictable behavior.
  delete connection_;
  connection_ = NULL;

  delete msgStream_;
  msgStream_ = NULL;

  delete cancelMessageStream_;
  cancelMessageStream_ = NULL;
}
/////////////////////////////////////////////////////////////////////////////
// Static cancel work procedure handed to the scheduler: forwards to
// processCancel() of the send top TCB passed in as tcb.
ExWorkProcRetcode ex_send_top_tcb::sCancel(ex_tcb *tcb)
{
  ex_send_top_tcb *sendTop = static_cast<ex_send_top_tcb *>(tcb);
  return sendTop->processCancel();
}
/////////////////////////////////////////////////////////////////////////////
// register tcb for work
void ex_send_top_tcb::registerSubtasks()
{
ExScheduler *sched = getGlobals()->getScheduler();
// register events for parent queues
ex_assert(qParent_.down && qParent_.up,"Parent queues must exist");
sched->registerInsertSubtask(ex_tcb::sWork, this, qParent_.down);
sched->registerCancelSubtask(sCancel, this, qParent_.down);
sched->registerUnblockSubtask(ex_tcb::sWork,this, qParent_.up);
// register a non-queue event for the IPC with the send top node
ioSubtask_ = sched->registerNonQueueSubtask(sWork,this);
ioCancelSubtask_ = sched->registerNonQueueSubtask(sCancel,this);
}
/////////////////////////////////////////////////////////////////////////////
// TCB fixup
// TCB fixup. When the exchange uses SeaMonster, fills in smTarget_ (node,
// pid, tag and query ID of the send bottom) and emits trace records that
// describe this send top. Does nothing otherwise. Always returns 0.
Int32 ex_send_top_tcb::fixup()
{
  if (!sendTopTdb().getExchangeUsesSM())
    return 0;

  // In the master executor the process ID is not known when the TCB
  // constructor runs, so the SeaMonster target is populated here.
  ExExeStmtGlobals *stmtGlob = getGlobals()->castToExExeStmtGlobals();
  ExEspStmtGlobals *espStmtGlob = stmtGlob->castToExEspStmtGlobals();

  // Values that identify this send top and its child; the SeaMonster tag
  // comes from the TDB and the query ID from the statement globals.
  int myFrag = (espStmtGlob ? (int) espStmtGlob->getMyFragId() : 0);
  int myInstNum = (int) myInstanceNum_;
  int childFrag = (int) sendTopTdb().getChildFragId();
  int childInstNum = (int) childInstanceNum_;
  int smTag = (int) sendTopTdb().getSMTag();
  Int64 smQueryID = stmtGlob->getSMQueryID();

  // Decompose the send bottom's process handle to get its node and pid
  const GuaProcessHandle &phandle = bottomProcId_.getPhandle();
  Int32 otherCPU, otherPID, otherNode;
  SB_Int64_Type seqNum = 0;
  phandle.decompose2(otherCPU, otherPID, otherNode, seqNum);

  // Store the SeaMonster information in the TCB.
  // Seaquest NodeId == NV CpuNum
  smTarget_.node = ExSM_GetNodeID(otherCPU);
  smTarget_.pid = otherPID;
  smTarget_.tag = smTag;
  smTarget_.id = smQueryID;
  ex_assert(smTarget_.id > 0, "Invalid SeaMonster query ID");

  int downRowLen = (int) sendTopTdb().getDownRecordLength();
  int upRowLen = (int) sendTopTdb().getUpRecordLength();
  int downQueueLen = (int) sendTopTdb().queueSizeDown_;
  int upQueueLen = (int) sendTopTdb().queueSizeUp_;

  EXSM_TRACE(EXSM_TRACE_INIT|EXSM_TRACE_TAG, "SNDT FIXUP %p", this);
  EXSM_TRACE(EXSM_TRACE_INIT|EXSM_TRACE_TAG, "SNDT frag %d inst %d",
             myFrag, myInstNum);
  EXSM_TRACE(EXSM_TRACE_INIT|EXSM_TRACE_TAG, "SNDT child frag %d inst %d",
             childFrag, childInstNum);

  // regress/executor/TEST123 expects this line of output when
  // tracing is reduced to tags only
  EXSM_TRACE(EXSM_TRACE_INIT|EXSM_TRACE_TAG,
             "SNDT tgt %d:%d:%" PRId64 ":%d",
             (int) smTarget_.node, (int) smTarget_.pid, smTarget_.id,
             (int) smTarget_.tag);

  EXSM_TRACE(EXSM_TRACE_INIT, "SNDT work stream %p", msgStream_);
  EXSM_TRACE(EXSM_TRACE_INIT, "SNDT cancel stream %p",
             cancelMessageStream_);
  EXSM_TRACE(EXSM_TRACE_INIT, "SNDT send limit %d, in use limit %d",
             (int) msgStream_->getSendBufferLimit(),
             (int) msgStream_->getInUseBufferLimit());
  EXSM_TRACE(EXSM_TRACE_INIT,
             "SNDT send size %d", (int) sendBufferSize_);
  EXSM_TRACE(EXSM_TRACE_INIT,
             "SNDT recv size %d", (int) receiveBufferSize_);
  EXSM_TRACE(EXSM_TRACE_INIT,
             "SNDT stream max buf %d", (int) msgStream_->getBufferSize());
  EXSM_TRACE(EXSM_TRACE_INIT, "SNDT send bufs %d recv bufs %d",
             (int) getNumSendBuffers(), (int) getNumRecvBuffers());
  EXSM_TRACE(EXSM_TRACE_INIT, "SNDT row len %d down, %d up",
             downRowLen, upRowLen);
  EXSM_TRACE(EXSM_TRACE_INIT, "SNDT down queue %d, up queue %d",
             downQueueLen, upQueueLen);

  return 0;
}
/////////////////////////////////////////////////////////////////////////////
// tcb work method for queue processing
short ex_send_top_tcb::work()
{
EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p BEGIN WORK", this);
short result = WORK_OK;
if (ipcBroken_)
{
result = WORK_BAD_ERROR;
}
else
{
result = checkReceive();
EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p checkReceive rc %s",
this, ExWorkProcRetcodeToString(result));
if (result == WORK_OK)
{
result = checkSend();
EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p checkSend rc %s",
this, ExWorkProcRetcodeToString(result));
}
if (sendTopState_ != WAITING_FOR_OPEN_COMPLETION)
{
if (result == WORK_OK)
{
result = continueRequest();
EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p continueRequest rc %s",
this, ExWorkProcRetcodeToString(result));
}
if (qParent_.down->isEmpty())
{
msgStream_->releaseBuffers(); // do final garbage collection
if (sendTopTdb().getExchangeUsesSM())
{
ExExeStmtGlobals *glob = getGlobals()->castToExExeStmtGlobals();
EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p down queue empty", this);
EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p glob msgs %d cancels %d", this,
(int) glob->numSendTopMsgesOut(),
(int) glob->numCancelMsgesOut());
if (msgStream_)
EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p data resp pend %d", this,
(int) msgStream_->numOfResponsesPending());
if (cancelMessageStream_)
EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p cancel resp pend %d", this,
(int) cancelMessageStream_->numOfResponsesPending());
}
}
}
}
EXSM_TRACE(EXSM_TRACE_WORK, "SNDT %p END WORK %s %s", this,
ExWorkProcRetcodeToString(result),
getExSendTopStateString(sendTopState_));
return result;
}
/////////////////////////////////////////////////////////////////////////////
// check for response data from send bottom and put in up queue
// Check for response data from the send bottom and move it into the parent
// up queue.
//
// Replies are matched against the request at the head of the parent down
// queue. Rows are taken out of the current receive buffer one at a time and
// inserted into the up queue until the up queue is full, the buffer is
// exhausted, or the request completes (Q_NO_DATA), in which case the
// request is removed from the down queue.
//
// Returns WORK_BAD_ERROR if the IPC connection is broken, WORK_OK otherwise
// (including when there is nothing to do yet).
short ex_send_top_tcb::checkReceive()
{
  ExOperStats *statsEntry;
  ExESPStats *espStats;

  while (NOT qParent_.up->isFull())
    { // process data from send bottom

      if (ipcBroken_)
        return WORK_BAD_ERROR;

      if (qParent_.down->isEmpty())
        {
          // No more requests from the parent. This should mean
          // that there are no more data from the send bottom.
          // Clean up any remaining exhausted or empty receive buffers.
          do
            {
              if (currentReceiveBuffer_)
                {
                  SqlBuffer* sqlBuf =
                    currentReceiveBuffer_->get_sql_buffer();
                  ex_assert_both_sides(sqlBuf->atEOTD(),
                                       "Receiving extra rows from send bottom");
                }
              currentReceiveBuffer_ = getReceiveBuffer();
            }
          while (currentReceiveBuffer_);

          // done, we might be called again if more empty receive buffers or
          // new requests arrive
          return WORK_OK;
        }

      // the corresponding request entry in the down queue
      queue_index rindex = qParent_.down->getHeadIndex();
      ex_queue_entry *rentry = qParent_.down->getQueueEntry(rindex);
      ex_send_top_private_state & pstate =
        *((ex_send_top_private_state *) rentry->pstate);

      // get sql buffer from message stream
      if (currentReceiveBuffer_ == NULL)
        {
          currentReceiveBuffer_ = getReceiveBuffer();
          if (currentReceiveBuffer_ == NULL)
            {
              if (ipcBroken_)
                return WORK_BAD_ERROR;
              else
                return WORK_OK;
            }

          SqlBuffer *sb = currentReceiveBuffer_->get_sql_buffer();
          ex_assert(sb, "Invalid SqlBuffer pointer");
          sb->driveUnpack();

          EXSM_TRACE(EXSM_TRACE_WORK, "SNDT %p tupps arrived %d", this,
                     (int) sb->getTotalTuppDescs());

          statsEntry = getStatsEntry();
          if (statsEntry)
            espStats = statsEntry->castToExESPStats();
          else
            espStats = NULL;

          // we got a buffer. Update statistics
          if (espStats)
            {
              espStats->bufferStats()->totalRecdBytes() +=
                sb->getSendBufferSize();
              espStats->bufferStats()->recdBuffers().addEntry(
                sb->get_used_size());
            }
        }

      int numRowsReturned = 0;
      SqlBuffer* currSqlBuffer = currentReceiveBuffer_->get_sql_buffer();
      currSqlBuffer->driveUnpack();
      statsEntry = getStatsEntry();

      while (NOT qParent_.up->isFull())
        { // move data from sql buffer to up queue

          // data describing the row coming back
          tupp nextTupp;
          ControlInfo *ci;
          NABoolean currBufferIsEmpty;
          ComDiagsArea* diagsArea;

          // the entry in the up-queue to be filled
          ex_queue_entry* pentry = qParent_.up->getTailEntry();

          // get the next row out of currSqlBuffer
          currBufferIsEmpty = currSqlBuffer->moveOutSendOrReplyData(
               FALSE,
               &(pentry->upState),
               nextTupp,
               &ci,
               &diagsArea, // will never be set to a non-NULL value
               NULL);

          if (currBufferIsEmpty)
            {
              currentReceiveBuffer_ = NULL;

              if (sendTopTdb().logDiagnostics())
                {
                  // Log once every 2^18 (about .25M) times; this just
                  // needs to be very infrequent and efficient. Note that
                  // the period must be written as a shift: in C++ '^' is
                  // bitwise XOR and binds looser than '%', so "% 2^18"
                  // can never compare equal to 0.
                  if (((pstate.matchCount_++) % (1 << 18)) == 0)
                    {
                      char msg[1024];
                      str_sprintf(msg, "Send top returning row # %d.",
                                  (Lng32) pstate.matchCount_);
                      SQLMXLoggingArea::logExecRtInfo(NULL, 0, msg,
                                  sendTopTdb().getExplainNodeId());
                    }
                }
              break;
            }

          // check for a diags area coming back with this row
          if (ci->getIsExtDiagsAreaPresent())
            {
              IpcMessageObjType msgType;

              ex_assert_both_sides(msgStream_->getNextObjType(msgType) AND
                                   msgType == IPC_SQL_DIAG_AREA,
                                   "no diags areas in message");

              // construct a copy of diags area from message object
              // and put it in the up queue entry
              diagsArea =
                ComDiagsArea::allocate (getGlobals()->getDefaultHeap());
              *msgStream_ >> *diagsArea;
            }
          else
            diagsArea = NULL;

          if ((rentry->downState.request == ex_queue::GET_NOMORE) &&
              (pentry->upState.status != ex_queue::Q_NO_DATA))
            {
              // Don't reply to up-queue. This request has been canceled.
              // Ignore replies until Q_NO_DATA.
              // The diags area unpacked above is not handed to any queue
              // entry in this case, so give up our reference to it here;
              // otherwise it would be leaked.
              if (diagsArea)
                diagsArea->decrRefCount();
              continue;
            }

          // fix index fields in the up state (state has already been set)
          pentry->upState.downIndex = rindex;
          pentry->upState.parentIndex = rentry->downState.parentIndex;
          // no need to fiddle with matchNo, the sender of the message
          // has done the right thing already

          // pass the Tupp up to the parent queue: copy request ATP and
          // append the tupp that we got from the message
          pentry->copyAtp(rentry);
          pentry->getTupp(rentry->criDesc()->noTuples()) = nextTupp;
          pentry->setDiagsArea(diagsArea);
          nextTupp.release();

          // if stats were returned, unpack and merge them to the
          // stat area.
          if (ci->getIsExtStatsAreaPresent())
            {
              IpcMessageObjType msgType;

              ex_assert_both_sides(msgStream_->getNextObjType(msgType) AND
                                   msgType == IPC_SQL_STATS_AREA,
                                   "no stats area in message");

              CollHeap * defaultHeap = getGlobals()->getDefaultHeap();
              ExStatisticsArea* statArea = new(defaultHeap)
                ExStatisticsArea(defaultHeap, 0,
                                 ((ComTdb*)getTdb())->getCollectStatsType());

              *msgStream_ >> *statArea;

              if ( getGlobals()->statsEnabled() )
                mergeStats(statArea);

              EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p merged stats", this);

              // we can get rid of the received area now
              NADELETE(statArea, ExStatisticsArea, defaultHeap);
            }

          // If statistics are being collected, update the number of
          // rows returned now.
          if(pentry->upState.status != ex_queue::Q_NO_DATA)
            {
              if(statsEntry)
                statsEntry->incActualRowsReturned();
            }

          qParent_.up->insert();
          numRowsReturned++;

          // if this is EOF then we are done with the request
          if (pentry->upState.status == ex_queue::Q_NO_DATA)
            {
              EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p returned Q_NO_DATA", this);

              ex_assert_both_sides( (pstate.step_ == STARTED_) ||
                                    (pstate.step_ == CANCELED_AFTER_SENT_),
                                    "Send top responding incorrectly!");

              pstate.step_ = NOT_STARTED_;
              qParent_.down->removeHead();

              // rentry and pstate have become invalid, exit to outer loop
              break;
            }
        } // while parent up queue not full

      EXSM_TRACE(EXSM_TRACE_WORK,
                 "SNDT %p queue entries returned: %d",
                 this, numRowsReturned);

      if (qParent_.up->isFull())
        EXSM_TRACE(EXSM_TRACE_WORK, "SNDT %p up queue is full", this);

    } // while parent down queue not empty and parent up queue not full

  // At this point we are either waiting for another buffer from the
  // send bottom node or the up queue is full. In both cases an event
  // will wake us up if there is more work.
  return WORK_OK;
}
/////////////////////////////////////////////////////////////////////////////
// get a receive buffer from the message stream
// Fetch the next receive buffer from the message stream.
//
// Walks the received reply messages and the objects inside them:
//  - a status header (reply to an OPEN) stores the child fragment instance
//    handle and moves the state to OPEN_COMPLETE;
//  - a diagnostics area is merged into the statement's diags area, and
//    ipcBroken_ is set when it contains errors;
//  - a data header moves the state to SERVER_SATURATED or OPEN_COMPLETE
//    and the TupMsgBuffer that follows it is returned to the caller.
//
// Returns NULL when no data buffer is available.
TupMsgBuffer* ex_send_top_tcb::getReceiveBuffer()
{
  IpcMessageObjType msgType;

  while (msgStream_->getNextReceiveMsg(msgType))
    {
      ex_assert_both_sides(msgType == IPC_MSG_SQLESP_DATA_REPLY,
                           "received message from unknown message stream");

      while (msgStream_->getNextObjType(msgType))
        {
          if (msgType == ESP_RETURN_STATUS_HDR)
            {
              // Reply to the open message. Only a reply to an OPEN can be
              // of this message type.
              ex_assert_both_sides(sendTopState_ == WAITING_FOR_OPEN_REPLY,
                                   "received status reply after opening send bottom");

              ExEspReturnStatusReplyHeader* openReply =
                new(msgStream_->receiveMsgObj())
                  ExEspReturnStatusReplyHeader(msgStream_);

              // remember the child ExFragInstanceHandle
              childFragHandle_ = openReply->handle_;
              setStState(OPEN_COMPLETE, __LINE__);
            }
          else if (msgType == ESP_DIAGNOSTICS_AREA)
            {
              // Diagnostics area returned from the server during OPEN:
              // construct a copy of it from the message object.
              ComDiagsArea* replyDiags =
                ComDiagsArea::allocate (getGlobals()->getDefaultHeap());

              Lng32 nextObjSize = msgStream_->getNextObjSize();

              // The diags area may have been packed by a buffered or an
              // unbuffered stream. An unbuffered sender sends only the
              // header first, followed by the actual diags area; a
              // buffered sender sends just the diags area.
              if (nextObjSize != sizeof(IpcMessageObj))
                {
                  *msgStream_ >> *replyDiags;
                }
              else
                {
                  // skip the header object, then take the real one
                  (void) msgStream_->receiveMsgObj();
                  IpcMessageObj* packedDiags = msgStream_->receiveMsgObj();

                  ex_assert_both_sides(packedDiags,
                                       "error receiving diags area from unbuffered stream");

                  replyDiags->unpackObj(packedDiags->getType(),
                                        packedDiags->getVersion(),
                                        TRUE,
                                        packedDiags->getObjLength(),
                                        (const char *)packedDiags);
                }

              // errors in the diags area indicate a connection problem
              if (replyDiags->getNumber(DgSqlCode::ERROR_) > 0)
                {
                  ipcBroken_ = TRUE;
                }

              // Merge into the statement's diags area if there is one,
              // otherwise make the returned area the statement's area.
              ExExeStmtGlobals *stmtGlob =
                getGlobals()->castToExExeStmtGlobals();

              if (stmtGlob->getDiagsArea())
                {
                  stmtGlob->getDiagsArea()->mergeAfter(*replyDiags);
                  // clean up
                  replyDiags->deAllocate();
                }
              else
                {
                  stmtGlob->setGlobDiagsArea(replyDiags);
                  replyDiags->decrRefCount();
                }
            }
          else if (msgType == ESP_RETURN_DATA_HDR)
            {
              // Reply data header: pick up the sql buffer that follows it
              // and return that buffer for processing.
              ExEspReturnDataReplyHeader* dataReply =
                new(msgStream_->receiveMsgObj())
                  ExEspReturnDataReplyHeader(msgStream_);

              ex_assert_both_sides(msgStream_->getNextObjType(msgType) AND
                                   msgType == ESP_OUTPUT_SQL_BUFFER,
                                   "output data header w/o output data buffer received");

              if (dataReply->stopSendingData_)
                setStState(SERVER_SATURATED, __LINE__);
              else
                setStState(OPEN_COMPLETE, __LINE__);

              TupMsgBuffer *dataBuffer =
                new(msgStream_->receiveMsgObj()) TupMsgBuffer(msgStream_);

              return dataBuffer;
            }
          else
            {
              ex_assert_both_sides(0, "Unexpected reply received from ESP");
            }
        }
    }

  return NULL;
}
/////////////////////////////////////////////////////////////////////////////
// check for data in the down queue to send to send bottom
// Check for requests in the parent down queue and send them to the send
// bottom.
//
// Starting at nextToSendDown_, as many down queue entries as fit are packed
// into one sql buffer (a ControlInfo tuple per request, plus the input row
// when a move expression exists, plus any diags area), the buffer is sent,
// and the process repeats while unsent entries remain.
//
// Returns:
//   WORK_OK           - nothing (more) to send right now, or waiting for
//                       the open / flow control / permission to do I/O
//   WORK_POOL_BLOCKED - no send buffer available or too many in use
//   WORK_BAD_ERROR    - the IPC connection is broken
short ex_send_top_tcb::checkSend()
{
  ExEspStmtGlobals *espGlobals =
    getGlobals()->castToExExeStmtGlobals()->castToExEspStmtGlobals();

  // free up any receive buffers no longer in use
  msgStream_->cleanupBuffers();

  if (sendTopState_ == SERVER_SATURATED OR
      sendTopState_ == WAITING_FOR_OPEN_REPLY)
    return WORK_OK; // send continue messages only

  while (qParent_.down->entryExists(nextToSendDown_))
    { // build sql buffers and send to send bottom

      if (ipcBroken_)
        return WORK_BAD_ERROR;

      // Need to handle canceled entries here in the outer loop
      // (which executes on the first queue entry, and whenever
      // we get a new buffer), as well as in the inner loop (which
      // handles all the queue entries that fit in one buffer) below.
      ex_queue_entry* pentry = qParent_.down->getQueueEntry(nextToSendDown_);
      ex_send_top_private_state & pstate
        = *((ex_send_top_private_state *) pentry->pstate);

      ex_assert(pstate.step_ == NOT_STARTED_,
                "Message shoulda been NOT_STARTED_");

      if (espGlobals)
        espGlobals->setSendTopTcbActivated(mySendTopTcbIndex_);

      // NOTE: the down request we are working on may be a GET_NOMORE
      // request. We could optimize the logic here by not sending a
      // GET_NOMORE request down. However, that would just trigger
      // some "late cancel" logic in the split bottom node above us
      // (if applicable). Cancels in this parallel data stream are
      // best dealt with in the split bottom node below us, therefore
      // we'll send this request down even if it is a cancel.

      // flow control: don't send any more requests down if
      // there are too many outstanding requests or if we
      // have too many buffers in use
      if (msgStream_->sendLimitReached())
        return WORK_OK;

      if (msgStream_->inUseLimitReached())
        return WORK_POOL_BLOCKED;

      // Am I being asked to refrain from new I/O?
      if (getGlobals()->castToExExeStmtGlobals()->noNewRequest())
        {
          // If my root is split_bottom, it will use its list of
          // activated send tops to reschedule me when I/O can resume.
          // See the call to setSendTopTcbActivated above.
          return WORK_OK;
        }

      // get sql buffer from message stream
      currentSendBuffer_ = getSendBuffer();
      if (currentSendBuffer_ == NULL)
        {
          if (ipcBroken_)
            {
              return WORK_BAD_ERROR;
            }
          else
            {
              if (sendTopState_ == WAITING_FOR_OPEN_COMPLETION)
                return WORK_OK;
              else
                return WORK_POOL_BLOCKED;
            }
        }

      SqlBuffer* sqlBuf = currentSendBuffer_->get_sql_buffer();
      NABoolean sendSpaceAvailable = TRUE;

      do // fill sql buffer with data from down queue
        {
          ex_queue_entry* pentry =
            qParent_.down->getQueueEntry(nextToSendDown_);
          ex_send_top_private_state & pstate
            = *((ex_send_top_private_state *) pentry->pstate);

          ex_assert(pstate.step_ == NOT_STARTED_,
                    "Message shoulda been NOT_STARTED_");

          //
          // Allocate space in buffer for control info (down state) and
          // input row, if one is present
          //
          tupp_descriptor* controlInfoAsTupp =
            sqlBuf->add_tuple_desc((unsigned short) sizeof(ControlInfo));

          NABoolean isInputRowToBeSent = (moveInputValues() != NULL);

          tupp_descriptor* rowToSend =
            isInputRowToBeSent ?
            sqlBuf->add_tuple_desc(sendTopTdb().getDownRecordLength()) :
            NULL;

          NABoolean isDiagsAreaToBeSent = pentry->getDiagsArea() != NULL;

          //
          // Check that there was sufficient space in buffer.
          //
          sendSpaceAvailable = (controlInfoAsTupp != NULL) AND
            (!isInputRowToBeSent OR rowToSend != NULL);

          if (!sendSpaceAvailable)
            {
              //
              // Sufficient space is not available in buffer. Deallocate
              // whichever tupp_descriptors did get allocated.
              //
              if (controlInfoAsTupp) sqlBuf->remove_tuple_desc();
              if (rowToSend) sqlBuf->remove_tuple_desc();
            }
          else
            {
              //
              // There was space in the send buffer --- copy the current input
              // row into it and advance to the next input queue entry.
              //
              ControlInfo msgControlInfo;

              //
              // Copy the down state and flags into first tupp allocated
              // in the message buffer.
              //
              msgControlInfo.getDownState() = pentry->downState;
              msgControlInfo.getDownState().parentIndex = nextToSendDown_;
              msgControlInfo.setIsDataRowPresent(isInputRowToBeSent);
              msgControlInfo.setIsExtDiagsAreaPresent(isDiagsAreaToBeSent);

              str_cpy_all(controlInfoAsTupp->getTupleAddress(),
                          (char *) &msgControlInfo,
                          sizeof(ControlInfo));

              // SqlBuffer::findAndCancel may need this.
              controlInfoAsTupp->setControlTupleType();

              if (isInputRowToBeSent)
                {
                  // SqlBuffer::findAndCancel may need this. The data
                  // marking goes on the row's descriptor; putting it on
                  // controlInfoAsTupp would overwrite the control marking
                  // that was just set above.
                  rowToSend->setDataTupleType();

                  //
                  // Evaluate the move input expression, copying the
                  // actual input row into the second tupp allocated
                  // in the message buffer.
                  //
                  workAtp_->getTupp((unsigned short)
                                    sendTopTdb().moveExprTuppIndex_) = rowToSend;

                  ex_expr::exp_return_type retval =
                    moveInputValues()->eval(pentry->getAtp(),workAtp_);

                  ex_assert(retval != ex_expr::EXPR_ERROR,
                            "Add error handling");

                  // release the tupp_descriptor in the buffer
                  workAtp_->getTupp((unsigned short)
                                    sendTopTdb().moveExprTuppIndex_) = NULL;
                }

              //
              // If there is a diagnostics area to send, then add
              // it to the message following the sql buffer.
              //
              if (isDiagsAreaToBeSent)
                { // construct a copy of ComDiagsArea in message
                  *msgStream_ << *(pentry->getDiagsArea());
                }

              // this one is sent down (ok, packed into the buffer),
              // so remember its state and buffer number (in case
              // we cancel later) and advance to the next down queue entry
              pstate.step_ = STARTED_;
              pstate.bufferNumber_ = currentBufferNumber_;
              pstate.matchCount_ = 0;
              nextToSendDown_++;
            }
        } while (sendSpaceAvailable AND
                 qParent_.down->entryExists(nextToSendDown_));

      // send sql buffer even if not completely full
      ex_assert(NOT currentSendBuffer_->get_sql_buffer()->isEmpty(),
                "Send top's sql buffer is empty");

      // indicate if stats are enabled or not
      sqlBuf->setStatsEnabled(getGlobals()->statsEnabled());

      currentSendBuffer_ = NULL;

      ExOperStats *statsEntry;
      if ((statsEntry = getStatsEntry()) != NULL)
        {
          ExESPStats *stats = statsEntry->castToExESPStats();
          if (stats)
            {
              stats->bufferStats()->totalSentBytes() +=
                sqlBuf->getSendBufferSize();
              stats->bufferStats()->sentBuffers().addEntry(sqlBuf->get_used_size());
            }
        }

      // pack sql buffer
      sqlBuf->drivePack();

      EXSM_TRACE(EXSM_TRACE_WORK,"SNDT %p SEND data %d tupps",
                 this, (int) sqlBuf->getTotalTuppDescs());

      msgStream_->sendRequest();
      currentBufferNumber_++;
    }

  return WORK_OK;
}
/////////////////////////////////////////////////////////////////////////////
// get a send buffer from the message stream
// Obtain a TupMsgBuffer, constructed inside msgStream_, for the next data
// request that is sent to the send bottom.
//
// While the send top is NOT_OPENED or WAITING_FOR_OPEN_COMPLETION this
// method first creates (or finishes creating) the connection, registers it
// as the recipient of msgStream_ and puts an open request header ahead of
// the data request in the same message. For a parallel extract consumer an
// ExMsgSecurityInfo object is added as well. The state then becomes
// WAITING_FOR_OPEN_REPLY.
//
// Returns NULL when the caller has to retry later (fragment directory not
// READY, connection not created or not yet complete, or a header could not
// be allocated in the stream). Otherwise returns a TupMsgBuffer of
// sendBufferSize_ bytes. Aborts if that buffer cannot be allocated.
TupMsgBuffer* ex_send_top_tcb::getSendBuffer()
{
if (sendTopState_ == NOT_OPENED ||
sendTopState_ == WAITING_FOR_OPEN_COMPLETION)
{
// Don't even try to open the connection before the child process
// is ready. If this is the master, check for the run-time fragment
// directory to be in the "READY" state. If this is an ESP, it's ok
// to go ahead and open, the other ESP must have received the load
// message for its fragment instance by now, since the master has
// already sent us data via its send top nodes.
// The above argument does not apply for parallel extract consumer.
ExExeStmtGlobals* glob = getGlobals()->castToExExeStmtGlobals();
ExMasterStmtGlobals* masterGlob = glob->castToExMasterStmtGlobals();
if (sendTopState_ == NOT_OPENED)
{
if (!sendTopTdb().getExtractConsumerFlag())
{
if (masterGlob AND
masterGlob->getRtFragTable() AND
masterGlob->getRtFragTable()->getState() != ExRtFragTable::READY)
return NULL; // retry later
}
// the connection may already be created thru late cancel.
// if so, there is no need to create another one.
// Give up for now if creating the connection fails or if it left us
// waiting for the open to complete; the caller retries later.
if (connection_ == NULL)
if (createConnectionToSendBottom() != 0 ||
#ifdef NA_LINUX_DISABLE // multi fragment esp
sendTopState_ == NOT_OPENED ||
#endif
sendTopState_ == WAITING_FOR_OPEN_COMPLETION)
return NULL;
}
// An open was started earlier (on a previous call, or by a late cancel);
// call the connection routine again and retry later if it reports
// a non-zero result.
if (sendTopState_ == WAITING_FOR_OPEN_COMPLETION)
{
if (createConnectionToSendBottom() != 0) // Finish creating connection
return NULL;
}
EXSM_TRACE(EXSM_TRACE_BUFFER,
"SNDT %p about to allocate msg buffer", this);
msgStream_->addRecipient(connection_);
// construct open request in message,
// ask child ESP to respond with ExFragInstanceHandle
ExEspOpenReqHeader *openReq =
new(*msgStream_) ExEspOpenReqHeader((NAMemory *) NULL);
if (openReq == NULL)
return NULL;
// if this is parallel extract consumer sendTop, send the security info too
if (sendTopTdb().getExtractConsumerFlag())
{
openReq->myInstanceNum_ = 0;
openReq->statID_ = 0;
openReq->setOpenType(ExEspOpenReqHeader::PARALLEL_EXTRACT);
// Send the securityInfo object along with the OPEN
// message. The user name we put in the message must not have
// a ",<password>" suffix. If the user name we get from the
// context does have the password suffix, we strip it off.
IpcEnvironment *ipcEnv =
getGlobals()->castToExExeStmtGlobals()->getIpcEnvironment();
CollHeap *ipcHeap = ipcEnv->getHeap();
ExMsgSecurityInfo *secInfo = new (ipcHeap) ExMsgSecurityInfo(ipcHeap);
// Hand secInfo its own copy of the security key (including the
// terminating NUL), allocated from the IPC heap.
const char *key = sendTopTdb().getExtractSecurityKey();
Int32 len = str_len(key);
char *copyOfKey = (char *) ipcHeap->allocateMemory(len + 1);
str_cpy_all(copyOfKey, key, len + 1);
secInfo->setSecurityKey(copyOfKey);
// NOTE(review): masterGlob is dereferenced without a NULL check here;
// presumably an extract consumer only ever runs in the master - confirm.
ContextCli *context = masterGlob->getContext();
ex_assert(context, "Invalid ContextCli pointer");
// We need to retrieve a user identifier from ContextCli. On
// Linux it will be the 32-bit integer and we will send the
// string representation of that integer. On other platforms
// it will be a user name.
// NOTE: The user ID for the extract security check is
// currently sent and compared as a C string. On Linux it is
// possible to send and compare integers which would lead to
// simpler code. The code to send/compare strings is still
// used because it works on all platforms.
const char *idToSend = NULL;
Int32 *userID = context->getDatabaseUserID();
Int32 userAsInt = *((Int32 *) userID);
// Format the numeric user ID as a decimal string in a local buffer.
char userIDBuf[32];
sprintf(userIDBuf, "%d", (int) userAsInt);
idToSend = &(userIDBuf[0]);
// idToSend points at the local buffer, so this assertion cannot fail.
ex_assert(idToSend, "Could not retrieve user ID from ContextCli");
char *copyOfId = NULL;
// On platforms other than Linux the user identifier can be a
// "username,password" string. We only want to send the
// characters before the comma.
// locationOfComma is never set to anything but NULL in this code, so
// only the first branch below is ever executed; the else branch is
// unreachable.
char *locationOfComma = NULL;
if (locationOfComma == NULL)
{
len = str_len(idToSend);
copyOfId = (char *) ipcHeap->allocateMemory(len + 1);
str_cpy_all(copyOfId, idToSend, len + 1);
secInfo->setAuthID(copyOfId);
}
else
{
len = locationOfComma - idToSend;
copyOfId = (char *) ipcHeap->allocateMemory(len + 1);
str_cpy_all(copyOfId, idToSend, len);
copyOfId[len] = 0;
secInfo->setAuthID(copyOfId);
}
// Add the security info to the outgoing message, then drop our
// reference to the local object.
*msgStream_ << *secInfo;
secInfo->decrRefCount();
secInfo = NULL;
}
else // normal case, this is not an extract consumer
{
openReq->key_ = glob->getFragmentKey(sendTopTdb().getChildFragId());
openReq->myInstanceNum_ = getMyInstanceNum();
openReq->statID_ = 0; // not used at this point
}
setStState(WAITING_FOR_OPEN_REPLY, __LINE__);
}
else if ((sendTopState_ == OPEN_COMPLETE) &&
(msgStream_->getRecipients().isEmpty()))
{
// This can happen if query has been executed once before and this
// send_top did a late cancel.
msgStream_->addRecipient(connection_);
}
// construct Input data request header in message,
// include child ExFragInstanceHandle for fast routing
ExEspInputDataReqHeader *hdr =
new(*msgStream_) ExEspInputDataReqHeader((NAMemory *) NULL);
if (hdr == NULL)
return NULL;
hdr->handle_ = childFragHandle_;
hdr->myInstanceNum_ = getMyInstanceNum();
hdr->injectErrorAtQueueFreq_ = getGlobals()->getInjectErrorAtQueue();
#ifdef _DEBUG
// Debug builds only: setting TEST_ERROR_AT_QUEUE_NO_ESP forces the
// frequency sent in this header to 0.
if (getenv("TEST_ERROR_AT_QUEUE_NO_ESP"))
hdr->injectErrorAtQueueFreq_ = 0;
#endif
// construct sql buffer(TupMsgBuffer) directly in message to avoid copy
TupMsgBuffer *result =
new(*msgStream_, sendBufferSize_) TupMsgBuffer(sendBufferSize_,
TupMsgBuffer::MSG_IN,
msgStream_);
// Unlike the headers above, a failed buffer allocation is treated as a
// fatal error rather than a reason to retry.
if (result == NULL)
ABORT("mismatched send top buffer size");
return result;
}
/////////////////////////////////////////////////////////////////////////////
// pre-send continue requests in anticipation of many reply data messages
short ex_send_top_tcb::continueRequest()
{
if (qParent_.down->isEmpty() OR
sendTopState_ == WAITING_FOR_OPEN_REPLY OR
sendTopState_ == NOT_OPENED OR
sendTopState_ == WAITING_FOR_OPEN_COMPLETION)
return WORK_OK;
if (msgStream_->sendLimitReached())
EXSM_TRACE(EXSM_TRACE_CONTINUE,"SNDT %p send limit reached, rp %d", this,
(int) msgStream_->numOfResponsesPending());
// send continue requests until the limit for outstanding requests
// or for incoming and used buffers is reached
while (NOT msgStream_->sendLimitReached())
{
if (ipcBroken_)
return WORK_BAD_ERROR;
if (msgStream_->inUseLimitReached())
{
EXSM_TRACE(EXSM_TRACE_CONTINUE,
"SNDT %p CREQ in use limit reached, rp %d", this,
(int) msgStream_->numOfResponsesPending());
// We can't send another continue request down because there
// is no room up here to take the reply. Return a status that
// causes this task to be rescheduled. Next time, methods
// cleanupBuffers() and checkReceive() can reduce the number
// of in-use buffers and of unread buffers in the receive
// queue.
return WORK_POOL_BLOCKED;
}
// Am I being asked to refrain from new I/O?
if (getGlobals()->castToExExeStmtGlobals()->noNewRequest())
{
ExEspStmtGlobals *espGlobals = getGlobals()->
castToExExeStmtGlobals()->castToExEspStmtGlobals();
if (espGlobals)
{
// Make sure my root (split_bottom) knows to schedule me
// when I/O can resume.
espGlobals->setSendTopTcbActivated(mySendTopTcbIndex_);
}
return WORK_OK;
}
// construct continue request header in message,
// include child ExFragInstanceHandle for fast routing
ExEspContinueReqHeader *hdr =
new(*msgStream_) ExEspContinueReqHeader((NAMemory *) NULL);
if (hdr == NULL)
return WORK_POOL_BLOCKED;
hdr->handle_ = childFragHandle_;
hdr->myInstanceNum_ = getMyInstanceNum();
EXSM_TRACE(EXSM_TRACE_CONTINUE,
"SNDT %p CREQ sending continue", this);
msgStream_->sendRequest();
}
return WORK_OK;
}
void ex_send_top_tcb::checkCancelReply()
{
if (!cancelMessageStream_)
{
// no replies yet if no message stream yet.
}
else
{
IpcMessageObjType msgType;
while (cancelMessageStream_->getNextReceiveMsg(msgType))
{
ex_assert_both_sides(msgType == IPC_MSG_SQLESP_CANCEL_REPLY,
"received message from unknown message stream");
while (cancelMessageStream_->getNextObjType(msgType))
{
ex_assert_both_sides(msgType == ESP_RETURN_CANCEL_HDR,
"Wrong type of message received by cancelMessageStream!");
EXSM_TRACE(EXSM_TRACE_CANCEL,
"SNDT %p recv ESP_RETURN_CANCEL_HDR", this);
EXSM_TRACE(EXSM_TRACE_CANCEL,"SNDT %p state %s", this,
getExSendTopStateString(sendTopState_));
ExEspCancelReplyHeader *dataHdr =
new(cancelMessageStream_->receiveMsgObj())
ExEspCancelReplyHeader(cancelMessageStream_);
// if CANCELED_BEFORE_OPENED, only the late cancel message
// was sent, not the open message. Go back to NOT_OPENED
// state so that the open message could be sent.
// See the state transition diagram in the header file.
if (sendTopState_ == CANCELED_BEFORE_OPENED)
{
setStState(NOT_OPENED, __LINE__);
}
}
}
// free up any receive buffers no longer in use
cancelMessageStream_->cleanupBuffers();
} // if cancelMessageStream_ ... else ...
}
/////////////////////////////////////////////////////////////////////////////
//
// Cancel work procedure: delegates to processCancelHelper() and, when the
// exchange uses SM, traces the outstanding message counters and the
// helper's return code. Returns whatever the helper returned.
ExWorkProcRetcode ex_send_top_tcb::processCancel()
{
  EXSM_TRACE(EXSM_TRACE_CANCEL,"SNDT %p BEGIN processCancel", this);

  const ExWorkProcRetcode rc = processCancelHelper();

  // The remaining statements only produce trace output for SM exchanges.
  if (!sendTopTdb().getExchangeUsesSM())
    return rc;

  ExExeStmtGlobals *stmtGlobals = getGlobals()->castToExExeStmtGlobals();
  EXSM_TRACE(EXSM_TRACE_CANCEL,"SNDT %p glob msgs %d cancels %d", this,
             (int) stmtGlobals->numSendTopMsgesOut(),
             (int) stmtGlobals->numCancelMsgesOut());

  if (cancelMessageStream_)
    {
      EXSM_TRACE(EXSM_TRACE_CANCEL,"SNDT %p cancel resp pend %d", this,
                 (int) cancelMessageStream_->numOfResponsesPending());
    }

  EXSM_TRACE(EXSM_TRACE_CANCEL,"SNDT %p END processCancel rc %s", this,
             ExWorkProcRetcodeToString(rc));

  return rc;
}
// Send cancel requests for parent requests that were already sent down
// (step STARTED_) and have since been changed to GET_NOMORE.
//
// All such entries between the head of the parent down queue and
// nextToSendDown_ are packed, as ControlInfo-only tuples, into a single
// cancel send buffer which is then sent on cancelMessageStream_. Each
// packed entry moves to step CANCELED_AFTER_SENT_.
//
// Returns WORK_BAD_ERROR if the IPC connection is broken,
// WORK_POOL_BLOCKED if no cancel send buffer could be obtained, and
// WORK_OK otherwise (including the cases where new I/O is not allowed or
// the cancel stream limits are reached).
ExWorkProcRetcode ex_send_top_tcb::processCancelHelper()
{
  if (ipcBroken_)
    return WORK_BAD_ERROR;

  checkCancelReply();

  // Are there any messages still waiting to be canceled? Am I too late?
  if (qParent_.down->isEmpty())
    {
      EXSM_TRACE(EXSM_TRACE_CANCEL,"SNDT %p down queue is empty", this);
      return WORK_OK;
    }

  TupMsgBuffer * cancelSendBuffer = NULL;
  queue_index pindex = qParent_.down->getHeadIndex();

  while (pindex != nextToSendDown_)
    {
      ex_queue_entry *pentry = qParent_.down->getQueueEntry(pindex);
      ex_send_top_private_state & pstate
        = *((ex_send_top_private_state *) pentry->pstate);

      if ((pentry->downState.request == ex_queue::GET_NOMORE) &&
          (pstate.step_ == STARTED_))
        {
          // Am I being asked to refrain from new I/O?
          if (getGlobals()->castToExExeStmtGlobals()->noNewRequest())
            {
              ExEspStmtGlobals *espGlobals = getGlobals()->
                castToExExeStmtGlobals()->castToExEspStmtGlobals();
              if (espGlobals)
                {
                  // Make sure my root (split_bottom) knows to schedule me
                  // when I/O can resume.
                  espGlobals->setSendTopTcbActivated(mySendTopTcbIndex_);
                }
              EXSM_TRACE(EXSM_TRACE_CANCEL,
                         "SNDT %p must refrain from new I/O", this);
              return WORK_OK;
            }

          // The cancel send buffer is allocated lazily, when the first
          // entry to cancel is found.
          if (cancelSendBuffer == NULL)
            {
              if (cancelMessageStream_)
                if (cancelMessageStream_->sendLimitReached() ||
                    cancelMessageStream_->inUseLimitReached())
                  {
                    EXSM_TRACE(EXSM_TRACE_CANCEL,
                               "SNDT %p stream limits prevent new I/O", this);
                    return WORK_OK;
                  }

              // cancelMessageStream_ may be NULL, but it's OK
              // getCancelSendBuffer can establish the stream and
              // connection_ should be already created.
              cancelSendBuffer = getCancelSendBuffer(FALSE);
              if (cancelSendBuffer == NULL)
                {
                  EXSM_TRACE(EXSM_TRACE_CANCEL,
                             "SNDT %p send buffer not available", this);
                  return WORK_POOL_BLOCKED;
                }
            }

          // Fetch the SQL buffer on every iteration. Previously it was only
          // set when the buffer was first allocated, so a second entry to
          // cancel in the same pass dereferenced a NULL pointer.
          SqlBuffer *sqlBuf = cancelSendBuffer->get_sql_buffer();

          // Allocate space in buffer for control info (down state).
          tupp_descriptor *controlInfoAsTupp =
            sqlBuf->add_tuple_desc((unsigned short) sizeof(ControlInfo));

          // Check that there was sufficient space in buffer.
          if (controlInfoAsTupp == NULL)
            {
              // Sufficient space is not available in buffer.
              // Will cancel more later. Will rely on actOnReceive
              // to schedule me, after reply from ESP.
              EXSM_TRACE(EXSM_TRACE_CANCEL,
                         "SNDT %p no room in buffer for ControlInfo", this);
              break;
            }
          else
            {
              // There was space in the send buffer --- copy the down state
              // of the current entry into it and advance to the next input
              // queue entry.
              ControlInfo msgControlInfo;

              // Copy the down state and flags into first tupp allocated
              // in the message buffer.
              msgControlInfo.getDownState() = pentry->downState;
              msgControlInfo.getDownState().parentIndex = pindex;
              msgControlInfo.setIsDataRowPresent(FALSE);
              msgControlInfo.setIsExtDiagsAreaPresent(FALSE);
              msgControlInfo.setBufferSequenceNumber(pstate.bufferNumber_);
              str_cpy_all(controlInfoAsTupp->getTupleAddress(),
                          (char *) &msgControlInfo,
                          sizeof(ControlInfo));

              // Now the pentry's pstate.step_ does its transition....
              pstate.step_ = CANCELED_AFTER_SENT_;
            }
        } // if GET_NOMORE && STARTED_

      pindex++;
    } // while pindex != nextToSendDown_

  if (cancelSendBuffer)
    {
      // At least one cancel was packed: finalize the buffer and send it.
      SqlBuffer *sqlBuf = cancelSendBuffer->get_sql_buffer();
      sqlBuf->drivePack();
      EXSM_TRACE(EXSM_TRACE_CANCEL,"SNDT %p SEND cancel %d tupps", this,
                 (int) sqlBuf->getTotalTuppDescs());
      cancelMessageStream_->sendRequest();
    }
  else
    {
      if (sendTopTdb().getExchangeUsesSM() && pindex == nextToSendDown_)
        EXSM_TRACE(EXSM_TRACE_CANCEL,
                   "SNDT %p no outstanding entries to cancel", this);
    }

  return WORK_OK;
}
/////////////////////////////////////////////////////////////////////////////
//
// Create the connection to the send bottom, using either the SM transport
// or a Guardian IPC connection depending on the TDB's exchange setting.
// nowaitedCompleted is only passed on to the Guardian IPC variant.
// Returns the result of the selected creation routine.
short ex_send_top_tcb::createConnectionToSendBottom(NABoolean nowaitedCompleted)
{
  if (sendTopTdb().getExchangeUsesSM())
    return createSMConnection();

  return createIpcGuardianConnection(nowaitedCompleted);
}
short ex_send_top_tcb::createSMConnection()
{
short retcode = 0;
IpcEnvironment *env =
getGlobals()->castToExExeStmtGlobals()->getIpcEnvironment();
// The numBufs variable represents the number of pre-allocated
// receive buffers. The connection will need N batches of M buffers
// where:
// N = getNumSendBuffers()
// M = getNumRecvBuffers()
//
// And we add one for cancel replies
UInt32 numBufs = (getNumSendBuffers() * getNumRecvBuffers()) + 1;
ExMasterStmtGlobals *masterGlobals =
getGlobals()->castToExExeStmtGlobals()->castToExMasterStmtGlobals();
connection_ = new (env->getHeap())
SMConnection(env,
smTarget_,
numBufs,
msgStream_->getBufferSize(),
this,
masterGlobals);
EXSM_TRACE(EXSM_TRACE_PROTOCOL,"SNDT %p created conn %p",
this, connection_);
ex_assert(connection_, "SM connection allocation failed");
SMConnection *smConn = (SMConnection *)
connection_->castToSMConnection();
ex_assert(smConn, "Invalid SM connection pointer");
smConn->setDataStream(msgStream_);
smConn->setCancelStream(cancelMessageStream_);
return retcode;
}
/////////////////////////////////////////////////////////////////////////////
//
// Create, or check on, the IPC connection to the send bottom's process.
//
// nowaitedCompleted: when TRUE the limit on concurrent opens is not applied
// and, if the new connection is in OPENING state, openPhandle(NULL, TRUE)
// is called on it right away.
//
// Returns  0 if a connection exists and carries no error information (the
//            send top may then be in WAITING_FOR_OPEN_COMPLETION state),
//          1 if too many opens are already in progress; no connection is
//            created in that case,
//         -1 if no connection could be created or the connection has error
//            information. Diagnostics are then added to the statement
//            globals and the TCB is marked IPC-broken.
//
// Must only be called in NOT_OPENED or WAITING_FOR_OPEN_COMPLETION state;
// any other state fails an assertion.
short ex_send_top_tcb::createIpcGuardianConnection(NABoolean nowaitedCompleted)
{
short retcode;
IpcEnvironment *env = getGlobals()->castToExExeStmtGlobals()->getIpcEnvironment();
// The ESP_MULTI_FRAGMENTS environment variable is read only once per
// thread; these two statics cache that result.
static THREAD_P bool sv_env_multiple_fragments_checked = false;
static THREAD_P bool sv_multiple_fragments = true;
// Nowait depth passed to createConnectionToServer(): 4, or 2 when a stats
// entry exists and the TDB asks for the old stats nowait depth.
Lng32 nowaitDepth = 4;
if (getStatsEntry() && sendTopTdb().getUseOldStatsNoWaitDepth())
nowaitDepth = 2;
if (sendTopState_ == NOT_OPENED)//added
{
// ESP_PARALLEL_DC_OPENS: a value starting with '0' turns parallel opens
// off; any other value is parsed with atoi() and used as the maximum
// number of concurrent opens. Without the variable a built-in default
// is used.
// NOTE(review): when the value starts with '0', sv_max_parallel_opens is
// not assigned here (its file-level initial value is 0). With multiple
// fragments enabled and nowaitedCompleted FALSE the throttle test below
// would then presumably always succeed and return 1 - confirm.
NABoolean espParallelDcOpens;
char * espParallelDcOpensEnvvar = getenv("ESP_PARALLEL_DC_OPENS");
if (espParallelDcOpensEnvvar != NULL && *espParallelDcOpensEnvvar == '0')
espParallelDcOpens = FALSE;
else { // multi fragment esp
#ifdef USE_SB_FILE_DYN_THREADS
sv_max_parallel_opens = FS_MAX_CONCUR_NOWAIT_OPENS;
#else
sv_max_parallel_opens = 8;
#endif
if (espParallelDcOpensEnvvar) {
sv_max_parallel_opens = atoi(espParallelDcOpensEnvvar);
}
espParallelDcOpens = TRUE;
}
// ESP_MULTI_FRAGMENTS: a value starting with '0' disables the throttling
// of concurrent opens below.
if (!sv_env_multiple_fragments_checked) {
char *lv_espMultipleFragmentsEnvvar = getenv("ESP_MULTI_FRAGMENTS");
if ((lv_espMultipleFragmentsEnvvar != NULL) &&
(*lv_espMultipleFragmentsEnvvar == '0')) {
sv_multiple_fragments = false;
}
sv_env_multiple_fragments_checked = true;
}
// Throttle: if the environment already has the maximum number of opens
// in progress, sleep for 100 microseconds and return 1 without creating
// a connection.
if (sv_multiple_fragments) {
if ((env->getNumOpensInProgress() >= sv_max_parallel_opens) && nowaitedCompleted == false) {
// The result of nanosleep() is stored in lv_err but not examined.
Int32 lv_err;
struct timespec lv_tv;
lv_tv.tv_sec = 0;
lv_tv.tv_nsec = 100 * 1000;
lv_err = nanosleep(&lv_tv, NULL);
return 1;
}
}
// Open the bottom ESP by creating a connection to
// it. createConnectionToServer() can return NULL if bottomProcId_
// does not refer to a running server.
connection_ = bottomProcId_.createConnectionToServer(
env,
FALSE, // transactions are sent via the control connection
nowaitDepth,
espParallelDcOpens,
ioSubtask_->getScheduledAddr()
,
TRUE // data connection to ESP
);
if (connection_ && connection_->getState() == IpcConnection::OPENING)
{
if (nowaitedCompleted)
{
// Call openPhandle() now; if the connection is ESTABLISHED afterwards
// the open is complete. Otherwise fall through to the checks below.
connection_->openPhandle(NULL, TRUE);
if (connection_->getState() == IpcConnection::ESTABLISHED)
{
setStState(OPEN_COMPLETE, __LINE__);
return 0;
}
}
else
{
// Leave the open in progress and record that we are waiting for it.
//connection_->openPhandle(NULL, FALSE); // Temporarily
setStState(WAITING_FOR_OPEN_COMPLETION, __LINE__);
return 0;
}
}
// The connection is already established. The state is only moved to
// OPEN_COMPLETE here when NA_LINUX_DISABLE is defined.
if (connection_ && connection_->getState() == IpcConnection::ESTABLISHED)
{
#ifdef NA_LINUX_DISABLE // multi fragment esp experiment
setStState(OPEN_COMPLETE, __LINE__);
#endif
return 0;
}
} // if (state == NOT_OPENED)
else
{
// Called again for an open that is already in progress: nothing is
// created here, only the error check below is performed.
ex_assert(sendTopState_ == WAITING_FOR_OPEN_COMPLETION,
"ex_send_top_tcb::createConnectionToSendBottom was called in an invalid sendTopState_");
}
// If the connection is NULL or the connection encountered errors,
// add diagnostics to the statement globals diags area. Also set the
// TCB's ipcBroken flag and set retcode to indicate an error.
if (connection_ == NULL ||
(connection_ && connection_->getErrorInfo()))
{
ComDiagsArea* da =
getGlobals()->castToExExeStmtGlobals()->getDiagsArea();
// If da is NULL, allocate a new diags area and attach to
// statement globals
if (da == NULL)
{
da = ComDiagsArea::allocate(getHeap());
getGlobals()->castToExExeStmtGlobals()->setGlobDiagsArea(da);
da->decrRefCount();
}
// Two cases to consider
//
// a. connection_ is NULL. This can happen when the ESP phandle
// does not represent a running server. For an extract consumer
// query the phandle comes from query text so we report an error
// to the application. Otherwise the phandle is managed
// internally and should always be valid, so it's a bug if we
// come here and we raise an assertion failure.
//
// b. connection is not NULL and contains error information. Add
// the error information to the statement globals diags area.
if (connection_ == NULL)
{
if (sendTopTdb().getExtractConsumerFlag())
{
const char *stringForDiags = sendTopTdb().getExtractEsp();
*da << DgSqlCode(-EXE_PARALLEL_EXTRACT_CONNECT_ERROR)
<< DgString0(stringForDiags);
}
else
{
ex_assert(FALSE, "createConnectionToSendBottom failed to connect");
}
}
else
{
connection_->populateDiagsArea(da, getHeap());
}
setIpcBroken();
retcode = -1;
}
else
{
retcode = 0;
}
return retcode;
}
/////////////////////////////////////////////////////////////////////////////
//
// Return (by value) the queue pair that connects this TCB to its parent.
ex_queue_pair ex_send_top_tcb::getParentQueue() const
{
  ex_queue_pair parentQueues = qParent_;
  return parentQueues;
}
/////////////////////////////////////////////////////////////////////////////
//
// Return the child TCB at position pos. This node reports no children
// (see numChildren()), so the answer is always NULL. A negative position
// fails an assertion.
const ex_tcb* ex_send_top_tcb::getChild(Int32 pos) const
{
  const ex_tcb *noChild = NULL;

  ex_assert((pos >= 0), "");

  return noChild;
}
/////////////////////////////////////////////////////////////////////////////
//
// Number of child TCBs of this node; always 0.
Int32 ex_send_top_tcb::numChildren() const
{
  const Int32 childCount = 0;
  return childCount;
}
// Allocate the statistics entry for this TCB.
//
// For OPERATOR_STATS collection an existing entry for the same TDB id is
// reused: it is installed with setStatsEntry(), its degree of parallelism
// is incremented and NULL is returned. If no such entry exists the base
// class allocates one. For every other collection type an ExESPStats entry
// is created. A newly allocated entry gets its parent and left-child TDB
// ids set before it is returned.
ExOperStats * ex_send_top_tcb::doAllocateStatsEntry(CollHeap *heap,
                                                    ComTdb *tdb)
{
  ExOperStats *entry = NULL;

  if (tdb->getCollectStatsType() != ComTdb::OPERATOR_STATS)
    {
      entry = new(heap) ExESPStats(heap,
                                   sendTopTdb().getSendBufferSize(),
                                   sendTopTdb().getRecvBufferSize(),
                                   getChildInstanceNum(),
                                   this,
                                   tdb);
    }
  else
    {
      // Check whether an entry for this node already exists. This can
      // happen if multiple send top tcbs are constructed for the same
      // tdb (for example by a split top node).
      entry =
        getGlobals()->getStatsArea()->get(ExOperStats::EX_OPER_STATS,
                                          tdb->getTdbId());
      if (entry)
        {
          setStatsEntry(entry);
          entry->incDop();
          return NULL;
        }

      entry = ex_tcb::doAllocateStatsEntry(heap, tdb);
    }

  if (!entry)
    return entry;

  // Assuming parent Tdb is the one above this Tdb
  entry->setParentTdbId(tdb->getTdbId()+1);
  // Set SplitBottom as the left child TdbId
  entry->setLeftChildTdbId(tdb->getTdbId()-1);

  return entry;
}
/////////////////////////////////////////////////////////////////////////////
//
// Send a "late cancel" message to the producer ESP.
//
// Only ESPs call this method (asserted below). If the ESP has been told to
// refrain from new requests, the send top only registers itself to be
// scheduled again and returns WORK_OK. Otherwise a connection is created if
// needed, a single GET_NOMORE ControlInfo is packed into a late cancel
// buffer and sent on the cancel message stream. A send top that has not
// been opened yet moves to CANCELED_BEFORE_OPENED.
//
// Returns WORK_OK / 0 on success or when deferring, -1 if the connection
// to the send bottom could not be created.
short ex_send_top_tcb::notifyProducerThatWeCanceled()
{
  ExEspStmtGlobals *espGlobals =
    getGlobals()->castToExExeStmtGlobals()->castToExEspStmtGlobals();

  // Check the pointer before its first use. Previously this assertion came
  // only after espGlobals had already been dereferenced.
  ex_assert(espGlobals,
            "Master does not use notifyProducerThatWeCanceled() right now");

  if (espGlobals->noNewRequest())
    {
      // sol 10-090220-9443. after esp receives release work msg, it sets the
      // no-new-request flag that prevents this esp from propagating any msgs
      // further down the esp chain. the msg can be any of the following:
      //
      // - data request: see checkSend()
      // - continue msg: see continueRequest()
      // - cancel msg: see processCancel()
      // - late cancel msg: see notifyProducerThatWeCanceled()
      //
      // Make sure my root (split_bottom) knows to schedule me
      // when I/O can resume.
      espGlobals->setSendTopTcbActivated(mySendTopTcbIndex_);
      return WORK_OK;
    }

  if (connection_ == NULL)
    if (createConnectionToSendBottom(TRUE) != 0)
      return -1;

  // getCancelSendBuffer() returns NULL when the request header cannot be
  // allocated; fail with a clear message instead of dereferencing NULL.
  TupMsgBuffer *tmb = getCancelSendBuffer(TRUE);
  ex_assert(tmb, "Late cancel send buffer could not be allocated");

  SqlBuffer *sqlBuf = tmb->get_sql_buffer();
  tupp_descriptor *controlInfoAsTupp =
    sqlBuf->add_tuple_desc((unsigned short) sizeof(ControlInfo));
  ex_assert(controlInfoAsTupp,
            "No room for ControlInfo in late cancel send buffer");

  // A late cancel carries no data row and no diagnostics, only the
  // GET_NOMORE request with placeholder sequence number and parent index.
  ControlInfo msgControlInfo;
  msgControlInfo.getDownState().request = ex_queue::GET_NOMORE;
  msgControlInfo.setBufferSequenceNumber(9999999);
  msgControlInfo.getDownState().parentIndex = 9999999; // manage this later
  msgControlInfo.setIsDataRowPresent(FALSE);
  msgControlInfo.setIsExtDiagsAreaPresent(FALSE);
  str_cpy_all(controlInfoAsTupp->getTupleAddress(),
              (char *) &msgControlInfo,
              sizeof(ControlInfo));

  // make sure we wait for the late cancel to complete
  espGlobals->setSendTopTcbLateCancelling();

  if (sendTopState_ == NOT_OPENED OR
      sendTopState_ == WAITING_FOR_OPEN_COMPLETION)
    setStState(CANCELED_BEFORE_OPENED, __LINE__);

  EXSM_TRACE(EXSM_TRACE_CANCEL,"SNDT %p SEND late cancel", this);
  cancelMessageStream_->setLateCancel();
  cancelMessageStream_->sendRequest();

  return 0;
}
/////////////////////////////////////////////////////////////////////////////
//
// Obtain a TupMsgBuffer, constructed inside the cancel message stream, for
// a cancel request (lateCancel FALSE) or a late cancel request (lateCancel
// TRUE). The cancel stream is created on first use and bound to
// connection_, which must already exist.
//
// Returns NULL if the request header cannot be allocated in the stream;
// aborts if the TupMsgBuffer itself cannot be allocated.
TupMsgBuffer * ex_send_top_tcb::getCancelSendBuffer(NABoolean lateCancel)
{
  // Only a late cancel may be sent before the send top has been opened.
  ex_assert((sendTopState_ != NOT_OPENED AND sendTopState_ != WAITING_FOR_OPEN_COMPLETION) || lateCancel,
            "Canceling too soon!");

  if (cancelMessageStream_)
    {
      // The stream allows one outstanding message; the caller must not
      // ask for a buffer while that slot is taken.
      ex_assert( !cancelMessageStream_->sendLimitReached() &&
                 !cancelMessageStream_->inUseLimitReached(),
                 "getCancelSendBuffer called at the wrong time!");
    }
  else
    {
      // Size the stream buffer for the tuple buffer, the TupMsgBuffer
      // object and the larger of the two request headers, aligning after
      // each part.
      IpcMessageObjSize streamBufSize = MAXOF(sendBufferSize_,500);
      IpcMessageBuffer::alignOffset(streamBufSize);
      streamBufSize += sizeof(TupMsgBuffer);
      IpcMessageBuffer::alignOffset(streamBufSize);
      streamBufSize += MAXOF(sizeof(ExEspCancelReqHeader),
                             sizeof(ExEspLateCancelReqHeader));
      IpcMessageBuffer::alignOffset(streamBufSize);

      cancelMessageStream_ = new(getGlobals()->getSpace())
        ExSendTopCancelMessageStream(
             getGlobals()->castToExExeStmtGlobals(),
             1, //sendBufferLimit,
             1, //inUseBufferLimit,
             streamBufSize,
             this);

      ex_assert(connection_ != NULL, "Connection not created!");
      cancelMessageStream_->addRecipient(connection_);

      if (sendTopTdb().getExchangeUsesSM())
        {
          SMConnection *smConn = (SMConnection *)
            connection_->castToSMConnection();
          ex_assert(smConn, "Invalid SM connection pointer");
          smConn->setCancelStream(cancelMessageStream_);
        }
    }

  // Construct the cancel request header in the message.
  if (!lateCancel)
    {
      // include child ExFragInstanceHandle for fast routing
      ExEspCancelReqHeader *cancelHdr =
        new(*cancelMessageStream_) ExEspCancelReqHeader((NAMemory *) NULL);
      if (cancelHdr == NULL)
        return NULL;
      cancelHdr->handle_ = childFragHandle_;
      cancelHdr->myInstanceNum_ = getMyInstanceNum();
    }
  else
    {
      ExEspLateCancelReqHeader *lateHdr =
        new(*cancelMessageStream_) ExEspLateCancelReqHeader((NAMemory *) NULL);
      if (lateHdr == NULL)
        return NULL;
      // a late cancel message may not know the ExFragInstanceHandle yet,
      // so it is routed by fragment key instead
      lateHdr->key_ = getGlobals()->castToExExeStmtGlobals()->getFragmentKey(
           sendTopTdb().getChildFragId());
      lateHdr->myInstanceNum_ = getMyInstanceNum();
    }

  // Construct the sql buffer (TupMsgBuffer) directly in the message to
  // avoid a copy.
  Lng32 tupBufLen = sendBufferSize_;

  // The TupMsgBuffer for a late cancel holds only one queue entry and no
  // data record. It may be routed through a non-buffered message stream at
  // the producer ESP (the new incoming message stream in case we also send
  // an open request), and only buffered message streams know how to route
  // a multi-buffer message. The value 500 is an arbitrary choice with a
  // generous reserve for a single cancel request without records.
  // NOTE(review): the original comment says the length is kept small, but
  // MAXOF only ever raises it to at least 500 - confirm whether a cap
  // (MINOF) was intended.
  if (lateCancel)
    tupBufLen = MAXOF(tupBufLen,500);

  TupMsgBuffer *cancelBuf = new(*cancelMessageStream_, tupBufLen)
    TupMsgBuffer(tupBufLen,
                 TupMsgBuffer::MSG_IN,
                 cancelMessageStream_);
  if (cancelBuf == NULL)
    ABORT("mismatched send top buffer size");

  return cancelBuf;
}
// Report msgBytes to the statement's statistics area via its incReqMsg()
// method. Does nothing when there is no statistics area.
void ex_send_top_tcb::incReqMsg(Int64 msgBytes)
{
  ExStatisticsArea *area = getGlobals()->getStatsArea();
  if (area == NULL)
    return;

  area->incReqMsg(msgBytes);
}
/////////////////////////////////////////////////////////////////////////////
// Change the send top state and record the transition, together with the
// source line that requested it, in the circular trace array
// stStateTrace_. A request for the state we are already in is ignored and
// not traced.
void ex_send_top_tcb::setStState(
     enum ExSendTopState newState,
     int linenum)
{
  if (newState == sendTopState_)
    return;

  // Wrap around once the trace array is full.
  if (stTidx_ >= NumSendTopTraceElements)
    stTidx_ = 0;

  sendTopState_ = newState;

  stStateTrace_[stTidx_].stState_ = sendTopState_;
  stStateTrace_[stTidx_].lineNum_ = linenum;
  stTidx_++;
}
// Map a send top state to its name for tracing. Values without a case
// below are passed to ComRtGetUnknownString().
const char *ex_send_top_tcb::getExSendTopStateString(ExSendTopState s)
{
  const char *stateName = NULL;

  switch (s)
    {
    case INVALID:
      stateName = "INVALID";
      break;
    case NOT_OPENED:
      stateName = "NOT_OPENED";
      break;
    case WAITING_FOR_OPEN_COMPLETION:
      stateName = "WAITING_FOR_OPEN_COMPLETION";
      break;
    case WAITING_FOR_OPEN_REPLY:
      stateName = "WAITING_FOR_OPEN_REPLY";
      break;
    case CANCELED_BEFORE_OPENED:
      stateName = "CANCELED_BEFORE_OPENED";
      break;
    case OPEN_COMPLETE:
      stateName = "OPEN_COMPLETE";
      break;
    case SERVER_SATURATED:
      stateName = "SERVER_SATURATED";
      break;
    default:
      stateName = ComRtGetUnknownString((Int32) s);
      break;
    }

  return stateName;
}
// -----------------------------------------------------------------------
// Methods for class ExSendTopMsgStream
// -----------------------------------------------------------------------
/////////////////////////////////////////////////////////////////////////////
// constructor
// Data request stream of a send top: an IpcClientMsgStream that sends
// IPC_MSG_SQLESP_DATA_REQUEST messages with the given buffer limits and
// buffer size, and remembers the owning send top TCB for its callbacks.
ExSendTopMsgStream::ExSendTopMsgStream(
     ExExeStmtGlobals* glob,
     Lng32 sendBufferLimit,
     Lng32 inUseBufferLimit,
     IpcMessageObjSize bufferSize,
     ex_send_top_tcb* sendTopTcb)
  : IpcClientMsgStream(
         glob->getIpcEnvironment(),
         IPC_MSG_SQLESP_DATA_REQUEST,
         CurrEspRequestMessageVersion,
         sendBufferLimit,
         inUseBufferLimit,
         bufferSize),
    sendTopTcb_(sendTopTcb)
{
}
///////////////////////////////////////////////////////////////////////////////
// method called upon send complete
// Callback invoked when a send on the data stream has completed.
//
// On error the connection's diagnostics are added to the statement
// globals, the TCB is marked IPC-broken and its work procedure is
// scheduled. Otherwise the length of the message just sent is reported to
// the TCB. In every case the count of outstanding send top messages is
// incremented.
void ExSendTopMsgStream::actOnSend(IpcConnection* connection)
{
  EXSM_TRACE(EXSM_TRACE_PROTOCOL, "SNDT %p ACTS D s %p c %p",
             sendTopTcb_, this, connection);

  // we always use this message with the same connection
  // ex_assert(connection == sendTopTcb_->connection_,
  // "A send top message is associated with the wrong connection");
  if (connection && getErrorInfo())
    {
      ComDiagsArea* diags = sendTopTcb_->getGlobals()->
        castToExExeStmtGlobals()->getDiagsArea();
      const bool hadDiags = (diags != NULL);

      connection->populateDiagsArea( diags, sendTopTcb_->getHeap());

      // A diags area that was created by populateDiagsArea() still has to
      // be attached to the statement globals.
      if (!hadDiags && diags)
        {
          sendTopTcb_->getGlobals()->castToExExeStmtGlobals()->
            setGlobDiagsArea(diags);
          diags->decrRefCount();
        }

      sendTopTcb_->setIpcBroken();
      sendTopTcb_->tickleSchedulerWork();
      EXSM_TRACE(EXSM_TRACE_PROTOCOL,"SNDT %p IPC broken", sendTopTcb_);
      EXSM_TRACE(EXSM_TRACE_PROTOCOL,"SNDT %p tcb scheduled", sendTopTcb_);
    }
  else if (connection)
    {
      sendTopTcb_->incReqMsg(connection->getLastSentMsg()->getMessageLength());
    }

  ExExeStmtGlobals *stmtGlob =
    sendTopTcb_->getGlobals()->castToExExeStmtGlobals();
  stmtGlob->incrementSendTopMsgesOut();

  EXSM_TRACE(EXSM_TRACE_PROTOCOL, "SNDT %p ACTS D glob msgs %d",
             sendTopTcb_,
             (int) stmtGlob->numSendTopMsgesOut());
}
/////////////////////////////////////////////////////////////////////////////
// method called upon receive complete
// Callback invoked when a receive on the data stream has completed.
//
// On error the connection's diagnostics are added to the statement globals
// and the TCB is marked IPC-broken. The count of outstanding send top
// messages is decremented, except that with the SM continue protocol this
// happens only once the batch is complete. Finally the TCB's work
// procedure is scheduled.
void ExSendTopMsgStream::actOnReceive(IpcConnection* connection)
{
  EXSM_TRACE(EXSM_TRACE_PROTOCOL, "SNDT %p ACTR D s %p c %p",
             sendTopTcb_, this, connection);

  // we always use this message with the same connection
  // ex_assert(connection == sendTopTcb_->connection_,
  // "A send top message is associated with the wrong connection");
  if (connection && getErrorInfo())
    {
      ComDiagsArea* diags = sendTopTcb_->getGlobals()->
        castToExExeStmtGlobals()->getDiagsArea();
      const bool hadDiags = (diags != NULL);

      connection->populateDiagsArea( diags, sendTopTcb_->getHeap());

      // A diags area that was created by populateDiagsArea() still has to
      // be attached to the statement globals.
      if (!hadDiags && diags)
        {
          sendTopTcb_->getGlobals()->castToExExeStmtGlobals()->
            setGlobDiagsArea(diags);
          diags->decrRefCount();
        }

      sendTopTcb_->setIpcBroken();
    }

  ExExeStmtGlobals *stmtGlob =
    sendTopTcb_->getGlobals()->castToExExeStmtGlobals();

  // With the SM continue protocol a request stays outstanding until its
  // batch of replies is complete.
  if (!getSMContinueProtocol() || getSMBatchIsComplete())
    stmtGlob->decrementSendTopMsgesOut();

  EXSM_TRACE(EXSM_TRACE_PROTOCOL, "SNDT %p ACTR D glob msgs %d rp %d",
             sendTopTcb_,
             (int) stmtGlob->numSendTopMsgesOut(),
             (int) numOfResponsesPending());

  // wake up the work procedure
  sendTopTcb_->tickleSchedulerWork(TRUE);
  EXSM_TRACE(EXSM_TRACE_PROTOCOL,"SNDT %p tcb scheduled", sendTopTcb_);
}
// -----------------------------------------------------------------------
// Methods for class ExSendTopCancelMessageStream
// -----------------------------------------------------------------------
// Cancel request stream of a send top: an IpcClientMsgStream that sends
// IPC_MSG_SQLESP_CANCEL_REQUEST messages with the given buffer limits and
// buffer size. It remembers the owning send top TCB and starts out with
// the late-cancel flag cleared.
ExSendTopCancelMessageStream::ExSendTopCancelMessageStream(
     ExExeStmtGlobals *glob,
     Lng32 sendBufferLimit,
     Lng32 inUseBufferLimit,
     IpcMessageObjSize bufferSize,
     ex_send_top_tcb *sendTopTcb)
  : IpcClientMsgStream(
         glob->getIpcEnvironment(),
         IPC_MSG_SQLESP_CANCEL_REQUEST,
         CurrEspRequestMessageVersion,
         sendBufferLimit,
         inUseBufferLimit,
         bufferSize),
    sendTopTcb_(sendTopTcb),
    lateCancel_(FALSE)
{
}
/////////////////////////////////////////////////////////////////////////////
// method called upon send complete
// Callback invoked when a send on the cancel stream has completed.
//
// On error the connection's diagnostics are added to the statement
// globals, the TCB is marked IPC-broken and its cancel procedure is
// scheduled. In every case the count of outstanding cancel messages is
// incremented.
void ExSendTopCancelMessageStream::actOnSend(IpcConnection *connection)
{
  EXSM_TRACE(EXSM_TRACE_PROTOCOL, "SNDT %p ACTS C %p c %p",
             sendTopTcb_, this, connection);

  if (connection && getErrorInfo())
    {
      ComDiagsArea* diags = sendTopTcb_->getGlobals()->
        castToExExeStmtGlobals()->getDiagsArea();
      const bool hadDiags = (diags != NULL);

      connection->populateDiagsArea( diags, sendTopTcb_->getHeap());

      // A diags area that was created by populateDiagsArea() still has to
      // be attached to the statement globals.
      if (!hadDiags && diags)
        {
          sendTopTcb_->getGlobals()->castToExExeStmtGlobals()->
            setGlobDiagsArea(diags);
          diags->decrRefCount();
        }

      sendTopTcb_->setIpcBroken();
      sendTopTcb_->tickleSchedulerCancel();
      EXSM_TRACE(EXSM_TRACE_PROTOCOL,"SNDT %p IPC broken", sendTopTcb_);
      EXSM_TRACE(EXSM_TRACE_PROTOCOL,"SNDT %p tcb scheduled", sendTopTcb_);
    }

  ExExeStmtGlobals *stmtGlob =
    sendTopTcb_->getGlobals()->castToExExeStmtGlobals();
  stmtGlob->incrementCancelMsgesOut();

  EXSM_TRACE(EXSM_TRACE_PROTOCOL, "SNDT %p ACTS C glob cancel %d",
             sendTopTcb_,
             (int) stmtGlob->numCancelMsgesOut());
}
// Callback invoked when a receive on the cancel stream has completed.
//
// On error the connection's diagnostics are added to the statement globals
// and the TCB is marked IPC-broken. The count of outstanding cancel
// messages is decremented. If the reply belongs to a late cancel, the
// late-cancel flag is cleared and, in an ESP, the statement globals are
// told that the late cancel is over. Finally the TCB's cancel procedure is
// scheduled.
void ExSendTopCancelMessageStream::actOnReceive(IpcConnection *connection)
{
  // Remember whether this reply belongs to a late cancel. lateCancel_ is
  // reset below, and the closing trace used to read it after the reset,
  // so it always reported "C".
  const NABoolean wasLateCancel = lateCancel_;

  EXSM_TRACE(EXSM_TRACE_PROTOCOL, "SNDT %p ACTR %s %p c %p", sendTopTcb_,
             (wasLateCancel ? "LC" : "C"), this, connection);

  if (connection)
    {
      if (getErrorInfo())
        {
          ComDiagsArea *da = sendTopTcb_->getGlobals()->
            castToExExeStmtGlobals()->getDiagsArea();
          ComDiagsArea *oldDa = da;
          connection->populateDiagsArea( da,
                                         sendTopTcb_->getHeap());
          // A diags area that was created by populateDiagsArea() still
          // has to be attached to the statement globals.
          if ((!oldDa) && (da))
            {
              sendTopTcb_->getGlobals()->castToExExeStmtGlobals()->
                setGlobDiagsArea(da);
              da->decrRefCount();
            }
          sendTopTcb_->setIpcBroken();
        }
    }

  ExExeStmtGlobals *stmtGlob =
    sendTopTcb_->getGlobals()->castToExExeStmtGlobals();
  stmtGlob->decrementCancelMsgesOut();

  if (lateCancel_)
    {
      // do the bookkeeping here and let the cancel method clean
      // up the message stream
      lateCancel_ = FALSE;
      ExEspStmtGlobals *espGlobals = sendTopTcb_->getGlobals()->
        castToExExeStmtGlobals()->castToExEspStmtGlobals();
      if (espGlobals)
        espGlobals->resetSendTopTcbLateCancelling();
    }

  EXSM_TRACE(EXSM_TRACE_PROTOCOL, "SNDT %p ACTR %s glob cancel %d rp %d",
             sendTopTcb_,
             (wasLateCancel ? "LC" : "C"),
             (int) stmtGlob->numCancelMsgesOut(),
             (int) numOfResponsesPending());

  // wake up the cancel procedure. It might be awaiting this
  // receive so it can send some more.
  sendTopTcb_->tickleSchedulerCancel();
  EXSM_TRACE(EXSM_TRACE_PROTOCOL,"SNDT %p tcb scheduled", sendTopTcb_);
}
///////////////////////////////////////////////
// class ex_send_top_private_state
///////////////////////////////////////////////
// A new private state starts in step NOT_STARTED_. The tcb argument is
// not used.
ex_send_top_private_state::ex_send_top_private_state(
     const ex_send_top_tcb * /*tcb*/)
{
  this->step_ = ex_send_top_tcb::NOT_STARTED_;
}
// Nothing to release: the destructor body is empty.
ex_send_top_private_state::~ex_send_top_private_state() {}
// Create a fresh private state object, allocated from the space of the
// given TCB.
ex_tcb_private_state * ex_send_top_private_state::allocate_new(
     const ex_tcb *tcb)
{
  ex_tcb *owner = (ex_tcb *) tcb;

  return new(owner->getSpace())
    ex_send_top_private_state((ex_send_top_tcb *) tcb);
}