/**********************************************************************
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
**********************************************************************/
/* -*-C++-*-
 *****************************************************************************
 *
 * File:         IpcMsg.cpp
 * Description:  Implementation for the IPC classes using the NSK messaging
 *               API.
 *
 * Created:      6/25/99
 * Language:     C++
 *
 *
 *
 *
 *****************************************************************************
 */

#define AEVENT 1

#define set_extern_data

#include "Platform.h"

#include "rosgen.h"
#include "fs_rosetta_dml.h"
#include "dpxnsdp2"
#include "yfsiopen"
//#include "dfsiopn.h"
enum {FS_SMS_VERSION_MAY94 = 1};
#include "wdialect"
#include "ppctlc(WAIT, SETSTOP)"
#include "dmsghi.h"
#include "psignalc.h(PK_SIG_SYSTEMCALL_ABORTINQUIRE_, PK_SUSPEND_DISALLOW_SET_)"
#include "pmallocc(ADDRESS_WIRE_, ADDRESS_UNWIRE_)"
#include "hpfs2f(fs2_transid_to_buffer)"
#include "ffilcpp(FS_SQL_SETUPREQUESTINFO, FS_SQL_PUTMSGIDINACB, \
                  FS_SQL_RESETAFTERREPLY)"
#include "Int64.h"

#define _resident
#define _priv
#include "ExCollections.h"
#include "Ipc.h"
#include "str.h"
#include "ComDiags.h"
#include "NAExit.h"
#include "ipcmsg.h"
#include <fcntl.h>
#include "logmxevent.h"

extern "C" {
//#include <cextdecs.h>
#include "cextdecs.h(PROCESSHANDLE_TO_FILENAME_,PROCESSHANDLE_DECOMPOSE_,FILE_OPEN_,SETMODE,FILE_GETINFO_,FILE_CLOSE_, AWAITIOX,PROCESS_DELAY_)"
#include <tal.h>
// should be #include <zsysc.h>
#include "zsysc.h"
}

//Function used to get a pointer to the pfs
_callable void fs2_get_pfsaddr(Long *);

//Function used to get transid from the filesystem
extern "C" _priv _resident int_16 FS_GETTRANSID_
(
 extaddr           tubaddr,      // input
                                 // the address of the Trans Usage Block
                                 // (having the tcbref as the 1st
                                 // field), or 0D or ptmfnocurtransid
 phandle_template *destination,  // input
                                 // where it will be sent
 int_16           *buffer);      // output
                                 // the tcbref is placed here

extern "C" _priv int_16 TMFLIBFS_ABORTTRANS_
(
  int_16 *tcbref_ptr, 
  short disposition 
);


// -----------------------------------------------------------------------
// Methods for class GuaMsgConnectionToServer
// -----------------------------------------------------------------------

GuaMsgConnectionToServer::GuaMsgConnectionToServer(
     IpcEnvironment *env,
     const IpcProcessId &procId,
     NABoolean usesTransactions,
     unsigned short nowaitDepth,
     const char *eye) : IpcConnection(env,procId,eye)
{
  openFile_                 = InvalidGuaFileNumber;
  nowaitDepth_              = nowaitDepth;
  maxIOSize_                = env->getGuaMaxMsgIOSize();

  activeIOs_                = new(env) ActiveIOQueueEntry[nowaitDepth_];
  //get the length of the request/reply control structure
//  int controllen = sizeof(fs_fs_template) + sizeof(fs_fs_template::__writeread);
  Int32 controllen = sizeof(fs_fs_writeread);

  //initialize each entry in the Active IOs queue
  for (unsigned short i = 0; i < nowaitDepth_; i++)
    {
      activeIOs_[i].writeDataCBAPtr_ = (void*) new(env) char[sizeof(NSK_CBA)];
      activeIOs_[i].readDataCBAPtr_ = (void*) new(env) char[sizeof(NSK_CBA)];
      activeIOs_[i].controlCBAPtr_ = (void*) new(env) char[sizeof(NSK_CBA)];  
      activeIOs_[i].controlBuf_ = (void*) new(env) char[controllen];
      activeIOs_[i].inUse_ = FALSE;
      activeIOs_[i].expectReply_ = FALSE;
      activeIOs_[i].msgid_ = 0;
      activeIOs_[i].transid_ = -1;
      activeIOs_[i].buffer_ = activeIOs_[i].readBuffer_ = NULL;
    }
  lastAllocatedEntry_       = nowaitDepth_-1;

  numOutstandingIOs_        = 0;
  partiallySentBuffer_      = NULL;
  chunkBytesSent_           = 0;
  partiallyReceivedBuffer_  = NULL;
  chunkBytesRequested_      = 0;
  chunkBytesReceived_       = 0;
  usesTransactions_         = usesTransactions;
  abortXnOnPathErrors_      = FALSE;
  guaErrorInfo_             = GuaOK;
  currentEntry_             = 0;

  // We need a nowait dept of at least 2, one for a message and another
  // one for out-of-band messages (not really implemented yet).
  //  assert(nowaitDepth_ >= 2);
  
  sendCallbackBufferList_ = new(env) IpcMessageBuffer*[nowaitDepth_];
  for (unsigned short i = 0; i < nowaitDepth_; i++)
    sendCallbackBufferList_[i] = NULL;

  // now open the server process
  openPhandle(NULL);
}

GuaMsgConnectionToServer::~GuaMsgConnectionToServer()
{
  closePhandle();

  CollHeap *heap = getEnvironment()->getHeap();
  for (Int32 i = 0; i < nowaitDepth_; i++)
    {
      ActiveIOQueueEntry &entry = activeIOs_[i];
      heap->deallocateMemory(entry.writeDataCBAPtr_);
      heap->deallocateMemory(entry.readDataCBAPtr_);
      heap->deallocateMemory(entry.controlCBAPtr_);
      heap->deallocateMemory(entry.controlBuf_);
    }
  heap->deallocateMemory(activeIOs_);
  heap->deallocateMemory(sendCallbackBufferList_);
}

void GuaMsgConnectionToServer::send(IpcMessageBuffer *buffer)
{
  // simply add the new buffer to the send queue and try to start
  // as many new I/O operations as possible
  queueSendMessage(buffer);
  while (tryToStartNewIO())
    ;
}

void GuaMsgConnectionToServer::receive(IpcMessageStreamBase *msg)
{
  // Receiving from a Guardian server is implicit, since the WRITEREADX
  // call performs both a send and a receive operation together. However,
  // we still need to add the callback and, if the I/O has already
  // completed, call the callback.

  addReceiveCallback(msg);

  // maybe the Guardian I/O has already completed and the buffer is
  // waiting in the base class' receive queue
  IpcMessageBuffer *receiveBuf;
  while (receiveBuf = getNextReceiveQueueEntry())
    {
      // yes, so just call its callback
      receiveBuf->callReceiveCallback(this);
    }
}

NABoolean GuaMsgConnectionToServer::moreWaitsAllowed()
{
  return !stopWait_;
}

