blob: 8d055d8d81c115e5a7059c133e1bdc2af70a40c1 [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
///////////////////////////////////////////////////////////////////////////////
#include <iostream>
#include <map>
using namespace std;
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <malloc.h>
#include <sys/ipc.h>
#include <sys/time.h>
#include <sys/resource.h> //add for getrlimit, strange centos6/gcc4.4 don't need this
//#include <sys/stat.h>
#include "monlogging.h"
#include "monprof.h"
#include "monsonar.h"
#include "montrace.h"
#include "redirector.h"
#include "healthcheck.h"
#include "config.h"
#include "device.h"
#include "clusterconf.h"
#include "lnode.h"
#include "pnode.h"
#include "tmsync.h"
#include "cluster.h"
#include "monitor.h"
#ifdef DMALLOC
#include "dm.h"
#endif
#include "replicate.h"
#include "robsem.h"
#include "commaccept.h"
#include <assert.h>
#include <signal.h>
#include <sys/ptrace.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sched.h>
#include "localio.h"
#include "lock.h"
#include "mlio.h"
#include "redirector.h"
#include "intprocess.h"
#include "reqqueue.h"
#include "reqworker.h"
#include "zclient.h"
#include "SCMVersHelp.h"
// Compile-time switches/constants for this module.
// NOTE(review): LOG_ERROR, RidRecvPrioritySlot and MinimumSlotToPrioritize are
// not referenced in the visible part of this file; confirm they are still used.
#define LOG_ERROR
#define RidRecvPrioritySlot 4096
#define MinimumSlotToPrioritize 4096
// Global Variables
// Scratch rlimit; main() fills it with getrlimit(RLIMIT_SIGPENDING) when
// init tracing is enabled.
struct rlimit Rl;
// Set by CMonitor::openProcessMap() when SQ_PIDMAP starts with '1'.
bool PidMap=false;
bool usingCpuAffinity=false;
bool usingTseCpuAffinity=false;
// Set by main() when MON_SNMP_ENABLE is present in the environment.
bool genSnmpTrapEnabled = false;
// Instrumentation mode from SQ_MEASURE ('1' or '2'); 0 when not requested.
int Measure=0;
long trace_level = 0;
// Directory the monitor was started from: PWD, overridden by the directory
// part of argv[0] when argv[0] contains a '/'.
char MyPath[MAX_PROCESS_PATH];
// This monitor's communication port; main() writes it to the monitor port file.
char MyCommPort[MPI_MAX_PORT_NAME] = {'\0'};
char MyMPICommPort[MPI_MAX_PORT_NAME] = {'\0'};
char MySyncPort[MPI_MAX_PORT_NAME] = {'\0'};
// Lower-cased host name of this node (gethostname() in main()).
char Node_name[MPI_MAX_PROCESSOR_NAME] = {'\0'};
sigset_t SigSet;
// Set to true by main() for virtual clusters (SQ_VIRTUAL_NODES).
bool Emulate_Down = false;
// Sync cycle delay; main() overrides it from MON_SYNC_DELAY when set.
long next_test_delay = 10000; // in usec.
// Integration (node re-join) state; IAmIntegrating is set by main() from
// the "-integrate" command-line option.
bool IAmIntegrating = false;
bool IAmIntegrated = false;
// Port of the monitor to integrate with (argv[3] when integrating).
char IntegratingMonitorPort[MPI_MAX_PORT_NAME] = {'\0'};
// False for a virtual cluster (SQ_VIRTUAL_NODES set), true otherwise.
bool IsRealCluster = true;
// Interconnect selected by main() from SQ_IC ("IBV" => InfiniBand, else Sockets).
CommType_t CommType = CommType_Undefined;
bool SMSIntegrating = false;
// Pid/verifier of the creating shell (argv[4]/argv[5] when integrating).
int CreatorShellPid = -1;
Verifier_t CreatorShellVerifier = -1;
// Cleared by main() when SQ_COLD_STANDBY_SPARE is "0".
bool SpareNodeColdStandby = true;
// Zookeeper client switch; cleared for virtual clusters, when
// SQ_MON_ZCLIENT_ENABLED is "0", or when the ZooKeeper settings are missing.
bool ZClientEnabled = true;
// Lock to manage memory modifications during fork/exec
CLock MemModLock;
// Core monitor objects; Monitor, Nodes, Config and Devices are created in main().
CMonitor *Monitor = NULL;
CNodeContainer *Nodes = NULL;
CConfigContainer *Config = NULL;
CDeviceContainer *Devices = NULL;
// Physical node id of this monitor (MPI rank in MPI_COMM_WORLD).
int MyPNID = -1;
CNode *MyNode;
// Event and SNMP loggers, created at the start of main().
CMonLog *MonLog = NULL;
CMonLog *SnmpLog = NULL;
CMonStats * MonStats = NULL;
extern CMonTrace *MonTrace;
// Helper-thread/service objects owned by this module.
CRedirector Redirector;
CIntProcess IntProcess;
CReqQueue ReqQueue;
CHealthCheck HealthCheck;
CCommAccept CommAccept;
extern CReplicate Replicator;
// Zookeeper client, created by CMonitor::CreateZookeeperClient().
CZClient *ZClient = NULL;
// Seabed disconnect semaphore
RobSem * sbDiscSem = NULL;
// Component version helpers (SCMVersHelp.h); main() uses
// CALL_COMP_DOVERS/CALL_COMP_GETVERS2 with them.
DEFINE_EXTERN_COMP_DOVERS(monitor)
DEFINE_EXTERN_COMP_GETVERS2(monitor)
// Builds the "invalid" transaction id: all four txid words set to -1.
// isInvalid() compares against this value.
// NOTE(review): only txid[0..3] are assigned; if _TM_Txid_External has any
// other bytes, they are left uninitialized and isEqual()'s memcmp sees them.
_TM_Txid_External invalid_trans( void )
{
    _TM_Txid_External invalidId;
    for ( int word = 0; word < 4; ++word )
    {
        invalidId.txid[word] = -1LL;
    }
    return invalidId;
}
// Builds the "null" transaction id: all four txid words set to zero.
// isNull() compares against this value.
// NOTE(review): only txid[0..3] are assigned; see invalid_trans().
_TM_Txid_External null_trans( void )
{
    _TM_Txid_External nullId;
    for ( int word = 0; word < 4; ++word )
    {
        nullId.txid[word] = 0LL;
    }
    return nullId;
}
// Byte-wise equality of two transaction ids (compares the whole struct).
bool isEqual( _TM_Txid_External trans1, _TM_Txid_External trans2 )
{
    const int diff = memcmp( &trans1, &trans2, sizeof(trans1) );
    return diff == 0;
}
// True when transid equals the null transaction id built by null_trans().
bool isNull( _TM_Txid_External transid )
{
    return isEqual( transid, null_trans() );
}
// True when transid equals the invalid transaction id built by invalid_trans().
bool isInvalid( _TM_Txid_External transid )
{
    return isEqual( transid, invalid_trans() );
}
// Translates an MPI error code into readable text.
//
// Returns a pointer to a function-static buffer. The text is overwritten by
// the next call, and concurrent calls from several threads share the buffer.
// If MPI_Error_string() rejects the code, a fallback message containing the
// numeric code is returned instead.
char *ErrorMsg (int error_code)
{
    static char buffer[MPI_MAX_ERROR_STRING];
    int length = 0;   // defined even if the MPI library does not set it

    int rc = MPI_Error_string (error_code, buffer, &length);
    if (rc != MPI_SUCCESS)
    {
        snprintf(buffer, sizeof(buffer),
                 "MPI_Error_string: Invalid error code (%d)\n", error_code);
        length = strlen(buffer);
    }

    // Clamp the reported length so the terminator below is always written
    // inside the buffer, whatever value the MPI library returned.
    if (length < 0)
    {
        length = 0;
    }
    else if (length >= (int) sizeof(buffer))
    {
        length = (int) sizeof(buffer) - 1;
    }
    buffer[length] = '\0';
    return buffer;
}
// SA_SIGINFO-style signal handler that reaps terminated child processes.
//
// First calls waitpid() on the pid reported in info->si_pid, then loops on
// waitpid(-1, WNOHANG) to collect any other exited children. Each reaped pid
// is passed to IntProcess.handle_signal() and queued with
// SQ_theLocalIOToClient->handleDeadPid(), so the heavy work runs outside the
// handler. If the signalling child has not changed state yet, it is recorded
// with handleAlmostDeadPid() for later handling. errno is saved and restored.
// NOTE(review): the trace calls (and strerror) are not async-signal-safe;
// they only run when TRACE_SIG_HANDLER / TRACE_ENTRY_EXIT are enabled.
void child_death_signal_handler2 (int signal, siginfo_t *info, void *)
{
    pid_t pid;
    int saveerrno;
    int status;
    pid_t whichPid;
    const char method_name[] = "child_death_signal_handler2";

    if (trace_settings & TRACE_ENTRY_EXIT)
        trace_nolock_printf("%s@%d\n", method_name, __LINE__);

    saveerrno = errno; // waitpid/etc sets errno

    if (trace_settings & TRACE_SIG_HANDLER)
        trace_nolock_printf("%s@%d - signal=%d, code=%d, status=%d, pid=%d\n",
                            method_name, __LINE__, signal, info->si_code,
                            info->si_status, info->si_pid);

    // Handle the process that triggered the signal handler as well
    // as any other exited children.
    whichPid = info->si_pid;
    do
    {
        pid = waitpid (whichPid, &status, WNOHANG);
        if (trace_settings & TRACE_SIG_HANDLER)
        {
            if ( pid != -1 )
            {
                trace_nolock_printf("%s@%d - waitpid(%d) returned %d\n",
                                    method_name, __LINE__, whichPid, pid);
            }
            else
            {
                trace_nolock_printf("%s@%d - waitpid(%d) error, %s (%d)\n",
                                    method_name, __LINE__, whichPid,
                                    strerror(errno), errno);
            }
        }

        if (pid > 0)
        {
            IntProcess.handle_signal(pid, status);

            // Add this pid to a list to be examined outside the
            // signal-handler (to avoid spending excessive time in
            // signal handler). When that list is examined the work
            // required to handle the terminated process will be done.
            SQ_theLocalIOToClient->handleDeadPid(pid);

            if (trace_settings & TRACE_SIG_HANDLER)
            {
                if ( WIFEXITED(status) )
                {   // Process exited normally
                    trace_nolock_printf("%s@%d - process %d exited, exit"
                                        " status=%d\n", method_name, __LINE__,
                                        pid, WEXITSTATUS(status));
                }
                if ( WIFSIGNALED(status) )
                {   // Process was terminated by a signal
                    trace_nolock_printf("%s@%d - process %d terminated by "
                                        "signal #%d\n", method_name, __LINE__,
                                        pid, WTERMSIG(status));
                }
            }
        }
        else if ( whichPid != -1 && pid == 0 )
        {
            // Process that triggered the signal handler has not yet changed
            // state. Remember the "pid" so we can handle it later.
            // (Trace the pid actually being recorded; 'pid' is 0 here.)
            if (trace_settings & TRACE_SIG_HANDLER)
                trace_nolock_printf("%s@%d - adding pid=%d to unreaped list\n",
                                    method_name, __LINE__, whichPid);
            SQ_theLocalIOToClient->handleAlmostDeadPid(whichPid);
            pid = 1; // force at least one more waitpid to be done
        }

        whichPid = -1; // now do waitpid on any child process
    }
    while (pid > 0);

    errno = saveerrno;

    if (trace_settings & TRACE_ENTRY_EXIT)
        trace_nolock_printf("%s@%d - Exit\n", method_name, __LINE__);
}
// Dumps glibc malloc statistics, preceded by a timestamp line.
// The timestamp goes to stdout via printf(); malloc_stats() itself writes its
// report to stderr, which main() redirects to the same file as stdout.
// Uses ctime_r() so a failed conversion or a concurrent ctime() caller cannot
// crash or corrupt the timestamp.
void monMallocStats()
{
    char timeBuf[32];                 // ctime_r() needs at least 26 bytes
    const char *timestamp = "unknown time";
    time_t mytime = time(NULL);

    if ( ctime_r(&mytime, timeBuf) != NULL )
    {
        timeBuf[strcspn(timeBuf, "\n")] = '\0';   // drop ctime's trailing newline
        timestamp = timeBuf;
    }

    printf("monitor malloc statistics at %s:\n", timestamp);
    malloc_stats();
}
// Maps a CommType_t to the name used in log and trace output.
// Anything other than InfiniBand or Sockets is reported as "Undefined".
const char *CommTypeString( CommType_t commType)
{
    if ( commType == CommType_InfiniBand )
    {
        return "InfiniBand";
    }
    if ( commType == CommType_Sockets )
    {
        return "Sockets";
    }
    return "Undefined";
}
// Constructs the monitor object with all counters at zero and no process-map
// file open (processMapFd == -1 until openProcessMap() succeeds).
// procTermSig: signal used to terminate normal processes; main() passes
// SIGKILL or SIGTERM (selected via PROC_TERM_SIG).
CMonitor::CMonitor (int procTermSig)
: CTmSync_Container (),
OpenCount (0),
NoticeCount (0),
ProcessCount (0),
NumOutstandingIO (0),
NumOutstandingSends (0),
Last_error (MPI_SUCCESS),
processMapFd ( -1 ),
procTermSig_ ( procTermSig )
{
const char method_name[] = "CMonitor::CMonitor";
TRACE_ENTRY;
// Add eyecatcher sequence as a debugging aid
memcpy(&eyecatcher_, "MNTR", 4);
#ifdef USE_SEQUENCE_NUM
// Seed the last-issued time that GetTimeSeqNum() advances from.
clock_gettime(CLOCK_REALTIME, &savedTime_);
#endif
TRACE_EXIT;
}
// Destroys the monitor: marks the object as deleted and closes the
// process-map file if one is open.
CMonitor::~CMonitor (void)
{
    const char method_name[] = "CMonitor::~CMonitor";
    TRACE_ENTRY;

    // Lower-case eyecatcher ("mntr") lets a memory dump tell a deleted
    // CMonitor from a live one ("MNTR").
    memcpy(&eyecatcher_, "mntr", 4);

    const bool mapIsOpen = ( processMapFd != -1 );
    if ( mapIsOpen )
    {
        close ( processMapFd );
    }

    TRACE_EXIT;
}
// Open/notice/process bookkeeping counters. No locking is done here.

// Increments the open count by one.
void CMonitor::IncOpenCount (void)
{
    ++OpenCount;
}

// Increments the notice count by one.
void CMonitor::IncNoticeCount (void)
{
    ++NoticeCount;
}

// Increments the process count by one.
void CMonitor::IncProcessCount (void)
{
    ++ProcessCount;
}

// Decrements the open count by one.
void CMonitor::DecrOpenCount (void)
{
    --OpenCount;
}

// Decrements the notice count by one.
void CMonitor::DecrNoticeCount (void)
{
    --NoticeCount;
}

// Decrements the process count by one.
void CMonitor::DecrProcessCount (void)
{
    --ProcessCount;
}
void CMonitor::openProcessMap ( void )
{
char fname[MAX_PROCESS_PATH];
char *env;
env = getenv("SQ_PIDMAP");
if ( env && *env == '1' )
{
PidMap = true;
}
snprintf( fname, sizeof(fname), "%s/monitor.map.%d.%s",
getenv("MPI_TMPDIR"), MyPNID, Node_name );
remove(fname);
processMapFd = open(fname, O_WRONLY | O_APPEND | O_CREAT,
S_IRUSR | S_IWUSR );
if ( processMapFd == -1 )
{ // File open error
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf),
"[CMonitor::openProcessMap], Error opening %s, %s (%d).\n",
fname, strerror(errno), errno);
mon_log_write(MON_PROCESS_COMPLETEPSTARTUP_2, SQ_LOG_ERR, buf);
}
}
// Appends the NUL-terminated text in buf to the process map file, if it is
// open. This is best effort: interrupted (EINTR) and short writes are
// retried until the whole record is written, and any other write error
// drops the rest of the record silently, as before.
void CMonitor::writeProcessMapEntry ( const char * buf )
{
    if ( processMapFd == -1 )
    {
        return;
    }

    size_t remaining = strlen(buf);
    while ( remaining > 0 )
    {
        ssize_t written = write( processMapFd, buf, remaining );
        if ( written < 0 && errno == EINTR )
        {
            continue;               // interrupted before writing: retry
        }
        if ( written <= 0 )
        {
            break;                  // best effort: give up on real errors
        }
        buf += written;
        remaining -= (size_t) written;
    }
}
// Formats one process-map record into buf (at most bufSize bytes):
//   "<tag> <ctime> <name> (<nid>, <pid>:<verifier>) P(<pnid>, <ppid>:<pverifier>) <program>\n"
// Uses ctime_r() so callers do not share ctime()'s static buffer, and falls
// back to "?" instead of crashing if the time cannot be converted.
static void formatProcessMapRecord( char *buf
                                  , size_t bufSize
                                  , const char *tag
                                  , const char *name
                                  , int nid
                                  , int pid
                                  , int verifier
                                  , int parentNid
                                  , int parentPid
                                  , int parentVerifier
                                  , const char *program )
{
    char timeBuf[32];                 // ctime_r() needs at least 26 bytes
    const char *timestamp = "?";
    time_t mytime = time(NULL);

    if ( ctime_r(&mytime, timeBuf) != NULL )
    {
        timeBuf[strcspn(timeBuf, "\n")] = '\0';   // drop ctime's trailing newline
        timestamp = timeBuf;
    }

    snprintf( buf, bufSize
            , "%s %s %-8s (%d, %d:%d) P(%d, %d:%d) %s\n"
            , tag, timestamp, name, nid, pid, verifier
            , parentNid, parentPid, parentVerifier, program );
}

