| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: IpcMsg.cpp |
| * Description: Implementation for the IPC classes using the NSK messaging |
| * API. |
| * |
| * Created: 6/25/99 |
| * Language: C++ |
| * |
| * |
| * |
| * |
| ***************************************************************************** |
| */ |
| |
| #define AEVENT 1 |
| |
| #define set_extern_data |
| |
| #include "Platform.h" |
| |
| #include "rosgen.h" |
| #include "fs_rosetta_dml.h" |
| #include "dpxnsdp2" |
| #include "yfsiopen" |
| //#include "dfsiopn.h" |
| enum {FS_SMS_VERSION_MAY94 = 1}; |
| #include "wdialect" |
| #include "ppctlc(WAIT, SETSTOP)" |
| #include "dmsghi.h" |
| #include "psignalc.h(PK_SIG_SYSTEMCALL_ABORTINQUIRE_, PK_SUSPEND_DISALLOW_SET_)" |
| #include "pmallocc(ADDRESS_WIRE_, ADDRESS_UNWIRE_)" |
| #include "hpfs2f(fs2_transid_to_buffer)" |
| #include "ffilcpp(FS_SQL_SETUPREQUESTINFO, FS_SQL_PUTMSGIDINACB, \ |
| FS_SQL_RESETAFTERREPLY)" |
| #include "Int64.h" |
| |
| #define _resident |
| #define _priv |
| #include "ExCollections.h" |
| #include "Ipc.h" |
| #include "str.h" |
| #include "ComDiags.h" |
| #include "NAExit.h" |
| #include "ipcmsg.h" |
| #include <fcntl.h> |
| #include "logmxevent.h" |
| |
| extern "C" { |
| //#include <cextdecs.h> |
| #include "cextdecs.h(PROCESSHANDLE_TO_FILENAME_,PROCESSHANDLE_DECOMPOSE_,FILE_OPEN_,SETMODE,FILE_GETINFO_,FILE_CLOSE_, AWAITIOX,PROCESS_DELAY_)" |
| #include <tal.h> |
| // should be #include <zsysc.h> |
| #include "zsysc.h" |
| } |
| |
| //Function used to get a pointer to the pfs |
| _callable void fs2_get_pfsaddr(Long *); |
| |
| //Function used to get transid from the filesystem |
| extern "C" _priv _resident int_16 FS_GETTRANSID_ |
| ( |
| extaddr tubaddr, // input |
| // the address of the Trans Usage Block |
| // (having the tcbref as the 1st |
| // field), or 0D or ptmfnocurtransid |
| phandle_template *destination, // input |
| // where it will be sent |
| int_16 *buffer); // output |
| // the tcbref is placed here |
| |
| extern "C" _priv int_16 TMFLIBFS_ABORTTRANS_ |
| ( |
| int_16 *tcbref_ptr, |
| short disposition |
| ); |
| |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class GuaMsgConnectionToServer |
| // ----------------------------------------------------------------------- |
| |
| GuaMsgConnectionToServer::GuaMsgConnectionToServer( |
| IpcEnvironment *env, |
| const IpcProcessId &procId, |
| NABoolean usesTransactions, |
| unsigned short nowaitDepth, |
| const char *eye) : IpcConnection(env,procId,eye) |
| { |
| openFile_ = InvalidGuaFileNumber; |
| nowaitDepth_ = nowaitDepth; |
| maxIOSize_ = env->getGuaMaxMsgIOSize(); |
| |
| activeIOs_ = new(env) ActiveIOQueueEntry[nowaitDepth_]; |
| //get the length of the request/reply control structure |
| // int controllen = sizeof(fs_fs_template) + sizeof(fs_fs_template::__writeread); |
| Int32 controllen = sizeof(fs_fs_writeread); |
| |
| //initialize each entry in the Active IOs queue |
| for (unsigned short i = 0; i < nowaitDepth_; i++) |
| { |
| activeIOs_[i].writeDataCBAPtr_ = (void*) new(env) char[sizeof(NSK_CBA)]; |
| activeIOs_[i].readDataCBAPtr_ = (void*) new(env) char[sizeof(NSK_CBA)]; |
| activeIOs_[i].controlCBAPtr_ = (void*) new(env) char[sizeof(NSK_CBA)]; |
| activeIOs_[i].controlBuf_ = (void*) new(env) char[controllen]; |
| activeIOs_[i].inUse_ = FALSE; |
| activeIOs_[i].expectReply_ = FALSE; |
| activeIOs_[i].msgid_ = 0; |
| activeIOs_[i].transid_ = -1; |
| activeIOs_[i].buffer_ = activeIOs_[i].readBuffer_ = NULL; |
| } |
| lastAllocatedEntry_ = nowaitDepth_-1; |
| |
| numOutstandingIOs_ = 0; |
| partiallySentBuffer_ = NULL; |
| chunkBytesSent_ = 0; |
| partiallyReceivedBuffer_ = NULL; |
| chunkBytesRequested_ = 0; |
| chunkBytesReceived_ = 0; |
| usesTransactions_ = usesTransactions; |
| abortXnOnPathErrors_ = FALSE; |
| guaErrorInfo_ = GuaOK; |
| currentEntry_ = 0; |
| |
| // We need a nowait dept of at least 2, one for a message and another |
| // one for out-of-band messages (not really implemented yet). |
| // assert(nowaitDepth_ >= 2); |
| |
| sendCallbackBufferList_ = new(env) IpcMessageBuffer*[nowaitDepth_]; |
| for (unsigned short i = 0; i < nowaitDepth_; i++) |
| sendCallbackBufferList_[i] = NULL; |
| |
| // now open the server process |
| openPhandle(NULL); |
| } |
| |
| GuaMsgConnectionToServer::~GuaMsgConnectionToServer() |
| { |
| closePhandle(); |
| |
| CollHeap *heap = getEnvironment()->getHeap(); |
| for (Int32 i = 0; i < nowaitDepth_; i++) |
| { |
| ActiveIOQueueEntry &entry = activeIOs_[i]; |
| heap->deallocateMemory(entry.writeDataCBAPtr_); |
| heap->deallocateMemory(entry.readDataCBAPtr_); |
| heap->deallocateMemory(entry.controlCBAPtr_); |
| heap->deallocateMemory(entry.controlBuf_); |
| } |
| heap->deallocateMemory(activeIOs_); |
| heap->deallocateMemory(sendCallbackBufferList_); |
| } |
| |
| void GuaMsgConnectionToServer::send(IpcMessageBuffer *buffer) |
| { |
| // simply add the new buffer to the send queue and try to start |
| // as many new I/O operations as possible |
| queueSendMessage(buffer); |
| while (tryToStartNewIO()) |
| ; |
| } |
| |
| void GuaMsgConnectionToServer::receive(IpcMessageStreamBase *msg) |
| { |
| // Receiving from a Guardian server is implicit, since the WRITEREADX |
| // call performs both a send and a receive operation together. However, |
| // we still need to add the callback and, if the I/O has already |
| // completed, call the callback. |
| |
| addReceiveCallback(msg); |
| |
| // maybe the Guardian I/O has already completed and the buffer is |
| // waiting in the base class' receive queue |
| IpcMessageBuffer *receiveBuf; |
| while (receiveBuf = getNextReceiveQueueEntry()) |
| { |
| // yes, so just call its callback |
| receiveBuf->callReceiveCallback(this); |
| } |
| } |
| |
| NABoolean GuaMsgConnectionToServer::moreWaitsAllowed() |
| { |
| return !stopWait_; |
| } |
| |
| // |
| // Wait for an I/O reply. After receives a reply, the I/O entry looks like: |
| // |
| // - entry.buffer_=entry.readBuffer_=reply buffer |
| // |
| WaitReturnStatus GuaMsgConnectionToServer::wait(IpcTimeout timeout, UInt32 *eventConsumed = NULL, IpcAwaitiox *ipcAwaitiox) |
| { |
| NABoolean ipcAwaitioxCompleted = ipcAwaitiox != NULL; |
| if (ipcAwaitioxCompleted) |
| ipcAwaitioxCompleted = ipcAwaitiox->getCompleted(); |
| assert(ipcAwaitioxCompleted == FALSE); |
| //Internal error: AWAITIOX should not have completed a message system based operation |
| short error = 0; |
| short waitField = LDONE | LSIG; |
| short status = 0; |
| direct_globals_template * pfsptr; |
| fs2_get_pfsaddr((Long*)&pfsptr); |
| |
| if ((ULng32)(openFile_) >= (ULng32)(pfsptr->numftentries)) |
| { |
| // If this connection is not open, there can be nothing to wait on. |
| // Return an error in this case to indicate that this is an inappropriate |
| // call. Do no wait on this connection again. |
| guaErrorInfo_ = FENOTOPEN; |
| setErrorInfo(-1); |
| handleIOError(); |
| stopWait(TRUE); |
| return WAIT_OK; |
| } |
| |
| // don't do anything if the connection is in an error state and |
| // there are no more pending requests to work on. |
| if (getState() == ERROR_STATE AND numOutstandingIOs_ <= 0 |
| AND numQueuedSendMessages() <= 0) |
| { // no more waits on this connection |
| stopWait(TRUE); |
| return WAIT_OK; |
| } |
| stopWait(FALSE); |
| setBreakReceived(FALSE); |
| |
| // try to send or receive first if there is a timeout specified, |
| // or if we don't have any IOs outstanding. Also, try more |
| // I/O to cover the special case that we've posted only the |
| // first chunk of an multi-chunk msg. |
| |
| // This latter scenario is possible because of the logic on |
| // tryToStartNewIO that gives up if the per-process limit on |
| // MQCs would be exceeded. It required this special handling |
| // for multichunk messages, as explained in the next paragraph. |
| |
| // The connection may have to give up either 1) before the |
| // first chunk is posted, or 2) before the second chunk is |
| // posted or 3) after the second chunk is posted. |
| // The tryToStartNewIO method is called from this class's send |
| // method and then also from two places in this wait method. |
| // In case 1), the test of numOutStandingIOs_ will be |
| // sufficient to cause tryToStartNewIO to be called directly |
| // below this comment. For case 3), the other call to |
| // tryToStartNewIO will be taken, because the a_message_is_done |
| // will be set to true after the server replies to the chunk(s) |
| // already posted. But for case 2), we need special handling, |
| // because the server does not reply to the first chunk until |
| // all chunks are sent. And this class's send method will not |
| // be called again for the multichunk message. So the additional |
| // test below to detect that the second chunk has not yet been |
| // sent, is needed to force the call to tryToStartNewIO. |
| |
| if (timeout != IpcImmediately || |
| numOutstandingIOs_ == 0 || |
| chunkBytesSent_ == maxIOSize_) |
| while (tryToStartNewIO()) |
| ; |
| |
| // try to complete I/Os within the given time limit, |
| // if there are outstanding I/Os |
| if (numOutstandingIOs_ == 0) |
| return WAIT_OK; |
| |
| //used to mark the point in the activeIOs queue from |
| //where we started checking for IO completion |
| unsigned short start = currentEntry_; |
| //indicates we found a completed message |
| Int32 a_message_is_done = 0; |
| |
| //check if anyone of our messages has completed |
| do { |
| if ((activeIOs_[currentEntry_].inUse_) && |
| (activeIOs_[currentEntry_].expectReply_)) |
| { |
| //check if the message for this entry is done |
| if (MSG_ISDONE2_((NSK_msId2)activeIOs_[currentEntry_].msgid_)) |
| { |
| getEnvironment()->setEvent(TRUE, AEVENT); |
| a_message_is_done = 1; |
| break;// found a complete message break |
| } |
| } |
| |
| currentEntry_++; |
| |
| /* |
| ** Not that expensive since if-conversion reduces the control flow into |
| ** predicated instructions |
| */ |
| if (currentEntry_ == nowaitDepth_) |
| currentEntry_ = 0; |
| } while(currentEntry_ != start ); |
| |
| // |
| // IMPORTANT: MUST NOT have any early return statement from here until |
| // the end of this function. So that the loop at the end of the function |
| // that issues receive callbacks will always be executed if any receive |
| // entries were queued by this function. |
| // |
| |
| NABoolean interrupt = FALSE; |
| |
| //if no message is done wait, then check again for message completion |
| //for nowaited mode (timeout == 0), do not call WAIT. |
| if ((!a_message_is_done && timeout != 0) || getEnvironment()->lsigConsumed()) |
| { |
| // Only wait for LDONE if breakEnabled is not set. |
| IpcTimeout waitTimeout = timeout == 0 ? -2 : timeout; |
| if (getEnvironment()->lsigConsumed()) |
| status = LSIG; |
| else |
| { |
| if (getEnvironment()->breakEnabled()) |
| status = WAIT(waitField, waitTimeout);//wait for timeout |
| else |
| status = WAIT(LDONE, waitTimeout);//wait for timeout |
| } |
| if (!status) |
| { |
| a_message_is_done = FALSE; // timed out |
| } |
| else |
| { |
| if (status & LSIG) |
| { |
| getEnvironment()->setLsigConsumed(FALSE); |
| short oldsigmod = PK_SUSPEND_DISALLOW_SET_(1); |
| error = PK_SIG_SYSTEMCALL_ABORTINQUIRE_(); |
| PK_SUSPEND_DISALLOW_SET_(oldsigmod); |
| if (error > 0) |
| { |
| // received a signal. |
| guaErrorInfo_ = FE_EINTR; |
| setBreakReceived(TRUE); |
| setErrorInfo(-1); |
| handleIOError(); |
| interrupt = TRUE; // FE_EINTR |
| } |
| } // if (status & LSIG) |
| else |
| { |
| //woken up because some message completed on the LDONE queue |
| //somewhere. Check if the message completed was ours. |
| getEnvironment()->setLdoneConsumed(TRUE); |
| do { |
| //check if this activeIOs_ entry is in Use |
| if (activeIOs_[currentEntry_].inUse_ && |
| activeIOs_[currentEntry_].expectReply_) |
| { |
| //check if the message for this entry is done |
| if (MSG_ISDONE2_((NSK_msId2)activeIOs_[currentEntry_].msgid_)) |
| { |
| a_message_is_done = 1; |
| // The following line of code was added but is being |
| // removed because: |
| // a) LDONE consumed which servers the same purpose |
| // has already been set, and |
| // b) a compiler performance regression occurred and |
| // it's an unlikely but possible cause |
| //getEnvironment()->setEvent(TRUE, AEVENT); |
| break; |
| } |
| } |
| |
| currentEntry_++; |
| if (currentEntry_ == nowaitDepth_) { |
| currentEntry_ = 0; |
| } |
| } while(currentEntry_ != start ); |
| } |
| } |
| } |
| |
| if (a_message_is_done) |
| { |
| ActiveIOQueueEntry &entry = activeIOs_[currentEntry_]; |
| |
| NSK_msResult2 results; |
| |
| short oldstop = SETSTOP(2);//become unstoppable |
| |
| //Pickup reply and terminate message |
| MSG_BREAK2_((NSK_msId2)entry.msgid_, &results, |
| (NSK_PHandle _ptr64 *)getOtherEnd().getPhandle().phandle_); |
| |
| //Get the error from the reply control buffer |
| message_header_template * ReplyControlBuf = (message_header_template *)entry.controlBuf_; |
| error = ReplyControlBuf->error(); |
| |
| if (error == GuaTimeoutErr) |
| { |
| // timeout does not set the connection into an error state but it |
| // causes a return. later we shall wait on this connection again. |
| SETSTOP(oldstop);//become stoppable |
| guaErrorInfo_ = error; |
| return WAIT_OK; |
| } |
| |
| //reset the filesystem data structures |
| Int64 localTransid = entry.transid_; |
| resetAfterReply(entry.msgid_, error, &localTransid); |
| |
| SETSTOP(oldstop);//become stoppable |
| |
| // we have got the reply for this I/O entry |
| entry.expectReply_ = FALSE; |
| |
| if (error) |
| { |
| // Remember the Guardian error code |
| guaErrorInfo_ = error; |
| setErrorInfo(-1); |
| handleIOErrorForEntry(entry); |
| } |
| else |
| { |
| cleanUpActiveIOEntry(entry); |
| |
| //get # of bytes written in reply |
| ULng32 countRead = (ULng32)results.rr_dataSize; |
| |
| // Now try to figure out what the original operation was, so |
| // we know what to do with the IpcMessageBuffer: |
| // |
| // a) If this I/O was a write operation for part of a buffer, |
| // then another operation for the same buffer is following, so |
| // just remove the outstanding I/O entry. |
| // b) If this I/O returned part of a buffer, then we have to issue |
| // another I/O operation for the rest of the buffer. |
| // c) If we have received all of the data, the buffer is ready to |
| // be delivered to its destination. If the buffer has its callback |
| // assigned, then call the callback, otherwise add the buffer to |
| // the receive queue that is managed by the base class, |
| // IpcConnection. "completelyReadBuffer" is set for case c) |
| // |
| if (entry.receiveBufferSizeLeft_ == 0) |
| { |
| // case a) |
| assert(countRead == 0); |
| |
| // Note that this does NOT count as a completion, we don't |
| // let the upper layers know that we are using multiple |
| // Guardian I/Os for this. |
| } |
| else |
| { |
| // we did expect data back, case b) or c) |
| if (entry.offset_ == 0) |
| { |
| if (numOutstandingIOs_ > 0) |
| { |
| // more pending I/Os on this connection. check if there |
| // are other chunks of entry.buffer_. |
| // |
| // - in multi-chunk mode, when server receives any |
| // of the subsequent chunk, it sends an empty reply |
| // right away. however, when server receives the first |
| // chunk, it does not reply right away. instead, server |
| // holds the first chunk until it has received all |
| // subsequent chunks and then server replies to the |
| // first chunk. on the client side, even though the |
| // first chunk receives its reply the last, but since |
| // we are looking at a randomly selected I/O entry, |
| // "entry" could be any one of the multi-chunk I/Os. |
| // |
| for (unsigned short i = 0; i < nowaitDepth_; i++) |
| { |
| ActiveIOQueueEntry &nextEntry = activeIOs_[i]; |
| if (nextEntry.inUse_ && |
| nextEntry.buffer_ == entry.buffer_) |
| { |
| // entry is the first chunk and nextEntry |
| // is a subsequent chunk of a multi-chunk |
| // send buffer. |
| // only the first chunk receives reply. |
| // there is no reply for subsequent chunks. |
| |
| // we must clean up I/O on nextEntry now |
| // or otherwise the shared send buffer |
| // entry.buffer_ can get deallocated |
| // (see a few lines below). after that |
| // we cannot clean up I/O on nextEntry as |
| // entry.buffer_->chunkLockCount_ is not |
| // longer accessible. |
| cleanUpActiveIOEntry(nextEntry); |
| |
| if (numOutstandingIOs_ == 0) |
| // no more pending I/Os on this connection |
| break; |
| } |
| } |
| } |
| |
| if (entry.buffer_->isShared()) |
| { |
| // no longer need the shared send buffer. release it. |
| entry.buffer_->decrRefCount(getEnvironment()); |
| // now use only the reply buffer |
| entry.buffer_ = entry.readBuffer_; |
| } |
| |
| // since this is the first (maybe only) chunk of the message |
| // buffer, we can get the length of the total message by |
| // looking into the message header. |
| |
| // Get the size of the message sent (or the reply buffer if shared) |
| IpcMessageObjSize bytesSent = entry.buffer_->getMessageLength(); |
| |
| // unpack message header which contains reply message length |
| InternalMsgHdrInfoStruct *msgHdr = |
| new( (IpcMessageObj*)(entry.buffer_->data(0)) ) |
| InternalMsgHdrInfoStruct(NULL); |
| IpcMessageObjSize msgLen = msgHdr->getMsgLengthFromData(); |
| |
| // remember the real length of the message coming back |
| entry.buffer_->setMessageLength(msgLen); |
| |
| // check whether this is case b) or c) |
| if (msgLen == (IpcMessageObjSize) countRead) |
| { |
| // The "normal" case c). This is a single-chunk reply. |
| |
| // If we were sending a large buffer(more than one chunk) |
| // and just received a small buffer (one chunk) then |
| // release the large buffer to conserve space on the IPC |
| // heap. |
| if (bytesSent > maxIOSize_) |
| { |
| entry.buffer_ = entry.buffer_->resize(getEnvironment(), msgLen); |
| } |
| |
| queueReceiveMessage(entry.buffer_); |
| } |
| else |
| { |
| // Case b). This is a multi-chunk reply. Switch to the |
| // chunk protocol, countRead bytes are already received |
| // back from the server. |
| if (msgLen > entry.buffer_->getBufferLength() || |
| bytesSent > maxIOSize_) |
| { |
| // We want to resize the reply buffer if either: |
| // - The server has a reply message that is larger |
| // than what our buffer can hold |
| // - The request buffer was large (more than one |
| // chunk) and may now be consuming space |
| // unnecessarily on the IPC heap |
| entry.buffer_->setMessageLength(countRead); |
| entry.buffer_ = entry.buffer_->resize(getEnvironment(), msgLen); |
| entry.buffer_->setMessageLength(msgLen); |
| } |
| |
| // do some sanity checks, make sure we don't have two |
| // partial buffers at a time |
| assert(partiallyReceivedBuffer_ == NULL); |
| assert(msgLen > (IpcMessageObjSize) countRead); |
| |
| // move some information from the entry to data members |
| // in the connection while the chunk protocol is going on |
| partiallyReceivedBuffer_ = entry.buffer_; |
| chunkBytesRequested_ = countRead; |
| chunkBytesReceived_ = countRead; |
| getEnvironment()->getAllConnections()-> |
| setReceivedPartialMessage(TRUE); |
| } |
| } // first (maybe only) chunk |
| else |
| { |
| // case b), this is not the first chunk |
| assert (partiallyReceivedBuffer_ == entry.buffer_); |
| chunkBytesReceived_ += countRead; |
| |
| if (chunkBytesReceived_ == entry.buffer_->getMessageLength()) |
| { |
| // this was the last chunk |
| queueReceiveMessage(partiallyReceivedBuffer_); |
| partiallyReceivedBuffer_ = NULL; |
| chunkBytesRequested_ = 0; |
| chunkBytesReceived_ = 0; |
| } |
| else |
| { |
| getEnvironment()->getAllConnections()-> |
| setReceivedPartialMessage(TRUE); |
| } |
| } // not the first chunk |
| } // case b) or c) |
| |
| // this I/O completed |
| if (getState() != ERROR_STATE && numOutstandingIOs_ == 0) |
| setState(ESTABLISHED); |
| |
| // after waiting, try (again) to start as many new I/O operations as |
| // possible |
| while (tryToStartNewIO()) |
| ; |
| } // if (error) else |
| } // if (a_message_is_done) |
| |
| IpcMessageBuffer *receiveBuf; |
| NABoolean aCallbackIsCalled = FALSE; |
| while (receiveBuf = getNextReceiveQueueEntry()) |
| { |
| // When the user of this connection sets trustIncomingBuffers_ to |
| // FALSE then we perform an integrity check on all incoming |
| // message buffers. A failure causes the connection to transition |
| // to the ERROR_STATE state. |
| if (!getTrustIncomingBuffers() && getState() != ERROR_STATE) |
| { |
| if (!receiveBuf->verifyBackbone()) |
| { |
| setIpcMsgBufCheckFailed(TRUE); |
| guaErrorInfo_ = 0; |
| setErrorInfo(-1); |
| setState(ERROR_STATE); |
| } |
| } |
| |
| receiveBuf->callReceiveCallback(this); |
| aCallbackIsCalled = TRUE; |
| } |
| |
| // In the ERROR_STATE state we may have to announce I/O completion after |
| // callbacks are issued. The setState() method has the job of |
| // detecting when I/O is complete and informing the IpcEnviroment at |
| // the appropriate time. Even though it's not intuitive to call |
| // setState(ERROR_STATE) here (because we are already in the ERROR_STATE state), |
| // we make the call anyway to trigger any necessary bookkeeping. |
| if (aCallbackIsCalled && getState() == ERROR_STATE) |
| setState(ERROR_STATE); |
| |
| return interrupt ? WAIT_INTERRUPT : WAIT_OK ; |
| } |
| |
| GuaMsgConnectionToServer * GuaMsgConnectionToServer::castToGuaMsgConnectionToServer() |
| { |
| return this; |
| } |
| |
| Int32 GuaMsgConnectionToServer::numQueuedSendMessages() |
| { |
| return sendQueueEntries(); |
| } |
| |
| Int32 GuaMsgConnectionToServer::numQueuedReceiveMessages() |
| { |
| return receiveQueueEntries(); |
| } |
| |
| void GuaMsgConnectionToServer::populateDiagsArea(ComDiagsArea *&diags, |
| CollHeap *diagsHeap) |
| { |
| if (guaErrorInfo_ != GuaOK) |
| { |
| IpcAllocateDiagsArea(diags,diagsHeap); |
| |
| *diags << DgSqlCode(-2034) << DgInt0(guaErrorInfo_); |
| getEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE). |
| addProcIdToDiagsArea(*diags,0); |
| getOtherEnd().addProcIdToDiagsArea(*diags,1); |
| } |
| |
| if (getIpcMsgBufCheckFailed()) |
| { |
| IpcAllocateDiagsArea(diags, diagsHeap); |
| |
| *diags << DgSqlCode(-2037); |
| getEnvironment()->getMyOwnProcessId(IPC_DOM_GUA_PHANDLE). |
| addProcIdToDiagsArea(*diags, 0); |
| getOtherEnd().addProcIdToDiagsArea(*diags, 1); |
| } |
| } |
| |
| // |
| // Send a new I/O request. |
| // |
// In Guardian each msg I/O requires a write buffer to send the request and
// a read buffer to receive the reply. The same buffer can be used as both
// the write buffer and the read buffer. Here we refer to the write buffer
// as the "send buffer" and to the read buffer as the "reply buffer".
| // |
| // In our implementation we use ActiveIOQueueEntry to represent a msg I/O. |
// entry.buffer_ and entry.readBuffer_ are used as the send buffer and the
// reply buffer. However, the values of entry.buffer_ and entry.readBuffer_
| // depend on the type of the message buffer to be sent. There are four |
| // possible scenarios: |
| // |
| // 1. multi-chunk send buffer, shared by multiple connections: |
| // |
| // - first chunk: entry.buffer_=send buffer |
| // entry.readBuffer_=reply buffer |
| // - subsequent chunks: entry.buffer_=send buffer, entry.readBuffer_=NULL |
| // |
| // 2. multi-chunk send buffer, single connection (not shared): |
| // |
| // - first chunk: entry.buffer_=entry.readBuffer_=send buffer |
| // - subsequent chunks: entry.buffer_=send buffer, entry.readBuffer_=NULL |
| // |
| // 3. single-chunk send buffer, shared by multiple connections |
| // |
| // - entry.buffer_=send buffer |
| // - entry.readBuffer_=reply buffer |
| // |
| // 4. single-chunk send buffer, single connection (not shared): |
| // |
| // - entry.buffer_=entry.readBuffer_=send buffer |
| // |
| |
| |
| NABoolean GuaMsgConnectionToServer::tryToStartNewIO() |
| { |
| // Any more messages or parts of messages to send? |
| |
| // There is nothing to do if there are neither new messages nor |
| // incompleted partial messages. |
| if (sendQueueEntries() == 0 && !partiallySentBuffer_ && |
| !partiallyReceivedBuffer_) |
| return FALSE; |
| |
| // do not allow new send if a partial message is being received and we |
| // already have requested all the reply data for it. |
| if (partiallyReceivedBuffer_ && |
| partiallyReceivedBuffer_->getMessageLength() == chunkBytesRequested_) |
| return FALSE; |
| |
| // Can't have more than nowaitDepth_ - 1 I/Os outstanding, except |
| // when there is an out-of-band message for which we make an exception. |
| // If there is an out-of-Band message then assume that it was placed |
| // in front of the send queue. |
| if (numOutstandingIOs_ >= |
| (IFX (partiallySentBuffer_ OR partiallyReceivedBuffer_) |
| THENX nowaitDepth_ |
| ELSEX (nowaitDepth_-1))) |
| return FALSE; |
| |
| // Check if the per-process limit on MQCs is exceeded. |
| // Note that this should be reconsidered when this code is |
| // multithreaded. |
| short numMsgsActual; |
| if (MESSAGESYSTEMINFO(5, &numMsgsActual)) |
| assert(0); |
| if (numMsgsActual+1 >= getEnvironment()->getMaxPerProcessMQCs()) |
| return FALSE; |
| |
| // If we reach here this means we can start another nowait I/O; |
| // get to the outstanding I/O entry that is to be filled next. |
| #ifndef NDEBUG |
| NABoolean wrapAroundCheck = FALSE; |
| #endif |
| |
| // We may have to return early from this method if we cannot find |
| // space on the IPC heap for an outgoing buffer. If that happens |
| // we'll want to restore the original value of lastAllocatedEntry_. |
| unsigned short originalLastAllocated = lastAllocatedEntry_; |
| |
| // find an entry that is not in use |
| while (activeIOs_[lastAllocatedEntry_].inUse_) |
| { |
| // increment lastAllocatedEntry_ modulo nowaitDepth_ |
| lastAllocatedEntry_++; |
| if (lastAllocatedEntry_ == nowaitDepth_) |
| { |
| lastAllocatedEntry_ = 0; |
| #ifndef NDEBUG |
| assert(!wrapAroundCheck); // to detect infinite loop (shouldn't happen) |
| wrapAroundCheck = TRUE; |
| #endif |
| } |
| } |
| // we have found an entry that is not in use |
| ActiveIOQueueEntry &entry = activeIOs_[lastAllocatedEntry_]; |
| //assert(!entry.inUse_ && !entry.expectReply_); |
| |
| // --------------------------------------------------------------------- |
| // set up a new outstanding IO entry, depending on what to do next |
| // but don't start the corresponding I/O quite yet |
| // --------------------------------------------------------------------- |
| |
| // initialize all fields of the entry (there is no constructor) |
| entry.bytesSent_ = 0; |
| entry.receiveBufferSizeLeft_ = 0; |
| entry.offset_ = 0; |
| entry.msgid_ = 0; |
| |
| // These help keep track of the need to callSendCallback. |
| NABoolean isFirstChunk = FALSE; |
| NABoolean isLastChunk = FALSE; |
| |
| // --------------------------------------------------------------------- |
| // Decide what to do, depending on the currently pending buffers and |
| // IOs: |
| // |
| // a) send another chunk of a large message down to the server without |
| // asking for data back |
| // b) request some more data from the server, if the server replied |
| // with a partial message and we haven't asked for all of the |
| // rest of the data yet (never interleave this with I/Os of |
| // steps a) and b), so the server won't get confused |
| // c) get another message from the send queue and find out that it |
| // is too long for a single chunk, so send the first piece |
| // d) should be the normal case, get the next message from the send |
| // queue and send it in a single chunk |
| // --------------------------------------------------------------------- |
| |
| if (partiallySentBuffer_) |
| { |
| // case a) continue sending more chunks for this buffer |
| // but don't ask for reply data, since we may want the reply to come |
| // back at entry.buffer_->data(0) |
| entry.buffer_ = partiallySentBuffer_; |
| // for multi-chunk buffer, whether shared or not, the read buffer for |
| // any chunk after first chunk is NULL. |
| entry.readBuffer_ = NULL; |
| |
| assert(chunkBytesSent_ < entry.buffer_->getMessageLength()); |
| |
| entry.bytesSent_ = MINOF(maxIOSize_, |
| entry.buffer_->getMessageLength() - |
| chunkBytesSent_); |
| entry.offset_ = chunkBytesSent_; |
| chunkBytesSent_ += entry.bytesSent_; |
| // if this is the last chunk ... |
| if (chunkBytesSent_ >= entry.buffer_->getMessageLength()) |
| { |
| // we're done sending chunks |
| partiallySentBuffer_ = NULL; |
| chunkBytesSent_ = 0; |
| // can call the send callback now |
| isLastChunk = TRUE; |
| } |
| |
| lastSentBuffer_ = entry.buffer_; |
| } |
| else if (partiallyReceivedBuffer_) |
| { |
| // b) next thing to do is to receive another chunk from the server |
| entry.buffer_ = entry.readBuffer_ = partiallyReceivedBuffer_; |
| entry.offset_ = chunkBytesRequested_; |
| entry.receiveBufferSizeLeft_ = |
| MINOF(maxIOSize_, |
| entry.buffer_->getMessageLength() - chunkBytesRequested_); |
| chunkBytesRequested_ += entry.receiveBufferSizeLeft_; |
| |
| lastSentBuffer_ = entry.buffer_; |
| } |
| else |
| { |
| // get the next buffer to send from the send queue and check |
| // whether it can be sent in one piece |
| |
| assert(sendQueueEntries() > 0); |
| IpcMessageBuffer *nextToSend = sendQueue()[0]; |
| assert(nextToSend); |
| |
| // assume request and reply use same buffer |
| entry.buffer_ = entry.readBuffer_ = nextToSend; |
| isFirstChunk = TRUE; |
| |
| if (entry.buffer_->getRefCount() > 1 || entry.buffer_->isShared()) |
| { |
| if (!entry.buffer_->initLockCount(getEnvironment()->getHeap(), |
| maxIOSize_)) |
| { |
| // We ran out of space on the IPC heap. This is OK and we |
| // can return early. Higher layers will redrive the I/O for |
| // this connection. |
| getEnvironment()->setHeapFullFlag(TRUE); |
| lastAllocatedEntry_ = originalLastAllocated; |
| return FALSE; |
| } |
| |
| // The send buffer is shared by multiple connections. Therefore, |
| // allocate a different buffer for the reply. the reply buffer size |
| // is set as: |
| // |
| // - if user has explicitly specified the reply buffer length |
| // and it is less than maxIOSize_, then use it. |
| // - otherwise, use maxIOSize_. |
| IpcMessageStream *msgStream = entry.buffer_->getMessageStream()->castToIpcMessageStream(); |
| //assert(msgStream); |
| IpcMessageObjSize maxReplyLen = msgStream->getMaxReplyLength(); |
| if (maxReplyLen > 0 && maxReplyLen < maxIOSize_) |
| entry.readBuffer_ = entry.buffer_->createBuffer(getEnvironment(), maxReplyLen, FALSE); |
| else |
| entry.readBuffer_ = entry.buffer_->createBuffer(getEnvironment(), maxIOSize_, FALSE); |
| if (entry.readBuffer_ == NULL) |
| { |
| // We ran out of space on the IPC heap ... |
| getEnvironment()->setHeapFullFlag(TRUE); |
| lastAllocatedEntry_ = originalLastAllocated; |
| return FALSE; |
| } |
| } |
| |
| if (entry.buffer_->getMessageLength() > maxIOSize_) |
| { |
| // case c), the message we just got from the send queue is too large |
| // to be sent in a single chunk :-( |
| assert(partiallySentBuffer_ == NULL); |
| |
| // indicate multi-chunk protocol |
| partiallySentBuffer_ = entry.buffer_; |
| |
| entry.bytesSent_ = maxIOSize_; |
| chunkBytesSent_ = entry.bytesSent_; |
| } |
| else |
| { |
| // case d) can send in single chunk |
| entry.bytesSent_ = entry.buffer_->getMessageLength(); |
| // can call the send callback now |
| isLastChunk = TRUE; |
| } |
| |
| prepareSendBuffer(entry.buffer_); |
| |
| // got this far so de-queue buffer from this connection's queue |
| removeNextSendBuffer(); |
| entry.receiveBufferSizeLeft_ = |
| MINOF(maxIOSize_, entry.readBuffer_->getBufferLength()); |
| |
| lastSentBuffer_ = entry.buffer_; |
| } |
| |
| // --------------------------------------------------------------------- |
| // Next, start the I/O operation |
| // --------------------------------------------------------------------- |
| short wireOptions = 8; |
| short GuardianError = 0; |
| |
| direct_globals_template * pfsptr; |
| acb_standard_template * acb; |
| |
| // Get the pointer to the acb - needed in order to decrement the count |
| // of outstanding requests in the ACB on errors. |
| fs2_get_pfsaddr((Long*)&pfsptr); |
| |
| // check that the file is actually open, i.e., that this is a valid |
| // filenum. |
| if ((ULng32)(openFile_) >= (ULng32)(pfsptr->numftentries)) |
| { |
| GuardianError = FENOTOPEN; |
| } |
| else |
| acb = (acb_standard_template *) pfsptr->file_table[openFile_]; |
| |
| //become unstoppable |
| short oldstop = SETSTOP(2); |
| |
| if (NOT GuardianError) |
| { |
| // lock memory used for the control info buffer |
| GuardianError = ADDRESS_WIRE_((unsigned char *)entry.controlBuf_, |
| sizeof(fs_fs_writeread), wireOptions); |
| } |
| |
| if (NOT GuardianError) |
| { |
| entry.transid_ = (Int64)entry.buffer_->getTransid(); |
| // setup request control buffer |
| GuardianError = setupRequestInfo((void*)entry.controlBuf_, |
| (Int64)entry.buffer_->getTransid()); |
| if (GuardianError) |
| ADDRESS_UNWIRE_((unsigned char *)entry.controlBuf_, |
| (sizeof(fs_fs_writeread)), wireOptions); |
| } |
| |
| if (NOT GuardianError) |
| { |
| GuardianError = addressWire(entry, wireOptions); |
| if (GuardianError) |
| { |
| // decrement the number of outstanding requests - |
| // this was incremented by the call to setupRequestInfo() above. |
| acb->acb_numreqs = acb->acb_numreqs - 1; |
| ADDRESS_UNWIRE_((unsigned char *)entry.controlBuf_, |
| (sizeof(fs_fs_writeread)), wireOptions); |
| } |
| } |
| |
| if (NOT GuardianError) |
| { |
| // generate Context-Based Addresses (CBAs) for the data buffer |
| // and for the control buffer |
| NSK_CBA &writeDataCBA = *(NSK_CBA*) entry.writeDataCBAPtr_; |
| NSK_CBA &readDataCBA = *(NSK_CBA*) entry.readDataCBAPtr_; |
| NSK_CBA &controlCBA = *(NSK_CBA*) entry.controlCBAPtr_; |
| |
| // for all other types of send use the send buffer |
| CBA_CREATE_((NSK_CBA *)entry.writeDataCBAPtr_, entry.buffer_->data(entry.offset_)); |
| |
| if (!entry.readBuffer_) |
| // this is a subsequent chunk of a multi-chunk send. no reply is |
| // needed. |
| CBA_CREATE_((NSK_CBA *)entry.readDataCBAPtr_, 0); |
| else |
| // all other types of reply |
| CBA_CREATE_((NSK_CBA *)entry.readDataCBAPtr_, entry.readBuffer_->data(entry.offset_)); |
| |
| CBA_CREATE_((NSK_CBA *)entry.controlCBAPtr_,entry.controlBuf_); |
| |
| NSK_msLinkOpts2 options = MSG_LINK_CBA; |
| |
| short retryCount = 0; |
| NABoolean needToRetry; |
| do { |
| // send the message to the server, using CBAs, so that hide/reveal |
| // operations later can't invalidate the addresses |
| NSK_msId2 localMsgId = 0; |
| |
| GuardianError = MSG_LINK2_( |
| (NSK_PHandle _ptr64 *)getOtherEnd().getPhandle().phandle_, |
| (NSK_msId2 _ptr64 *)&localMsgId, |
| (int16 _ptr64 *)&controlCBA, |
| (NSK_msSize2)sizeof(fs_fs_writeread), |
| (int16 _ptr64 *)&controlCBA, |
| (NSK_msSize2)sizeof(fs_fs_writeread), |
| (char _ptr64 *)&writeDataCBA, //message to send to server |
| (NSK_msSize2)entry.bytesSent_, //# of bytes to send |
| (char _ptr64 *)&readDataCBA, //buffer to receive reply from server |
| (NSK_msSize2)entry.receiveBufferSizeLeft_, //reply bytes expected |
| 0, 0, 0, options); |
| |
| entry.msgid_ = (UInt32)localMsgId; |
| |
| if (GuardianError == FENOLCB) // too many MQCs. |
| { |
| // Since the per-process limit was checked above, |
| // assume that it is the per-cpu limit, so let us |
| // retry. |
| retryCount++; |
| needToRetry = TRUE; |
| getEnvironment()->incrRetriedMessages(); |
| PROCESS_DELAY_(10*1000); // 10,000 microseconds |
| } |
| else |
| needToRetry = FALSE; |
| } while (needToRetry && |
| retryCount < 100*60 ); // after 60 seconds (and 6000 retries), |
| // just give up. |
| getEnvironment()->setEvent(TRUE, AEVENT); |
| |
| if (GuardianError) |
| { |
| // failed message link |
| |
| // decrement the number of outstanding requests - |
| // this was incremented by the call to setupRequestInfo() above. |
| acb->acb_numreqs = acb->acb_numreqs - 1; |
| addressUnwire(entry); |
| } // if (GuardianError) |
| else |
| { |
| // Put the message id into the acb. |
| putMsgIdinACB(entry.msgid_); |
| } |
| } |
| |
| //go back to old stop mode |
| SETSTOP(oldstop); |
| |
| if (isFirstChunk) |
| addSendCallbackBuffer(entry.buffer_); |
| |
| if (GuardianError) |
| { |
| // an error happened somewhere along the way and we must |
| // a) record the Guardian error number, |
| guaErrorInfo_ = GuardianError; |
| // b) set the connection to the error state. If we have not invoked |
| // the send callback, then handleIOErrorForEntry() will invoke the |
| // send callback. |
| setErrorInfo(-1); |
| handleIOErrorForEntry(entry); |
| |
| // if the design is to disallow any future i/o after connection become |
| // error state, should we return false to prevent from calling |
| // tryToStartNewIO() again? |
| //return FALSE; |
| } |
| else |
| { |
| // buffer has been sent successfully |
| if (numOutstandingIOs_ == 0) |
| setState(SENDING); |
| numOutstandingIOs_++; |
| entry.inUse_ = TRUE; // this entry now has an I/O in progress |
| entry.expectReply_ = TRUE; |
| |
| // -------------------------------------------------------------- |
| // If we started the I/O for all chunks of an IpcMessageBuffer |
| // (or if the IpcMessageBuffer was sent in a single message), then |
| // call its send callback. |
| // -------------------------------------------------------------- |
| if (isLastChunk) |
| { |
| // removeSendCallbackBuffer() should always return TRUE here because |
| // send callback has not been invoked yet. If error occured prior to |
| // this method call then handleIOErrorForEntry() should have cleared |
| // all I/Os on the same message stream and we would not have come |
| // here. |
| NABoolean sendCallbackFlag = removeSendCallbackBuffer(entry.buffer_); |
| if (sendCallbackFlag) |
| // the send callback doesn't give away entry.buffer_, and this is |
| // good since the same buffer may be still used for the receive |
| // operation. |
| entry.buffer_ ->callSendCallback(this); |
| else // - for debugging only |
| assert(sendCallbackFlag); |
| } |
| } |
| |
| return TRUE; |
| } |
| |
| void GuaMsgConnectionToServer::openPhandle(char * processName) |
| { |
| char procFileName[IpcMaxGuardianPathNameLength]; |
| short procFileNameLen; |
| short openFlags = nowaitDepth_ == 0 ? 0x0 : 0x4000; |
| |
| if (! processName) |
| { |
| // convert the phandle to a string that can be passed to FILE_OPEN_ |
| guaErrorInfo_ = PROCESSHANDLE_TO_FILENAME_( |
| (short *) getOtherEnd().getPhandle().phandle_, |
| procFileName, |
| IpcMaxGuardianPathNameLength, |
| &procFileNameLen, |
| 0); |
| if (guaErrorInfo_ != GuaOK) |
| { |
| setErrorInfo(-1); |
| setState(ERROR_STATE); |
| return; |
| } |
| } |
| else |
| { |
| strcpy(procFileName, processName); |
| procFileNameLen = (short)strlen(processName); |
| } |
| |
| getEnvironment()->setLdoneConsumed(TRUE); |
| |
| // solution 10-081025-6810: |
| // wait for esp open reply indefinitely. time out every 10 minutes, write |
| // a warning msg to ems log including error code and esp pin, and then go |
| // back to waiting. we allow up to 10 timeouts, or 100 minutes. |
| // |
| // - when AWAITIOX times out, it completes/cancels the nowait i/o |
| // initiated by FILE_OPEN_. thus we cannot simply call AWAITIOX again since |
| // there is no more outstanding i/o. we must try FILE_OPEN_ again and then |
| // call AWAITIOX to wait another 10 minutes. also, we must save the file |
| // numbers returns from each timed out FILE_OPEN_ so we can close them later. |
| // and we cannot call FILE_CLOSE_ on any of the returned file numbers until |
| // AWAITIOX returns successfully with esp's open reply. otherwise the close |
| // msg from master will cause esp to exit. |
| // |
| GuaFileNumber oldOpens[10]; // allow up to 10 timeouts |
| short numOldOpens = 0; |
| _cc_status stat; |
| while (1) |
| { |
| guaErrorInfo_ = FILE_OPEN_(procFileName, |
| procFileNameLen, |
| &openFile_, |
| 0, // open for read/write access |
| 0, // shared access |
| nowaitDepth_, |
| 0, // sync depth 0 (target proc is not NonStop) |
| openFlags); // options |
| if (guaErrorInfo_ != GuaOK) |
| break; |
| |
| if (!(openFlags &= 0x4000)) |
| // return if FILE_OPEN_ was waited |
| break; |
| |
| // FILE_OPEN_ was no-waited |
| |
| // wait indefinitely |
| stat = AWAITIOX(&openFile_); |
| if (_status_eq(stat)) |
| // reply received with no error |
| break; |
| |
| // we got error. get Guardian error code. |
| GuaErrorNumber getinfoError = FILE_GETINFO_(openFile_,&guaErrorInfo_); |
| if (getinfoError != 0) |
| guaErrorInfo_ = getinfoError; // not even FILE_GETINFO_ worked |
| |
| // set guaErrorInfo_ to -1 if we did not get a valid error? |
| //if (guaErrorInfo_ == GuaOK) |
| // guaErrorInfo_ = -1; |
| |
| |
| // AWAITIOX returned error, or ran out allowed timeouts. |
| break; |
| } // while (1) |
| |
| for (short i = 0; i < numOldOpens; i++) |
| FILE_CLOSE_(oldOpens[i]); |
| |
| if (guaErrorInfo_ != GuaOK) |
| { |
| if (openFile_ != -1) |
| { |
| FILE_CLOSE_(openFile_); // Don't retain unopened ACB |
| openFile_ = -1; // Don't leave valid file number in object! |
| } |
| setErrorInfo(-1); |
| setState(ERROR_STATE); |
| return; |
| } |
| |
| // some day we may want to perform nowaited FILE_OPEN_ calls and add |
| // a method to "work" on the open. |
| |
| // use setmode 74 to turn off the automatic CANCEL upon AWAITIOX timeout |
| stat = SETMODE(openFile_,74,-1); |
| if (_status_ne(stat)) |
| { |
| // get a Guardian error code |
| Int32 errcode2 = FILE_GETINFO_(openFile_,&guaErrorInfo_); |
| |
| if (errcode2 != 0) |
| guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked |
| setErrorInfo(-1); |
| setState(ERROR_STATE); |
| return; |
| } |
| |
| // use setmode 30 to allow I/O operations to finish in any order |
| stat = SETMODE(openFile_,30,3); |
| if (_status_ne(stat)) |
| { |
| // get a Guardian error code |
| Int32 errcode2 = FILE_GETINFO_(openFile_,&guaErrorInfo_); |
| |
| if (errcode2 != 0) |
| guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked |
| setErrorInfo(-1); |
| setState(ERROR_STATE); |
| return; |
| } |
| |
| // use setmode 117 if no transactions should be propagated to the server |
| if (NOT usesTransactions_) |
| { |
| _cc_status stat = SETMODE(openFile_,117,1); |
| if (_status_ne(stat)) |
| { |
| // get a Guardian error code |
| Int32 errcode2 = FILE_GETINFO_(openFile_,&guaErrorInfo_); |
| |
| if (errcode2 != 0) |
| guaErrorInfo_ = errcode2; // not even FILE_GETINFO_ worked |
| setErrorInfo(-1); |
| setState(ERROR_STATE); |
| return; |
| } |
| } |
| |
| // the connection is established now |
| setState(ESTABLISHED); |
| } |
| |
| void GuaMsgConnectionToServer::closePhandle() |
| { |
| // |
| // it's possible that some pending I/Os still remain on this connection. |
| // if IPC error occurs during query execution, there are three possible |
| // scenarios for a connection when it comes here: |
| // |
| // 1. this connection received ipc error and handled the error on the spot |
| // by calling one of the handleIOError() methods. |
| // 2. this connection did not receive error, but some other connection(s) |
| // on the same message stream received error. in this case we should |
| // invoke IpcMessageStream::abandonPendingIOs() to abort pending I/Os |
| // on all of stream's connections and invoke all necessary callbacks. |
| // 3. this connection did not receive error, neither did any other |
| // connections on the same message stream. in this case it's possible |
| // that nothing happens on this connection until the destructor is |
| // invoked. thus we need to clean up any pending I/Os on the connection. |
| // we should invoke delinkConnection() on the stream to trigger any |
| // book keepings needed, as delinkConnection() and receive callback |
| // should have the same book keeping logic. |
| // |
| // example for #3: we have a test case that kills esps while executing a |
| // long running query. the master has multiple send tops, with each send top |
| // having a message stream that includes only one data connection to a top |
| // level esp. if IPC error occurs on other connections (but not on this |
| // connection) as the result of dead esps, all sql operations are aborted |
| // from higher level and the master may never get a chance to call |
| // handleIOError() to clean up any pending I/Os on this connection. thus we |
| // need to release any msg buffers from the pending I/Os. but note that |
| // in case of multi-chunk message buffer, there may be multiple I/O entries |
| // pointing to the same message buffer. in that case each buffer should be |
| // released only once. |
| // |
| handleIOError(); |
| |
| // receive queue may not be empty. if so invoke receive callback. |
| // example: |
| // GuaMsgConnectionToServer::setFatalError() invokes handleIOErrorForStream() |
| // that may have put message buffer on receive queue. |
| IpcMessageBuffer *receiveBuf; |
| while (receiveBuf = getNextReceiveQueueEntry()) |
| receiveBuf->callReceiveCallback(this); |
| |
| // Note that after closing, the connection is always in the initial |
| // state. This is the way to fix a connection in the error state. |
| // The close is always considered successful. |
| guaErrorInfo_ = GuaOK; |
| clearErrorInfo(); |
| setState(INITIAL); |
| |
| if (openFile_ != InvalidGuaFileNumber) |
| { |
| FILE_CLOSE_(openFile_); |
| openFile_ = InvalidGuaFileNumber; |
| } |
| } |
| |
| //This function sets up the control information required by |
| //the file system on the server side |
| //control is a pointer to the buffer that is supposed to |
| //contain the control information |
| short GuaMsgConnectionToServer::setupRequestInfo(void * control, Int64 transid){ |
| //Redirected to T9055 to insulate SQL/MX from changes in |
| //ACB_REQUEST_TEMPLATE |
| Int32 retcode = FS_SQL_SETUPREQUESTINFO(openFile_, |
| (fs_fs_template *) control, |
| transid); |
| return retcode; |
| |
| #if 0 |
| fs_fs_writeread* controlInfo =(fs_fs_writeread*) control; |
| direct_globals_template * pfsptr;//pointer to the PFS |
| acb_standard_template * acb;//pointer to the acb for this file |
| short error = FEOK; //variable to catch errors |
| |
| //first get pointer to the PFS |
| fs2_get_pfsaddr((Long*)&pfsptr); |
| |
| //then get pointer to the acb in the |
| //PFS Filetable at the index openFile_ |
| acb = (acb_standard_template *) pfsptr->file_table[openFile_]; |
| |
| //set the dialect type to indicate this is a call from the file system |
| //since we are trying to emulate the WRITEREADX call |
| controlInfo->dialect_type = DIALECT_FS_FS; |
| controlInfo->request_type = FS_FS_WRITEREAD; |
| controlInfo->request_version = CURRENT_VERSION_FS_FS; |
| controlInfo->minimum_interpretation_version = MINIMUM_VERSION_FS_FS; |
| |
| controlInfo->tcbref_valid = 0; |
| controlInfo->lid_valid = 0; |
| controlInfo->filler = 0; |
| memset((void *)&(controlInfo->sendflags), 0, |
| sizeof(linkmon_sendflags_template)); |
| |
| //get the number of outstanding requests |
| int_16 next = acb->acb_numreqs; |
| |
| void ** acb_reqaddrs = (void **)((char *)&(acb->req.acb_requestbase_addr) + |
| sizeof(acb->req.acb_requestbase_addr) ); |
| |
| //get a pointer to the data for this request |
| acb_request_template * acb_reqptr = |
| (acb_request_template *)acb_reqaddrs[next]; |
| |
| //check if numreqs is less than the maximum number of reqs allowed |
| if (( acb->acb_numreqs) < (int_16) acb->acb_maxreqs) |
| { |
| //update the numreqs |
| acb->acb_numreqs = next + 1; |
| int_16 savenum = acb_reqptr->acb_reqnum; |
| //initialize the acb req to all zeros |
| _fill_32(acb_reqptr, (_len(acb_request_template)/ 4), 0 ); |
| acb_reqptr->acb_reqnum = savenum; |
| } // if req avail |
| else |
| { |
| // we used to return FETOOMANY here, but the check in |
| // tryToStartNewIO of numOutStandingIOs and nowaitDepth_ |
| // should have prevented the FETOOMANY. |
| assert(0); |
| } |
| |
| // if transid argument is invalid set flag in controlInfo. |
| if (transid == (Int64)-1) |
| { |
| controlInfo->tcbref_valid = 0; |
| } |
| else |
| { |
| // Here, acb_reqptr->acb_tubaddr is not initialized with the |
| // context transid (as done in FS), to prevent ENDTRANSACTION |
| // from returning error 81 if there are outstanding ipc msgs |
| // (for read only query). |
| // For nowaited queries involves insert, update, delete, |
| // or nowaited prepares, CliGlobals::checkOperationsPending |
| // detects outstanding operations and returns error to |
| // FS2^FLUSH^ALL^VSBB. |
| |
| // transid passed in is valid. Call FS2 to move transid into |
| // control buffer (may need to "massage" transid if the destination |
| // is a remote node. |
| |
| error = Fs2_transid_to_buffer |
| (acb->posix.acb_procsection->tmfvirtualnode, |
| (unsigned char *)&transid, |
| (unsigned char *)&(controlInfo->tcbref), |
| 0 |
| ); |
| if (error) |
| { |
| acb->acb_numreqs = acb->acb_numreqs - 1; |
| return error; |
| } |
| controlInfo->tcbref_valid = 1; |
| } |
| |
| //hard coded since file open passes a zero |
| controlInfo->sender.syncid = 1; |
| |
| //get the filenum for this connection |
| controlInfo->sender.first_word.filenum = openFile_; |
| |
| //copy the phandle |
| memcpy(&controlInfo->sender.phandle,&pfsptr->my_phandle,(sizeof(short)*10)); |
| controlInfo->sender.user_openid = acb->posix.acb_procsection->acb_useropenid; |
| controlInfo->sender.id.openid = acb->posix.acb_procsection->sender.acb_procopenid; |
| return FEOK; |
| #endif |
| } |
| |
| //This function is used to put the msgid into the acb after a MSG_LINK_. |
| //This let's the filesystem cleanup outstanding IOs on PROCESS_STOP_ |
| void GuaMsgConnectionToServer::putMsgIdinACB(UInt32 msgid){ |
| //Redirected to T9055 to insulate SQL/MX from changes in |
| //ACB_REQUEST_TEMPLATE |
| Int32 retcode = FS_SQL_PUTMSGIDINACB(openFile_, msgid); |
| #if 0 |
| direct_globals_template * pfsptr;//pointer to the PFS |
| acb_standard_template * acb;//pointer to the acb for openFile_ |
| |
| //first get pointer to the PFS |
| fs2_get_pfsaddr((Long*)&pfsptr); |
| |
| //then get pointer to the acb in the PFS |
| //Filetable at the index openFile_ |
| acb = (acb_standard_template *) pfsptr->file_table[openFile_]; |
| |
| //get the index into the array of acb requests |
| int_16 next = acb->acb_numreqs - 1; |
| |
| void ** acb_reqaddrs = (void **)((char *)&(acb->req.acb_requestbase_addr) + |
| sizeof(acb->req.acb_requestbase_addr) ); |
| |
| //get a pointer to the acb data for this request |
| acb_request_template * acb_reqptr = |
| (acb_request_template *)acb_reqaddrs[next]; |
| |
| //put the msgid into the acb |
| acb_reqptr->mid.acb_mid = msgid; |
| if (acb_reqptr->tub.acb_tubaddr != 0) |
| acb_reqptr->tub.acb_tub->pending_count = |
| (ULng32)(acb_reqptr->tub.acb_tub->pending_count) + 1u; |
| #endif |
| } |
| |
| void GuaMsgConnectionToServer::resetAfterReply(UInt32 msgid, short error, |
| Int64 *transid){ |
| //Redirected to T9055 to insulate SQL/MX from changes in |
| //ACB_REQUEST_TEMPLATE |
| Int32 retcode = FS_SQL_RESETAFTERREPLY(openFile_, msgid, error, transid, |
| abortXnOnPathErrors_); |
| |
| #if 0 |
| direct_globals_template * pfsptr;//pointer to the PFS |
| acb_standard_template * acb;//pointer to the acb for openFile_ |
| acb_request_template * acb_reqptr; |
| short reqIndex = -1; |
| short i; |
| |
| //first get pointer to the PFS |
| fs2_get_pfsaddr((Long*)&pfsptr); |
| |
| //then get pointer to the acb in the |
| //PFS Filetable at the index openFile_ |
| acb = (acb_standard_template *) pfsptr->file_table[openFile_]; |
| |
| void ** acb_reqaddrs = (void **)((char *)&(acb->req.acb_requestbase_addr) + |
| sizeof(acb->req.acb_requestbase_addr) ); |
| |
| // get a pointer to the acb data for this request (search through array |
| // of outstanding requests,, to get the request id that matches this one, |
| // that was just completed). This is a fix for a bug - earlier, it was |
| // just assumed that the request completed was the last request in the |
| // array - this may not be the case, as requests may be completed out |
| // of order. This bug lead to %4100 halts (BR #85). The fix to search |
| // the array for the completed request, along with the fix to "shrink" the |
| // array (see code below) solves this bug by doing what the Enscribe |
| // Filesystem does in AWAITIOX. |
| for (i=0; i <= acb->acb_numreqs; i++) |
| { |
| acb_reqptr = (acb_request_template *)acb_reqaddrs[i]; |
| if (acb_reqptr->mid.acb_mid == msgid) |
| { |
| reqIndex = i; |
| break; |
| } |
| } |
| |
| //set msgid to 0 |
| acb_reqptr->mid.acb_mid = 0; |
| acb_reqptr->acb_reqrdy = 1; |
| |
| if (acb_reqptr->tub.acb_tubaddr != 0) |
| acb_reqptr->tub.acb_tub->pending_count = |
| (ULng32)(acb_reqptr->tub.acb_tub->pending_count) - 1u; |
| |
| //decrement the numreqs |
| acb->acb_numreqs = acb->acb_numreqs - 1; |
| |
| // Move all requests after this one to be before this one. This "shrinking" |
| // of the request array is done to make it efficient to find a slot while |
| // sending a new request (free slots are always at the end of the array) |
| if ((acb->acb_numreqs > 0) && (reqIndex != acb->acb_numreqs)) |
| { |
| i = reqIndex; |
| while (i != acb->acb_numreqs) |
| { |
| acb_reqaddrs[i] = acb_reqaddrs[i + 1]; |
| i = i + 1; |
| } |
| acb_reqaddrs[acb->acb_numreqs] = (void *)acb_reqptr; |
| } |
| |
| if (error == FECPUFAIL || |
| error == FEPATHDOWN || |
| (error >= FENETERR && error <= 255) |
| ) |
| { |
| // abort the transaction if a path error is received. |
| if ((transid != NULL) && (*transid != -1)) |
| { |
| // Want to stop the transaction on path errors. This is |
| // to fix the bug reported in solution 10-030508-6267. |
| if (abortXnOnPathErrors_) |
| { |
| TMFLIBFS_ABORTTRANS_( (int_16 *)transid, |
| FETRANSABRTOWNERDIED ); |
| } |
| } |
| } |
| #endif |
| } |
| |
| void GuaMsgConnectionToServer::setFatalError(IpcMessageStreamBase *msgStream) |
| { |
| if (guaErrorInfo_ == GuaOK) // if error hasn't been set yet |
| guaErrorInfo_ = GuaIpcApplicationErr; |
| |
| setState(ERROR_STATE); |
| |
| // we must set error info to -1 so receive callback knows not to |
| // parse the potentially corrupted receive queue buffer. |
| setErrorInfo(-1); |
| |
| // handleIOErrorForStream() may put the message buffer on receive queue. |
| // and that buffer may have the following content: |
| // |
| // - the actual full reply from server, or |
| // - the send buffer if send failed, or |
| // - partial send or reply if send/reply was multi-chunk and did not |
| // complete |
| // |
| handleIOErrorForStream(msgStream); |
| |
| // This next call only works if GuaMsgConnectionToServer inherits |
| // directly from IpcConnection. |
| IpcConnection::setFatalError(msgStream); |
| } |
| |
| void GuaMsgConnectionToServer::addSendCallbackBuffer(IpcMessageBuffer *buffer) |
| { |
| for (unsigned short i = 0; i < nowaitDepth_; i++) |
| { |
| if (!sendCallbackBufferList_[i]) |
| { |
| sendCallbackBufferList_[i] = buffer; |
| return; |
| } |
| } |
| } |
| |
| NABoolean GuaMsgConnectionToServer::removeSendCallbackBuffer(IpcMessageBuffer *buffer) |
| { |
| for (unsigned short i = 0; i < nowaitDepth_; i++) |
| { |
| if (sendCallbackBufferList_[i] == buffer) |
| { |
| sendCallbackBufferList_[i] = NULL; |
| return TRUE; |
| } |
| } |
| |
| return FALSE; |
| } |
| |
| void GuaMsgConnectionToServer::handleIOError() |
| { |
| // connection no longer usable due to I/O error. abort all existing I/Os |
| // on this connection. |
| for (unsigned short i = 0; i < nowaitDepth_; i++) |
| { |
| ActiveIOQueueEntry &entry = activeIOs_[i]; |
| if (entry.inUse_) |
| handleIOErrorForEntry(entry); |
| } |
| |
| assert(partiallySentBuffer_ == NULL); |
| assert(partiallyReceivedBuffer_ == NULL); |
| |
| currentEntry_ = 0; |
| numOutstandingIOs_ = 0; |
| } |
| |
| void GuaMsgConnectionToServer::handleIOErrorForStream(IpcMessageStreamBase *msgStream) |
| { |
| // abort all existing I/Os on this connection that are from the given stream |
| for (unsigned short i = 0; i < nowaitDepth_; i++) |
| { |
| ActiveIOQueueEntry &entry = activeIOs_[i]; |
| if (entry.inUse_ && entry.buffer_->getMessageStream() == msgStream) |
| handleIOErrorForEntry(entry); |
| } |
| } |
| |
| // |
| // The I/O entry has a write buffer (entry.buffer_) and a read buffer |
| // (entry.readBuffer_). For entry description after message send, see comments |
| // at the top of tryToStartNewIO(). For entry description after receive reply, |
| // see comments at the top of wait(). |
| // |
| void GuaMsgConnectionToServer::handleIOErrorForEntry(ActiveIOQueueEntry &entry) |
| { |
| // I/O error occurred on given entry during send or receive |
| |
| if (getState() != ERROR_STATE) |
| setState(ERROR_STATE); |
| |
| if (getErrorInfo() == 0) |
| setErrorInfo(-1); |
| |
| // clean up I/O entry |
| if (entry.inUse_) |
| cleanUpActiveIOEntry(entry); |
| |
| // abort all existing I/Os on the same message stream |
| // - what about I/Os on other streams? |
| for (unsigned short i = 0; i < nowaitDepth_; i++) |
| { |
| ActiveIOQueueEntry &nextEntry = activeIOs_[i]; |
| |
| if (nextEntry.inUse_ && |
| nextEntry.buffer_->getMessageStream() == |
| entry.buffer_->getMessageStream()) |
| { |
| cleanUpActiveIOEntry(nextEntry); |
| |
| // special case: for multi-chunk shared buffer, if nextEntry is |
| // first chunk and entry is any chunk after first chunk, we need |
| // to free the read buffer of the first chunk since all other chunks |
| // have their read buffers set to NULL. |
| if (nextEntry.readBuffer_ && |
| nextEntry.readBuffer_ != nextEntry.buffer_) |
| // nextEntry is the first chunk of a multi-chunk shared buffer |
| nextEntry.readBuffer_->decrRefCount(getEnvironment()); |
| } |
| } // for i |
| |
| // clear partial send/receive buffer on the same message stream so to |
| // prevent further I/Os. |
| if (partiallySentBuffer_ && |
| partiallySentBuffer_->getMessageStream() == |
| entry.buffer_->getMessageStream()) |
| partiallySentBuffer_ = NULL; |
| else if (partiallyReceivedBuffer_ && |
| partiallyReceivedBuffer_->getMessageStream() == |
| entry.buffer_->getMessageStream()) |
| partiallyReceivedBuffer_ = NULL; |
| |
| // if entry is the first chunk of a multi-chunk shared buffer, we need to |
| // free its read buffer. |
| if (entry.readBuffer_ && entry.readBuffer_ != entry.buffer_) |
| entry.readBuffer_->decrRefCount(getEnvironment()); |
| |
| // put the buffer on receive queue, regardless of send succeeded or not. |
| queueReceiveMessage(entry.buffer_); |
| |
| // if entry.buffer_ is still on the sendCallbackBufferList_, then send |
| // callback has not been invoked for this connection and thus send failed. |
| NABoolean sendSuccess = !removeSendCallbackBuffer(entry.buffer_); |
| // in case of send failure invoke send callback |
| if (!sendSuccess) |
| entry.buffer_->callSendCallback(this); |
| |
| // the design is to disallow any future i/o after connection become error |
| // state. so we should cleanup send queue by invoking send callback for |
| // all send buffers. |
| IpcMessageBuffer *sendBuffer; |
| while (sendBuffer = removeNextSendBuffer()) |
| { |
| queueReceiveMessage(sendBuffer); |
| sendBuffer->callSendCallback(this); |
| } |
| } |
| |
| void GuaMsgConnectionToServer::cleanUpActiveIOEntry(ActiveIOQueueEntry &entry) |
| { |
| short oldstop = SETSTOP(2);//become unstoppable |
| |
| // abort if we are still waiting for message reply |
| if (entry.expectReply_) |
| { |
| MSG_ABANDON2_((NSK_msId2)entry.msgid_); |
| resetAfterReply(entry.msgid_, 0, NULL); |
| } |
| |
| addressUnwire(entry); |
| |
| SETSTOP(oldstop);//become stoppable |
| |
| entry.expectReply_ = FALSE; |
| entry.inUse_ = FALSE; |
| entry.msgid_ = 0; |
| entry.transid_ = -1; |
| |
| numOutstandingIOs_--; |
| } |
| |
| // lock memory used for the actual message send/reply |
| short GuaMsgConnectionToServer::addressWire(ActiveIOQueueEntry &entry, |
| short wireOptions) |
| { |
| // |
| // no need to lock control buffer in this method. it should have been locked |
| // already by the caller. |
| // |
| |
| // |
| // lock memory for write/send buffer. write buffer may be shared. |
| // |
| short GuardianError = 0; |
| if (entry.buffer_->isShared()) |
| { |
| // write buffer is shared by multiple connections |
| if (entry.buffer_->getLockCount(entry.offset_) == 0) |
| { |
| // if the write buffer chunk is not locked, lock it |
| GuardianError = ADDRESS_WIRE_ |
| ((unsigned char *)entry.buffer_->data(entry.offset_), |
| entry.bytesSent_, wireOptions); |
| if (GuardianError) |
| return GuardianError; |
| } |
| |
| // increment lock count for shared write bufer |
| entry.buffer_->incrLockCount(entry.offset_); |
| } |
| else |
| { |
| // write buffer is only used by a single connection |
| // entry.buffer_ = entry.readBuffer_ = send buffer |
| ULng32 maxDataBufferLength = |
| MAXOF(entry.receiveBufferSizeLeft_, entry.bytesSent_); |
| GuardianError= ADDRESS_WIRE_ |
| ((unsigned char *)entry.buffer_->data(entry.offset_), |
| maxDataBufferLength, wireOptions); |
| if (GuardianError) |
| return GuardianError; |
| } |
| |
| // |
| // lock memory for read/reply buffer. reply buffer is never shared. |
| // |
| if (entry.readBuffer_ && entry.readBuffer_ != entry.buffer_) |
| { |
| // entry.buffer_ = send buffer (shared) |
| // entry.readBuffer_ = reply buffer |
| assert(entry.buffer_->isShared()); |
| GuardianError = ADDRESS_WIRE_ |
| ((unsigned char *)entry.readBuffer_->data(0), |
| entry.receiveBufferSizeLeft_, wireOptions); |
| if (GuardianError) |
| { |
| // if error, unlock write buffer that must be shared |
| if (entry.buffer_->decrLockCount(entry.offset_) == 0) |
| // no more I/O on the write buffer. unlock its memory. |
| ADDRESS_UNWIRE_((unsigned char *)entry.buffer_->data(entry.offset_), |
| entry.bytesSent_, wireOptions); |
| |
| return GuardianError; |
| } |
| } |
| |
| return GuardianError; |
| } |
| |
| // unlock memory used for the actual message send/reply |
| void GuaMsgConnectionToServer::addressUnwire(ActiveIOQueueEntry &entry) |
| { |
| // |
| // unlock memory for control buffer |
| // |
| short wireOptions = 8; |
| ADDRESS_UNWIRE_((unsigned char *)entry.controlBuf_, sizeof(fs_fs_writeread), wireOptions); |
| |
| // |
| // unlock memory for write/send buffer. write buffer may be shared. |
| // |
| if (entry.buffer_->isShared()) |
| { |
| // write buffer is shared by multiple connections |
| if (entry.buffer_->decrLockCount(entry.offset_) == 0) |
| // no more I/O on the write buffer. unlock its memory. |
| ADDRESS_UNWIRE_((unsigned char *)entry.buffer_->data(entry.offset_), |
| entry.bytesSent_, wireOptions); |
| } |
| else |
| { |
| // write buffer is only used by a single connection |
| // entry.buffer_ = entry.readBuffer_ = send buffer |
| ULng32 maxDataBufferLength = |
| MAXOF(entry.receiveBufferSizeLeft_, entry.bytesSent_); |
| ADDRESS_UNWIRE_((unsigned char *)entry.buffer_->data(entry.offset_), |
| maxDataBufferLength, wireOptions); |
| } |
| |
| // |
| // unlock memory for read/reply buffer. reply buffer is never shared. |
| // |
| if (entry.readBuffer_ && entry.readBuffer_ != entry.buffer_) |
| ADDRESS_UNWIRE_((unsigned char *)entry.readBuffer_->data(0), |
| entry.receiveBufferSizeLeft_, wireOptions); |
| } |
| |
| #undef _resident |
| #undef _priv |