//
// Wait for an I/O reply. After receives a reply, the I/O entry looks like:
//
//   - entry.buffer_=entry.readBuffer_=reply buffer
//
WaitReturnStatus GuaMsgConnectionToServer::wait(IpcTimeout timeout, UInt32 *eventConsumed = NULL, IpcAwaitiox *ipcAwaitiox)
{
  NABoolean ipcAwaitioxCompleted = ipcAwaitiox != NULL;
  if (ipcAwaitioxCompleted)
    ipcAwaitioxCompleted = ipcAwaitiox->getCompleted();
  assert(ipcAwaitioxCompleted == FALSE); 
        //Internal error: AWAITIOX should not have completed a message system based operation
  short error = 0;
  short waitField = LDONE | LSIG;
  short status = 0;
  direct_globals_template * pfsptr;
  fs2_get_pfsaddr((Long*)&pfsptr);

  if ((ULng32)(openFile_) >= (ULng32)(pfsptr->numftentries))
  {
     // If this connection is not open, there can be nothing to wait on.
     // Return an error in this case to indicate that this is an inappropriate
     // call. Do no wait on this connection again.
     guaErrorInfo_ = FENOTOPEN;
     setErrorInfo(-1);
     handleIOError();
     stopWait(TRUE);
     return WAIT_OK;
  }

  // don't do anything if the connection is in an error state and
  // there are no more pending requests to work on.
  if (getState() == ERROR_STATE AND numOutstandingIOs_ <= 0
      AND numQueuedSendMessages() <= 0)
  { // no more waits on this connection
    stopWait(TRUE);
    return WAIT_OK;
  }
  stopWait(FALSE);  
  setBreakReceived(FALSE);

  // try to send or receive first if there is a timeout specified,
  // or if we don't have any IOs outstanding.  Also, try more
  // I/O to cover the special case that we've posted only the 
  // first chunk of an multi-chunk msg.  

  // This latter scenario is possible because of the logic on 
  // tryToStartNewIO that gives up if the per-process limit on 
  // MQCs would be exceeded.  It required this special handling
  // for multichunk messages, as explained in the next paragraph.

  // The connection may have to give up either 1) before the 
  // first chunk is posted, or 2) before the second chunk is 
  // posted or 3) after the second chunk is posted.  
  // The tryToStartNewIO method is called from this class's send
  // method and then also from two places in this wait method.   
  // In case 1), the test of numOutStandingIOs_ will be 
  // sufficient to cause tryToStartNewIO to be called directly 
  // below this comment.  For case 3), the other call to 
  // tryToStartNewIO will be taken, because the a_message_is_done 
  // will be set to true after the server replies to the chunk(s) 
  // already posted.  But for case 2), we need special handling, 
  // because the server does not reply to the first chunk until 
  // all chunks are sent.  And this class's send method will not 
  // be called again for the multichunk message.  So the additional
  // test below to detect that the second chunk has not yet been
  // sent, is needed to force the call to tryToStartNewIO.

  if (timeout != IpcImmediately || 
      numOutstandingIOs_ == 0   || 
      chunkBytesSent_ == maxIOSize_)
    while (tryToStartNewIO())
      ;

  // try to complete I/Os within the given time limit,
  // if there are outstanding I/Os
  if (numOutstandingIOs_ == 0)
    return WAIT_OK;

  //used to mark the point in the activeIOs queue from
  //where we started checking for IO completion
  unsigned short start = currentEntry_;
  //indicates we found a completed message
  Int32 a_message_is_done = 0;
  
  //check if anyone of our messages has completed
  do {
    if ((activeIOs_[currentEntry_].inUse_) &&
        (activeIOs_[currentEntry_].expectReply_))
      {
        //check if the message for this entry is done
        if (MSG_ISDONE2_((NSK_msId2)activeIOs_[currentEntry_].msgid_))
          {
            getEnvironment()->setEvent(TRUE, AEVENT);
            a_message_is_done = 1;
            break;// found a complete message break
          }
      }

    currentEntry_++;

    /*
    ** Not that expensive since if-conversion reduces the control flow into
    ** predicated instructions 
    */
    if (currentEntry_ == nowaitDepth_)
      currentEntry_ = 0;
  } while(currentEntry_ != start );
 
  //
  // IMPORTANT: MUST NOT have any early return statement from here until
  // the end of this function. So that the loop at the end of the function
  // that issues receive callbacks will always be executed if any receive
  // entries were queued by this function.
  //

  NABoolean interrupt = FALSE;
 
  //if no message is done wait, then check again for message completion
  //for nowaited mode (timeout == 0), do not call WAIT.
  if ((!a_message_is_done && timeout != 0) || getEnvironment()->lsigConsumed())
    {
      // Only wait for LDONE if breakEnabled is not set.
      IpcTimeout waitTimeout = timeout == 0 ? -2 : timeout;
      if (getEnvironment()->lsigConsumed())
        status = LSIG;
      else
        {
          if (getEnvironment()->breakEnabled())
            status = WAIT(waitField, waitTimeout);//wait for timeout
          else
            status = WAIT(LDONE, waitTimeout);//wait for timeout
        }
      if (!status)
        {
          a_message_is_done = FALSE; // timed out
        }
      else
        {
          if (status & LSIG)
            {
              getEnvironment()->setLsigConsumed(FALSE);
              short oldsigmod = PK_SUSPEND_DISALLOW_SET_(1);
              error = PK_SIG_SYSTEMCALL_ABORTINQUIRE_();
              PK_SUSPEND_DISALLOW_SET_(oldsigmod);
              if (error > 0)
                {
                  // received a signal.
                  guaErrorInfo_ = FE_EINTR;
                  setBreakReceived(TRUE);
                  setErrorInfo(-1);
                  handleIOError();
                  interrupt = TRUE; // FE_EINTR
                }
            } // if (status & LSIG)
          else
            {
              //woken up because some message completed on the LDONE queue
              //somewhere. Check if the message completed was ours.
              getEnvironment()->setLdoneConsumed(TRUE);
              do {
                //check if this activeIOs_ entry is in Use
                if (activeIOs_[currentEntry_].inUse_ &&
                    activeIOs_[currentEntry_].expectReply_)
                  {
                    //check if the message for this entry is done
                    if (MSG_ISDONE2_((NSK_msId2)activeIOs_[currentEntry_].msgid_))
                      {
                        a_message_is_done = 1;
                        // The following line of code was added but is being
                        // removed because:
                        // a) LDONE consumed which servers the same purpose
                        //    has already been set, and
                        // b) a compiler performance regression occurred and
                        //    it's an unlikely but possible cause
                        //getEnvironment()->setEvent(TRUE, AEVENT);
                        break;
                      }
                  }

                currentEntry_++;
                if (currentEntry_ == nowaitDepth_) {
                  currentEntry_ = 0;
                }
              } while(currentEntry_ != start );
            }
        }
    }

  if (a_message_is_done)
    {
      ActiveIOQueueEntry &entry = activeIOs_[currentEntry_];

      NSK_msResult2  results;

      short oldstop = SETSTOP(2);//become unstoppable

      //Pickup reply and terminate message
      MSG_BREAK2_((NSK_msId2)entry.msgid_, &results,
                  (NSK_PHandle _ptr64 *)getOtherEnd().getPhandle().phandle_);

      //Get the error from the reply control buffer
      message_header_template * ReplyControlBuf = (message_header_template *)entry.controlBuf_;
      error = ReplyControlBuf->error();

      if (error == GuaTimeoutErr)
        {
          // timeout does not set the connection into an error state but it
          // causes a return. later we shall wait on this connection again.
          SETSTOP(oldstop);//become stoppable
          guaErrorInfo_ = error;
          return WAIT_OK;
        }

      //reset the filesystem data structures
      Int64 localTransid = entry.transid_;
      resetAfterReply(entry.msgid_, error, &localTransid);

      SETSTOP(oldstop);//become stoppable

      // we have got the reply for this I/O entry
      entry.expectReply_ = FALSE;

      if (error)
        {
          // Remember the Guardian error code
          guaErrorInfo_ = error;
          setErrorInfo(-1);
          handleIOErrorForEntry(entry);
        }
      else
        {
          cleanUpActiveIOEntry(entry);

          //get # of bytes written in reply
          ULng32 countRead = (ULng32)results.rr_dataSize;

          // Now try to figure out what the original operation was, so
          // we know what to do with the IpcMessageBuffer:
          //
          // a) If this I/O was a write operation for part of a buffer,
          //    then another operation for the same buffer is following, so
          //    just remove the outstanding I/O entry.
          // b) If this I/O returned part of a buffer, then we have to issue
          //    another I/O operation for the rest of the buffer.
          // c) If we have received all of the data, the buffer is ready to
          //    be delivered to its destination. If the buffer has its callback
          //    assigned, then call the callback, otherwise add the buffer to
          //    the receive queue that is managed by the base class,
          //    IpcConnection. "completelyReadBuffer" is set for case c)
          //
          if (entry.receiveBufferSizeLeft_ == 0)
            {
              // case a)
              assert(countRead == 0);
        
              // Note that this does NOT count as a completion, we don't
              // let the upper layers know that we are using multiple
              // Guardian I/Os for this.      
            }
          else
            {
              // we did expect data back, case b) or c)
              if (entry.offset_ == 0)
                {
                  if (numOutstandingIOs_ > 0)
                    {
                      // more pending I/Os on this connection. check if there
                      // are other chunks of entry.buffer_.
                      //
                      //  - in multi-chunk mode, when server receives any
                      // of the subsequent chunk, it sends an empty reply
                      // right away. however, when server receives the first
                      // chunk, it does not reply right away. instead, server
                      // holds the first chunk until it has received all
                      // subsequent chunks and then server replies to the
                      // first chunk. on the client side, even though the
                      // first chunk receives its reply the last, but since
                      // we are looking at a randomly selected I/O entry,
                      // "entry" could be any one of the multi-chunk I/Os.
                      //
                      for (unsigned short i = 0; i < nowaitDepth_; i++)
                        {
                          ActiveIOQueueEntry &nextEntry = activeIOs_[i];
                          if (nextEntry.inUse_ &&
                              nextEntry.buffer_ == entry.buffer_)
                            {
                              // entry is the first chunk and nextEntry
                              // is a subsequent chunk of a multi-chunk
                              // send buffer.
                              // only the first chunk receives reply.
                              // there is no reply for subsequent chunks.

                              // we must clean up I/O on nextEntry now
                              // or otherwise the shared send buffer
                              // entry.buffer_ can get deallocated
                              // (see a few lines below). after that
                              // we cannot clean up I/O on nextEntry as
                              // entry.buffer_->chunkLockCount_ is not
                              // longer accessible.
                              cleanUpActiveIOEntry(nextEntry);

                              if (numOutstandingIOs_ == 0)
                                // no more pending I/Os on this connection
                                break;
                            }
                        }
                    }

                  if (entry.buffer_->isShared())
                    {
                      // no longer need the shared send buffer. release it.
                      entry.buffer_->decrRefCount(getEnvironment());
                      // now use only the reply buffer 
                      entry.buffer_ = entry.readBuffer_;
                    }
          
                  // since this is the first (maybe only) chunk of the message
                  // buffer, we can get the length of the total message by
                  // looking into the message header.

                  // Get the size of the message sent (or the reply buffer if shared)
                  IpcMessageObjSize bytesSent = entry.buffer_->getMessageLength();

                  // unpack message header which contains reply message length
                  InternalMsgHdrInfoStruct *msgHdr = 
                    new( (IpcMessageObj*)(entry.buffer_->data(0)) )
                    InternalMsgHdrInfoStruct(NULL);
                  IpcMessageObjSize msgLen = msgHdr->getMsgLengthFromData();
          
                  // remember the real length of the message coming back
                  entry.buffer_->setMessageLength(msgLen);
          
                  // check whether this is case b) or c)
                  if (msgLen == (IpcMessageObjSize) countRead)
                    {
                      // The "normal" case c). This is a single-chunk reply.

                      // If we were sending a large buffer(more than one chunk)
                      // and just received a small buffer (one chunk) then
                      // release the large buffer to conserve space on the IPC
                      // heap.
                      if (bytesSent > maxIOSize_)
                        {
                          entry.buffer_ = entry.buffer_->resize(getEnvironment(), msgLen);
                        }

                      queueReceiveMessage(entry.buffer_);
                    }
                  else
                    {
                      // Case b). This is a multi-chunk reply. Switch to the
                      // chunk protocol, countRead bytes are already received
                      // back from the server.
                      if (msgLen > entry.buffer_->getBufferLength() ||
                          bytesSent > maxIOSize_)
                        {
                          // We want to resize the reply buffer if either:
                          // - The server has a reply message that is larger
                          //   than what our buffer can hold
                          // - The request buffer was large (more than one
                          //   chunk) and may now be consuming space
                          //   unnecessarily on the IPC heap
                          entry.buffer_->setMessageLength(countRead);
                          entry.buffer_ = entry.buffer_->resize(getEnvironment(), msgLen);
                          entry.buffer_->setMessageLength(msgLen);
                        }
            
                      // do some sanity checks, make sure we don't have two
                      // partial buffers at a time
                      assert(partiallyReceivedBuffer_ == NULL);
                      assert(msgLen > (IpcMessageObjSize) countRead);
            
                      // move some information from the entry to data members
                      // in the connection while the chunk protocol is going on
                      partiallyReceivedBuffer_ = entry.buffer_;
                      chunkBytesRequested_ = countRead;
                      chunkBytesReceived_ = countRead;
                      getEnvironment()->getAllConnections()->
                        setReceivedPartialMessage(TRUE);
                    }
                } // first (maybe only) chunk
              else
                {
                  // case b), this is not the first chunk
                  assert (partiallyReceivedBuffer_ == entry.buffer_);
                  chunkBytesReceived_ += countRead;
          
                  if (chunkBytesReceived_ == entry.buffer_->getMessageLength())
                    {
                      // this was the last chunk
                      queueReceiveMessage(partiallyReceivedBuffer_);
                      partiallyReceivedBuffer_ = NULL;
                      chunkBytesRequested_ = 0;
                      chunkBytesReceived_ = 0;
                    }
                  else
                    {
                      getEnvironment()->getAllConnections()->
                        setReceivedPartialMessage(TRUE);
                    }
                } // not the first chunk
            } // case b) or c)
      
          // this I/O completed
          if (getState() != ERROR_STATE && numOutstandingIOs_ == 0)
            setState(ESTABLISHED);

          // after waiting, try (again) to start as many new I/O operations as
          // possible
          while (tryToStartNewIO())
            ;
        } // if (error) else
    } // if (a_message_is_done)

  IpcMessageBuffer *receiveBuf;
  NABoolean aCallbackIsCalled = FALSE;
  while (receiveBuf = getNextReceiveQueueEntry())
    {
      // When the user of this connection sets trustIncomingBuffers_ to
      // FALSE then we perform an integrity check on all incoming
      // message buffers. A failure causes the connection to transition
      // to the ERROR_STATE state.
      if (!getTrustIncomingBuffers() && getState() != ERROR_STATE)
        {
          if (!receiveBuf->verifyBackbone())
            {
              setIpcMsgBufCheckFailed(TRUE);
              guaErrorInfo_ = 0;
              setErrorInfo(-1);
              setState(ERROR_STATE);
            }
        }

      receiveBuf->callReceiveCallback(this);
      aCallbackIsCalled = TRUE;
    }

  // In the ERROR_STATE state we may have to announce I/O completion after 
  // callbacks are issued. The setState() method has the job of
  // detecting when I/O is complete and informing the IpcEnviroment at
  // the appropriate time. Even though it's not intuitive to call
  // setState(ERROR_STATE) here (because we are already in the ERROR_STATE state),
  // we make the call anyway to trigger any necessary bookkeeping.
  if (aCallbackIsCalled && getState() == ERROR_STATE)
    setState(ERROR_STATE); 

  return interrupt ? WAIT_INTERRUPT : WAIT_OK ;
}