// Slack beyond name and program for one record: tag, 24-char timestamp,
// punctuation and six ints (up to 11 chars each), newline and NUL come to
// about 121 bytes, so 128 keeps the newline from being truncated away.
// Records that a process has started (a "BEGIN" line in the process map).
void CMonitor::writeProcessMapBegin( const char *name
                                   , int nid
                                   , int pid
                                   , int verifier
                                   , int parentNid
                                   , int parentPid
                                   , int parentVerifier
                                   , const char *program )
{
    char buf[128 + MAX_PROCESS_NAME + MAX_PROCESS_PATH];
    formatProcessMapRecord( buf, sizeof(buf), "BEGIN"
                          , name, nid, pid, verifier
                          , parentNid, parentPid, parentVerifier, program );
    writeProcessMapEntry ( buf );
}

// Records that a process has ended (an "END" line in the process map).
void CMonitor::writeProcessMapEnd( const char *name
                                 , int nid
                                 , int pid
                                 , int verifier
                                 , int parentNid
                                 , int parentPid
                                 , int parentVerifier
                                 , const char *program )
{
    char buf[128 + MAX_PROCESS_NAME + MAX_PROCESS_PATH];
    formatProcessMapRecord( buf, sizeof(buf), "END"
                          , name, nid, pid, verifier
                          , parentNid, parentPid, parentVerifier, program );
    writeProcessMapEntry ( buf );
}
bool CMonitor::CompleteProcessStartup (struct message_def * msg)
{
bool status = FAILURE;
CProcess *process;
CLNode *lnode;
const char method_name[] = "CMonitor::CompleteProcessStartup";
TRACE_ENTRY;
lnode = Nodes->GetLNode( msg->u.request.u.startup.nid );
if ( lnode )
{
process = lnode->
CompleteProcessStartup ( msg->u.request.u.startup.process_name,
msg->u.request.u.startup.port_name,
msg->u.request.u.startup.os_pid,
msg->u.request.u.startup.event_messages,
msg->u.request.u.startup.system_messages,
NULL );
}
else
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], Invalid Node ID=%d\n",
method_name, msg->u.request.u.startup.nid);
mon_log_write(MON_MONITOR_COMPLETEPSTARTUP_1, SQ_LOG_ERR, buf);
process = NULL;
}
if (process)
{
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf("%s@%d - Process %s started on port %s\n",
method_name, __LINE__,
msg->u.request.u.startup.process_name,
msg->u.request.u.startup.port_name);
}
CProcessContainer::ParentNewProcReply( process, MPI_SUCCESS);
status = SUCCESS;
}
else
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[CMonitor::CompleteProcessStartup], Error= Can't find process: %s!\n", msg->u.request.u.startup.process_name);
mon_log_write(MON_MONITOR_COMPLETEPSTARTUP_2, SQ_LOG_ERR, buf);
msg->u.reply.type = ReplyType_Generic;
msg->u.reply.u.generic.nid = -1;
msg->u.reply.u.generic.pid = -1;
msg->u.reply.u.generic.verifier = -1;
msg->u.reply.u.generic.process_name[0] = '\0';
msg->u.reply.u.generic.return_code = MPI_ERR_NAME;
status = FAILURE;
}
TRACE_EXIT;
return status;
}
// Serializes one process into a clone_def record at bufPtr and returns a
// pointer just past the record, which is where the next record starts.
//
// Record layout: the fixed clone_def fields, then a variable-length string
// area beginning at procObj->stringData containing, in order:
//   name (nameLen bytes, incl. NUL), port (portLen bytes, incl. NUL),
//   and, for persistent processes only: infile, outfile, user argv
//   (infileLen / outfileLen / argvLen bytes).
// For non-persistent processes the three trailing lengths are written as 0.
// CMonitor::UnpackProcObjs() reads exactly this layout, so the copy order
// below must not change.
// NOTE(review): no bounds checking is done; the caller must supply a buffer
// large enough. The returned pointer is not re-aligned, so the next
// clone_def may start at an unaligned address - confirm clone_def is packed.
char * CMonitor::ProcCopy(char *bufPtr, CProcess *process)
{
const char method_name[] = "CMonitor::ProcCopy";
TRACE_ENTRY;
struct clone_def *procObj = (struct clone_def *)bufPtr;
// Fixed-size attributes
procObj->nid = process->GetNid();
procObj->type = process->GetType();
procObj->priority = process->GetPriority();
procObj->backup = process->IsBackup();
procObj->unhooked = process->IsUnhooked();
procObj->pathStrId = process->pathStrId();
procObj->ldpathStrId = process->ldPathStrId();
procObj->programStrId = process->programStrId();
procObj->os_pid = process->GetPid();
procObj->verifier = process->GetVerifier();
procObj->prior_pid = process->GetPriorPid ();
procObj->parent_nid = process->GetParentNid();
procObj->parent_pid = process->GetParentPid();
procObj->parent_verifier = process->GetParentVerifier();
procObj->persistent_retries = process->GetPersistentRetries();
procObj->event_messages = process->IsEventMessages();
procObj->system_messages = process->IsSystemMessages();
procObj->argc = process->argc();
procObj->creation_time = process->GetCreationTime();
// Start of the variable-length string area
char * stringData = &procObj->stringData;
// Copy the program name
procObj->nameLen = strlen(process->GetName()) + 1;
memcpy(stringData, process->GetName(), procObj->nameLen );
stringData += procObj->nameLen;
// Copy the port
procObj->portLen = strlen(process->GetPort()) + 1;
memcpy(stringData, process->GetPort(), procObj->portLen );
stringData += procObj->portLen;
if (process->IsPersistent())
{
// Copy the standard in file name
procObj->infileLen = strlen(process->infile()) + 1;
memcpy(stringData, process->infile(), procObj->infileLen);
stringData += procObj->infileLen;
// Copy the standard out file name
procObj->outfileLen = strlen(process->outfile()) + 1;
memcpy(stringData, process->outfile(), procObj->outfileLen );
stringData += procObj->outfileLen;
// Copy the program argument strings
procObj->argvLen = process->userArgvLen();
memcpy(stringData, process->userArgv(), procObj->argvLen);
stringData += procObj->argvLen;
procObj->persistent = true;
}
else
{
// Non-persistent processes carry no infile/outfile/argv strings
procObj->infileLen = 0;
procObj->outfileLen = 0;
procObj->argvLen = 0;
procObj->persistent = false;
}
TRACE_EXIT;
return stringData;
}
// Serializes every process on every logical node into buffer as a sequence
// of clone_def records (see ProcCopy()). All non-backup (primary and
// generic) processes come first, then all backup processes.
// On return, buffer points just past the last record written.
// Returns the number of records written.
// NOTE(review): the caller must size buffer for all records; no bounds
// checking is done here. GetLNode() results are not NULL-checked.
int CMonitor::PackProcObjs( char *&buffer )
{
    const char method_name[] = "CMonitor::PackProcObjs";
    TRACE_ENTRY;

    char *bufStart = buffer;
    int procCount = 0;

    // Pass 0 copies primary/generic processes, pass 1 the backups, so that
    // primaries are unpacked before their backups.
    for ( int pass = 0; pass < 2; ++pass )
    {
        const bool wantBackup = ( pass == 1 );
        for ( int nid = 0; nid < Nodes->NumberLNodes; nid++ )
        {
            CLNode *lnode = Nodes->GetLNode(nid);
            for ( CProcess *process = lnode->GetFirstProcess();
                  process;
                  process = process->GetNext() )
            {
                const bool isBackup = process->IsBackup();
                if ( isBackup == wantBackup )
                {
                    buffer = ProcCopy(buffer, process);
                    ++procCount;
                }
            }
        }
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
    {
        // Guard the average against an empty process list.
        long totalSize = (long) (buffer - bufStart);
        long avgSize = ( procCount > 0 ) ? totalSize / procCount : 0;
        trace_printf("%s@%d - Total Procs = %d, Total Size = %ld, Avg = %ld bytes\n",
                     method_name, __LINE__, procCount, totalSize, avgSize);
    }

    TRACE_EXIT;
    return procCount;
}
// Recreates procCount processes from clone_def records produced by
// PackProcObjs()/ProcCopy(), advancing buffer past each record consumed.
// Each record's string area is laid out as: name, port, then optional
// infile, outfile and user argv (present only when their length is non-zero).
// NOTE(review): Nodes->GetLNode(procObj->nid) is dereferenced without a NULL
// check; a record naming an unknown nid would crash here.
void CMonitor::UnpackProcObjs( char *&buffer, int procCount )
{
    const char method_name[] = "CMonitor::UnpackProcObjs";
    TRACE_ENTRY;

    for ( int remaining = procCount; remaining > 0; --remaining )
    {
        struct clone_def *procObj = (struct clone_def *)buffer;
        char *cursor = &procObj->stringData;
        CNode *node = Nodes->GetLNode (procObj->nid)->GetNode();

        // Walk the string area in the order ProcCopy() wrote it.
        char *procName = cursor;
        cursor += procObj->nameLen;
        char *port = cursor;
        cursor += procObj->portLen;
        char *infile = procObj->infileLen ? cursor : NULL;
        cursor += procObj->infileLen;
        char *outfile = procObj->outfileLen ? cursor : NULL;
        cursor += procObj->outfileLen;
        char *userargv = procObj->argvLen ? cursor : NULL;
        cursor += procObj->argvLen;

        CProcess *process = node->CloneProcess (procObj->nid,
                                                procObj->type,
                                                procObj->priority,
                                                procObj->backup,
                                                procObj->unhooked,
                                                procName,
                                                port,
                                                procObj->os_pid,
                                                procObj->verifier,
                                                procObj->parent_nid,
                                                procObj->parent_pid,
                                                procObj->parent_verifier,
                                                procObj->event_messages,
                                                procObj->system_messages,
                                                procObj->pathStrId,
                                                procObj->ldpathStrId,
                                                procObj->programStrId,
                                                infile,
                                                outfile,
                                                &procObj->creation_time);
        if ( process )
        {
            if ( procObj->argvLen )
            {
                process->userArgs ( procObj->argc, procObj->argvLen, userargv );
            }
            if ( procObj->persistent )
            {
                process->SetPersistent(true);
            }
        }

        // Next record starts right after this record's string area.
        buffer = cursor;
    }

    TRACE_EXIT;
}
// Queues a request, for a worker thread, to create the primitive processes
// on this node. Nothing is queued when this node is a spare node.
void CMonitor::StartPrimitiveProcesses( void )
{
    const char method_name[] = "CMonitor::StartPrimitiveProcesses";
    TRACE_ENTRY;

    const bool isSpare = MyNode->IsSpareNode();
    if ( isSpare )
    {
        // Spare nodes do not host primitive processes.
    }
    else
    {
        ReqQueue.enqueueCreatePrimitiveReq( MyPNID );
    }

    TRACE_EXIT;
}
void HandleMyNodeExpiration( void )
{
const char method_name[] = "HandleMyNodeExpiration";
TRACE_ENTRY;
ReqQueue.enqueueDownReq(MyPNID);
TRACE_EXIT;
}
void HandleNodeExpiration( const char *nodeName )
{
const char method_name[] = "HandleNodeExpiration";
TRACE_ENTRY;
CNode *node = Nodes->GetNode((char *)nodeName);
if (node)
{
ReqQueue.enqueueDownReq(node->GetPNid());
}
TRACE_EXIT;
}
// Builds a "host:port,host:port,..." quorum string from the comma-separated
// host list in hosts. Empty entries (e.g. from ",,") are skipped, which
// matches the strtok() tokenizing used previously. Returns "" when no hosts
// remain.
static string BuildZkQuorumString( const string &hosts, int port )
{
    stringstream quorum;
    bool first = true;
    string::size_type start = 0;

    while ( start <= hosts.length() )
    {
        string::size_type comma = hosts.find( ',', start );
        if ( comma == string::npos )
        {
            comma = hosts.length();
        }
        if ( comma > start )
        {
            if ( !first )
            {
                quorum << ",";
            }
            quorum << hosts.substr( start, comma - start ) << ":" << port;
            first = false;
        }
        start = comma + 1;
    }
    return quorum.str();
}

// Creates the global ZClient from ZOOKEEPER_PORT and ZOOKEEPER_NODES when
// ZClientEnabled is set.
// If the port is missing or non-numeric, or no quorum hosts are given (unset,
// empty, or only commas), a critical event is logged and ZClientEnabled is
// cleared. The host list is parsed without a fixed-size buffer, so long
// ZOOKEEPER_NODES values cannot overflow the stack.
void CMonitor::CreateZookeeperClient( void )
{
    const char method_name[] = "CMonitor::CreateZookeeperClient";
    TRACE_ENTRY;

    if ( ZClientEnabled )
    {
        int zport;
        char *env = getenv("ZOOKEEPER_PORT");
        if ( env && isdigit((unsigned char)*env) )
        {
            zport = atoi(env);
        }
        else
        {
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf),
                     "[%s], Zookeeper quorum port is not defined!\n"
                     , method_name);
            mon_log_write(MON_MONITOR_CREATEZCLIENT_1, SQ_LOG_CRIT, buf);
            ZClientEnabled = false;
            TRACE_EXIT;
            return;
        }

        // Without quorum hosts the client has nothing to connect to, so an
        // unset ZOOKEEPER_NODES is treated the same as an empty one.
        env = getenv("ZOOKEEPER_NODES");
        string zkQuorumHosts = env ? env : "";
        string zkQuorum = BuildZkQuorumString( zkQuorumHosts, zport );
        if ( zkQuorum.empty() )
        {
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf),
                     "[%s], Zookeeper quorum hosts are not defined!\n"
                     , method_name);
            mon_log_write(MON_MONITOR_CREATEZCLIENT_2, SQ_LOG_CRIT, buf);
            ZClientEnabled = false;
            TRACE_EXIT;
            return;
        }

        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
        {
            trace_printf( "%s@%d zkQuorumPort=%s\n"
                        , method_name, __LINE__
                        , zkQuorum.c_str() );
        }

        ZClient = new CZClient( zkQuorum.c_str()
                              , ZCLIENT_TRAFODION_ZNODE
                              , ZCLIENT_INSTANCE_ZNODE );
        if ( ZClient == NULL )
        {   // Only reachable if operator new is configured not to throw.
            char buf[MON_STRING_BUF_SIZE];
            snprintf(buf, sizeof(buf),
                     "[%s], Failed to allocate ZClient object!\n"
                     , method_name);
            mon_log_write(MON_MONITOR_CREATEZCLIENT_3, SQ_LOG_CRIT, buf);
            abort();
        }
    }

    TRACE_EXIT;
}
void CMonitor::StartZookeeperClient( void )
{
const char method_name[] = "CMonitor::StartZookeeperClient";
TRACE_ENTRY;
int rc = -1;
if ( ZClientEnabled )
{
if ( ZClient )
{
rc = ZClient->StartWork();
if (rc == 0)
{
ZClient->StartMonitoring();
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf),
"[%s], ZClient node monitoring started\n"
, method_name);
mon_log_write(MON_MONITOR_STARTZCLIENT_1, SQ_LOG_INFO, buf);
}
}
}
TRACE_EXIT;
}
#ifdef USE_SEQUENCE_NUM
// Returns a strictly increasing 64-bit sequence number derived from the
// realtime clock: the low 32 bits of the seconds in the high word and the
// low 32 bits of the nanoseconds in the low word. If the clock has not moved
// past the last issued value (or went backwards), the saved nanoseconds are
// incremented instead. Serialized by TimeSeqNumLock_.
long long CMonitor::GetTimeSeqNum()
{
    const char method_name[] = "CMonitor::GetTimeSeqNum";
    TRACE_ENTRY;

    TimeSeqNumLock_.lock();

    struct timespec currTime;
    clock_gettime(CLOCK_REALTIME, &currTime);

    if ( (currTime.tv_sec > savedTime_.tv_sec) ||
         ( (currTime.tv_sec == savedTime_.tv_sec) &&
           (currTime.tv_nsec > savedTime_.tv_nsec) ) )
    {
        savedTime_.tv_sec = currTime.tv_sec;
        savedTime_.tv_nsec = currTime.tv_nsec;
    }
    else
    {   // time drifted back. just increment nanoseconds
        savedTime_.tv_nsec++;
        // overflow check is not required as it would take
        // at least 3 billion requests to overflow
    }

    // Pack with shifts rather than an int-array overlay. This gives the same
    // value the old code produced on little-endian hosts, without strict-
    // aliasing violations or byte-order dependence.
    unsigned long long packed =
        ( (unsigned long long)(unsigned int)savedTime_.tv_sec << 32 )
        | (unsigned long long)(unsigned int)savedTime_.tv_nsec;
    long long result = (long long) packed;

    TimeSeqNumLock_.unlock();

    if (trace_settings & TRACE_REQUEST_DETAIL)
        trace_printf("%s@%d Time seq num = %llx\n", method_name, __LINE__, packed);

    TRACE_EXIT;
    return result;
}
#endif
int main (int argc, char *argv[])
{
int i;
int rc;
bool done = false;
bool warmstart = false;
char *env;
char *nodename = NULL;
char fname[MAX_PROCESS_PATH];
char port_fname[MAX_PROCESS_PATH];
char temp_fname[MAX_PROCESS_PATH];
char buf[MON_STRING_BUF_SIZE];
unsigned int initSleepTime = 1; // 1 second
mallopt(M_ARENA_MAX, 4); // call to limit the number of arena's of monitor to 4.This call doesn't seem to have any effect !
CALL_COMP_DOVERS(monitor, argc, argv);
const char method_name[] = "main";
MonLog = new CMonLog( "log4cxx.monitor.mon.config", "MON", "alt.mon", -1, -1, getpid(), "$MONITOR" );
SnmpLog = new CMonLog( "log4cxx.monitor.mon.snmp.config", "MON-SNMP", "alt.mon.snmp", -1, -1, getpid(), "$MONITOR" );
MonLog->setupInMemoryLog();
#ifdef MULTI_TRACE_FILES
initVariableKey();
#endif
#ifdef DMALLOC
util_dmalloc_start();
#endif
// Set flag to indicate whether we are operating in a real cluster
// or a virtual cluster. This is used throughout the monitor when
// behavior differs for a real vs. virtual cluster environment.
if ( getenv("SQ_VIRTUAL_NODES") )
{
IsRealCluster = false;
Emulate_Down = true;
}
// Save our execution path
env = getenv("PWD");
if ( env )
{
STRCPY(MyPath,env);
}
for(i=strlen(argv[0])-1; i>=0; i--)
{
if (argv[0][i] == '/')
{
argv[0][i] = '\0';
strncpy(MyPath,argv[0],sizeof(MyPath));
argv[0][i] = '/';
break;
}
}
env = getenv("SQ_MEASURE");
if ( env && *env == '1' )
{
Measure = 1;
snprintf(fname, sizeof(fname), "%s/monitor.P%d",
getenv("MPI_TMPDIR"),getpid());
setenv("MPI_INSTR", fname, 1);
}
if ( env && *env == '2' )
{
Measure = 2;
snprintf(fname, sizeof(fname), "%s/monitor.cpu.P%d:cpu",
getenv("MPI_TMPDIR"),getpid());
setenv("MPI_INSTR", fname, 1);
}
env = getenv("SQ_IC");
if ( env != NULL && strcmp(env, "IBV") == 0 )
{
CommType = CommType_InfiniBand;
}
else
{
CommType = CommType_Sockets;
}
// Mask all allowed signals
sigset_t mask;
sigfillset(&mask);
sigdelset(&mask, SIGPROF); // allows profiling such as google profiler
sigdelset(&mask, SIGUSR2);
rc = pthread_sigmask(SIG_SETMASK, &mask, NULL);
if (rc != 0)
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
method_name, rc);
mon_log_write(MON_MONITOR_MAIN_1, SQ_LOG_ERR, buf);
}
// Setup HP_MPI software license
int key = 413675219; //413675218 to display banner
MPI_Initialized(&key);
// Initialize MPI environment
MPI_Init (&argc, &argv);
env = getenv("MON_PROF_ENABLE");
if ( env )
{
mon_profiler_init(env); // LCOV_EXCL_LINE
}
env = getenv("MON_SNMP_ENABLE");
if ( env )
{
genSnmpTrapEnabled = true;
}
env = getenv("MON_INIT_SLEEP");
if ( env && isdigit(*env) )
{
initSleepTime = atoi(env);
}
env = getenv("SQ_COLD_STANDBY_SPARE");
if ( env && isdigit(*env) )
{
if ( strcmp(env,"0")==0 )
{
SpareNodeColdStandby = false;
}
}
// We need to delay some to make sure all monitor processes have initialized before
// any monitor tries to perform an Allgather operation.
sleep( initSleepTime );
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
MPI_Comm_set_errhandler(MPI_COMM_SELF, MPI_ERRORS_RETURN);
MPI_Comm_rank (MPI_COMM_WORLD, &MyPNID);
MonLog->setPNid( MyPNID );
SnmpLog->setPNid( MyPNID );
gethostname(Node_name, MPI_MAX_PROCESSOR_NAME);
char *tmpptr = Node_name;
while ( *tmpptr )
{
*tmpptr = (char)tolower( *tmpptr );
tmpptr++;
}
#ifdef MULTI_TRACE_FILES
setThreadVariable( (char *)"mainThread" );
#endif
// Without mpi daemon the monitor has no default standard output.
// We create a standard output file here.
if ( IsRealCluster )
{
snprintf(fname, sizeof(fname), "%s/logs/sqmon.%s.log",
getenv("TRAF_HOME"), Node_name);
}
else
{
snprintf(fname, sizeof(fname), "%s/logs/sqmon.%d.%s.log",
getenv("TRAF_HOME"), MyPNID, Node_name);
}
remove(fname);
if( freopen (fname, "w", stdout) == NULL )
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], can't open stdout (%s).\n",
method_name, fname);
mon_log_write(MON_MONITOR_MAIN_2, SQ_LOG_ERR, buf);
}
setlinebuf(stdout);
// Send stderr output to same file as stdout. (Note: the monitor does
// not write to stderr but perhaps there could be components included in
// the monitor build that do write to stderr.)
if ( dup2 ( 1, 2 ) == -1 )
{
printf ( "dup2 failed for stderr: %s (%d)\n", strerror(errno), errno);
}
switch( CommType )
{
case CommType_Sockets: // Valid communication protocol
break;
case CommType_InfiniBand: // Currenly disabled - requires HA MPI
//MPI_Open_port (MPI_INFO_NULL, MyMPICommPort);
default:
printf( "SQ_IC contains invalid communication protocol: %s\n"
, CommTypeString(CommType));
abort();
}
if (argc > 3 && strcmp (argv[2], "-integrate") == 0)
{
switch( CommType )
{
case CommType_InfiniBand:
if (argc == 4 && strstr(argv[3], "$port#"))
{
SMSIntegrating = IAmIntegrating = true;
strcpy( IntegratingMonitorPort, argv[3] );
}
else
{
printf ( "Invalid integrating monitor MPI port: %s\n", argv[3]);
abort();
}
break;
case CommType_Sockets:
if ( isdigit (*argv[3]) )
{
SMSIntegrating = IAmIntegrating = true;
strcpy( IntegratingMonitorPort, argv[3] );
}
else
{
printf ( "Invalid integrating monitor socket port: %s\n", argv[3]);
abort();
}
break;
default:
// Programmer bonehead!
abort();
}
if ( isdigit (*argv[4]) )
{
CreatorShellPid = atoi( argv[4] );
}
else
{
printf ( "Invalid creator shell pid: %s\n", argv[4]);
abort();
}
if ( isdigit (*argv[5]) )
{
CreatorShellVerifier = atoi( argv[5] );
}
else
{
printf ( "Invalid creator shell verifier: %s\n", argv[5]);
abort();
}
// Trace cannot be specified on startup command but need to
// check for trace environment variable settings.
MonTrace->mon_trace_init("0", NULL);
}
else
if (argc == 3 && isdigit(*argv[2]) )
{
MonTrace->mon_trace_init(argv[2], "STDOUT");
}
else if (argc >= 3 && strcmp (argv[2], "TRACEFILE") == 0)
{
if (argc == 4 && isdigit (*argv[3]))
{
MonTrace->mon_trace_init(argv[3], NULL);
}
}
else
{ // Trace not specified on startup command but need to check
// for trace environment variable settings.
MonTrace->mon_trace_init("0", NULL);
}
if ((env = getenv("MON_SYNC_DELAY")) != NULL)
{ // Sync cycle delay value specified
int delay;
errno = 0;
delay = strtol(env, NULL, 10);
if ( errno == 0) next_test_delay = delay;
}
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d next_test_delay=%ld\n", method_name, __LINE__,
next_test_delay);
#ifdef USE_SONAR
if ( (env = getenv("SQ_SONAR") ) != NULL )
{
sonar_state_init();
if ( IsRealCluster && sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
{ // Not a virtual cluster and sonar is enabled.
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d Enabling Sonar\n", method_name, __LINE__);
}
MonStats = new CMonSonarStats();
}
}
#endif
if ( MonStats == NULL )
{
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d Using non-Sonar counters\n",
method_name, __LINE__);
}
MonStats = new CMonStats();
}
// Normal process termination uses SIGKILL by default. Environment
// variable PROC_TERM_SIG can be used to override this. Valid values
// are SIGKILL or SIGTERM (as either string or integer values).
int procTermSig = SIGKILL;
env = getenv("PROC_TERM_SIG");
if ( env )
{
if (strcasecmp(env, "SIGKILL") == 0)
{
procTermSig = SIGKILL;
}
else if (strcasecmp(env, "SIGTERM") == 0)
{
procTermSig = SIGTERM;
}
else
{
procTermSig = atoi( env );
if ( !(procTermSig == SIGKILL || procTermSig == SIGTERM) )
{
procTermSig = SIGKILL;
}
}
}
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d Using signal %d for normal processes "
"termination.\n", method_name, __LINE__, procTermSig);
}
// Record statistics (sonar counters): monitor is busy
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->MonitorBusyIncr();
snprintf(buf, sizeof(buf),
"[CMonitor::main], %s, Started! CommType: %s\n"
, CALL_COMP_GETVERS2(monitor), CommTypeString( CommType ));
mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
warmstart = (strcmp(argv[1],"WARM") == 0);
#ifdef DMALLOC
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "DMALLOC Option set" "\n", method_name, __LINE__);
#endif
#ifdef DEBUGGING
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "DEBUGGING Option set" "\n", method_name, __LINE__);
#endif
if ( Emulate_Down )
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "EMULATE_DOWN Option set" "\n", method_name, __LINE__);
{
// Create thread for monitoring redirected i/o.
// This is also used for monitor logs, so start it early.
Redirector.start();
Nodes = new CNodeContainer ();
Config = new CConfigContainer ();
Monitor = new CMonitor (procTermSig);
if (!IAmIntegrating)
{
Config->Init ();
}
Devices = new CDeviceContainer ();
if ( !Devices->IsInitialized() )
{
if ( IAmIntegrating )
{ // Problem unmounting devices, let creator monitor know then abort
Monitor->ReIntegrate( CCluster::Reintegrate_Err13 );
}
else
{
MPI_Abort(MPI_COMM_SELF,99); // too early to call failsafe node down.
}
}
nodename = new char [Monitor->NumNodes * MPI_MAX_PROCESSOR_NAME];
// Create health check thread
HealthCheck.start();
// Create thread to accept connections from other monitors
CommAccept.start();
// Open file used to record process start/end times
Monitor->openProcessMap ();
// Always using localio now, no other option
SQ_theLocalIOToClient = new SQ_LocalIOToClient( MyPNID );
assert (SQ_theLocalIOToClient);
#define BLOCK_SIZE 512
char *ioBuffer = NULL;
int fd;
rc = posix_memalign( (void**)&ioBuffer, BLOCK_SIZE, BLOCK_SIZE);
if ( rc == -1 )
{
int err = rc;
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], can't allocate aligned %d bytes "
"buffer , Error=%d(%s)\n",
method_name, BLOCK_SIZE, err, ErrorMsg(err));
mon_log_write(MON_MONITOR_MAIN_4, SQ_LOG_CRIT, buf);
MPI_Abort(MPI_COMM_SELF,99);
}
memset( (void *)ioBuffer, 0 , BLOCK_SIZE );
// start ok
if (IsRealCluster)
{
snprintf(port_fname, sizeof(port_fname), "%s/monitor.port.%s",
getenv("MPI_TMPDIR"), Node_name );
}
else
{
// Write out our port number so other processes can attach.
snprintf(port_fname, sizeof(port_fname), "%s/monitor.port.%d.%s",
getenv("MPI_TMPDIR"),MyPNID,Node_name);
}
// create with no caching, user read/write, group read/write, other read
fd = open( port_fname
, O_RDWR | O_TRUNC | O_CREAT | O_DIRECT
, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
if ( fd != -1 )
{
snprintf( ioBuffer, BLOCK_SIZE, "%s", MyCommPort );
rc = write( fd, ioBuffer, BLOCK_SIZE );
if ( rc == -1 )
{
int err = errno;
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], can't write %d bytes to "
"port file (%s), Error=%d(%s)\n",
method_name, BLOCK_SIZE, port_fname, err,
ErrorMsg(err));
mon_log_write(MON_MONITOR_MAIN_5, SQ_LOG_CRIT, buf);
if ( IAmIntegrating )
// This monitor is reintegrating into cluster. Inform
// creator monitor of error, then abort.
Monitor->ReIntegrate( CCluster::Reintegrate_Err10 );
else
MPI_Abort(MPI_COMM_SELF,99);
}
close( fd );
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" " Port file created, pnid=%d, port=%s" "\n", method_name, __LINE__, MyPNID, MyCommPort );
}
else
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], can't open port file (%s), "
"Error= %s\n", method_name, port_fname,
ErrorMsg(errno));
mon_log_write(MON_MONITOR_MAIN_6, SQ_LOG_CRIT, buf);
if ( IAmIntegrating )
// This monitor is reintegrating into cluster. Inform
// creator monitor of error, then abort.
Monitor->ReIntegrate( CCluster::Reintegrate_Err11 );
else
MPI_Abort(MPI_COMM_SELF,99);
}
free( ioBuffer );
int ret = SQ_theLocalIOToClient->initWorker();
if (ret)
{
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" " Cannot start localio worker, aborting " "%d" "\n", method_name, __LINE__, ret);
delete SQ_theLocalIOToClient;
SQ_theLocalIOToClient = NULL;
if ( IAmIntegrating )
// This monitor is reintegrating into cluster. Inform
// creator monitor of error, then abort.
Monitor->ReIntegrate( CCluster::Reintegrate_Err12 );
else
assert (false);
}
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "started LocalIOToClient environment\n" "\n", method_name, __LINE__);
if (trace_settings & TRACE_INIT)
{
int rc = getrlimit( RLIMIT_SIGPENDING, &Rl );
if ( rc == 0 )
{
printf("%s@%d" " RLIMIT_SIGPENDING cur=%d, max=%d\n", method_name, __LINE__, (int)Rl.rlim_cur, (int)Rl.rlim_max);
}
}
if ( IsRealCluster )
{
// Zookeeper client is enabled only in a real cluster
env = getenv("SQ_MON_ZCLIENT_ENABLED");
if ( env )
{
if ( env && isdigit(*env) )
{
if ( strcmp(env,"0")==0 )
{
ZClientEnabled = false;
}
}
}
if ( ZClientEnabled )
{
Monitor->CreateZookeeperClient();
}
}
else
{
ZClientEnabled = false;
}
if ( IAmIntegrating )
{
// This monitor is integrating to (joining) an existing cluster
Monitor->ReIntegrate( 0 );
MyNode->SetPhase( Phase_Activating );
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
trace_printf("%s@%d" " After UpdateCluster" "\n", method_name, __LINE__);
}
else
{
Monitor->EnterSyncCycle();
done = Monitor->exchangeNodeData();
Monitor->ExitSyncCycle();
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
trace_printf("%s@%d" " After ImAlive " "\n", method_name, __LINE__);
}
}
// Enable Zookeeper client in real cluster only and
// after the integration phase on a node 'up'
if ( IsRealCluster )
{
if ( ZClientEnabled )
{
Monitor->StartZookeeperClient();
}
}
// Initialize Seabed disconnect semaphore
char *port;
switch( CommType )
{
case CommType_InfiniBand:
port = strstr(MyCommPort, "$port#");
if (port) port += 5;
break;
case CommType_Sockets:
port = strchr(MyCommPort, ':');
break;
default:
// Programmer bonehead!
abort();
}
if (port != NULL)
{
int myPortNum;
int semMax = 50;
unsigned int segKey;
myPortNum = strtol(&port[1], NULL, 10);
env = getenv("MS_DISC_SEM");
if ( env )
{
errno = 0;
semMax = strtol(env, NULL, 10);
if ( errno != 0) semMax = 50;
}
segKey = RobSem::getSegKey(0x40000000, myPortNum, MyPNID);
rc = RobSem::create_sem(segKey, IPC_CREAT + IPC_EXCL, sbDiscSem, semMax);
if ( rc != 0 )
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf),
"[%s], Failed to create seabed disconnect semaphore: %s "
"(%d)\n", method_name, strerror(errno), errno);
mon_log_write(MON_MONITOR_MAIN_10, SQ_LOG_CRIT, buf);
sbDiscSem = NULL;
}
}
// Create request worker threads
CReqWorker::startReqWorkers();
if ( ! IAmIntegrating )
{
Monitor->StartPrimitiveProcesses();
}
env = getenv( "SQ_USE_CPU_AFFINITY" );
if ( env && strcmp( env, "1" ) == 0 )
{ // Set flag to indicate that logical node CPU affinity is used for
// processes.
// (see CNode::SetAffinity)
usingCpuAffinity = true;
}
env = getenv( "SQ_USE_TSE_CPU_AFFINITY" );
if ( env && strcmp( env, "1" ) == 0 )
{ // Set flag to indicate that CPU affinity is used for TSE processes.
// (see CNode::SetAffinity)
usingTseCpuAffinity = true;
}
nice(MON_BASE_NICE);
Monitor->setMonInitComplete(true);
struct timeval awakenedAt;
struct timeval awakeTime;
struct timeval now;
gettimeofday(&awakenedAt, NULL);
while (!done)
{
// Record statistics (sonar counters): monitor is NOT busy
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->MonitorBusyDecr();
// Sleep for a maximum of next_test_delay. Reduce
// next_test_delay by the amount of time since awoke for
// current sync cycle. If current sync cycle took a long
// time don't sleep at all.
gettimeofday(&now, NULL);
if ( (now.tv_usec - awakenedAt.tv_usec ) < 0 )
{
awakeTime.tv_sec = now.tv_sec - awakenedAt.tv_sec - 1;
awakeTime.tv_usec = 1000000 + now.tv_usec - awakenedAt.tv_usec;
}
else
{
awakeTime.tv_sec = now.tv_sec - awakenedAt.tv_sec;
awakeTime.tv_usec = now.tv_usec - awakenedAt.tv_usec;
}
if ( awakeTime.tv_sec == 0
&& awakeTime.tv_usec < next_test_delay)
{
usleep( next_test_delay - awakeTime.tv_usec );
}
gettimeofday(&awakenedAt, NULL);
// Record statistics (sonar counters): monitor is busy
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->MonitorBusyIncr();
Monitor->EnterSyncCycle();
if ( Monitor->TmSyncPending() )
{
Monitor->TmSync ();
}
Monitor->ExitSyncCycle();
if ( !Monitor->GetPendingSlaveTmSync() &&
Monitor->GetTotalSlaveTmSyncCount() == 0 )
{
Monitor->EnterSyncCycle();
done = Monitor->exchangeNodeData();
Monitor->ExitSyncCycle();
}
if (done)
break;
// Check to see if 'ckillall' is executing and disable the watchdog
if ( !SQ_theLocalIOToClient->isWDTEnabled() )
{
// HealthCheck.setState(MON_EXIT_WATCHDOG);
}
}
if (trace_settings & TRACE_STATS)
{ // Write malloc statistics info to stderr
monMallocStats();
}
if ( ZClientEnabled )
{
ZClient->StopMonitoring();
ZClient->ShutdownWork();
}
Redirector.shutdownWork();
// shut down health check thread before shutting down reqWorker thread.
HealthCheck.shutdownWork();
ReqQueue.stats();
// Stop request worker threads
CReqWorker::shutdownWork();
Replicator.stats();
Monitor->stats();
// Tell the LIO worker threads to exit
SQ_theLocalIOToClient->shutdownWork();
CommAccept.shutdownWork();
// Rename the monitor "port" file
sprintf(temp_fname, "%s.bak", port_fname);
remove(temp_fname);
rename(port_fname, temp_fname);
delete [] nodename;
delete Devices;
delete Nodes;
delete ZClient;
delete Monitor;
Monitor = NULL; // TRACE uses this
delete Config;
if ( sbDiscSem != NULL )
{
RobSem::destroy_sem( sbDiscSem );
}
if ( CommType == CommType_InfiniBand )
{
MPI_Close_port( MyCommPort );
}
#if 0
// TODO: MPICH cannot handle a node down and subsequent shutdown
// MPI_Finalize() hangs so its currently disabled, but
// causes an abnormal termination in the monitor process at exit.
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "- Calling MPI_Finalize()" "\n", method_name, __LINE__);
MPI_Finalize ();
#endif
if (trace_settings & TRACE_STATS)
{
trace_printf("%s@%d" "- LIO Stats: shared_buffers_total=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getSharedBufferCount());
trace_printf("%s@%d" "- LIO Stats: shared_buffers_acquired_max=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getAcquiredBufferCount());
trace_printf("%s@%d" "- LIO Stats: shared_buffers_available_min=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getAvailableBufferCount());
trace_printf("%s@%d" "- LIO Stats: shared_buffer_misses=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getMissedBufferCount());
trace_printf("%s@%d" "- LIO Stats: max queued child death=%d\n",
method_name, __LINE__,
SQ_theLocalIOToClient->getMaxChildDeathCount());
trace_printf("%s@%d" "- LIO Stats: almost-dead pids=%d\n",
method_name, __LINE__,
SQ_theLocalIOToClient->getAlmostDeadPids());
trace_printf("%s@%d" "- LIO Stats: verifierMap=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getVerifierMapCount());
}
delete SQ_theLocalIOToClient;
snprintf(buf, sizeof(buf), "[CMonitor::main], Shutdown normally.\n");
mon_log_write(MON_MONITOR_MAIN_11, SQ_LOG_INFO, buf);
if (trace_settings & TRACE_STATS)
{
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->displayStats();
}
// Record statistics (sonar counters): monitor is NOT busy
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->MonitorBusyDecr();
delete MonLog;
return 0;
}
#ifdef DELAY_TP
// Test-point delay hook (compiled only when DELAY_TP is defined).
//
// Looks up the configuration group "NODE<MyPNID>" and, if it contains the key
// "REQUESTDELAY_TP", sleeps for the number of seconds given by that key's
// value. Does nothing when there is no configuration, group, or key.
//
// tpName: currently unused -- every caller reads the same "REQUESTDELAY_TP"
//         key. NOTE(review): confirm whether the key name was meant to be
//         derived from tpName.
//
// A value that does not start with an integer, is out of range, or is not
// positive causes no delay. (Previously a negative value was converted to an
// enormous unsigned sleep() argument, effectively hanging the monitor.)
void CMonitor::Delay_TP(char *tpName)
{
    char nodename[20];
    char keyname[MAX_KEY_NAME];

    (void)tpName; // see NOTE(review) above

    if (!Config)
    {
        return;
    }

    // "NODE" + at most 11 characters for an int + NUL fits in 20 bytes.
    snprintf(nodename, sizeof(nodename), "NODE%d", MyPNID);
    CConfigGroup *group = Config->GetGroup(nodename);
    if (!group)
    {
        return;
    }

    // GetKey is given a writable buffer, as in the original code.
    snprintf(keyname, sizeof(keyname), "%s", "REQUESTDELAY_TP");
    CConfigKey *key = group->GetKey(keyname);
    if (!key)
    {
        return;
    }

    // Parse in place: no unbounded copy into a fixed-size buffer, and
    // malformed / overflowing / non-positive values are rejected instead of
    // being passed through atoi() to sleep().
    errno = 0;
    char *end = NULL;
    long delay = strtol(key->Value, &end, 10);
    if (errno == 0 && end != key->Value && delay > 0)
    {
        sleep(static_cast<unsigned int>(delay));
    }
}
#endif