/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
 *****************************************************************************
 *
 * File:         Ipc.C
 * Description:  Implementation for process communication services on the
 *               low (executor) level without using C++ run time library
 *
 * Created:      10/6/95
 * Language:     C++
 *
 *****************************************************************************
 */


#define AEVENT 1
#define  CLI_DLL

//#define IPC_INTEGRITY_CHECKING 1  // for debugging purposes

#include "Platform.h"
#include "ComASSERT.h"
#include "ComDiags.h"
#include "logmxevent.h"
#include "ExCollections.h"
#include "Ipc.h"
#include "ipcmsg.h"
#include "str.h"
#include "HeapLog.h"
#include "ComRtUtils.h"
#include "PortProcessCalls.h"

#include <stdio.h>		// for snprintf()
#include <time.h>
#include <sys/time.h>
#include "seabed/pctl.h"
#include "seabed/ms.h"
#include "seabed/int/opts.h"

#include <unistd.h>		// for getpid()

#include "Globals.h"
#include "Context.h"
#include "MXTraceDef.h"
#include "ExSMTrace.h"

#include "SMConnection.h"
#include "ExSMCommon.h"
#include "ExSMGlobals.h"
#include "ExSMReadyList.h"
#include "ExSMTask.h"

// Process-wide flag, initialized to TRUE.  NOTE(review): it is not read in
// the visible part of this file; the name suggests it selects a -1 argument
// for XAWAITIOX calls elsewhere — confirm at its use sites.
NABoolean XAWAITIOX_MINUS_ONE = TRUE;

#include "ComCextdecs.h"
#include "Ex_esp_msg.h"

// Fallback value (16) used only when no previously included header has
// defined FS_MAX_NOWAIT_DEPTH.  NOTE(review): not referenced in the visible
// part of this file; presumably bounds the number of outstanding nowait
// I/Os — confirm.
#ifndef FS_MAX_NOWAIT_DEPTH
#define FS_MAX_NOWAIT_DEPTH 16
#endif

// -----------------------------------------------------------------------
// Methods for class IpcNodeName
// -----------------------------------------------------------------------

IpcNodeName::IpcNodeName(IpcNetworkDomain dom,
			 const char *name)
{
  domain_ = dom;

  if (domain_ != IPC_DOM_INTERNET)
    {
      // Guardian node: keep the name in a fixed-width, blank-padded field.
      assert(domain_ == IPC_DOM_GUA_PHANDLE);
      const Int32 nameLen = str_len(name);
      assert(nameLen <= GuaNodeNameMaxLen);
      str_pad(guardianNode_.nodeName_, GuaNodeNameMaxLen, ' ');
      str_cpy_all(guardianNode_.nodeName_, name, nameLen);
      return;
    }

  // Internet node: resolve the textual name into a raw IP address;
  // an unresolvable name is fatal.
  SockIPAddress resolved;
  if (resolved.set(name).hasError())
    ABORT("Node name not found");
  ipAddr_ = resolved.getRawAddress();
}

IpcNodeName::IpcNodeName(const SockIPAddress &iPNode)
{
  // An IP address always denotes an internet node.
  ipAddr_ = iPNode.getRawAddress();
  domain_ = IPC_DOM_INTERNET;
}

IpcNodeName & IpcNodeName::operator = (const IpcNodeName &other)
{
  // Copy the domain tag, then only the member that tag makes meaningful.
  domain_ = other.domain_;

  if (domain_ != IPC_DOM_INTERNET)
    guardianNode_ = other.guardianNode_;
  else
    ipAddr_ = other.ipAddr_;

  return *this;
}

NABoolean IpcNodeName::operator == (const IpcNodeName &other)
{
  // Names from different domains never compare equal.
  if (domain_ != other.domain_)
    return FALSE;

  // Within a domain, compare the raw bytes of the domain-specific member.
  switch (domain_)
    {
    case IPC_DOM_INTERNET:
      return (str_cmp((char *) &ipAddr_,
		      (char *) &other.ipAddr_,
		      sizeof(ipAddr_)) == 0);

    case IPC_DOM_GUA_PHANDLE:
      return (str_cmp(guardianNode_.nodeName_,
		      other.guardianNode_.nodeName_,
		      GuaNodeNameMaxLen) == 0);

    default:
      return FALSE;
    }
}

SockIPAddress IpcNodeName::getIPAddress() const
{
  // Only valid for internet node names (asserted).
  assert(domain_ == IPC_DOM_INTERNET);

  return SockIPAddress(ipAddr_);
}

// -----------------------------------------------------------------------
// Methods for class GuaProcessHandle
// -----------------------------------------------------------------------

NABoolean GuaProcessHandle::operator == (const GuaProcessHandle &other) const
{
  // Equality is delegated to compare(), which asks the system to compare
  // the two handles.
  const NABoolean same = compare(other);
  return same;
}

void GuaProcessHandle::dumpAndStop(bool doDump, bool doStop) const
{
  char coreFile[1024];
  NAProcessHandle phandle((SB_Phandle_Type *)&phandle_);
  phandle.decompose();
  if (doDump)
    msg_mon_dump_process_name(NULL, phandle.getPhandleString(), coreFile);
  if (doStop)
    msg_mon_stop_process_name(phandle.getPhandleString()); 
}

// -----------------------------------------------------------------------
// Methods for class IpcProcessId
// -----------------------------------------------------------------------

IpcProcessId::IpcProcessId()
     : IpcMessageObj(IPC_PROCESS_ID, IpcCurrProcessIdVersion)
{
  // No address information yet: the id stays invalid until assigned.
  this->domain_ = IPC_DOM_INVALID;
}

IpcProcessId::IpcProcessId(const GuaProcessHandle &phandle)
     : IpcMessageObj(IPC_PROCESS_ID, IpcCurrProcessIdVersion)
{
  // A Guardian process handle identifies the process on its own.
  this->phandle_ = phandle;
  this->domain_ = IPC_DOM_GUA_PHANDLE;
}

IpcProcessId::IpcProcessId(const SockIPAddress &ipAddr,
                           SockPortNumber port)
     : IpcMessageObj(IPC_PROCESS_ID, IpcCurrProcessIdVersion)
{
  // Internet process ids are an (IP address, listener port) pair; only an
  // internet node name goes together with an IP address and a port.
  pid_.listnerPort_ = port;
  pid_.ipAddress_ = ipAddr.getRawAddress();
  domain_ = IPC_DOM_INTERNET;
}

// Build a process id from its text form.  The string is first tried as a
// Guardian PHANDLE; if that fails it is parsed as "<ip address>:<port>".
// If neither interpretation succeeds the id stays IPC_DOM_INVALID.
IpcProcessId::IpcProcessId(const char *asciiRepresentation) :
     IpcMessageObj(IPC_PROCESS_ID,
		   IpcCurrProcessIdVersion)
{
  domain_ = IPC_DOM_INVALID;

  // try to interpret the string as a PHANDLE first
  if (phandle_.fromAscii(asciiRepresentation))
    domain_ = IPC_DOM_GUA_PHANDLE;

  if (domain_ == IPC_DOM_INVALID)
    {
      // try to decode an internet address, followed by a port number,
      // separated by the first ':'
      Int32 colonPos = 0;
      while (asciiRepresentation[colonPos] != 0 AND
	     asciiRepresentation[colonPos] != ':')
	colonPos++;

      if (asciiRepresentation[colonPos] == ':')
	{
	  char asciiIpAddr[300];
	  SockIPAddress ipAddr;

	  assert(colonPos < 300);
	  // Also guard at run time: if asserts are compiled out, an overlong
	  // address must not overflow asciiIpAddr (the id then stays invalid).
	  if (colonPos < (Int32) sizeof(asciiIpAddr))
	    {
	      str_cpy_all(asciiIpAddr,asciiRepresentation,colonPos);
	      asciiIpAddr[colonPos] = 0;
	      SockErrNo sen = ipAddr.set(asciiIpAddr);

	      if (NOT sen.hasError())
		{
		  pid_.ipAddress_ = ipAddr.getRawAddress();
		  // now parse the decimal port number after the ':'
		  ULng32 portNo = 0;
		  colonPos++;
		  while (asciiRepresentation[colonPos] >= '0' AND
			 asciiRepresentation[colonPos] <= '9')
		    {
		      portNo = portNo * 10 + (asciiRepresentation[colonPos] - '0');
		      // advance to the next digit (without this the loop
		      // never terminates)
		      colonPos++;
		    }
		  pid_.listnerPort_ = portNo;
		  domain_ = IPC_DOM_INTERNET;
		}
	    }
	}
    }
}

IpcProcessId::IpcProcessId(const IpcProcessId &other)
     : IpcMessageObj(other.getType(), other.getVersion())
{
  // Copy the domain tag, then only the address member that tag selects;
  // the other member is not copied.
  domain_ = other.domain_;

  switch (domain_)
    {
    case IPC_DOM_GUA_PHANDLE:
      phandle_ = other.phandle_;
      break;
    case IPC_DOM_INTERNET:
      pid_ = other.pid_;
      break;
    default:
      break;
    }
}

IpcProcessId & IpcProcessId::operator = (const IpcProcessId &other)
{
  // Same rule as the copy constructor: domain tag plus the matching member.
  domain_ = other.domain_;

  switch (domain_)
    {
    case IPC_DOM_GUA_PHANDLE:
      phandle_ = other.phandle_;
      break;
    case IPC_DOM_INTERNET:
      pid_ = other.pid_;
      break;
    default:
      break;
    }

  return *this;
}

NABoolean IpcProcessId::operator == (const IpcProcessId &other) const
{
  // Ids from different domains are never equal.
  if (domain_ != other.domain_)
    return FALSE;

  // Guardian ids: compare the process handles.
  if (domain_ == IPC_DOM_GUA_PHANDLE)
    return (phandle_ == other.phandle_) ? TRUE : FALSE;

  // Internet ids: the 4 address bytes and the listener port must match.
  if (domain_ == IPC_DOM_INTERNET)
    {
      const NABoolean sameAddr =
	(str_cmp((char *) pid_.ipAddress_.ipAddress_,
		 (char *) other.pid_.ipAddress_.ipAddress_,
		 4) == 0);
      return (sameAddr AND
	      pid_.listnerPort_ == other.pid_.listnerPort_) ? TRUE : FALSE;
    }

  // invalid ids never compare equal
  return FALSE;
}

NABoolean IpcProcessId::match(const IpcNodeName &name,
			      IpcCpuNum cpuNum) const
{
  // Invalid ids match nothing.
  if (domain_ != IPC_DOM_INTERNET AND domain_ != IPC_DOM_GUA_PHANDLE)
    return FALSE;

  // Only Guardian ids carry a CPU number; IP addresses don't tell the CPU.
  // Check it first, if the caller cares about it.
  if (domain_ == IPC_DOM_GUA_PHANDLE AND
      cpuNum != IPC_CPU_DONT_CARE AND
      cpuNum != getCpuNum())
    return FALSE;

  // In both domains the node names must agree.
  return (getNodeName() == name);
}

SockIPAddress IpcProcessId::getIPAddress() const
{
  // Only internet process ids have an IP address (asserted).
  assert(domain_ == IPC_DOM_INTERNET);
  return SockIPAddress(pid_.ipAddress_);
}

SockPortNumber IpcProcessId::getPortNumber() const
{
  // Only internet process ids have a listener port (asserted).
  assert(domain_ == IPC_DOM_INTERNET);
  return pid_.listnerPort_;
}

const GuaProcessHandle & IpcProcessId::getPhandle() const
{
  // Only Guardian process ids carry a phandle (asserted).
  assert(domain_ == IPC_DOM_GUA_PHANDLE);
  return phandle_;
}

IpcNodeName IpcProcessId::getNodeName() const
{
  // Derive the node name from whichever address member is valid.
  switch (domain_)
    {
    case IPC_DOM_INTERNET:
      return IpcNodeName(SockIPAddress(pid_.ipAddress_));
    case IPC_DOM_GUA_PHANDLE:
      return IpcNodeName(phandle_);
    default:
      break;
    }

  ABORT("Can't get node name of an invalid process id");

  // Not reached (ABORT does not return); the return only keeps compilers
  // that don't know that from complaining.
  return IpcNodeName(SockIPAddress(pid_.ipAddress_));
}

IpcCpuNum IpcProcessId::getCpuNum() const
{
  // Outside Guardian we have no control over CPU number assignment, so
  // report "don't care".
  if (domain_ != IPC_DOM_GUA_PHANDLE)
    return IPC_CPU_DONT_CARE;

  // ask Guardian to extract the CPU number from the phandle
  return getCpuNumFromPhandle();
}

// Render this process id as text into outBuf (at most outBufLen bytes,
// including the terminating NUL).  The result may be truncated to fit.
// Returns the full length of the text (or the length that would be needed),
// not the number of bytes stored.
Int32 IpcProcessId::toAscii(char *outBuf, Int32 outBufLen) const
{
  // Format into a local buffer first; process names shouldn't be longer
  // than 300 bytes.  Initialized so an invalid domain yields "".
  char outb[300] = "";
  Int32 outLen = 0;

  if (domain_ == IPC_DOM_GUA_PHANDLE)
    {
      outLen = phandle_.toAscii(outb,300);
    }
  else if (domain_ == IPC_DOM_INTERNET)
    {
      // dotted-quad IP address followed by ":<port>"
      sprintf(outb,"%d.%d.%d.%d:%d",
	      pid_.ipAddress_.ipAddress_[0],
	      pid_.ipAddress_.ipAddress_[1],
	      pid_.ipAddress_.ipAddress_[2],
	      pid_.ipAddress_.ipAddress_[3],
	      pid_.listnerPort_);
      outLen = str_len(outb);
    }

  // A buffer without room for even the terminator gets nothing written
  // (otherwise outBuf[-1] would be written below).
  if (outBuf == NULL || outBufLen <= 0)
    return outLen;

  // Copy as much as fits and NUL-terminate.  outLen may be the length that
  // *would* be needed, so also never read beyond the end of outb.
  Int32 copyLen = MINOF(outLen, outBufLen - 1);
  copyLen = MINOF(copyLen, (Int32) sizeof(outb) - 1);
  if (copyLen < 0)
    copyLen = 0;
  str_cpy_all(outBuf, outb, copyLen);
  outBuf[copyLen] = 0;

  // return the actual length or the length we would need
  return outLen;
}

void IpcProcessId::addProcIdToDiagsArea(ComDiagsArea &diags,
					Int32 stringno) const
{
  // Render this id as text and attach it to diags as string parameter
  // number stringno (0..4); any other number aborts.
  char asciiProcId[300];
  toAscii(asciiProcId, 300);

  if (stringno == 0)
    diags << DgString0(asciiProcId);
  else if (stringno == 1)
    diags << DgString1(asciiProcId);
  else if (stringno == 2)
    diags << DgString2(asciiProcId);
  else if (stringno == 3)
    diags << DgString3(asciiProcId);
  else if (stringno == 4)
    diags << DgString4(asciiProcId);
  else
    ABORT("Invalid string no in IpcProcessId::addProcIdToDiagsArea");
}

// Create (on env's heap) a client connection to the server process this id
// names: a SockConnection for internet ids, a GuaConnectionToServer for
// Guardian ids.  Returns NULL for an invalid id.  The transaction, nowait,
// parallel-open and data-connection options are only passed on to the
// Guardian connection.
IpcConnection * IpcProcessId::createConnectionToServer(
     IpcEnvironment *env,
     NABoolean usesTransactions,
     Lng32 maxNowaitRequests,
     NABoolean parallelOpen,
     Int32 *openCompletionScheduled
     ,
     NABoolean dataConnectionToEsp
     ) const
{
  if (domain_ == IPC_DOM_INTERNET)
    {
      return new(env->getHeap()) SockConnection(env,*this,FALSE);
    }
  else if (domain_ == IPC_DOM_GUA_PHANDLE)
    {
      return new(env->getHeap()) GuaConnectionToServer(env,
					      *this,
					      usesTransactions,
					      (unsigned short) maxNowaitRequests,
					      eye_GUA_CONNECTION_TO_SERVER,
					      parallelOpen,
					      openCompletionScheduled
                                              ,
                                              dataConnectionToEsp
                                              );
    }
  else
    {
      // invalid process id: no connection possible
      return NULL;
    }
}

IpcMessageObjSize IpcProcessId::packedLength()
{
  // Fixed part: base class header, the domain tag and the spare bytes.
  IpcMessageObjSize len = baseClassPackedLength();
  len += sizeof(domain_);
  len += sizeof(spare_);

  // Variable part: the address member selected by the domain (if any).
  switch (domain_)
    {
    case IPC_DOM_GUA_PHANDLE:
      len += sizeof(phandle_);
      break;
    case IPC_DOM_INTERNET:
      len += sizeof(pid_);
      break;
    default:
      break;
    }

  return len;
}

IpcMessageObjSize IpcProcessId::packObjIntoMessage(IpcMessageBufferPtr buffer)
{
  // Layout (must mirror packedLength() and unpackObj()):
  //   base class header | domain_ | spare_ (skipped, not written) | payload
  IpcMessageObjSize packedBytes = packBaseClassIntoMessage(buffer);

  str_cpy_all(buffer, (const char *) &domain_, sizeof(domain_));
  packedBytes += sizeof(domain_);
  packedBytes += sizeof(spare_);
  buffer += sizeof(domain_);
  buffer += sizeof(spare_);

  // NOTE: this code assumes that the OS dependent information (phandle
  // and socket pid) can be sent to another process as a byte string.
  const char *payload = NULL;
  IpcMessageObjSize payloadLen = 0;

  if (domain_ == IPC_DOM_GUA_PHANDLE)
    {
      payload = (const char *) &phandle_;
      payloadLen = sizeof(phandle_);
    }
  else if (domain_ == IPC_DOM_INTERNET)
    {
      payload = (const char *) &pid_;
      payloadLen = sizeof(pid_);
    }

  if (payload != NULL)
    {
      str_cpy_all(buffer, payload, payloadLen);
      packedBytes += payloadLen;
    }

  return packedBytes;
}

// Rebuild this process id from a message buffer written by
// packObjIntoMessage().  Only the current object version and same-endian
// senders are accepted (asserted below).  Layout read here:
//   base class header | domain_ | spare_ (skipped) | phandle_ or pid_
void IpcProcessId::unpackObj(IpcMessageObjType objType,
			     IpcMessageObjVersion objVersion,
			     NABoolean sameEndianness,
			     IpcMessageObjSize objSize,
			     IpcConstMessageBufferPtr buffer)
{
  assert(objType == IPC_PROCESS_ID AND
	 objVersion == IpcCurrProcessIdVersion AND
	 sameEndianness);

  // NOTE(review): the domain_ read below starts at `buffer`, so
  // unpackBaseClass() is presumably advancing `buffer` past the base-class
  // header (taking it by reference) — confirm in Ipc.h.
  unpackBaseClass(buffer);

  str_cpy_all((char *) &domain_, buffer, sizeof(domain_));
  buffer += sizeof(domain_);
  // the spare bytes are skipped, not read
  buffer += sizeof(spare_);

  // check the supplied length; packedLength() depends on domain_, so this
  // check has to stay after domain_ has been read
  assert(objSize == packedLength());

  // copy the address member selected by the domain (nothing for an
  // invalid domain)
  if (domain_ == IPC_DOM_GUA_PHANDLE)
    {
      str_cpy_all((char *) &phandle_, buffer, sizeof(phandle_));
    }
  else if (domain_ == IPC_DOM_INTERNET)
    {
      str_cpy_all((char *) &pid_, buffer, sizeof(pid_));
    }
}

// -----------------------------------------------------------------------
// Methods for class IpcServer
// -----------------------------------------------------------------------

IpcServer::IpcServer(IpcConnection *controlConnection,
		     IpcServerClass *serverClass)
{
  // Remember the owning server class and the control connection.
  this->serverClass_ = serverClass;
  this->controlConnection_ = controlConnection;

  // start with an all-NUL program file name
  str_pad(progFileName_, IpcMaxGuardianPathNameLength, 0);
}

// Destroy the server object: log the release (if ESP release logging is
// enabled, see logEspRelease()), then stop the server and delete its
// control connection, if it still has one.
IpcServer::~IpcServer()
{
  // log first, while controlConnection_ is still available for the message
  logEspRelease(__FILE__, __LINE__);  
  if (controlConnection_)
    {
#ifdef IPC_INTEGRITY_CHECKING
      IpcEnvironment * ie = controlConnection_->getEnvironment();
      // NOTE(review): allc is not used below
      IpcAllConnections * allc = ie->getAllConnections();

      ie->checkIntegrity();
#endif
      stop();
      delete controlConnection_;
      controlConnection_ = NULL;

#ifdef IPC_INTEGRITY_CHECKING
      ie->checkIntegrity();
#endif
    }
}

void IpcServer::release()
{
  serverClass_->freeServerProcess(this);
}

void IpcServer::stop()
{
  // No-op in the base class.
  // TBD: real work belongs in the implementations for derived classes.
}

IpcGuardianServer *IpcServer::castToIpcGuardianServer()
{
  // A plain IpcServer is not a Guardian server;
  // IpcGuardianServer::castToIpcGuardianServer() returns a non-null value.
  IpcGuardianServer *none = NULL;
  return none;
}

void IpcServer::logEspRelease(const char * filename, int lineNum, 
                              const char *msg)
{
  IpcConnection *cc = controlConnection_;
  if (cc &&
      cc->getEnvironment() &&
      cc->getEnvironment()->getLogReleaseEsp())
  {
    /*
    Coverage notes: to test this code in a dev regression requires
    changing $TRAF_HOME/etc/ms.env.  However, it was tested in
    stress test on May 10, 2012.
    */
    char logMsg[500];

    // get the other end's name.
    char espName[32];
    Int32 pnameLen = cc->getOtherEnd().getPhandle().toAscii(
                       espName, sizeof(espName));
    espName[pnameLen] = '\0';

    // get the error #, if available.  Else, use -99.
    GuaErrorNumber guaError = -99;
    if (cc->castToGuaConnectionToServer())
      guaError = cc->castToGuaConnectionToServer()->getGuardianError();

    // get replySeqNum_ and state_
    ULng32 replySeqNum = cc->getReplySeqNum();

    char *state = (char *) "No State";
    switch (cc->getState())
    {
      case IpcConnection::INITIAL: 
        state = (char *) "INITIAL"; 
        break;
      case IpcConnection::OPENING: 
        state = (char *) "OPENING"; 
        break;
      case IpcConnection::ESTABLISHED: 
        state = (char *) "ESTABLISHED"; 
        break;
      case IpcConnection::SENDING: 
        state = (char *) "SENDING"; 
        break;
      case IpcConnection::REPLY_PENDING: 
        state = (char *) "REPLY_PENDING"; 
        break;
      case IpcConnection::RECEIVING: 
        state = (char *) "RECEIVING"; 
        break;
      case IpcConnection::CANCELLING: 
        state = (char *) "CANCELLING"; 
        break;
      case IpcConnection::ERROR_STATE: 
        state = (char *) "ERROR_STATE"; 
        break;
      case IpcConnection::CLOSED: 
        state = (char *) "CLOSED"; 
        break;
    }

    if (msg)
      str_sprintf(logMsg,
      "Releasing ESP %s , %s,"
      "guaError = %d, replySeqNum = %d, state = %s",
      espName,
      msg,
      guaError,
      replySeqNum,
      state);
    else
      str_sprintf(logMsg,
      "Releasing ESP %s ,"
      "guaError = %d, replySeqNum = %d, state = %s",
      espName,
      guaError,
      replySeqNum,
      state);
    
    SQLMXLoggingArea::logExecRtInfo(filename, lineNum, logMsg, 0);
  }
}

// -----------------------------------------------------------------------
//  Methods for class IpcConnection
// -----------------------------------------------------------------------

// Construct a connection to the process identified by pid.  The connection
// starts in state INITIAL, allocates its queues and stream list on env's
// heap, copies the 4-byte eye catcher `eye`, and registers itself in env's
// IpcAllConnections at a free slot; that slot number becomes id_.
IpcConnection::IpcConnection(IpcEnvironment *env,
			     const IpcProcessId &pid,
                             const char *eye) :
  otherEnd_(pid),
  sendQueue_(env->getHeap()),
  receiveQueue_(env->getHeap()),
  replySeqNum_(0),
  recvStreams_(env->getHeap()),
  stopWait_(FALSE),
  trustIncomingBuffers_(TRUE),
  ipcMsgBufCheckFailed_(FALSE),
  breakReceived_(FALSE),
  // setState() pre-increments this before use, so the first trace entry
  // written is index 0
  lastTraceIndex_(NumIpcConnTraces-1),
  fileNumForIOCompletion_(InvalidGuaFileNumber),
  sendPersistentOpenReconnect_(FALSE)
{
  // ---------------------------------------------------------------------
  // Copy the eye catcher (exactly 4 bytes are copied)
  // ---------------------------------------------------------------------
  str_cpy_all((char *) &eyeCatcher_, eye, 4);

  state_ = INITIAL;
  environment_ = env;

  IpcAllConnections *allConn = environment_->getAllConnections();

  // find a free entry in the array that stores pointers to all connections
  id_ = allConn->unusedIndex();

  // insert this connection into the free slot and remember the slot number
  allConn->insertAt(id_,this);

  clearErrorInfo();
  MXTRC_2("IpcConnection::IpcConnection id=%d this=%x\n", id_, this);
#ifdef IPC_INTEGRITY_CHECKING
  cerr << "Just created IpcConnection " << (void *)this
    << " and inserted into IpcAllConnections " << (void *)allConn
    << "." << endl;
  checkIntegrity();
#endif
}

// Destroy the connection and unregister it from the environment's
// IpcAllConnections.  The connection must no longer have pending I/O and
// both its send and receive queues must be empty (all asserted).
IpcConnection::~IpcConnection() 
{
  MXTRC_1("IpcConnection::~IpcConnection id=%d\n", id_);
  IpcAllConnections *allConn = environment_->getAllConnections();
  assert(!allConn->getPendingIOs().contains(id_));
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  assert(sendQueue_.entries() == 0);
  assert(receiveQueue_.entries() == 0);

  // free the slot this connection occupied (id_ was assigned in the ctor)
  NABoolean couldRemove = allConn->remove(id_);
  assert(couldRemove);
#ifdef IPC_INTEGRITY_CHECKING
  cerr << "Just destroyed IpcConnection " << (void *)this
    << " and removed from IpcAllConnections " << (void *)allConn
    << "." << endl;
  checkIntegrity(FALSE /* suppress orphan checking */);
#endif
}

const char *IpcConnection::getConnectionStateString(IpcConnectionState s)
{
  // Printable name of a connection state; unknown values are rendered by
  // ComRtGetUnknownString().
  const char *stateName;

  switch (s)
  {
    case INITIAL:       stateName = "INITIAL";       break;
    case OPENING:       stateName = "OPENING";       break;
    case ESTABLISHED:   stateName = "ESTABLISHED";   break;
    case SENDING:       stateName = "SENDING";       break;
    case REPLY_PENDING: stateName = "REPLY_PENDING"; break;
    case RECEIVING:     stateName = "RECEIVING";     break;
    case CANCELLING:    stateName = "CANCELLING";    break;
    case ERROR_STATE:   stateName = "ERROR_STATE";   break;
    case CLOSED:        stateName = "CLOSED";        break;
    default:
      stateName = ComRtGetUnknownString((Int32) s);
      break;
  }

  return stateName;
}

// Move the connection to state s.  Also keeps the environment's set of
// connections with pending I/O up to date (SENDING/RECEIVING/OPENING mark
// I/O pending; other states mark it complete, with special rules for
// ERROR_STATE).  On an actual state change it records a trace entry in the
// circular traceState_ array and maintains the environment's count of
// opens in progress.
void IpcConnection::setState(IpcConnectionState s)
{
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  if (s == SENDING OR s == RECEIVING OR s == OPENING)
  {
    environment_->getAllConnections()->IOPending(id_);
  }
  else if (s != IpcConnection::ERROR_STATE)
  {
    environment_->getAllConnections()->IOComplete(id_);
  }
  else
  {
    // 
    // Upon reaching the ERROR_STATE state, we should only announce 
    // I/O completion if all the following are true:
    // 
    //  a) there are no more send queue entries
    //  b) no recv queue entry has a registered callback that
    //     has not been delivered
    // 
    // If there are no send queue entries and no callbacks are pending
    // when the ERROR state is reached then we announce completion.
    // Otherwise, more I/O attempts will be made, and for each attempt
    // we will detect the ERROR and transition to the ERROR state.
    // Eventually we will transition to ERROR and there will be no more
    // send queue entries nor callbacks pending, and at that point we
    // can announce completion.
    //
    if (sendQueueEntries() == 0 && numReceiveCallbacksPending() == 0)
    {
      environment_->getAllConnections()->IOComplete(id_);
    }
  }

  if (state_ != s)
  {
    // Take care of tracing first: advance the circular trace index (wrapping
    // at NumIpcConnTraces) and record the state being left plus the most
    // recent send/receive buffers.
    if (++lastTraceIndex_ >= NumIpcConnTraces)
      lastTraceIndex_ = 0;
    traceState_[lastTraceIndex_].oldState_ = state_;
    traceState_[lastTraceIndex_].mostRecentSendBuffer_ = lastSentBuffer_;
    traceState_[lastTraceIndex_].mostRecentReceiveBuffer_ = lastReceivedBuffer_;
    // the timestamp is only taken when the environment asks for it
    if (environment_->getLogTimeIpcConnectionState())
      clock_gettime(CLOCK_REALTIME, 
                  &traceState_[lastTraceIndex_].stateChangeTime_);

    // Now a state change.  Entering OPENING increments, and leaving
    // OPENING decrements, the environment's opens-in-progress counter.
    if (s == OPENING)
      getEnvironment()->incrNumOpensInProgress();
    else if (state_ == OPENING)
      getEnvironment()->decrNumOpensInProgress();
    state_ = s;
  }  
}