GuaMsgConnectionToServer * GuaMsgConnectionToServer::castToGuaMsgConnectionToServer()
{
  return this;
}

Int32 GuaMsgConnectionToServer::numQueuedSendMessages()
{
  return sendQueueEntries();
}

Int32 GuaMsgConnectionToServer::numQueuedReceiveMessages()
{
  return receiveQueueEntries();
}

void GuaMsgConnectionToServer::populateDiagsArea(ComDiagsArea *&diags,
                                                 CollHeap *diagsHeap)
{
  if (guaErrorInfo_ != GuaOK)
  {
    IpcAllocateDiagsArea(diags,diagsHeap);
    
    *diags << DgSqlCode(-2034) << DgInt0(guaErrorInfo_);
    getEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).
      addProcIdToDiagsArea(*diags,0);
    getOtherEnd().addProcIdToDiagsArea(*diags,1);
  }

  if (getIpcMsgBufCheckFailed())
  {
    IpcAllocateDiagsArea(diags, diagsHeap);

    *diags << DgSqlCode(-2037);
    getEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE).
      addProcIdToDiagsArea(*diags, 0);
    getOtherEnd().addProcIdToDiagsArea(*diags, 1);
  }
}

//
// Send a new I/O request.
//
// In Guardian each msg I/O requires a write buffer to send the request, and
// a read buffer used to receive the reply. A same buffer can be used as both
// the write buffer and the read buffer. here we will refer the write buffer
// as the "send buffer", and the read buffer as the "reply buffer".
//
// In our implementation we use ActiveIOQueueEntry to represent a msg I/O.
// entry.buffer_ and entry.readBuffer_ can be used as the send buffer and the
// reply buffer. however, the values of entry.buffer_ and entry.readBuffer_
// depend on the type of the message buffer to be sent. There are four
// possible scenarios:
//
// 1. multi-chunk send buffer, shared by multiple connections:
//
//   - first chunk: entry.buffer_=send buffer
//                  entry.readBuffer_=reply buffer
//   - subsequent chunks: entry.buffer_=send buffer, entry.readBuffer_=NULL
//
// 2. multi-chunk send buffer, single connection (not shared):
//
//   - first chunk: entry.buffer_=entry.readBuffer_=send buffer
//   - subsequent chunks: entry.buffer_=send buffer, entry.readBuffer_=NULL
//
// 3. single-chunk send buffer, shared by multiple connections
//
//   - entry.buffer_=send buffer
//   - entry.readBuffer_=reply buffer
//
// 4. single-chunk send buffer, single connection (not shared):
//
//   - entry.buffer_=entry.readBuffer_=send buffer
//


