blob: ddf0218995186fb5dfe270c228a330eaaac6fe88 [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
#include "SMConnection.h"
#include "ExSMGlobals.h"
#include "ExSMTask.h"
#include "ExSMReadyList.h"
#include "ExSMQueue.h"
#include "ExSMEvent.h"
#include "ExSMShortMessage.h"
#include "ex_exe_stmt_globals.h"
#include "ExpErrorEnums.h"
#include "NAAssert.h"
#include "PortProcessCalls.h"
// -----------------------------------------------------------------------
// Methods for class SMConnection
// -----------------------------------------------------------------------
// Construct one endpoint of a SeaMonster (SM) connection.
//
// The constructor registers an SM task for this connection with the
// process-wide ExSMGlobals object, registers the connection with the
// master statement globals (when provided), bumps the SM connection
// count of the IPC environment, and records whether both endpoints
// live on the same node.
SMConnection::SMConnection(IpcEnvironment *env,
                           sm_target_t &smTarget,
                           UInt32 numReceiveBuffers,
                           IpcMessageObjSize maxBufSize,
                           ex_tcb *tcb,
                           ExMasterStmtGlobals *masterGlobals,
                           NABoolean isServer)
  : IpcConnection(env,
                  IpcProcessId(),
                  isServer ?
                  eye_SEAMONSTER_CONNECTION_TO_CLIENT :
                  eye_SEAMONSTER_CONNECTION_TO_SERVER)
  , scheduled_(0)
  , isServer_(isServer)
  , outstandingSMRequests_(0)
  , numReceiveBuffers_(numReceiveBuffers)
  , maxBufSize_(maxBufSize)
  , dataStream_(NULL)
  , cancelStream_(NULL)
  , smTarget_(smTarget)
  , smTask_(NULL)
  , tcb_(tcb)
  , smErrorNumber_(0)
  , ccErrorNumber_(0)
  , masterGlobals_(masterGlobals)
  , sendCount_(0)
  , recvCount_(0)
  , postCount_(0)
  , chunk_buffer_(NULL)
  , chunk_nextOffset_(0)
  , chunk_size_(0)
  , chunk_waitingForAck_(false)
  , intranode_(false)
{
  memset(smErrorFunction_, 0, sizeof(smErrorFunction_));

  ExSMGlobals *globals = ExSMGlobals::GetExSMGlobals();
  assert(globals);

  IpcEnvironment *myEnv = getEnvironment();
  assert(myEnv);

  // Build the SM target that the task will store.
  //
  // * Server side: the task target equals the connection target.
  //
  // * Client side: the task target differs only in target.tag, where
  //   a bit is set to mark the task as a receiver of replies rather
  //   than requests. The bit is needed for multi-fragment plans where
  //   both endpoints of a logical connection can be in one process.
  sm_target_t taskTarget = smTarget_;
  if (!isServer_)
    ExSMTag_SetReplyFlag(&taskTarget.tag);

  // The receive queue gets one slot beyond the configured number of
  // receive buffers, reserved for a "larger than expected" reply
  // buffer.
  uint32_t queueSize = numReceiveBuffers_ + 1;
  smTask_ = globals->addTask(taskTarget,
                             queueSize,
                             &scheduled_,
                             myEnv->getHeap(),
                             tcb_,
                             this);
  assert(smTask_);

  const char *side = isServer_ ? "S" : "C";
  EXSM_TRACE(EXSM_TRACE_INIT, "CONN CTOR %s %p task %p",
             side, this, smTask_);
  EXSM_TRACE(EXSM_TRACE_INIT, "CONN CTOR %s %p max buf %d",
             side, this, (int) maxBufSize_);

  if (masterGlobals)
    masterGlobals->addSMConnection(this);

  myEnv->getAllConnections()->incrNumSMConnections();

  // intranode_ is true when both ends of the connection are on the
  // same seaquest node. In that case there is no limit on the chunk
  // size that can be sent.
  int32_t localNode = globals->getSQNodeNum();
  intranode_ = (localNode == smTarget_.node);
}
// Tear down the connection: flush pending I/O bookkeeping, unregister
// the SM task and the master-globals registration, deliver any
// remaining receive callbacks, and move the connection to CLOSED.
SMConnection::~SMConnection()
{
  EXSM_TRACE(EXSM_TRACE_INIT,"CONN DTOR %s %p %s",
             isServer_ ? "S" : "C", this,
             getConnectionStateString(getState()));

  ExSMGlobals *globals = ExSMGlobals::GetExSMGlobals();
  assert(globals);

  handleIOErrorForSM();

  ExSMEvent::add(ExSMEvent::SMError, &smTarget_,
                 smErrorNumber_, ccErrorNumber_, 0, (int64_t) this);

  if (smTask_)
  {
    globals->removeTask(smTask_);
    smTask_ = NULL;
  }

  if (masterGlobals_)
    masterGlobals_->removeSMConnection(this);

  // Issue a receive callback for every buffer that can still be
  // dequeued from the receive queue
  for (IpcMessageBuffer *pending = getNextReceiveQueueEntry();
       pending != NULL;
       pending = getNextReceiveQueueEntry())
  {
    pending->callReceiveCallback(this);
  }

  // Right now we do not have error handling in the SM connection or
  // streams for fatal error scenarios. For example the master might
  // get an IPC error on an ESP control connection and decide to run
  // TCB destructors while the SM connections are expecting
  // arrivals. For now we will simply CLOSE this connection which
  // removes it from the pendingIOs_ list.
  setState(CLOSED);

  getEnvironment()->getAllConnections()->decrNumSMConnections();
}
// Return the number of entries currently on the connection's send
// queue.
Int32 SMConnection::numQueuedSendMessages()
{
  Int32 queued = sendQueueEntries();
  return queued;
}
// Return the length of the SM task's output queue, or 0 when the
// connection has no task or the task has no output queue.
Int32 SMConnection::numQueuedReceiveMessages()
{
  if (!smTask_)
    return 0;

  ExSMQueue *outQueue = smTask_->getOutQueue();
  if (!outQueue)
    return 0;

  return (Int32) outQueue->getLength();
}
int32_t SMConnection::allocateReceiveBuffers()
{
ExSMQueue *inQueue = smTask_->getInQueue();
int count = 0;
int32_t rc = 0;
while (inQueue->getLength() < numReceiveBuffers_)
{
assert(!inQueue->isFull());
IpcEnvironment *env = getEnvironment();
NAMemory *heap = (env ? env->getHeap() : NULL);
IpcMessageBuffer *msgBuf =
IpcMessageBuffer::allocate(maxBufSize_, NULL, heap, 0);
assert(msgBuf);
ExSMQueue::Entry &entry = inQueue->getTailEntry();
entry.setData((void *) msgBuf);
inQueue->insert();
count++;
sm_target_t target = smTarget_;
if (!isServer_)
ExSMTag_SetReplyFlag(&target.tag);
UInt32 maxSMChunkSize = ExSM_GetMaxChunkSize(intranode_);
UInt32 bufferSize = msgBuf->getBufferLength();
if (bufferSize > maxSMChunkSize)
bufferSize = maxSMChunkSize;
rc = ExSM_Post(target,
bufferSize,
msgBuf->data(),
smTask_,
isServer_);
if (rc == SM_ERR_NOPEER || rc == SM_ERR_NOSERVICE || rc == SM_ERR_NODE_DOWN)
return rc;
exsm_assert_rc(rc, "ExSM_Post");
postCount_++;
}
EXSM_TRACE(EXSM_TRACE_BUFFER,"CONN %p %s alloc recv bufs %d", this,
isServer_ ? "server" : "client", count);
return rc;
}
// Send a message buffer over this connection.
//
// Normally the buffer is placed on the send queue and as many chunks
// as possible are transmitted right away. A client-side connection
// that is already in the ERROR state does not attempt any I/O;
// instead it issues the send and receive callbacks immediately and
// releases its reference to the buffer.
void SMConnection::send(IpcMessageBuffer *msgBuf)
{
  assert(msgBuf);

  EXSM_TRACE(EXSM_TRACE_BUFFER, "CONN %p BEGIN SEND %s state %s", this,
             (isServer_ ? "S" : "C"), getConnectionStateString(getState()));
  EXSM_TRACE(EXSM_TRACE_BUFFER, "CONN %p buf %p len %d ref %d", this,
             msgBuf, (int) msgBuf->getMessageLength(),
             (int) msgBuf->getRefCount());

  const bool clientInError = (!isServer_ && getState() == ERROR_STATE);

  if (!clientInError)
  {
    // Normal path: queue the buffer and push out as much as we can
    queueSendMessage(msgBuf);
    while (tryToSendOneChunk())
    {
      // keep sending until nothing more can go out
    }
  }
  else
  {
    // The connection is in an error state so the send is not
    // attempted. A send callback is issued followed by a receive
    // callback because data streams require both callbacks, even
    // when errors are encountered, to perform their bookkeeping.

    // This call puts the connection on pendingIOs_ even though no
    // I/O is being attempted. The connection will be removed from
    // pendingIOs_ when the receive callback is processed.
    incrOutstandingSMRequests();

    // Send callback first
    msgBuf->callSendCallback(this);

    // Then the receive callback, after which the reference to the
    // message buffer is released
    msgBuf->callReceiveCallback(this);
    msgBuf->decrRefCount();
  }

  EXSM_TRACE(EXSM_TRACE_BUFFER, "CONN %p END SEND state %s", this,
             getConnectionStateString(getState()));
}
// Try to send one buffer or one chunk if the buffer size is too
// large. Return true if a buffer or chunk is sent.
//
// Nothing is sent and false is returned if the connection is waiting
// for an ack, or if the send queue is empty.
//
// Buffers are too large if either of the following is true:
//  * Buffer size exceeds the expected size for this connection
//  * Buffer size exceeds the maximum SM chunk size
//
// When we send chunks the protocol is:
//
//  * First send a short message containing the buffer size and
//    the chunk size, and return false
//  * For each chunk
//    * The function will eventually be called again when an ack
//      arrives.
//    * Send a chunk and return true
//
// Any time the function is called while waiting for an ack, do
// nothing and return false.
//
// Error handling for SM_ERR_NOPEER, SM_ERR_NOSERVICE and
// SM_ERR_NODE_DOWN:
//  * Server side: the process exits via NAExit(0)
//  * Client side: the error number and the name of the failing SM
//    function are recorded and the connection enters ERROR_STATE
//
// The function returns true if a send or prepost attempt returns one
// of those errors on the client side. It returns false if the short
// message that starts chunk mode fails on the client side.
//
bool SMConnection::tryToSendOneChunk()
{
  EXSM_TRACE(EXSM_TRACE_BUFFER,
             "CONN %p TRY wait %d arr %d q_sz %d",
             this, (int) getWaitingForAck(), (int) getAckArrived(),
             (int) sendQueueEntries());

  int32_t rc = 0;

  // These local variables will be used for the SM send
  IpcMessageBuffer *sendBuffer = NULL;
  UInt32 sendOffset = 0;
  UInt32 sendBytes = 0;

  // Keep track of whether we are sending a message in chunks, and
  // whether we have reached the final chunk
  bool sqlChunkMode = false;
  bool finalSQLChunk = false;

  // We want to send different things depending on whether we are in
  // chunk mode. In chunk mode, we want to send the next
  // chunk. Otherwise we want to send the next buffer from the send
  // queue.
  //
  // In chunk mode the WAITING FOR ACK property of the connection will
  // be TRUE. It gets set to TRUE by the enterChunkMode() method and
  // stays TRUE until chunk transmissions are complete.
  if (getWaitingForAck())
  {
    // Note the fact that we are in chunk mode
    sqlChunkMode = true;

    // See if the ACK ARRIVED property is set. The property is stored
    // in the task object. The property is set by the reader thread
    // calling task->sendChunk_SetAckArrived(true). The property is
    // cleared below, inside the getAckArrived() block.
    //
    // If no ack has arrived there is nothing to do. The sendBuffer
    // variable stays NULL which will be seen by code below and
    // trigger an early return.
    if (getAckArrived())
    {
      // The connection was waiting for an ack and the ack
      // arrived. The next send can take place.

      // Adjust local variables to drive the send. Since we are
      // already in chunk mode, the data to send comes from the buffer
      // chunk_buffer_ at offset chunk_nextOffset_.
      //
      // The number of bytes to send will be chunk_size_ unless this
      // is the last chunk and there are not that many bytes remaining
      // in chunk_buffer_.
      exsm_assert(chunk_buffer_, "ACK arrived but no buffer to send");
      UInt32 msgLen = (UInt32) chunk_buffer_->getMessageLength();

      sendBuffer = chunk_buffer_;
      sendOffset = chunk_nextOffset_;

      if (chunk_nextOffset_ + chunk_size_ < msgLen)
      {
        sendBytes = chunk_size_;
      }
      else
      {
        exsm_assert(chunk_nextOffset_ < msgLen,
                    "Chunk offset should not exceed message length");
        sendBytes = (msgLen - chunk_nextOffset_);
        finalSQLChunk = true;
      }

      // Adjust data members in response to the ack arriving
      //
      // * Clear the ACK ARRIVED flag
      //
      // * For a non-final chunk, set the WAITING FOR ACK flag because
      //   this connection will require an ack before sending the next
      //   chunk
      //
      // * For a final chunk, clear the WAITING FOR ACK flag
      //
      setAckArrived(false, !finalSQLChunk);

      // The arguments follow the order of the format string: offset
      // first, then byte count.
      EXSM_TRACE(EXSM_TRACE_BUFFER,
                 "CONN %p ACK ARRIVED buf %p offset %d bytes %d final %d",
                 this, sendBuffer, (int) sendOffset, (int) sendBytes,
                 (int) finalSQLChunk);
    } // ack arrived
  } // waiting for an ack
  else
  {
    // Remove a buffer from the send queue
    sendBuffer = getNextSendQueueEntry();

    if (sendBuffer)
    {
      // maxSMChunkSize will be the largest physical chunk we can
      // transmit over SM. For intranode messages there is no limit so
      // maxSMChunkSize will be the total size of sendBuffer.
      UInt32 msgLen = (UInt32) sendBuffer->getMessageLength();
      UInt32 maxSMChunkSize = ExSM_GetMaxChunkSize(intranode_);

      EXSM_TRACE(EXSM_TRACE_BUFFER,
                 "CONN %p dequeue %p len %d",
                 this, sendBuffer, (int) msgLen);

      // Cases to consider after a buffer is removed from the send
      // queue:
      //
      // (a) The buffer exceeds SM maximum chunk size. It will be
      // sent in multiple chunks.
      //
      // (b) The buffer does not exceed SM maximum chunk size but is
      // larger than the receiving connection's max buffer size. The
      // buffer can be sent in one chunk but first requires a short
      // message round-trip interaction so that the receiving end can
      // allocate a receive buffer of sufficient size.
      //
      // (c) The buffer does not exceed either limit and can be sent
      // right away
      if (msgLen <= maxBufSize_ && msgLen <= maxSMChunkSize)
      {
        // Case (c)

        // The buffer can be sent right away. There is nothing to do
        // here except adjust local variables that will drive the
        // send. The send takes place later in this method.
        sendOffset = 0;
        sendBytes = msgLen;
      }
      else
      {
        // Cases (a) and (b) are treated the same

        // The buffer exceeds either the SM maximum size or the
        // receiving connection's maximum size. A short message will
        // be sent containing the buffer size and the chunk size. Data
        // from the buffer will not be sent until an ack arrives.

        // Adjust data members to ENTER chunk mode
        UInt32 sqlChunkSize =
          (msgLen > maxSMChunkSize ? maxSMChunkSize : msgLen);
        enterChunkMode(sendBuffer, sqlChunkSize);

        // Setting sendBuffer to NULL will trigger an early return
        // after the short message is sent
        sendBuffer = NULL;

        // We are about to send a short message containing the buffer
        // size and the chosen chunk size. The tag needs the REPLY bit
        // set when this is the server side. The message data region
        // contains:
        //  * Message type
        //  * Buffer size
        //  * SQL chunk size
        sm_target_t targetToSend = smTarget_;
        if (isServer_)
          ExSMTag_SetReplyFlag(&targetToSend.tag);

        ExSMShortMessage m;
        m.setTarget(targetToSend);
        m.setNumValues(3);
        m.setValue(0, (int32_t) ExSMShortMessage::SIZE);
        m.setValue(1, (int32_t) msgLen);
        m.setValue(2, (int32_t) sqlChunkSize);

        // Capture the return code of the short message send. Without
        // this assignment rc keeps its initial value of 0 and the
        // error handling below can never run.
        rc = m.send();

        if (rc == SM_ERR_NOPEER || rc == SM_ERR_NOSERVICE || rc == SM_ERR_NODE_DOWN)
        {
          if (!isServer_)
          {
            setErrorInfo(-1);
            setState(ERROR_STATE);
            smErrorNumber_ = rc;
            strcpy(smErrorFunction_, "ExSM_SendShortMessage");
          }
          else
            NAExit(0);
        }
        else
        {
          // Only return codes that were not handled above are passed
          // to the assertion. This matches the ExSM_Post and
          // ExSM_Send paths, where the three peer-loss codes are
          // handled without asserting.
          exsm_assert_rc(rc, "ExSM_SendShortMessage");
        }
      } // buffer is too large
    } // if (sendBuffer)
  } // not waiting for an ack

  // There is nothing to do if the connection is waiting on an ack or
  // if there was nothing in the send queue. In either case the
  // sendBuffer variable is NULL.
  if (sendBuffer == NULL)
    return false;

  // On the client side this adds the SM connection to
  // pendingIOs_. On the server side the SM connection is added to
  // pendingIOs_ as soon as the connection is created.
  if (!isServer_)
    incrOutstandingSMRequests();

  // Before sending an SM message we must have pre-allocated receive
  // buffers for all possible replies. Pre-allocated buffers allow the
  // reader thread to fill target buffers without the overhead of
  // allocating space on the fly.
  rc = allocateReceiveBuffers();
  if (rc == SM_ERR_NOPEER || rc == SM_ERR_NOSERVICE || rc == SM_ERR_NODE_DOWN)
  {
    if (!isServer_)
    {
      smErrorNumber_ = rc;
      strcpy(smErrorFunction_, "ExSM_Post");
      setErrorInfo(-1);
      setState(ERROR_STATE);

      // Issue the send callback with the connection in the ERROR
      // state, then release this connection's reference to the
      // buffer.
      sendBuffer->callSendCallback(this);
      sendBuffer->decrRefCount();
      return true;
    }
    else
      NAExit(0);
  }

  // The next step is to send a message
  ExSM_MessageType msgType =
    (isServer_ ? EXSM_MSG_REPLY : EXSM_MSG_REQUEST);
  ExSMGlobals *smGlobals = ExSMGlobals::GetExSMGlobals();

  sm_target_t targetToSend = smTarget_;
  if (sqlChunkMode)
    ExSMTag_SetSQLChunkFlag(&targetToSend.tag);
  if (isServer_)
    ExSMTag_SetReplyFlag(&targetToSend.tag);

  bool messageWasSent = false;
  rc = ExSM_Send(smGlobals,
                 targetToSend,
                 sendBuffer->data(sendOffset),
                 sendBytes,
                 msgType,
                 true,           // isPrepostRequired
                 messageWasSent, // OUT
                 -1,             // retry forever
                 sendCount_,
                 sendBuffer->data(0));

  if (rc == SM_ERR_NOPEER || rc == SM_ERR_NOSERVICE || rc == SM_ERR_NODE_DOWN)
  {
    if (!isServer_)
    {
      setErrorInfo(-1);
      setState(ERROR_STATE);
      smErrorNumber_ = rc;
      strcpy(smErrorFunction_, "ExSM_Send");
    }
    else
      NAExit(0);
  }
  else if (rc == SM_ERR_QUEUED)
  {
    // Add a reference to the send buffer because SM is holding a
    // pointer to the buffer. This new reference will be released
    // after SM returns the buffer to the reader thread. The reader
    // will place the buffer on a list that the main thread cleans up
    // periodically.
    sendBuffer->incrRefCount();
  }
  else
  {
    assert(messageWasSent && rc == 0);
  }

  sendCount_++;
  EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p send count %lld", this,
             sendCount_);

  if (sqlChunkMode)
  {
    // Do not attempt to send more chunks if an error was encountered
    if (getState() == ERROR_STATE)
      finalSQLChunk = true;

    if (finalSQLChunk)
    {
      // Adjust data members to EXIT chunk mode
      exitChunkMode();
    }
    else
    {
      // A chunk was sent and it was not the final chunk. Adjust data
      // members to move to the NEXT chunk.
      moveToNextChunk();
    }
  }

  // Invoke the send callback if this was not a chunked message or if
  // it was the final chunk
  if (!sqlChunkMode || finalSQLChunk)
  {
    // The lastSentBuffer_ data member is used inside some callback
    // methods, for example to record the size of the message that was
    // just sent in runtime stats.
    lastSentBuffer_ = sendBuffer;
    sendBuffer->callSendCallback(this);
    lastSentBuffer_ = NULL;

    // Release the message buffer. The buffer was passed in from a
    // stream but the stream no longer holds a reference. If the
    // buffer was not queued by SM, this connection holds the only
    // reference. If the buffer was queued, an additional reference
    // was added above and the buffer will be released later.
    sendBuffer->decrRefCount();
  }

  return true;
}
// SMConnection::receive()
//
// SM connections are only used by buffered streams (subclasses of
// IpcBufferedMsgStream). Buffered streams call connection.receive()
// in the following cases:
//
// Client
//  * IpcClientMsgStream::internalActOnSend() calls
//    connection->receive() BEFORE issuing the send callback on the
//    stream
//
// Server
//  * IpcServerMsgStream::internalActOnReceive() calls
//    connection->receive() AFTER issuing the receive callback on the
//    stream
//  * During the send bottom TCB constructor, connection.receive() is
//    called once to put the connection into an initial receiving state
//
void SMConnection::receive(IpcMessageStreamBase *msgStream)
{
  // A server-side connection is added to pendingIOs_ here. A
  // client-side connection is added to pendingIOs_ when the first
  // message is sent and the connection state is set to SENDING.
  if (isServer_)
    setState(RECEIVING);

  // addReceiveCallback(stream) adds the stream to the connection's
  // recvStreams_ list and does nothing else. recvStreams_ is used as
  // buffers arrive to match buffers to the correct stream. The
  // matchmaking happens in IpcConnection::getNextReceiveQueueEntry().
  addReceiveCallback(msgStream);

  // Only server-side connections pre-allocate receive buffers here,
  // because the server side has to be ready to receive messages
  // before doing any sends. In contrast, a client-side connection can
  // allocate receive buffers when a new request is sent to the
  // server.
  if (!isServer_)
    return;

  EXSM_TRACE(EXSM_TRACE_PROTOCOL,
             "CONN %p receive, allocating buffers", this);

  int32_t postRc = allocateReceiveBuffers();
  switch (postRc)
  {
    case SM_ERR_NOPEER:
    case SM_ERR_NOSERVICE:
    case SM_ERR_NODE_DOWN:
      NAExit(0);
      break;
    default:
      break;
  }
}
// SMConnection::wait()
//
// Checks whether the SM reader thread terminated because of an error
// and, if the connection is scheduled, processes arrivals.
//
// When the reader thread has terminated due to an error:
//  * Client side: unless the connection is already in ERROR_STATE,
//    the reader thread's error number and function name are copied
//    into this connection and the connection enters ERROR_STATE
//  * Server side: the process exits via NAExit(0)
//
// The timeout, eventConsumed and ipcAwaitiox parameters are not used
// by this method.
//
// Returns the result of workOnArrivals() when scheduled_ is set,
// otherwise WAIT_OK.
WaitReturnStatus SMConnection::wait(IpcTimeout timeout,
                                    UInt32 *eventConsumed,
                                    IpcAwaitiox *ipcAwaitiox)
{
  WaitReturnStatus waitReturnStatus = WAIT_OK;

  // Temporary code till Gil's changes are merged to send a message to master
  // through control connection
  ExSMGlobals *smGlobals = ExSMGlobals::GetExSMGlobals();
  assert(smGlobals);

  if (smGlobals->getReaderThreadState() == ExSMGlobals::TERMINATED_DUE_TO_ERROR)
  {
    if (!isServer_)
    {
      if (getState() != ERROR_STATE)
      {
        EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p Reader thread terminated",
                   this);
        smErrorNumber_ = smGlobals->getReaderThreadSmErrorNumber();

        // Bounded copy: the function name comes from the reader
        // thread and its length is not known here, so never write
        // past the end of smErrorFunction_ and always terminate it.
        strncpy(smErrorFunction_,
                smGlobals->getReaderThreadSmErrorFunction(),
                sizeof(smErrorFunction_) - 1);
        smErrorFunction_[sizeof(smErrorFunction_) - 1] = '\0';

        setErrorInfo(-1);
        setState(ERROR_STATE);
      }
    }
    else
    {
      EXSM_TRACE(EXSM_TRACE_PROTOCOL,
                 "CONN %p Reader thread terminated, exiting", this);
      NAExit(0);
    }
  }

  // $$$$ SEAMONSTER PROJECT JUNE 2013
  // We should be able to remove the scheduled_ data member now that
  // we have a ready list for tasks with arrivals, and only call
  // connection.wait() for a task that is on the ready list.
  if (scheduled_)
    waitReturnStatus = workOnArrivals();

  return waitReturnStatus;
}
// SMConnection::workOnArrivals()
//
// Processes everything that has arrived for this connection:
//  * Sends pending chunks while an ack has arrived for a chunked send
//  * Clears scheduled_
//  * Moves every buffer from the SM task's output queue to the
//    connection's receive queue
//  * Issues a receive callback for every buffer that
//    getNextReceiveQueueEntry() returns
//
// Always returns WAIT_OK.
WaitReturnStatus SMConnection::workOnArrivals()
{
EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p BEGIN WORKONARRIVALS", this);
// While the connection is in chunk mode and an ack has arrived, send
// as many chunks as tryToSendOneChunk() is willing to send
while (getWaitingForAck() && getAckArrived())
while (tryToSendOneChunk())
;
scheduled_ = 0;
ExSMQueue *outQueue = smTask_->getOutQueue();
// Each iteration of the while loop will do the following
// * Remove an incoming buffer from the SM task queue
// * On the client side, give the buffer a message stream pointer
// (data or cancel)
// * Add the buffer to the connection's receive queue
// * On the server side, set the connection state to ESTABLISHED
//
// Following the while loop, a message stream receive callback is
// issued for each buffer on the connection's receive queue
ExSMTaskList *smTaskList = ExSMGlobals::GetExSMGlobals()->getSMTaskList();
ExSMReadyList *readyList = ExSMGlobals::GetExSMGlobals()->getReadyList();
while (!outQueue->isEmpty())
{
ExSMQueue::Entry &entry = outQueue->getHeadEntry();
IpcMessageBuffer *msgBuf = (IpcMessageBuffer *) entry.getData();
bool locked = false;
// We are about to remove a buffer from the task output queue
//
// If the task output queue is about to become empty (current
// length is 1 and we are about to remove an entry), perform the
// following steps:
//
// (a) Before removing the buffer, acquire the task list lock.
// This prevents the reader thread from putting the task back
// on the completed task queue.
//
// (b) After removing the buffer, if the output queue is still
// empty, remove the task from the completed task queue and
// release the task list lock.
// Acquire the task list lock
if (outQueue->getLength() == 1)
{
smTaskList->lock();
locked = true;
}
// Remove a buffer from the task output queue
outQueue->removeHead();
recvCount_++;
// Remove the SM task from the ready list and release the lock. A
// task is added to the ready list by the reader thread when a
// buffer arrives for a task and that task currently has an empty
// output queue.
//
// NOTE: The SM ready list is accessed by both threads (main and
// reader) but does not have its own lock. By convention,
// modifications to the ready list are always performed while
// holding a lock on the SM task list (a global collection of all
// SM tasks in this process).
if (outQueue->isEmpty())
{
assert(locked);
readyList->remove(smTask_);
}
if (locked)
smTaskList->unlock();
// The next few lines look into the message and extract the type
// of the sending message stream. The stream type is used for
// tracing and also used later to differentiate data versus cancel
// messages.
InternalMsgHdrInfoStruct *infoStruct = (InternalMsgHdrInfoStruct *)
msgBuf->data(0);
ESPMessageTypeEnum streamType =
(ESPMessageTypeEnum) infoStruct->getType();
EXSM_TRACE(EXSM_TRACE_PROTOCOL,
"CONN %p recv %p len %d tag %d %s", this, msgBuf,
(int) msgBuf->getMessageLength(),
(int) msgBuf->getReplyTag(),
getESPStreamTypeString(streamType));
if (!isServer_)
EXSM_TRACE(EXSM_TRACE_PROTOCOL,
"CONN %p lbatch %d seq curr %d incoming %d", this,
(int) infoStruct->getSMLastInBatch(),
(int) getReplySeqNum(), (int) infoStruct->getSeqNum());
// After a control connection error was reported we discard any
// late-arriving data buffers. This is because the method
// reportControlConnectionError() already issued receive callbacks
// for all pending arrivals and we do not want callbacks issued
// twice for the same arrival.
if (ccErrorNumber_ != 0)
{
msgBuf->decrRefCount();
// Return to the top of the "WHILE OUTQ IS NOT EMPTY" loop
continue;
}
if (!isServer_)
{
// For client-side only. Add a pointer to the appropriate stream
// to the message buffer object.
//
// Note: we do not call decrOutstandingSMRequests() for a DATA
// reply as we do for a CANCEL reply. This is because data
// streams use a batch protocol and we do not want the decrement
// for every buffer. Only the last buffer in a batch should
// trigger the decrement. The stream receive callback (issued
// later in this method) takes care of decrementing the counter
// once a complete batch is seen. This happens inside
// IpcClientMsgStream::internalActOnReceive().
switch (streamType)
{
case IPC_MSG_SQLESP_DATA_REPLY:
{
msgBuf->addCallback(dataStream_);
break;
}
case IPC_MSG_SQLESP_CANCEL_REPLY:
{
msgBuf->addCallback(cancelStream_);
decrOutstandingSMRequests();
break;
}
default:
// Any other stream type is unexpected on a client-side connection
assert(0);
} // switch (streamType)
} // if (!isServer_)
// To allow this buffer to pass through an IpcClientMsgStream or
// IpcServerMsgStream it needs a valid reply tag. The tag is only
// checked for being valid, the actual value currently has no
// meaning for SM.
short replyTag = 1;
msgBuf->setReplyTag(replyTag);
EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p set reply tag %d",
this, (int) replyTag);
queueReceiveMessage(msgBuf);
// Setting the state to ESTABLISHED here is right for the server,
// because the server sets it to SENDING again before the receive
// call is done.
// On the client the connection should not be removed from
// pendingIOs_ after the first receive. It should be set to
// ESTABLISHED only after the expected number of replies has been
// received or EOD is received.
if (isServer_)
setState(ESTABLISHED);
} // while out queue is not empty
// NOTE: getNextReceiveQueueEntry() has side effects and is not
// guaranteed to return a buffer just because the queue is
// non-empty. To fully understand the de-queueing step, you should
// read getNextReceiveQueueEntry(). The logic there includes:
// * special cases for server-side and the ERROR state
// * matching of reply sequence numbers
// * if the incoming buffer has a stream pointer, matching of that
// pointer with an element of recvStreams_
// * removal from receiveQueue_ and recvStreams_
// * updating the buffer's stream pointer
// * increment of replySeqNum_
IpcMessageBuffer *receiveBuf;
while ((receiveBuf = getNextReceiveQueueEntry()))
receiveBuf->callReceiveCallback(this);
EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p END WORKONARRIVALS", this);
return WAIT_OK;
}
// Change the connection state. A state transition is traced when the
// new state differs from the current one. The change itself is
// delegated to IpcConnection::setState().
void SMConnection::setState(IpcConnectionState s)
{
  const IpcConnectionState previous = getState();
  const bool isTransition = (previous != s);

  if (isTransition)
  {
    EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p %s -> %s", this,
               getConnectionStateString(previous),
               getConnectionStateString(s));
  }

  IpcConnection::setState(s);
}
// Count one more outstanding SM request. When the count goes to 1
// the connection state is set to SENDING.
void SMConnection::incrOutstandingSMRequests()
{
  outstandingSMRequests_ += 1;

  EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p outstanding reqs %d", this,
             (int) outstandingSMRequests_);

  const bool firstOutstanding = (outstandingSMRequests_ == 1);
  if (firstOutstanding)
  {
    setState(SENDING);
  }
}
// Count one less outstanding SM request. The count must be positive
// on entry. When the count drops to 0 the connection state is set to
// ESTABLISHED.
void SMConnection::decrOutstandingSMRequests()
{
  assert(outstandingSMRequests_ > 0);
  outstandingSMRequests_ -= 1;

  EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p outstanding reqs %d", this,
             (int) outstandingSMRequests_);

  const bool noneOutstanding = (outstandingSMRequests_ == 0);
  if (noneOutstanding)
  {
    setState(ESTABLISHED);
  }
}
// SMConnection::reportControlConnectionError()
//
// ExMasterEspMessage::actOnErrorConnection calls this function after
// an error is encountered on an ESP control connection and we want to
// propagate the error to SM connections.
//
// The method is only called in the master executor and is therefore a
// no-op for server-side connections. An error number that was already
// recorded, in either ccErrorNumber_ or smErrorNumber_, is never
// overwritten.
void SMConnection::reportControlConnectionError(GuaErrorNumber error)
{
  if (isServer_)
    return;

  // Only the first reported error is stored in the field specific to
  // control connection errors, and only the first report generates an
  // event and a trace line
  const bool firstReport = (ccErrorNumber_ == 0);
  if (firstReport)
  {
    ccErrorNumber_ = error;
    ExSMEvent::add(ExSMEvent::ControlConnectionError, &smTarget_,
                   ccErrorNumber_, smErrorNumber_, 0, (int64_t) this);
    EXSM_TRACE(EXSM_TRACE_PROTOCOL,"CONN %p reportControlConnectionError %d",
               this, (int) error);
  }

  // The general-purpose field smErrorNumber_ also receives the error
  // unless it already holds one
  const bool noPriorSMError = (smErrorNumber_ == 0);
  if (noPriorSMError)
  {
    smErrorNumber_ = error;
  }

  setErrorInfo(-1);
  setState(ERROR_STATE);

  handleIOErrorForSM();

  // The code above that processes pending responses may have set the
  // state of the connection to ESTABLISHED as a way to remove the
  // connection from the global pendingIOs_ list. It would be nice to
  // avoid the transition to ESTABLISHED but for now that is not being
  // attempted. Instead we simply make sure that the connection is in
  // an ERROR state by the time this method returns.
  setState(ERROR_STATE);
}
void SMConnection::handleIOErrorForSM()
{
IpcMessageBuffer *sendBuf;
// might need to decrement the ref count of the chunk mode buffer,
// otherwise it might be a memory leak
while (sendBuf = removeNextSendBuffer())
sendBuf->decrRefCount();
IpcMessageBuffer *receiveBuf;
while (receiveBuf = removeNextReceiveBuffer())
getEnvironment()->getAllConnections()->bumpCompletionCount();
removeReceiveStreams();
if (dataStream_)
{
IpcClientMsgStream *dataStream = (IpcClientMsgStream *)dataStream_;
for (Lng32 i=0; i < dataStream->numOfResponsesPending(); i++)
{
decrOutstandingSMRequests();
dataStream->setSMBatchIsComplete(TRUE);
dataStream->internalActOnReceive(NULL, this);
}
}
if (cancelStream_)
{
IpcClientMsgStream *cancelStream = (IpcClientMsgStream *)cancelStream_;
for (Lng32 i=0; i < cancelStream->numOfResponsesPending(); i++)
{
decrOutstandingSMRequests();
cancelStream->internalActOnReceive(NULL, this);
}
}
}
// SMConnection::populateDiagsArea()
//
// This method is a pure virtual function in the IpcConnection parent
// class.
//
// The method gets called by stream objects when a send or receive
// callback is issued and the connection has entered an ERROR
// state. The stream caller provides a diags area in which error
// conditions are written.
//
// The method is a no-op if no error number has been recorded for the
// connection (smErrorNumber_ equals GuaOK).
//
// diags:     IN/OUT diags area; passed to IpcAllocateDiagsArea()
//            before any condition is written
// diagsHeap: heap passed to IpcAllocateDiagsArea()
//
void SMConnection::populateDiagsArea(ComDiagsArea *&diags,
                                     CollHeap *diagsHeap)
{
  if (smErrorNumber_ == GuaOK)
    return;

  // Cases to consider
  //
  // a. An SM NOPEER, NOSERVICE or NODE_DOWN error was encountered.
  //    This means the process at the other end of an SM connection
  //    went away unexpectedly. Generate a descriptive error.
  //
  // b. Otherwise all other errors are assumed to be Guardian errors
  //    on an ESP control connection. This assumption might also change
  //    in the future. For this case do not generate new diagnostics if
  //    the diags area already contains the error.

  // Case (a)
  if (smErrorNumber_ == SM_ERR_NOPEER ||
      smErrorNumber_ == SM_ERR_NOSERVICE ||
      smErrorNumber_ == SM_ERR_NODE_DOWN)
  {
    IpcAllocateDiagsArea(diags, diagsHeap);

    // "node,pid". Two 32-bit integers can need up to 11 characters
    // each, so the buffer is sized for the worst case and the write
    // is bounded.
    char processName[32];
    snprintf(processName, sizeof(processName), "%d,%d",
             (int) smTarget_.node, (int) smTarget_.pid);

    *diags << DgSqlCode(-EXE_SM_FUNCTION_ERROR)
           << DgString0(smErrorFunction_)
           << DgInt0((Lng32) smErrorNumber_)
           << DgInt1((Lng32) smTarget_.pid)
           << DgString1(processName)
           << DgNskCode((Lng32) 10000 + abs(smErrorNumber_));
  }

  // Case (b)
  else
  {
    if (diags && diags->contains(-EXE_SM_CONTROL_CONN_ERROR))
      return;

    IpcAllocateDiagsArea(diags, diagsHeap);

    // node:pid:id:tag:qualifier
    char targetBuf[128];
    snprintf(targetBuf, sizeof(targetBuf), "%d:%d:%" PRId64 ":%d:0x%c",
             (int) smTarget_.node, (int) smTarget_.pid, smTarget_.id,
             (int) ExSMTag_GetTagWithoutQualifier(smTarget_.tag),
             (char) ExSMTag_GetQualifierDisplay(smTarget_.tag));

    *diags << DgSqlCode(-EXE_SM_CONTROL_CONN_ERROR)
           << DgString1(targetBuf)
           << DgInt0(smErrorNumber_);

    getEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).
      addProcIdToDiagsArea(*diags,0);
  }
}
// Several data members must be maintained and kept in sync as the
// connection begins chunk mode, moves to the next chunk, and ends
// chunk mode. The methods enterChunkMode(), moveToNextChunk() and
// exitChunkMode() encapsulate the ENTER, NEXT, and EXIT operations.

