blob: 2ad0528790b1778208a926226863c467ff07f3f1 [file] [log] [blame]
///////////////////////////////////////////////////////////////////////////////
//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// @@@ END COPYRIGHT @@@
//
///////////////////////////////////////////////////////////////////////////////
#include <iostream>
#include <map>
using namespace std;
#include <stdlib.h>
#include <errno.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <malloc.h>
#include <sys/ipc.h>
#include <sys/time.h>
#include <sys/resource.h> //add for getrlimit, strange centos6/gcc4.4 don't need this
//#include <sys/stat.h>
#include "monlogging.h"
#include "monprof.h"
#include "monsonar.h"
#include "montrace.h"
#include "redirector.h"
#include "healthcheck.h"
#include "config.h"
#include "device.h"
#include "clusterconf.h"
#include "nameserverconfig.h"
#include "lnode.h"
#include "pnode.h"
#include "tmsync.h"
#include "cluster.h"
#include "monitor.h"
#include "props.h"
#ifdef DMALLOC
#include "dm.h"
#endif
#include "replicate.h"
#include "robsem.h"
#include "commaccept.h"
#ifdef NAMESERVER_PROCESS
#include "nscommacceptmon.h"
#endif
#include <assert.h>
#include <signal.h>
#include <sys/ptrace.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <sched.h>
#include "localio.h"
#include "lock.h"
#include "mlio.h"
#include "redirector.h"
#include "intprocess.h"
#include "nameserver.h"
#include "meas.h"
#include "reqqueue.h"
#include "reqworker.h"
#include "zclient.h"
#include "SCMVersHelp.h"
#ifndef NAMESERVER_PROCESS
#include "ptpcommaccept.h"
#include "ptpclient.h"
#endif
#define LOG_ERROR
#define RidRecvPrioritySlot 4096
#define MinimumSlotToPrioritize 4096
// Global Variables
// Process-wide state shared by the monitor (and, when built with
// NAMESERVER_PROCESS, by the name server). Many of these are assigned
// during start-up in main() from command-line arguments and environment
// variables.
struct rlimit Rl;
// Set to true by CMonitor::openProcessMap() when SQ_PIDMAP starts with '1'.
bool PidMap = false;
bool usingCpuAffinity = false;
bool usingTseCpuAffinity = false;
// Set to true in main() when MON_SNMP_ENABLE is present in the environment.
bool genSnmpTrapEnabled = false;
// Set in main() from SQ_MEASURE: '1' -> 1, '2' -> 2 (0 otherwise).
int Measure = 0;
long trace_level = 0;
// Directory the monitor was started from: $PWD, replaced by the directory
// part of argv[0] when argv[0] contains a '/'.
char MyPath[MAX_PROCESS_PATH];
char MyCommPort[MPI_MAX_PORT_NAME] = {'\0'};
char MyMPICommPort[MPI_MAX_PORT_NAME] = {'\0'};
char MySyncPort[MPI_MAX_PORT_NAME] = {'\0'};
#ifdef NAMESERVER_PROCESS
char MyMon2NsPort[MPI_MAX_PORT_NAME] = {'\0'};
#else
char MyPtPPort[MPI_MAX_PORT_NAME] = {'\0'};
#endif
// Lower-cased host name; on a real cluster main() also strips the domain part.
char Node_name[MPI_MAX_PROCESSOR_NAME] = {'\0'};
sigset_t SigSet;
// Set to true in main() (together with IsRealCluster = false) when
// SQ_VIRTUAL_NODES is defined.
bool Emulate_Down = false;
#ifdef NAMESERVER_PROCESS
long next_test_delay = 10000; // in usec. (default 10 msec)
#else
long next_test_delay = 100000; // in usec. (default 100 msec)
#endif
CClusterConfig *ClusterConfig = NULL;
bool IAmIntegrating = false;
bool IAmIntegrated = false;
char IntegratingMonitorPort[MPI_MAX_PORT_NAME] = {'\0'};
// false when running a virtual cluster (SQ_VIRTUAL_NODES set).
bool IsRealCluster = true;
// Set by a "COLD_AGENT" argument or SQ_MON_RUN_MODE=AGENT.
bool IsAgentMode = false;
// true in the NAMESERVER_PROCESS build (set at the top of main()).
bool IsNameServer = false;
bool IsMaster = false;
// Set when SQ_MON_CREATOR=MPIRUN on a real cluster.
bool IsMPIChild = false;
char MasterMonitorName[MAX_PROCESS_PATH]= {'\0'};
// Set in main(): InfiniBand when SQ_IC=IBV, Sockets otherwise.
CommType_t CommType = CommType_Undefined;
bool SMSIntegrating = false;
int CreatorShellPid = -1;
Verifier_t CreatorShellVerifier = -1;
// Cleared in main() when SQ_COLD_STANDBY_SPARE is exactly "0".
bool SpareNodeColdStandby = true;
// Cleared by CreateZookeeperClient() when the ZooKeeper environment is
// incomplete.
bool ZClientEnabled = true;
#ifndef NAMESERVER_PROCESS
// Set in main() from SQ_NAMESERVER_ENABLED (non-zero -> true).
bool NameServerEnabled = false;
#endif
// Lock to manage memory modifications during fork/exec
CLock MemModLock;
CMonitor *Monitor = NULL;
#ifndef NAMESERVER_PROCESS
CNameServer *NameServer = NULL;
CProcess *NameServerProcess = NULL;
CPtpClient *PtpClient = NULL;
#endif
CNodeContainer *Nodes = NULL;
CConfigContainer *Config = NULL;
CNameServerConfigContainer *NameServerConfig = NULL;
#ifndef NAMESERVER_PROCESS
CDeviceContainer *Devices = NULL;
#endif
// Physical node id of this process: the MPI rank (monitor build) or
// argv[2] (name server build when started with "SQMON1.1").
int MyPNID = -1;
// No initializer; as a static-storage object it starts out NULL.
CNode *MyNode;
CMonLog *MonLog = NULL;
CMonStats * MonStats = NULL;
extern CMonTrace *MonTrace;
#ifndef NAMESERVER_PROCESS
CRedirector Redirector;
#endif
CIntProcess IntProcess;
CReqQueue ReqQueue;
CHealthCheck HealthCheck;
CCommAccept CommAccept;
#ifdef NAMESERVER_PROCESS
CCommAcceptMon CommAcceptMon;
#else
CPtpCommAccept PtpCommAccept;
#endif
extern CReplicate Replicator;
CZClient *ZClient = NULL;
#ifndef NAMESERVER_PROCESS
// Seabed disconnect semaphore
RobSem * sbDiscSem = NULL;
#endif
// Copy of the command line saved by main() (monitor build).
int monitorArgc = 0;
char monitorArgv[MAX_ARGS][MAX_ARG_SIZE];
CMeas Meas;
// Component version helpers from SCMVersHelp.h; presumably they provide what
// CALL_COMP_DOVERS() in main() invokes — confirm against SCMVersHelp.h.
#ifdef NAMESERVER_PROCESS
DEFINE_EXTERN_COMP_DOVERS(trafns)
DEFINE_EXTERN_COMP_GETVERS2(trafns)
#else
DEFINE_EXTERN_COMP_DOVERS(monitor)
DEFINE_EXTERN_COMP_GETVERS2(monitor)
#endif
// Returns the "invalid" transaction id: a _TM_Txid_External whose four
// txid words are all -1. isInvalid() compares against this value.
_TM_Txid_External invalid_trans( void )
{
    _TM_Txid_External invalidTxid;
    for ( int word = 0; word < 4; ++word )
    {
        invalidTxid.txid[word] = -1LL;
    }
    return invalidTxid;
}
// Returns the "null" transaction id: a _TM_Txid_External whose four txid
// words are all zero. isNull() compares against this value.
_TM_Txid_External null_trans( void )
{
    _TM_Txid_External nullTxid;
    for ( int word = 0; word < 4; ++word )
    {
        nullTxid.txid[word] = 0LL;
    }
    return nullTxid;
}
// Byte-wise comparison of two transaction ids; true when every byte of the
// two structures matches.
bool isEqual( _TM_Txid_External trans1, _TM_Txid_External trans2 )
{
    const int diff = memcmp( &trans1, &trans2, sizeof(_TM_Txid_External) );
    return diff == 0;
}
// True when transid equals the all-zero id produced by null_trans().
bool isNull( _TM_Txid_External transid )
{
    return isEqual( transid, null_trans() );
}
// True when transid equals the all-(-1) id produced by invalid_trans().
bool isInvalid( _TM_Txid_External transid )
{
    return isEqual( transid, invalid_trans() );
}
// Returns a printable, NUL-terminated description of an MPI error code.
// If MPI_Error_string() itself fails, a fallback "Invalid error code" text is
// returned instead.
//
// The text is kept in a function-local static buffer: the returned pointer is
// overwritten by the next call, so concurrent callers would race on it.
char *ErrorMsg (int error_code)
{
    static char buffer[MPI_MAX_ERROR_STRING];
    int length = 0;

    int rc = MPI_Error_string (error_code, buffer, &length);
    if (rc != MPI_SUCCESS)
    {
        snprintf(buffer, sizeof(buffer),
                 "MPI_Error_string: Invalid error code (%d)\n", error_code);
        length = strlen(buffer);
    }

    // Clamp before terminating so buffer[length] always stays inside the
    // array. Older MPI implementations may report a length equal to the
    // buffer size; newer standards guarantee at most MPI_MAX_ERROR_STRING-1.
    if ( length < 0 )
    {
        length = 0;
    }
    else if ( length >= (int) sizeof(buffer) )
    {
        length = (int) sizeof(buffer) - 1;
    }
    buffer[length] = '\0';
    return buffer;
}
#ifndef NAMESERVER_PROCESS
// Child-termination signal handler. The siginfo_t argument implies it is
// installed with SA_SIGINFO; presumably for SIGCHLD — confirm at the
// sigaction() call site.
//
// Reaps the child named in info->si_pid and then any other exited children
// using non-blocking waitpid() calls:
//  - each reaped pid is passed to IntProcess.handle_signal() and queued with
//    SQ_theLocalIOToClient->handleDeadPid() so the heavy work happens
//    outside the handler;
//  - if the signalling child has not changed state yet, it is recorded with
//    handleAlmostDeadPid() for later handling.
// errno is saved on entry and restored on exit.
void child_death_signal_handler2 (int signal, siginfo_t *info, void *)
{
    pid_t pid;
    int saveerrno;
    int status;
    pid_t whichPid;

    const char method_name[] = "child_death_signal_handler2";
    if (trace_settings & TRACE_ENTRY_EXIT)
        trace_nolock_printf("%s@%d\n", method_name, __LINE__);

    saveerrno = errno; // waitpid/etc sets errno

    if (trace_settings & TRACE_SIG_HANDLER)
        trace_nolock_printf("%s@%d - signal=%d, code=%d, status=%d, pid=%d\n",
                            method_name, __LINE__, signal, info->si_code,
                            info->si_status, info->si_pid);

    // Handle the process that triggered the signal handler as well
    // as any other exited children.
    whichPid = info->si_pid;
    do
    {
        pid = waitpid (whichPid, &status, WNOHANG);

        if (trace_settings & TRACE_SIG_HANDLER)
        {
            if ( pid != -1 )
            {
                trace_nolock_printf("%s@%d - waitpid(%d) returned %d\n",
                                    method_name, __LINE__, whichPid, pid);
            }
            else
            {
                // NOTE(review): strerror() is not async-signal-safe; this is
                // only reached when signal-handler tracing is enabled.
                trace_nolock_printf("%s@%d - waitpid(%d) error, %s (%d)\n",
                                    method_name, __LINE__, whichPid,
                                    strerror(errno), errno);
            }
        }

        if (pid > 0)
        {
            IntProcess.handle_signal(pid, status);

            // Add this pid to a list to be examined outside the
            // signal-handler (to avoid spending excessive time in
            // signal handler). When that list is examined the work
            // required to handle the terminated process will be done.
            SQ_theLocalIOToClient->handleDeadPid(pid);

            if (trace_settings & TRACE_SIG_HANDLER)
            {
                if ( WIFEXITED(status) )
                {   // Process exited normally
                    trace_nolock_printf("%s@%d - process %d exited, exit"
                                        " status=%d\n", method_name, __LINE__,
                                        pid, WEXITSTATUS(status));
                }
                if ( WIFSIGNALED(status) )
                {   // Process was terminated by a signal
                    trace_nolock_printf("%s@%d - process %d terminated by "
                                        "signal #%d\n", method_name, __LINE__,
                                        pid, WTERMSIG(status));
                }
            }
        }
        else if ( whichPid != -1 && pid == 0 )
        {
            // Process that triggered the signal handler has not yet changed
            // state. Remember the "pid" so we can handle it later.
            // (Trace the remembered pid; `pid` itself is 0 here.)
            if (trace_settings & TRACE_SIG_HANDLER)
                trace_nolock_printf("%s@%d - adding pid=%d to unreaped list\n",
                                    method_name, __LINE__, whichPid);
            SQ_theLocalIOToClient->handleAlmostDeadPid(whichPid);
            pid = 1; // force at least one more waitpid to be done
        }
        else if ( whichPid != -1 )
        {
            // waitpid() on the specific pid failed (e.g. ECHILD because it
            // was already reaped). Still sweep once with -1 so other exited
            // children are not left unreaped until the next signal.
            pid = 1;
        }

        whichPid = -1; // now do waitpid on any child process
    }
    while (pid > 0);

    errno = saveerrno;

    if (trace_settings & TRACE_ENTRY_EXIT)
        trace_nolock_printf("%s@%d - Exit\n", method_name, __LINE__);
}
#endif
// Prints a timestamped header line to stdout, then glibc's malloc statistics
// via malloc_stats() (which writes its report to stderr).
void monMallocStats()
{
    // ctime_r() fills a caller-owned buffer (ctime() shares one static buffer)
    // and its NULL failure return is checked instead of dereferenced.
    char timeBuf[32];
    const char *timestamp = "unknown time";
    time_t mytime = time(NULL);
    if ( ctime_r(&mytime, timeBuf) != NULL )
    {
        size_t len = strlen(timeBuf);
        if ( len > 0 && timeBuf[len-1] == '\n' )
        {
            timeBuf[len-1] = '\0';   // drop ctime's trailing newline
        }
        timestamp = timeBuf;
    }

    printf("monitor malloc statistics at %s:\n", timestamp);
    malloc_stats();
}
// Maps a CommType_t to a display name; any value other than InfiniBand or
// Sockets is reported as "Undefined".
const char *CommTypeString( CommType_t commType)
{
    if ( commType == CommType_InfiniBand )
    {
        return "InfiniBand";
    }
    if ( commType == CommType_Sockets )
    {
        return "Sockets";
    }
    return "Undefined";
}
// Constructor. The base class is CCluster in NAMESERVER_PROCESS builds and
// CTmSync_Container otherwise (where the constructor also takes the process
// termination signal number, stored in procTermSig_).
// Zeroes the open/notice/process and outstanding I/O/send counters, sets
// Last_error to MPI_SUCCESS and, in monitor builds, marks the process map
// file as not open (processMapFd = -1).
#ifdef NAMESERVER_PROCESS
CMonitor::CMonitor ()
: CCluster (),
#else
CMonitor::CMonitor (int procTermSig)
: CTmSync_Container (),
#endif
OpenCount (0)
, NoticeCount (0)
, ProcessCount (0)
, NumOutstandingIO (0)
, NumOutstandingSends (0)
, Last_error (MPI_SUCCESS)
#ifndef NAMESERVER_PROCESS
, processMapFd ( -1 )
, procTermSig_ ( procTermSig )
#endif
{
const char method_name[] = "CMonitor::CMonitor";
TRACE_ENTRY;
// Add eyecatcher sequence as a debugging aid
memcpy(&eyecatcher_, "MNTR", 4);
// Seed the last-issued time that GetTimeSeqNum() compares against.
#ifdef USE_SEQUENCE_NUM
clock_gettime(CLOCK_REALTIME, &savedTime_);
#endif
TRACE_EXIT;
}
// Destructor: marks the object as deleted by lower-casing its eyecatcher
// (the constructor stores "MNTR") and, in monitor builds, closes the process
// map file if openProcessMap() opened one.
CMonitor::~CMonitor (void)
{
    const char method_name[] = "CMonitor::~CMonitor";
    TRACE_ENTRY;

    // Alter eyecatcher sequence as a debugging aid to identify deleted object
    static const char deletedEyecatcher[] = "mntr";
    memcpy( &eyecatcher_, deletedEyecatcher, 4 );

#ifndef NAMESERVER_PROCESS
    const bool mapFileOpen = ( processMapFd != -1 );
    if ( mapFileOpen )
    {
        close ( processMapFd );
    }
#endif

    TRACE_EXIT;
}
// Counter bookkeeping: each method adjusts the named member by exactly one.
// No locking is performed here.
void CMonitor::IncOpenCount (void)
{
    ++OpenCount;
}

