Closes-Bug: 1411520 1411830

Fixes node down and node up processing.

Change-Id: I1f30a0298df395acdd58d530e4c0f5a5ba8356ce
diff --git a/sqf/export/include/common/evl_sqlog_eventnum.h b/sqf/export/include/common/evl_sqlog_eventnum.h
index f99726c..49afcfb 100644
--- a/sqf/export/include/common/evl_sqlog_eventnum.h
+++ b/sqf/export/include/common/evl_sqlog_eventnum.h
@@ -698,6 +698,8 @@
 #define MON_COMMACCEPT_14                   101320114
 #define MON_COMMACCEPT_15                   101320115
 #define MON_COMMACCEPT_16                   101320116
+#define MON_COMMACCEPT_17                   101320117
+#define MON_COMMACCEPT_18                   101320118
 
 /* Module: reqnodedown.cxx = 33 */
 #define MON_EXT_NODEDOWN_REQ                101330101
diff --git a/sqf/monitor/linux/cluster.cxx b/sqf/monitor/linux/cluster.cxx
index 8c9bcfa..aed7439 100644
--- a/sqf/monitor/linux/cluster.cxx
+++ b/sqf/monitor/linux/cluster.cxx
@@ -91,7 +91,8 @@
 const char *JoiningPhaseString( JOINING_PHASE phase);
 const char *StateString( STATE state);
 const char *SyncStateString( SyncState state);
-const char *RedirEpollEventString( __uint32_t events );
+const char *EpollEventString( __uint32_t events );
+const char *EpollOpString( int op );
 
 void CCluster::ActivateSpare( CNode *spareNode, CNode *downNode, bool checkHealth )
 {
@@ -1213,8 +1214,16 @@
             node->SetState( State_Joining );
             
             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
-               trace_printf( "%s@%d" " - New monitor %s, pnid=%d, state=%s" "\n"
-                           , method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) );
+            {
+                trace_printf( "%s@%d" " - New monitor %s, pnid=%d, state=%s" "\n"
+                            , method_name, __LINE__, node->GetName(), node->GetPNid(), StateString( node->GetState() ) );
+                for ( int i =0; i < cfgPNodes_; i++ )
+                {
+                    trace_printf( "%s@%d socks_[%d]=%d\n"
+                                , method_name, __LINE__
+                                , i, socks_[i]);
+                }
+            }
             if ( MyNode->IsCreator() )
             {
                 SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
@@ -2520,9 +2529,23 @@
                 nodeStatus[i] = false;
             }
 
-            // Collect port info from other monitors
-            char *portNums = new char[worldSize * MPI_MAX_PORT_NAME];
-            rc = MPI_Allgather (MyCommPort, MPI_MAX_PORT_NAME, MPI_CHAR, portNums,
+            // Collect comm port info from other monitors
+            char *commPortNums = new char[worldSize * MPI_MAX_PORT_NAME];
+            rc = MPI_Allgather (MyCommPort, MPI_MAX_PORT_NAME, MPI_CHAR, commPortNums,
+                                MPI_MAX_PORT_NAME, MPI_CHAR, MPI_COMM_WORLD);
+            if (rc != MPI_SUCCESS)
+            {
+                char buf[MON_STRING_BUF_SIZE];
+                snprintf(buf, sizeof(buf), "[%s@%d] MPI_Allgather error=%s\n",
+                         method_name, __LINE__, ErrorMsg(rc));
+                mon_log_write(MON_CLUSTER_INITCONFIGCLUSTER_2, SQ_LOG_CRIT, buf);
+
+                MPI_Abort(MPI_COMM_SELF,99);
+            }
+
+            // Collect sync port info from other monitors
+            char *syncPortNums = new char[worldSize * MPI_MAX_PORT_NAME];
+            rc = MPI_Allgather (MySyncPort, MPI_MAX_PORT_NAME, MPI_CHAR, syncPortNums,
                                 MPI_MAX_PORT_NAME, MPI_CHAR, MPI_COMM_WORLD);
             if (rc != MPI_SUCCESS)
             {
@@ -2559,18 +2582,22 @@
                 node = Nodes->GetNode( nodeName );
                 if ( node )
                 {
-                    node->SetCommPort( &portNums[ i * MPI_MAX_PORT_NAME] );
+                    node->SetCommPort( &commPortNums[ i * MPI_MAX_PORT_NAME] );
+                    node->SetSyncPort( &syncPortNums[ i * MPI_MAX_PORT_NAME] );
                     rankToPnid[i] = node->GetPNid();
                     nodeStatus[rankToPnid[i]] = true;
 
                     if (trace_settings & TRACE_INIT)
                     {
-                        trace_printf( "%s@%d rankToPnid[%d]=%d (%s:%s)"
-                                      "(node=%s,port=%s)\n"
+                        trace_printf( "%s@%d rankToPnid[%d]=%d (%s:%s:%s)"
+                                      "(node=%s,commPort=%s,syncPort=%s)\n"
                                     , method_name, __LINE__, i, rankToPnid[i]
-                                    , node->GetName(), node->GetCommPort()
+                                    , node->GetName()
+                                    , node->GetCommPort()
+                                    , node->GetSyncPort()
                                     , &nodeNames[ i * MPI_MAX_PROCESSOR_NAME]
-                                    , &portNums[ i * MPI_MAX_PORT_NAME]);
+                                    , &commPortNums[ i * MPI_MAX_PORT_NAME]
+                                    , &syncPortNums[ i * MPI_MAX_PORT_NAME]);
                     }
                 }
                 else
@@ -2585,7 +2612,8 @@
                     mon_log_write(MON_CLUSTER_INITCONFIGCLUSTER_4, SQ_LOG_CRIT, buf);
                 }
             }