NABoolean GuaMsgConnectionToServer::tryToStartNewIO()
{
  // Any more messages or parts of messages to send?

  // There is nothing to do if there are neither new messages nor
  // incompleted partial messages.
  if (sendQueueEntries() == 0 && !partiallySentBuffer_ &&
      !partiallyReceivedBuffer_)
    return FALSE;

  // do not allow new send if a partial message is being received and we
  // already have requested all the reply data for it.
  if (partiallyReceivedBuffer_ &&
      partiallyReceivedBuffer_->getMessageLength() == chunkBytesRequested_)
    return FALSE;

  // Can't have more than nowaitDepth_ - 1 I/Os outstanding, except
  // when there is an out-of-band message for which we make an exception.
  // If there is an out-of-Band message then assume that it was placed
  // in front of the send queue.
  if (numOutstandingIOs_ >=
      (IFX (partiallySentBuffer_ OR partiallyReceivedBuffer_)
       THENX nowaitDepth_ 
       ELSEX (nowaitDepth_-1)))
    return FALSE;
  
  // Check if the per-process limit on MQCs is exceeded.
  // Note that this should be reconsidered when this code is 
  // multithreaded.
  short numMsgsActual;
  if (MESSAGESYSTEMINFO(5, &numMsgsActual))
    assert(0);
  if (numMsgsActual+1 >= getEnvironment()->getMaxPerProcessMQCs()) 
    return FALSE;

  // If we reach here this means we can start another nowait I/O;
  // get to the outstanding I/O entry that is to be filled next.
#ifndef NDEBUG
  NABoolean wrapAroundCheck    = FALSE;
#endif

  // We may have to return early from this method if we cannot find
  // space on the IPC heap for an outgoing buffer. If that happens
  // we'll want to restore the original value of lastAllocatedEntry_.
  unsigned short originalLastAllocated = lastAllocatedEntry_;

  // find an entry that is not in use
  while (activeIOs_[lastAllocatedEntry_].inUse_)
    {
      // increment lastAllocatedEntry_ modulo nowaitDepth_
      lastAllocatedEntry_++;
      if (lastAllocatedEntry_ == nowaitDepth_)
        {
          lastAllocatedEntry_ = 0;
#ifndef NDEBUG
          assert(!wrapAroundCheck);  // to detect infinite loop (shouldn't happen)
          wrapAroundCheck = TRUE;
#endif
        }
    }
  // we have found an entry that is not in use
  ActiveIOQueueEntry &entry = activeIOs_[lastAllocatedEntry_];
  //assert(!entry.inUse_ && !entry.expectReply_);

  // ---------------------------------------------------------------------
  // set up a new outstanding IO entry, depending on what to do next
  // but don't start the corresponding I/O quite yet
  // ---------------------------------------------------------------------

  // initialize all fields of the entry (there is no constructor)
  entry.bytesSent_             = 0;
  entry.receiveBufferSizeLeft_ = 0;
  entry.offset_                = 0;
  entry.msgid_                 = 0;

  // These help keep track of the need to callSendCallback.
  NABoolean isFirstChunk = FALSE;
  NABoolean isLastChunk = FALSE;

  // ---------------------------------------------------------------------
  // Decide what to do, depending on the currently pending buffers and
  // IOs:
  // 
  // a) send another chunk of a large message down to the server without
  //    asking for data back
  // b) request some more data from the server, if the server replied
  //    with a partial message and we haven't asked for all of the
  //    rest of the data yet (never interleave this with I/Os of
  //    steps a) and b), so the server won't get confused
  // c) get another message from the send queue and find out that it
  //    is too long for a single chunk, so send the first piece
  // d) should be the normal case, get the next message from the send
  //    queue and send it in a single chunk
  // ---------------------------------------------------------------------

  if (partiallySentBuffer_)
    {
      // case a) continue sending more chunks for this buffer
      // but don't ask for reply data, since we may want the reply to come
      // back at entry.buffer_->data(0) 
      entry.buffer_ = partiallySentBuffer_;
      // for multi-chunk buffer, whether shared or not, the read buffer for
      // any chunk after first chunk is NULL.
      entry.readBuffer_ = NULL;

      assert(chunkBytesSent_ < entry.buffer_->getMessageLength());

      entry.bytesSent_ = MINOF(maxIOSize_,
			       entry.buffer_->getMessageLength() -
			       chunkBytesSent_);
      entry.offset_ = chunkBytesSent_;
      chunkBytesSent_ += entry.bytesSent_;
      // if this is the last chunk ...
      if (chunkBytesSent_ >= entry.buffer_->getMessageLength())
        {
          // we're done sending chunks
          partiallySentBuffer_ = NULL;
          chunkBytesSent_ = 0;
	  // can call the send callback now
          isLastChunk = TRUE;
	}

      lastSentBuffer_ = entry.buffer_;
    }
  else if (partiallyReceivedBuffer_)
    {
      // b) next thing to do is to receive another chunk from the server
      entry.buffer_ = entry.readBuffer_ = partiallyReceivedBuffer_;
      entry.offset_ = chunkBytesRequested_;
      entry.receiveBufferSizeLeft_ =
	MINOF(maxIOSize_,
	      entry.buffer_->getMessageLength() - chunkBytesRequested_);
      chunkBytesRequested_ += entry.receiveBufferSizeLeft_;

      lastSentBuffer_ = entry.buffer_;
    }
  else
    {
      // get the next buffer to send from the send queue and check
      // whether it can be sent in one piece

      assert(sendQueueEntries() > 0);
      IpcMessageBuffer *nextToSend = sendQueue()[0];
      assert(nextToSend);

      // assume request and reply use same buffer
      entry.buffer_ = entry.readBuffer_ = nextToSend;
      isFirstChunk = TRUE;

      if (entry.buffer_->getRefCount() > 1 || entry.buffer_->isShared())
        {
          if (!entry.buffer_->initLockCount(getEnvironment()->getHeap(),
                                            maxIOSize_))
            {
              // We ran out of space on the IPC heap. This is OK and we
              // can return early. Higher layers will redrive the I/O for
              // this connection.
              getEnvironment()->setHeapFullFlag(TRUE);
              lastAllocatedEntry_ = originalLastAllocated;
              return FALSE;
            }

          // The send buffer is shared by multiple connections. Therefore,
          // allocate a different buffer for the reply. the reply buffer size
          // is set as:
          //
          //   - if user has explicitly specified the reply buffer length
          //     and it is less than maxIOSize_, then use it.
          //   - otherwise, use maxIOSize_.
          IpcMessageStream *msgStream = entry.buffer_->getMessageStream()->castToIpcMessageStream();
          //assert(msgStream);
          IpcMessageObjSize maxReplyLen = msgStream->getMaxReplyLength();
          if (maxReplyLen > 0 && maxReplyLen < maxIOSize_)
            entry.readBuffer_ = entry.buffer_->createBuffer(getEnvironment(), maxReplyLen, FALSE);
          else
            entry.readBuffer_ = entry.buffer_->createBuffer(getEnvironment(), maxIOSize_, FALSE);
          if (entry.readBuffer_ == NULL)
            {
              // We ran out of space on the IPC heap ...
              getEnvironment()->setHeapFullFlag(TRUE);
              lastAllocatedEntry_ = originalLastAllocated;
              return FALSE;
            }
        }

      if (entry.buffer_->getMessageLength() > maxIOSize_)
        {
          // case c), the message we just got from the send queue is too large
          // to be sent in a single chunk :-(
          assert(partiallySentBuffer_ == NULL);

          // indicate multi-chunk protocol
          partiallySentBuffer_ = entry.buffer_;

          entry.bytesSent_ = maxIOSize_;
          chunkBytesSent_ = entry.bytesSent_;
        }
      else
        {
          // case d) can send in single chunk
          entry.bytesSent_ = entry.buffer_->getMessageLength();
          // can call the send callback now
          isLastChunk = TRUE;
        }

      prepareSendBuffer(entry.buffer_);

      // got this far so de-queue buffer from this connection's queue
      removeNextSendBuffer();
      entry.receiveBufferSizeLeft_ =
        MINOF(maxIOSize_, entry.readBuffer_->getBufferLength());

      lastSentBuffer_ = entry.buffer_;
    }

  // ---------------------------------------------------------------------
  // Next, start the I/O operation
  // ---------------------------------------------------------------------
  short wireOptions = 8;
  short GuardianError = 0;
 
  direct_globals_template * pfsptr;
  acb_standard_template * acb;

  // Get the pointer to the acb - needed in order to decrement the count
  // of outstanding requests in the ACB on errors.
  fs2_get_pfsaddr((Long*)&pfsptr);

  // check that the file is actually open, i.e., that this is a valid 
  // filenum.
  if ((ULng32)(openFile_) >= (ULng32)(pfsptr->numftentries))
  {
     GuardianError = FENOTOPEN;
  }
  else
     acb = (acb_standard_template *) pfsptr->file_table[openFile_];

  //become unstoppable
  short oldstop = SETSTOP(2);

  if (NOT GuardianError)
    {
      // lock memory used for the control info buffer
      GuardianError = ADDRESS_WIRE_((unsigned char *)entry.controlBuf_,
                                    sizeof(fs_fs_writeread), wireOptions);
    }

  if (NOT GuardianError)
    {
      entry.transid_ = (Int64)entry.buffer_->getTransid();
      // setup request control buffer 
      GuardianError = setupRequestInfo((void*)entry.controlBuf_,
				       (Int64)entry.buffer_->getTransid());
      if (GuardianError)
        ADDRESS_UNWIRE_((unsigned char *)entry.controlBuf_,
                        (sizeof(fs_fs_writeread)), wireOptions);
    }

  if (NOT GuardianError)
    {
      GuardianError = addressWire(entry, wireOptions);
      if (GuardianError)
        {
          // decrement the number of outstanding requests -
          // this was incremented by the call to setupRequestInfo() above.
          acb->acb_numreqs = acb->acb_numreqs - 1;
          ADDRESS_UNWIRE_((unsigned char *)entry.controlBuf_,
                          (sizeof(fs_fs_writeread)), wireOptions);
        }
    }

  if (NOT GuardianError)
    {
      // generate Context-Based Addresses (CBAs) for the data buffer
      // and for the control buffer
      NSK_CBA  &writeDataCBA = *(NSK_CBA*) entry.writeDataCBAPtr_;
      NSK_CBA  &readDataCBA = *(NSK_CBA*) entry.readDataCBAPtr_;
      NSK_CBA  &controlCBA = *(NSK_CBA*) entry.controlCBAPtr_;

      // for all other types of send use the send buffer
      CBA_CREATE_((NSK_CBA *)entry.writeDataCBAPtr_, entry.buffer_->data(entry.offset_));

      if (!entry.readBuffer_)
        // this is a subsequent chunk of a multi-chunk send. no reply is
        // needed.
        CBA_CREATE_((NSK_CBA *)entry.readDataCBAPtr_, 0);
      else
        // all other types of reply
        CBA_CREATE_((NSK_CBA *)entry.readDataCBAPtr_, entry.readBuffer_->data(entry.offset_));

      CBA_CREATE_((NSK_CBA *)entry.controlCBAPtr_,entry.controlBuf_);

      NSK_msLinkOpts2 options = MSG_LINK_CBA;

      short retryCount = 0;
      NABoolean needToRetry;
      do {
          // send the message to the server, using CBAs, so that hide/reveal
          // operations later can't invalidate the addresses
          NSK_msId2 localMsgId = 0;

          GuardianError = MSG_LINK2_(
	       (NSK_PHandle _ptr64 *)getOtherEnd().getPhandle().phandle_,
	       (NSK_msId2 _ptr64 *)&localMsgId,
	       (int16 _ptr64 *)&controlCBA,
	       (NSK_msSize2)sizeof(fs_fs_writeread),
	       (int16 _ptr64 *)&controlCBA,
	       (NSK_msSize2)sizeof(fs_fs_writeread),
	       (char _ptr64 *)&writeDataCBA, //message to send to server
	       (NSK_msSize2)entry.bytesSent_,  //# of bytes to send 
	       (char _ptr64 *)&readDataCBA, //buffer to receive reply from server
	       (NSK_msSize2)entry.receiveBufferSizeLeft_, //reply bytes expected
               0, 0, 0, options);

          entry.msgid_ = (UInt32)localMsgId;

          if (GuardianError == FENOLCB)  // too many MQCs.
            {
              // Since the per-process limit was checked above,
              // assume that it is the per-cpu limit, so let us
              // retry.
              retryCount++;
              needToRetry = TRUE;
              getEnvironment()->incrRetriedMessages();
              PROCESS_DELAY_(10*1000);  // 10,000 microseconds
            }
          else
            needToRetry = FALSE;
      } while (needToRetry && 
               retryCount < 100*60 );  // after 60 seconds (and 6000 retries),
                                       // just give up.
      getEnvironment()->setEvent(TRUE, AEVENT);

      if (GuardianError)
        {
          // failed message link

          // decrement the number of outstanding requests -
          // this was incremented by the call to setupRequestInfo() above.
          acb->acb_numreqs = acb->acb_numreqs - 1;
          addressUnwire(entry);
        } // if (GuardianError)
      else
        {
          // Put the message id into the acb.
          putMsgIdinACB(entry.msgid_);
        }
    }

  //go back to old stop mode
  SETSTOP(oldstop);

  if (isFirstChunk)
    addSendCallbackBuffer(entry.buffer_);

  if (GuardianError)
    {
      // an error happened somewhere along the way and we must
      // a) record the Guardian error number,
      guaErrorInfo_ = GuardianError;
      // b) set the connection to the error state.  If we have not invoked
      // the send callback, then handleIOErrorForEntry() will invoke the
      // send callback.
      setErrorInfo(-1);
      handleIOErrorForEntry(entry);

      // if the design is to disallow any future i/o after connection become
      // error state, should we return false to prevent from calling
      // tryToStartNewIO() again?
      //return FALSE;
    }
  else
    {
      // buffer has been sent successfully
      if (numOutstandingIOs_ == 0)
	setState(SENDING);
      numOutstandingIOs_++;
      entry.inUse_ = TRUE;  // this entry now has an I/O in progress
      entry.expectReply_ = TRUE;

      // --------------------------------------------------------------
      // If we started the I/O for all chunks of an IpcMessageBuffer
      // (or if the IpcMessageBuffer was sent in a single message), then 
      // call its send callback.
      // --------------------------------------------------------------
      if (isLastChunk)
        {
          // removeSendCallbackBuffer() should always return TRUE here because
          // send callback has not been invoked yet. If error occured prior to
          // this method call then handleIOErrorForEntry() should have cleared
          // all I/Os on the same message stream and we would not have come
          // here.
          NABoolean sendCallbackFlag = removeSendCallbackBuffer(entry.buffer_);
          if (sendCallbackFlag)
            // the send callback doesn't give away entry.buffer_,  and this is
            // good since the same buffer may be still used for the receive
            // operation.
            entry.buffer_ ->callSendCallback(this);
          else // - for debugging only
            assert(sendCallbackFlag);
        }
    }

  return TRUE;
}