NABoolean IpcConnection::moreWaitsAllowed()
{
  // The base connection never limits further waits.
  const NABoolean allowed = TRUE;
  return allowed;
}

SockConnection *IpcConnection::castToSockConnection()
{
  // Default: this connection is not a SockConnection.
  // NOTE(review): presumably overridden by SockConnection.
  SockConnection *none = NULL;
  return none;
}

GuaConnectionToServer *IpcConnection::castToGuaConnectionToServer()
{
  // Default: this connection is not a GuaConnectionToServer.
  GuaConnectionToServer *none = NULL;
  return none;
}

GuaMsgConnectionToServer *IpcConnection::castToGuaMsgConnectionToServer()
{
  // Default: this connection is not a GuaMsgConnectionToServer.
  GuaMsgConnectionToServer *none = NULL;
  return none;
}

GuaConnectionToClient *IpcConnection::castToGuaConnectionToClient()
{
  // Default: this connection is not a GuaConnectionToClient.
  GuaConnectionToClient *none = NULL;
  return none;
}

SqlTableConnection *IpcConnection::castToSqlTableConnection()
{
  // Default: this connection is not a SqlTableConnection.
  SqlTableConnection *none = NULL;
  return none;
}

Int64 IpcConnection::getSqlTableTransid()
{
  // The base connection has no transaction id; -1 marks "none".
  const Int64 noTransid = -1;
  return noTransid;
}

void IpcConnection::openPhandle(char * processName, NABoolean parallelOpen)
{
  // Not supported by the base connection; calling it is a programming error.
  (void) processName;
  (void) parallelOpen;
  assert(FALSE);
}

IpcMessageBuffer * IpcConnection::getNextSendQueueEntry()
{
  // Dequeue the next outgoing buffer (if any) and stamp its header.
  IpcMessageBuffer *next = removeNextSendBuffer();
  if (next == NULL)
    return NULL;

  prepareSendBuffer(next);
  return next;
}

void IpcConnection::IOPending()
{
  environment_->getAllConnections()->IOPending(id_);
}

void IpcConnection::IOComplete()
{
  environment_->getAllConnections()->IOComplete(id_);
}

bool IpcConnection::isServerSide()
{
  // The base connection is treated as the client side.
  const bool serverSide = false;
  return serverSide;
}

IpcMessageBuffer *IpcConnection::removeNextSendBuffer()
{
  // Pop the head of the send queue; NULL when the queue is empty.
  IpcMessageBuffer *head = NULL;
  return sendQueue_.getFirst(head) ? head : NULL;
}

IpcMessageBuffer *IpcConnection::removeNextReceiveBuffer()
{
  // Pop the head of the receive queue; NULL when the queue is empty.
  IpcMessageBuffer *head = NULL;
  return receiveQueue_.getFirst(head) ? head : NULL;
}

// Forget every message stream that is waiting to receive on this
// connection.
void IpcConnection::removeReceiveStreams()
{
  // Remove from the back: removeAt() shifts later entries down, so a
  // forward loop that also increments the index skips every other entry.
  for (CollIndex j = recvStreams_.entries(); j > 0; j--)
    recvStreams_.removeAt(j - 1);
}



void IpcConnection::prepareSendBuffer(IpcMessageBuffer *msgBuf)
{
  assert(msgBuf);

  // The buffer begins with an InternalMsgHdrInfoStruct.
  InternalMsgHdrInfoStruct *hdr =
      (InternalMsgHdrInfoStruct *) (msgBuf->data(0));

  // Only the server side stamps a sequence number.  The client relies on
  // the msg system to keep requests in send order, but a client using
  // GuaMsgConnectionToServer may pick up replies in random order and
  // needs the number to restore it.
  if (isServerSide())
    {
      hdr->setSeqNum(replySeqNum_);
      ++replySeqNum_;
    }

  // Tag the first buffer after a persistent-open reconnect, exactly once.
  if (!sendPersistentOpenReconnect_)
    {
      hdr->setSockReplyTag(0);
      return;
    }

  hdr->setSockReplyTag(PERSISTENT_OPEN_RECONNECT_CODE);
  setSendPersistentOpenReconnect(FALSE);
}

// Hand the next deliverable received buffer to a waiting message stream.
// Returns the buffer (removed from receiveQueue_, with the matching stream
// added as its callback) or NULL if nothing can be delivered yet: the queue
// is empty, no expected reply has arrived, or no waiting stream in
// recvStreams_ accepts it.  On the client side (without error) only the
// reply whose sequence number equals replySeqNum_ is eligible, and
// delivering it increments replySeqNum_.
IpcMessageBuffer * IpcConnection::getNextReceiveQueueEntry()
{
  if (receiveQueue_.entries() == 0)
    return NULL;

  // here's the design of message sequence number:
  //
  // - the client side does not send sequence number. so the server side does
  //   not need to verify sequence number upon request arrival. the msg system
  //   ensures the msgs are delivered in their original send order.
  // - the server sends the sequence number. so the client side needs to
  //   verify sequence number upon reply arrival. this is because if the
  //   client side uses GuaMsgConnectionToServer, it picks up arrival reply
  //   msgs in random order, even though the replies are delivered in correct
  //   order by the msg system.
  // - in case of IPC error, we don't know if the server was able to properly
  //   assign a sequence number to the buffer, thus we bypass the sequence
  //   number check. 
  //
  // note: the msg system guarantees that msgs are delivered in their send
  // order, with the exception of msgs being delivered through different
  // physical wires. for example, in case of servernet error, msgs could get
  // re-routed through the expand network.
  //
  if (isServerSide() || getState() == ERROR_STATE)
    {
      // this is on the server side, or we got error.
      // no need to verify reply seq number.
      // Only the head of the queue is considered; it may go to any waiting
      // stream if it names no stream, otherwise only to the stream it names.
      IpcMessageBuffer *msgBuf = receiveQueue_[0];
      for (CollIndex j = 0; j < recvStreams_.entries(); j++)
        {
          if ((msgBuf->getMessageStream() == NULL)    ||        
              (msgBuf->getMessageStream() == recvStreams_[j]) )
            {
              // found a valid msg on receive queue
              receiveQueue_.removeAt(0);
              msgBuf->addCallback(recvStreams_[j]);
              recvStreams_.removeAt(j);
              return msgBuf;
            }
        }

      // if we reach here it means the next msg on receive queue does not
      // match any receiving streams. we must wait for message stream to
      // ask for it.
    }
  else
    {
      // this is on the client side and we did not get error.
      // we need to verify the reply seq number.
      // Scan the whole queue for the reply carrying the expected number,
      // since replies may sit in the queue out of order.
      for (CollIndex i = 0; i < receiveQueue_.entries(); i++)
        {
          IpcMessageBuffer *msgBuf = receiveQueue_[i];

          // unpack message header which contains the sequence number
          InternalMsgHdrInfoStruct* msgHdr =
            (InternalMsgHdrInfoStruct*)(msgBuf->data(0)); 
          if (msgHdr->getSeqNum() == replySeqNum_)
            {
              // the next msg on receive queue is the expected reply
              for (CollIndex j = 0; j < recvStreams_.entries(); j++)
                {
                  if ((msgBuf->getMessageStream() == NULL)    ||        
                      (msgBuf->getMessageStream() == recvStreams_[j]) )
                    {
                      // found a valid msg on receive queue
                      receiveQueue_.removeAt(i);
                      msgBuf->addCallback(recvStreams_[j]);

                      // for the SeaMonster continue protocol we cannot
                      // delete the stream from recvStreams_ till
                      // end of the batch reply is sent.
                      // (For such streams the entry is removed only when a
                      // data reply flagged as last-in-batch is delivered;
                      // in every other case it is removed right away.)
                      if (this->castToSMConnection())
                      {
                        InternalMsgHdrInfoStruct *hdr =
                             (InternalMsgHdrInfoStruct *) msgBuf->data(0);
                        ESPMessageTypeEnum streamType =
                             (ESPMessageTypeEnum) hdr->getType();

                        IpcBufferedMsgStream *str = NULL;
                        if (msgBuf->getMessageStream())
                          str = msgBuf->getMessageStream()->
                            castToIpcBufferedMsgStream();

                        if (str && str->getSMContinueProtocol())
                        {
                          if ( streamType == IPC_MSG_SQLESP_DATA_REPLY &&
                               hdr->getSMLastInBatch() )
                            recvStreams_.removeAt(j);
                        }
                         else
                           recvStreams_.removeAt(j);
                      }
                      else
                        recvStreams_.removeAt(j);

                      // increment reply seq number for the client side
                      // (done even when the SeaMonster stream was kept)
                      replySeqNum_++;
                      return msgBuf;
                    }
                } // for j

              // if we reach here it means the next msg on receive queue does
              // not match any receiving streams. we must wait for message
              // stream to ask for it.
              break;
            }
          else
            {
              // the next msg on receive queue is not the expected reply.
              // continue to the subsequent msg on receive queue.
              continue;
            }
        } // for i
    }

  return NULL;
}

void IpcConnection::setFatalError(IpcMessageStreamBase *msgStream)
{
  // Enter ERROR_STATE once (setState() has side effects, so don't repeat
  // it) and make sure some nonzero error code is recorded.
  if (getState() != ERROR_STATE)
    setState(ERROR_STATE);

  if (getErrorInfo() == 0)
    setErrorInfo(-1);

  // The receive queue may still hold buffers, e.g. ones that
  // GuaMsgConnectionToServer::setFatalError() queued via
  // handleIOErrorForStream(); deliver each to its receive callback.
  for (IpcMessageBuffer *pending = getNextReceiveQueueEntry();
       pending != NULL;
       pending = getNextReceiveQueueEntry())
    pending->callReceiveCallback(this);

  // - open question: what if recvStreams_ is not empty? should we clean up
  //   I/Os on them?
}

#ifdef IPC_INTEGRITY_CHECKING

// methods that perform integrity checking on Ipc-related data structures

// Debug-only check that this connection is reachable from its environment.
// isOrphaned_ is set first and then cleared by checkLocalIntegrity() if the
// environment-wide traversal reaches this object (IpcAllConnections'
// checkLocalIntegrity() calls it for every used slot).  NOTE(review):
// assumes IpcEnvironment::checkIntegrity() reaches
// IpcAllConnections::checkLocalIntegrity() — confirm.
void IpcConnection::checkIntegrity(NABoolean checkIfOrphan)
  {
  // If the parameter "checkIfOrphan" is true, we will do the orphan check;
  // otherwise we will not. (The caller passes FALSE in contexts where it
  // is valid to be an orphan, e.g. at the end of the IpcConnection destructor.)

  // if checking, assume the worst: this object is an orphan
  isOrphaned_ = checkIfOrphan;  

  environment_->checkIntegrity(); // traverse up to IpcEnvironment, do check
  if (isOrphaned_)
    {
    cerr << "Found orphaned IpcConnection object " << (void *)this << "." << endl;
    assert(!isOrphaned_);
    }
  }

void IpcConnection::checkLocalIntegrity(void)
  {
  isOrphaned_ = FALSE;  // ah, this object isn't an orphan after all
  }

#endif

NABoolean IpcConnection::newClientConnection(IpcMessageBuffer *receivedBuffer)
{
  // A buffer opens a new client connection when its header carries sequence
  // number 0 together with the persistent-open reconnect tag.
  InternalMsgHdrInfoStruct *hdr =
         (InternalMsgHdrInfoStruct *) (receivedBuffer->data(0));

  if (hdr->getSeqNum() != 0)
    return FALSE;

  return (hdr->getSockReplyTag() == PERSISTENT_OPEN_RECONNECT_CODE)
           ? TRUE : FALSE;
}


void IpcConnection::reportBadMessage()
{
  const char option2 = '2';
  const char *envvar = getenv("ESP_PROPAGATE_ASSERT");
  if (envvar == NULL)
    envvar = &option2;
  if (getEnvironment()->getIpcServerType() == IPC_SQLESP_SERVER &&
      ((*envvar == '1' || *envvar == '2')))
  {
    if (*envvar == '2')
    {
      NAProcessHandle phandle;
      phandle.getmine();
      phandle.decompose();
      UInt32 seconds = (phandle.getPin() % 100) * 5;
      if (seconds >= 250)
        seconds -= 250;
      sleep(seconds);

      genLinuxCorefile(NULL);
    }
    dumpAndStopOtherEnd(true, (*envvar == '2'));
    if (*envvar == '2')
      NAExit(0);  // Already generated core file of myself
  }
}

// -----------------------------------------------------------------------
//  Methods for class IpcAllConnections
// -----------------------------------------------------------------------

#ifdef IPC_INTEGRITY_CHECKING

void IpcAllConnections::checkIntegrity(void)
  {
  // try to traverse to IpcEnvironment... to get there, we need to go
  // via IpcConnection...  
  CollIndex firstConn = 0;
  
  // find first IpcConnection * (if any)
  while ((!used(firstConn)) && (firstConn < entries()))
    firstConn++;

  if (firstConn < entries())  
    {
    // we have a connection
    IpcConnection * c = usedEntry(firstConn);
    
    // traverse up to IpcEnvironment, do check
    c->checkIntegrity(); 
    }
  // else ... can't get there; just do nothing
  }

void IpcAllConnections::checkLocalIntegrity(void)
  {
  // check integrity of subarray
  pendingIOs_->checkLocalIntegrity();

  // check integrity of the set of connections
  CollIndex conn = 0;
  ULng32 entriesChecked = 0;
  
  // find first IpcConnection * (if any)
  while (entriesChecked < entries())
    {
    if (used(conn))
      {
      at(conn)->checkLocalIntegrity();
      entriesChecked++;
      }
    conn++;
    }
  }

#endif

void IpcAllConnections::waitForAllSqlTableConnections(Int64 transid)
{
  // Block (with an infinite timeout) on every pending SqlTable connection
  // whose transaction id equals transid.
  //
  // The loop runs over a copy of the pending-I/O set, as the original code
  // did; presumably because wait() can change the live set -- confirm.
  IpcSetOfConnections pending = getPendingIOs();

  for (CollIndex ix = 0; pending.setToNext(ix); ix++)
    {
      IpcConnection *conn = pending.element(ix);

      if (!conn->castToSqlTableConnection())
        continue;                       // not a SqlTable connection
      if (conn->getSqlTableTransid() != transid)
        continue;                       // belongs to another transaction

      conn->wait(IpcInfiniteTimeout);
    }
}

CollIndex IpcAllConnections::fillInListOfPendingPins(char *buff,
                                                     ULng32 buffSize,
                                                     CollIndex numOfPins)
{
  // Writes a ", "-separated list of the other-end names (the text produced
  // by toAscii()) of at most numOfPins pending-I/O connections into buff,
  // whose capacity is buffSize (tracked locally as the remaining room).
  // Entries whose toAscii() length is <= 0 are skipped but still counted.
  // Stops early when the next name would not fit.
  // Returns the number of pending connections visited.
  const CollIndex limit = MINOF(numOfPins, pendingIOs_->entries());
  const Int32 pinTextSize = 300;
  char pinText[pinTextSize];
  CollIndex connIx = 0;
  CollIndex visited = 0;

  buff[0] = '\0';   // start with an empty, null-terminated string

  while (visited < limit)
    {
      if (!pendingIOs_->setToNext(connIx))
        break;   // should not happen

      const Int32 textLen =
        pendingIOs_->element(connIx)->getOtherEnd().toAscii(pinText, pinTextSize);

      if (textLen > 0)
        {
          // Need room for a possible ", " separator plus the terminator.
          if ((ULng32) textLen + 2 >= buffSize)
            break;   // no room left

          if (buff[0] != '\0')  // not the first entry
            {
              str_cat(buff, ", ", buff);
              buffSize -= 2;
            }
          str_cat(buff, pinText, buff);
          buffSize -= textLen;
        }

      connIx++;
      visited++;
    }

  return visited;
}

void IpcAllConnections::fillInListOfPendingPhandles(GuaProcessHandle *phandles,
                                                    CollIndex& numOfPhandles)
{
  // Copies the other-end process handles of pending-I/O connections into
  // phandles. On input numOfPhandles is the capacity of the array; on
  // output it is the number of handles actually copied.
  numOfPhandles = MINOF(numOfPhandles, pendingIOs_->entries());

  CollIndex connIx = 0;
  CollIndex copied = 0;

  while (copied < numOfPhandles)
    {
      if (!pendingIOs_->setToNext(connIx))
        {
          // should not happen: fewer used slots than entries() reported
          numOfPhandles = copied;
          return;
        }

      IpcConnection *conn = pendingIOs_->element(connIx);
      memcpy((char *)&phandles[copied],
             (char *)&conn->getOtherEnd().getPhandle().phandle_,
             sizeof(GuaProcessHandle));

      connIx++;
      copied++;
    }
}

// Methods for connection tracing
void IpcAllConnections::print()
{
  char buf[10000];
  Int32 lineno = 0;

  while (printConnTrace(lineno, buf))
    {
      printf("%s", buf);
      lineno++;
    }
}

// Description string passed to ExeTraceInfo::addTrace() by
// IpcAllConnections::registTraceInfo() for the "IpcConnectionState" trace.
const char *ConnTraceDesc = "All IpcConnections and their states";

void IpcAllConnections::registTraceInfo(IpcEnvironment *env, ExeTraceInfo *ti)
{
  // Registers the "IpcConnectionState" trace (entries supplied through the
  // getAnEntry callback) with ti, describes its four fields, and remembers
  // the registration handle in traceRef_. Nothing is done unless both env
  // and ti are non-NULL, or if addTrace() reports failure.
  if (env == NULL || ti == NULL)
    return;

  Int32 lineWidth = 50; // temp
  void *regdTrace;
  const Int32 ret = ti->addTrace("IpcConnectionState", this, -1, 4,
                                 this, getAnEntry,
                                 NULL,
                                 lineWidth, ConnTraceDesc, &regdTrace);
  if (ret != 0)
    return;

  // trace info added successfully, now add entry fields
  ti->addTraceField(regdTrace, "Connection ", 0, ExeTrace::TR_POINTER32);
  ti->addTraceField(regdTrace, "Type ", 1, ExeTrace::TR_STRING);
  ti->addTraceField(regdTrace, "OtherEnd     ", 2, ExeTrace::TR_STRING);
  ti->addTraceField(regdTrace, "State ", 3, ExeTrace::TR_STRING);
  traceRef_ = regdTrace;
}

Int32 IpcAllConnections::printConnTrace(Int32 lineno, char *buf)
{
  // Formats one trace line for the next used connection into buf.
  // lineno == 0 restarts the scan at the first slot; later calls continue
  // from where the previous call stopped (printEntry_).
  // Returns the sprintf() result (characters written), or 0 when there are
  // no more connections.
  if (lineno == 0)
    printEntry_ = 0;  // first time to print all entries

  // Skip unused slots. The scan is bounded by getUsedLength() (the slot
  // index bound, as used by waitOnSet), not entries(): entries() counts
  // used slots only, so with holes in the array an index-vs-count test
  // stops early and trailing connections are never printed. The bounds
  // check is done before used().
  const CollIndex usedLength = getUsedLength();
  while (printEntry_ < usedLength && !used(printEntry_))
    printEntry_++;

  if (printEntry_ >= usedLength)
    return 0;  // no more connections

  IpcConnection *c = usedEntry(printEntry_++); // then advance to next
  if (c == NULL)
    return 0;

  const char * stateName = "UNKNOWN";
  const char * eyeCatcher = "UNKN";
  Int32 cpu = 0, node = 0, pin = 0;
  SB_Int64_Type seqNum = -1;

  // Only index the name table for states inside its known range.
  if ((IpcConnection::INITIAL <= c->getState()) &&
      (c->getState() <= IpcConnection::CLOSED))
    stateName = IpcConnStateName[c->getState()];

  if (c->getEyeCatcher() != NULL)
    eyeCatcher = c->getEyeCatcher();

  // cpu/pin/seqNum are only meaningful for Guardian-phandle peers; other
  // domains print the defaults.
  if (c->getOtherEnd().getDomain() == IPC_DOM_GUA_PHANDLE)
    {
      GuaProcessHandle *otherEnd =
        (GuaProcessHandle *)&(c->getOtherEnd().getPhandle().phandle_);
      otherEnd->decompose(cpu, pin, node, seqNum);
    }

  return sprintf(buf, "%.4d  %8p  %.4s  %.3d,%.8d %" PRId64 " %s\n",
                 lineno, c, eyeCatcher, cpu, pin, seqNum,
                 stateName);
}

// -----------------------------------------------------------------------
//  Methods for class IpcSetOfConnections
// -----------------------------------------------------------------------

IpcSetOfConnections::IpcSetOfConnections(IpcAllConnections *superset,
                                         CollHeap* hp,
                                         NABoolean eventDriven,
                                         NABoolean esp)
  // The set is a subarray view over the superset of all connections.
  // Members are initialized in the same order as before; all wait
  // statistics start at zero and AWAITIOX(-1) handling starts enabled.
  : SUBARRAY(IpcConnection *)((ARRAY(IpcConnection *) *) superset, hp),
    cancelWait_(FALSE),
    allc_(superset),
    eventDriven_(eventDriven),
    callCount_(0),
    pollCount_(0),
    esp_(esp),
    waitCount_(0),
    ldoneCount_(0),
    lreqCount_(0),
    lsigCount_(0),
    smCompletionCount_(0),
    timeoutCount_(0),
    activityPollCount_(0),
    lastWaitStatus_(0),
    ipcAwaitioxEnabled_(TRUE)
{
}

IpcSetOfConnections::IpcSetOfConnections (const IpcSetOfConnections & orig,
                                          CollHeap * h)
     : SUBARRAY(IpcConnection *)(orig, h)
{
  // Copy constructor: the membership (subarray) is copied via the base,
  // only the configuration below is carried over, and all runtime state
  // starts fresh.

  // Configuration taken from the original set.
  allc_        = orig.allc_;
  eventDriven_ = orig.eventDriven_;
  esp_         = orig.esp_;

  // Cancellation / last-wait state.
  cancelWait_     = FALSE;
  lastWaitStatus_ = 0;

  // Wait statistics.
  callCount_         = 0;
  pollCount_         = 0;
  waitCount_         = 0;
  ldoneCount_        = 0;
  lreqCount_         = 0;
  lsigCount_         = 0;
  smCompletionCount_ = 0;
  timeoutCount_      = 0;
  activityPollCount_ = 0;

  // Unlike the primary constructor, a copy starts with AWAITIOX(-1)
  // handling disabled and a zero-filled IpcAwaitiox.
  // NOTE(review): presumably so that copies (e.g. snapshots of the
  // pending-I/O set) never consume AWAITIOX completions -- confirm.
  ipcAwaitioxEnabled_ = FALSE;
  memset((void *)&ipcAwaitiox_, 0, sizeof(IpcAwaitiox));
}

NABoolean IpcSetOfConnections::moreWaitsAnyConnection()
{
  // A pending asynchronous cancel request wins: consume it and report that
  // no further waiting should happen, which breaks the caller's wait loop.
  if (cancelWait_)
    {
      cancelWait_ = FALSE;
      return FALSE;
    }

  // Otherwise keep waiting while at least one member still allows it;
  // the scan stops at the first such connection.
  NABoolean anyAllowed = FALSE;
  CollIndex ix = 0;
  while (!anyAllowed && setToNext(ix))
    {
      if (element(ix)->moreWaitsAllowed())
        anyAllowed = TRUE;
      ix++;
    }
  return anyAllowed;
}

void IpcSetOfConnections::waitOnSMConnections(IpcTimeout timeout)
{
  // Call wait(timeout) on every member that is a SeaMonster connection;
  // members for which castToSMConnection() yields NULL are skipped.
  CollIndex ix = 0;
  while (setToNext(ix))
  {
    IpcConnection *smConn = element(ix)->castToSMConnection();
    if (smConn != NULL)
      smConn->wait(timeout);
    ++ix;
  }
}