-            delete [] portNums;
+            delete [] commPortNums;
+            delete [] syncPortNums;
 
             int TmLeaderPNid = LNode[TmLeaderNid]->GetNode()->GetPNid();
 
@@ -3213,10 +3241,6 @@
     int existingCommFd;
     int existingSyncFd;
 
-    nodeId_t myNodeInfo;
-    strcpy(myNodeInfo.nodeName, MyNode->GetName());
-    strcpy(myNodeInfo.commPort, MyNode->GetCommPort());
-    strcpy(myNodeInfo.syncPort, MyNode->GetSyncPort());
     // Set bit indicating my node is up
     upNodes_.upNodes[MyPNID/64] |= (1ull << MyPNID);
 
@@ -3253,12 +3277,18 @@
 
     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
     {
-        trace_printf("%s@%d Connected to creator monitor, sending id\n",
+        trace_printf("%s@%d Connected to creator monitor, sending my node info\n",
                      method_name, __LINE__); 
     }
 
     // Send this node's name and port number so creator monitor 
     // knows who we are, and set flag to let creator monitor it is the CREATOR.
+    nodeId_t myNodeInfo;
+    strcpy(myNodeInfo.nodeName, MyNode->GetName());
+    strcpy(myNodeInfo.commPort, MyNode->GetCommPort());
+    strcpy(myNodeInfo.syncPort, MyNode->GetSyncPort());
+    myNodeInfo.pnid = MyNode->GetPNid();
+    myNodeInfo.creatorPNid = -1;
     myNodeInfo.creator = true;
     myNodeInfo.creatorShellPid = CreatorShellPid;
     myNodeInfo.creatorShellVerifier = CreatorShellVerifier;
@@ -3272,14 +3302,21 @@
 
     TEST_POINT( TP012_NODE_UP );
 
-    nodeId_t *nodeInfo = new nodeId_t[cfgPNodes_];
-
     mem_log_write(CMonLog::MON_REINTEGRATE_3, MyPNID);
 
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf("%s@%d Getting all node info from creator monitor\n",
+                     method_name, __LINE__);
+    }
+
     // Obtain node names & port numbers of existing monitors from
     // the creator monitor.
