///////////////////////////////////////////////////////////////////////////////
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
///////////////////////////////////////////////////////////////////////////////

using namespace std;

#include "commaccept.h"
#include "monlogging.h"
#include "montrace.h"
#include "monitor.h"

#include <signal.h>
#include <unistd.h>

// Monitor-wide globals and helpers defined in other translation units and
// used by the accept logic in this file.
extern CCommAccept CommAccept;
extern CMonitor *Monitor;
extern CNode *MyNode;
extern CNodeContainer *Nodes;
extern int MyPNID;
extern char MyCommPort[MPI_MAX_PORT_NAME];
extern char *ErrorMsg (int error_code);
extern const char *StateString( STATE state);
extern CommType_t CommType;
extern bool IsRealCluster;

// Create the accept helper.  It starts out willing to accept connections
// (accepting_ == true), with no shutdown requested and thread_id_ == 0.
CCommAccept::CCommAccept()
    : accepting_( true )
    , shutdown_( false )
    , thread_id_( 0 )
{
    const char method_name[] = "CCommAccept::CCommAccept";
    TRACE_ENTRY;
    // Nothing beyond the member initializers is required.
    TRACE_EXIT;
}

// Destroy the accept helper.  No resources are released here; only the
// entry/exit trace points are emitted.
CCommAccept::~CCommAccept()
{
    const char method_name[] = "CCommAccept::~CCommAccept";
    TRACE_ENTRY;
    TRACE_EXIT;
}


// Build a ReqType_Notice / MsgType_ReintegrationError message whose text
// is msgText.
//
// The message is allocated with new and returned to the caller.  Callers in
// this file pass it straight to putOnNoticeQueue, which presumably takes
// ownership (TODO confirm).  msgText is truncated if it does not fit and
// the stored text is always NUL-terminated.
struct message_def *CCommAccept::Notice( const char *msgText )
{
    struct message_def *msg;

    const char method_name[] = "CCommAccept::Notice";
    TRACE_ENTRY;
    
    msg = new struct message_def;
    msg->type = MsgType_ReintegrationError;
    msg->noreply = true;
    msg->u.request.type = ReqType_Notice;
    // strncpy does not terminate the destination when msgText fills it,
    // so copy at most size-1 bytes and terminate explicitly.
    strncpy( msg->u.request.u.reintegrate.msg, msgText,
             sizeof(msg->u.request.u.reintegrate.msg) - 1 );
    msg->u.request.u.reintegrate.msg[sizeof(msg->u.request.u.reintegrate.msg) - 1] = '\0';

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        trace_printf("%s@%d - Reintegrate notice %s\n",
                     method_name, __LINE__, msgText );

    TRACE_EXIT;

    return msg;
}

// Send node names and port numbers for all existing monitors
// to the new monitor.
// Send node names and port numbers for all existing monitors
// to the new monitor (MPI transport).
//
// One nodeId_t entry is built per physical node, where entry i describes
// Nodes->GetNode(i).  Nodes that are not State_Up are sent with empty
// name/port strings.  The array is value-initialized so that any field not
// explicitly filled in below goes on the wire as zero rather than as
// uninitialized heap contents.
//
// Returns true if Monitor->SendMPI succeeded.  On failure, logs
// MON_COMMACCEPT_1 and returns false.
bool CCommAccept::sendNodeInfoMPI( MPI_Comm interComm )
{
    const char method_name[] = "CCommAccept::sendNodeInfoMPI";
    TRACE_ENTRY;
    bool sentData = true;

    int pnodeCount = Nodes->GetPNodesCount();

    // "()" value-initializes (zeroes) every entry before it is sent.
    nodeId_t *nodeInfo = new nodeId_t[pnodeCount]();
    int rc;

    CNode *node;

    for (int i=0; i<pnodeCount; ++i)
    {
        node = Nodes->GetNode( i );
        if ( node->GetState() == State_Up)
        {
            // Copy at most size-1 bytes.  The zeroed array guarantees a
            // terminating NUL even when the source string is too long.
            strncpy(nodeInfo[i].nodeName, node->GetName(),
                    sizeof(nodeInfo[i].nodeName) - 1);
            strncpy(nodeInfo[i].commPort, node->GetCommPort(),
                    sizeof(nodeInfo[i].commPort) - 1);
            // Previously traced but never copied, which left the field
            // uninitialized; the socket variant sends it too.
            strncpy(nodeInfo[i].syncPort, node->GetSyncPort(),
                    sizeof(nodeInfo[i].syncPort) - 1);

            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
            {
                trace_printf( "%s@%d - Port for node %d (%s)\n"
                              "CommPort=%s\n"
                              "SyncPort=%s\n"
                            , method_name, __LINE__
                            , i, node->GetName()
                            , node->GetCommPort()
                            , node->GetSyncPort());
            }
        }
        else
        {
            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
            {
                trace_printf("%s@%d - No port for node %d (node not up)\n",
                             method_name, __LINE__, i);
            }

            nodeInfo[i].nodeName[0] = '\0';
            nodeInfo[i].commPort[0] = '\0';
            nodeInfo[i].syncPort[0] = '\0';
        }
    }

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
    {
       trace_printf("%s@%d - Sending port info to new monitor\n", method_name,
                    __LINE__);
    }

    rc = Monitor->SendMPI((char *) nodeInfo, sizeof(nodeId_t)*pnodeCount, 0,
                       MON_XCHNG_DATA, interComm);
    if ( rc != MPI_SUCCESS )
    {
        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf), "[%s], cannot send node/port info to "
                 " new monitor process: %s.\n"
                 , method_name, ErrorMsg(rc));
        mon_log_write(MON_COMMACCEPT_1, SQ_LOG_ERR, buf); 

        sentData = false;
    }

    delete [] nodeInfo;

    TRACE_EXIT;

    return sentData;
}