// -----------------------------------------------------------------------
// IpcSetOfConnections::waitOnSet
//
// Drives I/O on the connections of this set until one of these happens:
//  - the completion sequence number of IpcAllConnections changes (some
//    message completed),
//  - the accumulated wait time reaches `timeout` (unless it is
//    IpcInfiniteTimeout),
//  - a connection's wait() reports an interrupt,
//  - no member allows more waits (moreWaitsAnyConnection() is FALSE), or
//  - the set is (or becomes) empty.
//
// Parameters:
//   timeout     - wait limit, compared against the accumulated per-round
//                 timeouts (10 ms units); -2 is treated as -1.
//   calledByESP - when TRUE, an empty set terminates the process via
//                 NAExit(0), and idle / inactive ESP timeouts are enforced
//                 while waiting.
//   timedout    - optional out-param; set to TRUE only if the wait ended
//                 because the timeout was reached.
//
// Returns WAIT_INTERRUPT when a connection reported an interrupt,
// WAIT_OK otherwise.
//
// On the first pass this call bumps the IpcAllConnections recursion count.
// Every return after that point must decrement it and restore the
// accumulated ldone / AEVENT state in the environment.
// -----------------------------------------------------------------------
WaitReturnStatus IpcSetOfConnections::waitOnSet(IpcTimeout timeout,
                                                NABoolean calledByESP,
                                                NABoolean *timedout)
{
  // Upper bound for totalWaitTime before it wraps back to -1, avoiding an
  // overflow on very long waits.
  #define  MAX_TOTALWAITTIME   2147483000     //soln 10-061230-1405
  if (timedout != NULL)
    *timedout = FALSE;
  NABoolean interruptRecvd = FALSE;
  MXTRC_FUNC("IpcSetOfConnections::wait");
  MXTRC_1("timeout=%d\n", timeout);
  // could use select UNIX call or AWAITIOX(-1) later $$$$

  // for now, do one round with zero timeout, then let more rounds follow
  // with exponentially increasing timeouts. Stop when an I/O completes.
  // Make sure to check all other connections before returning after
  // an unsuccessful wait on a single connection with no timeout.

  // timeout increment (in 10 msec units), this value is the duration
  // of the first wait cycle we do
  const IpcTimeout  toInc = 1;

  // number of times we just give up the time slice before we actually
  // start to wait (with a timeout of toInc)
  const Lng32        timeSlices = 3;

  IpcTimeout        tout = 0;
  IpcTimeout        totalWaitTime = -1;
  CollIndex         firstConnInd, currentFirstConnInd;
  IpcConnection     *firstConnection = NULL; // For ESP it's conn to the Master
  NABoolean         somethingCompleted = FALSE;
  ULng32     seqNo = 0;
  // Set on the first pass through the loop, together with
  // incrRecursionCount(); non-NULL means the count must be decremented
  // before returning.
  IpcAllConnections *allc = NULL;
  Lng32              currTimeSlices = timeSlices;
  IpcEnvironment *env = NULL;
  NABoolean ldoneConsumed = FALSE, activity = FALSE;
  short waitFlag;
  short status;
  Int64 currentTime;
  callCount_ += 1;
  if (timeout == -2)
    timeout = -1;
  NABoolean isWaited;
  Int32 isWaitedFactor = 20;
  IpcTimeout totalOfTimeouts = 0;
  NABoolean freeUnusedMemory = TRUE;
  CliGlobals *cliGlobals = NULL;
  NABoolean soloClient = FALSE;
  NABoolean receivedSMEvent = FALSE;
  ExSMGlobals *smGlobals = ExSMGlobals::GetExSMGlobals();
  ExSMReadyList *smReadyList = (smGlobals ? smGlobals->getReadyList() : NULL);

  // loop, exponentially increasing the timeout used to wait
  // on the first connection of the set
  while (NOT somethingCompleted)
  {
    if (totalWaitTime >= timeout AND timeout != IpcInfiniteTimeout)
    {
      if (timedout != NULL)
      {
        *timedout = TRUE;
      }
      break;
    }
    else
    {
      // get a hold of the first connection in the set, return if there is
      // none (note that the set may change during the while loop)
      firstConnInd = 0;
      if (NOT setToNext(firstConnInd))
        {
          if ( calledByESP )
            {
              NAExit(0); // Stop this ESP! No point executing w/o connections.
            }
          // set is empty, return right away
          assert(!ipcAwaitiox_.getCompleted()); // Shouldn't happen--return with AWAITIOX(-1) completion outstanding

          // If an earlier pass already registered this call with
          // IpcAllConnections, undo that and restore the accumulated
          // event state, exactly like every other return path. Without
          // this the recursion count leaks when the set empties mid-wait,
          // and the "recursion count == 1" cleanup of closed connections
          // below never runs again.
          if (allc != NULL)
            {
              env->setLdoneConsumed(ldoneConsumed);
              env->setEvent(activity, AEVENT);
              allc->decrRecursionCount();
            }
          return WAIT_OK;
        }
      else
        // for an ESP, the first connection is always the master
        firstConnection = element(firstConnInd);

      env = firstConnection->getEnvironment();
      cliGlobals = env->getCliGlobals();

      assert(env->getAllConnections()->getNumPendingIOs() > 0 ||
             timeout != IpcInfiniteTimeout);

      // IPC_SOLO_CLIENT_COMPLETION is read once per process; '1' (the
      // default) enables the solo-client shortcut below.
      static bool sv_solo_client_completion = false;
      static char escc = '1';
      if (!sv_solo_client_completion)
      {
        sv_solo_client_completion = true;
        char *sccEnvvar = getenv("IPC_SOLO_CLIENT_COMPLETION");
        if (sccEnvvar)
          escc = *sccEnvvar;
      }
      if (escc == '1' && firstConnection->getFileNumForIOCompletion() == 1 &&
          env->getAllConnections()->entries() == 1 && !calledByESP
          && timeout == -1)
        soloClient = TRUE;
      isWaited = eventDriven_ && timeout == -1 && env->getMaxPollingInterval() != 301;
      pollCount_ += 1; // Increment the connections polled count
      waitFlag = LDONE;
      if (esp_)
        waitFlag |= LREQ;
      else if (env->breakEnabled() && !env->lsigConsumed())
        waitFlag |= LSIG;
      status = -1;

      // On Linux if SeaMonster is enabled, set the LRABBIT bit in
      // waitFlag
      Int32 numSMConnections = env->getAllConnections()->getNumSMConnections();
      if (numSMConnections > 0)
        waitFlag |= LRABBIT;

      // if this is the first time, remember the sequence number of
      // completed I/Os so far, if that number changes this method will
      // return
      if (allc == NULL)
        {
          // get the completion sequence number so far (get the IPC
          // environment from one of the connections in the set)
          ldoneConsumed = env->ldoneConsumed();
          activity = env->isEvent(AEVENT);
          allc = env->getAllConnections();
          allc->incrRecursionCount();
          // The sequence number at entry is captured so that it can later
          // be used to determine if any messages have completed. If so,
          // there is work for the scheduler to do and somethingCompleted
          // is set to TRUE causing a timed or infinite waitOnSet to
          // return.
          seqNo = allc->getCompletionSeqenceNo();
          totalOfTimeouts = 0;
        }
      else if (isWaited && !env->ldoneConsumed() && !soloClient)
      {
        if (env->isEvent(AEVENT))
        {
          activityPollCount_ += 1;
          totalOfTimeouts = 0;
        }
        else
        {
          // Nothing happened on the previous pass: block in XWAIT for an
          // interval proportional to the current polling timeout.
          IpcTimeout waitInterval = (tout + 1) * isWaitedFactor;
          if (waitInterval == 0)
            lastWaitStatus_ = status = XWAIT0(waitFlag, waitInterval);
          else
            lastWaitStatus_ = status = XWAITNO0(waitFlag, waitInterval);
          waitCount_ += 1;

          if (status & LDONE)
          {
            ldoneCount_ += 1;
            ldoneConsumed = TRUE;
            totalOfTimeouts = 0;
            freeUnusedMemory = TRUE;
          }
          else if (status & LREQ)
          {
            lreqCount_ += 1;
            totalOfTimeouts = 0;
            freeUnusedMemory = TRUE;
          }
          else if (status & LSIG)
          {
            lsigCount_ += 1;
            env->setLsigConsumed(TRUE);
            totalOfTimeouts = 0;
            freeUnusedMemory = TRUE;
          }
          else if (status & LRABBIT)
          {
            smCompletionCount_++;
            totalOfTimeouts = 0;
            freeUnusedMemory = TRUE;

            EXSM_TRACE(EXSM_TRACE_PROTOCOL,
                       "RECV LRABBIT wait count %" PRId64
                       " sm count %" PRId64 " timeout %d",
                       (Int64) waitCount_, (Int64) smCompletionCount_,
                       (int) timeout);
          }
          else
          {
            // XWAIT timed out: for an ESP, enforce the idle / inactive
            // timeouts configured in the environment.
            totalOfTimeouts += waitInterval;
            if (calledByESP)
              {
                if (allc->entries() == 1 ||
                    // Persistent opens AND all connections are (obsolete)
                    // client connections AND ESP is confirmed to be idle
                    (env->getPersistentOpens() &&
                     allc->entries() == env->getControlConnection()->castToGuaReceiveControlConnection()->getClientConnections()->entries() &&
                     env->getIdleTimestamp() > 0))
                  {
                    // master has released this esp fragment
                    if (env->getStopAfter() > 0)
                      {
                        assert(env->getIdleTimestamp() > 0);
                        currentTime = NA_JulianTimestamp();
                        Int64 timeDiff = currentTime - env->getIdleTimestamp();
                        if (timeDiff > ((Int64)env->getStopAfter() * 1000000))
                        {
                          if (env->getLogEspIdleTimeout())
                          {
                            /*
                            Coverage notes: to test this code in a dev
                            regression requires changing
                            $TRAF_HOME/etc/ms.env.  However, it was
                            tested in stress test on
                            May 10, 2012.
                            */
                            char myName[20];
                            memset(myName, '\0', sizeof(myName));
                            char buf[500];
                            msg_mon_get_my_info(NULL, NULL,
                              myName, sizeof(myName),NULL,NULL,NULL,NULL);
                            str_sprintf(buf,
                              "ESP_IDLE_TIMEOUT causes %s to exit.",
                              myName);
                            SQLMXLoggingArea::logExecRtInfo(__FILE__,
                                                  __LINE__, buf, 0);
                          }
                          // stop esp if it has become idle and timed out
                          NAExit(0);
                        }

                      }
                  }
                else
                  {
                    if (env->getInactiveTimeout() > 0 &&
                        env->getInactiveTimestamp() > 0)
                      {
                        currentTime = NA_JulianTimestamp();
                        Int64 timeDiff = currentTime - env->getInactiveTimestamp();
                        if (timeDiff > ((Int64)env->getInactiveTimeout() * 1000000))
                          // stop esp if it has become inactive and timed out
                          NAExit(0);
                      }
                  }
              }
            timeoutCount_ += 1;

          } // XWAIT timed out
        } // if (env->isEvent(AEVENT)) else
      } // else if (isWaited && !env->ldoneConsumed() && !soloClient)


      // -----------------------------------------------------------------
      // wait with timeout to on the first (and maybe only) connection
      // -----------------------------------------------------------------

      if (soloClient)
      {
        interruptRecvd = firstConnection->wait(timeout);
        if (interruptRecvd)
        {
          allc->decrRecursionCount();
          return WAIT_INTERRUPT;
        }
        else if (seqNo != allc->getCompletionSeqenceNo())
          somethingCompleted = TRUE;
        continue;
      }

      if (currTimeSlices > 0 AND tout > 0 AND !isWaited)
        {
          // give up the time slice and reset the timeout to 0 instead
          // of waiting with a timeout that is greater than 10 milliseconds
          // (10 msec is a long time on a 300 MHz machine)
          currTimeSlices--;

          // platform-dependent code to give up the time slice
          // the Sleep() method on NT can be used to give up the processor
          Sleep(0);
          tout = 0;
      }

      env->setLdoneConsumed(FALSE); // So that we know if it changed if we loop again
      env->setEvent(FALSE, AEVENT); // So that we know if there was any "activity"
                                    // which means to: a) an AWAITIOX completed, b)
                                    // a MSG_ISDONE_ returned true, or c) something was
                                    // started such as tryToStartNewIO called MSG_LINK_.
      Int64 waitCalled = 0, waitReturned;
      if (!isWaited && tout > 0)
        waitCalled = NA_JulianTimestamp();


      NABoolean cycleThruConnections = TRUE;
      NABoolean doIpcAwaitiox;

      if (env->getMaxPollingInterval() == 302 || XAWAITIOX_MINUS_ONE == FALSE)
      {
        doIpcAwaitiox = FALSE;
      }
      else
      {
        doIpcAwaitiox =
          ipcAwaitioxEnabled_ &&
          (esp_ || env->getMasterFastCompletion()) &&
          eventDriven_ &&
          (isWaited || tout == 0);
      }

      // With AWAITIOX(-1), a completion that no connection consumed makes
      // us cycle through the connections again.
      while (cycleThruConnections)
      {
        cycleThruConnections = FALSE;
        interruptRecvd = FALSE;
        currentFirstConnInd = firstConnInd;

        if (doIpcAwaitiox)
        {
          if (esp_)
          {
            currentFirstConnInd = 0;
            if (setToNext(currentFirstConnInd) &&
                currentFirstConnInd != firstConnInd)
              // Can happen for SSMP but should never happen for ESP
              firstConnection = element(currentFirstConnInd);
          }

          ipcAwaitiox_.DoAwaitiox(esp_ ? FALSE : TRUE);

          if (ipcAwaitiox_.getFileNum() != -1)
            env->bawaitioxTrace(this, allc->getRecursionCount(),
                                currentFirstConnInd, firstConnection,
                                &ipcAwaitiox_);

          if (ipcAwaitiox_.getFileNum() ==
              firstConnection->getFileNumForIOCompletion())
          {
            interruptRecvd =
              firstConnection->wait(isWaited ? IpcImmediately : tout,
                                    env->getEventConsumed(),
                                    (ipcAwaitiox_.getFileNum() == -1
                                     ? NULL
                                     : &ipcAwaitiox_));
          }
        } // doIpcAwaitiox

        else
        {
          interruptRecvd =
            firstConnection->wait(isWaited ? IpcImmediately : tout,
                                  env->getEventConsumed());
        }

        if (env->ldoneConsumed())
          ldoneConsumed = TRUE;
        if (env->isEvent(AEVENT))
          activity = TRUE;

        if (interruptRecvd)
        {
          env->setLdoneConsumed(ldoneConsumed);
          env->setEvent(activity, AEVENT);
          allc->decrRecursionCount();
          return WAIT_INTERRUPT;
        }

        else if (!isWaited && !env->isEvent(AEVENT) && tout > 0 &&
                 seqNo == allc->getCompletionSeqenceNo())
        {
          // waitOnSet timeout assumes that the first connection will
          // wait the specified interval if an interrupt is not received
          // and there was not a completion. The following code will
          // delay if the first connection did not wait the specified
          // interval.
          if (firstConnection->getFileNumForIOCompletion() != 1) // Fix bug 2903
          {
            // Don't delay for GuaConnectionToClient
            // (fileNumForIOCompletion equals 1 ($RECEIVE))
            waitReturned = NA_JulianTimestamp();
            IpcTimeout delayTime =
              tout - ((IpcTimeout)((waitReturned - waitCalled) / 10000));
            if (delayTime > 0)
            {
              // delayTime is in 10 ms units
              timespec nanoDelayTime;
              nanoDelayTime.tv_sec = delayTime / 100;
              nanoDelayTime.tv_nsec = (delayTime % 100) * 10000000;
              (void) nanosleep(&nanoDelayTime, NULL);
            }
          }
        }

        if (doIpcAwaitiox && ipcAwaitiox_.getFileNum() != -1 &&
            ipcAwaitiox_.getCompleted() == FALSE)
        {
          cycleThruConnections = TRUE;
          continue;
        }


        // add up the time spent waiting
        // (after about 280 days of waiting this would cause an overflow trap)
        if (totalWaitTime == -1 || isWaited)
          totalWaitTime = tout;
        else
        {
          if ( totalWaitTime < MAX_TOTALWAITTIME)  // soln 10-061230-1405 begin
            totalWaitTime += tout;
          else
            totalWaitTime = -1;                    // soln 10-061230-1405  end
        }

        // -----------------------------------------------------------------
        // wait with 0 timeout on all the other connections
        // -----------------------------------------------------------------
        for (CollIndex i = currentFirstConnInd+1; setToNext(i); i++)
        {
          IpcConnection *element_i = element(i);
          assert(element_i);

          if (doIpcAwaitiox)
          {
            if (ipcAwaitiox_.getFileNum() == element_i->getFileNumForIOCompletion())
              interruptRecvd = element_i->wait(IpcImmediately, env->getEventConsumed(),
                                               (ipcAwaitiox_.getFileNum() == -1 ?
                                                NULL : &ipcAwaitiox_));
            else
              continue;
          }
          else
          {
            interruptRecvd = element_i->wait(IpcImmediately, env->getEventConsumed());
          }

          if (env->ldoneConsumed())
            ldoneConsumed =TRUE;

          if (interruptRecvd)
          {
            env->setLdoneConsumed(ldoneConsumed);
            env->setEvent(activity, AEVENT);
            allc->decrRecursionCount();
            return WAIT_INTERRUPT;
          }

          if (doIpcAwaitiox && ipcAwaitiox_.getFileNum() != -1 &&
              ipcAwaitiox_.getCompleted() == FALSE)
          {
            cycleThruConnections = TRUE;
            break;
          }

        } // for (CollIndex i = currentFirstConnInd+1; setToNext(i); i++)

        // A completion that belongs to a connection in ERROR_STATE is
        // cleared here so it does not stay outstanding.
        if (ipcAwaitiox_.getCompleted())
        {
          void *bufAddr;
          Int32 count;
          SB_Tag_Type tag;
          CollIndex usedLength = allc->getUsedLength();
          for (CollIndex i = 0; i < usedLength ; i++)
          {
            if (allc->getUsage(i) != UNUSED_COLL_ENTRY)
            {
              IpcConnection *conn = allc->usedEntry(i);
              if (conn->getState() == IpcConnection::ERROR_STATE &&
                  conn->getFileNumForIOCompletion() ==
                  ipcAwaitiox_.getFileNum())
              {
                // Clear completed_
                ipcAwaitiox_.ActOnAwaitiox(&bufAddr, &count, &tag);
              }
            }
          }
        }

      } // while (cycleThruConnections)

      // Only the outermost waitOnSet deletes CLOSED connections, so that no
      // nested caller is still using them.
      if ( allc_->getRecursionCount() == 1 ) // delete closed connections
      {
        CollIndex usedLength = allc->getUsedLength();
        for (CollIndex i = 0;
             i < usedLength && allc->getDeleteCount() > 0; i++)
        {
          if (allc->getUsage(i) != UNUSED_COLL_ENTRY)
          {
            IpcConnection *conn = allc->usedEntry(i);
            if (conn->getState() == IpcConnection::CLOSED)
            {
              delete conn;
              allc->decrDeleteCount();
            }
          }
        }
      }

      // Compare the completion sequence numbers to see whether any
      // connections have completed.
      if (seqNo != allc->getCompletionSeqenceNo())
      {
        somethingCompleted = TRUE;
      }
      else
      {
        // increase the used timeout kind of exponentially by multiplying
        // with 1.5 and then adding an increment (in 10 ms units).
        // Capped at env->getMaxPollingInterval().
        tout = MINOF(env->getMaxPollingInterval(), ( tout + tout/2 + toInc ));
      }

      // if we have received a partial message, reset the timeout
      if (allc->getReceivedPartialMessage())
      {
        tout = 0;
        currTimeSlices = timeSlices;
        allc->setReceivedPartialMessage(FALSE);
      }
      if (!moreWaitsAnyConnection())
      {
        env->setLdoneConsumed(ldoneConsumed);
        env->setEvent(activity, AEVENT);
        allc->decrRecursionCount();
        return WAIT_OK;
      }

      // Check the SeaMonster completed task queue if there is one,
      // whether or not wait was called, and whether or not LRABBIT
      // was received if wait was called.
      //
      // It pays to always check it because it's cheaper than figuring
      // out whether or not to.
      //
      // Notes about the "while (firstTask)" loop below
      //
      // * getFirst() does not modify the ready list
      //
      // * The while loop is guaranteed to terminate because items in
      //   output queues are guaranteed to be removed until all are
      //   emptied and the completed task queue is emptied.
      //
      // * In the common case, firstTask will be different on each
      //   successive call to getFirst(). It could be the same on
      //   successive calls if the reader thread is dispatched between
      //   successive calls, and the task is placed back at the head
      //   of the queue when it is empty.

      if (smReadyList)
      {
        ExSMTask *firstTask = NULL;
        while ((firstTask = smReadyList->getFirst()) != NULL)
          firstTask->getSMConnection()->wait(0);

        if (seqNo != allc->getCompletionSeqenceNo())
        {
          receivedSMEvent = TRUE;
          somethingCompleted = TRUE;
          EXSM_TRACE(EXSM_TRACE_PROTOCOL, "Done processing SM connections");
          continue;
        }
      }

    } // if (totalWaitTime >= timeout AND timeout != IpcInfiniteTimeout) else
  } // while (NOT somethingCompleted)

  if (!receivedSMEvent)
  {
    env->setLdoneConsumed(ldoneConsumed);
    env->setEvent(activity, AEVENT);
  }

  allc->decrRecursionCount();
  return WAIT_OK;
}

#ifdef IPC_INTEGRITY_CHECKING

void IpcSetOfConnections::checkIntegrity(void)
  {
  // Debug-only integrity check. The set is pessimistically marked as
  // orphaned; checkLocalIntegrity() clears the flag if the traversal that
  // starts at IpcAllConnections reaches this set.
  isOrphaned_ = TRUE;
  allc_->checkIntegrity();

  // An empty set cannot be reached through any connection, so it is only
  // reported when it still holds entries.
  if (!isOrphaned_ || entries() == 0)
    return;

  cerr << "Found orphaned IpcSetOfConnections object." << endl;
  // Not asserted: the set might not really be orphaned -- it may just be
  // that its associated ExRtFragTable cannot be reached.

  // The traversal skipped this set, so check it directly.
  checkLocalIntegrity();
  }

void IpcSetOfConnections::checkLocalIntegrity(void)
  {
  isOrphaned_ = FALSE;

  CollIndex conn = 0;

  while (setToNext(conn))
    {

    // check to see if the corresponding element in the superset is used
    if (!allc_->used(conn))
      {
      cerr << "Found IpcConnection in subarray that is not in IpcAllConnections." << endl;
      assert(allc_->used(conn));
      }
    conn++;
    }
  }

#endif

// -----------------------------------------------------------------------
// Methods for class IpcControlConnection
// -----------------------------------------------------------------------

IpcControlConnection::~IpcControlConnection()
{
  // Nothing to release at this level.
}

SockControlConnection *
IpcControlConnection::castToSockControlConnection()
{
  // Base-class default: not a socket control connection. Presumably
  // overridden by SockControlConnection.
  return NULL;
}

GuaReceiveControlConnection *
IpcControlConnection::castToGuaReceiveControlConnection()
{
  // Base-class default: not a $RECEIVE control connection. Presumably
  // overridden by GuaReceiveControlConnection.
  return NULL;
}

// -----------------------------------------------------------------------
// Methods for class InternalMsgHdrInfoStruct
// -----------------------------------------------------------------------

///////////////////////////////////////////////////////////////////////////////
// general constructor
InternalMsgHdrInfoStruct::InternalMsgHdrInfoStruct(
       IpcMessageObjType msgType,
       IpcMessageObjVersion version)
  // Fresh header: zero lengths/flags/sequence numbers, this machine's
  // alignment, and the release-1 eye catcher. Member order is unchanged.
  : IpcMessageObj(msgType, version)
  , totalLength_(0)
  , alignment_(IpcMyAlignment)
  , flags_(0)
  , format_(0)
  , sockReplyTag_(0)
  , eyeCatcher_(Release1MessageEyeCatcher)
  , seqNum_(0)
  , msgStreamId_(0)
{
}

///////////////////////////////////////////////////////////////////////////////
// constructor used to perform copyless receive. unpacks objects in place.
//
// Header fields are not assigned here; only the IpcMessageObj(msgStream)
// base constructor runs. If the header's endianness differs from
// IpcMyEndianness, the multi-byte fields are byte-swapped in place, but the
// branch then hits assert(0): msgStreamId_ would need an eight-byte swap
// that does not exist, so (with assertions enabled) mixed-endian headers
// are effectively unsupported.
// NOTE(review): flags_ is not swapped in that branch -- presumably it is a
// single byte; confirm against the class declaration.
InternalMsgHdrInfoStruct::InternalMsgHdrInfoStruct(
       IpcBufferedMsgStream* msgStream)
: IpcMessageObj(msgStream)
  { 
  // IpcBufferedMsgStream parm used to differentiate from default constructor
  if (getEndianness() != IpcMyEndianness)
    {
    swapFourBytes(totalLength_);
    swapTwoBytes(alignment_);
    swapTwoBytes(format_);
    swapTwoBytes(sockReplyTag_);
    swapFourBytes(eyeCatcher_);
    swapFourBytes(seqNum_);
    assert(0); // Need swapEightBytes() to swap msgStreamId_!
    setEndianness(IpcMyEndianness);
    }
  }

IpcMessageObjSize InternalMsgHdrInfoStruct::packedLength()
{
  // The packed header is a byte image of the whole object.
  const IpcMessageObjSize objBytes = static_cast<IpcMessageObjSize>(sizeof(*this));
  return objBytes;
}

IpcMessageObjSize InternalMsgHdrInfoStruct::packObjIntoMessage(
     IpcMessageBufferPtr buffer)
{
  // The packed representation has the same layout as the in-memory
  // object, so packing is delegated entirely to the base class and its
  // result (bytes packed) is returned unchanged.
  // NOTE(review): an earlier comment said the packed form "needs to be in
  // big-endian format", but no byte swapping happens in this method --
  // confirm whether IpcMessageObj::packObjIntoMessage handles that.
  return IpcMessageObj::packObjIntoMessage(buffer);
}

void InternalMsgHdrInfoStruct::unpackObj(IpcMessageObjType objType,
                                         IpcMessageObjVersion objVersion,
                                         NABoolean sameEndianness,
                                         IpcMessageObjSize objSize,
                                         IpcConstMessageBufferPtr buffer)
{
  // The packed header must be exactly one object image in size.
  assert(objSize == sizeof(*this));

  // Byte order is reported by the caller through sameEndianness; the
  // actual unpacking is done by the base class.
  IpcMessageObj::unpackObj(objType, objVersion, sameEndianness,
                           objSize, buffer);
}

// -----------------------------------------------------------------------
// Methods for class IpcMessageBuffer
// -----------------------------------------------------------------------

// Private constructor, reached only through allocate(), createBuffer(),
// copy() and copyFromOffset(). All state is kept in the
// InternalMessageBufferHeader base, which takes the same values in its own
// argument order (<flags> goes last there).
IpcMessageBuffer::IpcMessageBuffer(CollHeap *heap,
                                   IpcMessageObjSize maxLen,
                                   IpcMessageObjSize msgLen,
                                   IpcMessageStreamBase *msg,
                                   short flags,
                                   short replyTag,
                                   IpcMessageObjSize maxReplyLength,
                                   Int64 transid)
  : InternalMessageBufferHeader(heap, maxLen, msgLen, msg,
                                replyTag, maxReplyLength, transid, flags)
{
  // nothing beyond the base-class initialization
}

// Creates an empty message buffer (message length 0) with room for <maxLen>
// data bytes, owned by stream <msg>. It has no reply tag
// (GuaInvalidReplyTag), a max reply length of 0 and a transid of -1.
// TRUE is passed as failureIsFatal to the allocator.
IpcMessageBuffer *IpcMessageBuffer::allocate(IpcMessageObjSize maxLen,
                                             IpcMessageStreamBase *msg,
                                             CollHeap *heap,
                                             short flags)
{
  const IpcMessageObjSize emptyMsgLen      = 0;
  const IpcMessageObjSize noMaxReplyLength = 0;
  const Int64             noTransid        = -1;

  IpcMessageBuffer *fresh = new (maxLen, heap, TRUE)
    IpcMessageBuffer(heap, maxLen, emptyMsgLen, msg, flags,
                     GuaInvalidReplyTag, noMaxReplyLength, noTransid);
  return fresh;
}

// Allocates a new buffer that inherits this buffer's stream, flags, reply
// tag, max reply length and transid. Both its maximum length and its
// message length are set to <newMaxLen>; 0 means "use this buffer's
// maximum length". No data is copied and this buffer is not modified.
// The buffer comes from env's heap, or the C++ heap when <env> is NULL.
IpcMessageBuffer *IpcMessageBuffer::createBuffer(IpcEnvironment *env,
                                           IpcMessageObjSize newMaxLen,
                                           NABoolean failureIsFatal)
{
  const IpcMessageObjSize len = (newMaxLen != 0) ? newMaxLen : maxLength_;

  CollHeap *heap = NULL;
  if (env)
    heap = env->getHeap();

  return new (len, heap, failureIsFatal)
    IpcMessageBuffer(heap, len, len, message_, flags_, replyTag_,
                     maxReplyLength_, transid_);
}

// Returns a copy of this buffer with room for <newMaxLen> bytes. 0 means
// "same maximum length as this buffer"; any other value must be large
// enough for the current message (asserted).
//
// The message bytes and the header fields are copied. The reply tag moves
// to the copy: on success, this buffer's reply tag is reset to
// GuaInvalidReplyTag and its max reply length to 0. If the allocation yields
// NULL, NULL is returned and this buffer is left untouched.
IpcMessageBuffer *IpcMessageBuffer::copy(IpcEnvironment *env,
                                         IpcMessageObjSize newMaxLen,
                                         NABoolean failureIsFatal)
{
  if (newMaxLen != 0)
  {
    assert(newMaxLen >= msgLength_); // existing data must fit into the copy
  }
  else
  {
    newMaxLen = maxLength_;
  }

  CollHeap *heap = (env != NULL) ? env->getHeap() : NULL;

  IpcMessageBuffer *dup = new (newMaxLen, heap, failureIsFatal)
    IpcMessageBuffer(heap, newMaxLen, msgLength_, message_, flags_,
                     replyTag_, maxReplyLength_, transid_);

  if (dup == NULL)
    return NULL;

  str_cpy_all((char *) dup->data(0),
              (const char *) data(0),
              (Lng32) MINOF(msgLength_, newMaxLen));

  // the copy takes over the reply tag, if there is one
  setReplyTag(GuaInvalidReplyTag);
  setMaxReplyLength(0);

  return dup;
}

// Returns a new buffer holding the part of this buffer's message that
// starts at byte <offset>; <offset> must lie before the end of the message
// (asserted). The new buffer has room for <newMaxLen> bytes. Its message
// length is the number of bytes remaining after <offset>, capped at
// <newMaxLen>.
//
// Stream, flags, reply tag, max reply length and transid are inherited.
// As in copy(), the reply tag moves to the new buffer: on success, this
// buffer's reply tag is reset to GuaInvalidReplyTag and its max reply
// length to 0. If the allocation yields NULL, NULL is returned and this
// buffer is left untouched.
IpcMessageBuffer *IpcMessageBuffer::copyFromOffset(IpcEnvironment *env,
                                               IpcMessageObjSize newMaxLen,
                                               IpcMessageObjSize offset,
                                               NABoolean failureIsFatal)
{
  assert(msgLength_ > offset); // offset must be before the end

  // Only the bytes after <offset> exist in the source. Using the full
  // msgLength_ here would read <offset> bytes past the end of the message
  // and give the copy a message length that is too large.
  const IpcMessageObjSize remaining = msgLength_ - offset;
  const Lng32 copyLen = (Lng32) MINOF(remaining, newMaxLen);

  CollHeap *heap = (env ? env->getHeap() : NULL);

  IpcMessageBuffer *result = new (newMaxLen, heap, failureIsFatal)
    IpcMessageBuffer(heap,
                     newMaxLen,
                     copyLen,
                     message_,
                     flags_,
                     replyTag_,
                     maxReplyLength_,
                     transid_);
  if (result)
  {
    str_cpy_all((char *) result->data(0),
                (const char *) data(offset),
                copyLen);

    // the copy gets the reply tag, if there is any
    setReplyTag(GuaInvalidReplyTag);
    setMaxReplyLength(0);
  }

  return result;
}

// Replaces this buffer with a copy that has room for <newMaxLen> bytes (see
// copy()), then drops the caller's reference to this buffer. Other holders
// of a reference keep using the old buffer. Returns the copy.
IpcMessageBuffer *IpcMessageBuffer::resize(IpcEnvironment *env,
                                           IpcMessageObjSize newMaxLen)
{
  IpcMessageBuffer *replacement = copy(env, newMaxLen);

  // may free <this> if ours was the last reference
  decrRefCount();

  return replacement;
}

// Drops one reference to this buffer. When the count reaches zero:
//  - the per-chunk lock-count array (if any) is deallocated and freed;
//  - the buffer itself is freed, with heap_->deallocateMemory() if it lives
//    on an NAMemory heap, otherwise with delete.
// A negative count is a fatal error (assert).
//
// Returns the reference count as it was BEFORE this call (post-decrement).
// NOTE(review): confirm callers expect the old value rather than the new one.
IpcMessageRefCount IpcMessageBuffer::decrRefCount()
{
  IpcMessageRefCount result = refCount_--;
  
  if (refCount_ == 0)
  {
    if (chunkLockCount_)
    {
      chunkLockCount_->deallocate();
      NADELETEBASIC(chunkLockCount_, heap_);
    }

    // The ref count has dropped to zero. Use the correct method to
    // deallocate space for this buffer.
    if (heap_)
    {
      // This object was allocated on an NAMemory heap (see operator new).
      // NOTE(review): no destructor runs on this path; confirm intended.
      heap_->deallocateMemory(this);
    }
    else
    {
      // This object was allocated on the C++ heap
      delete this;
    }
    // <this> is gone from here on; only the local <result> is used below.
  }
  else if (refCount_ < 0)
  {
    // negative refcounts aren't allowed (this assert always fires here)
    assert(refCount_ > 0);
  }
  
  return result;
}