+    nodeId_t *nodeInfo;
+    size_t nodeInfoSize = (sizeof(nodeId_t) * cfgPNodes_);
+    nodeInfo = (nodeId_t *) new char[nodeInfoSize];
     rc = Monitor->ReceiveSock( (char *)nodeInfo
-                             , sizeof(nodeId_t)*cfgPNodes_
+                             , nodeInfoSize
                              , joinSock_ );
     if ( rc )
     {
@@ -3297,14 +3334,17 @@
     {
         for (int i=0; i<cfgPNodes_; i++)
         {
-            trace_printf( "%s@%d - Port info for pnid=%d\n"
-                          "nodeInfo[%d].nodeName=%s\n"
-                          "nodeInfo[%d].commPort=%s\n"
-                          "nodeInfo[%d].syncPort=%s\n"
-                        , method_name, __LINE__, i
-                        , i, nodeInfo[i].nodeName
-                        , i, nodeInfo[i].commPort
-                        , i, nodeInfo[i].syncPort );
+            trace_printf( "%s@%d - Node info for pnid=%d:\n"
+                          "        nodeName=%s\n"
+                          "        commPort=%s\n"
+                          "        syncPort=%s\n"
+                          "        creatorPNid=%d\n"
+                        , method_name, __LINE__
+                        , nodeInfo[i].pnid
+                        , nodeInfo[i].nodeName
+                        , nodeInfo[i].commPort
+                        , nodeInfo[i].syncPort
+                        , nodeInfo[i].creatorPNid );
         }
     }
     // Connect to each of the other existing monitors and let them know 
@@ -3315,14 +3355,49 @@
     myNodeInfo.creatorShellVerifier = -1;
     for (int i=0; i<cfgPNodes_; i++)
     {
-        if (strcmp(nodeInfo[i].commPort, IntegratingMonitorPort) == 0)
+        if ( nodeInfo[i].creatorPNid != -1 && nodeInfo[i].creatorPNid == i )
         {
+            // Get acknowledgement that creator monitor is ready to
+            // integrate this node.
+            int creatorpnid = -1;
+            rc = Monitor->ReceiveSock( (char *) &creatorpnid
+                                     , sizeof(creatorpnid)
+                                     , joinSock_ );
+            if ( rc || creatorpnid != nodeInfo[i].creatorPNid )
+            {
+                HandleReintegrateError( rc, Reintegrate_Err15, i, NULL,
+                                        false );
+                SendReIntegrateStatus( State_Down, Reintegrate_Err14 );
+            }
+
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Received ready indication from creator "
+                              "node %d nodeInfo[%d].nodeName=%s\n"
+                            , method_name, __LINE__
+                            , creatorpnid, i , nodeInfo[i].nodeName);
+            }
+
             otherMonRank_[i] = 0;
             ++CurNodes;
 
+            Node[i]->SetCommPort( nodeInfo[i].commPort );
             Node[i]->SetSyncPort( nodeInfo[i].syncPort );
             Node[i]->SetState( State_Up );
 
+            // Tell creator we are ready to accept its connection
+            int mypnid = MyPNID;
+            rc = Monitor->SendSock( (char *) &mypnid
+                                  , sizeof(mypnid)
+                                  , joinSock_ );
+            if ( rc )
+            {
+                HandleReintegrateError( rc, Reintegrate_Err4, i, &nodeInfo[i],
+                                        false );
+                SendReIntegrateStatus( State_Down, Reintegrate_Err14 );
+            }
+
+            // Connect to creator monitor
             existingSyncFd = AcceptSyncSock();
             if ( existingSyncFd < 0 )
             {
@@ -3336,6 +3411,10 @@
 
             if (trace_settings & (TRACE_RECOVERY | TRACE_INIT))
             {
+                trace_printf( "%s@%d Connected to creator node %d (%s)\n"
+                            , method_name, __LINE__
+                            , nodeInfo[i].creatorPNid
+                            , nodeInfo[i].nodeName );
                 trace_printf( "%s@%d socks_[%d]=%d\n"
                             , method_name, __LINE__
                             , i, socks_[i]);
@@ -3382,7 +3461,7 @@
 
             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
             {
-                trace_printf("%s@%d Connected to node %d (%s), sending id\n",
+                trace_printf("%s@%d Connected to node %d (%s), sending my node name\n",
                              method_name, __LINE__,i,nodeInfo[i].nodeName); 
             }
 
@@ -3403,11 +3482,11 @@
             // race condition where the creator monitor could signal
             // the monitors in the cluster to integrate the new node
             // before one or more was ready to do the integration.
-            int readyFlag;
-            rc = Monitor->ReceiveSock( (char *) &readyFlag
-                                     , sizeof(readyFlag)
+            int remotepnid = -1;
+            rc = Monitor->ReceiveSock( (char *) &remotepnid
+                                     , sizeof(remotepnid)
                                      , existingCommFd );
-            if ( rc )
+            if ( rc || remotepnid != i )
             {
                 HandleReintegrateError( rc, Reintegrate_Err15, i, NULL,
                                         false );
@@ -3416,16 +3495,19 @@
 
             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
             {
-                trace_printf( "%s@%d - Received ready-flag from node %d (%s)\n",
-                              method_name, __LINE__, i,
-                             nodeInfo[i].nodeName); 
+                trace_printf( "%s@%d - Received ready indication from "
+                              "node %d nodeInfo[%d].nodeName=%s\n"
+                            , method_name, __LINE__
+                            , remotepnid, i , nodeInfo[i].nodeName);
             }
 
             otherMonRank_[i] = 0;
             ++CurNodes;
-            Node[i]->SetSyncPort(nodeInfo[i].syncPort );
+            Node[i]->SetCommPort( nodeInfo[i].commPort );
+            Node[i]->SetSyncPort( nodeInfo[i].syncPort );
             Node[i]->SetState( State_Up );
 
+            // Connect to existing monitor
             existingSyncFd = AcceptSyncSock();
             if ( existingSyncFd < 0 )
             {
@@ -3465,7 +3547,8 @@
                               "nodeInfo[%d].syncPort=%s\n"
                               "IntegratingMonitorPort=%s\n"
                             , method_name, __LINE__
-                            , i, i, nodeInfo[i].commPort
+                            , i
+                            , i, nodeInfo[i].commPort
                             , i, nodeInfo[i].syncPort
                             , IntegratingMonitorPort);
             }
@@ -3476,14 +3559,16 @@
     {
         for (int i=0; i<cfgPNodes_; i++)
         {
-            trace_printf( "%s@%d - Port info for pnid=%d\n"
-                          "Node[%d] name=%s\n"
-                          "Node[%d] commPort=%s\n"
-                          "Node[%d] syncPort=%s\n"
-                        , method_name, __LINE__, i
-                        , i, Node[i]->GetName()
+            trace_printf( "%s@%d - Node info for pnid=%d (%s)\n"
+                          "        Node[%d] commPort=%s\n"
+                          "        Node[%d] syncPort=%s\n"
+                          "        Node[%d] creatorPNid=%d\n"
+                        , method_name, __LINE__
+                        , Node[i]->GetPNid()
+                        , Node[i]->GetName()
                         , i, Node[i]->GetCommPort()
-                        , i, Node[i]->GetSyncPort() );
+                        , i, Node[i]->GetSyncPort()
+                        , i, nodeInfo[i].creatorPNid);
         }
         for ( int i =0; i < cfgPNodes_; i++ )
         {
@@ -3985,11 +4070,13 @@
                 // ready for reading nor writing
                 char buf[MON_STRING_BUF_SIZE];
                 snprintf( buf, sizeof(buf)
-                        , "[%s@%d] error event[%d]=%s peer=%d\n"
+                        , "[%s@%d] Error: peer=%d, events[%d].data.fd=%d, event[%d]=%s\n"
                         , method_name, __LINE__
+                        , iPeer
                         , iEvent
-                        , RedirEpollEventString(events[iEvent].events)
-                        , iPeer );
+                        , events[iEvent].data.fd
+                        , iEvent
+                        , EpollEventString(events[iEvent].events) );
                 mon_log_write( MON_CLUSTER_ALLGATHERSOCK_3, SQ_LOG_CRIT, buf );
                 stats[iPeer].MPI_ERROR = MPI_ERR_EXITED;
                 err = MPI_ERR_IN_STATUS;
@@ -4027,7 +4114,7 @@
                     nr = recv( fd, r, n2get, 0 );
                     if ( nr >= 0 || errno == EINTR ) break;
                 }
-                if ( nr <= 0 )
+                if ( nr < 0 )
                 {
                     if ( nr < 0 && eagain_ok && errno == EAGAIN )
                     {
@@ -4066,7 +4153,7 @@
                     {
                         if ( peer->p_received == hdrSize )
                         {
-                            // complete header, get buffer size
+                            // got the complete header, get buffer size
                             struct sync_buffer_def *sb;
                             sb = (struct sync_buffer_def *)peer->p_buff;
                             peer->p_n2recv = sb->msgInfo.msg_offset;
@@ -4112,7 +4199,7 @@
                     ns = send( fd, s, n2send, 0 );
                     if ( ns >= 0 || errno != EINTR ) break;
                 }
-                if ( ns <= 0 )
+                if ( ns < 0 )
                 {
                     // error, down socket
                     int err = errno;
@@ -4138,7 +4225,7 @@
                     peer->p_sent += ns;
                     if ( peer->p_sent == nbytes )
                     {
-                        // finsished sending to this destination
+                        // finished sending to this destination
                         peer->p_sending = false;
                         nsent++;
                         stateChange = true;
@@ -4150,22 +4237,26 @@
             {
                 struct epoll_event event;
                 event.data.fd = socks_[iPeer];
-                int op;
+                int op = 0;
                 if ( !peer->p_sending && !peer->p_receiving )
                 {
                     op = EPOLL_CTL_DEL;
+                    event.events = 0;
                 }
                 else if ( peer->p_sending )
                 {
                     op = EPOLL_CTL_MOD;
                     event.events = EPOLLOUT | EPOLLET;
                 }
-                else
+                else if ( peer->p_receiving )
                 {
                     op = EPOLL_CTL_MOD;
                     event.events = EPOLLIN | EPOLLET;
                 }
-                EpollCtl( epollFD_, op, fd, &event );
+                if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD )
+                {
+                    EpollCtl( epollFD_, op, fd, &event );
+                }
             }
         }
     }
@@ -6648,19 +6739,21 @@
     int     error = 0;
     int     readCount = 0;
     int     received = 0;
+    int     sizeCount = size;
     
     do
     {
         readCount = (int) recv( sockFd
                               , buf
-                              , size 
+                              , sizeCount
                               , 0 );
     
         if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
         {
-            trace_printf( "%s@%d - recv(), readCount=%d\n"
+            trace_printf( "%s@%d - Count read %d = recv(%d)\n"
                         , method_name, __LINE__
-                        , readCount );
+                        , readCount
+                        , sizeCount );
         }
     
         if ( readCount > 0 )
@@ -6673,11 +6766,13 @@
             }
             else
             {
+                sizeCount -= received;
                 readAgain = true;
             }
         }
         else if ( readCount == 0 )
         { // EOF
+             error = ENODATA;
              readAgain = false;
         }
         else
diff --git a/sqf/monitor/linux/commaccept.cxx b/sqf/monitor/linux/commaccept.cxx
index 7832bc1..e9f0e26 100644
--- a/sqf/monitor/linux/commaccept.cxx
+++ b/sqf/monitor/linux/commaccept.cxx
@@ -33,8 +33,10 @@
 extern CMonitor *Monitor;
 extern CNode *MyNode;
 extern CNodeContainer *Nodes;
+extern int MyPNID;
 extern char MyCommPort[MPI_MAX_PORT_NAME];
 extern char *ErrorMsg (int error_code);
+extern const char *StateString( STATE state);
 extern CommType_t CommType;
 
 CCommAccept::CCommAccept(): shutdown_(false), thread_id_(0)
@@ -167,7 +169,8 @@
     int cfgPNodes = Monitor->GetNumNodes();
 
     nodeId_t *nodeInfo;
-    nodeInfo = new nodeId_t[cfgPNodes];
+    size_t nodeInfoSize = (sizeof(nodeId_t) * cfgPNodes);
+    nodeInfo = (nodeId_t *) new char[nodeInfoSize];
     int rc;
 
     CNode *node;
@@ -183,29 +186,36 @@
                     sizeof(nodeInfo[i].commPort));
             strncpy(nodeInfo[i].syncPort, node->GetSyncPort(),
                     sizeof(nodeInfo[i].syncPort));
+            nodeInfo[i].pnid = node->GetPNid();
+            nodeInfo[i].creatorPNid = ( i == MyPNID) ? MyPNID : -1;
 
             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
             {
-                trace_printf( "%s@%d - Port for node %d (%s)\n"
-                              "CommPort=%s\n"
-                              "SyncPort=%s\n"
+                trace_printf( "%s@%d - Node info for pnid=%d (%s)\n"
+                              "        CommPort=%s\n"
+                              "        SyncPort=%s\n"
+                              "        creatorPNid=%d\n"
                             , method_name, __LINE__
-                            , i, node->GetName()
-                            , node->GetCommPort()
-                            , node->GetSyncPort());
+                            , nodeInfo[i].pnid
+                            , nodeInfo[i].nodeName
+                            , nodeInfo[i].commPort
+                            , nodeInfo[i].syncPort
+                            , nodeInfo[i].creatorPNid );
             }
         }
         else
         {
             if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
             {
-                trace_printf("%s@%d - No port info for pnid=%d (%s) node not up!\n",
+                trace_printf("%s@%d - No node info for pnid=%d (%s) node not up!\n",
                              method_name, __LINE__, i, node->GetName());
             }
 
+            nodeInfo[i].pnid = -1;
             nodeInfo[i].nodeName[0] = '\0';
             nodeInfo[i].commPort[0] = '\0';
             nodeInfo[i].syncPort[0] = '\0';
+            nodeInfo[i].creatorPNid = -1;
         }
     }
 
@@ -216,17 +226,19 @@
         for (int i=0; i<cfgPNodes; i++)
         {
             trace_printf( "Port info for pnid=%d\n"
-                          "nodeInfo[%d].nodeName=%s\n"
-                          "nodeInfo[%d].commPort=%s\n"
-                          "nodeInfo[%d].syncPort=%s\n"
+                          "        nodeInfo[%d].nodeName=%s\n"
+                          "        nodeInfo[%d].commPort=%s\n"
+                          "        nodeInfo[%d].syncPort=%s\n"
+                          "        nodeInfo[%d].creatorPNid=%d\n"
                         , i , i, nodeInfo[i].nodeName
                         , i, nodeInfo[i].commPort
-                        , i, nodeInfo[i].syncPort );
+                        , i, nodeInfo[i].syncPort
+                        , i, nodeInfo[i].creatorPNid );
         }
     }
 
     rc = Monitor->SendSock( (char *) nodeInfo
-                          , sizeof(nodeId_t)*cfgPNodes
+                          , nodeInfoSize
                           , sockFd);
     if ( rc )
     {
@@ -522,9 +534,11 @@
     
     if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
     {
-        trace_printf( "%s@%d - Accepted connection from node %s, commPort=%s, "
-                      "syncPort=%s, creator=%d, creatorShellPid=%d:%d\n"
+        trace_printf( "%s@%d - Accepted connection from node %d (%s), "
+                      "commPort=%s, syncPort=%s, creator=%d, "
+                      "creatorShellPid=%d:%d\n"
                     , method_name, __LINE__
+                    , nodeId.pnid
                     , nodeId.nodeName
                     , nodeId.commPort
                     , nodeId.syncPort
@@ -537,7 +551,7 @@
     int pnid = -1;
     if ( node != NULL )
     {   // Store port number for the node
-        pnid = node->GetPNid();
+        pnid = nodeId.pnid;
         node->SetCommPort( nodeId.commPort );
         node->SetSyncPort( nodeId.syncPort );
     }
@@ -546,9 +560,34 @@
         close( joinFd );
 
         char buf[MON_STRING_BUF_SIZE];
-        snprintf(buf, sizeof(buf), "[%s], got connection from unknown "
-                 "node %s. Ignoring it.\n", method_name, nodeId.nodeName);
-        mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf);    
+        snprintf( buf, sizeof(buf)
+                , "[%s], got connection from unknown "
+                  "node %d (%s). Ignoring it.\n"
+                , method_name
+                , nodeId.pnid
+                , nodeId.nodeName);
+        mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf);
+
+        return;
+    }
+
+    // Sanity check, re-integrating node must be down
+    if ( node->GetState() != State_Down )
+    {
+        int intdata = -1;
+        rc = Monitor->SendSock( (char *) &intdata
+                              , 0
+                              , joinFd );
+
+        close( joinFd );
+
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], got connection from node %s. "
+                  "Node not down, node state=%s\n"
+                , method_name, nodeId.nodeName
+                , StateString(node->GetState()));
+        mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf);
 
         return;
     }