void GuaMsgConnectionToServer::openPhandle(char * processName)
{
  char  procFileName[IpcMaxGuardianPathNameLength];
  short procFileNameLen;
  short openFlags = nowaitDepth_ == 0 ? 0x0 : 0x4000;

  if (! processName)
    {
      // convert the phandle to a string that can be passed to FILE_OPEN_
      guaErrorInfo_ = PROCESSHANDLE_TO_FILENAME_(
	   (short *) getOtherEnd().getPhandle().phandle_,
	   procFileName,
	   IpcMaxGuardianPathNameLength,
	   &procFileNameLen,
	   0);
      if (guaErrorInfo_ != GuaOK)
	{
	  setErrorInfo(-1);
	  setState(ERROR_STATE);
	  return;
	}
    }
  else
    {
      strcpy(procFileName, processName);
      procFileNameLen = (short)strlen(processName);
    }

  getEnvironment()->setLdoneConsumed(TRUE);

  // solution 10-081025-6810:
  // wait for esp open reply indefinitely. time out every 10 minutes, write
  // a warning msg to ems log including error code and esp pin, and then go
  // back to waiting. we allow up to 10 timeouts, or 100 minutes.
  //
  // - when AWAITIOX times out, it completes/cancels the nowait i/o
  // initiated by FILE_OPEN_. thus we cannot simply call AWAITIOX again since
  // there is no more outstanding i/o. we must try FILE_OPEN_ again and then
  // call AWAITIOX to wait another 10 minutes. also, we must save the file
  // numbers returns from each timed out FILE_OPEN_ so we can close them later.
  // and we cannot call FILE_CLOSE_ on any of the returned file numbers until
  // AWAITIOX returns successfully with esp's open reply. otherwise the close
  // msg from master will cause esp to exit.
  //
  GuaFileNumber oldOpens[10]; // allow up to 10 timeouts
  short numOldOpens = 0;
  _cc_status stat;
  while (1)
    {
      guaErrorInfo_ = FILE_OPEN_(procFileName,
                                 procFileNameLen,
                                 &openFile_,
                                 0, // open for read/write access
                                 0, // shared access
                                 nowaitDepth_,
                                 0, // sync depth 0 (target proc is not NonStop)
                                 openFlags); // options
      if (guaErrorInfo_ != GuaOK)
        break;

      if (!(openFlags &= 0x4000))
        // return if FILE_OPEN_ was waited
        break;

      // FILE_OPEN_ was no-waited

      // wait indefinitely
      stat = AWAITIOX(&openFile_);
      if (_status_eq(stat))
        // reply received with no error
        break;

      // we got error. get Guardian error code.
      GuaErrorNumber getinfoError = FILE_GETINFO_(openFile_,&guaErrorInfo_);
      if (getinfoError != 0)
        guaErrorInfo_ = getinfoError; // not even FILE_GETINFO_ worked

      // set guaErrorInfo_ to -1 if we did not get a valid error?
      //if (guaErrorInfo_ == GuaOK)
      //  guaErrorInfo_ = -1;


      // AWAITIOX returned error, or ran out allowed timeouts.
      break;
    } // while (1)

  for (short i = 0; i < numOldOpens; i++)
    FILE_CLOSE_(oldOpens[i]);

  if (guaErrorInfo_ != GuaOK)
    {
      if (openFile_ != -1)
        {
          FILE_CLOSE_(openFile_); // Don't retain unopened ACB
          openFile_ = -1; // Don't leave valid file number in object!
        }
      setErrorInfo(-1);
      setState(ERROR_STATE);
      return;
    }

  // some day we may want to perform nowaited FILE_OPEN_ calls and add
  // a method to "work" on the open.

  // use setmode 74 to turn off the automatic CANCEL upon AWAITIOX timeout
  stat = SETMODE(openFile_,74,-1);
  if (_status_ne(stat))
    {
      // get a Guardian error code
      Int32 errcode2 = FILE_GETINFO_(openFile_,&guaErrorInfo_);

      if (errcode2 != 0)
	guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
      setErrorInfo(-1);
      setState(ERROR_STATE);
      return;
    }

  // use setmode 30 to allow I/O operations to finish in any order
  stat = SETMODE(openFile_,30,3);
  if (_status_ne(stat))
    {
      // get a Guardian error code
      Int32 errcode2 = FILE_GETINFO_(openFile_,&guaErrorInfo_);

      if (errcode2 != 0)
	guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
      setErrorInfo(-1);
      setState(ERROR_STATE);
      return;
    }

  // use setmode 117 if no transactions should be propagated to the server
  if (NOT usesTransactions_)
    {
      _cc_status stat = SETMODE(openFile_,117,1);
      if (_status_ne(stat))
	{
	  // get a Guardian error code
	  Int32 errcode2 = FILE_GETINFO_(openFile_,&guaErrorInfo_);

	  if (errcode2 != 0)
	    guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked
	  setErrorInfo(-1);
	  setState(ERROR_STATE);
	  return;
	}
    }

  // the connection is established now
  setState(ESTABLISHED);
}