// Send node names and port numbers for all existing monitors
// to the new monitor.
bool CCommAccept::sendNodeInfoSock( int sockFd )
{
    const char method_name[] = "CCommAccept::sendNodeInfoSock";
    TRACE_ENTRY;
    bool sentData = true;

    int pnodeCount = Nodes->GetPNodesCount();

    nodeId_t *nodeInfo;
    size_t nodeInfoSize = (sizeof(nodeId_t) * pnodeCount);
    nodeInfo = (nodeId_t *) new char[nodeInfoSize];
    int rc;

    CNode *node;

    for (int i=0; i<pnodeCount; ++i)
    {
        node = Nodes->GetNodeByMap( i );
        if ( node->GetState() == State_Up)
        {
            strncpy(nodeInfo[i].nodeName, node->GetName(),
                    sizeof(nodeInfo[i].nodeName));
            strncpy(nodeInfo[i].commPort, node->GetCommPort(),
                    sizeof(nodeInfo[i].commPort));
            strncpy(nodeInfo[i].syncPort, node->GetSyncPort(),
                    sizeof(nodeInfo[i].syncPort));
            nodeInfo[i].pnid = node->GetPNid();
            nodeInfo[i].creatorPNid = (nodeInfo[i].pnid == MyPNID) ? MyPNID : -1;

            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
            {
                trace_printf( "%s@%d - Node info for pnid=%d (%s)\n"
                              "        CommPort=%s\n"
                              "        SyncPort=%s\n"
                              "        creatorPNid=%d\n"
                            , method_name, __LINE__
                            , nodeInfo[i].pnid
                            , nodeInfo[i].nodeName
                            , nodeInfo[i].commPort
                            , nodeInfo[i].syncPort
                            , nodeInfo[i].creatorPNid );
            }
        }
        else
        {
            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
            {
                trace_printf( "%s@%d - No nodeInfo[%d] for pnid=%d (%s) node not up!\n"
                            , method_name, __LINE__
                            , i, node->GetPNid(), node->GetName());
            }

            nodeInfo[i].pnid = -1;
            nodeInfo[i].nodeName[0] = '\0';
            nodeInfo[i].commPort[0] = '\0';
            nodeInfo[i].syncPort[0] = '\0';
            nodeInfo[i].creatorPNid = -1;
        }
    }

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf( "%s@%d - Sending port info to new monitor\n"
                    , method_name, __LINE__);
        for (int i=0; i<pnodeCount; i++)
        {
            trace_printf( "Port info for pnid=%d\n"
                          "        nodeInfo[%d].nodeName=%s\n"
                          "        nodeInfo[%d].commPort=%s\n"
                          "        nodeInfo[%d].syncPort=%s\n"
                          "        nodeInfo[%d].creatorPNid=%d\n"
                        , nodeInfo[i].pnid
                        , i, nodeInfo[i].nodeName
                        , i, nodeInfo[i].commPort
                        , i, nodeInfo[i].syncPort
                        , i, nodeInfo[i].creatorPNid );
        }
    }

    rc = Monitor->SendSock( (char *) nodeInfo
                          , nodeInfoSize
                          , sockFd
                          , method_name );
    if ( rc )
    {
        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf), "[%s], cannot send node/port info to "
                 " new monitor process: %s.\n"
                 , method_name, ErrorMsg(rc));
        mon_log_write(MON_COMMACCEPT_2, SQ_LOG_ERR, buf); 

        sentData = false;
    }

    delete [] nodeInfo;

    TRACE_EXIT;

    return sentData;
}