@@ -572,29 +611,20 @@
                                                    , NULL );
             return;
         }
-
-        Monitor->SetJoinSock( joinFd );
-
-        Monitor->SetIntegratingNid( pnid );
-
-        integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
-        Monitor->addNewSock( pnid, 1, integratingFd );
-
-        node->SetState( State_Merging ); 
     }
     else
     {   // No longer need joinFd
 
         if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
         {
-            trace_printf( "%s@%d - Sending ready flag to new monitor\n",
+            trace_printf( "%s@%d - Sending ready indication to new monitor\n",
                           method_name, __LINE__);
         }
 
         // Tell connecting monitor that we are ready to integrate it.
-        int readyFlag = 1;
-        rc = Monitor->SendSock( (char *) &readyFlag
-                              , sizeof(readyFlag)
+        int mypnid = MyPNID;
+        rc = Monitor->SendSock( (char *) &mypnid
+                              , sizeof(mypnid)
                               , joinFd );
         if ( rc )
         {
@@ -604,21 +634,11 @@
             snprintf(buf, sizeof(buf), "[%s], unable to send connect "
                      "acknowledgement to new monitor: %s.\n", method_name,
                      ErrorMsg(rc));
-            mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf);    
-
-            if ( MyNode->IsCreator() )
-            {
-                snprintf(buf, sizeof(buf), "Cannot send connect acknowledgment "
-                         "to new monitor: %s.\n", ErrorMsg(rc));
-                SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
-                                                       , MyNode->GetCreatorVerifier()
-                                                       , Notice( buf )
-                                                       , NULL );
-            }
-
+            mon_log_write(MON_COMMACCEPT_11, SQ_LOG_ERR, buf);
             return;
         }
 
+        // Connect to new monitor
         integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
         Monitor->addNewSock( pnid, 1, integratingFd );
 
@@ -627,18 +647,84 @@
         close( joinFd );
     }
 