void CMonitor::IncNoticeCount (void)
{
    ++NoticeCount;
}

void CMonitor::IncProcessCount (void)
{
    ++ProcessCount;
}

void CMonitor::DecrOpenCount (void)
{
    --OpenCount;
}

void CMonitor::DecrNoticeCount (void)
{
    --NoticeCount;
}

void CMonitor::DecrProcessCount (void)
{
    --ProcessCount;
}
#ifndef NAMESERVER_PROCESS
// Sets PidMap when SQ_PIDMAP starts with '1', then (re)creates the process
// map file $MPI_TMPDIR/monitor.map.<pnid>.<node> for appending; any existing
// file is removed first. On failure an error event is logged and
// processMapFd is left unchanged.
void CMonitor::openProcessMap ( void )
{
    char fname[MAX_PROCESS_PATH];
    char *env;

    env = getenv("SQ_PIDMAP");
    if ( env && *env == '1' )
    {
        PidMap = true;
    }

    // Passing a NULL pointer to "%s" is undefined behavior, so report a
    // missing MPI_TMPDIR explicitly instead of building a bogus path.
    const char *tmpDir = getenv("MPI_TMPDIR");
    if ( tmpDir == NULL )
    {
        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf),
                 "[CMonitor::openProcessMap], MPI_TMPDIR is not set; "
                 "process map file not opened.\n");
        mon_log_write(MON_PROCESS_COMPLETEPSTARTUP_2, SQ_LOG_ERR, buf);
        return;
    }

    snprintf( fname, sizeof(fname), "%s/monitor.map.%d.%s",
              tmpDir, MyPNID, Node_name );
    remove(fname);

    // O_CLOEXEC keeps this descriptor from leaking into every process the
    // monitor later forks/execs.
    processMapFd = open(fname, O_WRONLY | O_APPEND | O_CREAT | O_CLOEXEC,
                        S_IRUSR | S_IWUSR );
    if ( processMapFd == -1 )
    {   // File open error
        char buf[MON_STRING_BUF_SIZE];
        snprintf(buf, sizeof(buf),
                 "[CMonitor::openProcessMap], Error opening %s, %s (%d).\n",
                 fname, strerror(errno), errno);
        mon_log_write(MON_PROCESS_COMPLETEPSTARTUP_2, SQ_LOG_ERR, buf);
    }
}
#endif
#ifndef NAMESERVER_PROCESS
// Appends one pre-formatted, NUL-terminated record to the process map file.
// Does nothing when the file is not open. Retries after EINTR and after
// short writes so a record is not silently truncated; any other write error
// ends the attempt, keeping the original best-effort behavior.
void CMonitor::writeProcessMapEntry ( const char * buf )
{
    if ( processMapFd == -1 )
    {
        return;
    }

    size_t remaining = strlen(buf);
    while ( remaining > 0 )
    {
        ssize_t written = write( processMapFd, buf, remaining );
        if ( written <= 0 )
        {
            if ( written < 0 && errno == EINTR )
            {
                continue;   // interrupted before writing anything; retry
            }
            break;          // best effort: give up on other errors
        }
        buf += written;
        remaining -= (size_t) written;
    }
}
#endif
#ifndef NAMESERVER_PROCESS
// Appends a "BEGIN" record for a process to the process map file:
//   BEGIN <ctime> <name> (nid, pid:verifier) P(parentNid, parentPid:parentVerifier) <program>
// Lines longer than the local buffer are truncated by snprintf.
void CMonitor::writeProcessMapBegin( const char *name
                                   , int nid
                                   , int pid
                                   , int verifier
                                   , int parentNid
                                   , int parentPid
                                   , int parentVerifier
                                   , const char *program )
{
    char buf[55+MAX_PROCESS_NAME+MAX_PROCESS_PATH];

    // ctime_r() writes into a local buffer (ctime() shares one static buffer
    // across callers) and its NULL failure return is handled.
    char timeBuf[32];
    time_t mytime = time(NULL);
    if ( ctime_r(&mytime, timeBuf) != NULL )
    {
        size_t len = strlen(timeBuf);
        if ( len > 0 && timeBuf[len-1] == '\n' )
        {
            timeBuf[len-1] = '\0';   // drop ctime's trailing newline
        }
    }
    else
    {
        timeBuf[0] = '\0';
    }

    snprintf( buf, sizeof(buf)
            , "BEGIN %s %-8s (%d, %d:%d) P(%d, %d:%d) %s\n"
            , timeBuf, name, nid, pid, verifier
            , parentNid, parentPid, parentVerifier, program);

    writeProcessMapEntry ( buf );
}
#endif
#ifndef NAMESERVER_PROCESS
// Appends an "END" record for a process to the process map file:
//   END <ctime> <name> (nid, pid:verifier) P(parentNid, parentPid:parentVerifier) <program>
// Lines longer than the local buffer are truncated by snprintf.
void CMonitor::writeProcessMapEnd( const char *name
                                 , int nid
                                 , int pid
                                 , int verifier
                                 , int parentNid
                                 , int parentPid
                                 , int parentVerifier
                                 , const char *program )
{
    char buf[55+MAX_PROCESS_NAME+MAX_PROCESS_PATH];

    // ctime_r() writes into a local buffer (ctime() shares one static buffer
    // across callers) and its NULL failure return is handled.
    char timeBuf[32];
    time_t mytime = time(NULL);
    if ( ctime_r(&mytime, timeBuf) != NULL )
    {
        size_t len = strlen(timeBuf);
        if ( len > 0 && timeBuf[len-1] == '\n' )
        {
            timeBuf[len-1] = '\0';   // drop ctime's trailing newline
        }
    }
    else
    {
        timeBuf[0] = '\0';
    }

    snprintf( buf, sizeof(buf)
            , "END %s %-8s (%d, %d:%d) P(%d, %d:%d) %s\n"
            , timeBuf, name, nid, pid, verifier
            , parentNid, parentPid, parentVerifier, program);

    writeProcessMapEntry ( buf );
}
#endif
// Handles a process start-up request: looks up the logical node given by
// msg->u.request.u.startup.nid and asks it to complete start-up of the named
// process using the port, OS pid and event/system message flags from the
// request.
//
// Non-NAMESERVER builds: when the process is found, the parent is sent its
// new-process reply (CProcessContainer::ParentNewProcReply) and SUCCESS is
// returned.
// When the node or the process is not found, an error is logged (twice if
// the node id is invalid), msg is overwritten in place as a generic reply
// with return_code MPI_ERR_NAME, and FAILURE is returned.
//
// NOTE(review): in NAMESERVER_PROCESS builds `status` is never set to
// SUCCESS, so FAILURE is returned even when the process is found — confirm
// whether that is intended.
bool CMonitor::CompleteProcessStartup (struct message_def * msg)
{
bool status = FAILURE;
CProcess *process;
CLNode *lnode;
const char method_name[] = "CMonitor::CompleteProcessStartup";
TRACE_ENTRY;
lnode = Nodes->GetLNode( msg->u.request.u.startup.nid );
if ( lnode )
{
process = lnode->
CompleteProcessStartup ( msg->u.request.u.startup.process_name,
msg->u.request.u.startup.port_name,
msg->u.request.u.startup.os_pid,
msg->u.request.u.startup.event_messages,
msg->u.request.u.startup.system_messages,
NULL,
-1 );
}
else
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], Invalid Node ID=%d\n",
method_name, msg->u.request.u.startup.nid);
mon_log_write(MON_MONITOR_COMPLETEPSTARTUP_1, SQ_LOG_ERR, buf);
process = NULL;
}
if (process)
{
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf("%s@%d - Process %s started on port %s\n",
method_name, __LINE__,
msg->u.request.u.startup.process_name,
msg->u.request.u.startup.port_name);
}
#ifndef NAMESERVER_PROCESS
CProcessContainer::ParentNewProcReply( process, MPI_SUCCESS);
status = SUCCESS;
#endif
}
else
{
// Process not found: log it and turn msg into a generic error reply.
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[CMonitor::CompleteProcessStartup], Error= Can't find process: %s!\n", msg->u.request.u.startup.process_name);
mon_log_write(MON_MONITOR_COMPLETEPSTARTUP_2, SQ_LOG_ERR, buf);
msg->u.reply.type = ReplyType_Generic;
msg->u.reply.u.generic.nid = -1;
msg->u.reply.u.generic.pid = -1;
msg->u.reply.u.generic.verifier = -1;
msg->u.reply.u.generic.process_name[0] = '\0';
msg->u.reply.u.generic.return_code = MPI_ERR_NAME;
status = FAILURE;
}
TRACE_EXIT;
return status;
}
char * CMonitor::ProcCopy(char *bufPtr, CProcess *process)
{
const char method_name[] = "CMonitor::ProcCopy";
TRACE_ENTRY;
int stringDataLen = 0;
struct clone_def *procObj = (struct clone_def *)bufPtr;
procObj->nid = process->GetNid();
procObj->type = process->GetType();
procObj->priority = process->GetPriority();
procObj->backup = process->IsBackup();
procObj->unhooked = process->IsUnhooked();
#ifndef NAMESERVER_PROCESS
procObj->pathStrId = process->pathStrId();
procObj->ldpathStrId = process->ldPathStrId();
procObj->programStrId = process->programStrId();
#endif
procObj->os_pid = process->GetPid();
procObj->verifier = process->GetVerifier();
procObj->prior_pid = process->GetPriorPid ();
procObj->parent_nid = process->GetParentNid();
procObj->parent_pid = process->GetParentPid();
procObj->parent_verifier = process->GetParentVerifier();
procObj->persistent_retries = process->GetPersistentRetries();
procObj->event_messages = process->IsEventMessages();
procObj->system_messages = process->IsSystemMessages();
procObj->argc = process->argc();
procObj->creation_time = process->GetCreationTime();
if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
trace_printf( "%s@%d - Packing process %s (%d,%d:%d)\n"
, method_name, __LINE__
, process->GetName()
, process->GetNid()
, process->GetPid()
, process->GetVerifier() );
char *stringData = &procObj->stringData;
if (strlen(process->GetName()))
{
// Copy the program name
procObj->nameLen = strlen(process->GetName()) + 1;
memcpy(stringData, process->GetName(), procObj->nameLen );
stringData += procObj->nameLen;
stringDataLen = procObj->nameLen;
}
else
{
procObj->nameLen = 0;
}
if (strlen(process->GetPort()))
{
// Copy the port
procObj->portLen = strlen(process->GetPort()) + 1;
memcpy(stringData, process->GetPort(), procObj->portLen );
stringData += procObj->portLen;
stringDataLen += procObj->portLen;
}
else
{
procObj->portLen = 0;
}
#ifdef NAMESERVER_PROCESS
if (strlen(process->path()))
{
// Copy the path
procObj->pathLen = strlen(process->path()) + 1;
memcpy(stringData, process->path(), procObj->pathLen );
stringData += procObj->pathLen;
stringDataLen = procObj->pathLen;
}
else
{
procObj->pathLen = 0;
}
if (strlen(process->ldpath()))
{
// Copy the ldpath
procObj->ldpathLen = strlen(process->ldpath()) + 1;
memcpy(stringData, process->ldpath(), procObj->ldpathLen );
stringData += procObj->ldpathLen;
stringDataLen = procObj->ldpathLen;
}
else
{
procObj->ldpathLen = 0;
}
if (strlen(process->program()))
{
// Copy the program
procObj->programLen = strlen(process->program()) + 1;
memcpy(stringData, process->program(), procObj->programLen );
stringData += procObj->programLen;
stringDataLen = procObj->programLen;
}
else
{
procObj->programLen = 0;
}
#endif
if (process->IsPersistent())
{
if (strlen(process->infile()))
{
// Copy the standard in file name
procObj->infileLen = strlen(process->infile()) + 1;
memcpy(stringData, process->infile(), procObj->infileLen);
stringData += procObj->infileLen;
stringDataLen += procObj->infileLen;
}
else
{
procObj->infileLen = 0;
}
if (strlen(process->outfile()))
{
// Copy the standard out file name
procObj->outfileLen = strlen(process->outfile()) + 1;
memcpy(stringData, process->outfile(), procObj->outfileLen );
stringData += procObj->outfileLen;
stringDataLen += procObj->outfileLen;
}
else
{
procObj->outfileLen = 0;
}
procObj->argvLen = process->userArgvLen();
if (procObj->argvLen)
{
// Copy the program argument strings
memcpy(stringData, process->userArgv(), procObj->argvLen);
stringData += procObj->argvLen;
stringDataLen += procObj->argvLen;
}
procObj->persistent = true;
}
else
{
procObj->infileLen = 0;
procObj->outfileLen = 0;
procObj->argvLen = 0;
procObj->persistent = false;
}
#ifdef NAMESERVER_PROCESS
if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
trace_printf( "%s@%d - Packing process string data:\n"
" name(%d) =%s\n"
" port(%d) =%s\n"
" path(%d) =%s\n"
" ldpath(%d) =%s\n"
" program(%d) =%s\n"
" infile(%d) =%s\n"
" outfile(%d) =%s\n"
" userArgv(%d) =%s\n"
" stringData(%d) =%s\n"
, method_name, __LINE__
, procObj->nameLen
, process->GetName()
, procObj->portLen
, process->GetPort()
, procObj->pathLen
, process->path()
, procObj->ldpathLen
, process->ldpath()
, procObj->programLen
, process->program()
, procObj->infileLen
, process->infile()
, procObj->outfileLen
, process->outfile()
, procObj->argvLen
, procObj->argvLen?process->userArgv():""
, stringDataLen
, stringDataLen?&procObj->stringData:"" );
#else
if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
trace_printf( "%s@%d - Packing process string data:\n"
" name(%d) =%s\n"
" port(%d) =%s\n"
" infile(%d) =%s\n"
" outfile(%d) =%s\n"
" userArgv(%d) =%s\n"
" stringData(%d) =%s\n"
, method_name, __LINE__
, procObj->nameLen
, process->GetName()
, procObj->portLen
, process->GetPort()
, procObj->infileLen
, process->infile()
, procObj->outfileLen
, process->outfile()
, procObj->argvLen
, procObj->argvLen?process->userArgv():""
, stringDataLen
, stringDataLen?&procObj->stringData:"" );
#endif
TRACE_EXIT;
return stringData;
}
// Serializes process objects into `buffer` with CMonitor::ProcCopy and
// returns how many were packed; on return `buffer` has been advanced past
// the last packed record. All non-backup (primary and generic) processes of
// every logical node are packed first, followed by all backup processes.
// In non-NAMESERVER builds nothing is packed when NameServerEnabled is set.
int CMonitor::PackProcObjs( char *&buffer )
{
    const char method_name[] = "CMonitor::PackProcObjs";
    TRACE_ENTRY;

    char *packStart = buffer;
    int packedCount = 0;

#ifndef NAMESERVER_PROCESS
    if (!NameServerEnabled)
#endif
    {
        // Pass 0 packs non-backup processes, pass 1 packs backups, so every
        // backup record follows every primary record in the buffer.
        for ( int pass = 0; pass < 2; ++pass )
        {
            const bool packBackups = ( pass == 1 );

            for ( CLNode *lnode = Nodes->GetFirstLNode();
                  lnode != NULL;
                  lnode = lnode->GetNext() )
            {
                for ( CProcess *process = lnode->GetFirstProcess();
                      process != NULL;
                      process = process->GetNext() )
                {
                    const bool isBackup = process->IsBackup();
                    if ( isBackup == packBackups )
                    {
                        buffer = ProcCopy( buffer, process );
                        ++packedCount;
                    }
                }
            }
        }
    }

    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
    {
        long totalSize = buffer - packStart;
        long avg = ( packedCount > 0 ) ? totalSize / packedCount : 0;
        trace_printf("%s@%d - Total Procs = %d, Total Size = %ld, Avg = %ld bytes\n",
                     method_name, __LINE__, packedCount, totalSize, avg);
    }

    TRACE_EXIT;

    return packedCount;
}
// Unpacks `procCount` clone_def records from `buffer` (the layout written by
// CMonitor::ProcCopy: name, port, [path, ldpath, program in NAMESERVER
// builds], infile, outfile, user argv — each present only when its *Len is
// non-zero) and clones each process onto the node owning procObj->nid.
// User arguments and the persistent flag are applied to the clone when
// present. On return `buffer` points just past the last record consumed.
//
// The string pointers are scoped to each record, so a record that omits an
// optional string (e.g. pathLen == 0) passes NULL for it rather than a
// pointer left over from the previous record.
void CMonitor::UnpackProcObjs( char *&buffer, int procCount )
{
    const char method_name[] = "CMonitor::UnpackProcObjs";
    TRACE_ENTRY;

    for (int i = 0; i < procCount; i++)
    {
        struct clone_def *procObj = (struct clone_def *)buffer;
        char *stringData = &procObj->stringData;
        int stringDataLen = 0;

        // Pointers into this record's string area; NULL when not present.
#ifdef NAMESERVER_PROCESS
        char *path = NULL;
        char *ldpath = NULL;
        char *program = NULL;
#endif
        char *name = NULL;
        char *port = NULL;
        char *infile = NULL;
        char *outfile = NULL;
        char *userargv = NULL;

        // NOTE(review): GetLNode() is not NULL-checked here (unlike
        // CompleteProcessStartup); an unknown nid in the buffer would crash.
        CNode *node = Nodes->GetLNode (procObj->nid)->GetNode();

        if (procObj->nameLen)
        {
            name = &stringData[stringDataLen];
            stringDataLen += procObj->nameLen;
        }

        if (procObj->portLen)
        {
            port = &stringData[stringDataLen];
            stringDataLen += procObj->portLen;
        }

#ifdef NAMESERVER_PROCESS
        if (procObj->pathLen)
        {
            path = &stringData[stringDataLen];
            stringDataLen += procObj->pathLen;
        }

        if (procObj->ldpathLen)
        {
            ldpath = &stringData[stringDataLen];
            stringDataLen += procObj->ldpathLen;
        }

        if (procObj->programLen)
        {
            program = &stringData[stringDataLen];
            stringDataLen += procObj->programLen;
        }
#endif

        if (procObj->infileLen)
        {
            infile = &stringData[stringDataLen];
            stringDataLen += procObj->infileLen;
        }

        if (procObj->outfileLen)
        {
            outfile = &stringData[stringDataLen];
            stringDataLen += procObj->outfileLen;
        }

        if (procObj->argvLen)
        {
            userargv = &stringData[stringDataLen];
            stringDataLen += procObj->argvLen;
        }

#ifdef NAMESERVER_PROCESS
        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
            trace_printf( "%s@%d - Unpacking process string data:\n"
                          "        stringData(%d) =%s\n"
                          "        name(%d)       =%s\n"
                          "        port(%d)       =%s\n"
                          "        path(%d)       =%s\n"
                          "        ldpath(%d)     =%s\n"
                          "        program(%d)    =%s\n"
                          "        infile(%d)     =%s\n"
                          "        outfile(%d)    =%s\n"
                          "        userArgc       =%d\n"
                          "        userArgv(%d)   =%s\n"
                        , method_name, __LINE__
                        , stringDataLen
                        , stringDataLen?&procObj->stringData:""
                        , procObj->nameLen
                        , procObj->nameLen?name:""
                        , procObj->portLen
                        , procObj->portLen?port:""
                        , procObj->pathLen
                        , procObj->pathLen?path:""
                        , procObj->ldpathLen
                        , procObj->ldpathLen?ldpath:""
                        , procObj->programLen
                        , procObj->programLen?program:""
                        , procObj->infileLen
                        , procObj->infileLen?infile:""
                        , procObj->outfileLen
                        , procObj->outfileLen?outfile:""
                        , procObj->argc
                        , procObj->argvLen
                        , procObj->argvLen?userargv:"" );
#else
        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
            trace_printf( "%s@%d - Unpacking process string data:\n"
                          "        stringData(%d) =%s\n"
                          "        name(%d)       =%s\n"
                          "        port(%d)       =%s\n"
                          "        infile(%d)     =%s\n"
                          "        outfile(%d)    =%s\n"
                          "        userArgc       =%d\n"
                          "        userArgv(%d)   =%s\n"
                        , method_name, __LINE__
                        , stringDataLen
                        , stringDataLen?&procObj->stringData:""
                        , procObj->nameLen
                        , procObj->nameLen?name:""
                        , procObj->portLen
                        , procObj->portLen?port:""
                        , procObj->infileLen
                        , procObj->infileLen?infile:""
                        , procObj->outfileLen
                        , procObj->outfileLen?outfile:""
                        , procObj->argc
                        , procObj->argvLen
                        , procObj->argvLen?userargv:"" );
#endif

        // NOTE(review): procObj->origPNidNs is not visibly set by
        // CMonitor::ProcCopy in this file; confirm the packer fills it.
        CProcess *process = node->CloneProcess (procObj->nid,
                                                procObj->type,
                                                procObj->priority,
                                                procObj->backup,
                                                procObj->unhooked,
                                                procObj->nameLen?name:(char *)"",
                                                procObj->portLen?port:(char *)"",
                                                procObj->os_pid,
                                                procObj->verifier,
                                                procObj->parent_nid,
                                                procObj->parent_pid,
                                                procObj->parent_verifier,
                                                procObj->event_messages,
                                                procObj->system_messages,
#ifdef NAMESERVER_PROCESS
                                                path,
                                                ldpath,
                                                program,
#else
                                                procObj->pathStrId,
                                                procObj->ldpathStrId,
                                                procObj->programStrId,
#endif
                                                procObj->infileLen?infile:(char *)"",
                                                procObj->outfileLen?outfile:(char *)"",
                                                &procObj->creation_time,
                                                procObj->origPNidNs);

        if ( process && procObj->argvLen )
        {
            process->userArgs ( procObj->argc, procObj->argvLen, userargv );
        }

        if ( process && procObj->persistent )
        {
            process->SetPersistent(true);
        }

        // Advance to the next record, which starts right after this one's strings.
        buffer = &stringData[stringDataLen];
    }

    TRACE_EXIT;
    return;
}
#ifndef NAMESERVER_PROCESS
// Requests creation of this node's primitive processes. Spare nodes skip
// this; otherwise a create-primitives request for MyPNID is queued for a
// worker thread instead of being processed inline.
void CMonitor::StartPrimitiveProcesses( void )
{
    const char method_name[] = "CMonitor::StartPrimitiveProcesses";
    TRACE_ENTRY;

    const bool isSpareNode = MyNode->IsSpareNode();
    if ( isSpareNode == false )
    {
        ReqQueue.enqueueCreatePrimitiveReq( MyPNID );
    }

    TRACE_EXIT;
}
#endif
// Handles a request to assign a new monitor leader in place of
// `failedMaster`. It is traced when init/recovery tracing is on, and only
// forwarded to Monitor->AssignMonitorLeader() when running in agent mode.
void HandleAssignMonitorLeader ( const char *failedMaster )
{
    const char method_name[] = "HandleAssignMonitorLeader";
    TRACE_ENTRY;

    if ( trace_settings & (TRACE_INIT | TRACE_RECOVERY) )
    {
        trace_printf( "%s@%d HandleAssignMonitorLeader called for %s\n"
                    , method_name, __LINE__, failedMaster );
    }

    // Leader assignment is only relevant in AgentMode.
    if ( !IsAgentMode )
    {
        TRACE_EXIT;
        return;
    }

    Monitor->AssignMonitorLeader( failedMaster );

    TRACE_EXIT;
}
// Reacts to this node's own expiration by queueing a node-down request for
// MyPNID.
void HandleMyNodeExpiration( void )
{
    const char method_name[] = "HandleMyNodeExpiration";
    TRACE_ENTRY;

    const int expiredPnid = MyPNID;
    ReqQueue.enqueueDownReq( expiredPnid );

    TRACE_EXIT;
}
void HandleNodeExpiration( const char *nodeName )
{
const char method_name[] = "HandleNodeExpiration";
TRACE_ENTRY;
CNode *node = Nodes->GetNode((char *)nodeName);
if (node)
{
ReqQueue.enqueueDownReq(node->GetPNid());
}
TRACE_EXIT;
}
void CreateZookeeperClient( void )
{
const char method_name[] = "CreateZookeeperClient";
TRACE_ENTRY;
if ( ZClientEnabled )
{
string hostName;
string zkQuorumHosts;
stringstream zkQuorumPort;
char *env;
char hostsStr[MAX_PROCESSOR_NAME * 3] = { 0 };
char *tkn = NULL;
int zport;
env = getenv("ZOOKEEPER_PORT");
if ( env && isdigit(*env) )
{
zport = atoi(env);
}
else
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf),
"[%s], Zookeeper quorum port is not defined!\n"
, method_name);
mon_log_write(MON_MONITOR_CREATEZCLIENT_1, SQ_LOG_CRIT, buf);
ZClientEnabled = false;
TRACE_EXIT;
return;
}
env = getenv("ZOOKEEPER_NODES");
if ( env )
{
zkQuorumHosts = env;
if ( zkQuorumHosts.length() == 0 )
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf),
"[%s], Zookeeper quorum hosts are not defined!\n"
, method_name);
mon_log_write(MON_MONITOR_CREATEZCLIENT_2, SQ_LOG_CRIT, buf);
ZClientEnabled = false;
TRACE_EXIT;
return;
}
strcpy( hostsStr, zkQuorumHosts.c_str() );
zkQuorumPort.str( "" );
tkn = strtok( hostsStr, "," );
do
{
if ( tkn != NULL )
{
hostName = tkn;
zkQuorumPort << hostName.c_str()
<< ":"
<< zport;
}
tkn = strtok( NULL, "," );
if ( tkn != NULL )
{
zkQuorumPort << ",";
}
}
while( tkn != NULL );
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
trace_printf( "%s@%d zkQuorumPort=%s\n"
, method_name, __LINE__
, zkQuorumPort.str().c_str() );
}
}
ZClient = new CZClient( zkQuorumPort.str().c_str()
, ZCLIENT_TRAFODION_ZNODE
, ZCLIENT_INSTANCE_ZNODE );
if ( ZClient == NULL )
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf),
"[%s], Failed to allocate ZClient object!\n"
, method_name);
mon_log_write(MON_MONITOR_CREATEZCLIENT_3, SQ_LOG_CRIT, buf);
abort();
}
}
TRACE_EXIT;
}
// Starts the ZooKeeper client created by CreateZookeeperClient(), when
// enabled and allocated. If StartWork() returns 0, node monitoring is
// started and an informational event is logged.
void StartZookeeperClient( void )
{
    const char method_name[] = "StartZookeeperClient";
    TRACE_ENTRY;

    int rc = -1;
    if ( ZClientEnabled && ZClient != NULL )
    {
        rc = ZClient->StartWork();
        if ( rc == 0 )
        {
            ZClient->StartMonitoring();

            char buf[MON_STRING_BUF_SIZE];
            snprintf( buf, sizeof(buf)
                    , "[%s], ZClient node monitoring started\n"
                    , method_name );
            mon_log_write( MON_MONITOR_STARTZCLIENT_1, SQ_LOG_INFO, buf );
        }
    }

    TRACE_EXIT;
}
#ifdef USE_SEQUENCE_NUM
// Returns a sequence number derived from CLOCK_REALTIME, under
// TimeSeqNumLock_. The saved (sec, nsec) pair only moves forward: it takes
// the current time when that is later, otherwise nsec is incremented by one.
// The result packs the low 32 bits of the seconds into the high half and
// the low 32 bits of the nanoseconds into the low half.
long long CMonitor::GetTimeSeqNum()
{
    const char method_name[] = "CMonitor::GetTimeSeqNum";
    TRACE_ENTRY;

    TimeSeqNumLock_.lock();

    struct timespec currTime;
    clock_gettime(CLOCK_REALTIME, &currTime);

    if ( (currTime.tv_sec > savedTime_.tv_sec) ||
         ( (currTime.tv_sec == savedTime_.tv_sec) &&
           (currTime.tv_nsec > savedTime_.tv_nsec) ) )
    {
        savedTime_.tv_sec = currTime.tv_sec;
        savedTime_.tv_nsec = currTime.tv_nsec;
    }
    else
    {   // time drifted back. just increment nanoseconds
        savedTime_.tv_nsec++;
        // overflow check is not required as it would take
        // at least 3 billion requests to overflow
    }

    // Build the value with shifts instead of writing through an int* alias
    // of a long long (a strict-aliasing violation whose layout also depended
    // on the host being little-endian).
    unsigned long long packed =
        ( (unsigned long long)(unsigned int) savedTime_.tv_sec << 32 )
        | (unsigned long long)(unsigned int) savedTime_.tv_nsec;
    long long result = (long long) packed;

    TimeSeqNumLock_.unlock();

    // %llx is the correct conversion for a 64-bit integer (%Lx is long double).
    if (trace_settings & TRACE_REQUEST_DETAIL)
        trace_printf("%s@%d Time seq num = %llx\n", method_name, __LINE__, packed);

    TRACE_EXIT;
    return result;
}
#endif
int main (int argc, char *argv[])
{
int i;
int rc;
bool done = false;
char *env;
char *nodename = NULL;
char fname[MAX_PROCESS_PATH];
char short_node_name[MPI_MAX_PROCESSOR_NAME];
#ifndef NAMESERVER_PROCESS
char port_fname[MAX_PROCESS_PATH];
char temp_fname[MAX_PROCESS_PATH];
#endif
char buf[MON_STRING_BUF_SIZE];
#ifndef NAMESERVER_PROCESS
unsigned int initSleepTime = 1; // 1 second
#endif
mallopt(M_ARENA_MAX, 4); // call to limit the number of arena's of monitor to 4.This call doesn't seem to have any effect !
#ifdef NAMESERVER_PROCESS
CALL_COMP_DOVERS(trafns, argc, argv);
#else
CALL_COMP_DOVERS(monitor, argc, argv);
#endif
const char method_name[] = "main";
#ifdef NAMESERVER_PROCESS
IsNameServer = true;
#endif
if (argc < 2) {
printf("error: monitor needs an argument...exitting...\n");
exit(0);
}
int lv_arg_index = 1;
while ( lv_arg_index < argc )
{
// In installations like Cloudera Manager, the monitor is started in AGENT mode
if ( strcmp( argv[lv_arg_index], "COLD_AGENT" ) == 0 )
{
IsAgentMode = true;
}
lv_arg_index++;
}
// Set flag to indicate whether we are operating in a real cluster
// or a virtual cluster. This is used throughout the monitor when
// behavior differs for a real vs. virtual cluster environment.
if ( !IsAgentMode )
{
if ( getenv( "SQ_VIRTUAL_NODES" ) )
{
IsRealCluster = false;
Emulate_Down = true;
}
if ( IsRealCluster )
{
// The monitor processes may be started by MPIrun utility
env = getenv("SQ_MON_CREATOR");
if ( env != NULL && strcmp(env, "MPIRUN") == 0 )
{
IsMPIChild = true;
}
// The monitor can be set to run in AGENT mode
env = getenv("SQ_MON_RUN_MODE");
if ( env != NULL && strcmp(env, "AGENT") == 0 )
{
IsAgentMode = true;
}
}
#ifdef NAMESERVER_PROCESS
else
{
env = getenv("SQ_MON_RUN_MODE");
if ( env != NULL && strcmp(env, "AGENT") == 0 )
{
IsAgentMode = true;
}
}
#endif
}
#ifndef NAMESERVER_PROCESS
env = getenv("SQ_NAMESERVER_ENABLED");
if ( env && isdigit(*env) )
{
int val = atoi(env);
NameServerEnabled = (val != 0) ? true : false;
}
#endif
if ( IsAgentMode || IsNameServer )
{
MON_Props xprops( true );
#ifdef NAMESERVER_PROCESS
xprops.load( "nameserver.env" );
#else
xprops.load( "monitor.env" );
#endif
MON_Smap_Enum xenum( &xprops );
while ( xenum.more( ) )
{
char *xkey = xenum.next( );
const char *xvalue = xprops.get( xkey );
if ( xkey && xkey[0] && xvalue )
{
setenv( xkey, xvalue, 1 );
}
}
}
#ifdef NAMESERVER_PROCESS
MonLog = new CMonLog( "log4cxx.monitor.trafns.config", "NS", "alt.mon", -1, -1, getpid(), "$TNS" );
#else
MonLog = new CMonLog( "log4cxx.monitor.mon.config", "MON", "alt.mon", -1, -1, getpid(), "$MONITOR" );
#endif
MonLog->setupInMemoryLog();
#ifdef MULTI_TRACE_FILES
initVariableKey();
#endif
#ifdef DMALLOC
util_dmalloc_start();
#endif
// Save our execution path
env = getenv("PWD");
if ( env )
{
STRCPY(MyPath,env);
}
for (i = strlen(argv[0])-1; i >= 0; i--)
{
if (argv[0][i] == '/')
{
argv[0][i] = '\0';
strncpy(MyPath,argv[0],sizeof(MyPath));
argv[0][i] = '/';
break;
}
}
env = getenv("SQ_MEASURE");
if ( env && *env == '1' )
{
Measure = 1;
snprintf(fname, sizeof(fname), "%s/monitor.P%d",
getenv("MPI_TMPDIR"),getpid());
setenv("MPI_INSTR", fname, 1);
}
if ( env && *env == '2' )
{
Measure = 2;
snprintf(fname, sizeof(fname), "%s/monitor.cpu.P%d:cpu",
getenv("MPI_TMPDIR"),getpid());
setenv("MPI_INSTR", fname, 1);
}
env = getenv("SQ_IC");
if ( env != NULL && strcmp(env, "IBV") == 0 )
{
CommType = CommType_InfiniBand;
}
else
{
CommType = CommType_Sockets;
}
// Mask all allowed signals
sigset_t mask;
sigfillset(&mask);
sigdelset(&mask, SIGPROF); // allows profiling such as google profiler
sigdelset(&mask, SIGUSR2);
rc = pthread_sigmask(SIG_SETMASK, &mask, NULL);
if (rc != 0)
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
method_name, rc);
mon_log_write(MON_MONITOR_MAIN_1, SQ_LOG_ERR, buf);
}
// Setup HP_MPI software license
int key = 413675219; //413675218 to display banner
MPI_Initialized(&key);
// Initialize MPI environment
MPI_Init (&argc, &argv);
#ifdef NAMESERVER_PROCESS
if ( argc > 1 )
{
if ( strcmp(argv[1], "SQMON1.1") == 0 )
{
MyPNID = atoi( argv[2] );
int arg = 1;
for ( ; argv[arg+11]; arg++ )
{
argv[arg] = argv[arg+11];
}
argv[arg] = NULL;
argc -= 11;
}
}
#else
monitorArgc = argc;
STRCPY(monitorArgv[0], "trafns");
for ( int arg = 1; arg < argc; arg++ )
STRCPY(monitorArgv[arg], argv[arg]);
#endif
env = getenv("MON_PROF_ENABLE");
if ( env )
{
mon_profiler_init(env); // LCOV_EXCL_LINE
}
env = getenv("MON_SNMP_ENABLE");
if ( env )
{
genSnmpTrapEnabled = true;
}
#ifndef NAMESERVER_PROCESS
env = getenv("MON_INIT_SLEEP");
if ( env && isdigit(*env) )
{
initSleepTime = atoi(env);
}
#endif
env = getenv("SQ_COLD_STANDBY_SPARE");
if ( env && isdigit(*env) )
{
if ( strcmp(env,"0")==0 )
{
SpareNodeColdStandby = false;
}
}
#ifndef NAMESERVER_PROCESS
// We need to delay some to make sure all monitor processes have initialized before
// any monitor tries to perform an Allgather operation.
sleep( initSleepTime );
#endif
MPI_Comm_set_errhandler(MPI_COMM_WORLD, MPI_ERRORS_RETURN);
MPI_Comm_set_errhandler(MPI_COMM_SELF, MPI_ERRORS_RETURN);
#ifndef NAMESERVER_PROCESS
MPI_Comm_rank (MPI_COMM_WORLD, &MyPNID);
#endif
MonLog->setPNid( MyPNID );
gethostname(Node_name, MPI_MAX_PROCESSOR_NAME);
char *tmpptr = Node_name;
while ( *tmpptr )
{
*tmpptr = (char)tolower( *tmpptr );
tmpptr++;
}
// Remove the domain portion of the name if any
char str1[MPI_MAX_PROCESSOR_NAME];
memset( str1, 0, MPI_MAX_PROCESSOR_NAME );
memset( short_node_name, 0, MPI_MAX_PROCESSOR_NAME );
strcpy (str1, Node_name );
char *str1_dot = strchr( (char *) str1, '.' );
if ( str1_dot )
{
memcpy( short_node_name, str1, str1_dot - str1 );
}
else
{
strcpy (short_node_name, str1 );
}
if (IsRealCluster)
{
strcpy (Node_name, short_node_name );
}
#ifdef MULTI_TRACE_FILES
setThreadVariable( (char *)"mainThread" );
#endif
#ifdef NAMESERVER_PROCESS
// Without mpi daemon the monitor has no default standard output.
// We create a standard output file here.
if ( IsRealCluster )
{
snprintf(fname, sizeof(fname), "%s/logs/trafns.%s.log",
getenv("TRAF_HOME"), Node_name);
}
else
{
snprintf(fname, sizeof(fname), "%s/logs/trafns.%d.%s.log",
getenv("TRAF_HOME"), MyPNID, Node_name);
}
#else
// Without mpi daemon the monitor has no default standard output.
// We create a standard output file here.
if ( IsRealCluster )
{
snprintf(fname, sizeof(fname), "%s/logs/sqmon.%s.log",
getenv("TRAF_HOME"), Node_name);
}
else
{
snprintf(fname, sizeof(fname), "%s/logs/sqmon.%d.%s.log",
getenv("TRAF_HOME"), MyPNID, Node_name);
}
#endif
remove(fname);
if( freopen (fname, "w", stdout) == NULL )
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], can't open stdout (%s).\n",
method_name, fname);
mon_log_write(MON_MONITOR_MAIN_2, SQ_LOG_ERR, buf);
}
setlinebuf(stdout);
#ifndef NAMESERVER_PROCESS
// Send stderr output to same file as stdout. (Note: the monitor does
// not write to stderr but perhaps there could be components included in
// the monitor build that do write to stderr.)
if ( dup2 ( 1, 2 ) == -1 )
{
printf ( "dup2 failed for stderr: %s (%d)\n", strerror(errno), errno);
}
#else
// Name Server is a child process of the monitor, the process create logic
// will establish IO redirection between the monitor process and the child.
#endif
switch( CommType )
{
case CommType_Sockets: // Valid communication protocol
break;
case CommType_InfiniBand: // Currenly disabled - requires HA MPI
//MPI_Open_port (MPI_INFO_NULL, MyMPICommPort);
default:
printf( "SQ_IC contains invalid communication protocol: %s\n"
, CommTypeString(CommType));
abort();
}
if ((!IsAgentMode) && (argc > 3 && strcmp (argv[2], "-integrate") == 0))
{
switch( CommType )
{
case CommType_InfiniBand:
if (argc == 4 && strstr(argv[3], "$port#"))
{
SMSIntegrating = IAmIntegrating = true;
strcpy( IntegratingMonitorPort, argv[3] );
}
else
{
printf ( "Invalid integrating monitor MPI port: %s\n", argv[3]);
abort();
}
break;
case CommType_Sockets:
if ( IsAgentMode || isdigit (*argv[3]) )
{
// In agent mode and when re-integrating (node up), all
// monitors processes start as a cluster of 1 and join to the
// creator monitor to establish the real cluster.
// Therefore, MyPNID will always be zero then it is
// necessary to use the node name to obtain the correct
// <pnid> from the configuration which occurs when creating the
// CMonitor object down below. By setting MyPNID to -1, when the
// CCluster::InitializeConfigCluster() invoked during the creation
// of the CMonitor object it will set MyPNID using Node_name.
#ifdef NAMESERVER_PROCESS
if ( IsRealCluster )
MyPNID = -1;
#else
MyPNID = -1;
#endif
SMSIntegrating = IAmIntegrating = true;
strcpy( IntegratingMonitorPort, argv[3] );
}
else
{
printf ( "Invalid integrating monitor socket port: %s\n", argv[3]);
abort();
}
break;
default:
// Programmer bonehead!
abort();
}
if ( isdigit (*argv[4]) )
{
CreatorShellPid = atoi( argv[4] );
}
else
{
printf ( "Invalid creator shell pid: %s\n", argv[4]);
abort();
}
if ( isdigit (*argv[5]) )
{
CreatorShellVerifier = atoi( argv[5] );
}
else
{
printf ( "Invalid creator shell verifier: %s\n", argv[5]);
abort();
}
// Trace cannot be specified on startup command but need to
// check for trace environment variable settings.
MonTrace->mon_trace_init("0", NULL);
}
if (IsAgentMode)
{
if ( IsRealCluster )
{
MyPNID = -1;
}
CreatorShellPid = 1000; // per monitor.sh
CreatorShellVerifier = 0;
}
if (argc == 3 && isdigit(*argv[2]) )
{
MonTrace->mon_trace_init(argv[2], "STDOUT");
}
else if (argc >= 3 && strcmp (argv[2], "TRACEFILE") == 0)
{
if (argc == 4 && isdigit (*argv[3]))
{
MonTrace->mon_trace_init(argv[3], NULL);
}
}
else
{ // Trace not specified on startup command but need to check
// for trace environment variable settings.
MonTrace->mon_trace_init("0", NULL);
}
if ((env = getenv("MON_SYNC_DELAY")) != NULL)
{ // Sync cycle delay value specified
int delay;
errno = 0;
delay = strtol(env, NULL, 10);
if ( errno == 0) next_test_delay = delay;
}
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d next_test_delay=%ld\n", method_name, __LINE__,
next_test_delay);
#ifdef USE_SONAR
if ( (env = getenv("SQ_SONAR") ) != NULL )
{
sonar_state_init();
if ( IsRealCluster && sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
{ // Not a virtual cluster and sonar is enabled.
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d Enabling Sonar\n", method_name, __LINE__);
}
MonStats = new CMonSonarStats();
}
}
#endif
if ( MonStats == NULL )
{
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d Using non-Sonar counters\n",
method_name, __LINE__);
}
MonStats = new CMonStats();
}
// Normal process termination uses SIGKILL by default. Environment
// variable PROC_TERM_SIG can be used to override this. Valid values
// are SIGKILL or SIGTERM (as either string or integer values).
int procTermSig = SIGKILL;
env = getenv("PROC_TERM_SIG");
if ( env )
{
if (strcasecmp(env, "SIGKILL") == 0)
{
procTermSig = SIGKILL;
}
else if (strcasecmp(env, "SIGTERM") == 0)
{
procTermSig = SIGTERM;
}
else
{
procTermSig = atoi( env );
if ( !(procTermSig == SIGKILL || procTermSig == SIGTERM) )
{
procTermSig = SIGKILL;
}
}
}
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d Using signal %d for normal processes "
"termination.\n", method_name, __LINE__, procTermSig);
}
// Record statistics (sonar counters): monitor is busy
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->MonitorBusyIncr();
#ifdef NAMESERVER_PROCESS
snprintf(buf, sizeof(buf),
"[CMonitor::main], %s, Started! CommType: %s\n"
, CALL_COMP_GETVERS2(trafns), CommTypeString( CommType ));
#else
snprintf(buf, sizeof(buf),
"[CMonitor::main], %s, Started! CommType: %s (%s%s%s%s)\n"
, CALL_COMP_GETVERS2(monitor)
, CommTypeString( CommType )
, IsRealCluster?"RealCluster":"VirtualCluster"
, IsAgentMode?"/AgentMode":""
, IsMPIChild?"/MPIChild":""
, NameServerEnabled?"/NameServerEnabled":"" );
#endif
mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
#ifdef DMALLOC
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "DMALLOC Option set" "\n", method_name, __LINE__);
#endif
#ifdef DEBUGGING
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "DEBUGGING Option set" "\n", method_name, __LINE__);
#endif
if ( Emulate_Down )
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "EMULATE_DOWN Option set" "\n", method_name, __LINE__);
{
#ifndef NAMESERVER_PROCESS
// Create thread for monitoring redirected i/o.
// This is also used for monitor logs, so start it early.
Redirector.start();
#endif
// Create global configuration now
ClusterConfig = new CClusterConfig();
if (ClusterConfig)
{
bool traceEnabled = (trace_settings & TRACE_TRAFCONFIG) ? true : false;
if (ClusterConfig->Initialize( traceEnabled, MonTrace->getTraceFileName()))
{
if (!ClusterConfig->LoadConfig())
{
char la_buf[MON_STRING_BUF_SIZE];
sprintf(la_buf, "[%s], Failed to load cluster configuration.\n", method_name);
mon_log_write(MON_MONITOR_MAIN_12, SQ_LOG_CRIT, la_buf);
abort();
}
}
else
{
char la_buf[MON_STRING_BUF_SIZE];
sprintf(la_buf, "[%s], Failed to open cluster configuration.\n", method_name);
mon_log_write(MON_MONITOR_MAIN_13, SQ_LOG_CRIT, la_buf);
abort();
}
}
else
{
char la_buf[MON_STRING_BUF_SIZE];
sprintf(la_buf, "[%s], Failed to allocate cluster configuration.\n", method_name);
mon_log_write(MON_MONITOR_MAIN_14, SQ_LOG_CRIT, la_buf);
abort();
}
//Moved creation of the below to later on
//Nodes = new CNodeContainer ();
//Config = new CConfigContainer ();
//Monitor = new CMonitor (procTermSig);
// Set up zookeeper and determine the master
if ( IsAgentMode || IsRealCluster )
{
// Zookeeper client is enabled only in a real cluster
env = getenv("SQ_MON_ZCLIENT_ENABLED");
if ( env )
{
if ( env && isdigit(*env) )
{
if ( strcmp(env,"0")==0 )
{
ZClientEnabled = false;
}
}
}
if ( ZClientEnabled )
{
CreateZookeeperClient( );
}
}
else
{
ZClientEnabled = false;
}
if (IsAgentMode)
{
if ((ZClientEnabled) && (ZClient != NULL))
{
// Do not wait, just see if one exists
const char *masterMonitor = ZClient->WaitForAndReturnMaster(false);
if (masterMonitor)
{
strcpy (MasterMonitorName, masterMonitor);
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d (MasterMonitor) IsAgentMode = TRUE, masterMonitor from ZK: %s, Node_name: %s\n"
, method_name
, __LINE__
, MasterMonitorName
, Node_name);
}
// unfortunately, we have to do this to see if we are the master before
// other things are set up. This is how we must do that
if (strcmp(Node_name, masterMonitor) == 0)
{
IsMaster = true;
}
else
{
IsMaster = false;
}
}
else
{
strcpy (MasterMonitorName, ClusterConfig->GetConfigMasterByName());
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d (MasterMonitor) IsAgentMode = TRUE, ConfigMasterMonitor: %s, Node_name:%s \n"
, method_name
, __LINE__
, MasterMonitorName
, Node_name);
}
if (strcmp (Node_name, ClusterConfig->GetConfigMasterByName()) == 0)
{
IsMaster = true;
}
else
{
IsMaster = false;
}
}
}
#ifdef NAMESERVER_PROCESS
else
{
strcpy (MasterMonitorName, ClusterConfig->GetConfigMasterByName());
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d (MasterMonitor) IsAgentMode = TRUE, ConfigMasterMonitor: %s, Node_name:%s \n"
, method_name
, __LINE__
, MasterMonitorName
, Node_name);
}
if ( IsRealCluster )
{
if (strcmp (Node_name, ClusterConfig->GetConfigMasterByName()) == 0)
{
IsMaster = true;
}
else
{
IsMaster = false;
}
}
else
{
IsMaster = ( MyPNID == 0 );
}
}
#endif
}
if (IsAgentMode)
{
if (!IsMaster)
{
#ifdef NAMESERVER_PROCESS
if ( IsRealCluster )
{
MyPNID = -1;
}
#else
MyPNID = -1;
#endif
SMSIntegrating = IAmIntegrating = true;
#ifdef NAMESERVER_PROCESS
char *monitorPort = getenv ("NS_COMM_PORT");
#else
char *monitorPort = getenv ("MONITOR_COMM_PORT");
#endif
if (monitorPort)
{
#ifdef NAMESERVER_PROCESS
if ( IsRealCluster )
{
strcpy( IntegratingMonitorPort, MasterMonitorName);
}
else
{
char localHost[MAX_PROCESSOR_NAME];
gethostname( localHost, MAX_PROCESSOR_NAME );
strcpy( IntegratingMonitorPort, localHost);
}
#else
strcpy( IntegratingMonitorPort, MasterMonitorName);
#endif
strcat( IntegratingMonitorPort, ":");
strcat( IntegratingMonitorPort, monitorPort);
}
if (trace_settings & TRACE_INIT)
{
trace_printf( "%s@%d (MasterMonitor) IsAgentMode = TRUE, I am NOT the master, "
"MyPNID=%d, master port=%s\n"
, method_name, __LINE__
, MyPNID, IntegratingMonitorPort );
}
}
else
{
if (trace_settings & TRACE_INIT)
{
trace_printf( "%s@%d (MasterMonitor) IsAgentMode = TRUE, I am the master, MyPNID=%d\n"
, method_name, __LINE__, MyPNID );
}
IAmIntegrating = false;
}
}
NameServerConfig = new CNameServerConfigContainer ();
Nodes = new CNodeContainer ();
Config = new CConfigContainer ();
#ifdef NAMESERVER_PROCESS
Monitor = new CMonitor ();
#else
if (NameServerEnabled)
{
PtpClient = new CPtpClient ();
Monitor = new CMonitor (procTermSig);
NameServer = new CNameServer ();
}
else
{
Monitor = new CMonitor (procTermSig);
}
#endif
if ( IsAgentMode )
{
if (trace_settings & TRACE_INIT)
{
trace_printf( "%s@%d MyPNID=%d\n"
, method_name, __LINE__, MyPNID );
}
MonLog->setPNid( MyPNID );
}
if (IsAgentMode)
{
int monitorLead = -1;
CNode *myNode = Nodes->GetNode(MyPNID);
const char *masterMonitor = NULL;
if (myNode == NULL)
{
char la_buf[MON_STRING_BUF_SIZE];
sprintf( la_buf
, "[%s], Failed to get my Node, MyPNID=%d\n"
, method_name, MyPNID );
mon_log_write(MON_MONITOR_MAIN_15, SQ_LOG_CRIT, la_buf);
abort();
}
if ((ZClientEnabled) && (ZClient != NULL))
{
CNode *masterNode = Nodes->GetNode(MasterMonitorName);
if (!masterNode)
{
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d (MasterMonitor) IsMaster=%d, masterNode is NULL, with MasterMonitorName %s\n", method_name, __LINE__, IsMaster, MasterMonitorName);
}
char la_buf[MON_STRING_BUF_SIZE];
sprintf(la_buf, "[%s], Failed to get my Master Node.\n", method_name);
mon_log_write(MON_MONITOR_MAIN_16, SQ_LOG_CRIT, la_buf);
abort();
}
else
{
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d (MasterMonitor) IsMaster=%d, masterNode=%s\n", method_name, __LINE__, IsMaster, masterNode->GetName() );
}
}
monitorLead = masterNode->GetPNid();
if (MyPNID == monitorLead)
{
ZClient->WatchNodeMasterDelete (myNode->GetName() ); // just in case of stale info
ZClient->CreateMasterZNode ( myNode->GetName() );
strcpy (MasterMonitorName, myNode->GetName());
ZClient->WatchMasterNode( MasterMonitorName );
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d (MasterMonitor) IsMaster=%d, set monitor lead to %d\n", method_name, __LINE__, IsMaster, MyPNID);
}
}
else
{
masterMonitor = ZClient->WaitForAndReturnMaster(true);
CNode *masterNode = NULL;
if (masterMonitor)
{
strcpy (MasterMonitorName, masterMonitor);
masterNode = Nodes->GetNode(MasterMonitorName);
}
if (masterNode)
{
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d (MasterMonitor) IsMaster=%d, set monitor lead to %d\n", method_name, __LINE__, IsMaster, masterNode->GetPNid());
}
monitorLead = masterNode->GetPNid();
ZClient->WatchMasterNode( MasterMonitorName );
}
else
{
if (trace_settings & TRACE_INIT)
{
trace_printf("%s@%d (MasterMonitor) IsMaster=%d, masterNode is NULL, with MasterMonitorName %s\n", method_name, __LINE__, IsMaster, MasterMonitorName);
}
char la_buf[MON_STRING_BUF_SIZE];
sprintf(la_buf, "[%s], Failed to get my Master Node.\n", method_name);
mon_log_write(MON_MONITOR_MAIN_17, SQ_LOG_CRIT, la_buf);
abort();
}
}
}
#ifdef NAMESERVER_PROCESS
else
{
if ( !IsRealCluster )
{
monitorLead = 0;
}
}
#endif
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
, "[%s], Master Monitor is on node %d\n"
, method_name, monitorLead);
mon_log_write(MON_MONITOR_MAIN_18, SQ_LOG_INFO, buf);
}
if (!IAmIntegrating)
{
Config->Init ();
}
#ifndef NAMESERVER_PROCESS
Devices = new CDeviceContainer ();
if ( !Devices->IsInitialized() )
{
if ( IAmIntegrating )
{ // Problem unmounting devices, let creator monitor know then abort
Monitor->ReIntegrate( CCluster::Reintegrate_Err13 );
}
else
{
MPI_Abort(MPI_COMM_SELF,99); // too early to call failsafe node down.
}
}
#endif
nodename = new char [Monitor->GetConfigPNodesCount() * MPI_MAX_PROCESSOR_NAME];
// Create health check thread
HealthCheck.start();
// Create thread to accept connections from other monitors
CommAccept.start();
#ifdef NAMESERVER_PROCESS
// Create thread to accept connections from other name servers
CommAcceptMon.start();
if (IsMaster)
{
CommAcceptMon.startAccepting();
}
#else
if (NameServerEnabled)
{
// Create thread to accept point-2-point connections from other monitors
PtpCommAccept.start();
}
#endif
#ifndef NAMESERVER_PROCESS
// Open file used to record process start/end times
Monitor->openProcessMap ();
#endif
#ifndef NAMESERVER_PROCESS
// Always using localio now, no other option
SQ_theLocalIOToClient = new SQ_LocalIOToClient( MyPNID );
assert (SQ_theLocalIOToClient);
#define BLOCK_SIZE 512
char *ioBuffer = NULL;
int fd;
rc = posix_memalign( (void**)&ioBuffer, BLOCK_SIZE, BLOCK_SIZE);
if ( rc == -1 )
{
int err = rc;
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], can't allocate aligned %d bytes "
"buffer , Error=%d(%s)\n",
method_name, BLOCK_SIZE, err, ErrorMsg(err));
mon_log_write(MON_MONITOR_MAIN_4, SQ_LOG_CRIT, buf);
MPI_Abort(MPI_COMM_SELF,99);
}
memset( (void *)ioBuffer, 0 , BLOCK_SIZE );
#endif
// start ok
#ifndef NAMESERVER_PROCESS
if (IsRealCluster)
{
snprintf(port_fname, sizeof(port_fname), "%s/monitor.port.%s",
getenv("MPI_TMPDIR"), short_node_name );
}
else
{
// Write out our port number so other processes can attach.
snprintf(port_fname, sizeof(port_fname), "%s/monitor.port.%d.%s",
getenv("MPI_TMPDIR"),MyPNID,Node_name);
}
#endif
// Change Node_name what we have in our configuration
CNode *myNode = Nodes->GetNode(MyPNID);
if (myNode)
{
strcpy (Node_name, myNode->GetName());
}
#ifndef NAMESERVER_PROCESS
// create with no caching, user read/write, group read/write, other read
fd = open( port_fname
, O_RDWR | O_TRUNC | O_CREAT | O_DIRECT
, S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH );
if ( fd != -1 )
{
snprintf( ioBuffer, BLOCK_SIZE, "%s", MyCommPort );
rc = write( fd, ioBuffer, BLOCK_SIZE );
if ( rc == -1 )
{
int err = errno;
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], can't write %d bytes to "
"port file (%s), Error=%d(%s)\n",
method_name, BLOCK_SIZE, port_fname, err,
ErrorMsg(err));
mon_log_write(MON_MONITOR_MAIN_5, SQ_LOG_CRIT, buf);
if ( IAmIntegrating )
// This monitor is reintegrating into cluster. Inform
// creator monitor of error, then abort.
Monitor->ReIntegrate( CCluster::Reintegrate_Err10 );
else
MPI_Abort(MPI_COMM_SELF,99);
}
close( fd );
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" " Port file created, pnid=%d, port=%s" "\n", method_name, __LINE__, MyPNID, MyCommPort );
}
else
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], can't open port file (%s), "
"Error= %s\n", method_name, port_fname,
ErrorMsg(errno));
mon_log_write(MON_MONITOR_MAIN_6, SQ_LOG_CRIT, buf);
if ( IAmIntegrating )
// This monitor is reintegrating into cluster. Inform
// creator monitor of error, then abort.
Monitor->ReIntegrate( CCluster::Reintegrate_Err11 );
else
MPI_Abort(MPI_COMM_SELF,99);
}
free( ioBuffer );
int ret = SQ_theLocalIOToClient->initWorker();
if (ret)
{
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" " Cannot start localio worker, aborting " "%d" "\n", method_name, __LINE__, ret);
delete SQ_theLocalIOToClient;
SQ_theLocalIOToClient = NULL;
if ( IAmIntegrating )
// This monitor is reintegrating into cluster. Inform
// creator monitor of error, then abort.
Monitor->ReIntegrate( CCluster::Reintegrate_Err12 );
else
assert (false);
}
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "started LocalIOToClient environment\n" "\n", method_name, __LINE__);
#endif
if (trace_settings & TRACE_INIT)
{
int rc = getrlimit( RLIMIT_SIGPENDING, &Rl );
if ( rc == 0 )
{
printf("%s@%d" " RLIMIT_SIGPENDING cur=%d, max=%d\n", method_name, __LINE__, (int)Rl.rlim_cur, (int)Rl.rlim_max);
}
}
if ( IAmIntegrating )
{
// This monitor is integrating to (joining) an existing cluster
Monitor->ReIntegrate( 0 );
MyNode->SetPhase( Phase_Activating );
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
trace_printf("%s@%d" " After UpdateCluster" "\n", method_name, __LINE__);
}
else
{
Monitor->EnterSyncCycle();
done = Monitor->exchangeNodeData();
Monitor->ExitSyncCycle();
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
trace_printf("%s@%d" " After ImAlive " "\n", method_name, __LINE__);
}
}
// Enable Zookeeper client in real cluster only and
// after the integration phase on a node 'up'
if ( IsRealCluster )
{
if ( ZClientEnabled )
{
StartZookeeperClient();
// Set watch for master
if (IsAgentMode)
{
ZClient->WatchMasterNode( MasterMonitorName );
}
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
trace_printf( "%s@%d (MasterMonitor) set watch for MasterMonitorName %s\n", method_name, __LINE__, MasterMonitorName );
}
}
}
#ifndef NAMESERVER_PROCESS
// Initialize Seabed disconnect semaphore
char *port;
switch( CommType )
{
case CommType_InfiniBand:
port = strstr(MyCommPort, "$port#");
if (port) port += 5;
break;
case CommType_Sockets:
port = strchr(MyCommPort, ':');
break;
default:
// Programmer bonehead!
abort();
}
if (port != NULL)
{
int myPortNum;
int semMax = 50;
unsigned int segKey;
myPortNum = strtol(&port[1], NULL, 10);
env = getenv("MS_DISC_SEM");
if ( env )
{
errno = 0;
semMax = strtol(env, NULL, 10);
if ( errno != 0) semMax = 50;
}
segKey = RobSem::getSegKey(0x40000000, myPortNum, MyPNID);
rc = RobSem::create_sem(segKey, IPC_CREAT + IPC_EXCL, sbDiscSem, semMax);
if ( rc != 0 )
{
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf),
"[%s], Failed to create seabed disconnect semaphore: %s "
"(%d)\n", method_name, strerror(errno), errno);
mon_log_write(MON_MONITOR_MAIN_10, SQ_LOG_CRIT, buf);
sbDiscSem = NULL;
}
}
#endif
// Create request worker threads
CReqWorker::startReqWorkers();
#ifndef NAMESERVER_PROCESS
if ( ! IAmIntegrating )
{
Monitor->StartPrimitiveProcesses();
}
env = getenv( "SQ_USE_CPU_AFFINITY" );
if ( env && strcmp( env, "1" ) == 0 )
{ // Set flag to indicate that logical node CPU affinity is used for
// processes.
// (see CNode::SetAffinity)
usingCpuAffinity = true;
}
env = getenv( "SQ_USE_TSE_CPU_AFFINITY" );
if ( env && strcmp( env, "1" ) == 0 )
{ // Set flag to indicate that CPU affinity is used for TSE processes.
// (see CNode::SetAffinity)
usingTseCpuAffinity = true;
}
#endif
nice(MON_BASE_NICE);
Monitor->setMonInitComplete(true);
struct timeval awakenedAt;
struct timeval awakeTime;
struct timeval now;
gettimeofday(&awakenedAt, NULL);
while (!done)
{
// Record statistics (sonar counters): monitor is NOT busy
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->MonitorBusyDecr();
// Sleep for a maximum of next_test_delay. Reduce
// next_test_delay by the amount of time since awoke for
// current sync cycle. If current sync cycle took a long
// time don't sleep at all.
gettimeofday(&now, NULL);
if ( (now.tv_usec - awakenedAt.tv_usec ) < 0 )
{
awakeTime.tv_sec = now.tv_sec - awakenedAt.tv_sec - 1;
awakeTime.tv_usec = 1000000 + now.tv_usec - awakenedAt.tv_usec;
}
else
{
awakeTime.tv_sec = now.tv_sec - awakenedAt.tv_sec;
awakeTime.tv_usec = now.tv_usec - awakenedAt.tv_usec;
}
if ( awakeTime.tv_sec == 0
&& awakeTime.tv_usec < next_test_delay)
{
usleep( next_test_delay - awakeTime.tv_usec );
}
gettimeofday(&awakenedAt, NULL);
// Record statistics (sonar counters): monitor is busy
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->MonitorBusyIncr();
#ifndef NAMESERVER_PROCESS
Monitor->EnterSyncCycle();
if ( Monitor->TmSyncPending() )
{
Monitor->TmSync ();
}
Monitor->ExitSyncCycle();
#endif
#ifndef NAMESERVER_PROCESS
if ( !Monitor->GetPendingSlaveTmSync() &&
Monitor->GetTotalSlaveTmSyncCount() == 0 )
{
#endif
Monitor->EnterSyncCycle();
done = Monitor->exchangeNodeData();
Monitor->ExitSyncCycle();
#ifndef NAMESERVER_PROCESS
}
#endif
if (done)
break;
#ifndef NAMESERVER_PROCESS
// Check to see if 'ckillall' is executing and disable the watchdog
if ( !SQ_theLocalIOToClient->isWDTEnabled() )
{
// HealthCheck.setState(MON_EXIT_WATCHDOG);
}
#endif
}
if (trace_settings & TRACE_STATS)
{ // Write malloc statistics info to stderr
monMallocStats();
}
if ( ZClientEnabled )
{
ZClient->StopMonitoring();
ZClient->ShutdownWork();
}
#ifndef NAMESERVER_PROCESS
Redirector.shutdownWork();
#endif
// shut down health check thread before shutting down reqWorker thread.
HealthCheck.shutdownWork();
ReqQueue.stats();
// Stop request worker threads
CReqWorker::shutdownWork();
Replicator.stats();
Monitor->stats();
#ifndef NAMESERVER_PROCESS
// Tell the LIO worker threads to exit
SQ_theLocalIOToClient->shutdownWork();
if (NameServerEnabled)
{
PtpCommAccept.shutdownWork();
}
#endif
CommAccept.shutdownWork();
#ifdef NAMESERVER_PROCESS
CommAcceptMon.shutdownWork();
#endif
#ifndef NAMESERVER_PROCESS
// Rename the monitor "port" file
sprintf(temp_fname, "%s.bak", port_fname);
remove(temp_fname);
rename(port_fname, temp_fname);
#endif
delete [] nodename;
#ifndef NAMESERVER_PROCESS
delete Devices;
#endif
delete Nodes;
delete ZClient;
delete Monitor;
Monitor = NULL; // TRACE uses this
delete Config;
#ifndef NAMESERVER_PROCESS
if ( sbDiscSem != NULL )
{
RobSem::destroy_sem( sbDiscSem );
}
#endif
if ( CommType == CommType_InfiniBand )
{
MPI_Close_port( MyCommPort );
}
#if 0
// TODO: MPICH cannot handle a node down and subsequent shutdown
// MPI_Finalize() hangs so its currently disabled, but
// causes an abnormal termination in the monitor process at exit.
if (trace_settings & TRACE_INIT)
trace_printf("%s@%d" "- Calling MPI_Finalize()" "\n", method_name, __LINE__);
MPI_Finalize ();
#endif
#ifndef NAMESERVER_PROCESS
if (trace_settings & TRACE_STATS)
{
trace_printf("%s@%d" "- LIO Stats: shared_buffers_total=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getSharedBufferCount());
trace_printf("%s@%d" "- LIO Stats: shared_buffers_acquired_max=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getAcquiredBufferCount());
trace_printf("%s@%d" "- LIO Stats: shared_buffers_available_min=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getAvailableBufferCount());
trace_printf("%s@%d" "- LIO Stats: shared_buffer_misses=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getMissedBufferCount());
trace_printf("%s@%d" "- LIO Stats: max queued child death=%d\n",
method_name, __LINE__,
SQ_theLocalIOToClient->getMaxChildDeathCount());
trace_printf("%s@%d" "- LIO Stats: almost-dead pids=%d\n",
method_name, __LINE__,
SQ_theLocalIOToClient->getAlmostDeadPids());
trace_printf("%s@%d" "- LIO Stats: verifierMap=" "%d" "\n", method_name, __LINE__, SQ_theLocalIOToClient->getVerifierMapCount());
}
delete SQ_theLocalIOToClient;
#endif
snprintf(buf, sizeof(buf), "[CMonitor::main], Shutdown normally.\n");
mon_log_write(MON_MONITOR_MAIN_11, SQ_LOG_INFO, buf);
if (trace_settings & TRACE_STATS)
{
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->displayStats();
}
// Record statistics (sonar counters): monitor is NOT busy
if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
MonStats->MonitorBusyDecr();
delete MonLog;
return 0;
}
#ifdef DELAY_TP
// Test-point delay hook (only compiled when DELAY_TP is defined).
//
// Looks up the configuration group named "NODE<MyPNID>". If that group
// contains the key "REQUESTDELAY_TP", converts its value with atoi() and
// sleeps the calling thread for that many seconds.
//
// Does nothing when:
//   - there is no Config container,
//   - the group or the key is missing, or
//   - the configured value is not a positive integer.
//
// @param tpName  Test-point name. It is currently NOT used: every test point
//                reads the same fixed "REQUESTDELAY_TP" key.
//                NOTE(review): confirm whether tpName was meant to select the
//                key name before relying on per-test-point delays.
void CMonitor::Delay_TP(char *tpName)
{
    char nodename[20];
    char keyname[MAX_KEY_NAME];
    char valueC[MAX_VALUE_SIZE];
    int value;

    (void)tpName; // intentionally unused; see note above

    if (Config)
    {
        snprintf(nodename, sizeof(nodename), "NODE%d", MyPNID);
        CConfigGroup *group = Config->GetGroup(nodename);
        if (group)
        {
            // GetKey() is given a local, writable copy of the key name.
            snprintf(keyname, sizeof(keyname), "%s", "REQUESTDELAY_TP");
            CConfigKey *key = group->GetKey(keyname);
            if (key)
            {
                // Bounded copy: an over-long configured value is truncated
                // rather than overflowing the fixed-size stack buffer.
                snprintf(valueC, sizeof(valueC), "%s", key->Value);
                value = atoi(valueC);
                // sleep() takes an unsigned count, so a negative configured
                // value would otherwise wrap into an enormous delay.
                if (value > 0)
                {
                    sleep(value);
                }
            }
        }
    }
}
#endif