// Handle a monitor that has just connected through MPI_Comm_accept.
//
// Flow as implemented below:
//   1. Receive the connecting monitor's nodeId_t over interComm.
//   2. If the peer marks this monitor as the creator, record that (with the
//      creator shell pid/verifier) on MyNode.
//   3. Look the node up by name and store its comm/sync ports.  Unknown
//      nodes are rejected.
//   4. Merge interComm into an intra-communicator.
//   5. Creator:
//        - send node info for all nodes;
//        - hand interComm to Monitor as the join communicator;
//        - register intraComm and mark the node State_Merging;
//        - wait for the new monitor's nodeStatus_t.
//      Other monitors:
//        - register intraComm and mark the node State_Merging;
//        - send a ready flag and free interComm.
// Failures are logged with mon_log_write.  Outside NAMESERVER_PROCESS
// builds, the creator shell is also sent a notice when this node is the
// creator.
void CCommAccept::processNewComm(MPI_Comm interComm)
{
    const char method_name[] = "CCommAccept::processNewComm";
    TRACE_ENTRY;

    int rc;
    MPI_Comm intraComm;
    nodeId_t nodeId;

    mem_log_write(CMonLog::MON_CONNTONEWMON_2);

    MPI_Comm_set_errhandler( interComm, MPI_ERRORS_RETURN );

    // Get info about connecting monitor
    rc = Monitor->ReceiveMPI((char *) &nodeId, sizeof(nodeId_t),
                          MPI_ANY_SOURCE, MON_XCHNG_DATA, interComm);
    if ( rc != MPI_SUCCESS )
    {   // Handle error
        MPI_Comm_free( &interComm );

        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
                 "monitor: %s.\n", method_name, ErrorMsg(rc));
        mon_log_write(MON_COMMACCEPT_3, SQ_LOG_ERR, buf);    
        return;
    }

    // The strings arrive from the remote peer.  Make sure they are
    // NUL-terminated before they are traced, logged or used for lookup.
    nodeId.nodeName[sizeof(nodeId.nodeName) - 1] = '\0';
    nodeId.commPort[sizeof(nodeId.commPort) - 1] = '\0';
    nodeId.syncPort[sizeof(nodeId.syncPort) - 1] = '\0';

    if ( nodeId.creator )
    {
        // Indicate that this node is the creator monitor for the node up
        // operation.
        MyNode->SetCreator( true
                          , nodeId.creatorShellPid
                          , nodeId.creatorShellVerifier );
    }
    
    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf( "%s@%d - Accepted connection from node %s, port=%s, "
                      "creator=%d, creatorShellPid=%d:%d\n"
                    , method_name, __LINE__
                    , nodeId.nodeName
                    , nodeId.commPort
                    , nodeId.creator
                    , nodeId.creatorShellPid
                    , nodeId.creatorShellVerifier );
    }

    CNode * node = Nodes->GetNode( nodeId.nodeName );
    int pnid = -1;
    if ( node != NULL )
    {   // Store port number for the node
        pnid = node->GetPNid();
        node->SetCommPort( nodeId.commPort );
        node->SetSyncPort( nodeId.syncPort );
    }
    else
    {
        MPI_Comm_free( &interComm );

        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf), "[%s], got connection from unknown "
                 "node %s. Ignoring it.\n", method_name, nodeId.nodeName);
        mon_log_write(MON_COMMACCEPT_4, SQ_LOG_ERR, buf);    

        return;
    }

    // Merge the inter-communicators obtained from the connect/accept
    // between this monitor and the connecting monitor.
    rc = MPI_Intercomm_merge( interComm, 0, &intraComm );
    if ( rc )
    {
        MPI_Comm_free( &interComm );

        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf), "[%s], Cannot merge intercomm: %s.\n",
                 method_name, ErrorMsg(rc));
        mon_log_write(MON_COMMACCEPT_5, SQ_LOG_ERR, buf);

#ifndef NAMESERVER_PROCESS
        if ( MyNode->IsCreator() )
        {
            snprintf(buf, sizeof(buf), "Cannot merge intercomm for node %s: %s.\n",
                     nodeId.nodeName, ErrorMsg(rc));
            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                   , MyNode->GetCreatorVerifier()
                                                   , Notice( buf )
                                                   , NULL );
        }
#endif
        return;
    }

    MPI_Comm_set_errhandler( intraComm, MPI_ERRORS_RETURN );

    mem_log_write(CMonLog::MON_CONNTONEWMON_4, pnid);

    if ( MyNode->IsCreator() )
    {  // Send port and node info for existing nodes
        mem_log_write(CMonLog::MON_CONNTONEWMON_3, pnid);

        if ( !sendNodeInfoMPI( interComm ) )
        {   // Had problem communicating with new monitor
            MPI_Comm_free( &intraComm );
            MPI_Comm_free( &interComm );

#ifndef NAMESERVER_PROCESS
            // sendNodeInfoMPI() already logged the MPI error.  rc here still
            // holds the (successful) merge result, so do not report it.
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "Cannot send node/port info to "
                     "node %s monitor.\n", nodeId.nodeName);
            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                   , MyNode->GetCreatorVerifier()
                                                   , Notice( buf )
                                                   , NULL );