-    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
-    {
-        trace_printf( "%s@%d - Connected to new monitor for node %d\n",
-                      method_name, __LINE__, pnid );
-    }
-
-    // Ideally the following logic should be done in another thread
-    // so this thread can post another accept without delay.  For
-    // initial implementation simplicity this work is being done 
-    // here for now
     if ( MyNode->IsCreator() )
     {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Sending my pnid to new monitor\n",
+                          method_name, __LINE__);
+        }
+
+        // Sanity check, tell integrating monitor my creator pnid
+        int mypnid = MyPNID;
+        rc = Monitor->SendSock( (char *) &mypnid
+                              , sizeof(mypnid)
+                              , joinFd );
+        if ( rc )
+        {
+            close( joinFd );
+
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf(buf, sizeof(buf), "[%s], unable to send pnid "
+                     "acknowledgement to new monitor: %s.\n", method_name,
+                     ErrorMsg(rc));
+            mon_log_write(MON_COMMACCEPT_12, SQ_LOG_ERR, buf);
+
+            snprintf(buf, sizeof(buf), "Cannot send pnid acknowledgment "
+                     "to new monitor: %s.\n", ErrorMsg(rc));
+            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
+                                                   , MyNode->GetCreatorVerifier()
+                                                   , Notice( buf )
+                                                   , NULL );
+            return;
+        }
+
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Wait for ok to connect to new monitor in node %d\n",
+                          method_name, __LINE__, pnid );
+        }
+
+        // Get new monitor acknowledgement that creator can connect
+        int newpnid = -1;
+        rc = Monitor->ReceiveSock( (char *) &newpnid
+                                 , sizeof(newpnid)
+                                 , joinFd );
+        if ( rc || newpnid != pnid )
+        {
+            close( joinFd );
+
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf(buf, sizeof(buf), "[%s], unable to send connect "
+                  "acknowledgement to new monitor: %s.\n", method_name,
+                  ErrorMsg(rc));
+            mon_log_write(MON_COMMACCEPT_13, SQ_LOG_ERR, buf);
+
+            snprintf(buf, sizeof(buf), "Cannot receive connect acknowledgment "
+                  "to new monitor: %s.\n", ErrorMsg(rc));
+            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
+                                                , MyNode->GetCreatorVerifier()
+                                                , Notice( buf )
+                                                , NULL );
+            return;
+        }
+
+        Monitor->SetJoinSock( joinFd );
+
+        Monitor->SetIntegratingNid( pnid );
+
+        // Connect to new monitor
+        integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
+        Monitor->addNewSock( pnid, 1, integratingFd );
+
+        node->SetState( State_Merging ); 
+
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Connected to new monitor for node %d\n",
+                          method_name, __LINE__, pnid );
+        }
+
         mem_log_write(CMonLog::MON_CONNTONEWMON_5, pnid);
 
         // Get status from new monitor indicating whether