void GuaMsgConnectionToServer::closePhandle()
{
  //
  // it's possible that some pending I/Os still remain on this connection.
  // if IPC error occurs during query execution, there are three possible
  // scenarios for a connection when it comes here:
  //
  //   1. this connection received ipc error and handled the error on the spot
  //      by calling one of the handleIOError() methods.
  //   2. this connection did not receive error, but some other connection(s)
  //      on the same message stream received error. in this case we should
  //      invoke IpcMessageStream::abandonPendingIOs() to abort pending I/Os
  //      on all of stream's connections and invoke all necessary callbacks.
  //   3. this connection did not receive error, neither did any other
  //      connections on the same message stream. in this case it's possible
  //      that nothing happens on this connection until the destructor is
  //      invoked. thus we need to clean up any pending I/Os on the connection.
  //      we should invoke delinkConnection() on the stream to trigger any
  //      book keepings needed, as delinkConnection() and receive callback
  //      should have the same book keeping logic.
  //
  // example for #3: we have a test case that kills esps while executing a
  // long running query. the master has multiple send tops, with each send top
  // having a message stream that includes only one data connection to a top
  // level esp. if IPC error occurs on other connections (but not on this
  // connection) as the result of dead esps, all sql operations are aborted
  // from higher level and the master may never get a chance to call
  // handleIOError() to clean up any pending I/Os on this connection. thus we
  // need to release any msg buffers from the pending I/Os. but note that
  // in case of multi-chunk message buffer, there may be multiple I/O entries
  // pointing to the same message buffer. in that case each buffer should be
  // released only once.
  //
  handleIOError();

  // receive queue may not be empty. if so invoke receive callback.
  // example:
  // GuaMsgConnectionToServer::setFatalError() invokes handleIOErrorForStream()
  // that may have put message buffer on receive queue.
  IpcMessageBuffer *receiveBuf;
  while (receiveBuf = getNextReceiveQueueEntry())
    receiveBuf->callReceiveCallback(this);

  // Note that after closing, the connection is always in the initial
  // state. This is the way to fix a connection in the error state.
  // The close is always considered successful.
  guaErrorInfo_ = GuaOK;
  clearErrorInfo();
  setState(INITIAL);

  if (openFile_ != InvalidGuaFileNumber)
    {
      FILE_CLOSE_(openFile_);
      openFile_ = InvalidGuaFileNumber;
    }
}

//This function sets up the control information required by
//the file system on the server side
//control is a pointer to the buffer that is supposed to
//contain the control information
short GuaMsgConnectionToServer::setupRequestInfo(void * control, Int64 transid){
  //Redirected to T9055 to insulate SQL/MX from changes in 
  //ACB_REQUEST_TEMPLATE 
  Int32 retcode = FS_SQL_SETUPREQUESTINFO(openFile_,
                                        (fs_fs_template *) control,
                                        transid);
  return retcode;

#if 0
  fs_fs_writeread* controlInfo =(fs_fs_writeread*) control;
  direct_globals_template * pfsptr;//pointer to the PFS
  acb_standard_template * acb;//pointer to the acb for this file
  short error = FEOK; //variable to catch errors
  
  //first get pointer to the PFS
  fs2_get_pfsaddr((Long*)&pfsptr);
 
  //then get pointer to the acb in the 
  //PFS Filetable at the index openFile_
  acb = (acb_standard_template *) pfsptr->file_table[openFile_];
 
  //set the dialect type to indicate this is a call from the file system
  //since we are trying to emulate the WRITEREADX call
  controlInfo->dialect_type = DIALECT_FS_FS;
  controlInfo->request_type = FS_FS_WRITEREAD;
  controlInfo->request_version = CURRENT_VERSION_FS_FS;
  controlInfo->minimum_interpretation_version = MINIMUM_VERSION_FS_FS;
  
  controlInfo->tcbref_valid = 0;
  controlInfo->lid_valid = 0;
  controlInfo->filler = 0;
  memset((void *)&(controlInfo->sendflags), 0, 
         sizeof(linkmon_sendflags_template));
 
  //get the number of outstanding requests
  int_16 next = acb->acb_numreqs;
 
  void ** acb_reqaddrs = (void **)((char *)&(acb->req.acb_requestbase_addr) + 
                                   sizeof(acb->req.acb_requestbase_addr) );
 
  //get a pointer to the data for this request
  acb_request_template * acb_reqptr =
      (acb_request_template *)acb_reqaddrs[next];
  
  //check if numreqs is less than the maximum number of reqs allowed
  if (( acb->acb_numreqs) < (int_16) acb->acb_maxreqs)
  {
    //update the numreqs
    acb->acb_numreqs = next + 1;
    int_16 savenum = acb_reqptr->acb_reqnum;
    //initialize the acb req to all zeros
    _fill_32(acb_reqptr, (_len(acb_request_template)/ 4), 0 );
    acb_reqptr->acb_reqnum = savenum;
  } // if req avail
  else
  {
    // we used to return FETOOMANY here, but the check in
    // tryToStartNewIO of numOutStandingIOs and nowaitDepth_ 
    // should have prevented the FETOOMANY.
     assert(0);
  }
  
  // if transid argument is invalid set flag in controlInfo.
  if (transid == (Int64)-1) 
    {
      controlInfo->tcbref_valid = 0;
    }
  else
    {
      // Here, acb_reqptr->acb_tubaddr is not initialized with the 
      // context transid (as done in FS), to prevent ENDTRANSACTION 
      // from returning error 81 if there are outstanding ipc msgs 
      // (for read only query).
      // For nowaited queries involves insert, update, delete, 
      // or nowaited prepares, CliGlobals::checkOperationsPending 
      // detects outstanding operations and returns error to
      // FS2^FLUSH^ALL^VSBB.

      // transid passed in is valid. Call FS2 to move transid into
      // control buffer (may need to "massage" transid if the destination
      // is a remote node.
     
      error = Fs2_transid_to_buffer
                  (acb->posix.acb_procsection->tmfvirtualnode, 
                   (unsigned char *)&transid, 
                   (unsigned char *)&(controlInfo->tcbref),
                   0
                  );
      if (error)
	{
	  acb->acb_numreqs = acb->acb_numreqs - 1;
	  return error;  
	}
      controlInfo->tcbref_valid = 1;
    }
  
  //hard coded since file open passes a zero
  controlInfo->sender.syncid = 1;
  
  //get the filenum for this connection
  controlInfo->sender.first_word.filenum = openFile_;
  
  //copy the phandle
  memcpy(&controlInfo->sender.phandle,&pfsptr->my_phandle,(sizeof(short)*10));
  controlInfo->sender.user_openid = acb->posix.acb_procsection->acb_useropenid;
  controlInfo->sender.id.openid = acb->posix.acb_procsection->sender.acb_procopenid;
  return FEOK;
#endif
}

