| // ********************************************************************** |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // ********************************************************************** |
| |
| #include "QmmQmm.h" |
| #include "QueryRewriteServer.h" |
| #include "Ipc.h" |
| #include "ComCextdecs.h" |
| #include "ComRtUtils.h" |
| #include "PortProcessCalls.h" |
| |
| #ifdef NA_NSK |
| extern "C" |
| { |
| #include "cextdecs.h(NODENUMBER_TO_NODENAME_, \ |
| REMOTEPROCESSORSTATUS, \ |
| PROCESSHANDLE_DECOMPOSE_, \ |
| PROCESSSTRING_SCAN_, \ |
| PROCESS_SPAWN_, \ |
| FILENAME_RESOLVE_, \ |
| FILENAME_TO_PATHNAME_)" |
| } |
| |
| #else |
| #include "cextdecs/cextdecs.h" |
| #endif |
| |
| using namespace QR; |
| |
| IpcServerClass* QmpStub::qmpServerClass_ = NULL; |
| IpcServerClass* QmsStub::qmsServerClass_ = NULL; |
| |
| void QmmGuaReceiveControlConnection::actOnSystemMessage |
| (short messageNum, |
| IpcMessageBufferPtr sysMsg, |
| IpcMessageObjSize sysMsgLen, |
| short clientFileNumber, |
| const GuaProcessHandle& clientPhandle, |
| GuaConnectionToClient* connection) |
| { |
| // @ZXros -- these declns go with ifdef'ed code below |
| #ifdef NA_NSK |
| short result; |
| short cpu; |
| short pin; |
| Int32 segmentNumber; |
| #endif |
| switch (messageNum) |
| { |
| case ZSYS_VAL_SMSG_OPEN: |
| { |
| NAString qmmName("qmm"); |
| QmmMessageStream* msgStream = |
| new QmmMessageStream(const_cast<IpcEnvironment*> |
| (qmm_->getEnvironment()), |
| qmmName, qmm_); |
| msgStream->addRecipient(connection); |
| //connection->receive(msgStream); |
| msgStream->receive(FALSE); |
| } |
| break; |
| |
| case ZSYS_VAL_SMSG_CLOSE: |
| case ZSYS_VAL_SMSG_PROCDEATH: |
| qmm_->handleClientExit((short *)&(clientPhandle.phandle_), messageNum); |
| if (connection) |
| connection->setFatalError(NULL); //@ZX |
| break; |
| |
| case ZSYS_VAL_SMSG_CPUDOWN: |
| { |
| zsys_ddl_smsg_cpudown_def* msg = (zsys_ddl_smsg_cpudown_def*)sysMsg; |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "Cpu %d on qmm's segment has gone down.", msg->z_cpunumber); |
| qmm_->getQmsStub(1, msg->z_cpunumber) |
| ->disable(QmsStub::CPU_NOT_REACHABLE); |
| } |
| break; |
| |
| case ZSYS_VAL_SMSG_CPUUP: |
| { |
| zsys_ddl_smsg_cpuup_def* msg = (zsys_ddl_smsg_cpuup_def*)sysMsg; |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "Cpu %d on qmm's segment has come back up.", msg->z_cpunumber); |
| //qmm_->getQmsStub(1, msg->z_cpunumber)->start(); |
| qmm_->getQmsStub(1, msg->z_cpunumber)->scheduleRestart(); |
| } |
| break; |
| |
| case ZSYS_VAL_SMSG_REMOTECPUDOWN: |
| { |
| zsys_ddl_smsg_remotecpudown_def* msg = |
| (zsys_ddl_smsg_remotecpudown_def*)sysMsg; |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "Cpu %d on segment %d has gone down.", msg->z_cpunumber, msg->z_nodenumber); |
| |
| qmm_->getQmsStub(msg->z_nodenumber, msg->z_cpunumber) |
| ->disable(QmsStub::CPU_NOT_REACHABLE); |
| } |
| break; |
| |
| case ZSYS_VAL_SMSG_REMOTECPUUP: |
| { |
| zsys_ddl_smsg_remotecpuup_def* msg = |
| (zsys_ddl_smsg_remotecpuup_def*)sysMsg; |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "Cpu %d on segment %d has come back up.", msg->z_cpunumber, msg->z_nodenumber); |
| //qmm_->getQmsStub(msg->z_nodenumber, msg->z_cpunumber)->start(); |
| qmm_->getQmsStub(msg->z_nodenumber, msg->z_cpunumber)->scheduleRestart(); |
| } |
| break; |
| |
| case ZSYS_VAL_SMSG_NODEDOWN: |
| { |
| zsys_ddl_smsg_nodedown_def* msg = (zsys_ddl_smsg_nodedown_def*)sysMsg; |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "Segment %d has gone down.", msg->z_nodenumber); |
| for (short cpu = 0; cpu < CPUS_PER_SEGMENT; cpu++) |
| qmm_->getQmsStub(msg->z_nodenumber, cpu) |
| ->disable(QmsStub::SEGMENT_NOT_REACHABLE); |
| } |
| break; |
| |
| case ZSYS_VAL_SMSG_NODEUP: |
| { |
| zsys_ddl_smsg_nodeup_def* msg = (zsys_ddl_smsg_nodeup_def*)sysMsg; |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "Segment %d has come back up.", msg->z_nodenumber); |
| for (short cpu = 0; cpu < CPUS_PER_SEGMENT; cpu++) |
| //qmm_->getQmsStub(msg->z_nodenumber, cpu)->start(); |
| qmm_->getQmsStub(msg->z_nodenumber, cpu)->scheduleRestart(); |
| } |
| break; |
| |
| default: |
| break; |
| } |
| |
| // See if we have any processes due to be restarted. |
| QRProcessStub::checkRestarts(); |
| } |
| |
| // Static singleton instance of the class. |
| Qmm* Qmm::instance_ = NULL; |
| |
| Qmm::Qmm(CollHeap* heap) |
| : qmsPool_(NULL), |
| qmsCount_(0), |
| qmp_(NULL), |
| heap_(heap) |
| { |
| // Establish the IPC heap and cache the IpcEnvironment ptr from |
| // MvQueryRewriteServer. |
| MvQueryRewriteServer::setHeap(heap); |
| ipcEnv_ = MvQueryRewriteServer::getIpcEnv(); |
| |
| // Set static IpcServerClass for QmsStub to use when instantiating qms |
| // processes. |
| QmsStub::setQmsServerClass(new(heap_) IpcServerClass(ipcEnv_, |
| IPC_SQLQMS_SERVER, |
| IPC_SPAWN_OSS_PROCESS)); |
| } |
| |
| void Qmm::handleClientExit(const short* phandle, short messageNum) |
| { |
| short cpu, pin; |
| Int32 segmentNumber; |
| short result = 0; |
| NABoolean qmpDied = FALSE; |
| |
| Int32 lc_pin; |
| Int32 lc_cpu; |
| Int32 lc_seg; |
| |
| result = XPROCESSHANDLE_DECOMPOSE_ ((SB_Phandle_Type *)phandle, &lc_cpu, &lc_pin, &segmentNumber); |
| cpu = lc_cpu; |
| pin = lc_pin; |
| |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "dead process is on cpu <%d> and pin <%d>", cpu, pin); |
| |
| |
| if (qmp_ && *qmp_ == *(SB_Phandle_Type *)phandle) |
| { |
| qmpDied = TRUE; |
| } |
| |
| if (!qmpDied) |
| { |
| // When a qms dies we are delivered a null connection pointer and handle. |
| // When this happens, we check the status of our qms processes until we find |
| // one that's no longer alive. If >1 close messages arrive close together, |
| // we may not get the same one referenced by the message, but that one will |
| // be restarted in response to a subsequent message. |
| NABoolean found = FALSE; |
| QmsStub* qmsStub; |
| for (Int32 i=0; i<qmsCount_ && !found; i++) |
| { |
| qmsStub = qmsPool_[i]; |
| NABoolean processDoesNotExist = FALSE; |
| |
| char procName[200]; |
| short result = -1; |
| |
| SB_Phandle_Type procHandle = qmsStub->getProcessHandle(); |
| NAProcessHandle processHandle(&procHandle); |
| Int32 guaRetcode = processHandle.decompose(); |
| if (!guaRetcode) |
| { |
| memset(procName, 0, sizeof(procName)); |
| memcpy(procName, processHandle.getPhandleString(), processHandle.getPhandleStringLen()); |
| result = msg_mon_get_process_info (procName, &lc_cpu, &lc_pin); |
| } |
| |
| if (result || (lc_cpu < 0) || (lc_pin < 0)) |
| processDoesNotExist = TRUE; |
| |
| if (qmsStub->getStatus() == QmsStub::RUNNING && processDoesNotExist) |
| { |
| found = TRUE; |
| QRLogger::log(CAT_QR_IPC, LL_FATAL, |
| "*** QMS on cpu %d of segment %d has died, " |
| "attempting to restart...", |
| qmsStub->getCpuNumber(), qmsStub->getSegmentNumber()); |
| qmsStub->disable(QmsStub::NOT_RUNNING); |
| qmsStub->scheduleRestart(); |
| } |
| } |
| |
| if (!found) |
| { |
| if (messageNum == ZSYS_VAL_SMSG_CLOSE) |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "*** Unknown client process has ended. ***"); |
| else if (messageNum == ZSYS_VAL_SMSG_PROCDEATH) |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "*** Unknown child process has died. ***"); |
| else |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "*** Unknown process has terminated, message=%d. ***", messageNum); |
| } |
| |
| return; |
| } |
| |
| if (messageNum == ZSYS_VAL_SMSG_CLOSE) |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "*** CLOSE message received for process %d,%d on segment %d ***", |
| cpu, pin, segmentNumber); |
| else if (messageNum == ZSYS_VAL_SMSG_PROCDEATH) |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "*** PROCDEATH message received for process %d,%d on segment %d ***", |
| cpu, pin, segmentNumber); |
| else |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "*** Process %d,%d on segment %d has terminated, message=%d. ***", |
| cpu, pin, segmentNumber, messageNum); |
| |
| if (qmp_ && *qmp_ == *(SB_Phandle_Type *)phandle) |
| { |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "*** QMP has died, will attempt to restart..."); |
| qmp_->scheduleRestart(); |
| } |
| else if (*getQmsStub(segmentNumber, cpu) == *(SB_Phandle_Type *)phandle) |
| { |
| // This probably never gets executed; process handle usually delivered |
| // for a qms termination is null, and is handled in loop above that looks |
| // for one that is no longer running. |
| QRLogger::log(CAT_QR_IPC, LL_ERROR, |
| "*** QMS on cpu %d of segment %d has died, " |
| "will attempt to restart...", |
| cpu, segmentNumber); |
| getQmsStub(segmentNumber, cpu)->disable(QmsStub::NOT_RUNNING); |
| getQmsStub(segmentNumber, cpu)->scheduleRestart(); |
| } |
| else |
| QRLogger::log(CAT_QR_IPC, LL_WARN, |
| "Could not identify terminated process as either QMP or QMS"); |
| } |
| |
| // Only used for Windows. |
| void Qmm::allocateQms() |
| { |
| char segmentName[SEGMENT_NAME_LEN + 1]; |
| short segmentNameLen = 0; |
| short result = NODENUMBER_TO_NODENAME_(-1, segmentName, SEGMENT_NAME_LEN, |
| &segmentNameLen); |
| segmentName[segmentNameLen] = '\0'; |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "Result for caller's segment is %d, segment name is %s", |
| result, segmentName); |
| qmsCount_ = 1; |
| qmsPool_ = new(heap_) QmsStub*[qmsCount_]; |
| qmsPool_[0] = new(heap_) QmsStub(1, segmentName, 0, 0, TRUE, TRUE, |
| /*qmsMsgStream_,*/ ipcEnv_, heap_); |
| } |
| |
| void Qmm::allocateQmsPool() |
| { |
| short qmsInx = 0; |
| Int32 maxCpuNum = 0; |
| Int32 lv_ret = 0; |
| Int32 nodeCount = 0; |
| Int32 nodeMax = 0; |
| MS_Mon_Node_Info_Entry_Type *nodeInfo = NULL; |
| |
| // Get the number of nodes to know how much info space to allocate |
| lv_ret = msg_mon_get_node_info(&nodeCount, 0, NULL); |
| if ((lv_ret == 0) && (nodeCount > 0)) |
| { |
| // Allocate the space for node info entries |
| nodeInfo = (MS_Mon_Node_Info_Entry_Type *) new(heap_) |
| char[nodeCount * sizeof(MS_Mon_Node_Info_Entry_Type)]; |
| if (nodeInfo) |
| { |
| // Get the node info |
| memset(nodeInfo, 0, sizeof(nodeInfo)); |
| nodeMax = nodeCount; |
| lv_ret = msg_mon_get_node_info(&nodeCount, nodeMax, nodeInfo); |
| if (lv_ret == 0) |
| { |
| // Find number of storage nodes by checking the storage bit. |
| // The computed value is used as node Id where QMSs will be created. |
| // QMSs should be running on the storage nodes only. |
| for ( Int32 i=0; i<nodeCount; i++ ) { |
| if (( nodeInfo[i].type & MS_Mon_ZoneType_Storage ) && |
| ( !nodeInfo[i].spare_node )) |
| maxCpuNum++; |
| } |
| } |
| NADELETEBASIC(nodeInfo,heap_); |
| } |
| } |
| |
| qmsCount_ = maxCpuNum; |
| qmsPool_ = new(heap_) QmsStub*[qmsCount_]; |
| |
| // Create a qms stub for each possible cpu in the cluster |
| for (short cpu = 0; cpu < maxCpuNum; cpu++) |
| { |
| qmsPool_[qmsInx++] = new(heap_) QmsStub(1, (char *)"\\NSK", cpu, 0, TRUE, TRUE, |
| /*qmsMsgStream_,*/ ipcEnv_, heap_); |
| } |
| |
| } // allocateQmsPool() |
| |
| // Make sure all QMSs started, and if not, retry up to maxRetries times, each |
| // time invoking a delay of delaySeconds seconds prior to starting it on the |
| // first cpu in the array on which it isn't running. When QMM dies and comes |
| // back up, sometimes the old one's QMSs haven't received the notification and |
| // self-terminated before the new QMM tries to create mew Q<Ss, and a "duplicate |
| // process name" error results. |
| void Qmm::checkAndRetryQms(Int16 maxRetries, Int16 delaySeconds) |
| { |
| NABoolean needRetry = TRUE; |
| for (Int16 i=1; i<=maxRetries && needRetry; i++) |
| { |
| NABoolean allOK = TRUE; |
| needRetry = FALSE; |
| for (Int16 cpuInx=0; cpuInx<qmsCount_; cpuInx++) |
| { |
| if (qmsPool_[cpuInx]->getStatus() == QmsStub::NOT_RUNNING) |
| { |
| if (allOK) // first one found that isn't running? |
| { |
| allOK = FALSE; |
| DELAY(delaySeconds * 100); |
| } |
| if (!qmsPool_[cpuInx]->start()) |
| needRetry = TRUE; // at least 1 qms didn't start |
| } |
| } |
| } |
| } |
| |
| void Qmm::startQmp(short cpu) |
| { |
| if (qmp_) |
| delete qmp_; |
| qmp_ = new(heap_) QmpStub(*ipcEnv_, qmpStartOpt_, cpu, heap_); |
| } |
| |
| NABoolean QmpStub::start() |
| { |
| char* baseProcName = MvQueryRewriteServer::getProcessName(IPC_SQLQMP_SERVER, NULL, cpu_); |
| |
| SB_Phandle_Type *phandle; |
| phandle = get_phandle_with_retry(baseProcName); |
| |
| if (phandle != NULL) |
| { |
| QRLogger::log(CAT_QR_IPC, LL_INFO, |
| "qmm found existing qmp process, not starting new one."); |
| setProcessHandle(*phandle); |
| return TRUE; |
| } |
| |
| switch ((Int32)qmpStartOpt_) |
| { |
| case SPAWN: |
| spawnProcess(ipcEnv_, cpu_); |
| break; |
| case SERVER: |
| allocateProcess(ipcEnv_, cpu_); |
| break; |
| case NONE: |
| // don't start qmp here; start manually |
| break; |
| default: |
| assertLogAndThrow1(CAT_QR_IPC, LL_ERROR, |
| FALSE, QmmException, |
| "Unknown option for starting QMP -- %d", |
| qmpStartOpt_); |
| break; |
| } |
| |
| return TRUE; |
| } |
| |
| void QmpStub::allocateProcess(IpcEnvironment& ipcEnv, short cpu) |
| { |
| if (!qmpServerClass_) |
| qmpServerClass_ = new(heap_) IpcServerClass(&ipcEnv, |
| IPC_SQLQMP_SERVER, |
| IPC_SPAWN_OSS_PROCESS); |
| ComDiagsArea* diagsArea = NULL; |
| IpcAllocateDiagsArea(diagsArea, heap_); |
| |
| // Have to supply name of segment (minus leading \, which gets prepended), or a |
| // fault will occur while populating the diagnostics area if there is an error. |
| char localSegmentName[SEGMENT_NAME_LEN + 1]; |
| short localSegmentNameLen; |
| short result = NODENUMBER_TO_NODENAME_(-1,localSegmentName, |
| SEGMENT_NAME_LEN, |
| &localSegmentNameLen); |
| assertLogAndThrow1(CAT_QR_IPC, LL_ERROR, |
| result==0, QmmException, |
| "Could not get name of local segment, error is %d", result); |
| localSegmentName[localSegmentNameLen] = '\0'; |
| qmpServer_ = qmpServerClass_->allocateServerProcess(&diagsArea, heap_, |
| localSegmentName+1, cpu, |
| IPC_PRIORITY_DONT_CARE, 1, |
| TRUE, TRUE); |
| if (qmpServer_) |
| { |
| |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "QMP process started on cpu #%d", qmpServer_->getServerId().getCpuNum()); |
| setProcessHandle(qmpServer_->getServerId().getPhandle().phandle_); |
| } |
| else |
| { |
| QRLogger::log(CAT_QR_IPC, LL_ERROR, |
| "Failed to allocate server process for QMP on cpu %d ", cpu); |
| QRLogger::logDiags(diagsArea, CAT_QR_IPC); |
| } |
| } |
| |
| void QmpStub::spawnProcess(IpcEnvironment& ipcEnv, short cpu) //, ComDiagsArea **diags, CollHeap *diagsHeap) |
| { |
| SB_Phandle_Type p_handle; |
| const char* progFile = "tdm_arkqmp"; |
| |
| #define MAX_PROC_ARGS 10 |
| #define SET_ARGV(argv,argc,argval) {argv[argc] = (char *) calloc(strlen(argval), 1); \ |
| strcpy(argv[argc++], argval); } |
| |
| Int32 largc = 0; |
| char *largv[MAX_PROC_ARGS]; |
| |
| Int32 server_nid = cpu; |
| Int32 server_pid = 0; |
| Int32 server_oid = 0; |
| //char process_name[100]; |
| char prog[MS_MON_MAX_PROCESS_PATH]; |
| |
| SET_ARGV(largv, largc, progFile); |
| SET_ARGV(largv, largc, "-oss"); |
| //SET_ARGV(largv, largc, NULL); |
| |
| strcpy(prog, getenv("TRAF_HOME")); |
| strcat(prog, "/export/bin32/"); |
| strcat(prog, progFile); |
| |
| char* tmpprocess_name = MvQueryRewriteServer::getProcessName(IPC_SQLQMP_SERVER, NULL, cpu, heap_); |
| |
| char process_name[100]; |
| strcpy(process_name, tmpprocess_name); |
| |
| msg_mon_start_process( |
| prog, /* prog */ |
| process_name, /* name */ |
| process_name, /* output process name */ |
| largc, /* args */ |
| largv, |
| &p_handle, |
| 0, /* open */ |
| &server_oid, /* oid */ |
| MS_ProcessType_Generic, /* process type */ |
| 0, /* priority */ |
| 0, /* debug */ |
| 0, /* backup */ |
| &server_nid, /* nid */ |
| &server_pid, |
| NULL, |
| NULL); /* pid */ |
| |
| setProcessHandle(p_handle); |
| } |
| |
| void Qmm::relayPendingPubsToQms() |
| { |
| QRXmlMessageObj* xmlMsgObj; |
| for (CollIndex i=0; i<pendingPubs_.entries(); i++) |
| { |
| xmlMsgObj = pendingPubs_[i]; |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "XML of publish message is:\n%s", xmlMsgObj->getData()); |
| for (Int32 qmsInx=0; qmsInx<qmsCount_; qmsInx++) |
| { |
| qmsPool_[qmsInx]->publish(xmlMsgObj); |
| } |
| } |
| |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "Sent %d Publish requests to QMS list", pendingPubs_.entries()); |
| for (CollIndex i=0; i<pendingPubs_.entries(); i++) |
| pendingPubs_[i]->decrRefCount(); |
| pendingPubs_.clear(); |
| } |
| |
| void Qmm::executeMessageLoop() |
| { |
| // Have to allocate this from heap, because ~IpcEnvironment deletes it. |
| QmmGuaReceiveControlConnection* conn = |
| new(heap_) QmmGuaReceiveControlConnection(ipcEnv_, this); |
| ipcEnv_->setControlConnection(conn); |
| NAString qmmName("qmm"); |
| QRMessageStream msgStream(ipcEnv_, qmmName); |
| //QRMessageRequest request(msgStream); |
| QRMessageObj* responseObj = NULL; |
| |
| while (!conn->getConnection()) |
| conn->wait(IpcInfiniteTimeout); |
| IpcConnection* firstClient = NULL; |
| |
| if (listenOpt_ == WAITONALL) |
| { |
| firstClient = conn->getConnection(); |
| msgStream.addRecipient(firstClient); |
| while (TRUE) |
| { |
| IpcAllConnections* allConns = ipcEnv_->getAllConnections(); |
| allConns->waitOnAll(); |
| } |
| } |
| else if (listenOpt_ == WAITCC) |
| { |
| WaitReturnStatus waitStatus; |
| while (TRUE) |
| { |
| //debugMessage1("IPC heap usage = %d", ipcEnv_.getHeap()->getAllocSize()); |
| waitStatus = conn->wait(getWaitTimeout()); |
| if (waitStatus == WAIT_OK) |
| { |
| QRProcessStub::checkRestarts(); |
| } |
| } |
| } |
| else |
| { |
| assertLogAndThrow1(CAT_QR_IPC, LL_ERROR, |
| listenOpt_ == RECEIVE, QmmException, |
| "Unknown listen opt -- %d", listenOpt_); |
| firstClient = conn->getConnection(); |
| msgStream.addRecipient(firstClient); |
| |
| while (TRUE) |
| { |
| msgStream.receive(); |
| //IpcAllConnections* allConns = ipcEnv_->getAllConnections(); |
| //allConns->waitOnAll(); |
| try |
| { |
| //responseObj = processRequestMessage(request); |
| responseObj = processRequestMessage(&msgStream); |
| } |
| catch(...) |
| {} |
| |
| // Either an exception was caught above, or a response was not generated |
| // for some other reason (shouldn't happen). Either way, create a status |
| // message indicating an internal error. |
| if (!responseObj) |
| responseObj = new QRStatusMessageObj(QR::InternalError); |
| |
| msgStream.clearAllObjects(); |
| msgStream.setType(responseObj->getType()); // so correct msg type logged |
| msgStream << *responseObj; |
| msgStream.send(); |
| responseObj->decrRefCount(); |
| relayPendingPubsToQms(); |
| } |
| } |
| } |
| |
| QRRequestResult Qmm::handlePublishRequest(QRMessageStream* msgStream) |
| { |
| // Have to use the global heap because IpcMessageObj is not derived from |
| // NABasicObject. |
| QRXmlMessageObj* xmlMsgObj = new QRXmlMessageObj(NULL, PUBLISH_REQUEST); |
| *msgStream >> *xmlMsgObj; |
| pendingPubs_.insert(xmlMsgObj); |
| return QR::Success; |
| } |
| |
| QRRequestResult Qmm::handleAllocateRequest(QRMessageStream* msgStream) |
| { |
| return QR::Unable; |
| } |
| |
| //QRMessageObj* Qmm::processRequestMessage(QRMessageRequest& request) |
| QRMessageObj* Qmm::processRequestMessage(QRMessageStream* msgStream) |
| { |
| QRRequestResult result; |
| XMLFormattedString resultXML; |
| QRMessageObj* responseMsgPtr = NULL; |
| //QRMessageTypeEnum requestType; |
| |
| if (!msgStream->moreObjects()) |
| { |
| QRLogger::log(CAT_QR_IPC, LL_ERROR, |
| "QMM received an empty message stream."); |
| return new QRStatusMessageObj(ProtocolError); |
| } |
| |
| // @ZX: Although we allow the possibility of multiple request objects per |
| // message, the returned response object will indicate the status of |
| // only the last one. Should probably return a status for each request |
| // item (or at least delete the overwritten ones so they don't leak). |
| // @ZX: Note that the message stream is cleared when we encounter an unknown |
| // or unhandled message type. How can an object be extracted from a stream |
| // if you don't understand its type? |
| while (msgStream->moreObjects()) |
| { |
| switch (msgStream->getNextObjType()) |
| { |
| // Can't use heap_ for the allocation of the response message because |
| // IpcMessageObj is not derived from NABasicObject. |
| |
| case PUBLISH_REQUEST: |
| result = handlePublishRequest(msgStream); |
| if (result == QR::Success) |
| QRLogger::log(CAT_QR_IPC, LL_INFO, "PUBLISH was successful."); |
| else |
| QRLogger::log(CAT_QR_IPC, LL_ERROR, |
| "PUBLISH failed, status is %d", result); |
| responseMsgPtr = new QRStatusMessageObj(result); |
| break; |
| |
| case ALLOCATE_REQUEST: |
| result = handleAllocateRequest(msgStream); |
| if (result == QR::Success) |
| QRLogger::log(CAT_QR_IPC, LL_INFO, "ALLOCATE was successful."); |
| else |
| QRLogger::log(CAT_QR_IPC, LL_ERROR, |
| "ALLOCATE failed, status is %d", result); |
| responseMsgPtr = new QRStatusMessageObj(result); |
| break; |
| |
| case DEFAULTS_REQUEST: |
| QRLogger::log(CAT_QR_IPC, LL_ERROR, |
| "'DEFAULTS' request not yet handled by QMM"); |
| responseMsgPtr = new QRStatusMessageObj(QR::InvalidRequest); |
| msgStream->clearAllObjects(); |
| break; |
| |
| default: |
| QRLogger::log(CAT_QR_IPC, LL_ERROR, |
| "QMM received unexpected message type: %d", msgStream->getNextObjType()); |
| responseMsgPtr = new QRStatusMessageObj(QR::InvalidRequest); |
| msgStream->clearAllObjects(); |
| break; |
| } |
| } |
| |
| return responseMsgPtr; |
| } // processRequestMessage |
| |
| IpcTimeout Qmm::getWaitTimeout() |
| { |
| const NAList<QRProcessStub*> restartList = QRProcessStub::getRestartList(); |
| CollIndex restartListEntries = restartList.entries(); |
| if (restartListEntries == 0) |
| return IpcInfiniteTimeout; |
| |
| // Find the time of the next scheduled restart. Init to first entry in list; |
| // we exited above is list was empty. |
| Int64 earliestTimestamp = restartList[0]->getLockoutEndTS(); |
| for (CollIndex i=1; i<restartListEntries; i++) |
| { |
| if (restartList[i]->getLockoutEndTS() < earliestTimestamp) |
| earliestTimestamp = restartList[i]->getLockoutEndTS(); |
| } |
| |
| // Timestamp is microsecond resolution, IpcTimeout is in 10ms units (100 = |
| // 1 second). |
| Int64 microsecondsTillNext = earliestTimestamp - NA_JulianTimestamp() |
| + 1000000; // 1-sec fudge factor |
| if (microsecondsTillNext < 0) // already past time somehow? |
| microsecondsTillNext = 0; |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "Wait timeout set to %d seconds", microsecondsTillNext/1000000); |
| return (IpcTimeout)(microsecondsTillNext / 1000); |
| } |
| |
| |
| NAList<QRProcessStub*> QRProcessStub::restartList_((CollHeap*)NULL); |
| |
| QRProcessStub::QRProcessStub(CollHeap* heap) |
| : lockoutEndTS_(0), |
| retryNumber_(0), |
| heap_(heap) |
| { |
| // Initialize the process handle to its null representation, all -1s. |
| nullProcessHandle(); |
| } |
| |
| void QRProcessStub::setLockout() |
| { |
| static Int32 seconds[] = {0, 10, 20, 30, 60, 120}; |
| Int32 lockoutSeconds = (retryNumber_ >= (sizeof seconds / sizeof seconds[0])) |
| ? 180 |
| : seconds[retryNumber_]; |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "Will not attempt another restart of this process for %d seconds.", |
| lockoutSeconds); |
| lockoutEndTS_ = NA_JulianTimestamp() + (lockoutSeconds * 1000000); |
| } |
| |
| void QRProcessStub::scheduleRestart() |
| { |
| if (lockoutEndTS_ > NA_JulianTimestamp()) |
| { |
| // Not time yet, add to restart list. When restarted and removed from |
| // restart list, will enter another (longer) lockout period. |
| restartList_.insert(this); |
| } |
| else |
| { |
| // Either there was no lockout or it has expired. Restart process, set |
| // retry count to 1. Only when a process dies again when it is in its |
| // lockout period do we queue the restart and extend the next lockout. |
| // If the restart fails, increment the retry count and put the stub in |
| // the restart list. |
| if (start()) |
| retryNumber_ = 1; |
| else |
| { |
| retryNumber_++; |
| restartList_.insert(this); |
| } |
| setLockout(); |
| } |
| } |
| |
| void QRProcessStub::checkRestarts() |
| { |
| // Go through the list backwards so indexing is not affected by removal of |
| // elements. CollIndex is an unsigned type, so have to start at entries() |
| // (1 past last valid index) instead of entries()-1, so we don't try to assign |
| // it a negative value in the case of an empty list. |
| QRProcessStub* processStub; |
| for (CollIndex i=restartList_.entries(); i>0; i--) |
| { |
| processStub = restartList_[i-1]; |
| if (processStub->lockoutEndTS_ < NA_JulianTimestamp()) |
| { |
| // If the process starts successfully, remove it from the list, else |
| // leave it there to try again next time. In either case, bump up the |
| // lockout period before the next time it can be restarted. |
| if (processStub->start()) |
| restartList_.removeAt(i-1); |
| processStub->retryNumber_++; |
| processStub->setLockout(); |
| } |
| } |
| } |
| |
| void QRProcessStub::nullProcessHandle() |
| { |
| } |
| |
| QmsStub::QmsStub(short segmentNumber, char* segmentName, short cpuNumber, |
| short segmentStatus, NABoolean cpuExists, |
| NABoolean cpuReachable, // QRMessageStream* qmsMsgStream, |
| IpcEnvironment* ipcEnv, |
| CollHeap* heap) |
| : QRProcessStub(heap), |
| segmentNumber_(segmentNumber), |
| cpuNumber_(cpuNumber), |
| segmentStatus_(segmentStatus), |
| qmsServer_(NULL), |
| status_(UNINITIALIZED), |
| //qmsMsgStream_(qmsMsgStream) |
| qmsMsgStream_(new(heap_) QRMessageStream(ipcEnv, "qmm", heap, PUBLISH_REQUEST)) |
| { |
| strcpy(segmentName_, segmentName); |
| |
| if (!cpuExists) |
| { |
| status_ = CPU_NOT_PRESENT; |
| return; |
| } |
| else if (!cpuReachable) |
| { |
| status_ = CPU_NOT_REACHABLE; |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "Processor %d on segment %s is not reachable.", cpuNumber, segmentName); |
| return; |
| } |
| else if (segmentStatus != 0) |
| { |
| status_ = SEGMENT_NOT_REACHABLE; // Segment status logged by caller |
| return; |
| } |
| |
| #ifndef NA_WINNT |
| qmsProcessName_ = MvQueryRewriteServer::getProcessName(IPC_SQLQMS_SERVER, segmentName_, cpuNumber, heap_); |
| #else |
| qmsProcessName_ = new(heap_) char[PROCESSNAME_STRING_LEN]; |
| sprintf(qmsProcessName_, "%s.%s%02d", segmentName_, QMS_PROCESS_PREFIX, cpuNumber); |
| #endif |
| |
| // Launch the QMS process. |
| start(); |
| } |
| |
| NABoolean QmsStub::start() |
| { |
| // Qmm class should have called static function to set this. |
| assertLogAndThrow(CAT_QR_IPC, LL_ERROR, |
| qmsServerClass_, QmmException, "qmsServerClass_ is NULL"); |
| |
| // Nodeup message is accompanied by a remote cpu up message for one processor, |
| // so there will be a redundant attempt to start qms on that cpu a 2nd time. |
| if (qmsServer_) |
| { |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "QmsStub::start() -- QMS process already running on cpu %d " |
| "of segment %s", cpuNumber_, segmentName_); |
| if (status_ == RUNNING) |
| { |
| QRLogger::log(CAT_QR_IPC, LL_INFO, "-- will not restart QMS process."); |
| return TRUE; |
| } |
| else |
| { |
| // Exists, but status not what we want. Get rid of it, new one will |
| // be started below. |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "-- QMS process has status %d, will restart.", status_); |
| disable(NOT_RUNNING); |
| } |
| } |
| |
| ComDiagsArea* diagsArea = NULL; |
| IpcAllocateDiagsArea(diagsArea, heap_); |
| |
| // Pass segmentName_+1 because IpcGuardianServer::spawnProcess() assumes the |
| // node name does not include the leading \, and adds one. |
| |
| qmsServer_ = qmsServerClass_->allocateServerProcess |
| (&diagsArea, heap_, segmentName_+1, |
| cpuNumber_, IPC_PRIORITY_DONT_CARE, 1, |
| TRUE, TRUE, 2, NULL, |
| qmsProcessName_); |
| |
| if (qmsServer_) |
| { |
| qmsMsgStream_->addRecipient(qmsServer_->getControlConnection()); |
| status_ = RUNNING; |
| |
| setProcessHandle(qmsServer_->getServerId().getPhandle().phandle_); |
| QRLogger::log(CAT_QR_IPC, LL_DEBUG, |
| "QMS process started on cpu #%d", qmsServer_->getServerId().getCpuNum()); |
| MvQueryRewriteServer::initQms(qmsServer_, heap_); |
| return TRUE; |
| } |
| else |
| { |
| status_ = NOT_RUNNING; |
| QRLogger::log(CAT_QR_IPC, LL_ERROR, |
| "Failed to allocate server process for QMS on cpu %d of segment %s", |
| cpuNumber_, segmentName_); |
| QRLogger::logDiags(diagsArea, CAT_QR_IPC); |
| return FALSE; |
| } |
| } |
| |
| void QmsStub::disable(Status reason) |
| { |
| setStatus(reason); |
| |
| // This stub may not have an active process. For example, a segment down will |
| // cause this to be called for each stub, including processors not present in |
| // the segment (e.g., a segment with only 8 instead of 16 processors). |
| if (qmsServer_) |
| { |
| qmsMsgStream_->deleteRecipient(qmsServer_->getControlConnection()); |
| qmsServer_->release(); |
| qmsServer_ = NULL; |
| nullProcessHandle(); |
| } |
| } |
| |
| void QmsStub::publish(QRXmlMessageObj* xmlMsgObj) |
| { |
| if (!qmsServer_) |
| return; |
| |
| qmsMsgStream_->clearAllObjects(); |
| qmsMsgStream_->setType(PUBLISH_REQUEST); |
| *qmsMsgStream_ << *xmlMsgObj; |
| qmsMsgStream_->send(); |
| } |
| |
| //void QmmMessageStream::actOnSend(IpcConnection* connection) |
| //{ |
| //} |
| |
| void QmmMessageStream::actOnReceive(IpcConnection* connection) |
| { |
| QRLogger::log(CAT_QR_IPC, LL_INFO, "Reached QmmMessageStream::actOnReceive()"); |
| QRMessageObj* responseObj = NULL; |
| try |
| { |
| responseObj = qmm_->processRequestMessage(this); |
| } |
| catch(...) |
| {} |
| |
| QRMessageStream::actOnReceive(connection); |
| respond(responseObj); |
| |
| qmm_->relayPendingPubsToQms(); |
| |
| // See if we have any processes due to be restarted. |
| QRProcessStub::checkRestarts(); |
| } |