@@ -653,7 +739,7 @@
             snprintf(buf, sizeof(buf), "[%s], unable to obtain "
                      "node status from new monitor: %s.\n",
                      method_name, ErrorMsg(rc));
-            mon_log_write(MON_COMMACCEPT_11, SQ_LOG_ERR, buf);
+            mon_log_write(MON_COMMACCEPT_14, SQ_LOG_ERR, buf);
 
             snprintf(buf, sizeof(buf), "Unable to obtain node status from "
                      "node %s monitor: %s.\n", nodeId.nodeName, ErrorMsg(rc));
@@ -666,32 +752,31 @@
             close( joinFd );
             Monitor->SetJoinSock( -1 );
             Monitor->SetIntegratingNid( -1 );
+            return;
+        }
+
+        mem_log_write(CMonLog::MON_CONNTONEWMON_6, node->GetPNid(),
+                      nodeStatus.state);
+
+        if (nodeStatus.state == State_Up)
+        {
+            // communicate the change and handle it after sync
+            // in ImAlive
+            node->SetChangeState( true );
         }
         else
         {
-            mem_log_write(CMonLog::MON_CONNTONEWMON_6, node->GetPNid(),
-                          nodeStatus.state);
-
-            if (nodeStatus.state == State_Up)
-            {
-                // communicate the change and handle it after sync
-                // in ImAlive
-                node->SetChangeState( true );
-            }
-            else
-            {
-                char buf[MON_STRING_BUF_SIZE];
-                snprintf(buf, sizeof(buf), "Node %s monitor failed to complete "
-                         "initialization\n", nodeId.nodeName);
-                SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
-                                                       , MyNode->GetCreatorVerifier()
-                                                       , Notice( buf )
-                                                       , NULL );
-                node->SetState( State_Down ); 
-                close( joinFd );
-                Monitor->SetJoinSock( -1 );
-                Monitor->SetIntegratingNid( -1 );
-            }
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf(buf, sizeof(buf), "Node %s monitor failed to complete "
+                     "initialization\n", nodeId.nodeName);
+            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
+                                                   , MyNode->GetCreatorVerifier()
+                                                   , Notice( buf )
+                                                   , NULL );
+            node->SetState( State_Down );
+            close( joinFd );
+            Monitor->SetJoinSock( -1 );
+            Monitor->SetIntegratingNid( -1 );
         }
     }
 