#endif
            return;
        }

        Monitor->SetJoinComm( interComm );

        Monitor->SetIntegratingPNid( pnid );

        Monitor->addNewComm( pnid, 1, intraComm );

        node->SetState( State_Merging ); 
    }
    else
    {   // No longer need inter-comm from "MPI_Comm_accept"

        Monitor->addNewComm( pnid, 1, intraComm );

        node->SetState( State_Merging ); 

        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "%s@%d - Sending ready flag to new monitor\n",
                          method_name, __LINE__);
        }

        // Tell connecting monitor that we are ready to integrate it.
        int readyFlag = 1;
        rc = Monitor->SendMPI((char *) &readyFlag, sizeof(readyFlag), 0,
                           MON_XCHNG_DATA, interComm);
        if ( rc != MPI_SUCCESS )
        {
            MPI_Comm_free( &interComm );

            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "[%s], unable to send connect "
                     "acknowledgement to new monitor: %s.\n", method_name,
                     ErrorMsg(rc));
            mon_log_write(MON_COMMACCEPT_6, SQ_LOG_ERR, buf);    

#ifndef NAMESERVER_PROCESS
            if ( MyNode->IsCreator() )
            {
                snprintf(buf, sizeof(buf), "Cannot send connect acknowledgment "
                         "to new monitor: %s.\n", ErrorMsg(rc));
                SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                       , MyNode->GetCreatorVerifier()
                                                       , Notice( buf )
                                                       , NULL );
            }
#endif

            return;
        }

        MPI_Comm_free( &interComm );
    }

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf( "%s@%d - Connected to new monitor for node %d\n",
                      method_name, __LINE__, pnid );
    }

    // Ideally the following logic should be done in another thread
    // so this thread can post another accept without delay.  For
    // initial implementation simplicity this work is being done 
    // here for now
    if ( MyNode->IsCreator() )
    {
        mem_log_write(CMonLog::MON_CONNTONEWMON_5, pnid);

        // Get status from new monitor indicating whether
        // it is fully connected to other monitors.
        nodeStatus_t nodeStatus;
        rc = Monitor->ReceiveMPI((char *) &nodeStatus,
                              sizeof(nodeStatus_t),
                              MPI_ANY_SOURCE, MON_XCHNG_DATA,
                              interComm);
        if ( rc != MPI_SUCCESS )
        {   // Handle error
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "[%s], unable to obtain "
                     "node status from new monitor: %s.\n",
                     method_name, ErrorMsg(rc));
            mon_log_write(MON_COMMACCEPT_7, SQ_LOG_ERR, buf);

#ifndef NAMESERVER_PROCESS
            snprintf(buf, sizeof(buf), "Unable to obtain node status from "
                     "node %s monitor: %s.\n", nodeId.nodeName, ErrorMsg(rc));
            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                   , MyNode->GetCreatorVerifier()
                                                   , Notice( buf )
                                                   , NULL );
#endif

            node->SetState( State_Down );

            MPI_Comm_free ( &interComm );
            Monitor->ResetIntegratingPNid();
        }
        else
        {
            mem_log_write(CMonLog::MON_CONNTONEWMON_6, node->GetPNid(),
                          nodeStatus.state);

            if (nodeStatus.state == State_Up)
            {
                // communicate the change and handle it after sync
                // in ImAlive
                node->SetChangeState( true );
            }
            else
            {
#ifndef NAMESERVER_PROCESS
                char buf[MON_STRING_BUF_SIZE];
                snprintf(buf, sizeof(buf), "Node %s monitor failed to complete "
                         "initialization\n", nodeId.nodeName);
                SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                       , MyNode->GetCreatorVerifier()
                                                       , Notice( buf )
                                                       , NULL );
#endif
                node->SetState( State_Down ); 

                MPI_Comm_free ( &interComm );
                Monitor->ResetIntegratingPNid();
            }
        }
    }

    TRACE_EXIT;
}

