Closes-Bug: 1411520 1411830
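Fixes node down and node up (re-integration) processing in the monitor: monitors now exchange pnids during the join handshake, sync ports are gathered alongside comm ports, and a zero-byte recv is reported as ENODATA.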
Change-Id: I1f30a0298df395acdd58d530e4c0f5a5ba8356ce
diff --git a/sqf/export/include/common/evl_sqlog_eventnum.h b/sqf/export/include/common/evl_sqlog_eventnum.h
index f99726c..49afcfb 100644
--- a/sqf/export/include/common/evl_sqlog_eventnum.h
+++ b/sqf/export/include/common/evl_sqlog_eventnum.h
@@ -698,6 +698,8 @@
#define MON_COMMACCEPT_14 101320114
#define MON_COMMACCEPT_15 101320115
#define MON_COMMACCEPT_16 101320116
+#define MON_COMMACCEPT_17 101320117
+#define MON_COMMACCEPT_18 101320118
/* Module: reqnodedown.cxx = 33 */
#define MON_EXT_NODEDOWN_REQ 101330101
diff --git a/sqf/monitor/linux/cluster.cxx b/sqf/monitor/linux/cluster.cxx
index 8c9bcfa..aed7439 100644
--- a/sqf/monitor/linux/cluster.cxx
+++ b/sqf/monitor/linux/cluster.cxx
@@ -91,7 +91,8 @@
const char *JoiningPhaseString( JOINING_PHASE phase);
const char *StateString( STATE state);
const char *SyncStateString( SyncState state);
-const char *RedirEpollEventString( __uint32_t events );
+const char *EpollEventString( __uint32_t events );
+const char *EpollOpString( int op );
void CCluster::ActivateSpare( CNode *spareNode, CNode *downNode, bool checkHealth )
{
@@ -1213,8 +1214,16 @@
node->SetState( State_Joining );
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- trace_printf( "%s@%d" " - New monitor %s, pnid=%d, state=%s" "\n"
- , method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) );
+ {
+ trace_printf( "%s@%d" " - New monitor %s, pnid=%d, state=%s" "\n"
+ , method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) );
+ for ( int i =0; i < cfgPNodes_; i++ )
+ {
+ trace_printf( "%s@%d socks_[%d]=%d\n"
+ , method_name, __LINE__
+ , i, socks_[i]);
+ }
+ }
if ( MyNode->IsCreator() )
{
SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
@@ -2520,9 +2529,23 @@
nodeStatus[i] = false;
}
- // Collect port info from other monitors
- char *portNums = new char[worldSize * MPI_MAX_PORT_NAME];
- rc = MPI_Allgather (MyCommPort, MPI_MAX_PORT_NAME, MPI_CHAR, portNums,
+ // Collect comm port info from other monitors
+ char *commPortNums = new char[worldSize * MPI_MAX_PORT_NAME];
+ rc = MPI_Allgather (MyCommPort, MPI_MAX_PORT_NAME, MPI_CHAR, commPortNums,
+ MPI_MAX_PORT_NAME, MPI_CHAR, MPI_COMM_WORLD);
+ if (rc != MPI_SUCCESS)
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s@%d] MPI_Allgather error=%s\n",
+ method_name, __LINE__, ErrorMsg(rc));
+ mon_log_write(MON_CLUSTER_INITCONFIGCLUSTER_2, SQ_LOG_CRIT, buf);
+
+ MPI_Abort(MPI_COMM_SELF,99);
+ }
+
+ // Collect sync port info from other monitors
+ char *syncPortNums = new char[worldSize * MPI_MAX_PORT_NAME];
+ rc = MPI_Allgather (MySyncPort, MPI_MAX_PORT_NAME, MPI_CHAR, syncPortNums,
MPI_MAX_PORT_NAME, MPI_CHAR, MPI_COMM_WORLD);
if (rc != MPI_SUCCESS)
{
@@ -2559,18 +2582,22 @@
node = Nodes->GetNode( nodeName );
if ( node )
{
- node->SetCommPort( &portNums[ i * MPI_MAX_PORT_NAME] );
+ node->SetCommPort( &commPortNums[ i * MPI_MAX_PORT_NAME] );
+ node->SetSyncPort( &syncPortNums[ i * MPI_MAX_PORT_NAME] );
rankToPnid[i] = node->GetPNid();
nodeStatus[rankToPnid[i]] = true;
if (trace_settings & TRACE_INIT)
{
- trace_printf( "%s@%d rankToPnid[%d]=%d (%s:%s)"
- "(node=%s,port=%s)\n"
+ trace_printf( "%s@%d rankToPnid[%d]=%d (%s:%s:%s)"
+ "(node=%s,commPort=%s,syncPort=%s)\n"
, method_name, __LINE__, i, rankToPnid[i]
- , node->GetName(), node->GetCommPort()
+ , node->GetName()
+ , node->GetCommPort()
+ , node->GetSyncPort()
, &nodeNames[ i * MPI_MAX_PROCESSOR_NAME]
- , &portNums[ i * MPI_MAX_PORT_NAME]);
+ , &commPortNums[ i * MPI_MAX_PORT_NAME]
+ , &syncPortNums[ i * MPI_MAX_PORT_NAME]);
}
}
else
@@ -2585,7 +2612,8 @@
mon_log_write(MON_CLUSTER_INITCONFIGCLUSTER_4, SQ_LOG_CRIT, buf);
}
}
- delete [] portNums;
+ delete [] commPortNums;
+ delete [] syncPortNums;
int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid();
@@ -3213,10 +3241,6 @@
int existingCommFd;
int existingSyncFd;
- nodeId_t myNodeInfo;
- strcpy(myNodeInfo.nodeName, MyNode->GetName());
- strcpy(myNodeInfo.commPort, MyNode->GetCommPort());
- strcpy(myNodeInfo.syncPort, MyNode->GetSyncPort());
// Set bit indicating my node is up
upNodes_.upNodes[MyPNID/64] |= (1ull << MyPNID);
@@ -3253,12 +3277,18 @@
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf("%s@%d Connected to creator monitor, sending id\n",
+ trace_printf("%s@%d Connected to creator monitor, sending my node info\n",
method_name, __LINE__);
}
// Send this node's name and port number so creator monitor
// knows who we are, and set flag to let creator monitor it is the CREATOR.
+ nodeId_t myNodeInfo;
+ strcpy(myNodeInfo.nodeName, MyNode->GetName());
+ strcpy(myNodeInfo.commPort, MyNode->GetCommPort());
+ strcpy(myNodeInfo.syncPort, MyNode->GetSyncPort());
+ myNodeInfo.pnid = MyNode->GetPNid();
+ myNodeInfo.creatorPNid = -1;
myNodeInfo.creator = true;
myNodeInfo.creatorShellPid = CreatorShellPid;
myNodeInfo.creatorShellVerifier = CreatorShellVerifier;
@@ -3272,14 +3302,21 @@
TEST_POINT( TP012_NODE_UP );
- nodeId_t *nodeInfo = new nodeId_t[cfgPNodes_];
-
mem_log_write(CMonLog::MON_REINTEGRATE_3, MyPNID);
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf("%s@%d Getting all node info from creator monitor\n",
+ method_name, __LINE__);
+ }
+
// Obtain node names & port numbers of existing monitors from
// the creator monitor.
+ nodeId_t *nodeInfo;
+ size_t nodeInfoSize = (sizeof(nodeId_t) * cfgPNodes_);
+ nodeInfo = (nodeId_t *) new char[nodeInfoSize];
rc = Monitor->ReceiveSock( (char *)nodeInfo
- , sizeof(nodeId_t)*cfgPNodes_
+ , nodeInfoSize
, joinSock_ );
if ( rc )
{
@@ -3297,14 +3334,17 @@
{
for (int i=0; i<cfgPNodes_; i++)
{
- trace_printf( "%s@%d - Port info for pnid=%d\n"
- "nodeInfo[%d].nodeName=%s\n"
- "nodeInfo[%d].commPort=%s\n"
- "nodeInfo[%d].syncPort=%s\n"
- , method_name, __LINE__, i
- , i, nodeInfo[i].nodeName
- , i, nodeInfo[i].commPort
- , i, nodeInfo[i].syncPort );
+ trace_printf( "%s@%d - Node info for pnid=%d:\n"
+ " nodeName=%s\n"
+ " commPort=%s\n"
+ " syncPort=%s\n"
+ " creatorPNid=%d\n"
+ , method_name, __LINE__
+ , nodeInfo[i].pnid
+ , nodeInfo[i].nodeName
+ , nodeInfo[i].commPort
+ , nodeInfo[i].syncPort
+ , nodeInfo[i].creatorPNid );
}
}
// Connect to each of the other existing monitors and let them know
@@ -3315,14 +3355,49 @@
myNodeInfo.creatorShellVerifier = -1;
for (int i=0; i<cfgPNodes_; i++)
{
- if (strcmp(nodeInfo[i].commPort, IntegratingMonitorPort) == 0)
+ if ( nodeInfo[i].creatorPNid != -1 && nodeInfo[i].creatorPNid == i )
{
+ // Get acknowledgement that creator monitor is ready to
+ // integrate this node.
+ int creatorpnid = -1;
+ rc = Monitor->ReceiveSock( (char *) &creatorpnid
+ , sizeof(creatorpnid)
+ , joinSock_ );
+ if ( rc || creatorpnid != nodeInfo[i].creatorPNid )
+ {
+ HandleReintegrateError( rc, Reintegrate_Err15, i, NULL,
+ false );
+ SendReIntegrateStatus( State_Down, Reintegrate_Err14 );
+ }
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Received ready indication from creator "
+ "node %d nodeInfo[%d].nodeName=%s\n"
+ , method_name, __LINE__
+ , creatorpnid, i , nodeInfo[i].nodeName);
+ }
+
otherMonRank_[i] = 0;
++CurNodes;
+ Node[i]->SetCommPort( nodeInfo[i].commPort );
Node[i]->SetSyncPort( nodeInfo[i].syncPort );
Node[i]->SetState( State_Up );
+ // Tell creator we are ready to accept its connection
+ int mypnid = MyPNID;
+ rc = Monitor->SendSock( (char *) &mypnid
+ , sizeof(mypnid)
+ , joinSock_ );
+ if ( rc )
+ {
+ HandleReintegrateError( rc, Reintegrate_Err4, i, &nodeInfo[i],
+ false );
+ SendReIntegrateStatus( State_Down, Reintegrate_Err14 );
+ }
+
+ // Accept the sync connection from the creator monitor
existingSyncFd = AcceptSyncSock();
if ( existingSyncFd < 0 )
{
@@ -3336,6 +3411,10 @@
if (trace_settings & (TRACE_RECOVERY | TRACE_INIT))
{
+ trace_printf( "%s@%d Connected to creator node %d (%s)\n"
+ , method_name, __LINE__
+ , nodeInfo[i].creatorPNid
+ , nodeInfo[i].nodeName );
trace_printf( "%s@%d socks_[%d]=%d\n"
, method_name, __LINE__
, i, socks_[i]);
@@ -3382,7 +3461,7 @@
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf("%s@%d Connected to node %d (%s), sending id\n",
+ trace_printf("%s@%d Connected to node %d (%s), sending my node name\n",
method_name, __LINE__,i,nodeInfo[i].nodeName);
}
@@ -3403,11 +3482,11 @@
// race condition where the creator monitor could signal
// the monitors in the cluster to integrate the new node
// before one or more was ready to do the integration.
- int readyFlag;
- rc = Monitor->ReceiveSock( (char *) &readyFlag
- , sizeof(readyFlag)
+ int remotepnid = -1;
+ rc = Monitor->ReceiveSock( (char *) &remotepnid
+ , sizeof(remotepnid)
, existingCommFd );
- if ( rc )
+ if ( rc || remotepnid != i )
{
HandleReintegrateError( rc, Reintegrate_Err15, i, NULL,
false );
@@ -3416,16 +3495,19 @@
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d - Received ready-flag from node %d (%s)\n",
- method_name, __LINE__, i,
- nodeInfo[i].nodeName);
+ trace_printf( "%s@%d - Received ready indication from "
+ "node %d nodeInfo[%d].nodeName=%s\n"
+ , method_name, __LINE__
+ , remotepnid, i , nodeInfo[i].nodeName);
}
otherMonRank_[i] = 0;
++CurNodes;
- Node[i]->SetSyncPort(nodeInfo[i].syncPort );
+ Node[i]->SetCommPort( nodeInfo[i].commPort );
+ Node[i]->SetSyncPort( nodeInfo[i].syncPort );
Node[i]->SetState( State_Up );
+ // Accept the sync connection from the existing monitor
existingSyncFd = AcceptSyncSock();
if ( existingSyncFd < 0 )
{
@@ -3465,7 +3547,8 @@
"nodeInfo[%d].syncPort=%s\n"
"IntegratingMonitorPort=%s\n"
, method_name, __LINE__
- , i, i, nodeInfo[i].commPort
+ , i
+ , i, nodeInfo[i].commPort
, i, nodeInfo[i].syncPort
, IntegratingMonitorPort);
}
@@ -3476,14 +3559,16 @@
{
for (int i=0; i<cfgPNodes_; i++)
{
- trace_printf( "%s@%d - Port info for pnid=%d\n"
- "Node[%d] name=%s\n"
- "Node[%d] commPort=%s\n"
- "Node[%d] syncPort=%s\n"
- , method_name, __LINE__, i
- , i, Node[i]->GetName()
+ trace_printf( "%s@%d - Node info for pnid=%d (%s)\n"
+ " Node[%d] commPort=%s\n"
+ " Node[%d] syncPort=%s\n"
+ " Node[%d] creatorPNid=%d\n"
+ , method_name, __LINE__
+ , Node[i]->GetPNid()
+ , Node[i]->GetName()
, i, Node[i]->GetCommPort()
- , i, Node[i]->GetSyncPort() );
+ , i, Node[i]->GetSyncPort()
+ , i, nodeInfo[i].creatorPNid);
}
for ( int i =0; i < cfgPNodes_; i++ )
{
@@ -3985,11 +4070,13 @@
// ready for reading nor writing
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
- , "[%s@%d] error event[%d]=%s peer=%d\n"
+ , "[%s@%d] Error: peer=%d, events[%d].data.fd=%d, event[%d]=%s\n"
, method_name, __LINE__
+ , iPeer
, iEvent
- , RedirEpollEventString(events[iEvent].events)
- , iPeer );
+ , events[iEvent].data.fd
+ , iEvent
+ , EpollEventString(events[iEvent].events) );
mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
err = MPI_ERR_IN_STATUS;
@@ -4027,7 +4114,7 @@
nr = recv( fd, r, n2get, 0 );
if ( nr >= 0 || errno == EINTR ) break;
}
- if ( nr <= 0 )
+ if ( nr < 0 )
{
if ( nr < 0 && eagain_ok && errno == EAGAIN )
{
@@ -4066,7 +4153,7 @@
{
if ( peer->p_received == hdrSize )
{
- // complete header, get buffer size
+ // got the complete header, get buffer size
struct sync_buffer_def *sb;
sb = (struct sync_buffer_def *)peer->p_buff;
peer->p_n2recv = sb->msgInfo.msg_offset;
@@ -4112,7 +4199,7 @@
ns = send( fd, s, n2send, 0 );
if ( ns >= 0 || errno != EINTR ) break;
}
- if ( ns <= 0 )
+ if ( ns < 0 )
{
// error, down socket
int err = errno;
@@ -4138,7 +4225,7 @@
peer->p_sent += ns;
if ( peer->p_sent == nbytes )
{
- // finsished sending to this destination
+ // finished sending to this destination
peer->p_sending = false;
nsent++;
stateChange = true;
@@ -4150,22 +4237,26 @@
{
struct epoll_event event;
event.data.fd = socks_[iPeer];
- int op;
+ int op = 0;
if ( !peer->p_sending && !peer->p_receiving )
{
op = EPOLL_CTL_DEL;
+ event.events = 0;
}
else if ( peer->p_sending )
{
op = EPOLL_CTL_MOD;
event.events = EPOLLOUT | EPOLLET;
}
- else
+ else if ( peer->p_receiving )
{
op = EPOLL_CTL_MOD;
event.events = EPOLLIN | EPOLLET;
}
- EpollCtl( epollFD_, op, fd, &event );
+ if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD )
+ {
+ EpollCtl( epollFD_, op, fd, &event );
+ }
}
}
}
@@ -6648,19 +6739,21 @@
int error = 0;
int readCount = 0;
int received = 0;
+ int sizeCount = size;
do
{
readCount = (int) recv( sockFd
, buf
- , size
+ , sizeCount
, 0 );
if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d - recv(), readCount=%d\n"
+ trace_printf( "%s@%d - Count read %d = recv(%d)\n"
, method_name, __LINE__
- , readCount );
+ , readCount
+ , sizeCount );
}
if ( readCount > 0 )
@@ -6673,11 +6766,13 @@
}
else
{
+ sizeCount -= received;
readAgain = true;
}
}
else if ( readCount == 0 )
{ // EOF
+ error = ENODATA;
readAgain = false;
}
else
diff --git a/sqf/monitor/linux/commaccept.cxx b/sqf/monitor/linux/commaccept.cxx
index 7832bc1..e9f0e26 100644
--- a/sqf/monitor/linux/commaccept.cxx
+++ b/sqf/monitor/linux/commaccept.cxx
@@ -33,8 +33,10 @@
extern CMonitor *Monitor;
extern CNode *MyNode;
extern CNodeContainer *Nodes;
+extern int MyPNID;
extern char MyCommPort[MPI_MAX_PORT_NAME];
extern char *ErrorMsg (int error_code);
+extern const char *StateString( STATE state);
extern CommType_t CommType;
CCommAccept::CCommAccept(): shutdown_(false), thread_id_(0)
@@ -167,7 +169,8 @@
int cfgPNodes = Monitor->GetNumNodes();
nodeId_t *nodeInfo;
- nodeInfo = new nodeId_t[cfgPNodes];
+ size_t nodeInfoSize = (sizeof(nodeId_t) * cfgPNodes);
+ nodeInfo = (nodeId_t *) new char[nodeInfoSize];
int rc;
CNode *node;
@@ -183,29 +186,36 @@
sizeof(nodeInfo[i].commPort));
strncpy(nodeInfo[i].syncPort, node->GetSyncPort(),
sizeof(nodeInfo[i].syncPort));
+ nodeInfo[i].pnid = node->GetPNid();
+ nodeInfo[i].creatorPNid = ( i == MyPNID) ? MyPNID : -1;
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d - Port for node %d (%s)\n"
- "CommPort=%s\n"
- "SyncPort=%s\n"
+ trace_printf( "%s@%d - Node info for pnid=%d (%s)\n"
+ " CommPort=%s\n"
+ " SyncPort=%s\n"
+ " creatorPNid=%d\n"
, method_name, __LINE__
- , i, node->GetName()
- , node->GetCommPort()
- , node->GetSyncPort());
+ , nodeInfo[i].pnid
+ , nodeInfo[i].nodeName
+ , nodeInfo[i].commPort
+ , nodeInfo[i].syncPort
+ , nodeInfo[i].creatorPNid );
}
}
else
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf("%s@%d - No port info for pnid=%d (%s) node not up!\n",
+ trace_printf("%s@%d - No node info for pnid=%d (%s) node not up!\n",
method_name, __LINE__, i, node->GetName());
}
+ nodeInfo[i].pnid = -1;
nodeInfo[i].nodeName[0] = '\0';
nodeInfo[i].commPort[0] = '\0';
nodeInfo[i].syncPort[0] = '\0';
+ nodeInfo[i].creatorPNid = -1;
}
}
@@ -216,17 +226,19 @@
for (int i=0; i<cfgPNodes; i++)
{
trace_printf( "Port info for pnid=%d\n"
- "nodeInfo[%d].nodeName=%s\n"
- "nodeInfo[%d].commPort=%s\n"
- "nodeInfo[%d].syncPort=%s\n"
+ " nodeInfo[%d].nodeName=%s\n"
+ " nodeInfo[%d].commPort=%s\n"
+ " nodeInfo[%d].syncPort=%s\n"
+ " nodeInfo[%d].creatorPNid=%d\n"
, i , i, nodeInfo[i].nodeName
, i, nodeInfo[i].commPort
- , i, nodeInfo[i].syncPort );
+ , i, nodeInfo[i].syncPort
+ , i, nodeInfo[i].creatorPNid );
}
}
rc = Monitor->SendSock( (char *) nodeInfo
- , sizeof(nodeId_t)*cfgPNodes
+ , nodeInfoSize
, sockFd);
if ( rc )
{
@@ -522,9 +534,11 @@
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d - Accepted connection from node %s, commPort=%s, "
- "syncPort=%s, creator=%d, creatorShellPid=%d:%d\n"
+ trace_printf( "%s@%d - Accepted connection from node %d (%s), "
+ "commPort=%s, syncPort=%s, creator=%d, "
+ "creatorShellPid=%d:%d\n"
, method_name, __LINE__
+ , nodeId.pnid
, nodeId.nodeName
, nodeId.commPort
, nodeId.syncPort
@@ -537,7 +551,7 @@
int pnid = -1;
if ( node != NULL )
{ // Store port number for the node
- pnid = node->GetPNid();
+ pnid = nodeId.pnid;
node->SetCommPort( nodeId.commPort );
node->SetSyncPort( nodeId.syncPort );
}
@@ -546,9 +560,34 @@
close( joinFd );
char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "[%s], got connection from unknown "
- "node %s. Ignoring it.\n", method_name, nodeId.nodeName);
- mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf);
+ snprintf( buf, sizeof(buf)
+ , "[%s], got connection from unknown "
+ "node %d (%s). Ignoring it.\n"
+ , method_name
+ , nodeId.pnid
+ , nodeId.nodeName);
+ mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf);
+
+ return;
+ }
+
+ // Sanity check, re-integrating node must be down
+ if ( node->GetState() != State_Down )
+ {
+ int intdata = -1;
+ rc = Monitor->SendSock( (char *) &intdata
+ , 0
+ , joinFd );
+
+ close( joinFd );
+
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], got connection from node %s. "
+ "Node not down, node state=%s\n"
+ , method_name, nodeId.nodeName
+ , StateString(node->GetState()));
+ mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf);
return;
}
@@ -572,29 +611,20 @@
, NULL );
return;
}
-
- Monitor->SetJoinSock( joinFd );
-
- Monitor->SetIntegratingNid( pnid );
-
- integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
- Monitor->addNewSock( pnid, 1, integratingFd );
-
- node->SetState( State_Merging );
}
else
{ // No longer need joinFd
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d - Sending ready flag to new monitor\n",
+ trace_printf( "%s@%d - Sending ready indication to new monitor\n",
method_name, __LINE__);
}
// Tell connecting monitor that we are ready to integrate it.
- int readyFlag = 1;
- rc = Monitor->SendSock( (char *) &readyFlag
- , sizeof(readyFlag)
+ int mypnid = MyPNID;
+ rc = Monitor->SendSock( (char *) &mypnid
+ , sizeof(mypnid)
, joinFd );
if ( rc )
{
@@ -604,21 +634,11 @@
snprintf(buf, sizeof(buf), "[%s], unable to send connect "
"acknowledgement to new monitor: %s.\n", method_name,
ErrorMsg(rc));
- mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf);
-
- if ( MyNode->IsCreator() )
- {
- snprintf(buf, sizeof(buf), "Cannot send connect acknowledgment "
- "to new monitor: %s.\n", ErrorMsg(rc));
- SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
- , MyNode->GetCreatorVerifier()
- , Notice( buf )
- , NULL );
- }
-
+ mon_log_write(MON_COMMACCEPT_11, SQ_LOG_ERR, buf);
return;
}
+ // Connect to new monitor
integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
Monitor->addNewSock( pnid, 1, integratingFd );
@@ -627,18 +647,84 @@
close( joinFd );
}
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Connected to new monitor for node %d\n",
- method_name, __LINE__, pnid );
- }
-
- // Ideally the following logic should be done in another thread
- // so this thread can post another accept without delay. For
- // initial implementation simplicity this work is being done
- // here for now
if ( MyNode->IsCreator() )
{
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Sending my pnid to new monitor\n",
+ method_name, __LINE__);
+ }
+
+ // Sanity check, tell integrating monitor my creator pnid
+ int mypnid = MyPNID;
+ rc = Monitor->SendSock( (char *) &mypnid
+ , sizeof(mypnid)
+ , joinFd );
+ if ( rc )
+ {
+ close( joinFd );
+
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s], unable to send pnid "
+ "acknowledgement to new monitor: %s.\n", method_name,
+ ErrorMsg(rc));
+ mon_log_write(MON_COMMACCEPT_12, SQ_LOG_ERR, buf);
+
+ snprintf(buf, sizeof(buf), "Cannot send pnid acknowledgment "
+ "to new monitor: %s.\n", ErrorMsg(rc));
+ SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
+ , MyNode->GetCreatorVerifier()
+ , Notice( buf )
+ , NULL );
+ return;
+ }
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Wait for ok to connect to new monitor in node %d\n",
+ method_name, __LINE__, pnid );
+ }
+
+ // Get new monitor acknowledgement that creator can connect
+ int newpnid = -1;
+ rc = Monitor->ReceiveSock( (char *) &newpnid
+ , sizeof(newpnid)
+ , joinFd );
+ if ( rc || newpnid != pnid )
+ {
+ close( joinFd );
+
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s], unable to send connect "
+ "acknowledgement to new monitor: %s.\n", method_name,
+ ErrorMsg(rc));
+ mon_log_write(MON_COMMACCEPT_13, SQ_LOG_ERR, buf);
+
+ snprintf(buf, sizeof(buf), "Cannot receive connect acknowledgment "
+ "to new monitor: %s.\n", ErrorMsg(rc));
+ SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
+ , MyNode->GetCreatorVerifier()
+ , Notice( buf )
+ , NULL );
+ return;
+ }
+
+ Monitor->SetJoinSock( joinFd );
+
+ Monitor->SetIntegratingNid( pnid );
+
+ // Connect to new monitor
+ integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
+ Monitor->addNewSock( pnid, 1, integratingFd );
+
+ node->SetState( State_Merging );
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Connected to new monitor for node %d\n",
+ method_name, __LINE__, pnid );
+ }
+
mem_log_write(CMonLog::MON_CONNTONEWMON_5, pnid);
// Get status from new monitor indicating whether
@@ -653,7 +739,7 @@
snprintf(buf, sizeof(buf), "[%s], unable to obtain "
"node status from new monitor: %s.\n",
method_name, ErrorMsg(rc));
- mon_log_write(MON_COMMACCEPT_11, SQ_LOG_ERR, buf);
+ mon_log_write(MON_COMMACCEPT_14, SQ_LOG_ERR, buf);
snprintf(buf, sizeof(buf), "Unable to obtain node status from "
"node %s monitor: %s.\n", nodeId.nodeName, ErrorMsg(rc));
@@ -666,32 +752,31 @@
close( joinFd );
Monitor->SetJoinSock( -1 );
Monitor->SetIntegratingNid( -1 );
+ return;
+ }
+
+ mem_log_write(CMonLog::MON_CONNTONEWMON_6, node->GetPNid(),
+ nodeStatus.state);
+
+ if (nodeStatus.state == State_Up)
+ {
+ // communicate the change and handle it after sync
+ // in ImAlive
+ node->SetChangeState( true );
}
else
{
- mem_log_write(CMonLog::MON_CONNTONEWMON_6, node->GetPNid(),
- nodeStatus.state);
-
- if (nodeStatus.state == State_Up)
- {
- // communicate the change and handle it after sync
- // in ImAlive
- node->SetChangeState( true );
- }
- else
- {
- char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "Node %s monitor failed to complete "
- "initialization\n", nodeId.nodeName);
- SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
- , MyNode->GetCreatorVerifier()
- , Notice( buf )
- , NULL );
- node->SetState( State_Down );
- close( joinFd );
- Monitor->SetJoinSock( -1 );
- Monitor->SetIntegratingNid( -1 );
- }
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "Node %s monitor failed to complete "
+ "initialization\n", nodeId.nodeName);
+ SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
+ , MyNode->GetCreatorVerifier()
+ , Notice( buf )
+ , NULL );
+ node->SetState( State_Down );
+ close( joinFd );
+ Monitor->SetJoinSock( -1 );
+ Monitor->SetIntegratingNid( -1 );
}
}
@@ -760,7 +845,7 @@
MPI_Error_class( rc, &errClass );
snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
method_name, ErrorMsg(rc));
- mon_log_write(MON_COMMACCEPT_12, SQ_LOG_ERR, buf);
+ mon_log_write(MON_COMMACCEPT_15, SQ_LOG_ERR, buf);
}
else
@@ -814,7 +899,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
method_name, strerror(errno));
- mon_log_write(MON_COMMACCEPT_13, SQ_LOG_ERR, buf);
+ mon_log_write(MON_COMMACCEPT_16, SQ_LOG_ERR, buf);
}
else
@@ -870,7 +955,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
method_name, rc);
- mon_log_write(MON_COMMACCEPT_14, SQ_LOG_ERR, buf);
+ mon_log_write(MON_COMMACCEPT_17, SQ_LOG_ERR, buf);
}
// Enter thread processing loop
@@ -893,7 +978,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], thread create error=%d\n",
method_name, rc);
- mon_log_write(MON_COMMACCEPT_15, SQ_LOG_ERR, buf);
+ mon_log_write(MON_COMMACCEPT_18, SQ_LOG_ERR, buf);
}
TRACE_EXIT;
diff --git a/sqf/monitor/linux/internal.h b/sqf/monitor/linux/internal.h
old mode 100755
new mode 100644
index de2ea61..98c19cf
--- a/sqf/monitor/linux/internal.h
+++ b/sqf/monitor/linux/internal.h
@@ -381,6 +381,8 @@
char nodeName[MPI_MAX_PROCESSOR_NAME];
char commPort[MPI_MAX_PORT_NAME];
char syncPort[MPI_MAX_PORT_NAME];
+ int pnid;
+ int creatorPNid;
int creatorShellPid;
Verifier_t creatorShellVerifier;
bool creator; // NEW monitor set to true to tell creator it is the CREATOR
diff --git a/sqf/monitor/linux/redirector.cxx b/sqf/monitor/linux/redirector.cxx
index 18d5a0b..2609953 100755
--- a/sqf/monitor/linux/redirector.cxx
+++ b/sqf/monitor/linux/redirector.cxx
@@ -59,7 +59,7 @@
extern CMonStats *MonStats;
extern CReqQueue ReqQueue;
-const char *RedirEpollEventString( __uint32_t events )
+const char *EpollEventString( __uint32_t events )
{
static char str[80] = {0};
@@ -101,6 +101,29 @@
return( str );
}
+const char *EpollOpString( int op ) // Returns the symbolic name of an epoll_ctl() operation code (EPOLL_CTL_ADD/MOD/DEL).
+{
+ static char str[15] = {0}; // Static buffer backs the returned pointer; every call overwrites it (longest name is 13 chars + NUL). NOTE(review): concurrent callers would share this buffer -- confirm usage.
+
+ switch (op)
+ {
+ case EPOLL_CTL_ADD:
+ strcpy( str, "EPOLL_CTL_ADD" );
+ break;
+ case EPOLL_CTL_MOD:
+ strcpy( str, "EPOLL_CTL_MOD" );
+ break;
+ case EPOLL_CTL_DEL:
+ strcpy( str, "EPOLL_CTL_DEL" );
+ break;
+ default: // Any other value is reported as "Invalid OP".
+ strcpy( str, "Invalid OP" );
+ break;
+ }
+
+ return( str ); // Points at the static buffer above; valid only until the next call.
+}
+
CRedirect::CRedirect(int nid, int pid)
:fd_(-1)
,activity_(false)
@@ -1888,7 +1911,7 @@
if (trace_settings & TRACE_REDIRECTION)
trace_printf("%s@%d for fd=%d, events=%d %s\n",
method_name, __LINE__, fd, events,
- RedirEpollEventString(events));
+ EpollEventString(events));
// Acquire lock to prevent memory modifications during
// fork/exec (see uses of OFED_MUTEX define)