@@ -760,7 +845,7 @@
             MPI_Error_class( rc, &errClass );
             snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
                      method_name, ErrorMsg(rc));
-            mon_log_write(MON_COMMACCEPT_12, SQ_LOG_ERR, buf);    
+            mon_log_write(MON_COMMACCEPT_15, SQ_LOG_ERR, buf);
 
         }
         else
@@ -814,7 +899,7 @@
             char buf[MON_STRING_BUF_SIZE];
             snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
                      method_name, strerror(errno));
-            mon_log_write(MON_COMMACCEPT_13, SQ_LOG_ERR, buf);    
+            mon_log_write(MON_COMMACCEPT_16, SQ_LOG_ERR, buf);
 
         }
         else
@@ -870,7 +955,7 @@
         char buf[MON_STRING_BUF_SIZE];
         snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
                  method_name, rc);
-        mon_log_write(MON_COMMACCEPT_14, SQ_LOG_ERR, buf);
+        mon_log_write(MON_COMMACCEPT_17, SQ_LOG_ERR, buf);
     }
 
     // Enter thread processing loop
@@ -893,7 +978,7 @@
         char buf[MON_STRING_BUF_SIZE];
         snprintf(buf, sizeof(buf), "[%s], thread create error=%d\n",
                  method_name, rc);