void CCommAccept::processNewSock( int joinFd )
{
    const char method_name[] = "CCommAccept::processNewSock";
    TRACE_ENTRY;

    int rc;
    int integratingFd; 
    nodeId_t nodeId;
    CNode *node;

    mem_log_write(CMonLog::MON_CONNTONEWMON_2);

    // Get info about connecting monitor
    rc = Monitor->ReceiveSock( (char *) &nodeId
                             , sizeof(nodeId_t)
                             , joinFd
                             , method_name );
    if ( rc )
    {   // Handle error
        close( joinFd );
        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
                 "monitor: %s.\n", method_name, ErrorMsg(rc));
        mon_log_write(MON_COMMACCEPT_8, SQ_LOG_ERR, buf);    
        return;
    }

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf( "%s@%d - Accepted connection from pnid=%d\n"
                      "        nodeId.nodeName=%s\n"
                      "        nodeId.commPort=%s\n"
                      "        nodeId.syncPort=%s\n"
                      "        nodeId.creatorPNid=%d\n"
                      "        nodeId.creator=%d\n"
                      "        nodeId.creatorShellPid=%d\n"
                      "        nodeId.creatorShellVerifier=%d\n"
                      "        nodeId.ping=%d\n"
                    , method_name, __LINE__
                    , nodeId.pnid
                    , nodeId.nodeName
                    , nodeId.commPort
                    , nodeId.syncPort
                    , nodeId.creatorPNid
                    , nodeId.creator
                    , nodeId.creatorShellPid
                    , nodeId.creatorShellVerifier
                    , nodeId.ping );
    }

#ifdef NAMESERVER_PROCESS
    if ( IsRealCluster )
        node = Nodes->GetNode( nodeId.nodeName );
    else
        node = Nodes->GetNode( nodeId.pnid );
#else
    node = Nodes->GetNode( nodeId.nodeName );
#endif

    if ( node == NULL )
    {
        close( joinFd );

        char buf[MON_STRING_BUF_SIZE];
        snprintf( buf, sizeof(buf)
                , "[%s], got connection from unknown "
                  "node %d (%s). Ignoring it.\n"
                , method_name
                , nodeId.pnid
                , nodeId.nodeName);
        mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf);

        // Requests is complete, begin accepting connections again
        CommAccept.startAccepting();

        return;
    }

    if ( nodeId.ping )
    {
        // Reply with my node info
        nodeId.pnid = MyPNID;
        strcpy(nodeId.nodeName, MyNode->GetName());
        strcpy(nodeId.commPort, MyNode->GetCommPort());
        strcpy(nodeId.syncPort, MyNode->GetSyncPort());
        nodeId.ping = true;
        nodeId.creatorPNid = -1;
        nodeId.creator = false;
        nodeId.creatorShellPid = -1;
        nodeId.creatorShellVerifier = -1;

        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "Sending my nodeInfo.pnid=%d\n"
                          "        nodeInfo.nodeName=%s\n"
                          "        nodeInfo.commPort=%s\n"
                          "        nodeInfo.syncPort=%s\n"
                          "        nodeInfo.ping=%d\n"
                        , nodeId.pnid
                        , nodeId.nodeName
                        , nodeId.commPort
                        , nodeId.syncPort
                        , nodeId.ping );
        }
    
        rc = Monitor->SendSock( (char *) &nodeId
                              , sizeof(nodeId_t)
                              , joinFd
                              , method_name );
        if ( rc )
        {
            close( joinFd );
            char buf[MON_STRING_BUF_SIZE];
            snprintf( buf, sizeof(buf)
                    , "[%s], Cannot send ping node info to node %s: (%s)\n"
                    , method_name, node?node->GetName():"", ErrorMsg(rc));
            mon_log_write(MON_COMMACCEPT_19, SQ_LOG_ERR, buf);    
        }

        // Requests is complete, begin accepting connections again
        CommAccept.startAccepting();

        return;
    }
    
    if ( nodeId.creator )
    {
        // Indicate that this node is the creator monitor for the node up
        // operation.
        MyNode->SetCreator( true
                          , nodeId.creatorShellPid
                          , nodeId.creatorShellVerifier );
    }
    
    // Sanity check, re-integrating node must be down
    if ( node->GetState() != State_Down )
    {
        int intdata = -1;
        rc = Monitor->SendSock( (char *) &intdata
                              , 0
                              , joinFd
                              , method_name );

        close( joinFd );

        char buf[MON_STRING_BUF_SIZE];
        snprintf( buf, sizeof(buf)
                , "[%s], got connection from node %s (pnid=%d). "
                  "Node not down, node state=%s\n"
                , method_name, nodeId.nodeName, nodeId.pnid
                , StateString(node->GetState()));
        mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf);

        // Requests is complete, begin accepting connections again
        CommAccept.startAccepting();

        return;
    }

    int pnid = -1;

    // Store port numbers for the node
    char commPort[MPI_MAX_PORT_NAME];
    char syncPort[MPI_MAX_PORT_NAME];
    strncpy(commPort, nodeId.commPort, MPI_MAX_PORT_NAME);
    strncpy(syncPort, nodeId.syncPort, MPI_MAX_PORT_NAME);
    char *pch1;
    char *pch2;
    pnid = nodeId.pnid;

    node->SetCommPort( commPort );
    pch1 = strtok (commPort,":");
    pch1 = strtok (NULL,":");
    node->SetCommSocketPort( atoi(pch1) );

    node->SetSyncPort( syncPort );
    pch2 = strtok (syncPort,":");
    pch2 = strtok (NULL,":");
    node->SetSyncSocketPort( atoi(pch2) );

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf( "%s@%d - Setting node %d (%s), commPort=%s(%d), syncPort=%s(%d)\n"
                    , method_name, __LINE__
                    , node->GetPNid()
                    , node->GetName()
                    , pch1, atoi(pch1)
                    , pch2, atoi(pch2) );
    }

    mem_log_write(CMonLog::MON_CONNTONEWMON_4, pnid);

    if ( MyNode->IsCreator() )
    {  // Send port and node info for existing nodes
        mem_log_write(CMonLog::MON_CONNTONEWMON_3, pnid);

        if ( !sendNodeInfoSock( joinFd ) )
        {   // Had problem communicating with new monitor
            close( joinFd );

#ifndef NAMESERVER_PROCESS
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "Cannot send node/port info to "
                     " node %s monitor: %s.\n", nodeId.nodeName, ErrorMsg(rc));
            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                   , MyNode->GetCreatorVerifier()
                                                   , Notice( buf )
                                                   , NULL );