// Lazily creates the per-chunk lock-count array used when this buffer is
// transferred in pieces of at most <maxIOSize> bytes.
//
// On the first call:
//  - the array gets one zero entry per chunk (the last chunk may be partial);
//  - <maxIOSize> is remembered for the later offset-to-chunk mapping;
//  - the number of chunks is returned.
// Later calls ignore <maxIOSize> and return the number of entries already in
// the array. Returns 0 if the array could not be allocated.
CollIndex IpcMessageBuffer::initLockCount(IpcMessageObjSize maxIOSize)
{
  if (chunkLockCount_)
    return(chunkLockCount_->entries());

  chunkLockCount_ = new(heap_) NAArray<CollIndex>(heap_,2);

  // Check the allocation BEFORE using the new object. The old code called
  // setHeap() first, which made this check useless.
  if (!chunkLockCount_)
    return(0);

  chunkLockCount_->setHeap(heap_);
  maxIOSize_ = maxIOSize;

  // full chunks, plus one if the message doesn't end on a chunk boundary
  CollIndex chunkCount = (CollIndex) (msgLength_ / maxIOSize_);
  if (msgLength_ % maxIOSize_)
    chunkCount++;

  for (CollIndex i = 0; i < chunkCount; i++)
    chunkLockCount_->insertAt(i, 0);

  return(chunkCount);
}

// Increments the lock count of the chunk that contains byte <offset> and
// returns the new count. initLockCount() must have created the array first.
CollIndex IpcMessageBuffer::incrLockCount(IpcMessageObjSize offset)
{
  assert(chunkLockCount_ != NULL);

  // offset 0 maps to chunk 0 without dividing by maxIOSize_
  const CollIndex chunk = (offset == 0) ? 0 : (CollIndex) (offset / maxIOSize_);
  const CollIndex newCount = chunkLockCount_->at(chunk) + 1;

  chunkLockCount_->insertAt(chunk, newCount);
  return newCount;
}

// Decrements the lock count of the chunk that contains byte <offset> and
// returns the new count. initLockCount() must have created the array first.
CollIndex IpcMessageBuffer::decrLockCount(IpcMessageObjSize offset)
{
  assert(chunkLockCount_ != NULL);

  // offset 0 maps to chunk 0 without dividing by maxIOSize_
  const CollIndex chunk = (offset == 0) ? 0 : (CollIndex) (offset / maxIOSize_);
  const CollIndex newCount = chunkLockCount_->at(chunk) - 1;

  chunkLockCount_->insertAt(chunk, newCount);
  return newCount;
}

// Returns the current lock count of the chunk that contains byte <offset>.
// initLockCount() must have created the array first.
CollIndex IpcMessageBuffer::getLockCount(IpcMessageObjSize offset)
{
  assert(chunkLockCount_ != NULL);

  // offset 0 maps to chunk 0 without dividing by maxIOSize_
  const CollIndex chunk = (offset == 0) ? 0 : (CollIndex) (offset / maxIOSize_);
  return chunkLockCount_->at(chunk);
}

// Rounds <offset> up to the next multiple of 8, using 32-bit unsigned
// arithmetic. An offset that is already 8-byte aligned is left untouched.
void IpcMessageBuffer::alignOffset(IpcMessageObjSize &offset)
{
  const ULng32 raw     = (ULng32) offset;
  const ULng32 lowBits = raw & 0x7;

  if (lowBits == 0)
    return;   // already on an 8-byte boundary

  // drop the low bits, then step to the next boundary
  offset = (raw - lowBits) + 8;
}

// Invoked when a send on <conn> has completed. It does three things:
//  - records <conn> as the source of the I/O;
//  - bumps the environment's completed-I/O counter;
//  - notifies the owning stream. No message buffer is passed along.
void IpcMessageBuffer::callSendCallback(IpcConnection *conn)
{
  connection_ = conn;
  conn->getEnvironment()->getAllConnections()->bumpCompletionCount();
  message_->internalActOnSend(conn);
}

// Invoked when a receive on <conn> has completed. It does three things:
//  - records <conn> as the source of the I/O;
//  - bumps the environment's completed-I/O counter;
//  - hands this buffer to the owning stream.
void IpcMessageBuffer::callReceiveCallback(IpcConnection *conn)
{
  connection_ = conn;
  conn->getEnvironment()->getAllConnections()->bumpCompletionCount();
  message_->internalActOnReceive(this, conn);
}

// Class-specific allocator. It reserves one block that holds the
// IpcMessageBuffer object (<headerSize> bytes) followed by <bufferLength>
// bytes of message data.
//
// With a heap, the block comes from heap->allocateMemory() and
// <failureIsFatal> is passed through. Without a heap, the global
// ::operator new is used and <failureIsFatal> is ignored.
// NOTE(review): on that path an allocation failure throws std::bad_alloc
// instead of returning NULL, so callers' NULL checks never see it.
void * IpcMessageBuffer::operator new(size_t headerSize,
                                      IpcMessageObjSize bufferLength,
                                      CollHeap *heap,
                                      NABoolean failureIsFatal)
{
  const size_t totalSize = headerSize + (size_t) bufferLength;

  if (heap == NULL)
    return ::operator new(totalSize);

  return heap->allocateMemory(totalSize, failureIsFatal);
}

// Member entry point for the backbone integrity check. The actual checks
// live in the friend function verifyIpcMessageBufferBackbone().
NABoolean IpcMessageBuffer::verifyBackbone()
{
  NABoolean ok = verifyIpcMessageBufferBackbone(*this);
  return ok;
}

//
// Sanity check of the chain of packed objects (the "backbone") inside a
// message buffer. Returns TRUE if the chain looks consistent. Otherwise it
// calls ipcIntegrityCheckEpilogue(FALSE) once and returns FALSE.
//
// The buffer must hold at least an InternalMsgHdrInfoStruct. Then, for each
// object in the chain, starting with the header:
//  - it must lie within MINOF(maxLength_, header totalLength_) bytes;
//  - its reference count must be exactly 1;
//  - its vptr must be NULL, unless it is the header (an in-place unpack of
//    the header may already have set it);
//  - its length must be at least sizeof(IpcMessageObj);
//  - its next offset, relative to the object, must be 0 or not smaller than
//    the object length.
// This is a free function, a friend of IpcMessageObj and IpcMessageBuffer,
// so that it can read their private data without befriending a whole class.
//
NABoolean verifyIpcMessageBufferBackbone(IpcMessageBuffer &b)
{
  NABoolean valid = TRUE;

  if (b.msgLength_ < sizeof(InternalMsgHdrInfoStruct))
  {
    valid = FALSE;
  }
  else
  {
    // header fields may be read now
    InternalMsgHdrInfoStruct *hdr = (InternalMsgHdrInfoStruct *) b.data(0);
    IpcMessageObjSize chainLimit = MINOF(b.maxLength_, hdr->totalLength_);
    IpcMessageObjSize offset = 0;
    IpcMessageObj *cur = hdr;

    while (valid && cur != NULL)
    {
      if (offset + sizeof(IpcMessageObj) > chainLimit)
        valid = FALSE;          // not even room for an IpcMessageObj
      else if (cur->s_.refCount_ != 1)
        valid = FALSE;
      else if (cur != hdr && cur->getMyVPtr() != NULL)
        valid = FALSE;
      else
      {
        IpcMessageObjSize nextOffs = (IpcMessageObjSize)((Long) cur->s_.next_);

        if (cur->s_.objLength_ < sizeof(IpcMessageObj) ||
            offset + cur->s_.objLength_ > chainLimit ||
            (nextOffs != 0 && nextOffs < cur->s_.objLength_))
        {
          valid = FALSE;
        }
        else
        {
          // move on to the next object in the chain
          offset += nextOffs;
          cur = cur->getNextFromOffset();
        }
      }
    }
  }

  if (!valid)
    ipcIntegrityCheckEpilogue(FALSE);

  return valid;
}

// -----------------------------------------------------------------------
// Methods for class IpcMessageStreamBase
// -----------------------------------------------------------------------
// Base-class default: this stream is not an IpcMessageStream.
// IpcMessageStream overrides this to return itself.
IpcMessageStream * IpcMessageStreamBase::castToIpcMessageStream()
{
  IpcMessageStream *none = NULL;
  return none;
}

// Base-class default: this stream is not an IpcBufferedMsgStream.
IpcBufferedMsgStream *
IpcMessageStreamBase::castToIpcBufferedMsgStream()
{
  IpcBufferedMsgStream *none = NULL;
  return none;
}

void IpcMessageStreamBase::addToCompletedList()
{    
  environment_->addToCompletedMessages(this);
}

// -----------------------------------------------------------------------
// Methods for class IpcMessageStream
// -----------------------------------------------------------------------

// Creates an empty message stream whose embedded header has type <msgType>
// and version <version>. Recipients and active I/Os start out as empty sets
// over the environment's connections. A non-zero <fixedMsgBufferLength> is
// the buffer size used when composing the message. <shareMessageObjects>
// selects whether appended objects are shared (linked) or copied at once.
IpcMessageStream::IpcMessageStream(
     IpcEnvironment *env,
     IpcMessageObjType msgType,
     IpcMessageObjVersion version,
     IpcMessageObjSize fixedMsgBufferLength,
     NABoolean shareMessageObjects) :
        IpcMessageStreamBase(env),
        h_(msgType,version),
        recipients_(env->getAllConnections(),env->getHeap()),
        activeIOs_(env->getAllConnections(),env->getHeap())
{
  // the fixed buffer length must respect the (presumably per-I/O) maximum
#ifndef USE_SB_NEW_RI
  assert(fixedMsgBufferLength <= IOSIZEMAX);
#else
  assert(fixedMsgBufferLength <= env->getGuaMaxMsgIOSize());
#endif

  // caller-supplied configuration
  fixedBufLen_     = fixedMsgBufferLength;
  shareObjects_    = shareMessageObjects;

  // empty message: no buffer yet, only the embedded header in the chain
  msgBuffer_       = NULL;
  objectsInBuffer_ = FALSE;
  state_           = EMPTY;
  maxReplyLength_  = 0;
  tail_            = first();
  current_         = NULL;

  // status and bookkeeping
  errorInfo_          = 0;
  numOfSendCallbacks_ = 0;
  corruptMessage_     = false;
  isOrphaned_         = FALSE;
}

IpcMessageStream::~IpcMessageStream()
{
  // A length of 0 makes allocateMessageBuffer() release the current message
  // buffer instead of allocating one. The header member's destructor, which
  // checks its reference count, runs implicitly afterwards.
  allocateMessageBuffer(0);
}

// An IpcMessageStream casts to itself.
IpcMessageStream *IpcMessageStream::castToIpcMessageStream()
{
  IpcMessageStream *self = this;
  return self;
}

// Appends <toAppend> to a message that is being composed. The state must be
// EMPTY or COMPOSING and becomes COMPOSING.
//
// Shared mode (shareObjects_): the object itself is linked into the stream's
// object list and its reference count is incremented. Nothing is packed
// until send(). The object must not already belong to a message (asserted).
//
// Non-shared mode: the object is packed right away into msgBuffer_ at the
// next 8-byte aligned offset, growing the buffer if needed. The caller may
// therefore discard <toAppend> afterwards. Space for the header is reserved
// at the start of the buffer; the header itself is packed in send().
IpcMessageStream & IpcMessageStream::operator << (IpcMessageObj & toAppend)
{
#ifdef IPC_INTEGRITY_CHECKING
//  checkIntegrity();  message may be legitimately orphaned here
#endif

  // check message state, this is the process of composing the message
  assert(state_ == EMPTY OR state_ == COMPOSING);
  state_ = COMPOSING;

  if (shareObjects_)
    {
      // if we add a new object to the message, it ought not be in use
      // by some other message (only the owner can have a refcount)
      // (this may be extended later)
      assert(toAppend.s_.next_ == NULL AND toAppend.s_.refCount_ == 1);

      // increment the reference count of the appended shared object
      toAppend.incrRefCount();

      // add the object to the linked list of IpcMessageObj objects hanging
      // off the IpcMessageStream, don't pack the object into the
      // message buffer yet!!
      tail_->s_.next_ = &toAppend;
      tail_ = &toAppend;

    }
  else
    {
      // copy the object into the message buffer without altering its
      // reference count (toAppend can go away after this method returns)

      // size of toAppend when packed into the message buffer
      IpcMessageObjSize thisObjSize = toAppend.packedLength();

      // start offset of the packed version of toAppend in the message buffer
      IpcMessageObjSize startOffset;

      // we need a message buffer of at least this size
      IpcMessageObjSize neededMsgSize;

      // number of bytes used during the pack process
      IpcMessageObjSize packedBytes;

      // a pointer to the packed object cast as an IpcMessageObj
      IpcMessageObj     *bufObj;

      // we know that the object's header uses at least some space
      assert(thisObjSize >= sizeof(IpcMessageObj));
      
      // check whether there are previous objects in the buffer
      if (tail_ == first())
	{
	  // no, this is the first object to be added to the message
	  // (after the header, which gets packed during send(), but
	  // whose space gets allocated right here)
	  h_.totalLength_ = h_.packedLength();
	  IpcMessageBuffer::alignOffset(h_.totalLength_);
	  startOffset = h_.totalLength_;
	}
      else
	{
	  // h_.totalLength_ already holds the aligned end of the last object
	  startOffset = h_.totalLength_;
	}

      neededMsgSize = startOffset + thisObjSize;

      // check whether we need to allocate a new or bigger message buffer
      if (msgBuffer_ == NULL OR
	  msgBuffer_->getBufferLength() < neededMsgSize)
	{
	  // current buffer is too small, make room and maybe add a little
	  // reserve so this doesn't happen again next time

	  if (fixedBufLen_ > 0)
	    neededMsgSize = MAXOF(neededMsgSize,fixedBufLen_);
	  else
	    neededMsgSize = MAXOF(neededMsgSize,DefaultInitialMessageBufSize);

	  resizeMessageBuffer(neededMsgSize);
	}

      // now pack the object into the message buffer

      packedBytes = toAppend.packObjIntoMessage(msgBuffer_->data(startOffset));
      bufObj = (IpcMessageObj *) (msgBuffer_->data(startOffset));
      
      // Do some sanity checks, did the object really use the length it
      // promised to use? Did the length field in the message get set
      // correctly?
      assert(thisObjSize == packedBytes);
      assert(bufObj->s_.objLength_ == packedBytes);

      // advance the buffer pointer to the next place where we could
      // put an object (the next object wants to sit on an 8 byte boundary)
      // NOTE: we don't increase bufObj->s_.objLength_ when we align.
      h_.totalLength_ += packedBytes;
      IpcMessageBuffer::alignOffset(h_.totalLength_);

      // prepare the packed object to be shipped off and set the link from
      // the previous object
      //
      // NOTE(review): the link stored below is the new object's absolute
      // offset in the buffer. send() converts next_ links to offsets
      // relative to the object (convertNextToOffset()), and
      // verifyIpcMessageBufferBackbone() walks the chain that way. The two
      // forms agree only when the previous object starts at offset 0 (the
      // header). Confirm how links after the first one are interpreted.
      // NOTE(review): tail_ points into msgBuffer_; confirm that
      // resizeMessageBuffer() above keeps tail_ valid if it moves the buffer.
      bufObj->setMyVPtr(NULL);
      bufObj->s_.refCount_ = 1;
      bufObj->s_.next_ = NULL;
      tail_->s_.next_ = (IpcMessageObj *) ((long)startOffset);
      tail_ = bufObj;
    }
  return *this;
}

// Extracts the next object of a received message by unpacking (copying) it
// onto the caller-supplied object <toRetrieve>.
//
// Preconditions (asserted):
//  - the stream is in state RECEIVED or EXTRACTING;
//  - there is a current object;
//  - <toRetrieve> already has that object's type and accepts its version.
//
// Steps, in order:
//  - if the message header has the other endianness, the packed object's
//    byte order is turned in place;
//  - current_ is advanced past the object, before unpacking (note that
//    unpackObj() may call this method recursively);
//  - if <checkObjects> is TRUE, toRetrieve.checkObj() validates the packed
//    object first, and unpackObj() is only called when that check passes.
// current_ has moved past the object either way.
//
// Returns the result of checkObj(), or TRUE when no check was requested.
NABoolean IpcMessageStream::extractNextObj(IpcMessageObj &toRetrieve,
                                           NABoolean checkObjects)
{
#ifdef IPC_INTEGRITY_CHECKING
//  checkIntegrity();  message may be legitimately orphaned here
#endif

  // check whether it is ok to extract data now
  assert(state_ == RECEIVED OR state_ == EXTRACTING);
  state_ = EXTRACTING;

  // we assume that the user does something like the following to ensure
  // that "toRetrieve" is actually an object of the correct (derived) class:
  //
  //    IpcMessageStream m;
  //    ...
  //    m.receive();
  //    while (m.moreObjects())
  //      {
  //        IpcMessageObjType t;
  //
  //        t = m.getNextObjType();
  //        // if different versions and lengths are possible, the user
  //        // would have to retrieve those as well
  //        switch (t)
  //          {
  //          case xyz: // case for object type of class MyOwnMessageObject
  //            {
  //              MyOwnMessageObject myOwnObj;
  //
  //              // myOwnObj should have the correct type and version
  //              // (if not automatically set by constructor, do it manually)
  //              m >> myOwnObj;
  //              ...
  //            }
  //          }
  //      }
  //

  // the user should have predicted the object type and version correctly
  assert(current_ AND
         toRetrieve.s_.objType_ == current_->s_.objType_ AND
         toRetrieve.isActualVersionOK(current_->s_.objVersion_));

  // current_ points to the next object to be retrieved
  IpcMessageObj *packedObject = current_;
  
  // need to turn around bytes if this object is in the other endianness
  // (this also happens before the next link is read below)
  if (h_.getEndianness() != IpcMyEndianness)
    packedObject->turnByteOrder();
  
  // The object pointed to by <current> will get copied onto <toRetrieve>.
  // To avoid trouble, set the refcount of <current> to the refcount of
  // toRetrieve while unpacking the object. This way, even if the user's
  // unpackObj function simply copies the refcount, we still haven't
  // destroyed it. Also, this does not alter <toRetrieve>'s refcount
  // without the user seeing it. Copying a new content onto <toRetrieve>
  // is transparent to those people who have a pointer to it, therefore
  // it shouldn't change its refcount.
  IpcMessageRefCount saveRefCount = packedObject->s_.refCount_;
  
  // advance the current object pointer
  // (check whether the object uses an offset or a real pointer)
  if (objectsInBuffer_)
    current_ = current_->getNextFromOffset();
  else
    current_ = current_->s_.next_;
  
  // for now, all refcounts of objects in messages should be 1
  assert(saveRefCount == 1);
  
  packedObject->s_.refCount_ = toRetrieve.s_.refCount_;

  // If the caller requested an integrity check on the packed object
  // then we call a checkObj() method before unpackObj().
  NABoolean result = TRUE;
  if (checkObjects)
  {
    result = toRetrieve.checkObj(packedObject->s_.objType_,
                                 packedObject->s_.objVersion_,
                                 h_.getEndianness() == IpcMyEndianness,
                                 packedObject->s_.objLength_,
                                 (IpcConstMessageBufferPtr) packedObject);
  }
  
  // now call the user-defined unpack function
  // NOTE: this may call this method recursively!!!
  if (result)
  {
    toRetrieve.unpackObj(packedObject->s_.objType_,
                         packedObject->s_.objVersion_,
                         h_.getEndianness() == IpcMyEndianness,
                         packedObject->s_.objLength_,
                         (IpcConstMessageBufferPtr) packedObject);
  }
  
  // restore the refcount in the message
  packedObject->s_.refCount_ = saveRefCount;
  
  return result;
}

// Hands out a pointer to the next object of a received message without
// copying it. The object's reference count is incremented, so the caller
// must release it later. When no object is left, <toRetrieve> is set to NULL.
IpcMessageStream & IpcMessageStream::operator >> (IpcMessageObj * &toRetrieve)
{
#ifdef IPC_INTEGRITY_CHECKING
//  checkIntegrity(); message may be legitimately orphaned here
#endif

  // extraction is only legal on a received message
  assert(state_ == RECEIVED OR state_ == EXTRACTING);
  state_ = EXTRACTING;

  toRetrieve = current_;
  if (toRetrieve == NULL)
    return *this;   // no more objects available

  // the caller now shares the object
  toRetrieve->incrRefCount();

  // step forward; links are offsets when the objects live in the buffer
  if (objectsInBuffer_)
    current_ = current_->getNextFromOffset();
  else
    current_ = current_->s_.next_;

  return *this;
}

void IpcMessageStream::clearAllObjects()
{
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  IpcMessageObj *obj;
  IpcMessageObj *next;

  assert(state_ != SENDING AND state_ != RECEIVING);
  assert(activeIOs_.entries() == 0);

  if (objectsInBuffer_)
    {
      clearMessageBufferContents();
    }
  else
    {
      // if the message contains a linked list of shared objects then
      // release all objects except the header which is hardwired in
      // and unlink the objects from the list as you go
      obj = first()->s_.next_;
      first()->s_.next_ = NULL;
      while (obj != NULL)
	{
	  next = obj->s_.next_;
	  obj->s_.next_ = NULL;
	  obj->decrRefCount();
	  obj = next;
	}
    }

  objectsInBuffer_ = FALSE;
  current_         = NULL;
  tail_            = first();

  state_ = EMPTY;
}

// Adds <recipient>'s connection id to the set of recipients.
// No integrity check is done here, because the message may legitimately be
// orphaned at this point.
void IpcMessageStream::addRecipient(IpcConnection *recipient)
{
  IpcConnectionId newId = recipient->getId();
  recipients_ += newId;
}

// Adding a whole set of recipients at once is not supported; calling this
// aborts.
void IpcMessageStream::addRecipients(const IpcSetOfConnections & /*recipients*/)
{
  ABORT("not implemented yet");
}

// Removes <recipient>'s connection id from the set of recipients. The
// stream's integrity is checked before and after when
// IPC_INTEGRITY_CHECKING is defined.
void IpcMessageStream::deleteRecipient(IpcConnection *recipient)
{
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  IpcConnectionId goneId = recipient->getId();
  recipients_ -= goneId;

#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif
}

void IpcMessageStream::deleteAllRecipients()
{
  recipients_.clear();
}

void IpcMessageStream::giveMessageTo(IpcMessageStream &other,
                                     IpcConnection *connection)
{
  // ---------------------------------------------------------------------
  // Passes the current message of this stream on to <other>, as if
  // <other> had received it from <connection> (an implicit receive()).
  // Intended for "router" streams: a router listens on a connection that
  // is shared by several streams, inspects each message, and dispatches
  // it to the right stream. Only the router calls receive() explicitly.
  // Afterwards this stream is empty and ready for a new send or receive.
  // ---------------------------------------------------------------------
  assert(connection);

  // the receiving stream must be in a state that allows receiving
  assert(other.getState() == EMPTY OR
         other.getState() == SENT);

  // Take the buffer away from this stream. Pointers such as current_ may
  // already point into it, so they are reset as well.
  IpcMessageBuffer *handedOver = msgBuffer_;
  msgBuffer_       = NULL;
  objectsInBuffer_ = FALSE;
  current_         = NULL;
  tail_            = first();
  state_           = EMPTY;

  // make <connection> the recipient and the active I/O of the other stream
  IpcConnectionId connId = connection->getId();
  other.recipients_ += connId;
  other.activeIOs_  += connId;

  // deliver the buffer exactly the way a connection would
  handedOver->addCallback(&other);
  handedOver->callReceiveCallback(connection);
}


///////////////////////////////////////////////////////////////////////////////
// Hands the received message buffer to the IpcBufferedMsgStream <msgStream>,
// resets this stream to an empty state, and then triggers the buffered
// stream's receive processing.
void IpcMessageStream::giveReceiveMsgTo(IpcBufferedMsgStream& msgStream)
{
  // the buffered stream takes over our message buffer ...
  msgStream.addInputBuffer(msgBuffer_);

  // ... and this stream forgets it and starts over empty
  msgBuffer_       = NULL;
  objectsInBuffer_ = FALSE;
  current_         = NULL;
  tail_            = first();
  state_           = EMPTY;

  // let the receiving stream act on the new input
  msgStream.actOnReceive(NULL);
}

// Uncomment the next line to trace IpcMessageObj.  Works better if LOG_IPC
// is also defined in IpcGuardian.cpp
// #define LOG_IPC_MSG_OBJ

void IpcMessageStream::send(NABoolean waited, Int64 transid)
{
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  MXTRC_FUNC("IpcMessageStream::send");
  IpcMessageObjSize   thisObjLength;
  IpcMessageObj *obj;

  // check message state
  assert(state_ == COMPOSING);

  if (shareObjects_)
    {
      // -----------------------------------------------------------------
      // Objects are shared between caller and IPC layer. Therefore they
      // were not copied by operator <<. Now that we know all of them,
      // copy them into the message buffer.
      // -----------------------------------------------------------------

      // -----------------------------------------------------------------
      // Compute the total length of the message.
      // ---------------------------------------------------------------------

      // loop through all objects and ask them how much space they need

      h_.totalLength_ = 0;
      obj = first();
      while (obj != NULL)
	{
	  // ask the object how long it will be when packed into the buffer
	  thisObjLength = obj->packedLength();
	  
	  // we know that the object's header uses at least some space
	  assert(thisObjLength >= sizeof(IpcMessageObj));
	  
	  // store that info in the object's length field (we check later
	  // when the packing is done whether the method lied or not)
	  obj->s_.objLength_ = thisObjLength;
	  
	  // find the offset of the next potential object by aligning the
	  // address and add the needed filler space to the object length
	  // NOTE: we even align the end of the message
	  h_.totalLength_ += thisObjLength;
	  IpcMessageBuffer::alignOffset(h_.totalLength_);
	  
	  // next, please
	  obj = obj->s_.next_;
	}

      // -----------------------------------------------------------------
      // allocate an empty message buffer, if the user specified a fixed
      // length buffer size then use that (allows replies via REPLYX to
      // have maxBufferLen_ bytes without switching to the multi-chunk
      // protocol)
      // -----------------------------------------------------------------
      allocateMessageBuffer(MAXOF(h_.totalLength_,fixedBufLen_));

      // -----------------------------------------------------------------
      // pack the message object into a buffer
      // -----------------------------------------------------------------
      IpcMessageObjSize msgDataLen = 0;
      IpcMessageObj *bufObj = (IpcMessageObj *) msgBuffer_->data();
      IpcMessageObj *nextBufObj = NULL;

      obj = first();
      while (obj != NULL)
	{
#ifdef LOG_IPC_MSG_OBJ
          cerr << obj->getType() << ", ";
#endif          
	  MXTRC_1("type=%d\n", obj->getType());
	  // let the user code pack the actual object into the buffer
	  thisObjLength = obj->packObjIntoMessage(
	       (IpcMessageBufferPtr) bufObj);
	  msgDataLen += thisObjLength;
	  
	  // now massage some of the header fields of the object in
	  // the buffer:
	  //
	  // - make sure the object length is the one we previously
	  //   calculated by calling obj->packedLength();
	  // - make sure we correctly set the obj. length in the message
	  // - wipe out the virtual function pointer
	  // - set the refcount of the object in the message to 1
	  // - find the end of the object by aligning the end pointer
	  // - set the next_ pointer to be a relative offset to the
	  //   next message
	  //
	  assert(obj->s_.objLength_ == thisObjLength);
#ifdef NA_BIG_ENDIAN
      // NOTE: byte length *may* have been converted to big-endian format
	  //       which means that s_.objLength_ is in the wrong format
      assert(obj->s_.objLength_ == bufObj->s_.objLength_);
#endif
	  bufObj->setMyVPtr(NULL);
	  bufObj->s_.refCount_ = 1;

	  // NOTE: see also IpcMessageObj::packDependentObjIntoMessage()
	  // when making changes to the above code

	  // advance the buffer pointer to the next place where we could
	  // put an object (the next object wants to sit on an 8 byte boundary)
	  // NOTE: we don't increase bufObj->s_.objLength_ when we align.
	  IpcMessageBuffer::alignOffset(msgDataLen);

	  // take care of the object's next_ pointer that is now embedded
	  // in the message buffer
	  if (obj->s_.next_ != NULL)
	    {
	      nextBufObj = (IpcMessageObj *) msgBuffer_->data(msgDataLen);
	      // store the place where the next object will be located
	      bufObj->s_.next_ = nextBufObj;
	      // now convert it to an offset relative to the start of obj
	      bufObj->convertNextToOffset();
	    }
	  else
	    {
	      nextBufObj = NULL;
	    }

	  // dissolve the linked list that hooked the objects together
	  IpcMessageObj *next = obj->s_.next_;
	  obj->s_.next_ = NULL;

	  // the object got copied into the message, decrement its
	  // reference count except for the first object, which is
	  // embedded in the message object
	  if (obj != first())
	    obj->decrRefCount();
 
          // Allow testing of corrupt message handling on the other side.
	  if (corruptMessage_ && (next == NULL))
            bufObj->s_.refCount_ = 666;

          // advance to the next object
	  obj = next;
	  bufObj = nextBufObj;
	}
  
      assert(h_.totalLength_ == msgDataLen);
      msgBuffer_->setMessageLength(msgDataLen);

#ifdef LOG_IPC_MSG_OBJ
      if (msgDataLen)   // i.e., if any objs
        cerr << endl;
#endif

    } // shared objects
  else
    {
      // -----------------------------------------------------------------
      // Objects were not shared between caller and IPC layer and are
      // therefore already copied into the message buffer.
      // -----------------------------------------------------------------

      // the header is not packed into the buffer yet but there is
      // space allocated for it at the beginning of the buffer
      first()->packObjIntoMessage(msgBuffer_->data());
      msgBuffer_->setMessageLength(h_.totalLength_);
      msgBuffer_->addCallback(this);
    } // objects are not shared

  // ---------------------------------------------------------------------
  // now send the message buffer to each of the connections specified
  // setup transid in message buffer.
  // ---------------------------------------------------------------------
  state_ = SENDING;

  msgBuffer_->setTransid (transid);

  // can't have a previous operation uncompleted, since that would
  // confuse the activeIOs_ set.
  assert(activeIOs_.entries() == 0);
  assert(numOfSendCallbacks_ == 0);

  // Determine the total number of IOs first, since some of the sends may
  // complete immediately and we don't want a situation where a callback
  // is called with no outstanding IOs while we aren't done yet with
  // our send loop below. Also cache the pointer to the message buffer,
  // since it may be modified by callbacks that initiate a receive operation.
  IpcSetOfConnections sendConnections(recipients_);
  IpcMessageBuffer *msgBuffer = msgBuffer_;
  CollIndex numRecipients = sendConnections.entries();

  activeIOs_ += sendConnections;
  msgBuffer_ = NULL;

  // By calling IpcConnection::send() we give up the message buffer.
  // So, in order to do multiple sends, get enough ref counts before
  // starting - i.e., one extra ref count for every connection beyond the first
  // one. Note that this helps the connections to find out whether
  // they have exclusive access to the message buffer or not. If we would
  // let each connection increment its own refcount this wouldn't work.
  for (CollIndex extraRecipients = 1;
       extraRecipients < numRecipients;
       extraRecipients++)
    msgBuffer->incrRefCount();

  // ---------------------------------------------------------------------
  // The actual IpcConnection::send() call(s)
  // ---------------------------------------------------------------------
  for (IpcConnectionId i = 0; sendConnections.setToNext(i); i++)
    sendConnections.element(i)->send(msgBuffer);

  // if the send is in wait mode, then we must wait until the send completes
  // on all connections. if the send is no-wait, then don't wait here but
  // simply return, and the caller shall issue wait on all connections.
  // otherwise we could see the stack piles up quickly due to recursive
  // send/receive calls.
  if (waited)
    {
      NABoolean interruptRecvd = waitOnMsgStream(IpcInfiniteTimeout);
      if (interruptRecvd)
        state_ = BREAK_RECEIVED;
    }

#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif
}