-        mon_log_write(MON_COMMACCEPT_15, SQ_LOG_ERR, buf);
+        mon_log_write(MON_COMMACCEPT_18, SQ_LOG_ERR, buf);
     }
 
     TRACE_EXIT;
diff --git a/sqf/monitor/linux/internal.h b/sqf/monitor/linux/internal.h
old mode 100755
new mode 100644
index de2ea61..98c19cf
--- a/sqf/monitor/linux/internal.h
+++ b/sqf/monitor/linux/internal.h
@@ -381,6 +381,8 @@
     char nodeName[MPI_MAX_PROCESSOR_NAME];
     char commPort[MPI_MAX_PORT_NAME];
     char syncPort[MPI_MAX_PORT_NAME];
+    int  pnid;
+    int  creatorPNid;
     int  creatorShellPid;
     Verifier_t creatorShellVerifier;
     bool creator;  // NEW monitor set to true to tell creator it is the CREATOR
diff --git a/sqf/monitor/linux/redirector.cxx b/sqf/monitor/linux/redirector.cxx
index 18d5a0b..2609953 100755
--- a/sqf/monitor/linux/redirector.cxx
+++ b/sqf/monitor/linux/redirector.cxx
@@ -59,7 +59,7 @@
 extern CMonStats *MonStats;
 extern CReqQueue ReqQueue;
 
-const char *RedirEpollEventString( __uint32_t events )
+const char *EpollEventString( __uint32_t events )
 {
     static char str[80] = {0};
     
@@ -101,6 +101,29 @@
     return( str );
 }
 
+const char *EpollOpString( int op )
+{
+    static char str[15] = {0};
+
+    switch (op)
+    {
+        case EPOLL_CTL_ADD:
+            strcpy( str, "EPOLL_CTL_ADD" );
+            break;
+        case EPOLL_CTL_MOD:
+            strcpy( str, "EPOLL_CTL_MOD" );
+            break;
+        case EPOLL_CTL_DEL:
+            strcpy( str, "EPOLL_CTL_DEL" );
+            break;
+        default:
+            strcpy( str, "Invalid OP" );
+            break;
+    }
+
+    return( str );
+}
+
 CRedirect::CRedirect(int nid, int pid)
                     :fd_(-1)
                     ,activity_(false)
@@ -1888,7 +1911,7 @@
                 if (trace_settings & TRACE_REDIRECTION)
                     trace_printf("%s@%d for fd=%d, events=%d %s\n",
                                  method_name, __LINE__, fd, events, 
-                                 RedirEpollEventString(events));
+                                 EpollEventString(events));
 
                 // Acquire lock to prevent memory modifications during
                 // fork/exec (see uses of OFED_MUTEX define)