#endif
            return;
        }
    }
    else
    {   // No longer need joinFd

        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "%s@%d - Sending ready indication to new monitor\n",
                          method_name, __LINE__);
        }

        // Tell connecting monitor that we are ready to integrate it.
        int mypnid = MyPNID;
        rc = Monitor->SendSock( (char *) &mypnid
                              , sizeof(mypnid)
                              , joinFd
                              , method_name );
        if ( rc )
        {
            close( joinFd );

            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "[%s], unable to send connect "
                     "acknowledgement to new monitor: %s.\n", method_name,
                     ErrorMsg(rc));
            mon_log_write(MON_COMMACCEPT_11, SQ_LOG_ERR, buf);
            return;
        }

        // Connect to new monitor
        integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
        Monitor->addNewSock( pnid, 1, integratingFd );

        node->SetState( State_Merging ); 

        close( joinFd );
    }

    if ( MyNode->IsCreator() )
    {
        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "%s@%d - Sending my pnid to new monitor\n",
                          method_name, __LINE__);
        }

        // Sanity check, tell integrating monitor my creator pnid
        int mypnid = MyPNID;
        rc = Monitor->SendSock( (char *) &mypnid
                              , sizeof(mypnid)
                              , joinFd
                              , method_name );
        if ( rc )
        {
            close( joinFd );

            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "[%s], unable to send pnid "
                     "acknowledgement to new monitor: %s.\n", method_name,
                     ErrorMsg(rc));
            mon_log_write(MON_COMMACCEPT_12, SQ_LOG_ERR, buf);

#ifndef NAMESERVER_PROCESS
            snprintf(buf, sizeof(buf), "Cannot send pnid acknowledgment "
                     "to new monitor: %s.\n", ErrorMsg(rc));
            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                   , MyNode->GetCreatorVerifier()
                                                   , Notice( buf )
                                                   , NULL );
#endif
            return;
        }

        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "%s@%d - Wait for ok to connect to new monitor in node %d\n",
                          method_name, __LINE__, pnid );
        }

        // Get new monitor acknowledgement that creator can connect
        int newpnid = -1;
        rc = Monitor->ReceiveSock( (char *) &newpnid
                                 , sizeof(newpnid)
                                 , joinFd
                                 , method_name );
        if ( rc || newpnid != pnid )
        {
            close( joinFd );

            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "[%s], unable to send connect "
                  "acknowledgement to new monitor: %s.\n", method_name,
                  ErrorMsg(rc));
            mon_log_write(MON_COMMACCEPT_13, SQ_LOG_ERR, buf);

#ifndef NAMESERVER_PROCESS
            snprintf(buf, sizeof(buf), "Cannot receive connect acknowledgment "
                  "to new monitor: %s.\n", ErrorMsg(rc));
            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                , MyNode->GetCreatorVerifier()
                                                , Notice( buf )
                                                , NULL );
