| /////////////////////////////////////////////////////////////////////////////// |
| // |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| |
| #include "nstype.h" |
| |
| using namespace std; |
| |
| #include <signal.h> |
| #include <stdio.h> |
| #include <unistd.h> |
| #include <netdb.h> |
| #include <sys/socket.h> |
| |
| #include "nscommacceptmon.h" |
| #include "monlogging.h" |
| #include "montrace.h" |
| #include "monitor.h" |
| |
| extern CCommAcceptMon *CommAcceptMon; |
| extern CMonitor *Monitor; |
| extern CNode *MyNode; |
| extern CNodeContainer *Nodes; |
| extern int MyPNID; |
| extern char Node_name[MPI_MAX_PROCESSOR_NAME]; |
| extern char MyMon2NsPort[MPI_MAX_PORT_NAME]; |
| extern char *ErrorMsg (int error_code); |
| extern const char *StateString( STATE state); |
| extern CommType_t CommType; |
| extern CReqQueue ReqQueue; |
| extern bool IsRealCluster; |
| |
| static void *mon2nsProcess(void *arg); |
| |
| CCommAcceptMon::CCommAcceptMon() |
| : accepting_(false) |
| , shutdown_(false) |
| , ioWaitTimeout_(EPOLL_IO_WAIT_TIMEOUT_MSEC) |
| , ioRetryCount_(EPOLL_IO_RETRY_COUNT) |
| , mon2nsSock_(-1) |
| , mon2NsSocketPort_(-1) |
| , mon2NsPort_("") |
| , thread_id_(0) |
| , process_thread_id_(0) |
| { |
| const char method_name[] = "CCommAcceptMon::CCommAcceptMon"; |
| TRACE_ENTRY; |
| |
| // Use the EPOLL timeout and retry values |
| char *ioWaitTimeoutEnv = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" ); |
| if ( ioWaitTimeoutEnv ) |
| { |
| // Timeout in seconds |
| ioWaitTimeout_ = atoi( ioWaitTimeoutEnv ); |
| char *ioRetryCountEnv = getenv( "SQ_MON_EPOLL_RETRY_COUNT" ); |
| if ( ioRetryCountEnv ) |
| { |
| ioRetryCount_ = atoi( ioRetryCountEnv ); |
| } |
| if ( ioRetryCount_ > EPOLL_IO_RETRY_COUNT_MAX ) |
| { |
| ioRetryCount_ = EPOLL_IO_RETRY_COUNT_MAX; |
| } |
| } |
| |
| int mon2nsPort = 0; |
| int val = 0; |
| unsigned char addr[4] = {0,0,0,0}; |
| struct hostent *he; |
| |
| he = gethostbyname( Node_name ); |
| if ( !he ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s@%d] gethostbyname(%s) error: %s\n" |
| , method_name, __LINE__ |
| , Node_name, strerror_r( h_errno, ebuff, 256 ) ); |
| mon_log_write( NS_NSCOMMACCEPT_NSCOMMACCEPT_1, SQ_LOG_CRIT, buf ); |
| |
| mon_failure_exit(); |
| } |
| memcpy( addr, he->h_addr, 4 ); |
| |
| char *env = getenv("NS_M2N_COMM_PORT"); |
| if ( env ) |
| { |
| val = atoi(env); |
| if ( val > 0) |
| { |
| if ( !IsRealCluster ) |
| { |
| val += MyPNID; |
| } |
| mon2nsPort = val; |
| } |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d NS_M2N_COMM_PORT Node_name=%s, env=%s, mon2nsPort=%d, val=%d\n" |
| , method_name, __LINE__ |
| , Node_name, env, mon2nsPort, val ); |
| } |
| |
| mon2nsSock_ = CComm::Listen( &mon2nsPort ); |
| if ( mon2nsSock_ < 0 ) |
| { |
| char ebuff[256]; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s@%d] MkSrvSock(NS_M2N_COMM_PORT=%d) error: %s\n" |
| , method_name, __LINE__, mon2nsPort |
| , strerror_r( errno, ebuff, 256 ) ); |
| mon_log_write( NS_NSCOMMACCEPT_NSCOMMACCEPT_2, SQ_LOG_CRIT, buf ); |
| |
| mon_failure_exit(); |
| } |
| else |
| { |
| snprintf( MyMon2NsPort, sizeof(MyMon2NsPort) |
| , "%d.%d.%d.%d:%d" |
| , (int)((unsigned char *)addr)[0] |
| , (int)((unsigned char *)addr)[1] |
| , (int)((unsigned char *)addr)[2] |
| , (int)((unsigned char *)addr)[3] |
| , mon2nsPort ); |
| setMon2NsPort( MyMon2NsPort ); |
| setMon2NsSocketPort( mon2nsPort ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| trace_printf( "%s@%d Initialized my mon2ns comm socket port, " |
| "pnid=%d (%s:%s) (Mon2NsPort=%s, Mon2NsSocketPort=%d)\n" |
| , method_name, __LINE__ |
| , MyPNID, MyNode->GetName(), MyMon2NsPort |
| , getMon2NsPort() |
| , getMon2NsSocketPort() ); |
| |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| CCommAcceptMon::~CCommAcceptMon() |
| { |
| const char method_name[] = "CCommAcceptMon::~CCommAcceptMon"; |
| TRACE_ENTRY; |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqDeleteProcess( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqDeleteProcess"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor request delete-process data.\n" |
| " msg.del_process_ns.nid=%d\n" |
| " msg.del_process_ns.pid=%d\n" |
| " msg.del_process_ns.verifier=%d\n" |
| " msg.del_process_ns.process_name=%s\n" |
| " msg.del_process_ns.target_nid=%d\n" |
| " msg.del_process_ns.target_pid=%d\n" |
| " msg.del_process_ns.target_verifier=%d\n" |
| " msg.del_process_ns.target_process_name=%s\n" |
| " msg.del_process_ns.target_abended=%d\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.del_process_ns.nid |
| , msg->u.request.u.del_process_ns.pid |
| , msg->u.request.u.del_process_ns.verifier |
| , msg->u.request.u.del_process_ns.process_name |
| , msg->u.request.u.del_process_ns.target_nid |
| , msg->u.request.u.del_process_ns.target_pid |
| , msg->u.request.u.del_process_ns.target_verifier |
| , msg->u.request.u.del_process_ns.target_process_name |
| , msg->u.request.u.del_process_ns.target_abended |
| ); |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| int nid = msg->u.request.u.del_process_ns.nid; |
| int pid = msg->u.request.u.del_process_ns.pid; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqExec( CExternalReq * request ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqExec"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST_DETAIL ) ) |
| { |
| request->populateRequestString(); |
| trace_printf("%s@%d request = %s\n", method_name, __LINE__, request->requestString()); |
| } |
| request->validateObj(); |
| request->performRequest(); |
| delete request; |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqNameServerStop( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqNameServerStop"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor request down-nameserver data.\n" |
| " msg.nameserver_stop.nid=%d\n" |
| " msg.nameserver_stop.pid=%d\n" |
| " msg.nameserver_stop.node_name=%s\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.nameserver_stop.nid |
| , msg->u.request.u.nameserver_stop.pid |
| , msg->u.request.u.nameserver_stop.node_name |
| ); |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| int nid = msg->u.request.u.nameserver_stop.nid; |
| int pid = msg->u.request.u.nameserver_stop.pid; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqNodeDown( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqNodeDown"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor node-down request.\n" |
| " msg.down.nid=%d\n" |
| " msg.down.node_name=%s\n" |
| " msg.down.takeover=%d\n" |
| " msg.down.reason=%s\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.down.nid |
| , msg->u.request.u.down.node_name |
| , msg->u.request.u.down.takeover |
| , msg->u.request.u.down.reason |
| ); |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| int nid = msg->u.request.u.down.nid; |
| int pid = -1; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqProcessInfo( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqProcessInfo"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor request process-info data.\n" |
| " msg.info.nid=%d\n" |
| " msg.info.pid=%d\n" |
| " msg.info.verifier=%d\n" |
| " msg.info.target_nid=%d\n" |
| " msg.info.target_pid=%d\n" |
| " msg.info.target_verifier=%d\n" |
| " msg.info.target_process_name=%s\n" |
| " msg.info.target_process_pattern=%s\n" |
| " msg.info.type=%d\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.process_info.nid |
| , msg->u.request.u.process_info.pid |
| , msg->u.request.u.process_info.verifier |
| , msg->u.request.u.process_info.target_nid |
| , msg->u.request.u.process_info.target_pid |
| , msg->u.request.u.process_info.target_verifier |
| , msg->u.request.u.process_info.target_process_name |
| , msg->u.request.u.process_info.target_process_pattern |
| , msg->u.request.u.process_info.type |
| ); |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| int nid = msg->u.request.u.process_info.nid; |
| int pid = msg->u.request.u.process_info.pid; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqProcessInfoCont( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqProcessInfoCont"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor request process-info-cont data.\n" |
| " msg.info_cont.nid=%d\n" |
| " msg.info_cont.pid=%d\n" |
| " msg.info_cont.context[0].nid=%d\n" |
| " msg.info_cont.context[0].pid=%d\n" |
| " msg.info_cont.context[1].nid=%d\n" |
| " msg.info_cont.context[1].pid=%d\n" |
| " msg.info_cont.context[2].nid=%d\n" |
| " msg.info_cont.context[2].pid=%d\n" |
| " msg.info_cont.context[3].nid=%d\n" |
| " msg.info_cont.context[3].pid=%d\n" |
| " msg.info_cont.context[4].nid=%d\n" |
| " msg.info_cont.context[5].pid=%d\n" |
| " msg.info_cont.type=%d\n" |
| " msg.info_cont.allNodes=%d\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.process_info_cont.nid |
| , msg->u.request.u.process_info_cont.pid |
| , msg->u.request.u.process_info_cont.context[0].nid |
| , msg->u.request.u.process_info_cont.context[0].pid |
| , msg->u.request.u.process_info_cont.context[1].nid |
| , msg->u.request.u.process_info_cont.context[1].pid |
| , msg->u.request.u.process_info_cont.context[2].nid |
| , msg->u.request.u.process_info_cont.context[2].pid |
| , msg->u.request.u.process_info_cont.context[3].nid |
| , msg->u.request.u.process_info_cont.context[3].pid |
| , msg->u.request.u.process_info_cont.context[4].nid |
| , msg->u.request.u.process_info_cont.context[4].pid |
| , msg->u.request.u.process_info_cont.type |
| , msg->u.request.u.process_info_cont.allNodes |
| ); |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| int nid = msg->u.request.u.process_info_cont.nid; |
| int pid = msg->u.request.u.process_info_cont.pid; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqProcessInfoNs( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqProcessInfoNs"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor request process-info-ns data.\n" |
| " msg.info.nid=%d\n" |
| " msg.info.pid=%d\n" |
| " msg.info.verifier=%d\n" |
| " msg.info.target_nid=%d\n" |
| " msg.info.target_pid=%d\n" |
| " msg.info.target_verifier=%d\n" |
| " msg.info.target_process_name=%s\n" |
| " msg.info.target_process_pattern=%s\n" |
| " msg.info.type=%d\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.process_info.nid |
| , msg->u.request.u.process_info.pid |
| , msg->u.request.u.process_info.verifier |
| , msg->u.request.u.process_info.target_nid |
| , msg->u.request.u.process_info.target_pid |
| , msg->u.request.u.process_info.target_verifier |
| , msg->u.request.u.process_info.target_process_name |
| , msg->u.request.u.process_info.target_process_pattern |
| , msg->u.request.u.process_info.type |
| ); |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| int nid = msg->u.request.u.process_info.nid; |
| int pid = msg->u.request.u.process_info.pid; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqNewProcess( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqNewProcess"; |
| TRACE_ENTRY; |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor request new-process data.\n" |
| " msg.new_process_ns.nid=%d\n" |
| " msg.new_process_ns.pid=%d\n" |
| " msg.new_process_ns.verifier=%d\n" |
| " msg.new_process_ns.process_name=%s\n" |
| " msg.new_process_ns.type=%d\n" |
| " msg.new_process_ns.parent_nid=%d\n" |
| " msg.new_process_ns.parent_pid=%d\n" |
| " msg.new_process_ns.parent_verifier=%d\n" |
| " msg.new_process_ns.pair_parent_nid=%d\n" |
| " msg.new_process_ns.pair_parent_pid=%d\n" |
| " msg.new_process_ns.pair_parent_verifier=%d\n" |
| " msg.new_process_ns.priority=%d\n" |
| " msg.new_process_ns.backup=%d\n" |
| " msg.new_process_ns.unhooked=%d\n" |
| " msg.new_process_ns.event_messages=%d\n" |
| " msg.new_process_ns.system_messages=%d\n" |
| " msg.new_process_ns.path=%s\n" |
| " msg.new_process_ns.ldpath=%s\n" |
| " msg.new_process_ns.program=%s\n" |
| " msg.new_process_ns.port=%s\n" |
| " msg.new_process_ns.infile=%s\n" |
| " msg.new_process_ns.outfile=%s\n" |
| " msg.new_process_ns.creation_time=%ld(secs):%ld(nsecs)\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.new_process_ns.nid |
| , msg->u.request.u.new_process_ns.pid |
| , msg->u.request.u.new_process_ns.verifier |
| , msg->u.request.u.new_process_ns.process_name |
| , msg->u.request.u.new_process_ns.type |
| , msg->u.request.u.new_process_ns.parent_nid |
| , msg->u.request.u.new_process_ns.parent_pid |
| , msg->u.request.u.new_process_ns.parent_verifier |
| , msg->u.request.u.new_process_ns.pair_parent_nid |
| , msg->u.request.u.new_process_ns.pair_parent_pid |
| , msg->u.request.u.new_process_ns.pair_parent_verifier |
| , msg->u.request.u.new_process_ns.priority |
| , msg->u.request.u.new_process_ns.backup |
| , msg->u.request.u.new_process_ns.unhooked |
| , msg->u.request.u.new_process_ns.event_messages |
| , msg->u.request.u.new_process_ns.system_messages |
| , msg->u.request.u.new_process_ns.path |
| , msg->u.request.u.new_process_ns.ldpath |
| , msg->u.request.u.new_process_ns.program |
| , msg->u.request.u.new_process_ns.port_name |
| , msg->u.request.u.new_process_ns.infile |
| , msg->u.request.u.new_process_ns.outfile |
| , msg->u.request.u.new_process_ns.creation_time.tv_sec |
| , msg->u.request.u.new_process_ns.creation_time.tv_nsec |
| ); |
| trace_printf("%s@%d - msg.new_process_ns.argc=%d\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.new_process_ns.argc ); |
| for (int i=0; i < msg->u.request.u.new_process_ns.argc; i++) |
| { |
| trace_printf("%s@%d - msg.new_process_ns.argv[%d]=%s\n" |
| , method_name, __LINE__ |
| , i, msg->u.request.u.new_process_ns.argv[i]); |
| } |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| int nid = msg->u.request.u.new_process_ns.nid; |
| int pid = msg->u.request.u.new_process_ns.pid; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqShutdown( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqShutdown"; |
| TRACE_ENTRY; |
| |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor request shutdown data.\n" |
| " msg.shutdown.nid=%d\n" |
| " msg.shutdown.pid=%d\n" |
| " msg.shutdown.level=%d\n" |
| , method_name, __LINE__ |
| , msg->u.request.u.shutdown_ns.nid |
| , msg->u.request.u.shutdown_ns.pid |
| , msg->u.request.u.shutdown_ns.level |
| ); |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| int nid = msg->u.request.u.shutdown_ns.nid; |
| int pid = msg->u.request.u.shutdown_ns.pid; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::monReqUnknown( struct message_def* msg, int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::monReqUnknown"; |
| TRACE_ENTRY; |
| if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) ) |
| { |
| trace_printf( "%s@%d - Received monitor request UNKNOWN data.\n" |
| , method_name, __LINE__ |
| ); |
| } |
| |
| CExternalReq::reqQueueMsg_t msgType; |
| msgType = CExternalReq::NonStartupMsg; |
| // Place new request on request queue |
| ReqQueue.enqueueReq(msgType, -1, -1, sockFd, msg); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::processMonReqs( int sockFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::processMonReqs"; |
| TRACE_ENTRY; |
| |
| int rc; |
| nodeId_t nodeId; |
| struct message_def msg; |
| |
| static int sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC; |
| static int sv_io_retry_count = EPOLL_IO_RETRY_COUNT; |
| |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf( "%s@%d - Accepted connection sock=%d\n" |
| , method_name, __LINE__, sockFd ); |
| } |
| |
| // Get info about connecting monitor |
| rc = CComm::Receive( sockFd |
| , (char *) &nodeId |
| , sizeof(nodeId_t) |
| , (char *) "Remote monitor" |
| , method_name ); |
| if ( rc ) |
| { // Handle error |
| close( sockFd ); |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new " |
| "monitor: %s.\n", method_name, ErrorMsg(rc)); |
| mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_1, SQ_LOG_ERR, buf); |
| return; |
| } |
| |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf( "%s@%d - Accepted connection from pnid=%d\n" |
| " nodeId.nodeName=%s\n" |
| " nodeId.commPort=%s\n" |
| " nodeId.syncPort=%s\n" |
| " nodeId.creatorPNid=%d\n" |
| " nodeId.creator=%d\n" |
| " nodeId.creatorShellPid=%d\n" |
| " nodeId.creatorShellVerifier=%d\n" |
| " nodeId.ping=%d\n" |
| , method_name, __LINE__ |
| , nodeId.pnid |
| , nodeId.nodeName |
| , nodeId.commPort |
| , nodeId.syncPort |
| , nodeId.creatorPNid |
| , nodeId.creator |
| , nodeId.creatorShellPid |
| , nodeId.creatorShellVerifier |
| , nodeId.ping ); |
| } |
| |
| CNode *node; |
| node = Nodes->GetNode( nodeId.pnid ); |
| if ( node != NULL ) |
| { |
| if ( node->GetState() != State_Up ) |
| { |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf( "%s@%d - Bringing node up, node=%s, pnid=%d\n" |
| , method_name, __LINE__ |
| , node->GetName(), node->GetPNid() ); |
| } |
| rc = Monitor->HardNodeUpNs( node->GetPNid() ); |
| if ( rc ) |
| { // Handle error |
| close( sockFd ); |
| return; |
| } |
| } |
| } |
| else |
| { // Handle error |
| close( sockFd ); |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], invalid physical node id, " |
| "pnid: %d\n", method_name, nodeId.pnid ); |
| mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_2, SQ_LOG_ERR, buf); |
| return; |
| } |
| |
| strcpy(nodeId.nodeName, MyNode->GetName()); |
| strcpy(nodeId.commPort, MyNode->GetCommPort()); |
| strcpy(nodeId.syncPort, MyNode->GetSyncPort()); |
| nodeId.pnid = MyNode->GetPNid(); |
| nodeId.creatorPNid = -1; |
| nodeId.creatorShellPid = -1; |
| nodeId.creatorShellVerifier = -1; |
| nodeId.creator = false; |
| nodeId.ping = false; |
| nodeId.nsPid = getpid(); |
| int pnidNs = processMonReqsGetBestNs(); |
| nodeId.nsPNid = pnidNs; |
| |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf( "%s@%d - Sending nodeId back\n" |
| " nodeId.nodeName=%s\n" |
| " nodeId.commPort=%s\n" |
| " nodeId.syncPort=%s\n" |
| " nodeId.pnid=%d\n" |
| " nodeId.nsPid=%d\n" |
| " nodeId.nsPNid=%d\n" |
| , method_name, __LINE__ |
| , nodeId.nodeName |
| , nodeId.commPort |
| , nodeId.syncPort |
| , nodeId.pnid |
| , nodeId.nsPid |
| , nodeId.nsPNid ); |
| } |
| |
| // return Send info to connecting monitor |
| rc = CComm::SendWait( sockFd |
| , (char *) &nodeId |
| , sizeof(nodeId_t) |
| , sv_io_wait_timeout |
| , sv_io_retry_count |
| , (char *) node->GetName() |
| , method_name ); |
| if ( rc ) |
| { // Handle error |
| close( sockFd ); |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], unable to send node id from new " |
| "monitor: %s.\n", method_name, ErrorMsg(rc)); |
| mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_3, SQ_LOG_ERR, buf); |
| return; |
| } |
| |
| while ( true ) |
| { |
| // Get monitor request (hdr) |
| int size; |
| rc = CComm::Receive( sockFd |
| , (char *) &size |
| , sizeof(size) |
| , (char *) node->GetName() |
| , method_name ); |
| if ( rc ) |
| { // Handle error |
| close( sockFd ); |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], unable to obtain message size from " |
| "monitor: %s.\n", method_name, ErrorMsg(rc)); |
| mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_4, SQ_LOG_ERR, buf); |
| return; |
| } |
| |
| rc = CComm::Receive( sockFd |
| , (char *) &msg |
| , size |
| , (char *) node->GetName() |
| , method_name ); |
| if ( rc ) |
| { // Handle error |
| close( sockFd ); |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], unable to obtain message from " |
| "monitor: %s.\n", method_name, ErrorMsg(rc)); |
| mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_5, SQ_LOG_ERR, buf); |
| return; |
| } |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| const char *mtype = "?"; |
| if ( msg.type == MsgType_Service ) |
| mtype = "Service"; |
| const char *rtype; |
| switch (msg.u.request.type) |
| { |
| case ReqType_DelProcessNs: |
| rtype = "DelProcessNs"; |
| break; |
| case ReqType_ProcessInfo: |
| rtype = "ProcessInfo"; |
| break; |
| case ReqType_ProcessInfoCont: |
| rtype = "ProcessInfoCont"; |
| break; |
| case ReqType_ProcessInfoNs: |
| rtype = "ProcessInfoNs"; |
| break; |
| case ReqType_NameServerStop: |
| rtype = "NameServerStop"; |
| break; |
| case ReqType_NewProcessNs: |
| rtype = "NewProcessNs"; |
| break; |
| case ReqType_ShutdownNs: |
| rtype = "ShutdownNs"; |
| break; |
| default: |
| rtype = "?"; |
| } |
| |
| trace_printf( "%s@%d - Received monitor request, sock=%d.\n" |
| " msg.type=%d(%s)\n" |
| " msg.noreply=%d\n" |
| " msg.reply_tag=%d\n" |
| " msg.u.request.type=%d(%s)\n" |
| , method_name, __LINE__ |
| , sockFd |
| , msg.type |
| , mtype |
| , msg.noreply |
| , msg.reply_tag |
| , msg.u.request.type |
| , rtype |
| ); |
| } |
| switch (msg.u.request.type) |
| { |
| case ReqType_DelProcessNs: |
| monReqDeleteProcess(&msg, sockFd); |
| break; |
| |
| case ReqType_NameServerStop: |
| monReqNameServerStop(&msg, sockFd); |
| break; |
| |
| case ReqType_NodeDown: |
| monReqNodeDown(&msg, sockFd); |
| break; |
| |
| case ReqType_ProcessInfo: |
| monReqProcessInfo(&msg, sockFd); |
| break; |
| |
| case ReqType_ProcessInfoCont: |
| monReqProcessInfoCont(&msg, sockFd); |
| break; |
| |
| case ReqType_ProcessInfoNs: |
| monReqProcessInfoNs(&msg, sockFd); |
| break; |
| |
| case ReqType_NewProcessNs: |
| monReqNewProcess(&msg, sockFd); |
| break; |
| |
| case ReqType_ShutdownNs: |
| monReqShutdown(&msg, sockFd); |
| break; |
| |
| default: |
| monReqUnknown(&msg, sockFd); |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CCommAcceptMon::processMonReqsGetBestNs( void ) |
| { |
| const char method_name[] = "CCommAcceptMon::processMonReqsGetBestNs"; |
| TRACE_ENTRY; |
| int pnid; |
| |
| int myCount = MyNode->GetMonConnCount(); |
| int minCount = Monitor->GetMinMonConnCount(); |
| if ( myCount <= (minCount + HEURISTIC_COUNT) ) |
| { |
| pnid = -1; |
| } |
| else |
| { |
| pnid = Monitor->GetMinMonConnPnid(); |
| } |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf("%s@%d - myCount=%d, minCount=%d, pnid=%d\n" |
| ,method_name, __LINE__, myCount, myCount, pnid); |
| } |
| |
| TRACE_EXIT; |
| return pnid; |
| } |
| |
| void CCommAcceptMon::processNewSock( int joinFd ) |
| { |
| const char method_name[] = "CCommAcceptMon::processNewSock"; |
| TRACE_ENTRY; |
| |
| int rc; |
| |
| mem_log_write(CMonLog::MON_NSCONNTONEWMON_1); |
| |
| // need to create context in case back-to-back accept is too fast |
| Context *ctx = new Context(); |
| ctx->this_ = this; |
| ctx->pendingFd_ = joinFd; |
| rc = pthread_create(&process_thread_id_, NULL, mon2nsProcess, ctx); |
| if (rc != 0) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], mon2nsProcess thread create error=%d\n", |
| method_name, rc); |
| mon_log_write(NS_COMMACCEPT_PROCESSNEWSOCK_1, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::commAcceptor() |
| { |
| const char method_name[] = "CCommAcceptMon::commAcceptor"; |
| TRACE_ENTRY; |
| |
| switch( CommType ) |
| { |
| case CommType_Sockets: |
| commAcceptorSock(); |
| break; |
| default: |
| // Programmer bonehead! |
| abort(); |
| } |
| |
| TRACE_EXIT; |
| pthread_exit(0); |
| } |
| |
| // commAcceptor thread main processing loop. Keep an accept |
| // request outstanding. After accepting a connection process it. |
| void CCommAcceptMon::commAcceptorSock() |
| { |
| const char method_name[] = "CCommAcceptMon::commAcceptorSock"; |
| TRACE_ENTRY; |
| |
| int joinFd = -1; |
| |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf("%s@%d thread %lx starting\n", method_name, |
| __LINE__, thread_id_); |
| } |
| |
| while ( true ) |
| { |
| if ( isAccepting() ) |
| { |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf("%s@%d - Posting accept\n", method_name, __LINE__); |
| } |
| |
| mem_log_write(CMonLog::MON_NSCONNTONEWMON_2); |
| joinFd = CComm::Accept( mon2nsSock_ ); |
| } |
| else |
| { |
| if ( trace_settings & ( TRACE_NS) ) |
| { |
| trace_printf("%s@%d - Waiting to post accept\n", method_name, __LINE__); |
| } |
| |
| CLock::lock(); |
| CLock::wait(); |
| CLock::unlock(); |
| if ( !shutdown_ ) |
| { |
| continue; // Ok to accept another connection |
| } |
| } |
| |
| if ( shutdown_ ) |
| { // We are being notified to exit. |
| break; |
| } |
| |
| if ( joinFd < 0 ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n", |
| method_name, strerror(errno)); |
| mon_log_write(NS_COMMACCEPT_COMMACCEPTORSOCK_1, SQ_LOG_ERR, buf); |
| |
| } |
| else |
| { |
| processNewSock( joinFd ); |
| } |
| } |
| |
| if ( !(joinFd < 0) ) close( joinFd ); |
| |
| if ( trace_settings & ( TRACE_NS ) ) |
| trace_printf("%s@%d thread %lx exiting\n", method_name, |
| __LINE__, pthread_self()); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::connectToCommSelf( void ) |
| { |
| const char method_name[] = "CCommAcceptMon::connectToCommSelf"; |
| TRACE_ENTRY; |
| |
| CComm::ConnectLocal( getMon2NsSocketPort() ); |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::shutdownWork(void) |
| { |
| const char method_name[] = "CCommAcceptMon::shutdownWork"; |
| TRACE_ENTRY; |
| |
| // Set flag that tells the commAcceptor thread to exit |
| shutdown_ = true; |
| connectToCommSelf(); |
| CLock::wakeOne(); |
| |
| if ( trace_settings & ( TRACE_NS ) ) |
| trace_printf("%s@%d waiting for mon2nsAcceptMon thread %lx to exit.\n", |
| method_name, __LINE__, thread_id_); |
| |
| // Wait for commAcceptor thread to exit |
| pthread_join(thread_id_, NULL); |
| |
| TRACE_EXIT; |
| } |
| |
| // Initialize mon2nsAcceptor thread |
| static void *mon2nsAcceptMon(void *arg) |
| { |
| const char method_name[] = "mon2nsAcceptMon"; |
| TRACE_ENTRY; |
| |
| // Parameter passed to the thread is an instance of the CommAcceptMon object |
| CCommAcceptMon *cao = (CCommAcceptMon *) arg; |
| |
| // Mask all allowed signals |
| sigset_t mask; |
| sigfillset(&mask); |
| sigdelset(&mask, SIGPROF); // allows profiling such as google profiler |
| int rc = pthread_sigmask(SIG_SETMASK, &mask, NULL); |
| if (rc != 0) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n", |
| method_name, rc); |
| mon_log_write(NS_COMMACCEPT_MON2NSACCEPTMON_1, SQ_LOG_ERR, buf); |
| } |
| |
| // Enter thread processing loop |
| cao->commAcceptor(); |
| |
| TRACE_EXIT; |
| return NULL; |
| } |
| |
| // Initialize mon2nsProcess thread |
| static void *mon2nsProcess(void *arg) |
| { |
| const char method_name[] = "mon2nsProcess"; |
| TRACE_ENTRY; |
| |
| // Parameter passed to the thread is an context |
| CCommAcceptMon::Context *ctx = (CCommAcceptMon::Context *) arg; |
| CCommAcceptMon *cao = ctx->this_; |
| |
| // Mask all allowed signals |
| sigset_t mask; |
| sigfillset(&mask); |
| sigdelset(&mask, SIGPROF); // allows profiling such as google profiler |
| int rc = pthread_sigmask(SIG_SETMASK, &mask, NULL); |
| if (rc != 0) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n", |
| method_name, rc); |
| mon_log_write(NS_COMMACCEPT_MON2NSPROCESS_1, SQ_LOG_ERR, buf); |
| } |
| |
| MyNode->AddMonConnCount(1); |
| |
| // Enter thread processing loop |
| cao->processMonReqs(ctx->pendingFd_); |
| delete ctx; |
| |
| MyNode->AddMonConnCount(-1); |
| |
| TRACE_EXIT; |
| return NULL; |
| } |
| |
| |
| // Create a commAcceptorMon thread |
| void CCommAcceptMon::start() |
| { |
| const char method_name[] = "CCommAcceptMon::start"; |
| TRACE_ENTRY; |
| |
| int rc = pthread_create(&thread_id_, NULL, mon2nsAcceptMon, this); |
| if (rc != 0) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], thread create error=%d\n", |
| method_name, rc); |
| mon_log_write(NS_COMMACCEPT_START_1, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::startAccepting( void ) |
| { |
| const char method_name[] = "CCommAcceptMon::startAccepting"; |
| TRACE_ENTRY; |
| |
| CAutoLock lock( getLocker( ) ); |
| |
| if ( !accepting_ ) |
| { |
| accepting_ = true; |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf( "%s@%d - Enabling accepting_=%d\n" |
| , method_name, __LINE__, accepting_ ); |
| } |
| CLock::wakeOne(); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CCommAcceptMon::stopAccepting( void ) |
| { |
| const char method_name[] = "CCommAcceptMon::stopAccepting"; |
| TRACE_ENTRY; |
| |
| CAutoLock lock( getLocker( ) ); |
| |
| if ( accepting_ ) |
| { |
| accepting_ = false; |
| if ( trace_settings & ( TRACE_NS ) ) |
| { |
| trace_printf( "%s@%d - Disabling accepting_=%d\n" |
| , method_name, __LINE__, accepting_ ); |
| } |
| CLock::wakeOne(); |
| } |
| |
| TRACE_EXIT; |
| } |