// Start a receive operation on every connection in recipients_.
//   waited - if TRUE, block (IpcInfiniteTimeout) until all receives have
//            completed or an interrupt is reported; if FALSE, return right
//            after initiating the receives and let the caller wait.
// Preconditions: the stream is EMPTY, SENT or ERROR_STATE and has no active
// I/Os. errorInfo_ is reset, and activeIOs_ becomes a copy of recipients_.
void IpcMessageStream::receive(NABoolean waited)
{
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  MXTRC_FUNC("IpcMessageStream::receive");

  // check state
  assert(state_ == EMPTY OR state_ == SENT
         OR state_ == ERROR_STATE);  // After an error, successfully sent
                               // messages must still have their receive
                               // callback registered. See code in
                               // ExMasterEspMessage::actOnSend, which ensures
                               // that recipients_ contains only connections
                               // of successfully sent messages when this
                               // method is called.

  // reset errors
  errorInfo_ = 0;

  // for now, can't call receive when there are I/Os outstanding
  assert(activeIOs_.entries() == 0);

  // ---------------------------------------------------------------------
  // initiate a receive operation on every connection in recipients_
  // ---------------------------------------------------------------------

  // determine outstanding IOs first, since some of the receives may
  // complete immediately and we don't want a situation where a callback
  // is called with no outstanding IOs while we aren't done yet with
  // our for loop below
  IpcSetOfConnections recConnections(recipients_);
  activeIOs_ = recConnections;

  for (IpcConnectionId i = 0; recConnections.setToNext(i); i++)
    {
      // Set on every iteration: an I/O that completes immediately may have
      // changed the state of this message stream during the previous call.
      state_ = RECEIVING;

      // NOTE: the receive callbacks are not supposed to send any data
      // until the last receive has completed, since this would otherwise
      // interfere with this loop.
      recConnections.element(i)->receive(this);
    }

  // if the receive is in wait mode, then we must wait until the receive
  // completes on all connections. if the receive is no-wait, then don't wait
  // here but simply return, and the caller shall issue wait on all
  // connections. otherwise we could see the stack piles up quickly due to
  // recursive send/receive calls.
  if (waited)
    {
      // waitOnMsgStream() returns a WaitReturnStatus, not a boolean, so
      // compare against WAIT_INTERRUPT explicitly rather than relying on
      // the numeric values of the enum.
      if (waitOnMsgStream(IpcInfiniteTimeout) == WAIT_INTERRUPT)
        state_ = BREAK_RECEIVED;
    }

#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif
}

// Wait for the I/Os in activeIOs_ to make progress.
//   timeout - passed to every activeIOs_.waitOnSet() call. With
//             IpcInfiniteTimeout the wait is repeated while activeIOs_ is
//             non-empty and some connection reports that more waits are
//             needed; with any other value waitOnSet() is called once.
// Returns WAIT_INTERRUPT (after setting state_ to BREAK_RECEIVED) as soon as
// a wait reports an interrupt, WAIT_OK otherwise.
WaitReturnStatus IpcMessageStream::waitOnMsgStream(IpcTimeout timeout)
{
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  NABoolean interruptRecvd = activeIOs_.waitOnSet(timeout);
  MXTRC_FUNC("IpcMessageStream::wait");

  // An infinite timeout means the user wants every message in this stream
  // to be completely sent or received, so keep waiting until no active I/O
  // needs another wait (or an interrupt arrives).
  const NABoolean waitForAll = (timeout == IpcInfiniteTimeout);
  while (!interruptRecvd AND waitForAll AND
         activeIOs_.entries() > 0 AND
         activeIOs_.moreWaitsAnyConnection())
    interruptRecvd = activeIOs_.waitOnSet(timeout);

  if (interruptRecvd)
    {
      state_ = BREAK_RECEIVED;
      return WAIT_INTERRUPT;
    }

#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif
  return WAIT_OK;
}
 
void IpcMessageStream::actOnSend(IpcConnection * /* connection */)
{
  // Default hook, called from internalActOnSend() after the send on one
  // connection has completed. Intentionally does nothing here.
}

void IpcMessageStream::actOnSendAllComplete()
{
  // Default hook, called from internalActOnSend() once the sends on all
  // active connections have completed. Intentionally does nothing here.
}

void IpcMessageStream::actOnReceive(IpcConnection * /* connection */)
{
  // Default hook, called from internalActOnReceive() after a receive on
  // one connection has completed. Intentionally does nothing here.
}

void IpcMessageStream::actOnReceiveAllComplete()
{
  // Default hook, called from internalActOnReceive() once no active I/Os
  // remain on the stream. Intentionally does nothing here.
}

void IpcMessageStream::clearMessageBufferContents()
{
  // if a message buffer exists and has data in it,
  // it must contain objects with a reference count of 1 only
  // (the "1" count is the use of the object in the message buffer
  // and means that we can delete the buffer)
  if (objectsInBuffer_)
    {
      IpcMessageObj *obj = first()->s_.next_;

      first()->s_.next_ = NULL;
      while (obj != NULL)
	{
	  assert(obj->getRefCount() == 1);
	  // objects in the message buffer are always using offsets
	  obj = obj->getNextFromOffset();
	}

      // give up on the objects in the message buffer
      objectsInBuffer_ = FALSE;
      tail_            = first();
      current_         = NULL;
      h_.totalLength_  = 0;
    }
}

// Make sure msgBuffer_ can hold <len> bytes, or release it when <len> is 0.
//   - If there is a buffer and len is 0 or larger than the buffer, its
//     contents are cleared (clearMessageBufferContents()) and the stream's
//     reference on it is released at the end of this call. When a new
//     buffer replaces it, the old buffer's reply tag and max reply length
//     are carried over.
//   - If the existing buffer is large enough, it is reused: this stream is
//     added as its callback and its message length is reset to 0.
// If IpcMessageBuffer::allocate() returns NULL, msgBuffer_ is left NULL.
void IpcMessageStream::allocateMessageBuffer(IpcMessageObjSize len)
{
  IpcMessageBuffer *toDelete = NULL;

  // do we need to delete an old or unusable message buffer
  if (msgBuffer_ != NULL AND
      (len == 0 OR len > msgBuffer_->getBufferLength()))
    {
      clearMessageBufferContents();
      toDelete   = msgBuffer_;
      msgBuffer_ = NULL;
    }

  if (msgBuffer_ == NULL AND len > 0)
    {
      CollHeap *heap = (environment_ ? environment_->getHeap() : NULL);

      // we need to allocate a new message buffer
      msgBuffer_ = IpcMessageBuffer::allocate(len,
					      this,
					      heap,
                                              0);

      // allocate() may fail (sendMsgObj() treats NULL as "no buffers
      // available"); only copy the old buffer's settings onto a real buffer
      if (toDelete != NULL AND msgBuffer_ != NULL)
	{
	  // move reply tag and max reply len from the old one
	  msgBuffer_->setReplyTag(toDelete->getReplyTag());
	  msgBuffer_->setMaxReplyLength(toDelete->getMaxReplyLength());
	}
    }
  else if (msgBuffer_ != NULL)
    {
      // reuse same message buffer, but change the callback
      msgBuffer_->addCallback(this);
      msgBuffer_->setMessageLength(0);
    }

  // release the old buffer only after its settings have been copied
  if (toDelete != NULL)
    toDelete->decrRefCount();
}

// Ensure msgBuffer_ has room for <newMaxLen> bytes without touching its
// contents: an existing buffer is resized (the resize may hand back a
// different buffer object), otherwise a fresh one is allocated.
void IpcMessageStream::resizeMessageBuffer(IpcMessageObjSize newMaxLen)
{
  if (msgBuffer_ == NULL)
    {
      allocateMessageBuffer(newMaxLen);
      return;
    }

  msgBuffer_ = msgBuffer_->resize(environment_, newMaxLen);
}

// Internal callback invoked when the send on <connection> has completed
// (successfully or not).
// Records the first error reported by any connection, calls the user's
// actOnSend() hook, and once every connection in activeIOs_ has reported
// back, switches the stream to SENT, clears activeIOs_, resets the callback
// counter and calls actOnSendAllComplete().
void IpcMessageStream::internalActOnSend(IpcConnection *connection)
{
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  assert(activeIOs_.contains(connection->getId()));

  // stream can only remember the first reported error from connection
  if (state_ != ERROR_STATE && connection->getErrorInfo() != 0)
    {
      state_ = IpcMessageStream::ERROR_STATE;
      errorInfo_ = connection->getErrorInfo();
    }

  // NOTE(review): the user hook runs before the counter below is bumped
  actOnSend(connection);

  // Unlike receives (internalActOnReceive() removes each connection from
  // activeIOs_), completed sends are only counted; the set is cleared once
  // the count reaches its size.
  numOfSendCallbacks_++;
  if (numOfSendCallbacks_ == activeIOs_.entries())
    {
      state_ = SENT;

      activeIOs_.clear();
      numOfSendCallbacks_ = 0;

      actOnSendAllComplete();
    }

#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif
}

// Internal callback invoked when the receive on <connection> has completed.
// Releases any previous message buffer, installs <buffer> as msgBuffer_ and
// removes <connection> from activeIOs_.
//   - On a connection error: records the first error (BREAK_RECEIVED if the
//     connection reports a break, ERROR_STATE otherwise).
//   - On success: sets state_ to RECEIVED, validates the message length and
//     unpacks the header into h_, then sanity-checks every packed object in
//     the buffer. Malformed messages are reported to the connection
//     (dumpAndStopOtherEnd() / reportBadMessage()) before the matching
//     assertion fires.
// Finally calls actOnReceive() and, when no I/Os remain active,
// actOnReceiveAllComplete().
void IpcMessageStream::internalActOnReceive(IpcMessageBuffer *buffer,
					    IpcConnection *connection)
{
#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif

  // deallocate the existing message buffer (despite the name of the proc)
  allocateMessageBuffer(0);

  // the received buffer becomes this stream's message buffer
  assert(buffer);
  msgBuffer_ = buffer;

  // the receive on this connection is no longer outstanding
  assert(connection);
  IpcConnectionId id = connection->getId();
  assert(activeIOs_.contains(id));
  activeIOs_ -= id;

  if (connection->getErrorInfo() != 0)
    {
      // stream can only remember the first reported error from connection
      // NOTE(review): only ERROR_STATE blocks the overwrite; a state of
      // BREAK_RECEIVED (or RECEIVED from an earlier connection) is replaced.
      if (state_ != ERROR_STATE)
        {
          if (connection->breakReceived())
            state_ = BREAK_RECEIVED;
          else 
            state_ = ERROR_STATE;
          errorInfo_ = connection->getErrorInfo();
        }
    }
  else
    {
      // Set the state to RECEIVED. Even if there are more active IOs,
      // allow the user to retrieve results. Be careful at the next state
      // change: set it to EMPTY if no more receives are expected, set it back
      // to RECEIVING if there are more outstanding IOs.
      state_ = RECEIVED;

      // unpack the message header right away
      InternalMsgHdrInfoStruct *justReceived =
        (InternalMsgHdrInfoStruct *) msgBuffer_->data();

      // make sure the OS gave us a message with the correct length
      // (at least a full header, and exactly the length the header claims)
      if ((msgBuffer_->getMessageLength() >= 
          sizeof(InternalMsgHdrInfoStruct)) &&
          (msgBuffer_->getMessageLength() == 
           justReceived->totalLength_))
        {
          h_.unpackObj(justReceived->getType(),
                   justReceived->getVersion(),
                   TRUE, // we've already taken care of endianness
                   justReceived->s_.objLength_,
                   msgBuffer_->data());
        }
      else 
        {
          connection->dumpAndStopOtherEnd(true, false);
          assert(msgBuffer_->getMessageLength() >= 
                 sizeof(InternalMsgHdrInfoStruct) AND
                 msgBuffer_->getMessageLength() == 
                 justReceived->totalLength_);
        }

      // for now...
      // only messages with this process's alignment and endianness are
      // accepted; anything else stops the other end and asserts
      if ((h_.alignment_ != IpcMyAlignment) ||
          (h_.getEndianness() != IpcMyEndianness))
        connection->dumpAndStopOtherEnd(true, false);

      assert(h_.alignment_ == IpcMyAlignment AND
             h_.getEndianness() == IpcMyEndianness);

      // the next pointer in the object's copy of the header is an
      // actual pointer
      h_.s_.next_ = justReceived->getNextFromOffset();

      // indicate that we now have a linked list of objects in
      // the message that are all stored in the message buffer and
      // that all use offsets rather than pointers
      // (Exception: a copy of the first object is outside the buffer and it
      // uses a real pointer to the next object)
      objectsInBuffer_ = TRUE;

      // check refcount that came in the buffer
      assert(h_.s_.refCount_ == 1);
  
      IpcMessageObj *obj = first();
  
      obj = obj->s_.next_;
  
      current_ = obj; // the first non-header object in the message
      tail_ = NULL;   // don't use tail in received messages
  
      // loop over all objects in the message buffer and do
      // some sanity checks on them
      while (obj != NULL)
        {
          bool badMessage = false;
          // packed objects carry a reference count of exactly 1
          if (obj->getRefCount() != 1)
            badMessage = true;
          // next_ is an offset relative to obj; a non-NULL offset smaller
          // than obj's own length would point back into obj itself
          if ((obj->s_.next_ != NULL) &&
	      ((ULong)obj->s_.next_ < obj->s_.objLength_))
            badMessage = true;
          // packed objects are stored with a NULL virtual-table pointer
          if (obj->getMyVPtr() != NULL)
            badMessage = true;

          if (badMessage)
            connection->reportBadMessage();

          assert(obj->getRefCount() == 1);

          assert(obj->s_.next_ == NULL OR
	          (IpcMessageObjSize )((Long) obj->s_.next_) >= obj->s_.objLength_);

          assert(obj->getMyVPtr() == NULL);
      
          obj = obj->getNextFromOffset();
        }
    }

  // call user's callback
  actOnReceive(connection);

  if (activeIOs_.entries() == 0)
    actOnReceiveAllComplete();

#ifdef IPC_INTEGRITY_CHECKING
  checkIntegrity();
#endif
}

ExMasterEspMessage * IpcMessageStream::castToExMasterEspMessage(void)
{
  // Virtual "safe cast": a plain IpcMessageStream is not an
  // ExMasterEspMessage, so the base version always answers NULL.
  ExMasterEspMessage *self = NULL;
  return self;
}

// abort any outstanding I/Os on this stream
void IpcMessageStream::abandonPendingIOs()
  {
    // Part of the fix for soln 10-070108-1544.  The statement that 
    // owns this stream has encountered a fatal error and is prone
    // to deadlock if Statement::releaseTransaction waits forever 
    // for the release trans/work message to complete.  
    // One problem with this fix is that it puts the IpcConnections
    // into the error state, so that the ESPs will be stopped.  And 
    // if multiple statements share the same IpcConnection then the 
    // second statement can inherit the error.

    //
    // soln 10-080625-4107. below is an example:
    // the message stream contains the fixup msg from master to 4 esps. all 4
    // esps have gone away. the fixup msg is multi-chunk (more than 4 chunks
    // in my test). 2 connections (GMCS) finished send successfully, without
    // receiving the ipc error yet due to the timing of the subsequent nowait
    // calls. the send call backs were called on those 2 connections and as a
    // result they were removed from stream's activeIOs_ list.
    //
    // the other 2 connections are still in the middle of multi-chunk send
    // and thus send callbacks have not been invoked on them. note in this
    // case, the stream has only two active I/Os but still has 4 recipient
    // connections.
    //
    // now we come here because the upper layer found dead esps and decides
    // to clean up all I/Os on stream (see Statement::releaseTransaction()).
    // the for loop below iterates thru stream's activeIOs_ list and invokes
    // setFatalError() on the latter 2 connections. as a result all I/Os on
    // those 2 connections will be cleared and they will be removed from the
    // stream's activeIOs_ list. however, because send never finished on those
    // 2 connections, we still need to invoke send callbacks on them during
    // cleanup process (see GuaMsgConnectionToServer::handleIOErrorForEntry()).
    // after we invoke the send callback on the last active connection, there
    // will be no active I/O on the stream and IpcMessageStream::receive()
    // will be invoked (see ExMasterEspMessage::actOnSend()). the receive
    // will add the 2 former connections, who finished send successfully, back
    // to stream's activeIOs_ list. so after the first invocation of the for
    // loop, the stream still has 2 recipient connections and 2 active I/Os.
    //
    // because of the above use case, we must go thru the for loop again and
    // invoke setFatalError() on the 2 former connections that were inactive
    // before the for loop but became active after the first execution of the
    // for loop. therefore I'm adding a while loop outside the for loop to
    // make sure all I/Os on the stream are cleaned up properly.
    //
    while (hasIOPending())
      {
        for (CollIndex i = 0; activeIOs_.setToNext(i); i++)
          {
            IpcConnection * c = activeIOs_.element(i);
            NABoolean useGuaIpc = TRUE;
            if (useGuaIpc)
              {
                if (c->castToGuaConnectionToServer())
                  c->castToGuaConnectionToServer()->setFatalError(this);
              }
          } // for
      } // while
  }

#ifdef IPC_INTEGRITY_CHECKING

// methods that perform integrity checking on Ipc-related data structures

// Check this stream by starting the integrity walk at its IpcEnvironment.
// The walk is presumably expected to reach this stream and call
// checkLocalIntegrity(), which clears isOrphaned_. If that did not happen
// and this stream is an ExMasterEspMessage, report it as an orphan and
// check it directly.
void IpcMessageStream::checkIntegrity(void)
  {
  // assume the worst until the environment walk proves otherwise
  isOrphaned_ = TRUE;
  environment_->checkIntegrity();

  if (!isOrphaned_)
    return;
  if (castToExMasterEspMessage() == NULL)
    return;

  cerr << "Found orphaned ExMasterEspMessage object " << (void *)this << "." <<
    endl;
  checkLocalIntegrity();  // the walk did not reach us; check ourselves now
  }

// Mark this stream as reached and check its set of active I/Os.
void IpcMessageStream::checkLocalIntegrity(void)
  {
  isOrphaned_ = FALSE;   // reached, so not an orphan
  activeIOs_.checkLocalIntegrity();
  }

#endif

// ----------------------------------------------------------------------------
// IpcBufferedMsgStream
// ----------------------------------------------------------------------------

///////////////////////////////////////////////////////////////////////////////
// constructor
//   env              - IPC environment; its heap backs all buffer lists
//   msgType, version - type and version written into the header of every
//                      send buffer (see sendMsgObj())
//   inUseBufferLimit - garbageCollectLimit_ is only raised by
//                      cleanupBuffers() while inUseBufList_ has fewer
//                      entries than this
//   bufferSize       - minimum payload size of a send buffer; the header size
//                      is added on top (see sendMsgObj())
// All send/receive cursors start out NULL and all lists start out empty.
IpcBufferedMsgStream::IpcBufferedMsgStream(IpcEnvironment *env,
                                           IpcMessageType msgType,
                                           IpcMessageObjVersion version,
                                           Lng32 inUseBufferLimit,
                                           IpcMessageObjSize bufferSize)
  : IpcMessageStreamBase(env),
    msgType_(msgType), 
    msgVersion_(version), 
    bufferSize_(bufferSize),
    inUseBufferLimit_(inUseBufferLimit),
    garbageCollectLimit_(0),
    errorInfo_(0),
    receiveMsgComplete_(FALSE),
    sendMsgBuf_(NULL),
    sendMsgHdr_(NULL),
    sendMsgObj_(NULL),
    receiveMsgBufI_(0),
    receiveMsgBuf_(NULL),
    receiveMsgHdr_(NULL),
    receiveMsgObj_(NULL),
    sendBufList_(env->getHeap()),
    receiveBufList_(env->getHeap()),
    inBufList_(env->getHeap()),
    outBufList_(env->getHeap()),
    inUseBufList_(env->getHeap()),
    replyTagBufList_(env->getHeap()),
    smContinueProtocol_(FALSE)
{
}

// Destructor: let releaseBuffers() do its cleanup, then drop this stream's
// reference on every buffer still waiting in the input queue and on every
// buffer of the current receive message.
IpcBufferedMsgStream::~IpcBufferedMsgStream()
{
  releaseBuffers();

  for (CollIndex ix = 0; ix < inBufList_.entries(); ix++)
    inBufList_[ix]->decrRefCount();

  for (CollIndex ix = 0; ix < receiveBufList_.entries(); ix++)
    receiveBufList_[ix]->decrRefCount();
}

// Virtual "safe cast": this object is an IpcBufferedMsgStream.
IpcBufferedMsgStream *
IpcBufferedMsgStream::castToIpcBufferedMsgStream()
{
  IpcBufferedMsgStream *self = this;
  return self;
}

///////////////////////////////////////////////////////////////////////////////
// get next receive message from input queue.
// Returns TRUE and sets <msgType> to the message's header type when a
// complete message is (or remains) the current receive message. Returns FALSE
// when no complete message is available yet.
// If the previous receive message has been fully consumed, its buffers are
// first moved to inUseBufList_ (cleanupBuffers() releases them later once
// their objects are free). A multi-buffer message is assembled from
// inBufList_ by matching connection, message stream and msgStreamId, and is
// only reported once its last buffer has arrived.
NABoolean IpcBufferedMsgStream::getNextReceiveMsg(IpcMessageObjType& msgType)
  {
  if (receiveMsgComplete_)
    {
    if (receiveMsgBufI_ < receiveBufList_.entries())
      { // current receive message not completely unpacked, return type

      msgType = receiveMsgHdr_->getType();
      return (TRUE);
      }
    // release current receive message to inuse pool and advance to next.
    // If a user msg object was unpacked inplace and did not implement the
    // virtual method msgObjIsFree() then its persistence will be guaranteed
    // until now!
    IpcMessageBuffer* msgBuf;
    while (receiveBufList_.getFirst(msgBuf))
      { // move all buffers for current receive message to inuse pool
      inUseBufList_.insert(msgBuf);
      }
    receiveMsgBuf_ = NULL;
    receiveMsgObj_ = receiveMsgHdr_ = NULL;
    receiveMsgComplete_ = FALSE;
    }
  
  if (inBufList_.entries())
    {
    if (receiveBufList_.entries() == 0)
      { // get first buffer of message
      inBufList_.getFirst(receiveMsgBuf_);
      receiveBufList_.insert(receiveMsgBuf_);
      receiveMsgBufI_ = 0;

      // get message header (already unpacked by Guardian IpcConnection)
      receiveMsgHdr_ = (InternalMsgHdrInfoStruct*)(receiveMsgBuf_->data(0)); 
      if (receiveMsgHdr_->isLastMsgBuf())
        { // single buffer message, proceed
        receiveMsgComplete_ = TRUE;
        // unpack 1st user msg object's base class (new should handle NULL)
	IpcMessageObj *msgHeap = receiveMsgHdr_->getNextFromOffset();
	if (!msgHeap)
	  receiveMsgObj_ = NULL;
	else
	  receiveMsgObj_ = new(msgHeap) IpcMessageObj(this);
        if (receiveMsgObj_ == NULL)
        { // recursively call to advance past empty message buffers
          // bumping receiveMsgBufI_ past the only buffer makes the recursive
          // call retire this empty message to inUseBufList_ and move on
          receiveMsgBufI_++;
          
          NABoolean result = getNextReceiveMsg(msgType);
          return result;
        }
        
        msgType = receiveMsgHdr_->getType();
        return (TRUE);
        }
      }

    // Try and extract remaining buffers associated with the same message from
    // the in queue. The buffers must originate from the same connection OR the
    // same local message stream and must have matching message types. A TRUE 
    // return code will be returned only after all buffers comprising the 
    // message have arrived.
    // Buffers belonging to other messages are left in inBufList_.
    
    for (CollIndex i = 0; i < inBufList_.entries(); )
      { // get additional buffers of multi-buffer message  
      IpcMessageBuffer* msgBuf = inBufList_[i];
      if ((msgBuf->getConnection() == receiveMsgBuf_->getConnection())   &&
          (msgBuf->getMessageStream() == receiveMsgBuf_->getMessageStream()) )
        {
        // get message header (already unpacked by Guardian IpcConnection)
        InternalMsgHdrInfoStruct* msgHdr = (InternalMsgHdrInfoStruct*)(msgBuf->data(0));
        if  (msgHdr->getMsgStreamId() == receiveMsgHdr_->getMsgStreamId())
          {
          assert(msgHdr->getType() == receiveMsgHdr_->getType());
          // removeAt(i) shifts the next entry into slot i, so don't advance i
          inBufList_.removeAt(i);
          receiveBufList_.insert(msgBuf);
          if (msgHdr->isLastMsgBuf())
            { // all buffers for this message received, proceed
            // unpack next message object's base class 
            // NOTE(review): assumes the first buffer of a multi-buffer
            // message holds at least one object (no NULL check here)
            receiveMsgObj_ = 
              new(receiveMsgHdr_->getNextFromOffset()) IpcMessageObj(this);
            assert(receiveMsgObj_);  // should not be NULL if multiple bufs
            receiveMsgComplete_ = TRUE;
            
            msgType = receiveMsgHdr_->getType();
            return (TRUE);
            }
          continue;
          }
        }
      i++;
      }
    }

  return (FALSE);
  }

///////////////////////////////////////////////////////////////////////////////
// Report the type of the next packed object in the current receive message.
// Returns FALSE (leaving <msgType> untouched) when there is no such object.
NABoolean IpcBufferedMsgStream::getNextObjType(IpcMessageObjType& msgType)
  {
  if (receiveMsgObj_ == NULL)
    return (FALSE);

  msgType = receiveMsgObj_->getType();
  return (TRUE);
  }

///////////////////////////////////////////////////////////////////////////////
// Packed length of the next object in the current receive message, or 0 when
// there is no next object.
IpcMessageObjSize IpcBufferedMsgStream::getNextObjSize() const
{
  if (receiveMsgObj_ == NULL)
    return 0;

  return receiveMsgObj_->s_.objLength_;
}

