| /////////////////////////////////////////////////////////////////////////////// |
| // |
| // @@@ START COPYRIGHT @@@ |
| // |
| // (C) Copyright 2008-2014 Hewlett-Packard Development Company, L.P. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| #include <iostream> |
| |
| using namespace std; |
| |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <setjmp.h> |
| #include <signal.h> |
| #include <fcntl.h> |
| #include <netdb.h> |
| #include <sys/epoll.h> |
| #include <sys/file.h> |
| #include <sys/time.h> |
| #include <sys/resource.h> |
| #include <errno.h> |
| #include <limits.h> |
| |
| #include "localio.h" |
| #include "mlio.h" |
| #include "monlogging.h" |
| #include "monsonar.h" |
| #include "montrace.h" |
| #include "redirector.h" |
| #include "healthcheck.h" |
| #include "config.h" |
| #include "device.h" |
| #include "cluster.h" |
| #include "monitor.h" |
| |
| #include "replicate.h" |
| |
| #include "clusterconf.h" |
| #include "lnode.h" |
| #include "pnode.h" |
| #include "reqqueue.h" |
| |
| extern bool IAmIntegrating; |
| extern bool IAmIntegrated; |
| extern bool IsRealCluster; |
| extern char IntegratingMonitorPort[MPI_MAX_PORT_NAME]; |
| extern char MyCommPort[MPI_MAX_PORT_NAME]; |
| extern char MyMPICommPort[MPI_MAX_PORT_NAME]; |
| extern char MySyncPort[MPI_MAX_PORT_NAME]; |
| extern bool SMSIntegrating; |
| extern int CreatorShellPid; |
| extern Verifier_t CreatorShellVerifier; |
| extern CommType_t CommType; |
| |
| extern int MyPNID; |
| |
| extern CReqQueue ReqQueue; |
| extern char Node_name[MPI_MAX_PROCESSOR_NAME]; |
| |
| extern CMonitor *Monitor; |
| extern CNodeContainer *Nodes; |
| extern CConfigContainer *Config; |
| extern CDeviceContainer *Devices; |
| extern CNode *MyNode; |
| extern CMonStats *MonStats; |
| extern CRedirector Redirector; |
| extern CMonLog *MonLog; |
| extern CHealthCheck HealthCheck; |
| |
| extern long next_test_delay; |
| extern CReplicate Replicator; |
| |
| extern char *ErrorMsg (int error_code); |
| |
| const char *JoiningPhaseString( JOINING_PHASE phase); |
| const char *StateString( STATE state); |
| const char *SyncStateString( SyncState state); |
| const char *EpollEventString( __uint32_t events ); |
| const char *EpollOpString( int op ); |
| |
| void CCluster::ActivateSpare( CNode *spareNode, CNode *downNode, bool checkHealth ) |
| { |
| const char method_name[] = "CCluster::ActivateSpare"; |
| TRACE_ENTRY; |
| // if not checking health, assume the spare is healthy |
| bool spareHealthy = checkHealth ? false : true; |
| int tmCount = 0; |
| CNode *node; |
| CLNode *lnode; |
| |
| if (trace_settings & TRACE_INIT) |
| { |
| trace_printf( "%s@%d - pnid=%d, name=%s (%s) is taking over pnid=%d, name=%s (%s), check health=%d, isIntegrating=%d , integrating pnid=%d\n" |
| , method_name, __LINE__ |
| , spareNode->GetPNid(), spareNode->GetName(), StateString(spareNode->GetState()) |
| , downNode->GetPNid(), downNode->GetName(), StateString(downNode->GetState()) |
| , checkHealth, IsIntegrating(), integratingPNid_ ); |
| } |
| |
| if ( checkHealth ) |
| { |
| // TODO: Execute physical node health check script here |
| spareHealthy = true; |
| if ( !spareHealthy ) |
| { |
| // and tell the cluster the node is down, since the spare can't takeover |
| CReplNodeDown *repl = new CReplNodeDown(downNode->GetPNid()); |
| Replicator.addItem(repl); |
| } |
| } |
| |
| if ( spareHealthy ) |
| { |
| if ( downNode->GetPNid() != spareNode->GetPNid() ) |
| { |
| // Move down node's logical nodes to spare node |
| downNode->MoveLNodes( spareNode ); |
| |
| spareNode->SetPhase( Phase_Activating ); |
| |
| if ( !IsIntegrating() ) |
| { |
| downNode->SetState( State_Down ); |
| |
| // Send process death notices |
| spareNode->KillAllDown(); |
| |
| // Send node down notice |
| lnode = spareNode->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| // Watchdog process clone was removed in KillAllDown |
| lnode->Down(); |
| } |
| } |
| } |
| |
| // Any DTMs running? |
| for ( int i=0; !tmCount && i < Nodes->GetNodesCount(); i++ ) |
| { |
| node = Nodes->GetNode( i ); |
| lnode = node->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| CProcess *process = lnode->GetProcessLByType( ProcessType_DTM ); |
| if ( process ) tmCount++; |
| } |
| } |
| |
| spareNode->ResetSpareNode(); |
| |
| // Create Watchdog and PSD processes if this node is the activating spare |
| if ( spareNode->GetPNid() == MyPNID ) |
| { |
| Monitor->StartPrimitiveProcesses(); |
| } |
| else |
| { |
| // Check for end of joining phase on node re-integration |
| if ( spareNode->GetState() == State_Joining ) |
| { |
| spareNode->SetState( State_Up ); |
| } |
| if ( tmCount ) |
| { |
| // Send node prepare notice to local DTM processes |
| lnode = spareNode->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| lnode->PrepareForTransactions( downNode->GetPNid() != spareNode->GetPNid() ); |
| } |
| } |
| } |
| |
| if ( downNode->GetPNid() != spareNode->GetPNid() ) |
| { |
| // we need to abort any active TmSync |
| if (( MyNode->GetTmSyncState() == SyncState_Start ) || |
| ( MyNode->GetTmSyncState() == SyncState_Continue ) || |
| ( MyNode->GetTmSyncState() == SyncState_Commit ) ) |
| { |
| MyNode->SetTmSyncState( SyncState_Abort ); |
| Monitor->SetAbortPendingTmSync(); |
| if (trace_settings & (TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d" " - Node " "%d" " TmSyncState updated (" "%d" ")" "\n", method_name, __LINE__, MyPNID, MyNode->GetTmSyncState()); |
| } |
| } |
| |
| if (trace_settings & TRACE_INIT) |
| { |
| trace_printf( "%s@%d - Spare node activating! pnid=%d, name=(%s)\n" |
| , method_name, __LINE__ |
| , spareNode->GetPNid(), spareNode->GetName()); |
| } |
| } |
| |
| if ( spareNode->GetPNid() == MyPNID && spareHealthy ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - Replicating activate spare node pnid=%d, name=%s (%s), spare=%d, down pnid=%d, name=%s (%s), DTM count=%d\n" |
| , method_name, __LINE__ |
| , spareNode->GetPNid(), spareNode->GetName(), StateString(spareNode->GetState()) |
| , spareNode->IsSpareNode() |
| , downNode->GetPNid(), downNode->GetName(), StateString(downNode->GetState()) |
| , tmCount ); |
| // Let other monitors know is ok to activate this spare node |
| CReplActivateSpare *repl = new CReplActivateSpare( MyPNID, downNode->GetPNid() ); |
| Replicator.addItem(repl); |
| |
| if ( !tmCount ) |
| { |
| // No DTMs in environment so implicitly make ready for transactions |
| lnode = MyNode->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| ReqQueue.enqueueTmReadyReq( lnode->GetNid() ); |
| } |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::NodeTmReady( int nid ) |
| { |
| const char method_name[] = "CCluster::NodeTmReady"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & TRACE_INIT) |
| { |
| trace_printf( "%s@%d - nid=%d\n", method_name, __LINE__, nid ); |
| } |
| |
| tmReadyCount_++; |
| |
| MyNode->StartPStartDPersistentDTM( nid ); |
| |
| if ( MyNode->GetNumLNodes() == tmReadyCount_ ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| sprintf(la_buf, "[%s], Node activated! pnid=%d, name=(%s) \n", method_name, MyNode->GetPNid(), MyNode->GetName()); |
| mon_log_write(MON_CLUSTER_NODE_TM_READY, SQ_LOG_INFO, la_buf); |
| |
| // Let other monitors know the node is up |
| CReplActivateSpare *repl = new CReplActivateSpare( MyPNID, -1 ); |
| Replicator.addItem(repl); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::NodeReady( CNode *spareNode ) |
| { |
| const char method_name[] = "CCluster::NodeReady"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & TRACE_INIT) |
| { |
| trace_printf( "%s@%d - spare node %s pnid=%d\n" |
| , method_name, __LINE__, spareNode->GetName(), spareNode->GetPNid() ); |
| } |
| |
| assert( spareNode->GetState() == State_Up ); |
| |
| // Send node up notice |
| CLNode *lnode = spareNode->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| lnode->Up(); |
| } |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| if ( joinComm_ != MPI_COMM_NULL ) |
| { |
| MPI_Comm_free( &joinComm_ ); |
| joinComm_ = MPI_COMM_NULL; |
| } |
| break; |
| case CommType_Sockets: |
| if ( joinSock_ != -1 ) |
| { |
| close(joinSock_); |
| joinSock_ = -1; |
| } |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| spareNode->SetActivatingSpare( false ); |
| integratingPNid_ = -1; |
| if ( MyNode->IsCreator() ) |
| { |
| MyNode->SetCreator( false, -1, -1 ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| |
| long CCluster::AssignTmSeqNumber( void ) |
| { |
| struct sync_def sync; |
| |
| const char method_name[] = "CCluster::AssignTmSeqNumber"; |
| TRACE_ENTRY; |
| |
| sync.type = SyncType_TmSeqNum; |
| sync.length = 0; |
| syncCycle_.lock(); |
| exchangeTmSyncData( &sync ); |
| syncCycle_.unlock(); |
| |
| if (trace_settings & TRACE_ENTRY_EXIT) |
| trace_printf("%s@%d" " - Exit - sequence number = " "%d" "\n", method_name, __LINE__, TmSeqAssigned[MyPNID]); |
| TRACE_EXIT; |
| return TmSeqAssigned[MyPNID]; |
| } |
| |
| |
| // Assigns a new TMLeader if given pnid is same as TmLeaderNid |
| // TmLeader is a logical node num. |
| // pnid has gone down, so if that node was previously the TM leader, a new one needs to be chosen. |
| void CCluster::AssignTmLeader(int pnid) |
| { |
| const char method_name[] = "CCluster::AssignTmLeader"; |
| TRACE_ENTRY; |
| |
| int i = 0; |
| CNode *node = NULL; |
| |
| int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid(); |
| |
| if (TmLeaderPNid != pnid) |
| { |
| return; |
| } |
| |
| node = Node[TmLeaderPNid]; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) |
| { |
| trace_printf("%s@%d" " - Node " "%d" " TmLeader failed." "\n", method_name, __LINE__, TmLeaderNid); |
| } |
| |
| for (i=0; i<cfgPNodes_; i++) |
| { |
| TmLeaderPNid++; |
| |
| if (TmLeaderPNid == cfgPNodes_) |
| { |
| TmLeaderPNid = 0; // restart with nid 0 |
| } |
| |
| if (TmLeaderPNid == pnid) |
| { |
| continue; // this is the node that is going down, skip it |
| } |
| |
| node = Node[TmLeaderPNid]; |
| |
| if ( (node->GetState() != State_Up) || node->IsSpareNode() || (node->GetPhase() != Phase_Ready) ) |
| { |
| continue; // skip this node for any of the above reasons |
| } |
| |
| TmLeaderNid = node->GetFirstLNode()->GetNid(); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) |
| { |
| trace_printf("%s@%d" " - Node " "%d" " is the new TmLeader." "\n", method_name, __LINE__, TmLeaderNid); |
| } |
| |
| break; |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| |
| CCluster::CCluster (void) |
| :socks_(NULL) |
| ,sockPorts_(NULL) |
| ,commSock_(-1) |
| ,syncSock_(-1) |
| ,epollFD_(-1) |
| ,NumRanks (-1), |
| NumNodes (0), |
| Node (NULL), |
| LNode (NULL), |
| TmSyncPNid (-1), |
| CurNodes (0), |
| CurProcs (0), |
| cfgPNodes_ (-1), |
| NodeMap (NULL), |
| LastTmSeqNum (-1), |
| TmLeaderNid (0), // nid 0 is the defacto leader. |
| tmReadyCount_(0), |
| TmSeqAssigned (NULL ), |
| minRecvCount_(4096), |
| recvBuffer_(NULL), |
| recvBuffer2_(NULL), |
| swpRecCount_(0), |
| barrierCount_(0), |
| allGatherCount_(0), |
| commDupCount_(0), |
| barrierCountSaved_(0), |
| allGatherCountSaved_(0), |
| commDupCountSaved_(0), |
| inBarrier_(false), |
| inAllGather_(false), |
| inCommDup_(false), |
| monInitComplete_(false), |
| monSyncResponsive_(true), |
| integratingPNid_(-1), |
| joinComm_(MPI_COMM_NULL), |
| joinSock_(-1), |
| seqNum_(0), |
| waitForWatchdogExit_(false) |
| ,checkSeqNum_(false) |
| ,validateNodeDown_(true) |
| ,enqueuedDown_(false) |
| ,nodeDownDeathNotices_(true) |
| ,verifierNum_(0) |
| { |
| int i; |
| char buffer[32]; |
| char fname[MAX_PROCESS_PATH]; |
| FILE *ini; |
| |
| const char method_name[] = "CCluster::CCluster"; |
| TRACE_ENTRY; |
| |
| MPI_Comm_set_errhandler(MPI_COMM_WORLD,MPI_ERRORS_RETURN); |
| |
| char *env = getenv("SQ_MON_CHECK_SEQNUM"); |
| if ( env ) |
| { |
| int val; |
| errno = 0; |
| val = strtol(env, NULL, 10); |
| if ( errno == 0) checkSeqNum_ = (val != 0); |
| } |
| |
| if (trace_settings & TRACE_INIT) |
| trace_printf("%s@%d Checking sync sequence numbers is %s\n", |
| method_name, __LINE__, |
| (checkSeqNum_ ? "enabled" : "disabled")); |
| |
| // Compute minimum "sync cycles" per second. The minimum is 1/10 |
| // the expected number, assuming "next_test_delay" cycles per second (where |
| // next_test_delay is in microseconds). |
| syncMinPerSec_ = 1000000 / next_test_delay / 10; |
| |
| agMaxElapsed_.tv_sec = 0; |
| agMaxElapsed_.tv_nsec = 0; |
| agMinElapsed_.tv_sec = 10000; |
| agMinElapsed_.tv_nsec = 0; |
| |
| tmSyncBuffer_ = Nodes->GetSyncBuffer(); |
| |
| // Allocate structures for monitor point-to-point communications |
| int cfgPNodes = Nodes->GetClusterConfig()->GetPNodesCount(); |
| comms_ = new MPI_Comm[cfgPNodes]; |
| otherMonRank_ = new int[cfgPNodes]; |
| socks_ = new int[cfgPNodes]; |
| sockPorts_ = new int[cfgPNodes]; |
| |
| for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
| { |
| upNodes_.upNodes[i] = 0; |
| } |
| |
| for (i=0; i<cfgPNodes; ++i) |
| { |
| comms_[i] = MPI_COMM_NULL; |
| socks_[i] = -1; |
| } |
| |
| env = getenv("SQ_MON_NODE_DOWN_VALIDATION"); |
| if ( env ) |
| { |
| int val; |
| errno = 0; |
| val = strtol(env, NULL, 10); |
| if ( errno == 0) validateNodeDown_ = (val != 0); |
| } |
| if ( cfgPNodes > MAX_NODES ) validateNodeDown_ = false; |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s] Validation of node down is %s\n", |
| method_name, (validateNodeDown_ ? "enabled" : "disabled")); |
| mon_log_write(MON_CLUSTER_CLUSTER_1, SQ_LOG_INFO, buf); |
| |
| InitializeConfigCluster(); |
| |
| for (size_t j=0; j<(sizeof(agElapsed_)/sizeof(int)); ++j) |
| { |
| agElapsed_[j] = 0; |
| } |
| |
| char *p = getenv("MON_MIN_RECV_COUNT"); |
| if ( p ) |
| { |
| long int val = strtoul(p, NULL, 10); |
| if (errno != ERANGE) |
| { |
| minRecvCount_ = val; |
| } |
| } |
| |
| p = getenv("SQ_MON_NODE_DOWN_DEATH_MESSAGES"); |
| if ( p && atoi(p) == 0) |
| { |
| nodeDownDeathNotices_ = false; |
| } |
| |
| // build the node objects & Sync collision assignment arrays |
| // these buffers will be used in ShareWithPeers in AllGather |
| // operation to get TMSync data as well as Replication data. |
| recvBuffer_ = new struct sync_buffer_def[NumNodes]; |
| recvBuffer2_ = new struct sync_buffer_def[NumNodes]; |
| |
| TmSeqAssigned = new int [NumNodes]; |
| |
| |
| |
| // Read initialization file |
| snprintf(fname, sizeof(fname), "%s/monitor.ini",getenv("MPI_TMPDIR")); |
| ini = fopen(fname, "r"); |
| if( ini ) |
| { |
| for (i=0; i<=MyPNID; i++) |
| { |
| fgets(buffer,32,ini); |
| LastTmSeqNum = atol( &buffer[13] ); |
| } |
| fclose(ini); |
| } |
| if( LastTmSeqNum == -1 ) |
| { |
| LastTmSeqNum = 0; |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[CCluster::CCluster], Error= No monitor.ini found, setting to defaults. \n"); |
| mon_log_write(MON_CLUSTER_CLUSTER_2, SQ_LOG_ERR, buf); |
| } |
| if (trace_settings & TRACE_INIT) |
| trace_printf("%s@%d" " - Initialized LastTmSeqNum=" "%d" "\n", method_name, __LINE__, LastTmSeqNum); |
| |
| TRACE_EXIT; |
| } |
| |
| CCluster::~CCluster (void) |
| { |
| int ini=-1; |
| long pos=0; |
| char buf[32]; |
| char fname[MAX_PROCESS_PATH]; |
| |
| const char method_name[] = "CCluster::~CCluster"; |
| TRACE_ENTRY; |
| |
| delete [] comms_; |
| delete [] otherMonRank_; |
| delete [] socks_; |
| delete [] sockPorts_; |
| if (NodeMap) |
| { |
| delete [] NodeMap; |
| NodeMap = NULL; |
| } |
| |
| delete [] TmSeqAssigned; |
| |
| |
| |
| // Read initialization file |
| snprintf(fname,sizeof(fname),"%s/monitor.ini",getenv("MPI_TMPDIR")); |
| ini = open(fname, O_WRONLY | O_SYNC | O_CREAT, S_IRUSR | S_IWUSR ); |
| if( ini != -1 ) |
| { |
| LOCKFILE(ini); |
| if (ret == -1) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[CCluster::~CCluster], Error= Can't lock monitor.ini file. \n"); |
| mon_log_write(MON_CLUSTER_UCLUSTER, SQ_LOG_ERR, buf); |
| } |
| pos = lseek(ini,(long)(24*MyPNID),SEEK_SET); |
| if( pos != -1 ) |
| { |
| snprintf (buf, sizeof(buf), "%3.3d:TmSeqNum:%10.10d\n", MyPNID, LastTmSeqNum); |
| write(ini,buf,strlen(buf)); |
| } |
| UNLOCKFILE(ini); |
| close(ini); |
| } |
| |
| delete [] recvBuffer2_; |
| delete [] recvBuffer_; |
| |
| TRACE_EXIT; |
| } |
| |
| int CCluster::incrGetVerifierNum() |
| { |
| verifierNum_++; |
| if ( verifierNum_ < 0 ) |
| { |
| verifierNum_ = 0; |
| } |
| |
| return verifierNum_; |
| } |
| |
| void CCluster::CoordinateTmSeqNumber( int pnid ) |
| { |
| const char method_name[] = "CCluster::CoordinateTmSeqNumber"; |
| TRACE_ENTRY; |
| |
| LastTmSeqNum++; |
| if( LastTmSeqNum >= MAX_SEQ_VALUE ) |
| { |
| LastTmSeqNum = 1; |
| } |
| TmSeqAssigned[pnid] = LastTmSeqNum; |
| |
| TRACE_EXIT; |
| } |
| |
| // For a reintegrated monitor node, following the first sync cycle, obtain the |
| // current sync cycle sequence number. And verify that all nodes agree |
| // on the sequence number. |
| unsigned long long CCluster::EnsureAndGetSeqNum(cluster_state_def_t nodestate[]) |
| { |
| const char method_name[] = "CCluster::EnsureAndGetSeqNum"; |
| TRACE_ENTRY; |
| |
| unsigned long long seqNum = 0; |
| |
| for (int i = 0; i < cfgPNodes_; i++) |
| { |
| if (trace_settings & TRACE_RECOVERY) |
| { |
| trace_printf("%s@%d nodestate[%d].seq_num=%lld, seqNum=%lld\n", method_name, __LINE__, i, nodestate[i].seq_num, seqNum ); |
| } |
| if (nodestate[i].seq_num > 0) |
| { |
| if (seqNum == 0) |
| { |
| seqNum = nodestate[i].seq_num; |
| } |
| else |
| { |
| assert(nodestate[i].seq_num == seqNum); |
| } |
| } |
| } |
| |
| TRACE_EXIT; |
| return seqNum; |
| } |
| |
| |
| void CCluster::MarkDown (int pnid, bool communicate_state) |
| { |
| char port_fname[MAX_PROCESS_PATH]; |
| char temp_fname[MAX_PROCESS_PATH]; |
| CNode *node; |
| CLNode *lnode; |
| char buf[MON_STRING_BUF_SIZE]; |
| |
| const char method_name[] = "CCluster::MarkDown"; |
| TRACE_ENTRY; |
| |
| node = Nodes->GetNode(pnid); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - pnid=%d, comm_state=%d, state=%s, isInQuiesceState=%d," |
| " (local pnid=%d, state=%s, isInQuiesceState=%d, " |
| "shutdown level=%d)\n", method_name, __LINE__, |
| pnid, communicate_state, StateString(node->GetState()), |
| node->isInQuiesceState(), |
| MyPNID, StateString(MyNode->GetState()), |
| MyNode->isInQuiesceState(), MyNode->GetShutdownLevel() ); |
| |
| if (( MyPNID == pnid ) && |
| ( MyNode->GetState() == State_Down || |
| MyNode->IsKillingNode() ) ) |
| { |
| // we are coming down ... don't process it |
| if ( !IsRealCluster && MyNode->isInQuiesceState()) |
| { |
| // in virtual env, this would be called after node quiescing, |
| // so continue with mark down processing. |
| } |
| else |
| { |
| return; |
| } |
| } |
| |
| if ( (MyNode->GetShutdownLevel() != ShutdownLevel_Undefined) && |
| (pnid != MyPNID) ) // some other node went down while shutdown was in progress |
| { |
| snprintf(buf, sizeof(buf), "[%s], Node failure during shutdown, down nid = %d\n", method_name, pnid); |
| mon_log_write(MON_CLUSTER_MARKDOWN_1, SQ_LOG_ERR, buf); |
| |
| if (!waitForWatchdogExit_) // if WDT is not exiting |
| { |
| // bring down this node because TSE backup processes may not exit |
| // if the primary was on the node that went down. |
| ReqQueue.enqueueDownReq(MyPNID); |
| } |
| } |
| |
| if ( communicate_state && pnid != MyPNID ) |
| { |
| // just communicate the change and let the real node handle it. |
| node->SetChangeState( true ); |
| return; |
| } |
| |
| if ( !Emulate_Down ) |
| { |
| if( !IsRealCluster ) |
| { |
| snprintf(port_fname, sizeof(port_fname), "%s/monitor.%d.port.%s",getenv("MPI_TMPDIR"),pnid,node->GetName()); |
| } |
| else |
| { |
| snprintf(port_fname, sizeof(port_fname), "%s/monitor.port.%s",getenv("MPI_TMPDIR"),node->GetName()); |
| } |
| sprintf(temp_fname, "%s.bak", port_fname); |
| remove(temp_fname); |
| rename(port_fname, temp_fname); |
| |
| } |
| |
| if (node->GetState() != State_Down || node->isInQuiesceState()) |
| { |
| snprintf(buf, sizeof(buf), |
| "[CCluster::MarkDown], Node %s (%d) is going down.\n", |
| node->GetName(), node->GetPNid()); |
| mon_log_write(MON_CLUSTER_MARKDOWN_2, SQ_LOG_ERR, buf); |
| |
| node->SetKillingNode( true ); |
| |
| if ( MyPNID == pnid && |
| (MyNode->GetState() == State_Up || MyNode->GetState() == State_Shutdown) && |
| !MyNode->isInQuiesceState() ) |
| { |
| STATE state = MyNode->GetState(); |
| switch ( state ) |
| { |
| case State_Up: |
| case State_Shutdown: |
| // do node quiescing and let HealthCheck thread know that quiescing has started |
| // setting internal state to 'quiesce' will prevent replicating process exits |
| // and reject normal shutdown requests in all nodes while we are quiescing. |
| if (!waitForWatchdogExit_) // if WDT is not exiting |
| { |
| MyNode->setQuiesceState(); |
| HealthCheck.setState(MON_NODE_QUIESCE); |
| } |
| break; |
| default: // in all other states |
| if ( ! Emulate_Down ) |
| { |
| // make sure no processes are alive if in the middle of re-integration |
| node->KillAllDown(); |
| snprintf(buf, sizeof(buf), |
| "[CCluster::MarkDown], Node %s (%d)is down.\n", |
| node->GetName(), node->GetPNid()); |
| mon_log_write(MON_CLUSTER_MARKDOWN_3, SQ_LOG_ERR, buf); |
| // Don't generate a core file, abort is intentional |
| struct rlimit limit; |
| limit.rlim_cur = 0; |
| limit.rlim_max = 0; |
| setrlimit(RLIMIT_CORE, &limit); |
| MPI_Abort(MPI_COMM_SELF,99); |
| } |
| } |
| } |
| else |
| { |
| if ( node->GetPNid() == integratingPNid_ ) |
| { |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| if ( joinComm_ != MPI_COMM_NULL ) |
| { |
| MPI_Comm_free( &joinComm_ ); |
| joinComm_ = MPI_COMM_NULL; |
| } |
| break; |
| case CommType_Sockets: |
| if ( joinSock_ != -1 ) |
| { |
| close(joinSock_); |
| joinSock_ = -1; |
| } |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| integratingPNid_ = -1; |
| if ( MyNode->IsCreator() ) |
| { |
| MyNode->SetCreator( false, -1, -1 ); |
| } |
| } |
| node->KillAllDown(); |
| node->SetState( State_Down ); |
| lnode = node->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| lnode->Down(); |
| } |
| } |
| } |
| |
| // we need to abort any active TmSync |
| if (( MyNode->GetTmSyncState() == SyncState_Start ) || |
| ( MyNode->GetTmSyncState() == SyncState_Continue ) || |
| ( MyNode->GetTmSyncState() == SyncState_Commit ) ) |
| { |
| MyNode->SetTmSyncState( SyncState_Abort ); |
| Monitor->SetAbortPendingTmSync(); |
| if (trace_settings & (TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated (%d)(%s)\n", method_name, __LINE__, MyNode->GetName(), MyPNID, MyNode->GetTmSyncState(), SyncStateString( MyNode->GetTmSyncState() )); |
| } |
| |
| if ( Emulate_Down ) |
| { |
| IAmIntegrated = false; |
| AssignTmLeader(pnid); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| bool CCluster::CheckSpareSet( int pnid ) |
| { |
| bool activatedSpare = false; |
| bool done = false; |
| unsigned int ii; |
| unsigned int jj; |
| CNode *newNode = Nodes->GetNode( pnid ); |
| |
| const char method_name[] = "CCluster::CheckSpareSet"; |
| TRACE_ENTRY; |
| |
| // Build spare node set |
| CNode *spareNode; |
| NodesList spareSetList; |
| NodesList *spareNodesConfigList = Nodes->GetSpareNodesConfigList(); |
| NodesList::iterator itSn; |
| for ( itSn = spareNodesConfigList->begin(); |
| itSn != spareNodesConfigList->end() && !done ; itSn++ ) |
| { |
| spareNode = *itSn; |
| PNidVector sparePNids = spareNode->GetSparePNids(); |
| // if the new node is a spare node in the configuration |
| if ( newNode->GetPNid() == spareNode->GetPNid() ) |
| { |
| // Add the spare node and each node it is configured to spare to the set |
| spareSetList.push_back( spareNode ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - pnid=%d, name=(%s) is a configured Spare\n", method_name, __LINE__, spareNode->GetPNid(), spareNode->GetName()); |
| |
| for ( ii = 0; ii < sparePNids.size(); ii++ ) |
| { |
| spareSetList.push_back( Nodes->GetNode(sparePNids[ii]) ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - pnid=%d, name=(%s) is in Spare set\n", method_name, __LINE__, Nodes->GetNode(sparePNids[ii])->GetPNid(), Nodes->GetNode(sparePNids[ii])->GetName()); |
| } |
| done = true; |
| } |
| else |
| { |
| // Check each pnid it is configured to spare |
| for ( jj = 0; jj < sparePNids.size(); jj++ ) |
| { |
| // if the new node is in the spare set of a spare node |
| if ( newNode->GetPNid() == sparePNids[jj] ) |
| { |
| // Add the spare node and each node it is configured to spare to the set |
| spareSetList.push_back( spareNode ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - pnid=%d, name=(%s) is a configured Spare\n", method_name, __LINE__, spareNode->GetPNid(), spareNode->GetName()); |
| |
| for ( ii = 0; ii < sparePNids.size(); ii++ ) |
| { |
| spareSetList.push_back( Nodes->GetNode(sparePNids[ii]) ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - pnid=%d, name=(%s) is in Spare set\n", method_name, __LINE__, Nodes->GetNode(sparePNids[ii])->GetPNid(), Nodes->GetNode(sparePNids[ii])->GetName()); |
| } |
| done = true; |
| } |
| } |
| } |
| } |
| |
| if (newNode && trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - new node pnid=%d, name=(%s), zid=%d\n" |
| , method_name, __LINE__ |
| , newNode->GetPNid(), newNode->GetName(), newNode->GetZone()); |
| } |
| |
| // if the newNode still owns the zone |
| if ( newNode && newNode->GetZone() != -1 ) |
| { |
| // assume implicit spare node activation |
| // (no need to move logical nodes to physical node) |
| // since MarkUp() already set State_Up, |
| // just reset spare node flag and remove from available spare nodes |
| newNode->ResetSpareNode(); |
| Nodes->RemoveFromSpareNodesList( newNode ); |
| ActivateSpare( newNode, newNode ); |
| activatedSpare = true; |
| TRACE_EXIT; |
| return( activatedSpare ); |
| } |
| |
| CLNode *lnode; |
| CNode *node; |
| CNode *downNode = NULL; |
| |
| // Now check the state of each configured logical node in the set for down state |
| spareNode = newNode; // new node (pnid) is the spare to activate |
| NodesList::iterator itSs; |
| for ( itSs = spareSetList.begin(); itSs != spareSetList.end(); itSs++ ) |
| { |
| node = *itSs; |
| if ( node->GetPNid() != pnid ) |
| { |
| // Find the first down node |
| if ( !downNode ) |
| { |
| lnode = node->GetFirstLNode(); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - node nid=%d, pnid=%d(%s), state=%s\n" |
| , method_name, __LINE__, lnode->GetNid() |
| , node->GetPNid(), node->GetName() |
| , StateString( node->GetState() ) ); |
| if ( lnode && lnode->GetState() == State_Down ) |
| { |
| downNode = node; |
| } |
| } |
| } |
| if ( spareNode && downNode ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - spare node pnid=%d (%s), down node pnid=%d (%s) \n" |
| , method_name, __LINE__ |
| , spareNode->GetPNid(), spareNode->GetName() |
| , downNode->GetPNid(), downNode->GetName()); |
| break; |
| } |
| } |
| |
| if ( spareNode && downNode ) |
| { |
| Nodes->RemoveFromSpareNodesList( spareNode ); |
| spareNode->ResetSpareNode(); |
| if ( downNode->GetPNid() != pnid ) |
| { // the spare node does not own the down logical nodes so activate it |
| ActivateSpare( spareNode, downNode ); |
| } |
| activatedSpare = true; |
| } |
| |
| TRACE_EXIT; |
| return( activatedSpare ); |
| } |
| |
| const char *JoiningPhaseString( JOINING_PHASE phase ) |
| { |
| const char *str; |
| |
| switch( phase ) |
| { |
| case JoiningPhase_Unknown: |
| str = "JoiningPhase_Unknown"; |
| break; |
| case JoiningPhase_1: |
| str = "JoiningPhase_1"; |
| break; |
| case JoiningPhase_2: |
| str = "JoiningPhase_2"; |
| break; |
| case JoiningPhase_3: |
| str = "JoiningPhase_3"; |
| break; |
| default: |
| str = "JoiningPhase - Undefined"; |
| break; |
| } |
| |
| return( str ); |
| } |
| |
| struct message_def *CCluster::JoinMessage( const char *node_name, int pnid, JOINING_PHASE phase ) |
| { |
| struct message_def *msg; |
| |
| const char method_name[] = "CCluster::JoinMessage"; |
| TRACE_ENTRY; |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->notice_death_Incr(); |
| |
| msg = new struct message_def; |
| msg->type = MsgType_NodeJoining; |
| msg->noreply = true; |
| msg->u.request.type = ReqType_Notice; |
| strcpy( msg->u.request.u.joining.node_name, node_name ); |
| msg->u.request.u.joining.pnid = pnid; |
| msg->u.request.u.joining.phase = phase; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST_DETAIL)) |
| trace_printf("%s@%d - Joining notice for node %s (pnid=%d, phase=%d)\n", |
| method_name, __LINE__, node_name, pnid, phase ); |
| TRACE_EXIT; |
| |
| return msg; |
| } |
| |
| struct message_def *CCluster::SpareUpMessage( const char *node_name, int pnid ) |
| { |
| struct message_def *msg; |
| |
| const char method_name[] = "CCluster::SpareUpMessage"; |
| TRACE_ENTRY; |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->notice_death_Incr(); |
| |
| msg = new struct message_def; |
| msg->type = MsgType_SpareUp; |
| msg->noreply = true; |
| msg->u.request.type = ReqType_Notice; |
| strcpy( msg->u.request.u.spare_up.node_name, node_name ); |
| msg->u.request.u.spare_up.pnid = pnid; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST_DETAIL)) |
| trace_printf("%s@%d - Spare node up notice for node %s nid=%d\n", |
| method_name, __LINE__, node_name, pnid ); |
| TRACE_EXIT; |
| |
| return msg; |
| } |
| |
| struct message_def *CCluster::ReIntegErrorMessage( const char *msgText ) |
| { |
| struct message_def *msg; |
| |
| const char method_name[] = "CCluster::ReIntegErrorMessage"; |
| TRACE_ENTRY; |
| |
| msg = new struct message_def; |
| msg->type = MsgType_ReintegrationError; |
| msg->noreply = true; |
| msg->u.request.type = ReqType_Notice; |
| strncpy( msg->u.request.u.reintegrate.msg, msgText, |
| sizeof(msg->u.request.u.reintegrate.msg) ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST_DETAIL)) |
| trace_printf("%s@%d - Reintegrate notice %s\n", |
| method_name, __LINE__, msgText ); |
| |
| TRACE_EXIT; |
| |
| return msg; |
| } |
| |
| int CCluster::MarkUp( int pnid, char *node_name ) |
| { |
| bool spareNodeActivated = false; |
| int rc = MPI_SUCCESS; |
| int tmCount = 0; |
| CNode *node; |
| CLNode *lnode; |
| STATE nodeState; |
| |
| const char method_name[] = "CCluster::MarkUp"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - pnid=%d, name=%s\n" |
| , method_name, __LINE__, pnid, node_name ); |
| |
| if ( pnid == -1 ) |
| { |
| node = Nodes->GetNode( node_name ); |
| } |
| else |
| { |
| node = Nodes->GetNode( pnid ); |
| } |
| |
| if ( node == NULL ) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - Invalid node, pnid=%d, name=%s" "\n" |
| , method_name, __LINE__, pnid, node_name ); |
| |
| return( MPI_ERR_NAME ); |
| } |
| |
| nodeState = node->GetState(); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - Node state=%s" "\n" |
| , method_name, __LINE__, StateString( nodeState ) ); |
| if ( nodeState != State_Up ) |
| { |
| if ( nodeState == State_Down ) |
| { |
| node->SetKillingNode( false ); |
| if ( Emulate_Down ) |
| { |
| // Any DTMs running? |
| for ( int i=0; !tmCount && i < Nodes->GetNodesCount(); i++ ) |
| { |
| CNode *tempNode = Nodes->GetNode( i ); |
| lnode = tempNode->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| CProcess *process = lnode->GetProcessLByType( ProcessType_DTM ); |
| if ( process ) tmCount++; |
| } |
| } |
| if ( tmCount ) |
| { |
| IAmIntegrated = true; |
| } |
| // We need to remove any old process objects before we restart the node. |
| node->CleanUpProcesses(); |
| node->SetState( State_Up ); |
| if ( MyPNID == pnid ) |
| { |
| MyNode->clearQuiesceState(); |
| HealthCheck.initializeVars(); |
| SMSIntegrating = true; |
| Monitor->StartPrimitiveProcesses(); |
| // Let other monitors know this node is up |
| CReplNodeUp *repl = new CReplNodeUp(MyPNID); |
| Replicator.addItem(repl); |
| } |
| else |
| { |
| if ( tmCount ) |
| { |
| // Send node prepare notice to local DTM processes |
| lnode = node->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| lnode->PrepareForTransactions( true ); |
| } |
| } |
| else |
| { |
| // Process logical node up |
| lnode = node->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| lnode->Up(); |
| } |
| } |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - Unexpectedly executing MarkUp. Expecting to do accept in commAccept thread\n", |
| method_name, __LINE__ ); |
| |
| } |
| } |
| else if ( nodeState == State_Merged ) |
| { |
| node->SetKillingNode( false ); |
| node->SetState( State_Joining ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d" " - New monitor %s, pnid=%d, state=%s" "\n" |
| , method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) ); |
| for ( int i =0; i < cfgPNodes_; i++ ) |
| { |
| trace_printf( "%s@%d socks_[%d]=%d\n" |
| , method_name, __LINE__ |
| , i, socks_[i]); |
| } |
| } |
| if ( MyNode->IsCreator() ) |
| { |
| SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid() |
| , MyNode->GetCreatorVerifier() |
| , JoinMessage( node->GetName() |
| , node->GetPNid() |
| , JoiningPhase_1 ) |
| , NULL); |
| |
| // save the current seq num in the snapshot request. |
| // this sequence number will match the state of the cluster |
| // when this request is processed. |
| ReqQueue.enqueueSnapshotReq(seqNum_); |
| } |
| if ( MyPNID == pnid ) |
| { |
| // request and process revive packet from the creator. |
| // when complete, this will call MarkUp again. |
| ReqQueue.enqueueReviveReq( ); |
| } |
| } |
| else if ( nodeState == State_Joining ) |
| { |
| // The new monitor comes in here first and schedules a node up request on all nodes. |
| // All other monitors come here next, including the creator. |
| // The new monitor will not come here again because |
| // CReplNodeUp is a noop for the one who schedules it. |
| node->SetState( State_Up ); |
| |
| if ( Nodes->GetSNodesCount() == 0 ) |
| { // Spare nodes not configured so bring up my logical nodes |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - No spare nodes configured node=%s, pnid=%d, state=%s\n" |
| , method_name, __LINE__, node->GetName(), node->GetPNid() |
| , StateString(node->GetState()) ); |
| if ( MyPNID == pnid ) |
| { |
| ActivateSpare( node, node ); |
| } |
| } |
| else |
| { |
| node->SetSpareNode(); |
| Nodes->AddToSpareNodesList( node->GetPNid() ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - Adding to available spares node=%s, pnid=%d\n" |
| , method_name, __LINE__, node->GetName(), node->GetPNid() ); |
| // Check for a node down in spare set and activate down node if found |
| spareNodeActivated = CheckSpareSet( node->GetPNid() ); |
| if ( spareNodeActivated ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - Activated spare node=%s, pnid=%d\n" |
| , method_name, __LINE__, node->GetName(), node->GetPNid() ); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - Available spare node=%s, pnid=%d\n" |
| , method_name, __LINE__, node->GetName(), node->GetPNid() ); |
| |
| // Spare node not activated |
| if ( MyNode->IsCreator() ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - Sending spare up notice to creator shell(%d) spare node=%s, pnid=%d\n" |
| , method_name, __LINE__, MyNode->GetCreatorPid(), node->GetName(), node->GetPNid() ); |
| // Tell creator spare node is up |
| SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid() |
| , MyNode->GetCreatorVerifier() |
| , SpareUpMessage( node->GetName() |
| , node->GetPNid() ) |
| , NULL); |
| } |
| } |
| } |
| |
| if ( MyPNID == pnid ) |
| { |
| // Any DTMs running? |
| for ( int i=0; !tmCount && i < Nodes->GetNodesCount(); i++ ) |
| { |
| CNode *tempNode = Nodes->GetNode( i ); |
| lnode = tempNode->GetFirstLNode(); |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| CProcess *process = lnode->GetProcessLByType( ProcessType_DTM ); |
| if ( process ) tmCount++; |
| } |
| } |
| if ( !tmCount && !spareNodeActivated ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - Replicating node up %s, pnid=%d, state=%s, spare=%d, DTM count=%d\n" |
| , method_name, __LINE__, node->GetName(), node->GetPNid() |
| , StateString(node->GetState()), node->IsSpareNode(), tmCount ); |
| // Let other monitors know this node is up |
| CReplNodeUp *repl = new CReplNodeUp(MyPNID); |
| Replicator.addItem(repl); |
| } |
| } |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| if ( joinComm_ != MPI_COMM_NULL ) |
| { |
| MPI_Comm_free( &joinComm_ ); |
| joinComm_ = MPI_COMM_NULL; |
| } |
| break; |
| case CommType_Sockets: |
| if ( joinSock_ != -1 ) |
| { |
| close(joinSock_); |
| joinSock_ = -1; |
| } |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| integratingPNid_ = -1; |
| if ( MyNode->IsCreator() ) |
| { |
| MyNode->SetCreator( false, -1, -1 ); |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d" " - New monitor %s, pnid=%d, state=%s, spare=%d\n" |
| , method_name, __LINE__, node->GetName(), node->GetPNid() |
| , StateString(node->GetState()), node->IsSpareNode() ); |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| |
| |
| |
| const char *StateString( STATE state) |
| { |
| const char *str; |
| |
| switch( state ) |
| { |
| case State_Unknown: |
| str = "State_Unknown"; |
| break; |
| case State_Up: |
| str = "State_Up"; |
| break; |
| case State_Down: |
| str = "State_Down"; |
| break; |
| case State_Stopped: |
| str = "State_Stopped"; |
| break; |
| case State_Shutdown: |
| str = "State_Shutdown"; |
| break; |
| case State_Unlinked: |
| str = "State_Unlinked"; |
| break; |
| case State_Merging: |
| str = "State_Merging"; |
| break; |
| case State_Merged: |
| str = "State_Merged"; |
| break; |
| case State_Joining: |
| str = "State_Joining"; |
| break; |
| case State_Initializing: |
| str = "State_Initializing"; |
| break; |
| default: |
| str = "State - Undefined"; |
| break; |
| } |
| |
| return( str ); |
| } |
| |
| const char *SyncStateString( SyncState state) |
| { |
| const char *str; |
| |
| switch( state ) |
| { |
| case SyncState_Null: |
| str = "SyncState_Null"; |
| break; |
| case SyncState_Start: |
| str = "SyncState_Start"; |
| break; |
| case SyncState_Continue: |
| str = "SyncState_Continue"; |
| break; |
| case SyncState_Abort: |
| str = "SyncState_Abort"; |
| break; |
| case SyncState_Commit: |
| str = "SyncState_Commit"; |
| break; |
| case SyncState_Suspended: |
| str = "SyncState_Suspended"; |
| break; |
| default: |
| str = "SyncState - Undefined"; |
| break; |
| } |
| |
| return( str ); |
| } |
| |
| |
| void CCluster::AddTmsyncMsg (struct sync_def *sync, |
| struct internal_msg_def *msg) |
| { |
| const char method_name[] = "CCluster::AddTmsyncMsg"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Requesting SyncType=%d\n", method_name, |
| __LINE__, sync->type); |
| |
| msg->type = InternalType_Sync; |
| msg->u.sync.type = sync->type; |
| msg->u.sync.pnid = sync->pnid; |
| msg->u.sync.syncnid = sync->syncnid; |
| msg->u.sync.tmleader = sync->tmleader; |
| msg->u.sync.state = sync->state; |
| msg->u.sync.count = sync->count; |
| if ( sync->type == SyncType_TmData ) |
| { |
| memmove (msg->u.sync.data, sync->data, sync->length); |
| } |
| msg->u.sync.length = sync->length; |
| |
| // We can have only have a single "InternalType_Sync" msg in our |
| // SyncBuffer, else we cause a collision. |
| |
| int msgSize = (MSG_HDR_SIZE + sizeof(sync_def) - MAX_SYNC_DATA |
| + sync->length ); |
| |
| // Insert the message size into the message header |
| msg->replSize = msgSize; |
| tmSyncBuffer_->msgInfo.msg_count = 1; |
| tmSyncBuffer_->msgInfo.msg_offset += msgSize; |
| |
| // Set end-of-buffer marker |
| msg = (struct internal_msg_def *) |
| &tmSyncBuffer_->msg[tmSyncBuffer_->msgInfo.msg_offset]; |
| msg->type = InternalType_Null; |
| |
| TRACE_EXIT; |
| } |
| |
| |
| void CCluster::DoDeviceReq(char * ldevName) |
| { |
| const char method_name[] = "CCluster::DoDeviceReq"; |
| TRACE_ENTRY; |
| |
| CProcess *process; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal device request for ldev %s\n", |
| method_name, __LINE__, ldevName); |
| Nodes->GetLNode(ldevName, &process); |
| if (!process) |
| { |
| if (trace_settings & TRACE_SYNC) |
| trace_printf("%s@%d - Device processing but can't find device %s\n", |
| method_name, __LINE__, ldevName); |
| } |
| else |
| { |
| CLogicalDevice *ldev; |
| ldev = Devices->GetLogicalDevice( ldevName ); |
| if ( !ldev ) |
| { // The device name is not known on this node |
| // we need to clone the device |
| ldev = Devices->CloneDevice( process ); |
| } |
| if ( ldev ) |
| { |
| bool rstate = false; |
| if ( ldev->Mounted() ) |
| { |
| rstate = ldev->UnMount( false ); |
| if (!rstate) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf("%s@%d - Can't unmount device %s for " |
| "process %s (%d, %d)\n", method_name, |
| __LINE__, ldev->name(), process->GetName(), |
| process->GetNid(), process->GetPid()); |
| } |
| } |
| if ( rstate ) |
| { |
| rstate = ldev->Mount( process, false ); |
| if (!rstate) |
| { |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf("%s@%d - Can't mount device %s for " |
| "process %s (%d, %d)\n", method_name, |
| __LINE__, ldev->name(), process->GetName(), |
| process->GetNid(), process->GetPid()); |
| } |
| else |
| { |
| if (trace_settings & TRACE_REQUEST) |
| trace_printf("%s@%d - Mounted device %s for process " |
| "%s (%d, %d)\n", method_name, __LINE__, |
| ldev->name(), process->GetName(), |
| process->GetNid(), process->GetPid()); |
| } |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find ldev %s.\n", method_name, |
| ldevName); |
| mon_log_write(MON_CLUSTER_DODEVICEREQ_1, SQ_LOG_ERR, buf); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::SaveSchedData( struct internal_msg_def *recv_msg ) |
| { |
| const char method_name[] = "CCluster::SaveSchedData"; |
| TRACE_ENTRY; |
| |
| int nid = recv_msg->u.scheddata.PNid; |
| Node[nid]->SetNumCores( recv_msg->u.scheddata.processors ); |
| Node[nid]->SetFreeMemory( recv_msg->u.scheddata.memory_free ); |
| Node[nid]->SetFreeSwap( recv_msg->u.scheddata.swap_free ); |
| Node[nid]->SetFreeCache( recv_msg->u.scheddata.cache_free ); |
| Node[nid]->SetMemTotal( recv_msg->u.scheddata.memory_total ); |
| Node[nid]->SetMemActive( recv_msg->u.scheddata.memory_active ); |
| Node[nid]->SetMemInactive( recv_msg->u.scheddata.memory_inactive ); |
| Node[nid]->SetMemDirty( recv_msg->u.scheddata.memory_dirty ); |
| Node[nid]->SetMemWriteback( recv_msg->u.scheddata.memory_writeback ); |
| Node[nid]->SetMemVMallocUsed( recv_msg->u.scheddata.memory_VMallocUsed ); |
| Node[nid]->SetBTime( recv_msg->u.scheddata.btime ); |
| |
| CLNode *lnode; |
| lnode = Node[nid]->GetFirstLNode(); |
| int i = 0; |
| |
| for ( ; lnode; lnode = lnode->GetNext() ) |
| { |
| lnode->SetCpuUser(recv_msg->u.scheddata.proc_stats[i].cpu_user); |
| lnode->SetCpuNice(recv_msg->u.scheddata.proc_stats[i].cpu_nice); |
| lnode->SetCpuSystem(recv_msg->u.scheddata.proc_stats[i].cpu_system); |
| lnode->SetCpuIdle(recv_msg->u.scheddata.proc_stats[i].cpu_idle); |
| lnode->SetCpuIowait(recv_msg->u.scheddata.proc_stats[i].cpu_iowait); |
| lnode->SetCpuIrq(recv_msg->u.scheddata.proc_stats[i].cpu_irq); |
| lnode->SetCpuSoftIrq(recv_msg->u.scheddata.proc_stats[i].cpu_soft_irq); |
| |
| ++i; |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::HandleOtherNodeMsg (struct internal_msg_def *recv_msg, |
| int pnid) |
| { |
| const char method_name[] = "CCluster::HandleOtherNodeMsg"; |
| TRACE_ENTRY; |
| |
| CNode *downNode; |
| CNode *spareNode; |
| CProcess *process; |
| CLNode *lnode; |
| |
| switch (recv_msg->type) |
| { |
| case InternalType_Null: |
| if (trace_settings & TRACE_SYNC_DETAIL) |
| trace_printf("%s@%d - Node n%d has nothing to " |
| "update. \n", method_name, __LINE__, pnid); |
| break; |
| |
| case InternalType_ActivateSpare: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal activate spare request, spare pnid=%d, down pnid=%d\n" |
| , method_name, __LINE__ |
| , recv_msg->u.activate_spare.spare_pnid |
| , recv_msg->u.activate_spare.down_pnid); |
| |
| downNode = NULL; |
| if ( recv_msg->u.activate_spare.down_pnid != -1 ) |
| { |
| downNode = Nodes->GetNode( recv_msg->u.activate_spare.down_pnid ); |
| } |
| spareNode = Nodes->GetNode( recv_msg->u.activate_spare.spare_pnid ); |
| ReqQueue.enqueueActivateSpareReq( spareNode, downNode ); |
| break; |
| |
| case InternalType_Clone: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal clone request, process (%d, %d)" |
| " %s\n", method_name, __LINE__, |
| recv_msg->u.clone.nid, recv_msg->u.clone.os_pid, |
| (recv_msg->u.clone.backup?" Backup":"")); |
| |
| ReqQueue.enqueueCloneReq( &recv_msg->u.clone ); |
| break; |
| |
| case InternalType_Device: |
| ReqQueue.enqueueDeviceReq(recv_msg->u.device.ldev_name); |
| break; |
| |
| case InternalType_Shutdown: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal shutdown request for level=%d\n", method_name, __LINE__, recv_msg->u.shutdown.level); |
| |
| // Queue the shutdown request for processing by a worker thread. |
| ReqQueue.enqueueShutdownReq( recv_msg->u.shutdown.level ); |
| break; |
| |
| case InternalType_Down: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal down node request for pnid=%d\n", method_name, __LINE__, recv_msg->u.down.pnid); |
| |
| // Queue the node down request for processing by a worker thread. |
| ReqQueue.enqueueDownReq( recv_msg->u.down.pnid ); |
| break; |
| |
| case InternalType_Up: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal up node request for pnid=%d\n", method_name, __LINE__, recv_msg->u.up.pnid); |
| |
| // Queue the node up request for processing by a worker thread. |
| ReqQueue.enqueueUpReq( recv_msg->u.up.pnid, NULL, -1 ); |
| break; |
| |
| case InternalType_Dump: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal dump request for nid=%d, pid=%d\n", |
| method_name, __LINE__, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid); |
| lnode = Nodes->GetLNode( recv_msg->u.dump.nid ); |
| if ( lnode ) |
| { |
| process = lnode->GetProcessL(recv_msg->u.dump.pid); |
| |
| if (process) |
| { |
| int verifier = recv_msg->u.dump.verifier; |
| if ( (verifier == -1) || (verifier == process->GetVerifier()) ) |
| { |
| process->DumpBegin(recv_msg->u.dump.dumper_nid, |
| recv_msg->u.dump.dumper_pid, |
| recv_msg->u.dump.dumper_verifier, |
| recv_msg->u.dump.core_file); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d, verifier=%d for dump target.\n", method_name, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid, |
| recv_msg->u.dump.verifier); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_1, SQ_LOG_ERR, buf); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d for dump target.\n", method_name, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_2, SQ_LOG_ERR, buf); |
| } |
| } |
| |
| break; |
| |
| case InternalType_DumpComplete: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal dump-complete request for nid=%d, pid=%d\n", |
| method_name, __LINE__, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid); |
| lnode = Nodes->GetLNode( recv_msg->u.dump.nid ); |
| if ( lnode ) |
| { |
| process = lnode->GetProcessL(recv_msg->u.dump.pid); |
| |
| if (process) |
| { |
| int verifier = recv_msg->u.dump.verifier; |
| if ( (verifier == -1) || (verifier == process->GetVerifier()) ) |
| { |
| process->DumpEnd(recv_msg->u.dump.status, recv_msg->u.dump.core_file); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d, verifier=%d for dump target.\n", method_name, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid, |
| recv_msg->u.dump.verifier); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_3, SQ_LOG_ERR, buf); |
| } |
| } |
| else |
| { |
| // Dump completion handled in CProcess::Exit() |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d for dump complete target.\n", method_name, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_4, SQ_LOG_ERR, buf); |
| } |
| } |
| break; |
| |
| case InternalType_Exit: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal exit request for %s (%d, %d)\n", method_name, __LINE__, recv_msg->u.exit.name, recv_msg->u.exit.nid, recv_msg->u.exit.pid); |
| ReqQueue.enqueueExitReq( &recv_msg->u.exit ); |
| break; |
| |
| case InternalType_Event: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal event request\n", method_name, __LINE__); |
| if ( MyNode->IsMyNode(recv_msg->u.event.nid) ) |
| { |
| if (trace_settings & TRACE_SYNC) |
| trace_printf("%s@%d - processing event for (%d, %d)\n", method_name, __LINE__, recv_msg->u.event.nid, recv_msg->u.event.pid); |
| |
| lnode = Nodes->GetLNode( recv_msg->u.event.nid ); |
| if ( lnode ) |
| { |
| process = lnode->GetProcessL(recv_msg->u.event.pid); |
| |
| if (process) |
| { |
| int verifier = recv_msg->u.dump.verifier; |
| if ( (verifier == -1) || (verifier == process->GetVerifier()) ) |
| { |
| process->GenerateEvent (recv_msg->u.event.event_id, |
| recv_msg->u.event.length, |
| &recv_msg->u.event.data); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d, verifier=%d for event=%d\n", method_name, |
| recv_msg->u.event.nid, recv_msg->u.event.pid, |
| recv_msg->u.event.verifier, recv_msg->u.event.event_id); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_5, SQ_LOG_ERR, buf); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid" |
| "=%d, pid=%d for processing event.\n", |
| method_name, |
| recv_msg->u.event.nid, recv_msg->u.event.pid); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_6, SQ_LOG_ERR, |
| buf); |
| } |
| } |
| } |
| break; |
| |
| case InternalType_IoData: |
| if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_REQUEST_DETAIL | TRACE_REDIRECTION)) |
| trace_printf("%s@%d - Internal IO data request\n", method_name, __LINE__); |
| if ( MyNode->IsMyNode(recv_msg->u.iodata.nid) ) |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_REDIRECTION)) |
| trace_printf("%s@%d - processing IO Data for (%d, %d)\n", method_name, __LINE__, recv_msg->u.iodata.nid, recv_msg->u.iodata.pid); |
| |
| lnode = Nodes->GetLNode( recv_msg->u.iodata.nid ); |
| if ( lnode ) |
| { |
| process = lnode->GetProcessL(recv_msg->u.iodata.pid); |
| |
| if (process) |
| { |
| int fd; |
| if (recv_msg->u.iodata.ioType == STDIN_DATA) |
| { |
| fd = process->FdStdin(); |
| } |
| else |
| { |
| fd = process->FdStdout(); |
| } |
| Redirector.disposeIoData(fd, |
| recv_msg->u.iodata.length, |
| recv_msg->u.iodata.data); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid" |
| "=%d, pid=%d for processing IO Data.\n", |
| method_name, |
| recv_msg->u.iodata.nid, recv_msg->u.iodata.pid); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_7, SQ_LOG_ERR, |
| buf); |
| } |
| } |
| } |
| break; |
| |
| case InternalType_StdinReq: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal STDIN request\n", method_name, __LINE__); |
| if ( !MyNode->IsMyNode(recv_msg->u.stdin_req.supplier_nid) ) |
| { |
| break; |
| |
| } |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_REDIRECTION)) |
| trace_printf("%s@%d - stdin request from (%d,%d)" |
| ", type=%d, for supplier (%d, %d)\n", |
| method_name, __LINE__, |
| recv_msg->u.stdin_req.nid, |
| recv_msg->u.stdin_req.pid, |
| recv_msg->u.stdin_req.reqType, |
| recv_msg->u.stdin_req.supplier_nid, |
| recv_msg->u.stdin_req.supplier_pid); |
| |
| lnode = Nodes->GetLNode( recv_msg->u.stdin_req.nid ); |
| if ( lnode == NULL ) |
| { |
| break; |
| } |
| process = lnode->GetProcessL(recv_msg->u.stdin_req.pid); |
| |
| if (process) |
| { |
| if (recv_msg->u.stdin_req.reqType == STDIN_REQ_DATA) |
| { |
| // Set up to forward stdin data to requester. |
| // Save file descriptor associated with stdin |
| // so can find the redirector object later. |
| CProcess *supProcess; |
| lnode = Nodes->GetLNode( recv_msg->u.stdin_req.supplier_nid ); |
| if ( lnode ) |
| { |
| supProcess = lnode->GetProcessL ( recv_msg->u.stdin_req.supplier_pid ); |
| if (supProcess) |
| { |
| int fd; |
| fd = Redirector.stdinRemote(supProcess->infile(), |
| recv_msg->u.stdin_req.nid, |
| recv_msg->u.stdin_req.pid); |
| process->FdStdin(fd); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process " |
| "nid=%d, pid=%d for stdin data request.\n", |
| method_name, |
| recv_msg->u.stdin_req.supplier_nid, |
| recv_msg->u.stdin_req.supplier_pid); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_8, |
| SQ_LOG_ERR, buf); |
| } |
| } |
| } |
| else if (recv_msg->u.stdin_req.reqType == STDIN_FLOW_OFF) |
| { |
| Redirector.stdinOff(process->FdStdin()); |
| } |
| else if (recv_msg->u.stdin_req.reqType == STDIN_FLOW_ON) |
| { |
| Redirector.stdinOn(process->FdStdin()); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d for stdin data request.\n", method_name, |
| recv_msg->u.stdin_req.nid, |
| recv_msg->u.stdin_req.pid); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_9, SQ_LOG_ERR, buf); |
| } |
| break; |
| |
| case InternalType_Kill: |
| // Queue the kill request for processing by a worker thread. |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal kill request for (%d, %d), abort =%d\n", method_name, __LINE__, recv_msg->u.kill.nid, recv_msg->u.kill.pid, recv_msg->u.kill.persistent_abort); |
| |
| ReqQueue.enqueueKillReq( &recv_msg->u.kill ); |
| break; |
| |
| |
| case InternalType_Process: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal process request\n", method_name, __LINE__); |
| if ( MyNode->IsMyNode(recv_msg->u.process.nid) ) |
| { // Need to create process on this node. |
| // Queue process creation request for handling by worker thread |
| ReqQueue.enqueueNewProcReq( &recv_msg->u.process ); |
| } |
| break; |
| |
| case InternalType_ProcessInit: |
| if ( MyNode->IsMyNode(recv_msg->u.processInit.origNid) ) |
| { // New process request originated on this node |
| ReqQueue.enqueueProcInitReq( &recv_msg->u.processInit ); |
| } |
| break; |
| |
| case InternalType_Open: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal open request for (%d, %d), opened (%d, %d)\n", method_name, __LINE__, recv_msg->u.open.nid, recv_msg->u.open.pid, recv_msg->u.open.opened_nid, recv_msg->u.open.opened_pid); |
| |
| ReqQueue.enqueueOpenReq( &recv_msg->u.open ); |
| break; |
| |
| case InternalType_SchedData: |
| SaveSchedData( recv_msg ); |
| break; |
| |
| case InternalType_Set: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal set request\n", method_name, __LINE__); |
| ReqQueue.enqueueSetReq( &recv_msg->u.set ); |
| break; |
| |
| case InternalType_UniqStr: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal unique string request\n", method_name, __LINE__); |
| ReqQueue.enqueueUniqStrReq( &recv_msg->u.uniqstr ); |
| break; |
| |
| case InternalType_Sync: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Internal sync request for" |
| " Node %s, pnid=%d, SyncType=%d\n", |
| method_name, __LINE__, Node[pnid]->GetName(), pnid, |
| recv_msg->u.sync.type); |
| switch (recv_msg->u.sync.type ) |
| { |
| case SyncType_TmSeqNum: |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - TMSYNC(TmSeqNum) on Node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); |
| CoordinateTmSeqNumber(pnid); |
| break; |
| case SyncType_TmData: |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - TMSYNC(TmData) on Node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); |
| if ( ! MyNode->IsSpareNode() && MyNode->GetPhase() != Phase_Ready ) |
| { |
| MyNode->CheckActivationPhase(); |
| } |
| if ( ! MyNode->IsSpareNode() && MyNode->GetPhase() == Phase_Ready ) |
| { |
| if ( MyNode->GetTmSyncState() == SyncState_Null ) |
| { |
| // Begin a Slave Sync Start |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Slave Sync Start on Node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); |
| TmSyncPNid = pnid; |
| Node[pnid]->SetTmSyncState( recv_msg->u.sync.state ); |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| { |
| trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated (%d)(%s)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid, Node[pnid]->GetTmSyncState(), SyncStateString( Node[pnid]->GetTmSyncState() )); |
| } |
| Monitor->CoordinateTmDataBlock( &recv_msg->u.sync ); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Sync State Collision! Node %s (pnid=%d) TmSyncState=(%d)(%s)\n", method_name, __LINE__, MyNode->GetName(), MyPNID, MyNode->GetTmSyncState(), SyncStateString( MyNode->GetTmSyncState()) ); |
| if ( MyNode->GetTmSyncState() == SyncState_Continue ) |
| { |
| if ( pnid > TmSyncPNid ) |
| // highest node id will continue |
| { |
| // They take priority ... we abort |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Aborting Slave Sync Start on node %s (pnid=%d)\n", method_name, __LINE__, Node[Monitor->TmSyncPNid]->GetName(), Monitor->TmSyncPNid); |
| MyNode->SetTmSyncState( SyncState_Null ); |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated (%d)(%s)\n", method_name, __LINE__, MyNode->GetName(), MyPNID, MyNode->GetTmSyncState(), SyncStateString( MyNode->GetTmSyncState() ) ); |
| Monitor->ReQueue_TmSync (false); |
| // Continue with other node's Slave TmSync Start request |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Slave Sync Start on node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); |
| TmSyncPNid = pnid; |
| Node[pnid]->SetTmSyncState( recv_msg->u.sync.state ); |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| { |
| trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated (%d)(%s)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid, Node[pnid]->GetTmSyncState(), SyncStateString( Node[pnid]->GetTmSyncState() )); |
| } |
| Monitor->CoordinateTmDataBlock (&recv_msg->u.sync); |
| } |
| } |
| else if ( MyNode->GetTmSyncState() == SyncState_Start ) |
| { |
| // Check if they continue with Master Sync Start |
| if ( pnid > MyPNID ) |
| // highest node id will continue |
| { |
| // They take priority ... we abort |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Aborted Master Sync Start\n", method_name, __LINE__); |
| MyNode->SetTmSyncState( SyncState_Null ); |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated (%d)(%s)\n", method_name, __LINE__, MyNode->GetName(), MyPNID, MyNode->GetTmSyncState(), SyncStateString( MyNode->GetTmSyncState() ) ); |
| // Continue with other node's Slave TmSync Start request |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Slave Sync Start on node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); |
| TmSyncPNid = pnid; |
| Node[pnid]->SetTmSyncState( recv_msg->u.sync.state ); |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| { |
| trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated (%d)(%s)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid, Node[pnid]->GetTmSyncState(), SyncStateString( Node[pnid]->GetTmSyncState() )); |
| } |
| Monitor->CoordinateTmDataBlock (&recv_msg->u.sync); |
| } |
| else |
| { |
| // We continue and assume they abort |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Continuing with Master Sync Start\n", method_name, __LINE__); |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Invalid TmSync_State\n", method_name, __LINE__); |
| } |
| } |
| } |
| break; |
| |
| case SyncType_TmSyncState: |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - TMSYNC(TmSyncState) on Node %s (pnid=%d)\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); |
| break; |
| |
| default: |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Unknown SyncType from pnid=%d.\n", method_name, pnid); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_10, SQ_LOG_ERR, buf); |
| } |
| } |
| break; |
| default: |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Unknown Internal message received, Physical Node=%d.\n", method_name, pnid); |
| mon_log_write(MON_CLUSTER_HANDLEOTHERNODE_11, SQ_LOG_ERR, buf); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::HandleMyNodeMsg (struct internal_msg_def *recv_msg, |
| int pnid) |
| { |
| const char method_name[] = "CCluster::HandleMyNodeMsg"; |
| TRACE_ENTRY; |
| |
| CProcess *process; |
| CLNode *lnode; |
| |
| if (trace_settings & TRACE_SYNC_DETAIL) |
| trace_printf("%s@%d - Marking object as replicated, msg type=%d\n", |
| method_name, __LINE__, recv_msg->type); |
| switch (recv_msg->type) |
| { |
| |
| case InternalType_Null: |
| if (trace_settings & TRACE_SYNC_DETAIL) |
| trace_printf("%s@%d - Physical Node pnid=n%d has nothing to " |
| "update. \n", method_name, __LINE__, pnid); |
| break; |
| |
| case InternalType_ActivateSpare: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal activate spare request, spare pnid=%d, down pnid=%d\n" |
| , method_name, __LINE__ |
| , recv_msg->u.activate_spare.spare_pnid |
| , recv_msg->u.activate_spare.down_pnid); |
| break; |
| |
| case InternalType_Clone: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal clone request, completed replicating process (%d, %d) %s\n", method_name, __LINE__, recv_msg->u.clone.nid, recv_msg->u.clone.os_pid, (recv_msg->u.clone.backup?" Backup":"")); |
| break; |
| |
| case InternalType_Device: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal device request, completed device processing for ldev %s\n", method_name, __LINE__, recv_msg->u.device.ldev_name); |
| break; |
| |
| case InternalType_Shutdown: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal shutdown request for level=%d\n", method_name, __LINE__, recv_msg->u.shutdown.level); |
| |
| // Queue the shutdown request for processing by a worker thread. |
| ReqQueue.enqueueShutdownReq( recv_msg->u.shutdown.level ); |
| break; |
| |
| case InternalType_Down: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal down node request for pnid=%d\n", method_name, __LINE__, recv_msg->u.down.pnid); |
| break; |
| |
| case InternalType_Up: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal up node request for pnid=%d\n", method_name, __LINE__, recv_msg->u.up.pnid); |
| break; |
| |
| case InternalType_Dump: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal dump request for nid=%d, pid=%d\n", |
| method_name, __LINE__, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid); |
| |
| lnode = Nodes->GetLNode( recv_msg->u.dump.nid ); |
| if ( lnode ) |
| { |
| process = lnode->GetProcessL(recv_msg->u.dump.pid); |
| |
| if (process) |
| { |
| int verifier = recv_msg->u.dump.verifier; |
| if ( (verifier == -1) || (verifier == process->GetVerifier()) ) |
| { |
| process->DumpBegin(recv_msg->u.dump.dumper_nid, |
| recv_msg->u.dump.dumper_pid, |
| recv_msg->u.dump.dumper_verifier, |
| recv_msg->u.dump.core_file); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d, verifier=%d for dump target.\n", method_name, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid, |
| recv_msg->u.dump.verifier); |
| mon_log_write(MON_CLUSTER_HANDLEMYNODE_1, SQ_LOG_ERR, buf); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d for dump target.\n", method_name, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid); |
| mon_log_write(MON_CLUSTER_HANDLEMYNODE_2, SQ_LOG_ERR, buf); |
| } |
| } |
| break; |
| |
| case InternalType_DumpComplete: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal dump-complete request for nid=%d, pid=%d\n", |
| method_name, __LINE__, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid); |
| lnode = Nodes->GetLNode( recv_msg->u.dump.nid ); |
| if ( lnode ) |
| { |
| process = lnode->GetProcessL(recv_msg->u.dump.pid); |
| |
| if (process) |
| { |
| int verifier = recv_msg->u.dump.verifier; |
| if ( (verifier == -1) || (verifier == process->GetVerifier()) ) |
| { |
| process->DumpEnd(recv_msg->u.dump.status, recv_msg->u.dump.core_file); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d, verifier=%d for dump target.\n", method_name, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid, |
| recv_msg->u.dump.verifier); |
| mon_log_write(MON_CLUSTER_HANDLEMYNODE_3, SQ_LOG_ERR, buf); |
| } |
| } |
| else |
| { |
| // Dump completion handled in CProcess::Exit() |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Can't find process nid=%d, " |
| "pid=%d for dump complete target.\n", method_name, |
| recv_msg->u.dump.nid, recv_msg->u.dump.pid); |
| mon_log_write(MON_CLUSTER_HANDLEMYNODE_4, SQ_LOG_ERR, buf); |
| } |
| } |
| break; |
| |
| case InternalType_Exit: |
| // Final process exit logic is done in Process_Exit, not here |
| // as in the past. |
| break; |
| |
| case InternalType_Event: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal event request\n", method_name, __LINE__); |
| break; |
| |
| case InternalType_IoData: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal IO data request\n", method_name, __LINE__); |
| break; |
| |
| case InternalType_StdinReq: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal STDIN request\n", method_name, __LINE__); |
| break; |
| |
| case InternalType_Kill: |
| // Queue the kill request for processing by a worker thread. |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal kill request for (%d, %d), abort =%d\n", method_name, __LINE__, recv_msg->u.kill.nid, recv_msg->u.kill.pid, recv_msg->u.kill.persistent_abort); |
| |
| ReqQueue.enqueueKillReq( &recv_msg->u.kill ); |
| break; |
| |
| case InternalType_Process: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal process request, completed process replication for (%d, %d) %s\n", method_name, __LINE__, recv_msg->u.process.pid, recv_msg->u.process.nid, (recv_msg->u.process.backup?" Backup":"")); |
| break; |
| |
| case InternalType_ProcessInit: |
| // No action needed |
| break; |
| |
| case InternalType_Open: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal open request, completed open replication, " |
| "(%d, %d:%d) opened (%d, %d:%d)\n", |
| method_name, __LINE__, |
| recv_msg->u.open.nid, |
| recv_msg->u.open.pid, |
| recv_msg->u.open.verifier, |
| recv_msg->u.open.opened_nid, |
| recv_msg->u.open.opened_pid, |
| recv_msg->u.open.opened_verifier); |
| break; |
| |
| case InternalType_SchedData: |
| // No action needed |
| break; |
| |
| case InternalType_Set: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal set request, completed replicating key %s::%s\n", method_name, __LINE__, recv_msg->u.set.group, recv_msg->u.set.key); |
| break; |
| |
| case InternalType_UniqStr: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS)) |
| trace_printf("%s@%d - Internal unique string request, completed replicating (%d, %d)\n", method_name, __LINE__, recv_msg->u.uniqstr.nid, recv_msg->u.uniqstr.id); |
| break; |
| |
| case InternalType_Sync: |
| if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Internal sync request for node %s, pnid=%d, SyncType=%d\n" |
| , method_name, __LINE__, Node[pnid]->GetName(), pnid, recv_msg->u.sync.type); |
| switch (recv_msg->u.sync.type ) |
| { |
| case SyncType_TmSeqNum: |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - TMSYNC(TmSeqNum) on Node %s (pnid=%d)\n", method_name, __LINE__, Node[MyPNID]->GetName(), MyPNID); |
| CoordinateTmSeqNumber(pnid); |
| break; |
| case SyncType_TmData: |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - TMSYNC(TmData) on Node %s (pnid=%d)\n", method_name, __LINE__, Node[MyPNID]->GetName(), MyPNID); |
| TmSyncPNid = MyPNID; |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Sync communicated, TmSyncPNid=%d\n", method_name, __LINE__, TmSyncPNid); |
| if ( ! MyNode->IsSpareNode() && MyNode->GetPhase() != Phase_Ready ) |
| { |
| MyNode->CheckActivationPhase(); |
| } |
| if ( MyNode->GetTmSyncState() == SyncState_Start && |
| MyNode->GetPhase() == Phase_Ready && |
| MyNode->GetNumLNodes() > 1 ) |
| { |
| // Begin a Slave Sync Start to other |
| // logical nodes in my physical node |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Slave Sync Start on local node %s, pnid=%d\n", method_name, __LINE__, Node[pnid]->GetName(), pnid); |
| Monitor->CoordinateTmDataBlock( &recv_msg->u.sync ); |
| } |
| break; |
| case SyncType_TmSyncState: |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - TMSYNC(TmSyncState) on Node %s (pnid=%d)\n", method_name, __LINE__, Node[MyPNID]->GetName(), MyPNID); |
| break; |
| default: |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Unknown SyncType from node %s, pnid=%d during processing local SyncType.\n", method_name, Node[pnid]->GetName(), pnid); |
| mon_log_write(MON_CLUSTER_HANDLEMYNODE_5, SQ_LOG_ERR, buf); |
| } |
| } |
| break; |
| |
| default: |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Unknown Internal message received during processing local SyncType for pnid=%d.\n", method_name, pnid); |
| mon_log_write(MON_CLUSTER_HANDLEMYNODE_6, SQ_LOG_ERR, buf); |
| } |
| |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| |
| |
| bool CCluster::responsive() |
| { |
| const char method_name[] = "CCluster::responsive"; |
| TRACE_ENTRY; |
| |
| int barrierDiff = barrierCount_ - barrierCountSaved_; |
| |
| // if no difference in barrier count, sync thread is not responsive |
| if ( !barrierDiff && isMonInitComplete() ) |
| { |
| // this proc is called every SYNC_MAX_RESPONSIVE+1 secs |
| cumulativeDelaySec_ += CCluster::SYNC_MAX_RESPONSIVE + 1; |
| |
| monSyncResponsive_ = false; // sync thread is no longer responsive |
| |
| if ( CommType == CommType_InfiniBand ) |
| { |
| // if sync thread is stuck in mpi call, one of the following checks will be true |
| if ( inBarrier_ || inAllGather_ || inCommDup_ ) |
| { |
| mem_log_write(MON_CLUSTER_RESPONSIVE_1, cumulativeDelaySec_, |
| ( ( (inBarrier_ << 1) | inAllGather_ ) << 1 ) | inCommDup_); |
| } |
| else // non-mpi took quite long |
| { |
| mem_log_write(MON_CLUSTER_RESPONSIVE_2, cumulativeDelaySec_); |
| } |
| } |
| else |
| { |
| // if sync thread is stuck in mpi call |
| if ( inBarrier_ ) |
| { |
| mem_log_write(MON_CLUSTER_RESPONSIVE_1, cumulativeDelaySec_, |
| inBarrier_); |
| } |
| else // non-mpi took quite long |
| { |
| mem_log_write(MON_CLUSTER_RESPONSIVE_2, cumulativeDelaySec_); |
| } |
| } |
| } |
| else if (barrierDiff < syncMinPerSec_) |
| { |
| mem_log_write(MON_CLUSTER_RESPONSIVE_3, barrierDiff, syncMinPerSec_); |
| cumulativeDelaySec_ = 0; |
| monSyncResponsive_ = true; // slow but responsive |
| } |
| else |
| { |
| cumulativeDelaySec_ = 0; |
| monSyncResponsive_ = true; // truely responsive |
| } |
| |
| barrierCountSaved_ = barrierCount_; |
| if ( CommType == CommType_InfiniBand ) |
| { |
| allGatherCountSaved_ = allGatherCount_; |
| commDupCountSaved_ = commDupCount_; |
| } |
| |
| TRACE_EXIT; |
| |
| return monSyncResponsive_; |
| } |
| |
| |
| int CCluster::MPIAllgather(void *sendbuf, int sendcount, MPI_Datatype sendtype, |
| void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm Comm) |
| { |
| const char method_name[] = "CCluster::MPIAllGather"; |
| TRACE_ENTRY; |
| |
| inAllGather_ = true; |
| |
| int rc = MPI_Allgather (sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, Comm); |
| |
| inAllGather_ = false; |
| |
| allGatherCount_++; |
| |
| TRACE_EXIT; |
| return rc; |
| } |
| |
| void CCluster::InitializeConfigCluster( void ) |
| { |
| int rc; |
| |
| const char method_name[] = "CCluster::InitializeConfigCluster"; |
| TRACE_ENTRY; |
| |
| int worldSize; |
| MPI_Comm_size (MPI_COMM_WORLD, &worldSize); |
| int rankToPnid[worldSize]; |
| CurNodes = worldSize; |
| |
| if ( IsRealCluster ) |
| { |
| cfgPNodes_ = Nodes->GetClusterConfig()->GetPNodesCount(); |
| } |
| else |
| { |
| // Set virtual cluster size to collective size |
| MPI_Comm_size (MPI_COMM_WORLD, &cfgPNodes_); |
| |
| // For virtual cluster set physical node id equal to rank |
| for (int i=0; i<worldSize; ++i) |
| { |
| rankToPnid[i] = i; |
| |
| // Set bit indicating node is up |
| upNodes_.upNodes[i/64] |= (1ull << i); |
| } |
| } |
| NumNodes = cfgPNodes_; |
| |
| // Build the monitor's configured view of the cluster |
| if ( IsRealCluster ) |
| { // Map node name to physical node id |
| // (for virtual nodes physical node equals "rank" (previously set)) |
| CClusterConfig *clusterConfig = Nodes->GetClusterConfig(); |
| MyPNID = clusterConfig->GetPNid( Node_name ); |
| } |
| Nodes->AddNodes( ); |
| |
| MyNode = Nodes->GetNode(MyPNID); |
| Nodes->SetupCluster( &Node, &LNode ); |
| |
| if ( CommType == CommType_Sockets ) |
| { |
| InitServerSock(); |
| } |
| |
| // The new monitor in a real cluster initializes all |
| // existing nodes to a down state. |
| // ReIntegrate() will set the state to up when communication is established. |
| if ( IAmIntegrating ) |
| { |
| for (int i=0; i<cfgPNodes_; i++) |
| { |
| CNode *node = Node[i]; |
| if ( node->GetPNid() != MyPNID ) |
| { |
| node->SetState( State_Down ); |
| } |
| } |
| } |
| else |
| { |
| char *nodeNames = 0; |
| if ( IsRealCluster ) |
| { |
| if (trace_settings & TRACE_INIT) |
| trace_printf( "%s@%d Collecting port numbers and node names, " |
| "cfgPNodes_=%d, worldSize=%d, pnid=%d (%s:%s)\n" |
| "MyCommPort=%s\nMySyncPort=%s\n" |
| , method_name, __LINE__ |
| , cfgPNodes_, worldSize |
| , MyPNID, MyNode->GetName(), MyNode->GetCommPort() |
| , MyCommPort, MySyncPort ); |
| |
| bool nodeStatus[cfgPNodes_]; |
| for (int i=0; i<cfgPNodes_; ++i) |
| { |
| nodeStatus[i] = false; |
| } |
| |
| // Collect comm port info from other monitors |
| char *commPortNums = new char[worldSize * MPI_MAX_PORT_NAME]; |
| rc = MPI_Allgather (MyCommPort, MPI_MAX_PORT_NAME, MPI_CHAR, commPortNums, |
| MPI_MAX_PORT_NAME, MPI_CHAR, MPI_COMM_WORLD); |
| if (rc != MPI_SUCCESS) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s@%d] MPI_Allgather error=%s\n", |
| method_name, __LINE__, ErrorMsg(rc)); |
| mon_log_write(MON_CLUSTER_INITCONFIGCLUSTER_2, SQ_LOG_CRIT, buf); |
| |
| MPI_Abort(MPI_COMM_SELF,99); |
| } |
| |
| // Collect sync port info from other monitors |
| char *syncPortNums = new char[worldSize * MPI_MAX_PORT_NAME]; |
| rc = MPI_Allgather (MySyncPort, MPI_MAX_PORT_NAME, MPI_CHAR, syncPortNums, |
| MPI_MAX_PORT_NAME, MPI_CHAR, MPI_COMM_WORLD); |
| if (rc != MPI_SUCCESS) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s@%d] MPI_Allgather error=%s\n", |
| method_name, __LINE__, ErrorMsg(rc)); |
| mon_log_write(MON_CLUSTER_INITCONFIGCLUSTER_2, SQ_LOG_CRIT, buf); |
| |
| MPI_Abort(MPI_COMM_SELF,99); |
| } |
| |
| // Exchange Node Names with collective |
| nodeNames = new char[worldSize * MPI_MAX_PROCESSOR_NAME]; |
| rc = MPI_Allgather (Node_name, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, |
| nodeNames, MPI_MAX_PROCESSOR_NAME, MPI_CHAR, |
| MPI_COMM_WORLD); |
| if (rc != MPI_SUCCESS) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s@%d] MPI_Allgather error=%s\n", |
| method_name, __LINE__, ErrorMsg(rc)); |
| mon_log_write(MON_CLUSTER_INITCONFIGCLUSTER_3, SQ_LOG_CRIT, buf); |
| |
| MPI_Abort(MPI_COMM_SELF,99); |
| } |
| |
| // For each node name get corresponding CNode object and |
| // store port number in it. |
| char * nodeName; |
| CNode * node; |
| for (int i = 0; i < worldSize; i++) |
| { |
| nodeName = &nodeNames[ i * MPI_MAX_PROCESSOR_NAME ]; |
| node = Nodes->GetNode( nodeName ); |
| if ( node ) |
| { |
| node->SetCommPort( &commPortNums[ i * MPI_MAX_PORT_NAME] ); |
| node->SetSyncPort( &syncPortNums[ i * MPI_MAX_PORT_NAME] ); |
| rankToPnid[i] = node->GetPNid(); |
| nodeStatus[rankToPnid[i]] = true; |
| |
| if (trace_settings & TRACE_INIT) |
| { |
| trace_printf( "%s@%d rankToPnid[%d]=%d (%s:%s:%s)" |
| "(node=%s,commPort=%s,syncPort=%s)\n" |
| , method_name, __LINE__, i, rankToPnid[i] |
| , node->GetName() |
| , node->GetCommPort() |
| , node->GetSyncPort() |
| , &nodeNames[ i * MPI_MAX_PROCESSOR_NAME] |
| , &commPortNums[ i * MPI_MAX_PORT_NAME] |
| , &syncPortNums[ i * MPI_MAX_PORT_NAME]); |
| } |
| } |
| else |
| { |
| rankToPnid[i] = -1; |
| |
| // Unexpectedly could not map node name to CNode object |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s@%d] Unable to find node " |
| "object for node %s\n", method_name, __LINE__, |
| nodeName ); |
| mon_log_write(MON_CLUSTER_INITCONFIGCLUSTER_4, SQ_LOG_CRIT, buf); |
| } |
| } |
| delete [] commPortNums; |
| delete [] syncPortNums; |
| |
| int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid(); |
| |
| // Any nodes not in the initial MPI_COMM_WORLD are down. |
| for (int i=0; i<cfgPNodes_; ++i) |
| { |
| if ( nodeStatus[i] == false ) |
| { |
| node = Nodes->GetNode(i); |
| if ( node ) node->SetState( State_Down ); |
| |
| // assign new TmLeader if TMLeader node is dead. |
| if (TmLeaderPNid == i) |
| { |
| AssignTmLeader(i); |
| } |
| } |
| else |
| { // Set bit indicating node is up |
| upNodes_.upNodes[i/64] |= (1ull << i); |
| } |
| } |
| } |
| |
| // Initialize communicators for point-to-point communications |
| int myRank; |
| MPI_Comm_rank( MPI_COMM_WORLD, &myRank ); |
| |
| InitClusterComm(worldSize, myRank, rankToPnid); |
| if ( CommType == CommType_Sockets ) |
| { |
| InitClusterSocks(worldSize, myRank, nodeNames, rankToPnid); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| for ( int i =0; i < cfgPNodes_; i++ ) |
| { |
| trace_printf( "%s@%d socks_[%d]=%d\n" |
| , method_name, __LINE__ |
| , i, socks_[i]); |
| } |
| } |
| } |
| |
| if (nodeNames) delete [] nodeNames; |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
| { |
| trace_printf( "%s@%d upNodes set[%d]: %llx\n" |
| , method_name, __LINE__ |
| , i, upNodes_.upNodes[i]); |
| } |
| } |
| |
| // Kill the MPICH hydra_pmi_proxy to prevent it from killing all |
| // processes in cluster when mpirun or monitor processes are killed |
| kill( getppid(), SIGKILL ); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::InitClusterComm(int worldSize, int myRank, int * rankToPnid) |
| { |
| const char method_name[] = "CCluster::InitClusterComm"; |
| TRACE_ENTRY; |
| |
| // Compute an array of "colors" for use with MPI_Comm_split. |
| int *splitColors; |
| splitColors = new int[worldSize*worldSize*2]; |
| int *splitOtherNode; |
| splitOtherNode = new int[worldSize*worldSize*2]; |
| int splitRows = 0; |
| for ( int i=0; i<(worldSize*worldSize*2); ++i) |
| { |
| splitColors[i] = MPI_UNDEFINED; |
| splitOtherNode[i] = -1; |
| } |
| |
| int color = 1; |
| bool placed; |
| for (int i = 0; i < worldSize; i++) |
| { |
| for (int j = i+1; j < worldSize; j++) |
| { |
| // Find a free slot for rank "i" to rank "j" |
| |
| placed = false; |
| for (int k=0; k<splitRows; ++k) |
| { |
| if ( splitColors[k*worldSize+i] == MPI_UNDEFINED |
| && splitColors[k*worldSize+j] == MPI_UNDEFINED ) |
| { |
| splitColors[k*worldSize+i] = color; |
| splitColors[k*worldSize+j] = color; |
| placed = true; |
| |
| if (myRank == i) |
| splitOtherNode[k] = j; |
| else if (myRank == j) |
| splitOtherNode[k] = i; |
| break; |
| } |
| } |
| if (!placed) |
| { // Need to use a new row |
| splitColors[splitRows*worldSize+i] = color; |
| splitColors[splitRows*worldSize+j] = color; |
| |
| if (myRank == i) |
| splitOtherNode[splitRows] = j; |
| else if (myRank == j) |
| splitOtherNode[splitRows] = i; |
| |
| ++splitRows; |
| } |
| |
| ++color; |
| } |
| } |
| |
| if (trace_settings & TRACE_INIT) |
| { |
| trace_printf("%s@%d Created %d splitRows for worldSize=%d, myRank=%d\n", |
| method_name, __LINE__, splitRows, worldSize, myRank); |
| string line; |
| char fragment[50]; |
| for (int i=0; i<splitRows; ++i) |
| { |
| sprintf(fragment, "%s@%d splitColors[%d]=", method_name, __LINE__, |
| i); |
| line = fragment; |
| for (int j=0; j<worldSize; ++j) |
| { |
| sprintf(fragment, " %d,", splitColors[i*worldSize+j]); |
| line += fragment; |
| } |
| line += "\n"; |
| trace_printf(line.c_str()); |
| |
| trace_printf("%s@%d splitOtherNode[%d]=%d\n", method_name, |
| __LINE__, i, splitOtherNode[i]); |
| } |
| } |
| |
| // Create one communicator for each other rank in MPI_COMM_WORLD |
| // This permits point-to-point communication with each rank. |
| int myRankInComm; |
| MPI_Comm ncomm; |
| int nid; |
| |
| for (int nSplit=0; nSplit < splitRows; ++nSplit) |
| { |
| color = splitColors[nSplit*worldSize+myRank]; |
| MPI_Comm_split(MPI_COMM_WORLD, color, myRank, &ncomm); |
| if (ncomm == MPI_COMM_NULL) |
| { |
| if (splitColors[nSplit*worldSize+myRank] != MPI_UNDEFINED) |
| { |
| if (trace_settings & TRACE_INIT) |
| { |
| trace_printf("%s@%d Rank %d: Unexpected MPI_COMM_NULL from " |
| "MPI_Comm_split, nSplit=%d\n", |
| method_name, __LINE__,myRank, nSplit); |
| } |
| } |
| } |
| else |
| { |
| // Set comms_ (communicators) array element for the |
| // physical node. |
| nid = rankToPnid[splitOtherNode[nSplit]]; |
| comms_[nid] = ncomm; |
| |
| MPI_Comm_rank(ncomm, &myRankInComm); |
| otherMonRank_[nid] = (myRankInComm == 0)? 1: 0; |
| |
| if (trace_settings & TRACE_INIT) |
| { |
| trace_printf("%s@%d Rank %d: MPI_Comm_split %d, color=%d, " |
| "comms_[%d] is orig rank #%d, " |
| "otherMonRank_=%d\n", |
| method_name, __LINE__, |
| myRank, nSplit, color, |
| nid, splitOtherNode[nSplit], |
| otherMonRank_[nid]); |
| } |
| } |
| } |
| |
| delete [] splitColors; |
| delete [] splitOtherNode; |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::HandleReintegrateError( int rc, int err, |
| int pnid, nodeId_t *nodeInfo, |
| bool abort ) |
| { |
| const char method_name[] = "CCluster::HandleReintegrateError"; |
| TRACE_ENTRY; |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| |
| switch ( err ) |
| { |
| case Reintegrate_Err1: |
| snprintf(buf, sizeof(buf), "[%s], can't to connect to creator monitor" |
| " port: %s - Error: %s.\n", |
| method_name, IntegratingMonitorPort, ErrorMsg(rc)); |
| break; |
| |
| case Reintegrate_Err2: |
| snprintf(buf, sizeof(buf), "[%s], can't merge intercomm to existing " |
| "MPI collective - Error: %s.\n", |
| method_name, ErrorMsg(rc)); |
| |
| break; |
| |
| case Reintegrate_Err3: |
| snprintf(buf, sizeof(buf), "[%s], unable to obtain cluster info " |
| "from creator monitor: %s.\n", method_name, ErrorMsg(rc)); |
| break; |
| |
| case Reintegrate_Err4: |
| snprintf(buf, sizeof(buf), "[%s], Failed to send name/port " |
| "to node %d (%s): %s.\n", method_name, pnid, |
| nodeInfo->nodeName, ErrorMsg(rc)); |
| break; |
| |
| case Reintegrate_Err5: |
| snprintf(buf, sizeof(buf), "[%s], can't to connect to " |
| " node %d monitor, commPort=%s, syncPort=%s: %s.\n", |
| method_name, pnid, nodeInfo->commPort, |
| nodeInfo->syncPort, ErrorMsg(rc)); |
| break; |
| |
| case Reintegrate_Err6: |
| snprintf(buf, sizeof(buf), "[%s], can't merge intercomm " |
| "for node %d: %s.\n", method_name, pnid, |
| ErrorMsg(rc)); |
| break; |
| |
| case Reintegrate_Err7: |
| snprintf(buf, sizeof(buf), "[%s], can't disconnect " |
| "intercomm for node %d: %s.\n", method_name, pnid, |
| ErrorMsg(rc)); |
| break; |
| |
| case Reintegrate_Err8: |
| snprintf(buf, sizeof(buf), "[%s], Failed to send status to creator " |
| "monitor: %s\n", method_name, ErrorMsg(rc)); |
| break; |
| |
| case Reintegrate_Err9: |
| snprintf(buf, sizeof(buf), "[%s], Failed to send name/port " |
| "to creator monitor: %s.\n", method_name, ErrorMsg(rc)); |
| break; |
| |
| case Reintegrate_Err10: |
| snprintf(buf, sizeof(buf), "[%s], Monitor initialization failed (could" |
| " not write to port file). Aborting.\n", method_name); |
| break; |
| |
| case Reintegrate_Err11: |
| snprintf(buf, sizeof(buf), "[%s], Monitor initialization failed (could" |
| " not open port file). Aborting.\n", method_name); |
| break; |
| |
| case Reintegrate_Err12: |
| snprintf(buf, sizeof(buf), "[%s], Monitor initialization failed (could" |
| " not initialize local io). Aborting.\n", method_name); |
| break; |
| |
| case Reintegrate_Err13: |
| snprintf(buf, sizeof(buf), "[%s], Monitor initialization failed (could" |
| " not initialize devices). Aborting.\n", method_name); |
| break; |
| |
| case Reintegrate_Err14: |
| snprintf(buf, sizeof(buf), "[%s] Aborting.\n", method_name); |
| break; |
| |
| case Reintegrate_Err15: |
| snprintf(buf, sizeof(buf), "[%s], no connect acknowledgement " |
| "for node %d: %s.\n", method_name, pnid, |
| ErrorMsg(rc)); |
| break; |
| |
| default: |
| snprintf(buf, sizeof(buf), "[%s], Reintegration error: %s\n", |
| method_name, ErrorMsg(rc)); |
| } |
| |
| mon_log_write(MON_CLUSTER_REINTEGRATE_1, SQ_LOG_ERR, buf); |
| |
| if ( abort ) |
| MPI_Abort(MPI_COMM_SELF,99); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::SendReIntegrateStatus( STATE nodeState, int initErr ) |
| { |
| int rc; |
| nodeStatus_t nodeStatus; |
| nodeStatus.state = nodeState; |
| nodeStatus.status = initErr; |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| rc = Monitor->SendMPI( (char *) &nodeStatus |
| , sizeof(nodeStatus_t) |
| , 0 |
| , MON_XCHNG_DATA |
| , joinComm_ ); |
| if ( rc ) |
| { |
| HandleReintegrateError( rc, Reintegrate_Err8, -1, NULL, true ); |
| } |
| break; |
| case CommType_Sockets: |
| rc = Monitor->SendSock( (char *) &nodeStatus |
| , sizeof(nodeStatus_t) |
| , joinSock_ ); |
| if ( rc ) |
| { |
| HandleReintegrateError( rc, Reintegrate_Err8, -1, NULL, true ); |
| } |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| if ( nodeState != State_Up ) |
| { // Initialization error, abort. |
| |
| mem_log_write(CMonLog::MON_REINTEGRATE_9, MyPNID, initErr); |
| HandleReintegrateError( rc, initErr, -1, NULL, true ); |
| } |
| } |
| |
| void CCluster::ReIntegrate( int initProblem ) |
| { |
| const char method_name[] = "CCluster::ReIntegrate"; |
| TRACE_ENTRY; |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| ReIntegrateMPI( initProblem ); |
| break; |
| case CommType_Sockets: |
| ReIntegrateSock( initProblem ); |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::ReIntegrateMPI( int initProblem ) |
| { |
| const char method_name[] = "CCluster::ReIntegrateMPI"; |
| TRACE_ENTRY; |
| |
| int rc; |
| bool haveCreatorComm = false; |
| MPI_Comm interComm; |
| MPI_Comm intraComm = MPI_COMM_NULL; |
| MPI_Comm intraCommCreatorMon = MPI_COMM_NULL; |
| |
| nodeId_t myNodeInfo; |
| strcpy(myNodeInfo.nodeName, MyNode->GetName()); |
| strcpy(myNodeInfo.commPort, MyNode->GetCommPort()); |
| // Set bit indicating my node is up |
| upNodes_.upNodes[MyPNID/64] |= (1ull << MyPNID); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Connect to creator monitor (port %s)\n", |
| method_name, __LINE__, IntegratingMonitorPort); |
| |
| mem_log_write(CMonLog::MON_REINTEGRATE_1, MyPNID); |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
| { |
| trace_printf( "%s@%d Integrating node %s (pnid=%d) " |
| "sees set[%d]: %llx\n" |
| , method_name, __LINE__ |
| , MyNode->GetName(), MyPNID |
| , i, upNodes_.upNodes[i] ); |
| } |
| } |
| |
| TEST_POINT( TP010_NODE_UP ); |
| // Connect with my creator monitor |
| rc = MPI_Comm_connect( IntegratingMonitorPort, |
| MPI_INFO_NULL, 0, MPI_COMM_SELF, &joinComm_ ); |
| if ( rc ) |
| { |
| HandleReintegrateError( rc, Reintegrate_Err1, -1, NULL, true ); |
| } |
| |
| MPI_Comm_set_errhandler( joinComm_, MPI_ERRORS_RETURN ); |
| |
| mem_log_write(CMonLog::MON_REINTEGRATE_4, MyPNID); |
| |
| TEST_POINT( TP011_NODE_UP ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d Connected to creator monitor, sending id\n", |
| method_name, __LINE__); |
| } |
| |
| // Send this node's name and port number so creator monitor |
| // knows who we are, and set flag to let creator monitor it is the CREATOR. |
| myNodeInfo.creator = true; |
| myNodeInfo.creatorShellPid = CreatorShellPid; |
| myNodeInfo.creatorShellVerifier = CreatorShellVerifier; |
| if ((rc = Monitor->SendMPI((char *) &myNodeInfo, sizeof(nodeId_t), 0, |
| MON_XCHNG_DATA, joinComm_))) |
| HandleReintegrateError( rc, Reintegrate_Err9, -1, NULL, |
| true ); |
| |
| TEST_POINT( TP012_NODE_UP ); |
| |
| // Merge the inter-communicators obtained from the connect/accept |
| // between this new monitor and the creator monitor. |
| if ((rc = MPI_Intercomm_merge( joinComm_, 1, &intraCommCreatorMon ))) |
| HandleReintegrateError( rc, Reintegrate_Err2, -1, NULL, true ); |
| |
| MPI_Comm_set_errhandler( intraCommCreatorMon, MPI_ERRORS_RETURN ); |
| |
| nodeId_t *nodeInfo = new nodeId_t[cfgPNodes_]; |
| |
| mem_log_write(CMonLog::MON_REINTEGRATE_3, MyPNID); |
| |
| // Obtain node names & port numbers of existing monitors from |
| // the creator monitor. |
| if ((rc = Monitor->ReceiveMPI((char *)nodeInfo, sizeof(nodeId_t)*cfgPNodes_, |
| MPI_ANY_SOURCE, MON_XCHNG_DATA, joinComm_))) |
| HandleReintegrateError( rc, Reintegrate_Err3, -1, NULL, true ); |
| |
| if ( initProblem ) |
| { |
| // The monitor encountered an initialization error. Inform |
| // the creator monitor that the node is down. Then abort. |
| SendReIntegrateStatus( State_Down, initProblem ); |
| } |
| |
| // Connect to each of the other existing monitors and let them know |
| // we are the NEW monitor and reset the creator flag so they know they are |
| // not the creator monitor. |
| myNodeInfo.creator = false; |
| myNodeInfo.creatorShellPid = -1; |
| myNodeInfo.creatorShellVerifier = -1; |
| for (int i=0; i<cfgPNodes_; i++) |
| { |
| if (strcmp(nodeInfo[i].commPort, IntegratingMonitorPort) == 0) |
| { // Already connected to creator monitor |
| comms_[i] = intraCommCreatorMon; |
| otherMonRank_[i] = 0; |
| ++CurNodes; |
| |
| // Set bit indicating node is up |
| upNodes_.upNodes[i/64] |= (1ull << i); |
| |
| Node[i]->SetCommPort( IntegratingMonitorPort ); |
| Node[i]->SetState( State_Up ); |
| haveCreatorComm = true; |
| } |
| else if (nodeInfo[i].nodeName[0] != 0 |
| && nodeInfo[i].commPort[0] != 0) |
| { |
| if ( haveCreatorComm && i >= cfgPNodes_/2) |
| // Reintegration failure after connecting to half |
| // of existing monitors. |
| TEST_POINT( TP016_NODE_UP ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d Attempting connection to node %d (%s), " |
| "port %s\n", method_name, __LINE__, i, |
| nodeInfo[i].nodeName, nodeInfo[i].commPort); |
| } |
| |
| mem_log_write(CMonLog::MON_REINTEGRATE_5, MyPNID, i); |
| |
| TEST_POINT( TP013_NODE_UP ); |
| |
| // Connect to existing monitor |
| if ((rc = MPI_Comm_connect( nodeInfo[i].commPort, |
| MPI_INFO_NULL, 0, MPI_COMM_SELF, |
| &interComm ))) |
| { |
| HandleReintegrateError( rc, Reintegrate_Err5, i, &nodeInfo[i], |
| false ); |
| SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
| } |
| |
| MPI_Comm_set_errhandler( interComm, MPI_ERRORS_RETURN ); |
| |
| TEST_POINT( TP014_NODE_UP ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d Connected to node %d (%s), sending id\n", |
| method_name, __LINE__,i,nodeInfo[i].nodeName); |
| } |
| |
| // Send this nodes name and port number so other monitor |
| // knows who we are. |
| if ((rc = Monitor->SendMPI((char *) &myNodeInfo, sizeof(nodeId_t), 0, |
| MON_XCHNG_DATA, interComm))) |
| { |
| HandleReintegrateError( rc, Reintegrate_Err4, i, &nodeInfo[i], |
| false ); |
| SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
| } |
| |
| if ((rc = MPI_Intercomm_merge(interComm, 1, &intraComm))) |
| { |
| HandleReintegrateError( rc, Reintegrate_Err6, i, NULL, false ); |
| SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
| } |
| |
| // Get acknowledgement that other monitor is ready to |
| // integrate this node. This is an interlock to avoid a |
| // race condition where the creator monitor could signal |
| // the monitors in the cluster to integrate the new node |
| // before one or more was ready to do the integration. |
| int readyFlag; |
| if ((rc = Monitor->ReceiveMPI((char *) &readyFlag, sizeof(readyFlag), |
| MPI_ANY_SOURCE, MON_XCHNG_DATA, |
| interComm))) |
| { |
| HandleReintegrateError( rc, Reintegrate_Err15, i, NULL, |
| false ); |
| SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
| } |
| |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - Received ready-flag from node %d (%s)\n", |
| method_name, __LINE__, i, |
| nodeInfo[i].nodeName); |
| } |
| |
| if ((rc = MPI_Comm_disconnect(&interComm))) |
| HandleReintegrateError( rc, Reintegrate_Err7, i, NULL, false ); |
| |
| MPI_Comm_set_errhandler(intraComm, MPI_ERRORS_RETURN); |
| |
| comms_[i] = intraComm; |
| otherMonRank_[i] = 0; |
| ++CurNodes; |
| Node[i]->SetSyncPort( nodeInfo[i].syncPort ); |
| Node[i]->SetState( State_Up ); |
| |
| // Set bit indicating node is up |
| upNodes_.upNodes[i/64] |= (1ull << i); |
| |
| mem_log_write(CMonLog::MON_REINTEGRATE_6, MyPNID, i); |
| } |
| else if ( i != MyPNID) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d Connection to node %d not attempted, " |
| "no port information. nodeInfo[%d].port=%s, " |
| "IntegratingMonitorPort=%s\n", method_name, |
| __LINE__, i, i, nodeInfo[i].commPort, |
| IntegratingMonitorPort); |
| } |
| } |
| } |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
| { |
| trace_printf( "%s@%d Integrating node %s (pnid=%d) " |
| "sees set[%d]: %llx\n" |
| , method_name, __LINE__ |
| , MyNode->GetName(), MyPNID |
| , i, upNodes_.upNodes[i] ); |
| } |
| } |
| |
| mem_log_write(CMonLog::MON_REINTEGRATE_7, MyPNID); |
| |
| TEST_POINT( TP015_NODE_UP ); |
| |
| // Inform creator monitor that connections are complete and |
| // this monitor is ready to participate in "allgather" |
| // communications with the other monitors. |
| SendReIntegrateStatus( State_Up, 0 ); |
| |
| mem_log_write(CMonLog::MON_REINTEGRATE_8, MyPNID); |
| |
| MyNode->SetState( State_Merged ); |
| |
| delete[] nodeInfo; |
| |
| TRACE_EXIT; |
| } |
| |
| // Integrate this newly started monitor into a running cluster using TCP |
| // sockets. |
| // |
| // Steps performed below: |
| //   1. Connect to the creator monitor (IntegratingMonitorPort) and send it |
| //      this node's identity with the "creator" flag set. |
| //   2. Receive the node table (one nodeId_t per configured physical node). |
| //   3. If initProblem is non-zero, report State_Down to the creator. |
| //   4. Walk the node table: |
| //        - for the creator entry, exchange pnids over the join socket; |
| //        - for every other entry with a name and comm port, connect to |
| //          its comm port, send our identity and wait for its pnid; |
| //      in both cases a socket obtained from AcceptSyncSock() is stored in |
| //      socks_[] and the node's bit is set in upNodes_. |
| //   5. Report State_Up to the creator and set this node to State_Merged. |
| // |
| // Parameters: |
| //   initProblem - non-zero when monitor initialization failed; the value |
| //                 is forwarded to SendReIntegrateStatus() as the reason. |
| void CCluster::ReIntegrateSock( int initProblem ) |
| { |
|     const char method_name[] = "CCluster::ReIntegrateSock"; |
|     TRACE_ENTRY; |
| |
|     bool haveCreatorSocket = false; |
|     int rc; |
|     int existingCommFd; |
|     int existingSyncFd; |
| |
|     // Set bit indicating my node is up.  The bit position must be taken |
|     // modulo 64: shifting a 64-bit value by 64 or more is undefined. |
|     upNodes_.upNodes[MyPNID/64] |= (1ull << (MyPNID % 64)); |
| |
|     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|         trace_printf("%s@%d - Connect to creator monitor (port %s)\n", |
|                      method_name, __LINE__, IntegratingMonitorPort); |
| |
|     mem_log_write(CMonLog::MON_REINTEGRATE_1, MyPNID); |
| |
|     if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
|     { |
|         for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
|         { |
|             trace_printf( "%s@%d Integrating node %s (pnid=%d) " |
|                           "sees set[%d]: %llx\n" |
|                         , method_name, __LINE__ |
|                         , MyNode->GetName(), MyPNID |
|                         , i, upNodes_.upNodes[i] ); |
|         } |
|     } |
| |
|     TEST_POINT( TP010_NODE_UP ); |
| |
|     // Connect with my creator monitor |
|     joinSock_ = Monitor->Connect( IntegratingMonitorPort ); |
|     if ( joinSock_ < 0 ) |
|     { |
|         HandleReintegrateError( joinSock_, Reintegrate_Err1, -1, NULL, true ); |
|     } |
| |
|     mem_log_write(CMonLog::MON_REINTEGRATE_4, MyPNID); |
| |
|     TEST_POINT( TP011_NODE_UP ); |
| |
|     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|     { |
|         trace_printf("%s@%d Connected to creator monitor, sending my node info\n", |
|                      method_name, __LINE__); |
|     } |
| |
|     // Send this node's name and port number so creator monitor |
|     // knows who we are, and set flag to let creator monitor it is the CREATOR. |
|     nodeId_t myNodeInfo; |
|     // The structure is sent as raw bytes; clear it first so that the bytes |
|     // following the string terminators are not uninitialized stack data. |
|     memset( &myNodeInfo, 0, sizeof(myNodeInfo) ); |
|     strcpy(myNodeInfo.nodeName, MyNode->GetName()); |
|     strcpy(myNodeInfo.commPort, MyNode->GetCommPort()); |
|     strcpy(myNodeInfo.syncPort, MyNode->GetSyncPort()); |
|     myNodeInfo.pnid = MyNode->GetPNid(); |
|     myNodeInfo.creatorPNid = -1; |
|     myNodeInfo.creator = true; |
|     myNodeInfo.creatorShellPid = CreatorShellPid; |
|     myNodeInfo.creatorShellVerifier = CreatorShellVerifier; |
|     rc = Monitor->SendSock( (char *) &myNodeInfo |
|                           , sizeof(nodeId_t) |
|                           , joinSock_ ); |
|     if ( rc ) |
|     { |
|         HandleReintegrateError( rc, Reintegrate_Err9, -1, NULL, true ); |
|     } |
| |
|     TEST_POINT( TP012_NODE_UP ); |
| |
|     mem_log_write(CMonLog::MON_REINTEGRATE_3, MyPNID); |
| |
|     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|     { |
|         trace_printf("%s@%d Getting all node info from creator monitor\n", |
|                      method_name, __LINE__); |
|     } |
| |
|     // Obtain node names & port numbers of existing monitors from |
|     // the creator monitor. |
|     nodeId_t *nodeInfo; |
|     size_t nodeInfoSize = (sizeof(nodeId_t) * cfgPNodes_); |
|     nodeInfo = (nodeId_t *) new char[nodeInfoSize]; |
|     rc = Monitor->ReceiveSock( (char *)nodeInfo |
|                              , nodeInfoSize |
|                              , joinSock_ ); |
|     if ( rc ) |
|     { |
|         HandleReintegrateError( rc, Reintegrate_Err3, -1, NULL, true ); |
|     } |
| |
|     if ( initProblem ) |
|     { |
|         // The monitor encountered an initialization error.  Inform |
|         // the creator monitor that the node is down.  Then abort. |
|         SendReIntegrateStatus( State_Down, initProblem ); |
|     } |
| |
|     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|     { |
|         for (int i=0; i<cfgPNodes_; i++) |
|         { |
|             trace_printf( "%s@%d - Node info for pnid=%d:\n" |
|                           " nodeName=%s\n" |
|                           " commPort=%s\n" |
|                           " syncPort=%s\n" |
|                           " creatorPNid=%d\n" |
|                         , method_name, __LINE__ |
|                         , nodeInfo[i].pnid |
|                         , nodeInfo[i].nodeName |
|                         , nodeInfo[i].commPort |
|                         , nodeInfo[i].syncPort |
|                         , nodeInfo[i].creatorPNid ); |
|         } |
|     } |
| |
|     // Connect to each of the other existing monitors and let them know |
|     // we are the NEW monitor and reset the creator flag so they know they are |
|     // not the creator monitor. |
|     myNodeInfo.creator = false; |
|     myNodeInfo.creatorShellPid = -1; |
|     myNodeInfo.creatorShellVerifier = -1; |
|     for (int i=0; i<cfgPNodes_; i++) |
|     { |
|         if ( nodeInfo[i].creatorPNid != -1 && nodeInfo[i].creatorPNid == i ) |
|         { |
|             // Get acknowledgement that creator monitor is ready to |
|             // integrate this node. |
|             int creatorpnid = -1; |
|             rc = Monitor->ReceiveSock( (char *) &creatorpnid |
|                                      , sizeof(creatorpnid) |
|                                      , joinSock_ ); |
|             if ( rc || creatorpnid != nodeInfo[i].creatorPNid ) |
|             { |
|                 HandleReintegrateError( rc, Reintegrate_Err15, i, NULL, |
|                                         false ); |
|                 SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
|             } |
| |
|             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|             { |
|                 trace_printf( "%s@%d - Received ready indication from creator " |
|                               "node %d nodeInfo[%d].nodeName=%s\n" |
|                             , method_name, __LINE__ |
|                             , creatorpnid, i , nodeInfo[i].nodeName); |
|             } |
| |
|             otherMonRank_[i] = 0; |
|             ++CurNodes; |
| |
|             Node[i]->SetCommPort( nodeInfo[i].commPort ); |
|             Node[i]->SetSyncPort( nodeInfo[i].syncPort ); |
|             Node[i]->SetState( State_Up ); |
| |
|             // Tell creator we are ready to accept its connection |
|             int mypnid = MyPNID; |
|             rc = Monitor->SendSock( (char *) &mypnid |
|                                   , sizeof(mypnid) |
|                                   , joinSock_ ); |
|             if ( rc ) |
|             { |
|                 HandleReintegrateError( rc, Reintegrate_Err4, i, &nodeInfo[i], |
|                                         false ); |
|                 SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
|             } |
| |
|             // Connect to creator monitor |
|             existingSyncFd = AcceptSyncSock(); |
|             if ( existingSyncFd < 0 ) |
|             { |
|                 // Report the failing accept result; rc still holds the |
|                 // (successful) status of the preceding send. |
|                 HandleReintegrateError( existingSyncFd, Reintegrate_Err5, i, |
|                                         &nodeInfo[i], false ); |
|                 SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
|             } |
|             socks_[i] = existingSyncFd; |
|             // Set bit indicating node is up |
|             upNodes_.upNodes[i/64] |= (1ull << (i % 64)); |
| |
|             if (trace_settings & (TRACE_RECOVERY | TRACE_INIT)) |
|             { |
|                 trace_printf( "%s@%d Connected to creator node %d (%s)\n" |
|                             , method_name, __LINE__ |
|                             , nodeInfo[i].creatorPNid |
|                             , nodeInfo[i].nodeName ); |
|                 trace_printf( "%s@%d socks_[%d]=%d\n" |
|                             , method_name, __LINE__ |
|                             , i, socks_[i]); |
|                 for ( int m = 0; m < MAX_NODE_MASKS ; m++ ) |
|                 { |
|                     trace_printf( "%s@%d Integrating node %s (pnid=%d) " |
|                                   "sees set[%d]: %llx\n" |
|                                 , method_name, __LINE__ |
|                                 , MyNode->GetName(), MyPNID |
|                                 , m, upNodes_.upNodes[m] ); |
|                 } |
|             } |
| |
|             haveCreatorSocket = true; |
|         } |
|         else if ( nodeInfo[i].nodeName[0] != 0 && nodeInfo[i].commPort[0] != 0 ) |
|         { |
|             if ( haveCreatorSocket && i >= cfgPNodes_/2) |
|                 // Reintegration failure after connecting to half |
|                 // of existing monitors. |
|                 TEST_POINT( TP016_NODE_UP ); |
| |
|             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|             { |
|                 trace_printf("%s@%d Attempting connection to node %d (%s), " |
|                              "port %s\n", method_name, __LINE__, i, |
|                              nodeInfo[i].nodeName, nodeInfo[i].commPort); |
|             } |
| |
|             mem_log_write(CMonLog::MON_REINTEGRATE_5, MyPNID, i); |
| |
|             TEST_POINT( TP013_NODE_UP ); |
| |
|             // Connect to existing monitor |
|             existingCommFd = Monitor->Connect( nodeInfo[i].commPort ); |
|             if ( existingCommFd < 0 ) |
|             { |
|                 // Report the failing connect result rather than a stale rc |
|                 HandleReintegrateError( existingCommFd, Reintegrate_Err5, i, |
|                                         &nodeInfo[i], false ); |
|                 SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
|             } |
| |
|             TEST_POINT( TP014_NODE_UP ); |
| |
|             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|             { |
|                 trace_printf("%s@%d Connected to node %d (%s), sending my node name\n", |
|                              method_name, __LINE__,i,nodeInfo[i].nodeName); |
|             } |
| |
|             // Send this nodes name and port number so other monitor |
|             // knows who we are. |
|             rc = Monitor->SendSock( (char *) &myNodeInfo |
|                                   , sizeof(nodeId_t) |
|                                   , existingCommFd ); |
|             if ( rc ) |
|             { |
|                 HandleReintegrateError( rc, Reintegrate_Err4, i, &nodeInfo[i], |
|                                         false ); |
|                 SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
|             } |
| |
|             // Get acknowledgement that other monitor is ready to |
|             // integrate this node.  This is an interlock to avoid a |
|             // race condition where the creator monitor could signal |
|             // the monitors in the cluster to integrate the new node |
|             // before one or more was ready to do the integration. |
|             int remotepnid = -1; |
|             rc = Monitor->ReceiveSock( (char *) &remotepnid |
|                                      , sizeof(remotepnid) |
|                                      , existingCommFd ); |
|             if ( rc || remotepnid != i ) |
|             { |
|                 HandleReintegrateError( rc, Reintegrate_Err15, i, NULL, |
|                                         false ); |
|                 SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
|             } |
| |
|             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|             { |
|                 trace_printf( "%s@%d - Received ready indication from " |
|                               "node %d nodeInfo[%d].nodeName=%s\n" |
|                             , method_name, __LINE__ |
|                             , remotepnid, i , nodeInfo[i].nodeName); |
|             } |
| |
|             otherMonRank_[i] = 0; |
|             ++CurNodes; |
|             Node[i]->SetCommPort( nodeInfo[i].commPort ); |
|             Node[i]->SetSyncPort( nodeInfo[i].syncPort ); |
|             Node[i]->SetState( State_Up ); |
| |
|             // Connect to existing monitor |
|             existingSyncFd = AcceptSyncSock(); |
|             if ( existingSyncFd < 0 ) |
|             { |
|                 // Report the failing accept result rather than a stale rc |
|                 HandleReintegrateError( existingSyncFd, Reintegrate_Err5, i, |
|                                         &nodeInfo[i], false ); |
|                 SendReIntegrateStatus( State_Down, Reintegrate_Err14 ); |
|             } |
|             socks_[i] = existingSyncFd; |
| |
|             // Set bit indicating node is up |
|             upNodes_.upNodes[i/64] |= (1ull << (i % 64)); |
| |
|             if (trace_settings & (TRACE_RECOVERY | TRACE_INIT)) |
|             { |
|                 trace_printf( "%s@%d socks_[%d]=%d\n" |
|                             , method_name, __LINE__ |
|                             , i, socks_[i]); |
|                 for ( int m = 0; m < MAX_NODE_MASKS ; m++ ) |
|                 { |
|                     trace_printf( "%s@%d Integrating node %s (pnid=%d) " |
|                                   "sees set[%d]: %llx\n" |
|                                 , method_name, __LINE__ |
|                                 , MyNode->GetName(), MyPNID |
|                                 , m, upNodes_.upNodes[m] ); |
|                 } |
|             } |
| |
|             mem_log_write(CMonLog::MON_REINTEGRATE_6, MyPNID, i); |
|         } |
|         else if ( i != MyPNID) |
|         { |
|             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
|             { |
|                 trace_printf( "%s@%d Connection to node %d not attempted, " |
|                               "no port information.\n" |
|                               "nodeInfo[%d].commPort=%s\n" |
|                               "nodeInfo[%d].syncPort=%s\n" |
|                               "IntegratingMonitorPort=%s\n" |
|                             , method_name, __LINE__ |
|                             , i |
|                             , i, nodeInfo[i].commPort |
|                             , i, nodeInfo[i].syncPort |
|                             , IntegratingMonitorPort); |
|             } |
|         } |
|     } |
| |
|     if (trace_settings & (TRACE_RECOVERY | TRACE_INIT)) |
|     { |
|         for (int i=0; i<cfgPNodes_; i++) |
|         { |
|             trace_printf( "%s@%d - Node info for pnid=%d (%s)\n" |
|                           " Node[%d] commPort=%s\n" |
|                           " Node[%d] syncPort=%s\n" |
|                           " Node[%d] creatorPNid=%d\n" |
|                         , method_name, __LINE__ |
|                         , Node[i]->GetPNid() |
|                         , Node[i]->GetName() |
|                         , i, Node[i]->GetCommPort() |
|                         , i, Node[i]->GetSyncPort() |
|                         , i, nodeInfo[i].creatorPNid); |
|         } |
|         for ( int i =0; i < cfgPNodes_; i++ ) |
|         { |
|             trace_printf( "%s@%d socks_[%d]=%d\n" |
|                         , method_name, __LINE__ |
|                         , i, socks_[i]); |
|         } |
|         for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
|         { |
|             trace_printf( "%s@%d Integrating node %s (pnid=%d) " |
|                           "sees set[%d]: %llx\n" |
|                         , method_name, __LINE__ |
|                         , MyNode->GetName(), MyPNID |
|                         , i, upNodes_.upNodes[i] ); |
|         } |
|     } |
| |
|     mem_log_write(CMonLog::MON_REINTEGRATE_7, MyPNID); |
| |
|     TEST_POINT( TP015_NODE_UP ); |
| |
|     // Inform creator monitor that connections are complete and |
|     // this monitor is ready to participate in "allgather" |
|     // communications with the other monitors. |
|     SendReIntegrateStatus( State_Up, 0 ); |
| |
|     mem_log_write(CMonLog::MON_REINTEGRATE_8, MyPNID); |
| |
|     MyNode->SetState( State_Merged ); |
| |
|     // The buffer was allocated as char[], so release it as char[] |
|     delete [] (char *) nodeInfo; |
| |
|     TRACE_EXIT; |
| } |
| |
| // Queue a communicator that was created for a reintegrating node.  The |
| // entry is time-stamped and appended to newComms_ while holding |
| // newCommsLock_; setNewComm() later moves it into comms_[]. |
| // |
| //   pnid      - physical node id the communicator belongs to |
| //   otherRank - rank value stored alongside the communicator |
| //   comm      - the communicator itself |
| void CCluster::addNewComm(int pnid, int otherRank, MPI_Comm comm) |
| { |
|     const char method_name[] = "CCluster::addNewComm"; |
|     TRACE_ENTRY; |
| |
|     // Build the list entry: no socket (-1), timestamp filled in below |
|     commInfo_t pendingComm = {pnid, otherRank, comm, -1, {0, 0}}; |
|     clock_gettime(CLOCK_REALTIME, &pendingComm.ts); |
| |
|     if (trace_settings & TRACE_RECOVERY) |
|     { |
|         trace_printf("%s@%d - saving communicator for pnid %d\n", |
|                      method_name, __LINE__, pnid); |
|     } |
| |
|     // Append under the lock that guards the pending list |
|     newCommsLock_.lock(); |
|     newComms_.push_back( pendingComm ); |
|     newCommsLock_.unlock(); |
| |
|     TRACE_EXIT; |
| } |
| |
| // A node is reintegrating.  Add the communicator for the node to the set of |
| // communicators used by "Allgather". |
| // |
| // Every entry queued in newComms_ for "pnid" is consumed.  If more than |
| // one is queued, each later entry replaces the earlier one, which is |
| // released as a stale leftover.  An error is logged when a communicator |
| // is already installed on entry, and when no queued entry is found. |
| // |
| //   pnid - physical node id of the reintegrating node |
| void CCluster::setNewComm( int pnid ) |
| { |
|     const char method_name[] = "CCluster::setNewComm"; |
|     TRACE_ENTRY; |
| |
|     newComms_t::iterator it; |
|     bool foundComm = false; |
| |
|     if ( comms_[pnid] != MPI_COMM_NULL ) |
|     {   // Unexpectedly already have a communicator for this node |
|         char buf[MON_STRING_BUF_SIZE]; |
|         snprintf(buf, sizeof(buf), "[%s] Unexpectedly already have a " |
|                  "communicator for node %d\n", method_name, pnid); |
|         mon_log_write(MON_CLUSTER_SETNEWCOMM_1, SQ_LOG_ERR, buf); |
| |
|         // NOTE(review): CurNodes is not decremented here although the |
|         // stale-entry path in the loop below does so -- confirm intent. |
|         MPI_Comm_free( &comms_[pnid] ); |
|         if ( CommType == CommType_Sockets && socks_[pnid] != -1 ) |
|         { |
|             shutdown( socks_[pnid], SHUT_RDWR); |
|             close( socks_[pnid] ); |
|             socks_[pnid] = -1; |
|         } |
|     } |
| |
|     newCommsLock_.lock(); |
|     for ( it = newComms_.begin(); it != newComms_.end(); ) |
|     { |
|         if ( it->pnid != pnid ) |
|         {   // Entry belongs to another node, advance to next list element |
|             ++it; |
|             continue; |
|         } |
| |
|         if ( comms_[pnid] != MPI_COMM_NULL ) |
|         {   // Found another communicator for the specified node. |
|             // Disconnect from the previous one.  It must be a |
|             // stale leftover from a previous reintegration |
|             // attempt for the node. |
|             if (trace_settings & TRACE_RECOVERY) |
|             { |
|                 trace_printf("%s@%d - discarding stale communicator for " |
|                              "pnid %d\n", method_name, __LINE__, pnid); |
|             } |
| |
|             MPI_Comm_free( &comms_[pnid] ); |
|             if ( CommType == CommType_Sockets && socks_[pnid] != -1 ) |
|             { |
|                 shutdown( socks_[pnid], SHUT_RDWR); |
|                 close( socks_[pnid] ); |
|                 socks_[pnid] = -1; |
|             } |
|             --CurNodes; |
|         } |
| |
|         if (trace_settings & TRACE_RECOVERY) |
|         { |
|             trace_printf("%s@%d - setting new communicator for pnid %d, " |
|                          "otherRank=%d\n", |
|                          method_name, __LINE__, it->pnid, it->otherRank); |
|         } |
| |
|         comms_[pnid] = it->comm; |
|         otherMonRank_[pnid] = it->otherRank; |
|         ++CurNodes; |
|         // Set bit indicating node is up.  The bit position is taken |
|         // modulo 64: shifting a 64-bit value by 64 or more is undefined. |
|         upNodes_.upNodes[pnid/64] |= (1ull << (pnid % 64)); |
| |
|         // Delete current list element and advance to next one |
|         it = newComms_.erase ( it ); |
| |
|         foundComm = true; |
|     } |
|     newCommsLock_.unlock(); |
| |
|     if ( !foundComm ) |
|     {   // We have no communicator for the specified node. |
|         char buf[MON_STRING_BUF_SIZE]; |
|         snprintf(buf, sizeof(buf), "[%s] Could not find a communicator for " |
|                  "node %d\n", method_name, pnid); |
|         mon_log_write(MON_CLUSTER_SETNEWCOMM_2, SQ_LOG_ERR, buf); |
|     } |
| |
|     TRACE_EXIT; |
| } |
| |
| // Queue a socket that was created for a reintegrating node.  The entry |
| // carries no communicator (MPI_COMM_NULL), is time-stamped, and is |
| // appended to newComms_ while holding newCommsLock_; setNewSock() later |
| // moves it into socks_[]. |
| // |
| //   pnid      - physical node id the socket belongs to |
| //   otherRank - rank value stored alongside the socket |
| //   sockFd    - the socket file descriptor |
| void CCluster::addNewSock(int pnid, int otherRank, int sockFd) |
| { |
|     const char method_name[] = "CCluster::addNewSock"; |
|     TRACE_ENTRY; |
| |
|     // Build the list entry: socket only, timestamp filled in below |
|     commInfo_t pendingSock = {pnid, otherRank, MPI_COMM_NULL, sockFd, {0, 0}}; |
|     clock_gettime(CLOCK_REALTIME, &pendingSock.ts); |
| |
|     if (trace_settings & TRACE_RECOVERY) |
|     { |
|         trace_printf("%s@%d - saving socket for pnid %d\n", |
|                      method_name, __LINE__, pnid); |
|     } |
| |
|     // Append under the lock that guards the pending list |
|     newCommsLock_.lock(); |
|     newComms_.push_back( pendingSock ); |
|     newCommsLock_.unlock(); |
| |
|     TRACE_EXIT; |
| } |
| |
| // A node is reintegrating.  Add the socket for the node to the set of |
| // communicators used by "Allgather". |
| // |
| // Every entry queued in newComms_ for "pnid" is consumed.  If more than |
| // one is queued, each later entry replaces the earlier one, which is |
| // closed as a stale leftover.  An error is logged when a socket is |
| // already installed on entry, and when no queued entry is found. |
| // |
| //   pnid - physical node id of the reintegrating node |
| void CCluster::setNewSock( int pnid ) |
| { |
|     const char method_name[] = "CCluster::setNewSock"; |
|     TRACE_ENTRY; |
| |
|     newComms_t::iterator it; |
|     bool foundSocket = false; |
| |
|     if ( socks_[pnid] != -1 ) |
|     {   // Unexpectedly already have a socket for this node |
|         char buf[MON_STRING_BUF_SIZE]; |
|         snprintf(buf, sizeof(buf), "[%s] Unexpectedly already have a " |
|                  "socket for node %d\n", method_name, pnid); |
|         mon_log_write(MON_CLUSTER_SETNEWSOCK_1, SQ_LOG_ERR, buf); |
| |
|         // NOTE(review): CurNodes is not decremented here although the |
|         // stale-entry path in the loop below does so -- confirm intent. |
|         shutdown( socks_[pnid], SHUT_RDWR); |
|         close( socks_[pnid] ); |
|         socks_[pnid] = -1; |
|     } |
| |
|     newCommsLock_.lock(); |
|     for ( it = newComms_.begin(); it != newComms_.end(); ) |
|     { |
|         if ( it->pnid != pnid ) |
|         {   // Entry belongs to another node, advance to next list element |
|             ++it; |
|             continue; |
|         } |
| |
|         if ( socks_[pnid] != -1 ) |
|         {   // Found another socket for the specified node. |
|             // Disconnect from the previous one.  It must be a |
|             // stale leftover from a previous reintegration |
|             // attempt for the node. |
|             if (trace_settings & TRACE_RECOVERY) |
|             { |
|                 trace_printf("%s@%d - discarding stale communicator for " |
|                              "pnid %d\n", method_name, __LINE__, pnid); |
|             } |
| |
|             shutdown( socks_[pnid], SHUT_RDWR); |
|             close( socks_[pnid] ); |
|             socks_[pnid] = -1; |
|             --CurNodes; |
|         } |
| |
|         if (trace_settings & TRACE_RECOVERY) |
|         { |
|             trace_printf("%s@%d - setting new communicator for pnid %d, " |
|                          "otherRank=%d\n", |
|                          method_name, __LINE__, it->pnid, it->otherRank); |
|         } |
| |
|         socks_[pnid] = it->socket; |
|         otherMonRank_[pnid] = it->otherRank; |
|         ++CurNodes; |
|         // Set bit indicating node is up.  The bit position is taken |
|         // modulo 64: shifting a 64-bit value by 64 or more is undefined. |
|         upNodes_.upNodes[pnid/64] |= (1ull << (pnid % 64)); |
| |
|         // Delete current list element and advance to next one |
|         it = newComms_.erase ( it ); |
| |
|         foundSocket = true; |
|     } |
|     newCommsLock_.unlock(); |
| |
|     if ( !foundSocket ) |
|     {   // We have no socket for the specified node. |
|         char buf[MON_STRING_BUF_SIZE]; |
|         snprintf(buf, sizeof(buf), "[%s] Could not find a socket for " |
|                  "node %d\n", method_name, pnid); |
|         mon_log_write(MON_CLUSTER_SETNEWSOCK_2, SQ_LOG_ERR, buf); |
|     } |
| |
|     TRACE_EXIT; |
| } |
| |
| // Dispatch an all-gather exchange to the implementation that matches the |
| // configured transport (CommType).  All arguments are forwarded unchanged. |
| // Returns the value produced by the selected implementation.  An |
| // unrecognized CommType calls MPI_Abort(). |
| int CCluster::Allgather( int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats ) |
| { |
|     const char method_name[] = "CCluster::Allgather"; |
|     TRACE_ENTRY; |
| |
|     int rc = 0; |
| |
|     if ( CommType == CommType_InfiniBand ) |
|     { |
|         rc = AllgatherIB( nbytes, sbuf, rbuf, tag, stats ); |
|     } |
|     else if ( CommType == CommType_Sockets ) |
|     { |
|         rc = AllgatherSock( nbytes, sbuf, rbuf, tag, stats ); |
|     } |
|     else |
|     { |
|         // Programmer bonehead! |
|         MPI_Abort(MPI_COMM_SELF,99); |
|     } |
| |
|     TRACE_EXIT; |
|     return rc; |
| } |
| |
| // All-gather over the per-node MPI communicators in comms_[]. |
| // |
| // For every node with a communicator and a known peer rank, a persistent |
| // send of "sbuf" and a persistent receive into that node's CommBufSize |
| // slot of "rbuf" are created, started, and waited on together. |
| // |
| //   nbytes - number of bytes of sbuf to send to each peer |
| //   sbuf   - data to send |
| //   rbuf   - receive area, one CommBufSize slot per configured node |
| //   tag    - MPI tag used for both sends and receives |
| //   stats  - out: one status per configured node, taken from the receive; |
| //            if only the send failed, its error code is copied in |
| // |
| // Returns MPI_SUCCESS, or the MPI error class of the first failure |
| // (MPI_ERR_IN_STATUS when individual requests failed; see stats[]). |
| int CCluster::AllgatherIB( int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats ) |
| { |
|     const char method_name[] = "CCluster::AllgatherIB"; |
|     TRACE_ENTRY; |
| |
|     int e; |
|     int err = 0; |
| |
|     // r[0 .. cfgPNodes_-1] are the send requests, |
|     // r[cfgPNodes_ .. 2*cfgPNodes_-1] are the matching receive requests. |
|     MPI_Request r[2*cfgPNodes_]; |
|     MPI_Status s[2*cfgPNodes_]; |
|     for ( int i = 0; i < 2*cfgPNodes_; i++ ) |
|     { |
|         s[i].MPI_ERROR = MPI_SUCCESS; |
|         r[i] = MPI_REQUEST_NULL; |
|     } |
| |
|     char *cp = rbuf; |
|     for ( int i = 0; i < cfgPNodes_; i++ ) |
|     { |
|         if ( comms_[i] != MPI_COMM_NULL && otherMonRank_[i] != -1 ) |
|         { |
|             e = MPI_Send_init( sbuf, nbytes, MPI_CHAR, otherMonRank_[i], tag, |
|                                comms_[i], &r[i] ); |
|             if ( e != MPI_SUCCESS ) |
|             { |
|                 MPI_Error_class( e, &err ); |
|                 char buf[MON_STRING_BUF_SIZE]; |
|                 snprintf( buf, sizeof(buf) |
|                         , "[%s], Comunication error with pnid=%d (%s), " |
|                           "MPI_Send_init() error=%s (%d)\n" |
|                         , method_name, i, Node[i]->GetName() |
|                         , ErrorMsg(e), e ); |
|                 mon_log_write(MON_CLUSTER_ALLGATHERIB_1, SQ_LOG_ERR, buf); |
|                 goto early_exit; |
|             } |
| |
|             e = MPI_Recv_init( cp, CommBufSize, MPI_CHAR, otherMonRank_[i], tag, |
|                                comms_[i], &r[i+cfgPNodes_] ); |
|             if ( e != MPI_SUCCESS ) |
|             { |
|                 MPI_Error_class( e, &err ); |
|                 char buf[MON_STRING_BUF_SIZE]; |
|                 snprintf( buf, sizeof(buf) |
|                         , "[%s], Comunication error with pnid=%d (%s), " |
|                           "MPI_Recv_init() error=%s (%d)\n" |
|                         , method_name, i, Node[i]->GetName() |
|                         , ErrorMsg(e), e ); |
|                 mon_log_write(MON_CLUSTER_ALLGATHERIB_2, SQ_LOG_ERR, buf); |
|                 goto early_exit; |
|             } |
|         } |
|         // Each node owns a fixed slot in rbuf, whether or not it is used |
|         cp += CommBufSize; |
|     } |
|     for ( int i = 0; i < 2*cfgPNodes_; i++ ) |
|     { |
|         if ( r[i] == MPI_REQUEST_NULL ) continue; |
|         e = MPI_Start( &r[i] ); |
|         if ( e != MPI_SUCCESS ) |
|         { |
|             MPI_Error_class( e, &err ); |
|             char buf[MON_STRING_BUF_SIZE]; |
|             int pnid = (i < cfgPNodes_) ? i : (i - cfgPNodes_); |
|             snprintf( buf, sizeof(buf) |
|                     , "[%s], Comunication error with pnid=%d (%s), " |
|                       "MPI_Start() error=%s (%d)\n" |
|                     , method_name, pnid, Node[pnid]->GetName() |
|                     , ErrorMsg(e), e ); |
|             mon_log_write(MON_CLUSTER_ALLGATHERIB_3, SQ_LOG_ERR, buf); |
|             goto early_exit; |
|         } |
|     } |
| |
|     inBarrier_ = true; |
|     if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
|         MonStats->BarrierWaitIncr(); |
| |
|     e = MPI_Waitall( cfgPNodes_*2, r, s ); |
| |
|     // The wait is over whatever its outcome: always undo the increment |
|     // above so the barrier-wait statistic stays balanced, including on |
|     // the hard-failure exit below. |
|     if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
|         MonStats->BarrierWaitDecr(); |
|     inBarrier_ = false; |
| |
|     if ( e != MPI_SUCCESS ) |
|     { |
|         MPI_Error_class( e, &err ); |
|         if ( err != MPI_ERR_IN_STATUS ) |
|         { |
|             char buf[MON_STRING_BUF_SIZE]; |
|             snprintf( buf, sizeof(buf), "[%s], MPI_Waitall() error=%s (%d)\n" |
|                     , method_name, ErrorMsg(e), e ); |
|             mon_log_write(MON_CLUSTER_ALLGATHERIB_4, SQ_LOG_ERR, buf); |
|             goto early_exit; |
|         } |
|     } |
| |
|     // Hand the receive statuses back to the caller |
|     for ( int i = 0; i < cfgPNodes_; i++ ) |
|     { |
|         stats[i] = s[i+cfgPNodes_]; |
|     } |
|     if ( e == MPI_SUCCESS ) |
|     { |
|         err = MPI_SUCCESS; |
|         goto early_exit; |
|     } |
| |
|     // Some request failed.  Where the send failed but the receive did |
|     // not, surface the send error through the caller's status. |
|     for ( int i = 0; i < cfgPNodes_; i++ ) |
|     { |
|         if ( s[i].MPI_ERROR != MPI_SUCCESS &&           // send |
|              s[i+cfgPNodes_].MPI_ERROR == MPI_SUCCESS ) // receive |
|         { |
|             stats[i].MPI_ERROR = s[i].MPI_ERROR; |
|         } |
|     } |
| |
| early_exit: |
| |
|     // Release every request that was created |
|     for ( int i = 0; i < 2*cfgPNodes_; i++ ) |
|     { |
|         if ( r[i] != MPI_REQUEST_NULL ) |
|         { |
|             MPI_Request_free( &r[i] ); |
|         } |
|     } |
| |
|     barrierCount_++; |
| |
|     TRACE_EXIT; |
|     return err; |
| } |
| |
| // All-gather over the per-node sockets in socks_[], driven by epoll. |
| // |
| // "sbuf" is sent to every connected peer, and each peer's message is |
| // received into its CommBufSize slot of "rbuf".  A message is a header of |
| // Nodes->GetSyncHdrSize() bytes followed by msgInfo.msg_offset bytes (as |
| // read from the received header). |
| // |
| //   nbytes - number of bytes of sbuf to send to each peer |
| //   sbuf   - data to send |
| //   rbuf   - receive area, one CommBufSize slot per configured node |
| //   tag    - unused by this implementation |
| //   stats  - out: per node, MPI_ERROR is MPI_SUCCESS or MPI_ERR_EXITED and |
| //            count is the number of bytes received |
| // |
| // Returns MPI_SUCCESS, or MPI_ERR_IN_STATUS when communication with at |
| // least one peer failed (see stats[]). |
| int CCluster::AllgatherSock( int nbytes, void *sbuf, char *rbuf, int tag, MPI_Status *stats ) |
| { |
|     const char method_name[] = "CCluster::AllgatherSock"; |
|     TRACE_ENTRY; |
| |
|     static int hdrSize = Nodes->GetSyncHdrSize( ); |
|     int err = MPI_SUCCESS; |
|     typedef struct |
|     { |
|         int p_sent;         // bytes of sbuf sent so far |
|         int p_received;     // bytes received so far (header + body) |
|         int p_n2recv;       // body bytes still expected, -1 until header read |
|         bool p_sending;     // send to this peer still in progress |
|         bool p_receiving;   // receive from this peer still in progress |
|         char *p_buff;       // this peer's slot in rbuf |
|     } peer_t; |
|     peer_t p[cfgPNodes_]; |
|     memset( p, 0, sizeof(p) ); |
|     tag = 0; // make compiler happy |
| |
|     // nsent/nrecv count the peers whose send/receive is finished or not |
|     // needed; the exchange is complete when both reach cfgPNodes_. |
|     int nsent = 0, nrecv = 0; |
|     for ( int iPeer = 0; iPeer < cfgPNodes_; iPeer++ ) |
|     { |
|         peer_t *peer = &p[iPeer]; |
|         stats[iPeer].MPI_ERROR = MPI_SUCCESS; |
|         stats[iPeer].count = 0; |
|         if ( iPeer == MyPNID || socks_[iPeer] == -1 ) |
|         { |
|             peer->p_sending = peer->p_receiving = false; |
|             nsent++; |
|             nrecv++; |
|         } |
|         else |
|         { |
|             peer->p_sending = peer->p_receiving = true; |
|             peer->p_sent = peer->p_received = 0; |
|             peer->p_n2recv = -1; |
|             peer->p_buff = ((char *) rbuf) + (iPeer * CommBufSize); |
|             struct epoll_event event; |
|             event.data.fd = socks_[iPeer]; |
|             event.events = EPOLLIN | EPOLLOUT | EPOLLET; |
|             EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[iPeer], &event ); |
|         } |
|     } |
| |
|     inBarrier_ = true; |
|     MonStats->BarrierWaitIncr( ); |
| |
|     // do the work |
|     struct epoll_event events[2*cfgPNodes_ + 1]; |
|     while ( 1 ) |
|     { |
|         int maxEvents = 2*cfgPNodes_ - nsent - nrecv; |
|         if ( maxEvents == 0 ) break; |
|         int nw; |
|         while ( 1 ) |
|         { |
|             nw = epoll_wait( epollFD_, events, maxEvents, -1 ); |
|             if ( nw >= 0 || errno != EINTR ) break; |
|         } |
|         if ( nw < 0 ) |
|         { |
|             char ebuff[256]; |
|             char buf[MON_STRING_BUF_SIZE]; |
|             snprintf( buf, sizeof(buf), "[%s@%d] epoll_wait(%d, %d) error: %s\n", |
|                       method_name, __LINE__, epollFD_, maxEvents, |
|                       strerror_r( errno, ebuff, 256 ) ); |
|             mon_log_write( MON_CLUSTER_ALLGATHERSOCK_1, SQ_LOG_CRIT, buf ); |
|             MPI_Abort( MPI_COMM_SELF,99 ); |
|         } |
|         for ( int iEvent = 0; iEvent < nw; iEvent++ ) |
|         { |
|             bool stateChange = false; |
|             int fd = events[iEvent].data.fd; |
|             // Map the ready descriptor back to its peer |
|             int iPeer; |
|             for ( iPeer = 0; iPeer < cfgPNodes_; iPeer++ ) |
|             { |
|                 if ( events[iEvent].data.fd == socks_[iPeer] ) break; |
|             } |
|             if ( iPeer < 0 || iPeer >= cfgPNodes_ || iPeer == MyPNID |
|                  || socks_[iPeer] == -1 |
|                  || (!p[iPeer].p_sending && !p[iPeer].p_receiving) ) |
|             { |
|                 char buf[MON_STRING_BUF_SIZE]; |
|                 snprintf( buf, sizeof(buf), "[%s@%d] invalid peer %d\n", |
|                           method_name, __LINE__, iPeer ); |
|                 mon_log_write( MON_CLUSTER_ALLGATHERSOCK_2, SQ_LOG_CRIT, buf ); |
|                 MPI_Abort( MPI_COMM_SELF,99 ); |
|             } |
|             peer_t *peer = &p[iPeer]; |
|             if ( (events[iEvent].events & EPOLLERR) || |
|                  (events[iEvent].events & EPOLLHUP) || |
|                  ( !(events[iEvent].events & (EPOLLIN|EPOLLOUT))) ) |
|             { |
|                 // An error has occurred on this fd, or the socket is not |
|                 // ready for reading nor writing |
|                 char buf[MON_STRING_BUF_SIZE]; |
|                 snprintf( buf, sizeof(buf) |
|                         , "[%s@%d] Error: peer=%d, events[%d].data.fd=%d, event[%d]=%s\n" |
|                         , method_name, __LINE__ |
|                         , iPeer |
|                         , iEvent |
|                         , events[iEvent].data.fd |
|                         , iEvent |
|                         , EpollEventString(events[iEvent].events) ); |
|                 mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf ); |
|                 stats[iPeer].MPI_ERROR = MPI_ERR_EXITED; |
|                 err = MPI_ERR_IN_STATUS; |
|                 if ( peer->p_sending ) |
|                 { |
|                     peer->p_sending = false; |
|                     nsent++; |
|                 } |
|                 if ( peer->p_receiving ) |
|                 { |
|                     peer->p_receiving = false; |
|                     nrecv++; |
|                 } |
|                 stateChange = true; |
|                 goto early_exit; |
|             } |
|             if ( peer->p_receiving |
|                  && events[iEvent].events & EPOLLIN ) |
|             { |
|                 // Set once the header has been consumed in this pass; an |
|                 // EAGAIN on the follow-up body read then just means the |
|                 // body has not arrived yet. |
|                 int eagain_ok = 0; |
| read_again: |
|                 char *r = &peer->p_buff[peer->p_received]; |
|                 int n2get; |
|                 if ( peer->p_received >= hdrSize ) |
|                 { |
|                     n2get = peer->p_n2recv; |
|                 } |
|                 else |
|                 { |
|                     n2get = hdrSize - peer->p_received; |
|                 } |
|                 int nr; |
|                 while ( 1 ) |
|                 { |
|                     nr = recv( fd, r, n2get, 0 ); |
|                     // Retry only when interrupted by a signal; any other |
|                     // outcome is handled below. |
|                     if ( nr >= 0 || errno != EINTR ) break; |
|                 } |
|                 if ( nr < 0 && eagain_ok && errno == EAGAIN ) |
|                 { |
|                     // do nothing, wait for the next EPOLLIN event |
|                 } |
|                 else if ( nr <= 0 ) |
|                 { |
|                     // error or peer closed the connection, down socket. |
|                     // n2get is always > 0 here, so nr == 0 means EOF. |
|                     // The errno copy must not be named "err": that would |
|                     // hide the function's return value. |
|                     int recvErrno = errno; |
|                     char buf[MON_STRING_BUF_SIZE]; |
|                     if ( nr == 0 ) |
|                     { |
|                         snprintf( buf, sizeof(buf) |
|                                 , "[%s@%d] recv[%d] peer closed connection\n" |
|                                 , method_name, __LINE__ |
|                                 , iPeer ); |
|                     } |
|                     else |
|                     { |
|                         snprintf( buf, sizeof(buf) |
|                                 , "[%s@%d] recv[%d](%d) error %d (%s)\n" |
|                                 , method_name, __LINE__ |
|                                 , iPeer, nr , recvErrno, strerror(recvErrno) ); |
|                     } |
|                     mon_log_write( MON_CLUSTER_ALLGATHERSOCK_4, SQ_LOG_CRIT, buf ); |
|                     peer->p_receiving = false; |
|                     nrecv++; |
|                     if ( peer->p_sending ) |
|                     { |
|                         peer->p_sending = false; |
|                         nsent++; |
|                     } |
|                     stats[iPeer].MPI_ERROR = MPI_ERR_EXITED; |
|                     err = MPI_ERR_IN_STATUS; |
|                     stateChange = true; |
|                 } |
|                 else |
|                 { |
|                     peer->p_received += nr; |
|                     if ( peer->p_received < hdrSize ) |
|                     { |
|                         // do nothing, header still incomplete |
|                     } |
|                     else |
|                     { |
|                         if ( peer->p_received == hdrSize ) |
|                         { |
|                             // got the complete header, get buffer size |
|                             struct sync_buffer_def *sb; |
|                             sb = (struct sync_buffer_def *)peer->p_buff; |
|                             peer->p_n2recv = sb->msgInfo.msg_offset; |
|                             if ( peer->p_n2recv ) |
|                             { |
|                                 eagain_ok = 1; |
|                                 goto read_again; |
|                             } |
|                         } |
|                         else |
|                         { |
|                             // reading buffer, update counters |
|                             peer->p_n2recv -= nr; |
|                         } |
|                         if ( peer->p_n2recv < 0 ) |
|                         { |
|                             char buf[MON_STRING_BUF_SIZE]; |
|                             snprintf( buf, sizeof(buf), |
|                                       "[%s@%d] error n2recv %d\n", |
|                                       method_name, __LINE__, peer->p_n2recv ); |
|                             mon_log_write( MON_CLUSTER_ALLGATHERSOCK_5, SQ_LOG_CRIT, buf ); |
|                             MPI_Abort( MPI_COMM_SELF,99 ); |
|                         } |
|                         if ( peer->p_n2recv == 0 ) |
|                         { |
|                             // this buffer is done |
|                             peer->p_receiving = false; |
|                             nrecv++; |
|                             stats[iPeer].count = peer->p_received; |
|                             stateChange = true; |
|                         } |
|                     } |
|                 } |
|             } |
|             if ( peer->p_sending |
|                  && events[iEvent].events & EPOLLOUT ) |
|             { |
|                 char *s = &((char *)sbuf)[peer->p_sent]; |
|                 int n2send = nbytes - peer->p_sent; |
|                 int ns; |
|                 while ( 1 ) |
|                 { |
|                     ns = send( fd, s, n2send, 0 ); |
|                     if ( ns >= 0 || errno != EINTR ) break; |
|                 } |
|                 if ( ns < 0 ) |
|                 { |
|                     // error, down socket.  As above, the errno copy must |
|                     // not hide the function's "err". |
|                     int sendErrno = errno; |
|                     char buf[MON_STRING_BUF_SIZE]; |
|                     snprintf( buf, sizeof(buf) |
|                             , "[%s@%d] send[%d](%d) error=%d (%s)\n" |
|                             , method_name, __LINE__ |
|                             , iPeer, ns, sendErrno, strerror(sendErrno) ); |
|                     mon_log_write( MON_CLUSTER_ALLGATHERSOCK_6, SQ_LOG_CRIT, buf ); |
|                     peer->p_sending = false; |
|                     nsent++; |
|                     if ( peer->p_receiving ) |
|                     { |
|                         peer->p_receiving = false; |
|                         nrecv++; |
|                     } |
|                     stats[iPeer].MPI_ERROR = MPI_ERR_EXITED; |
|                     err = MPI_ERR_IN_STATUS; |
|                     stateChange = true; |
|                 } |
|                 else |
|                 { |
|                     peer->p_sent += ns; |
|                     if ( peer->p_sent == nbytes ) |
|                     { |
|                         // finished sending to this destination |
|                         peer->p_sending = false; |
|                         nsent++; |
|                         stateChange = true; |
|                     } |
|                 } |
|             } |
| early_exit: |
|             if ( stateChange ) |
|             { |
|                 // Re-register the descriptor for whatever is still |
|                 // outstanding, or remove it when the peer is finished. |
|                 struct epoll_event event; |
|                 event.data.fd = socks_[iPeer]; |
|                 int op = 0; |
|                 if ( !peer->p_sending && !peer->p_receiving ) |
|                 { |
|                     op = EPOLL_CTL_DEL; |
|                     event.events = 0; |
|                 } |
|                 else if ( peer->p_sending ) |
|                 { |
|                     op = EPOLL_CTL_MOD; |
|                     event.events = EPOLLOUT | EPOLLET; |
|                 } |
|                 else if ( peer->p_receiving ) |
|                 { |
|                     op = EPOLL_CTL_MOD; |
|                     event.events = EPOLLIN | EPOLLET; |
|                 } |
|                 if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD ) |
|                 { |
|                     EpollCtl( epollFD_, op, fd, &event ); |
|                 } |
|             } |
|         } |
|     } |
| |
|     MonStats->BarrierWaitDecr( ); |
|     inBarrier_ = false; |
| |
|     barrierCount_++; |
| |
|     TRACE_EXIT; |
|     return err; |
| } |
| |
| // When we get a communication error for a point-to-point monitor communicator |
| // verify that the other nodes in the cluster also lost communications |
| // with that monitor. If all nodes concur we consider that monitor |
| // down. |
| void CCluster::ValidateClusterState( cluster_state_def_t nodestate[], |
| bool haveDivergence) |
| { |
| const char method_name[] = "CCluster::ValidateClusterState"; |
| |
| exitedMons_t::iterator it; |
| upNodes_t nodeMask; |
| |
| for ( it = exitedMons_.begin(); it != exitedMons_.end(); ) |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d checking exited pnid=%d, detecting pnid=%d, seqNum=%lld" |
| " (current seqNum_=%lld)\n", method_name, __LINE__, |
| it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_); |
| } |
| |
| if ( seqNum_ >= (it->seqNum + 2) ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), "[%s] Validating exited node %d, " |
| "detected by node %d at seq #%lld " |
| "(current seq # is %lld).\n", |
| method_name, it->exitedPnid, it->detectingPnid, |
| it->seqNum, seqNum_); |
| mon_log_write(MON_CLUSTER_VALIDATE_STATE_1, SQ_LOG_ERR, buf); |
| |
| int concurringNodes = 0; |
| |
| // Check if all active nodes see the node as down. |
| nodeMask.upNodes[it->exitedPnid/64] = 1ull << it->exitedPnid; |
| string setSeesUp; |
| string setSeesDown; |
| char nodeX[10]; |
| |
| for (int pnid = 0; pnid < cfgPNodes_; ++pnid) |
| { |
| if ( nodestate[pnid].seq_num != 0 ) |
| { // There is valid nodestate info from node "pnid" |
| if ((nodestate[pnid].nodeMask.upNodes[pnid/64] & |
| nodeMask.upNodes[pnid/64]) == 0) |
| { // Node "pnid" sees the node as down |
| |
| // temp trace |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d node %d concurs that node %d " |
| "is down\n", method_name, __LINE__, |
| pnid, it->exitedPnid); |
| } |
| |
| snprintf(nodeX, sizeof(nodeX), "%d, ", pnid); |
| setSeesDown.append(nodeX); |
| |
| ++concurringNodes; |
| } |
| else |
| { |
| // temp trace |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d node %d says node %d is up\n", |
| method_name, __LINE__, pnid, |
| it->exitedPnid); |
| } |
| |
| snprintf(nodeX, sizeof(nodeX), "%d, ", pnid); |
| setSeesUp.append(nodeX); |
| |
| } |
| } |
| else |
| { |
| // temp trace |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d ignoring state from node %d\n", |
| method_name, __LINE__, pnid); |
| } |
| } |
| } |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d concurringNodes=%d, CurNodes=%d\n", |
| method_name, __LINE__, concurringNodes, CurNodes); |
| } |
| |
| if (concurringNodes == CurNodes) |
| { // General agreement that node is down, proceed to mark it down |
| |
| CNode *downNode = Nodes->GetNode( it->exitedPnid ); |
| if (downNode && downNode->GetState() != State_Down) |
| { |
| // temp trace |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d proceeding to mark node %d down\n", |
| method_name, __LINE__, it->exitedPnid); |
| } |
| |
| mem_log_write(CMonLog::MON_UPDATE_CLUSTER_3, it->exitedPnid); |
| |
| HandleDownNode(it->exitedPnid); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d Node pnid=%d is already down\n" |
| , method_name, __LINE__ |
| , it->exitedPnid); |
| } |
| } |
| } |
| else if ( concurringNodes != 0 && !enqueuedDown_ ) |
| { // Some monitors say the node is down, others don't. |
| // This is not supposed to happen. Enqueue request to |
| // bring this node down. All monitors will do the same |
| // so the cluster will be brought down. |
| if (setSeesUp.length() > 2) |
| setSeesUp.erase(setSeesUp.length()-2, 2); |
| if (setSeesDown.length() > 2) |
| setSeesDown.erase(setSeesDown.length()-2, 2); |
| char buf[MON_STRING_BUF_SIZE*2]; |
| snprintf( buf, sizeof(buf), "[%s] Lost connection to node " |
| "%d but only %d of %d nodes also lost the " |
| "connection. See up: %s. See down: %s. So node " |
| "%d is going down (at seq #%lld).\n", method_name, |
| it->exitedPnid, concurringNodes, CurNodes, |
| setSeesUp.c_str(), setSeesDown.c_str(), |
| MyPNID, seqNum_ ); |
| mon_log_write(MON_CLUSTER_VALIDATE_STATE_2, SQ_LOG_ERR, buf); |
| |
| mem_log_write(CMonLog::MON_UPDATE_CLUSTER_4, MyPNID, |
| it->exitedPnid); |
| |
| enqueuedDown_ = true; |
| ReqQueue.enqueueDownReq(MyPNID); |
| } |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d removing exited pnid=%d, detecting pnid=%d, seqNum=%lld" |
| " (current seqNum_=%lld)\n", method_name, __LINE__, |
| it->exitedPnid, it->detectingPnid, it->seqNum, seqNum_); |
| } |
| // Delete current list element and advance to next one |
| it = exitedMons_.erase( it ); |
| } |
| else |
| { // Advance to next list element |
| ++it; |
| } |
| } |
| |
| |
| if ( haveDivergence ) |
| { |
| for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s] Cluster view divergence (at seq #%lld), " |
| "node %d sees set[%d]: %llx\n" |
| , method_name, seqNum_, MyPNID, i |
| , upNodes_.upNodes[i] ); |
| mon_log_write(MON_CLUSTER_VALIDATE_STATE_3, SQ_LOG_ERR, buf); |
| } |
| |
| // For each "up node" (from local perspective) |
| // go through nodestate for each other node. If any node |
| // says the node is down, add an item to the exitedMons_ list |
| // for examination during the next sync cycle (by which time |
| // all nodes will have had a chance to detect the down monitor.) |
| |
| for (int remotePNid = 0; remotePNid < cfgPNodes_; ++remotePNid) |
| { |
| bool someExited = false; |
| // No need to check local monitor's view of the cluster since |
| // any down connections are handled directly when detected. |
| if (remotePNid == MyPNID) continue; |
| |
| // No need to check a remote monitor's view when node is down |
| CNode *remoteNode = Nodes->GetNode( remotePNid ); |
| if (remoteNode->GetState() == State_Down) |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d Skipping down node " |
| "pnid=%d (%s)\n", |
| method_name, __LINE__, |
| remotePNid, remoteNode->GetName()); |
| } |
| continue; |
| } |
| |
| // Check if all active nodes see the node as up. |
| nodeMask.upNodes[remotePNid/64] = 1ull << remotePNid; |
| |
| if ( upNodes_.upNodes[remotePNid/64] & nodeMask.upNodes[remotePNid/64] ) |
| { // This remote node sees node pnid as up |
| for (int exitedPNid = 0; exitedPNid < cfgPNodes_; ++exitedPNid) |
| { |
| if ( (remotePNid != exitedPNid) && |
| (nodestate[remotePNid].seq_num != 0) && |
| (nodestate[exitedPNid].nodeMask.upNodes[remotePNid/64] & |
| nodeMask.upNodes[remotePNid/64]) == 0 ) |
| { // Node remotePNid sees exitedPNid as down |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d Divergence, queueing " |
| "monExited{%d, %d, %lld}\n", |
| method_name, __LINE__, exitedPNid, remotePNid, |
| seqNum_); |
| } |
| |
| someExited = true; |
| monExited_t monExited = {exitedPNid, remotePNid, seqNum_}; |
| exitedMons_.push_back( monExited ); |
| } |
| } |
| } |
| if (someExited) |
| { |
| // No need to look further for any other |
| // monitor's view of node pnid. When the |
| // exitedMons_ element is processed all nodes |
| // will be checked for concurrence. |
| break; |
| } |
| } |
| } |
| } |
| |
| bool CCluster::ValidateSeqNum( cluster_state_def_t nodestate[] ) |
| { |
| const char method_name[] = "CCluster::ValidateSeqNum"; |
| |
| unsigned long long seqNum; |
| unsigned long long seqNumBucket[256]; |
| int seqNumCount[256]; |
| int maxBucket = 0; |
| bool found; |
| int mostCountsIndex; |
| |
| if ( cfgPNodes_ == 1 ) return true; |
| |
| // Count occurrences of sequence numbers from other nodes |
| for (int pnid = 0; pnid < cfgPNodes_; pnid++) |
| { |
| seqNum = nodestate[pnid].seq_num; |
| |
| if (seqNum != 0) |
| { |
| found = false; |
| for (int i=0; i<maxBucket; ++i) |
| { |
| if ( seqNum == seqNumBucket[i] ) |
| { |
| ++seqNumCount[i]; |
| found = true; |
| break; |
| } |
| } |
| if ( ! found ) |
| { |
| seqNumBucket[maxBucket] = seqNum; |
| seqNumCount[maxBucket] = 1; |
| ++maxBucket; |
| } |
| } |
| } |
| |
| if ( maxBucket == 0 ) |
| { // Normal case, all nodes have same sequence number |
| mostCountsIndex = 0; |
| } |
| else |
| { // Look for majority sequence number |
| int mostCounts = 0; |
| mostCountsIndex = 0; |
| for (int i=0; i<maxBucket; ++i) |
| { |
| if ( seqNumCount[i] > mostCounts ) |
| { |
| mostCounts = seqNumCount[i]; |
| mostCountsIndex = i; |
| } |
| } |
| } |
| |
| if (trace_settings & TRACE_SYNC) |
| { |
| if ( seqNum_ != seqNumBucket[mostCountsIndex] ) |
| { |
| trace_printf("%s@%d Most common seq num=%lld (%d nodes), %d buckets" |
| ", local seq num (%lld) did not match.\n", |
| method_name, __LINE__, seqNumBucket[mostCountsIndex], |
| seqNumCount[mostCountsIndex], maxBucket, seqNum_); |
| } |
| } |
| |
| // Fail if my seqnum does not match majority |
| return seqNum_ == seqNumBucket[mostCountsIndex]; |
| } |
| |
| void CCluster::HandleDownNode( int pnid ) |
| { |
| const char method_name[] = "CCluster::HandleDownNode"; |
| TRACE_ENTRY; |
| |
| // Add to dead node name list |
| CNode *downNode = Nodes->GetNode( pnid ); |
| assert(downNode); |
| deadNodeList_.push_back( downNode ); |
| |
| if (trace_settings & TRACE_INIT) |
| trace_printf("%s@%d - Added down node to list, pnid=%d, name=(%s)\n", method_name, __LINE__, downNode->GetPNid(), downNode->GetName()); |
| |
| // assign new TmLeader if TMLeader node is dead. |
| AssignTmLeader(pnid); |
| |
| // Build available list of spare nodes |
| CNode *spareNode; |
| NodesList *spareNodesList = Nodes->GetSpareNodesList(); |
| NodesList::iterator itSn; |
| for ( itSn = spareNodesList->begin(); itSn != spareNodesList->end() ; itSn++ ) |
| { |
| spareNode = *itSn; |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - %s (pnid=%d) is in available spare node list, state=%s, spare=%d, rank failure=%d\n" |
| , method_name, __LINE__, spareNode->GetName(), spareNode->GetPNid() |
| , StateString(spareNode->GetState()), spareNode->IsSpareNode(), spareNode->IsRankFailure()); |
| // if spare node is available |
| if ( spareNode->IsSpareNode() && |
| !spareNode->IsRankFailure() && |
| spareNode->GetState() == State_Up ) |
| { |
| spareNodeVector_.push_back( spareNode ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - pnid=%d, name=(%s) is available Spare\n", method_name, __LINE__, spareNode->GetPNid(), spareNode->GetName()); |
| } |
| } |
| |
| // Activate spare or down node |
| NodesList::iterator itDn; |
| for ( itDn = deadNodeList_.begin(); itDn != deadNodeList_.end() ; itDn++ ) |
| { |
| downNode = *itDn; |
| if ( Emulate_Down ) |
| { |
| ReqQueue.enqueueDownReq( downNode->GetPNid() ); |
| } |
| else |
| { |
| bool done = false; |
| spareNode = NULL; |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - spare node vector size=%ld\n" |
| , method_name, __LINE__, spareNodeVector_.size()); |
| // Find available spare node for current down node |
| for ( unsigned int ii = 0; ii < spareNodeVector_.size() && !done ; ii++ ) |
| { |
| PNidVector sparePNids = spareNodeVector_[ii]->GetSparePNids(); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - spare pnids vector size=%ld\n" |
| , method_name, __LINE__, sparePNids.size()); |
| // Check each pnid it is configured to spare |
| for ( unsigned int jj = 0; jj < sparePNids.size(); jj++ ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d - %s (pnid=%d) is in spare node vector[%d], size=%ld\n" |
| , method_name, __LINE__ |
| , spareNodeVector_[ii]->GetName() |
| , spareNodeVector_[ii]->GetPNid() |
| , jj, sparePNids.size()); |
| // if this is a spare for the down node |
| if ( spareNodeVector_[ii]->IsSpareNode() && |
| downNode->GetPNid() == sparePNids[jj] ) |
| { |
| // assign it and remove it from the vector |
| spareNode = spareNodeVector_[ii]; |
| spareNodeVector_.erase( spareNodeVector_.begin() + ii ); |
| done = true; |
| break; |
| } |
| } |
| } |
| |
| if ( spareNode ) |
| { |
| Nodes->RemoveFromSpareNodesList( spareNode ); |
| downNode->SetState( State_Takeover ); // change state so that pending requests could fail. |
| spareNode->SetActivatingSpare( true ); |
| if ( spareNode->GetPNid() == MyPNID ) |
| { |
| ReqQueue.enqueueActivateSpareReq( spareNode, downNode, true ); |
| } |
| } |
| else |
| { |
| if ( downNode->IsSpareNode() ) |
| { |
| Nodes->RemoveFromSpareNodesList( downNode ); |
| } |
| ReqQueue.enqueueDownReq( downNode->GetPNid() ); |
| } |
| } |
| } |
| |
| spareNodeVector_.clear(); |
| deadNodeList_.clear(); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::UpdateClusterState( bool &doShutdown, |
| struct sync_buffer_def * syncBuf, |
| MPI_Status *status, |
| int sentChangeNid) |
| { |
| const char method_name[] = "CCluster::UpdateClusterState"; |
| TRACE_ENTRY; |
| |
| struct sync_buffer_def *recvBuf; |
| STATE node_state; |
| int change_nid; |
| cluster_state_def_t nodestate[cfgPNodes_]; |
| bool clusterViewDivergence = false; |
| |
| |
| // Populate nodestate array using node state info from "allgather" |
| // along with local node state. |
| for (int pnid = 0; pnid < cfgPNodes_; pnid++) |
| { |
| // Only process active nodes |
| bool noComm; |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| noComm = (comms_[pnid] == MPI_COMM_NULL) ? true : false; |
| break; |
| case CommType_Sockets: |
| noComm = (socks_[pnid] == -1) ? true : false; |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| if (noComm |
| || status[pnid].MPI_ERROR != MPI_SUCCESS) |
| { |
| if (trace_settings & (TRACE_RECOVERY | TRACE_INIT)) |
| { |
| if (!noComm) |
| { |
| trace_printf( "%s@%d - Communication error from node %d:\n" |
| " node_state=%d\n" |
| " change_nid=%d\n" |
| " seq_num=#%lld\n" |
| , method_name, __LINE__, pnid |
| , recvBuf->nodeInfo.node_state |
| , recvBuf->nodeInfo.change_nid |
| , seqNum_ ); |
| } |
| } |
| // Not an active node, set default values |
| nodestate[pnid].node_state = State_Unknown; |
| nodestate[pnid].change_nid = -1; |
| nodestate[pnid].seq_num = 0; |
| for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
| { |
| nodestate[pnid].nodeMask.upNodes[i] = 0; |
| } |
| |
| continue; |
| } |
| |
| recvBuf = (struct sync_buffer_def *) |
| (((char *) syncBuf) + pnid * CommBufSize); |
| |
| if (trace_settings & TRACE_SYNC_DETAIL) |
| { |
| int nr; |
| MPI_Get_count(&status[pnid], MPI_CHAR, &nr); |
| trace_printf("%s@%d - Received %d bytes from node %d, " |
| "message count=%d\n", |
| method_name, __LINE__, nr, pnid, |
| recvBuf->msgInfo.msg_count); |
| } |
| |
| nodestate[pnid].node_state = recvBuf->nodeInfo.node_state; |
| nodestate[pnid].change_nid = recvBuf->nodeInfo.change_nid; |
| nodestate[pnid].seq_num = recvBuf->nodeInfo.seq_num; |
| nodestate[pnid].nodeMask = recvBuf->nodeInfo.nodeMask; |
| |
| for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
| { |
| if ( nodestate[pnid].nodeMask.upNodes[i] != upNodes_.upNodes[i] ) |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| for ( int j =0; j < MAX_NODE_MASKS ; j++ ) |
| { |
| trace_printf( "%s@%d - Divergence (at seq #%lld), node %s " |
| "(pnid=%d) sees cluster state[%d] %llx, local " |
| "monitor sees %llx\n" |
| , method_name, __LINE__ |
| , seqNum_ |
| , Node[pnid]->GetName() |
| , pnid |
| , j |
| , nodestate[pnid].nodeMask.upNodes[j] |
| , upNodes_.upNodes[j] ); |
| } |
| } |
| clusterViewDivergence = true; |
| } |
| } |
| |
| if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_TMSYNC)) |
| { |
| trace_printf( "%s@%d - Node %s (pnid=%d) TmSyncState=(%d)(%s)\n" |
| , method_name, __LINE__ |
| , Node[pnid]->GetName() |
| , pnid |
| , recvBuf->nodeInfo.tmSyncState |
| , SyncStateString( recvBuf->nodeInfo.tmSyncState )); |
| } |
| |
| if ( Node[pnid]->GetTmSyncState() != recvBuf->nodeInfo.tmSyncState ) |
| { |
| Node[pnid]->SetTmSyncState(recvBuf->nodeInfo.tmSyncState); |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| { |
| trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated" |
| " (%d)(%s)\n", method_name, __LINE__, |
| Node[pnid]->GetName(), pnid, |
| recvBuf->nodeInfo.tmSyncState, |
| SyncStateString( recvBuf->nodeInfo.tmSyncState )); |
| } |
| } |
| |
| // Check if we need to increase my node's shutdown level ... |
| // all nodes should be at the highest level selected from any source |
| if ( MyNode->GetShutdownLevel() < recvBuf->nodeInfo.sdLevel ) |
| { |
| MyNode->SetShutdownLevel( recvBuf->nodeInfo.sdLevel ); |
| if (MyNode->GetState() == State_Up) |
| { |
| MyNode->SetState( State_Shutdown ); |
| } |
| if (trace_settings & (TRACE_REQUEST | TRACE_SYNC)) |
| trace_printf("%s@%d - Node %s Shutdown Level updated (%d)\n", |
| method_name, __LINE__, |
| Node[pnid]->GetName(), recvBuf->nodeInfo.sdLevel); |
| } |
| |
| Node[pnid]->SetInternalState( recvBuf->nodeInfo.internalState ); |
| if ( recvBuf->nodeInfo.internalState == State_Ready_To_Exit ) |
| { // The node is exiting. Don't communicate with it any more. |
| if (trace_settings & (TRACE_REQUEST | TRACE_SYNC)) |
| trace_printf("%s@%d - Node %s (%d) ready to exit, setting comm " |
| "to null\n", method_name, __LINE__, |
| Node[pnid]->GetName(), pnid); |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| MPI_Comm_free( &comms_[pnid] ); |
| break; |
| case CommType_Sockets: |
| shutdown( socks_[pnid], SHUT_RDWR ); |
| close( socks_[pnid] ); |
| socks_[pnid] = -1; |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| Node[pnid]->SetState( State_Down ); |
| --CurNodes; |
| // Clear bit in set of "up nodes" |
| upNodes_.upNodes[pnid/64] &= ~(1ull << pnid); |
| } |
| } |
| |
| if ( checkSeqNum_ && !ValidateSeqNum( nodestate ) && !enqueuedDown_ ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s] Sync cycle sequence number (%lld) " |
| "incorrect. Scheduling node down.\n", method_name, seqNum_); |
| mon_log_write(MON_CLUSTER_UPDTCLUSTERSTATE_1, SQ_LOG_ERR, buf); |
| |
| mem_log_write(CMonLog::MON_UPDATE_CLUSTER_2, MyPNID); |
| |
| enqueuedDown_ = true; |
| |
| ReqQueue.enqueueDownReq(MyPNID); |
| } |
| |
| nodestate[MyPNID].node_state = Node[MyPNID]->GetState(); |
| nodestate[MyPNID].change_nid = sentChangeNid; |
| nodestate[MyPNID].seq_num = seqNum_; |
| nodestate[MyPNID].nodeMask = upNodes_; |
| |
| // Examine status returned from MPI receive requests |
| for (int pnid = 0; pnid < cfgPNodes_; pnid++) |
| { |
| bool noComm; |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| noComm = (comms_[pnid] == MPI_COMM_NULL) ? true : false; |
| break; |
| case CommType_Sockets: |
| noComm = (socks_[pnid] == -1) ? true : false; |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| if (noComm) continue; |
| |
| if (status[pnid].MPI_ERROR != MPI_SUCCESS) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s] MPI communications error=%d " |
| "(%s) for node %d (at seq #%lld).\n", method_name, |
| status[pnid].MPI_ERROR, ErrorMsg(status[pnid].MPI_ERROR), |
| pnid, seqNum_); |
| mon_log_write(MON_CLUSTER_UPDTCLUSTERSTATE_2, SQ_LOG_ERR, buf); |
| |
| if ( status[pnid].MPI_ERROR == MPI_ERR_EXITED ) |
| { // A monitor has gone away |
| |
| mem_log_write(CMonLog::MON_UPDATE_CLUSTER_1, pnid); |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| MPI_Comm_free( &comms_[pnid] ); |
| break; |
| case CommType_Sockets: |
| shutdown( socks_[pnid], SHUT_RDWR ); |
| close( socks_[pnid] ); |
| socks_[pnid] = -1; |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| --CurNodes; |
| |
| // Clear bit in set of "up nodes" |
| upNodes_.upNodes[pnid/64] &= ~(1ull << pnid); |
| |
| // Pretend node is still up until down node processing |
| // completes. |
| nodestate[pnid].node_state = State_Unknown; |
| nodestate[pnid].change_nid = -1; |
| nodestate[pnid].seq_num = 0; |
| for ( int i =0; i < MAX_NODE_MASKS ; i++ ) |
| { |
| nodestate[pnid].nodeMask.upNodes[i] = 0; |
| } |
| |
| if ( validateNodeDown_ ) |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_RECOVERY | TRACE_INIT)) |
| { |
| trace_printf("%s@%d Divergence, queueing " |
| "monExited{%d, %d, %lld}\n", |
| method_name, __LINE__, pnid, MyPNID, |
| seqNum_); |
| } |
| // Save info for the exited monitor so can confirm |
| // that all monitors have the same view. |
| monExited_t monExited = {pnid, MyPNID, seqNum_}; |
| exitedMons_.push_back( monExited ); |
| } |
| else |
| { |
| HandleDownNode(pnid); |
| } |
| } |
| } |
| } |
| |
| if ( validateNodeDown_ ) |
| ValidateClusterState( nodestate, clusterViewDivergence ); |
| |
| if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_TMSYNC)) |
| { |
| trace_printf( "%s@%d - Node %s (pnid=%d) TmSyncState=(%d)(%s)\n" |
| , method_name, __LINE__ |
| , MyNode->GetName() |
| , MyPNID |
| , tmSyncBuffer_->nodeInfo.tmSyncState |
| , SyncStateString( tmSyncBuffer_->nodeInfo.tmSyncState )); |
| } |
| |
| // Update our node states |
| for (int pnid = 0; pnid < cfgPNodes_; pnid++) |
| { |
| node_state = (STATE)nodestate[pnid].node_state; |
| change_nid = nodestate[pnid].change_nid; |
| |
| if ( pnid == MyPNID && MyNode->GetState() == State_Merged |
| && seqNum_ == 0) |
| { // Initial "allgather" for this re-integrated monitor. |
| |
| seqNum_ = EnsureAndGetSeqNum(nodestate); |
| |
| if (trace_settings & TRACE_SYNC_DETAIL) |
| { |
| trace_printf("%s@%d Completed initial allgather for pnid=%d, " |
| "state=%d(%s), seqNum_=%lld\n", method_name, __LINE__, |
| pnid, MyNode->GetState(), |
| StateString(MyNode->GetState()), seqNum_ ); |
| } |
| |
| // Queue the node up request for processing by a |
| // worker thread. |
| ReqQueue.enqueueUpReq( MyPNID, NULL, -1 ); |
| } |
| |
| if ( change_nid == MyPNID ) |
| { |
| if( MyNode->GetState() == State_Down || |
| MyNode->GetState() == State_Merged || |
| MyNode->GetState() == State_Joining ) |
| { |
| if (trace_settings & TRACE_RECOVERY) |
| trace_printf( "%s@%d enqueueing node up, state=%s\n", |
| method_name, __LINE__, |
| StateString(MyNode->GetState()) ); |
| |
| // Queue the node up request for processing by a |
| // worker thread. |
| ReqQueue.enqueueUpReq( MyPNID, NULL, -1 ); |
| } |
| else |
| { // This node is being "downed" |
| |
| if (trace_settings & TRACE_RECOVERY) |
| trace_printf( "%s@%d enqueueing node down, state=%s\n", |
| method_name, __LINE__, |
| StateString(MyNode->GetState()) ); |
| |
| // Queue the node down request for processing by a |
| // worker thread. |
| ReqQueue.enqueueDownReq( MyPNID ); |
| } |
| } |
| else |
| { |
| // In a real cluster, existing monitors need to merge new |
| // monitor. |
| |
| CNode *pnode = change_nid != -1 ? Nodes->GetNode( change_nid ) : NULL; |
| if ( ! Emulate_Down && change_nid != -1 && pnode ) |
| { |
| switch ( pnode->GetState() ) |
| { |
| case State_Down: |
| if (trace_settings & TRACE_RECOVERY) |
| trace_printf( "%s@%d - change_nid=%d, state=%s, " |
| "queueing up request\n", |
| method_name, __LINE__ , change_nid, |
| StateString(pnode->GetState())); |
| |
| mem_log_write(CMonLog::MON_UPDATE_CLUSTER_5, change_nid); |
| |
| // Queue the node up request for processing by a |
| // worker thread. |
| ReqQueue.enqueueUpReq( change_nid, |
| (char *)pnode->GetName(), |
| -1 ); |
| break; |
| case State_Merging: |
| if (trace_settings & TRACE_RECOVERY) |
| trace_printf( "%s@%d - change_nid=%d, state=%s, " |
| "queueing up request\n", |
| method_name, __LINE__ , change_nid, |
| StateString(pnode->GetState())); |
| |
| mem_log_write(CMonLog::MON_UPDATE_CLUSTER_6, change_nid); |
| |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| setNewComm(change_nid); |
| break; |
| case CommType_Sockets: |
| setNewSock(change_nid); |
| break; |
| default: |
| // Programmer bonehead! |
| MPI_Abort(MPI_COMM_SELF,99); |
| } |
| pnode->SetState( State_Merged ); |
| ReqQueue.enqueueUpReq( change_nid, |
| (char *)pnode->GetName(), |
| -1 ); |
| break; |
| |
| case State_Merged: |
| case State_Joining: |
| default: |
| if (trace_settings & TRACE_RECOVERY) |
| trace_printf( "%s@%d - change_nid=%d, state=%s, " |
| "no action required.\n", |
| method_name, __LINE__ , change_nid, |
| StateString( pnode->GetState() )); |
| break; |
| } |
| } |
| } |
| switch ( node_state ) |
| { |
| case State_Up: |
| case State_Joining: |
| case State_Merged: |
| case State_Merging: |
| case State_Initializing: |
| case State_Unlinked: |
| case State_Unknown: |
| break; |
| case State_Down: |
| doShutdown = true; |
| break; |
| case State_Stopped: |
| case State_Shutdown: |
| if (trace_settings & TRACE_SYNC_DETAIL) |
| trace_printf("%s@%d - Node %d is stopping.\n", method_name, __LINE__, pnid); |
| Node[pnid]->SetState( (STATE) node_state ); |
| doShutdown = true; |
| break; |
| default: |
| if (trace_settings & TRACE_SYNC) |
| trace_printf("%s@%d - Node %d in unknown state (%d).\n", |
| method_name, __LINE__, pnid, node_state); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| bool CCluster::ProcessClusterData( struct sync_buffer_def * syncBuf, |
| struct sync_buffer_def * sendBuf, |
| bool deferredTmSync ) |
| { |
| const char method_name[] = "CCluster::ProcessClusterData"; |
| TRACE_ENTRY; |
| |
| // Using the data returned from Allgather, process replication data |
| // from all nodes. If there are any TmSync messages from other |
| // nodes, defer processing until all other replicated data are |
| // processed. |
| struct internal_msg_def *msg; |
| struct sync_buffer_def *msgBuf; |
| bool haveDeferredTmSync = false; |
| |
| for (int pnid = 0; pnid < cfgPNodes_; pnid++) |
| { |
| bool noComm; |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| noComm = (comms_[pnid] == MPI_COMM_NULL) ? true : false; |
| break; |
| case CommType_Sockets: |
| noComm = (socks_[pnid] == -1) ? true : false; |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| // Only process active nodes |
| if (noComm && pnid != MyPNID) continue; |
| |
| if ( pnid == MyPNID ) |
| { // Get pointer to message sent by this node |
| msgBuf = sendBuf; |
| } |
| else |
| { // Compute pointer to receive buffer element for node "i" |
| msgBuf = (struct sync_buffer_def *) |
| (((char *) syncBuf) + pnid * CommBufSize); |
| } |
| |
| // reset msg length to zero to initialize for PopMsg() |
| msgBuf->msgInfo.msg_offset = 0; |
| |
| if ( msgBuf->msgInfo.msg_count == 1 |
| && (( internal_msg_def *)msgBuf->msg)->type == InternalType_Sync ) |
| { |
| if ( deferredTmSync ) |
| { // This node has sent a TmSync message. Process it now. |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Handling deferred TmSync message for " |
| "node %d\n", method_name, __LINE__, pnid); |
| |
| struct internal_msg_def *msg; |
| msg = Nodes->PopMsg( msgBuf ); |
| |
| if ( pnid == MyPNID ) |
| HandleMyNodeMsg (msg, MyPNID); |
| else |
| HandleOtherNodeMsg (msg, pnid); |
| } |
| else |
| { |
| // This node has sent a TmSync message. Defer processing |
| // until we handle all of the non-TmSync messages from |
| // other nodes. |
| haveDeferredTmSync = true; |
| |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| trace_printf("%s@%d - Deferring TmSync processing for node" |
| " %d until replicated data is handled\n", |
| method_name, __LINE__, pnid); |
| } |
| } |
| else if ( !deferredTmSync ) |
| { |
| // temp trace |
| if (trace_settings & TRACE_SYNC_DETAIL) |
| { |
| trace_printf("%s@%d - For node %d, msg_count=%d, msg_offset" |
| "=%d\n", method_name, __LINE__, pnid, |
| msgBuf->msgInfo.msg_count, |
| msgBuf->msgInfo.msg_offset); |
| } |
| do |
| { |
| // Get the next sync msg for the node |
| msg = Nodes->PopMsg( msgBuf ); |
| if (msg->type == InternalType_Null) break; |
| |
| if ( pnid == MyPNID ) |
| HandleMyNodeMsg (msg, MyPNID); |
| else |
| HandleOtherNodeMsg (msg, pnid); |
| } |
| while ( true ); |
| } |
| } |
| |
| TRACE_EXIT; |
| |
| return haveDeferredTmSync; |
| } |
| |
| bool CCluster::checkIfDone ( ) |
| { |
| const char method_name[] = "CCluster::checkIfDone"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & TRACE_SYNC_DETAIL) |
| trace_printf("%s@%d - Node %d shutdown level=%d, state=%s. Process " |
| "count=%d, internal state=%d, CurNodes=%d, " |
| "local process count=%d\n", |
| method_name, __LINE__, MyNode->GetPNid(), |
| MyNode->GetShutdownLevel(), |
| StateString(MyNode->GetState()), |
| Nodes->ProcessCount(), |
| MyNode->getInternalState(), |
| CurNodes, MyNode->GetNumProcs()); |
| |
| // Check if we are also done |
| if (( MyNode->GetState() != State_Down ) && |
| ( MyNode->GetState() != State_Stopped ) ) |
| { |
| if ( MyNode->GetShutdownLevel() != ShutdownLevel_Undefined ) |
| { |
| if ( Nodes->ProcessCount() == 0 ) // all WDTs exited |
| { |
| if (trace_settings & TRACE_SYNC) |
| trace_printf("%s@%d - Monitor signaled to exit.\n", method_name, __LINE__); |
| MyNode->SetState( State_Stopped ); |
| MyNode->SetInternalState(State_Ready_To_Exit); |
| |
| // we need to sync one more time so other nodes see our state |
| return false; |
| } |
| else if ( (Nodes->ProcessCount() <= (CurNodes*2)) // only WDTs and |
| // SMSs are alive |
| && !MyNode->isInQuiesceState() // post-quiescing will |
| // expire WDT |
| && !waitForWatchdogExit_ ) // watchdog not yet |
| // exiting |
| { |
| if (trace_settings & TRACE_SYNC) |
| trace_printf("%s@%d - Stopping watchdog process.\n", |
| method_name, __LINE__); |
| |
| waitForWatchdogExit_ = true; |
| // stop the watchdog timer first |
| HealthCheck.setState(MON_STOP_WATCHDOG); |
| // let the watchdog process exit |
| HealthCheck.setState(MON_EXIT_PRIMITIVES); |
| } |
| } |
| } |
| else if ( MyNode->GetShutdownLevel() != ShutdownLevel_Undefined |
| && MyNode->GetState() == State_Down |
| && MyNode->GetNumProcs() == 0) |
| { |
| if (trace_settings & TRACE_SYNC) |
| trace_printf("%s@%d - No processes remaining, monitor exiting.\n", |
| method_name, __LINE__); |
| |
| MyNode->SetState( State_Stopped ); |
| MyNode->SetInternalState(State_Ready_To_Exit); |
| // we need to sync one more time so other nodes see our state |
| return false; |
| } |
| |
| MyNode->CheckShutdownProcessing(); |
| |
| TRACE_EXIT; |
| |
| return ( MyNode->getInternalState() == State_Ready_To_Exit ); |
| } |
| |
| |
| // Gather "Allgather" performance statistics |
| // Given the beginning and ending time of an "Allgather" operation, compute |
| // the elapsed time and increment the count for the appropriate range |
| // bucket. |
| |
| const struct timespec CCluster::agBuckets_[] = { |
| {0, 0}, // lowest |
| {0, 20000}, // 20 us |
| {0, 50000}, // 50 us |
| {0, 500000}, // 500 us |
| {0, 1000000}, // 1 ms |
| {0, 10000000}, // 10 ms |
| {0, 25000000}, // 25 ms |
| {0, 50000000}, // 50 ms |
| {0, 100000000}, // 100 ms |
| {0, 500000000}}; // 500 ms |
| const int CCluster::agBucketsSize_ = sizeof(agBuckets_)/sizeof(timespec); |
| |
| bool CCluster::agTimeStats(struct timespec & ts_begin, |
| struct timespec & ts_end) |
| { |
| const char method_name[] = "CCluster::agTimeStats"; |
| bool slowAg = false; |
| |
| struct timespec timediff; |
| if ( (ts_end.tv_nsec - ts_begin.tv_nsec ) < 0 ) |
| { |
| timediff.tv_sec = ts_end.tv_sec - ts_begin.tv_sec - 1; |
| timediff.tv_nsec = 1000000000 + ts_end.tv_nsec - ts_begin.tv_nsec; |
| } |
| else |
| { |
| timediff.tv_sec = ts_end.tv_sec - ts_begin.tv_sec; |
| timediff.tv_nsec = ts_end.tv_nsec - ts_begin.tv_nsec; |
| } |
| |
| if ( timediff.tv_sec > agMaxElapsed_.tv_sec |
| || (timediff.tv_sec == agMaxElapsed_.tv_sec |
| && timediff.tv_nsec > agMaxElapsed_.tv_nsec )) |
| // Have a new maximum elapsed time |
| agMaxElapsed_ = timediff; |
| |
| if ( timediff.tv_sec < agMinElapsed_.tv_sec |
| || (timediff.tv_sec == agMinElapsed_.tv_sec |
| && timediff.tv_nsec < agMinElapsed_.tv_nsec )) |
| // Have a new minimum time |
| agMinElapsed_ = timediff; |
| |
| for (int i=agBucketsSize_-1; i>=0; --i) |
| { |
| if (timediff.tv_sec > agBuckets_[i].tv_sec |
| || (timediff.tv_sec == agBuckets_[i].tv_sec |
| && timediff.tv_nsec > agBuckets_[i].tv_nsec )) |
| { |
| ++agElapsed_[i]; |
| if (i >= 7) |
| { |
| slowAg = true; |
| if (trace_settings & TRACE_SYNC) |
| { |
| trace_printf("%s@%d slow Allgather=(%ld, %ld) seqNum_=%lld, i=%d\n", |
| method_name, __LINE__, |
| timediff.tv_sec, timediff.tv_nsec, seqNum_, i); |
| } |
| } |
| break; |
| } |
| } |
| |
| return slowAg; |
| } |
| |
| // Display "Allgather" statistics |
| void CCluster::stats() |
| { |
| const char method_name[] = "CCluster::stats"; |
| |
| trace_printf("%s@%d Allgather min elapsed=%ld.%ld\n", method_name, __LINE__, |
| agMinElapsed_.tv_sec, agMinElapsed_.tv_nsec); |
| |
| trace_printf("%s@%d Allgather max elapsed=%ld.%ld\n", method_name, __LINE__, |
| agMaxElapsed_.tv_sec, agMaxElapsed_.tv_nsec); |
| |
| unsigned long int bucket; |
| const char * unit; |
| const char * range; |
| for (int i=0; i<agBucketsSize_; ++i) |
| { |
| if ( i == (agBucketsSize_-1)) |
| { |
| bucket = agBuckets_[i].tv_nsec; |
| range = ">"; |
| } |
| else |
| { |
| bucket = agBuckets_[i+1].tv_nsec; |
| range = "<="; |
| } |
| bucket = bucket/1000; |
| if (bucket < 1000) |
| unit = "usec"; |
| else |
| { |
| bucket = bucket / 1000; |
| if ( bucket < 1000 ) |
| unit = "msec"; |
| else |
| unit = "???"; |
| } |
| trace_printf("%s@%d bucket[%d]=%d (%s %ld %s)\n", |
| method_name, __LINE__, i, agElapsed_[i], |
| range, bucket, unit); |
| } |
| } |
| |
| bool CCluster::exchangeNodeData ( ) |
| { |
| const char method_name[] = "CCluster::exchangeNodeData"; |
| TRACE_ENTRY; |
| |
| // Record statistics (sonar counters) |
| if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED)) |
| MonStats->req_sync_Incr(); |
| |
| ++swpRecCount_; // recursive count for this function |
| |
| bool doShutdown = false; |
| |
| struct internal_msg_def *msg; |
| MPI_Status status[cfgPNodes_]; |
| int err; |
| struct sync_buffer_def *recv_buffer; |
| |
| // if we are here in a second recursive call that occurred while |
| // processing TMSync data, use the second receive buffer |
| // else, use the first one. |
| if (swpRecCount_ == 1) |
| { |
| recv_buffer = recvBuffer_; |
| } |
| else |
| { |
| // should not be here in more than one recursive call. |
| assert(swpRecCount_ == 2); |
| recv_buffer = recvBuffer2_; |
| } |
| |
| // Initialize sync buffer header including node state |
| msg = Nodes->InitSyncBuffer( Nodes->GetSyncBuffer(), seqNum_, upNodes_ ); |
| |
| // Fill sync buffer based on queue of replication requests |
| Replicator.FillSyncBuffer ( msg ); |
| |
| if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_TMSYNC)) |
| trace_printf("%s@%d - doing Allgather size=%d, message count=%d, seqNum_=%lld\n", |
| method_name, __LINE__, Nodes->GetSyncSize(), |
| (Nodes->GetSyncBuffer())->msgInfo.msg_count, seqNum_); |
| |
| struct timespec ts_ag_begin; |
| clock_gettime(CLOCK_REALTIME, &ts_ag_begin); |
| |
| |
| // Exchange info with other nodes |
| err = Allgather(Nodes->GetSyncSize(), Nodes->GetSyncBuffer(), (char *)recv_buffer, |
| 0 /*seqNum_*/, status ); |
| |
| struct timespec ts_ag_end; |
| clock_gettime(CLOCK_REALTIME, &ts_ag_end); |
| |
| if (err != MPI_SUCCESS && err != MPI_ERR_IN_STATUS) |
| { |
| if (trace_settings & TRACE_SYNC) |
| { |
| trace_printf("%s@%d - unexpected Allgather error=%s (%d)\n", |
| method_name, __LINE__, ErrorMsg(err), err); |
| } |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Unexpected MPI communications " |
| "error=%s (%d).\n", method_name, ErrorMsg(err), err); |
| mon_log_write(MON_CLUSTER_EXCHANGENODEDATA_1, SQ_LOG_ERR, buf); |
| |
| // Allgather() failed in a fundamental way, bring this node down |
| if ( !enqueuedDown_ ) |
| { |
| enqueuedDown_ = true; |
| ReqQueue.enqueueDownReq(MyPNID); |
| } |
| } |
| else |
| { |
| if (agTimeStats( ts_ag_begin, ts_ag_end)) |
| { // Slow cycle, print info |
| if ( trace_settings & TRACE_SYNC ) |
| { |
| trace_printf("%s@%d - slow Allgather info: sync size=%d, message count=%d, MyPNID=%d\n", |
| method_name, __LINE__, Nodes->GetSyncSize(), |
| (Nodes->GetSyncBuffer())->msgInfo.msg_count, MyPNID); |
| struct sync_buffer_def *msgBuf; |
| int nr; |
| |
| for (int pnid = 0; pnid < cfgPNodes_; pnid++) |
| { |
| bool noComm; |
| switch( CommType ) |
| { |
| case CommType_InfiniBand: |
| noComm = (comms_[pnid] == MPI_COMM_NULL) ? true : false; |
| break; |
| case CommType_Sockets: |
| noComm = (socks_[pnid] == -1) ? true : false; |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| // Only process active nodes |
| if (noComm) continue; |
| |
| msgBuf = (struct sync_buffer_def *) |
| (((char *) recv_buffer) + pnid * CommBufSize); |
| |
| MPI_Get_count(&status[pnid], MPI_CHAR, &nr); |
| |
| trace_printf("%s@%d - slow Allgather info, node=%d: received bytes=%d, message count=%d, msg_offset=%d\n", |
| method_name, __LINE__, pnid, nr, |
| msgBuf->msgInfo.msg_count, |
| msgBuf->msgInfo.msg_offset); |
| } |
| } |
| } |
| |
| UpdateClusterState( doShutdown, recv_buffer, status, |
| Nodes->GetSyncBuffer()->nodeInfo.change_nid); |
| } |
| |
| // Increment count of "Allgather" calls. If wrap-around, start again at 1. |
| if ( ++seqNum_ == 0) seqNum_ = 1; |
| |
| if ( ProcessClusterData( recv_buffer, Nodes->GetSyncBuffer(), false ) ) |
| { // There is a TmSync message remaining to be handled |
| ProcessClusterData( recv_buffer, Nodes->GetSyncBuffer(), true ); |
| } |
| |
| // ?? Need the following? Possibly not since maybe all sync cycle |
| // dependent code was removed -- need to check. |
| // Wake up any threads waiting on the completion of a sync cycle |
| syncCycle_.wakeAll(); |
| |
| bool result = false; |
| if (doShutdown) result = checkIfDone( ); |
| |
| --swpRecCount_; |
| |
| TRACE_EXIT; |
| |
| return result; |
| } |
| |
| void CCluster::exchangeTmSyncData ( struct sync_def *sync ) |
| { |
| const char method_name[] = "CCluster::exchangeTmSyncData"; |
| TRACE_ENTRY; |
| |
| ++swpRecCount_; // recursive count for this function |
| |
| bool doShutdown = false; |
| |
| struct internal_msg_def *msg; |
| MPI_Status status[cfgPNodes_]; |
| int err; |
| struct sync_buffer_def *recv_buffer; |
| |
| // if we are here in a second recursive call that occurred while |
| // processing TMSync data, use the second receive buffer |
| // else, use the first one. |
| if (swpRecCount_ == 1) |
| { |
| recv_buffer = recvBuffer_; |
| } |
| else |
| { |
| // should not be here in more than one recursive call. |
| assert(swpRecCount_ == 2); |
| recv_buffer = recvBuffer2_; |
| } |
| |
| // Initialize sync buffer header including node state |
| msg = Nodes->InitSyncBuffer( tmSyncBuffer_, seqNum_, upNodes_ ); |
| |
| // Add tmsync data |
| AddTmsyncMsg( sync, msg ); |
| |
| if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_TMSYNC)) |
| trace_printf("%s@%d - tmSyncBuffer_=%p, msg=%p\n", |
| method_name, __LINE__, tmSyncBuffer_, msg); |
| |
| int syncSize = sizeof(cluster_state_def_t) |
| + sizeof(msgInfo_t) |
| + tmSyncBuffer_->msgInfo.msg_offset; |
| |
| if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_TMSYNC)) |
| trace_printf("%s@%d - doing Allgather size=%d, message count=%d, seqNum_=%lld\n", |
| method_name, __LINE__, syncSize, |
| tmSyncBuffer_->msgInfo.msg_count, seqNum_); |
| |
| // Exchange info with other nodes |
| err = Allgather(syncSize, tmSyncBuffer_, (char *)recv_buffer, 0 /*seqNum_*/, status ); |
| if (err != MPI_SUCCESS && err != MPI_ERR_IN_STATUS) |
| { |
| if (trace_settings & (TRACE_SYNC | TRACE_TMSYNC)) |
| { |
| trace_printf("%s@%d - unexpected Allgather error=%s (%d)\n", |
| method_name, __LINE__, ErrorMsg(err), err); |
| } |
| |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], Unexpected MPI communications " |
| "error=%s (%d).\n", method_name, ErrorMsg(err), err); |
| mon_log_write(MON_CLUSTER_EXCHANGETMSYNC_1, SQ_LOG_ERR, buf); |
| |
| // Allgather() failed in a fundamental way, bring this node down |
| if ( !enqueuedDown_ ) |
| { |
| enqueuedDown_ = true; |
| ReqQueue.enqueueDownReq(MyPNID); |
| } |
| } |
| else |
| { |
| UpdateClusterState( doShutdown, recv_buffer, status, |
| tmSyncBuffer_->nodeInfo.change_nid); |
| } |
| |
| // Increment count of "Allgather" calls. If wrap-around, start again at 1. |
| if ( ++seqNum_ == 0) seqNum_ = 1; |
| |
| if ( ProcessClusterData( recv_buffer, tmSyncBuffer_, false ) ) |
| { // There is a TmSync message remaining to be handled |
| ProcessClusterData( recv_buffer, tmSyncBuffer_, true ); |
| } |
| |
| --swpRecCount_; |
| |
| TRACE_EXIT; |
| } |
| |
| void CCluster::EpollCtl( int efd, int op, int fd, struct epoll_event *event ) |
| { |
| const char method_name[] = "CCluster::EpollCtl"; |
| TRACE_ENTRY; |
| |
| int rc = epoll_ctl( efd, op, fd, event ); |
| if ( rc == -1 ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), "[%s@%d] epoll_ctl() error: %s\n", |
| method_name, __LINE__, strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_EPOLLCTL_1, SQ_LOG_CRIT, buf ); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| |
| TRACE_EXIT; |
| return; |
| } |
| |
| void CCluster::InitClusterSocks( int worldSize, int myRank, char *nodeNames, int *rankToPnid ) |
| { |
| const char method_name[] = "CCluster::InitClusterSocks"; |
| TRACE_ENTRY; |
| |
| int serverSyncPort; |
| CNode *node; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d worldSize=%d, myRank=%d\n" |
| , method_name, __LINE__ |
| , worldSize, myRank); |
| } |
| |
| // Exchange ports with collective |
| serverSyncPort = MyNode->GetSyncSocketPort(); |
| int rc = MPI_Allgather( &serverSyncPort, 1, MPI_INT, |
| sockPorts_, 1, MPI_INT, MPI_COMM_WORLD ); |
| if ( rc != MPI_SUCCESS ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), "[%s@%d] MPI_Allgather error=%s\n", |
| method_name, __LINE__, ErrorMsg( rc ) ); |
| mon_log_write( MON_CLUSTER_INITCLUSTERSOCKS_3, SQ_LOG_CRIT, buf ); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| |
| char *n, nodeName[MPI_MAX_PROCESSOR_NAME]; |
| unsigned char srcaddr[4], dstaddr[4]; |
| struct hostent *he; |
| if ( nodeNames ) |
| { |
| n = &nodeNames[myRank*MPI_MAX_PROCESSOR_NAME]; |
| } |
| else |
| { |
| strcpy( nodeName, "localhost" ); |
| n = nodeName; |
| } |
| he = gethostbyname( n ); |
| if ( !he ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n", |
| method_name, __LINE__, n, strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_INITCLUSTERSOCKS_4, SQ_LOG_CRIT, buf ); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| memcpy( srcaddr, he->h_addr, 4 ); |
| int idst; |
| for ( int i = 0; i < worldSize; i++ ) |
| { |
| for ( int j = i+1; j < worldSize; j++ ) |
| { |
| if ( i == myRank ) |
| { |
| idst = j; |
| if ( nodeNames ) |
| { |
| n = &nodeNames[j*MPI_MAX_PROCESSOR_NAME]; |
| he = gethostbyname( n ); |
| if ( !he ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), |
| "[%s@%d] gethostbyname(%s) error: %s\n", |
| method_name, __LINE__, n, |
| strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_INITCLUSTERSOCKS_5, SQ_LOG_CRIT, buf ); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| memcpy( dstaddr, he->h_addr, 4 ); |
| node = Nodes->GetNode( n ); |
| if ( node ) |
| { |
| node->SetSyncSocketPort(sockPorts_[j]); |
| } |
| } |
| else |
| { |
| node = NULL; |
| memcpy( dstaddr, srcaddr, 4 ); |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Creating client socket: src=%d.%d.%d.%d, dst(%s)=%d.%d.%d.%d, dst port=%d\n" |
| , method_name, __LINE__ |
| , (int)((unsigned char *)srcaddr)[0] |
| , (int)((unsigned char *)srcaddr)[1] |
| , (int)((unsigned char *)srcaddr)[2] |
| , (int)((unsigned char *)srcaddr)[3] |
| , n |
| , (int)((unsigned char *)dstaddr)[0] |
| , (int)((unsigned char *)dstaddr)[1] |
| , (int)((unsigned char *)dstaddr)[2] |
| , (int)((unsigned char *)dstaddr)[3] |
| , sockPorts_[j] ); |
| } |
| |
| socks_[rankToPnid[j]] = MkCltSock( srcaddr, dstaddr, sockPorts_[j] ); |
| } |
| else if ( j == myRank ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Accepting server socket: src=%d.%d.%d.%d, port=%d\n" |
| , method_name, __LINE__ |
| , (int)((unsigned char *)srcaddr)[0] |
| , (int)((unsigned char *)srcaddr)[1] |
| , (int)((unsigned char *)srcaddr)[2] |
| , (int)((unsigned char *)srcaddr)[3] |
| , serverSyncPort ); |
| } |
| |
| idst = i; |
| socks_[rankToPnid[i]] = AcceptSock( syncSock_ ); |
| } |
| else |
| { |
| idst = -1; |
| } |
| if ( idst >= 0 && socks_[rankToPnid[idst]] < 0 ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| if ( idst == i ) |
| { |
| snprintf( buf, sizeof(buf), "[%s@%d] mkcltsock src=%d.%d.%d.%d dst=%d.%d.%d.%d failed\n", |
| method_name, __LINE__, |
| (int)((unsigned char *)srcaddr)[0], |
| (int)((unsigned char *)srcaddr)[1], |
| (int)((unsigned char *)srcaddr)[2], |
| (int)((unsigned char *)srcaddr)[3], |
| (int)((unsigned char *)dstaddr)[0], |
| (int)((unsigned char *)dstaddr)[1], |
| (int)((unsigned char *)dstaddr)[2], |
| (int)((unsigned char *)dstaddr)[3] ); |
| } |
| else |
| { |
| snprintf( buf, sizeof(buf), "[%s@%d] acceptsock(%d) failed\n", |
| method_name, __LINE__, syncSock_ ); |
| } |
| mon_log_write( MON_CLUSTER_INITCLUSTERSOCKS_6, SQ_LOG_CRIT, buf ); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| if ( idst >= 0 && fcntl( socks_[rankToPnid[idst]], F_SETFL, O_NONBLOCK ) ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), "[%s@%d] fcntl(NONBLOCK) error: %s\n", |
| method_name, __LINE__, strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_INITCLUSTERSOCKS_7, SQ_LOG_CRIT, buf ); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| MPI_Barrier( MPI_COMM_WORLD ); |
| } |
| } |
| TRACE_EXIT; |
| } |
| |
| void CCluster::InitServerSock( void ) |
| { |
| const char method_name[] = "CCluster::InitServerSock"; |
| TRACE_ENTRY; |
| int serverCommPort; |
| int serverSyncPort; |
| |
| unsigned char addr[4]; |
| struct hostent *he; |
| |
| he = gethostbyname( Node_name ); |
| if ( !he ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s@%d] gethostbyname(%s) error: %s\n" |
| , method_name, __LINE__ |
| , Node_name, strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_INITSERVERSOCK_1, SQ_LOG_CRIT, buf ); |
| abort(); |
| } |
| memcpy( addr, he->h_addr, 4 ); |
| |
| commSock_ = MkSrvSock( &serverCommPort ); |
| if ( commSock_ < 0 ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s@%d] MkSrvSock(comm) error: %s\n" |
| , method_name, __LINE__, strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_INITSERVERSOCK_2, SQ_LOG_CRIT, buf ); |
| abort(); |
| } |
| else |
| { |
| snprintf( MyCommPort, sizeof(MyCommPort) |
| , "%d.%d.%d.%d:%d" |
| , (int)((unsigned char *)addr)[0] |
| , (int)((unsigned char *)addr)[1] |
| , (int)((unsigned char *)addr)[2] |
| , (int)((unsigned char *)addr)[3] |
| , serverCommPort ); |
| MyNode->SetCommSocketPort( serverCommPort ); |
| MyNode->SetCommPort( MyCommPort ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d Initialized my comm socket port, " |
| "pnid=%d (%s:%s) (commPort=%s)\n" |
| , method_name, __LINE__ |
| , MyPNID, MyNode->GetName(), MyCommPort |
| , MyNode->GetCommPort() ); |
| |
| } |
| |
| syncSock_ = MkSrvSock( &serverSyncPort ); |
| if ( syncSock_ < 0 ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s@%d] MkSrvSock(sync) error: %s\n" |
| , method_name, __LINE__, strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_INITSERVERSOCK_3, SQ_LOG_CRIT, buf ); |
| abort(); |
| } |
| else |
| { |
| snprintf( MySyncPort, sizeof(MySyncPort) |
| , "%d.%d.%d.%d:%d" |
| , (int)((unsigned char *)addr)[0] |
| , (int)((unsigned char *)addr)[1] |
| , (int)((unsigned char *)addr)[2] |
| , (int)((unsigned char *)addr)[3] |
| , serverSyncPort ); |
| MyNode->SetSyncSocketPort( serverSyncPort ); |
| MyNode->SetSyncPort( MySyncPort ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d Initialized my sync socket port, " |
| "pnid=%d (%s:%s) (syncPort=%s)\n" |
| , method_name, __LINE__ |
| , MyPNID, MyNode->GetName(), MySyncPort |
| , MyNode->GetSyncPort() ); |
| } |
| |
| epollFD_ = epoll_create1( EPOLL_CLOEXEC ); |
| if ( epollFD_ < 0 ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), "[%s@%d] epoll_create1() error: %s\n", |
| method_name, __LINE__, strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_INITSERVERSOCK_4, SQ_LOG_CRIT, buf ); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CCluster::AcceptCommSock( void ) |
| { |
| const char method_name[] = "CCluster::AcceptCommSock"; |
| TRACE_ENTRY; |
| |
| int csock = AcceptSock( commSock_ ); |
| |
| TRACE_EXIT; |
| return( csock ); |
| } |
| |
| int CCluster::AcceptSyncSock( void ) |
| { |
| const char method_name[] = "CCluster::AcceptSyncSock"; |
| TRACE_ENTRY; |
| |
| int csock = AcceptSock( syncSock_ ); |
| |
| TRACE_EXIT; |
| return( csock ); |
| } |
| |
| int CCluster::AcceptSock( int sock ) |
| { |
| const char method_name[] = "CCluster::AcceptSock"; |
| TRACE_ENTRY; |
| |
| #if defined(_XOPEN_SOURCE_EXTENDED) |
| #ifdef __LP64__ |
| socklen_t size; // size of socket address |
| #else |
| size_t size; // size of socket address |
| #endif |
| #else |
| int size; // size of socket address |
| #endif |
| int csock; // connected socket |
| struct sockaddr_in sockinfo; // socket address info |
| |
| if ( getsockname( sock, (struct sockaddr *) &sockinfo, &size ) ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| snprintf(buf, sizeof(buf), "[%s], getsockname() failed, errno=%d (%s).\n", |
| method_name, err, strerror(err)); |
| mon_log_write(MON_CLUSTER_ACCEPTSOCK_2, SQ_LOG_ERR, buf); |
| return ( -1 ); |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr; |
| trace_printf( "%s@%d - Accepting socket on addr=%d.%d.%d.%d, port=%d\n" |
| , method_name, __LINE__ |
| , addrp[0] |
| , addrp[1] |
| , addrp[2] |
| , addrp[3] |
| , (int) ntohs( sockinfo.sin_port ) ); |
| } |
| |
| while ( ((csock = accept( sock |
| , (struct sockaddr *) 0 |
| , (socklen_t *) 0 ) ) < 0) && (errno == EINTR) ); |
| |
| if ( csock > 0 ) |
| { |
| int reuse; |
| if ( setsockopt( csock |
| , SOL_SOCKET |
| , SO_REUSEADDR |
| , (char *) &reuse |
| , sizeof(int) ) ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n", |
| method_name, err, strerror(err)); |
| mon_log_write(MON_CLUSTER_ACCEPTSOCK_3, SQ_LOG_ERR, buf); |
| return ( -2 ); |
| } |
| } |
| |
| TRACE_EXIT; |
| return ( (int)csock ); |
| } |
| |
| int CCluster::Connect( const char *portName ) |
| { |
| const char method_name[] = "CCluster::Connect"; |
| TRACE_ENTRY; |
| |
| int sock; // socket |
| int ret; // returned value |
| int reuse = 1; // sockopt reuse option |
| #if defined(_XOPEN_SOURCE_EXTENDED) |
| #ifdef __LP64__ |
| socklen_t size; // size of socket address |
| #else |
| size_t size; // size of socket address |
| #endif |
| #else |
| int size; // size of socket address |
| #endif |
| static int retries = 0; // # times to retry connect |
| int outer_failures = 0; // # failed connect loops |
| int connect_failures = 0; // # failed connects |
| char *p; // getenv results |
| struct sockaddr_in sockinfo; // socket address info |
| struct hostent *he; |
| char host[1000]; |
| const char *colon; |
| unsigned int port; |
| |
| colon = strstr(portName, ":"); |
| strcpy(host, portName); |
| int len = colon - portName; |
| host[len] = '\0'; |
| port = atoi(&colon[1]); |
| |
| size = sizeof(sockinfo); |
| |
| if ( !retries ) |
| { |
| p = getenv( "HPMP_CONNECT_RETRIES" ); |
| if ( p ) retries = atoi( p ); |
| else retries = 5; |
| } |
| |
| for ( ;; ) |
| { |
| sock = socket( AF_INET, SOCK_STREAM, 0 ); |
| if ( sock < 0 ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_CONNECT_1, SQ_LOG_CRIT, la_buf); |
| abort(); |
| } |
| |
| he = gethostbyname( host ); |
| if ( !he ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n", |
| method_name, __LINE__, host, strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_CONNECT_2, SQ_LOG_CRIT, buf ); |
| abort(); |
| } |
| |
| // Connect socket. |
| memset( (char *) &sockinfo, 0, size ); |
| memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 ); |
| sockinfo.sin_family = AF_INET; |
| sockinfo.sin_port = htons( (unsigned short) port ); |
| |
| // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES, |
| // and has a yield between each retry, since it's more oriented |
| // toward failures from network overload and putting a pause |
| // between retries. This inner loop should only iterate when |
| // a signal interrupts the local process, so it doesn't pause |
| // or use the same HPMP_CONNECT_RETRIES count. |
| connect_failures = 0; |
| ret = 1; |
| while ( ret != 0 && connect_failures <= 10 ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - Connecting to %s addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n" |
| , method_name, __LINE__ |
| , host |
| , (int)((unsigned char *)he->h_addr)[0] |
| , (int)((unsigned char *)he->h_addr)[1] |
| , (int)((unsigned char *)he->h_addr)[2] |
| , (int)((unsigned char *)he->h_addr)[3] |
| , port |
| , connect_failures ); |
| } |
| |
| ret = connect( sock, (struct sockaddr *) &sockinfo, size ); |
| if ( ret == 0 ) break; |
| if ( errno == EINTR ) |
| { |
| ++connect_failures; |
| } |
| else |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_CONNECT_3, SQ_LOG_ERR, la_buf); |
| close(sock); |
| return ( -1 ); |
| } |
| } |
| |
| if ( ret == 0 ) break; |
| |
| // For large clusters, the connect/accept calls seem to fail occasionally, |
| // no doubt do to the large number (1000's) of simultaneous connect packets |
| // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES |
| // number of times. |
| if ( errno != EINTR ) |
| { |
| if ( ++outer_failures > retries ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n" |
| , method_name, retries); |
| mon_log_write(MON_CLUSTER_CONNECT_4, SQ_LOG_ERR, la_buf); |
| close( sock ); |
| return ( -1 ); |
| } |
| struct timespec req, rem; |
| req.tv_sec = 0; |
| req.tv_nsec = 500000; |
| nanosleep( &req, &rem ); |
| } |
| close( (int)sock ); |
| } |
| |
| if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_CONNECT_5, SQ_LOG_ERR, la_buf); |
| close( (int)sock ); |
| return ( -2 ); |
| } |
| |
| TRACE_EXIT; |
| return ( (int)sock ); |
| } |
| |
| void CCluster::ConnectToSelf( void ) |
| { |
| const char method_name[] = "CCluster::ConnectToSelf"; |
| TRACE_ENTRY; |
| |
| int sock; // socket |
| int ret; // returned value |
| #if defined(_XOPEN_SOURCE_EXTENDED) |
| #ifdef __LP64__ |
| socklen_t size; // size of socket address |
| #else |
| size_t size; // size of socket address |
| #endif |
| #else |
| int size; // size of socket address |
| #endif |
| static int retries = 0; // # times to retry connect |
| int connect_failures = 0; // # failed connects |
| char *p; // getenv results |
| struct sockaddr_in sockinfo; // socket address info |
| struct hostent *he; |
| |
| size = sizeof(sockinfo); |
| |
| if ( !retries ) |
| { |
| p = getenv( "HPMP_CONNECT_RETRIES" ); |
| if ( p ) retries = atoi( p ); |
| else retries = 5; |
| } |
| |
| sock = socket( AF_INET, SOCK_STREAM, 0 ); |
| if ( sock < 0 ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_CONNECTTOSELF_1, SQ_LOG_CRIT, la_buf); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| |
| he = gethostbyname( "localhost" ); |
| if ( !he ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n", |
| method_name, __LINE__, "localhost", strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( MON_CLUSTER_CONNECTTOSELF_2, SQ_LOG_CRIT, buf ); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| |
| // Connect socket. |
| memset( (char *) &sockinfo, 0, size ); |
| memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 ); |
| sockinfo.sin_family = AF_INET; |
| sockinfo.sin_port = htons( (unsigned short) MyNode->GetCommSocketPort() ); |
| |
| connect_failures = 0; |
| ret = 1; |
| while ( ret != 0 && connect_failures <= 10 ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - Connecting to localhost addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n" |
| , method_name, __LINE__ |
| , (int)((unsigned char *)he->h_addr)[0] |
| , (int)((unsigned char *)he->h_addr)[1] |
| , (int)((unsigned char *)he->h_addr)[2] |
| , (int)((unsigned char *)he->h_addr)[3] |
| , MyNode->GetCommSocketPort() |
| , connect_failures ); |
| } |
| |
| ret = connect( sock, (struct sockaddr *) &sockinfo, size ); |
| if ( ret == 0 ) break; |
| if ( errno == EINTR ) |
| { |
| ++connect_failures; |
| } |
| else |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_CONNECTTOSELF_3, SQ_LOG_CRIT, la_buf); |
| MPI_Abort( MPI_COMM_SELF,99 ); |
| } |
| } |
| |
| close( (int)sock ); |
| |
| TRACE_EXIT; |
| } |
| |
| int CCluster::MkSrvSock( int *pport ) |
| { |
| const char method_name[] = "CCluster::MkSrvSock"; |
| TRACE_ENTRY; |
| |
| int sock; // socket |
| int err; // return code |
| #if defined(_XOPEN_SOURCE_EXTENDED) |
| #ifdef __LP64__ |
| socklen_t size; // size of socket address |
| #else |
| size_t size; // size of socket address |
| #endif |
| #else |
| unsigned int size; // size of socket address |
| #endif |
| struct sockaddr_in sockinfo; // socket address info |
| |
| sock = socket( AF_INET, SOCK_STREAM, 0 ); |
| if ( sock < 0 ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKSRVSOCK_1, SQ_LOG_CRIT, la_buf); |
| return ( -1 ); |
| } |
| |
| // Bind socket. |
| size = sizeof(sockinfo); |
| memset( (char *) &sockinfo, 0, size ); |
| sockinfo.sin_family = AF_INET; |
| sockinfo.sin_addr.s_addr = htonl( INADDR_ANY ); |
| sockinfo.sin_port = htons( 0 ); |
| do |
| { |
| err = bind( sock, (struct sockaddr *) &sockinfo, size ); |
| sched_yield( ); |
| } while ( err && errno == EADDRINUSE ); |
| |
| if ( err ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], bind() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKSRVSOCK_2, SQ_LOG_CRIT, la_buf); |
| close( (int)sock ); |
| return ( -1 ); |
| } |
| |
| if ( pport ) |
| { |
| if ( getsockname( sock, (struct sockaddr *) &sockinfo, &size ) ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], getsockname() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKSRVSOCK_3, SQ_LOG_CRIT, la_buf); |
| close( (int)sock ); |
| return ( -1 ); |
| } |
| |
| *pport = (int) ntohs( sockinfo.sin_port ); |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr; |
| trace_printf( "%s@%d listening on addr=%d.%d.%d.%d, port=%d\n" |
| , method_name, __LINE__ |
| , addrp[0] |
| , addrp[1] |
| , addrp[2] |
| , addrp[3] |
| , pport?*pport:0); |
| } |
| |
| // Listen |
| if ( listen( sock, SOMAXCONN ) ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], listen() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKSRVSOCK_4, SQ_LOG_CRIT, la_buf); |
| close( (int)sock ); |
| return ( -1 ); |
| } |
| |
| TRACE_EXIT; |
| return ( (int)sock ); |
| } |
| |
| int CCluster::MkCltSock( const char *portName ) |
| { |
| const char method_name[] = "CCluster::MkCltSock"; |
| TRACE_ENTRY; |
| |
| int sock; // socket |
| int ret; // returned value |
| int reuse = 1; // sockopt reuse option |
| #if defined(_XOPEN_SOURCE_EXTENDED) |
| #ifdef __LP64__ |
| socklen_t size; // size of socket address |
| #else |
| size_t size; // size of socket address |
| #endif |
| #else |
| int size; // size of socket address |
| #endif |
| static int retries = 0; // # times to retry connect |
| int outer_failures = 0; // # failed connect loops |
| int connect_failures = 0; // # failed connects |
| char *p; // getenv results |
| struct sockaddr_in sockinfo; // socket address info |
| struct hostent *he; |
| char host[1000]; |
| const char *colon; |
| unsigned int port; |
| |
| colon = strstr(portName, ":"); |
| strcpy(host, portName); |
| int len = colon - portName; |
| host[len] = '\0'; |
| port = atoi(&colon[1]); |
| |
| size = sizeof(sockinfo); |
| |
| if ( !retries ) |
| { |
| p = getenv( "HPMP_CONNECT_RETRIES" ); |
| if ( p ) retries = atoi( p ); |
| else retries = 5; |
| } |
| |
| for ( ;; ) |
| { |
| sock = socket( AF_INET, SOCK_STREAM, 0 ); |
| if ( sock < 0 ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| snprintf( la_buf, sizeof(la_buf) |
| , "[%s], socket() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_1, SQ_LOG_ERR, la_buf); |
| return ( -1 ); |
| } |
| |
| he = gethostbyname( host ); |
| if ( !he ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| snprintf( la_buf, sizeof(la_buf), |
| "[%s] gethostbyname(%s) failed! errno=%d (%s)\n" |
| , method_name, host, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_2, SQ_LOG_ERR, la_buf); |
| close( sock ); |
| return ( -1 ); |
| } |
| |
| // Connect socket. |
| memset( (char *) &sockinfo, 0, size ); |
| memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 ); |
| sockinfo.sin_family = AF_INET; |
| sockinfo.sin_port = htons( (unsigned short) port ); |
| |
| // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES, |
| // and has a yield between each retry, since it's more oriented |
| // toward failures from network overload and putting a pause |
| // between retries. This inner loop should only iterate when |
| // a signal interrupts the local process, so it doesn't pause |
| // or use the same HPMP_CONNECT_RETRIES count. |
| connect_failures = 0; |
| ret = 1; |
| while ( ret != 0 && connect_failures <= 10 ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - Connecting to %s addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n" |
| , method_name, __LINE__ |
| , host |
| , (int)((unsigned char *)he->h_addr)[0] |
| , (int)((unsigned char *)he->h_addr)[1] |
| , (int)((unsigned char *)he->h_addr)[2] |
| , (int)((unsigned char *)he->h_addr)[3] |
| , port |
| , connect_failures ); |
| } |
| |
| ret = connect( sock, (struct sockaddr *) &sockinfo, size ); |
| if ( ret == 0 ) break; |
| if ( errno == EINTR ) |
| { |
| ++connect_failures; |
| } |
| else |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_3, SQ_LOG_ERR, la_buf); |
| close(sock); |
| return ( -1 ); |
| } |
| } |
| |
| if ( ret == 0 ) break; |
| |
| // For large clusters, the connect/accept calls seem to fail occasionally, |
| // no doubt do to the large number (1000's) of simultaneous connect packets |
| // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES |
| // number of times. |
| if ( errno != EINTR ) |
| { |
| if ( ++outer_failures > retries ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n" |
| , method_name, retries); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_4, SQ_LOG_ERR, la_buf); |
| close( sock ); |
| return ( -1 ); |
| } |
| struct timespec req, rem; |
| req.tv_sec = 0; |
| req.tv_nsec = 500000; |
| nanosleep( &req, &rem ); |
| } |
| close( sock ); |
| } |
| |
| if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_5, SQ_LOG_ERR, la_buf); |
| close( (int)sock ); |
| return ( -2 ); |
| } |
| |
| TRACE_EXIT; |
| return ( sock ); |
| } |
| |
| int CCluster::MkCltSock( unsigned char srcip[4], unsigned char dstip[4], int port ) |
| { |
| const char method_name[] = "CCluster::MkCltSock"; |
| TRACE_ENTRY; |
| |
| int sock; // socket |
| int ret; // returned value |
| int reuse = 1; // sockopt reuse option |
| #if defined(_XOPEN_SOURCE_EXTENDED) |
| #ifdef __LP64__ |
| socklen_t size; // size of socket address |
| #else |
| size_t size; // size of socket address |
| #endif |
| #else |
| int size; // size of socket address |
| #endif |
| static int retries = 0; // # times to retry connect |
| int outer_failures = 0; // # failed connect loops |
| int connect_failures = 0; // # failed connects |
| char *p; // getenv results |
| struct sockaddr_in sockinfo; // socket address info |
| |
| size = sizeof(sockinfo); |
| |
| if ( !retries ) |
| { |
| p = getenv( "HPMP_CONNECT_RETRIES" ); |
| if ( p ) retries = atoi( p ); |
| else retries = 5; |
| } |
| |
| for ( ;; ) |
| { |
| sock = socket( AF_INET, SOCK_STREAM, 0 ); |
| if ( sock < 0 ) return ( -1 ); |
| |
| // Bind local address if specified. |
| if ( srcip ) |
| { |
| memset( (char *) &sockinfo, 0, size ); |
| memcpy( (char *) &sockinfo.sin_addr, |
| (char *) srcip, sizeof(srcip) ); |
| sockinfo.sin_family = AF_INET; |
| sockinfo.sin_port = 0; |
| if ( bind( sock, (struct sockaddr *) &sockinfo, size ) ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], bind() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_6, SQ_LOG_ERR, la_buf); |
| close( sock ); |
| return ( -1 ); |
| } |
| } |
| |
| // Connect socket. |
| memset( (char *) &sockinfo, 0, size ); |
| memcpy( (char *) &sockinfo.sin_addr, (char *) dstip, 4 ); |
| sockinfo.sin_family = AF_INET; |
| sockinfo.sin_port = htons( (unsigned short) port ); |
| |
| // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES, |
| // and has a yield between each retry, since it's more oriented |
| // toward failures from network overload and putting a pause |
| // between retries. This inner loop should only iterate when |
| // a signal interrupts the local process, so it doesn't pause |
| // or use the same HPMP_CONNECT_RETRIES count. |
| connect_failures = 0; |
| ret = 1; |
| while ( ret != 0 && connect_failures <= 10 ) |
| { |
| ret = connect( sock, (struct sockaddr *) &sockinfo, |
| size ); |
| if ( ret == 0 ) break; |
| if ( errno == EINTR ) |
| { |
| ++connect_failures; |
| } |
| else |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_7, SQ_LOG_ERR, la_buf); |
| close(sock); |
| return ( -1 ); |
| } |
| } |
| |
| if ( ret == 0 ) break; |
| |
| // For large clusters, the connect/accept calls seem to fail occasionally, |
| // no doubt do to the large number (1000's) of simultaneous connect packets |
| // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES |
| // number of times. |
| if ( errno != EINTR ) |
| { |
| if ( ++outer_failures > retries ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n" |
| , method_name, retries); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_8, SQ_LOG_ERR, la_buf); |
| close( sock ); |
| return ( -1 ); |
| } |
| struct timespec req, rem; |
| req.tv_sec = 0; |
| req.tv_nsec = 500000; |
| nanosleep( &req, &rem ); |
| } |
| close( sock ); |
| } |
| |
| if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) ) |
| { |
| char la_buf[MON_STRING_BUF_SIZE]; |
| int err = errno; |
| sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n" |
| , method_name, err, strerror( err )); |
| mon_log_write(MON_CLUSTER_MKCLTSOCK_8, SQ_LOG_ERR, la_buf); |
| close( sock ); |
| return ( -2 ); |
| } |
| |
| TRACE_EXIT; |
| return ( sock ); |
| } |
| |
| int CCluster::ReceiveMPI(char *buf, int size, int source, MonXChngTags tag, MPI_Comm comm) |
| { |
| const char method_name[] = "CCluster::ReceiveMPI"; |
| TRACE_ENTRY; |
| |
| MPI_Request request; |
| MPI_Status status; |
| int received = 0; |
| |
| int error = MPI_Irecv(buf, size, MPI_CHAR, source, tag, comm, &request); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Msg Received. Error = %d\n", method_name, __LINE__, error); |
| |
| if (!error) |
| { |
| while (!received) |
| { |
| error = MPI_Test(&request, &received, &status); |
| |
| if (!error) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Msg Received Test. Flag = %d\n", method_name, __LINE__, received); |
| } |
| else |
| { |
| usleep(10000); // sleep 10ms and try again |
| } |
| } |
| } |
| |
| TRACE_EXIT; |
| return error; |
| } |
| |
| int CCluster::SendMPI(char *buf, int size, int source, MonXChngTags tag, MPI_Comm comm) |
| { |
| const char method_name[] = "CCluster::SendMPI"; |
| TRACE_ENTRY; |
| |
| MPI_Request request; |
| MPI_Status status; |
| int sent = 0; |
| |
| int error = MPI_Isend(buf, size, MPI_CHAR, source, tag, comm, &request); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Msg Sent. Error = %d\n", method_name, __LINE__, error); |
| |
| if (!error) |
| { |
| while (!sent) |
| { |
| error = MPI_Test(&request, &sent, &status); |
| |
| if (!error) |
| { |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf("%s@%d - Msg Sent Test. Flag = %d\n", method_name, __LINE__, sent); |
| } |
| else |
| { |
| usleep(10000); // sleep 10ms and try again |
| } |
| } |
| } |
| |
| TRACE_EXIT; |
| return error; |
| } |
| |
| int CCluster::ReceiveSock(char *buf, int size, int sockFd) |
| { |
| const char method_name[] = "CCluster::ReceiveSock"; |
| TRACE_ENTRY; |
| |
| bool readAgain = false; |
| int error = 0; |
| int readCount = 0; |
| int received = 0; |
| int sizeCount = size; |
| |
| do |
| { |
| readCount = (int) recv( sockFd |
| , buf |
| , sizeCount |
| , 0 ); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - Count read %d = recv(%d)\n" |
| , method_name, __LINE__ |
| , readCount |
| , sizeCount ); |
| } |
| |
| if ( readCount > 0 ) |
| { // Got data |
| received += readCount; |
| buf += readCount; |
| if ( received == size ) |
| { |
| readAgain = false; |
| } |
| else |
| { |
| sizeCount -= received; |
| readAgain = true; |
| } |
| } |
| else if ( readCount == 0 ) |
| { // EOF |
| error = ENODATA; |
| readAgain = false; |
| } |
| else |
| { // Got an error |
| if ( errno != EINTR) |
| { |
| error = errno; |
| readAgain = false; |
| } |
| else |
| { |
| readAgain = true; |
| } |
| } |
| } |
| while( readAgain ); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n" |
| , method_name, __LINE__ |
| , received |
| , error, strerror(error) ); |
| } |
| |
| TRACE_EXIT; |
| return error; |
| } |
| |
| int CCluster::SendSock(char *buf, int size, int sockFd) |
| { |
| const char method_name[] = "CCluster::SendSock"; |
| TRACE_ENTRY; |
| |
| bool sendAgain = false; |
| int error = 0; |
| int sendCount = 0; |
| int sent = 0; |
| |
| do |
| { |
| sendCount = (int) send( sockFd |
| , buf |
| , size |
| , 0 ); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - send(), sendCount=%d\n" |
| , method_name, __LINE__ |
| , sendCount ); |
| } |
| |
| if ( sendCount > 0 ) |
| { // Sent data |
| sent += sendCount; |
| if ( sendCount == size ) |
| { |
| sendAgain = false; |
| } |
| else |
| { |
| sendAgain = true; |
| } |
| } |
| else |
| { // Got an error |
| if ( errno != EINTR) |
| { |
| error = errno; |
| sendAgain = false; |
| } |
| else |
| { |
| sendAgain = true; |
| } |
| } |
| } |
| while( sendAgain ); |
| |
| if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n" |
| , method_name, __LINE__ |
| , sent |
| , error, strerror(error) ); |
| } |
| |
| TRACE_EXIT; |
| return error; |
| } |