// ENTER chunk mode: remember the buffer being sent and the chunk
// size, start at offset 0, clear the ACK ARRIVED flag and set the
// WAITING FOR ACK flag.
void SMConnection::enterChunkMode(IpcMessageBuffer *sendBuffer,
                                  UInt32 sqlChunkSize)
{
  chunk_size_ = sqlChunkSize;
  chunk_nextOffset_ = 0;
  chunk_buffer_ = sendBuffer;

  const bool ackArrived = false;
  const bool waitingForAck = true;
  setAckArrived(ackArrived, waitingForAck);

  EXSM_TRACE(EXSM_TRACE_BUFFER, "CONN %p ENTER CHUNK buf %p sz %d", this,
             sendBuffer, (int) sqlChunkSize);
}
// NEXT chunk: advance the send offset by one chunk size.
void SMConnection::moveToNextChunk()
{
  chunk_nextOffset_ = chunk_nextOffset_ + chunk_size_;

  EXSM_TRACE(EXSM_TRACE_BUFFER, "CONN %p NEXT CHUNK offset %d", this,
             (int) chunk_nextOffset_);
}
void SMConnection::exitChunkMode()
{
chunk_buffer_ = NULL;
chunk_nextOffset_ = 0;
chunk_size_ = 0;
setAckArrived(false, // Clear the ACK ARRIVED flag
false); // Clear the WAITING FOR ACK flag
EXSM_TRACE(EXSM_TRACE_BUFFER, "CONN %p EXIT CHUNK", this);
}
// The following two methods manage the ACK ARRIVED attribute of this
// connection. The attribute is actually stored in the task object
// which makes the attribute visible and writeable by the reader
// thread.
void SMConnection::setAckArrived(bool arrived, bool waitForAnother)
{
// Set the ACK ARRIVED flag in the task object
if (smTask_)
smTask_->sendChunk_SetAckArrived(arrived);
// Set the WAITING FOR ACK flag in the connection
chunk_waitingForAck_ = waitForAnother;
}
// Return the ACK ARRIVED flag stored in the task object, or false
// when the connection has no task.
bool SMConnection::getAckArrived()
{
  if (!smTask_)
    return false;

  bool arrived = smTask_->sendChunk_GetAckArrived();
  return arrived;
}
// Optionally dump and/or stop the process at the other end of this
// connection.
//
// The process name is looked up from the node, pid and verifier of
// smTarget_. Nothing happens if the lookup fails, or if the
// "name:verifier" string does not fit in the local buffer (a
// truncated name must never be dumped or stopped).
//
// doDump: when true, call msg_mon_dump_process_name() for the process
// doStop: when true, call msg_mon_stop_process_name() for the process
void SMConnection::dumpAndStopOtherEnd(bool doDump, bool doStop) const
{
  char coreFile[1024];
  char processName[MS_MON_MAX_PROCESS_NAME+1];
  char seqName[PhandleStringLen];

  if (XZFIL_ERR_OK != msg_mon_get_process_name2(smTarget_.node,
                                                smTarget_.pid,
                                                smTarget_.verifier,
                                                processName))
    return;

  // Bounded formatting: snprintf never writes past seqName and
  // reports the length it would have needed.
  int needed = snprintf(seqName, sizeof(seqName), "%s:%d",
                        processName, smTarget_.verifier);
  if (needed < 0 || (size_t) needed >= sizeof(seqName))
    return;

  if (doDump)
    msg_mon_dump_process_name(NULL, seqName, coreFile);
  if (doStop)
    msg_mon_stop_process_name(seqName);
}