///////////////////////////////////////////////////////////////////////////////
// Return the next packed object of the current receive message (NULL if there
// is none) and advance receiveMsgObj_ to the object after it. When the
// current buffer runs out of objects, the following buffers of the message
// are searched. Once the last buffer is exhausted, receiveMsgBuf_,
// receiveMsgHdr_ and receiveMsgObj_ are all reset to NULL.
IpcMessageObj* IpcBufferedMsgStream::receiveMsgObj()
  {
  IpcMessageObj* result = receiveMsgObj_;
  if (result == NULL)
     return (NULL);   // no more receive message objects

  // The IpcMessageObj base-class part of the following packed object is
  // constructed in place now, before the derived class constructor is
  // called, so IpcBufferedMsgStream can use it internally. Until the derived
  // constructor completes, the object behaves as a plain IpcMessageObj.
  IpcMessageObj* following = result->getNextFromOffset();
  if (following != NULL)
    receiveMsgObj_ = new(following) IpcMessageObj(this);
  else
    receiveMsgObj_ = NULL;

  // Nothing more in this buffer: look for the first object in the remaining
  // buffers of the message.
  while (receiveMsgObj_ == NULL)
    {
    receiveMsgBufI_++;
    if (receiveMsgBufI_ >= receiveBufList_.entries())
      { // every buffer of this message has been consumed
      receiveMsgBuf_ = NULL;
      receiveMsgObj_ = receiveMsgHdr_ = NULL;
      break;
      }

    receiveMsgBuf_ = receiveBufList_[receiveMsgBufI_];
    // msg header already unpacked by Guardian IpcConnection
    receiveMsgHdr_ = (InternalMsgHdrInfoStruct*)(receiveMsgBuf_->data(0));

    IpcMessageObj* firstInBuf = receiveMsgHdr_->getNextFromOffset();
    if (firstInBuf != NULL)
      receiveMsgObj_ = new(firstInBuf) IpcMessageObj(this);
    }

  return (result);
  }

///////////////////////////////////////////////////////////////////////////////
// Hand the current (complete) receive message to <msgStream>: every buffer of
// the message is moved into msgStream's input queue, this stream's receive
// cursor is reset, and msgStream.actOnReceive(NULL) is called so the peer
// can process the message.
void IpcBufferedMsgStream::giveReceiveMsgTo(IpcBufferedMsgStream& msgStream)
{
  assert(receiveMsgComplete_);

  IpcMessageBuffer* buf;
  while (receiveBufList_.getFirst(buf))
  {
    // A buffer with a valid reply tag is also on replyTagBufList_, and that
    // membership holds a reference; drop both before passing the buffer on.
    const NABoolean hasReplyTag = (buf->getReplyTag() != GuaInvalidReplyTag);
    if (hasReplyTag)
    {
      replyTagBufList_.remove(buf);
      buf->decrRefCount(); // no longer in two lists!
    }

    msgStream.addInputBuffer(buf);
  }

  receiveMsgBuf_ = NULL;
  receiveMsgObj_ = receiveMsgHdr_ = NULL;
  receiveMsgComplete_ = FALSE;

  msgStream.actOnReceive(NULL);   // trigger receiving message stream
}

///////////////////////////////////////////////////////////////////////////////
// Pack <obj> into the current send message.
// Space is reserved with the `new(*this, len)` form, which constructs a plain
// IpcMessageObj header carrying obj's type and version. <obj> then packs only
// its own data immediately after that header.
IpcBufferedMsgStream& IpcBufferedMsgStream::operator << (IpcMessageObj& obj)
  {
  const IpcMessageObjSize expectedLen = obj.packedLength();

  // IpcBufferedMsgStream does NOT let the object pack its base class: the
  // packed copy must behave as a generic IpcMessageObj with NO virtual
  // behavior, so the base part is the header constructed right here.
  IpcMessageObj* header =
    new(*this, expectedLen) IpcMessageObj(obj.getType(), obj.getVersion());
  IpcMessageBufferPtr payload = (IpcMessageBufferPtr)(&header[1]);

  const IpcMessageObjSize writtenLen = obj.packObjIntoMessage(payload);

  // Did the object really use the length it promised to use?
  assert(expectedLen == writtenLen);

  return *this;
  }

///////////////////////////////////////////////////////////////////////////////
// Unpack the next object of the current receive message into <obj>.
//   obj          - destination; its type and version must match the packed
//                  object (asserted)
//   checkObjects - if TRUE, obj.checkObj() validates the packed data first
//                  and obj is only unpacked when that check succeeds
// Returns the result of checkObj() (TRUE when no check was requested).
NABoolean IpcBufferedMsgStream::extractNextObj(IpcMessageObj &obj,
                                               NABoolean checkObjects)
{
  IpcMessageObj* packedObj = receiveMsgObj();

  // the user should have predicted the object type and version correctly
  assert(packedObj  AND 
         packedObj->getType() == obj.getType()  AND
         packedObj->getVersion() == obj.getVersion());

  // IpcBufferedMsgStream does NOT allow the object to unpack the base class:
  // the packed object behaves as a generic IpcMessageObj while the
  // destination keeps its virtual behavior. Only the bytes following the
  // IpcMessageObj base part are handed over.
  IpcConstMessageBufferPtr payload =
    (IpcConstMessageBufferPtr)(&packedObj[1]);
  IpcMessageObjSize payloadLen = packedObj->s_.objLength_ - sizeof(IpcMessageObj);

  NABoolean ok = TRUE;
  if (checkObjects)
    ok = obj.checkObj(packedObj->getType(),
                      packedObj->getVersion(),
                      TRUE,
                      payloadLen,
                      payload);

  if (!ok)
    return ok;   // integrity check failed; leave obj untouched

  obj.unpackObj(packedObj->getType(),
                packedObj->getVersion(),
                TRUE,
                payloadLen,
                payload);
  return ok;
}

///////////////////////////////////////////////////////////////////////////////
// allocate space for a packed object in the current send message.
// <packedObjLen> is first rounded by IpcMessageBuffer::alignOffset().
// Returns a pointer to the reserved space. Returns NULL when no buffer could be
// allocated, or when called with length 0 (internal call that only creates a
// new buffer plus its header). When the object does not fit into the current
// buffer, that buffer stays on sendBufList_ and a new one is started.
IpcMessageObj* IpcBufferedMsgStream::sendMsgObj(IpcMessageObjSize packedObjLen)
{
  IpcMessageBuffer::alignOffset(packedObjLen);

  if (sendMsgBuf_ == NULL)
    {
    // new buffer: large enough for this object (at least bufferSize_)
    // plus the message header
    IpcMessageObjSize length = 
      (packedObjLen > bufferSize_ ? packedObjLen : bufferSize_); 
    length += sizeof(InternalMsgHdrInfoStruct);

    CollHeap *heap = (environment_ ? environment_->getHeap() : NULL);
    
    sendMsgBuf_ = IpcMessageBuffer::allocate(length,
                                             this,
                                             heap,
                                             0);
    
    if (sendMsgBuf_ == NULL)
      return NULL;   // no buffers available

    sendBufList_.insert(sendMsgBuf_);
    // Create message header
    // Recursively calls sendMsgObj() via operator new
    sendMsgHdr_ = new(*this, 0) InternalMsgHdrInfoStruct(this->msgType_,
                                                        this->msgVersion_);
    sendMsgHdr_->totalLength_ = sizeof(InternalMsgHdrInfoStruct);
    }

  if (packedObjLen == 0)
    return (NULL); // internal call to create an empty buffer, hdr only

  if (sendMsgHdr_ == NULL)
    { // first object must be header
    // (this is the recursive call made while constructing the header
    // above; the header always sits at offset 0 of the buffer)
    sendMsgObj_ = sendMsgHdr_ = 
                       (InternalMsgHdrInfoStruct*)(sendMsgBuf_->data(0));
    }
  else
  {
    IpcMessageObjSize newLen = sendMsgHdr_->totalLength_ + packedObjLen;
    if (newLen > sendMsgBuf_->getBufferLength())
    {
      // current buffer is full: keep it on sendBufList_ and start a new one
      sendMsgBuf_ = NULL;
      sendMsgObj_ = sendMsgHdr_ = NULL;

      return (sendMsgObj(packedObjLen)); //recursively call for new buffer
    }

    // link to previous last object (must coexist with IpcMessageStream!)
    // IpcBufferedMsgStream only uses "next_" as an offset although it is 
    // defined as a pointer. IpcMessageStream uses next as both pointer and
    // offset. The objLength_ field gets set later, in prepSendMsgForOutput()
    sendMsgObj_->s_.next_ =
      (IpcMessageObj*)(sendMsgBuf_->data(sendMsgHdr_->totalLength_));
    sendMsgObj_->convertNextToOffset();
    sendMsgObj_ =
      (IpcMessageObj*)(sendMsgBuf_->data(sendMsgHdr_->totalLength_));
    sendMsgHdr_->totalLength_ = newLen;
    
  }
  
  return (sendMsgObj_);
}

// Not supported for IpcBufferedMsgStream: calling it is a programming error
// and aborts. The return statement is only reached if ABORT returns.
IpcConnection* IpcBufferedMsgStream::getConnection()
{
  // Name the class that actually aborts, so the message identifies the
  // real offender rather than the base class interface.
  ABORT("IpcBufferedMsgStream::getConnection() should not be called!");
  return NULL;
}

///////////////////////////////////////////////////////////////////////////////
// cleanup unpacked message buffers with objects no longer inuse
//
// Nothing is done while inUseBufList_ holds no more than
// garbageCollectLimit_ entries. Otherwise every buffer in the list is
// examined:
//  - message objects directly following the buffer's header that report
//    msgObjIsFree() are merged into the header one at a time; merging stops
//    at the first object that is still in use;
//  - if the header is then left with no successor object, the buffer is
//    removed from inUseBufList_ and its reference count is decremented;
//  - otherwise the buffer is kept, together with every following buffer up
//    to (and including) the first one whose header is flagged
//    isLastMsgBuf(), so that a message spanning several buffers stays held
//    by its first buffer.
// Afterwards, if fewer than inUseBufferLimit_ buffers remain,
// garbageCollectLimit_ is set to the number of buffers still in use.
void IpcBufferedMsgStream::cleanupBuffers()
{
  if (inUseBufList_.entries() <= (CollIndex) garbageCollectLimit_) 
    return;
  
  CollIndex i = 0;
  while (i < inUseBufList_.entries())
  {  
    IpcMessageBuffer* msgBuf = inUseBufList_[i];
    InternalMsgHdrInfoStruct* msgHdr = 
      (InternalMsgHdrInfoStruct*)(msgBuf->data(0));
    IpcMessageObj* msgObj = msgHdr;
    IpcMessageObj* nextMsgObj;
    // msgObj always stays on the header; presumably each merge absorbs the
    // following object, so getNextFromOffset() then yields the next
    // candidate (see IpcMessageObj::mergeNextPackedObj to confirm).
    while (nextMsgObj = msgObj->getNextFromOffset())
    { 
      if (!(nextMsgObj->msgObjIsFree()))
        break;  // quit, no need to check remaining objects in buffer
      
      msgObj->mergeNextPackedObj(); // unlink msg objects no longer in use
    }
    
    if (msgHdr->getNextFromOffset() == NULL)
    { // all objects are no longer inuse, free buffer
      // removeAt(i) shifts the next buffer into slot i, so i is not advanced
      inUseBufList_.removeAt(i);
      msgBuf->decrRefCount();
    }
    else
    { // msg objects in buffer still in use
      i++;
      while (!(msgHdr->isLastMsgBuf()))
      { // skip over remaining buffers for this message.
        // must guarantee that a complex object consisting of multiple message
        // objects or a set of objects within a message can be held inuse by 
        // the root(first allocated) object!
        assert(i < inUseBufList_.entries());
        msgBuf = inUseBufList_[i];
        msgHdr = (InternalMsgHdrInfoStruct*)(msgBuf->data(0));
        i++;
      }
    }
  }
  if (inUseBufList_.entries() < (CollIndex) inUseBufferLimit_)
  { // optimize garbage collection limit until inuse limit reached
    garbageCollectLimit_ = inUseBufList_.entries(); 
  }
  
}

////////////////////////////////////////////////////////////////////////////
// prepare send message objects for output and put buffers in output queue.
//
// Drains sendBufList_ front to back (the loop relies on getFirst() removing
// the head entry). For each buffer:
//  - the buffer's message length is set from the header's totalLength_;
//  - the buffer taken when sendBufList_ has become empty is the final buffer
//    of the message and its header gets setLastMsgBuf();
//  - the header is stamped with this stream's address as its stream id;
//  - each object in the chain gets objLength_ filled in, then
//    prepMsgObjForSend() and setMyVPtr(NULL) are applied (the latter
//    presumably clears the object's vtable pointer before the bytes are
//    shipped -- confirm in IpcMessageObj);
//  - the buffer is appended to outBufList_.
// Finally the current send buffer/header/object pointers are reset to NULL.
void IpcBufferedMsgStream::prepSendMsgForOutput()
{
  while (sendBufList_.getFirst(sendMsgBuf_))
  {
    sendMsgHdr_ = (InternalMsgHdrInfoStruct*)(sendMsgBuf_->data(0));
    sendMsgBuf_->setMessageLength(sendMsgHdr_->totalLength_);
    MXTRC_1("totalLength_=%d \n", sendMsgHdr_->totalLength_);
    if (sendBufList_.entries() == 0)
      sendMsgHdr_->setLastMsgBuf();
    sendMsgHdr_->setMsgStreamId((Long)this);
    sendMsgObj_ = sendMsgHdr_;
    do
      {
      // calculate object length, note that this is done only to be
      // compatible with IpcMessageStream and that this object length
      // is rounded up to the alignment that was done internally
      // While an object has a successor, next_ holds the offset to it
      // (sendMsgObj() stores it via convertNextToOffset()), and that offset
      // is the object's length. The last object extends to the end of the
      // used part of the buffer (data(0) + totalLength_).
      if (sendMsgObj_->s_.next_)
        sendMsgObj_->s_.objLength_ =
	  (IpcMessageObjSize)(Long)((sendMsgObj_->s_.next_));
      else
        sendMsgObj_->s_.objLength_ =
          (IpcMessageObjSize)(sendMsgBuf_->data(0)
			      + sendMsgHdr_->totalLength_
			      - (IpcMessageBufferPtr) sendMsgObj_);
      MXTRC_3("objType_=%d objLength_=%d next_=%d\n", sendMsgObj_->s_.objType_, sendMsgObj_->s_.objLength_, sendMsgObj_->s_.next_);
      sendMsgObj_->prepMsgObjForSend();
      sendMsgObj_->setMyVPtr(NULL);
      }
    while ((sendMsgObj_ = sendMsgObj_->getNextFromOffset()) != NULL);

    outBufList_.insert(sendMsgBuf_);
  }
  
  sendMsgBuf_ = NULL;
  sendMsgObj_ = sendMsgHdr_ = NULL;
}

////////////////////////////////////////////////////////////////////////////
// add a message buffer to the input queue.
//
// Every buffer is appended to inBufList_. A buffer that carries a valid
// reply tag is also remembered in replyTagBufList_ so a reply can later be
// matched to it; because it is then referenced from two lists, its
// reference count is bumped once more.
void IpcBufferedMsgStream::addInputBuffer(IpcMessageBuffer* inputBuf)
{
  inBufList_.insert(inputBuf);

  const bool carriesReplyTag =
    (inputBuf->getReplyTag() != GuaInvalidReplyTag);
  if (!carriesReplyTag)
    return;

  if (getSMContinueProtocol())
    EXSM_TRACE(EXSM_TRACE_PROTOCOL, "STREAM %p add input buf %p", this,
               inputBuf);

  // second list now references the buffer: keep it alive for both
  inputBuf->incrRefCount();
  replyTagBufList_.insert(inputBuf);
}

///////////////////////////////////////////////////////////////////////////////
// get next message buffer from output queue matched with next reply tag.
//
// Takes the next output buffer (getOutputBuffer()) and pairs it with the
// oldest pending request in replyTagBufList_: the reply inherits that
// request's reply tag and maximum reply length. The request's connection
// and message stream are returned through the reference parameters.
// connection is NULL when the request came from a local client stream;
// tickleOutputIo() relies on that to choose between sending and a local
// callback.
//
// Returns NULL, leaving the out-parameters untouched, when no reply tag is
// pending or no output buffer is available.
IpcMessageBuffer* IpcServerMsgStream::getReplyTagOutputBuffer(
                                            IpcConnection*& connection,
                                            IpcBufferedMsgStream*& msgStream)
{
  MXTRC_FUNC("IpcServerMsgStream::getReplyTagOutputBuffer");

  if (numOfReplyTagBuffers())
  {
    IpcMessageBuffer* outputBuf = getOutputBuffer();
    if (outputBuf)
    {
      IpcMessageBuffer* replyTagBuf = replyTagBufList_[0];
      
      outputBuf->setReplyTag(replyTagBuf->getReplyTag());
      outputBuf->setMaxReplyLength(replyTagBuf->getMaxReplyLength());
      connection = replyTagBuf->getConnection();
      MXTRC_2("replyTag=%d connection=%x", replyTagBuf->getReplyTag(), connection);
      msgStream = (IpcBufferedMsgStream*)(replyTagBuf->getMessageStream());

      NABoolean deleteReplyTag = FALSE;

      // For the SeaMonster continue protocol the communication
      // between client and server is not one to one. That is client
      // does not send a continue message for every reply from the
      // server and the server does not send only one reply for every
      // request.  For every request the server receives, the server
      // replies with the number of buffers set by
      // sendBufferLimit_. The server sets the LAST IN BATCH flag for
      // the last buffer in a batch and waits for a continue message
      // from the client before it sends more replies.
      //
      // The way a stream controls if more buffers can be sent to the
      // client is through replyTagBufList_. If there are any entries
      // in replyTagBufList_ then the server stream can send messages
      // to the client and if there are no entries in replyTagBufList_
      // then the server stream cannot send any more messages until
      // the client sends a continue request and that gets added to
      // replyTagBufList_.
      //
      // So for SeaMonster continue protocol just after sending one
      // reply server cannot delete the request from replyTagBufList_
      // and needs to wait until the LAST IN BATCH reply or the EOD
      // reply is sent. The following changes make sure that for
      // SeaMonster continue protocol we do not delete the request
      // from replyTagBufList_ except for the LAST IN BATCH or the EOD
      // reply.
      if (getSMContinueProtocol())
      {
        buffersSentInBatch_++;

        InternalMsgHdrInfoStruct *hdr = 
                        (InternalMsgHdrInfoStruct *)outputBuf->data(0);

        // If this buffer is the last in batch to be sent then mark the buffer
        // as LAST IN BATCH and remember to delete the buffer from reply tag
        // If this buffer is not last in batch then check if LAST IN BATCH 
        // was already set by the TCB since it reached EOD, if so remember
        // to delete buffer from reply tag
        if (buffersSentInBatch_ == sendBufferLimit_)
        {
          EXSM_TRACE(EXSM_TRACE_PROTOCOL, 
                     "STREAM %p sent %d limit %d sending last in batch",
                     this, buffersSentInBatch_, sendBufferLimit_);
          hdr->setSMLastInBatch();

          deleteReplyTag = TRUE;
        }
        else if (hdr->getSMLastInBatch())
          deleteReplyTag = TRUE;
      }
      else // regular one to one continue protocol case
        deleteReplyTag = TRUE;

      if (deleteReplyTag)
      {
        buffersSentInBatch_ = 0;
        replyTagBuf->setReplyTag(GuaInvalidReplyTag); // invalidate reply tag
        // Unlink the request from replyTagBufList_ BEFORE dropping this
        // list's reference: decrRefCount() may free the buffer, after which
        // its address must not be used, not even as a search key.
        replyTagBufList_.remove(replyTagBuf);
        replyTagBuf->decrRefCount(); // free if not in another list
      }

      return outputBuf;

    } // if (outputBuf)
  } // if (numOfReplyTagBuffers())

  return NULL;
}

///////////////////////////////////////////////////////////////////////////////
// internal send call back may be redefined by derived classes.
//
// Copies the connection's error information into the stream (only if the
// stream has none yet), then forwards to the actOnSend() callback.
void IpcBufferedMsgStream::internalActOnSend(IpcConnection* connection)
{
  const bool adoptConnError = (connection != NULL) && !errorInfo_;
  if (adoptConnError)
    errorInfo_ = connection->getErrorInfo();

  actOnSend(connection);
}
                 
///////////////////////////////////////////////////////////////////////////////
// internal receive call back may be redefined by derived classes.
//
// Copies the connection's error information into the stream (only if the
// stream has none yet), queues the received buffer (if any) through
// addInputBuffer(), and finally invokes the actOnReceive() callback.
void IpcBufferedMsgStream::internalActOnReceive(IpcMessageBuffer* buffer,
                                                IpcConnection* connection)
{
  const bool adoptConnError = (connection != NULL) && !errorInfo_;
  if (adoptConnError)
    errorInfo_ = connection->getErrorInfo();

  if (buffer != NULL)
    addInputBuffer(buffer);

  actOnReceive(connection);
}

// Base-class version of the "all sends complete" callback: intentionally
// empty; derived streams presumably override it when they need the event.
void IpcBufferedMsgStream::actOnSendAllComplete()
{
}

// Base-class version of the "all receives complete" callback: intentionally
// empty; derived streams presumably override it when they need the event.
void IpcBufferedMsgStream::actOnReceiveAllComplete()
{
}

// ----------------------------------------------------------------------------
// IpcClientMsgStream
// ----------------------------------------------------------------------------

///////////////////////////////////////////////////////////////////////////////
// constructor
//
// Client side of a buffered message stream.
//   sendBufferLimit  - stored in sendBufferLimit_
//   inUseBufferLimit - passed to the IpcBufferedMsgStream base
// recipients_ (remote connections, built against the environment's
// connection table) and localRecipients_ (local server streams) are
// allocated on the environment's heap. The stream starts with no responses
// pending, local reply tag 0, and no SeaMonster batch marked complete.
IpcClientMsgStream::IpcClientMsgStream(IpcEnvironment *env,
                                       IpcMessageType msgType,
                                       IpcMessageObjVersion version,
                                       Lng32 sendBufferLimit,
                                       Lng32 inUseBufferLimit,
                                       IpcMessageObjSize bufferSize)
  : IpcBufferedMsgStream(env, msgType, version, inUseBufferLimit, bufferSize),
    sendBufferLimit_(sendBufferLimit),
    responsesPending_(0),
    recipients_(env->getAllConnections(),env->getHeap()),
    localRecipients_(env->getHeap()),
    localReplyTag_(0),
    smBatchIsComplete_(FALSE)
{ }

///////////////////////////////////////////////////////////////////////////////
// broadcast the current send message to all recipients
//
// Moves the pending send buffers to the output queue, then for each output
// buffer delivers one message to every remote recipient connection (stamped
// with transid) and to every local recipient stream (stamped with the local
// reply tag). The last recipient gets the original output buffer; all
// earlier ones get a copyOutputBuffer() copy. responsesPending_ is bumped
// once per delivered message, before the send/callback is issued, so that
// any completion triggered by the send sees the correct count.
//
// NOTE(review): if recipients_ and localRecipients_ are both empty, no
// buffer is ever taken off the output queue and the outer while loop never
// terminates -- confirm callers always register a recipient first.
void IpcClientMsgStream::sendRequest(Int64 transid)
  {
  MXTRC_FUNC("IpcClientMsgStream::sendRequest");
  prepSendMsgForOutput();

  while (numOfOutputBuffers())
    {
    IpcMessageBuffer* msgBuf;
    // counts down so that only the final recipient consumes the original
    // output buffer and all others receive copies
    CollIndex numRecipients = 
                          recipients_.entries() + localRecipients_.entries();

    for (IpcConnectionId i = 0; recipients_.setToNext(i); i++)
      { // send output message to all remote connections
      numRecipients--;
      msgBuf = numRecipients ? copyOutputBuffer() : getOutputBuffer(); 

      // store transid in msgbuf
      msgBuf->setTransid (transid);

      responsesPending_++;

      if (getSMContinueProtocol())
        EXSM_TRACE(EXSM_TRACE_PROTOCOL, "STREAM %p rp is now %d", this,
                   (int) responsesPending_);
      
      IpcConnection *conn = recipients_.element(i);
      conn->send(msgBuf);
    }

    assert(numRecipients == localRecipients_.entries()); 

    for (CollIndex j = 0; j < localRecipients_.entries(); j++)
      { // send output message to all local server msg streams
      IpcBufferedMsgStream* msgStream = localRecipients_[j];
      numRecipients--;
      msgBuf = numRecipients ? copyOutputBuffer() : getOutputBuffer(); 
      responsesPending_++;
      msgBuf->setReplyTag(getLocalReplyTag());
      // local delivery: call the server stream's receive callback directly
      // (no connection, hence NULL)
      msgStream->internalActOnReceive(msgBuf, NULL);
      }
    assert(numRecipients == 0);
    }

  // commenting out the following code as this is causing problems for SM,
  // removing this code does not cause problems for Seabed messages as this
  // is mostly noop for seabed messages
  //recipients_.waitOnSet(IpcImmediately);
  }

// abort any outstanding I/Os on this stream
void IpcClientMsgStream::abandonPendingIOs()
{
  while (numOfResponsesPending() > 0)
    {
      for (IpcConnectionId i = 0; recipients_.setToNext(i); i++)
        {
          IpcConnection * c = recipients_.element(i);
          NABoolean useGuaIpc = TRUE;
          if (useGuaIpc)
            {
              if (c->castToGuaConnectionToServer())
                c->castToGuaConnectionToServer()->setFatalError(this);
            }
        } // for
    } // while
}

///////////////////////////////////////////////////////////////////////////////
// internal receive call back 
//
// Called for every reply (or error completion) delivered to this client
// stream, either from a connection or directly from a local server stream
// (in which case connection is NULL -- see IpcServerMsgStream::
// tickleOutputIo()). Maintains responsesPending_, then forwards to the base
// class, which queues the buffer and invokes actOnReceive().
//
// buffer may be NULL under certain error conditions.
void IpcClientMsgStream::internalActOnReceive(IpcMessageBuffer* buffer,
                                              IpcConnection* connection)
{
  MXTRC_FUNC("IpcClientMsgStream::internalActOnReceive");

  if (buffer)
    buffer->setReplyTag(GuaInvalidReplyTag); // invalidate so buff can cleanup

  // Cases to consider
  // (a) buffer is NULL (which indicates an error condition)
  // (b) stream is NOT following seamonster continue protocol
  // (c) stream IS following seamonster continue protocol

  if (!getSMContinueProtocol() || !buffer)
  {
    // Cases (a) and (b): one reply settles one pending response
    if (responsesPending_ > 0)
      responsesPending_--;
    else
    {
      // More replies than requests: an internal protocol error. Ask the
      // peer to dump and stop when there is a peer to ask -- replies from
      // local server streams arrive with connection == NULL -- then fail.
      if (connection)
        connection->dumpAndStopOtherEnd(true, false);
      assert(responsesPending_ > 0);
    }
  }
  else
  {
    // Case (c)
    InternalMsgHdrInfoStruct *hdr =
      (InternalMsgHdrInfoStruct *) buffer->data(0);
    assert(hdr);

    if (hdr->getSMLastInBatch())
    {
      // Remember that the LAST IN BATCH buffer is received so later
      // in actOnReceive() we can decrement the statement globals
      // message counter. This saved this information in a stream
      // data member since the actOnReceive() method does not have a
      // buffer pointer to check the actual LAST IN BATCH flag.
      smBatchIsComplete_ = TRUE;
      
      responsesPending_--;
      EXSM_TRACE(EXSM_TRACE_PROTOCOL, "STREAM %p rp is now %d", this,
                 (int) responsesPending_);

      // Check for a missing connection before dereferencing it so the
      // failure surfaces as the assert below rather than a NULL access.
      IpcConnection *conn =
        (connection ? connection->castToSMConnection() : NULL);
      assert(conn);
      ((SMConnection *)conn)->decrOutstandingSMRequests();
    }
  }

  // Call the parent class internalActOnReceive() function. This will
  // invoke the child class actOnReceive() virtual method.
  IpcBufferedMsgStream::internalActOnReceive(buffer, connection);

  if (getSMContinueProtocol())
  {
    // After receive callback processing is complete in the parent and
    // child classes, we can reset the batch complete flag
    smBatchIsComplete_ = FALSE;
  }
}

///////////////////////////////////////////////////////////////////////////////
// internal send call back
//
// Records the connection's error information on the stream (if the stream
// has none yet), runs the base-class send callback, and then posts a
// receive on the connection. The receive is posted even if the connection
// is in error: this stream does its bookkeeping -- including decrementing
// responsesPending_ -- in the receive callback, not here.
void IpcClientMsgStream::internalActOnSend(IpcConnection* connection)
{
  if (connection != NULL && !errorInfo_)
    errorInfo_ = connection->getErrorInfo();

  IpcBufferedMsgStream::internalActOnSend(connection);

  if (connection == NULL)
    return;

  connection->receive(this);
}

// ----------------------------------------------------------------------------
// IpcServerMsgStream
// ----------------------------------------------------------------------------

///////////////////////////////////////////////////////////////////////////////
// constructor
//
// Server side of a buffered message stream. It starts with no client
// connection bound (client_ == NULL) and with no buffers yet sent in the
// current SeaMonster batch.
IpcServerMsgStream::IpcServerMsgStream(IpcEnvironment *env,
                                       IpcMessageType msgType,
                                       IpcMessageObjVersion version,
                                       Lng32 sendBufferLimit,
                                       Lng32 inUseBufferLimit,
                                       IpcMessageObjSize bufferSize)
  : IpcBufferedMsgStream(env, msgType, version, inUseBufferLimit, bufferSize),
    sendBufferLimit_(sendBufferLimit),
    client_(NULL),
    buffersSentInBatch_(0)
{
}

