| /********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| **********************************************************************/ |
| /* -*-C++-*- |
| ***************************************************************************** |
| * |
| * File: Ipc.C |
| * Description: Implementation for process communication services on the |
| * low (executor) level without using C++ run time library |
| * |
| * Created: 10/6/95 |
| * Language: C++ |
| * |
| ***************************************************************************** |
| */ |
| |
| |
| #define AEVENT 1 |
| #define CLI_DLL |
| |
| //#define IPC_INTEGRITY_CHECKING 1 // for debugging purposes |
| |
| #include "Platform.h" |
| #include "ComASSERT.h" |
| #include "ComDiags.h" |
| #include "logmxevent.h" |
| #include "ExCollections.h" |
| #include "Ipc.h" |
| #include "ipcmsg.h" |
| #include "str.h" |
| #include "HeapLog.h" |
| #include "ComRtUtils.h" |
| #include "PortProcessCalls.h" |
| |
| #include <time.h> |
| #include <sys/time.h> |
| #include "seabed/pctl.h" |
| #include "seabed/ms.h" |
| #include "seabed/int/opts.h" |
| |
| #include <unistd.h> // for getpid() |
| |
| #include "Globals.h" |
| #include "Context.h" |
| #include "MXTraceDef.h" |
| #include "ExSMTrace.h" |
| |
| #include "SMConnection.h" |
| #include "ExSMCommon.h" |
| #include "ExSMGlobals.h" |
| #include "ExSMReadyList.h" |
| #include "ExSMTask.h" |
| |
| NABoolean XAWAITIOX_MINUS_ONE = TRUE; |
| |
| #include "ComCextdecs.h" |
| #include "Ex_esp_msg.h" |
| |
| #ifndef FS_MAX_NOWAIT_DEPTH |
| #define FS_MAX_NOWAIT_DEPTH 16 |
| #endif |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcNodeName |
| // ----------------------------------------------------------------------- |
| |
| IpcNodeName::IpcNodeName(IpcNetworkDomain dom, |
| const char *name) |
| { |
| domain_ = dom; |
| |
| if (domain_ == IPC_DOM_INTERNET) |
| { |
| SockIPAddress extIPAddr; |
| SockErrNo lookupResult = extIPAddr.set(name); |
| |
| if (lookupResult.hasError()) |
| ABORT("Node name not found"); |
| ipAddr_ = extIPAddr.getRawAddress(); |
| } |
| else |
| { |
| assert(domain_ == IPC_DOM_GUA_PHANDLE); |
| assert(str_len(name) <= GuaNodeNameMaxLen); |
| str_pad(guardianNode_.nodeName_,GuaNodeNameMaxLen,' '); |
| str_cpy_all(guardianNode_.nodeName_, |
| name, |
| str_len(name)); |
| } |
| } |
| |
| IpcNodeName::IpcNodeName(const SockIPAddress &iPNode) |
| { |
| domain_ = IPC_DOM_INTERNET; |
| ipAddr_ = iPNode.getRawAddress(); |
| } |
| |
| IpcNodeName & IpcNodeName::operator = (const IpcNodeName &other) |
| { |
| domain_ = other.domain_; |
| if (domain_ == IPC_DOM_INTERNET) |
| ipAddr_ = other.ipAddr_; |
| else |
| guardianNode_ = other.guardianNode_; |
| |
| return *this; |
| } |
| |
| NABoolean IpcNodeName::operator == (const IpcNodeName &other) |
| { |
| if (domain_ != other.domain_) |
| return FALSE; |
| |
| if (domain_ == IPC_DOM_INTERNET) |
| { |
| return (str_cmp((char *) &ipAddr_, |
| (char *) &other.ipAddr_, |
| sizeof(ipAddr_)) == 0); |
| } |
| else if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| return (str_cmp(guardianNode_.nodeName_, |
| other.guardianNode_.nodeName_, |
| GuaNodeNameMaxLen) == 0); |
| } |
| else |
| return FALSE; |
| } |
| |
| SockIPAddress IpcNodeName::getIPAddress() const |
| { |
| assert(domain_ == IPC_DOM_INTERNET); |
| return SockIPAddress(ipAddr_); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class GuaProcessHandle |
| // ----------------------------------------------------------------------- |
| |
| NABoolean GuaProcessHandle::operator == (const GuaProcessHandle &other) const |
| { |
| // call a system procedure to compare |
| return compare(other); |
| } |
| |
| void GuaProcessHandle::dumpAndStop(bool doDump, bool doStop) const |
| { |
| char coreFile[1024]; |
| NAProcessHandle phandle((SB_Phandle_Type *)&phandle_); |
| phandle.decompose(); |
| if (doDump) |
| msg_mon_dump_process_name(NULL, phandle.getPhandleString(), coreFile); |
| if (doStop) |
| msg_mon_stop_process_name(phandle.getPhandleString()); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcProcessId |
| // ----------------------------------------------------------------------- |
| |
| IpcProcessId::IpcProcessId() : IpcMessageObj(IPC_PROCESS_ID, |
| IpcCurrProcessIdVersion) |
| { |
| domain_ = IPC_DOM_INVALID; |
| } |
| |
| IpcProcessId::IpcProcessId( |
| const GuaProcessHandle &phandle) : IpcMessageObj(IPC_PROCESS_ID, |
| IpcCurrProcessIdVersion) |
| { |
| domain_ = IPC_DOM_GUA_PHANDLE; |
| phandle_ = phandle; |
| } |
| |
| IpcProcessId::IpcProcessId( |
| const SockIPAddress &ipAddr, |
| SockPortNumber port) : IpcMessageObj(IPC_PROCESS_ID, |
| IpcCurrProcessIdVersion) |
| { |
| // only an internet node name goes together with an IP address and a port |
| domain_ = IPC_DOM_INTERNET; |
| pid_.ipAddress_ = ipAddr.getRawAddress(); |
| pid_.listnerPort_ = port; |
| } |
| |
| IpcProcessId::IpcProcessId(const char *asciiRepresentation) : |
| IpcMessageObj(IPC_PROCESS_ID, |
| IpcCurrProcessIdVersion) |
| { |
| domain_ = IPC_DOM_INVALID; |
| |
| // On NSK, try to interpret the string as a PHANDLE first |
| if (phandle_.fromAscii(asciiRepresentation)) |
| domain_ = IPC_DOM_GUA_PHANDLE; |
| |
| if (domain_ == IPC_DOM_INVALID) |
| { |
| // try to decode an internet address, followed by a port number, |
| Int32 colonPos = 0; |
| while (asciiRepresentation[colonPos] != 0 AND |
| asciiRepresentation[colonPos] != ':') |
| colonPos++; |
| |
| if (asciiRepresentation[colonPos] == ':') |
| { |
| char asciiIpAddr[300]; |
| SockIPAddress ipAddr; |
| |
| assert(colonPos < 300); |
| str_cpy_all(asciiIpAddr,asciiRepresentation,colonPos); |
| asciiIpAddr[colonPos] = 0; |
| SockErrNo sen = ipAddr.set(asciiIpAddr); |
| |
| if (NOT sen.hasError()) |
| { |
| pid_.ipAddress_ = ipAddr.getRawAddress(); |
| // now parse the port number |
| ULng32 portNo = 0; |
| colonPos++; |
| while (asciiRepresentation[colonPos] >= '0' AND |
| asciiRepresentation[colonPos] <= '9') |
| { |
| portNo = portNo * 10 + (asciiRepresentation[colonPos] - '0'); |
| } |
| pid_.listnerPort_ = portNo; |
| domain_ = IPC_DOM_INTERNET; |
| } |
| } |
| } |
| } |
| |
| IpcProcessId::IpcProcessId( |
| const IpcProcessId &other) : IpcMessageObj(other.getType(), |
| other.getVersion()) |
| { |
| domain_ = other.domain_; |
| |
| if (domain_ == IPC_DOM_GUA_PHANDLE) |
| phandle_ = other.phandle_; |
| else if (domain_ == IPC_DOM_INTERNET) |
| pid_ = other.pid_; |
| } |
| |
| IpcProcessId & IpcProcessId::operator = (const IpcProcessId &other) |
| { |
| domain_ = other.domain_; |
| |
| if (domain_ == IPC_DOM_GUA_PHANDLE) |
| phandle_ = other.phandle_; |
| else if (domain_ == IPC_DOM_INTERNET) |
| pid_ = other.pid_; |
| |
| return *this; |
| } |
| |
| NABoolean IpcProcessId::operator == (const IpcProcessId &other) const |
| { |
| if (domain_ != other.domain_) |
| return FALSE; |
| |
| return ((domain_ == IPC_DOM_GUA_PHANDLE AND |
| phandle_ == other.phandle_) |
| OR |
| (domain_ == IPC_DOM_INTERNET AND |
| str_cmp((char *) pid_.ipAddress_.ipAddress_, |
| (char *) other.pid_.ipAddress_.ipAddress_, |
| 4) == 0 AND |
| pid_.listnerPort_ == other.pid_.listnerPort_)); |
| } |
| |
| NABoolean IpcProcessId::match(const IpcNodeName &name, |
| IpcCpuNum cpuNum) const |
| { |
| if (domain_ == IPC_DOM_INTERNET) |
| { |
| // IP addresses don't tell the CPU, just compare a normalized |
| // form of the IP addresses |
| return (getNodeName() == name); |
| } |
| else if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| // if the caller cares about CPU number then compare this |
| // first |
| if (cpuNum != IPC_CPU_DONT_CARE AND cpuNum != getCpuNum()) |
| return FALSE; |
| |
| // compare the node names |
| return (getNodeName() == name); |
| } |
| else |
| return FALSE; |
| } |
| |
| SockIPAddress IpcProcessId::getIPAddress() const |
| { |
| assert(domain_ == IPC_DOM_INTERNET); |
| |
| return SockIPAddress(pid_.ipAddress_); |
| } |
| |
| SockPortNumber IpcProcessId::getPortNumber() const |
| { |
| assert(domain_ == IPC_DOM_INTERNET); |
| |
| return pid_.listnerPort_; |
| } |
| |
| const GuaProcessHandle & IpcProcessId::getPhandle() const |
| { |
| assert(domain_ == IPC_DOM_GUA_PHANDLE); |
| |
| return phandle_; |
| } |
| |
| IpcNodeName IpcProcessId::getNodeName() const |
| { |
| // getting to the node name is somewhat convoluted, sorry about that |
| if (domain_ == IPC_DOM_INTERNET) |
| { |
| return IpcNodeName(SockIPAddress(pid_.ipAddress_)); |
| } |
| else if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| return IpcNodeName(phandle_); |
| } |
| else |
| ABORT("Can't get node name of an invalid process id"); |
| |
| // the return statement is here so this file will compile under VC++4.1 |
| // what is actually returned is of little consequence since the ABORT makes |
| // sure we never get to return. |
| // Perhaps we should set pid_.ipAddress_ = some meaningless ip address?; |
| |
| return IpcNodeName(SockIPAddress(pid_.ipAddress_)); |
| } |
| |
| IpcCpuNum IpcProcessId::getCpuNum() const |
| { |
| if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| // ask Guardian to get the CPU number out of the phandle |
| return getCpuNumFromPhandle(); |
| } |
| else |
| { |
| // for the internet we don't have control over the assignment of CPU |
| // numbers, return a don't care value |
| return IPC_CPU_DONT_CARE; |
| } |
| } |
| |
| std::string IpcProcessId::toString() const |
| { |
| char outb[100]; |
| |
| toAscii(outb, sizeof(outb)); |
| return outb; |
| } |
| |
| Int32 IpcProcessId::toAscii(char *outBuf, Int32 outBufLen) const |
| { |
| // process names shouldn't be longer than 300 bytes |
| char outb[300] = ""; // Initialize in case this is called |
| Int32 outLen = 0; |
| |
| if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| outLen = phandle_.toAscii(outb,300); |
| } |
| |
| if (domain_ == IPC_DOM_INTERNET) |
| { |
| sprintf(outb,"%d.%d.%d.%d:%d", |
| pid_.ipAddress_.ipAddress_[0], |
| pid_.ipAddress_.ipAddress_[1], |
| pid_.ipAddress_.ipAddress_[2], |
| pid_.ipAddress_.ipAddress_[3], |
| pid_.listnerPort_); |
| outLen = str_len(outb); |
| } |
| |
| // copy the result and terminate it with a NUL character |
| str_cpy_all(outBuf,outb,MINOF(outLen,outBufLen-1)); |
| outBuf[MINOF(outLen,outBufLen-1)] = 0; |
| |
| // return the actual length or the length we would need |
| return outLen; |
| } |
| |
| void IpcProcessId::addProcIdToDiagsArea(ComDiagsArea &diags, |
| Int32 stringno) const |
| { |
| char asciiProcId[300]; |
| |
| toAscii(asciiProcId,300); |
| switch (stringno) |
| { |
| case 0: |
| diags << DgString0(asciiProcId); |
| break; |
| case 1: |
| diags << DgString1(asciiProcId); |
| break; |
| case 2: |
| diags << DgString2(asciiProcId); |
| break; |
| case 3: |
| diags << DgString3(asciiProcId); |
| break; |
| case 4: |
| diags << DgString4(asciiProcId); |
| break; |
| default: |
| ABORT("Invalid string no in IpcProcessId::addProcIdToDiagsArea"); |
| } |
| } |
| |
| IpcConnection * IpcProcessId::createConnectionToServer( |
| IpcEnvironment *env, |
| NABoolean usesTransactions, |
| Lng32 maxNowaitRequests, |
| NABoolean parallelOpen, |
| Int32 *openCompletionScheduled |
| , |
| NABoolean dataConnectionToEsp |
| ) const |
| { |
| NABoolean useGuaIpc = TRUE; |
| |
| if (domain_ == IPC_DOM_INTERNET) |
| { |
| usesTransactions = usesTransactions; // make compiler happy |
| return new(env->getHeap()) SockConnection(env,*this,FALSE); |
| } |
| else if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| return new(env->getHeap()) GuaConnectionToServer(env, |
| *this, |
| usesTransactions, |
| (unsigned short) maxNowaitRequests, |
| eye_GUA_CONNECTION_TO_SERVER, |
| parallelOpen, |
| openCompletionScheduled |
| , |
| dataConnectionToEsp |
| ); |
| } |
| else |
| { |
| return NULL; |
| } |
| } |
| |
| IpcMessageObjSize IpcProcessId::packedLength() |
| { |
| // we pack the domain type and then the phandle or socket process id |
| IpcMessageObjSize result = baseClassPackedLength() + sizeof(domain_); |
| result += sizeof(spare_); |
| |
| if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| result += sizeof(phandle_); |
| } |
| else if (domain_ == IPC_DOM_INTERNET) |
| { |
| result += sizeof(pid_); |
| } |
| |
| return result; |
| } |
| |
| IpcMessageObjSize IpcProcessId::packObjIntoMessage(IpcMessageBufferPtr buffer) |
| { |
| // pack base class and domain info |
| IpcMessageObjSize result = packBaseClassIntoMessage(buffer); |
| str_cpy_all(buffer,(const char *) &domain_, sizeof(domain_)); |
| result += sizeof(domain_); |
| buffer += sizeof(domain_); |
| result += sizeof(spare_); |
| buffer += sizeof(spare_); |
| |
| // --------------------------------------------------------------------- |
| // NOTE: this code assumes that the OS dependent information (phandle |
| // and socket pid) can be sent to another process as a byte string!!!! |
| // --------------------------------------------------------------------- |
| |
| // pack the object of the right domain |
| if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| str_cpy_all(buffer,(const char *) &phandle_,sizeof(phandle_)); |
| result += sizeof(phandle_); |
| } |
| else if (domain_ == IPC_DOM_INTERNET) |
| { |
| str_cpy_all(buffer,(const char *) &pid_,sizeof(pid_)); |
| result += sizeof(pid_); |
| } |
| |
| return result; |
| } |
| |
| void IpcProcessId::unpackObj(IpcMessageObjType objType, |
| IpcMessageObjVersion objVersion, |
| NABoolean sameEndianness, |
| IpcMessageObjSize objSize, |
| IpcConstMessageBufferPtr buffer) |
| { |
| assert(objType == IPC_PROCESS_ID AND |
| objVersion == IpcCurrProcessIdVersion AND |
| sameEndianness); |
| |
| unpackBaseClass(buffer); |
| |
| str_cpy_all((char *) &domain_, buffer, sizeof(domain_)); |
| buffer += sizeof(domain_); |
| buffer += sizeof(spare_); |
| |
| // check the supplied length |
| assert(objSize == packedLength()); |
| |
| if (domain_ == IPC_DOM_GUA_PHANDLE) |
| { |
| str_cpy_all((char *) &phandle_, buffer, sizeof(phandle_)); |
| } |
| else if (domain_ == IPC_DOM_INTERNET) |
| { |
| str_cpy_all((char *) &pid_, buffer, sizeof(pid_)); |
| } |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcServer |
| // ----------------------------------------------------------------------- |
| |
| IpcServer::IpcServer(IpcConnection *controlConnection, |
| IpcServerClass *serverClass) |
| { |
| controlConnection_ = controlConnection; |
| serverClass_ = serverClass; |
| str_pad(progFileName_,IpcMaxGuardianPathNameLength,0); |
| } |
| |
| IpcServer::~IpcServer() |
| { |
| logEspRelease(__FILE__, __LINE__); |
| if (controlConnection_) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| IpcEnvironment * ie = controlConnection_->getEnvironment(); |
| IpcAllConnections * allc = ie->getAllConnections(); |
| |
| ie->checkIntegrity(); |
| #endif |
| stop(); |
| delete controlConnection_; |
| controlConnection_ = NULL; |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| ie->checkIntegrity(); |
| #endif |
| } |
| } |
| |
| void IpcServer::release() |
| { |
| serverClass_->freeServerProcess(this); |
| } |
| |
| void IpcServer::stop() |
| { |
| // TBD $$$$ (in implementations for derived classes) |
| } |
| |
| IpcGuardianServer *IpcServer::castToIpcGuardianServer() |
| { |
| // IpcGuardianServer::castToIpcGuardianServer() returns a non-null value |
| return NULL; |
| } |
| |
| void IpcServer::logEspRelease(const char * filename, int lineNum, |
| const char *msg) |
| { |
| IpcConnection *cc = controlConnection_; |
| if (cc && |
| cc->getEnvironment() && |
| cc->getEnvironment()->getLogReleaseEsp()) |
| { |
| /* |
| Coverage notes: to test this code in a dev regression requires |
| changing $TRAF_HOME/etc/ms.env. However, it was tested in |
| stress test on May 10, 2012. |
| */ |
| char logMsg[500]; |
| |
| // get the other end's name. |
| char espName[32]; |
| Int32 pnameLen = cc->getOtherEnd().getPhandle().toAscii( |
| espName, sizeof(espName)); |
| espName[pnameLen] = '\0'; |
| |
| // get the error #, if available. Else, use -99. |
| GuaErrorNumber guaError = -99; |
| if (cc->castToGuaConnectionToServer()) |
| guaError = cc->castToGuaConnectionToServer()->getGuardianError(); |
| |
| // get replySeqNum_ and state_ |
| ULng32 replySeqNum = cc->getReplySeqNum(); |
| |
| char *state = (char *) "No State"; |
| switch (cc->getState()) |
| { |
| case IpcConnection::INITIAL: |
| state = (char *) "INITIAL"; |
| break; |
| case IpcConnection::OPENING: |
| state = (char *) "OPENING"; |
| break; |
| case IpcConnection::ESTABLISHED: |
| state = (char *) "ESTABLISHED"; |
| break; |
| case IpcConnection::SENDING: |
| state = (char *) "SENDING"; |
| break; |
| case IpcConnection::REPLY_PENDING: |
| state = (char *) "REPLY_PENDING"; |
| break; |
| case IpcConnection::RECEIVING: |
| state = (char *) "RECEIVING"; |
| break; |
| case IpcConnection::CANCELLING: |
| state = (char *) "CANCELLING"; |
| break; |
| case IpcConnection::ERROR_STATE: |
| state = (char *) "ERROR_STATE"; |
| break; |
| case IpcConnection::CLOSED: |
| state = (char *) "CLOSED"; |
| break; |
| } |
| |
| if (msg) |
| str_sprintf(logMsg, |
| "Releasing ESP %s , %s," |
| "guaError = %d, replySeqNum = %d, state = %s", |
| espName, |
| msg, |
| guaError, |
| replySeqNum, |
| state); |
| else |
| str_sprintf(logMsg, |
| "Releasing ESP %s ," |
| "guaError = %d, replySeqNum = %d, state = %s", |
| espName, |
| guaError, |
| replySeqNum, |
| state); |
| |
| SQLMXLoggingArea::logExecRtInfo(filename, lineNum, logMsg, 0); |
| } |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcConnection |
| // ----------------------------------------------------------------------- |
| |
| IpcConnection::IpcConnection(IpcEnvironment *env, |
| const IpcProcessId &pid, |
| const char *eye) : |
| otherEnd_(pid), |
| sendQueue_(env->getHeap()), |
| receiveQueue_(env->getHeap()), |
| replySeqNum_(0), |
| recvStreams_(env->getHeap()), |
| stopWait_(FALSE), |
| trustIncomingBuffers_(TRUE), |
| ipcMsgBufCheckFailed_(FALSE), |
| breakReceived_(FALSE), |
| lastTraceIndex_(NumIpcConnTraces-1), |
| fileNumForIOCompletion_(InvalidGuaFileNumber), |
| sendPersistentOpenReconnect_(FALSE) |
| { |
| // --------------------------------------------------------------------- |
| // Copy the eye catcher |
| // --------------------------------------------------------------------- |
| str_cpy_all((char *) &eyeCatcher_, eye, 4); |
| |
| state_ = INITIAL; |
| environment_ = env; |
| |
| IpcAllConnections *allConn = environment_->getAllConnections(); |
| |
| // find a free entry in the array that stores pointers to all connections |
| id_ = allConn->unusedIndex(); |
| |
| // insert this connection into the free slot and remember the slot number |
| allConn->insertAt(id_,this); |
| |
| clearErrorInfo(); |
| MXTRC_2("IpcConnection::IpcConnection id=%d this=%x\n", id_, this); |
| #ifdef IPC_INTEGRITY_CHECKING |
| cerr << "Just created IpcConnection " << (void *)this |
| << " and inserted into IpcAllConnections " << (void *)allConn |
| << "." << endl; |
| checkIntegrity(); |
| #endif |
| } |
| |
| IpcConnection::~IpcConnection() |
| { |
| MXTRC_1("IpcConnection::~IpcConnection id=%d\n", id_); |
| IpcAllConnections *allConn = environment_->getAllConnections(); |
| assert(!allConn->getPendingIOs().contains(id_)); |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| assert(sendQueue_.entries() == 0); |
| assert(receiveQueue_.entries() == 0); |
| |
| NABoolean couldRemove = allConn->remove(id_); |
| assert(couldRemove); |
| #ifdef IPC_INTEGRITY_CHECKING |
| cerr << "Just destroyed IpcConnection " << (void *)this |
| << " and removed from IpcAllConnections " << (void *)allConn |
| << "." << endl; |
| checkIntegrity(FALSE /* suppress orphan checking */); |
| #endif |
| } |
| |
| const char *IpcConnection::getConnectionStateString(IpcConnectionState s) |
| { |
| switch (s) |
| { |
| case INITIAL: return "INITIAL"; |
| case OPENING: return "OPENING"; |
| case ESTABLISHED: return "ESTABLISHED"; |
| case SENDING: return "SENDING"; |
| case REPLY_PENDING: return "REPLY_PENDING"; |
| case RECEIVING: return "RECEIVING"; |
| case CANCELLING: return "CANCELLING"; |
| case ERROR_STATE: return "ERROR_STATE"; |
| case CLOSED: return "CLOSED"; |
| default: return ComRtGetUnknownString((Int32) s); |
| } |
| } |
| |
| void IpcConnection::setState(IpcConnectionState s) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| if (s == SENDING OR s == RECEIVING OR s == OPENING) |
| { |
| environment_->getAllConnections()->IOPending(id_); |
| } |
| else if (s != IpcConnection::ERROR_STATE) |
| { |
| environment_->getAllConnections()->IOComplete(id_); |
| } |
| else |
| { |
| // |
| // Upon reaching the ERROR_STATE state, we should only announce |
| // I/O completion if all the following are true: |
| // |
| // a) there are no more send queue entries |
| // b) no recv queue entry has a registered callback that |
| // has not been delivered |
| // |
| // If there are no send queue entries and no callbacks are pending |
| // when the ERROR state is reached then we announce completion. |
| // Otherwise, more I/O attempts will be made, and for each attempt |
| // we will detect the ERROR and transition to the ERROR state. |
| // Eventually we will transition to ERROR and there will be no more |
| // send queue entries nor callbacks pending, and at that point we |
| // can announce completion. |
| // |
| if (sendQueueEntries() == 0 && numReceiveCallbacksPending() == 0) |
| { |
| environment_->getAllConnections()->IOComplete(id_); |
| } |
| } |
| |
| if (state_ != s) |
| { |
| // Take care of tracing first. |
| if (++lastTraceIndex_ >= NumIpcConnTraces) |
| lastTraceIndex_ = 0; |
| traceState_[lastTraceIndex_].oldState_ = state_; |
| traceState_[lastTraceIndex_].mostRecentSendBuffer_ = lastSentBuffer_; |
| traceState_[lastTraceIndex_].mostRecentReceiveBuffer_ = lastReceivedBuffer_; |
| if (environment_->getLogTimeIpcConnectionState()) |
| clock_gettime(CLOCK_REALTIME, |
| &traceState_[lastTraceIndex_].stateChangeTime_); |
| |
| // Now a state change. |
| if (s == OPENING) |
| getEnvironment()->incrNumOpensInProgress(); |
| else if (state_ == OPENING) |
| getEnvironment()->decrNumOpensInProgress(); |
| state_ = s; |
| } |
| } |
| |
| NABoolean IpcConnection::moreWaitsAllowed() |
| { |
| return TRUE; |
| } |
| |
| SockConnection *IpcConnection::castToSockConnection() |
| { |
| return NULL; |
| } |
| |
| GuaConnectionToServer *IpcConnection::castToGuaConnectionToServer() |
| { |
| return NULL; |
| } |
| |
| GuaMsgConnectionToServer *IpcConnection::castToGuaMsgConnectionToServer() |
| { |
| return NULL; |
| } |
| |
| GuaConnectionToClient *IpcConnection::castToGuaConnectionToClient() |
| { |
| return NULL; |
| } |
| |
| SqlTableConnection *IpcConnection::castToSqlTableConnection() |
| { |
| return NULL; |
| } |
| |
| Int64 IpcConnection::getSqlTableTransid() |
| { |
| return -1; |
| } |
| |
| void IpcConnection::openPhandle(char * processName, NABoolean parallelOpen) |
| { |
| assert(FALSE); |
| } |
| |
| IpcMessageBuffer * IpcConnection::getNextSendQueueEntry() |
| { |
| IpcMessageBuffer *msgBuf = removeNextSendBuffer(); |
| if (msgBuf) |
| prepareSendBuffer(msgBuf); |
| return msgBuf; |
| } |
| |
| void IpcConnection::IOPending() |
| { |
| environment_->getAllConnections()->IOPending(id_); |
| } |
| |
| void IpcConnection::IOComplete() |
| { |
| environment_->getAllConnections()->IOComplete(id_); |
| } |
| |
| bool IpcConnection::isServerSide() |
| { |
| return false; |
| } |
| |
| IpcMessageBuffer *IpcConnection::removeNextSendBuffer() |
| { |
| IpcMessageBuffer *msgBuf = NULL; |
| if (sendQueue_.getFirst(msgBuf)) |
| return msgBuf; |
| return NULL; |
| } |
| |
| IpcMessageBuffer *IpcConnection::removeNextReceiveBuffer() |
| { |
| IpcMessageBuffer *msgBuf = NULL; |
| if (receiveQueue_.getFirst(msgBuf)) |
| return msgBuf; |
| return NULL; |
| } |
| |
| void IpcConnection::removeReceiveStreams() |
| { |
| for (CollIndex j = 0; j < recvStreams_.entries(); j++) |
| recvStreams_.removeAt(j); |
| } |
| |
| |
| |
| void IpcConnection::prepareSendBuffer(IpcMessageBuffer *msgBuf) |
| { |
| assert(msgBuf); |
| |
| // the client side does not send sequence number as we rely on the msg |
| // system to maintain the send order. |
| // |
| // the server side needs to send sequence number. this is because if the |
| // client side uses GuaMsgConnectionToServer, it picks up arrival reply |
| // msgs in random order, even though the replies are delivered in correct |
| // order by the msg system. |
| InternalMsgHdrInfoStruct *msgHdr = |
| (InternalMsgHdrInfoStruct*)(msgBuf->data(0)); |
| |
| if (isServerSide()) |
| { |
| msgHdr->setSeqNum(replySeqNum_); |
| // increment reply seq number for the server side |
| replySeqNum_++; |
| } |
| if (sendPersistentOpenReconnect_) |
| { |
| msgHdr->setSockReplyTag(PERSISTENT_OPEN_RECONNECT_CODE); |
| setSendPersistentOpenReconnect(FALSE); |
| |
| } |
| else |
| msgHdr->setSockReplyTag(0); |
| |
| } |
| |
| IpcMessageBuffer * IpcConnection::getNextReceiveQueueEntry() |
| { |
| if (receiveQueue_.entries() == 0) |
| return NULL; |
| |
| // here's the design of message sequence number: |
| // |
| // - the client side does not send sequence number. so the server side does |
| // not need to verify sequence number upon request arrival. the msg system |
| // ensures the msgs are delivered in their original send order. |
| // - the server sends the sequence number. so the client side needs to |
| // verify sequence number upon reply arrival. this is because if the |
| // client side uses GuaMsgConnectionToServer, it picks up arrival reply |
| // msgs in random order, even though the replies are delivered in correct |
| // order by the msg system. |
| // - in case of IPC error, we don't know if the server was able to properly |
| // assign a sequence number to the buffer, thus we bypass the sequence |
| // number check. |
| // |
| // note: the msg system guarantees that msgs are delivered in their send |
| // order, with the exception of msgs being delivered through different |
| // physical wires. for example, in case of servernet error, msgs could get |
| // re-routed through the expand network. |
| // |
| if (isServerSide() || getState() == ERROR_STATE) |
| { |
| // this is on the server side, or we got error. |
| // no need to verify reply seq number. |
| IpcMessageBuffer *msgBuf = receiveQueue_[0]; |
| for (CollIndex j = 0; j < recvStreams_.entries(); j++) |
| { |
| if ((msgBuf->getMessageStream() == NULL) || |
| (msgBuf->getMessageStream() == recvStreams_[j]) ) |
| { |
| // found a valid msg on receive queue |
| receiveQueue_.removeAt(0); |
| msgBuf->addCallback(recvStreams_[j]); |
| recvStreams_.removeAt(j); |
| return msgBuf; |
| } |
| } |
| |
| // if we reach here it means the next msg on receive queue does not |
| // match any receiving streams. we must wait for message stream to |
| // ask for it. |
| } |
| else |
| { |
| // this is on the client side and we did not get error. |
| // we need to verify the reply seq number. |
| for (CollIndex i = 0; i < receiveQueue_.entries(); i++) |
| { |
| IpcMessageBuffer *msgBuf = receiveQueue_[i]; |
| |
| // unpack message header which contains the sequence number |
| InternalMsgHdrInfoStruct* msgHdr = |
| (InternalMsgHdrInfoStruct*)(msgBuf->data(0)); |
| if (msgHdr->getSeqNum() == replySeqNum_) |
| { |
| // the next msg on receive queue is the expected reply |
| for (CollIndex j = 0; j < recvStreams_.entries(); j++) |
| { |
| if ((msgBuf->getMessageStream() == NULL) || |
| (msgBuf->getMessageStream() == recvStreams_[j]) ) |
| { |
| // found a valid msg on receive queue |
| receiveQueue_.removeAt(i); |
| msgBuf->addCallback(recvStreams_[j]); |
| |
| // for the SeaMonster continue protocol we cannot |
| // delete the stream from recvStreams_ till |
| // end of the batch reply is sent. |
| if (this->castToSMConnection()) |
| { |
| InternalMsgHdrInfoStruct *hdr = |
| (InternalMsgHdrInfoStruct *) msgBuf->data(0); |
| ESPMessageTypeEnum streamType = |
| (ESPMessageTypeEnum) hdr->getType(); |
| |
| IpcBufferedMsgStream *str = NULL; |
| if (msgBuf->getMessageStream()) |
| str = msgBuf->getMessageStream()-> |
| castToIpcBufferedMsgStream(); |
| |
| if (str && str->getSMContinueProtocol()) |
| { |
| if ( streamType == IPC_MSG_SQLESP_DATA_REPLY && |
| hdr->getSMLastInBatch() ) |
| recvStreams_.removeAt(j); |
| } |
| else |
| recvStreams_.removeAt(j); |
| } |
| else |
| recvStreams_.removeAt(j); |
| |
| // increment reply seq number for the client side |
| replySeqNum_++; |
| return msgBuf; |
| } |
| } // for j |
| |
| // if we reach here it means the next msg on receive queue does |
| // not match any receiving streams. we must wait for message |
| // stream to ask for it. |
| break; |
| } |
| else |
| { |
| // the next msg on receive queue is not the expected reply. |
| // continue to the subsequent msg on receive queue. |
| continue; |
| } |
| } // for i |
| } |
| |
| return NULL; |
| } |
| |
| void IpcConnection::setFatalError(IpcMessageStreamBase *msgStream) |
| { |
| if (getState() != ERROR_STATE) |
| setState(ERROR_STATE); |
| |
| if (getErrorInfo() == 0) |
| setErrorInfo(-1); |
| |
| // receive queue may not be empty. if so invoke receive callback. |
| // example: |
| // GuaMsgConnectionToServer::setFatalError() invokes handleIOErrorForStream() |
| // that may have put message buffer on receive queue. |
| IpcMessageBuffer *receiveBuf; |
| while (receiveBuf = getNextReceiveQueueEntry()) |
| receiveBuf->callReceiveCallback(this); |
| |
| // - what if recvStreams_ is not empty? should we clean up I/Os on them? |
| } |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| |
| // methods that perform integrity checking on Ipc-related data structures |
| |
| void IpcConnection::checkIntegrity(NABoolean checkIfOrphan) |
| { |
| // If the parameter "checkIfOrphan" is true, we will do the orphan check; |
| // otherwise we will not. (The caller passes FALSE in contexts where it |
| // is valid to be an orphan, e.g. at the end of the IpcConnection destructor.) |
| |
| // if checking, assume the worst: this object is an orphan |
| isOrphaned_ = checkIfOrphan; |
| |
| environment_->checkIntegrity(); // traverse up to IpcEnvironment, do check |
| if (isOrphaned_) |
| { |
| cerr << "Found orphaned IpcConnection object " << (void *)this << "." << endl; |
| assert(!isOrphaned_); |
| } |
| } |
| |
| void IpcConnection::checkLocalIntegrity(void) |
| { |
| isOrphaned_ = FALSE; // ah, this object isn't an orphan after all |
| } |
| |
| #endif |
| |
| NABoolean IpcConnection::newClientConnection(IpcMessageBuffer *receivedBuffer) |
| { |
| InternalMsgHdrInfoStruct* msgHdr = |
| (InternalMsgHdrInfoStruct*)(receivedBuffer->data(0)); |
| if (msgHdr->getSeqNum() == 0 && msgHdr->getSockReplyTag() == PERSISTENT_OPEN_RECONNECT_CODE) |
| { |
| return TRUE; |
| } |
| else |
| return FALSE; |
| } |
| |
| |
| void IpcConnection::reportBadMessage() |
| { |
| const char option2 = '2'; |
| const char *envvar = getenv("ESP_PROPAGATE_ASSERT"); |
| if (envvar == NULL) |
| envvar = &option2; |
| if (getEnvironment()->getIpcServerType() == IPC_SQLESP_SERVER && |
| ((*envvar == '1' || *envvar == '2'))) |
| { |
| if (*envvar == '2') |
| { |
| NAProcessHandle phandle; |
| phandle.getmine(); |
| phandle.decompose(); |
| UInt32 seconds = (phandle.getPin() % 100) * 5; |
| if (seconds >= 250) |
| seconds -= 250; |
| sleep(seconds); |
| |
| genLinuxCorefile(NULL); |
| } |
| dumpAndStopOtherEnd(true, (*envvar == '2')); |
| if (*envvar == '2') |
| NAExit(0); // Already generated core file of myself |
| } |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcAllConnections |
| // ----------------------------------------------------------------------- |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| |
| void IpcAllConnections::checkIntegrity(void) |
| { |
| // try to traverse to IpcEnvironment... to get there, we need to go |
| // via IpcConnection... |
| CollIndex firstConn = 0; |
| |
| // find first IpcConnection * (if any) |
| while ((!used(firstConn)) && (firstConn < entries())) |
| firstConn++; |
| |
| if (firstConn < entries()) |
| { |
| // we have a connection |
| IpcConnection * c = usedEntry(firstConn); |
| |
| // traverse up to IpcEnvironment, do check |
| c->checkIntegrity(); |
| } |
| // else ... can't get there; just do nothing |
| } |
| |
| void IpcAllConnections::checkLocalIntegrity(void) |
| { |
| // check integrity of subarray |
| pendingIOs_->checkLocalIntegrity(); |
| |
| // check integrity of the set of connections |
| CollIndex conn = 0; |
| ULng32 entriesChecked = 0; |
| |
| // find first IpcConnection * (if any) |
| while (entriesChecked < entries()) |
| { |
| if (used(conn)) |
| { |
| at(conn)->checkLocalIntegrity(); |
| entriesChecked++; |
| } |
| conn++; |
| } |
| } |
| |
| #endif |
| |
| void IpcAllConnections::waitForAllSqlTableConnections(Int64 transid) |
| { |
| // wait for SqlTableConnections (with matched transid) to complete. |
| |
| IpcSetOfConnections x = getPendingIOs(); |
| |
| for (CollIndex i = 0; x.setToNext(i); i++) |
| { |
| if (x.element(i)->castToSqlTableConnection()) |
| { |
| if (x.element(i)->getSqlTableTransid() == transid) |
| x.element(i)->wait(IpcInfiniteTimeout); |
| } |
| } |
| } |
| |
| CollIndex IpcAllConnections::fillInListOfPendingPins(char *buff, |
| ULng32 buffSize, |
| CollIndex numOfPins) |
| { |
| CollIndex i; |
| CollIndex firstConnInd = 0; |
| CollIndex numOfPendings = MINOF(numOfPins, pendingIOs_->entries()); |
| char tempB[300]; |
| Int32 len; |
| |
| buff[0] = '\0'; // null terminate |
| for (i = 0; i < numOfPendings; i++) |
| { |
| if (NOT pendingIOs_->setToNext(firstConnInd)) |
| break; // should not happen |
| len = pendingIOs_->element(firstConnInd)-> |
| getOtherEnd().toAscii(tempB, 300); |
| if (len > 0 && (ULng32) len + 2 < buffSize) |
| { |
| if (buff[0] != '\0') // not the first entry |
| { |
| str_cat(buff, ", ", buff); |
| buffSize -= 2; |
| } |
| str_cat(buff, tempB, buff); |
| buffSize -= len; |
| } |
| else if (len > 0) |
| break; // no room left |
| firstConnInd++; |
| } |
| |
| return i; |
| } |
| |
| void IpcAllConnections::fillInListOfPendingPhandles(GuaProcessHandle *phandles, |
| CollIndex& numOfPhandles) |
| { |
| CollIndex firstConnInd = 0; |
| numOfPhandles = MINOF(numOfPhandles, pendingIOs_->entries()); |
| for (CollIndex i = 0; i < numOfPhandles; i++) |
| { |
| if (NOT pendingIOs_->setToNext(firstConnInd)) |
| { |
| // should not happen |
| numOfPhandles = i; |
| return; |
| } |
| |
| memcpy((char *)&phandles[i], |
| (char *)&pendingIOs_->element(firstConnInd)->getOtherEnd().getPhandle().phandle_, |
| sizeof(GuaProcessHandle)); |
| |
| firstConnInd++; |
| } |
| } |
| |
| // Methods for connection tracing |
| void IpcAllConnections::print() |
| { |
| char buf[10000]; |
| Int32 lineno = 0; |
| |
| while (printConnTrace(lineno, buf)) |
| { |
| printf("%s", buf); |
| lineno++; |
| } |
| } |
| |
| const char *ConnTraceDesc = "All IpcConnections and their states"; |
| |
| void IpcAllConnections::registTraceInfo(IpcEnvironment *env, ExeTraceInfo *ti) |
| { |
| if (env) |
| { |
| if (ti) |
| { |
| Int32 lineWidth = 50; // temp |
| void *regdTrace; |
| Int32 ret = ti->addTrace("IpcConnectionState", this, -1, 4, |
| this, getAnEntry, |
| NULL, |
| lineWidth, ConnTraceDesc, ®dTrace); |
| if (ret == 0) |
| { |
| // trace info added successfully, now add entry fields |
| ti->addTraceField(regdTrace, "Connection ", 0, |
| ExeTrace::TR_POINTER32); |
| ti->addTraceField(regdTrace, "Type ", 1, ExeTrace::TR_STRING); |
| ti->addTraceField(regdTrace, "OtherEnd ", 2, |
| ExeTrace::TR_STRING); |
| ti->addTraceField(regdTrace, "State ", 3, ExeTrace::TR_STRING); |
| traceRef_ = regdTrace; |
| } |
| } |
| } |
| } |
| |
| Int32 IpcAllConnections::printConnTrace(Int32 lineno, char *buf) |
| { |
| if (lineno == 0) |
| printEntry_ = 0; // first time to print all entries |
| |
| // find first IpcConnection * (if any) |
| while ((!used(printEntry_)) && (printEntry_ < entries())) |
| printEntry_++; |
| |
| if (printEntry_ >= entries()) |
| return 0; // no more connections |
| |
| IpcConnection *c = usedEntry(printEntry_++); // then advance to next |
| Int32 rv = 0; |
| if (c) |
| { |
| const char * stateName = "UNKNOWN"; |
| const char * eyeCatcher = "UNKN"; |
| Int32 cpu = 0, node, pin = 0; |
| SB_Int64_Type seqNum = -1; |
| |
| if ((IpcConnection::INITIAL <= c->getState()) && (c->getState() <= IpcConnection::CLOSED)) |
| stateName = IpcConnStateName[c->getState()]; |
| if ((char *)NULL != c->getEyeCatcher()) |
| eyeCatcher = c->getEyeCatcher(); |
| IpcNetworkDomain domain = c->getOtherEnd().getDomain(); |
| if (domain == IPC_DOM_GUA_PHANDLE) |
| { |
| GuaProcessHandle *otherEnd = (GuaProcessHandle *)&(c->getOtherEnd().getPhandle().phandle_); |
| if (otherEnd) |
| otherEnd->decompose(cpu, pin, node |
| , seqNum |
| ); |
| } |
| rv = sprintf(buf, "%.4d %8p %.4s %.3d,%.8d %" PRId64 " %s\n", |
| lineno, c, eyeCatcher, cpu, pin, seqNum, |
| stateName); |
| } |
| return rv; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcSetOfConnections |
| // ----------------------------------------------------------------------- |
| |
| IpcSetOfConnections::IpcSetOfConnections(IpcAllConnections *superset, |
| CollHeap* hp, |
| NABoolean eventDriven, NABoolean esp) : |
| SUBARRAY(IpcConnection *)( (ARRAY(IpcConnection *) *) superset, hp), |
| cancelWait_(FALSE), allc_(superset), |
| eventDriven_(eventDriven), callCount_(0), pollCount_(0),esp_(esp), |
| waitCount_(0), ldoneCount_(0), lreqCount_(0), lsigCount_(0), |
| smCompletionCount_(0), timeoutCount_(0), activityPollCount_(0), |
| lastWaitStatus_(0), ipcAwaitioxEnabled_(TRUE) |
| {} |
| |
| IpcSetOfConnections::IpcSetOfConnections (const IpcSetOfConnections & orig, |
| CollHeap * h) |
| : SUBARRAY(IpcConnection *)(orig, h) |
| { |
| cancelWait_ = FALSE; |
| allc_ = orig.allc_; |
| eventDriven_ = orig.eventDriven_; |
| callCount_ = 0; |
| pollCount_ = 0; |
| esp_ = orig.esp_; |
| waitCount_ = 0; |
| ldoneCount_ = 0; |
| lreqCount_ = 0; |
| lsigCount_ = 0; |
| smCompletionCount_ = 0; |
| timeoutCount_ = 0; |
| activityPollCount_ = 0; |
| lastWaitStatus_ = 0; |
| ipcAwaitioxEnabled_ = FALSE; |
| memset((void *)&ipcAwaitiox_, 0, sizeof(IpcAwaitiox)); |
| } |
| |
| NABoolean IpcSetOfConnections::moreWaitsAnyConnection() |
| { |
| if (cancelWait_) |
| { // Break the wait loop for an asynchronous cancel request. |
| cancelWait_ = FALSE; |
| return FALSE; |
| } |
| |
| for (CollIndex i = 0; setToNext(i); i++) |
| { |
| if (element(i)->moreWaitsAllowed()) |
| return TRUE; |
| } |
| return FALSE; |
| } |
| |
| void IpcSetOfConnections::waitOnSMConnections(IpcTimeout timeout) |
| { |
| for (CollIndex i = 0; setToNext(i); i++) |
| { |
| IpcConnection *conn = element(i)->castToSMConnection(); |
| if (conn) |
| conn->wait(timeout); |
| } |
| } |
| |
| WaitReturnStatus IpcSetOfConnections::waitOnSet(IpcTimeout timeout, |
| NABoolean calledByESP, |
| NABoolean *timedout) |
| { |
| #define MAX_TOTALWAITTIME 2147483000 //soln 10-061230-1405 |
| if (timedout != NULL) |
| *timedout = FALSE; |
| NABoolean interruptRecvd = FALSE; |
| MXTRC_FUNC("IpcSetOfConnections::wait"); |
| MXTRC_1("timeout=%d\n", timeout); |
| // could use select UNIX call or AWAITIOX(-1) later $$$$ |
| |
| // for now, do one round with zero timeout, then let more rounds follow |
| // with exponentially increasing timeouts. Stop when an I/O completes. |
| // Make sure to check all other connections before returning after |
| // an unsucessful wait on a single connection with no timeout. |
| |
| // timeout increment (in 10 msec units), this value is the duration |
| // of the first wait cycle we do |
| const IpcTimeout toInc = 1; |
| |
| // number of times we just give up the time slice before we actually |
| // start to wait (with a timeout of toInc) |
| const Lng32 timeSlices = 3; |
| |
| IpcTimeout tout = 0; |
| IpcTimeout totalWaitTime = -1; |
| CollIndex firstConnInd, currentFirstConnInd; |
| IpcConnection *firstConnection = NULL; // For ESP it's conn to the Master |
| NABoolean somethingCompleted = FALSE; |
| ULng32 seqNo = 0; |
| IpcAllConnections *allc = NULL; |
| Lng32 currTimeSlices = timeSlices; |
| IpcEnvironment *env = NULL; |
| NABoolean ldoneConsumed = FALSE, activity = FALSE; |
| short waitFlag; |
| short status; |
| Int64 currentTime; |
| callCount_ += 1; |
| if (timeout == -2) |
| timeout = -1; |
| NABoolean isWaited; |
| Int32 isWaitedFactor = 20; |
| IpcTimeout totalOfTimeouts = 0; |
| NABoolean freeUnusedMemory = TRUE; |
| CliGlobals *cliGlobals = NULL; |
| NABoolean soloClient = FALSE; |
| NABoolean receivedSMEvent = FALSE; |
| ExSMGlobals *smGlobals = ExSMGlobals::GetExSMGlobals();; |
| ExSMReadyList *smReadyList = (smGlobals ? smGlobals->getReadyList() : NULL); |
| |
| // loop, exponentially increasing the timeout used to wait |
| // on the first connection of the set |
| while (NOT somethingCompleted) |
| { |
| if (totalWaitTime >= timeout AND timeout != IpcInfiniteTimeout) |
| { |
| if (timedout != NULL) |
| { |
| *timedout = TRUE; |
| } |
| break; |
| } |
| else |
| { |
| // get a hold of the first connection in the set, return if there is |
| // none (note that the set may change during the while loop) |
| firstConnInd = 0; |
| if (NOT setToNext(firstConnInd)) |
| { |
| if ( calledByESP ) |
| { |
| NAExit(0); // Stop this ESP! No point executing w/o connections. |
| } |
| // set is empty, return right away |
| assert(!ipcAwaitiox_.getCompleted()); // Shouldn't happen--return with AWAITIOX(-1) completion outstanding |
| return WAIT_OK; |
| } |
| else |
| // for an ESP, the first connection is always the master |
| firstConnection = element(firstConnInd); |
| |
| env = firstConnection->getEnvironment(); |
| cliGlobals = env->getCliGlobals(); |
| |
| assert(env->getAllConnections()->getNumPendingIOs() > 0 || |
| timeout != IpcInfiniteTimeout); |
| |
| static bool sv_solo_client_completion = false; |
| static char escc = '1'; |
| if (!sv_solo_client_completion) |
| { |
| sv_solo_client_completion = true; |
| char *sccEnvvar = getenv("IPC_SOLO_CLIENT_COMPLETION"); |
| if (sccEnvvar) |
| escc = *sccEnvvar; |
| } |
| if (escc == '1' && firstConnection->getFileNumForIOCompletion() == 1 && |
| env->getAllConnections()->entries() == 1 && !calledByESP |
| && timeout == -1) |
| soloClient = TRUE; |
| isWaited = eventDriven_ && timeout == -1 && env->getMaxPollingInterval() != 301; |
| pollCount_ += 1; // Increment the connections polled count |
| waitFlag = LDONE; |
| if (esp_) |
| waitFlag |= LREQ; |
| else if (env->breakEnabled() && !env->lsigConsumed()) |
| waitFlag |= LSIG; |
| status = -1; |
| |
| // On Linux if SeaMonster is enabled, set the LRABBIT bit in |
| // waitFlag |
| Int32 numSMConnections = env->getAllConnections()->getNumSMConnections(); |
| if (numSMConnections > 0) |
| waitFlag |= LRABBIT; |
| |
| // if this is the first time, remember the sequence number of |
| // completed I/Os so far, if that number changes this method will |
| // return |
| if (allc == NULL) |
| { |
| // get the completion sequence number so far (get the IPC |
| // environment from one of the connections in the set) |
| ldoneConsumed = env->ldoneConsumed(); |
| activity = env->isEvent(AEVENT); |
| allc = env->getAllConnections(); |
| allc->incrRecursionCount(); |
| // The sequence number at entry is captured so that it can later |
| // be used to determine if any messages have completed. If so, |
| // there is work for the scheduler to do and somethingCompleted |
| // is set to TRUE causing a timed or infinite waitOnSet to |
| // return. |
| seqNo = allc->getCompletionSeqenceNo(); |
| totalOfTimeouts = 0; |
| } |
| else if (isWaited && !env->ldoneConsumed() && !soloClient) |
| { |
| if (env->isEvent(AEVENT)) |
| { |
| activityPollCount_ += 1; |
| totalOfTimeouts = 0; |
| } |
| else |
| { |
| IpcTimeout waitInterval = (tout + 1) * isWaitedFactor; |
| if (waitInterval == 0) |
| lastWaitStatus_ = status = XWAIT0(waitFlag, waitInterval); |
| else |
| lastWaitStatus_ = status = XWAITNO0(waitFlag, waitInterval); |
| waitCount_ += 1; |
| |
| if (status & LDONE) |
| { |
| ldoneCount_ += 1; |
| ldoneConsumed = TRUE; |
| totalOfTimeouts = 0; |
| freeUnusedMemory = TRUE; |
| } |
| else if (status & LREQ) |
| { |
| lreqCount_ += 1; |
| totalOfTimeouts = 0; |
| freeUnusedMemory = TRUE; |
| } |
| else if (status & LSIG) |
| { |
| lsigCount_ += 1; |
| env->setLsigConsumed(TRUE); |
| totalOfTimeouts = 0; |
| freeUnusedMemory = TRUE; |
| } |
| else if (status & LRABBIT) |
| { |
| smCompletionCount_++; |
| totalOfTimeouts = 0; |
| freeUnusedMemory = TRUE; |
| |
| EXSM_TRACE(EXSM_TRACE_PROTOCOL, |
| "RECV LRABBIT wait count %" PRId64 |
| " sm count %" PRId64 " timeout %d", |
| (Int64) waitCount_, (Int64) smCompletionCount_, |
| (int) timeout); |
| } |
| else |
| { |
| totalOfTimeouts += waitInterval; |
| if (calledByESP) |
| { |
| if (allc->entries() == 1 || |
| // Persistent opens AND all connections are (obsolete) |
| // client connections AND ESP is confirmed to be idle |
| (env->getPersistentOpens() && |
| allc->entries() == env->getControlConnection()->castToGuaReceiveControlConnection()->getClientConnections()->entries() && |
| env->getIdleTimestamp() > 0)) |
| { |
| // master has released this esp fragment |
| if (env->getStopAfter() > 0) |
| { |
| assert(env->getIdleTimestamp() > 0); |
| currentTime = NA_JulianTimestamp(); |
| Int64 timeDiff = currentTime - env->getIdleTimestamp(); |
| if (timeDiff > ((Int64)env->getStopAfter() * 1000000)) |
| { |
| if (env->getLogEspIdleTimeout()) |
| { |
| /* |
| Coverage notes: to test this code in a dev |
| regression requires changing |
| $TRAF_HOME/etc/ms.env. However, it was |
| tested in stress test on |
| May 10, 2012. |
| */ |
| char myName[20]; |
| memset(myName, '\0', sizeof(myName)); |
| char buf[500]; |
| msg_mon_get_my_info(NULL, NULL, |
| myName, sizeof(myName),NULL,NULL,NULL,NULL); |
| str_sprintf(buf, |
| "ESP_IDLE_TIMEOUT causes %s to exit.", |
| myName); |
| SQLMXLoggingArea::logExecRtInfo(__FILE__, |
| __LINE__, buf, 0); |
| } |
| // stop esp if it has become idle and timed out |
| NAExit(0); |
| } |
| |
| } |
| } |
| else |
| { |
| if (env->getInactiveTimeout() > 0 && |
| env->getInactiveTimestamp() > 0) |
| { |
| currentTime = NA_JulianTimestamp(); |
| Int64 timeDiff = currentTime - env->getInactiveTimestamp(); |
| if (timeDiff > ((Int64)env->getInactiveTimeout() * 1000000)) |
| // stop esp if it has become inactive and timed out |
| NAExit(0); |
| } |
| } |
| } |
| timeoutCount_ += 1; |
| |
| } // XWAIT timed out |
| } // if (env->isEvent(AEVENT)) else |
| } // else if (isWaited && !env->ldoneConsumed() && !soloClient) |
| |
| |
| // ----------------------------------------------------------------- |
| // wait with timeout to on the first (and maybe only) connection |
| // ----------------------------------------------------------------- |
| |
| if (soloClient) |
| { |
| interruptRecvd = firstConnection->wait(timeout); |
| if (interruptRecvd) |
| { |
| allc->decrRecursionCount(); |
| return WAIT_INTERRUPT; |
| } |
| else if (seqNo != allc->getCompletionSeqenceNo()) |
| somethingCompleted = TRUE; |
| continue; |
| } |
| |
| if (currTimeSlices > 0 AND tout > 0 AND !isWaited) |
| { |
| // give up the time slice and reset the timeout to 0 instead |
| // of waiting with a timeout that is greater than 10 milliseconds |
| // (10 msec is a long time on a 300 MHz machine) |
| currTimeSlices--; |
| |
| // platform-dependent code to give up the time slice |
| // the Sleep() method on NT can be used to give up the processor |
| Sleep(0); |
| tout = 0; |
| } |
| |
| env->setLdoneConsumed(FALSE); // So that we know if it changed if we loop again |
| env->setEvent(FALSE, AEVENT); // So that we know if there was any "activity" |
| // which means to: a) an AWAITIOX completed, b) |
| // a MSG_ISDONE_ returned true, or c) something was |
| // started such as tryToStartNewIO called MSG_LINK_. |
| Int64 waitCalled = 0, waitReturned; |
| if (!isWaited && tout > 0) |
| waitCalled = NA_JulianTimestamp(); |
| |
| |
| NABoolean cycleThruConnections = TRUE; |
| NABoolean doIpcAwaitiox; |
| |
| if (env->getMaxPollingInterval() == 302 || XAWAITIOX_MINUS_ONE == FALSE) |
| { |
| doIpcAwaitiox = FALSE; |
| } |
| else |
| { |
| doIpcAwaitiox = |
| ipcAwaitioxEnabled_ && |
| (esp_ || env->getMasterFastCompletion()) && |
| eventDriven_ && |
| (isWaited || tout == 0); |
| } |
| |
| while (cycleThruConnections) |
| { |
| cycleThruConnections = FALSE; |
| interruptRecvd = FALSE; |
| currentFirstConnInd = firstConnInd; |
| |
| if (doIpcAwaitiox) |
| { |
| if (esp_) |
| { |
| currentFirstConnInd = 0; |
| if (setToNext(currentFirstConnInd) && |
| currentFirstConnInd != firstConnInd) |
| // Can happen for SSMP but should never happen for ESP |
| firstConnection = element(currentFirstConnInd); |
| } |
| |
| ipcAwaitiox_.DoAwaitiox(esp_ ? FALSE : TRUE); |
| |
| if (ipcAwaitiox_.getFileNum() != -1) |
| env->bawaitioxTrace(this, allc->getRecursionCount(), |
| currentFirstConnInd, firstConnection, |
| &ipcAwaitiox_); |
| |
| if (ipcAwaitiox_.getFileNum() == |
| firstConnection->getFileNumForIOCompletion()) |
| { |
| interruptRecvd = |
| firstConnection->wait(isWaited ? IpcImmediately : tout, |
| env->getEventConsumed(), |
| (ipcAwaitiox_.getFileNum() == -1 |
| ? NULL |
| : &ipcAwaitiox_)); |
| } |
| } // doIpcAwaitiox |
| |
| else |
| { |
| interruptRecvd = |
| firstConnection->wait(isWaited ? IpcImmediately : tout, |
| env->getEventConsumed()); |
| } |
| |
| if (env->ldoneConsumed()) |
| ldoneConsumed = TRUE; |
| if (env->isEvent(AEVENT)) |
| activity = TRUE; |
| |
| if (interruptRecvd) |
| { |
| env->setLdoneConsumed(ldoneConsumed); |
| env->setEvent(activity, AEVENT); |
| allc->decrRecursionCount(); |
| return WAIT_INTERRUPT; |
| } |
| |
| else if (!isWaited && !env->isEvent(AEVENT) && tout > 0 && |
| seqNo == allc->getCompletionSeqenceNo()) |
| { |
| // waitOnSet timeout assumes that the first connection will |
| // wait the specified interval if an interupt is not received |
| // and there was not a completion. The following code will |
| // delay if the first connection did not wait the specified |
| // interval. |
| if (firstConnection->getFileNumForIOCompletion() != 1) // Fix bug 2903 |
| { |
| // Don't delay for GuaConnectionToClient |
| // (fileNumForIOCompletion equals 1 ($RECEIVE)) |
| waitReturned = NA_JulianTimestamp(); |
| IpcTimeout delayTime = |
| tout - ((IpcTimeout)((waitReturned - waitCalled) / 10000)); |
| if (delayTime > 0) |
| { |
| timespec nanoDelayTime; |
| nanoDelayTime.tv_sec = delayTime / 100; |
| nanoDelayTime.tv_nsec = (delayTime % 100) * 10000000; |
| Int32 retVal = nanosleep(&nanoDelayTime, NULL); |
| } |
| } |
| } |
| |
| if (doIpcAwaitiox && ipcAwaitiox_.getFileNum() != -1 && |
| ipcAwaitiox_.getCompleted() == FALSE) |
| { |
| cycleThruConnections = TRUE; |
| continue; |
| } |
| |
| |
| // add up the time spent waiting |
| // (after about 280 days of waiting this would cause an overflow trap) |
| if (totalWaitTime == -1 || isWaited) |
| totalWaitTime = tout; |
| else |
| { |
| if ( totalWaitTime < MAX_TOTALWAITTIME) // soln 10-061230-1405 begin |
| totalWaitTime += tout; |
| else |
| totalWaitTime = -1; // soln 10-061230-1405 end |
| } |
| |
| // ----------------------------------------------------------------- |
| // wait with 0 timeout on all the other connections |
| // ----------------------------------------------------------------- |
| for (CollIndex i = currentFirstConnInd+1; setToNext(i); i++) |
| { |
| IpcConnection *element_i = element(i); |
| assert(element_i); |
| |
| if (doIpcAwaitiox) |
| { |
| if (ipcAwaitiox_.getFileNum() == element_i->getFileNumForIOCompletion()) |
| interruptRecvd = element_i->wait(IpcImmediately, env->getEventConsumed(), |
| (ipcAwaitiox_.getFileNum() == -1 ? |
| NULL : &ipcAwaitiox_)); |
| else |
| continue; |
| } |
| else |
| { |
| interruptRecvd = element_i->wait(IpcImmediately, env->getEventConsumed()); |
| } |
| |
| if (env->ldoneConsumed()) |
| ldoneConsumed =TRUE; |
| |
| if (interruptRecvd) |
| { |
| env->setLdoneConsumed(ldoneConsumed); |
| env->setEvent(activity, AEVENT); |
| allc->decrRecursionCount(); |
| return WAIT_INTERRUPT; |
| } |
| |
| if (doIpcAwaitiox && ipcAwaitiox_.getFileNum() != -1 && |
| ipcAwaitiox_.getCompleted() == FALSE) |
| { |
| cycleThruConnections = TRUE; |
| break; |
| } |
| |
| } // for (CollIndex i = currentFirstConnInd+1; setToNext(i); i++) |
| |
| if (ipcAwaitiox_.getCompleted()) |
| { |
| void *bufAddr; |
| Int32 count; |
| SB_Tag_Type tag; |
| CollIndex usedLength = allc->getUsedLength(); |
| for (CollIndex i = 0; i < usedLength ; i++) |
| { |
| if (allc->getUsage(i) != UNUSED_COLL_ENTRY) |
| { |
| IpcConnection *conn = allc->usedEntry(i); |
| if (conn->getState() == IpcConnection::ERROR_STATE && |
| conn->getFileNumForIOCompletion() == |
| ipcAwaitiox_.getFileNum()) |
| { |
| // Clear completed_ |
| ipcAwaitiox_.ActOnAwaitiox(&bufAddr, &count, &tag); |
| } |
| } |
| } |
| } |
| |
| } // while (cycleThruConnections) |
| |
| if ( allc_->getRecursionCount() == 1 ) // delete closed connections |
| { |
| CollIndex usedLength = allc->getUsedLength(); |
| for (CollIndex i = 0; |
| i < usedLength && allc->getDeleteCount() > 0; i++) |
| { |
| if (allc->getUsage(i) != UNUSED_COLL_ENTRY) |
| { |
| IpcConnection *conn = allc->usedEntry(i); |
| if (conn->getState() == IpcConnection::CLOSED) |
| { |
| delete conn; |
| allc->decrDeleteCount(); |
| } |
| } |
| } |
| } |
| |
| // Compare the completion sequence numbers to see whether any |
| // connections have completed. |
| if (seqNo != allc->getCompletionSeqenceNo()) |
| { |
| somethingCompleted = TRUE; |
| } |
| else |
| { |
| // increase the used timeout kind of exponentially by multiplying |
| // with 1.5 and then adding an increment (in 10 ms units). |
| // Max out at 3 seconds of waiting time. |
| tout = MINOF(env->getMaxPollingInterval(), ( tout + tout/2 + toInc )); |
| } |
| |
| // if we have received a partial message, reset the timeout |
| if (allc->getReceivedPartialMessage()) |
| { |
| tout = 0; |
| currTimeSlices = timeSlices; |
| allc->setReceivedPartialMessage(FALSE); |
| } |
| if (!moreWaitsAnyConnection()) |
| { |
| env->setLdoneConsumed(ldoneConsumed); |
| env->setEvent(activity, AEVENT); |
| allc->decrRecursionCount(); |
| return WAIT_OK; |
| } |
| |
| // Check the SeaMonster completed task queue if there is one, |
| // whether or not wait was called, and whether or not LRABBIT |
| // was received if wait was called. |
| // |
| // It pays to always check it because it's cheaper than figuring |
| // out whether or not to. |
| // |
| // Notes about the "while (firstTask)" loop below |
| // |
| // * getFirst() does not modify the ready list |
| // |
| // * The while loop is guaranteed to terminate because items in |
| // output queues are guaranteed to be removed until all are |
| // emptied and the completed task queue is emptied. |
| // |
| // * In the common case, firstTask will be different on each |
| // successive call to getFirst(). It could be the same on |
| // successive calls if the reader thread is dispatched between |
| // successive calls, and the task is placed back at the head |
| // of the queue when it is empty. |
| |
| if (smReadyList) |
| { |
| ExSMTask *firstTask = NULL; |
| while ((firstTask = smReadyList->getFirst()) != NULL) |
| firstTask->getSMConnection()->wait(0); |
| |
| if (seqNo != allc->getCompletionSeqenceNo()) |
| { |
| receivedSMEvent = TRUE; |
| somethingCompleted = TRUE; |
| EXSM_TRACE(EXSM_TRACE_PROTOCOL, "Done processing SM connections"); |
| continue; |
| } |
| } |
| |
| } // if (totalWaitTime >= timeout AND timeout != IpcInfiniteTimeout) else |
| } // while (NOT somethingCompleted) |
| |
| if (!receivedSMEvent) |
| { |
| env->setLdoneConsumed(ldoneConsumed); |
| env->setEvent(activity, AEVENT); |
| } |
| |
| allc->decrRecursionCount(); |
| return WAIT_OK; |
| } |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| |
| void IpcSetOfConnections::checkIntegrity(void) |
| { |
| isOrphaned_ = TRUE; // assume the worst: this object is an orphan |
| |
| // try to traverse to IpcEnvironment via IpcAllConnections and do check... |
| allc_->checkIntegrity(); |
| |
| if ((isOrphaned_) && |
| (entries() > 0)) // entries > 0 ==> traverse to IpcEnvironment should have succeeded |
| { |
| cerr << "Found orphaned IpcSetOfConnections object." << endl; |
| // assert(!isOrphaned_); well, it might not really be orphaned; |
| // it may be we can't get to its associated ExRtFragTable... |
| |
| // check this object's integrity now, since we didn't traverse to it |
| checkLocalIntegrity(); |
| } |
| } |
| |
| void IpcSetOfConnections::checkLocalIntegrity(void) |
| { |
| isOrphaned_ = FALSE; |
| |
| CollIndex conn = 0; |
| |
| while (setToNext(conn)) |
| { |
| |
| // check to see if the corresponding element in the superset is used |
| if (!allc_->used(conn)) |
| { |
| cerr << "Found IpcConnection in subarray that is not in IpcAllConnections." << endl; |
| assert(allc_->used(conn)); |
| } |
| conn++; |
| } |
| } |
| |
| #endif |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcControlConnection |
| // ----------------------------------------------------------------------- |
| |
| IpcControlConnection::~IpcControlConnection() {} |
| |
| SockControlConnection * IpcControlConnection::castToSockControlConnection() |
| { |
| return NULL; |
| } |
| |
| GuaReceiveControlConnection * |
| IpcControlConnection::castToGuaReceiveControlConnection() |
| { |
| return NULL; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class InternalMsgHdrInfoStruct |
| // ----------------------------------------------------------------------- |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // general constructor |
| InternalMsgHdrInfoStruct::InternalMsgHdrInfoStruct( |
| IpcMessageObjType msgType, |
| IpcMessageObjVersion version) |
| : IpcMessageObj(msgType,version), |
| totalLength_(0), |
| alignment_(IpcMyAlignment), |
| flags_(0), |
| format_(0), |
| sockReplyTag_(0), |
| eyeCatcher_(Release1MessageEyeCatcher), |
| seqNum_(0), |
| msgStreamId_(0) |
| { } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // constructor used to perform copyless receive. unpacks objects in place. |
| InternalMsgHdrInfoStruct::InternalMsgHdrInfoStruct( |
| IpcBufferedMsgStream* msgStream) |
| : IpcMessageObj(msgStream) |
| { |
| // IpcBufferedMsgStream parm used to differentiate from default constructor |
| if (getEndianness() != IpcMyEndianness) |
| { |
| swapFourBytes(totalLength_); |
| swapTwoBytes(alignment_); |
| swapTwoBytes(format_); |
| swapTwoBytes(sockReplyTag_); |
| swapFourBytes(eyeCatcher_); |
| swapFourBytes(seqNum_); |
| assert(0); // Need swapEightBytes() to swap msgStreamId_! |
| setEndianness(IpcMyEndianness); |
| } |
| } |
| |
| IpcMessageObjSize InternalMsgHdrInfoStruct::packedLength() |
| { |
| return (IpcMessageObjSize) sizeof(*this); |
| } |
| |
| IpcMessageObjSize InternalMsgHdrInfoStruct::packObjIntoMessage( |
| IpcMessageBufferPtr buffer) |
| { |
| IpcMessageObjSize result = IpcMessageObj::packObjIntoMessage(buffer); |
| |
| // we know that the packed representation has the same layout as the unpacked |
| // representation, but it needs to be in big-endian format |
| |
| return result; |
| } |
| |
| void InternalMsgHdrInfoStruct::unpackObj(IpcMessageObjType objType, |
| IpcMessageObjVersion objVersion, |
| NABoolean sameEndianness, |
| IpcMessageObjSize objSize, |
| IpcConstMessageBufferPtr buffer) |
| { |
| assert(objSize == sizeof(*this)); |
| |
| // the header always arrives in big endian format, so we should |
| // see the same endianness if and only if this is a big-endian machine |
| |
| IpcMessageObj::unpackObj(objType, |
| objVersion, |
| sameEndianness, |
| objSize, |
| buffer); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcMessageBuffer |
| // ----------------------------------------------------------------------- |
| |
| // Private constructor. Only called by public methods such as |
| // allocate(), createBuffer(), copy(), copyFromOffset(). |
| IpcMessageBuffer::IpcMessageBuffer(CollHeap *heap, |
| IpcMessageObjSize maxLen, |
| IpcMessageObjSize msgLen, |
| IpcMessageStreamBase *msg, |
| short flags, |
| short replyTag, |
| IpcMessageObjSize maxReplyLength, |
| Int64 transid) |
| : InternalMessageBufferHeader(heap, |
| maxLen, |
| msgLen, |
| msg, |
| replyTag, |
| maxReplyLength, |
| transid, |
| flags) |
| { |
| } |
| |
| IpcMessageBuffer *IpcMessageBuffer::allocate(IpcMessageObjSize maxLen, |
| IpcMessageStreamBase *msg, |
| CollHeap *heap, |
| short flags) |
| { |
| return new (maxLen, heap, TRUE) |
| IpcMessageBuffer(heap, |
| maxLen, |
| 0, |
| msg, |
| flags, |
| GuaInvalidReplyTag, |
| 0, // maxReplyLength |
| -1); // transid |
| } |
| |
| IpcMessageBuffer *IpcMessageBuffer::createBuffer(IpcEnvironment *env, |
| IpcMessageObjSize newMaxLen, |
| NABoolean failureIsFatal) |
| { |
| // default is to make a copy of the same length |
| if (newMaxLen == 0) |
| newMaxLen = maxLength_; |
| |
| CollHeap *heap = (env ? env->getHeap() : NULL); |
| |
| IpcMessageBuffer *result = new (newMaxLen, heap, failureIsFatal) |
| IpcMessageBuffer(heap, |
| newMaxLen, |
| newMaxLen, |
| message_, |
| flags_, |
| replyTag_, |
| maxReplyLength_, |
| transid_); |
| return result; |
| } |
| |
| IpcMessageBuffer *IpcMessageBuffer::copy(IpcEnvironment *env, |
| IpcMessageObjSize newMaxLen, |
| NABoolean failureIsFatal) |
| { |
| // default is to make a copy of the same length |
| if (newMaxLen == 0) |
| newMaxLen = maxLength_; |
| else |
| assert(newMaxLen >= msgLength_); // data must fit in new copy |
| |
| CollHeap *heap = (env ? env->getHeap() : NULL); |
| |
| IpcMessageBuffer *result = new (newMaxLen, heap, failureIsFatal) |
| IpcMessageBuffer(heap, |
| newMaxLen, |
| msgLength_, |
| message_, |
| flags_, |
| replyTag_, |
| maxReplyLength_, |
| transid_); |
| |
| if (result) |
| { |
| str_cpy_all((char *) result->data(0), |
| (const char *) data(0), |
| (Lng32) MINOF(msgLength_,newMaxLen)); |
| |
| // the copy gets the reply tag, if there is any |
| setReplyTag(GuaInvalidReplyTag); |
| setMaxReplyLength(0); |
| } |
| |
| return result; |
| } |
| |
| IpcMessageBuffer *IpcMessageBuffer::copyFromOffset(IpcEnvironment *env, |
| IpcMessageObjSize newMaxLen, |
| IpcMessageObjSize offset, |
| NABoolean failureIsFatal) |
| { |
| assert(msgLength_ > offset); // offset must be before the end |
| |
| CollHeap *heap = (env ? env->getHeap() : NULL); |
| |
| IpcMessageBuffer *result = new (newMaxLen, heap, failureIsFatal) |
| IpcMessageBuffer(heap, |
| newMaxLen, |
| (Lng32) MINOF(msgLength_ |
| ,newMaxLen), |
| message_, |
| flags_, |
| replyTag_, |
| maxReplyLength_, |
| transid_); |
| if (result) |
| { |
| str_cpy_all((char *) result->data(0), |
| (const char *) data(offset), |
| (Lng32) MINOF(msgLength_,newMaxLen)); |
| |
| // the copy gets the reply tag, if there is any |
| setReplyTag(GuaInvalidReplyTag); |
| setMaxReplyLength(0); |
| } |
| |
| return result; |
| } |
| |
| IpcMessageBuffer *IpcMessageBuffer::resize(IpcEnvironment *env, |
| IpcMessageObjSize newMaxLen) |
| { |
| // NOTE: other users of the buffer will retain access to the old buffer |
| IpcMessageBuffer *result = copy(env,newMaxLen); |
| |
| // this will delete this buffer, unless there are other references to it |
| decrRefCount(); |
| return result; |
| } |
| |
| IpcMessageRefCount IpcMessageBuffer::decrRefCount() |
| { |
| IpcMessageRefCount result = refCount_--; |
| |
| if (refCount_ == 0) |
| { |
| if (chunkLockCount_) |
| { |
| chunkLockCount_->deallocate(); |
| NADELETEBASIC(chunkLockCount_, heap_); |
| } |
| |
| // The ref count has dropped to zero. Use the correct method to |
| // deallocate space for this buffer. |
| if (heap_) |
| { |
| // This object was allocated on an NAMemory heap |
| heap_->deallocateMemory(this); |
| } |
| else |
| { |
| // This object was allocated on the C++ heap |
| delete this; |
| } |
| } |
| else if (refCount_ < 0) |
| { |
| // negative refcounts aren't allowed |
| assert(refCount_ > 0); |
| } |
| |
| return result; |
| } |
| |
| CollIndex IpcMessageBuffer::initLockCount(IpcMessageObjSize maxIOSize) |
| { |
| if (!chunkLockCount_) |
| { |
| chunkLockCount_ = new(heap_) NAArray<CollIndex>(heap_,2); |
| chunkLockCount_->setHeap(heap_); |
| |
| if (!chunkLockCount_) |
| { |
| return(0); |
| } |
| maxIOSize_ = maxIOSize; |
| |
| // determine if last chunk is less than the max IO transmission size |
| bool partialChunk = msgLength_ && (msgLength_ % maxIOSize_) ? 1 : 0; |
| CollIndex chunkCount = partialChunk ? 1 : 0; |
| chunkCount += (msgLength_ >= maxIOSize_) ? (msgLength_ / maxIOSize_) : 0; |
| for ( CollIndex i = 0; i < chunkCount ; i++ ) |
| { |
| chunkLockCount_->insertAt(i, 0); |
| } |
| return(chunkCount); |
| } |
| |
| return(chunkLockCount_->entries()); |
| } |
| |
| CollIndex IpcMessageBuffer::incrLockCount(IpcMessageObjSize offset) |
| { |
| assert(chunkLockCount_ != NULL); |
| CollIndex chunkIndex = (CollIndex) (offset ? offset/maxIOSize_ : 0); |
| CollIndex lockCount = chunkLockCount_->at(chunkIndex); |
| lockCount += 1; |
| chunkLockCount_->insertAt(chunkIndex, lockCount); |
| return(lockCount); |
| } |
| |
| CollIndex IpcMessageBuffer::decrLockCount(IpcMessageObjSize offset) |
| { |
| assert(chunkLockCount_ != NULL); |
| CollIndex chunkIndex = (CollIndex) (offset ? offset/maxIOSize_ : 0); |
| CollIndex lockCount = chunkLockCount_->at(chunkIndex); |
| lockCount -= 1; |
| chunkLockCount_->insertAt(chunkIndex, lockCount); |
| return(lockCount); |
| } |
| |
| CollIndex IpcMessageBuffer::getLockCount(IpcMessageObjSize offset) |
| { |
| assert(chunkLockCount_ != NULL); |
| CollIndex chunkIndex = (CollIndex) (offset ? offset/maxIOSize_ : 0); |
| CollIndex lockCount = chunkLockCount_->at(chunkIndex); |
| return(lockCount); |
| } |
| |
| void IpcMessageBuffer::alignOffset(IpcMessageObjSize &offset) |
| { |
| ULng32 offs = (ULng32) offset; // just for safety |
| |
| // clear the last 3 bits of the address to round it down to |
| // the next address that is divisible by 8 |
| ULng32 roundedDown = offs LAND 0xFFFFFFF8; |
| |
| // if that didn't change anything we're done, the offset was |
| // aligned already |
| if (offs != roundedDown) |
| { |
| // else we have to round up and add the filler |
| offset = roundedDown + 8; |
| } |
| } |
| |
| void IpcMessageBuffer::callSendCallback(IpcConnection *conn) |
| { |
| // increment the wraparound counter for completed high-level I/Os |
| conn->getEnvironment()->getAllConnections()->bumpCompletionCount(); |
| // set connection indicating source of IO |
| connection_ = conn; |
| // call the callback without passing a message buffer to it |
| message_->internalActOnSend(conn); |
| } |
| |
| void IpcMessageBuffer::callReceiveCallback(IpcConnection *conn) |
| { |
| // increment the wraparound counter for completed high-level I/Os |
| conn->getEnvironment()->getAllConnections()->bumpCompletionCount(); |
| // set connection indicating source of IO |
| connection_ = conn; |
| // call the callback |
| message_->internalActOnReceive(this, conn); |
| } |
| |
| void * IpcMessageBuffer::operator new(size_t headerSize, |
| IpcMessageObjSize bufferLength, |
| CollHeap *heap, |
| NABoolean failureIsFatal) |
| { |
| // If heap is not NULL, allocate the object on that heap. Otherwise |
| // allocate on the C++ heap. |
| if (heap) |
| return heap->allocateMemory(headerSize + (size_t) bufferLength, |
| failureIsFatal); |
| else |
| return ::operator new(headerSize + (size_t) bufferLength); |
| } |
| |
| NABoolean IpcMessageBuffer::verifyBackbone() |
| { |
| return verifyIpcMessageBufferBackbone(*this); |
| } |
| |
| // |
| // Global function to verify IpcMessageBuffer backbone. This function |
| // needs to look at private data inside IpcMessageObj |
| // instances. Rather than making the entire IpcMessageBuffer class a |
| // "friend" of IpcMessageObj, we just make this function a friend of |
| // the IpcMessageObj and IpcMessageBuffer classes. |
| // |
| NABoolean verifyIpcMessageBufferBackbone(IpcMessageBuffer &b) |
| { |
| // Make sure buffer is at least as large as the header object |
| if (b.msgLength_ < sizeof(InternalMsgHdrInfoStruct)) |
| { |
| ipcIntegrityCheckEpilogue(FALSE); |
| return FALSE; |
| } |
| |
| // Now we can reference fields in the header |
| InternalMsgHdrInfoStruct *header = (InternalMsgHdrInfoStruct *) b.data(0); |
| IpcMessageObjSize maxChainLen = MINOF(b.maxLength_, header->totalLength_); |
| |
| // Loop over all objects in the message buffer, including the header |
| // object, and do some sanity checks on them. We will look at all |
| // length and next fields to make sure the chain of objects does not |
| // extend beyond maxChainLen bytes. |
| IpcMessageObj *obj = header; |
| IpcMessageObjSize currOffset = 0; |
| while (obj != NULL) |
| { |
| // Make sure buffer is at least as large as an IpcMessageObj |
| if (currOffset + sizeof(IpcMessageObj) > maxChainLen) |
| { |
| ipcIntegrityCheckEpilogue(FALSE); |
| return FALSE; |
| } |
| |
| // Now we can reference IpcMessageObj fields |
| if (obj->s_.refCount_ != 1) |
| { |
| ipcIntegrityCheckEpilogue(FALSE); |
| return FALSE; |
| } |
| |
| // All objects except the header must have a NULL virtual function |
| // pointer. The header may not because the receiving connection |
| // may have already done an in-place unpack of the header and that |
| // operation has the side-effect of setting the vptr. |
| if (obj != header && obj->getMyVPtr() != NULL) |
| { |
| ipcIntegrityCheckEpilogue(FALSE); |
| return FALSE; |
| } |
| |
| IpcMessageObjSize next = (IpcMessageObjSize)((Long) obj->s_.next_); |
| |
| if (obj->s_.objLength_ < sizeof(IpcMessageObj) || |
| currOffset + obj->s_.objLength_ > maxChainLen || |
| (next != 0 && next < obj->s_.objLength_)) |
| { |
| ipcIntegrityCheckEpilogue(FALSE); |
| return FALSE; |
| } |
| |
| // Advance to next object in the chain |
| currOffset += next; |
| obj = obj->getNextFromOffset(); |
| } |
| |
| return TRUE; |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcMessageStreamBase |
| // ----------------------------------------------------------------------- |
| IpcMessageStream * IpcMessageStreamBase::castToIpcMessageStream() |
| { |
| return NULL; |
| } |
| |
| IpcBufferedMsgStream * |
| IpcMessageStreamBase::castToIpcBufferedMsgStream() |
| { |
| return NULL; |
| } |
| |
| void IpcMessageStreamBase::addToCompletedList() |
| { |
| environment_->addToCompletedMessages(this); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcMessageStream |
| // ----------------------------------------------------------------------- |
| |
| IpcMessageStream::IpcMessageStream( |
| IpcEnvironment *env, |
| IpcMessageObjType msgType, |
| IpcMessageObjVersion version, |
| IpcMessageObjSize fixedMsgBufferLength, |
| NABoolean shareMessageObjects) : |
| IpcMessageStreamBase(env), |
| h_(msgType,version), |
| recipients_(env->getAllConnections(),env->getHeap()), |
| activeIOs_(env->getAllConnections(),env->getHeap()) |
| { |
| #ifndef USE_SB_NEW_RI |
| assert(fixedMsgBufferLength <= IOSIZEMAX); |
| #else |
| assert(fixedMsgBufferLength <= env->getGuaMaxMsgIOSize()); |
| #endif |
| |
| msgBuffer_ = NULL; |
| fixedBufLen_ = fixedMsgBufferLength; |
| maxReplyLength_ = 0; |
| shareObjects_ = shareMessageObjects; |
| objectsInBuffer_ = FALSE; |
| state_ = EMPTY; |
| tail_ = first(); |
| current_ = NULL; |
| errorInfo_ = 0; |
| numOfSendCallbacks_ = 0; |
| corruptMessage_ = false; |
| isOrphaned_ = FALSE; |
| } |
| |
| IpcMessageStream::~IpcMessageStream() |
| { |
| // the destructor for the message header, which will |
| // perform a check of the reference count, is called implicitly |
| |
| // deallocate the existing message buffer (despite the name of the proc) |
| allocateMessageBuffer(0); |
| } |
| |
| IpcMessageStream *IpcMessageStream::castToIpcMessageStream() |
| { |
| return this; |
| } |
| |
| IpcMessageStream & IpcMessageStream::operator << (IpcMessageObj & toAppend) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| // checkIntegrity(); message may be legitimately orphaned here |
| #endif |
| |
| // check message state, this is the process of composing the message |
| assert(state_ == EMPTY OR state_ == COMPOSING); |
| state_ = COMPOSING; |
| |
| if (shareObjects_) |
| { |
| // if we add a new object to the message, it ought not be in use |
| // by some other message (only the owner can have a refcount) |
| // (this may be extended later) |
| assert(toAppend.s_.next_ == NULL AND toAppend.s_.refCount_ == 1); |
| |
| // increment the reference count of the appended shared object |
| toAppend.incrRefCount(); |
| |
| // add the object to the linked list of IpcMessageObj objects hanging |
| // off the IpcMessageStream, don't pack the object into the |
| // message buffer yet!! |
| tail_->s_.next_ = &toAppend; |
| tail_ = &toAppend; |
| |
| } |
| else |
| { |
| // copy the object into the message buffer without altering its |
| // reference count (toAppend can go away after this method returns) |
| |
| // size of toAppend when packed into the message buffer |
| IpcMessageObjSize thisObjSize = toAppend.packedLength(); |
| |
| // start offset of the packed version of toAppend in the message buffer |
| IpcMessageObjSize startOffset; |
| |
| // we need a message buffer of at least this size |
| IpcMessageObjSize neededMsgSize; |
| |
| // number of bytes used during the pack process |
| IpcMessageObjSize packedBytes; |
| |
| // a pointer to the packed object cast as an IpcMessageObj |
| IpcMessageObj *bufObj; |
| |
| // we know that the object's header uses at least some space |
| assert(thisObjSize >= sizeof(IpcMessageObj)); |
| |
| // check whether there are previous objects in the buffer |
| if (tail_ == first()) |
| { |
| // no, this is the first object to be added to the message |
| // (after the header, which gets packed during send(), but |
| // whose space gets allocated right here) |
| h_.totalLength_ = h_.packedLength(); |
| IpcMessageBuffer::alignOffset(h_.totalLength_); |
| startOffset = h_.totalLength_; |
| } |
| else |
| { |
| startOffset = h_.totalLength_; |
| } |
| |
| neededMsgSize = startOffset + thisObjSize; |
| |
| // check whether we need to allocate a new or bigger message buffer |
| if (msgBuffer_ == NULL OR |
| msgBuffer_->getBufferLength() < neededMsgSize) |
| { |
| // current buffer is too small, make room and maybe add a little |
| // reserve so this doesn't happen again next time |
| |
| if (fixedBufLen_ > 0) |
| neededMsgSize = MAXOF(neededMsgSize,fixedBufLen_); |
| else |
| neededMsgSize = MAXOF(neededMsgSize,DefaultInitialMessageBufSize); |
| |
| resizeMessageBuffer(neededMsgSize); |
| } |
| |
| // now pack the object into the message buffer |
| |
| packedBytes = toAppend.packObjIntoMessage(msgBuffer_->data(startOffset)); |
| bufObj = (IpcMessageObj *) (msgBuffer_->data(startOffset)); |
| |
| // Do some sanity checks, did the object really use the length it |
| // promised to use? Did the length field in the message get set |
| // correctly? |
| assert(thisObjSize == packedBytes); |
| assert(bufObj->s_.objLength_ == packedBytes); |
| |
| // advance the buffer pointer to the next place where we could |
| // put an object (the next object wants to sit on an 8 byte boundary) |
| // NOTE: we don't increase bufObj->s_.objLength_ when we align. |
| h_.totalLength_ += packedBytes; |
| IpcMessageBuffer::alignOffset(h_.totalLength_); |
| |
| // prepare the packed object to be shipped off and set the link from |
| // the previous object |
| bufObj->setMyVPtr(NULL); |
| bufObj->s_.refCount_ = 1; |
| bufObj->s_.next_ = NULL; |
| tail_->s_.next_ = (IpcMessageObj *) ((long)startOffset); |
| tail_ = bufObj; |
| } |
| return *this; |
| } |
| |
| NABoolean IpcMessageStream::extractNextObj(IpcMessageObj &toRetrieve, |
| NABoolean checkObjects) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| // checkIntegrity(); message may be legitimately orphaned here |
| #endif |
| |
| // check whether it is ok to extract data now |
| assert(state_ == RECEIVED OR state_ == EXTRACTING); |
| state_ = EXTRACTING; |
| |
| // we assume that the user does something like the following to ensure |
| // that "toRetrieve" is actually an object of the correct (derived) class: |
| // |
| // IpcMessageStream m; |
| // ... |
| // m.receive(); |
| // while (m.moreObjects()) |
| // { |
| // IpcMessageObjType t; |
| // |
| // t = m.getNextObjType(); |
| // // if different versions and lengths are possible, the user |
| // // would have to retrieve those as well |
| // switch (t) |
| // { |
| // case xyz: // case for object type of class MyOwnMessageObject |
| // { |
| // MyOwnMessageObject myOwnObj; |
| // |
| // // myOwnObj should have the correct type and version |
| // // (if not automatically set by constructor, do it manually) |
| // m >> myOwnObj; |
| // ... |
| // } |
| // } |
| // } |
| // |
| |
| // the user should have predicted the object type and version correctly |
| assert(current_ AND |
| toRetrieve.s_.objType_ == current_->s_.objType_ AND |
| toRetrieve.isActualVersionOK(current_->s_.objVersion_)); |
| |
| // current_ points to the next object to be retrieved |
| IpcMessageObj *packedObject = current_; |
| |
| // need to turn around bytes if this object is in the other endianness |
| if (h_.getEndianness() != IpcMyEndianness) |
| packedObject->turnByteOrder(); |
| |
| // The object pointed to by <current> will get copied onto <toRetrieve>. |
| // To avoid trouble, set the refcount of <current> to the refcount of |
| // toRetrieve while unpacking the object. This way, even if the user's |
| // unpackObj function simply copies the refcount, we still haven't |
| // destroyed it. Also, this does not alter <toRetrieve>'s refcount |
| // without the user seeing it. Copying a new content onto <toRetrieve> |
| // is transparent to those people who have a pointer to it, therefore |
| // it shouldn't change it's refcount. |
| IpcMessageRefCount saveRefCount = packedObject->s_.refCount_; |
| |
| // advance the current object pointer |
| // (check whether the object uses an offset or a real pointer) |
| if (objectsInBuffer_) |
| current_ = current_->getNextFromOffset(); |
| else |
| current_ = current_->s_.next_; |
| |
| // for now, all refcounts of objects in messages should be 1 |
| assert(saveRefCount == 1); |
| |
| packedObject->s_.refCount_ = toRetrieve.s_.refCount_; |
| |
| // If the caller requested an integrity check on the packed object |
| // then we call a checkObj() method before unpackObj(). |
| NABoolean result = TRUE; |
| if (checkObjects) |
| { |
| result = toRetrieve.checkObj(packedObject->s_.objType_, |
| packedObject->s_.objVersion_, |
| h_.getEndianness() == IpcMyEndianness, |
| packedObject->s_.objLength_, |
| (IpcConstMessageBufferPtr) packedObject); |
| } |
| |
| // now call the user-defined unpack function |
| // NOTE: this may call this method recursively!!! |
| if (result) |
| { |
| toRetrieve.unpackObj(packedObject->s_.objType_, |
| packedObject->s_.objVersion_, |
| h_.getEndianness() == IpcMyEndianness, |
| packedObject->s_.objLength_, |
| (IpcConstMessageBufferPtr) packedObject); |
| } |
| |
| // restore the refcount in the message |
| packedObject->s_.refCount_ = saveRefCount; |
| |
| return result; |
| } |
| |
| IpcMessageStream & IpcMessageStream::operator >> (IpcMessageObj * &toRetrieve) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| // checkIntegrity(); message may be legitimately orphaned here |
| #endif |
| |
| // check whether it is ok to extract data now |
| assert(state_ == RECEIVED OR state_ == EXTRACTING); |
| state_ = EXTRACTING; |
| |
| // current_ points to the next object to be retrieved |
| if (current_ != NULL) |
| { |
| // return the pointer to the current object |
| toRetrieve = current_; |
| |
| // indicate that the current object is now also referenced |
| // by the caller of this method (caller has to release the |
| // object later) |
| toRetrieve->incrRefCount(); |
| |
| // advance the current object pointer |
| // (check whether the object uses an offset or a real pointer) |
| if (objectsInBuffer_) |
| current_ = current_->getNextFromOffset(); |
| else |
| current_ = current_->s_.next_; |
| } |
| else |
| { |
| // error, no more objects are available |
| toRetrieve = NULL; |
| } |
| |
| return *this; |
| } |
| |
| void IpcMessageStream::clearAllObjects() |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| IpcMessageObj *obj; |
| IpcMessageObj *next; |
| |
| assert(state_ != SENDING AND state_ != RECEIVING); |
| assert(activeIOs_.entries() == 0); |
| |
| if (objectsInBuffer_) |
| { |
| clearMessageBufferContents(); |
| } |
| else |
| { |
| // if the message contains a linked list of shared objects then |
| // release all objects except the header which is hardwired in |
| // and unlink the objects from the list as you go |
| obj = first()->s_.next_; |
| first()->s_.next_ = NULL; |
| while (obj != NULL) |
| { |
| next = obj->s_.next_; |
| obj->s_.next_ = NULL; |
| obj->decrRefCount(); |
| obj = next; |
| } |
| } |
| |
| objectsInBuffer_ = FALSE; |
| current_ = NULL; |
| tail_ = first(); |
| |
| state_ = EMPTY; |
| } |
| |
| void IpcMessageStream::addRecipient(IpcConnection *recipient) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| // checkIntegrity(); don't check here since msg. will be legitimately orphaned |
| #endif |
| |
| recipients_ += recipient->getId(); |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| // checkIntegrity(); don't check here since msg. will be legitimately orphaned |
| #endif |
| } |
| |
| void IpcMessageStream::addRecipients(const IpcSetOfConnections &/*recipients*/) |
| { |
| ABORT("not implemented yet"); |
| // recipients_ += recipients; |
| } |
| |
| void IpcMessageStream::deleteRecipient(IpcConnection *recipient) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| recipients_ -= recipient->getId(); |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| } |
| |
| void IpcMessageStream::deleteAllRecipients() |
| { |
| recipients_.clear(); |
| } |
| |
| void IpcMessageStream::giveMessageTo(IpcMessageStream &other, |
| IpcConnection *connection) |
| { |
| // --------------------------------------------------------------------- |
| // This method takes another message stream, implicitly performs a |
| // receive() call on it, connects it to the connection, and passes |
| // the current message of "this" message stream on to the "other" |
| // message stream, as if it had been received by the connection. |
| // IpcMessageStream::giveMessageTo() can therefore be used by |
| // "router" message streams that listen to a connection that is shared |
| // among multiple message streams, check the contents of the message, |
| // and then dispatch the message to the appropriate message stream. |
| // Only the "router" message stream should call receive() explicitly. |
| // --------------------------------------------------------------------- |
| assert(connection); |
| |
| // the other message must be in a state that allows receiving |
| assert(other.getState() == EMPTY OR |
| other.getState() == SENT); |
| |
| // get the message buffer that we want to give to the other message stream |
| IpcMessageBuffer *bufferToMove = msgBuffer_; |
| |
| // detach myself from the message buffer (NOTE: somebody may have already |
| // looked at the message contents, so my pointers may point to the |
| // message buffer) |
| msgBuffer_ = NULL; |
| objectsInBuffer_ = FALSE; |
| current_ = NULL; |
| tail_ = first(); |
| |
| // this is now an empty message, ready for either send or receive |
| state_ = EMPTY; |
| |
| // set the other's recipients and active IOs to the connection |
| IpcConnectionId id = connection->getId(); |
| other.recipients_ += id; |
| other.activeIOs_ += id; |
| |
| // call the callback function for the other message stream, |
| // just as a connection would do |
| bufferToMove->addCallback(&other); |
| bufferToMove->callReceiveCallback(connection); |
| } |
| |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // give receive message to class IpcBufferedMsgStream |
| void IpcMessageStream::giveReceiveMsgTo(IpcBufferedMsgStream& msgStream) |
| { |
| msgStream.addInputBuffer(msgBuffer_); |
| |
| msgBuffer_ = NULL; |
| objectsInBuffer_ = FALSE; |
| current_ = NULL; |
| tail_ = first(); |
| state_ = EMPTY; |
| |
| msgStream.actOnReceive(NULL); // trigger receiving message stream |
| } |
| |
| // Uncomment the next line to trace IpcMessageObj. Works better if LOG_IPC |
| // is also defined in IpcGuardian.cpp |
| // #define LOG_IPC_MSG_OBJ |
| |
| void IpcMessageStream::send(NABoolean waited, Int64 transid) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| MXTRC_FUNC("IpcMessageStream::send"); |
| IpcMessageObjSize thisObjLength; |
| IpcMessageObj *obj; |
| |
| // check message state |
| assert(state_ == COMPOSING); |
| |
| if (shareObjects_) |
| { |
| // ----------------------------------------------------------------- |
| // Objects are shared between caller and IPC layer. Therefore they |
| // were not copied by operator <<. Now that we know all of them, |
| // copy them into the message buffer. |
| // ----------------------------------------------------------------- |
| |
| // ----------------------------------------------------------------- |
| // Compute the total length of the message. |
| // --------------------------------------------------------------------- |
| |
| // loop through all objects and ask them how much space they need |
| |
| h_.totalLength_ = 0; |
| obj = first(); |
| while (obj != NULL) |
| { |
| // ask the object how long it will be when packed into the buffer |
| thisObjLength = obj->packedLength(); |
| |
| // we know that the object's header uses at least some space |
| assert(thisObjLength >= sizeof(IpcMessageObj)); |
| |
| // store that info in the object's length field (we check later |
| // when the packing is done whether the method lied or not) |
| obj->s_.objLength_ = thisObjLength; |
| |
| // find the offset of the next potential object by aligning the |
| // address and add the needed filler space to the object length |
| // NOTE: we even align the end of the message |
| h_.totalLength_ += thisObjLength; |
| IpcMessageBuffer::alignOffset(h_.totalLength_); |
| |
| // next, please |
| obj = obj->s_.next_; |
| } |
| |
| // ----------------------------------------------------------------- |
| // allocate an empty message buffer, if the user specified a fixed |
| // length buffer size then use that (allows replies via REPLYX to |
| // have maxBufferLen_ bytes without switching to the multi-chunk |
| // protocol) |
| // ----------------------------------------------------------------- |
| allocateMessageBuffer(MAXOF(h_.totalLength_,fixedBufLen_)); |
| |
| // ----------------------------------------------------------------- |
| // pack the message object into a buffer |
| // ----------------------------------------------------------------- |
| IpcMessageObjSize msgDataLen = 0; |
| IpcMessageObj *bufObj = (IpcMessageObj *) msgBuffer_->data(); |
| IpcMessageObj *nextBufObj = NULL; |
| |
| obj = first(); |
| while (obj != NULL) |
| { |
| #ifdef LOG_IPC_MSG_OBJ |
| cerr << obj->getType() << ", "; |
| #endif |
| MXTRC_1("type=%d\n", obj->getType()); |
| // let the user code pack the actual object into the buffer |
| thisObjLength = obj->packObjIntoMessage( |
| (IpcMessageBufferPtr) bufObj); |
| msgDataLen += thisObjLength; |
| |
| // now massage some of the header fields of the object in |
| // the buffer: |
| // |
| // - make sure the object length is the one we previously |
| // calculated by calling obj->packedLength(); |
| // - make sure we correctly set the obj. length in the message |
| // - wipe out the virtual function pointer |
| // - set the refcount of the object in the message to 1 |
| // - find the end of the object by aligning the end pointer |
| // - set the next_ pointer to be a relative offset to the |
| // next message |
| // |
| assert(obj->s_.objLength_ == thisObjLength); |
| #ifdef NA_BIG_ENDIAN |
| // NOTE: byte length *may* have been converted to big-endian format |
| // which means that s_.objLength_ is in the wrong format |
| assert(obj->s_.objLength_ == bufObj->s_.objLength_); |
| #endif |
| bufObj->setMyVPtr(NULL); |
| bufObj->s_.refCount_ = 1; |
| |
| // NOTE: see also IpcMessageObj::packDependentObjIntoMessage() |
| // when making changes to the above code |
| |
| // advance the buffer pointer to the next place where we could |
| // put an object (the next object wants to sit on an 8 byte boundary) |
| // NOTE: we don't increase bufObj->s_.objLength_ when we align. |
| IpcMessageBuffer::alignOffset(msgDataLen); |
| |
| // take care of the object's next_ pointer that is now embedded |
| // in the message buffer |
| if (obj->s_.next_ != NULL) |
| { |
| nextBufObj = (IpcMessageObj *) msgBuffer_->data(msgDataLen); |
| // store the place where the next object will be located |
| bufObj->s_.next_ = nextBufObj; |
| // now convert it to an offset relative to the start of obj |
| bufObj->convertNextToOffset(); |
| } |
| else |
| { |
| nextBufObj = NULL; |
| } |
| |
| // dissolve the linked list that hooked the objects together |
| IpcMessageObj *next = obj->s_.next_; |
| obj->s_.next_ = NULL; |
| |
| // the object got copied into the message, decrement its |
| // reference count except for the first object, which is |
| // embedded in the message object |
| if (obj != first()) |
| obj->decrRefCount(); |
| |
| // Allow testing of corrupt message handling on the other side. |
| if (corruptMessage_ && (next == NULL)) |
| bufObj->s_.refCount_ = 666; |
| |
| // advance to the next object |
| obj = next; |
| bufObj = nextBufObj; |
| } |
| |
| assert(h_.totalLength_ == msgDataLen); |
| msgBuffer_->setMessageLength(msgDataLen); |
| |
| #ifdef LOG_IPC_MSG_OBJ |
| if (msgDataLen) // i.e., if any objs |
| cerr << endl; |
| #endif |
| |
| } // shared objects |
| else |
| { |
| // ----------------------------------------------------------------- |
| // Objects were not shared between caller and IPC layer and are |
| // therefore already copied into the message buffer. |
| // ----------------------------------------------------------------- |
| |
| // the header is not packed into the buffer yet but there is |
| // space allocated for it at the beginning of the buffer |
| first()->packObjIntoMessage(msgBuffer_->data()); |
| msgBuffer_->setMessageLength(h_.totalLength_); |
| msgBuffer_->addCallback(this); |
| } // objects are not shared |
| |
| // --------------------------------------------------------------------- |
| // now send the message buffer to each of the connections specified |
| // setup transid in message buffer. |
| // --------------------------------------------------------------------- |
| state_ = SENDING; |
| |
| msgBuffer_->setTransid (transid); |
| |
| // can't have a previous operation uncompleted, since that would |
| // confuse the activeIOs_ set. |
| assert(activeIOs_.entries() == 0); |
| assert(numOfSendCallbacks_ == 0); |
| |
| // Determine the total number of IOs first, since some of the sends may |
| // complete immediately and we don't want a situation where a callback |
| // is called with no outstanding IOs while we aren't done yet with |
| // our send loop below. Also cache the pointer to the message buffer, |
| // since it may be modified by callbacks that initiate a receive operation. |
| IpcSetOfConnections sendConnections(recipients_); |
| IpcMessageBuffer *msgBuffer = msgBuffer_; |
| CollIndex numRecipients = sendConnections.entries(); |
| |
| activeIOs_ += sendConnections; |
| msgBuffer_ = NULL; |
| |
| // By calling IpcConnection::send() we give up the message buffer. |
| // So, in order to do multiple sends, get enough ref counts before |
| // starting - i.e., one extra ref count for every connection beyond the first |
| // one. Note that this helps the connections to find out whether |
| // they have exclusive access to the message buffer or not. If we would |
| // let each connection increment its own refcount this wouldn't work. |
| for (CollIndex extraRecipients = 1; |
| extraRecipients < numRecipients; |
| extraRecipients++) |
| msgBuffer->incrRefCount(); |
| |
| // --------------------------------------------------------------------- |
| // The actual IpcConnection::send() call(s) |
| // --------------------------------------------------------------------- |
| for (IpcConnectionId i = 0; sendConnections.setToNext(i); i++) |
| sendConnections.element(i)->send(msgBuffer); |
| |
| // if the send is in wait mode, then we must wait until the send completes |
| // on all connections. if the send is no-wait, then don't wait here but |
| // simply return, and the caller shall issue wait on all connections. |
| // otherwise we could see the stack piles up quickly due to recursive |
| // send/receive calls. |
| if (waited) |
| { |
| NABoolean interruptRecvd = waitOnMsgStream(IpcInfiniteTimeout); |
| if (interruptRecvd) |
| state_ = BREAK_RECEIVED; |
| } |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| } |
| |
| void IpcMessageStream::receive(NABoolean waited) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| MXTRC_FUNC("IpcMessageStream::receive"); |
| |
| // check state |
| assert(state_ == EMPTY OR state_ == SENT |
| OR state_ == ERROR_STATE); // After error, succesfully sent messages |
| // must have their receive callback registered. |
| // See code in ExMasterEspMessage::actOnSend |
| // which ensures that recipients_ contains only |
| // connections of successfully sent messages |
| // when this method is called. |
| |
| // reset errors |
| errorInfo_ = 0; |
| |
| // for now, can't call receive when there are I/Os outstanding |
| assert(activeIOs_.entries() == 0); |
| |
| // --------------------------------------------------------------------- |
| // initiate a receive operation for connection <from> or for all |
| // associated connections if <from> is NULL |
| // --------------------------------------------------------------------- |
| |
| // determine outstanding IOs first, since some of the receives may |
| // complete immediately and we don't want a situation where a callback |
| // is called with no outstanding IOs while we aren't done yet with |
| // our for loop below |
| IpcSetOfConnections recConnections(recipients_); |
| activeIOs_ = recConnections; |
| |
| for (IpcConnectionId i = 0; recConnections.setToNext(i); i++) |
| { |
| state_ = RECEIVING; |
| |
| // Be careful: some I/Os may complete immediately which may change |
| // the state of this message stream!!! |
| // NOTE: the receive callbacks are not supposed to send any data |
| // until the last receive has completed, since this would otherwise |
| // interfere with this loop. |
| recConnections.element(i)->receive(this); |
| } |
| |
| // if the receive is in wait mode, then we must wait until the receive |
| // completes on all connections. if the receive is no-wait, then don't wait |
| // here but simply return, and the caller shall issue wait on all |
| // connections. otherwise we could see the stack piles up quickly due to |
| // recursive send/receive calls. |
| if (waited) |
| { |
| NABoolean interruptRecvd = waitOnMsgStream(IpcInfiniteTimeout); |
| if (interruptRecvd) |
| state_ = BREAK_RECEIVED; |
| } |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| } |
| |
| WaitReturnStatus IpcMessageStream::waitOnMsgStream(IpcTimeout timeout) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| NABoolean interruptRecvd = FALSE; |
| |
| interruptRecvd = activeIOs_.waitOnSet(timeout); |
| MXTRC_FUNC("IpcMessageStream::wait"); |
| |
| if (interruptRecvd) |
| { |
| state_ = BREAK_RECEIVED; |
| return WAIT_INTERRUPT; |
| } |
| |
| if (timeout == IpcInfiniteTimeout) |
| { |
| // This means the user wants to wait until all messages in this |
| // stream have been sent or have been received, so loop until |
| // outstandingIOs_ is empty. |
| while (activeIOs_.entries() > 0 && |
| activeIOs_.moreWaitsAnyConnection()) |
| { |
| interruptRecvd = activeIOs_.waitOnSet(timeout); |
| if (interruptRecvd) |
| { |
| state_ = BREAK_RECEIVED; |
| return WAIT_INTERRUPT; |
| } |
| } |
| } |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| return WAIT_OK; |
| } |
| |
| void IpcMessageStream::actOnSend(IpcConnection *) |
| { |
| // the default callback implementation does nothing |
| } |
| |
| void IpcMessageStream::actOnSendAllComplete() |
| { |
| // the default callback implementation does nothing |
| } |
| |
| void IpcMessageStream::actOnReceive(IpcConnection *) |
| { |
| // the default callback implementation does nothing |
| } |
| |
| void IpcMessageStream::actOnReceiveAllComplete() |
| { |
| // the default callback implementation does nothing |
| } |
| |
| void IpcMessageStream::clearMessageBufferContents() |
| { |
| // if a message buffer exists and has data in it, |
| // it must contain objects with a reference count of 1 only |
| // (the "1" count is the use of the object in the message buffer |
| // and means that we can delete the buffer) |
| if (objectsInBuffer_) |
| { |
| IpcMessageObj *obj = first()->s_.next_; |
| |
| first()->s_.next_ = NULL; |
| while (obj != NULL) |
| { |
| assert(obj->getRefCount() == 1); |
| // objects in the message buffer are always using offsets |
| obj = obj->getNextFromOffset(); |
| } |
| |
| // give up on the objects in the message buffer |
| objectsInBuffer_ = FALSE; |
| tail_ = first(); |
| current_ = NULL; |
| h_.totalLength_ = 0; |
| } |
| } |
| |
| void IpcMessageStream::allocateMessageBuffer(IpcMessageObjSize len) |
| { |
| IpcMessageBuffer *toDelete = NULL; |
| |
| // do we need to delete an old or unusable message buffer |
| if (msgBuffer_ != NULL AND |
| (len == 0 OR len > msgBuffer_->getBufferLength())) |
| { |
| clearMessageBufferContents(); |
| toDelete = msgBuffer_; |
| msgBuffer_ = NULL; |
| } |
| |
| if (msgBuffer_ == NULL AND len > 0) |
| { |
| CollHeap *heap = (environment_ ? environment_->getHeap() : NULL); |
| |
| // we need to allocate a new message buffer |
| msgBuffer_ = IpcMessageBuffer::allocate(len, |
| this, |
| heap, |
| 0); |
| if (toDelete != NULL) |
| { |
| // move reply tag and max reply len from the old one |
| msgBuffer_->setReplyTag(toDelete->getReplyTag()); |
| msgBuffer_->setMaxReplyLength(toDelete->getMaxReplyLength()); |
| } |
| } |
| else if (msgBuffer_ != NULL) |
| { |
| // reuse same message buffer, but change the callback |
| msgBuffer_->addCallback(this); |
| msgBuffer_->setMessageLength(0); |
| } |
| |
| if (toDelete != NULL) |
| toDelete->decrRefCount(); |
| } |
| |
| void IpcMessageStream::resizeMessageBuffer(IpcMessageObjSize newMaxLen) |
| { |
| // make sure we have a message buffer of the desired length, but |
| // don't mess with the contents |
| if (msgBuffer_ != NULL) |
| msgBuffer_ = msgBuffer_->resize(environment_,newMaxLen); |
| else |
| allocateMessageBuffer(newMaxLen); |
| } |
| |
| void IpcMessageStream::internalActOnSend(IpcConnection *connection) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| assert(activeIOs_.contains(connection->getId())); |
| |
| // stream can only remember the first reported error from connection |
| if (state_ != ERROR_STATE && connection->getErrorInfo() != 0) |
| { |
| state_ = IpcMessageStream::ERROR_STATE; |
| errorInfo_ = connection->getErrorInfo(); |
| } |
| |
| actOnSend(connection); |
| |
| numOfSendCallbacks_++; |
| if (numOfSendCallbacks_ == activeIOs_.entries()) |
| { |
| state_ = SENT; |
| |
| activeIOs_.clear(); |
| numOfSendCallbacks_ = 0; |
| |
| actOnSendAllComplete(); |
| } |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| } |
| |
| void IpcMessageStream::internalActOnReceive(IpcMessageBuffer *buffer, |
| IpcConnection *connection) |
| { |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| |
| // deallocate the existing message buffer (despite the name of the proc) |
| allocateMessageBuffer(0); |
| |
| assert(buffer); |
| msgBuffer_ = buffer; |
| |
| assert(connection); |
| IpcConnectionId id = connection->getId(); |
| assert(activeIOs_.contains(id)); |
| activeIOs_ -= id; |
| |
| if (connection->getErrorInfo() != 0) |
| { |
| // stream can only remember the first reported error from connection |
| if (state_ != ERROR_STATE) |
| { |
| if (connection->breakReceived()) |
| state_ = BREAK_RECEIVED; |
| else |
| state_ = ERROR_STATE; |
| errorInfo_ = connection->getErrorInfo(); |
| } |
| } |
| else |
| { |
| // Set the state to RECEIVED. Even if there are more active IOs, |
| // allow the user to retrieve results. Be careful at the next state |
| // change: set it to EMPTY if no more receives are expected, set it back |
| // to RECEIVING if there are more outstanding IOs. |
| state_ = RECEIVED; |
| |
| // unpack the message header right away |
| InternalMsgHdrInfoStruct *justReceived = |
| (InternalMsgHdrInfoStruct *) msgBuffer_->data(); |
| |
| // make sure the OS gave us a message with the correct length |
| if ((msgBuffer_->getMessageLength() >= |
| sizeof(InternalMsgHdrInfoStruct)) && |
| (msgBuffer_->getMessageLength() == |
| justReceived->totalLength_)) |
| { |
| h_.unpackObj(justReceived->getType(), |
| justReceived->getVersion(), |
| TRUE, // we've already taken care of endianness |
| justReceived->s_.objLength_, |
| msgBuffer_->data()); |
| } |
| else |
| { |
| connection->dumpAndStopOtherEnd(true, false); |
| assert(msgBuffer_->getMessageLength() >= |
| sizeof(InternalMsgHdrInfoStruct) AND |
| msgBuffer_->getMessageLength() == |
| justReceived->totalLength_); |
| } |
| |
| // for now... |
| if ((h_.alignment_ != IpcMyAlignment) || |
| (h_.getEndianness() != IpcMyEndianness)) |
| connection->dumpAndStopOtherEnd(true, false); |
| |
| assert(h_.alignment_ == IpcMyAlignment AND |
| h_.getEndianness() == IpcMyEndianness); |
| |
| // the next pointer in the object's copy of the header is an |
| // actual pointer |
| h_.s_.next_ = justReceived->getNextFromOffset(); |
| |
| // indicate that we now have a linked list of objects in |
| // the message that are all stored in the message buffer and |
| // that all use offsets rather than pointers |
| // (Exception: a copy of the first object is outside the buffer and it |
| // uses a real pointer to the next object) |
| objectsInBuffer_ = TRUE; |
| |
| // check refcount that came in the buffer |
| assert(h_.s_.refCount_ == 1); |
| |
| IpcMessageObj *obj = first(); |
| |
| obj = obj->s_.next_; |
| |
| current_ = obj; // the first non-header object in the message |
| tail_ = NULL; // don't use tail in received messages |
| |
| // loop over all objects in the message buffer and do |
| // some sanity checks on them |
| while (obj != NULL) |
| { |
| bool badMessage = false; |
| if (obj->getRefCount() != 1) |
| badMessage = true; |
| if ((obj->s_.next_ != NULL) && |
| ((ULong)obj->s_.next_ < obj->s_.objLength_)) |
| badMessage = true; |
| if (obj->getMyVPtr() != NULL) |
| badMessage = true; |
| |
| if (badMessage) |
| connection->reportBadMessage(); |
| |
| assert(obj->getRefCount() == 1); |
| |
| assert(obj->s_.next_ == NULL OR |
| (IpcMessageObjSize )((Long) obj->s_.next_) >= obj->s_.objLength_); |
| |
| assert(obj->getMyVPtr() == NULL); |
| |
| obj = obj->getNextFromOffset(); |
| } |
| } |
| |
| // call user's callback |
| actOnReceive(connection); |
| |
| if (activeIOs_.entries() == 0) |
| actOnReceiveAllComplete(); |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| checkIntegrity(); |
| #endif |
| } |
| |
| ExMasterEspMessage * IpcMessageStream::castToExMasterEspMessage(void) |
| { |
| // virtual method giving safe cast to ExMasterEspMessage |
| return NULL; |
| } |
| |
| // abort any outstanding I/Os on this stream |
| void IpcMessageStream::abandonPendingIOs() |
| { |
| // Part of the fix for soln 10-070108-1544. The statement that |
| // owns this stream has encountered a fatal error and is prone |
| // to deadlock if Statement::releaseTransaction waits forever |
| // for the release trans/work message to complete. |
| // One problem with this fix is that it puts the IpcConnections |
| // into the error state, so that the ESPs will be stopped. And |
| // if multiple statements share the same IpcConnection then the |
| // second statement can inherit the error. |
| |
| // |
| // soln 10-080625-4107. below is an example: |
| // the message stream contains the fixup msg from master to 4 esps. all 4 |
| // esps have gone away. the fixup msg is multi-chunk (more than 4 chunks |
| // in my test). 2 connections (GMCS) finished send successfully, without |
| // receiving the ipc error yet due to the timing of the subsequent nowait |
| // calls. the send call backs were called on those 2 connections and as a |
| // result they were removed from stream's activeIOs_ list. |
| // |
| // the other 2 connections are still in the middle of multi-chunk send |
| // and thus send callbacks have not been invoked on them. note in this |
| // case, the stream has only two active I/Os but still has 4 recipient |
| // connections. |
| // |
| // now we come here because the upper layer found dead esps and decides |
| // to clean up all I/Os on stream (see Statement::releaseTransaction()). |
| // the for loop below iterates thru stream's activeIOs_ list and invokes |
| // setFatalError() on the latter 2 connections. as a result all I/Os on |
| // those 2 connections will be cleared and they will be removed from the |
| // stream's activeIOs_ list. however, because send never finished on those |
| // 2 connections, we still need to invoke send callbacks on them during |
| // cleanup process (see GuaMsgConnectionToServer::handleIOErrorForEntry()). |
| // after we invoke the send callback on the last active connection, there |
| // will be no active I/O on the stream and IpcMessageStream::receive() |
| // will be invoked (see ExMasterEspMessage::actOnSend()). the receive |
| // will add the 2 former connections, who finished send successfully, back |
| // to stream's activeIOs_ list. so after the first invocation of the for |
| // loop, the stream still has 2 recipient connections and 2 active I/Os. |
| // |
| // because of the above use case, we must go thru the for loop again and |
| // invoke setFatalError() on the 2 former connections that were inactive |
| // before the for loop but became active after the first execution of the |
| // for loop. therefore I'm adding a while loop outside the for loop to |
| // make sure all I/Os on the stream are cleaned up properly. |
| // |
| while (hasIOPending()) |
| { |
| for (CollIndex i = 0; activeIOs_.setToNext(i); i++) |
| { |
| IpcConnection * c = activeIOs_.element(i); |
| NABoolean useGuaIpc = TRUE; |
| if (useGuaIpc) |
| { |
| if (c->castToGuaConnectionToServer()) |
| c->castToGuaConnectionToServer()->setFatalError(this); |
| } |
| } // for |
| } // while |
| } |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| |
| // methods that perform integrity checking on Ipc-related data structures |
| |
| void IpcMessageStream::checkIntegrity(void) |
| { |
| isOrphaned_ = TRUE; // assume the worst: this object is an orphan |
| environment_->checkIntegrity(); // traverse up to IpcEnvironment, do check |
| if ((isOrphaned_) && (castToExMasterEspMessage())) |
| { |
| cerr << "Found orphaned ExMasterEspMessage object " << (void *)this << "." << |
| endl; |
| checkLocalIntegrity(); // didn't get here; check our integrity now |
| } |
| } |
| |
| void IpcMessageStream::checkLocalIntegrity(void) |
| { |
| isOrphaned_ = FALSE; |
| activeIOs_.checkLocalIntegrity(); |
| } |
| |
| #endif |
| |
| // ---------------------------------------------------------------------------- |
| // IpcBufferedMsgStream |
| // ---------------------------------------------------------------------------- |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // constructor |
| IpcBufferedMsgStream::IpcBufferedMsgStream(IpcEnvironment *env, |
| IpcMessageType msgType, |
| IpcMessageObjVersion version, |
| Lng32 inUseBufferLimit, |
| IpcMessageObjSize bufferSize) |
| : IpcMessageStreamBase(env), |
| msgType_(msgType), |
| msgVersion_(version), |
| bufferSize_(bufferSize), |
| inUseBufferLimit_(inUseBufferLimit), |
| garbageCollectLimit_(0), |
| errorInfo_(0), |
| receiveMsgComplete_(FALSE), |
| sendMsgBuf_(NULL), |
| sendMsgHdr_(NULL), |
| sendMsgObj_(NULL), |
| receiveMsgBufI_(0), |
| receiveMsgBuf_(NULL), |
| receiveMsgHdr_(NULL), |
| receiveMsgObj_(NULL), |
| sendBufList_(env->getHeap()), |
| receiveBufList_(env->getHeap()), |
| inBufList_(env->getHeap()), |
| outBufList_(env->getHeap()), |
| inUseBufList_(env->getHeap()), |
| replyTagBufList_(env->getHeap()), |
| smContinueProtocol_(FALSE) |
| { |
| } |
| |
| IpcBufferedMsgStream::~IpcBufferedMsgStream() |
| { |
| releaseBuffers(); |
| |
| IpcMessageBuffer *msgBuf = NULL; |
| CollIndex i = 0; |
| |
| for (i = 0; i < inBufList_.entries(); i++) |
| { |
| msgBuf = inBufList_[i]; |
| msgBuf->decrRefCount(); |
| } |
| |
| for (i = 0; i < receiveBufList_.entries(); i++) |
| { |
| msgBuf = receiveBufList_[i]; |
| msgBuf->decrRefCount(); |
| } |
| } |
| |
| IpcBufferedMsgStream * |
| IpcBufferedMsgStream::castToIpcBufferedMsgStream() |
| { |
| return this; |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // get next receive message from input queue. |
| NABoolean IpcBufferedMsgStream::getNextReceiveMsg(IpcMessageObjType& msgType) |
| { |
| if (receiveMsgComplete_) |
| { |
| if (receiveMsgBufI_ < receiveBufList_.entries()) |
| { // current receive message not completely unpacked, return type |
| |
| msgType = receiveMsgHdr_->getType(); |
| return (TRUE); |
| } |
| // release current receive message to inuse pool and advance to next. |
| // If a user msg object was unpacked inplace and did not implement the |
| // virtual method msgObjIsFree() then its persistence will be guaranteed |
| // until now! |
| IpcMessageBuffer* msgBuf; |
| while (receiveBufList_.getFirst(msgBuf)) |
| { // move all buffers for current receive message to inuse pool |
| inUseBufList_.insert(msgBuf); |
| } |
| receiveMsgBuf_ = NULL; |
| receiveMsgObj_ = receiveMsgHdr_ = NULL; |
| receiveMsgComplete_ = FALSE; |
| } |
| |
| if (inBufList_.entries()) |
| { |
| if (receiveBufList_.entries() == 0) |
| { // get first buffer of message |
| inBufList_.getFirst(receiveMsgBuf_); |
| receiveBufList_.insert(receiveMsgBuf_); |
| receiveMsgBufI_ = 0; |
| |
| // get message header (already unpacked by Guardian IpcConnection) |
| receiveMsgHdr_ = (InternalMsgHdrInfoStruct*)(receiveMsgBuf_->data(0)); |
| if (receiveMsgHdr_->isLastMsgBuf()) |
| { // single buffer message, proceed |
| receiveMsgComplete_ = TRUE; |
| // unpack 1st user msg object's base class (new should handle NULL) |
| IpcMessageObj *msgHeap = receiveMsgHdr_->getNextFromOffset(); |
| if (!msgHeap) |
| receiveMsgObj_ = NULL; |
| else |
| receiveMsgObj_ = new(msgHeap) IpcMessageObj(this); |
| if (receiveMsgObj_ == NULL) |
| { // recursively call to advance past empty message buffers |
| receiveMsgBufI_++; |
| |
| NABoolean result = getNextReceiveMsg(msgType); |
| return result; |
| } |
| |
| msgType = receiveMsgHdr_->getType(); |
| return (TRUE); |
| } |
| } |
| |
| // Try and extract remaining buffers associated with the same message from |
| // the in queue. The buffers must originate from the same connection OR the |
| // same local message stream and must have matching message types. A TRUE |
| // return code will be returned only after all buffers comprising the |
| // message have arrived. |
| |
| for (CollIndex i = 0; i < inBufList_.entries(); ) |
| { // get additional buffers of multi-buffer message |
| IpcMessageBuffer* msgBuf = inBufList_[i]; |
| if ((msgBuf->getConnection() == receiveMsgBuf_->getConnection()) && |
| (msgBuf->getMessageStream() == receiveMsgBuf_->getMessageStream()) ) |
| { |
| // get message header (already unpacked by Guardian IpcConnection) |
| InternalMsgHdrInfoStruct* msgHdr = (InternalMsgHdrInfoStruct*)(msgBuf->data(0)); |
| if (msgHdr->getMsgStreamId() == receiveMsgHdr_->getMsgStreamId()) |
| { |
| assert(msgHdr->getType() == receiveMsgHdr_->getType()); |
| inBufList_.removeAt(i); |
| receiveBufList_.insert(msgBuf); |
| if (msgHdr->isLastMsgBuf()) |
| { // all buffers for this message received, proceed |
| // unpack next message object's base class |
| receiveMsgObj_ = |
| new(receiveMsgHdr_->getNextFromOffset()) IpcMessageObj(this); |
| assert(receiveMsgObj_); // should not be NULL if multiple bufs |
| receiveMsgComplete_ = TRUE; |
| |
| msgType = receiveMsgHdr_->getType(); |
| return (TRUE); |
| } |
| continue; |
| } |
| } |
| i++; |
| } |
| } |
| |
| return (FALSE); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // get next message object type from current receive message. |
| NABoolean IpcBufferedMsgStream::getNextObjType(IpcMessageObjType& msgType) |
| { |
| if (receiveMsgObj_) |
| { |
| msgType = receiveMsgObj_->getType(); |
| |
| return (TRUE); |
| } |
| |
| return (FALSE); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // get next message object size from current receive message. |
| IpcMessageObjSize IpcBufferedMsgStream::getNextObjSize() const |
| { |
| return (receiveMsgObj_ ? receiveMsgObj_->s_.objLength_ : 0); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // get a pointer to the next packed object in the current receive message. |
| IpcMessageObj* IpcBufferedMsgStream::receiveMsgObj() |
| { |
| if (receiveMsgObj_ == NULL) |
| return (NULL); // no more receive message objects |
| |
| IpcMessageObj* returnMsgObj_ = receiveMsgObj_; |
| |
| // the base class portion of the next packed object (IpcMessageObj) is |
| // unpacked inplace now, before the derived class constructor is called |
| // to allow it to be used internally by IpcBufferedMsgStream. The message |
| // object will behave as a base class object until the derived class |
| // constructor completes (new should handle NULL). |
| IpcMessageObj *msgHeap = returnMsgObj_->getNextFromOffset(); |
| if (!msgHeap) |
| receiveMsgObj_ = NULL; |
| else |
| receiveMsgObj_ = new(msgHeap) IpcMessageObj(this); |
| |
| while (receiveMsgObj_ == NULL) |
| { // check next receive message buffer |
| receiveMsgBufI_++; |
| if (receiveMsgBufI_ >= receiveBufList_.entries()) |
| { // done with this message |
| receiveMsgBuf_ = NULL; |
| receiveMsgObj_ = receiveMsgHdr_ = NULL; |
| |
| break; |
| } |
| |
| receiveMsgBuf_ = receiveBufList_[receiveMsgBufI_]; |
| // msg header already unpacked by Guardian IpcConnection |
| receiveMsgHdr_ = (InternalMsgHdrInfoStruct*)(receiveMsgBuf_->data(0)); |
| // unpack next message object's base class (new should handle NULL). |
| IpcMessageObj *msgHeap = receiveMsgHdr_->getNextFromOffset(); |
| if (!msgHeap) |
| receiveMsgObj_ = NULL; |
| else |
| receiveMsgObj_ = new(msgHeap) IpcMessageObj(this); |
| } |
| |
| return (returnMsgObj_); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // give current receive message to a peer message stream for processing. |
| void IpcBufferedMsgStream::giveReceiveMsgTo(IpcBufferedMsgStream& msgStream) |
| { |
| assert(receiveMsgComplete_); |
| |
| IpcMessageBuffer* msgBuf; |
| while (receiveBufList_.getFirst(msgBuf)) |
| { |
| if (msgBuf->getReplyTag() != GuaInvalidReplyTag) |
| { |
| replyTagBufList_.remove(msgBuf); |
| msgBuf->decrRefCount(); // no longer in two lists! |
| } |
| msgStream.addInputBuffer(msgBuf); |
| } |
| |
| receiveMsgBuf_ = NULL; |
| receiveMsgObj_ = receiveMsgHdr_ = NULL; |
| receiveMsgComplete_ = FALSE; |
| |
| msgStream.actOnReceive(NULL); // trigger receiving message stream |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // pack an object in the current send message |
| IpcBufferedMsgStream& IpcBufferedMsgStream::operator << (IpcMessageObj& obj) |
| { |
| IpcMessageObjSize packedLength = obj.packedLength(); |
| IpcMessageObj* packedObj = |
| new(*this, packedLength) IpcMessageObj(obj.getType(), obj.getVersion()); |
| |
| // IpcBufferedMsgStream does NOT allow the object to pack the base class. |
| // This is because we want to ensure that the packed object behaves as a |
| // generic IpcMessageObj with NO virtual behavior. The basic IpcMessageObj |
| // is constructed in the buffer via the new operator above. |
| IpcMessageObjSize packedBytes = |
| obj.packObjIntoMessage((IpcMessageBufferPtr)(&packedObj[1])); |
| |
| // Did the object really use the length it promised to use? |
| assert(packedLength == packedBytes); |
| |
| return *this; |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // unpack the next object in the current receive message |
| NABoolean IpcBufferedMsgStream::extractNextObj(IpcMessageObj &obj, |
| NABoolean checkObjects) |
| { |
| IpcMessageObj* packedObj = receiveMsgObj(); |
| |
| // the user should have predicted the object type and version correctly |
| assert(packedObj AND |
| packedObj->getType() == obj.getType() AND |
| packedObj->getVersion() == obj.getVersion()); |
| |
| // IpcBufferedMsgStream does NOT allow the object to unpack the base class. |
| // This is because the packed object behaves as a generic IpcMessageObj and |
| // the destination object wishes to retain its virtual behavior. |
| IpcMessageObjSize objLen = packedObj->s_.objLength_ - sizeof(IpcMessageObj); |
| |
| // If the caller requested an integrity check on the packed object |
| // then we call a checkObj() method before unpackObj(). |
| NABoolean result = TRUE; |
| if (checkObjects) |
| { |
| result = obj.checkObj(packedObj->getType(), |
| packedObj->getVersion(), |
| TRUE, |
| objLen, |
| (IpcConstMessageBufferPtr)(&packedObj[1])); |
| } |
| |
| if (result) |
| { |
| obj.unpackObj(packedObj->getType(), |
| packedObj->getVersion(), |
| TRUE, |
| objLen, |
| (IpcConstMessageBufferPtr)(&packedObj[1])); |
| } |
| |
| return result; |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // allocate space for a packed object in the current send message. |
| IpcMessageObj* IpcBufferedMsgStream::sendMsgObj(IpcMessageObjSize packedObjLen) |
| { |
| IpcMessageBuffer::alignOffset(packedObjLen); |
| |
| if (sendMsgBuf_ == NULL) |
| { |
| IpcMessageObjSize length = |
| (packedObjLen > bufferSize_ ? packedObjLen : bufferSize_); |
| length += sizeof(InternalMsgHdrInfoStruct); |
| |
| CollHeap *heap = (environment_ ? environment_->getHeap() : NULL); |
| |
| sendMsgBuf_ = IpcMessageBuffer::allocate(length, |
| this, |
| heap, |
| 0); |
| |
| if (sendMsgBuf_ == NULL) |
| return NULL; // no buffers available |
| |
| sendBufList_.insert(sendMsgBuf_); |
| // Create message header |
| // Recursively calls sendMsgObj() via operator new |
| sendMsgHdr_ = new(*this, 0) InternalMsgHdrInfoStruct(this->msgType_, |
| this->msgVersion_); |
| sendMsgHdr_->totalLength_ = sizeof(InternalMsgHdrInfoStruct); |
| } |
| |
| if (packedObjLen == 0) |
| return (NULL); // internal call to create an empty buffer, hdr only |
| |
| if (sendMsgHdr_ == NULL) |
| { // first object must be header |
| sendMsgObj_ = sendMsgHdr_ = |
| (InternalMsgHdrInfoStruct*)(sendMsgBuf_->data(0)); |
| } |
| else |
| { |
| IpcMessageObjSize newLen = sendMsgHdr_->totalLength_ + packedObjLen; |
| if (newLen > sendMsgBuf_->getBufferLength()) |
| { |
| sendMsgBuf_ = NULL; |
| sendMsgObj_ = sendMsgHdr_ = NULL; |
| |
| return (sendMsgObj(packedObjLen)); //recursively call for new buffer |
| } |
| |
| // link to previous last object (must coexist with IpcMessageStream!) |
| // IpcBufferedMsgStream only uses "next_" as an offset although it is |
| // defined as a pointer. IpcMessageStream uses next as both pointer and |
| // offset. The objLength_ field gets set later, in prepSendMsgForOutput() |
| sendMsgObj_->s_.next_ = |
| (IpcMessageObj*)(sendMsgBuf_->data(sendMsgHdr_->totalLength_)); |
| sendMsgObj_->convertNextToOffset(); |
| sendMsgObj_ = |
| (IpcMessageObj*)(sendMsgBuf_->data(sendMsgHdr_->totalLength_)); |
| sendMsgHdr_->totalLength_ = newLen; |
| |
| } |
| |
| return (sendMsgObj_); |
| } |
| |
| IpcConnection* IpcBufferedMsgStream::getConnection() |
| { |
| ABORT("IpcMessageStreamBase::getConnection() should not be called!"); |
| return NULL; |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // cleanup unpacked message buffers with objects no longer inuse |
| void IpcBufferedMsgStream::cleanupBuffers() |
| { |
| if (inUseBufList_.entries() <= (CollIndex) garbageCollectLimit_) |
| return; |
| |
| CollIndex i = 0; |
| while (i < inUseBufList_.entries()) |
| { |
| IpcMessageBuffer* msgBuf = inUseBufList_[i]; |
| InternalMsgHdrInfoStruct* msgHdr = |
| (InternalMsgHdrInfoStruct*)(msgBuf->data(0)); |
| IpcMessageObj* msgObj = msgHdr; |
| IpcMessageObj* nextMsgObj; |
| while (nextMsgObj = msgObj->getNextFromOffset()) |
| { |
| if (!(nextMsgObj->msgObjIsFree())) |
| break; // quit, no need to check remaining objects in buffer |
| |
| msgObj->mergeNextPackedObj(); // unlink msg objects no longer in use |
| } |
| |
| if (msgHdr->getNextFromOffset() == NULL) |
| { // all objects are no longer inuse, free buffer |
| inUseBufList_.removeAt(i); |
| msgBuf->decrRefCount(); |
| } |
| else |
| { // msg objects in buffer still in use |
| i++; |
| while (!(msgHdr->isLastMsgBuf())) |
| { // skip over remaining buffers for this message. |
| // must guarantee that a complex object consisting of multiple message |
| // objects or a set of objects within a message can be held inuse by |
| // the root(first allocated) object! |
| assert(i < inUseBufList_.entries()); |
| msgBuf = inUseBufList_[i]; |
| msgHdr = (InternalMsgHdrInfoStruct*)(msgBuf->data(0)); |
| i++; |
| } |
| } |
| } |
| if (inUseBufList_.entries() < (CollIndex) inUseBufferLimit_) |
| { // optimize garbage collection limit until inuse limit reached |
| garbageCollectLimit_ = inUseBufList_.entries(); |
| } |
| |
| } |
| |
| //////////////////////////////////////////////////////////////////////////// |
| // prepare send message objects for output and put buffers in output queue. |
| void IpcBufferedMsgStream::prepSendMsgForOutput() |
| { |
| while (sendBufList_.getFirst(sendMsgBuf_)) |
| { |
| sendMsgHdr_ = (InternalMsgHdrInfoStruct*)(sendMsgBuf_->data(0)); |
| sendMsgBuf_->setMessageLength(sendMsgHdr_->totalLength_); |
| MXTRC_1("totalLength_=%d \n", sendMsgHdr_->totalLength_); |
| if (sendBufList_.entries() == 0) |
| sendMsgHdr_->setLastMsgBuf(); |
| sendMsgHdr_->setMsgStreamId((Long)this); |
| sendMsgObj_ = sendMsgHdr_; |
| do |
| { |
| // calculate object length, note that this is done only to be |
| // compatible with IpcMessageStream and that this object length |
| // is rounded up to the alignment that was done internally |
| if (sendMsgObj_->s_.next_) |
| sendMsgObj_->s_.objLength_ = |
| (IpcMessageObjSize)(Long)((sendMsgObj_->s_.next_)); |
| else |
| sendMsgObj_->s_.objLength_ = |
| (IpcMessageObjSize)(sendMsgBuf_->data(0) |
| + sendMsgHdr_->totalLength_ |
| - (IpcMessageBufferPtr) sendMsgObj_); |
| MXTRC_3("objType_=%d objLength_=%d next_=%d\n", sendMsgObj_->s_.objType_, sendMsgObj_->s_.objLength_, sendMsgObj_->s_.next_); |
| sendMsgObj_->prepMsgObjForSend(); |
| sendMsgObj_->setMyVPtr(NULL); |
| } |
| while ((sendMsgObj_ = sendMsgObj_->getNextFromOffset()) != NULL); |
| |
| outBufList_.insert(sendMsgBuf_); |
| } |
| |
| sendMsgBuf_ = NULL; |
| sendMsgObj_ = sendMsgHdr_ = NULL; |
| } |
| |
| //////////////////////////////////////////////////////////////////////////// |
| // add a message buffer to the input queue. |
| void IpcBufferedMsgStream::addInputBuffer(IpcMessageBuffer* inputBuf) |
| { |
| inBufList_.insert(inputBuf); |
| if (inputBuf->getReplyTag() != GuaInvalidReplyTag) |
| { |
| if (getSMContinueProtocol()) |
| EXSM_TRACE(EXSM_TRACE_PROTOCOL, "STREAM %p add input buf %p", this, |
| inputBuf); |
| |
| inputBuf->incrRefCount(); // buffer in 2 lists, incr to hold! |
| |
| // save reply tag, needed for response |
| replyTagBufList_.insert(inputBuf); |
| } |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // get next message buffer from output queue matched with next reply tag. |
| IpcMessageBuffer* IpcServerMsgStream::getReplyTagOutputBuffer( |
| IpcConnection*& connection, |
| IpcBufferedMsgStream*& msgStream) |
| { |
| MXTRC_FUNC("IpcServerMsgStream::getReplyTagOutputBuffer"); |
| |
| if (numOfReplyTagBuffers()) |
| { |
| IpcMessageBuffer* outputBuf = getOutputBuffer(); |
| if (outputBuf) |
| { |
| IpcMessageBuffer* replyTagBuf = replyTagBufList_[0]; |
| |
| outputBuf->setReplyTag(replyTagBuf->getReplyTag()); |
| outputBuf->setMaxReplyLength(replyTagBuf->getMaxReplyLength()); |
| connection = replyTagBuf->getConnection(); |
| MXTRC_2("replyTag=%d connection=%x", replyTagBuf->getReplyTag(), connection); |
| msgStream = (IpcBufferedMsgStream*)(replyTagBuf->getMessageStream()); |
| |
| NABoolean deleteReplyTag = FALSE; |
| |
| // For the SeaMonster continue protocol the communication |
| // between client and server is not one to one. That is client |
| // does not send a continue message for every reply from the |
| // server and the server does not send only one reply for every |
| // request. For every request the server receives, the server |
| // replies with the number of buffers set by |
| // sendBufferLimit_. The server sets the LAST IN BATCH flag for |
| // the last buffer in a batch and waits for a continue message |
| // from the client before it sends more replies. |
| // |
| // The way a stream controls if more buffers can be sent to the |
| // client is through replyTagBufList_. If there are any entries |
| // in replyTagBufList_ then the server stream can send messages |
| // to the client and if there are no entries in replyTagBufList_ |
| // then the server stream cannot send any more messages until |
| // the client sends a continue request and that gets added to |
| // replyTagBufList_. |
| // |
| // So for SeaMonster continue protocol just after sending one |
| // reply server cannot delete the request from replyTagBufList_ |
| // and needs to wait until the LAST IN BATCH reply or the EOD |
| // reply is sent. The following changes make sure that for |
| // SeaMonster continue protocol we do not delete the request |
| // from replyTagBufList_ except for the LAST IN BATCH or the EOD |
| // reply. |
| if (getSMContinueProtocol()) |
| { |
| buffersSentInBatch_++; |
| |
| InternalMsgHdrInfoStruct *hdr = |
| (InternalMsgHdrInfoStruct *)outputBuf->data(0); |
| |
| // If this buffer is the last in batch to be sent then mark the buffer |
| // as LAST IN BATCH and remember to delete the buffer from reply tag |
| // If this buffer is not last in batch then check if LAST IN BATCH |
| // was already set by the TCB since it reached EOD, if so remember |
| // to delete buffer from reply tag |
| if (buffersSentInBatch_ == sendBufferLimit_) |
| { |
| EXSM_TRACE(EXSM_TRACE_PROTOCOL, |
| "STREAM %p sent %d limit %d sending last in batch", |
| this, buffersSentInBatch_, sendBufferLimit_); |
| hdr->setSMLastInBatch(); |
| |
| deleteReplyTag = TRUE; |
| } |
| else if (hdr->getSMLastInBatch()) |
| deleteReplyTag = TRUE; |
| } |
| else // regular one to one continue protocol case |
| deleteReplyTag = TRUE; |
| |
| if (deleteReplyTag) |
| { |
| buffersSentInBatch_ = 0; |
| replyTagBuf->setReplyTag(GuaInvalidReplyTag); // invalidate reply tag |
| replyTagBuf->decrRefCount(); // free if not in another list |
| replyTagBufList_.remove(replyTagBuf); |
| } |
| |
| return outputBuf; |
| |
| } // if (outputBuf) |
| } // if (numOfReplyTagBuffers()) |
| |
| return NULL; |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // internal send call back may be redefined by derived classes. |
| void IpcBufferedMsgStream::internalActOnSend(IpcConnection* connection) |
| { |
| if (connection && !errorInfo_) |
| { |
| errorInfo_ = connection->getErrorInfo(); |
| } |
| actOnSend(connection); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // internal receive call back may be redefined by derived classes. |
| void IpcBufferedMsgStream::internalActOnReceive(IpcMessageBuffer* buffer, |
| IpcConnection* connection) |
| { |
| if (connection && !errorInfo_) |
| { |
| errorInfo_ = connection->getErrorInfo(); |
| } |
| if (buffer) |
| { |
| addInputBuffer(buffer); |
| } |
| actOnReceive(connection); |
| } |
| |
| void IpcBufferedMsgStream::actOnSendAllComplete() |
| { |
| // the default callback implementation does nothing |
| } |
| |
| void IpcBufferedMsgStream::actOnReceiveAllComplete() |
| { |
| // the default callback implementation does nothing |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // IpcClientMsgStream |
| // ---------------------------------------------------------------------------- |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // constructor |
| IpcClientMsgStream::IpcClientMsgStream(IpcEnvironment *env, |
| IpcMessageType msgType, |
| IpcMessageObjVersion version, |
| Lng32 sendBufferLimit, |
| Lng32 inUseBufferLimit, |
| IpcMessageObjSize bufferSize) |
| : IpcBufferedMsgStream(env, msgType, version, inUseBufferLimit, bufferSize), |
| sendBufferLimit_(sendBufferLimit), |
| responsesPending_(0), |
| recipients_(env->getAllConnections(),env->getHeap()), |
| localRecipients_(env->getHeap()), |
| localReplyTag_(0), |
| smBatchIsComplete_(FALSE) |
| { } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // broadcast the current send message to all recipients |
| void IpcClientMsgStream::sendRequest(Int64 transid) |
| { |
| MXTRC_FUNC("IpcClientMsgStream::sendRequest"); |
| prepSendMsgForOutput(); |
| |
| while (numOfOutputBuffers()) |
| { |
| IpcMessageBuffer* msgBuf; |
| CollIndex numRecipients = |
| recipients_.entries() + localRecipients_.entries(); |
| |
| for (IpcConnectionId i = 0; recipients_.setToNext(i); i++) |
| { // send output message to all remote connections |
| numRecipients--; |
| msgBuf = numRecipients ? copyOutputBuffer() : getOutputBuffer(); |
| |
| // store transid in msgbuf |
| msgBuf->setTransid (transid); |
| |
| responsesPending_++; |
| |
| if (getSMContinueProtocol()) |
| EXSM_TRACE(EXSM_TRACE_PROTOCOL, "STREAM %p rp is now %d", this, |
| (int) responsesPending_); |
| |
| IpcConnection *conn = recipients_.element(i); |
| conn->send(msgBuf); |
| } |
| |
| assert(numRecipients == localRecipients_.entries()); |
| |
| for (CollIndex j = 0; j < localRecipients_.entries(); j++) |
| { // send output message to all local server msg streams |
| IpcBufferedMsgStream* msgStream = localRecipients_[j]; |
| numRecipients--; |
| msgBuf = numRecipients ? copyOutputBuffer() : getOutputBuffer(); |
| responsesPending_++; |
| msgBuf->setReplyTag(getLocalReplyTag()); |
| msgStream->internalActOnReceive(msgBuf, NULL); |
| } |
| assert(numRecipients == 0); |
| } |
| |
| // commenting out the following code as this is causing problems for SM, |
| // removing this code does not cause problems for Seabed messages as this |
| // is mostly noop for seabed messages |
| //recipients_.waitOnSet(IpcImmediately); |
| } |
| |
| // abort any outstanding I/Os on this stream |
| void IpcClientMsgStream::abandonPendingIOs() |
| { |
| while (numOfResponsesPending() > 0) |
| { |
| for (IpcConnectionId i = 0; recipients_.setToNext(i); i++) |
| { |
| IpcConnection * c = recipients_.element(i); |
| NABoolean useGuaIpc = TRUE; |
| if (useGuaIpc) |
| { |
| if (c->castToGuaConnectionToServer()) |
| c->castToGuaConnectionToServer()->setFatalError(this); |
| } |
| } // for |
| } // while |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // internal receive call back |
| void IpcClientMsgStream::internalActOnReceive(IpcMessageBuffer* buffer, |
| IpcConnection* connection) |
| { |
| MXTRC_FUNC("IpcClientMsgStream::internalActOnReceive"); |
| |
| // Note: It is possible for this function to be called with a NULL |
| // buffer pointer under certain error conditions. |
| |
| if (buffer) |
| buffer->setReplyTag(GuaInvalidReplyTag); // invalidate so buff can cleanup |
| |
| // Cases to consider |
| // (a) buffer is NULL (which indicates an error condition) |
| // (b) stream is NOT following seamonster continue protocol |
| // (c) stream IS following seamonster continue protocol |
| |
| if (!getSMContinueProtocol() || !buffer) |
| { |
| // Cases (a) and (b) |
| if (responsesPending_ > 0) |
| responsesPending_--; |
| else |
| { |
| connection->dumpAndStopOtherEnd(true, false); |
| assert(responsesPending_ > 0); |
| } |
| } |
| else |
| { |
| // Case (c) |
| InternalMsgHdrInfoStruct *hdr = |
| (InternalMsgHdrInfoStruct *) buffer->data(0); |
| assert(hdr); |
| |
| if (hdr->getSMLastInBatch()) |
| { |
| // Remember that the LAST IN BATCH buffer is received so later |
| // in actOnReceive() we can decrement the statement globals |
| // message counter. This saved this information in a stream |
| // data member since the actOnReceive() method does not have a |
| // buffer pointer to check the actual LAST IN BATCH flag. |
| smBatchIsComplete_ = TRUE; |
| |
| responsesPending_--; |
| EXSM_TRACE(EXSM_TRACE_PROTOCOL, "STREAM %p rp is now %d", this, |
| (int) responsesPending_); |
| |
| IpcConnection *conn = connection->castToSMConnection(); |
| assert(conn); |
| ((SMConnection *)conn)->decrOutstandingSMRequests(); |
| } |
| } |
| |
| // Call the parent class internalActOnReceive() function. This will |
| // invoke the child class actOnReceive() virtual method. |
| IpcBufferedMsgStream::internalActOnReceive(buffer, connection); |
| |
| if (getSMContinueProtocol()) |
| { |
| // After receive callback processing is complete in the parent and |
| // child classes, we can reset the batch complete flag |
| smBatchIsComplete_ = FALSE; |
| } |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // internal send call back |
| void IpcClientMsgStream::internalActOnSend(IpcConnection* connection) |
| { |
| // We'll have to initiate receive even if connection is in error. This |
| // is because the stream needs to do bookkeeping and other work only |
| // when the receive callback is called. As result, decrementing |
| // responsesPending_ becomes unnecessary. Please ignore next comment |
| |
| // If an error occurred, we will decrement responsesPending_ before |
| // calling the superclass implementation of this method. This way |
| // when the superclass method issues send callbacks, those callbacks |
| // can correctly determine whether responsesPending_ has dropped to |
| // zero following an IPC error. |
| |
| // But before making any judgements about whether an error occurred, |
| // first make sure this stream's errorInfo_ field is updated with |
| // any error information from the connection. |
| if (connection && !errorInfo_) |
| errorInfo_ = connection->getErrorInfo(); |
| |
| IpcBufferedMsgStream::internalActOnSend(connection); |
| if (connection) |
| { |
| connection->receive(this); |
| } |
| } |
| |
| // ---------------------------------------------------------------------------- |
| // IpcServerMsgStream |
| // ---------------------------------------------------------------------------- |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // constructor |
| IpcServerMsgStream::IpcServerMsgStream(IpcEnvironment *env, |
| IpcMessageType msgType, |
| IpcMessageObjVersion version, |
| Lng32 sendBufferLimit, |
| Lng32 inUseBufferLimit, |
| IpcMessageObjSize bufferSize) |
| : IpcBufferedMsgStream(env, msgType, version, inUseBufferLimit, bufferSize), |
| sendBufferLimit_(sendBufferLimit), |
| client_(NULL), |
| buffersSentInBatch_(0) |
| { }; |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // send the current response message back to the client |
| void IpcServerMsgStream::sendResponse() |
| { |
| MXTRC_FUNC("IpcServerMsgStream::sendResponse"); |
| prepSendMsgForOutput(); |
| tickleOutputIo(); |
| } |
| |
| ////////////////////////////////////////////////////////////////////////////// |
| // server is done replying to all requests |
| void IpcServerMsgStream::responseDone() |
| { |
| while (numOfOutputBuffers() < numOfReplyTagBuffers()) |
| { // create empty responses for excess reply tags |
| sendMsgObj(0); |
| prepSendMsgForOutput(); |
| } |
| tickleOutputIo(); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // reply to outstanding requests from the output queue |
| void IpcServerMsgStream::tickleOutputIo() |
| { |
| // This early return is done without any SeaMonster tracing so that |
| // SeaMonster tracing only happens when there is actually something |
| // to send |
| if (numOfOutputBuffers() == 0) |
| return; |
| |
| if (getSMContinueProtocol()) |
| EXSM_TRACE(EXSM_TRACE_PROTOCOL, |
| "STREAM %p BEGIN tickleOutputIo bufs %d", this, |
| (int) numOfOutputBuffers()); |
| |
| IpcMessageBuffer* msgBuf; |
| IpcConnection* connection; |
| IpcBufferedMsgStream* msgStream; |
| MXTRC_FUNC("IpcServerMsgStream::tickleOutputIo"); |
| while (msgBuf = getReplyTagOutputBuffer(connection, msgStream)) |
| { |
| if (connection) |
| { |
| // respond to remote client message stream via connection |
| connection->send(msgBuf); |
| |
| // comment out the following wait to eliminate the cause of bug 2473 |
| // in an open/close cursor test that recreates the problem |
| //connection->wait(IpcImmediately); |
| } |
| else |
| { // respond to local client message stream |
| msgStream->internalActOnReceive(msgBuf, NULL); |
| } |
| } |
| |
| if (getSMContinueProtocol()) |
| EXSM_TRACE(EXSM_TRACE_PROTOCOL, |
| "STREAM %p END tickleOutputIo bufs %d", this, |
| (int) numOfOutputBuffers()); |
| } |
| |
| void IpcBufferedMsgStream::setSMLastInBatch() |
| { |
| Int32 entries = (Int32) numOfSendBuffers(); |
| assert(entries > 0); |
| |
| IpcMessageBuffer *msgBuf = sendBufList_[entries - 1]; |
| InternalMsgHdrInfoStruct *hdr = (InternalMsgHdrInfoStruct *) msgBuf->data(0); |
| hdr->setSMLastInBatch(); |
| } |
| |
| /////////////////////////////////////////////////////////////////////////////// |
| // internal receive call back |
| void IpcServerMsgStream::internalActOnReceive(IpcMessageBuffer* buffer, |
| IpcConnection* connection) |
| { |
| if (buffer) |
| { |
| assert(buffer->getReplyTag() != GuaInvalidReplyTag); // vaild reply tag ? |
| |
| if (client_ != NULL) |
| { |
| // cannot be recipient of local msg stream if receiving from connection |
| assert(client_ == buffer->getConnection()); |
| client_->receive(this); |
| } |
| } |
| IpcBufferedMsgStream::internalActOnReceive(buffer, connection); |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Methods for class IpcServerClass |
| // ----------------------------------------------------------------------- |
| |
| IpcServerClass::IpcServerClass(IpcEnvironment *env, |
| IpcServerType serverType, |
| IpcServerAllocationMethod allocationMethod, |
| short serverVersion, |
| char *nodeName) : |
| allocatedServers_(env->getHeap()) |
| { |
| environment_ = env; |
| serverType_ = serverType; |
| allocationMethod_ = allocationMethod; |
| serverVersion_ = serverVersion; |
| nodeName_ = nodeName; |
| char *parallelOpens = getenv("ESP_PARALLEL_CC_OPENS"); |
| if (parallelOpens != NULL && *parallelOpens == '0') |
| parallelOpens_ = FALSE; |
| else |
| parallelOpens_ = TRUE; |
| Int32 retVal; |
| retVal = pthread_mutex_init(&nowaitedEspServer_.cond_mutex_, NULL); |
| assert(retVal == 0); |
| retVal = pthread_cond_init(&nowaitedEspServer_.cond_cond_, NULL); |
| assert(retVal == 0); |
| nowaitedEspServer_.startTag_ = 0; |
| nowaitedEspServer_.callbackCount_ = 0; |
| nowaitedEspServer_.completionCount_ = 0; |
| nowaitedEspServer_.waiting_ = FALSE; |
| char *waitedStartupArg = getenv("ESP_PARALLEL_STARTUP"); |
| if (waitedStartupArg == NULL) |
| nowaitedEspServer_.waitedStartupArg_ = '0'; |
| else |
| { |
| switch (*waitedStartupArg) |
| { |
| case '0': |
| nowaitedEspServer_.waitedStartupArg_ = '1'; |
| break; |
| case '1': |
| nowaitedEspServer_.waitedStartupArg_ = '0'; |
| break; |
| default: |
| nowaitedEspServer_.waitedStartupArg_ = '0'; |
| } |
| } |
| if (allocationMethod_ == IPC_ALLOC_DONT_CARE) |
| { |
| // NA_WINNT is set and NA_GUARDIAN_IPC is set |
| // The standard method on NT is to create a Guardian process |
| // in order to run in an NT only or simulated environment we can set an environment |
| // variable to override that mechanism. |
| if (getenv("SQL_NO_NSK_LITE") == NULL) |
| { |
| allocationMethod_ = IPC_LAUNCH_GUARDIAN_PROCESS; |
| } |
| else |
| { |
| allocationMethod_ = IPC_LAUNCH_NT_PROCESS; |
| time_t tp; |
| time(&tp); |
| nextPort_ = IPC_SQLESP_PORTNUMBER + tp % 10000; // arbitrary |
| }; |
| } |
| } |
| |
| IpcServerClass::~IpcServerClass() |
| { |
| NAHeap *heap = (NAHeap *)environment_->getHeap(); |
| CollIndex entryCount; |
| entryCount = allocatedServers_.entries(); |
| for (CollIndex i = 0 ; i < entryCount; i++) { |
| NADELETE(allocatedServers_[i], IpcServer, heap); |
| } |
| allocatedServers_.clear(); |
| } |
| |
| IpcServer * IpcServerClass::allocateServerProcess(ComDiagsArea **diags, |
| CollHeap *diagsHeap, |
| const char *nodeName, |
| IpcCpuNum cpuNum, |
| IpcPriority priority, |
| Lng32 espLevel, |
| NABoolean usesTransactions, |
| NABoolean waitedCreation, |
| Lng32 maxNowaitRequests, |
| const char* progFileName, |
| const char* processName, |
| NABoolean parallelOpens, |
| IpcGuardianServer **creatingProcess) |
| { |
| IpcServer *result = NULL; |
| short retcode = 0; |
| if (creatingProcess != NULL) |
| { |
| result = *creatingProcess; |
| if ((*creatingProcess)->isCreatingProcess()) |
| { |
| assert(waitedCreation == FALSE); |
| retcode = (*creatingProcess)->workOnStartup(IpcInfiniteTimeout,diags,diagsHeap); |
| if ((*creatingProcess)->isCreatingProcess()) |
| { |
| result = NULL; |
| return result; // Launch didn't complete |
| } |
| } |
| else |
| assert(FALSE); // Existing process in not in creating state |
| if (retcode) |
| { |
| char buf[20]; |
| str_sprintf(buf, "retcode = %d", retcode); |
| const char *retcodeText = buf; |
| (*creatingProcess)->logEspRelease(__FILE__, __LINE__, |
| retcodeText); |
| (*creatingProcess)->release(); |
| *creatingProcess = NULL; // Launch completed |
| result = NULL; |
| } |
| // remember this server |
| if (result != NULL) |
| allocatedServers_.insert(result); |
| return result; |
| } |
| NABoolean lv_usesTransactions = usesTransactions; |
| Lng32 lv_maxNowaitRequests = maxNowaitRequests; |
| |
| IpcConnection *serverConn = NULL; |
| const char *className = NULL; |
| IpcServerPortNumber defaultPortNumber = IPC_INVALID_SERVER_PORTNUMBER; |
| NABoolean debugServer = FALSE; |
| const char *overridingDefineName = NULL; |
| |
| debugServer = (getenv("DEBUG_SERVER") != NULL); |
| |
| // to avoid compiler warning for Unix build |
| waitedCreation = waitedCreation; |
| |
| switch (serverType_) |
| { |
| case IPC_SQLUSTAT_SERVER: |
| if (debugServer) |
| { |
| className = "arkustatdbg"; |
| defaultPortNumber = IPC_SQLUSTAT_DEBUG_PORTNUMBER; |
| } |
| else |
| { |
| className = "arkustat"; |
| defaultPortNumber = IPC_SQLUSTAT_PORTNUMBER; |
| } |
| overridingDefineName = "ARK_STA_PROG_FILE_NAME"; |
| break; |
| case IPC_SQLCAT_SERVER: |
| if (debugServer) |
| { |
| className = "arkcatdbg"; |
| defaultPortNumber = IPC_SQLCAT_DEBUG_PORTNUMBER; |
| } |
| else |
| { |
| className = "arkcat"; |
| defaultPortNumber = IPC_SQLCAT_PORTNUMBER; |
| } |
| overridingDefineName = "ARK_CAT_PROG_FILE_NAME"; |
| break; |
| case IPC_SQLCOMP_SERVER: |
| if (debugServer) |
| { |
| className = "arkcmpdbg"; |
| defaultPortNumber = IPC_SQLCOMP_DEBUG_PORTNUMBER; |
| } |
| else |
| { |
| className = "arkcmp"; |
| defaultPortNumber = IPC_SQLCOMP_PORTNUMBER; |
| } |
| overridingDefineName = "_ARK_CMP_PROG_FILE_NAME"; |
| break; |
| case IPC_SQLESP_SERVER: |
| if (debugServer) |
| { |
| className = "arkespdbg"; |
| defaultPortNumber = IPC_SQLESP_DEBUG_PORTNUMBER; |
| } |
| else |
| { |
| className = "arkesp"; |
| defaultPortNumber = IPC_SQLESP_PORTNUMBER; |
| } |
| overridingDefineName = "_ARK_ESP_PROG_FILE_NAME"; |
| break; |
| case IPC_SQLBDRR_SERVER: |
| className = "bdrr"; |
| overridingDefineName = "=_MX_BDRR_PROG_FILE_NAME"; |
| break; |
| // |
| // UDR Servers |
| // |
| |
| case IPC_SQLUDR_SERVER: |
| { |
| if (debugServer) |
| { |
| className = "udrservdbg"; |
| defaultPortNumber = IPC_SQLUDR_DEBUG_PORTNUMBER; |
| } |
| else |
| { |
| className = "udrserv"; |
| defaultPortNumber = IPC_SQLUDR_PORTNUMBER; |
| } |
| overridingDefineName = "_ARK_UDR_PROG_FILE_NAME"; |
| } |
| break; |
| |
| // |
| // Query Matching Server |
| // |
| case IPC_SQLQMS_SERVER: |
| { |
| if (debugServer) |
| { |
| className = "qmsdbg"; |
| defaultPortNumber = IPC_SQLQMS_DEBUG_PORTNUMBER; |
| } |
| else |
| { |
| className = "qms"; |
| defaultPortNumber = IPC_SQLQMS_PORTNUMBER; |
| } |
| overridingDefineName = "_ARK_QMS_PROG_FILE_NAME"; |
| } |
| break; |
| |
| // |
| // Query Matching Publisher |
| // |
| case IPC_SQLQMP_SERVER: |
| { |
| if (debugServer) |
| { |
| className = "qmpdbg"; |
| defaultPortNumber = IPC_SQLQMP_DEBUG_PORTNUMBER; |
| } |
| else |
| { |
| className = "qmp"; |
| defaultPortNumber = IPC_SQLQMP_PORTNUMBER; |
| } |
| overridingDefineName = "_ARK_QMP_PROG_FILE_NAME"; |
| } |
| break; |
| |
| // |
| // Query Matching Monitor |
| // |
| case IPC_SQLQMM_SERVER: |
| { |
| if (debugServer) |
| { |
| className = "qmmdbg"; |
| defaultPortNumber = IPC_SQLQMM_DEBUG_PORTNUMBER; |
| } |
| else |
| { |
| className = "qmm"; |
| defaultPortNumber = IPC_SQLQMM_PORTNUMBER; |
| } |
| overridingDefineName = "_ARK_QMM_PROG_FILE_NAME"; |
| } |
| break; |
| |
| // generic servers passed in as progFileName |
| case IPC_GENERIC_SERVER: |
| case IPC_SQLBDRS_SERVER: |
| if ((allocationMethod_ != IPC_USE_PROCESS) && |
| (! progFileName)) |
| ABORT("Invalid server type specified in IpcServer::IpcServer()"); |
| |
| if (allocationMethod_ == IPC_USE_PROCESS) |
| className = processName; |
| else |
| className = progFileName; |
| defaultPortNumber = IPC_GENERIC_PORTNUMBER; |
| // overridingDefineName = "_ARK_GENERIC_PROG_FILE_NAME"; |
| break; |
| case IPC_SQLSSCP_SERVER: |
| className = "sscp"; |
| lv_usesTransactions = FALSE; |
| lv_maxNowaitRequests = FS_MAX_NOWAIT_DEPTH; |
| overridingDefineName = "=_MX_SSCP_PROCESS_PREFIX"; |
| break; |
| case IPC_SQLSSMP_SERVER: |
| className = "ssmp"; |
| lv_usesTransactions = FALSE; |
| lv_maxNowaitRequests = FS_MAX_NOWAIT_DEPTH; |
| overridingDefineName = "=_MX_SSMP_PROCESS_PREFIX"; |
| break; |
| default: |
| ABORT("Invalid server type specified in IpcServer::IpcServer()"); |
| break; |
| } |
| |
| switch (allocationMethod_) |
| { |
| |
| case IPC_LAUNCH_GUARDIAN_PROCESS: |
| case IPC_SPAWN_OSS_PROCESS: |
| case IPC_USE_PROCESS: |
| { |
| IpcGuardianServer *result2 = |
| new(environment_) IpcGuardianServer( |
| this, |
| diags, |
| diagsHeap, |
| nodeName, |
| className, |
| cpuNum, |
| priority, //IPC_PRIORITY_DONT_CARE, |
| allocationMethod_, |
| (short) allocatedServers_.entries(), |
| lv_usesTransactions, |
| FALSE, |
| waitedCreation, |
| lv_maxNowaitRequests, |
| overridingDefineName, |
| processName, |
| parallelOpens); |
| result = result2; |
| retcode = result2->workOnStartup(IpcInfiniteTimeout,diags,diagsHeap); |
| if (result2->isCreatingProcess() && retcode == 0) |
| return result2; |
| if (retcode) |
| { |
| char buf[20]; |
| str_sprintf(buf, "retcode = %d", retcode); |
| const char *retcodeText = buf; |
| result2->logEspRelease(__FILE__, __LINE__, |
| retcodeText); |
| result2->release(); |
| result = NULL; |
| } |
| } |
| break; |
| |
| case IPC_INETD: |
| { |
| serverConn = createInternetProcess(diags, |
| diagsHeap, |
| nodeName, |
| className, |
| cpuNum, |
| usesTransactions, |
| defaultPortNumber); |
| // make an IpcServer object |
| if (serverConn != NULL) |
| { |
| result = new(environment_) IpcServer(serverConn,this); |
| // $$$$ add errors from creating the connection |
| } |
| } |
| break; |
| |
| case IPC_POSIX_FORK_EXEC: |
| { |
| serverConn = forkProcess(diags, |
| diagsHeap, |
| nodeName, |
| className, |
| cpuNum, |
| usesTransactions); |
| // make an IpcServer object |
| if (serverConn != NULL) |
| result = new(environment_) IpcServer(serverConn,this); |
| } |
| break; |
| |
| |
| case IPC_LAUNCH_NT_PROCESS: |
| { |
| defaultPortNumber = (IpcServerPortNumber)nextPort_; |
| nextPort_++; |
| |
| // make an IpcServer object |
| if (serverConn != NULL) |
| result = new(environment_) IpcServer(serverConn,this); |
| } |
| break; |
| default: |
| ABORT("Invalid server class allocation method"); |
| break; |
| |
| } // switch |
| |
| // remember this server |
| if (result != NULL) |
| allocatedServers_.insert(result); |
| |
| return result; |
| } |
| |
| void IpcServerClass::freeServerProcess(IpcServer *s) |
| { |
| // assume caller already killed the server |
| allocatedServers_.remove(s); |
| NADELETE(s, IpcServer, environment_->getHeap()); |
| } |
| |
| char *IpcServerClass::getProcessName(const char *nodeName, short nodeNameLen, short cpuNum, char *processName) |
| { |
| return getServerProcessName(serverType_, nodeName, nodeNameLen, cpuNum, processName); |
| } |
| // ----------------------------------------------------------------------- |
| // methods for class IpcEnvironment |
| // ----------------------------------------------------------------------- |
| |
| ///////////////////////////////////////////////////////////////////// |
| // Helper method getDefineShort |
| // Accepts a define name as an argument. Be sure to make this 24 bytes |
| // long, blank padded if necessary, not including the null terminator -- |
| // see the manual for DEFINEINFO if there are any questions. |
| // |
| // Return -1 if define not resolved, else it returns an integer parsed |
| // from the class MAP's "file name". |
| // |
| // For example: |
| // add_define =_SQLMX_MAX_OUTGOING_MSG class=MAP file=\$SYSTEM.#128 |
| // |
| ///////////////////////////////////////////////////////////////////// |
| short getDefineShort( char * defineName ) |
| { |
| Lng32 retVal = -1; |
| |
| return (short) retVal; |
| return (short) retVal; |
| } |
| |
| |
| // ----------------------------------------------------------------------- |
| // Methods for IpcEnvironment |
| // ----------------------------------------------------------------------- |
| |
| IpcEnvironment::IpcEnvironment(CollHeap *heap, UInt32 *eventConsumed, |
| NABoolean breakEnabled, IpcServerType serverType, NABoolean useGuaIpcAtRuntime |
| , NABoolean persistentProcess |
| ) : |
| breakEnabled_(breakEnabled), |
| heap_(heap), |
| eventConsumedAddr_(eventConsumed), |
| completedMessages_(heap), |
| envvars_(NULL), |
| envvarsLen_(0), |
| heapFull_(FALSE), |
| safetyBuffer_(NULL), |
| stopAfter_(0), |
| inactiveTimeout_(0), |
| inactiveTimestamp_(0), |
| espPrivStackSize_(64 * 1024), // Neo default priv stack size |
| espFreeMemTimeout_(0), // initial value, will be overwritten by fixup message. |
| useGuaIpcAtRuntime_(useGuaIpcAtRuntime), |
| serverType_(serverType), |
| guaMaxMsgIOSize_(IOSIZEMAX), |
| maxCCNowaitDepthLow_(InitialNowaitRequestsPerEsp), |
| maxCCNowaitDepthHigh_(HighLoadNowaitRequestsPerEsp), |
| maxPerProcessMQCs_(XMAX_SETTABLE_SENDLIMIT_H), // Seaquest "H-Series"/Seaquest limit |
| retriedMessageCount_(0), |
| cliGlobals_(NULL), |
| numOpensInProgress_(0) |
| , persistentProcess_(persistentProcess) |
| , corruptDownloadMsg_(false) |
| , logReleaseEsp_(false) |
| , logEspIdleTimeout_(false) |
| , logEspGotCloseMsg_(false) |
| , logTimeIpcConnectionState_(false) |
| , seamonsterEnabled_(false) |
| { |
| if (heap_ == NULL) |
| heap_ = new DefaultIpcHeap; // here it's ok to use global operator new |
| |
| allConnections_ = new(heap_) IpcAllConnections(heap_, |
| (serverType == IPC_SQLESP_SERVER |
| || serverType == IPC_SQLSSCP_SERVER |
| || serverType == IPC_SQLSSMP_SERVER)); |
| controlConnection_ = NULL; |
| for (Lng32 i = 0; i < 4; i++) |
| currentExRtFragTable_[i] = NULL; // for integrity checking |
| |
| idleTimestamp_ = NA_JulianTimestamp(); |
| #ifdef USE_SB_NEW_RI |
| const char *maxenvvar = getenv("IPC_IOSIZEMAX"); |
| if (maxenvvar) |
| guaMaxMsgIOSize_ = MAXOF(3000, MINOF(atoi(maxenvvar), 1048576)); |
| #endif //USE_SB_NEW_RI |
| |
| maxPollingInterval_ = 300; |
| const char *envvar; |
| envvar = getenv("IPC_MAX_POLLING_INTERVAL"); |
| if (envvar) |
| maxPollingInterval_ = atoi(envvar); |
| |
| persistentOpenAssigned_ = 0; |
| char *perOpensEnvvar = getenv("ESP_PERSISTENT_OPENS"); |
| Int32 perOpensEnvvarVal; |
| if (perOpensEnvvar != NULL ) |
| perOpensEnvvarVal = atoi(perOpensEnvvar); |
| else |
| perOpensEnvvarVal = 0; |
| if (perOpensEnvvarVal < 1) |
| { |
| persistentOpens_ = FALSE; |
| persistentOpenEntries_ = 64; |
| } |
| else |
| { |
| persistentOpens_ = TRUE; |
| if (perOpensEnvvarVal < 3) |
| persistentOpenEntries_ = 64; |
| else if (perOpensEnvvarVal < 256) |
| persistentOpenEntries_ = perOpensEnvvarVal; |
| else |
| persistentOpenEntries_ = 256; |
| } |
| persistentOpenArray_ = (PersistentOpenEntry (*) [1])heap_->allocateMemory(sizeof(PersistentOpenEntry) * persistentOpenEntries_); |
| for (Int32 i = 0; i < persistentOpenEntries_; i++) |
| { |
| (*persistentOpenArray_)[i].persistentOpenExists_ = FALSE; |
| } |
| |
| char *masterFastCompletion = getenv("MASTER_FAST_COMPLETION"); |
| if (masterFastCompletion != NULL && *masterFastCompletion == '0') |
| masterFastCompletion_ = FALSE; |
| else |
| masterFastCompletion_ = TRUE; |
| char *nowaitDepthEnvvar = getenv("ESP_NOWAIT_DEPTH"); |
| if (nowaitDepthEnvvar != NULL) |
| maxCCNowaitDepthLow_ = maxCCNowaitDepthHigh_ = atoi(nowaitDepthEnvvar); |
| XCONTROLMESSAGESYSTEM(XCTLMSGSYS_SETSENDLIMIT, XMAX_SETTABLE_SENDLIMIT_H); |
| |
| if (getenv("ESP_CORRUPT_MESSAGE_TEST")) |
| corruptDownloadMsg_ = true; |
| |
| const char *lre = getenv("LOG_ESP_RELEASE"); |
| if (lre && *lre == '1') |
| logReleaseEsp_ = true; |
| |
| const char *liet = getenv("LOG_IDLE_ESP_TIMEOUT"); |
| if (liet && *liet == '1') |
| logEspIdleTimeout_ = true; |
| |
| const char *legcm = getenv("LOG_ESP_GOT_CLOSE_MSG"); |
| if (legcm && *legcm == '1') |
| logEspGotCloseMsg_ = true; |
| |
| const char *etis = getenv("ESP_TIME_IPCCONNECTION_STATES"); |
| if (etis && *etis == '1') |
| logTimeIpcConnectionState_ = true; |
| |
| const char *smEnv = getenv("SQ_SEAMONSTER"); |
| if (smEnv && *smEnv == '1') |
| seamonsterEnabled_ = true; |
| |
| char *espAssignByLevel = getenv("ESP_ASSIGN_BY_LEVEL"); |
| if (espAssignByLevel == NULL) |
| espAssignByLevel_ = '0'; |
| else |
| { |
| switch (*espAssignByLevel) |
| { |
| case '1': |
| espAssignByLevel_ = '1'; |
| break; |
| default: |
| espAssignByLevel_ = '0'; |
| } |
| } |
| |
| memset(myProcessName_, 0, sizeof(myProcessName_)); |
| |
| closeTraceArray_ = (CloseTraceEntry (*) [closeTraceEntries])heap_->allocateMemory(sizeof(CloseTraceEntry) * closeTraceEntries); |
| for (Int32 i = 0; i < closeTraceEntries; i++) |
| { |
| (*closeTraceArray_)[i].count_ = 0; |
| (*closeTraceArray_)[i].line_ = 0; |
| (*closeTraceArray_)[i].clientFileNumber_ = 0; |
| (*closeTraceArray_)[i].cpu_ = 0; |
| (*closeTraceArray_)[i].pin_ = 0; |
| (*closeTraceArray_)[i].seqNum_ = -1; |
| } |
| closeTraceIndex_ = closeTraceEntries - 1; |
| bawaitioxTraceArray_ = (BawaitioxTraceEntry (*) [bawaitioxTraceEntries])heap_->allocateMemory(sizeof(BawaitioxTraceEntry) * bawaitioxTraceEntries); |
| for (Int32 i = 0; i < bawaitioxTraceEntries; i++) |
| { |
| (*bawaitioxTraceArray_)[i].count_ = 0; |
| (*bawaitioxTraceArray_)[i].recursionCount_ = 0; |
| (*bawaitioxTraceArray_)[i].firstConnectionIndex_ = 0; |
| (*bawaitioxTraceArray_)[i].firstConnection_ = NULL; |
| char *ipcAwaitiox = (char *)&((*bawaitioxTraceArray_)[i].ipcAwaitiox_); |
| memset((char *)&((*bawaitioxTraceArray_)[i].ipcAwaitiox_), 0, sizeof(IpcAwaitiox)); |
| } |
| bawaitioxTraceIndex_ = bawaitioxTraceEntries - 1; |
| |
| // Ipc data message trace area initialization |
| maxIpcMsgTraceIndex_ = NUM_IPC_MSG_TRACE_ENTRIES; |
| const char *ipcMsgEnv = getenv("NUM_EXE_IPC_MSG_TRACE_ENTRIES"); |
| if (ipcMsgEnv != NULL) |
| { |
| Int32 nums = atoi(ipcMsgEnv); |
| if (nums >= 0 && nums < MAX_IPC_MSG_TRACE_ENTRIES) |
| maxIpcMsgTraceIndex_ = nums; //ignore any other value |
| } |
| ipcMsgTraceArea_ = new (heap_) IpcMsgTrace[maxIpcMsgTraceIndex_]; |
| memset(ipcMsgTraceArea_, 0, sizeof(IpcMsgTrace) * maxIpcMsgTraceIndex_); |
| lastIpcMsgTraceIndex_ = maxIpcMsgTraceIndex_; |
| ipcMsgTraceRef_ = NULL; |
| } |
| |
| void IpcEnvironment::closeTrace(unsigned short line, |
| short clientFileNumber, |
| Int32 cpu, |
| Int32 pin, |
| SB_Int64_Type seqNum) |
| { |
| unsigned short i = closeTraceIndex_ == closeTraceEntries - 1 ? 0 : closeTraceIndex_ + 1; |
| (*closeTraceArray_)[i].count_ = (*closeTraceArray_)[closeTraceIndex_].count_ + 1; |
| (*closeTraceArray_)[i].line_ = line; |
| (*closeTraceArray_)[i].clientFileNumber_ = clientFileNumber; |
| (*closeTraceArray_)[i].cpu_ = cpu; |
| (*closeTraceArray_)[i].pin_ = pin; |
| (*closeTraceArray_)[i].seqNum_ = seqNum; |
| closeTraceIndex_ = i; |
| } |
| void IpcEnvironment::bawaitioxTrace(IpcSetOfConnections *ipcSetOfConnections, |
| ULng32 recursionCount, |
| CollIndex firstConnectionIndex, |
| IpcConnection *firstConnection, |
| IpcAwaitiox *ipcAwaitiox) |
| { |
| unsigned short i = bawaitioxTraceIndex_ == bawaitioxTraceEntries - 1 ? 0 : bawaitioxTraceIndex_ + 1; |
| (*bawaitioxTraceArray_)[i].count_ = (*bawaitioxTraceArray_)[bawaitioxTraceIndex_].count_ + 1; |
| (*bawaitioxTraceArray_)[i].recursionCount_ = recursionCount; |
| (*bawaitioxTraceArray_)[i].ipcSetOfConnections_ = ipcSetOfConnections; |
| (*bawaitioxTraceArray_)[i].firstConnectionIndex_ = firstConnectionIndex; |
| (*bawaitioxTraceArray_)[i].firstConnection_ = firstConnection; |
| bawaitioxTraceIndex_ = i; |
| } |
| |
| IpcEnvironment::~IpcEnvironment() |
| { |
| if (ipcMsgTraceRef_) |
| { |
| ExeTraceInfo *ti = cliGlobals_->getExeTraceInfo(); |
| if (ti) |
| { |
| ti->removeTrace(ipcMsgTraceRef_); |
| } |
| } |
| NADELETEBASIC(ipcMsgTraceArea_, heap_); |
| |
| delete allConnections_; |
| delete controlConnection_; |
| releaseSafetyBuffer(); |
| } |
| |
| void IpcEnvironment::stopIpcEnvironment() |
| { |
| |
| NAExit(0); |
| } |
| |
| void IpcEnvironment::setIdleTimestamp() |
| { |
| idleTimestamp_ = NA_JulianTimestamp(); |
| } |
| |
| void IpcEnvironment::setInactiveTimestamp() |
| { |
| inactiveTimestamp_ = NA_JulianTimestamp(); |
| } |
| |
| void IpcEnvironment::deleteCompletedMessages() |
| { |
| while (completedMessages_.entries()) |
| { |
| IpcMessageStreamBase * mm = completedMessages_[0]; |
| completedMessages_.remove(mm); |
| delete mm; |
| } |
| |
| // Clean up any SeaMonster send buffers that had been queued but are |
| // now completed |
| void *buf = NULL; |
| while (ExSM_RemoveCompletedSendBuffer(buf)) |
| { |
| IpcMessageBuffer *msgBuf = (IpcMessageBuffer *) buf; |
| assert(msgBuf); |
| msgBuf->decrRefCount(); |
| } |
| } |
| |
| void IpcEnvironment::setControlConnection(IpcControlConnection *cc) |
| { |
| if (controlConnection_ == NULL) |
| controlConnection_ = cc; |
| else |
| { |
| controlConnection_->getConnection()->dumpAndStopOtherEnd(true, false); |
| if (cc->getConnection()->getOtherEnd() == |
| controlConnection_->getConnection()->getOtherEnd()) |
| ; // Already have a core-file. |
| else |
| cc->getConnection()->dumpAndStopOtherEnd(true, false); |
| assert(controlConnection_ == NULL); |
| } |
| } |
| |
| IpcProcessId IpcEnvironment::getMyOwnProcessId(IpcNetworkDomain dom) |
| { |
| if (dom == IPC_DOM_INVALID) |
| { |
| // if not specified, the default domains are the "native" domains |
| dom = IPC_DOM_INTERNET; |
| } |
| |
| if (dom == IPC_DOM_INTERNET) |
| { |
| SockIPAddress sockIpAddr; |
| SockPortNumber portNo; |
| |
| // get the port number (listner port for server, some number for master) |
| if (controlConnection_) |
| { |
| // can't have an internet proc id when reading from $RECEIVE |
| assert(controlConnection_->castToSockControlConnection()); |
| |
| // get the listner port number from the control connection |
| portNo = controlConnection_->castToSockControlConnection()-> |
| getListnerPortNum(); |
| } |
| else |
| { |
| portNo = 0; |
| |
| } |
| |
| // get the IP address of the local node |
| sockIpAddr.set(); |
| |
| // make a process id from the IP address and the port number |
| return IpcProcessId(sockIpAddr,portNo); |
| } |
| else if (dom == IPC_DOM_GUA_PHANDLE) |
| { |
| // for Guardian, just get the phandle from the operating system |
| return IpcProcessId(MyGuaProcessHandle()); |
| } |
| else |
| { |
| ABORT("Invalid domain in IpcEnvironment::getMyOwnProcessId()"); |
| } |
| // make the compiler happy |
| return IpcProcessId(); |
| } |
| |
| IpcPriority IpcEnvironment::getMyProcessPriority() |
| { |
| IpcPriority priority; |
| priority = -1; |
| |
| return priority; |
| } |
| |
| void IpcEnvironment::setEnvVars(char ** envvars) |
| { |
| envvars_ = envvars; |
| } |
| |
| void IpcEnvironment::setEnvVarsLen(Lng32 envvarsLen) |
| { |
| envvarsLen_ = envvarsLen; |
| } |
| |
| void IpcEnvironment::releaseSafetyBuffer() |
| { |
| // Don't mmap and munmap a 512K block on Linux for every download |
| // and fixup. 256K is not a lot to just leave allocated. |
| } |
| |
| void IpcEnvironment::setHeapFullFlag(NABoolean b) |
| { |
| heapFull_ = b; |
| |
| if (heapFull_) |
| { |
| releaseSafetyBuffer(); |
| } |
| else |
| { |
| // Allocate a safety buffer if we don't have one already. The size |
| // chosen here (256K) is arbitrary. It is expected to be "enough" |
| // space on the heap to complete some I/Os after the heap became |
| // full. The main scanario we know of when the heap becomes full |
| // is during broadcast of large ESP fragments. |
| if (safetyBuffer_ == NULL) |
| safetyBuffer_ = (char *) heap_->allocateMemory(256 * 1024); |
| } |
| } |
| |
| void IpcEnvironment::notifyNoOpens() |
| { |
| if (serverType_ != IPC_SQLSSCP_SERVER && |
| serverType_ != IPC_SQLSSMP_SERVER) |
| stopIpcEnvironment(); |
| } |
| |
| void IpcEnvironment::logRetriedMessages() |
| { |
| if (retriedMessageCount_ > 0) |
| { |
| } |
| } |
| |
| #ifdef IPC_INTEGRITY_CHECKING |
| |
| // methods that perform integrity checking on Ipc-related data structures |
| |
| void IpcEnvironment::setCurrentExRtFragTable(ExRtFragTable *ft) |
| { |
| NABoolean alreadyIn = FALSE; |
| Lng32 i; |
| Lng32 firstOpen = -1; |
| |
| for (i = 0; i < 4; i++) |
| { |
| if (ft == currentExRtFragTable_[i]) |
| alreadyIn = TRUE; |
| else if ((currentExRtFragTable_[i] == 0) && (firstOpen < 0)) |
| firstOpen = i; |
| } |
| |
| if ((!alreadyIn) && (firstOpen >= 0)) |
| { |
| cerr << "Adding ExRtFragTable integrity check pointer in IpcEnvironment to " |
| << (void *)ft << "." << endl; |
| // to give debugger a chance to come up |
| /*for (long i = 0; i < 10000000; i++) |
| { |
| long j = i + 1; |
| while (j > 1) |
| { |
| if (j & 1) |
| j = 3 * j + 1; |
| else |
| j = j / 2; |
| } |
| } */ |
| currentExRtFragTable_[firstOpen] = ft; |
| } |
| } |
| |
| void IpcEnvironment::removeCurrentExRtFragTable(ExRtFragTable *ft) |
| { |
| Lng32 i; |
| |
| for (i = 0; i < 4; i++) |
| { |
| if (ft == currentExRtFragTable_[i]) |
| { |
| currentExRtFragTable_[i] = 0; |
| cerr << "Removing ExRtFragTable integrity check pointer in IpcEnvironment to " |
| << (void *)ft << "." << endl; |
| } |
| } |
| } |
| |
| void IpcEnvironment::setExRtFragTableIntegrityCheckPtr |
| (void (*fnptr) (ExRtFragTable *ft)) |
| { |
| integrityCheckExRtFragTablePtr_ = fnptr; |
| } |
| |
| |
| ExRtFragTable * IpcEnvironment::getCurrentExRtFragTable(Lng32 i) |
| { |
| if ((i >= 0) && (i < 4)) |
| return currentExRtFragTable_[i]; |
| |
| return 0; |
| } |
| |
| void IpcEnvironment::checkIntegrity(void) |
| { |
| // IpcEnvironment, being the top of the network of IPC-related data structures, |
| // is the place where we begin integrity checking |
| checkLocalIntegrity(); |
| } |
| |
| void IpcEnvironment::checkLocalIntegrity(void) |
| { |
| for (Lng32 i = 0; i < 4; i++) |
| { |
| if ((currentExRtFragTable_[i]) && (integrityCheckExRtFragTablePtr_)) |
| { |
| // this file doesn't know about the class ExRtFragTable, so we call |
| // a C-style function instead in file /executor/ex_frag_rt.cpp... |
| //currentExRtFragTable_[i]->checkLocalIntegrity(); |
| (*integrityCheckExRtFragTablePtr_)(currentExRtFragTable_[i]); |
| } |
| } |
| |
| allConnections_->checkLocalIntegrity(); // check IpcAllConnections |
| } |
| |
| #endif |
| |
| short IpcEnvironment::getNewPersistentOpenIndex() |
| { |
| if (persistentOpenAssigned_ < persistentOpenEntries_) |
| { |
| for (unsigned short i = 0; i < persistentOpenEntries_; i++) |
| { |
| if ((*persistentOpenArray_)[i].persistentOpenExists_ == FALSE) |
| return i; |
| } |
| } |
| return -1; |
| } |
| |
| void IpcEnvironment::setPersistentOpenInfo(short index, GuaProcessHandle *otherEnd, short fileNum) |
| { |
| memcpy((void *)&(*persistentOpenArray_)[index].persistentOpenPhandle_, (void *)otherEnd, sizeof(GuaProcessHandle)); |
| (*persistentOpenArray_)[index].persistentOpenFileNum_ = fileNum; |
| (*persistentOpenArray_)[index].persistentOpenExists_ = TRUE; |
| persistentOpenAssigned_ += 1; |
| } |
| |
| short IpcEnvironment::getPersistentOpenInfo(GuaProcessHandle *otherEnd, short *index) |
| { |
| Int32 guaRetCode; |
| for (short i = 0; i < persistentOpenEntries_; i++) |
| { |
| if ((*persistentOpenArray_)[i].persistentOpenExists_) |
| { |
| guaRetCode = XPROCESSHANDLE_COMPARE_((SB_Phandle_Type *)otherEnd, |
| (SB_Phandle_Type *)&(*persistentOpenArray_)[i].persistentOpenPhandle_); |
| if (guaRetCode == 2) // Phandles are the same |
| { |
| *index = i; |
| return (*persistentOpenArray_)[i].persistentOpenFileNum_; |
| } |
| } |
| } |
| // Matching phandle was not found |
| *index = -1; |
| return -1; |
| } |
| |
| void IpcEnvironment::resetPersistentOpen(short index) |
| { |
| (*persistentOpenArray_)[index].persistentOpenExists_ = FALSE; |
| persistentOpenAssigned_ -= 1; |
| } |
| |
| const char *IpcMsgTraceDesc = |
| "SQL Ipc Message exchanged between consumer and producer processes.\n Can use env NUM_EXE_IPC_MSG_TRACE_ENTRIES to config more or less entries"; |
| |
| void IpcEnvironment::registTraceInfo(ExeTraceInfo *ti) |
| { |
| if (cliGlobals_ && !ipcMsgTraceRef_) |
| { |
| // register IPC message trace and IPC connection trace |
| if (ti) |
| { |
| Int32 lineWidth = 66; |
| void *regdTrace; |
| Int32 ret = ti->addTrace("IpcMessages", this, maxIpcMsgTraceIndex_, 6, |
| this, getALine, |
| &lastIpcMsgTraceIndex_, |
| lineWidth, IpcMsgTraceDesc, ®dTrace); |
| if (ret == 0) |
| { |
| // trace info added successfully, now add entry fields |
| ti->addTraceField(regdTrace, "Connection ", 0, |
| ExeTrace::TR_POINTER32); |
| ti->addTraceField(regdTrace, "BufferAddr ", 1, ExeTrace::TR_POINTER32); |
| ti->addTraceField(regdTrace, "Length ", 2, ExeTrace::TR_INT32); |
| ti->addTraceField(regdTrace, "Type", 3, ExeTrace::TR_INT32); |
| ti->addTraceField(regdTrace, "Last ", 4, ExeTrace::TR_CHAR); |
| ti->addTraceField(regdTrace, "SeqNum", 5, ExeTrace::TR_INT32); |
| ipcMsgTraceRef_ = regdTrace; |
| } |
| |
| // IPC connection trace |
| allConnections_->registTraceInfo(this, ti); |
| } |
| } |
| } |
| |
| Int32 IpcEnvironment::printAnIpcEntry(Int32 lineno, char *buf) |
| { |
| if (lineno >= maxIpcMsgTraceIndex_) |
| return 0; |
| Int32 rv; |
| rv = sprintf(buf, "%.4d %8p %8p %8d %.4s %3d %10d\n", |
| lineno, |
| ipcMsgTraceArea_[lineno].conn_, |
| ipcMsgTraceArea_[lineno].bufAddr_, |
| ipcMsgTraceArea_[lineno].length_, |
| IpcMsgOperName[ipcMsgTraceArea_[lineno].sendOrReceive_], |
| ipcMsgTraceArea_[lineno].isLast_, |
| ipcMsgTraceArea_[lineno].seqNum_); |
| return rv; |
| } |
| |
| char const *IpcEnvironment::myProcessName() |
| { |
| if (myProcessName_[0] == '\0') |
| if (getCliGlobals()) |
| strcpy(myProcessName_, getCliGlobals()->myProcessNameString()); |
| else { |
| NAProcessHandle myPhandle; |
| myPhandle.getmine(); |
| myPhandle.decompose(); |
| strcpy(myProcessName_, myPhandle.getPhandleString()); |
| return myProcessName_; |
| } |
| return myProcessName_; |
| |
| } |
| |
| void IpcAllocateDiagsArea(ComDiagsArea *&diags, CollHeap *diagsHeap) |
| { |
| if ( NOT diags) |
| { |
| // diags does not point to an allocated diags area yet, allocate one |
| diags = ComDiagsArea::allocate(diagsHeap); |
| |
| // catch the case where we can't even allocate the diags area |
| if (NOT diags) |
| ABORT("unable to allocate diagnostics area for IPC error"); |
| } |
| } |
| |
| // ----------------------------------------------------------------------- |
| // Global operator new with the placement form where an IPC environment |
| // is specified |
| // ----------------------------------------------------------------------- |
| void * operator new(size_t size, IpcEnvironment *env) |
| { |
| return env->getHeap()->allocateMemory(size); |
| } |
| |
| void * operator new[](size_t size, IpcEnvironment *env) |
| { |
| return env->getHeap()->allocateMemory(size); |
| } |
| |
| char *getServerProcessName(IpcServerType serverType, const char *nodeName, short nodeNameLen, |
| short cpuNum, char *processName, short *envType) |
| { |
| const char *overridingDefineName = NULL; |
| char *processPrefixFromEnvvar = NULL; |
| const char *processPrefix = NULL; |
| char serverNodeName[MAX_SEGMENT_NAME_LEN+1]; |
| short len; |
| |
| switch (serverType) |
| { |
| case IPC_SQLSSCP_SERVER: |
| overridingDefineName = "=_MX_SSCP_PROCESS_PREFIX"; |
| break; |
| case IPC_SQLSSMP_SERVER: |
| overridingDefineName = "=_MX_SSMP_PROCESS_PREFIX"; |
| break; |
| case IPC_SQLQMS_SERVER: |
| overridingDefineName = "=_MX_QMS_PROCESS_PREFIX"; |
| break; |
| case IPC_SQLQMP_SERVER: |
| overridingDefineName = "=_MX_QMP_PROCESS_PREFIX"; |
| break; |
| case IPC_SQLQMM_SERVER: |
| overridingDefineName = "=_MX_QMM_PROCESS_PREFIX"; |
| break; |
| default: |
| return NULL; |
| } |
| |
| if (overridingDefineName) |
| { |
| processPrefixFromEnvvar = |
| getenv((overridingDefineName[0] == '=') |
| ? &overridingDefineName[1] |
| : overridingDefineName); |
| } |
| |
| if (processPrefixFromEnvvar != NULL) |
| { |
| if (processPrefixFromEnvvar[0] != '$' || str_len(processPrefixFromEnvvar) > 4) |
| return NULL; |
| processPrefix = processPrefixFromEnvvar; |
| } |
| if (processPrefix == NULL) |
| { |
| switch (serverType) |
| { |
| case IPC_SQLSSCP_SERVER: |
| processPrefix = SSCP_PROCESS_PREFIX; |
| break; |
| case IPC_SQLSSMP_SERVER: |
| processPrefix = SSMP_PROCESS_PREFIX; |
| break; |
| case IPC_SQLQMS_SERVER: |
| processPrefix = QMS_PROCESS_PREFIX; |
| break; |
| case IPC_SQLQMP_SERVER: |
| processPrefix = QMP_PROCESS_PREFIX; |
| break; |
| case IPC_SQLQMM_SERVER: |
| processPrefix = QMM_PROCESS_PREFIX; |
| break; |
| default: |
| return NULL; |
| } |
| } |
| str_sprintf(processName, "%s%d", processPrefix, cpuNum); |
| return processName; |
| } |
| |
| void IpcAwaitiox::DoAwaitiox(NABoolean ignoreLrec) |
| { |
| if (!completed_) |
| { |
| fileNum_ = ignoreLrec ? -2 : -1; // Reminder: change to -1 |
| bufAddr_ = 0; |
| count_ = retCode_ = lastError_ = 0; |
| tag_ = 0; |
| if (ignoreLrec) |
| condCode_ = BAWAITIOXTS(&fileNum_, &bufAddr_, &count_, &tag_, 0); |
| else |
| condCode_ = BAWAITIOX(&fileNum_, &bufAddr_, &count_, &tag_, 0); |
| if (fileNum_ == -2) |
| fileNum_ = -1; |
| completed_ = TRUE; |
| if (condCode_ != 0) // not successful completion |
| { |
| retCode_ = BFILE_GETINFO_(fileNum_, &lastError_); |
| if (retCode_ == 0 && lastError_ == 40) |
| { |
| fileNum_ = -1; |
| completed_ = FALSE; |
| } |
| } |
| retryCount_ = 0; |
| } |
| else |
| { |
| retryCount_ += 1; |
| bool loop = false; |
| char *envvarPtr; |
| if (retryCount_ >= 10) |
| { |
| envvarPtr = getenv("IPC_LOOP_ON_UNEXPECTED_COMPLETION"); |
| if (envvarPtr && atoi(envvarPtr) == 1) |
| loop = true; |
| } |
| while (retryCount_ >= 10 && loop) |
| { |
| sleep(10); |
| } |
| assert(retryCount_ < 10); |
| } |
| } |
| |
| Int32 IpcAwaitiox::ActOnAwaitiox(void **bufAddr, Int32 *count, SB_Tag_Type *tag) |
| { |
| *bufAddr = bufAddr_; |
| *count = count_; |
| *tag = tag_; |
| completed_ = FALSE; |
| return condCode_; |
| } |
| |
| // These operator delete functions will be called if initialization throws an |
| // exception. They remove a compiler warning with the .NET 2003 compiler. |
| void IpcMessageBuffer::operator delete(void *p, |
| IpcMessageObjSize bufferLength, |
| CollHeap *heap, NABoolean bIgnore) |
| { |
| if (heap) |
| heap->deallocateMemory(p); |
| } |
| |
| void IpcAllConnections::printConnTraceLine(char *buffer, int *rsp_len, IpcConnection *conn) |
| { |
| Int32 lineLen; |
| Int32 cpu, node, pin; |
| SB_Int64_Type seqNum = -1; |
| IpcNetworkDomain domain; |
| cpu = pin; |
| domain = conn->getOtherEnd().getDomain(); |
| |
| if (domain == IPC_DOM_GUA_PHANDLE) |
| { |
| GuaProcessHandle *otherEnd = (GuaProcessHandle *)&(conn->getOtherEnd().getPhandle().phandle_); |
| if (otherEnd) |
| otherEnd->decompose(cpu, pin, node |
| , seqNum |
| ); |
| } |
| |
| if (!memcmp(conn->getEyeCatcher(), "STBL", 4)) |
| { |
| } |
| else if (conn->castToSMConnection()) |
| { |
| cpu = ((SMConnection *)conn)->getSMTarget().node; |
| pin = ((SMConnection *)conn)->getSMTarget().pid; |
| sm_id_t smQueryId = ((SMConnection *)conn)->getSMTarget().id; |
| Int32 smTag = ((SMConnection *)conn)->getSMTarget().tag; |
| *rsp_len = sprintf(buffer + *rsp_len, |
| "%.4s %s %.3d,%.8d,%.8" PRId64 |
| ",%.8" PRId64 ",%.8d\n", |
| conn->getEyeCatcher(), |
| IpcConnStateName[conn->getState()], |
| cpu, pin, seqNum, smQueryId, smTag); |
| } |
| else |
| { |
| *rsp_len = sprintf(buffer, "%.4s %s %.3d,%.8d,%.8" PRId64 "\n", |
| conn->getEyeCatcher(), |
| IpcConnStateName[conn->getState()], |
| cpu, pin, seqNum); |
| |
| } |
| } |
| |
| void IpcAllConnections::infoAllConnections(char *buffer, int max_len, int *rsp_len) |
| { |
| CollIndex i, usedLength = getUsedLength(); |
| IpcConnection *conn; |
| enum { MAX_LINE = 250 }; // Currently printConnTraceLine sprintf's about 80 characters |
| char local_buffer[MAX_LINE]; |
| int line_len = 0; |
| for (i = 0; i < usedLength; i++) |
| { |
| if (getUsage(i) != UNUSED_COLL_ENTRY) |
| { |
| conn = usedEntry(i); |
| local_buffer[0] = '\0'; |
| line_len = 0; |
| printConnTraceLine(local_buffer, &line_len, conn); |
| if ( (*rsp_len + line_len + 1) <= max_len) |
| { |
| strncpy( (buffer + *rsp_len), local_buffer, (size_t)line_len ); |
| *rsp_len += line_len; |
| } |
| else |
| { |
| sprintf( (buffer + *rsp_len - 16 ),"\nOUT OF BUFFER!\n"); |
| return; |
| } |
| } |
| } |
| *(buffer + *rsp_len) = '\n'; |
| *rsp_len += 1; |
| } |
| |
| void IpcSetOfConnections::infoPendingConnections(char *buffer, int max_len, int *rsp_len) |
| { |
| IpcAllConnections *allConns; |
| IpcConnection *conn; |
| enum { MAX_LINE = 250 }; // Currently printConnTraceLine sprintf's about 80 characters |
| char local_buffer[MAX_LINE]; |
| int line_len = 0; |
| CollIndex i, firstConnIndex = 0; |
| if (!setToNext(firstConnIndex)) |
| return; |
| else |
| { |
| conn = element(firstConnIndex); |
| allConns = conn->getEnvironment()->getAllConnections(); |
| local_buffer[0] = '\0'; |
| line_len = 0; |
| allConns->printConnTraceLine(local_buffer, &line_len, conn); |
| if ( (*rsp_len + line_len + 1) <= max_len ) |
| { |
| strncpy( (buffer + *rsp_len), local_buffer, line_len); |
| *rsp_len += line_len; |
| } |
| else |
| { |
| sprintf( (buffer + *rsp_len - 16), "\nOUT OF BUFFER!\n"); |
| return; |
| } |
| } |
| for (i = firstConnIndex + 1; setToNext(i); i++) |
| { |
| conn = element(i); |
| local_buffer[0] = '\0'; |
| line_len = 0; |
| allConns->printConnTraceLine(local_buffer, &line_len, conn); |
| if ( (*rsp_len + line_len + 1) <= max_len ) |
| { |
| strncpy( (buffer + *rsp_len), local_buffer, line_len); |
| *rsp_len += line_len; |
| } |
| else |
| { |
| sprintf( (buffer + *rsp_len - 16), "\nOUT OF BUFFER!\n"); |
| return; |
| } |
| } |
| *(buffer + *rsp_len) = '\n'; |
| *rsp_len += 1; |
| } |
| |
| void operator delete(void *p, IpcEnvironment *env) |
| { |
| env->getHeap()->deallocateMemory(p); |
| } |