blob: 5bf9f99d97e4625d8a90c8b8bb55e832defec45a [file] [log] [blame]
/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
*****************************************************************************
*
* File: IpcGuardian.C
* Description: OS related code for Guardian-based IPC (see Ipc.h)
*
* Created: 2/11/96
* Language: C++
*
*
*
*
*****************************************************************************
*/
// -----------------------------------------------------------------------
#define AEVENT 1
#include "Platform.h"
#include <fcntl.h>
#include "ExCollections.h"
#include "Int64.h"
#include "Ipc.h"
#include "str.h"
#include "ComDiags.h"
#include "NAExit.h"
#include "ComRtUtils.h"
#include "PortProcessCalls.h"
#include "logmxevent.h"
#include "MXTraceDef.h"
#include "seabed/fs.h"
#include "seabed/ms.h"
#include "seabed/int/opts.h"
#include <sys/time.h>
#include <errno.h>
extern "C" {
int_16 TMF_SETTXHANDLE_(short *);
}
#include "rosetta/rosgen.h"
#include "nsk/nskprocess.h"
extern "C" {
#include "cextdecs/cextdecs.h"
#include "zsysc.h"
}
#include "fs/feerrors.h"
#include "trafconf/trafconfig.h" // to get TC_PROCESSOR_NAME_MAX
// Uncomment the next line to debug IPC problems (log of client's I/O)
// #define LOG_IPC
// Uncomment the next line to see a trace of $RECEIVE processing. This
// does not require the define LOG_IPC to be turned on.
// #define LOG_RECEIVE
// Uncomment the next line to see a trace of timeouts. This needs the
// define LOG_IPC to be turned on.
//#define LOG_WAIT_TIMEOUT
#if defined(LOG_IPC) || defined(LOG_RECEIVE)
void IpcGuaLogTimestamp(IpcConnection *conn); // see bottom of file
void allocateConsole(); // see bottom of file
#endif
// -----------------------------------------------------------------------
// The real thing starts here
// -----------------------------------------------------------------------
// -----------------------------------------------------------------------
// Guardian startup message (is sent to a process after startup)
// -----------------------------------------------------------------------
/* Guardian STARTUP message format, copied from stdlib.h
(couldn't include stdlib.h here because of possible global data) */
struct IpcStartupMsg
{
short msg_code;
union
{ char whole[16];
struct
{ char volume[8];
char subvolume[8];
} parts;
} defaults;
union
{ char whole[24];
struct
{ char volume[8];
char subvolume[8];
char file[8];
} parts;
} infile;
union
{ char whole[24];
struct
{ char volume[8];
char subvolume[8];
char file[8];
} parts;
} outfile;
char param[530];
// methods
IpcStartupMsg();
}; // IpcStartupMsg
const short GuaIpcStartupMsgCode = -1;
//NGG
static bool sv_checked=false;
static bool sv_trace=false;
#include "seabed/trace.h"
inline static void openTraceFile()
{
if (sv_checked) {
return;
}
sv_checked = true;
char *lv_env= getenv("ESP_TRACE_STARTUP");
if ((lv_env) && (*lv_env=='1')) {
trace_init((char *)"esptrace", true, (char *)"", true);
sv_trace = true;
}
}
#define ESP_TRACE1(s1) if (sv_trace) { trace_printf(s1); }
#define ESP_TRACE2(s1,s2) if (sv_trace) { trace_printf(s1,s2);}
IpcStartupMsg::IpcStartupMsg()
{
// set the correct message code and blank out all other fields
// except the param field which is NULL-terminated
msg_code = GuaIpcStartupMsgCode;
str_pad(defaults.whole,sizeof(defaults));
str_pad(infile.whole,sizeof(infile));
str_pad(outfile.whole,sizeof(outfile));
param[0] = 0;
}
// -----------------------------------------------------------------------
// Methods for class GuaProcessHandle
// -----------------------------------------------------------------------
Lng32 GuaProcessHandle::decompose(Int32 &cpu, Int32 &pin,
Int32 &nodeNumber
, SB_Int64_Type &seqNum
) const
{
// Phandle wrapper in porting layer
NAProcessHandle phandle((SB_Phandle_Type *)&phandle_);
Lng32 result = phandle.decompose();
if (!result) {
cpu = phandle.getCpu();
pin = phandle.getPin();
nodeNumber = phandle.getNodeNumber();
seqNum = phandle.getSeqNum();
}
return result;
}
Int32 GuaProcessHandle::decompose2(Int32 &cpu, Int32 &pin, Int32 &node
, SB_Int64_Type &seqNum
) const
{
return decompose(cpu, pin, node
, seqNum
);
}
NABoolean GuaProcessHandle::compare(const GuaProcessHandle &other) const
{
Int32 guaRetcode = XPROCESSHANDLE_COMPARE_((SB_Phandle_Type *)&phandle_,
(SB_Phandle_Type *)&(other.phandle_));
// 0 means different, 1 means two procs of a process pair (different)
// 2 means the process handles are the same
return (guaRetcode == 2);
}
NABoolean GuaProcessHandle::fromAscii(const char *ascii)
{
int retcode = get_phandle_with_retry((char *)ascii, &phandle_);
if (retcode != FEOK)
return FALSE;
return TRUE;
}
Int32 GuaProcessHandle::toAscii(char *ascii, Int32 asciiLen) const
{
short result;
Int32 guaRetcode =0;
//Phandle wrapper in porting layer
NAProcessHandle phandle((SB_Phandle_Type *)&phandle_);
guaRetcode = phandle.decompose();
memcpy(ascii, phandle.getPhandleString(), phandle.getPhandleStringLen());
result = phandle.getPhandleStringLen();
if (guaRetcode)
{
// This went wrong, return an error message in the string
// NOTE: we often use this method for error processing,
// so this is probably better than raising an exception.
// Ok, so it is just an excuse for the missing error handling.
str_cpy_all(ascii,"Err",3);
str_itoa(guaRetcode,&ascii[3]);
return str_len(ascii);
}
// return the length of the result string (will be 0 if Guardian
// couldn't convert the name)
return result;
}
// -----------------------------------------------------------------------
// Methods for class IpcNodeName
// -----------------------------------------------------------------------
IpcNodeName::IpcNodeName(const GuaProcessHandle &phandle)
{
Lng32 nodeNumber;
short nodeNameLen;
char nodeNameWithBackslash[GuaNodeNameMaxLen+1];
domain_ = IPC_DOM_GUA_PHANDLE;
//Phandle wrapper in porting layer
NAProcessHandle procHandle((SB_Phandle_Type *)&phandle.phandle_);
Int32 err = procHandle.decompose();
assert(err == 0);
nodeNumber = procHandle.getNodeNumber();
nodeNameLen = procHandle.getNodeNameLen();
memcpy(nodeNameWithBackslash, procHandle.getNodeName(), nodeNameLen);
// add the string terminator to the retrieved node name
nodeNameWithBackslash[nodeNameLen] = 0;
// copy the name without the leading backslash to the result
str_cpy(guardianNode_.nodeName_,
&nodeNameWithBackslash[1],
GuaNodeNameMaxLen,
' ');
}
// -----------------------------------------------------------------------
// Methods for class IpcProcessId
// -----------------------------------------------------------------------
IpcCpuNum IpcProcessId::getCpuNumFromPhandle() const
{
//Phandle wrapper in porting layer
NAProcessHandle phandle((SB_Phandle_Type *)&phandle_.phandle_);
Int32 err = phandle.decompose();
assert(err == 0);
return phandle.getCpu();
}
// -----------------------------------------------------------------------
// Methods for class MyGuaProcessHandle
// -----------------------------------------------------------------------
MyGuaProcessHandle::MyGuaProcessHandle()
{
// set the phandle with my own one
//Phandle wrapper in porting layer
NAProcessHandle phandle;
Int32 err = phandle.getmine((SB_Phandle_Type *)&phandle_);
assert(err==0); // only error is bounds error (3)
}
// -----------------------------------------------------------------------
// Methods for class GuaConnectionToServer
// -----------------------------------------------------------------------
GuaConnectionToServer::GuaConnectionToServer(
IpcEnvironment *env,
const IpcProcessId &procId,
NABoolean usesTransactions,
unsigned short nowaitDepth,
const char *eye,
NABoolean parallelOpen,
Int32 *openCompletionScheduled
,
NABoolean dataConnectionToEsp
)
: IpcConnection(env,procId,eye)
{
openFile_ = InvalidGuaFileNumber;
openCompletionScheduled_ = openCompletionScheduled;
nowaitDepth_ = nowaitDepth;
maxIOSize_ = env->getGuaMaxMsgIOSize();
activeIOs_ = new(env) ActiveIOQueueEntry[nowaitDepth_];
for (unsigned short i = 0; i < nowaitDepth_; i++)
{
activeIOs_[i].inUse_ = false;
activeIOs_[i].buffer_ = activeIOs_[i].readBuffer_ = NULL;
activeIOs_[i].ioTag_ = -1;
}
lastAllocatedEntry_ = nowaitDepth_-1;
numOutstandingIOs_ = 0;
partiallySentBuffer_ = NULL;
chunkBytesSent_ = 0;
partiallyReceivedBuffer_ = NULL;
chunkBytesRequested_ = 0;
chunkBytesReceived_ = 0;
usesTransactions_ = usesTransactions;
guaErrorInfo_ = GuaOK;
dataConnectionToEsp_ = dataConnectionToEsp;
self_ = FALSE; // Set at openPhandle time
openRetries_ = 0;
beginOpenTime_.tv_sec = 0;
beginOpenTime_.tv_nsec = 0;
completeOpenTime_.tv_sec = 0;
completeOpenTime_.tv_nsec = 0;
#if 0
sentMsgHdr_ = (char *)env->getHeap()->allocateMemory(sizeof(MsgTraceEntry) * 8);
memset(sentMsgHdr_, 0, sizeof(MsgTraceEntry) * 8);
sentMsgHdrInd_ = 7;
#endif
// We need a nowait depth of at least 2, one for a message and another
// one for out-of-band messages (not really implemented yet).
//assert(nowaitDepth_ >= 2);
sendCallbackBufferList_ = new(env) IpcMessageBuffer*[nowaitDepth_];
for (unsigned short i = 0; i < nowaitDepth_; i++)
sendCallbackBufferList_[i] = NULL;
tscoOpen_ = FALSE;
// now open the server process
openPhandle(NULL, parallelOpen);
}
GuaConnectionToServer::~GuaConnectionToServer()
{
closePhandle();
CollHeap *heap = getEnvironment()->getHeap();
heap->deallocateMemory(activeIOs_);
heap->deallocateMemory(sendCallbackBufferList_);
#if 0
heap->deallocateMemory((void *)sentMsgHdr_);
#endif
}
void GuaConnectionToServer::send(IpcMessageBuffer *buffer)
{
// simply add the new buffer to the send queue and try to start
// as many new I/O operations as possible
queueSendMessage(buffer);
while (tryToStartNewIO())
;
}
void GuaConnectionToServer::receive(IpcMessageStreamBase *msg)
{
// Receiving from a Guardian server is implicit, since the WRITEREADX
// call performs both a send and a receive operation together. However,
// we still need to add the callback and, if the I/O has already
// completed, call the callback.
addReceiveCallback(msg);
// maybe the Guardian I/O has already completed and the buffer is
// waiting in the base class' receive queue
IpcMessageBuffer *receiveBuf;
while (receiveBuf = getNextReceiveQueueEntry())
{
// yes, so just call its callback
receiveBuf->callReceiveCallback(this);
}
}
NABoolean GuaConnectionToServer::moreWaitsAllowed()
{
return !stopWait_;
}
//
// Wait for an I/O reply. After receives a reply, the I/O entry looks like:
//
// - entry.buffer_=entry.readBuffer_=reply buffer
//
WaitReturnStatus GuaConnectionToServer::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox)
{
Int32 cpu, pin, nodeNumber;
SB_Int64_Type seqNum = -1;
GuaProcessHandle *otherEnd;
if (getState() != OPENING)
{
// Shouldn't have ipcAwaitiox completion if numOutStandingIOs_ equals zero
assert(numOutstandingIOs_ > 0 || ipcAwaitiox == NULL);
NABoolean retry = TRUE;
MXTRC_FUNC("GCTS::wait");
MXTRC_1("timeout=%d\n", timeout);
// don't do anything if the connection is in an error state and
// there are no more pending requests to work on.
if (getState() == ERROR_STATE AND numOutstandingIOs_ <= 0
AND numQueuedSendMessages() <= 0)
{ // no more waits on this connection
stopWait(TRUE);
return WAIT_OK;
}
stopWait(FALSE);
// try to send or receive first if there is a timeout specified,
// or if we don't have any IOs outstanding. Also, try more
// I/O to cover the special case that we've posted only the
// first chunk of an multi-chunk msg.
// This latter scenario is possible because of the logic on
// tryToStartNewIO that gives up if the per-process limit on
// MQCs would be exceeded. It required this special handling
// for multichunk messages, as explained in the next paragraph.
// The connection may have to give up either 1) before the
// first chunk is posted, or 2) before the second chunk is
// posted or 3) after the second chunk is posted.
// The tryToStartNewIO method is called from this class's send
// method and then also from two places in this wait method.
// In case 1), the test of numOutStandingIOs_ will be
// sufficient to cause tryToStartNewIO to be called directly
// below this comment. For case 3), the other call to
// tryToStartNewIO will be taken, because the a_message_is_done
// will be set to true after the server replies to the chunk(s)
// already posted. But for case 2), we need special handling,
// because the server does not reply to the first chunk until
// all chunks are sent. And this class's send method will not
// be called again for the multichunk message. So the additional
// test below to detect that the second chunk has not yet been
// sent, is needed to force the call to tryToStartNewIO.
if (timeout != IpcImmediately ||
numOutstandingIOs_ == 0 ||
chunkBytesSent_ == maxIOSize_)
while (tryToStartNewIO())
;
// try to complete I/Os within the given time limit,
// if there are outstanding I/Os
if (numOutstandingIOs_ == 0)
return WAIT_OK;
IpcMessageBufferPtr bufferAddr = NULL;
_bcc_status stat;
Int32 countRead;
SB_Tag_Type ioTag = -1;
while (retry)
{
NABoolean ipcAwaitioxCompleted = ipcAwaitiox != NULL;
if (ipcAwaitioxCompleted)
ipcAwaitioxCompleted = ipcAwaitiox->getCompleted();
if (!ipcAwaitioxCompleted)
{
if (tscoOpen_)
stat = BAWAITIOXTS(&openFile_,
(void **) &bufferAddr,
&countRead,
&ioTag,
timeout,
OMIT);
else
stat = BAWAITIOX(&openFile_,
(void **) &bufferAddr,
&countRead,
&ioTag,
timeout,
OMIT);
}
else
stat = ipcAwaitiox->ActOnAwaitiox((void **)&bufferAddr,
&countRead,
&ioTag);
MXTRC_3("GCTS::wait awake filenum=%d bufferAddr=%x ioTag=%d\n", openFile_, bufferAddr, ioTag);
#ifdef LOG_WAIT_TIMEOUT
IpcGuaLogTimestamp(this);
cerr << "GCTS:timeout = " << timeout << " ioTag = " << ioTag << endl;
#endif
// Only retry if FE_EINTR error and breakEnabled is FALSE.
retry = FALSE;
if (_status_ne(stat))
{
// get a Guardian error code
Int32 retcode = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (retcode != 0)
guaErrorInfo_ = retcode; // not even FILE_GETINFO_ worked
// timeout does not set the connection into an error state
// but it causes a return
if (guaErrorInfo_ == GuaTimeoutErr)
{
guaErrorInfo_ = GuaOK;
return WAIT_OK;
}
// Are there any cases where we need to retry? $$$$
// Retry if error is FE_EINTR (4004) and not breakEnabled
if ((guaErrorInfo_ == 4004) && !getEnvironment()->breakEnabled())
{
guaErrorInfo_ = GuaOK;
retry = TRUE;
}
// assert if a server asserted because it received more than
// total message length within the multi-chunk protocol
assert(guaErrorInfo_ != FEASSERTNUMBASE);
}
} // while (retry)
// An I/O completed at the address bufferAddr with the returned I/O tag.
// now try to find the matching message buffer for it.
getEnvironment()->setEvent(TRUE, AEVENT);
if (!guaErrorInfo_)
{
assert(ioTag >= 0 && ioTag < (Lng32)nowaitDepth_);
ActiveIOQueueEntry &entry = activeIOs_[ioTag];
assert(entry.inUse_ && ioTag == (Lng32)entry.ioTag_);
// make sure we actually received the buffer that we expected
IpcMessageBuffer *writeReadBuffer = entry.readBuffer_;
if (writeReadBuffer == NULL)
// only needed for sending 2nd or later chunks
writeReadBuffer = entry.buffer_;
assert(bufferAddr == writeReadBuffer->data(entry.offset_));
// we have got the reply for this I/O entry. so set the io tag to -1.
entry.ioTag_ = -1;
}
// If we got an error,the tag may or may not be a valid one.
// So check for both cases below
if (guaErrorInfo_)
{
setErrorInfo(-1);
if ((ioTag >= 0) && (ioTag < (Lng32(nowaitDepth_))))
{
// valid tag returned from BAWAITIOX
ActiveIOQueueEntry &entry = activeIOs_[ioTag];
assert(entry.inUse_ && ioTag == (Lng32)entry.ioTag_);
handleIOErrorForEntry(entry);
}
else
// We didn't get back any valid iotag from BAWAITIOX so we don't have
// any specific entry to handle the error for . This happens for
//cases where no I/O completes - eg error 26,160,22,16.
// So put all active IO entries in error state. This only happens for
// what we consider are very fatal errors so this is ok.
handleIOError();
}
else
{
ActiveIOQueueEntry &entry = activeIOs_[ioTag];
cleanUpActiveIOEntry(entry);
//
// after receiving the reply, we need figure out what the use case is
// in order to decide what to do next:
//
// a) we are in the multi-chunk send mode and we just received an
// empty reply. in this case we ignore the empty reply and go to
// tryToStartNewIO(), which will send the next chunk or become a
// no-op if there is no more chunk to send.
//
// b) if the reply buffer contains the first (maybe only) chunk of the
// reply then:
//
// b.1) - if this is a single chunk reply, then put the reply buffer
// on the receive queue.
// b.2) - if this is a multi-chunk reply, switch to the multi-chunk
// receive protocol. go to tryToStartNewIO() and send an
// empty request to receive the next chunk of reply.
//
// c) if the reply buffer contains a subsequent chunk of a multi-chunk
// reply, then:
//
// c.1) - if this is the large chunk of the reply, then put the
// reply buffer on the receive queue.
// c.2) - otherwise, this is a middle chunk of the reply. go to
// tryToStartNewIO() and send an empty request to receive
// the next chunk of reply.
//
if (entry.receiveBufferSizeLeft_ == 0)
{
// case a) - received empty reply during multi-chunk send mode
assert(countRead == 0);
// Note that this does NOT count as a completion, we don't
// let the upper layers know that we are using multiple
// Guardian I/Os for this.
}
else
{
// we did expect data back, case b) or c)
// If this is the first (maybe only) chunk of an IpcMessageBuffer
// then determine the length of the total message by looking into
// the message header.
if (entry.offset_ == 0)
{
if (entry.buffer_ != entry.readBuffer_)
{
// no longer need the shared send buffer. release it.
entry.buffer_->decrRefCount();
// now use only the reply buffer
entry.buffer_ = entry.readBuffer_;
}
// since this is the first (maybe only) chunk of the reply,
// it has a message header that contains the total length of
// the reply.
// Get the size of the message sent (or the reply buffer if shared)
IpcMessageObjSize bytesSent = entry.buffer_->getMessageLength();
// unpack message header which contains total message length
InternalMsgHdrInfoStruct *msgHdr =
new( (IpcMessageObj*)(entry.buffer_->data(0)) )
InternalMsgHdrInfoStruct(NULL);
IpcMessageObjSize msgLen = msgHdr->getMsgLengthFromData();
// remember the real length of the message coming back
entry.buffer_->setMessageLength(msgLen);
// check whether this is case b) or c)
if (msgLen == (IpcMessageObjSize) countRead)
{
// case b.1) - this is a single-chunk reply.
// If we were sending a large buffer (more than one chunk)
// and just received a small buffer (one chunk) then
// release the large buffer to conserve space on the IPC
// heap.
if (bytesSent > maxIOSize_)
entry.buffer_ = entry.buffer_->resize(getEnvironment(), msgLen);
// jdu 01/24/12 - need more work to get the message info right
// env()->addIpcMsgTrace(this, IpcEnvironment::RECEIVE,
// (void *)entry.buffer_, msgLen,
// (msgHdr->isLastMsgBuf()? 1: 0),
// msgHdr->getSeqNum());
queueReceiveMessage(entry.buffer_);
}
else
{
// Case b.2) - this is the first chunk of a multi-chunk
// reply. Switch to the multi-chunk receive protocol.
// we just received countRead bytes of reply from server.
if (msgLen > entry.buffer_->getBufferLength() ||
bytesSent > maxIOSize_)
{
// We want to resize the reply buffer if either
// - The server has a reply message that is larger than
// what our buffer can hold
// - The request buffer was large (more than one chunk)
// and may now be consuming space unnecessarily on the
// IPC heap
entry.buffer_->setMessageLength(countRead);
entry.buffer_ = entry.buffer_->resize(getEnvironment(), msgLen);
entry.buffer_->setMessageLength(msgLen);
}
// do some sanity checks, make sure we don't have two
// partial buffers at a time
if (partiallyReceivedBuffer_ != NULL)
reportBadMessage();
assert(partiallyReceivedBuffer_ == NULL);
if (msgLen <= (IpcMessageObjSize) countRead)
reportBadMessage();
assert(msgLen > (IpcMessageObjSize) countRead);
// move some information from the entry to data members
// in the connection while the chunky protocol is going on
partiallyReceivedBuffer_ = entry.buffer_;
chunkBytesRequested_ = countRead;
chunkBytesReceived_ = countRead;
getEnvironment()->getAllConnections()->
setReceivedPartialMessage(TRUE);
}
} // case b) - first (maybe only) chunk
else
{
// case c) - this is not the first chunk
if (partiallyReceivedBuffer_ != entry.buffer_)
reportBadMessage();
assert (partiallyReceivedBuffer_ == entry.buffer_);
chunkBytesReceived_ += countRead;
if (chunkBytesReceived_ == entry.buffer_->getMessageLength())
{
// case c.1) - this is the last chunk
// jdu 01/24/12 - need more work to get the message info right
// env()->addIpcMsgTrace(this, IpcEnvironment::RECEIVE,
// (void *)entry.buffer_,
// chunkBytesReceived_, 1, 0);
queueReceiveMessage(partiallyReceivedBuffer_);
partiallyReceivedBuffer_ = NULL;
chunkBytesRequested_ = 0;
chunkBytesReceived_ = 0;
}
else
{
// case c.2) - this is a middle chunk with more chunks to
// follow
getEnvironment()->getAllConnections()->
setReceivedPartialMessage(TRUE);
}
} // case c) - not the first chunk
} // case b) or c)
// this I/O completed
if (getState() != ERROR_STATE && numOutstandingIOs_ == 0)
setState(ESTABLISHED);
// after waiting, try (again) to start as many new I/O operations as
// possible
while (tryToStartNewIO())
;
} // if (guaErrorInfo_) else ..
// check the message buffers on the receive queue and invoke callbacks
// for any matching message streams
IpcMessageBuffer *receiveBuf;
NABoolean aCallbackIsCalled = FALSE;
while (receiveBuf = getNextReceiveQueueEntry())
{
// When the user of this connection sets trustIncomingBuffers_ to
// FALSE then we perform an integrity check on all incoming
// message buffers. A failure causes the connection to transition
// to the ERROR_STATE state.
if (!getTrustIncomingBuffers() && getState() != ERROR_STATE)
{
if (!receiveBuf->verifyBackbone())
{
setIpcMsgBufCheckFailed(TRUE);
guaErrorInfo_ = 0;
setErrorInfo(-1);
setState(ERROR_STATE);
}
}
receiveBuf->callReceiveCallback(this);
aCallbackIsCalled = TRUE;
}
// In the ERROR_STATE state we may have to announce I/O completion after
// callbacks are issued. The setState() method has the job of
// detecting when I/O is complete and informing the IpcEnviroment at
// the appropriate time. Even though it's not intuitive to call
// setState(ERROR_STATE) here (because we are already in the ERROR_STATE state),
// we make the call anyway to trigger any necessary bookkeeping.
if (aCallbackIsCalled && getState() == ERROR_STATE)
setState(ERROR_STATE);
return WAIT_OK;
} //getState() == OPENING
else
{
IpcMessageBufferPtr bufferAddr;
Int32 countRead;
SB_Tag_Type ioTag = -1;
NABoolean ipcAwaitioxCompleted = ipcAwaitiox != NULL;
if (ipcAwaitioxCompleted)
ipcAwaitioxCompleted = ipcAwaitiox->getCompleted();
_bcc_status stat;
if (!ipcAwaitioxCompleted)
{
if (tscoOpen_)
stat = BAWAITIOXTS(&openFile_,
(void **) &bufferAddr,
&countRead,
&ioTag,
timeout,
OMIT);
else
stat = BAWAITIOX(&openFile_,
(void **) &bufferAddr,
&countRead,
&ioTag,
timeout,
OMIT);
}
else
stat = ipcAwaitiox->ActOnAwaitiox((void **)&bufferAddr,
&countRead,
&ioTag);
MXTRC_3("GCTS::wait awake filenum=%d bufferAddr=%x ioTag=%d\n", openFile_, bufferAddr, ioTag);
#ifdef LOG_WAIT_TIMEOUT
IpcGuaLogTimestamp(this);
cerr << "GCTS:timeout = " << timeout << " ioTag = " << ioTag << endl;
#endif
if (_status_ne(stat))
{
// get a Seabed error code
GuaErrorNumber getinfoError = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (getinfoError != 0)
guaErrorInfo_ = getinfoError; // not even FILE_GETINFO_ worked
if (guaErrorInfo_ == GuaTimeoutErr)
{
guaErrorInfo_ = XZFIL_ERR_OK;
return WAIT_OK;
}
short fsError = BFILE_CLOSE_(openFile_); // Don't retain unopened ACB
otherEnd = (GuaProcessHandle *)&getOtherEnd().getPhandle().phandle_;
otherEnd->decompose(cpu, pin, nodeNumber
, seqNum
);
getEnvironment()->closeTrace(__LINE__, openFile_, cpu, pin, seqNum);
openFile_ = -1; // Don't leave valid file number in object!
if (guaErrorInfo_ == XZFIL_ERR_NOSUCHDEV && getState() == OPENING && getOpenRetries() < 8 && dataConnectionToEsp_)
{
guaErrorInfo_ = XZFIL_ERR_OK;
setState(INITIAL);
setOpenRetries(getOpenRetries() + 1);
usleep(250000);
openPhandle(NULL, TRUE);
return WAIT_OK;
}
openRetryCleanup();
setErrorInfo(-1);
setState(ERROR_STATE);
getEnvironment()->getAllConnections()->bumpCompletionCount();
if (openCompletionScheduled_ != NULL)
*openCompletionScheduled_ = 1;
return WAIT_OK;
}
// Successful completion
openRetryCleanup();
getEnvironment()->getAllConnections()->bumpCompletionCount();
if (openCompletionScheduled_ != NULL)
*openCompletionScheduled_ = 1;
fileNumForIOCompletion_ = openFile_;
// use setmode 74 to turn off the automatic CANCEL upon AWAITIOX timeout
stat = BSETMODE(openFile_,74,-1);
if (_status_ne(stat))
{
// get a Guardian error code
Int32 errcode2 = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (errcode2 != 0)
guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
setErrorInfo(-1);
setState(ERROR_STATE);
return WAIT_OK;
}
// use setmode 30 to allow I/O operations to finish in any order
stat = BSETMODE(openFile_,30,3);
if (_status_ne(stat))
{
// get a Guardian error code
Int32 errcode2 = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (errcode2 != 0)
guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
setErrorInfo(-1);
setState(ERROR_STATE);
return WAIT_OK;
}
// use setmode 117 if no transactions should be propagated to the server
if (NOT usesTransactions_)
{
_cc_status stat = BSETMODE(openFile_,117,1);
if (_status_ne(stat))
{
// get a Guardian error code
Int32 errcode2 = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (errcode2 != 0)
guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
setErrorInfo(-1);
setState(ERROR_STATE);
return WAIT_OK;;
}
}
// the connection is established now
setState(ESTABLISHED);
clock_gettime(CLOCK_REALTIME, &completeOpenTime_);
return WAIT_OK;
}
}
void GuaConnectionToServer::openRetryCleanup()
{
char msgBuf[100];
Int32 cpu, pin, nodeNumber;
SB_Int64_Type seqNum = -1;
if (getOpenRetries())
{
((GuaProcessHandle *)&getOtherEnd().getPhandle().phandle_)->
decompose(cpu, pin, nodeNumber
, seqNum
);
str_sprintf(msgBuf,
"GuaConnectionToServer: BFILE_OPEN %d,%d,%ld "
"error 14 retry count = %d\n", cpu, pin, seqNum, getOpenRetries());
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, msgBuf, 0);
setOpenRetries(0);
}
}
GuaConnectionToServer * GuaConnectionToServer::castToGuaConnectionToServer()
{
return this;
}
Int32 GuaConnectionToServer::numQueuedSendMessages()
{
return sendQueueEntries();
}
Int32 GuaConnectionToServer::numQueuedReceiveMessages()
{
return receiveQueueEntries();
}
void GuaConnectionToServer::populateDiagsArea(ComDiagsArea *&diags,
CollHeap *diagsHeap)
{
if (guaErrorInfo_ != GuaOK)
{
IpcAllocateDiagsArea(diags,diagsHeap);
*diags << DgSqlCode(-2034) << DgInt0(guaErrorInfo_);
*diags << DgNskCode(guaErrorInfo_);
getEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).
addProcIdToDiagsArea(*diags,0);
getOtherEnd().addProcIdToDiagsArea(*diags,1);
if (guaErrorInfo_ == FETIMEDOUT)
{
static __thread bool bugcatcherInitialized = false;
static __thread bool doBugCatcher = true;
if (!bugcatcherInitialized)
{
bugcatcherInitialized = true;
char *dbc = getenv("ESP_TIMEOUT_BUGCATCHER");
if (dbc && (*dbc != '1'))
doBugCatcher = false;
}
if (doBugCatcher)
{
getOtherEnd().getPhandle().dumpAndStop(TRUE, FALSE);
genLinuxCorefile("Timeout on ESP.");
}
}
}
if (getIpcMsgBufCheckFailed())
{
IpcAllocateDiagsArea(diags, diagsHeap);
*diags << DgSqlCode(-2037);
getEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).
addProcIdToDiagsArea(*diags, 0);
getOtherEnd().addProcIdToDiagsArea(*diags, 1);
}
}
//
// Send a new I/O request.
//
// In Guardian each msg I/O requires a write buffer to send the request, and
// a read buffer used to receive the reply. A same buffer can be used as both
// the write buffer and the read buffer. here we will refer the write buffer
// as the "send buffer", and the read buffer as the "reply buffer".
//
// In our implementation we use ActiveIOQueueEntry to represent a msg I/O.
// entry.buffer_ and entry.readBuffer_ can be used as the send buffer and the
// reply buffer. however, the values of entry.buffer_ and entry.readBuffer_
// depend on the type of the message buffer to be sent. There are four
// possible scenarios:
//
// 1. multi-chunk send buffer, shared by multiple connections:
//
// - first chunk: entry.buffer_=send buffer
// entry.readBuffer_=reply buffer (copied from send buffer's
// first chunk)
// - subsequent chunks: entry.buffer_=send buffer, entry.readBuffer_=NULL
//
// note: for the first chunk, both entry.buffer_ and entry.readBuffer_ have
// identical contents as the send buffer. however, entry.readBuffer_
// must be used as the write buffer during the actual message send,
// because it contains the message header that includes the send
// sequence number that must not be shared between connections.
// entry.buffer_, in this case, is not used at all during message send.
// we have to save it only because after we receive the reply, we need
// to call entry.buffer_->decrRefCount() to free the shared send buffer.
//
// 2. multi-chunk send buffer, single connection (not shared):
//
// - first chunk: entry.buffer_=entry.readBuffer_=send buffer
// - subsequent chunks: entry.buffer_=send buffer, entry.readBuffer_=NULL
//
// 3. single-chunk send buffer, shared by multiple connections
//
// - entry.buffer_=entry.readBuffer_=reply buffer
//
// note: send buffer is released immediately after the send.
//
// 4. single-chunk send buffer, single connection (not shared):
//
// - entry.buffer_=entry.readBuffer_=send buffer
//
NABoolean GuaConnectionToServer::tryToStartNewIO()
{
if (getState() == OPENING)
openPhandle(NULL); // Complete open on control connection and
// temporarily ignore errors
// Any more messages or parts of messages to send?
// There is nothing to do if there are neither new messages nor
// incompleted partial messages.
if (sendQueueEntries() == 0 && !partiallySentBuffer_ &&
!partiallyReceivedBuffer_)
return FALSE;
// do not allow new send if a partial message is being received and we
// already have requested all the reply data for it.
if (partiallyReceivedBuffer_ &&
partiallyReceivedBuffer_->getMessageLength() == chunkBytesRequested_)
return FALSE;
// Can't have more than nowaitDepth_ - 1 I/Os outstanding, except
// when there is an out-of-band message for which we make an exception.
// If there is an out-of-Band message then assume that it was placed
// in front of the send queue.
if (numOutstandingIOs_ >=
(IFX (partiallySentBuffer_ OR partiallyReceivedBuffer_)
THENX nowaitDepth_
ELSEX (nowaitDepth_-1)))
{
MXTRC_1("GCTS::tryToStartNewIO false numOutstandingIOs_=%d\n", numOutstandingIOs_);
return FALSE;
}
// Check if the per-process limit on MQCs is exceeded.
// Note that this should be reconsidered when this code is
// multithreaded.
short numMsgsActual;
if (XMESSAGESYSTEMINFO(5, &numMsgsActual))
assert(0);
if (numMsgsActual+1 >= getEnvironment()->getMaxPerProcessMQCs())
return FALSE;
// If we reach here this means we can start another nowait I/O;
// get to the outstanding I/O entry that is to be filled next.
#ifndef NDEBUG
NABoolean wrapAroundCheck = FALSE;
#endif
// We may have to return early from this method if we cannot find
// space on the IPC heap for an outgoing buffer. If that happens
// we'll want to restore the original value of lastAllocatedEntry_.
unsigned short originalLastAllocated = lastAllocatedEntry_;
// find an entry that is not in use
while (activeIOs_[lastAllocatedEntry_].inUse_)
{
// increment lastAllocatedEntry_ modulo nowaitDepth_
lastAllocatedEntry_++;
if (lastAllocatedEntry_ == nowaitDepth_)
{
lastAllocatedEntry_ = 0;
#ifndef NDEBUG
assert(!wrapAroundCheck); // to detect infinite loop (shouldn't happen)
wrapAroundCheck = TRUE;
#endif
}
}
// we have found an entry that is not in use
ActiveIOQueueEntry &entry = activeIOs_[lastAllocatedEntry_];
assert(!entry.inUse_ && entry.ioTag_ == -1);
// ---------------------------------------------------------------------
// set up a new outstanding IO entry, depending on what to do next
// but don't start the corresponding I/O quite yet
// ---------------------------------------------------------------------
// initialize all fields of the entry (there is no constructor)
entry.bytesSent_ = 0;
entry.receiveBufferSizeLeft_ = 0;
entry.offset_ = 0;
// These help keep track of the need to callSendCallback.
NABoolean isFirstChunk = FALSE;
NABoolean isLastChunk = FALSE;
// ---------------------------------------------------------------------
// Decide what to do, depending on the currently pending buffers and
// IOs:
//
// a) send another chunk of a large message down to the server without
// asking for data back
// b) request some more data from the server, if the server replied
// with a partial message and we haven't asked for all of the
// rest of the data yet (never interleave this with I/Os of
// step a), so the server won't get confused)
// c) get another message from the send queue and find out that it
// is too long for a single chunk, so send the first piece
// d) should be the normal case, get the next message from the send
// queue and send it in a single chunk
// ---------------------------------------------------------------------
if (partiallySentBuffer_)
{
// case a) continue sending more chunks for this buffer
// but don't ask for reply data, since we want the reply to
// come back at entry.buffer_->data(0)
entry.buffer_ = partiallySentBuffer_;
// for multi-chunk buffer, whether shared or not, the read buffer for
// any chunk after first chunk is NULL.
entry.readBuffer_ = NULL;
assert(chunkBytesSent_ < entry.buffer_->getMessageLength());
entry.bytesSent_ = MINOF(maxIOSize_,
entry.buffer_->getMessageLength() -
chunkBytesSent_);
entry.offset_ = chunkBytesSent_;
chunkBytesSent_ += entry.bytesSent_;
// if this is the last chunk ...
if (chunkBytesSent_ >= entry.buffer_->getMessageLength())
{
// we're done sending chunks
partiallySentBuffer_ = NULL;
chunkBytesSent_ = 0;
// can call the send callback now
isLastChunk = TRUE;
}
lastSentBuffer_ = entry.buffer_;
}
else if (partiallyReceivedBuffer_)
{
// b) next thing to do is to receive another chunk from the server
entry.buffer_ = entry.readBuffer_ = partiallyReceivedBuffer_;
entry.offset_ = chunkBytesRequested_;
entry.receiveBufferSizeLeft_ =
MINOF(maxIOSize_,
entry.buffer_->getMessageLength() - chunkBytesRequested_);
chunkBytesRequested_ += entry.receiveBufferSizeLeft_;
lastSentBuffer_ = entry.buffer_;
}
else
{
// get the next buffer to send from the send queue and check
// whether it can be sent in one piece
assert(sendQueueEntries() > 0);
IpcMessageBuffer *nextToSend = sendQueue()[0];
assert(nextToSend);
// assume request and reply use same buffer
entry.buffer_ = entry.readBuffer_ = nextToSend;
isFirstChunk = TRUE;
if (entry.buffer_->getRefCount() > 1)
{
// The send buffer is shared by multiple connections. Therefore,
// allocate a different buffer for the reply.
entry.readBuffer_ = entry.buffer_->
copyFromOffset(getEnvironment(), maxIOSize_, 0, FALSE);
if (entry.readBuffer_ == NULL)
{
// We ran out of space on the IPC heap ...
getEnvironment()->setHeapFullFlag(TRUE);
lastAllocatedEntry_ = originalLastAllocated;
return FALSE;
}
}
if (entry.buffer_->getMessageLength() > maxIOSize_)
{
// case c), the message we just got from the send queue is too large
// to be sent in a single chunk :-(
assert(partiallySentBuffer_ == NULL);
// indicate multi-chunk protocol
partiallySentBuffer_ = entry.buffer_;
entry.bytesSent_ = maxIOSize_;
chunkBytesSent_ = entry.bytesSent_;
}
else
{
// case d) can send in single chunk
entry.bytesSent_ = entry.buffer_->getMessageLength();
// can call the send callback now
isLastChunk = TRUE;
}
// we always use the reply buffer to send the first (maybe only) chunk,
// for following reasons:
//
// - for multi-chunk shared send buffer, we MUST use the reply buffer
// (entry.readBuffer_) to send its first chunk because
// prepareSendBuffer() sets the sequence number in the message header.
//
// - for other types of send buffers, entry.buffer_=entry.readBuffer_
// is always true for the first chunk.
//
prepareSendBuffer(entry.readBuffer_);
// got this far so de-queue buffer from this connection's queue
removeNextSendBuffer();
entry.receiveBufferSizeLeft_ =
MINOF(maxIOSize_,entry.readBuffer_->getBufferLength());
lastSentBuffer_ = entry.readBuffer_;
}
// ---------------------------------------------------------------------
// Next, start the I/O operation
// ---------------------------------------------------------------------
// WRITEREADX requires we use the same buffer for both write and read
IpcMessageBuffer *writeReadBuffer = entry.readBuffer_;
if (writeReadBuffer == NULL)
// only needed for sending 2nd or later chunks
writeReadBuffer = entry.buffer_;
short retryCount = 0;
NABoolean needToRetry; // reset on each iteration of do loop.
short fsError = 0;
do {
Int32 dummyCountRead; // (gps 6/3/09 changed from unsigned short to int on Linux)
_bcc_status stat = BWRITEREADX(
openFile_,
(char *)writeReadBuffer->data(entry.offset_),
entry.bytesSent_,
entry.receiveBufferSizeLeft_,
&dummyCountRead,
lastAllocatedEntry_);
needToRetry = FALSE;
if (_status_ne(stat))
{
// get a Guardian error code
short retcode = BFILE_GETINFO_(openFile_,&fsError);
if (retcode != 0)
fsError = retcode; // not even FILE_GETINFO_ worked
if ((fsError == FENOLCB) &&
(retryCount < 100*60)) // after 60 seconds (and 6000 retries),
// just give up.
{
// Since the per-process limit was checked above,
// assume that it is the per-cpu limit, so let us
// retry.
retryCount++;
getEnvironment()->incrRetriedMessages();
needToRetry = TRUE;
DELAY(1); // 1 centisecond
}
}
else
{
if (entry.bytesSent_ >= sizeof(InternalMsgHdrInfoStruct))
{
InternalMsgHdrInfoStruct *imhis = (InternalMsgHdrInfoStruct *)
writeReadBuffer->data(entry.offset_);
env()->addIpcMsgTrace(this, IpcEnvironment::SEND,
(void *)writeReadBuffer->data(entry.offset_),
entry.bytesSent_,
(imhis->isLastMsgBuf()? 1: 0),
imhis->getSeqNum());
#if 0
if (sentMsgHdrInd_ == 7)
sentMsgHdrInd_ = 0;
else
sentMsgHdrInd_ += 1;
MsgTraceEntry *msgTraceEntry = (MsgTraceEntry *)(sentMsgHdr_ + sizeof(MsgTraceEntry) * sentMsgHdrInd_);
memcpy((void *)&msgTraceEntry->internalMsgHdrInfoStruct_, (void *)writeReadBuffer->data(entry.offset_), sizeof(InternalMsgHdrInfoStruct));
msgTraceEntry->bufAddr_ = (void *)writeReadBuffer->data(entry.offset_);
msgTraceEntry->sentReceivedLength_ = (unsigned int)entry.bytesSent_;
#endif
}
fsError = 0;
}
} while (needToRetry);
if (isFirstChunk)
addSendCallbackBuffer(entry.buffer_);
if (fsError)
{
// an error happened somewhere along the way and we must
// a) record the Guardian error number,
guaErrorInfo_ = fsError;
// b) set the connection to the error state. If we have not invoked
// the send callback, then handleIOErrorForEntry() will invoke the
// send callback.
setErrorInfo(-1);
handleIOErrorForEntry(entry);
// if the design is to disallow any future i/o after connection become
// error state, should we return false to prevent from calling
// tryToStartNewIO() again?
//return FALSE;
}
else
{
// buffer has been sent successfully
if (numOutstandingIOs_ == 0)
setState(SENDING);
numOutstandingIOs_++;
entry.inUse_ = true; // this entry now has an I/O in progress
entry.ioTag_ = (short)lastAllocatedEntry_;
// --------------------------------------------------------------
// If we started the I/O for all chunks of an IpcMessageBuffer
// (or if the IpcMessageBuffer was sent in a single message), then
// call its send callback.
// --------------------------------------------------------------
if (isLastChunk)
{
// removeSendCallbackBuffer() should always return TRUE here because
// send callback has not been invoked yet. If error occurred prior to
// this method call then handleIOErrorForEntry() should have cleared
// all I/Os on the same message stream and we would not have come
// here.
NABoolean sendCallbackFlag = removeSendCallbackBuffer(entry.buffer_);
if (sendCallbackFlag)
// the send callback doesn't give away entry.buffer_, and this is
// good since the same buffer may be still used for the receive
// operation.
entry.buffer_->callSendCallback(this);
else // - for debugging only
assert(sendCallbackFlag);
}
}
return TRUE;
}
void GuaConnectionToServer::openPhandle(char * processName, NABoolean parallelOpen)
{
IpcEnvironment *env = getEnvironment();
short openFlags;
openFlags = nowaitDepth_ == 0 ? 0x0 : 0x4000;
IpcConnectionState stateOnEntry = getState();
if (stateOnEntry == INITIAL)
{
char procFileName[IpcMaxGuardianPathNameLength];
short procFileNameLen;
short i, lastError;
Int32 countRead;
// If there are any and it's a data connection, use them even if
// ssd turned persistent opens off
if (env->getPersistentOpenAssigned() > 0 && dataConnectionToEsp_)
{
NABoolean success = FALSE;
short fileNum, persistentIndex;
GuaProcessHandle *otherEnd = (GuaProcessHandle *)&getOtherEnd().getPhandle().phandle_;
fileNum = env->getPersistentOpenInfo(otherEnd, &persistentIndex);
if (persistentIndex > -1)
{
openFile_ = fileNum;
{
setSendPersistentOpenReconnect(TRUE);
success = true;
}
if (success)
{
env->resetPersistentOpen(persistentIndex);
setState(ESTABLISHED);
fileNumForIOCompletion_ = openFile_;
return;
}
}
}
phandle_template* lp_phandle = (phandle_template *) &(getOtherEnd().getPhandle().phandle_);
memset(procFileName, 0, IpcMaxGuardianPathNameLength);
char *srcName = (char *) lp_phandle;
// strncpy(procFileName, (char *) lp_phandle->verifierF(), 8);
NAProcessHandle phandle((SB_Phandle_Type *)
&(getOtherEnd().getPhandle().phandle_));
phandle.decompose();
procFileNameLen = phandle.getPhandleStringLen();
strncpy(procFileName, phandle.getPhandleString(), procFileNameLen);
MXTRC_1("GCTS::openPhandle procFileName=%s\n", procFileName);
NABoolean isEsp = getEnvironment()->getAllConnections()->getPendingIOs().isEsp();
getEnvironment()->setLdoneConsumed(TRUE);
// multi fragment esp
ESP_TRACE2("GCTS: OpenPhandle: %s ", procFileName);
clock_gettime(CLOCK_REALTIME, &beginOpenTime_);
if (strcmp(getEnvironment()->myProcessName(),
procFileName) == 0) {
ESP_TRACE1("SELF");
guaErrorInfo_ = BFILE_OPEN_SELF_(&openFile_,
0, // open for read/write access
0, // shared access
nowaitDepth_,
0, // sync depth 0 (target proc is not NonStop)
openFlags); // options
self_ = TRUE;
}
else
{
if (! isEsp)
{
openFlags = openFlags | 0x400; // Thread specific completion TSCO
tscoOpen_ = TRUE;
}
// multi fragment esp
guaErrorInfo_ = BFILE_OPEN_(procFileName,
procFileNameLen,
&openFile_,
0, // open for read/write access
0, // shared access
nowaitDepth_,
0, // sync depth 0 (target proc is not NonStop)
openFlags); // options
// retcode = gettimeofday(&tv2, 0);
// elapsedTime = (tv2.tv_sec - tv1.tv_sec) * 1000000 + tv2.tv_usec - tv1.tv_usec;
}
if (guaErrorInfo_ != GuaOK)
{
setErrorInfo(-1);
setState(ERROR_STATE);
return;
}
#ifdef NA_NO_SUCH_PLATFORM // Change to platform such a NA_WINNT to enable
char messageBuffer[80];
memcpy(&messageBuffer[0], procFileName, procFileNameLen);
memset((void *)&messageBuffer[procFileNameLen], '\0', 1);
cout << messageBuffer << endl;
cout.flush();
#endif
if (parallelOpen && (openFlags & 0x4000))
{
setState(OPENING);
return;
}
} //getState() == INITIAL
else
assert(stateOnEntry == OPENING);
if (openFlags & 0x4000) // Nowaited FILE_OPEN_
{
NABoolean completed;
do
{
completed = TRUE;
_bcc_status condCode;
if (getState() == INITIAL ||
env->getAllConnections()->getPendingIOs().isEsp() == FALSE)
{
if (tscoOpen_)
condCode = BAWAITIOXTS(&openFile_);
else
condCode = BAWAITIOX(&openFile_);
}
else
{
if (tscoOpen_)
condCode = BAWAITIOXTS(&openFile_, NULL, NULL, NULL, 10); // Wait a tenth of a second
else
condCode = BAWAITIOX(&openFile_, NULL, NULL, NULL, 10); // Wait a tenth of a second
}
if (_status_ne(condCode))
{
NABoolean openFailed = TRUE;
// get a Guardian error code
GuaErrorNumber getinfoError = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (getinfoError != 0)
guaErrorInfo_ = getinfoError; // not even FILE_GETINFO_ worked
if (guaErrorInfo_ == GuaTimeoutErr && env->getAllConnections()->getPendingIOs().isEsp())
{
guaErrorInfo_ = XZFIL_ERR_OK;
((GuaReceiveControlConnection *)env->getControlConnection())->wait(IpcImmediately, env->getEventConsumed());
completed = FALSE;
continue;
}
if (openFailed)
{
short fsError = BFILE_CLOSE_(openFile_);
Int32 cpu, pin, nodeNumber;
SB_Int64_Type seqNum = -1;
GuaProcessHandle *otherEnd = (GuaProcessHandle *)&getOtherEnd().getPhandle().phandle_;
otherEnd->decompose(cpu, pin, nodeNumber
, seqNum
);
env->closeTrace(__LINE__, openFile_, cpu, pin, seqNum);
openFile_ = -1; // Don't leave valid file number in object!
setErrorInfo(-1);
setState(ERROR_STATE);
return;
}
}
}
while (completed == FALSE);
}
fileNumForIOCompletion_ = openFile_;
MXTRC_2("connection=%x, filenum=%d\n", this, openFile_);
// some day we may want to perform nowaited FILE_OPEN_ calls and add
// a method to "work" on the open.
// use setmode 74 to turn off the automatic CANCEL upon AWAITIOX timeout
_bcc_status stat = BSETMODE(openFile_,74,-1);
if (_status_ne(stat))
{
// get a Guardian error code
Int32 errcode2 = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (errcode2 != 0)
guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
setErrorInfo(-1);
setState(ERROR_STATE);
return;
}
// use setmode 30 to allow I/O operations to finish in any order
stat = BSETMODE(openFile_,30,3);
if (_status_ne(stat))
{
// get a Guardian error code
Int32 errcode2 = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (errcode2 != 0)
guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
setErrorInfo(-1);
setState(ERROR_STATE);
return;
}
// use setmode 117 if no transactions should be propagated to the server
if (NOT usesTransactions_)
{
_bcc_status stat = BSETMODE(openFile_,117,1);
if (_status_ne(stat))
{
// get a Guardian error code
Int32 errcode2 = BFILE_GETINFO_(openFile_,&guaErrorInfo_);
if (errcode2 != 0)
guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
setErrorInfo(-1);
setState(ERROR_STATE);
return;
}
# ifdef LOG_IPC
IpcGuaLogTimestamp(this);
cerr << "No transaction forwarding (control 117)" << endl;
# endif
}
# ifdef LOG_IPC
IpcGuaLogTimestamp(this);
cerr << "Open succeeded" << endl;
# endif
// the connection is established now
setState(ESTABLISHED);
clock_gettime(CLOCK_REALTIME, &completeOpenTime_);
}
void GuaConnectionToServer::closePhandle()
{
MXTRC_1("GCTS::closePhandle connection=%x", this);
//
// it's possible that some pending I/Os still remain on this connection.
// if IPC error occurs during query execution, there are three possible
// scenarios for a connection when it comes here:
//
// 1. this connection received ipc error and handled the error on the spot
// by calling one of the handleIOError() methods.
// 2. this connection did not receive error, but some other connection(s)
// on the same message stream received error. in this case we should
// invoke IpcMessageStream::abandonPendingIOs() to abort pending I/Os
// on all of stream's connections and invoke all necessary callbacks.
// 3. this connection did not receive error, neither did any other
// connections on the same message stream. in this case it's possible
// that nothing happens on this connection until the destructor is
// invoked. thus we need to clean up any pending I/Os on the connection.
// we should invoke delinkConnection() on the stream to trigger any
// book keepings needed, as delinkConnection() and receive callback
// should have the same book keeping logic.
//
// example for #3: we have a test case that kills esps while executing a
// long running query. the master has multiple send tops, with each send top
// having a message stream that includes only one data connection to a top
// level esp. if IPC error occurs on other connections (but not on this
// connection) as the result of dead esps, all sql operations are aborted
// from higher level and the master may never get a chance to call
// handleIOError() to clean up any pending I/Os on this connection. thus we
// need to release any msg buffers from the pending I/Os. but note that
// in case of multi-chunk message buffer, there may be multiple I/O entries
// pointing to the same message buffer. in that case each buffer should be
// released only once.
//
handleIOError();
// receive queue may not be empty. if so invoke receive callback.
// example:
// GuaConnectionToServer::setFatalError() invokes handleIOErrorForStream()
// that may have put message buffer on receive queue.
IpcMessageBuffer *receiveBuf;
while (receiveBuf = getNextReceiveQueueEntry())
receiveBuf->callReceiveCallback(this);
// Note that after closing, the connection is always in the initial
// state. This is the way to fix a connection in the error state.
// The close is always considered successful.
guaErrorInfo_ = GuaOK;
clearErrorInfo();
setState(INITIAL);
if (openFile_ != InvalidGuaFileNumber)
{
IpcEnvironment *env = getEnvironment();
NABoolean closeFile;
if (env->getPersistentOpens() && dataConnectionToEsp_ && self_ == FALSE && numOutstandingIOs_ == 0)
{
closeFile = FALSE;
short persistentIndex = env->getNewPersistentOpenIndex();
if (persistentIndex > -1)
{
GuaProcessHandle *otherEnd = (GuaProcessHandle *)&getOtherEnd().getPhandle().phandle_;
env->setPersistentOpenInfo(persistentIndex, otherEnd, openFile_);
Int32 cpu, pin, nodeNumber;
SB_Int64_Type seqNum = -1;
otherEnd = (GuaProcessHandle *)&getOtherEnd().getPhandle().phandle_;
otherEnd->decompose(cpu, pin, nodeNumber
, seqNum
);
env->closeTrace(__LINE__, openFile_, cpu, pin,
seqNum); // Persistent open simulated close
}
}
else
closeFile = TRUE;
if (closeFile)
{
_bcc_status status;
short lastError;
for (Int32 numOut = 0; numOut < numOutstandingIOs_; numOut++)
{
status = BCANCELREQ(openFile_);
if (_status_ne(status))
short retCode = BFILE_GETINFO_(openFile_, &lastError);
}
BFILE_CLOSE_(openFile_);
Int32 cpu, pin, nodeNumber;
SB_Int64_Type seqNum = -1;
GuaProcessHandle *otherEnd = (GuaProcessHandle *)&getOtherEnd().getPhandle().phandle_;
otherEnd->decompose(cpu, pin, nodeNumber
, seqNum
);
env->closeTrace(__LINE__, openFile_, cpu, pin, nodeNumber);
}
openFile_ = fileNumForIOCompletion_ = InvalidGuaFileNumber;
}
}
NABoolean GuaConnectionToServer::hasActiveIOs()
{
for (unsigned short i = 0; i < nowaitDepth_; i++)
{
ActiveIOQueueEntry &entry = activeIOs_[i];
if (entry.inUse_)
return TRUE;
}
return FALSE;
}
void GuaConnectionToServer::setFatalError(IpcMessageStreamBase *msgStream)
{
if (guaErrorInfo_ == GuaOK) // if error hasn't been set yet
guaErrorInfo_ = GuaIpcApplicationErr;
setState(ERROR_STATE);
// we must set error info to -1 so receive callback knows not to
// parse the potentially corrupted receive queue buffer.
setErrorInfo(-1);
// handleIOErrorForStream() may put the message buffer on receive queue.
// and that buffer may have the following content:
//
// - the actual full reply from server, or
// - the send buffer if send failed, or
// - partial send or reply if send/reply was multi-chunk and did not
// complete
//
handleIOErrorForStream(msgStream);
IpcConnection::setFatalError(msgStream);
}
void GuaConnectionToServer::addSendCallbackBuffer(IpcMessageBuffer *buffer)
{
for (unsigned short i = 0; i < nowaitDepth_; i++)
{
if (!sendCallbackBufferList_[i])
{
sendCallbackBufferList_[i] = buffer;
return;
}
}
}
NABoolean GuaConnectionToServer::removeSendCallbackBuffer(IpcMessageBuffer *buffer)
{
for (unsigned short i = 0; i < nowaitDepth_; i++)
{
if (sendCallbackBufferList_[i] == buffer)
{
sendCallbackBufferList_[i] = NULL;
return TRUE;
}
}
return FALSE;
}
void GuaConnectionToServer::handleIOError()
{
// connection no longer usable due to I/O error. abort all existing I/Os
// on this connection.
for (unsigned short i = 0; i < nowaitDepth_; i++)
{
ActiveIOQueueEntry &entry = activeIOs_[i];
if (entry.inUse_)
handleIOErrorForEntry(entry);
}
numOutstandingIOs_ = 0;
}
void GuaConnectionToServer::handleIOErrorForStream(IpcMessageStreamBase *msgStream)
{
// abort all existing I/Os on this connection that are from the given stream
for (unsigned short i = 0; i < nowaitDepth_; i++)
{
ActiveIOQueueEntry &entry = activeIOs_[i];
if (entry.inUse_ && entry.buffer_->getMessageStream() == msgStream)
handleIOErrorForEntry(entry);
}
}
//
// The I/O entry has a write buffer (entry.buffer_) and a read buffer
// (entry.readBuffer_). For entry description after message send, see comments
// at the top of tryToStartNewIO(). For entry description after receive reply,
// see comments at the top of wait().
//
void GuaConnectionToServer::handleIOErrorForEntry(ActiveIOQueueEntry &entry)
{
// I/O error occurred on given entry during send or receive
if (getState() != ERROR_STATE)
setState(ERROR_STATE);
if (getErrorInfo() == 0)
setErrorInfo(-1);
// abort all existing I/Os on the same message stream
// - what about I/Os on other streams?
for (unsigned short i = 0; i < nowaitDepth_; i++)
{
ActiveIOQueueEntry &nextEntry = activeIOs_[i];
if (nextEntry.inUse_ &&
nextEntry.buffer_->getMessageStream() ==
entry.buffer_->getMessageStream())
{
cleanUpActiveIOEntry(nextEntry);
// special case: for multi-chunk shared buffer, if nextEntry is
// first chunk and entry is any chunk after first chunk, we need
// to free the read buffer of the first chunk since all other chunks
// have their read buffers set to NULL.
if (nextEntry.readBuffer_ &&
nextEntry.readBuffer_ != nextEntry.buffer_)
// nextEntry is the first chunk of a multi-chunk shared buffer
nextEntry.readBuffer_->decrRefCount();
}
} // for i
// clear partial send/receive buffer on the same message stream so to
// prevent further I/Os.
if (partiallySentBuffer_ &&
partiallySentBuffer_->getMessageStream() ==
entry.buffer_->getMessageStream())
partiallySentBuffer_ = NULL;
else if (partiallyReceivedBuffer_ &&
partiallyReceivedBuffer_->getMessageStream() ==
entry.buffer_->getMessageStream())
partiallyReceivedBuffer_ = NULL;
// put the buffer on receive queue, regardless of send succeeded or not.
queueReceiveMessage(entry.buffer_);
// if entry.buffer_ is still on the sendCallbackBufferList_, then send
// callback has not been invoked for this connection and thus send failed.
NABoolean sendSuccess = !removeSendCallbackBuffer(entry.buffer_);
// in case of send failure invoke send callback
if (!sendSuccess)
entry.buffer_->callSendCallback(this);
// the design is to disallow any future i/o after connection become error
// state. so we should cleanup send queue by invoking send callback for
// all send buffers.
IpcMessageBuffer *sendBuffer;
while (sendBuffer = removeNextSendBuffer())
{
queueReceiveMessage(sendBuffer);
sendBuffer->callSendCallback(this);
}
}
void GuaConnectionToServer::cleanUpActiveIOEntry(ActiveIOQueueEntry &entry)
{
// abort if we are still waiting for message reply
if (entry.ioTag_ >= 0)
{
// if we come here with entry's I/O still outstanding, it means we're
// on the error handling path. so it's ok to abort/cancel the I/O.
//
// note that when CANCELREQ is called to cancel a request on a process
// file, the file system aborts the transaction associated with the
// process.
assert(getErrorInfo() != 0);
BCANCELREQ(openFile_, entry.ioTag_);
entry.ioTag_ = -1;
}
entry.inUse_ = false;
numOutstandingIOs_--;
}
void GuaConnectionToServer::dumpAndStopOtherEnd(bool dump, bool stop) const
{
getOtherEnd().getPhandle().dumpAndStop(dump, stop);
}
// -----------------------------------------------------------------------
// Methods for class GuaConnectionToClient
// -----------------------------------------------------------------------
GuaConnectionToClient::GuaConnectionToClient(
IpcEnvironment *env,
const IpcProcessId &clientProcId,
GuaFileNumber clientFileNumber,
GuaReceiveControlConnection *controlConnection,
const char *eye)
: IpcConnection(env,clientProcId,eye)
{
clientFileNumber_ = clientFileNumber;
guaErrorInfo_ = GuaOK;
controlConnection_ = controlConnection;
partiallyRepliedBuffer_ = NULL;
chunkBytesReplied_ = 0;
partiallyReceivedBuffer_ = NULL;
chunkBytesReceived_ = 0,
fileNumForIOCompletion_ = controlConnection->receiveFile_;
numOutstandingRequests_ = 0;
#if 0
receivedMsgHdr_ = (char *)env->getHeap()->allocateMemory(sizeof(MsgTraceEntry) * 8);
memset(receivedMsgHdr_, 0, sizeof(MsgTraceEntry) * 8);
receivedMsgHdrInd_ = 7;
#endif
}
GuaConnectionToClient::~GuaConnectionToClient()
{
#if 0
CollHeap *heap = getEnvironment()->getHeap();
heap->deallocateMemory((void *)receivedMsgHdr_);
#endif
}
bool GuaConnectionToClient::isServerSide()
{
return true;
}
void GuaConnectionToClient::send(IpcMessageBuffer *buffer)
{
if (buffer->getReplyTag() == GuaInvalidReplyTag)
ABORT("Need to wait for a request before replying for now");
queueSendMessage(buffer);
while (startReplyingToNextRequest())
;
}
void GuaConnectionToClient::receive(IpcMessageStreamBase *msg)
{
setState(RECEIVING);
// tell the control connection that we are ready to receive
controlConnection_->initiateReceive();
addReceiveCallback(msg);
// maybe the Guardian I/O has already completed and the buffer is
// waiting in the base class' receive queue
IpcMessageBuffer *receiveBuf;
while (receiveBuf = getNextReceiveQueueEntry())
{
// yes, so just call its callback
# ifdef LOG_RECEIVE
cerr << "Calling receive callback for queued request during receive()"
<< endl;
# endif
receiveBuf->callReceiveCallback(this);
}
}
WaitReturnStatus GuaConnectionToClient::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox)
{
MXTRC_FUNC("GCTC::wait");
MXTRC_1("timeout=%d\n", timeout);
// wait on the control connection for the specified timeout
WaitReturnStatus result = controlConnection_->wait(timeout, eventConsumed, ipcAwaitiox);
if (result)
{
// if an I/O completed, retry until no more I/Os can be completed
// without waiting
while (controlConnection_->wait(IpcImmediately, eventConsumed, ipcAwaitiox))
;
}
return WAIT_OK;
}
GuaConnectionToClient * GuaConnectionToClient::castToGuaConnectionToClient()
{
return this;
}
Int32 GuaConnectionToClient::numQueuedSendMessages()
{
return sendQueueEntries();
}
Int32 GuaConnectionToClient::numQueuedReceiveMessages()
{
return receiveQueueEntries();
}
void GuaConnectionToClient::populateDiagsArea(ComDiagsArea *&diags,
CollHeap *diagsHeap)
{
if (guaErrorInfo_ != GuaOK)
{
IpcAllocateDiagsArea(diags,diagsHeap);
*diags << DgSqlCode(-2033) << DgInt0(guaErrorInfo_)
<< DgNskCode(guaErrorInfo_);
getEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).
addProcIdToDiagsArea(*diags,0);
getOtherEnd().addProcIdToDiagsArea(*diags,1);
}
}
NABoolean GuaConnectionToClient::thisIsMyClient(
const GuaProcessHandle &phandle,
GuaFileNumber fileNo) const
{
return (clientFileNumber_ == fileNo AND
getOtherEnd().getPhandle() == phandle);
}
void GuaConnectionToClient::close(NABoolean withError,
GuaErrorNumber gerr)
{
if (numOutstandingRequests_ != 0)
{
withError = TRUE;;
}
if (getState() == RECEIVING)
{
controlConnection_->numReceivingConnections_--;
}
// set the state to CLOSED or ERROR_STATE, meaning that we are not connected
// CLOSED causes the collection to be deleted when it is safe (no recursion)
// ERROR_STATE causes it to be left around to provide debugging evidence
if (withError)
setState(ERROR_STATE);
else
setState(CLOSED);
guaErrorInfo_ = gerr;
// check if there are outstanding I/Os and raise an error if there are
IpcMessageBuffer *lostBuffer;
lostBuffer = partiallyRepliedBuffer_;
if (lostBuffer != NULL)
{
// clean up
partiallyRepliedBuffer_ = NULL;
chunkBytesReplied_ = 0;
// couldn't reply with all of the buffer, this is an error
setState(ERROR_STATE);
lostBuffer->callSendCallback(this);
lostBuffer->decrRefCount();
}
while ((lostBuffer = getNextSendQueueEntry()) != NULL)
{
// if a connection with outstanding I/Os gets closed then this
// is an error
setState(ERROR_STATE);
lostBuffer->callSendCallback(this);
lostBuffer->decrRefCount();
}
if (partiallyReceivedBuffer_)
{
// clean up, client must have known what it did when it stopped
// half way sending the buffer
partiallyReceivedBuffer_->decrRefCount();
partiallyReceivedBuffer_ = NULL;
chunkBytesReceived_ = 0;
}
// it's ok to have buffers in the receive queue, those are still
// available for their recipients to be read
// numRequestors counts the number of file opens on $RECEIVE and
// there is a 1:1 correlation between open connections and file opens
controlConnection_->decrNumRequestors();
// Indicates that a closed connect exists that should be found and deleted
// when there is no recursion
if (getState() == CLOSED)
getEnvironment()->getAllConnections()->incrDeleteCount();
// if there are outstanding I/Os on $RECEIVE, tell the control
// connection about them $$$$
}
NABoolean GuaConnectionToClient::startReplyingToNextRequest()
{
IpcMessageBuffer *buffer;
if (partiallyRepliedBuffer_)
return FALSE;
buffer = getNextSendQueueEntry();
if (buffer == NULL)
return FALSE;
// In Guardian, a send from the client to the server is called reply
// and it is done without locking the server... unless the reply is
// larger than the max. reply length, in which case we need to switch
// to the multiple chunk method
IpcMessageObjSize bytesToSend = buffer->getMessageLength();
if (bytesToSend > buffer->getMaxReplyLength())
{
// message has to be transported back in multiple chunks
assert(partiallyRepliedBuffer_ == NULL);
partiallyRepliedBuffer_ = buffer;
chunkBytesReplied_ = 0;
bytesToSend = buffer->getMaxReplyLength();
IOPending();
}
lastSentBuffer_ = buffer;
// send it off
controlConnection_->sendReplyData(buffer->data(0),
bytesToSend,
buffer->getReplyTag(),
this,
GuaOK);
decrNumOutstandingRequests();
if (partiallyRepliedBuffer_ AND
NOT (getState() == ERROR_STATE))
{
chunkBytesReplied_ += bytesToSend;
// we need to get another request from the client for the next chunk
controlConnection_->initiateReceive();
}
else
{
// The send operation has completed
// jdu 01/24/12 - need more work to get the message info right
// env()->addIpcMsgTrace(this, IpcEnvironment::RESPOND,
// (void *)buffer, buffer->getMessageLength(),
// 1, (UInt32) buffer->getReplyTag());
// Call the send callback (which does not take the buffer away)
buffer->callSendCallback(this);
// try to reuse this buffer another time
if (buffer->getRefCount() == 1)
{
controlConnection_->recycleReceiveBuffer(buffer);
}
else
{
ABORT("No other reuse of reply message buffers for now");
// buffer->decrRefCount(getEnvironment()); would be another option
}
}
return TRUE;
}
void GuaConnectionToClient::acceptBuffer(IpcMessageBuffer *buffer,
IpcMessageObjSize receivedDataLength)
{
// the length of the logical message that we are going to get
IpcMessageObjSize totalMessageLength;
incrNumOutstandingRequests();
// ---------------------------------------------------------------------
// Handle case of a multi-chunk reply first. If the incoming message
// is empty that means that the requestor is asking for additional
// chunks of a multi-chunk reply.
// ---------------------------------------------------------------------
if (receivedDataLength == 0)
{
if (partiallyRepliedBuffer_ != NULL AND
partiallyReceivedBuffer_ == NULL)
;
else
dumpAndStopOtherEnd(true, false);
assert(partiallyRepliedBuffer_ != NULL AND partiallyReceivedBuffer_ == NULL);
// Requestor is asking for more of the partial reply buffer.
// Reply with the next chunk.
IpcMessageObjSize nextChunkSize =
MINOF(partiallyRepliedBuffer_->getMessageLength() - chunkBytesReplied_,
buffer->getMaxReplyLength());
controlConnection_->sendReplyData(
partiallyRepliedBuffer_->data(chunkBytesReplied_),
nextChunkSize,
buffer->getReplyTag(),
this,
GuaOK);
decrNumOutstandingRequests();
chunkBytesReplied_ += nextChunkSize;
controlConnection_->recycleReceiveBuffer(buffer);
if (chunkBytesReplied_ >= partiallyRepliedBuffer_->getMessageLength())
{
// all of the message got sent, get rid of the oversized reply
// buffer and call the callback (as usual, save everything on
// the stack before calling the callback)
IOComplete();
IpcMessageBuffer *b = partiallyRepliedBuffer_;
partiallyRepliedBuffer_ = NULL;
chunkBytesReplied_ = 0;
b->callSendCallback(this);
b->decrRefCount();
}
else
{
// tell the control connection we need another one
controlConnection_->initiateReceive();
return;
}
}
// ---------------------------------------------------------------------
// Check out the situation: is the incoming data an entire message or
// is it just a chunk. If it's a chunk, is it the first or the last one?
// Switch from and to the chunk protocol, if necessary.
// ---------------------------------------------------------------------
else if (partiallyReceivedBuffer_ == NULL)
{
// this is the first (maybe the only) chunk of a new message
// unpack message header which contains total message length
InternalMsgHdrInfoStruct *msgHdr =
new( (IpcMessageObj*)(buffer->data(0)) )
InternalMsgHdrInfoStruct(NULL);
totalMessageLength = msgHdr->getMsgLengthFromData();
buffer->setMessageLength(totalMessageLength);
if (totalMessageLength == receivedDataLength)
{
// simplest case, single-chunk message
queueReceiveMessage(buffer);
setState(ESTABLISHED);
}
else
{
// total message len should never be less than received len
if (totalMessageLength <= receivedDataLength)
reportBadMessage();
assert(totalMessageLength > receivedDataLength);
// we only received part of the data, go and allocate a
// buffer that can hold all of it and switch to the chunk protocol
buffer->setMessageLength(receivedDataLength);
partiallyReceivedBuffer_ = buffer->resize(getEnvironment(),
totalMessageLength);
chunkBytesReceived_ = receivedDataLength;
partiallyReceivedBuffer_->setMessageLength(totalMessageLength);
}
}
else
{
// we're already in the chunky protocol (beyond 1st chunk),
// copy the additional data
totalMessageLength = partiallyReceivedBuffer_->getMessageLength();
if (chunkBytesReceived_ + receivedDataLength > totalMessageLength)
reportBadMessage();
assert(chunkBytesReceived_ + receivedDataLength <= totalMessageLength);
str_cpy_all(partiallyReceivedBuffer_->data(chunkBytesReceived_),
buffer->data(0),
receivedDataLength);
chunkBytesReceived_ += receivedDataLength;
// must reply with an empty message to secondary request chunks
controlConnection_->sendReplyData(NULL,
0,
buffer->getReplyTag(),
this,
GuaOK);
decrNumOutstandingRequests();
controlConnection_->recycleReceiveBuffer(buffer);
// We are done receiving the entire message if all the data has arrived.
if (chunkBytesReceived_ == totalMessageLength)
{
queueReceiveMessage(partiallyReceivedBuffer_);
setState(ESTABLISHED);
partiallyReceivedBuffer_ = NULL;
chunkBytesReceived_ = 0;
}
else
{
// tell the control connection we need another one
controlConnection_->initiateReceive();
}
}
// call callbacks for any matching message streams
IpcMessageBuffer *receiveBuf;
while (receiveBuf = getNextReceiveQueueEntry())
receiveBuf->callReceiveCallback(this);
// send any messages blocked by multi-chunk protocol
while (startReplyingToNextRequest())
;
}
void GuaConnectionToClient::dumpAndStopOtherEnd(bool dump, bool stop) const
{
getOtherEnd().getPhandle().dumpAndStop(dump, stop);
}
// -----------------------------------------------------------------------
// Methods for class GuaReceiveControlConnection
// -----------------------------------------------------------------------
GuaReceiveControlConnection::GuaReceiveControlConnection(
IpcEnvironment * env,
short receiveDepth,
const char *eye,
GuaReceiveFastStart *guaReceiveFastStart)
: IpcControlConnection(IPC_DOM_GUA_PHANDLE,eye),
clientConnections_(env->getAllConnections(),env->getHeap()),
failedConnections_(env->getAllConnections(),env->getHeap()),
receiveBufferPool_(env->getHeap()),
activeReceiveBuffers_(env->getHeap()),
initialized_(FALSE),
guaReceiveFastStart_(guaReceiveFastStart)
{
// This process was created by the Guardian procedure PROCESS_CREATE_
// and needs to open $RECEIVE to get its messages. All messages arrive
// through $RECEIVE and then get dispatched to the appropriate
// IpcConnection objects via a lookup. Any wait operation on any
// connection to a server may therefore accept messages for other
// client connections.
// initialize data members
env_ = env;
firstClientConnection_ = NULL;
numReceivingConnections_ = 0;
receiveFile_ = InvalidGuaFileNumber;
receiveDepth_ = receiveDepth;
maxIOSize_ = env_->getGuaMaxMsgIOSize();
maxOutstandingIOs_ = 1; // Guardian limit
numOutstandingIOs_ = 0;
numOutstandingRequests_ = 0;
beginTransTag_ = -1;
txHandleValid_ = FALSE;
memset (&txHandle_, 0, sizeof(SB_Transid_Type));
activeTransReplyTag_ = GuaInvalidReplyTag;
implicitTransReplyTag_ = GuaInvalidReplyTag;
userTransReplyTag_ = GuaInvalidReplyTag;
guaErrorInfo_ = GuaOK;
// now open $RECEIVE
if (guaReceiveFastStart_!= NULL && guaReceiveFastStart_->open_)
{
guaErrorInfo_ = guaReceiveFastStart_->openError_;
receiveFile_ = guaReceiveFastStart_->receiveFile_;
}
else
guaErrorInfo_ = BFILE_OPEN_((char *)"$RECEIVE",
8,
&receiveFile_,
0, // read-write
0, // shared
(short) maxOutstandingIOs_,
receiveDepth_,
0,0,0,0); // no options
if (guaErrorInfo_ != 0)
{
// We're in serious trouble, this process has just started
// and it can't open $RECEIVE. This means we have to die.
ABORT("Unable to open $RECEIVE");
}
MXTRC_2("GRCC::GRCC connection=%x, filenum=%d\n", this, receiveFile_);
// use setmode 74 to turn off the automatic CANCEL upon AWAITIOX timeout
if (guaReceiveFastStart_ == NULL)
{
_bcc_status stat = BSETMODE(receiveFile_,74,-1);
if (_status_ne(stat))
{
// this is bad
ABORT("Internal error on setmode($receive)");
}
}
// MONITORNET is currently not available on NT and
// it is not needed until there is support for multiple nsk expand nodes
// now initiate the first READUPDATEX operation (which will complete
// with an open message), even if we don't have a connection yet
initiateReceive(TRUE);
}
IpcConnection * GuaReceiveControlConnection::getConnection() const
{
return firstClientConnection_;
}
GuaReceiveControlConnection *
GuaReceiveControlConnection::castToGuaReceiveControlConnection()
{
return this;
}
WaitReturnStatus GuaReceiveControlConnection::wait(IpcTimeout timeout, UInt32 *eventConsumed, IpcAwaitiox *ipcAwaitiox)
{
MXTRC_FUNC("GRCC::wait");
MXTRC_1("timeout=%d\n", timeout);
// ---------------------------------------------------------------------
// call AWAITIOX with the specified timeout
// ---------------------------------------------------------------------
IpcMessageBufferPtr bufferAddr = NULL;
short msgType = 0;
NABoolean controlReceived = FALSE;
Int32 countTransferred;
SB_Tag_Type ioTag = -1;
NABoolean systemMessageReceived;
// don't call AWAITIOX unless there are outstanding I/Os
if (numOutstandingIOs_ == 0)
{
if (timeout > 0) // is GuaConnectionToClient and it returns too soon
usleep(timeout * 10000); // Delay here instead before returning if
return WAIT_OK;
}
GuaErrorNumber retcode = GuaOK;
NABoolean retry = TRUE;
NABoolean setFirstClientToNull = FALSE;
while (retry)
{
if(initialized_ || timeout != IpcInfiniteTimeout){
_cc_status stat;
if (ipcAwaitiox == NULL || !ipcAwaitiox->getCompleted())
{
stat = BAWAITIOX(&receiveFile_,
(void **) &bufferAddr,
&countTransferred,
&ioTag,
timeout,
OMIT);
}
else
{
stat = ipcAwaitiox->ActOnAwaitiox((void **)&bufferAddr,
&countTransferred,
&ioTag);
}
if (_status_ne(stat))
retcode = BFILE_GETINFO_(receiveFile_,&guaErrorInfo_);
else
retcode = guaErrorInfo_ = GuaOK;
}
else { // not initialized && infinite timeout
// Set the timeout to 1 min
Lng32 newTimeOut= 100*60*1;
NABoolean done = FALSE;
while(!done){
_cc_status stat;
if (guaReceiveFastStart_ != NULL && guaReceiveFastStart_->awaitiox_)
{
guaReceiveFastStart_->awaitiox_ = FALSE;
stat = guaReceiveFastStart_->awaitioxStatus_;
bufferAddr = (char *)&guaReceiveFastStart_->readBuffer_[0];
countTransferred = guaReceiveFastStart_->awaitioxCountTransferred_;
ioTag = guaReceiveFastStart_->ioTag_;
}
else
{
stat = BAWAITIOX(&receiveFile_,
(void **) &bufferAddr,
&countTransferred,
&ioTag,
newTimeOut);
}
if (guaReceiveFastStart_ != NULL && guaReceiveFastStart_->bufferData_ != NULL)
{
memcpy((char *)guaReceiveFastStart_->bufferData_, (char *)bufferAddr, countTransferred);
bufferAddr = (IpcMessageBufferPtr)guaReceiveFastStart_->bufferData_;
retcode = guaReceiveFastStart_->fileGetInfoError_;
guaErrorInfo_ = guaReceiveFastStart_->awaitioxError_;
guaReceiveFastStart_->bufferData_ = NULL;
}
else
{
if (_status_ne(stat))
retcode = BFILE_GETINFO_(receiveFile_,&guaErrorInfo_);
else
retcode = guaErrorInfo_ = GuaOK;
}
if(guaErrorInfo_ != GuaTimeoutErr)
{
// we received something or some other error than time-out has ocurred
env_->setEvent(TRUE, AEVENT);
done = true;
}
} // while
} // else
#ifdef LOG_WAIT_TIMEOUT
IpcGuaLogTimestamp((IpcConnection *) NULL);
cerr << "GRCC:timeout = " << timeout << " ioTag = " << ioTag << endl;
#endif
if (!((guaErrorInfo_== 4004) && !env_->breakEnabled()))
{
// Not to retry unless error is FE_EINTR (4004)
retry = FALSE;
if (retcode != GuaOK)
guaErrorInfo_ = retcode; // not even FILE_GETINFO_ worked
systemMessageReceived = (guaErrorInfo_ == GuaSysmsgReceived);
if (systemMessageReceived)
msgType = *((short *) bufferAddr);
if (guaErrorInfo_ == GuaTimeoutErr)
{
// ----------------------------------------------------------
// AWAITIOX timed out, nothing to do here
// ----------------------------------------------------------
return WAIT_OK;
}
// ----------------------------------------------------------------
// Check for fatal errors (if we fail here or while reading the
// receive info we abort, since there doesn't seem any reasonable
// error recovery for such errors)
// ---------------------------------------------------------------
if (guaErrorInfo_ != GuaOK AND
guaErrorInfo_ != GuaSysmsgReceived)
{
// Are there any cases where we need to retry? $$$$
// Error recovery from this?
ABORT("Fatal error in AWAITIOX($RECEIVE)");
}
}
} // while
// ---------------------------------------------------------------------
// call FILE_GETRECEIVEINFO_ to find out about the client
// ---------------------------------------------------------------------
GuaReceiveInfo receiveInfo;
if (guaReceiveFastStart_ != NULL && guaReceiveFastStart_->fileGetReceiveInfo_)
{
guaErrorInfo_ = guaReceiveFastStart_->fileGetReceiveInfoError_;
memcpy((char *)&receiveInfo, &guaReceiveFastStart_->receiveInfo_, sizeof(GuaReceiveInfo));
guaReceiveFastStart_->fileGetReceiveInfo_ = FALSE;
}
else
guaErrorInfo_ = BFILE_GETRECEIVEINFO_((FS_Receiveinfo_Type *)&receiveInfo);
if (systemMessageReceived && (msgType == ZSYS_VAL_SMSG_CLOSE))
{
Int32 cpu, pin, nodeNumber;
SB_Int64_Type seqNum = -1;
receiveInfo.phandle_.decompose(cpu, pin, nodeNumber
, seqNum
);
env_->closeTrace(__LINE__, receiveInfo.clientFileNumber_, cpu, pin, nodeNumber);
}
if (guaErrorInfo_ != GuaOK)
{
ABORT("Fatal error in FILE_GETRECEIVEINFO_");
}
// ---------------------------------------------------------------------
// Successfully received a message and we know now where it came from
// ---------------------------------------------------------------------
// this nowait I/O just completed
numOutstandingIOs_--;
// we already assume that the message will get delivered to some connection
numReceivingConnections_--;
// eventually we'll have to reply to this
numOutstandingRequests_++;
// we got a new transaction id as a result of the completed read on
// $RECEIVE, switch back to the explicitly selected transaction of the user.
// Note: If either request lth or reply lth is zero then IO is for secondary
// chunks of multi chunk msg, ignore currently received trans reply tag.
// Save last non-chunk message reply tag to restore implicit transaction
// context after replying because REPLYX looses current trans context.
activeTransReplyTag_ = receiveInfo.replyTag_;
if (countTransferred && receiveInfo.maxReplyLen_)
{
implicitTransReplyTag_ = activeTransReplyTag_;
}
switchToUserTransid();
#ifdef LOG_RECEIVE
Int64 jts = JULIANTIMESTAMP();
MyGuaProcessHandle me;
IpcProcessId other(receiveInfo.phandle_);
char meAsAscii[200];
char otherAsAscii[200];
me.toAscii(meAsAscii,200);
other.toAscii(otherAsAscii,200);
cerr << "(" <<
// NT has problems printing out an Int64
(ULng32) jts
<< "): " << meAsAscii << " from " << otherAsAscii
<< "(" << receiveInfo.clientFileNumber_ << ") "
<< "Received " << countTransferred << " bytes with max reply len "
<< receiveInfo.maxReplyLen_
<< endl;
#endif /* LOG_RECEIVE */
// redrive the READUPDATE process to see whether we can start another I/O
initiateReceive(FALSE);
// ---------------------------------------------------------------------
// find the buffer in the list of outstanding receive buffers
// (only one entry at this time, because of Guardian limits)
// ---------------------------------------------------------------------
IpcMessageBuffer *receivedBuffer = NULL;
for (CollIndex i = 0;
i < activeReceiveBuffers_.entries() AND receivedBuffer == NULL;
i++)
{
if (activeReceiveBuffers_[i]->data(0) == bufferAddr)
{
// found it
receivedBuffer = activeReceiveBuffers_[i];
activeReceiveBuffers_.remove(receivedBuffer);
}
}
#ifdef LOG_RECEIVE
cerr << "Found the active receive buffer " << (Lng32) receivedBuffer << endl;
#endif
if (receivedBuffer == NULL)
{
// couldn't find the buffer
ABORT("Internal error: receive buffer not found");
// could also reply with error but this is an indicator for a
// grave error somewhere
}
// ---------------------------------------------------------------------
// Find the connection (if possible)
// ---------------------------------------------------------------------
//TBD
GuaConnectionToClient *conn = findConnection(receiveInfo.clientFileNumber_, receiveInfo.phandle_);
if (countTransferred > 0 && systemMessageReceived == FALSE && conn->newClientConnection(receivedBuffer) == TRUE)
{
if (conn)
{
Int32 cpu, pin, nodeNumber;
SB_Int64_Type seqNum = -1;
receiveInfo.phandle_.decompose(cpu, pin, nodeNumber
, seqNum
);
env_->closeTrace(__LINE__, receiveInfo.clientFileNumber_,
cpu, pin, seqNum); // Persistent open simulated close
conn->close();
clientConnections_ -= conn->getId();
//delete conn;
if (conn == firstClientConnection_)
setFirstClientToNull = TRUE;
}
conn = new(env_->getHeap()) GuaConnectionToClient(
env_,
IpcProcessId(receiveInfo.phandle_),
receiveInfo.clientFileNumber_,
this);
if (firstClientConnection_ == NULL)
firstClientConnection_ = conn;
clientConnections_ += conn->getId();
incrNumRequestors();
msgType = ZSYS_VAL_SMSG_OPEN;
actOnSystemMessage(msgType,
NULL, // buffer Address
0, // count transferred
receiveInfo.clientFileNumber_,
receiveInfo.phandle_,
conn);
}
// ---------------------------------------------------------------------
// Now we got all the info we need: the buffer, who sent it, and what
// connection it is for (if it is for any connection). Next thing to
// do is to process the received data.
// ---------------------------------------------------------------------
if (systemMessageReceived)
{
// -----------------------------------------------------------------
// received a system message
// -----------------------------------------------------------------
// a system message doesn't have the usual header, its type is
// delivered in the first 2 bytes instead
# ifdef LOG_RECEIVE
cerr << "System message received: " << msgType << endl;
# endif
MXTRC_1("System message received, msgType=%d\n", msgType);
NABoolean repliedToSystemMessage = FALSE;
// by default, reject any system requests (like CONTROL, etc.)
GuaErrorNumber sysMsgRetcode = GuaOK;
// switch on the system message type
switch (msgType)
{
case ZSYS_VAL_SMSG_CPUDOWN:
{
// a local CPU went down, mark all clients that are
// on that CPU as dead
zsys_ddl_smsg_cpudown_def *msg =
(zsys_ddl_smsg_cpudown_def *) bufferAddr;
IpcNodeName myNodeName =
IpcProcessId(MyGuaProcessHandle()).getNodeName();
for (CollIndex i = 0; clientConnections_.setToNext(i); i++)
{
GuaConnectionToClient *c =
clientConnections_.element(i)->
castToGuaConnectionToClient();
if (c != NULL AND
c->getOtherEnd().match(myNodeName,msg->z_cpunumber))
{
markAsDead(c,GuaClientCpuDown);
if (c == firstClientConnection_)
{
setFirstClientToNull = TRUE;
// ALM CR 5373 - If we get CPU down system
// message before close system message,
// we need to let make sure the
// EspGuaControlConnection::actOnSystemMessage
// gets a chance to stop this process
// before we set the firstClientConnection_
// to NULL at the end of this method.
// Do this by pretending this system
// message came from the firstClientConnection_.
if ( env_->getAllConnections()->
getPendingIOs().isEsp() )
{
conn = (GuaConnectionToClient *)firstClientConnection_;
/* if (firstClientConnection_)
SQLMXLoggingArea::logExecRtInfo(
__FILE__, __LINE__,
"Processed CPU down system message, "
"firstClientConnection_ not NULL", 0);
else
SQLMXLoggingArea::logExecRtInfo(
__FILE__, __LINE__,
"Processed CPU down system message, "
"but firstClientConnection_ was NULL", 0);
*/
}
}
}
}
}
break;
case ZSYS_VAL_SMSG_REMOTECPUDOWN:
{
// a remote CPU went down, mark all clients that are
// on that CPU as dead
zsys_ddl_smsg_remotecpudown_def *msg =
(zsys_ddl_smsg_remotecpudown_def *) bufferAddr;
IpcCpuNum remoteCpu = msg->z_cpunumber;
// null-terminate the node name (this may overwrite
// over the struct, but what the heck, we know we
// have used a very long receive buffer)
msg->z_nodename[msg->z_nodename_len] = 0;
// create an IpcNodeName from it
IpcNodeName remoteNodeName(IPC_DOM_GUA_PHANDLE,
&msg->z_nodename[1]);
for (CollIndex i = 0; clientConnections_.setToNext(i); i++)
{
GuaConnectionToClient *c =
clientConnections_.element(i)->
castToGuaConnectionToClient();
if (c != NULL AND
c->getOtherEnd().match(remoteNodeName,
remoteCpu))
{
markAsDead(c,GuaClientCpuDown);
if (c == firstClientConnection_)
setFirstClientToNull = TRUE;
}
}
}
break;
case XZSYS_VAL_SMSG_SHUTDOWN:
{
NAExit(0);
}
break;
case ZSYS_VAL_SMSG_OPEN:
{
char otherAscii[200];
receiveInfo.phandle_.toAscii(otherAscii, 200);
// open message: create a new connection
#ifndef NDEBUG
// This error injection code *could* work just fine in
// release code, but for performance reasons, we only
// have it for DEBUG.
const short injectedError = 48;
if (fakeErrorFromNSK(injectedError, &receiveInfo.phandle_))
{
sendReplyData(NULL,0,receiveInfo.replyTag_,NULL,injectedError);
return WAIT_OK;
}
#endif
// create a new Guardian connection to the client who
// is opening us
if (conn)
{
conn->close();
clientConnections_ -= conn->getId();
//delete conn;
if (conn == firstClientConnection_)
setFirstClientToNull = TRUE;
}
conn = new(env_->getHeap()) GuaConnectionToClient(
env_,
IpcProcessId(receiveInfo.phandle_),
receiveInfo.clientFileNumber_,
this);
MXTRC_4("GRCC::wait new connection=%x id=%d info=%s.%d\n", conn, conn->getId(), otherAscii, receiveInfo.clientFileNumber_);
// reply to the open message right here and set the
// open label to the id of the new connection
zsys_ddl_smsg_open_reply_def openReply;
openReply.z_msgnumber = ZSYS_VAL_SMSG_OPEN;
openReply.z_openid = (short) conn->getId();
sendReplyData((IpcMessageBufferPtr) &openReply,
controlReceived ? 0 : sizeof(openReply),
receiveInfo.replyTag_,
NULL,
GuaOK);
repliedToSystemMessage = TRUE;
// if this is the first client then remember it
if (firstClientConnection_ == NULL)
firstClientConnection_ = conn;
clientConnections_ += conn->getId();
incrNumRequestors();
}
break;
case ZSYS_VAL_SMSG_CLOSE:
{
// close message, remove the corresponding connection
//zsys_ddl_smsg_close_def *msg =
//(zsys_ddl_smsg_close_def *) bufferAddr;
MXTRC_1("GRCC::wait close message: conn=%x\n", conn);
if (conn)
{
conn->close();
clientConnections_ -= conn->getId();
//delete conn;
if (conn == firstClientConnection_)
setFirstClientToNull = TRUE;
}
// - do we need to reply to the CLOSE request?
//repliedToSystemMessage = TRUE;
}
break;
case ZSYS_VAL_SMSG_NODEDOWN:
{
// Node went down, all clients from that node are dead
zsys_ddl_smsg_nodedown_def *msg =
(zsys_ddl_smsg_nodedown_def *) bufferAddr;
// null-terminate the node name (this may overwrite
// over the struct, but what the heck, we know we
// have used a very long receive buffer)
msg->z_nodename[msg->z_nodename_len] = 0;
// create an IpcNodeName from it
IpcNodeName remoteNodeName(IPC_DOM_GUA_PHANDLE,
&msg->z_nodename[1]);
for (CollIndex i = 0; clientConnections_.setToNext(i); i++)
{
GuaConnectionToClient *c =
clientConnections_.element(i)->
castToGuaConnectionToClient();
if (c != NULL AND
c->getOtherEnd().match(remoteNodeName))
{
markAsDead(c,GuaClientNodeDown);
if (c == firstClientConnection_)
setFirstClientToNull = TRUE;
}
}
}
break;
default:
// don't care about other messages, if they are requests
// to do something then make sure we reject that request
sysMsgRetcode = GuaInvalidFileType;
break;
}
// the tdm_service will die if we stop before we reply
// in some cases, actOnSystemMessage calls Exit(0);
// this is an error prone fix, if actOnSystemMessage ever wants to reply itself, it would be too late
//
// if we haven't replied in the individual case
// then reply with an empty message
if (NOT repliedToSystemMessage)
sendReplyData(NULL,0,receiveInfo.replyTag_,NULL,sysMsgRetcode);
// now let any derived class do its thing with the system message
// (treat this as a callback and return from wait() calls)
// if it's not a close message from an orphan file
if (!(msgType == ZSYS_VAL_SMSG_CLOSE && controlReceived == FALSE && conn == NULL ))
{
env_->getAllConnections()->bumpCompletionCount();
actOnSystemMessage(msgType,
bufferAddr,
countTransferred,
receiveInfo.clientFileNumber_,
receiveInfo.phandle_,
conn);
}
// we're done with the received buffer
recycleReceiveBuffer(receivedBuffer);
// initiate a new receive operation for the next system message
initiateReceive();
}
else
{
// -----------------------------------------------------------------
// got a message in one of the buffers that we know, now
// dispatch it to the connection that it belongs to
// -----------------------------------------------------------------
receivedBuffer->setReplyTag(receiveInfo.replyTag_);
receivedBuffer->setMaxReplyLength(receiveInfo.maxReplyLen_);
if (conn != NULL)
{
// -------------------------------------------------------------
// Now we've found the connection that is supposed to
// receive this message (recipientConn becomes owner of buffer)
// -------------------------------------------------------------
if (countTransferred >= sizeof(InternalMsgHdrInfoStruct))
{
InternalMsgHdrInfoStruct *imhis = (InternalMsgHdrInfoStruct *)
bufferAddr;
conn->env()->addIpcMsgTrace(conn, IpcEnvironment::ACCEPT,
(void *)bufferAddr, countTransferred,
(imhis->isLastMsgBuf()? 1: 0),
imhis->getSeqNum());
#if 0
conn->incrReceivedMsgHdrInd();
MsgTraceEntry *msgTraceEntry = (MsgTraceEntry *)(conn->receivedMsgHdr() + sizeof(MsgTraceEntry) * conn->receivedMsgHdrInd());
memcpy((void *)&msgTraceEntry->internalMsgHdrInfoStruct_, (void *)bufferAddr, sizeof(InternalMsgHdrInfoStruct));
msgTraceEntry->bufAddr_ = (void *)bufferAddr;
msgTraceEntry->sentReceivedLength_ = (unsigned int)countTransferred;
#endif
}
conn->acceptBuffer(receivedBuffer,countTransferred);
}
else if (env_->isPersistentProcess())
{
// A recreated persistent process can receive messages
// from an open of a previous instance of the process
// see bug 1997, 2468 and 2469
sendReplyData(NULL,0,receiveInfo.replyTag_,
NULL,FEWRONGID);
recycleReceiveBuffer(receivedBuffer);
initiateReceive();
}
else
{
// couldn't find the connection to the client, reply with
// a special error in the hope that this won't cause a deadlock
// $$$$ should we abort instead? This looks like a bad error.
// (Current reason for not just aborting is that we believe that
// some open connections may have been destroyed at user's request)
receiveInfo.phandle_.dumpAndStop(true, false);
sendReplyData(NULL,0,receiveInfo.replyTag_,
NULL,GuaIpcApplicationErr);
recycleReceiveBuffer(receivedBuffer);
initiateReceive();
ABORT("Couldn't find connection to client"); // for now, debug this
}
}
if (setFirstClientToNull)
firstClientConnection_ = NULL;
return WAIT_INTERRUPT;
}
void GuaReceiveControlConnection::actOnSystemMessage(
short messageNum,
IpcMessageBufferPtr /*sysMsg*/,
IpcMessageObjSize /*sysMsgLen*/,
short /*clientFileNumber*/,
const GuaProcessHandle & /*clientPhandle*/,
GuaConnectionToClient *connection)
{
// The default implementation ignores all system messages, except that
// it makes sure that only one client opens the process.
if (getNumRequestors() > 1)
{
if (firstClientConnection_)
{
connection->dumpAndStopOtherEnd(true, false);
if (firstClientConnection_->getOtherEnd() ==
connection->getOtherEnd().getPhandle())
; // already have a core.
else
firstClientConnection_->dumpAndStopOtherEnd(true, false);
}
ABORT("More than one OPEN system message received");
}
else if (getNumRequestors() == 0 AND initialized_)
{
// in the default implementation the server stops if its client
// goes away
// for debugging it is sometimes helpful to print a message for this
// ABORT("Lost connection to client");
#ifdef LOG_RECEIVE
cerr << "No requestors exist. About to call NAExit()..." << endl;
#endif
NAExit(0);
}
else if (NOT initialized_ AND getNumRequestors() > 0)
{
// the first requestor came in
initialized_ = TRUE;
}
// This method should be overridden in derived classes if a process
// wants to handle more than one client. The derived class needs to
// assign a task to each newly created connection that is passed with
// an open message. The derived class also needs to decide what to
// do when it loses a client process.
}
void GuaReceiveControlConnection::sendReplyData(
IpcMessageBufferPtr data,
IpcMessageObjSize size,
short replyTag,
#ifdef LOG_RECEIVE
IpcConnection *conn,
#else
IpcConnection *, // avoid compiler warning
#endif
GuaErrorNumber retcodeToClient)
{
// must call this for a single chunk
assert(size <= maxIOSize_ AND
replyTag != GuaInvalidReplyTag);
// call REPLYX
Int32 countWritten;
_cc_status stat;
if (guaReceiveFastStart_ != NULL && guaReceiveFastStart_->replyx_)
{
stat = guaReceiveFastStart_->replyxstatus_;
countWritten = guaReceiveFastStart_->replyxCountWritten_;
guaReceiveFastStart_->replyx_ = FALSE;
}
else
{
// Reset the original transaction, if one exists.
if (txHandleValid_)
TMF_SETTXHANDLE_((short *)&txHandle_);
stat = BREPLYX(data,
size,
&countWritten,
replyTag
,retcodeToClient
);
}
if (_status_ne(stat) OR (ULng32)countWritten != size)
{
// get a Guardian error code
GuaErrorNumber errcode2 = BFILE_GETINFO_(receiveFile_,&guaErrorInfo_);
if (errcode2 != GuaOK)
guaErrorInfo_ = errcode2;
// sorry, if something goes wrong here we have no way to let
// the client or master know about it, all we can do is to die
char buf[100];
str_sprintf(buf, "REPLYX returned error %d", (Int32) guaErrorInfo_);
ABORT(buf);
// don't die in cases where the client caused the fault (if any)
}
// we have one less outstanding REPLYX
numOutstandingRequests_--;
// REPLYX loses the current transaction id, restore the user-defined one,
// unless we did a reply on the user-defined current transaction
activeTransReplyTag_ = GuaInvalidReplyTag;
if (replyTag == implicitTransReplyTag_)
implicitTransReplyTag_ = GuaInvalidReplyTag;
if (replyTag == userTransReplyTag_)
userTransReplyTag_ = GuaInvalidReplyTag;
switchToUserTransid();
#ifdef LOG_RECEIVE
if (conn)
IpcGuaLogTimestamp(conn);
else
cerr << "Without use of a connection: ";
cerr << "Replying with " << countWritten << " bytes, tag " << replyTag
<< ", err " << retcodeToClient
<< endl;
#endif
}
void GuaReceiveControlConnection::initiateReceive(NABoolean newReceive)
{
MXTRC_FUNC("GRCC::initiateReceive");
Int32 count_read = 0;
if (newReceive)
{
// A connection specifies TRUE when it initially calls this;
// newReceive is set to FALSE when we simply want to start
// a READUPDATEX call that hasn't been started earlier due to
// the maxOutstandingIOs_ limit.
numReceivingConnections_++;
}
// limit the number of outstanding IOs to the specified maximum.
if (numOutstandingIOs_ >= maxOutstandingIOs_ OR
numOutstandingIOs_ + numOutstandingRequests_ >= receiveDepth_)
return;
// get a previously used buffer or allocate a new one
IpcMessageBuffer *buffer = NULL;
// hunt for a free receive buffer from the pool of recycled ones
for (CollIndex i = 0;
buffer == NULL AND i < receiveBufferPool_.entries();
i++)
{
if (receiveBufferPool_[i]->getRefCount() == 1)
{
buffer = receiveBufferPool_[i];
receiveBufferPool_.removeAt(i);
}
}
if (buffer == NULL)
{
CollHeap *heap = (env_ ? env_->getHeap() : NULL);
buffer = IpcMessageBuffer::allocate(maxIOSize_, NULL, heap, 0);
if (buffer == NULL)
{
ABORT("Out of memory while allocating a receive buffer");
}
}
// insert the buffer into the list of buffers that have outstanding
// I/Os
activeReceiveBuffers_.insert(buffer);
// call READUPDATEX
_cc_status stat;
if (guaReceiveFastStart_ != NULL && guaReceiveFastStart_->readUpdate_)
{
guaReceiveFastStart_->readUpdate_ = FALSE;
stat = guaReceiveFastStart_->readUpdateStatus_;
guaReceiveFastStart_->bufferData_ = (unsigned char *)buffer->data(0);
}
else
{
stat = BREADUPDATEX(
receiveFile_,
(char *) buffer->data(0),
(MINOF(buffer->getBufferLength(),maxIOSize_)),
&count_read
);
}
if (_status_ne(stat))
{
// get a Guardian error code
short errcode2 = BFILE_GETINFO_(receiveFile_,&guaErrorInfo_);
// sorry, if something goes wrong here we have no way to let
// the client or master know about it, all we can do is to die
ABORT("Error in READUPDATEX");
}
else
{
// adjust the number of outstanding READUPDATEX operations
numOutstandingIOs_++;
}
}
void GuaReceiveControlConnection::switchToUserTransid()
{
}
void GuaReceiveControlConnection::setOriginalTransaction(short *txHandle)
{
memcpy(&txHandle_, txHandle, sizeof(SB_Transid_Type));
}
short * GuaReceiveControlConnection::getOriginalTransaction()
{
return (short *)&txHandle_;
}
void GuaReceiveControlConnection::clearOriginalTransaction()
{
memset(&txHandle_, 0, sizeof(SB_Transid_Type));
}
GuaConnectionToClient * GuaReceiveControlConnection::findConnection(
short openLabel)
{
// we were clever enough to let Guardian remember the connection
// id for us in the open label (alternatively, we could find
// the connection that matches the given phandle/file#)
if (clientConnections_.contains(openLabel))
{
return
clientConnections_.element(openLabel)->castToGuaConnectionToClient();
}
MXTRC("GRCC::findConnection false\n");
return NULL;
}
GuaConnectionToClient * GuaReceiveControlConnection::findConnection(
short clientFileNumber,
const GuaProcessHandle &clientPhandle)
{
// search all connections for a match
for (CollIndex i = 0; clientConnections_.setToNext(i); i++)
{
GuaConnectionToClient *c =
clientConnections_.element(i)->castToGuaConnectionToClient();
if (c != NULL AND c->thisIsMyClient(clientPhandle,clientFileNumber))
return c;
}
return NULL;
}
void GuaReceiveControlConnection::recycleReceiveBuffer(IpcMessageBuffer *b)
{
if (b->getBufferLength() == maxIOSize_)
{
// this buffer has the right length to be kept in the receive buffer pool
b->setReplyTag(GuaInvalidReplyTag);
b->setMaxReplyLength(0);
b->addCallback(NULL);
receiveBufferPool_.insert(b);
}
else
{
// good bye
b->decrRefCount();
}
}
void GuaReceiveControlConnection::markAsDead(GuaConnectionToClient *c,
GuaErrorNumber gerr)
{
// this connection is no longer part of the set of good client
// connections
clientConnections_ -= c->getId();
failedConnections_ += c->getId();
// tell the connection, too
c->close(TRUE,gerr);
if (clientConnections_.isEmpty())
getEnv()->notifyNoOpens();
}
void GuaReceiveControlConnection::waitForMaster()
{
int openWaitSeconds = 600;
const char* owsEnvVar = getenv("SQL_SRVR_OPEN_WAIT_SECONDS");
if (owsEnvVar)
{
int o = atoi(owsEnvVar);
if (o > 0)
openWaitSeconds = o;
}
int maxWaitTime = openWaitSeconds;
do {
struct timespec startedOpenWaitTs;
if (clock_gettime(CLOCK_MONOTONIC, &startedOpenWaitTs))
{
char buf[256];
str_sprintf(buf, "clock_gettime failed, errno %d", errno);
ABORT(buf);
}
Int64 timeStart = ComRtGetJulianFromUTC(startedOpenWaitTs);
wait(100 * openWaitSeconds);
if (getConnection() != NULL)
break;
struct timespec nowOpenWaitTs;
if (clock_gettime(CLOCK_MONOTONIC, &nowOpenWaitTs))
{
char buf[256];
str_sprintf(buf, "clock_gettime failed, errno %d", errno);
ABORT(buf);
}
Int64 timeNow = ComRtGetJulianFromUTC(nowOpenWaitTs);
openWaitSeconds -= ((timeNow - timeStart) / (1000 * 1000));
} while (openWaitSeconds > 0);
if (getConnection() == NULL)
{
char msg[256];
sprintf(msg,
"Server exiting after waiting %d seconds for initial open.",
maxWaitTime);
SQLMXLoggingArea::logExecRtInfo(__FILE__, __LINE__, msg, 0);
NAExit(0);
}
env_->setStopAfter(maxWaitTime);
}
// -----------------------------------------------------------------------
// Methods for class IpcGuardianServer
// -----------------------------------------------------------------------
IpcGuardianServer::IpcGuardianServer(
IpcServerClass * serverClass,
ComDiagsArea ** /* diags */,
CollHeap * /* diagsHeap */,
const char * nodeName,
const char * className,
IpcCpuNum cpuNum,
IpcServerAllocationMethod allocMethod,
short uniqueTag,
NABoolean usesTransactions,
NABoolean debugServer,
NABoolean waitedStartup,
Lng32 maxNowaitRequests,
const char * overridingDefineForProgFile,
const char * processName,
NABoolean parallelOpens) : IpcServer(NULL,
serverClass)
{
serverState_ = INITIAL;
nodeName_ = NULL;
className_ = className;
cpuNum_ = cpuNum;
actualCpuNum_ = cpuNum;
requestedCpuDown_ = FALSE;
allocMethod_ = allocMethod;
uniqueTag_ = uniqueTag;
usesTransactions_ = usesTransactions;
debugServer_ = debugServer;
waitedStartup_ = waitedStartup;
nowaitDepth_ = (unsigned short) maxNowaitRequests;
overridingDefineForProgFile_ = overridingDefineForProgFile;
processName_ = processName;
parallelOpens_ = parallelOpens,
guardianError_ = 0;
procCreateError_ = 0;
procCreateDetail_ = 0;
activeMessage_ = NULL;
nowaitedEspStartup_.nowaitedEspServer_ = &getServerClass()->nowaitedEspServer_;
nowaitedEspStartup_.procCreateError_ = &procCreateError_;
nowaitedEspStartup_.newPhandle_ = &newPhandle_;
nowaitedEspStartup_.nowaitedStartupCompleted_ = &nowaitedStartupCompleted_;
unhooked_ = false;
}
IpcGuardianServer *IpcGuardianServer::castToIpcGuardianServer()
{
return this;
}
void IpcGuardianServer::stop()
{
if (getServerClass()->getServerType() == IPC_SQLBDRS_SERVER)
return;
if (controlConnection_ && allocMethod_ != IPC_USE_PROCESS)
{
// Stop the server process (MXCMP or MXESP)
if (unhooked_)
{
if (controlConnection_->castToGuaConnectionToServer())
{
controlConnection_->castToGuaConnectionToServer()-> closePhandle();
return;
}
}
char procName[200];
short procNameLen = 200;
Int32 nid = 0;
Int32 pid = 0;
short result = 0;
//Phandle wrapper in porting layer
NAProcessHandle phandle((SB_Phandle_Type *)&(getServerId().getPhandle().phandle_));
Int32 guaRetcode = phandle.decompose();
if (!guaRetcode)
{
msg_mon_stop_process_name(phandle.getPhandleString());
}
}
}
short IpcGuardianServer::workOnStartup(IpcTimeout timeout,
ComDiagsArea **diags,
CollHeap *diagsHeap)
{
Int32 retcode = 0;
MXTRC_FUNC("IpcGuardianServer::workonStartup");
if (serverState_ == INITIAL)
{
// Check if the class name contains a slash. If this is the
// case then change the allocation mode to SPAWN, since the
// class name must be an OSS file name. In all other cases
// leave the allocation method as is.
for (Int32 i = 0; className_[i] != 0; i++)
if (className_[i] == '/')
allocMethod_ = IPC_SPAWN_OSS_PROCESS;
if (allocMethod_ == IPC_LAUNCH_GUARDIAN_PROCESS)
{
// launch the process, it will be a new Guardian process
// and can run on any node/cpu; communication will be via
// Guardian WRITEREADX
launchProcess(diags,diagsHeap);
if (serverState_ == CREATING_PROCESS)
{
assert(waitedStartup_ == FALSE);
return 0;
}
}
else if (allocMethod_ == IPC_SPAWN_OSS_PROCESS)
{
// spawn a new OSS process, it is started in the local system
// and the communication is also via WRITEREADX
spawnProcess(diags,diagsHeap);
}
else if (allocMethod_ == IPC_USE_PROCESS)
{
useProcess(diags, diagsHeap);
}
else
ABORT("Invalid process allocation method for Guardian Server");
} // serverState_ == INITIAL
else if (serverState_ == CREATING_PROCESS)
{
assert(waitedStartup_ == FALSE);
launchProcess(diags,diagsHeap); // Call it a second time
}
if (serverState_ == ERROR_STATE)
{
if (diags && (allocMethod_ != IPC_USE_PROCESS))
{
if ((!(**diags).contains(-2013)) && (!(**diags).contains(-2012))) // avoid generating redundant error
{
IpcAllocateDiagsArea(*diags,diagsHeap);
// Server process $0~string0 could not be created on $1~string1
// - Operating system error $2~int0 on program file.
(**diags) << DgSqlCode(-2013) << DgString0(progFileName_)
<< DgInt0(guardianError_)
<< DgNskCode(guardianError_);
char location[100];
getCpuLocationString(location);
(**diags) << DgString1(location);
}
}
return guardianError_;
}
return 0;
} // IpcGuardianServer::workOnStartup()
void IpcGuardianServer::acceptSystemMessage(const char *sysMsg,
Lng32 sysMsgLength)
{
short *msgType = (short *) sysMsg;
// make sure we received at least the two bytes for the message type
// or otherwise we'll read junk instead of the message type
if (sysMsg == NULL) // Temporary debugging aid
return;
assert(sysMsg != NULL AND sysMsgLength >= sizeof(msgType));
// see include file zsysc.h
switch (*msgType)
{
case ZSYS_VAL_SMSG_PROCCREATE:
{
zsys_ddl_smsg_proccreate_def *processCreateNowaitMsg =
(zsys_ddl_smsg_proccreate_def *) sysMsg;
procCreateError_ = processCreateNowaitMsg->z_error;
procCreateDetail_ = processCreateNowaitMsg->z_error_detail;
if (processCreateNowaitMsg->z_error != 0 AND
processCreateNowaitMsg->z_error != 14 /*undef. externals*/)
{
guardianError_ = 4022; // some generic Guardian error
// set the error code and set the state to ERROR_STATE, it is
// the responsibility of the user of the object to set the
// diagnostics area
serverState_ = ERROR_STATE;
return;
}
else
{
// process was successfully created, now create a connection to it
IpcProcessId serverProcId(
(const GuaProcessHandle &) processCreateNowaitMsg->z_phandle);
NABoolean useGuaIpc = TRUE;
if (useGuaIpc)
{
controlConnection_ = new
(getServerClass()->getEnv()->getHeap())
GuaConnectionToServer(getServerClass()->getEnv(),
serverProcId,
usesTransactions_,
nowaitDepth_,
eye_GUA_CONNECTION_TO_SERVER,
parallelOpens_, NULL, FALSE
);
if (controlConnection_->getState() == IpcConnection::ERROR_STATE)
guardianError_ = controlConnection_->castToGuaConnectionToServer()->getGuardianError();
}
// On NT and Linux startup message is not needed.
if (controlConnection_->getState() == IpcConnection::ERROR_STATE)
{
serverState_ = ERROR_STATE;
return;
}
else
{
serverState_ = READY;
}
}
}
break;
case ZSYS_VAL_SMSG_PROCSPAWN:
{
zsys_ddl_smsg_procspawn_def *processSpawnNowaitMsg =
(zsys_ddl_smsg_procspawn_def *) sysMsg;
if (processSpawnNowaitMsg->z_errno != 0)
{
// set the error code and set the state to ERROR_STATE, it is
// the responsibility of the user of the object to set the
// diagnostics area
guardianError_ = (short) processSpawnNowaitMsg->z_errno;
procCreateError_ = processSpawnNowaitMsg->z_tpcerror;
procCreateDetail_ = processSpawnNowaitMsg->z_tpcdetail;
serverState_ = ERROR_STATE;
return;
}
else
{
// process was successfully created, now create a connection to it
IpcProcessId serverProcId(
(const GuaProcessHandle &) processSpawnNowaitMsg->z_phandle);
NABoolean useGuaIpc = TRUE;
if (useGuaIpc)
controlConnection_ = new
(getServerClass()->getEnv()->getHeap())
GuaConnectionToServer(getServerClass()->getEnv(),
serverProcId,
usesTransactions_,
nowaitDepth_);
// created server process is immediately ready for use
serverState_ = READY;
}
}
break;
default:
ABORT("Invalid type of system message received");
}
}
void NewProcessCallback(SB_Phandle_Type *newPhandle,
MS_Mon_NewProcess_Notice_def *newProcNotice)
{
NowaitedEspStartup *nowaitedEspStartup = (NowaitedEspStartup*)newProcNotice->tag;
NowaitedEspServer *nowaitedEspServer = nowaitedEspStartup->nowaitedEspServer_;
Int32 *procCreateError = nowaitedEspStartup->procCreateError_;
NABoolean *nowaitedStartupCompleted = nowaitedEspStartup->nowaitedStartupCompleted_;
memcpy(*nowaitedEspStartup->newPhandle_,(void *)newPhandle, sizeof(SB_Phandle_Type));
ESP_TRACE2("CB: ToAcq_m, tag: %p\n", nowaitedEspStartup);
pthread_mutex_lock(&nowaitedEspServer->cond_mutex_);
ESP_TRACE2("CB: Acq_m, tag: %p\n", nowaitedEspStartup);
*procCreateError = newProcNotice->ferr;
*nowaitedStartupCompleted = TRUE;
nowaitedEspServer->callbackCount_ += 1;
// if (nowaitedEspServer->startTag_ == nowaitedEspServer->callbackCount_ && nowaitedEspServer->completionCount_ == 0)
if (nowaitedEspServer->waiting_)
pthread_cond_signal(&nowaitedEspServer->cond_cond_);
pthread_mutex_unlock(&nowaitedEspServer->cond_mutex_);
ESP_TRACE2("CB: Rls_m, tag: %p\n", nowaitedEspStartup);
}
void IpcGuardianServer::launchNSKLiteProcess(ComDiagsArea **diags,
CollHeap *diagsHeap)
{
NABoolean nowaitedStartupCompleted = FALSE;
static bool sv_cmp_node_id_checked = false;
static bool sv_cmp_node_id_mine = false;
static bool sv_launch_unhooked_checked = false;
static bool sv_launch_unhooked = false;
static bool sv_launch_cmp_unhooked_checked = false;
static bool sv_launch_cmp_unhooked = false;
bool launch_hooked_special = false;
bool noSeabaseDefTableRead = false;
NSK_PORT_HANDLE p_phandle;
if (serverState_ == INITIAL)
{
// a character string with the program file name
const Int32 maxLengthOfCommandLineArgs = 32;
char progFileName[(IpcMaxGuardianPathNameLength +
maxLengthOfCommandLineArgs)];
char * environmentName= NULL;
// if this assertion fails during testing then increase
// the literal above.
assert(strlen(" -guardian -debug") <= 32);
// parameters to NSKProcessCreate
short p_pe;
Int32 p_nowaitTag;
#define MAX_PROC_ARGS 10
#define SET_ARGV(argv,argc,argval) {argv[argc] = (char *) calloc(strlen(argval)+1, 1); \
strcpy(argv[argc++], argval); }
Int32 largc = 0;
char *largv[MAX_PROC_ARGS];
MS_Mon_PROCESSTYPE processType = MS_ProcessType_Generic;
//NGG
openTraceFile();
// ---------------------------------------------------------------------
// Set parameters for process_launch_
// ---------------------------------------------------------------------
// Pe
if (cpuNum_ == IPC_CPU_DONT_CARE)
p_pe = -1; // use same cpu as caller
else
p_pe = (short)cpuNum_;
// -----------------------------------------------------------------
// create the program file name from the class name and the overriding
// define name.
//
// for now, we form the name from an environment variable. if the
// environment variable is not present then we form the name from
// the class name. we look for environment variables of the form
// =_ARK_???_PROG_FILE_NAME
//
// names which are formed from class names are hard coded below.
//
// the long term plan is to form the name from the registry while allowing
// overrides for development and debugging purposes only
//
// note we REQUIRE the name to be identical on each PE !!!
// -----------------------------------------------------------------
if (overridingDefineForProgFile_)
environmentName = getenv(overridingDefineForProgFile_);
if (environmentName == NULL)
{
// ---------------------------------------------------------------
// The path of executables will be decided by NSKProcessCreate.
// ---------------------------------------------------------------
if ((strcmp(className_,"arkesp")== 0) || (strcmp(className_,"arkespdbg") == 0))
{
SET_ARGV(largv, largc, "tdm_arkesp");
char *fastStartArg = getenv("ESP_FASTSTART");
if (fastStartArg == NULL || *fastStartArg != '1')
SET_ARGV(largv, largc, "-noespfaststart");
strcpy(progFileName,"tdm_arkesp");
}
else
if ((strcmp(className_,"arkcmp")== 0) || (strcmp(className_,"arkcmpdbg") == 0))
{
SET_ARGV(largv, largc, "tdm_arkcmp");
strcpy(progFileName,"tdm_arkcmp");
if (!sv_launch_cmp_unhooked_checked)
{
char *lv_launch_unhooked = getenv("IPC_LAUNCH_CMP_UNHOOKED");
if ((lv_launch_unhooked != NULL) &&
(*lv_launch_unhooked == '1'))
sv_launch_cmp_unhooked = true;
sv_launch_cmp_unhooked_checked = true;
}
launch_hooked_special = !sv_launch_cmp_unhooked;
}
else
if ((strcmp(className_,"arkcat")== 0) || (strcmp(className_,"arkcatdbg") == 0))
strcpy(progFileName,"arkcat.exe");
else
if ((strcmp(className_,"arkustat")== 0) || (strcmp(className_,"arkustatdbg") == 0))
strcpy(progFileName,"arkustat.exe");
else
if ((strcmp(className_,"udrserv")== 0) || (strcmp(className_,"udrservdbg") == 0))
{
SET_ARGV(largv, largc, "tdm_udrserv");
strcpy(progFileName,"tdm_udrserv");
}
else
if ((strcmp(className_,"qms")== 0) || (strcmp(className_,"qmsdbg") == 0))
{
SET_ARGV(largv, largc, "tdm_arkqms");
strcpy(progFileName,"tdm_arkqms");
}
else
if ((strcmp(className_,"qmp")== 0) || (strcmp(className_,"qmpdbg") == 0))
{
SET_ARGV(largv, largc, "tdm_arkqmp");
strcpy(progFileName,"tdm_arkqmp");
}
else
if ((strcmp(className_,"qmm")== 0) || (strcmp(className_,"qmmdbg") == 0))
{
SET_ARGV(largv, largc, "tdm_arkqmm");
strcpy(progFileName,"tdm_arkqmm");
}
else
if (strcmp(className_,"bdrr")== 0)
{
// This process should be started as hooked always
launch_hooked_special = true;
SET_ARGV(largv, largc, "mxbdrdrc");
strcpy(progFileName,"mxbdrdrc");
if (cpuNum_ != IPC_CPU_DONT_CARE)
{
MS_Mon_Node_Info_Type info;
if (msg_mon_get_node_info_detail(cpuNum_, &info) != XZFIL_ERR_OK)
p_pe = -1;
else
if (info.num_returned < 1 || ! (info.node[0].type & MS_Mon_ZoneType_Edge))
p_pe = -1;
else
p_pe = cpuNum_;
}
else
p_pe = -1;
}
else
{ serverState_ = ERROR_STATE;
if (diags)
{
IpcAllocateDiagsArea(*diags,diagsHeap);
(**diags) << DgSqlCode(-2011) << DgInt0(FEBADNAME)
<< DgString0(className_) << DgNskCode(FEBADNAME);
}
return;
};
}
else if (strlen(environmentName) <= IpcMaxGuardianPathNameLength)
strcpy(progFileName,environmentName);
else
strcpy(progFileName,"///invalid_env_var");
strcpy(progFileName_, progFileName); // for any error messages
// nowait tag
// ---------------------------------------------------------------------
// Set the run time arguments in the command line
// ---------------------------------------------------------------------
SET_ARGV(largv, largc, "-guardian");
if (debugServer_)
SET_ARGV(largv, largc, "-debug");
// ---------------------------------------------------------------------
// start a new process on the specified PE with the specified
// program file
// ---------------------------------------------------------------------
void * envp = getServerClass()->getEnv()->getEnvVars();
Lng32 envpLen = getServerClass()->getEnv()->getEnvVarsLen();
Int32 server_nid = p_pe; /* multi fragment esp concurrent change */
Int32 server_pid = 0;
Int32 server_oid = 0;
char process_name[100];
char prog[MS_MON_MAX_PROCESS_PATH];
const char *pwd = NULL;
process_name[0] = 0;
pwd = getenv("PWD");
if (strlen(pwd) + 1 + strlen(progFileName) + 1 <
MS_MON_MAX_PROCESS_PATH)
{
strcpy(prog, pwd);
strcat(prog, "/");
strcat(prog, progFileName);
}
else
strcpy(prog, "///invalid prog file");
if (!sv_cmp_node_id_checked)
{
char *lv_cmp_node_id = getenv("CMP_NODE_AFFINITY");
if ((lv_cmp_node_id != NULL) && (*lv_cmp_node_id == '1'))
sv_cmp_node_id_mine = true;
sv_cmp_node_id_checked = true;
}
if (sv_cmp_node_id_mine && ((strcmp(className_,"arkcmp")== 0) ||
(strcmp(className_,"arkcmpdbg") == 0)))
{
Int32 nid;
Int32 err = msg_mon_get_my_info (&nid,NULL,NULL,0,NULL,NULL,NULL,NULL);
if (!err)
server_nid = nid;
}
if (launch_hooked_special)
unhooked_ = false;
else
{
if (!sv_launch_unhooked_checked)
{
char *lv_launch_unhooked = getenv("IPC_LAUNCH_UNHOOKED");
if ((lv_launch_unhooked != NULL) && (*lv_launch_unhooked == '1'))
sv_launch_unhooked = true;
sv_launch_unhooked_checked = true;
}
unhooked_ = sv_launch_unhooked;
}
// strcpy(process_name, "$srv");
if (waitedStartup_ == FALSE)
{
Int32 returnValue;
nowaitedStartupCompleted_ = FALSE;
nowaitedEspStartup_.nowaitedEspServer_->startTag_ += 1;
// Temporarily ignore returnValue
newPhandle_ = (void *)getServerClass()->getEnv()->getHeap()->allocateMemory(sizeof(SB_Phandle_Type));
ESP_TRACE2("MT: Call MMSPNW, svr: %p\n", &nowaitedEspStartup_);
bool retryStartProcess;
do
{
actualCpuNum_ = server_nid; // save requested CPU (might be IPC_CPU_DONT_CARE)
returnValue = msg_mon_start_process_nowait_cb2(NewProcessCallback,
prog, /* prog */
process_name, /* name */
process_name, /* output process name */
largc, /* args */
largv,
//0, /* open */
//&server_oid, /* oid */
processType, /* process type */
0, /* priority */
0, /* debug */
0, /* backup */
(Int64)&nowaitedEspStartup_,
&server_nid, /* nid */
&server_pid,
NULL,
NULL,
unhooked_);
ESP_TRACE2("MT: Back MMSPNW, svr: %p\n", &nowaitedEspStartup_);
if (actualCpuNum_ == IPC_CPU_DONT_CARE)
actualCpuNum_ = server_nid; // msg_mon_start_process_nowait_cb2 might have assigned server_nid
if (returnValue == XZFIL_ERR_FSERR && server_nid != -1)
{
server_nid = -1;
retryStartProcess = true;
requestedCpuDown_ = TRUE;
}
else
retryStartProcess = false;
}
while (retryStartProcess);
ESP_TRACE2("MT: Back MMSPNW, svr: %p\n", &nowaitedEspStartup_);
procCreateError_ = returnValue;
}
else
{
// should have a define for the name length
if (processName_)
{
strncpy (process_name, processName_, 99);
}
actualCpuNum_ = server_nid; // save requested CPU (might be IPC_CPU_DONT_CARE)
Int32 returnValue = msg_mon_start_process2(
prog, /* prog */
process_name, /* name */
process_name, /* output process name */
largc, /* args */
largv,
&p_phandle,
0, /* open */
&server_oid, /* oid */
processType, /* process type */
0, /* priority */
0, /* debug */
0, /* backup */
&server_nid, /* nid */
&server_pid,
NULL,
NULL,
unhooked_);
procCreateError_ = returnValue;
if (actualCpuNum_ == IPC_CPU_DONT_CARE)
actualCpuNum_ = server_nid; // msg_mon_start_process2 might have assigned server_nid
}
serverState_ = CREATING_PROCESS;
if (getenv("SQL_MSGBOX_PROCESS") != NULL)
{
MessageBox( NULL, "Requester: Process Launched", (CHAR *)&progFileName, MB_OK|MB_ICONINFORMATION );
};
} // serverState_ == INITIAL
else
{
assert(serverState_ == CREATING_PROCESS &&
nowaitedEspStartup_.nowaitedEspServer_->waitedStartupArg_ != '1');
ESP_TRACE2("MT: ToAcq_m, svr: %p\n" , &nowaitedEspStartup_);
pthread_mutex_lock(&nowaitedEspStartup_.nowaitedEspServer_->cond_mutex_);
while (nowaitedStartupCompleted_ == FALSE)
{
ESP_TRACE1("MT: Acq_m - Wt_CV\n");
nowaitedEspStartup_.nowaitedEspServer_->waiting_ = TRUE;
pthread_cond_wait(&nowaitedEspStartup_.nowaitedEspServer_->cond_cond_, &nowaitedEspStartup_.nowaitedEspServer_->cond_mutex_);
ESP_TRACE1("MT: Acq_CV\n");
nowaitedEspStartup_.nowaitedEspServer_->waiting_ = FALSE;
}
// Callback for this ESP has occurred
p_phandle = *(NSK_PORT_HANDLE *)newPhandle_;
getServerClass()->getEnv()->getHeap()->deallocateMemory(newPhandle_);
nowaitedEspStartup_.nowaitedEspServer_->completionCount_ += 1;
nowaitedStartupCompleted = TRUE;
if (nowaitedEspStartup_.nowaitedEspServer_->startTag_ == nowaitedEspStartup_.nowaitedEspServer_->completionCount_)
{
assert(nowaitedEspStartup_.nowaitedEspServer_->startTag_ == nowaitedEspStartup_.nowaitedEspServer_->callbackCount_);
nowaitedEspStartup_.nowaitedEspServer_->startTag_ =
nowaitedEspStartup_.nowaitedEspServer_->callbackCount_ =
nowaitedEspStartup_.nowaitedEspServer_->completionCount_ = 0;
}
pthread_mutex_unlock(&nowaitedEspStartup_.nowaitedEspServer_->cond_mutex_);
ESP_TRACE1("MT: Rls_m\n");
}
if (waitedStartup_ OR (procCreateError_ != NO_ERROR) OR nowaitedStartupCompleted)
{
// create a system message from the return info
//
zsys_ddl_smsg_proccreate_def sysmsg;
str_pad((char *) &sysmsg, sizeof(sysmsg), 0);
sysmsg.z_msgnumber = ZSYS_VAL_SMSG_PROCCREATE;
sysmsg.z_tag = -1;
memcpy(&sysmsg.z_phandle, &p_phandle, sizeof(sysmsg.z_phandle));
sysmsg.z_error = procCreateError_;
// the system message that otherwise would be sent to $RECEIVE
// gets delivered right here in outputList
ESP_TRACE2("MT: To call acceptSysMsg, svr: %p\n", &nowaitedEspStartup_);
acceptSystemMessage((const char *) &sysmsg,
sizeof(sysmsg));
ESP_TRACE2("MT: Back from acceptSysMsg, svr: %p\n", &nowaitedEspStartup_);
if (serverState_ == ERROR_STATE)
{
ESP_TRACE1("MT: Error in acceptSysMsg\n");
// something went wrong with process creation, non-parallel open
// of the control connection, or initiation of of parallel
// open of the control connection
if (diags)
if (procCreateError_ == XZFIL_ERR_OK)
// Diagnostics must be due to error on BFILE_OPEN_
controlConnection_->populateDiagsArea(*diags,diagsHeap);
else
populateDiagsAreaFromTPCError(*diags,diagsHeap);
return;
}
}
return;
}
void IpcGuardianServer::launchProcess(ComDiagsArea **diags,
CollHeap *diagsHeap)
{
launchNSKLiteProcess(diags,diagsHeap);
return;
}
void IpcGuardianServer::spawnProcess(ComDiagsArea **diags,
CollHeap *diagsHeap)
{
launchNSKLiteProcess(diags,diagsHeap);
return;
}
void IpcGuardianServer::useProcess(ComDiagsArea **diags,
CollHeap *diagsHeap)
{
SB_Phandle_Type procHandle;
short usedlength;
char processName[50];
char *tmpProcessName;
int rc;
if (processName_ == NULL)
{
tmpProcessName = getServerClass()->getProcessName((short)cpuNum_, processName);
// use diagsHeap for the time being
Int32 len = str_len(processName);
processName_ = new (getServerClass()->getEnv()->getHeap()) char[len+1];
str_cpy_all((char *)processName_, (const char *)processName, len+1);
}
else
tmpProcessName = (char *)processName_;
GuaErrorNumber guaError = 0;
short i = 0;
while (i < 3)
{
rc = get_phandle_with_retry(tmpProcessName, &procHandle);
if (rc != FEOK)
{
serverState_ = ERROR_STATE;
guardianError_ = rc;
if (diags)
{
IpcAllocateDiagsArea(*diags,diagsHeap);
(**diags) << DgSqlCode(-2024) << DgString0(processName_)
<< DgInt0(rc);
}
return;
}
else
{
//Phandle wrapper in porting layer
NAProcessHandle phandle(&procHandle);
rc = phandle.decompose();
if (rc != 0)
{
serverState_ = ERROR_STATE;
guardianError_ = rc;
if (diags)
{
IpcAllocateDiagsArea(*diags,diagsHeap);
(**diags) << DgSqlCode(-2024) << DgString0(processName_)
<< DgInt0(rc);
}
return;
}
}
IpcProcessId serverProcId((const GuaProcessHandle &)procHandle);
controlConnection_ = new(getServerClass()->getEnv()->getHeap())
GuaConnectionToServer(getServerClass()->getEnv(),
serverProcId,
usesTransactions_,
nowaitDepth_);
if (controlConnection_->getState() == IpcConnection::ERROR_STATE)
{
i++;
guaError = controlConnection_->
castToGuaConnectionToServer()->getGuardianError();
delete controlConnection_;
controlConnection_ = NULL;
DELAY(10);
}
else
break;
}
if (controlConnection_ != NULL)
// Theprocess is ready for use
serverState_ = READY;
else
{
serverState_ = ERROR_STATE;
guardianError_ = guaError;
if (diags)
{
IpcAllocateDiagsArea(*diags,diagsHeap);
(**diags) << DgSqlCode(-2024) << DgString0(processName_)
<< DgInt0(guardianError_);
}
}
}
NABoolean IpcGuardianServer::serverDied()
{
const GuaProcessHandle &ph = getServerId().getPhandle();
char pname[PhandleStringLen];
Int32 pnameLen = ph.toAscii(pname, PhandleStringLen);
pname[pnameLen] = '\0';
int nid, pid;
SB_Verif_Type verifier;
int rc = msg_mon_get_process_info2(pname, &nid, &pid, &verifier);
return rc != 0 ;
}
void IpcGuardianServer::populateDiagsAreaFromTPCError(ComDiagsArea *&diags,
CollHeap *diagsHeap)
{
IpcAllocateDiagsArea(diags,diagsHeap);
switch (procCreateError_)
{
// common launch errors
case XZFIL_ERR_NOSUCHDEV: // 14
case XZFIL_ERR_NOBUFSPACE: // 22
case XZFIL_ERR_FSERR: // 53
case XZFIL_ERR_BADREPLY: // 74
case XZFIL_ERR_OVERRUN: // 121
case XZFIL_ERR_DEVERR: // 190
// common error on launch--AQR
(*diags) << DgSqlCode(-2012) << DgInt0(guardianError_)
<< DgInt1(procCreateError_) <<DgInt2(procCreateDetail_)
<< DgNskCode(guardianError_);
break;
default:
(*diags) << DgSqlCode(-2013) << DgInt0(procCreateError_)
<< DgNskCode(procCreateError_);
break;
}
char location[TC_PROCESSOR_NAME_MAX];
getCpuLocationString(location);
(*diags) << DgString1(location);
// the $string0 parameter always identifies the program file name
(*diags) << DgString0(progFileName_);
const char * interpretiveText = NULL; // for 2012 errors, we add interpretive text
switch (procCreateError_)
{
case XZFIL_ERR_NOSUCHDEV: // 14
case XZFIL_ERR_FSERR: // 53
case XZFIL_ERR_DEVERR: // 190
interpretiveText = "Could not access executable file.";
break;
case XZFIL_ERR_NOBUFSPACE: // 22
interpretiveText = "Insufficient buffer space.";
break;
case XZFIL_ERR_BADREPLY: // 74
interpretiveText = "Incorrect reply received from monitor.";
break;
case XZFIL_ERR_OVERRUN: // 121
interpretiveText = "A message overrun occurred while communicating with the monitor.";
break;
default:
interpretiveText = NULL;
break;
}
if (interpretiveText)
(*diags) << DgString2(interpretiveText);
}
void IpcGuardianServer::getCpuLocationString(char *location)
{
if (!location)
return;
// populate the nodeName_ if it has not already been captured
if ((nodeName_ == NULL) && (actualCpuNum_ != IPC_CPU_DONT_CARE))
{
// populate nodeName_ from the Trafodion node number that we actually attempted to use
MS_Mon_Node_Info_Type nodeInfo;
Int32 rc = msg_mon_get_node_info_detail(actualCpuNum_, &nodeInfo);
if (rc == 0)
{
nodeName_ = new (getServerClass()->getEnv()->getHeap()) char[TC_PROCESSOR_NAME_MAX];
strcpy(nodeName_, nodeInfo.node[0].node_name);
}
}
if (nodeName_)
{
strcpy(location,nodeName_);
}
else
{
strcpy(location,"an unspecified node");
}
}
#if defined(LOG_IPC) || defined(LOG_RECEIVE)
void IpcGuaLogTimestamp(IpcConnection *conn)
{
Int64 jts = JULIANTIMESTAMP();
MyGuaProcessHandle me;
char meAsAscii[200];
char otherAsAscii[200];
char *fromto;
short fno = 999;
me.toAscii(meAsAscii,200);
if (conn)
conn->getOtherEnd().toAscii(otherAsAscii,200);
else
{
otherAsAscii[0] = '?';
otherAsAscii[1] = '?';
otherAsAscii[2] = '\0';
}
NABoolean useGuaIpc = TRUE;
if (!useGuaIpc AND conn AND conn->castToGuaConnectionToServer())
{
fromto = " to ";
fno = conn->castToGuaConnectionToServer()->
getFileNumForLogging();
}
else if (conn AND conn->castToGuaConnectionToClient())
{
fromto = " from ";
fno = conn->castToGuaConnectionToClient()->
getFileNumForLogging();
}
else
fromto = " <-> ";
cerr << "(" <<
// NT has problems printing out an Int64
(ULng32) jts
<< "): " << meAsAscii << fromto << otherAsAscii
<< "(" << fno << ") ";
}
#endif /* LOG_IPC || LOG_RECEIVE */