//This function is used to put the msgid into the acb after a MSG_LINK_.
//This let's the filesystem cleanup outstanding IOs on PROCESS_STOP_
void GuaMsgConnectionToServer::putMsgIdinACB(UInt32 msgid){
  //Redirected to T9055 to insulate SQL/MX from changes in 
  //ACB_REQUEST_TEMPLATE 
  Int32 retcode = FS_SQL_PUTMSGIDINACB(openFile_, msgid);
#if 0
  direct_globals_template * pfsptr;//pointer to the PFS
  acb_standard_template * acb;//pointer to the acb for openFile_
  
  //first get pointer to the PFS
  fs2_get_pfsaddr((Long*)&pfsptr);
  
  //then get pointer to the acb in the PFS 
  //Filetable at the index openFile_
  acb = (acb_standard_template *) pfsptr->file_table[openFile_];
  
  //get the index into the array of acb requests
  int_16 next = acb->acb_numreqs - 1;
  
  void ** acb_reqaddrs = (void **)((char *)&(acb->req.acb_requestbase_addr) + 
                                   sizeof(acb->req.acb_requestbase_addr) );

  //get a pointer to the acb data for this request
  acb_request_template * acb_reqptr =
      (acb_request_template *)acb_reqaddrs[next];
  
  //put the msgid into the acb
  acb_reqptr->mid.acb_mid = msgid;
  if (acb_reqptr->tub.acb_tubaddr != 0)
     acb_reqptr->tub.acb_tub->pending_count = 
            (ULng32)(acb_reqptr->tub.acb_tub->pending_count) + 1u;
#endif 
}

void  GuaMsgConnectionToServer::resetAfterReply(UInt32 msgid, short error,
                                                Int64 *transid){
  //Redirected to T9055 to insulate SQL/MX from changes in 
  //ACB_REQUEST_TEMPLATE 
  Int32 retcode = FS_SQL_RESETAFTERREPLY(openFile_, msgid, error, transid, 
                                       abortXnOnPathErrors_);

#if 0
  direct_globals_template * pfsptr;//pointer to the PFS
  acb_standard_template * acb;//pointer to the acb for openFile_
  acb_request_template * acb_reqptr;
  short reqIndex = -1; 
  short i;

 //first get pointer to the PFS
  fs2_get_pfsaddr((Long*)&pfsptr);
  
  //then get pointer to the acb in the 
  //PFS Filetable at the index openFile_
  acb = (acb_standard_template *) pfsptr->file_table[openFile_];
  
  void ** acb_reqaddrs = (void **)((char *)&(acb->req.acb_requestbase_addr) + 
                                   sizeof(acb->req.acb_requestbase_addr) );
  
  // get a pointer to the acb data for this request (search through array
  // of outstanding requests,, to get the request id that matches this one,
  // that was just completed). This is a fix for a bug - earlier, it was
  // just assumed that the request completed was the last request in the
  // array - this may not be the case, as requests may be completed out
  // of order. This bug lead to %4100 halts (BR #85). The fix to search
  // the array for the completed request, along with the fix to "shrink" the
  // array (see code below) solves this bug by doing what the Enscribe
  // Filesystem does in AWAITIOX.
  for (i=0; i <= acb->acb_numreqs; i++)
  {   
     acb_reqptr = (acb_request_template *)acb_reqaddrs[i];
     if (acb_reqptr->mid.acb_mid == msgid)
     {
        reqIndex = i;
        break;
     }
  }
  
  //set msgid to 0
  acb_reqptr->mid.acb_mid = 0;
  acb_reqptr->acb_reqrdy = 1;
  
  if (acb_reqptr->tub.acb_tubaddr != 0)
     acb_reqptr->tub.acb_tub->pending_count = 
            (ULng32)(acb_reqptr->tub.acb_tub->pending_count) - 1u;
 
  //decrement the numreqs
  acb->acb_numreqs = acb->acb_numreqs - 1;

  // Move all requests after this one to be before this one. This "shrinking"
  // of the request array is done to make it efficient to find a slot while
  // sending a new request (free slots are always at the end of the array)
  if ((acb->acb_numreqs > 0) && (reqIndex != acb->acb_numreqs))
  {
     i = reqIndex;
     while (i != acb->acb_numreqs)
     {
        acb_reqaddrs[i] = acb_reqaddrs[i + 1];
        i = i + 1;
     }
     acb_reqaddrs[acb->acb_numreqs] = (void *)acb_reqptr;
  }

  if (error == FECPUFAIL ||
      error == FEPATHDOWN ||
      (error >= FENETERR && error <= 255)
     )
  {
    // abort the transaction if a path error is received.
    if ((transid != NULL) && (*transid != -1))
    {
       // Want to stop the transaction on path errors. This is 
       // to fix the bug reported in solution 10-030508-6267.
       if (abortXnOnPathErrors_)
       {
          TMFLIBFS_ABORTTRANS_( (int_16 *)transid,
                                FETRANSABRTOWNERDIED );
       }
    }
  }
#endif
}

void GuaMsgConnectionToServer::setFatalError(IpcMessageStreamBase *msgStream)
{ 
  if (guaErrorInfo_ == GuaOK)  // if error hasn't been set yet
    guaErrorInfo_ = GuaIpcApplicationErr;

  setState(ERROR_STATE);

  // we must set error info to -1 so receive callback knows not to
  // parse the potentially corrupted receive queue buffer.
  setErrorInfo(-1);

  // handleIOErrorForStream() may put the message buffer on receive queue.
  // and that buffer may have the following content:
  //
  //   - the actual full reply from server, or
  //   - the send buffer if send failed, or
  //   - partial send or reply if send/reply was multi-chunk and did not
  //     complete
  //
  handleIOErrorForStream(msgStream);

  // This next call only works if GuaMsgConnectionToServer inherits
  // directly from IpcConnection.
  IpcConnection::setFatalError(msgStream);
}

void GuaMsgConnectionToServer::addSendCallbackBuffer(IpcMessageBuffer *buffer)
{
  for (unsigned short i = 0; i < nowaitDepth_; i++)
    {
      if (!sendCallbackBufferList_[i])
        {
          sendCallbackBufferList_[i] = buffer;
          return;
        }
    }
}

NABoolean GuaMsgConnectionToServer::removeSendCallbackBuffer(IpcMessageBuffer *buffer)
{
  for (unsigned short i = 0; i < nowaitDepth_; i++)
    {
      if (sendCallbackBufferList_[i] == buffer)
        {
          sendCallbackBufferList_[i] = NULL;
          return TRUE;
        }
    }

  return FALSE;  
}