//////////////////////////////////////////////////////////////////////////////
// send the current response message back to the client
//
// Finalizes the buffers built so far and immediately tries to ship them
// against whatever reply tags the stream is holding.
void IpcServerMsgStream::sendResponse()
{
  MXTRC_FUNC("IpcServerMsgStream::sendResponse");

  prepSendMsgForOutput();  // send list -> output queue
  tickleOutputIo();        // pair output buffers with reply tags and send
}

//////////////////////////////////////////////////////////////////////////////
// server is done replying to all requests
//
// Every reply tag still held must be answered, so the output queue is padded
// with header-only messages until there is at least one output buffer per
// reply tag, and then everything is sent.
void IpcServerMsgStream::responseDone()
{
  for (;;)
  {
    if (numOfOutputBuffers() >= numOfReplyTagBuffers())
      break;
    sendMsgObj(0);            // allocate an empty (header-only) message
    prepSendMsgForOutput();   // move it onto the output queue
  }
  tickleOutputIo();
}

///////////////////////////////////////////////////////////////////////////////
// reply to outstanding requests from the output queue
void IpcServerMsgStream::tickleOutputIo()
{
  // This early return is done without any SeaMonster tracing so that
  // SeaMonster tracing only happens when there is actually something
  // to send
  if (numOfOutputBuffers() == 0)
    return;

  if (getSMContinueProtocol())
    EXSM_TRACE(EXSM_TRACE_PROTOCOL,
               "STREAM %p BEGIN tickleOutputIo bufs %d", this,
               (int) numOfOutputBuffers());
  
  IpcMessageBuffer* msgBuf;
  IpcConnection* connection; 
  IpcBufferedMsgStream* msgStream;
  MXTRC_FUNC("IpcServerMsgStream::tickleOutputIo");
  while (msgBuf = getReplyTagOutputBuffer(connection, msgStream))
  {
    if (connection)
    {
      // respond to remote client message stream via connection
      connection->send(msgBuf);
      
      // comment out the following wait to eliminate the cause of bug 2473
      // in an open/close cursor test that recreates the problem
      //connection->wait(IpcImmediately);
    }
    else
    {  // respond to local client message stream
      msgStream->internalActOnReceive(msgBuf, NULL);
    }
  }

  if (getSMContinueProtocol())
    EXSM_TRACE(EXSM_TRACE_PROTOCOL,
               "STREAM %p END tickleOutputIo bufs %d", this,
               (int) numOfOutputBuffers());
}

void IpcBufferedMsgStream::setSMLastInBatch()
{
  Int32 entries = (Int32) numOfSendBuffers();
  assert(entries > 0);

  IpcMessageBuffer *msgBuf = sendBufList_[entries - 1];
  InternalMsgHdrInfoStruct *hdr = (InternalMsgHdrInfoStruct *) msgBuf->data(0);
  hdr->setSMLastInBatch();
}

///////////////////////////////////////////////////////////////////////////////
// internal receive call back 
//
// Every request received by a server stream must carry a valid reply tag.
// When the stream is bound to a client connection, the request must have
// come over that connection, and a new receive is posted on it before the
// base class queues the buffer and runs the receive callback.
void IpcServerMsgStream::internalActOnReceive(IpcMessageBuffer* buffer,
                                              IpcConnection* connection)
{
  if (buffer != NULL)
  {
    // a request without a valid reply tag cannot be answered
    assert(buffer->getReplyTag() != GuaInvalidReplyTag);

    if (client_)
    {
      // a stream bound to a connection cannot also serve a local client
      assert(client_ == buffer->getConnection());
      client_->receive(this);
    }
  }

  IpcBufferedMsgStream::internalActOnReceive(buffer, connection);
}

// -----------------------------------------------------------------------
// Methods for class IpcServerClass
// -----------------------------------------------------------------------

// Construct a server class describing how servers of type serverType are
// allocated.
//
// Environment variables read here:
//   ESP_PARALLEL_CC_OPENS - "0..." disables parallel opens (default: on).
//   ESP_PARALLEL_STARTUP  - "0..." selects waited startup
//                           (waitedStartupArg_ = '1'); unset, "1..." or any
//                           other value keeps nowaited startup ('0').
//   SQL_NO_NSK_LITE       - only consulted for IPC_ALLOC_DONT_CARE: when set,
//                           servers are launched as IPC_LAUNCH_NT_PROCESS
//                           instead of IPC_LAUNCH_GUARDIAN_PROCESS.
IpcServerClass::IpcServerClass(IpcEnvironment *env,
			       IpcServerType serverType,
			       IpcServerAllocationMethod allocationMethod,
                               short serverVersion,
                               char *nodeName) :
     allocatedServers_(env->getHeap())
{
  environment_ = env;
  serverType_ = serverType;
  allocationMethod_ = allocationMethod;
  serverVersion_ = serverVersion;
  nodeName_ = nodeName;
  // nextPort_ is only meaningful for IPC_LAUNCH_NT_PROCESS, but give it a
  // defined value on every path: a caller may pass IPC_LAUNCH_NT_PROCESS
  // explicitly, in which case the randomized setup below is skipped and
  // allocateServerProcess() would otherwise read an uninitialized member.
  nextPort_ = IPC_SQLESP_PORTNUMBER;
  char *parallelOpens = getenv("ESP_PARALLEL_CC_OPENS");
  if (parallelOpens != NULL && *parallelOpens == '0')
    parallelOpens_ = FALSE;
  else
    parallelOpens_ = TRUE;
  Int32 retVal;
  retVal = pthread_mutex_init(&nowaitedEspServer_.cond_mutex_, NULL);
  assert(retVal == 0);
  retVal = pthread_cond_init(&nowaitedEspServer_.cond_cond_, NULL);
  assert(retVal == 0);
  nowaitedEspServer_.startTag_ = 0;
  nowaitedEspServer_.callbackCount_ = 0;
  nowaitedEspServer_.completionCount_ = 0;
  nowaitedEspServer_.waiting_ = FALSE;
  // ESP_PARALLEL_STARTUP=0 means "not parallel", i.e. waited startup ('1');
  // everything else keeps the default nowaited startup ('0').
  char *waitedStartupArg = getenv("ESP_PARALLEL_STARTUP");
  if (waitedStartupArg == NULL)
    nowaitedEspServer_.waitedStartupArg_ = '0';
  else
  {
    switch (*waitedStartupArg)
    {
    case '0':
      nowaitedEspServer_.waitedStartupArg_ = '1';
      break;
    case '1':
      nowaitedEspServer_.waitedStartupArg_ = '0';
      break;
    default:
      nowaitedEspServer_.waitedStartupArg_ = '0';
    }
  }
  if (allocationMethod_ == IPC_ALLOC_DONT_CARE)
    {
      // The standard method is to create a Guardian process. Setting the
      // SQL_NO_NSK_LITE environment variable overrides that mechanism and
      // selects IPC_LAUNCH_NT_PROCESS with a time-derived starting port.
      if (getenv("SQL_NO_NSK_LITE") == NULL)
        {
          allocationMethod_ = IPC_LAUNCH_GUARDIAN_PROCESS;
        }
      else 
        {
          allocationMethod_ = IPC_LAUNCH_NT_PROCESS;
          time_t tp;
          time(&tp);
	  nextPort_ = IPC_SQLESP_PORTNUMBER + tp % 10000; // arbitrary
        };
    }
}

// Destroy every server still registered with this class (on the
// environment's heap), then empty the registry.
IpcServerClass::~IpcServerClass() 
{
  NAHeap *heap = (NAHeap *)environment_->getHeap();
  const CollIndex numServers = allocatedServers_.entries();
  for (CollIndex idx = 0; idx < numServers; idx++)
  {
    NADELETE(allocatedServers_[idx], IpcServer, heap);
  }
  allocatedServers_.clear();
}

// Allocate a server process of this class's type (serverType_) using
// allocationMethod_.
//
// Two modes:
//  - creatingProcess != NULL: continue a nowaited launch handed out by an
//    earlier call. Returns NULL while the server is still being created. If
//    startup reported a nonzero retcode, the server is released,
//    *creatingProcess is set to NULL and NULL is returned. Otherwise the
//    server is recorded in allocatedServers_ and returned.
//  - creatingProcess == NULL: choose the program/class name, default port
//    and overriding define name from serverType_ (SSCP/SSMP also force no
//    transactions and FS_MAX_NOWAIT_DEPTH), then create the server according
//    to allocationMethod_. For the Guardian/OSS/use-process methods, a
//    server that is still being created is returned immediately WITHOUT
//    being recorded in allocatedServers_ (presumably the caller passes it
//    back via creatingProcess to finish the launch).
//
// diags/diagsHeap are passed through to the startup/creation helpers.
// Returns the allocated server, or NULL on failure or while a nowaited
// launch is still in progress.
IpcServer * IpcServerClass::allocateServerProcess(ComDiagsArea **diags,
						  CollHeap   *diagsHeap,
						  const char *nodeName,
						  IpcCpuNum cpuNum,
						  IpcPriority priority,
						  Lng32 espLevel,
						  NABoolean usesTransactions,
						  NABoolean waitedCreation,
						  Lng32 maxNowaitRequests,
						  const char* progFileName,
						  const char* processName,
						  NABoolean parallelOpens,
						  IpcGuardianServer **creatingProcess)
{
  IpcServer *result = NULL;
  short retcode = 0;
  // Resume a nowaited launch begun by an earlier call.
  if (creatingProcess != NULL)
  {
    result = *creatingProcess;
    if ((*creatingProcess)->isCreatingProcess())
    {
      assert(waitedCreation == FALSE);
      retcode = (*creatingProcess)->workOnStartup(IpcInfiniteTimeout,diags,diagsHeap);
      // Still launching: return NULL (regardless of retcode) and let the
      // caller try again later.
      if ((*creatingProcess)->isCreatingProcess())
      {
	result = NULL;
	return result; // Launch didn't complete
      }
    }
    else
      assert(FALSE); // Existing process is not in creating state
    if (retcode)
      {
        char buf[20];
        str_sprintf(buf, "retcode = %d", retcode);
        const char *retcodeText = buf;
        (*creatingProcess)->logEspRelease(__FILE__, __LINE__, 
                      retcodeText);
        (*creatingProcess)->release();
        *creatingProcess = NULL; // Launch completed
        result = NULL;
      }
    // remember this server
    if (result != NULL)
      allocatedServers_.insert(result);
    return result;
  }
  // Local copies: SSCP/SSMP servers override these in the switch below.
  NABoolean lv_usesTransactions = usesTransactions;
  Lng32 lv_maxNowaitRequests = maxNowaitRequests;
 
  IpcConnection *serverConn = NULL;
  const char *className = NULL;
  IpcServerPortNumber defaultPortNumber = IPC_INVALID_SERVER_PORTNUMBER;
  NABoolean debugServer = FALSE;
  const char *overridingDefineName = NULL;

  debugServer = (getenv("DEBUG_SERVER") != NULL);

  // to avoid compiler warning for Unix build
  waitedCreation = waitedCreation;

  // Choose the program/class name, default port number and the define name
  // that may override the program file for this server type. DEBUG_SERVER
  // selects the debug variant where one exists.
  switch (serverType_)
    {
    case IPC_SQLUSTAT_SERVER:
      if (debugServer)
	{
	  className = "arkustatdbg";
	  defaultPortNumber = IPC_SQLUSTAT_DEBUG_PORTNUMBER;
	}
      else
	{
	  className = "arkustat";
	  defaultPortNumber = IPC_SQLUSTAT_PORTNUMBER;
	}
	  overridingDefineName = "ARK_STA_PROG_FILE_NAME";
      break;
    case IPC_SQLCAT_SERVER:
      if (debugServer)
	{
	  className = "arkcatdbg";
	  defaultPortNumber = IPC_SQLCAT_DEBUG_PORTNUMBER;
	}
      else
	{
	  className = "arkcat";
	  defaultPortNumber = IPC_SQLCAT_PORTNUMBER;
	}
	  overridingDefineName = "ARK_CAT_PROG_FILE_NAME";
      break;
    case IPC_SQLCOMP_SERVER:
      if (debugServer)
	{
	  className = "arkcmpdbg";
	  defaultPortNumber = IPC_SQLCOMP_DEBUG_PORTNUMBER;
	}
      else
	{
	  className = "arkcmp";
	  defaultPortNumber = IPC_SQLCOMP_PORTNUMBER;
	}
	  overridingDefineName = "_ARK_CMP_PROG_FILE_NAME";
      break;
    case IPC_SQLESP_SERVER:
      if (debugServer)
	{
	  className = "arkespdbg";
	  defaultPortNumber = IPC_SQLESP_DEBUG_PORTNUMBER;
	}
      else
	{
	  className = "arkesp";
	  defaultPortNumber = IPC_SQLESP_PORTNUMBER;
	}
      overridingDefineName = "_ARK_ESP_PROG_FILE_NAME";
      break;
   case IPC_SQLBDRR_SERVER:
      className = "bdrr";
      overridingDefineName = "=_MX_BDRR_PROG_FILE_NAME";
      break;
      //
      // UDR Servers
      //
      
    case IPC_SQLUDR_SERVER:
      {
        if (debugServer)
        {
          className = "udrservdbg";
          defaultPortNumber = IPC_SQLUDR_DEBUG_PORTNUMBER;
        }
        else
        {
          className = "udrserv";
          defaultPortNumber = IPC_SQLUDR_PORTNUMBER;
        }
        overridingDefineName = "_ARK_UDR_PROG_FILE_NAME";
      }
      break;
     
      //
      // Query Matching Server
      //
    case IPC_SQLQMS_SERVER:
      {
        if (debugServer)
        {
          className = "qmsdbg";
          defaultPortNumber = IPC_SQLQMS_DEBUG_PORTNUMBER;
        }
        else
        {
          className = "qms";
          defaultPortNumber = IPC_SQLQMS_PORTNUMBER;
        }
        overridingDefineName = "_ARK_QMS_PROG_FILE_NAME";
      }
      break;

      //
      // Query Matching Publisher
      //
    case IPC_SQLQMP_SERVER:
      {
        if (debugServer)
        {
          className = "qmpdbg";
          defaultPortNumber = IPC_SQLQMP_DEBUG_PORTNUMBER;
        }
        else
        {
          className = "qmp";
          defaultPortNumber = IPC_SQLQMP_PORTNUMBER;
        }
        overridingDefineName = "_ARK_QMP_PROG_FILE_NAME";
      }
      break;  

      //
      // Query Matching Monitor
      //
    case IPC_SQLQMM_SERVER:
      {
        if (debugServer)
        {
          className = "qmmdbg";
          defaultPortNumber = IPC_SQLQMM_DEBUG_PORTNUMBER;
        }
        else
        {
          className = "qmm";
          defaultPortNumber = IPC_SQLQMM_PORTNUMBER;
        }
        overridingDefineName = "_ARK_QMM_PROG_FILE_NAME";
      }
      break; 

      // generic servers passed in as progFileName
      // (or, with IPC_USE_PROCESS, identified by processName)
    case IPC_GENERIC_SERVER:
    case IPC_SQLBDRS_SERVER:
      if ((allocationMethod_ != IPC_USE_PROCESS) &&
         (! progFileName))
        ABORT("Invalid server type specified in IpcServer::IpcServer()");

      if (allocationMethod_ == IPC_USE_PROCESS)
	className = processName;
      else
        className = progFileName;
      defaultPortNumber = IPC_GENERIC_PORTNUMBER;
      //      overridingDefineName = "_ARK_GENERIC_PROG_FILE_NAME";
      break;
    case IPC_SQLSSCP_SERVER:
      className = "sscp";
      lv_usesTransactions = FALSE;
      lv_maxNowaitRequests =  FS_MAX_NOWAIT_DEPTH;   
      overridingDefineName = "=_MX_SSCP_PROCESS_PREFIX"; 
      break;
    case IPC_SQLSSMP_SERVER:
      className = "ssmp";
      lv_usesTransactions = FALSE;
      lv_maxNowaitRequests =  FS_MAX_NOWAIT_DEPTH;   
      overridingDefineName = "=_MX_SSMP_PROCESS_PREFIX";
      break;
    default:
      ABORT("Invalid server type specified in IpcServer::IpcServer()");
      break;
    }

  switch (allocationMethod_)
    {

    case IPC_LAUNCH_GUARDIAN_PROCESS:
    case IPC_SPAWN_OSS_PROCESS:
    case IPC_USE_PROCESS:
      {
	IpcGuardianServer *result2 =
	  new(environment_) IpcGuardianServer(
	       this,
	       diags,
	       diagsHeap,
	       nodeName,
	       className,
	       cpuNum,
	       priority, //IPC_PRIORITY_DONT_CARE,
	       allocationMethod_,
	       (short) allocatedServers_.entries(),
	       lv_usesTransactions,
	       FALSE,
               waitedCreation,
	       lv_maxNowaitRequests,
	       overridingDefineName,
	       processName,
	       parallelOpens);
	result = result2;
	  retcode = result2->workOnStartup(IpcInfiniteTimeout,diags,diagsHeap);
	// Nowaited launch still in progress: hand the server back now. It is
	// not recorded in allocatedServers_ until the launch completes via
	// the creatingProcess path above.
	if (result2->isCreatingProcess() && retcode == 0)
          return result2;
        if (retcode)
          {
             // "retcode = " plus a short value (at most 6 chars) fits in 20
             char buf[20];
             str_sprintf(buf, "retcode = %d", retcode);
             const char *retcodeText = buf;
             result2->logEspRelease(__FILE__, __LINE__,
                      retcodeText);
             result2->release();
             result = NULL;
          }
      }
      break;

    case IPC_INETD:
      {
	// NOTE(review): this passes usesTransactions, not
	// lv_usesTransactions, so the SSCP/SSMP override is not applied on
	// this path -- confirm intended.
	serverConn = createInternetProcess(diags,
					   diagsHeap,
					   nodeName,
					   className,
					   cpuNum,
					   usesTransactions,
					   defaultPortNumber);
	// make an IpcServer object
	if (serverConn != NULL)
	  {
	    result = new(environment_) IpcServer(serverConn,this);
	    // $$$$ add errors from creating the connection
	  }
      }
      break;

    case IPC_POSIX_FORK_EXEC:
      {
	serverConn = forkProcess(diags,
				 diagsHeap,
				 nodeName,
				 className,
				 cpuNum,
				 usesTransactions);
	// make an IpcServer object
	if (serverConn != NULL)
	  result = new(environment_) IpcServer(serverConn,this);
      }
      break;


    case IPC_LAUNCH_NT_PROCESS:
      {
	// NOTE(review): serverConn is never assigned on this path (it is
	// still NULL from its initialization), so no IpcServer is created
	// and NULL is returned; defaultPortNumber is computed but unused.
	defaultPortNumber = (IpcServerPortNumber)nextPort_;
	nextPort_++;
	
	// make an IpcServer object
	if (serverConn != NULL)
	  result = new(environment_) IpcServer(serverConn,this);
      }
      break;
    default:
      ABORT("Invalid server class allocation method");
      break;

    } // switch

  // remember this server
  if (result != NULL)
    allocatedServers_.insert(result);

  return result;
}

// Unregister server s from this class and delete it from the environment's
// heap. The caller is responsible for having stopped the server process.
void IpcServerClass::freeServerProcess(IpcServer *s)
{
  // unregister first so the registry never holds a deleted object
  allocatedServers_.remove(s);

  NADELETE(s, IpcServer, environment_->getHeap());
}

// Build the process name for a server of this class's type on the given
// node/CPU by delegating to getServerProcessName(), and return its result.
char *IpcServerClass::getProcessName(const char *nodeName, short nodeNameLen, short cpuNum, char *processName)
{
  char *builtName = getServerProcessName(serverType_,
                                         nodeName, nodeNameLen,
                                         cpuNum,
                                         processName);
  return builtName;
}
// -----------------------------------------------------------------------
// methods for class IpcEnvironment
// -----------------------------------------------------------------------

/////////////////////////////////////////////////////////////////////
// Helper method getDefineShort
//
// Documented contract: look up the Guardian DEFINE named defineName (24
// bytes, blank padded, not counting the null terminator -- see DEFINEINFO)
// of class MAP and parse an integer out of its "file name", e.g.
//   add_define =_SQLMX_MAX_OUTGOING_MSG class=MAP file=\$SYSTEM.#128
// returning -1 when the define cannot be resolved.
//
// This implementation performs no DEFINE lookup: defineName is ignored and
// the function always returns -1 ("define not resolved"), so callers
// should fall back to their defaults.
/////////////////////////////////////////////////////////////////////
short getDefineShort( char * defineName )
{
  (void) defineName;  // not looked up in this implementation
  return -1;          // define not resolved
}


// -----------------------------------------------------------------------
// Methods for IpcEnvironment
// -----------------------------------------------------------------------

 // Construct the per-process IPC environment.
 //
 //  heap               - heap used for all IPC allocations; when NULL a
 //                       DefaultIpcHeap is created with global operator new.
 //  eventConsumed      - stored in eventConsumedAddr_.
 //  breakEnabled       - stored in breakEnabled_.
 //  serverType         - kind of process we are; ESP/SSCP/SSMP servers pass
 //                       TRUE as the second IpcAllConnections ctor argument.
 //  useGuaIpcAtRuntime, persistentProcess - stored as given.
 //
 // A number of tunables are read from environment variables (see the
 // getenv() calls below). The close-trace, BAWAITIOX-trace, persistent-open
 // and IPC-message trace arrays are allocated from heap_ and initialized here.
 IpcEnvironment::IpcEnvironment(CollHeap *heap, UInt32 *eventConsumed,
                               NABoolean breakEnabled, IpcServerType serverType, NABoolean useGuaIpcAtRuntime
                               , NABoolean persistentProcess
                              ) :
     breakEnabled_(breakEnabled),
     heap_(heap),
     eventConsumedAddr_(eventConsumed),
     completedMessages_(heap),
     envvars_(NULL),
     envvarsLen_(0),
     heapFull_(FALSE),
     safetyBuffer_(NULL),
     stopAfter_(0),
     inactiveTimeout_(0),
     inactiveTimestamp_(0),
     espPrivStackSize_(64 * 1024), // Neo default priv stack size
     espFreeMemTimeout_(0), // initial value, will be overwritten by fixup message. 
     useGuaIpcAtRuntime_(useGuaIpcAtRuntime),
     serverType_(serverType),
     guaMaxMsgIOSize_(IOSIZEMAX),
     maxCCNowaitDepthLow_(InitialNowaitRequestsPerEsp), 
     maxCCNowaitDepthHigh_(HighLoadNowaitRequestsPerEsp),
     maxPerProcessMQCs_(XMAX_SETTABLE_SENDLIMIT_H), // Seaquest "H-Series"/Seaquest limit
     retriedMessageCount_(0),
     cliGlobals_(NULL),
     numOpensInProgress_(0)
     , persistentProcess_(persistentProcess)
     , corruptDownloadMsg_(false)
     , logReleaseEsp_(false)
     , logEspIdleTimeout_(false)
     , logEspGotCloseMsg_(false)
     , logTimeIpcConnectionState_(false)
     , seamonsterEnabled_(false)
{
  if (heap_ == NULL)
    heap_ = new DefaultIpcHeap; // here it's ok to use global operator new
  
  allConnections_ = new(heap_) IpcAllConnections(heap_, 
    (serverType == IPC_SQLESP_SERVER
     || serverType == IPC_SQLSSCP_SERVER
     || serverType == IPC_SQLSSMP_SERVER));
  controlConnection_ = NULL;
  for (Lng32 i = 0; i < 4; i++)
    currentExRtFragTable_[i] = NULL; // for integrity checking

  idleTimestamp_ = NA_JulianTimestamp();
#ifdef USE_SB_NEW_RI
  // Optional override of the maximum Guardian message I/O size, clamped
  // to [3000, 1048576].
  const char *maxenvvar = getenv("IPC_IOSIZEMAX");
  if (maxenvvar)
    guaMaxMsgIOSize_ = MAXOF(3000, MINOF(atoi(maxenvvar), 1048576));
#endif //USE_SB_NEW_RI

  maxPollingInterval_ = 300;
  const char *envvar;
  envvar = getenv("IPC_MAX_POLLING_INTERVAL");
  if (envvar)
    maxPollingInterval_ = atoi(envvar);

  // ESP_PERSISTENT_OPENS: < 1 (or unset) disables persistent opens;
  // otherwise it sizes the persistent-open table (64 for values 1-2,
  // the value itself for 3-255, capped at 256).
  persistentOpenAssigned_ = 0;
  char *perOpensEnvvar = getenv("ESP_PERSISTENT_OPENS");
  Int32 perOpensEnvvarVal;
  if (perOpensEnvvar != NULL )
    perOpensEnvvarVal = atoi(perOpensEnvvar);
  else
    perOpensEnvvarVal = 0;
  if (perOpensEnvvarVal < 1)
  {
    persistentOpens_ = FALSE;
    persistentOpenEntries_ = 64;
  }
  else
  {
    persistentOpens_ = TRUE;
    if (perOpensEnvvarVal < 3)
      persistentOpenEntries_ = 64;
    else if (perOpensEnvvarVal < 256)
      persistentOpenEntries_ = perOpensEnvvarVal;
    else
      persistentOpenEntries_ = 256;
  }
  persistentOpenArray_ = (PersistentOpenEntry (*) [1])heap_->allocateMemory(sizeof(PersistentOpenEntry) * persistentOpenEntries_);
  for (Int32 i = 0; i < persistentOpenEntries_; i++)
  {
    (*persistentOpenArray_)[i].persistentOpenExists_ = FALSE;
  }

  char *masterFastCompletion = getenv("MASTER_FAST_COMPLETION");
  if (masterFastCompletion != NULL && *masterFastCompletion == '0')
    masterFastCompletion_ = FALSE;
  else
    masterFastCompletion_ = TRUE; 
  char *nowaitDepthEnvvar = getenv("ESP_NOWAIT_DEPTH");
  if (nowaitDepthEnvvar != NULL)
    maxCCNowaitDepthLow_ = maxCCNowaitDepthHigh_ = atoi(nowaitDepthEnvvar);
  XCONTROLMESSAGESYSTEM(XCTLMSGSYS_SETSENDLIMIT, XMAX_SETTABLE_SENDLIMIT_H);

  if (getenv("ESP_CORRUPT_MESSAGE_TEST"))
    corruptDownloadMsg_ = true;

  // Optional diagnostic logging switches, each enabled by the value '1'.
  const char *lre = getenv("LOG_ESP_RELEASE");
  if (lre && *lre == '1')
    logReleaseEsp_ = true;

  const char *liet = getenv("LOG_IDLE_ESP_TIMEOUT");
  if (liet && *liet == '1')
    logEspIdleTimeout_ = true;

  const char *legcm = getenv("LOG_ESP_GOT_CLOSE_MSG");
  if (legcm && *legcm == '1')
    logEspGotCloseMsg_ = true;

  const char *etis = getenv("ESP_TIME_IPCCONNECTION_STATES");
  if (etis && *etis == '1')
    logTimeIpcConnectionState_ = true;
  
  const char *smEnv = getenv("SQ_SEAMONSTER");
  if (smEnv && *smEnv == '1')
    seamonsterEnabled_ = true;

  char *espAssignByLevel = getenv("ESP_ASSIGN_BY_LEVEL");
  if (espAssignByLevel == NULL)
    espAssignByLevel_ = '0';
  else
  {
    switch (*espAssignByLevel)
    {
    case '1':
      espAssignByLevel_ = '1';
      break;
    default:
      espAssignByLevel_ = '0';
    }
  }

  memset(myProcessName_, 0, sizeof(myProcessName_));

  // Circular close trace; the index starts at the last slot so the first
  // record lands in slot 0.
  closeTraceArray_ = (CloseTraceEntry (*) [closeTraceEntries])heap_->allocateMemory(sizeof(CloseTraceEntry) * closeTraceEntries);
  for (Int32 i = 0; i < closeTraceEntries; i++)
  {
    (*closeTraceArray_)[i].count_ = 0;
    (*closeTraceArray_)[i].line_ = 0;
    (*closeTraceArray_)[i].clientFileNumber_ = 0;
    (*closeTraceArray_)[i].cpu_ = 0;
    (*closeTraceArray_)[i].pin_ = 0;
    (*closeTraceArray_)[i].seqNum_ = -1;
  }
  closeTraceIndex_ = closeTraceEntries - 1;

  // Circular BAWAITIOX trace, initialized the same way. Every field that
  // bawaitioxTrace() writes is given a defined starting value.
  bawaitioxTraceArray_ = (BawaitioxTraceEntry (*) [bawaitioxTraceEntries])heap_->allocateMemory(sizeof(BawaitioxTraceEntry) * bawaitioxTraceEntries);
  for (Int32 i = 0; i < bawaitioxTraceEntries; i++)
  {
    (*bawaitioxTraceArray_)[i].count_ = 0;
    (*bawaitioxTraceArray_)[i].recursionCount_ = 0;
    (*bawaitioxTraceArray_)[i].ipcSetOfConnections_ = NULL;
    (*bawaitioxTraceArray_)[i].firstConnectionIndex_ = 0;
    (*bawaitioxTraceArray_)[i].firstConnection_ = NULL;
    memset((char *)&((*bawaitioxTraceArray_)[i].ipcAwaitiox_), 0, sizeof(IpcAwaitiox));
  }
  bawaitioxTraceIndex_ = bawaitioxTraceEntries - 1;

  // Ipc data message trace area initialization
  maxIpcMsgTraceIndex_ = NUM_IPC_MSG_TRACE_ENTRIES;
  const char *ipcMsgEnv = getenv("NUM_EXE_IPC_MSG_TRACE_ENTRIES");
  if (ipcMsgEnv != NULL)
  {
    Int32 nums = atoi(ipcMsgEnv);
    if (nums >= 0 && nums < MAX_IPC_MSG_TRACE_ENTRIES)
      maxIpcMsgTraceIndex_ = nums;  //ignore any other value
  }
  ipcMsgTraceArea_ = new (heap_) IpcMsgTrace[maxIpcMsgTraceIndex_];
  memset(ipcMsgTraceArea_, 0, sizeof(IpcMsgTrace) * maxIpcMsgTraceIndex_);
  lastIpcMsgTraceIndex_ = maxIpcMsgTraceIndex_;
  ipcMsgTraceRef_ = NULL;
}

  void IpcEnvironment::closeTrace(unsigned short line,
                                  short clientFileNumber,
                                  Int32 cpu,
                                  Int32 pin,
                                  SB_Int64_Type seqNum)
{
  unsigned short i = closeTraceIndex_ == closeTraceEntries - 1 ? 0 : closeTraceIndex_ + 1;
  (*closeTraceArray_)[i].count_ = (*closeTraceArray_)[closeTraceIndex_].count_ + 1;
  (*closeTraceArray_)[i].line_ = line;
  (*closeTraceArray_)[i].clientFileNumber_ = clientFileNumber;
  (*closeTraceArray_)[i].cpu_ = cpu;
  (*closeTraceArray_)[i].pin_ = pin;
  (*closeTraceArray_)[i].seqNum_ = seqNum;
  closeTraceIndex_ = i;
}
  void IpcEnvironment::bawaitioxTrace(IpcSetOfConnections *ipcSetOfConnections,
                                  ULng32 recursionCount,
                                  CollIndex firstConnectionIndex,
                                  IpcConnection *firstConnection,
                                  IpcAwaitiox *ipcAwaitiox)
{
  // Append one record to the circular BAWAITIOX trace (wrapping to slot 0
  // after the last slot); count_ is one more than the previous record's.
  unsigned short i = bawaitioxTraceIndex_ == bawaitioxTraceEntries - 1 ? 0 : bawaitioxTraceIndex_ + 1;
  (*bawaitioxTraceArray_)[i].count_ = (*bawaitioxTraceArray_)[bawaitioxTraceIndex_].count_ + 1;
  (*bawaitioxTraceArray_)[i].recursionCount_ = recursionCount;
  (*bawaitioxTraceArray_)[i].ipcSetOfConnections_ = ipcSetOfConnections;
  (*bawaitioxTraceArray_)[i].firstConnectionIndex_ = firstConnectionIndex;
  (*bawaitioxTraceArray_)[i].firstConnection_ = firstConnection;

  // Snapshot the awaitiox state into the entry's ipcAwaitiox_ slot. The
  // parameter was previously ignored, leaving that slot with whatever an
  // older record had stored. Clear the slot when no awaitiox is supplied.
  if (ipcAwaitiox != NULL)
    memcpy((char *)&((*bawaitioxTraceArray_)[i].ipcAwaitiox_),
           (char *)ipcAwaitiox, sizeof(IpcAwaitiox));
  else
    memset((char *)&((*bawaitioxTraceArray_)[i].ipcAwaitiox_), 0,
           sizeof(IpcAwaitiox));

  bawaitioxTraceIndex_ = i;
}

