| /////////////////////////////////////////////////////////////////////////////// |
| // |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| #include <stddef.h> |
| #include <stdio.h> |
| #include <zlib.h> |
| #include "reqqueue.h" |
| #include "monitor.h" |
| #include "monlogging.h" |
| #include "mlio.h" |
| #include "montrace.h" |
| #include "monsonar.h" |
| #include "clusterconf.h" |
| #include "nameserverconfig.h" |
| #include "lock.h" |
| #include "lnode.h" |
| #include "pnode.h" |
| #include "replicate.h" |
| #include "internal.h" |
| #include "healthcheck.h" |
| #ifndef NAMESERVER_PROCESS |
| #include "redirector.h" |
| #include "nameserver.h" |
| #include "ptpclient.h" |
| #endif |
| |
| extern int MyPNID; |
| extern bool Emulate_Down; |
| extern CMonitor *Monitor; |
| extern CNodeContainer *Nodes; |
| extern CNode *MyNode; |
| extern CMonStats *MonStats; |
| extern CLock MemModLock; |
| extern CNodeContainer *Nodes; |
| extern CReplicate Replicator; |
| extern CReqQueue ReqQueue; |
| extern CConfigContainer *Config; |
| extern CHealthCheck HealthCheck; |
| #ifdef NAMESERVER_PROCESS |
| extern char *ErrorMsg (int error_code); |
| #else |
| extern CRedirector Redirector; |
| extern bool NameServerEnabled; |
| extern CPtpClient *PtpClient; |
| extern CNameServer *NameServer; |
| extern CNameServerConfigContainer *NameServerConfig; |
| #endif |
| |
| extern int req_type_startup; |
| |
| extern bool IAmIntegrating; |
| extern bool IAmIntegrated; |
| extern bool IsRealCluster; |
| extern bool IsAgentMode; |
| extern bool IsMaster; |
| extern bool ZClientEnabled; |
| |
| extern CommType_t CommType; |
| extern bool IsRealCluster; |
| |
| CReqResource::CReqResource() |
| { |
| } |
| |
| CReqResource::~CReqResource() |
| { |
| } |
| |
| CReqResourceProc::CReqResourceProc( int nid |
| , int pid |
| , const char *name |
| , Verifier_t verifier ) |
| : nid_(nid) |
| , pid_(pid) |
| , verifier_(verifier) |
| , processName_(name) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RESA", 4); |
| } |
| |
| CReqResourceProc::~CReqResourceProc() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "resa", 4); |
| } |
| |
| CReqResource::ResourceStatus_t CReqResourceProc::acquireResource( long requestId ) |
| { |
| const char method_name[] = "CReqResourceProc::acquireResource"; |
| TRACE_ENTRY; |
| |
| CProcess *process = NULL; |
| if ( nid_ == -1 || pid_ == -1 ) |
| { // find by name (check node state, don't check process state, not backup) |
| process = Nodes->GetProcess( processName_.c_str() |
| , verifier_, true, false, false ); |
| } |
| else |
| { // find by nid (check node state, don't check process state, backup is Ok) |
| process = Nodes->GetProcess( nid_ |
| , pid_ |
| , verifier_, true, false, true ); |
| } |
| |
| if ( process == NULL ) |
| { // Process no longer exists |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d request #%ld, process (%d, %d) no longer " |
| "exists\n", |
| method_name, __LINE__, requestId, nid_, pid_); |
| } |
| |
| TRACE_EXIT; |
| return UnAvailable; |
| } |
| |
| if ( process->GetState() != State_Up) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d request #%ld could not acquire ownership of " |
| "process %s (%d, %d), process state not State_Up\n", |
| method_name, __LINE__, requestId, process->GetName(), |
| process->GetNid(), process->GetPid()); |
| } |
| |
| TRACE_EXIT; |
| return NotUp; |
| } |
| |
| if ( !process->isOwned() ) |
| { |
| process->setOwner( requestId ); |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d request #%ld obtained ownership of process " |
| "%s (%d, %d)\n", |
| method_name, __LINE__, requestId, process->GetName(), |
| process->GetNid(), process->GetPid()); |
| TRACE_EXIT; |
| return Acquired; |
| } |
| else |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d request #%ld could not acquire ownership of " |
| "process %s (%d, %d), resource busy.\n", |
| method_name, __LINE__, requestId, process->GetName(), |
| process->GetNid(), process->GetPid()); |
| |
| TRACE_EXIT; |
| return Busy; |
| } |
| } |
| |
| void CReqResourceProc::releaseResource() |
| { |
| const char method_name[] = "CReqResourceProc::releaseResource"; |
| TRACE_ENTRY; |
| |
| CProcess *process = Nodes->GetProcess( nid_, pid_ ); |
| |
| if ( process ) |
| { |
| // Indicate that the process object is no longer owned. |
| process->resetOwned(); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d released ownership of process %s (%d, %d)\n", |
| method_name, __LINE__, process->GetName(), |
| process->GetNid(), process->GetPid()); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| CProcess* CReqResourceProc::getProcess() |
| { |
| return( Nodes->GetProcess( nid_, pid_ ) ); |
| } |
| |
| CReqResourceConfig::CReqResourceConfig(CConfigGroup *config): config_(config) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RESB", 4); |
| } |
| |
| CReqResourceConfig::~CReqResourceConfig() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "resb", 4); |
| } |
| |
| |
| CReqResource::ResourceStatus_t CReqResourceConfig::acquireResource( long ) |
| { |
| return UnAvailable; |
| } |
| |
| void CReqResourceConfig::releaseResource() |
| { |
| } |
| |
| CRequest::CRequest(): concurrent_(false), id_(0), numResources_(0) |
| { |
| clock_gettime(CLOCK_REALTIME, &reqArrival_); |
| reqStart_.tv_sec = 0; |
| reqStart_.tv_nsec = 0; |
| execTimeMax_ = CReqQueue::REQ_MAX_RESPONSIVE + MyNode->GetWDTKeepAliveTimerValue(); |
| priority_ = Normal; |
| } |
| |
| CRequest::~CRequest() |
| { |
| const char method_name[] = "CRequest::~CRequest"; |
| TRACE_ENTRY; |
| |
| for (int i=0; i<numResources_; i++) |
| { |
| delete resources_[i]; |
| } |
| |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d - request #%ld\n", method_name, __LINE__, id_); |
| TRACE_EXIT; |
| } |
| |
| void CRequest::addResource(CReqResource * resource) |
| { |
| if (numResources_ < MAX_RESOURCES) |
| { |
| resources_[numResources_] = resource; |
| ++numResources_; |
| } |
| } |
| |
| |
| |
| CRequest::ReqStatus_t CRequest::okToExecute() |
| { |
| const char method_name[] = "CRequest::okToExecute"; |
| TRACE_ENTRY; |
| |
| ReqStatus_t reqStatus = OkToExec; |
| |
| // do any request preparation |
| if (!prepare()) |
| { // Some problem discovered during preparation. |
| // Error reply has been sent to requester by prepare(). |
| TRACE_EXIT; |
| |
| return Failed; |
| } |
| |
| bool ownershipFailure; |
| if ( !isExclusive() |
| && !takeOwnership( ownershipFailure ) ) |
| { // Can't get ownership of needed objects |
| |
| if (ownershipFailure) |
| { // Not possible to get required ownership to complete |
| // the request so request fails. |
| |
| if ( trace_settings & TRACE_REQUEST_DETAIL ) |
| trace_printf("%s@%d ownership failure for request " |
| "#%ld, replying with error\n", |
| method_name, __LINE__, getId()); |
| |
| // Send failure response |
| errorReply( MPI_ERR_NAME ); |
| |
| reqStatus = Failed; |
| } |
| else |
| { |
| // Check for request failure due to timeout |
| struct timespec now; |
| struct timespec tDiff; |
| clock_gettime(CLOCK_REALTIME, &now); |
| |
| CReqQueue::timeDiff ( reqArrival_, now, tDiff ); |
| |
| if ( tDiff.tv_sec > CReqQueue::REQ_MAX_DEFER) |
| { // timed out |
| populateRequestString(); |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s] request #%ld timed out " |
| "attempting to gain ownership of needed resources " |
| "(%s)\n", method_name, getId(), |
| requestString_.c_str()); |
| mon_log_write(MON_REQQUEUE_REQUEST_1, SQ_LOG_ERR, buf); |
| |
| // Send failure response |
| errorReply( MPI_ERR_OTHER ); |
| |
| reqStatus = Failed; |
| } |
| else |
| { // Could not get ownership, will wait until later |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d cannot take ownership for request " |
| "#%ld\n", method_name, __LINE__, getId()); |
| |
| reqStatus = WaitToExec; |
| } |
| } |
| } |
| else |
| { |
| clock_gettime(CLOCK_REALTIME, &reqStart_); |
| } |
| |
| |
| TRACE_EXIT; |
| |
| return reqStatus; |
| } |
| |
| |
| void CRequest::evalReqPerformance( void ) |
| { |
| const char method_name[] = "CRequest::evalReqPerformance"; |
| |
| // Log info if request took a long time |
| struct timespec curTime; |
| clock_gettime(CLOCK_REALTIME, &curTime); |
| |
| struct timespec queuedTime; |
| struct timespec performTime; |
| struct timespec totalTime; |
| |
| CReqQueue::timeDiff ( reqArrival_, reqStart_, queuedTime ); |
| CReqQueue::timeDiff ( reqStart_, curTime, performTime ); |
| CReqQueue::timeDiff ( reqArrival_, curTime, totalTime ); |
| |
| // temp trace |
| if ( trace_settings & TRACE_REQUEST ) |
| { |
| trace_printf("%s@%d request #%ld, arrival-to-start=%ld.%06ld, " |
| "start-to-complete=%ld.%06ld, total=%ld.%06ld\n", |
| method_name, __LINE__, getId(), |
| queuedTime.tv_sec, queuedTime.tv_nsec / 1000, |
| performTime.tv_sec, performTime.tv_nsec / 1000, |
| totalTime.tv_sec, totalTime.tv_nsec / 1000); |
| } |
| // end trace |
| |
| if ( performTime.tv_sec > CReqQueue::REQ_MAX_PERFORM ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| populateRequestString(); |
| sprintf(buf, "[%s], Lengthy request: perform " |
| "time=%ld.%06ld, total time=%ld.%06ld {%s}\n", method_name, |
| performTime.tv_sec, performTime.tv_nsec, |
| totalTime.tv_sec, totalTime.tv_nsec, requestString()); |
| mon_log_write(MON_REQ_EVALREQ_PERFORMANCE_1, SQ_LOG_ERR, buf); |
| } |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| // Sending reply to request from local io client |
| void CRequest::lioreply(struct message_def *msg, int Pid, int *error) |
| { |
| const char method_name[] = "CRequest::lioreply"; |
| TRACE_ENTRY; |
| |
| if (msg->noreply) // tell client to release buffer |
| { |
| if (trace_settings & (TRACE_MLIO_DETAIL | TRACE_PROCESS_DETAIL)) |
| trace_printf("%s@%d request type = %d\n", method_name, __LINE__, |
| msg->u.request.type); |
| if (msg->u.request.type != ReqType_NewProcess && |
| msg->u.request.type != ReqType_Dump && |
| msg->u.request.type != ReqType_Close) |
| { |
| if (trace_settings & (TRACE_MLIO_DETAIL | TRACE_PROCESS_DETAIL)) |
| trace_printf("%s@%d" " sending readysend ctl msg=%p, pid=%d, " |
| "idx=%d\n", method_name, __LINE__, msg, Pid, |
| ((SharedMsgDef*)msg)->trailer.index ); |
| |
| SQ_theLocalIOToClient->sendCtlMsg ( Pid, |
| MC_ReadySend, |
| ((SharedMsgDef*)msg)-> |
| trailer.index, |
| error |
| ); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_MLIO_DETAIL | TRACE_PROCESS_DETAIL)) |
| trace_printf("%s@%d waiting for cluster generated reply\n", |
| method_name, __LINE__); |
| } |
| } |
| else // tell client they have message. |
| { |
| if (trace_settings & TRACE_PROCESS) |
| // if (trace_settings & (TRACE_MLIO_DETAIL | TRACE_PROCESS_DETAIL)) |
| trace_printf("%s@%d sending reply ctl msg=%p, pid=%d, idx=%d\n", |
| method_name, __LINE__, msg, Pid, |
| ((SharedMsgDef*)msg)->trailer.index ); |
| |
| SQ_theLocalIOToClient->sendCtlMsg ( Pid, |
| MC_SReady, |
| ((SharedMsgDef*)msg)->trailer.index, |
| error |
| ); |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| void CExternalReq::validateObj( void ) |
| { |
| if ((strncmp((const char *)&eyecatcher_, "RQE", 3) !=0 ) && |
| (strncmp((const char *)&eyecatcher_, "RqE", 3) !=0 )) |
| { // Not a valid object |
| abort(); |
| } |
| } |
| |
| void CExternalReq::errorReply( int rc ) |
| { |
| msg_->u.reply.type = ReplyType_Generic; |
| msg_->u.reply.u.generic.nid = -1; |
| msg_->u.reply.u.generic.pid = -1; |
| msg_->u.reply.u.generic.verifier = -1; |
| msg_->u.reply.u.generic.process_name[0] = '\0'; |
| msg_->u.reply.u.generic.return_code = rc; |
| |
| #ifdef NAMESERVER_PROCESS |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| const char method_name[] = "CExternalReq::errorReply"; |
| trace_printf( "%s@%d rc=%d\n", method_name, __LINE__, rc ); |
| } |
| monreply(msg_, sockFd_); |
| #else |
| // Send reply to requester |
| lioreply(msg_, pid_); |
| #endif |
| } |
| |
| |
| |
| // Attempt to take ownership of all resources needed for this request. |
| bool CExternalReq::takeOwnership( bool & ownershipFailure ) |
| { |
| const char method_name[] = "CExternalReq::takeOwnership"; |
| bool haveOwnership = true; |
| CReqResource::ResourceStatus_t resourceStatus; |
| ownershipFailure = false; |
| |
| TRACE_ENTRY; |
| |
| if ( (trace_settings & TRACE_REQUEST) && ( numResources_ != 0 ) ) |
| { |
| trace_printf("%s@%d request #%ld, # resources=%d\n", method_name, |
| __LINE__, id_, numResources_); |
| } |
| |
| for (int i=0; i<numResources_; i++) |
| { |
| resourceStatus = resources_[i]->acquireResource( id_ ); |
| if ( resourceStatus != CReqResource::Acquired ) |
| { // Could not obtain the resource, release already obtained |
| // resources. |
| for (int j=0; j<i; j++) |
| { |
| resources_[j]->releaseResource(); |
| } |
| haveOwnership = false; |
| |
| if ( resourceStatus == CReqResource::UnAvailable ) |
| { |
| ownershipFailure = true; |
| } |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return haveOwnership; |
| } |
| |
| // Release any resources acquired for executing the request. |
| void CExternalReq::giveupOwnership() |
| { |
| const char method_name[] = "CExternalReq::giveupOwnership"; |
| TRACE_ENTRY; |
| |
| if ( (trace_settings & TRACE_REQUEST) && ( numResources_ != 0 ) ) |
| { |
| trace_printf("%s@%d request #%ld, # resources=%d\n", method_name, |
| __LINE__, id_, numResources_); |
| } |
| |
| for (int i=0; i<numResources_; i++) |
| { |
| resources_[i]->releaseResource(); |
| } |
| TRACE_EXIT; |
| } |
| |
| CExtNullReq::CExtNullReq (reqQueueMsg_t msgType, int nid, int pid, int sockFd, |
| struct message_def *msg ) |
| : CExternalReq(msgType, nid, pid, sockFd, msg) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQE_", 4); |
| } |
| |
| CExtNullReq::~CExtNullReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqe_", 4); |
| } |
| |
| CInternalReq::CInternalReq() |
| : seqNum_(0), |
| reviveFlag_(0) // will be set by requests that are needed to perform revive |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "REQI", 4); |
| } |
| |
| CInternalReq::~CInternalReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "reqi", 4); |
| } |
| |
| void CInternalReq::validateObj( void ) |
| { |
| if ((strncmp((const char *)&eyecatcher_, "RQI", 3) !=0 ) && |
| (strncmp((const char *)&eyecatcher_, "RqI", 3) !=0 )) |
| { // Not a valid object |
| abort(); |
| } |
| } |
| |
| void CInternalReq::giveupOwnership() |
| { |
| } |
| |
| void CInternalReq::populateRequestString( void ) |
| { |
| } |
| |
| void CInternalReq::performRequest() |
| { |
| const char method_name[] = "CInternalReq::performRequest"; |
| TRACE_ENTRY; |
| |
| // Trace info about request |
| if (trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d \n", method_name, __LINE__); |
| |
| TRACE_EXIT; |
| } |
| |
| void CInternalReq::errorReply( int ) |
| { |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntCloneProcReq::CIntCloneProcReq( bool backup, bool unhooked, bool eventMessages, bool systemMessages, int nid, PROCESSTYPE type, int priority, int parentNid, int parentPid, int parentVerifier, int osPid, int verifier, pid_t priorPid, int persistentRetries, int argc, struct timespec creationTime, strId_t pathStrId, strId_t ldpathStrId, strId_t programStrId, int nameLen, int portLen, int infileLen, int outfileLen, int argvLen, const char * stringData, int origPNidNs) |
| : CInternalReq(), |
| backup_( backup ), |
| unhooked_( unhooked ), |
| eventMessages_ ( eventMessages ), |
| systemMessages_( systemMessages ), |
| nid_ ( nid ), |
| type_( type ), |
| priority_( priority ), |
| parentNid_( parentNid ), |
| parentPid_( parentPid ), |
| parentVerifier_( parentVerifier ), |
| osPid_( osPid ), |
| verifier_( verifier ), |
| priorPid_( priorPid ), |
| persistentRetries_ ( persistentRetries ), |
| argc_( argc ), |
| pathStrId_ ( pathStrId ), |
| ldpathStrId_ ( ldpathStrId ), |
| programStrId_ ( programStrId ), |
| nameLen_ ( nameLen ), |
| portLen_ ( portLen ), |
| infileLen_ ( infileLen ), |
| outfileLen_ ( outfileLen ), |
| argvLen_ ( argvLen ), |
| origPNidNs_ ( origPNidNs ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIL", 4); |
| |
| creationTime_.tv_sec = creationTime.tv_sec; |
| creationTime_.tv_nsec = creationTime.tv_nsec; |
| |
| int stringDataLen = nameLen_ + portLen_+ infileLen_ + outfileLen_ |
| + argvLen_; |
| stringData_ = new char [stringDataLen]; |
| memcpy ( stringData_, stringData, stringDataLen ); |
| } |
| |
| void CIntCloneProcReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (name=%s/nid=%d/pid=%d) parent(nid=%d/pid=%d)" |
| , CReqQueue::intReqType[InternalType_Clone] |
| , getId(), |
| &stringData_[0], // process name |
| nid_, osPid_, parentNid_, parentPid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| CIntCloneProcReq::~CIntCloneProcReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqil", 4); |
| |
| delete [] stringData_; |
| } |
| |
| void CIntCloneProcReq::performRequest() |
| { |
| const char method_name[] = "CIntCloneProcReq::performRequest"; |
| TRACE_ENTRY; |
| |
| CProcess *process; |
| CProcess *parent; |
| |
| // Trace info about request |
| if (trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d \n", method_name, __LINE__); |
| |
| if ( priorPid_ != 0 ) |
| { // The "clone" represents a restarted persistent process |
| process = Nodes->GetProcess( nid_, priorPid_, false ); |
| if (process) |
| { |
| parent = Nodes->GetProcess( process->GetParentNid(), |
| process->GetParentPid() ); |
| // Handle prior process termination |
| process->Exit( parent ); |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST |
| | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - For restarted persistent process " |
| " altering (%d, %d) to (%d, %d)\n", |
| method_name, __LINE__, |
| nid_, |
| priorPid_, |
| nid_, |
| osPid_ ); |
| } |
| |
| CNode * node = Nodes->GetLNode (process->GetNid())->GetNode(); |
| node->DelFromPidMap ( process ); |
| process->SetVerifier(verifier_); |
| process->SetParentVerifier(parentVerifier_); |
| process->CompleteProcessStartup (&stringData_[nameLen_], |
| osPid_, |
| eventMessages_, |
| systemMessages_, |
| false, |
| &creationTime_, |
| origPNidNs_); |
| node->AddToNameMap ( process ); |
| node->AddToPidMap ( osPid_, process ); |
| } |
| else |
| { // Unexpectedly could not find process object |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Unexpectedly could not locate " |
| " restarted persistent process (%d , %d)\n", |
| method_name, __LINE__, |
| nid_, |
| priorPid_); |
| } |
| } |
| } |
| else |
| { |
| // check to see if we pre-cloned this process for startup in a remote node |
| process = Nodes->GetLNode (nid_)-> |
| CompleteProcessStartup(&stringData_[0], // process name |
| &stringData_[nameLen_], // port, |
| osPid_, |
| eventMessages_, |
| systemMessages_, |
| &creationTime_, |
| origPNidNs_); |
| if (process) |
| { |
| if (trace_settings & TRACE_SYNC) |
| { |
| trace_printf("%s@%d - Created process %s (%d, %d), started " |
| "on port %s\n", method_name, __LINE__, |
| process->GetName(), process->GetNid(), |
| process->GetPid(), process->GetPort()); |
| } |
| |
| process->SetVerifier(verifier_); |
| process->SetParentVerifier(parentVerifier_); |
| |
| // Send reply or notice to parent process if necessary. |
| int result = ( process->GetPid() != -1 ) ? 0 : MPI_ERR_SPAWN; |
| parent = CProcessContainer::ParentNewProcReply( process, result ); |
| if ( result ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), |
| "[%s], Process %s (%d, %d) did not " |
| "startup successfully.\n", |
| method_name, process->GetName(), process->GetNid(), |
| process->GetPid()); |
| mon_log_write(MON_INTREQ_CLONEPROC_1, SQ_LOG_ERR, buf); |
| } |
| |
| if (parent && process->IsBackup()) |
| { |
| if (trace_settings & TRACE_SYNC) |
| trace_printf("%s@%d - For backup process (%d, %d), for" |
| " parent (%d, %d) setting parent's Parent" |
| "_Nid/Parent_Pid=(%d, %d).\n", |
| method_name, __LINE__, process->GetNid(), |
| process->GetPid(), parent->GetNid(), parent->GetPid(), |
| process->GetNid(), process->GetPid()); |
| |
| parent->SetParentNid ( process->GetNid() ); |
| parent->SetParentPid ( process->GetPid() ); |
| parent->SetParent( process ); |
| } |
| |
| // There might be a request waiting for the process creation to |
| // complete so have worker check pending request queue. |
| ReqQueue.nudgeWorker(); |
| } |
| else |
| { |
| // This is a new clone process that needs to be created |
| // mirroring another node |
| CNode * node = Nodes->GetLNode (nid_)->GetNode(); |
| CProcess * process; |
| process = node->CloneProcess (nid_, |
| type_, |
| priority_, |
| backup_, |
| unhooked_, |
| &stringData_[0], // process name |
| &stringData_[nameLen_], // port |
| osPid_, |
| verifier_, |
| parentNid_, |
| parentPid_, |
| parentVerifier_, |
| eventMessages_, |
| systemMessages_, |
| pathStrId_, |
| ldpathStrId_, |
| programStrId_, |
| &stringData_[nameLen_ + portLen_], // infile |
| &stringData_[nameLen_ + portLen_ + infileLen_], // outfile |
| &creationTime_, |
| origPNidNs_); |
| if ( process ) |
| { |
| process->userArgs ( argc_, argvLen_, |
| &stringData_[nameLen_ + portLen_ |
| +infileLen_ + outfileLen_] ); |
| } |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| |
| #ifdef NAMESERVER_PROCESS |
| CIntCloneProcNsReq::CIntCloneProcNsReq( bool backup |
| , bool unhooked |
| , bool eventMessages |
| , bool systemMessages |
| , int nid |
| , PROCESSTYPE type |
| , int priority |
| , int parentNid |
| , int parentPid |
| , int parentVerifier |
| , int osPid |
| , int verifier |
| , pid_t priorPid |
| , int persistentRetries |
| , int argc |
| , struct timespec creationTime |
| , int pathLen |
| , int ldpathLen |
| , int programLen |
| // , strId_t pathStrId |
| // , strId_t ldpathStrId |
| // , strId_t programStrId |
| , int nameLen |
| , int portLen |
| , int infileLen |
| , int outfileLen |
| , int argvLen |
| , const char * stringData |
| , int origPNidNs) |
| : CInternalReq() |
| , backup_( backup ) |
| , unhooked_( unhooked ) |
| , eventMessages_( eventMessages ) |
| , systemMessages_( systemMessages ) |
| , nid_( nid ) |
| , type_( type ) |
| , priority_( priority ) |
| , parentNid_( parentNid ) |
| , parentPid_( parentPid ) |
| , parentVerifier_( parentVerifier ) |
| , osPid_( osPid ) |
| , verifier_( verifier ) |
| , priorPid_( priorPid ) |
| , persistentRetries_( persistentRetries ) |
| , argc_( argc ) |
| , pathLen_( pathLen ) |
| , ldpathLen_( ldpathLen ) |
| , programLen_( programLen ) |
| // , pathStrId_( pathStrId ) |
| // , ldpathStrId_( ldpathStrId ) |
| // , programStrId_( programStrId ) |
| , nameLen_( nameLen ) |
| , portLen_( portLen ) |
| , infileLen_( infileLen ) |
| , outfileLen_( outfileLen ) |
| , argvLen_( argvLen ) |
| , origPNidNs_( origPNidNs ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIL", 4); |
| |
| creationTime_.tv_sec = creationTime.tv_sec; |
| creationTime_.tv_nsec = creationTime.tv_nsec; |
| |
| int stringDataLen = nameLen_ + portLen_+ infileLen_ + outfileLen_ |
| + pathLen_ + ldpathLen_ + programLen_ |
| + argvLen_; |
| stringData_ = new char [stringDataLen]; |
| memcpy ( stringData_, stringData, stringDataLen ); |
| } |
| |
| void CIntCloneProcNsReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (name=%s/nid=%d/pid=%d) parent(nid=%d/pid=%d)" |
| , CReqQueue::intReqType[InternalType_Clone] |
| , getId(), |
| &stringData_[0], // process name |
| nid_, osPid_, parentNid_, parentPid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| CIntCloneProcNsReq::~CIntCloneProcNsReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqil", 4); |
| |
| delete [] stringData_; |
| } |
| |
| void CIntCloneProcNsReq::performRequest() |
| { |
| const char method_name[] = "CIntCloneProcNsReq::performRequest"; |
| TRACE_ENTRY; |
| |
| CProcess *process; |
| |
| // Trace info about request |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d \n", method_name, __LINE__); |
| |
| // check to see if we pre-cloned this process for startup in a remote node |
| process = Nodes->GetLNode (nid_)-> |
| CompleteProcessStartup(&stringData_[0], // process name |
| &stringData_[nameLen_], // port, |
| osPid_, |
| eventMessages_, |
| systemMessages_, |
| &creationTime_, |
| origPNidNs_); |
| if (process) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Registered process %s (%d, %d:%d), " |
| "port=%s, origPNidNs=%d, MyPNID=%d\n", method_name, __LINE__, |
| process->GetName(), process->GetNid(), |
| process->GetPid(), process->GetVerifier(), |
| process->GetPort(), process->GetOrigPNidNs(), MyPNID ); |
| } |
| |
| process->SetVerifier(verifier_); |
| process->SetParentVerifier(parentVerifier_); |
| |
| // Send reply to originating monitor |
| if ( MyPNID == process->GetOrigPNidNs() ) |
| { |
| struct message_def *msg = process->GetMonContext(); |
| msg->u.reply.type = ReplyType_NewProcessNs; |
| msg->noreply = false; |
| msg->reply_tag = process->GetReplyTag(); |
| msg->u.reply.u.new_process_ns.nid = process->GetNid(); |
| msg->u.reply.u.new_process_ns.pid = process->GetPid(); |
| msg->u.reply.u.new_process_ns.verifier = process->GetReplyTag(); |
| strncpy(msg->u.reply.u.new_process_ns.process_name, process->GetName(), MAX_PROCESS_NAME); |
| msg->u.reply.u.new_process_ns.return_code = MPI_SUCCESS; |
| |
| int sockFd = process->GetMonSockFd(); |
| // Send reply to requester |
| monreply( msg, sockFd ); |
| } |
| |
| // There might be a request waiting for the process creation to |
| // complete so have worker check pending request queue. |
| ReqQueue.nudgeWorker(); |
| } |
| else |
| { |
| // This is a new clone process that needs to be created |
| // mirroring another node |
| CNode * node = Nodes->GetLNode(nid_)->GetNode(); |
| CProcess * process; |
| process = node->CloneProcess( nid_, |
| type_, |
| priority_, |
| backup_, |
| unhooked_, |
| &stringData_[0], // process name |
| &stringData_[nameLen_], // port |
| osPid_, |
| verifier_, |
| parentNid_, |
| parentPid_, |
| parentVerifier_, |
| eventMessages_, |
| systemMessages_, |
| // pathStrId_, |
| // ldpathStrId_, |
| // programStrId_, |
| &stringData_[nameLen_ + portLen_ + infileLen_ + outfileLen_], // path |
| &stringData_[nameLen_ + portLen_ + infileLen_ + outfileLen_ + pathLen_], // ldpath |
| &stringData_[nameLen_ + portLen_ + infileLen_ + outfileLen_ + pathLen_ + ldpathLen_], // program |
| &stringData_[nameLen_ + portLen_], // infile |
| &stringData_[nameLen_ + portLen_ + infileLen_], // outfile |
| &creationTime_, |
| origPNidNs_); |
| if ( process ) |
| { |
| process->userArgs( argc_ |
| , argvLen_ |
| , &stringData_[ nameLen_ |
| + portLen_ |
| + infileLen_ |
| + outfileLen_ |
| + pathLen_ |
| + ldpathLen_ |
| + programLen_] ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntDeviceReq::CIntDeviceReq( char *ldevName ) |
| : CInternalReq() |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQID", 4); |
| |
| STRCPY( ldevName_, ldevName ); |
| } |
| |
| CIntDeviceReq::~CIntDeviceReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqid", 4); |
| } |
| |
| void CIntDeviceReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (ldevname=%s)" |
| , CReqQueue::intReqType[InternalType_Device] |
| , getId(), ldevName_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntDeviceReq::performRequest() |
| { |
| const char method_name[] = "CIntDeviceReq::performRequest"; |
| TRACE_ENTRY; |
| |
| // Trace info about request |
| if (trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d device=%s\n", method_name, __LINE__, ldevName_); |
| |
| Monitor->DoDeviceReq( ldevName_ ); |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntExitReq::CIntExitReq( ) |
| : CInternalReq() |
| , nid_(0) |
| , pid_(0) |
| , verifier_(-1) |
| , abended_(false) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIE", 4); |
| |
| name_[0] = '\0'; |
| } |
| |
| CIntExitReq::~CIntExitReq( ) |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqie", 4); |
| } |
| |
| void CIntExitReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (name=%s/nid=%d/pid=%d/verifier=%d)" |
| , CReqQueue::intReqType[InternalType_Exit] |
| , getId(), name_, nid_, pid_, verifier_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntExitReq::prepRequest( struct exit_def *exitDef ) |
| { |
| const char method_name[] = "CIntExitReq::prepRequest"; |
| TRACE_ENTRY; |
| |
| nid_ = exitDef->nid; |
| pid_ = exitDef->pid; |
| verifier_ = exitDef->verifier; |
| strcpy( name_, exitDef->name ); |
| abended_ = exitDef->abended; |
| |
| TRACE_EXIT; |
| } |
| |
| void CIntExitReq::performRequest() |
| { |
| const char method_name[] = "CIntExitReq::performRequest"; |
| TRACE_ENTRY; |
| |
| CProcess *process = NULL; |
| CLNode *lnode; |
| |
| lnode = Nodes->GetLNode( nid_ ); |
| if ( lnode ) |
| { |
| process = lnode->GetNode()->GetProcess( pid_ ); |
| |
| if ( ! process ) |
| { |
| // Could not locate process by process id. If the exit |
| // occurred due to an early process termination on another |
| // node we won't have the process id. Try the look up by |
| // name instead. |
| process = lnode->GetNode()->GetProcess( name_, false ); |
| } |
| } |
| |
| if ( process ) |
| { |
| if ( (verifier_ != -1) && (verifier_ != process->GetVerifier()) ) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Exit %s (%d, %d:%d) failed -- verifier mismatch (%d)\n", |
| method_name, __LINE__, |
| name_, |
| nid_, |
| pid_, |
| verifier_, |
| process->GetVerifier()); |
| } |
| } |
| else |
| { |
| lnode->GetNode()->DelFromNameMap ( process ); |
| lnode->GetNode()->DelFromPidMap ( process ); |
| lnode->GetNode()->Exit_Process (process, abended_, -1); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Can't find process %s (%d, %d) for processing " |
| "exit.\n", method_name, name_, nid_, pid_); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_5, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| #else |
| CIntExitNsReq::CIntExitNsReq( ) |
| : CInternalReq() |
| , nid_(0) |
| , pid_(0) |
| , verifier_(-1) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIE", 4); |
| |
| name_[0] = '\0'; |
| } |
| |
| CIntExitNsReq::~CIntExitNsReq( ) |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqie", 4); |
| } |
| |
| void CIntExitNsReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (name=%s/nid=%d/pid=%d/verifier=%d)" |
| "(msg=%p, sockFd=%d, origPNid=%d)" |
| , CReqQueue::intReqType[InternalType_Exit] |
| , getId(), name_, nid_, pid_, verifier_ |
| , msg_, sockFd_, origPNid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntExitNsReq::prepRequest( struct exit_ns_def *exitDef ) |
| { |
| const char method_name[] = "CIntExitNsReq::prepRequest"; |
| TRACE_ENTRY; |
| |
| nid_ = exitDef->nid; |
| pid_ = exitDef->pid; |
| verifier_ = exitDef->verifier; |
| strcpy( name_, exitDef->name ); |
| abended_ = exitDef->abended; |
| msg_ = exitDef->msg; |
| sockFd_ = exitDef->sockFd; |
| origPNid_ = exitDef->origPNid; |
| |
| TRACE_EXIT; |
| } |
| |
| void CIntExitNsReq::performRequest() |
| { |
| const char method_name[] = "CIntExitReq::performRequest"; |
| TRACE_ENTRY; |
| |
| CProcess *process = NULL; |
| CLNode *lnode; |
| |
| // Check if this name server is handling monitor request |
| // from CExtDelProcessNsReq::performRequest() |
| if (origPNid_ == MyPNID) |
| { |
| msg_->noreply = false; |
| msg_->u.reply.type = ReplyType_DelProcessNs; |
| msg_->u.reply.u.del_process_ns.nid = nid_; |
| msg_->u.reply.u.del_process_ns.pid = pid_; |
| msg_->u.reply.u.del_process_ns.verifier = verifier_; |
| strncpy(msg_->u.reply.u.del_process_ns.process_name, name_, MAX_PROCESS_NAME); |
| msg_->u.reply.u.del_process_ns.return_code = MPI_SUCCESS; |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Replying to monitor sockFd_=%d, msg_=%p, deleted %s (%d, %d:%d)\n", |
| method_name, __LINE__, |
| sockFd_, |
| msg_, |
| name_, |
| nid_, |
| pid_, |
| verifier_ ); |
| } |
| |
| // Send reply to requesting monitor |
| monreply( msg_, sockFd_ ); |
| return; |
| } |
| |
| lnode = Nodes->GetLNode( nid_ ); |
| if ( lnode ) |
| { |
| process = lnode->GetNode()->GetProcess( pid_ ); |
| |
| if ( ! process ) |
| { |
| // Could not locate process by process id. If the exit |
| // occurred due to an early process termination on another |
| // node we won't have the process id. Try the look up by |
| // name instead. |
| process = lnode->GetNode()->GetProcess( name_, false ); |
| } |
| } |
| |
| if ( process ) |
| { |
| if ( (verifier_ != -1) && (verifier_ != process->GetVerifier()) ) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Exit %s (%d, %d:%d) failed -- verifier mismatch (%d)\n", |
| method_name, __LINE__, |
| name_, |
| nid_, |
| pid_, |
| verifier_, |
| process->GetVerifier()); |
| } |
| } |
| else |
| { |
| lnode->GetNode()->Exit_Process( process, abended_, -1 ); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Can't find process %s (%d, %d) for processing " |
| "exit.\n", method_name, name_, nid_, pid_); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_5, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntIoDataReq::CIntIoDataReq( ioData_t *ioData ) |
| : CInternalReq() |
| , nid_( ioData->nid ) |
| , pid_( ioData->pid ) |
| , verifier_( ioData->verifier ) |
| , ioType_( ioData->ioType ) |
| , length_( ioData->length ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RqIK", 4); |
| memcpy(data_, ioData->data, (length_<=MAX_SYNC_DATA)?length_:MAX_SYNC_DATA); |
| } |
| |
| CIntIoDataReq::~CIntIoDataReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rQik", 4); |
| } |
| |
| void CIntIoDataReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (nid=%d/pid=%d/verifier=%d), type=%d, length=%d" |
| , CReqQueue::intReqType[InternalType_IoData] |
| , getId(), nid_, pid_, verifier_, ioType_, length_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntIoDataReq::performRequest() |
| { |
| const char method_name[] = "CIntIoDataReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) |
| { |
| trace_printf( "%s@%d" " - IO data " |
| "to (%d,%d:%d), count=%d\n(%s)" |
| , method_name, __LINE__ |
| , nid_ |
| , pid_ |
| , verifier_ |
| , length_ |
| , length_?data_:"\n" ); |
| } |
| if ( MyNode->IsMyNode( nid_ ) ) |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_REDIRECTION)) |
| trace_printf( "%s@%d - processing IO Data for (%d, %d:%d)\n" |
| , method_name, __LINE__ |
| , nid_, pid_, verifier_ ); |
| |
| CLNode *lnode; |
| lnode = Nodes->GetLNode( nid_ ); |
| if ( lnode ) |
| { |
| CProcess *process; |
| process = lnode->GetProcessL( pid_ ); |
| if (process) |
| { |
| int fd; |
| if (ioType_ == STDIN_DATA) |
| { |
| fd = process->FdStdin(); |
| } |
| else |
| { |
| fd = process->FdStdout(); |
| } |
| Redirector.disposeIoData( fd, length_, data_ ); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Can't find process nid" |
| "=%d, pid=%d for processing IO Data.\n" |
| , method_name, nid_, pid_ ); |
| mon_log_write(MON_REQ_IODATA_1, SQ_LOG_ERR, buf); |
| } |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntKillReq::CIntKillReq( struct kill_def *killDef ) |
| : CInternalReq() |
| , nid_( killDef->nid ) |
| , pid_( killDef->pid ) |
| , verifier_( killDef->verifier ) |
| , abort_( killDef->persistent_abort ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIK", 4); |
| } |
| |
| CIntKillReq::~CIntKillReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqik", 4); |
| } |
| |
| void CIntKillReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (nid=%d/pid=%d/verifier=%d), abort=%d" |
| , CReqQueue::intReqType[InternalType_Kill] |
| , getId(), nid_, pid_, verifier_, abort_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntKillReq::performRequest() |
| { |
| const char method_name[] = "CIntKillReq::performRequest"; |
| TRACE_ENTRY; |
| |
| CProcess *process = Nodes->GetProcess( nid_, pid_, false ); |
| |
| if (process) |
| { |
| if ( (verifier_ != -1) && (verifier_ != process->GetVerifier()) ) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Kill (%d, %d:%d) failed -- verifier mismatch (%d)\n", |
| method_name, __LINE__, |
| nid_, |
| pid_, |
| verifier_, |
| process->GetVerifier()); |
| } |
| } |
| else |
| { |
| CLNode *lnode = Nodes->GetLNode( nid_ ); |
| if ( lnode && process->GetPid() == pid_ ) |
| { |
| |
| // Remove mapping of name to process object. |
| lnode->GetNode()->DelFromNameMap ( process ); |
| } |
| else if (trace_settings & TRACE_PROCESS) |
| { |
| trace_printf("%s@%d - Leaving %s (%d, %d) in namemap, killed " |
| "(%d, %d:%d)\n", method_name, __LINE__, |
| process->GetName(), process->GetNid(), |
| process->GetPid(), nid_, pid_, verifier_ ); |
| } |
| |
| process->SetAbort( abort_ ); |
| |
| if ( !process->IsClone() ) |
| { |
| // Indicate thate process is down and abended |
| lnode->GetNode()->SetProcessState( process, State_Down, true ); |
| |
| // Save the pid/verifier to cleanup LIO buffers after SIGCHLD |
| SQ_theLocalIOToClient->addToVerifierMap( process->GetPid() |
| , process->GetVerifier() ); |
| |
| // Kill the process, will get child death signal later |
| kill( pid_, Monitor->GetProcTermSig() ); |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Completed kill(%d) for (%d, %d:%d)\n", |
| method_name, __LINE__, Monitor->GetProcTermSig(), |
| nid_, pid_, verifier_); |
| } |
| else |
| { // Actual process is on another node. |
| if (trace_settings & TRACE_PROCESS_DETAIL) |
| trace_printf("%s@%d - Ingoring kill for clone process %s " |
| "(%d, %d:%d), state=%d\n", method_name, __LINE__, |
| process->GetName(), nid_, pid_, verifier_, |
| process->GetState() ); |
| } |
| |
| CProcess *parent = Nodes->GetProcess( process->GetParentNid(), |
| process->GetParentPid() ); |
| process->Switch(parent); // switch process pair roles if needed |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Killing process but can't find process " |
| "(%d, %d:%d)\n", method_name, __LINE__, |
| nid_, pid_, verifier_ ); |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntNewProcReq::CIntNewProcReq( int nid |
| , PROCESSTYPE type |
| , int priority |
| , int backup |
| , int parentNid |
| , int parentPid |
| , Verifier_t parentVerifier |
| , int pairParentNid |
| , int pairParentPid |
| , Verifier_t pairParentVerifier |
| , int argc |
| , bool unhooked |
| , void *reqTag |
| , strId_t pathStrId |
| , strId_t ldpathStrId |
| , strId_t programStrId |
| , int nameLen |
| , int infileLen |
| , int outfileLen |
| , int argvLen |
| , const char * stringData ) |
| : CInternalReq(), |
| nid_ ( nid ), |
| type_( type ), |
| priority_( priority ), |
| backup_( backup ), |
| parentNid_( parentNid ), |
| parentPid_( parentPid ), |
| parentVerifier_( parentVerifier ), |
| pairParentNid_( pairParentNid ), |
| pairParentPid_( pairParentPid ), |
| pairParentVerifier_( pairParentVerifier ), |
| argc_( argc ), |
| unhooked_( unhooked ), |
| reqTag_ ( reqTag ), |
| pathStrId_ ( pathStrId ), |
| ldpathStrId_ ( ldpathStrId ), |
| programStrId_ ( programStrId ), |
| nameLen_ ( nameLen ), |
| infileLen_ ( infileLen ), |
| outfileLen_ ( outfileLen ), |
| argvLen_ ( argvLen ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIN", 4); |
| |
| int stringDataLen = nameLen_ + infileLen_ + outfileLen_ + argvLen_; |
| stringData_ = new char [stringDataLen]; |
| memcpy ( stringData_, stringData, stringDataLen ); |
| } |
| |
| CIntNewProcReq::~CIntNewProcReq( ) |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqin", 4); |
| |
| delete [] stringData_; |
| } |
| |
| void CIntNewProcReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (name=%s/nid=%d) parent(nid=%d/pid=%d)" |
| , CReqQueue::intReqType[InternalType_Process] |
| , getId(), |
| stringData_, |
| nid_, parentNid_, parentPid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntNewProcReq::performRequest() |
| { |
| const char method_name[] = "CIntNewProcReq::performRequest"; |
| TRACE_ENTRY; |
| |
| CProcess *parentProcess = NULL; |
| int result = 0; |
| |
| if (trace_settings & TRACE_SYNC) |
| { |
| trace_printf("%s@%d - processing new process %s (%d, tbd) %s " |
| "(tag %p)\n", method_name, __LINE__, |
| stringData_, |
| nid_, (backup_?" Backup":""), reqTag_); |
| } |
| |
| if (parentNid_ == -1) |
| { |
| parentProcess = NULL; |
| } |
| else |
| { |
| parentProcess = Nodes->GetProcess( parentNid_, parentPid_ ); |
| if ( parentProcess ) |
| { |
| if ( (parentVerifier_ == -1) || |
| (parentVerifier_ == parentProcess->GetVerifier()) ) |
| { |
| if ( backup_ && |
| (parentProcess->GetPairParentNid() == -1 && |
| parentProcess->GetPairParentPid() == -1)) |
| { |
| parentProcess->SetPairParentNid( pairParentNid_ ); |
| parentProcess->SetPairParentPid( pairParentPid_ ); |
| parentProcess->SetPairParentVerifier( pairParentVerifier_ ); |
| } |
| } |
| } |
| else |
| { |
| if (NameServerEnabled) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf( "%s@%d" " - Getting parent process from Name Server (%d,%d:%d)\n" |
| , method_name, __LINE__ |
| , parentNid_ |
| , parentPid_ |
| , parentVerifier_ ); |
| parentProcess = Nodes->CloneProcessNs( parentNid_ |
| , parentPid_ |
| , parentVerifier_ ); |
| } |
| } |
| } |
| |
| if (parentProcess || unhooked_ ) |
| { |
| CLNode *lnode = Nodes->GetLNode(nid_); |
| |
| if ( lnode && |
| (lnode->GetState() == State_Up || |
| lnode->GetState() == State_Shutdown ) ) |
| { // Create the CProcess object and store the various |
| // process parameters. |
| // cause strings to be forwarded |
| string path; |
| string ldpath; |
| string program; |
| Config->strIdToString( pathStrId_, path ); |
| Config->strIdToString( ldpathStrId_, ldpath ); |
| Config->strIdToString( programStrId_, program ); |
| CProcess *newProcess ; |
| newProcess = lnode->GetNode()-> |
| CreateProcess ( parentProcess, |
| nid_, |
| type_, |
| 0, |
| priority_, |
| backup_, |
| unhooked_, |
| &stringData_[0], // process name |
| pathStrId_, |
| ldpathStrId_, |
| programStrId_, |
| &stringData_[nameLen_], // infile |
| &stringData_[nameLen_ + infileLen_], // outfile |
| reqTag_, |
| result); |
| if ( newProcess != NULL ) |
| { |
| newProcess->userArgs ( argc_, argvLen_, |
| &stringData_[nameLen_ + infileLen_ |
| + outfileLen_] ); |
| |
| // Create the new process (fork/exec) |
| if (newProcess->Create(newProcess->GetParent(), reqTag_, result)) |
| { |
| MyNode->AddToNameMap( newProcess ); |
| MyNode->AddToPidMap( newProcess->GetPid(), newProcess ); |
| |
| if (!NameServerEnabled) |
| { |
| // Successfully forked process. Replicate actual process |
| // id and process name. |
| CReplProcInit *repl |
| = new CReplProcInit(newProcess, reqTag_, 0, parentNid_); |
| Replicator.addItem(repl); |
| } |
| } |
| else |
| { |
| MyNode->DeleteFromList ( newProcess ); |
| newProcess = NULL; |
| } |
| } |
| |
| if ( newProcess == NULL ) |
| { |
| // Process creation failure, relay error code to node |
| // that requested process creation. |
| if (!NameServerEnabled) |
| { |
| CReplProcInit *repl = new CReplProcInit(newProcess, reqTag_, |
| result, parentNid_); |
| Replicator.addItem(repl); |
| } |
| } |
| } |
| } |
| else if ( parentProcess == NULL ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Can't find parent process nid=%d, pid=%d " |
| "for process create.\n", method_name, |
| parentNid_, parentPid_ ); |
| mon_log_write(MON_INTREQ_NEWPROC_2, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| #ifdef NAMESERVER_PROCESS |
| CIntNewProcNsReq::CIntNewProcNsReq( int nid |
| , int pid |
| , Verifier_t verifier |
| , PROCESSTYPE type |
| , int priority |
| , int backup |
| , int parentNid |
| , int parentPid |
| , Verifier_t parentVerifier |
| , int pairParentNid |
| , int pairParentPid |
| , Verifier_t pairParentVerifier |
| , int argc |
| , bool unhooked |
| , void* reqTag |
| , int pathLen |
| , int ldpathLen |
| , int programLen |
| , int nameLen |
| , int infileLen |
| , int outfileLen |
| , int argvLen |
| , const char* stringData ) |
| : CInternalReq() |
| , nid_( nid ) |
| , pid_( pid ) |
| , verifier_( verifier ) |
| , type_( type ) |
| , priority_( priority ) |
| , backup_( backup ) |
| , parentNid_( parentNid ) |
| , parentPid_( parentPid ) |
| , parentVerifier_( parentVerifier ) |
| , pairParentNid_( pairParentNid ) |
| , pairParentPid_( pairParentPid ) |
| , pairParentVerifier_( pairParentVerifier ) |
| , argc_( argc ) |
| , unhooked_( unhooked ) |
| , reqTag_( reqTag ) |
| , pathLen_( pathLen ) |
| , ldpathLen_( ldpathLen ) |
| , programLen_( programLen ) |
| , nameLen_( nameLen ) |
| , infileLen_( infileLen ) |
| , outfileLen_( outfileLen ) |
| , argvLen_( argvLen ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RqIN", 4); |
| |
| int stringDataLen = pathLen_ |
| + ldpathLen_ |
| + programLen_ |
| + nameLen_ |
| + infileLen_ |
| + outfileLen_ |
| + argvLen_; |
| stringData_ = new char [stringDataLen]; |
| memcpy ( stringData_, stringData, stringDataLen ); |
| } |
| |
| CIntNewProcNsReq::~CIntNewProcNsReq( ) |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rQin", 4); |
| |
| delete [] stringData_; |
| } |
| |
| void CIntNewProcNsReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (name=%s/nid=%d) parent(nid=%d/pid=%d)" |
| , CReqQueue::intReqType[InternalType_Process] |
| , getId(), |
| stringData_, |
| nid_, parentNid_, parentPid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntNewProcNsReq::performRequest() |
| { |
| const char method_name[] = "CIntNewProcNsReq::performRequest"; |
| TRACE_ENTRY; |
| |
| CProcess *parentProcess = NULL; |
| int result = 0; |
| |
| if (trace_settings & TRACE_SYNC) |
| { |
| trace_printf("%s@%d - processing new process %s (%d, tbd) %s " |
| "(tag %p)\n", method_name, __LINE__, |
| stringData_, |
| nid_, (backup_?" Backup":""), reqTag_); |
| } |
| |
| if (parentNid_ == -1) |
| { |
| parentProcess = NULL; |
| } |
| else |
| { |
| parentProcess = Nodes->GetProcess( parentNid_, parentPid_ ); |
| if ( parentProcess ) |
| { |
| if ( (parentVerifier_ == -1) || |
| (parentVerifier_ == parentProcess->GetVerifier()) ) |
| { |
| if ( backup_ && |
| (parentProcess->GetPairParentNid() == -1 && |
| parentProcess->GetPairParentPid() == -1)) |
| { |
| parentProcess->SetPairParentNid( pairParentNid_ ); |
| parentProcess->SetPairParentPid( pairParentPid_ ); |
| parentProcess->SetPairParentVerifier( pairParentVerifier_ ); |
| } |
| } |
| } |
| } |
| |
| if (parentProcess || unhooked_ ) |
| { |
| CProcess *newProcess = NULL; |
| CLNode *lnode = Nodes->GetLNode(nid_); |
| |
| if ( lnode && |
| (lnode->GetState() == State_Up || |
| lnode->GetState() == State_Shutdown ) ) |
| { // Create the CProcess object and store the various |
| // process parameters. |
| newProcess = lnode->GetNode()-> |
| CreateProcess ( parentProcess, |
| nid_, |
| pid_, |
| verifier_, |
| false, |
| false, |
| type_, |
| 0, |
| priority_, |
| backup_, |
| unhooked_, |
| &stringData_[0], // process name |
| &stringData_[nameLen_ + infileLen_ + outfileLen_], // path |
| &stringData_[nameLen_ + infileLen_ + outfileLen_ + pathLen_], // ldpath |
| &stringData_[nameLen_ + infileLen_ + outfileLen_ + pathLen_ + ldpathLen_], // program |
| &stringData_[nameLen_], // infile |
| &stringData_[nameLen_ + infileLen_], // outfile |
| reqTag_, |
| result); |
| } |
| if ( newProcess == NULL ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf( buf, "[%s], Can't create process %s (%d,%d:%d)\n" |
| , method_name, &stringData_[0],nid_, pid_, verifier_ ); |
| mon_log_write(MON_INTREQ_NEWPROC_1, SQ_LOG_ERR, buf); |
| } |
| } |
| else if ( parentProcess == NULL ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Can't find parent process nid=%d, pid=%d " |
| "for process create.\n", method_name, |
| parentNid_, parentPid_ ); |
| mon_log_write(MON_INTREQ_NEWPROC_3, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntNotifyReq::CIntNotifyReq( struct notify_def *notifyDef ) |
| : CInternalReq() |
| , nid_( notifyDef->nid ) |
| , pid_( notifyDef->pid ) |
| , verifier_( notifyDef->verifier ) |
| , canceled_( notifyDef->canceled ) |
| , targetNid_( notifyDef->target_nid ) |
| , targetPid_( notifyDef->target_pid ) |
| , targetVerifier_( notifyDef->target_verifier ) |
| , transId_( notifyDef->trans_id ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQI1", 4); |
| } |
| |
| CIntNotifyReq::~CIntNotifyReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqi1", 4); |
| } |
| |
| void CIntNotifyReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld notify(nid=%d/pid=%d/verifier=%d) death target(nid=%d/pid=%d/verifier=%d), canceled=%d, transId=%lld.%lld.%lld.%lld" |
| , CReqQueue::intReqType[InternalType_Notify] |
| , getId() |
| , nid_ |
| , pid_ |
| , verifier_ |
| , targetNid_ |
| , targetPid_ |
| , targetVerifier_ |
| , canceled_ |
| , transId_.txid[0] |
| , transId_.txid[1] |
| , transId_.txid[2] |
| , transId_.txid[3] ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntNotifyReq::performRequest() |
| { |
| const char method_name[] = "CIntNotifyReq::performRequest"; |
| TRACE_ENTRY; |
| |
| //CLNode *node; |
| //CLNode *targetNode; |
| CProcess *sourceProcess = NULL; |
| CProcess *targetProcess = NULL; |
| |
| if (trace_settings & TRACE_PROCESS_DETAIL) |
| { |
| trace_printf( "%s@%d - Process death notification, canceled=%d\n" |
| , method_name, __LINE__ |
| , canceled_ ); |
| } |
| |
| |
| if (trace_settings & TRACE_REQUEST) |
| { |
| trace_printf( "%s@%d" " - Finding targetProcess (%d,%d:%d)\n" |
| , method_name, __LINE__ |
| , nid_ |
| , pid_ |
| , verifier_ ); |
| } |
| |
| // find by nid,pid (check node state, don't check process state, backup is Ok) |
| sourceProcess = Nodes->GetProcess( nid_ |
| , pid_ |
| , verifier_ |
| , true, false, true ); |
| if ( sourceProcess ) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| { |
| trace_printf( "%s@%d - Found sourceProcess %s (%d,%d:%d), clone=%d\n" |
| , method_name, __LINE__ |
| , sourceProcess->GetName() |
| , sourceProcess->GetNid() |
| , sourceProcess->GetPid() |
| , sourceProcess->GetVerifier() |
| , sourceProcess->IsClone() ); |
| } |
| } |
| else |
| { |
| if (!NameServerEnabled) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| { |
| trace_printf("%s@%d - Can't find sourceProcess\n", method_name, __LINE__); |
| } |
| } |
| else |
| { // Name Server find by nid,pid:verifier |
| if (trace_settings & TRACE_REQUEST) |
| { |
| trace_printf( "%s@%d" " - Getting sourceProcess from Name Server (%d,%d:%d)\n" |
| , method_name, __LINE__ |
| , nid_ |
| , pid_ |
| , verifier_ ); |
| } |
| sourceProcess = Nodes->CloneProcessNs( nid_ |
| , pid_ |
| , verifier_ ); |
| if (sourceProcess) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf( "%s@%d - Found sourceProcess %s (%d,%d:%d), clone=%d\n" |
| , method_name, __LINE__ |
| , sourceProcess->GetName() |
| , sourceProcess->GetNid() |
| , sourceProcess->GetPid() |
| , sourceProcess->GetVerifier() |
| , sourceProcess->IsClone() ); |
| } |
| else |
| { |
| trace_printf( "%s@%d" " - Can't find sourceProcess (%d,%d:%d)\n" |
| , method_name, __LINE__ |
| , nid_ |
| , pid_ |
| , verifier_ ); |
| } |
| } |
| } |
| |
| if ( sourceProcess ) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| { |
| trace_printf( "%s@%d" " - Finding targetProcess (%d,%d:%d)\n" |
| , method_name, __LINE__ |
| , targetNid_ |
| , targetPid_ |
| , targetVerifier_ ); |
| } |
| |
| // find by nid,pid (check node state, don't check process state, backup is Ok) |
| targetProcess = Nodes->GetProcess( targetNid_ |
| , targetPid_ |
| , targetVerifier_ |
| , true, false, true ); |
| if ( targetProcess ) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| { |
| trace_printf( "%s@%d - Found targetProcess %s (%d,%d:%d), clone=%d\n" |
| , method_name, __LINE__ |
| , targetProcess->GetName() |
| , targetProcess->GetNid() |
| , targetProcess->GetPid() |
| , targetProcess->GetVerifier() |
| , targetProcess->IsClone() ); |
| } |
| } |
| else |
| { |
| if (!NameServerEnabled) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| { |
| trace_printf("%s@%d - Can't find targetProcess\n", method_name, __LINE__); |
| } |
| } |
| else |
| { // Name Server find by nid,pid:verifier |
| if (trace_settings & TRACE_REQUEST) |
| { |
| trace_printf( "%s@%d" " - Getting targetProcess from Name Server (%d,%d:%d)\n" |
| , method_name, __LINE__ |
| , targetNid_ |
| , targetPid_ |
| , targetVerifier_ ); |
| } |
| targetProcess = Nodes->CloneProcessNs( targetNid_ |
| , targetPid_ |
| , targetVerifier_ ); |
| if (targetProcess) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf( "%s@%d - Found targetProcess %s (%d,%d:%d), clone=%d\n" |
| , method_name, __LINE__ |
| , targetProcess->GetName() |
| , targetProcess->GetNid() |
| , targetProcess->GetPid() |
| , targetProcess->GetVerifier() |
| , targetProcess->IsClone() ); |
| } |
| else |
| { |
| trace_printf( "%s@%d" " - Can't find targetProcess (%d,%d:%d)\n" |
| , method_name, __LINE__ |
| , targetNid_ |
| , targetPid_ |
| , targetVerifier_ ); |
| } |
| } |
| } |
| |
| if ( targetProcess ) |
| { |
| if (canceled_) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf( "%s@%d - Process (%d, %d:%d) deleting death " |
| "notice interest for %s (%d, %d:%d), " |
| "trans_id=%lld.%lld.%lld.%lld\n" |
| , method_name, __LINE__ |
| , nid_ |
| , pid_ |
| , verifier_ |
| , targetProcess->GetName() |
| , targetNid_ |
| , targetPid_ |
| , targetVerifier_ |
| , transId_.txid[0] |
| , transId_.txid[1] |
| , transId_.txid[2] |
| , transId_.txid[3] ); |
| } |
| |
| // Remove death notice registration |
| targetProcess->CancelDeathNotification( nid_ |
| , pid_ |
| , verifier_ |
| , transId_ ); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Process (%d, %d:%d) registering interest " |
| "in death of process %s (%d, %d:%d), " |
| "trans_id=%lld.%lld.%lld.%lld\n" |
| , method_name, __LINE__ |
| , nid_ |
| , pid_ |
| , verifier_ |
| , targetProcess->GetName() |
| , targetNid_ |
| , targetPid_ |
| , targetVerifier_ |
| , transId_.txid[0] |
| , transId_.txid[1] |
| , transId_.txid[2] |
| , transId_.txid[3] ); |
| } |
| // Register interest with the target process |
| sourceProcess->procExitReg( targetProcess, transId_); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf( buf |
| , "[%s], Can't find target process nid=%d, pid=%d:%d for " |
| "processing process death notification.\n" |
| , method_name |
| , targetNid_ |
| , targetPid_ |
| , targetVerifier_ ); |
| mon_log_write(MON_INTREQ_NOTIFY_1, SQ_LOG_INFO, buf); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf( buf |
| , "[%s], Can't find sourceProcess nid=%d, pid=%d:%d for " |
| "processing process death notification.\n" |
| , method_name |
| , nid_ |
| , pid_ |
| , verifier_ ); |
| mon_log_write(MON_INTREQ_NOTIFY_3, SQ_LOG_INFO, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntOpenReq::CIntOpenReq( struct open_def *openDef ) |
| : CInternalReq() |
| , openerNid_( openDef->nid ) |
| , openerPid_( openDef->pid ) |
| , openerVerifier_( openDef->verifier ) |
| , openedNid_( openDef->opened_nid ) |
| , openedPid_( openDef->opened_pid ) |
| , openedVerifier_( openDef->opened_verifier ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIO", 4); |
| } |
| |
| CIntOpenReq::~CIntOpenReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqio", 4); |
| } |
| |
| void CIntOpenReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld opener(nid=%d/pid=%d/verifier=%d) opened(nid=%d/pid=%d/verifier=%d)" |
| , CReqQueue::intReqType[InternalType_Open] |
| , getId() |
| , openerNid_ |
| , openerPid_ |
| , openerVerifier_ |
| , openedNid_ |
| , openedPid_ |
| , openedVerifier_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntOpenReq::performRequest() |
| { |
| const char method_name[] = "CIntOpenReq::performRequest"; |
| TRACE_ENTRY; |
| |
| CProcess *process = Nodes->GetProcess( openedNid_, openedPid_ ); |
| |
| if (process) |
| { |
| if ( (openedVerifier_ != -1) && (openedVerifier_ != process->GetVerifier()) ) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Open (%d, %d:%d) failed -- verifier mismatch (%d)\n", |
| method_name, __LINE__, |
| openedNid_, |
| openedPid_, |
| openedVerifier_, |
| process->GetVerifier()); |
| } |
| } |
| else |
| { |
| Nodes->GetLNode (openerNid_)-> |
| Open_Process (openerNid_, |
| openerPid_, |
| openerVerifier_, |
| 0, // notification will be handle independently |
| process); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Can't find process nid=%d, pid=%d for " |
| "processing open.\n", method_name, openedNid_, openedPid_ ); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_11, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| CIntProcInitReq::CIntProcInitReq( struct process_init_def *procInitDef ) |
| : CInternalReq() |
| , nid_( procInitDef->nid ) |
| , pid_( procInitDef->pid ) |
| , verifier_( procInitDef->verifier ) |
| , state_( procInitDef->state ) |
| , result_( procInitDef->result ) |
| , process_( (CProcess *) procInitDef->tag ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQII", 4); |
| |
| STRCPY(name_, procInitDef->name); |
| } |
| |
| CIntProcInitReq::~CIntProcInitReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqii", 4); |
| } |
| |
| void CIntProcInitReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (name=%s/nid=%d/pid=%d,verifier=%d)" |
| , CReqQueue::intReqType[InternalType_ProcessInit] |
| , getId(), name_, nid_, pid_, verifier_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntProcInitReq::performRequest() |
| { |
| const char method_name[] = "CIntProcInitReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_PROCESS)) |
| trace_printf( "%s@%d - processing process init %s (%d, %d), result=%d, tag=%p\n" |
| , method_name, __LINE__ |
| , name_, nid_, pid_, result_, static_cast<void*>(process_) ); |
| |
| if ( result_ != 0 ) |
| { // Was unable to create the process, send response to requester |
| if ( process_ ) |
| { |
| #ifndef NAMESERVER_PROCESS |
| // this will send response to to the requester and remove the process object |
| MyNode->Exit_Process(process_, true, process_->GetNid()); |
| #endif |
| } |
| } |
| else if ( process_ ) |
| { |
| // Update process state information |
| process_->SetPid ( pid_ ); |
| process_->SetVerifier ( verifier_ ); |
| process_->SetState ( state_ ); |
| process_->SetName ( name_ ); |
| |
| // Add to pid and name maps |
| Nodes->GetLNode( process_->GetNid() )->GetNode()->AddToPidMap(process_->GetPid(), process_); |
| Nodes->GetLNode( process_->GetNid() )->GetNode()->AddToNameMap(process_); |
| |
| CProcess* parent; |
| |
| if (process_->IsBackup()) |
| { |
| parent = Nodes->GetProcess(process_->GetParentNid(), |
| process_->GetParentPid(), false); |
| if (parent) |
| { // Set link from primary process object to |
| // this backup process object. |
| if (trace_settings & (TRACE_SYNC | TRACE_PROCESS)) |
| { |
| trace_printf( "%s@%d - For backup process %s (%d,%d:%d)" |
| ", for parent %s (%d,%d:%d) setting " |
| "parent's Parent_Nid/Parent_Pid=" |
| "(%d,%d).\n" |
| , method_name, __LINE__ |
| , process_->GetName() |
| , process_->GetNid() |
| , process_->GetPid() |
| , process_->GetVerifier() |
| , parent->GetName() |
| , parent->GetNid() |
| , parent->GetPid() |
| , parent->GetVerifier() |
| , process_->GetNid() |
| , process_->GetPid()); |
| } |
| parent->SetParentNid ( process_->GetNid() ); |
| parent->SetParentPid ( process_->GetPid() ); |
| } |
| } |
| #ifndef NAMESERVER_PROCESS |
| if (NameServerEnabled) |
| { |
| if (process_->IsUnhooked()) |
| { |
| if ( process_->GetParentNid() != -1 && process_->GetParentPid() != -1 ) |
| { |
| parent = Nodes->GetProcess(process_->GetParentNid(), |
| process_->GetParentPid(), false); |
| if (parent && !parent->IsClone()) |
| { // Parent process object keeps track of child processes |
| // created. Needed when parent process exits to clean up |
| // parent clone process object in remote nodes. |
| if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL |
| | TRACE_PROCESS_DETAIL)) |
| trace_printf( "%s@%d - Adding unhooked child process %s (%d,%d:%d) to " |
| "parent %s (%d,%d:%d)\n" |
| , method_name, __LINE__ |
| , process_->GetName() |
| , process_->GetNid() |
| , process_->GetPid() |
| , process_->GetVerifier() |
| , parent->GetName() |
| , parent->GetNid() |
| , parent->GetPid() |
| , parent->GetVerifier() ); |
| |
| parent->childUnHookedAdd( process_->GetNid() |
| , process_->GetPid() ); |
| } |
| } |
| } |
| } |
| #endif |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| CIntSetReq::CIntSetReq( ConfigType type, const char *group, const char *key, |
| const char *value) |
| : CInternalReq(), |
| type_( type ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIS", 4); |
| |
| STRCPY( group_, group ); |
| STRCPY( key_, key ); |
| STRCPY( value_, value ); |
| } |
| |
| CIntSetReq::~CIntSetReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqis", 4); |
| } |
| |
| void CIntSetReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (group=%s/key=%s)" |
| , CReqQueue::intReqType[InternalType_Set] |
| , getId(), group_, key_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntSetReq::performRequest() |
| { |
| const char method_name[] = "CIntSetReq::performRequest"; |
| TRACE_ENTRY; |
| |
| Config->Set( group_, type_, key_, value_, true ); |
| |
| TRACE_EXIT; |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntStdInReq::CIntStdInReq( struct stdin_req_def *stdin_req ) |
| : CInternalReq() |
| , nid_( stdin_req->nid ) |
| , pid_( stdin_req->pid ) |
| , verifier_( stdin_req->verifier ) |
| , reqType_( stdin_req->reqType ) |
| , supplierNid_( stdin_req->supplier_nid ) |
| , supplierPid_( stdin_req->supplier_pid ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RqIS", 4); |
| } |
| |
| CIntStdInReq::~CIntStdInReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rQis", 4); |
| } |
| |
| void CIntStdInReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (nid=%d/pid=%d/verifier=%d), " |
| "type=%d, supplier (%d,%d)" |
| , CReqQueue::intReqType[InternalType_StdinReq] |
| , getId(), nid_, pid_, verifier_, reqType_ |
| , supplierNid_, supplierPid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntStdInReq::performRequest() |
| { |
| const char method_name[] = "CIntStdInReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - stdin request from (%d,%d:%d)" |
| ", type=%d, for supplier (%d,%d)\n" |
| , method_name, __LINE__ |
| , nid_ |
| , pid_ |
| , verifier_ |
| , reqType_ |
| , supplierNid_ |
| , supplierPid_ ); |
| } |
| |
| if ( !MyNode->IsMyNode( supplierNid_ ) ) |
| { |
| return; |
| } |
| |
| CLNode *lnode; |
| lnode = Nodes->GetLNode( nid_ ); |
| if ( lnode == NULL ) |
| { |
| return; |
| } |
| |
| CProcess *process; |
| process = lnode->GetProcessL( pid_ ); |
| if (process) |
| { |
| if (reqType_ == STDIN_REQ_DATA) |
| { |
| // Set up to forward stdin data to requester. |
| // Save file descriptor associated with stdin |
| // so can find the redirector object later. |
| CProcess *supProcess; |
| lnode = Nodes->GetLNode( supplierNid_ ); |
| if ( lnode ) |
| { |
| supProcess = lnode->GetProcessL ( supplierPid_ ); |
| if (supProcess) |
| { |
| int fd; |
| fd = Redirector.stdinRemote( supProcess->infile() |
| , supplierNid_ |
| , supplierPid_ ); |
| process->FdStdin(fd); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), |
| "[%s], Can't find supplier process " |
| "nid=%d, pid=%d for stdin data request.\n" |
| , method_name |
| , supplierNid_ |
| , supplierPid_); |
| mon_log_write(MON_REQ_STDIN_1, SQ_LOG_ERR, buf); |
| } |
| } |
| } |
| else if (reqType_ == STDIN_FLOW_OFF) |
| { |
| Redirector.stdinOff(process->FdStdin()); |
| } |
| else if (reqType_ == STDIN_FLOW_ON) |
| { |
| Redirector.stdinOn(process->FdStdin()); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Can't find process nid=%d, " |
| "pid=%d for stdin data request.\n" |
| , method_name |
| , nid_ |
| , pid_ ); |
| mon_log_write(MON_REQ_STDIN_2, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| CIntUniqStrReq::CIntUniqStrReq( int nid, int id, const char *value ) |
| : CInternalReq(), nid_(nid), id_(id) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIF", 4); |
| |
| STRCPY( value_, value ); |
| } |
| |
| CIntUniqStrReq::~CIntUniqStrReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqif", 4); |
| } |
| |
| void CIntUniqStrReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (nid=%d/id=%d)" |
| , CReqQueue::intReqType[InternalType_UniqStr] |
| , getId(), nid_, id_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntUniqStrReq::performRequest() |
| { |
| const char method_name[] = "CIntUniqStrReq::performRequest"; |
| TRACE_ENTRY; |
| |
| Config->addUniqueString( nid_, id_, value_ ); |
| |
| TRACE_EXIT; |
| } |
| |
| |
| CIntChildDeathReq::CIntChildDeathReq( pid_t pid ) |
| : CInternalReq(), pid_(pid), process_(NULL) |
| { |
| const char method_name[] = "CIntChildDeathReq::CIntChildDeathReq"; |
| |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIC", 4); |
| |
| process_ = MyNode->GetProcess( pid ); |
| |
| if (!process_) |
| { |
| if (trace_settings & TRACE_PROCESS) |
| { |
| trace_printf("%s@%d Process %d not found so unable to set " |
| "process state.\n", method_name, __LINE__, pid); |
| } |
| } |
| } |
| |
| CIntChildDeathReq::~CIntChildDeathReq( ) |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqic", 4); |
| } |
| |
| void CIntChildDeathReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(ChildDeath) req #=%ld (pid=%d)" |
| , getId(), pid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntChildDeathReq::performRequest() |
| { |
| const char method_name[] = "CIntChildDeathReq::performRequest"; |
| |
| // process could have been deleted by a previous child death |
| // request. This could happen if child death signal did not arrive |
| // in hangupTime + PROCESS_DEATH_MARGIN. So get process ptr again. |
| process_ = MyNode->GetProcess( pid_ ); |
| |
| if ( process_ != NULL) |
| { |
| if (trace_settings & TRACE_PROCESS) |
| { |
| trace_printf( "%s@%d Processing child death " |
| "of process %s (%d, %d:%d)\n" |
| , method_name, __LINE__ |
| , process_->GetName() |
| , process_->GetNid() |
| , process_->GetPid() |
| , process_->GetVerifier() ); |
| } |
| #ifndef NAMESERVER_PROCESS |
| if ( NameServerEnabled ) |
| { |
| int rc = NameServer->ProcessDelete(process_); // in reqQueue thread (CIntChildDeathReq) |
| if (rc) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| snprintf( la_buf, sizeof(la_buf) |
| , "[%s] - Process delete request to Name Server failed" |
| "for child process %s (%d, %d:%d)\n" |
| , method_name |
| , process_->GetName() |
| , process_->GetNid() |
| , process_->GetPid() |
| , process_->GetVerifier() ); |
| mon_log_write(MON_INTREQ_CHILDDEATH_1, SQ_LOG_ERR, la_buf); |
| } |
| } |
| #endif |
| MyNode->DelFromNameMap ( process_ ); |
| MyNode->DelFromPidMap ( process_ ); |
| |
| // if state is still Up, then process has not called exit. |
| bool abended = (process_->GetState() == State_Up); |
| MyNode->SetProcessState(process_, State_Stopped, abended); |
| |
| } |
| } |
| |
| CIntAttachedDeathReq::CIntAttachedDeathReq( pid_t pid ) |
| : CInternalReq(), pid_(pid) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIA", 4); |
| } |
| |
| CIntAttachedDeathReq::~CIntAttachedDeathReq( ) |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqia", 4); |
| } |
| |
| void CIntAttachedDeathReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(AttachedDeath) req #=%ld (pid=%d)" |
| , getId(), pid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntAttachedDeathReq::performRequest() |
| { |
| CProcess * process = MyNode->GetProcess ( pid_ ); |
| |
| if (process) |
| { |
| MyNode->DelFromNameMap ( process ); |
| MyNode->DelFromPidMap ( process ); |
| |
| MyNode->SetProcessState(process, State_Stopped, true); // abend |
| } |
| } |
| |
| CIntShutdownReq::CIntShutdownReq( int level ) |
| : CInternalReq(), |
| level_ ( level ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIH", 4); |
| } |
| |
| CIntShutdownReq::~CIntShutdownReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqih", 4); |
| } |
| |
| void CIntShutdownReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (level=%d)" |
| , CReqQueue::intReqType[InternalType_Shutdown] |
| , getId(), level_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntShutdownReq::performRequest() |
| { |
| const char method_name[] = "CIntShutdownReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - Shutdown request, level=%d\n", |
| method_name, __LINE__, level_); |
| |
| MyNode->SetShutdownLevel( (ShutdownLevel) level_ ); |
| |
| // only abrupt case is supported through this mechanism at present. |
| // modify this assert as more shutdown levels are supported here. |
| assert(level_ == ShutdownLevel_Abrupt); |
| |
| if( !getenv("SQ_VIRTUAL_NODES") ) |
| { |
| // Execute shutdown via the Watchdog process |
| HealthCheck.setState(MON_SHUT_DOWN); |
| // wait forever if not a spare node |
| // spare node will go through clean shutdown |
| if ( !MyNode->IsSpareNode() ) |
| { |
| for (;;) |
| sleep(10000); |
| } |
| } |
| else |
| { |
| // Stop all processes |
| Monitor->HardNodeDown( MyPNID ); |
| #ifndef NAMESERVER_PROCESS |
| MyNode->EmptyQuiescingPids(); |
| #endif |
| // now stop the Watchdog process |
| HealthCheck.setState(MON_NODE_DOWN); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| CIntNameServerAddReq::CIntNameServerAddReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , char *nodeName |
| ) |
| : CInternalReq() |
| , req_nid_(req_nid) |
| , req_pid_(req_pid) |
| , req_verifier_(req_verifier) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RqIA", 4); |
| STRCPY( nodeName_, nodeName ); |
| } |
| |
| CIntNameServerAddReq::~CIntNameServerAddReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rQia", 4); |
| } |
| |
| void CIntNameServerAddReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| snprintf( strBuf, sizeof(strBuf), |
| "IntReq(%s) req #=%ld " |
| "(node_name=%s)" |
| , CReqQueue::intReqType[InternalType_NodeAdd] |
| , getId() |
| , nodeName_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntNameServerAddReq::performRequest() |
| { |
| const char method_name[] = "CIntNameServerAddReq::performRequest"; |
| TRACE_ENTRY; |
| |
| int rc = MPI_SUCCESS; |
| CProcess *requester = NULL; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - NameServer add request (%s), " |
| "node_name=%s\n" |
| , method_name, __LINE__ |
| , requester ? requester->GetName() : "" |
| , nodeName_); |
| } |
| |
| CNameServerConfigContainer *nameServerConfig = Nodes->GetNameServerConfig(); |
| |
| // Insert nameserver in configuration database |
| if ( nameServerConfig->SaveConfig( nodeName_ ) ) |
| { |
| } |
| else |
| { |
| rc = MPI_ERR_IO; |
| } |
| |
| requester = Nodes->GetProcess( req_nid_ |
| , req_pid_ |
| , req_verifier_ ); |
| if (requester) |
| { |
| // Reply to requester |
| requester->CompleteRequest( rc ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| CIntNameServerDeleteReq::CIntNameServerDeleteReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , const char *nodeName ) |
| : CInternalReq() |
| , req_nid_(req_nid) |
| , req_pid_(req_pid) |
| , req_verifier_(req_verifier) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RqIB", 4); |
| STRCPY( nodeName_, nodeName ); |
| } |
| |
| CIntNameServerDeleteReq::~CIntNameServerDeleteReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rQib", 4); |
| } |
| |
| void CIntNameServerDeleteReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| snprintf( strBuf, sizeof(strBuf), |
| "IntReq(%s) req #=%ld " |
| "(node=%s)" |
| , CReqQueue::intReqType[InternalType_NameServerDelete] |
| , getId() |
| , nodeName_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntNameServerDeleteReq::performRequest() |
| { |
| const char method_name[] = "CIntNameServerDeleteReq::performRequest"; |
| TRACE_ENTRY; |
| |
| int rc = MPI_SUCCESS; |
| CProcess *requester = NULL; |
| |
| requester = Nodes->GetProcess( req_nid_ |
| , req_pid_ |
| , req_verifier_ ); |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf( "%s@%d - NameServer delete request (%s), node=%s\n" |
| , method_name, __LINE__ |
| , requester ? requester->GetName() : "" |
| , nodeName_ ); |
| |
| CNameServerConfigContainer *nameServerConfig = Nodes->GetNameServerConfig(); |
| CNameServerConfig *config = nameServerConfig->GetConfig( nodeName_ ); |
| if ( config ) |
| { |
| if ( !nameServerConfig->DeleteConfig( config ) ) |
| rc = MPI_ERR_IO; |
| } |
| else |
| { |
| rc = MPI_ERR_IO; |
| } |
| |
| requester = Nodes->GetProcess( req_nid_ |
| , req_pid_ |
| , req_verifier_ ); |
| if (requester) |
| { |
| // Reply to requester |
| requester->CompleteRequest( rc ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| |
| CIntNodeNameReq::CIntNodeNameReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , const char *current_name |
| , const char *new_name ) |
| : CInternalReq() |
| , req_nid_(req_nid) |
| , req_pid_(req_pid) |
| , req_verifier_(req_verifier) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIZ", 4); |
| new_name_ = new_name; |
| current_name_=current_name; |
| } |
| |
| CIntNodeNameReq::~CIntNodeNameReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqiz", 4); |
| } |
| |
| void CIntNodeNameReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld" |
| , CReqQueue::intReqType[InternalType_NodeName] |
| , getId() ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntNodeNameReq::performRequest() |
| { |
| const char method_name[] = "CIntNodeNameReq::performRequest"; |
| TRACE_ENTRY; |
| |
| int rc = MPI_SUCCESS; |
| char current_n[MPI_MAX_PROCESSOR_NAME]; |
| char new_n[MPI_MAX_PROCESSOR_NAME]; |
| CPNodeConfig *pnodeConfig = NULL; |
| CProcess *requester = NULL; |
| |
| strcpy (current_n, current_name_.c_str()); |
| strcpy (new_n, new_name_.c_str()); |
| |
| CClusterConfig *clusterConfig = Nodes->GetClusterConfig(); |
| if (clusterConfig) |
| { |
| // Check for existence of node name in the configuration |
| pnodeConfig = clusterConfig->GetPNodeConfig( current_n ); |
| if (pnodeConfig) |
| { |
| // Update the node name in the configuration database |
| if (clusterConfig->UpdatePNodeConfig( pnodeConfig->GetPNid() |
| , new_n |
| , pnodeConfig->GetExcludedFirstCore() |
| , pnodeConfig->GetExcludedLastCore() )) |
| { |
| // lock sync thread since we are making a change the monitor's |
| // operational view of the cluster |
| if ( !Emulate_Down ) |
| { |
| Monitor->EnterSyncCycle(); |
| } |
| |
| // Change node name in monitor's view of cluster |
| CNode *node = Nodes->GetNode(current_n); |
| if (node) |
| { |
| node->SetName( new_n ); |
| Nodes->ChangedNode( node ); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf( buf |
| , "[%s], Failed to retrieve node object for node %s!\n" |
| , method_name, current_n); |
| mon_log_write(MON_INTREQ_NODE_NAME_1, SQ_LOG_ERR, buf); |
| |
| rc = MPI_ERR_INTERN; |
| } |
| |
| // unlock sync thread |
| if ( !Emulate_Down ) |
| { |
| Monitor->ExitSyncCycle(); |
| } |
| } |
| else |
| { |
| rc = MPI_ERR_IO; |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf( buf |
| , "[%s], Failed to retrieve pnodeConfig object for node %s!\n" |
| , method_name, current_n); |
| mon_log_write(MON_INTREQ_NODE_NAME_2, SQ_LOG_ERR, buf); |
| |
| rc = MPI_ERR_INTERN; |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Failed to retrieve ClusterConfig object!\n", |
| method_name); |
| mon_log_write(MON_INTREQ_NODE_NAME_3, SQ_LOG_ERR, buf); |
| |
| rc = MPI_ERR_INTERN; |
| } |
| |
| requester = Nodes->GetProcess( req_nid_ |
| , req_pid_ |
| , req_verifier_ ); |
| if (requester) |
| { |
| // Reply to requester |
| requester->CompleteRequest( rc ); |
| } |
| |
| |
| TRACE_EXIT; |
| } |
| |
| |
| CIntNodeAddReq::CIntNodeAddReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , char *nodeName |
| , int firstCore |
| , int lastCore |
| , int processors |
| , int roles |
| ) |
| : CInternalReq() |
| , req_nid_(req_nid) |
| , req_pid_(req_pid) |
| , req_verifier_(req_verifier) |
| , first_core_(firstCore) |
| , last_core_(lastCore) |
| , processors_(processors) |
| , roles_(roles) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIJ", 4); |
| STRCPY( nodeName_, nodeName ); |
| } |
| |
| CIntNodeAddReq::~CIntNodeAddReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqij", 4); |
| } |
| |
| void CIntNodeAddReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| snprintf( strBuf, sizeof(strBuf), |
| "IntReq(%s) req #=%ld " |
| "(node_name=%s/first_core=%d/last_core=%d/processors=%d/roles=%d)" |
| , CReqQueue::intReqType[InternalType_NodeAdd] |
| , getId() |
| , nodeName_ |
| , first_core_ |
| , last_core_ |
| , processors_ |
| , roles_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntNodeAddReq::performRequest() |
| { |
| const char method_name[] = "CIntNodeAddReq::performRequest"; |
| TRACE_ENTRY; |
| |
| int nid; |
| int pnid; |
| int rc = MPI_SUCCESS; |
| CProcess *requester = NULL; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_PROCESS)) |
| { |
| trace_printf("%s@%d - Node add request (%s), " |
| "node_name=%s, first_core=%d, last_core=%d, " |
| "processors=%d, roles=%d\n" |
| , method_name, __LINE__ |
| , requester ? requester->GetName() : "" |
| , nodeName_ |
| , first_core_ |
| , last_core_ |
| , processors_ |
| , roles_ ); |
| } |
| |
| CClusterConfig *clusterConfig = Nodes->GetClusterConfig(); |
| |
| // Get the next pnid and nid available for assignment |
| nid = clusterConfig->GetNextNid(); |
| pnid = clusterConfig->GetNextPNid(); |
| |
| // Insert node in configuration database and |
| // add to configuration object in monitor |
| if (clusterConfig->SaveNodeConfig( nodeName_ |
| , nid |
| , pnid |
| , first_core_ |
| , last_core_ |
| , processors_ |
| , -1 // excludedFirstCore |
| , -1 // excludedLastCore |
| , roles_ )) |
| { |
| // lock sync thread since we are making a change the monitor's |
| // operational view of the cluster |
| if ( !Emulate_Down ) |
| { |
| Monitor->EnterSyncCycle(); |
| } |
| |
| // Add node to monitor's view of cluster and broadcast node added notice |
| if (!Monitor->ReinitializeConfigCluster( true, pnid )) |
| { |
| rc = MPI_ERR_INTERN; |
| } |
| |
| // unlock sync thread |
| if ( !Emulate_Down ) |
| { |
| Monitor->ExitSyncCycle(); |
| } |
| } |
| else |
| { |
| rc = MPI_ERR_IO; |
| } |
| |
| requester = Nodes->GetProcess( req_nid_ |
| , req_pid_ |
| , req_verifier_ ); |
| if (requester) |
| { |
| // Reply to requester |
| requester->CompleteRequest( rc ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| CIntNodeDeleteReq::CIntNodeDeleteReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , int pnid ) |
| : CInternalReq() |
| , req_nid_(req_nid) |
| , req_pid_(req_pid) |
| , req_verifier_(req_verifier) |
| , pnid_(pnid) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIT", 4); |
| } |
| |
| CIntNodeDeleteReq::~CIntNodeDeleteReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqit", 4); |
| } |
| |
| void CIntNodeDeleteReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| snprintf( strBuf, sizeof(strBuf), |
| "IntReq(%s) req #=%ld " |
| "(pnid=%d)" |
| , CReqQueue::intReqType[InternalType_NodeDelete] |
| , getId() |
| , pnid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntNodeDeleteReq::performRequest() |
| { |
| const char method_name[] = "CIntNodeDeleteReq::performRequest"; |
| TRACE_ENTRY; |
| |
| int rc = MPI_SUCCESS; |
| CProcess *requester = NULL; |
| |
| requester = Nodes->GetProcess( req_nid_ |
| , req_pid_ |
| , req_verifier_ ); |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf( "%s@%d - Node delete request (%s), pnid=%d\n" |
| , method_name, __LINE__ |
| , requester ? requester->GetName() : "" |
| , pnid_ ); |
| |
| CClusterConfig *clusterConfig = Nodes->GetClusterConfig(); |
| if (clusterConfig->DeleteNodeConfig( pnid_ )) |
| { |
| // lock sync thread |
| if ( !Emulate_Down ) |
| { |
| Monitor->EnterSyncCycle(); |
| } |
| |
| // Reload the static configuration and broadcast node deleted notice |
| if (!Monitor->ReinitializeConfigCluster( false, pnid_ )) |
| { |
| rc = MPI_ERR_INTERN; |
| } |
| |
| // unlock sync thread |
| if ( !Emulate_Down ) |
| { |
| Monitor->ExitSyncCycle(); |
| } |
| } |
| else |
| { |
| rc = MPI_ERR_IO; |
| } |
| |
| requester = Nodes->GetProcess( req_nid_ |
| , req_pid_ |
| , req_verifier_ ); |
| if (requester) |
| { |
| // Reply to requester |
| requester->CompleteRequest( rc ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| CIntDownReq::CIntDownReq( int pnid ) |
| : CInternalReq(), |
| pnid_ ( pnid ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIP", 4); |
| |
| if ( pnid == MyPNID ) |
| { |
| SetReviveFlag(1); // allow this request to be processed during revive |
| } |
| } |
| |
| CIntDownReq::~CIntDownReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqip", 4); |
| } |
| |
| void CIntDownReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (pnid=%d)" |
| , CReqQueue::intReqType[InternalType_Down] |
| , getId(), pnid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntDownReq::performRequest() |
| { |
| const char method_name[] = "CIntDownReq::performRequest"; |
| TRACE_ENTRY; |
| |
| const char *tp = getenv( "MON_TP017_NODE_DOWN" ); \ |
| if ((tp != NULL) && (MyPNID == 6)) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf("%s@%d - Node down test point, pnid=%d delaying...\n", |
| method_name, __LINE__, MyPNID); |
| |
| while ( true ) |
| { |
| sleep(1); |
| } |
| } |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - Node down request, pnid=%d\n", |
| method_name, __LINE__, pnid_); |
| Monitor->HardNodeDown( pnid_ ); |
| |
| TRACE_EXIT; |
| } |
| |
| CIntSoftNodeDownReq::CIntSoftNodeDownReq( int pnid ) |
| : CInternalReq() |
| , pnid_ ( pnid ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIX", 4); |
| } |
| |
| CIntSoftNodeDownReq::~CIntSoftNodeDownReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqix", 4); |
| } |
| |
| void CIntSoftNodeDownReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (pnid=%d)" |
| , CReqQueue::intReqType[InternalType_SoftNodeDown] |
| , getId(), pnid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntSoftNodeDownReq::performRequest() |
| { |
| const char method_name[] = "CIntSoftNodeDownReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - Node soft down request, pnid=%d\n", |
| method_name, __LINE__, pnid_); |
| Monitor->SoftNodeDown( pnid_ ); |
| |
| TRACE_EXIT; |
| } |
| |
| CIntSoftNodeUpReq::CIntSoftNodeUpReq( int pnid ) |
| : CInternalReq() |
| , pnid_ ( pnid ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIY", 4); |
| |
| } |
| |
| CIntSoftNodeUpReq::~CIntSoftNodeUpReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqiy", 4); |
| } |
| |
| void CIntSoftNodeUpReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (pnid=%d)" |
| , CReqQueue::intReqType[InternalType_SoftNodeUp] |
| , getId(), pnid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntSoftNodeUpReq::performRequest() |
| { |
| const char method_name[] = "CIntSoftNodeUpReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - Soft node up request, pnid=%d\n", |
| method_name, __LINE__, pnid_ ); |
| Monitor->SoftNodeUpPrepare( pnid_ ); |
| |
| TRACE_EXIT; |
| } |
| |
| CIntUpReq::CIntUpReq( int pnid, char *node_name, int merge_lead ) |
| : CInternalReq(), |
| nodeName_ ( node_name?node_name:"" ), |
| mergeLead_ ( merge_lead ), |
| pnid_ ( pnid ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIQ", 4); |
| |
| SetReviveFlag(1); // allow this request to be processed during revive |
| } |
| |
| CIntUpReq::~CIntUpReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqiq", 4); |
| } |
| |
| void CIntUpReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (pnid=%d)" |
| , CReqQueue::intReqType[InternalType_Up] |
| , getId(), pnid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntUpReq::performRequest() |
| { |
| const char method_name[] = "CIntUpReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - Node up request, pnid=%d, name=%s\n", |
| method_name, __LINE__, pnid_, nodeName_.c_str()); |
| Monitor->HardNodeUp( pnid_, (char *) nodeName_.c_str() ); |
| |
| TRACE_EXIT; |
| } |
| |
| CIntActivateSpareReq::CIntActivateSpareReq(CNode *spareNode, CNode *downNode, bool checkHealth) |
| : CInternalReq(), |
| spareNode_(spareNode), |
| downNode_(downNode), |
| checkHealth_(checkHealth) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIM", 4); |
| } |
| |
| CIntActivateSpareReq::~CIntActivateSpareReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqim", 4); |
| } |
| |
| void CIntActivateSpareReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "Activate Spare (%s) req #=%ld" |
| , CReqQueue::intReqType[InternalType_ActivateSpare] |
| , getId()); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntActivateSpareReq::performRequest() |
| { |
| const char method_name[] = "CIntActivateSpareReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if ( downNode_ == NULL ) |
| { |
| Monitor->NodeReady(spareNode_); |
| } |
| else |
| { |
| spareNode_->ResetSpareNode(); |
| Nodes->RemoveFromSpareNodesList( spareNode_ ); |
| Monitor->ActivateSpare(spareNode_, downNode_, checkHealth_); |
| } |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| CIntReviveReq::CIntReviveReq() |
| : CInternalReq() |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIR", 4); |
| |
| SetReviveFlag(1); // allow this request to be processed during revive |
| } |
| |
| CIntReviveReq::~CIntReviveReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqir", 4); |
| } |
| |
| void CIntReviveReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "Revive(%s) req #=%ld" |
| , CReqQueue::intReqType[InternalType_Revive] |
| , getId()); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntReviveReq::performRequest() |
| { |
| const char method_name[] = "CIntReviveReq::performRequest"; |
| TRACE_ENTRY; |
| |
| int error; |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Revive request\n", method_name, __LINE__); |
| |
| mem_log_write(MON_REQQUEUE_REVIVE_1); |
| |
| CCluster::snapShotHeader_t header; |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| error = Monitor->ReceiveMPI( (char *)&header |
| , sizeof(header) |
| , 0 |
| , MON_XCHNG_HEADER |
| , Monitor->getJoinComm()); |
| break; |
| case CommType_Sockets: |
| error = Monitor->ReceiveSock( (char *)&header |
| , sizeof(header) |
| , Monitor->getJoinSock() |
| , method_name ); |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| mem_log_write(MON_REQQUEUE_REVIVE_2, error); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Msg Received - header. Error = %d\n", method_name, __LINE__, error); |
| |
| if (error) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Unable to receive header. Exiting.", method_name, __LINE__); |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| if (header.compressedSize_ == -1) |
| { // creator monitor ran into compression error, abort. |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Creator monitor compression error. Exiting.", method_name, __LINE__); |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "Creator monitor had compression error. Aborting node reintegration.\n"); |
| mon_log_write(MON_REQQUEUE_REVIVE_2, SQ_LOG_CRIT, buf); |
| |
| // exit call below runs desctructors. Stop healthcheck thread so that its lock can be destructed. |
| HealthCheck.shutdownWork(); |
| |
| TRACE_EXIT; |
| exit(0); // this will cause other monitors to disconnect from the new monitor. |
| } |
| |
| char *compBuf = (char *) malloc ( header.compressedSize_ ); |
| |
| if (!compBuf) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Unable to allocate buffer of size = %ld\n", |
| method_name, __LINE__, header.compressedSize_); |
| TRACE_EXIT; |
| return; |
| } |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| error = Monitor->ReceiveMPI( compBuf |
| , header.compressedSize_ |
| , 0 |
| , MON_XCHNG_DATA |
| , Monitor->getJoinComm()); |
| break; |
| case CommType_Sockets: |
| error = Monitor->ReceiveSock( compBuf |
| , header.compressedSize_ |
| , Monitor->getJoinSock() |
| , method_name ); |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| mem_log_write(MON_REQQUEUE_REVIVE_3, error); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Msg Received - data. Error = %d\n", method_name, __LINE__, error); |
| |
| if (error) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Unable to receive data. Exiting.", method_name, __LINE__); |
| |
| free( compBuf ); |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| char *buf = (char *) malloc ( header.fullSize_ ); |
| |
| if (!buf) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Unable to allocate buffer of size = %ld\n", |
| method_name, __LINE__, header.fullSize_); |
| |
| free( compBuf ); |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| unsigned long bufLen = header.fullSize_; |
| |
| error = uncompress((Bytef *)buf, &bufLen, (Bytef *)compBuf, header.compressedSize_); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - compSize = %ld, fullsize = %ld, uncompress error = %d\n", |
| method_name, __LINE__, header.compressedSize_, header.fullSize_, error); |
| |
| free( compBuf ); // don't need anymore. Will work on uncompressed buffer from this point. |
| |
| char *buffer = buf; |
| |
| #ifndef NAMESERVER_PROCESS |
| // unpack the current TM leader |
| Monitor->SetTmLeader( header.tmLeader_ ); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - TM leader (%d) unpacked\n", method_name, __LINE__ |
| , Monitor->GetTmLeader() ); |
| #endif |
| |
| mem_log_write(MON_REQQUEUE_REVIVE_4); |
| |
| Config->Init(); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Config Cluster group unpacked\n", method_name, __LINE__); |
| |
| Nodes->UnpackSpareNodesList( (intBuffPtr_t&)buffer, header.spareNodesCount_); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Spare Nodes List unpacked\n", method_name, __LINE__); |
| |
| Nodes->UnpackNodeMappings( (intBuffPtr_t&)buffer, header.nodeMapCount_ ); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Node mappings unpacked\n", method_name, __LINE__); |
| |
| Nodes->UnpackZids( (intBuffPtr_t&)buffer ); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Node zids unpacked\n", method_name, __LINE__); |
| |
| // unpack process objects and create clones |
| Monitor->UnpackProcObjs(buffer, header.procCount_); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Process Objects unpacked\n", method_name, __LINE__); |
| |
| Config->UnpackRegistry(buffer, (header.clusterRegistryCount_ + header.processRegistryCount_)); |
| Config->UnpackUniqueStrings(buffer, header.uniqueStringCount_); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Registry unpacked\n", method_name, __LINE__); |
| |
| mem_log_write(MON_REQQUEUE_REVIVE_5); |
| |
| // process the requests that were deferred to the revive side queue. |
| ReqQueue.processReviveRequests(header.seqNum_); |
| |
| mem_log_write(MON_REQQUEUE_REVIVE_6); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Queued revive requests processed\n", method_name, __LINE__); |
| |
| free( buf ); |
| |
| // done with joining. |
| // we are in the new monitor, and this will drive the state change |
| MyNode->SetChangeState( true ); |
| |
| TRACE_EXIT; |
| |
| return; |
| } |
| |
| CIntSnapshotReq::CIntSnapshotReq( unsigned long long seqNum ) |
| : CInternalReq(), |
| seqNum_( seqNum ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIG", 4); |
| } |
| |
| CIntSnapshotReq::~CIntSnapshotReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqig", 4); |
| } |
| |
| void CIntSnapshotReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "Snapshot(%s) req #=%ld" |
| , CReqQueue::intReqType[InternalType_Snapshot] |
| , getId()); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntSnapshotReq::performRequest() |
| { |
| const char method_name[] = "CIntSnapshotReq::performRequest"; |
| TRACE_ENTRY; |
| |
| char *snapshotBuf = NULL; |
| char *compBuf = NULL; |
| unsigned long size = 0; |
| unsigned long compSize = 0; |
| int z_result = 0; |
| struct timespec startTime, snapShotTime, compressTime; |
| int error = 0; |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Snapshot request\n", method_name, __LINE__); |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_1); |
| |
| // abort this request if join communication is not setup |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| if (Monitor->getJoinComm() == MPI_COMM_NULL) |
| { |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_2); |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Join communicator is null, aborting snapshot req.\n", method_name); |
| mon_log_write(MON_REQQUEUE_SNAPSHOT_2, SQ_LOG_ERR, buf); |
| |
| TRACE_EXIT; |
| return; |
| } |
| break; |
| case CommType_Sockets: |
| if (Monitor->getJoinSock() == -1) |
| { |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_2); |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Join socket is -1, aborting snapshot req.\n", method_name); |
| mon_log_write(MON_REQQUEUE_SNAPSHOT_2, SQ_LOG_ERR, buf); |
| |
| TRACE_EXIT; |
| return; |
| } |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| // estimate size of snapshot buffer |
| // about 500 bytes per process, 2 times total |
| int procSize = Nodes->ProcessCount() * 2 * 500; |
| int idsSize = Nodes->GetSNodesCount() * sizeof(int); // spare pnids |
| idsSize += (Nodes->GetPNodesCount() + Nodes->GetLNodesCount()) * sizeof(int); // pnid/nid map |
| idsSize += Nodes->GetPNodesCount() * 2 * sizeof(int); // pnid/zid |
| idsSize += Nodes->GetLNodesCount() * sizeof(int); // nids |
| idsSize += Config->getUniqueStringsSize(); |
| idsSize += Config->getRegistrySize(); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Snapshot sizes, procSize = %d, idsSize = %d\n", |
| method_name, __LINE__, procSize, idsSize); |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_4, procSize, idsSize); |
| |
| snapshotBuf = (char *) malloc (procSize + idsSize); |
| |
| if (!snapshotBuf) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Unable to allocate snapshot buffer, size = %ld\n", |
| method_name, __LINE__, size); |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_5); |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| clock_gettime(CLOCK_REALTIME, &startTime); |
| |
| memset( snapshotBuf, 0, (procSize + idsSize) ); |
| char *buf = snapshotBuf; |
| |
| CCluster::snapShotHeader_t header; |
| |
| #ifndef NAMESERVER_PROCESS |
| // pack the current TM leader |
| header.tmLeader_ = Monitor->GetTmLeader(); |
| #endif |
| |
| // pack spareNodes pnids |
| header.spareNodesCount_ = Nodes->PackSpareNodesList( (intBuffPtr_t&)buf ); |
| |
| // pack logical-to-physical nid mappings |
| header.nodeMapCount_ = Nodes->PackNodeMappings( (intBuffPtr_t&)buf ); |
| |
| Nodes->PackZids( (intBuffPtr_t&)buf ); |
| |
| // pack process objects |
| header.procCount_ = Monitor->PackProcObjs(buf); |
| |
| header.clusterRegistryCount_ = Config->PackRegistry(buf, ConfigType_Cluster); |
| header.processRegistryCount_ = Config->PackRegistry(buf, ConfigType_Process); |
| header.uniqueStringCount_ = Config->PackUniqueStrings(buf); |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_6, header.nodeMapCount_, header.procCount_); |
| |
| header.fullSize_ = buf - snapshotBuf; |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_7, header.fullSize_); |
| |
| // the seq num in the request reflects the state of the cluster at this point. |
| // do not use the current seq num as it may have advanced. |
| header.seqNum_ = seqNum_; // copy the one stored in this request |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - procCount = %ld, fullSize = %ld\n", |
| method_name, __LINE__, header.procCount_, header.fullSize_); |
| |
| clock_gettime(CLOCK_REALTIME, &snapShotTime); |
| |
| // compress call requires the compression buffer to be little more than the input buffer. |
| compSize = compressBound(header.fullSize_); |
| |
| compBuf = (char *)malloc(compSize); |
| |
| if (!compBuf) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Unable to allocate compression buffer, size = %ld\n", |
| method_name, __LINE__, compSize); |
| |
| free( snapshotBuf ); |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_8); |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| memset( compBuf, 0, compSize ); // TODO: WHY? |
| z_result = compress((Bytef *)compBuf, (unsigned long *)&compSize, |
| (Bytef *)snapshotBuf, header.fullSize_); |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_9, z_result, compSize); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - compression result = %d, orig size = %ld, comp size = %ld\n", |
| method_name, __LINE__, z_result, header.fullSize_, compSize); |
| |
| if (z_result != Z_OK) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "Snapshot buffer compression error = %d, aborting node reintegration.\n", z_result); |
| mon_log_write (MON_REQQUEUE_SNAPSHOT_14, SQ_LOG_CRIT, buf); |
| |
| // send msg to new monitor so that it can exit |
| header.compressedSize_ = -1; |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| error = Monitor->SendMPI( (char *)&header |
| , sizeof(header) |
| , 0 |
| , MON_XCHNG_HEADER |
| , Monitor->getJoinComm()); |
| break; |
| case CommType_Sockets: |
| error = Monitor->SendSock( (char *)&header |
| , sizeof(header) |
| , Monitor->getJoinSock() |
| , method_name ); |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| if (error) { |
| sprintf(buf, "Unable to send exit msg to new monitor, error = %d\n", error); |
| mon_log_write(MON_REQQUEUE_SNAPSHOT_15, SQ_LOG_CRIT, buf); |
| } |
| |
| sprintf(buf, "Node reintegration aborted due to buffer compression error."); |
| #ifndef NAMESERVER_PROCESS |
| SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid() |
| , MyNode->GetCreatorVerifier() |
| , Monitor->ReIntegErrorMessage( buf ) |
| , NULL ); |
| #endif |
| TRACE_EXIT; |
| return; |
| } |
| header.compressedSize_ = compSize; |
| header.compressedSize_ = compSize; |
| clock_gettime(CLOCK_REALTIME, &compressTime); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - snapshot times, start = %ld.%ld, end = %ld.%ld compressed = %ld.%ld\n", |
| method_name, __LINE__, startTime.tv_sec, startTime.tv_nsec, |
| snapShotTime.tv_sec, snapShotTime.tv_nsec, compressTime.tv_sec, compressTime.tv_nsec); |
| |
| free( snapshotBuf ); // don't need anymore. Will work on compressed buffer from this point. |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_10); |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| error = Monitor->SendMPI( (char *)&header |
| , sizeof(header) |
| , 0 |
| , MON_XCHNG_HEADER |
| , Monitor->getJoinComm()); |
| break; |
| case CommType_Sockets: |
| error = Monitor->SendSock( (char *)&header |
| , sizeof(header) |
| , Monitor->getJoinSock() |
| , method_name ); |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_11, error); |
| |
| if (error) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Unable to send header. Exiting.", method_name, __LINE__); |
| |
| free( compBuf ); |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Msg Sent - header. Error = %d\n", method_name, __LINE__, error); |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| error = Monitor->SendMPI( compBuf |
| , header.compressedSize_ |
| , 0 |
| , MON_XCHNG_DATA |
| , Monitor->getJoinComm()); |
| break; |
| case CommType_Sockets: |
| error = Monitor->SendSock( compBuf |
| , header.compressedSize_ |
| , Monitor->getJoinSock() |
| , method_name ); |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_12, header.compressedSize_, error); |
| |
| if (error) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Unable to send data. Exiting.", method_name, __LINE__); |
| |
| free( compBuf ); |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf("%s@%d - Msg Sent - data. Error = %d\n", method_name, __LINE__, error); |
| |
| free( compBuf ); |
| |
| mem_log_write(MON_REQQUEUE_SNAPSHOT_13); |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| CQuiesceReq::CQuiesceReq( ) |
| : CInternalReq() |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIU", 4); |
| } |
| |
| CQuiesceReq::~CQuiesceReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqiu", 4); |
| } |
| |
| void CQuiesceReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "Quiece(%s) req #=%ld" |
| , CReqQueue::intReqType[InternalType_Quiesce] |
| , getId()); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CQuiesceReq::performRequest() |
| { |
| const char method_name[] = "CQuiesceReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - Quiece request\n", |
| method_name, __LINE__); |
| |
| if ( Monitor->isMonSyncResponsive() ) |
| { |
| Monitor->CompleteSyncCycle(); // let other nodes know that we are in quiesce state |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| MyNode->SendQuiescingNotices(); |
| #endif |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Quiesce notices sent.\n", method_name); |
| mon_log_write(MON_REQQUEUE_QUIESCE_1, SQ_LOG_WARNING, buf); |
| |
| // if nothing in exit list, schedule a node down. |
| // if not, node down will be scheduled when exit list becomes empty. |
| #ifndef NAMESERVER_PROCESS |
| if (MyNode->getNumQuiesceExitPids() == 0) |
| { |
| #endif |
| if (trace_settings & (TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Scheduling node down\n", method_name, __LINE__); |
| HealthCheck.setState(MON_SCHED_NODE_DOWN); |
| #ifndef NAMESERVER_PROCESS |
| } |
| #endif |
| |
| TRACE_EXIT; |
| } |
| |
| CPostQuiesceReq::CPostQuiesceReq( ) |
| : CInternalReq() |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIB", 4); |
| } |
| |
| CPostQuiesceReq::~CPostQuiesceReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqib", 4); |
| } |
| |
| void CPostQuiesceReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "PostQuiece(%s) req #=%ld" |
| , CReqQueue::intReqType[InternalType_PostQuiece] |
| , getId()); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CPostQuiesceReq::performRequest() |
| { |
| const char method_name[] = "CPostQuiesceReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - Post Quiece request\n", |
| method_name, __LINE__); |
| |
| if( !getenv("SQ_VIRTUAL_NODES") ) |
| { |
| // Execute node fail safe via the Watchdog process |
| HealthCheck.setState(MON_NODE_DOWN); |
| // wait forever |
| for (;;) |
| sleep(10000); |
| } |
| else |
| { |
| // Stop all processes |
| Monitor->HardNodeDown( MyPNID ); |
| #ifndef NAMESERVER_PROCESS |
| MyNode->EmptyQuiescingPids(); |
| #endif |
| // now stop the Watchdog process |
| HealthCheck.setState(MON_NODE_DOWN); |
| // and tell the cluster the node is down |
| CReplNodeDown *repl = new CReplNodeDown(MyPNID); |
| Replicator.addItem(repl); |
| } |
| TRACE_EXIT; |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntCreatePrimitiveReq::CIntCreatePrimitiveReq( int pnid ) |
| :CInternalReq() |
| ,pnid_ ( pnid ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIW", 4); |
| } |
| |
| CIntCreatePrimitiveReq::~CIntCreatePrimitiveReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqiw", 4); |
| } |
| |
| void CIntCreatePrimitiveReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (pnid=%d)" |
| , CReqQueue::intReqType[InternalType_CreatePrimitives] |
| , getId(), pnid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntCreatePrimitiveReq::performRequest() |
| { |
| const char method_name[] = "CIntCreatePrimitiveReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - Create Primitive Processes request, pnid=%d\n", |
| method_name, __LINE__, pnid_); |
| if ( pnid_ == MyPNID ) |
| { |
| if ( NameServerEnabled ) |
| { |
| bool startNs = false; |
| if ( IsRealCluster ) |
| { |
| CNameServerConfig *config; |
| config = NameServerConfig->GetConfig( MyNode->GetName() ); |
| if ( config ) |
| startNs = true; |
| } |
| else |
| { |
| startNs = true; |
| } |
| if ( !MyNode->IsSoftNodeUp() ) |
| { // Don't restart the name server on a soft node up |
| if ( startNs ) |
| { |
| NameServer->SetLocalHost(); |
| MyNode->StartNameServerProcess(); |
| } |
| } |
| else |
| { |
| MyNode->ResetSoftNodeUp(); |
| } |
| } |
| MyNode->StartWatchdogProcess(); |
| MyNode->StartPStartDProcess(); |
| char *env = getenv( "SQ_SEAMONSTER" ); |
| if ( env && strcmp( env, "1" ) == 0 ) |
| { |
| MyNode->StartSMServiceProcess(); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| CIntTmReadyReq::CIntTmReadyReq( int nid ) |
| :CInternalReq() |
| ,nid_ ( nid ) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "RQIV", 4); |
| } |
| |
| CIntTmReadyReq::~CIntTmReadyReq() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "rqiv", 4); |
| } |
| |
| void CIntTmReadyReq::populateRequestString( void ) |
| { |
| char strBuf[MON_STRING_BUF_SIZE/2]; |
| sprintf( strBuf, "IntReq(%s) req #=%ld (nid=%d)" |
| , CReqQueue::intReqType[InternalType_TmReady] |
| , getId(), nid_ ); |
| requestString_.assign( strBuf ); |
| } |
| |
| void CIntTmReadyReq::performRequest() |
| { |
| const char method_name[] = "CIntTmReadyReq::performRequest"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST)) |
| trace_printf("%s@%d - TM ready request, pnid=%d\n", |
| method_name, __LINE__, nid_); |
| |
| Monitor->NodeTmReady( nid_ ); |
| |
| TRACE_EXIT; |
| } |
| #endif |
| |
| // |
| CReqQueue::CReqQueue(): busyExclusive_(false), busyWorkers_(0), syncDependentRequests_(0), maxQueueSize_(0), |
| maxBusyWorkers_(0), numRequests_(0), execTimeMax_(0) |
| { |
| // Add eyecatcher sequence as a debugging aid |
| memcpy(&eyecatcher_, "REQQ", 4); |
| mostRecentStart_.tv_sec = 0; |
| mostRecentStart_.tv_nsec = 0; |
| } |
| |
| CReqQueue::~CReqQueue() |
| { |
| // Alter eyecatcher sequence as a debugging aid to identify deleted object |
| memcpy(&eyecatcher_, "reqq", 4); |
| } |
| |
| |
| CExternalReq *CReqQueue::prepExternalReq(CExternalReq::reqQueueMsg_t msgType, |
| int nid, int pid, int sockFd, |
| struct message_def *msg) |
| { |
| const char method_name[] = "CReqQueue::prepExternalReq"; |
| TRACE_ENTRY; |
| |
| CExternalReq * request = NULL; |
| |
| |
| if (msg && msg->type == MsgType_Service) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d from pid=%d, request=%s, lio msg type=%d\n", |
| method_name, __LINE__, pid, |
| svcReqType[msg->u.request.type], msgType); |
| |
| switch (msg->u.request.type) |
| { |
| #ifdef NAMESERVER_PROCESS |
| case ReqType_DelProcessNs: |
| request = new CExtDelProcessNsReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NameServerStart: |
| request = new CExtNameServerStartNsReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NameServerStop: |
| request = new CExtNameServerStopNsReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NewProcessNs: |
| request = new CExtNewProcNsReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_ProcessInfo: |
| request = new CExtProcInfoReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_ProcessInfoCont: |
| request = new CExtProcInfoContReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_ProcessInfoNs: |
| request = new CExtProcInfoNsReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_ShutdownNs: |
| request = new CExtShutdownNsReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| #else |
| case ReqType_Close: |
| // No work to do for "close" (obsolete request). Reply |
| // with success. |
| request = new CExtNullReq(msgType, nid, pid, -1, msg); |
| request->errorReply( MPI_SUCCESS ); |
| delete request; |
| request = NULL; |
| break; |
| |
| case ReqType_Dump: |
| request = new CExtDumpReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_ProcessInfo: |
| request = new CExtProcInfoReq(msgType, nid, pid, -1, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_ProcessInfoCont: |
| request = new CExtProcInfoContReq(msgType, nid, pid, -1, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NodeInfo: |
| request = new CExtNodeInfoReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_PNodeInfo: |
| request = new CExtPNodeInfoReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_Set: |
| request = new CExtSetReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_Get: |
| request = new CExtGetReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NewProcess: |
| request = new CExtNewProcReq(msgType, nid, pid, -1, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_Kill: |
| request = new CExtKillReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_Exit: |
| request = new CExtExitReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_Notify: |
| request = new CExtNotifyReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_MonStats: |
| request = new CExtMonStatsReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_Open: |
| request = new CExtOpenReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_Startup: |
| if (msgType == CExternalReq::AttachStartupMsg) |
| { |
| request = new CExtAttachStartupReq ( msgType, pid, msg ); |
| } |
| else |
| { |
| request = new CExtStartupReq ( msgType, pid, msg ); |
| } |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| // The following request types are not executed concurrently |
| // so ownership does not need to be asserted. |
| case ReqType_Event: |
| request = new CExtEventReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_Mount: |
| request = new CExtMountReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NameServerAdd: |
| request = new CExtNameServerAddReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NameServerDelete: |
| request = new CExtNameServerDeleteReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NameServerStart: |
| request = new CExtNameServerStartReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NameServerStop: |
| request = new CExtNameServerStopReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NodeAdd: |
| request = new CExtNodeAddReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NodeDelete: |
| request = new CExtNodeDeleteReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NodeDown: |
| request = new CExtNodeDownReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NodeName: |
| request = new CExtNodeNameReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_NodeUp: |
| request = new CExtNodeUpReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| #if 0 |
| case ReqType_PersistAdd: |
| request = new CExtPersistAddReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_PersistDelete: |
| request = new CExtPersistDeleteReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| #endif |
| case ReqType_Shutdown: |
| request = new CExtShutdownReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_TmLeader: |
| request = new CExtTmLeaderReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_TmReady: |
| request = new CExtTmReadyReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_TmSync: |
| request = new CExtTmSyncReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_ZoneInfo: |
| request = new CExtZoneInfoReq(msgType, pid, msg); |
| request->setConcurrent(reqConcurrent[msg->u.request.type]); |
| break; |
| |
| case ReqType_OpenInfo: |
| case ReqType_Notice: |
| case ReqType_TransInfo: |
| case ReqType_Stfsd: |
| #endif |
| default: |
| // Invalid request type |
| if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d invalid request type\n", |
| method_name, __LINE__); |
| // Send error reply |
| request = new CExtNullReq(msgType, nid, pid, sockFd, msg); |
| request->errorReply( MPI_ERR_REQUEST ); |
| delete request; |
| request = NULL; |
| } |
| } |
| #ifndef NAMESERVER_PROCESS |
| else if (msg && msg->type == MsgType_UnsolicitedMessage) |
| { |
| if ( msg->u.reply.type == ReplyType_TmSync ) |
| { |
| // This is a reply to an UnsolicitedMessage/TmSync request to the |
| // DTM. This needs to be handled immediately rather than |
| // being queued and processed later. That's because the |
| // TmSync operations master could be blocked waiting for |
| // and so queueing the request would be ineffective. |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->msg_type_unsolicited_Incr(); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_TMSYNC)) |
| trace_printf("%s@%d - TmSync reply\n", method_name, __LINE__); |
| Monitor->ProcessTmSyncReply ( msg ); |
| |
| // Signal client so local io buffer can be freed |
| int error; |
| SQ_theLocalIOToClient->sendCtlMsg ( pid, |
| MC_ReadySend, |
| ((SharedMsgDef*)msg)-> |
| trailer.index, |
| &error |
| ); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| sprintf(buf, "[%s], Unknown reply type.\n", method_name); |
| mon_log_write(MON_REQQUEUE_PREP_EXT_REQ_1, SQ_LOG_ERR, buf); |
| } |
| } |
| #endif |
| |
| else if (msgType == CExternalReq::ShutdownWork) |
| { |
| request = new CExtNullReq(msgType, nid, pid, sockFd, msg); |
| request->setConcurrent(true); |
| } |
| else |
| { |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d Unknown message type=%d\n", |
| method_name, __LINE__, |
| ((msg != NULL) ? msg->type : -1)); |
| |
| // Send error reply |
| request = new CExtNullReq(msgType, nid, pid, sockFd, msg); |
| request->errorReply( MPI_ERR_REQUEST ); |
| delete request; |
| request = NULL; |
| } |
| |
| TRACE_EXIT; |
| return request; |
| } |
| |
| // Enqueue an external request |
| void CReqQueue::enqueueReq(CExternalReq::reqQueueMsg_t msgType, |
| int nid, int pid, int sockFd, |
| struct message_def *msg) |
| { |
| const char method_name[] = "CReqQueue::enqueueReq (ext)"; |
| TRACE_ENTRY; |
| |
| MemModLock.lock(); |
| |
| CExternalReq *extReq; |
| extReq = prepExternalReq(msgType, nid, pid, sockFd, msg); |
| |
| if (extReq) |
| { // Have a valid external request |
| reqQueueLock_.lock(); |
| |
| extReq->setId ( numRequests_ ); |
| reqQueue_.push_back (extReq); |
| |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d queueing request #%ld\n", method_name, |
| __LINE__, numRequests_); |
| |
| // Maintain statistics |
| ++numRequests_; |
| int listSize = reqQueue_.size(); |
| if (listSize > maxQueueSize_) |
| maxQueueSize_ = listSize; |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->ReqQueueIncr(); |
| |
| // Since there is a new request, possibly wake up a worker thread |
| // to work on it. |
| if (extReq->isShutdown()) |
| reqQueueLock_.wakeAll(); |
| else |
| reqQueueLock_.wakeOne(); |
| |
| reqQueueLock_.unlock(); |
| } |
| |
| MemModLock.unlock(); |
| |
| TRACE_EXIT; |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueCloneReq ( struct clone_def *cloneDef ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntCloneProcReq( cloneDef->backup |
| , cloneDef->unhooked |
| , cloneDef->event_messages |
| , cloneDef->system_messages |
| , cloneDef->nid |
| , cloneDef->type |
| , cloneDef->priority |
| , cloneDef->parent_nid |
| , cloneDef->parent_pid |
| , cloneDef->parent_verifier |
| , cloneDef->os_pid |
| , cloneDef->verifier |
| , cloneDef->prior_pid |
| , cloneDef->persistent_retries |
| , cloneDef->argc |
| , cloneDef->creation_time |
| , cloneDef->pathStrId |
| , cloneDef->ldpathStrId |
| , cloneDef->programStrId |
| , cloneDef->nameLen |
| , cloneDef->portLen |
| , cloneDef->infileLen |
| , cloneDef->outfileLen |
| , cloneDef->argvLen |
| , &cloneDef->stringData |
| , cloneDef->origPNidNs); |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifdef NAMESERVER_PROCESS |
| void CReqQueue::enqueueCloneReq ( struct clone_def *cloneDef ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntCloneProcNsReq( cloneDef->backup |
| , cloneDef->unhooked |
| , cloneDef->event_messages |
| , cloneDef->system_messages |
| , cloneDef->nid |
| , cloneDef->type |
| , cloneDef->priority |
| , cloneDef->parent_nid |
| , cloneDef->parent_pid |
| , cloneDef->parent_verifier |
| , cloneDef->os_pid |
| , cloneDef->verifier |
| , cloneDef->prior_pid |
| , cloneDef->persistent_retries |
| , cloneDef->argc |
| , cloneDef->creation_time |
| , cloneDef->pathLen |
| , cloneDef->ldpathLen |
| , cloneDef->programLen |
| , cloneDef->nameLen |
| , cloneDef->portLen |
| , cloneDef->infileLen |
| , cloneDef->outfileLen |
| , cloneDef->argvLen |
| , &cloneDef->stringData |
| , cloneDef->origPNidNs); |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| void CReqQueue::enqueueActivateSpareReq (CNode *spareNode, CNode *downNode, bool checkHealth ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntActivateSpareReq ( spareNode, downNode, checkHealth ); |
| |
| // request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueReviveReq () |
| { |
| CInternalReq * request; |
| |
| request = new CIntReviveReq ( ); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueSnapshotReq (unsigned long long seqnum) |
| { |
| CInternalReq * request; |
| |
| request = new CIntSnapshotReq ( seqnum ); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueQuiesceReq () |
| { |
| CInternalReq * request; |
| |
| request = new CQuiesceReq ( ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueuePostQuiesceReq () |
| { |
| CInternalReq * request; |
| |
| request = new CPostQuiesceReq ( ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueDeviceReq ( char *ldevName ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntDeviceReq ( ldevName ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| void CReqQueue::enqueueNameServerAddReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , char *node_name |
| ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntNameServerAddReq( req_nid |
| , req_pid |
| , req_verifier |
| , node_name |
| ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueNameServerDeleteReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , char *node_name ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntNameServerDeleteReq( req_nid |
| , req_pid |
| , req_verifier |
| , node_name ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueNodeAddReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , char *node_name |
| , int firstCore |
| , int lastCore |
| , int processors |
| , int roles |
| ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntNodeAddReq( req_nid |
| , req_pid |
| , req_verifier |
| , node_name |
| , firstCore |
| , lastCore |
| , processors |
| , roles |
| ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueNodeDeleteReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , int pnid ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntNodeDeleteReq( req_nid |
| , req_pid |
| , req_verifier |
| , pnid ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueDownReq( int pnid ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntDownReq ( pnid ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueNodeNameReq( int req_nid |
| , int req_pid |
| , Verifier_t req_verifier |
| , char *current_name |
| , char *new_name) |
| { |
| CInternalReq * request; |
| |
| request = new CIntNodeNameReq( req_nid |
| , req_pid |
| , req_verifier |
| , current_name |
| , new_name ); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueSoftNodeDownReq( int pnid ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntSoftNodeDownReq ( pnid ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueSoftNodeUpReq( int pnid ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntSoftNodeUpReq ( pnid ); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueShutdownReq( int level ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntShutdownReq ( level ); |
| |
| request->setPriority(CRequest::High); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueUpReq( int pnid, char *node_name, int merge_lead ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntUpReq ( pnid, node_name, merge_lead ); |
| |
| |
| enqueueReq ( request ); |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueExitReq( struct exit_def *exitDef ) |
| { |
| CIntExitReq * request; |
| |
| request = new CIntExitReq ( ); |
| request->prepRequest( exitDef ); |
| |
| enqueueReq ( request ); |
| } |
| #else |
| void CReqQueue::enqueueExitNsReq( struct exit_ns_def *exitDef ) |
| { |
| CIntExitNsReq * request; |
| |
| request = new CIntExitNsReq ( ); |
| request->prepRequest( exitDef ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueIoDataReq( ioData_t *ioData ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntIoDataReq ( ioData ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueKillReq( struct kill_def *killDef ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntKillReq ( killDef ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueNewProcReq( struct process_def *procDef ) |
| { |
| CIntNewProcReq * request; |
| |
| request = new CIntNewProcReq( procDef->nid |
| , procDef->type |
| , procDef->priority |
| , procDef->backup |
| , procDef->parent_nid |
| , procDef->parent_pid |
| , procDef->parent_verifier |
| , procDef->pair_parent_nid |
| , procDef->pair_parent_pid |
| , procDef->pair_parent_verifier |
| , procDef->argc |
| , procDef->unhooked |
| , procDef->tag |
| , procDef->pathStrId |
| , procDef->ldpathStrId |
| , procDef->programStrId |
| , procDef->nameLen |
| , procDef->infileLen |
| , procDef->outfileLen |
| , procDef->argvLen |
| , &procDef->stringData ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifdef NAMESERVER_PROCESS |
| void CReqQueue::enqueueNewProcNsReq( struct process_def *procDef ) |
| { |
| CIntNewProcNsReq* request; |
| |
| request = new CIntNewProcNsReq( procDef->nid |
| , procDef->pid |
| , procDef->verifier |
| , procDef->type |
| , procDef->priority |
| , procDef->backup |
| , procDef->parent_nid |
| , procDef->parent_pid |
| , procDef->parent_verifier |
| , procDef->pair_parent_nid |
| , procDef->pair_parent_pid |
| , procDef->pair_parent_verifier |
| , procDef->argc |
| , procDef->unhooked |
| , procDef->tag |
| , procDef->pathLen |
| , procDef->ldpathLen |
| , procDef->programLen |
| , procDef->nameLen |
| , procDef->infileLen |
| , procDef->outfileLen |
| , procDef->argvLen |
| , &procDef->stringData ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueNotifyReq( struct notify_def *notifyDef ) |
| { |
| CIntNotifyReq * request; |
| |
| request = new CIntNotifyReq( notifyDef ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueOpenReq( struct open_def *openDef ) |
| { |
| CIntOpenReq * request; |
| |
| request = new CIntOpenReq( openDef ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| void CReqQueue::enqueueProcInitReq( struct process_init_def *procInitDef ) |
| { |
| CIntProcInitReq * request; |
| |
| request = new CIntProcInitReq( procInitDef ); |
| |
| enqueueReq ( request ); |
| } |
| |
| void CReqQueue::enqueueSetReq( struct set_def *setDef ) |
| { |
| CIntSetReq * request; |
| |
| request = new CIntSetReq (setDef->type, setDef->group, |
| setDef->key, &setDef->valueData); |
| |
| enqueueReq ( request ); |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueStdInReq( struct stdin_req_def *stdin_req ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntStdInReq ( stdin_req ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| void CReqQueue::enqueueUniqStrReq( struct uniqstr_def *uniqStrDef ) |
| { |
| CIntUniqStrReq * request; |
| |
| request = new CIntUniqStrReq (uniqStrDef->nid, uniqStrDef->id, |
| &uniqStrDef->valueData); |
| |
| enqueueReq ( request ); |
| } |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueChildDeathReq( pid_t pid ) |
| { |
| CIntChildDeathReq * request; |
| |
| request = new CIntChildDeathReq ( pid ); |
| // queue the request to be handled later by worker thread |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueAttachedDeathReq( pid_t pid ) |
| { |
| CIntAttachedDeathReq * request; |
| |
| request = new CIntAttachedDeathReq ( pid ); |
| // queue the request to be handled later by worker thread |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueCreatePrimitiveReq( int pnid ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntCreatePrimitiveReq( pnid ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| #ifndef NAMESERVER_PROCESS |
| void CReqQueue::enqueueTmReadyReq( int nid ) |
| { |
| CInternalReq * request; |
| |
| request = new CIntTmReadyReq( nid ); |
| |
| enqueueReq ( request ); |
| } |
| #endif |
| |
| // this function moves the queued requests from revive queue to the main request queue. |
| // it will skip the requests whose seq num is less than the given one. |
| void CReqQueue::processReviveRequests(unsigned long long minSeqNum) |
| { |
| const char method_name[] = "CReqQueue::processReviveRequests"; |
| TRACE_ENTRY; |
| |
| reqListInt_t::iterator it; |
| |
| while ( true ) |
| { |
| reqReviveQueueLock_.lock(); |
| it = reqReviveQueue_.begin(); |
| |
| if (it == reqReviveQueue_.end()) |
| { |
| IAmIntegrating = false; // all subsequent requests will go to regular req queue |
| IAmIntegrated = true; |
| reqReviveQueueLock_.unlock(); |
| break; |
| } |
| |
| CInternalReq *request = *it; |
| it = reqReviveQueue_.erase (it); // remove the request from the list |
| reqReviveQueueLock_.unlock(); // so that Sync thread can keep adding to the revive list |
| |
| unsigned long long reqSeqNum = request->getSeqNum(); |
| |
| // move requests whose seq num is above the minSeqNum, discard others. |
| if (reqSeqNum > minSeqNum) |
| { |
| enqueueReq( request, true ); |
| |
| if ( trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY) ) |
| trace_printf("%s@%d Req moved from revive to regular queue. " |
| "Req seq num = %llu, Min seq num = %llu\n", |
| method_name, __LINE__, reqSeqNum, minSeqNum); |
| } |
| else |
| { |
| delete request; |
| |
| if ( trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY) ) |
| trace_printf("%s@%d Req discarded from revive queue. " |
| "Req seq num = %llu, Min seq num = %llu\n", |
| method_name, __LINE__, reqSeqNum, minSeqNum); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| // this function adds new request to the revive queue instead of the regular req queue. |
| bool CReqQueue::addToReqReviveQueue( CInternalReq *req ) |
| { |
| const char method_name[] = "CReqQueue::addToReqReviveQueue"; |
| TRACE_ENTRY; |
| |
| bool result = false; // request not added yet. |
| |
| if ( IAmIntegrating ) |
| { |
| reqReviveQueueLock_.lock(); |
| |
| if ( IAmIntegrating ) // check again because it could have gotten turned off while waiting on lock. |
| { |
| reqReviveQueue_.push_back(req); |
| |
| int qsize = reqReviveQueue_.size(); |
| |
| if ( trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY) ) |
| trace_printf("%s@%d Req added to revive queue. Revive queue size = %d\n", |
| method_name, __LINE__, qsize); |
| |
| if ( qsize > MAX_REVIVE_QUEUE_SIZE ) |
| { |
| // remove a request set from the front of the queue. |
| // the set is a group of all requests with the same seq num. |
| unsigned long long seqnum = reqReviveQueue_.front()->getSeqNum(); |
| while (seqnum == reqReviveQueue_.front()->getSeqNum()) |
| { |
| reqReviveQueue_.pop_front(); // calls destructor |
| |
| if ( trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY) ) |
| trace_printf("%s@%d Req removed from revive queue. Revive queue size = %d\n", |
| method_name, __LINE__, qsize); |
| } |
| } |
| |
| result = true; |
| } |
| |
| reqReviveQueueLock_.unlock(); |
| } |
| |
| TRACE_EXIT; |
| |
| return result; |
| } |
| |
| void CReqQueue::enqueueReq( CInternalReq *req, bool reviveOper ) |
| { |
| const char method_name[] = "CReqQueue::enqueueReq (int)"; |
| TRACE_ENTRY; |
| |
| if ( !reviveOper ) |
| { |
| req->setSeqNum ( Monitor->getSeqNum() ); |
| |
| if ( IAmIntegrating && !req->GetReviveFlag() ) |
| { |
| if ( addToReqReviveQueue(req) ) |
| { |
| TRACE_EXIT; |
| return; |
| } |
| } |
| } |
| |
| reqQueueLock_.lock(); |
| |
| req->setId ( numRequests_ ); |
| |
| if (req->getPriority() == CRequest::High) |
| { |
| reqList_t::iterator it; |
| it = reqQueue_.begin(); |
| |
| // find first request not at high priority |
| while ( it != reqQueue_.end() |
| && (*it)->getPriority() == CRequest::High ) |
| { |
| ++it; |
| } |
| |
| // insert new request before first non-high priority request |
| reqQueue_.insert(it, req); |
| } |
| else |
| { |
| reqQueue_.push_back (req); |
| } |
| |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d queueing request #%ld\n", method_name, |
| __LINE__, numRequests_); |
| |
| // Maintain statistics |
| ++numRequests_; |
| int listSize = reqQueue_.size(); |
| if (listSize > maxQueueSize_) |
| maxQueueSize_ = listSize; |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->ReqQueueIncr(); |
| |
| // Since there is a new request, possibly wake up a worker thread |
| // to work on it. |
| reqQueueLock_.wakeOne(); |
| reqQueueLock_.unlock(); |
| |
| TRACE_EXIT; |
| } |
| |
| |
| void CReqQueue::nudgeWorker() |
| { |
| const char method_name[] = "CReqQueue::nudgeWorker"; |
| TRACE_ENTRY; |
| |
| reqQueueLock_.lock(); |
| reqQueueLock_.wakeOne(); |
| reqQueueLock_.unlock(); |
| |
| TRACE_EXIT; |
| } |
| |
| void CReqQueue::timeDiff ( struct timespec t1, struct timespec t2, |
| struct timespec &tDiff) |
| { |
| if ( (t2.tv_nsec - t1.tv_nsec ) < 0 ) |
| { |
| tDiff.tv_sec = t2.tv_sec - t1.tv_sec - 1; |
| tDiff.tv_nsec = 1000000000 + t2.tv_nsec - t1.tv_nsec; |
| } |
| else |
| { |
| tDiff.tv_sec = t2.tv_sec - t1.tv_sec; |
| tDiff.tv_nsec = t2.tv_nsec - t1.tv_nsec; |
| } |
| } |
| |
| bool CReqQueue::responsive(struct timespec &curTime) |
| { |
| const char method_name[] = "CReqQueue::responsive"; |
| TRACE_ENTRY; |
| |
| bool result = true; |
| |
| if ( busyWorkers_ > 0 ) |
| { |
| struct timespec reqTime; |
| |
| // Get difference between request start and now. |
| timeDiff ( mostRecentStart_, curTime, reqTime ); |
| |
| if ( reqTime.tv_sec > getExecTimeMax() ) |
| { // Request has been executing too long |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf("%s@%d" " - request not responsive, reqTime.tv_sec=%ld > responsive=%d" "\n" |
| , method_name, __LINE__ |
| , reqTime.tv_sec, getExecTimeMax()); |
| |
| result = false; |
| } |
| } |
| |
| TRACE_EXIT; |
| |
| return result; |
| } |
| |
| CRequest* CReqQueue::getRequest() |
| { |
| const char method_name[] = "CReqQueue::getRequest"; |
| TRACE_ENTRY; |
| |
| reqQueueLock_.lock(); |
| |
| reqList_t::iterator it; |
| it = reqQueue_.begin(); |
| |
| CRequest *request = NULL; |
| CRequest::ReqStatus_t reqStatus; |
| while (request == NULL) |
| { |
| // If necessary, wait until a currently executing exclusive request |
| // finishes. |
| workerStatusLock_.lock(); |
| while (busyExclusive_) |
| { |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d doing workerStatusLock_.wait() waiting for exclusive request to finish\n", method_name, __LINE__); |
| workerStatusLock_.wait(); |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d completed workerStatusLock_.wait() waiting for exclusive request to finish\n", method_name, __LINE__); |
| } |
| workerStatusLock_.unlock(); |
| |
| if (it != reqQueue_.end()) |
| { |
| request = *it; |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d examining request\n", |
| method_name, __LINE__); |
| |
| // bugcatcher, temp call |
| request->validateObj(); |
| |
| if ( (reqStatus = request->okToExecute( )) != CRequest::OkToExec ) |
| { |
| if ( reqStatus == CRequest::Failed ) |
| { |
| // Take request out of list |
| it = reqQueue_.erase (it); |
| |
| delete request; |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->ReqQueueDecr(); |
| } |
| else |
| { |
| // Take request out of request list. "it" then |
| // points to next element in the queue. |
| it = reqQueue_.erase (it); |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->ReqQueueDecr(); |
| |
| if ( trace_settings & TRACE_REQUEST ) |
| { |
| trace_printf("%s@%d request #%ld deferred.\n", |
| method_name, __LINE__, request->getId()); |
| } |
| |
| // Put request on deferred request list |
| reqDeferred_.push_back (request); |
| } |
| request = NULL; |
| } |
| } |
| else |
| { // Wait for a new request to be queued or a currently |
| // executing request to complete. |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d waiting for request\n", method_name, |
| __LINE__); |
| reqQueueLock_.wait(); |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d finished waiting for request\n", |
| method_name, __LINE__); |
| |
| it = reqQueue_.begin(); |
| } |
| } |
| |
| if (!request->isShutdown()) |
| { |
| // Take request out of list |
| reqQueue_.erase (it); |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->ReqQueueDecr(); |
| |
| workerStatusLock_.lock(); |
| |
| if ( request->isExclusive() ) |
| { // Request needs to run while no other requests are running. |
| // Wait until any currently executing requests finish. |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d request is exclusive\n", |
| method_name, __LINE__); |
| while ( busyWorkers_ > 0 ) |
| { |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d exclusive request waiting while " |
| "current requests complete\n", |
| method_name, __LINE__); |
| workerStatusLock_.wait(); |
| } |
| busyExclusive_ = true; |
| } |
| |
| ++busyWorkers_; |
| if (busyWorkers_ > maxBusyWorkers_) |
| maxBusyWorkers_ = busyWorkers_; |
| |
| // Store start time of most recent request. Need this to detect |
| // if a request ever gets stuck unexpectedly. |
| mostRecentStart_ = request->startTime(); |
| |
| // Store max request execution time. This is needed to detect if it is a lengthy request or not. |
| execTimeMax_ = request->getExecTimeMax(); |
| |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d will work on %s request #%ld, priority = %d, busyWorkers=%d\n", method_name, __LINE__, (request->isExclusive() ? "exclusive" : ""), request->getId(), (int)request->getPriority(), busyWorkers_); |
| |
| if ( trace_settings & TRACE_REQUEST_DETAIL ) |
| { |
| request->populateRequestString(); |
| trace_printf("%s@%d request = %s\n", method_name, __LINE__, request->requestString()); |
| } |
| |
| workerStatusLock_.unlock(); |
| |
| } |
| |
| reqQueueLock_.unlock(); |
| |
| TRACE_EXIT; |
| |
| return request; |
| } |
| |
| void CReqQueue::finishRequest(CRequest *request) |
| { |
| const char method_name[] = "CReqQueue::finishRequest"; |
| TRACE_ENTRY; |
| |
| workerStatusLock_.lock(); |
| |
| busyExclusive_ = false; |
| |
| --busyWorkers_; |
| if (busyWorkers_ == 0) |
| { |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d no busy workers, doing workerStatusLock_.wakeAll()\n", method_name, __LINE__); |
| workerStatusLock_.wakeAll(); |
| } |
| else |
| { |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d busyWorkers now=%d\n", method_name, __LINE__, busyWorkers_); |
| } |
| |
| workerStatusLock_.unlock(); |
| |
| reqQueueLock_.lock(); |
| |
| // Release any object ownership acquired at start of request processing. |
| request->giveupOwnership(); |
| |
| if ( !reqDeferred_.empty() ) |
| { // Re-queue deferred requests |
| |
| if ( trace_settings & TRACE_REQUEST ) |
| { |
| trace_printf("%s@%d re-queueing %d deferred requests\n", |
| method_name, __LINE__, (int) reqDeferred_.size()); |
| } |
| |
| int numDeferred = reqDeferred_.size(); |
| |
| reqQueue_.insert ( reqQueue_.begin(), reqDeferred_.begin(), |
| reqDeferred_.end() ); |
| reqDeferred_.erase ( reqDeferred_.begin(), reqDeferred_.end() ); |
| |
| // Record statistics (sonar counters) |
| for (int i=0; i<numDeferred; ++i) |
| { |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->ReqQueueIncr(); |
| } |
| } |
| |
| if (request->isSyncDependent()) |
| { |
| --syncDependentRequests_; |
| if ( trace_settings & TRACE_REQUEST ) |
| trace_printf("%s@%d sync dependent request, syncDependentRequests_ now=%d\n", method_name, __LINE__, syncDependentRequests_); |
| if (syncDependentRequests_ == 0) |
| { |
| reqQueueLock_.wakeAll(); |
| } |
| } |
| |
| reqQueueLock_.unlock(); |
| |
| request->evalReqPerformance(); |
| |
| delete request; |
| |
| TRACE_EXIT; |
| } |
| |
| void CReqQueue::stats() |
| { |
| const char method_name[] = "CReqQueue::stats"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & (TRACE_REQUEST | TRACE_STATS) ) |
| trace_printf("%s@%d numRequests=%ld, maxQueueSize=%d, maxBusyWorkers=%d\n", method_name, __LINE__, numRequests_, maxQueueSize_, maxBusyWorkers_); |
| |
| TRACE_EXIT; |
| } |
| |
| // Definition of which requests can execute concurrently |
| const bool CReqQueue::reqConcurrent[] = { |
| false, // unused, request types start at 1 |
| false, // ReqType_Close |
| false, // ReqType_DelProcessNs |
| true, // ReqType_Dump |
| false, // ReqType_Event |
| false, // ReqType_Exit |
| true, // ReqType_Get |
| false, // ReqType_Kill |
| false, // ReqType_MonStats |
| false, // ReqType_Mount |
| false, // ReqType_NameServerAdd |
| false, // ReqType_NameServerDelete |
| false, // ReqType_NameServerStart |
| false, // ReqType_NameServerStop |
| false, // ReqType_NewProcess |
| false, // ReqType_NewProcessNs |
| false, // ReqType_NodeAdd |
| false, // ReqType_NodeDelete |
| false, // ReqType_NodeDown |
| false, // ReqType_NodeInfo |
| false, // ReqType_NodeName |
| false, // ReqType_NodeUp |
| false, // ReqType_Notice -- not an actual request |
| false, // ReqType_Notify |
| true, // ReqType_Open |
| false, // ReqType_OpenInfo |
| false, // ReqType_PersistAdd |
| false, // ReqType_PersistDelete |
| false, // ReqType_PNodeInfo |
| false, // ReqType_ProcessInfo |
| false, // ReqType_ProcessInfoCont |
| false, // ReqType_ProcessInfoNs |
| false, // ReqType_Set |
| false, // ReqType_Shutdown |
| false, // ReqType_ShutdownNs |
| false, // ReqType_Startup |
| false, // ReqType_Stfsd |
| false, // ReqType_TmLeader |
| false, // ReqType_TmReady |
| false, // ReqType_TmSync |
| false, // ReqType_TransInfo |
| false, // ReqType_ZoneInfo |
| false // ReqType_Invalid |
| }; |
| |
| // Request names used for trace output |
| const char * CReqQueue::svcReqType[] = { |
| "", // unused, request types start at 1 |
| "Close", // ReqType_Close |
| "DelProcessNs", // ReqType_DelProcessNs |
| "Dump", // ReqType_Dump |
| "Event", // ReqType_Event |
| "Exit", // ReqType_Exit |
| "Get", // ReqType_Get |
| "Kill", // ReqType_Kill |
| "MonStats", // ReqType_MonStats |
| "Mount", // ReqType_Mount |
| "NameServerAdd", // ReqType_NameServerAdd |
| "NameServerDelete", // ReqType_NameServerDelete |
| "NameServerStart", // ReqType_NameServerStart |
| "NameServerStop", // ReqType_NameServerStop |
| "NewProcess", // ReqType_NewProcess |
| "NewProcessNs", // ReqType_NewProcessNs |
| "NodeAdd", // ReqType_NodeAdd |
| "NodeDelete", // ReqType_NodeDelete |
| "NodeDown", // ReqType_NodeDown |
| "NodeInfo", // ReqType_NodeInfo |
| "NodeName", // ReqType_NodeName |
| "NodeUp", // ReqType_NodeUp |
| "Notice", // ReqType_Notice -- not an actual request |
| "Notify", // ReqType_Notify |
| "Open", // ReqType_Open |
| "OpenInfo", // ReqType_OpenInfo |
| "PersistAdd", // ReqType_PersistAdd |
| "PersistDelete", // ReqType_PersistDelete |
| "PNodeInfo", // ReqType_PNodeInfo |
| "ProcessInfo", // ReqType_ProcessInfo |
| "ProcessInfoCont", // ReqType_ProcessInfoCont |
| "ProcessInfoNs", // ReqType_ProcessInfoNs |
| "Set", // ReqType_Set |
| "Shutdown", // ReqType_Shutdown |
| "ShutdownNs", // ReqType_ShutdownNs |
| "Startup", // ReqType_Startup |
| "Stfsd", // ReqType_Stfsd |
| "TmLeader", // ReqType_TmLeader |
| "TmReady", // ReqType_TmReady |
| "TmSync", // ReqType_TmSync |
| "TransInfo", // ReqType_TransInfo |
| "ZoneInfo" // ReqType_ZoneInfo |
| "Invalid" // ReqType_Invalid |
| }; |
| |
| // Must match internal.h:InternalType |
| const char * CReqQueue::intReqType[] = { |
| "" // InternalType_Null |
| , "ActivateSpare" // InternalType_ActivateSpare |
| , "Clone" // InternalType_Clone |
| , "Device" // InternalType_Device |
| , "Down" // InternalType_Down |
| , "Dump" // InternalType_Dump |
| , "DumpComplete" // InternalType_DumpComplete |
| , "Event" // InternalType_Event |
| , "Exit" // InternalType_Exit |
| , "IoData" // InternalType_IoData |
| , "Kill" // InternalType_Kill |
| , "NameServerAdd" // InternalType_NameServerAdd |
| , "NameServerDelete" // InternalType_NameServerDelete |
| , "NodeAdd" // InternalType_NodeAdd |
| , "NodeAdded" // InternalType_NodeAdded |
| , "NodeDelete" // InternalType_NodeDelete |
| , "NodeDeleted" // InternalType_NodeDeleted |
| , "NodeName" // InternalType_NodeName |
| , "Notify" // InternalType_Notify |
| , "PersistAdd" // InternalType_PersistAdd |
| , "PersistDelete" // InternalType_PersistDelete |
| , "Process" // InternalType_Process |
| , "ProcessInit" // InternalType_ProcessInit |
| , "Open" // InternalType_Open |
| , "Set" // InternalType_Set |
| , "StdinReq" // InternalType_StdinReq |
| , "Sync" // InternalType_Sync |
| , "Up" // InternalType_Up |
| , "CreatePrimitives" // InternalType_CreatePrimitives |
| , "Quiesce" // InternalType_Quiesce |
| , "PostQuiesce" // InternalType_PostQuiece |
| , "Revive" // InternalType_Revive |
| , "Snapshot" // InternalType_Snapshot |
| , "UniqStr" // InternalType_UniqStr |
| , "TMReady" // InternalType_TmReady |
| , "Shutdown" // InternalType_Shutdown |
| , "SchedData" // InternalType_SchedData |
| , "SoftNodeDown" // InternalType_SoftNodeDown |
| , "SoftNodeUp" // InternalType_SoftNodeUp |
| }; |
| |