#endif
            return;
        }

        Monitor->SetJoinSock( joinFd );

        Monitor->SetIntegratingPNid( pnid );

        // Connect to new monitor
        integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
        Monitor->addNewSock( pnid, 1, integratingFd );

        node->SetState( State_Merging ); 

        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "%s@%d - Connected to new monitor in node %d\n",
                          method_name, __LINE__, pnid );
        }

        mem_log_write(CMonLog::MON_CONNTONEWMON_5, pnid);

        // Get status from new monitor indicating whether
        // it is fully connected to other monitors.
        nodeStatus_t nodeStatus;
        rc = Monitor->ReceiveSock( (char *) &nodeStatus
                                 , sizeof(nodeStatus_t)
                                 , joinFd
                                 , method_name );
        if ( rc != MPI_SUCCESS )
        {   // Handle error
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "[%s], unable to obtain "
                     "node status from new monitor: %s.\n",
                     method_name, ErrorMsg(rc));
            mon_log_write(MON_COMMACCEPT_14, SQ_LOG_ERR, buf);

#ifndef NAMESERVER_PROCESS
            snprintf(buf, sizeof(buf), "Unable to obtain node status from "
                     "node %s monitor: %s.\n", nodeId.nodeName, ErrorMsg(rc));
            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                   , MyNode->GetCreatorVerifier()
                                                   , Notice( buf )
                                                   , NULL );
#endif

            node->SetState( State_Down );
            close( joinFd );
            Monitor->ResetIntegratingPNid();
            return;
        }

        mem_log_write(CMonLog::MON_CONNTONEWMON_6, node->GetPNid(),
                      nodeStatus.state);

        if (nodeStatus.state == State_Up)
        {
            // communicate the change and handle it after sync
            // in ImAlive
            node->SetChangeState( true );
        }
        else
        {
#ifndef NAMESERVER_PROCESS
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "Node %s monitor failed to complete "
                     "initialization\n", nodeId.nodeName);
            SQ_theLocalIOToClient->putOnNoticeQueue( MyNode->GetCreatorPid()
                                                   , MyNode->GetCreatorVerifier()
                                                   , Notice( buf )
                                                   , NULL );
#endif
            node->SetState( State_Down );
            close( joinFd );
            Monitor->ResetIntegratingPNid();
        }
    }

    TRACE_EXIT;
}

void CCommAccept::commAcceptor()
{
    const char method_name[] = "CCommAccept::commAcceptor";
    TRACE_ENTRY;

    switch( CommType )
    {
        case CommType_InfiniBand:
            commAcceptorIB();
            break;
        case CommType_Sockets:
            commAcceptorSock();
            break;
        default:
            // Programmer bonehead!
            abort();
    }

    TRACE_EXIT;
    pthread_exit(0);
}

// commAcceptor thread main processing loop.  Keep an MPI_Comm_accept
// request outstanding.  After accepting a connection process it.
void CCommAccept::commAcceptorIB()
{
    const char method_name[] = "CCommAccept::commAcceptorIB";
    TRACE_ENTRY;

    int rc;
    int errClass;
    MPI_Comm interComm;

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf("%s@%d thread %lx starting\n", method_name,
                     __LINE__, thread_id_);
    }

    while (true)
    {
        if (isAccepting())
        {
            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
            {
                trace_printf("%s@%d - Posting socket accept\n", method_name, __LINE__);
            }
    
            mem_log_write(CMonLog::MON_CONNTONEWMON_1);
            interComm = MPI_COMM_NULL;
            rc = MPI_Comm_accept( MyCommPort, MPI_INFO_NULL, 0, MPI_COMM_SELF,
                                  &interComm );
            // Stop accepting connections until this request completes
            CommAccept.stopAccepting();
        }
        else
        {
            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
            {
                trace_printf("%s@%d - Waiting to post accept\n", method_name, __LINE__);
            }
    
            CLock::lock();
            CLock::wait();
            CLock::unlock();
            if (!shutdown_)
            {
                continue; // Ok to accept another connection
            }
        }

        if (shutdown_)
        {   // We are being notified to exit.
            break;
        }

        if ( rc )
        {
            char buf[MON_STRING_BUF_SIZE];
            MPI_Error_class( rc, &errClass );
            snprintf(buf, sizeof(buf), "[%s], cannot accept remote monitor: %s.\n",
                     method_name, ErrorMsg(rc));
            mon_log_write(MON_COMMACCEPT_15, SQ_LOG_ERR, buf);

        }
        else
        {
            processNewComm( interComm );
        }
    }

    if ( interComm != MPI_COMM_NULL ) MPI_Comm_free ( &interComm );

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        trace_printf("%s@%d thread %lx exiting\n", method_name,
                     __LINE__, pthread_self());

    TRACE_EXIT;
}