void GuaMsgConnectionToServer::handleIOError()
{
  // connection no longer usable due to I/O error. abort all existing I/Os
  // on this connection.
  for (unsigned short i = 0; i < nowaitDepth_; i++)
    {
      ActiveIOQueueEntry &entry = activeIOs_[i];
      if (entry.inUse_)
        handleIOErrorForEntry(entry);
    }

  assert(partiallySentBuffer_ == NULL);
  assert(partiallyReceivedBuffer_ == NULL);

  currentEntry_ = 0;
  numOutstandingIOs_ = 0;
}

void GuaMsgConnectionToServer::handleIOErrorForStream(IpcMessageStreamBase *msgStream)
{
  // abort all existing I/Os on this connection that are from the given stream
  for (unsigned short i = 0; i < nowaitDepth_; i++)
    {
      ActiveIOQueueEntry &entry = activeIOs_[i];
      if (entry.inUse_ && entry.buffer_->getMessageStream() == msgStream)
        handleIOErrorForEntry(entry);
    }
}

//
// The I/O entry has a write buffer (entry.buffer_) and a read buffer
// (entry.readBuffer_). For entry description after message send, see comments
// at the top of tryToStartNewIO(). For entry description after receive reply,
// see comments at the top of wait().
//
void GuaMsgConnectionToServer::handleIOErrorForEntry(ActiveIOQueueEntry &entry)
{
  // I/O error occurred on given entry during send or receive

  if (getState() != ERROR_STATE)
    setState(ERROR_STATE);

  if (getErrorInfo() == 0)
      setErrorInfo(-1);

  // clean up I/O entry
  if (entry.inUse_)
    cleanUpActiveIOEntry(entry);

  // abort all existing I/Os on the same message stream
  //  - what about I/Os on other streams?
  for (unsigned short i = 0; i < nowaitDepth_; i++)
    {
      ActiveIOQueueEntry &nextEntry = activeIOs_[i];

      if (nextEntry.inUse_ &&
          nextEntry.buffer_->getMessageStream() ==
          entry.buffer_->getMessageStream())
        {
          cleanUpActiveIOEntry(nextEntry);

          // special case: for multi-chunk shared buffer, if nextEntry is
          // first chunk and entry is any chunk after first chunk, we need
          // to free the read buffer of the first chunk since all other chunks
          // have their read buffers set to NULL.
          if (nextEntry.readBuffer_ &&
              nextEntry.readBuffer_ != nextEntry.buffer_)
            // nextEntry is the first chunk of a multi-chunk shared buffer
            nextEntry.readBuffer_->decrRefCount(getEnvironment());
        }
    }  // for i

  // clear partial send/receive buffer on the same message stream so to
  // prevent further I/Os.
  if (partiallySentBuffer_ &&
      partiallySentBuffer_->getMessageStream() ==
      entry.buffer_->getMessageStream())
    partiallySentBuffer_ = NULL;
  else if (partiallyReceivedBuffer_ &&
           partiallyReceivedBuffer_->getMessageStream() ==
           entry.buffer_->getMessageStream())
    partiallyReceivedBuffer_ = NULL;

  // if entry is the first chunk of a multi-chunk shared buffer, we need to
  // free its read buffer.
  if (entry.readBuffer_ && entry.readBuffer_ != entry.buffer_)
    entry.readBuffer_->decrRefCount(getEnvironment());

  // put the buffer on receive queue, regardless of send succeeded or not.
  queueReceiveMessage(entry.buffer_);

  // if entry.buffer_ is still on the sendCallbackBufferList_, then send
  // callback has not been invoked for this connection and thus send failed.
  NABoolean sendSuccess = !removeSendCallbackBuffer(entry.buffer_);
  // in case of send failure invoke send callback
  if (!sendSuccess)
    entry.buffer_->callSendCallback(this);

  // the design is to disallow any future i/o after connection become error
  // state. so we should cleanup send queue by invoking send callback for
  // all send buffers.
  IpcMessageBuffer *sendBuffer;
  while (sendBuffer = removeNextSendBuffer())
    {
      queueReceiveMessage(sendBuffer);
      sendBuffer->callSendCallback(this);
    }
}

void GuaMsgConnectionToServer::cleanUpActiveIOEntry(ActiveIOQueueEntry &entry)
{
  short oldstop = SETSTOP(2);//become unstoppable

  // abort if we are still waiting for message reply
  if (entry.expectReply_)
    {
      MSG_ABANDON2_((NSK_msId2)entry.msgid_);
      resetAfterReply(entry.msgid_, 0, NULL);
    }

  addressUnwire(entry);

  SETSTOP(oldstop);//become stoppable 

  entry.expectReply_ = FALSE;
  entry.inUse_ = FALSE;
  entry.msgid_ = 0;
  entry.transid_ = -1;

  numOutstandingIOs_--;
}

// lock memory used for the actual message send/reply
short GuaMsgConnectionToServer::addressWire(ActiveIOQueueEntry &entry,
                                            short wireOptions)
{
  //
  // no need to lock control buffer in this method. it should have been locked
  // already by the caller.
  //

  //
  // lock memory for write/send buffer. write buffer may be shared.
  //
  short GuardianError = 0;
  if (entry.buffer_->isShared())
    {
      // write buffer is shared by multiple connections
      if (entry.buffer_->getLockCount(entry.offset_) == 0)
        {
          // if the write buffer chunk is not locked, lock it
          GuardianError = ADDRESS_WIRE_
            ((unsigned char *)entry.buffer_->data(entry.offset_),
             entry.bytesSent_, wireOptions);
          if (GuardianError)
            return GuardianError;
        }

      // increment lock count for shared write bufer
      entry.buffer_->incrLockCount(entry.offset_);
    }
  else
    {
      // write buffer is only used by a single connection
      // entry.buffer_ = entry.readBuffer_ = send buffer
      ULng32 maxDataBufferLength =
        MAXOF(entry.receiveBufferSizeLeft_, entry.bytesSent_);
      GuardianError= ADDRESS_WIRE_
        ((unsigned char *)entry.buffer_->data(entry.offset_),
         maxDataBufferLength, wireOptions);
      if (GuardianError)
        return GuardianError;
    }

  //
  // lock memory for read/reply buffer. reply buffer is never shared.
  //
  if (entry.readBuffer_ && entry.readBuffer_ != entry.buffer_)
    {
      // entry.buffer_ = send buffer (shared)
      // entry.readBuffer_ = reply buffer
      assert(entry.buffer_->isShared());
      GuardianError = ADDRESS_WIRE_
        ((unsigned char *)entry.readBuffer_->data(0),
         entry.receiveBufferSizeLeft_, wireOptions);
      if (GuardianError)
        {
          // if error, unlock write buffer that must be shared
          if (entry.buffer_->decrLockCount(entry.offset_) == 0)
            // no more I/O on the write buffer. unlock its memory.
            ADDRESS_UNWIRE_((unsigned char *)entry.buffer_->data(entry.offset_),
                            entry.bytesSent_, wireOptions);

          return GuardianError;
        }
    }

  return GuardianError;
}

// unlock memory used for the actual message send/reply
void GuaMsgConnectionToServer::addressUnwire(ActiveIOQueueEntry &entry)
{
  //
  // unlock memory for control buffer
  //
  short wireOptions = 8;
  ADDRESS_UNWIRE_((unsigned char *)entry.controlBuf_, sizeof(fs_fs_writeread), wireOptions);

  //
  // unlock memory for write/send buffer. write buffer may be shared.
  //
  if (entry.buffer_->isShared())
    {
      // write buffer is shared by multiple connections
      if (entry.buffer_->decrLockCount(entry.offset_) == 0)
        // no more I/O on the write buffer. unlock its memory.
        ADDRESS_UNWIRE_((unsigned char *)entry.buffer_->data(entry.offset_),
                        entry.bytesSent_, wireOptions);
    }
  else
    {
      // write buffer is only used by a single connection
      // entry.buffer_ = entry.readBuffer_ = send buffer
      ULng32 maxDataBufferLength =
        MAXOF(entry.receiveBufferSizeLeft_, entry.bytesSent_);
      ADDRESS_UNWIRE_((unsigned char *)entry.buffer_->data(entry.offset_),
                      maxDataBufferLength, wireOptions);
    }

  //
  // unlock memory for read/reply buffer. reply buffer is never shared.
  //
  if (entry.readBuffer_ && entry.readBuffer_ != entry.buffer_)
    ADDRESS_UNWIRE_((unsigned char *)entry.readBuffer_->data(0),
                    entry.receiveBufferSizeLeft_, wireOptions);
}

#undef _resident
#undef _priv