IpcEnvironment::~IpcEnvironment()
{
  // Unregister the IPC message trace. ipcMsgTraceRef_ is only set by
  // registTraceInfo() when cliGlobals_ is non-NULL; the cliGlobals_ check
  // here is purely defensive.
  if (ipcMsgTraceRef_ && cliGlobals_)
  {
    ExeTraceInfo *ti = cliGlobals_->getExeTraceInfo();
    if (ti)
    {
      ti->removeTrace(ipcMsgTraceRef_);
    }
  }

  delete allConnections_;
  delete controlConnection_;
  releaseSafetyBuffer();

  // Free the heap_ memory owned by this object. Previously only
  // ipcMsgTraceArea_ was freed, and the arrays allocated in the constructor
  // (plus the safety buffer from setHeapFullFlag) leaked. This is done
  // after the connections above are deleted, in case their teardown still
  // records traces or touches persistent-open entries.
  NADELETEBASIC(ipcMsgTraceArea_, heap_);
  if (safetyBuffer_ != NULL)
  {
    heap_->deallocateMemory(safetyBuffer_);
    safetyBuffer_ = NULL;
  }
  if (closeTraceArray_ != NULL)
    heap_->deallocateMemory(closeTraceArray_);
  if (bawaitioxTraceArray_ != NULL)
    heap_->deallocateMemory(bawaitioxTraceArray_);
  if (persistentOpenArray_ != NULL)
    heap_->deallocateMemory(persistentOpenArray_);
}

void IpcEnvironment::stopIpcEnvironment()
{

  NAExit(0);
}

void IpcEnvironment::setIdleTimestamp()
{
  idleTimestamp_ = NA_JulianTimestamp();
}

void IpcEnvironment::setInactiveTimestamp()
{
  inactiveTimestamp_ = NA_JulianTimestamp();
}

void IpcEnvironment::deleteCompletedMessages()
{
  while (completedMessages_.entries())
  {
    IpcMessageStreamBase * mm = completedMessages_[0];
    completedMessages_.remove(mm);
    delete mm;
  }

  // Clean up any SeaMonster send buffers that had been queued but are
  // now completed
  void *buf = NULL;
  while (ExSM_RemoveCompletedSendBuffer(buf))
  {
    IpcMessageBuffer *msgBuf = (IpcMessageBuffer *) buf;
    assert(msgBuf);
    msgBuf->decrRefCount();
  }
}

void IpcEnvironment::setControlConnection(IpcControlConnection *cc)
{
  // Normal case: the first (and only) control connection is recorded.
  if (controlConnection_ == NULL)
  {
    controlConnection_ = cc;
    return;
  }

  // A second control connection is a fatal inconsistency. Dump/stop the
  // process at the other end of the existing one, then the new one's
  // (unless it is the same process, which already has a core file), and
  // then fail the assertion below deliberately.
  controlConnection_->getConnection()->dumpAndStopOtherEnd(true, false);

  const bool sameOtherEnd =
    (cc->getConnection()->getOtherEnd() ==
     controlConnection_->getConnection()->getOtherEnd());
  if (!sameOtherEnd)
    cc->getConnection()->dumpAndStopOtherEnd(true, false);

  assert(controlConnection_ == NULL);
}

IpcProcessId IpcEnvironment::getMyOwnProcessId(IpcNetworkDomain dom)
{
  // An unspecified domain means the native one, which is IPC_DOM_INTERNET.
  if (dom == IPC_DOM_INVALID)
    dom = IPC_DOM_INTERNET;

  // Guardian: the operating system hands us our own phandle.
  if (dom == IPC_DOM_GUA_PHANDLE)
    return IpcProcessId(MyGuaProcessHandle());

  if (dom != IPC_DOM_INTERNET)
  {
    ABORT("Invalid domain in IpcEnvironment::getMyOwnProcessId()");
    return IpcProcessId(); // not reached; keeps the compiler happy
  }

  // Internet: local IP address plus the socket listener port of our control
  // connection (0 when there is no control connection, e.g. in a master).
  SockPortNumber listenerPort = 0;
  if (controlConnection_)
  {
    // A process reading $RECEIVE cannot have an internet process id.
    assert(controlConnection_->castToSockControlConnection());
    listenerPort =
      controlConnection_->castToSockControlConnection()->getListnerPortNum();
  }

  SockIPAddress localAddr;
  localAddr.set();
  return IpcProcessId(localAddr, listenerPort);
}

IpcPriority IpcEnvironment::getMyProcessPriority()
{
  // The process priority is not queried here; -1 is always reported.
  IpcPriority unqueried = -1;
  return unqueried;
}

void IpcEnvironment::setEnvVars(char ** envvars)
{
  envvars_ = envvars;
}

void IpcEnvironment::setEnvVarsLen(Lng32 envvarsLen)
{
  // Length companion for setEnvVars().
  this->envvarsLen_ = envvarsLen;
}

void IpcEnvironment::releaseSafetyBuffer()
{
  // Intentionally a no-op on Linux: rather than freeing (and later
  // re-allocating, which may mmap/munmap) the 256K safety buffer around
  // every download and fixup, the buffer is simply left allocated.
}

void IpcEnvironment::setHeapFullFlag(NABoolean b)
{
  heapFull_ = b;

  if (!heapFull_)
  {
    // Heap has room: make sure a safety buffer exists. Its size (256K) is
    // an arbitrary guess at "enough" heap to finish some I/Os once the
    // heap fills up, which is known to happen mainly while broadcasting
    // large ESP fragments.
    if (safetyBuffer_ == NULL)
      safetyBuffer_ = (char *) heap_->allocateMemory(256 * 1024);
    return;
  }

  // Heap is full: hand back the safety buffer.
  releaseSafetyBuffer();
}

void IpcEnvironment::notifyNoOpens()
{
  // Called when no opens remain. SSCP and SSMP servers keep running;
  // every other process type stops its IPC environment.
  const bool keepRunning = (serverType_ == IPC_SQLSSCP_SERVER ||
                            serverType_ == IPC_SQLSSMP_SERVER);
  if (!keepRunning)
    stopIpcEnvironment();
}

void IpcEnvironment::logRetriedMessages()
{
  // Placeholder: retriedMessageCount_ is not currently logged anywhere, so
  // this method has no effect.
}

#ifdef IPC_INTEGRITY_CHECKING

// Debug-only integrity checking of IPC-related data structures.
// IpcEnvironment keeps up to four ExRtFragTable pointers (slots 0..3);
// each registered table is checked through a C-style callback.

void IpcEnvironment::setCurrentExRtFragTable(ExRtFragTable *ft)
{
  Lng32 freeSlot = -1;

  for (Lng32 slot = 0; slot < 4; slot++)
  {
    if (currentExRtFragTable_[slot] == ft)
      return;                       // already registered: nothing to do
    if (freeSlot < 0 && currentExRtFragTable_[slot] == 0)
      freeSlot = slot;              // remember the first empty slot
  }

  if (freeSlot < 0)
    return;                         // all slots taken: silently ignore

  cerr << "Adding ExRtFragTable integrity check pointer in IpcEnvironment to "
       << (void *)ft << "." << endl;
  currentExRtFragTable_[freeSlot] = ft;
}

void IpcEnvironment::removeCurrentExRtFragTable(ExRtFragTable *ft)
{
  for (Lng32 slot = 0; slot < 4; slot++)
  {
    if (currentExRtFragTable_[slot] != ft)
      continue;
    currentExRtFragTable_[slot] = 0;
    cerr << "Removing ExRtFragTable integrity check pointer in IpcEnvironment to "
         << (void *)ft << "." << endl;
  }
}

void IpcEnvironment::setExRtFragTableIntegrityCheckPtr
(void (*fnptr) (ExRtFragTable *ft))
{
  // Callback used by checkLocalIntegrity() to validate each table.
  integrityCheckExRtFragTablePtr_ = fnptr;
}

ExRtFragTable * IpcEnvironment::getCurrentExRtFragTable(Lng32 i)
{
  if (i < 0 || i >= 4)
    return 0;                       // out-of-range slot
  return currentExRtFragTable_[i];
}

void IpcEnvironment::checkIntegrity(void)
{
  // IpcEnvironment sits at the top of the IPC data structure network, so
  // integrity checking starts here.
  checkLocalIntegrity();
}

void IpcEnvironment::checkLocalIntegrity(void)
{
  for (Lng32 slot = 0; slot < 4; slot++)
  {
    ExRtFragTable *table = currentExRtFragTable_[slot];
    // This file does not know class ExRtFragTable, so the check goes
    // through the registered C-style function (in ex_frag_rt.cpp).
    if (table && integrityCheckExRtFragTablePtr_)
      (*integrityCheckExRtFragTablePtr_)(table);
  }

  allConnections_->checkLocalIntegrity();  // check IpcAllConnections
}

#endif

short IpcEnvironment::getNewPersistentOpenIndex()
{
  // Table already full according to the assigned count.
  if (!(persistentOpenAssigned_ < persistentOpenEntries_))
    return -1;

  // Hand out the first slot that has no persistent open.
  for (unsigned short slot = 0; slot < persistentOpenEntries_; ++slot)
    if ((*persistentOpenArray_)[slot].persistentOpenExists_ == FALSE)
      return slot;

  return -1;
}

void IpcEnvironment::setPersistentOpenInfo(short index, GuaProcessHandle *otherEnd, short fileNum)
{
  memcpy((void *)&(*persistentOpenArray_)[index].persistentOpenPhandle_, (void *)otherEnd, sizeof(GuaProcessHandle));
  (*persistentOpenArray_)[index].persistentOpenFileNum_ = fileNum;
  (*persistentOpenArray_)[index].persistentOpenExists_ = TRUE;
  persistentOpenAssigned_ += 1;
}

short IpcEnvironment::getPersistentOpenInfo(GuaProcessHandle *otherEnd, short *index)
{
  // Search occupied slots for one whose phandle matches otherEnd. On a hit,
  // return its file number and its slot via *index; otherwise return -1
  // and set *index to -1.
  for (short slot = 0; slot < persistentOpenEntries_; ++slot)
  {
    if (!(*persistentOpenArray_)[slot].persistentOpenExists_)
      continue;

    const Int32 cmp = XPROCESSHANDLE_COMPARE_(
        (SB_Phandle_Type *)otherEnd,
        (SB_Phandle_Type *)&(*persistentOpenArray_)[slot].persistentOpenPhandle_);
    if (cmp != 2)            // 2 == the two phandles are identical
      continue;

    *index = slot;
    return (*persistentOpenArray_)[slot].persistentOpenFileNum_;
  }

  *index = -1;
  return -1;
}

void IpcEnvironment::resetPersistentOpen(short index)
{
  (*persistentOpenArray_)[index].persistentOpenExists_ = FALSE;
  persistentOpenAssigned_ -= 1;
}

// Description shown for the "IpcMessages" trace registered by
// IpcEnvironment::registTraceInfo(). The two adjacent literals concatenate
// into a single string.
const char *IpcMsgTraceDesc =
  "SQL Ipc Message exchanged between consumer and producer processes.\n"
  " Can use env NUM_EXE_IPC_MSG_TRACE_ENTRIES to config more or less entries";

void IpcEnvironment::registTraceInfo(ExeTraceInfo *ti)
{
  // Registration needs CLI globals and a trace registry, and the message
  // trace is registered only once.
  if (cliGlobals_ == NULL || ipcMsgTraceRef_ != NULL || ti == NULL)
    return;

  // IPC message trace: maxIpcMsgTraceIndex_ entries, 6 fields per entry.
  Int32 lineWidth = 66;
  void *regdTrace = NULL;
  const Int32 rc = ti->addTrace("IpcMessages", this, maxIpcMsgTraceIndex_, 6,
                                this, getALine,
                                &lastIpcMsgTraceIndex_,
                                lineWidth, IpcMsgTraceDesc, &regdTrace);
  if (rc == 0)
  {
    // Describe each field of a trace entry.
    ti->addTraceField(regdTrace, "Connection ", 0, ExeTrace::TR_POINTER32);
    ti->addTraceField(regdTrace, "BufferAddr   ", 1, ExeTrace::TR_POINTER32);
    ti->addTraceField(regdTrace, "Length ", 2, ExeTrace::TR_INT32);
    ti->addTraceField(regdTrace, "Type", 3, ExeTrace::TR_INT32);
    ti->addTraceField(regdTrace, "Last   ", 4, ExeTrace::TR_CHAR);
    ti->addTraceField(regdTrace, "SeqNum", 5, ExeTrace::TR_INT32);
    ipcMsgTraceRef_ = regdTrace;
  }

  // IPC connection trace (registered whether or not the message trace was).
  allConnections_->registTraceInfo(this, ti);
}

Int32 IpcEnvironment::printAnIpcEntry(Int32 lineno, char *buf)
{
  // Format IPC message trace entry `lineno` into buf and return the number
  // of characters written (sprintf's result), or 0 for an out-of-range
  // line. Negative line numbers are rejected as well, since they would
  // index before the start of ipcMsgTraceArea_.
  if (lineno < 0 || lineno >= maxIpcMsgTraceIndex_)
    return 0;

  const IpcMsgTrace &entry = ipcMsgTraceArea_[lineno];
  return sprintf(buf, "%.4d  %8p  %8p  %8d  %.4s  %3d %10d\n",
                 lineno,
                 entry.conn_,
                 entry.bufAddr_,
                 entry.length_,
                 IpcMsgOperName[entry.sendOrReceive_],
                 entry.isLast_,
                 entry.seqNum_);
}

char const *IpcEnvironment::myProcessName()
{
  // Return this process's name, computing and caching it on first use.
  // The name comes from the CLI globals when they are available, and
  // otherwise from our own process handle.
  //
  // Copies are bounded by sizeof(myProcessName_) and always NUL-terminated,
  // replacing the previous unbounded strcpy into the fixed-size array.
  // Braces make the previous dangling-else structure explicit.
  if (myProcessName_[0] == '\0')
  {
    if (getCliGlobals())
    {
      strncpy(myProcessName_, getCliGlobals()->myProcessNameString(),
              sizeof(myProcessName_) - 1);
    }
    else
    {
      NAProcessHandle myPhandle;
      myPhandle.getmine();
      myPhandle.decompose();
      strncpy(myProcessName_, myPhandle.getPhandleString(),
              sizeof(myProcessName_) - 1);
    }
    myProcessName_[sizeof(myProcessName_) - 1] = '\0';
  }
  return myProcessName_;
}

void IpcAllocateDiagsArea(ComDiagsArea *&diags, CollHeap *diagsHeap)
{
  // Keep an existing diagnostics area; allocate one only when needed.
  if (diags != NULL)
    return;

  diags = ComDiagsArea::allocate(diagsHeap);

  // Failing to get even a diags area to report the error into is fatal.
  if (diags == NULL)
    ABORT("unable to allocate diagnostics area for IPC error");
}

// -----------------------------------------------------------------------
// Global operator new with the placement form where an IPC environment
// is specified
// -----------------------------------------------------------------------
void * operator new(size_t size, IpcEnvironment *env)
{
  // Allocate from the IPC environment's heap instead of the global heap.
  void *storage = env->getHeap()->allocateMemory(size);
  return storage;
}

void * operator new[](size_t size, IpcEnvironment *env)
{
  // Array form: same heap, same allocation call.
  void *storage = env->getHeap()->allocateMemory(size);
  return storage;
}

char *getServerProcessName(IpcServerType serverType, const char *nodeName, short nodeNameLen, 
                           short cpuNum, char *processName, short *envType)
{
  // Build "<prefix><cpuNum>" into processName and return processName, or
  // return NULL for an unsupported server type or an invalid override.
  //
  // The prefix comes from the server type's environment variable when it
  // is set (the historical Guardian DEFINE name =_MX_xxx_PROCESS_PREFIX
  // without its leading '='), otherwise from the built-in default. An
  // override must start with '$' and be at most 4 characters long.
  //
  // nodeName, nodeNameLen and envType are accepted for interface
  // compatibility but are not used. The previously unused locals were
  // removed, and the two parallel switches on serverType are now one.
  const char *prefixEnvVarName = NULL;
  const char *processPrefix = NULL;

  switch (serverType)
  {
    case IPC_SQLSSCP_SERVER:
      prefixEnvVarName = "_MX_SSCP_PROCESS_PREFIX";
      processPrefix = SSCP_PROCESS_PREFIX;
      break;
    case IPC_SQLSSMP_SERVER:
      prefixEnvVarName = "_MX_SSMP_PROCESS_PREFIX";
      processPrefix = SSMP_PROCESS_PREFIX;
      break;
    case IPC_SQLQMS_SERVER:
      prefixEnvVarName = "_MX_QMS_PROCESS_PREFIX";
      processPrefix = QMS_PROCESS_PREFIX;
      break;
    case IPC_SQLQMP_SERVER:
      prefixEnvVarName = "_MX_QMP_PROCESS_PREFIX";
      processPrefix = QMP_PROCESS_PREFIX;
      break;
    case IPC_SQLQMM_SERVER:
      prefixEnvVarName = "_MX_QMM_PROCESS_PREFIX";
      processPrefix = QMM_PROCESS_PREFIX;
      break;
    default:
      return NULL;
  }

  char *prefixFromEnv = getenv(prefixEnvVarName);
  if (prefixFromEnv != NULL)
  {
    if (prefixFromEnv[0] != '$' || str_len(prefixFromEnv) > 4)
      return NULL;
    processPrefix = prefixFromEnv;
  }

  str_sprintf(processName, "%s%d", processPrefix, cpuNum);
  return processName;
}

// Wait for a nowait I/O completion and cache the result in this object
// until ActOnAwaitiox() consumes it (ActOnAwaitiox clears completed_).
//
//  ignoreLrec - when TRUE, BAWAITIOXTS is called with fileNum_ = -2;
//               otherwise BAWAITIOX is called with fileNum_ = -1.
//
// If a completion is already cached (completed_ is TRUE), no new wait is
// issued. Instead retryCount_ is bumped, and once it reaches 10 the call
// asserts. If IPC_LOOP_ON_UNEXPECTED_COMPLETION=1, it first sleeps
// forever, apparently so a debugger can be attached.
void IpcAwaitiox::DoAwaitiox(NABoolean ignoreLrec)
{
  if (!completed_)
  {
    // NOTE(review): -1 presumably means "any file" and -2 selects the
    // BAWAITIOXTS variant; confirm against the seabed file API.
    fileNum_ = ignoreLrec ? -2 : -1; // Reminder: change to -1
    bufAddr_ = 0;
    count_ = retCode_ = lastError_ = 0;
    tag_ = 0;
    if (ignoreLrec)
       condCode_ = BAWAITIOXTS(&fileNum_, &bufAddr_, &count_, &tag_, 0);
    else
       condCode_ = BAWAITIOX(&fileNum_, &bufAddr_, &count_, &tag_, 0);
    // Normalize the -2 sentinel back to -1 if it was not replaced.
    if (fileNum_ == -2)
      fileNum_ = -1;
    completed_ = TRUE;
    if (condCode_ != 0) // not successful completion
    {
      retCode_ = BFILE_GETINFO_(fileNum_, &lastError_);
      // Error 40 (presumably the Guardian timeout error) is treated as
      // "nothing completed": no completion is cached.
      if (retCode_ == 0 && lastError_ == 40)
      {
	fileNum_ = -1;
	completed_ = FALSE;
      }
    }
    retryCount_ = 0;
  }
  else
  {
    // A previous completion was never consumed by ActOnAwaitiox().
    retryCount_ += 1;
    bool loop = false;
    char *envvarPtr;
    if (retryCount_ >= 10)
    {
      envvarPtr = getenv("IPC_LOOP_ON_UNEXPECTED_COMPLETION");
      if (envvarPtr && atoi(envvarPtr) == 1)
        loop = true;
    }
    // Deliberately endless when enabled: neither condition changes here.
    while (retryCount_ >= 10  && loop)
    {
      sleep(10);
    }
    assert(retryCount_ < 10);
  }
}

Int32 IpcAwaitiox::ActOnAwaitiox(void **bufAddr, Int32 *count, SB_Tag_Type *tag)
{
  // Hand the completion cached by DoAwaitiox() to the caller and mark it
  // consumed, so the next DoAwaitiox() call issues a fresh wait.
  // Returns the condition code of that completion.
  *tag = tag_;
  *count = count_;
  *bufAddr = bufAddr_;
  completed_ = FALSE;
  return condCode_;
}

// These operator delete functions will be called if initialization throws an
// exception. They remove a compiler warning with the .NET 2003 compiler.
void IpcMessageBuffer::operator delete(void *p,
                                       IpcMessageObjSize bufferLength,
                                       CollHeap *heap, NABoolean bIgnore)
{
  if (heap)
    heap->deallocateMemory(p);
}

void IpcAllConnections::printConnTraceLine(char *buffer, int *rsp_len, IpcConnection *conn)
{
      Int32 lineLen;
      Int32 cpu, node, pin;
      SB_Int64_Type seqNum = -1;
      IpcNetworkDomain domain;
      cpu = pin;
      domain = conn->getOtherEnd().getDomain();

      if (domain == IPC_DOM_GUA_PHANDLE)
      {
        GuaProcessHandle *otherEnd = (GuaProcessHandle *)&(conn->getOtherEnd().getPhandle().phandle_);
        if (otherEnd)
          otherEnd->decompose(cpu, pin, node
                             , seqNum
                             );
      }

      if (!memcmp(conn->getEyeCatcher(), "STBL", 4))
      {
      }
      else if (conn->castToSMConnection())
      {
        cpu = ((SMConnection *)conn)->getSMTarget().node;
        pin = ((SMConnection *)conn)->getSMTarget().pid;
        sm_id_t smQueryId = ((SMConnection *)conn)->getSMTarget().id;
        Int32 smTag = ((SMConnection *)conn)->getSMTarget().tag;
        *rsp_len = sprintf(buffer + *rsp_len,
                           "%.4s %s %.3d,%.8d,%.8" PRId64 
                           ",%.8" PRId64 ",%.8d\n",
                           conn->getEyeCatcher(),
                           IpcConnStateName[conn->getState()],
                           cpu, pin, seqNum, smQueryId, smTag);
      }
      else
      {
        *rsp_len  = sprintf(buffer, "%.4s %s %.3d,%.8d,%.8" PRId64 "\n",
                          conn->getEyeCatcher(), 
                          IpcConnStateName[conn->getState()], 
                          cpu, pin, seqNum);

      }
}

void IpcAllConnections::infoAllConnections(char *buffer, int max_len, int *rsp_len)
{
  CollIndex i, usedLength = getUsedLength();
  IpcConnection *conn;
  enum { MAX_LINE = 250 }; //  Currently printConnTraceLine sprintf's about 80 characters
  char local_buffer[MAX_LINE];
  int  line_len = 0;
  for (i = 0; i < usedLength; i++)
  {
    if (getUsage(i) != UNUSED_COLL_ENTRY)
    {
      conn = usedEntry(i);
      local_buffer[0] = '\0';
      line_len = 0;
      printConnTraceLine(local_buffer, &line_len, conn);
      if ( (*rsp_len + line_len + 1) <= max_len)
      {
         strncpy( (buffer + *rsp_len), local_buffer, (size_t)line_len );
         *rsp_len += line_len;
      }
      else
      {
         sprintf( (buffer + *rsp_len - 16 ),"\nOUT OF BUFFER!\n");
         return;
      }
    }
  }
  *(buffer + *rsp_len) = '\n';
  *rsp_len += 1;
}

void IpcSetOfConnections::infoPendingConnections(char *buffer, int max_len, int *rsp_len)
{
  // Append one trace line per connection in this set to buffer (starting
  // at *rsp_len, capacity max_len), then a trailing '\n'. An empty set
  // produces no output at all. The output is not NUL-terminated; *rsp_len
  // is its length.
  //
  // When a line does not fit, the last 16 bytes already written are
  // overwritten with "\nOUT OF BUFFER!\n" and the function returns. The old
  // code wrote that marker before the start of buffer when *rsp_len < 16,
  // and could write one byte past max_len. The separately duplicated
  // handling of the first element is folded into the single loop.
  static const char outOfBufferMsg[] = "\nOUT OF BUFFER!\n";
  const int outOfBufferLen = (int) (sizeof(outOfBufferMsg) - 1);
  enum { MAX_LINE = 250 };  // Currently printConnTraceLine sprintf's about 80 characters
  char local_buffer[MAX_LINE];

  // setToNext() moves i to the next member at or after i; stop if none.
  CollIndex i = 0;
  if (!setToNext(i))
    return;

  // As before, the formatter is reached through the first member's
  // environment.
  IpcAllConnections *allConns = element(i)->getEnvironment()->getAllConnections();

  for (; setToNext(i); i++)
  {
    IpcConnection *conn = element(i);
    int line_len = 0;
    local_buffer[0] = '\0';
    allConns->printConnTraceLine(local_buffer, &line_len, conn);

    // The +1 reserves room for the final '\n'.
    if ((*rsp_len + line_len + 1) > max_len)
    {
      if (*rsp_len >= outOfBufferLen)
      {
        memcpy(buffer + *rsp_len - outOfBufferLen, outOfBufferMsg, outOfBufferLen);
        if (*rsp_len < max_len)
          buffer[*rsp_len] = '\0';
      }
      return;
    }

    memcpy(buffer + *rsp_len, local_buffer, (size_t) line_len);
    *rsp_len += line_len;
  }

  if (*rsp_len < max_len)
  {
    buffer[*rsp_len] = '\n';
    *rsp_len += 1;
  }
}

void operator delete(void *p, IpcEnvironment *env)
{
  // Placement-delete partner of operator new(size_t, IpcEnvironment *):
  // give the memory back to the environment's heap.
  void *storage = p;
  env->getHeap()->deallocateMemory(storage);
}