// commAcceptor thread main processing loop.  Keep an accept
// request outstanding.  After accepting a connection process it.
void CCommAccept::commAcceptorSock()
{
    const char method_name[] = "CCommAccept::commAcceptorSock";
    TRACE_ENTRY;

    int joinFd = -1;

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
    {
        trace_printf("%s@%d thread %lx starting\n", method_name,
                     __LINE__, thread_id_);
    }

    while (true)
    {
        if (isAccepting())
        {
            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
            {
                trace_printf("%s@%d - Posting accept\n", method_name, __LINE__);
            }
    
            mem_log_write(CMonLog::MON_CONNTONEWMON_1);
            joinFd = Monitor->AcceptCommSock();
            // Stop accepting connections until this request completes
            CommAccept.stopAccepting();
        }
        else
        {
            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
            {
                trace_printf("%s@%d - Waiting to post accept\n", method_name, __LINE__);
            }
    
            CLock::lock();
            CLock::wait();
            CLock::unlock();
            if (!shutdown_)
            {
                continue; // Ok to accept another connection
            }
        }
        
        if (shutdown_)
        {   // We are being notified to exit.
            break;
        }

        if ( joinFd < 0 )
        {
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
                     method_name, strerror(errno));
            mon_log_write(MON_COMMACCEPT_16, SQ_LOG_ERR, buf);

        }
        else
        {
            processNewSock( joinFd );
        }
    }

    if ( !(joinFd < 0) ) close( joinFd );

    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        trace_printf("%s@%d thread %lx exiting\n", method_name,
                     __LINE__, pthread_self());

    TRACE_EXIT;
}

void CCommAccept::shutdownWork(void)
{
    const char method_name[] = "CCommAccept::shutdownWork";
    TRACE_ENTRY;

    // Set flag that tells the commAcceptor thread to exit
    shutdown_ = true;   
    Monitor->ConnectToSelf();
    CLock::wakeOne();

    if (trace_settings & TRACE_INIT)
        trace_printf("%s@%d waiting for commAccept thread %lx to exit.\n",
                     method_name, __LINE__, thread_id_);

    // Wait for commAcceptor thread to exit
    pthread_join(thread_id_, NULL);

    TRACE_EXIT;
}

// Initialize commAcceptor thread
static void *commAccept(void *arg)
{
    const char method_name[] = "commAccept";
    TRACE_ENTRY;

    // Parameter passed to the thread is an instance of the CommAccept object
    CCommAccept *cao = (CCommAccept *) arg;

    // Mask all allowed signals 
    sigset_t  mask;
    sigfillset(&mask);
    sigdelset(&mask, SIGPROF); // allows profiling such as google profiler
    int rc = pthread_sigmask(SIG_SETMASK, &mask, NULL);
    if (rc != 0)
    {
        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
                 method_name, rc);
        mon_log_write(MON_COMMACCEPT_17, SQ_LOG_ERR, buf);
    }

    // Enter thread processing loop
    cao->commAcceptor();

    TRACE_EXIT;
    return NULL;
}


// Create a commAcceptor thread
void CCommAccept::start()
{
    const char method_name[] = "CCommAccept::start";
    TRACE_ENTRY;

    int rc = pthread_create(&thread_id_, NULL, commAccept, this);
    if (rc != 0)
    {
        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf), "[%s], thread create error=%d\n",
                 method_name, rc);
        mon_log_write(MON_COMMACCEPT_18, SQ_LOG_ERR, buf);
    }

    TRACE_EXIT;
}

// Re-enable accepting.  Under the object's lock, if accepting_ is currently
// false it is set to true and one waiter is signalled, so the commAcceptor
// thread can post its next accept.  A call while already enabled is a no-op.
void CCommAccept::startAccepting( void ) 
{
    const char method_name[] = "CCommAccept::startAccepting";
    TRACE_ENTRY;

    CAutoLock lock( getLocker( ) );

    const bool wasDisabled = !accepting_;
    if ( wasDisabled )
    {
        accepting_ = true;

        if ( trace_settings & (TRACE_INIT | TRACE_RECOVERY) )
        {
            trace_printf( "%s@%d - Enabling accepting_=%d\n"
                        , method_name, __LINE__, accepting_ );
        }

        // Wake a thread blocked waiting for accepting to be enabled.
        CLock::wakeOne();
    }

    TRACE_EXIT;
}

// Disable accepting.  Under the object's lock, if accepting_ is currently
// true it is set to false and one waiter is signalled.  The commAcceptor
// loops call this right after each accept completes.  A call while already
// disabled is a no-op.
void CCommAccept::stopAccepting( void ) 
{
    const char method_name[] = "CCommAccept::stopAccepting";
    TRACE_ENTRY;

    CAutoLock lock( getLocker( ) );

    const bool wasEnabled = accepting_;
    if ( wasEnabled )
    {
        accepting_ = false;

        if ( trace_settings & (TRACE_INIT | TRACE_RECOVERY) )
        {
            trace_printf( "%s@%d - Disabling accepting_=%d\n"
                        , method_name, __LINE__, accepting_ );
        }

        CLock::wakeOne();
    }

    TRACE_EXIT;
}
