Merge [TRAFODION-2884] PR-1664 Multiple fixes with Name Server enabled logic
diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h b/core/sqf/export/include/common/evl_sqlog_eventnum.h
index 8930ccc..10268d8 100644
--- a/core/sqf/export/include/common/evl_sqlog_eventnum.h
+++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h
@@ -84,6 +84,7 @@
#define MON_CLUSTER_MARKDOWN_1 101011101
#define MON_CLUSTER_MARKDOWN_2 101011102
#define MON_CLUSTER_MARKDOWN_3 101011103
+#define MON_CLUSTER_MARKDOWN_4 101011104
//#define MON_CLUSTER_MARKUP 101011201
#define MON_CLUSTER_NODE_TM_READY_1 101011301
#define MON_CLUSTER_NODE_TM_READY_2 101011302
@@ -253,8 +254,16 @@
#define MON_CLUSTER_ASSIGNMONITORLEADER_2 101015302
#define MON_CLUSTER_ASSIGNMONITORLEADER_3 101015303
#define MON_CLUSTER_ASSIGNMONITORLEADER_4 101015304
+
#define MON_CLUSTER_CHECKIFDONE_1 101015401
+#define MON_CLUSTER_HARDNODEUPNS_1 101015501
+#define MON_CLUSTER_HARDNODEUPNS_2 101015502
+
+#define MON_CLUSTER_RECEIVESOCK_1 101015601
+
+#define MON_CLUSTER_SENDSOCK_1 101015701
+
/* Module: monitor.cxx = 02 */
#define MON_MONITOR_MAIN_1 101020101
@@ -1016,24 +1025,30 @@
#define ZCONFIG_DELETECONFIGZNODE_3 101381003
/* Module nameserver.cxx = 39 */
-#define MON_NAMESERVER_MKCLTSOCK_1 101390101
-#define MON_NAMESERVER_MKCLTSOCK_2 101390102
-#define MON_NAMESERVER_MKCLTSOCK_3 101390103
-#define MON_NAMESERVER_MKCLTSOCK_4 101390104
-#define MON_NAMESERVER_MKCLTSOCK_5 101390105
-#define MON_NAMESERVER_MKCLTSOCK_6 101390106
+#define NAMESERVER_CLIENTSOCKCREATE_1 101390101
+#define NAMESERVER_CLIENTSOCKCREATE_2 101390102
+#define NAMESERVER_CLIENTSOCKCREATE_3 101390103
+#define NAMESERVER_CLIENTSOCKCREATE_4 101390104
+#define NAMESERVER_CLIENTSOCKCREATE_5 101390105
+#define NAMESERVER_CLIENTSOCKCREATE_6 101390106
+#define NAMESERVER_SENDTONS_1 101390201
+#define NAMESERVER_SENDTONS_2 101390202
+#define NAMESERVER_SOCKRECEIVE_1 101390301
+#define NAMESERVER_SOCKSEND_1 101390401
+#define NAMESERVER_GETM2NPORT_1 101390501
+#define NAMESERVER_CHOOSENEXTNS_1 101390601
/* Module nscommaccept.cxx = 40 */
-#define NS_COMMACCEPT_1 101400101
-#define NS_COMMACCEPT_3 101400102
-#define NS_COMMACCEPT_2 101400103
-#define NS_COMMACCEPT_4 101400104
-#define NS_COMMACCEPT_5 101400105
-#define NS_COMMACCEPT_6 101400106
-#define NS_COMMACCEPT_7 101400107
-#define NS_COMMACCEPT_8 101400108
-#define NS_COMMACCEPT_9 101400109
-#define NS_COMMACCEPT_10 101400110
+#define NS_COMMACCEPT_PROCESSMONREQS_1 101400101
+#define NS_COMMACCEPT_PROCESSMONREQS_2 101400102
+#define NS_COMMACCEPT_PROCESSMONREQS_3 101400103
+#define NS_COMMACCEPT_PROCESSMONREQS_4 101400104
+#define NS_COMMACCEPT_PROCESSMONREQS_5 101400105
+#define NS_COMMACCEPT_PROCESSNEWSOCK_1 101400201
+#define NS_COMMACCEPT_COMMACCEPTORSOCK_1 101400301
+#define NS_COMMACCEPT_MON2NSACCEPTMON_1 101400401
+#define NS_COMMACCEPT_MON2NSPROCESS_1 101400501
+#define NS_COMMACCEPT_START_1 101400601
/* Module: reqnodedown.cxx = 41 */
#define MON_EXT_NAMESERVERDOWN_REQ 101410101
@@ -1067,6 +1082,8 @@
#define PTPCLIENT_STDINREQ_2 101930202
#define PTPCLIENT_STDIODATA_1 101930301
#define PTPCLIENT_STDIODATA_2 101930302
+#define PTPCLIENT_SENDTOMON_1 101930401
+#define PTPCLIENT_SENDTOMON_2 101930402
/* Module ptpcommaccept.cxx = 94 */
#define PTP_COMMACCEPT_1 101940101
diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx
index 4d0d3c2..762c1a0 100644
--- a/core/sqf/monitor/linux/cluster.cxx
+++ b/core/sqf/monitor/linux/cluster.cxx
@@ -69,6 +69,7 @@
#include "nscommacceptmon.h"
#else
#include "nameserver.h"
+#include "ptpclient.h"
#endif
extern bool IAmIntegrating;
@@ -88,7 +89,9 @@
extern CCommAcceptMon CommAcceptMon;
extern char MyMon2NsPort[MPI_MAX_PORT_NAME];
#else
+extern CProcess *NameServerProcess;
extern CNameServer *NameServer;
+extern CPtpClient *PtpClient;
extern bool NameServerEnabled;
extern char MyPtPPort[MPI_MAX_PORT_NAME];
#endif
@@ -1069,12 +1072,11 @@
}
+#ifndef NAMESERVER_PROCESS
void CCluster::HardNodeDown (int pnid, bool communicate_state)
{
-#ifndef NAMESERVER_PROCESS
char port_fname[MAX_PROCESS_PATH];
char temp_fname[MAX_PROCESS_PATH];
-#endif
CNode *node;
CLNode *lnode;
char buf[MON_STRING_BUF_SIZE];
@@ -1130,7 +1132,6 @@
return;
}
-#ifndef NAMESERVER_PROCESS
if ( !Emulate_Down )
{
if( !IsRealCluster )
@@ -1161,7 +1162,6 @@
remove(temp_fname);
rename(port_fname, temp_fname);
}
-#endif
if (node->GetState() != State_Down || !node->isInQuiesceState())
{
@@ -1194,9 +1194,7 @@
if ( ! Emulate_Down )
{
// make sure no processes are alive if in the middle of re-integration
-#ifndef NAMESERVER_PROCESS
node->KillAllDown();
-#endif
snprintf(buf, sizeof(buf),
"[CCluster::HardNodeDown], Node %s (%d)is down.\n",
node->GetName(), node->GetPNid());
@@ -1212,29 +1210,29 @@
}
else
{
- if ( node->GetPNid() == integratingPNid_ )
+ if (node->GetState() != State_Down)
{
- ResetIntegratingPNid();
- }
-#ifndef NAMESERVER_PROCESS
- node->KillAllDown();
-#endif
- node->SetState( State_Down );
- // Send node down message to local node's processes
- lnode = node->GetFirstLNode();
- for ( ; lnode; lnode = lnode->GetNextP() )
- {
- lnode->Down();
- }
- if ( ZClientEnabled )
- {
- ZClient->WatchNodeDelete( node->GetName() );
- ZClient->WatchNodeMasterDelete( node->GetName() );
+ if ( node->GetPNid() == integratingPNid_ )
+ {
+ ResetIntegratingPNid();
+ }
+ node->KillAllDown();
+ node->SetState( State_Down );
+ // Send node down message to local node's processes
+ lnode = node->GetFirstLNode();
+ for ( ; lnode; lnode = lnode->GetNextP() )
+ {
+ lnode->Down();
+ }
+ if ( ZClientEnabled )
+ {
+ ZClient->WatchNodeDelete( node->GetName() );
+ ZClient->WatchNodeMasterDelete( node->GetName() );
+ }
}
}
}
-#ifndef NAMESERVER_PROCESS
// we need to abort any active TmSync
if (( MyNode->GetTmSyncState() == SyncState_Start ) ||
( MyNode->GetTmSyncState() == SyncState_Continue ) ||
@@ -1245,21 +1243,79 @@
if (trace_settings & (TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
trace_printf("%s@%d - Node %s (pnid=%d) TmSyncState updated (%d)(%s)\n", method_name, __LINE__, MyNode->GetName(), MyPNID, MyNode->GetTmSyncState(), SyncStateString( MyNode->GetTmSyncState() ));
}
-#endif
-#ifndef NAMESERVER_PROCESS
if ( Emulate_Down )
{
AssignTmLeader(pnid, false);
}
else
-#endif
{
AssignLeaders(pnid, node->GetName(), false);
}
TRACE_EXIT;
}
+#endif
+
+#ifdef NAMESERVER_PROCESS
+// Name Server flavor of HardNodeDown(): marks physical node 'pnid' down in
+// this process's node table and reassigns leaders.
+//
+// Differences from the monitor's HardNodeDown(): the node's processes are
+// dropped with DeleteAllDown() (not KillAllDown()), and only the ZooKeeper
+// node-master watch is removed.
+//
+// If 'pnid' is the local node and it is already down or being killed, the
+// request is ignored, except in a virtual cluster while the local node is
+// quiescing.
+//
+// pnid - physical node id of the node that went down.
+void CCluster::HardNodeDownNs( int pnid )
+{
+    CNode *node;
+    char buf[MON_STRING_BUF_SIZE];
+
+    const char method_name[] = "CCluster::HardNodeDownNs";
+    TRACE_ENTRY;
+
+    node = Nodes->GetNode(pnid);
+    if ( node == NULL )
+    {
+        // Unknown pnid: nothing to mark down, and every step below
+        // dereferences 'node'.
+        if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+            trace_printf( "%s@%d - Invalid node, pnid=%d\n"
+                        , method_name, __LINE__, pnid );
+        TRACE_EXIT;
+        return;
+    }
+
+    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+        trace_printf( "%s@%d - pnid=%d, state=%s, isInQuiesceState=%d,"
+                      " (local pnid=%d, state=%s, isInQuiesceState=%d, "
+                      "shutdown level=%d)\n", method_name, __LINE__,
+                      pnid, StateString(node->GetState()),
+                      node->isInQuiesceState(),
+                      MyPNID, StateString(MyNode->GetState()),
+                      MyNode->isInQuiesceState(), MyNode->GetShutdownLevel() );
+
+    if (( MyPNID == pnid ) &&
+        ( MyNode->GetState() == State_Down ||
+          MyNode->IsKillingNode() ) )
+    {
+        // We are coming down ... don't process it, unless this is a virtual
+        // env where this is called after node quiescing, in which case we
+        // continue with mark down processing.
+        if ( IsRealCluster || !MyNode->isInQuiesceState() )
+        {
+            TRACE_EXIT;
+            return;
+        }
+    }
+
+    if (node->GetState() != State_Down)
+    {
+        snprintf( buf, sizeof(buf)
+                , "[%s], Node %s (%d) is going down.\n"
+                , method_name, node->GetName(), node->GetPNid());
+        mon_log_write(MON_CLUSTER_MARKDOWN_4, SQ_LOG_INFO, buf);
+
+        node->SetKillingNode( true );
+        node->DeleteAllDown();
+        node->SetState( State_Down );
+
+        if ( ZClientEnabled )
+        {
+            //ZClient->WatchNodeDelete( node->GetName() );
+            ZClient->WatchNodeMasterDelete( node->GetName() );
+        }
+    }
+
+    AssignLeaders(pnid, node->GetName(), false);
+
+    TRACE_EXIT;
+}
+#endif
void CCluster::SoftNodeDown( int pnid )
{
@@ -1651,8 +1707,10 @@
if ( nodeState == State_Down )
{
node->SetKillingNode( false );
+#ifndef NAMESERVER_PROCESS
if ( Emulate_Down )
{
+#endif
// Any DTMs running?
for ( int i=0; !tmCount && i < Nodes->GetPNodesCount(); i++ )
{
@@ -1706,6 +1764,7 @@
}
}
}
+#ifndef NAMESERVER_PROCESS
}
else
{
@@ -1714,6 +1773,7 @@
method_name, __LINE__ );
}
+#endif
}
else if ( nodeState == State_Merged )
{
@@ -1866,6 +1926,74 @@
return( rc );
}
+#ifdef NAMESERVER_PROCESS
+// Name Server flavor of HardNodeUp(): marks physical node 'pnid' up in this
+// process's node table.
+//
+// A node in State_Down has its old process objects cleaned up and is set to
+// State_Up. When it is not the local node, a CReplNodeUp item is queued on
+// the Replicator so other cluster members learn that the node is up. Nodes
+// in any other non-up state are left unchanged.
+//
+// pnid - physical node id of the node coming up.
+// Returns 0 on success, or -1 (after logging) if 'pnid' does not map to a
+// node or the node is already up.
+int CCluster::HardNodeUpNs( int pnid )
+{
+    int rc = 0;
+    CNode *node;
+    STATE nodeState;
+
+    const char method_name[] = "CCluster::HardNodeUpNs";
+    TRACE_ENTRY;
+
+    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+        trace_printf( "%s@%d - pnid=%d, MyPNID = %d, currentNodes_=%d\n"
+                    , method_name, __LINE__, pnid, MyPNID, currentNodes_ );
+
+    node = Nodes->GetNode( pnid );
+    if ( node == NULL )
+    {   // Handle error: report the bad pnid and fail instead of
+        // dereferencing NULL below.
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], Invalid node, pnid=%d\n"
+                , method_name, pnid );
+        mon_log_write(MON_CLUSTER_HARDNODEUPNS_1, SQ_LOG_ERR, buf);
+        TRACE_EXIT;
+        return( -1 );
+    }
+
+    nodeState = node->GetState();
+
+    if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+        trace_printf( "%s@%d" " - Node state=%s" "\n"
+                    , method_name, __LINE__, StateString( nodeState ) );
+
+    if ( nodeState != State_Up )
+    {
+        if ( nodeState == State_Down )
+        {
+            node->SetKillingNode( false );
+            // We need to remove any old process objects before we restart the node.
+            node->CleanUpProcesses();
+            node->SetState( State_Up );
+            if ( MyPNID != pnid )
+            {
+                // Let other monitors know this node is up
+                CReplNodeUp *repl = new CReplNodeUp(pnid);
+                Replicator.addItem(repl);
+            }
+        }
+    }
+    else
+    {   // Handle error: node is already up
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], Invalid node state, node %s, pnid=%d, state=%s\n"
+                , method_name
+                , node->GetName()
+                , node->GetPNid()
+                , StateString( nodeState ) );
+        mon_log_write(MON_CLUSTER_HARDNODEUPNS_2, SQ_LOG_ERR, buf);
+        TRACE_EXIT;
+        return( -1 );
+    }
+
+    TRACE_EXIT;
+    return( rc );
+}
+#endif
+
int CCluster::SoftNodeUpPrepare( int pnid )
{
char buf[MON_STRING_BUF_SIZE];
@@ -7456,7 +7584,10 @@
case State_Unknown:
break;
case State_Down:
- doShutdown = true;
+ if (IsRealCluster)
+ {
+ doShutdown = true;
+ }
break;
case State_Stopped:
case State_Shutdown:
@@ -7780,19 +7911,23 @@
// let the watchdog process exit
HealthCheck.setState(MON_EXIT_PRIMITIVES);
}
- else if ( (MyNode->GetNumProcs() <= // only My Name Server alive
- myNameServerCount )
+ else if ( NameServerProcess != NULL
+ && myNameServerCount > 0
+ && (MyNode->GetNumProcs() <= myNameServerCount ) // only My Name Server alive
&& !MyNode->isInQuiesceState() // post-quiescing will
// expire WDG (cluster)
&& !waitForNameServerExit_ ) // Name Server not yet exiting
{
if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL | TRACE_SYNC))
- trace_printf("%s@%d - Stopping Name Server process. "
- "(process count: cluster=%d, MyNode=%d)\n",
- method_name, __LINE__,
- Nodes->ProcessCount(), MyNode->ProcessCount());
-
+ {
+ trace_printf("%s@%d - Stopping Name Server process. "
+ "(process count: cluster=%d, MyNode=%d)\n",
+ method_name, __LINE__,
+ Nodes->ProcessCount(), MyNode->ProcessCount());
+ }
+
waitForNameServerExit_ = true;
+ MyNode->SetProcessState( NameServerProcess, State_Down, false );
int rc = NameServer->ProcessShutdown();
if (rc)
{
@@ -10196,6 +10331,14 @@
if ( errno != EINTR)
{
error = errno;
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf( la_buf, "[%s], recv(), received=%d, sock=%d, error=%d(%s), desc=%s\n"
+ , method_name
+ , received
+ , sockFd
+ , error, strerror(error)
+ , desc );
+ mon_log_write(MON_CLUSTER_RECEIVESOCK_1, SQ_LOG_ERR, la_buf);
readAgain = false;
}
else
@@ -10264,6 +10407,14 @@
if ( errno != EINTR)
{
error = errno;
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf( la_buf, "[%s], send(), sent=%d, sock=%d, error=%d(%s), desc=%s\n"
+ , method_name
+ , sent
+ , sockFd
+ , error, strerror(error)
+ , desc );
+ mon_log_write(MON_CLUSTER_SENDSOCK_1, SQ_LOG_ERR, la_buf);
sendAgain = false;
}
else
diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h
index 970cf4c..2dfbfb8 100644
--- a/core/sqf/monitor/linux/cluster.h
+++ b/core/sqf/monitor/linux/cluster.h
@@ -163,7 +163,11 @@
int GetConfigPNodesMax() { return configPNodesMax_; }
bool ImAlive( bool needed=false, struct sync_def *sync = NULL );
int MapRank( int current_rank );
+#ifndef NAMESERVER_PROCESS
void HardNodeDown( int nid, bool communicate_state=false );
+#else
+ void HardNodeDownNs( int nid );
+#endif
void SoftNodeDown( int pnid );
int SoftNodeUpPrepare( int pnid );
bool CheckSpareSet( int pnid );
@@ -174,6 +178,9 @@
void ResetIntegratingPNid( void );
void SetIntegratingPNid( int pnid );
int HardNodeUp( int pnid, char *node_name );
+#ifdef NAMESERVER_PROCESS
+ int HardNodeUpNs( int pnid );
+#endif
inline CNode *GetIntegratingNode() { return Node[integratingPNid_]; }
inline CNode *GetNode( int pnid ) { return Node[pnid]; }
static char *Timestamp( void );
diff --git a/core/sqf/monitor/linux/commaccept.cxx b/core/sqf/monitor/linux/commaccept.cxx
index 5c4e3a5..13c2ebd 100644
--- a/core/sqf/monitor/linux/commaccept.cxx
+++ b/core/sqf/monitor/linux/commaccept.cxx
@@ -220,12 +220,18 @@
, i, node->GetPNid(), node->GetName());
}
- nodeInfo[i].pnid = -1;
nodeInfo[i].nodeName[0] = '\0';
nodeInfo[i].commPort[0] = '\0';
nodeInfo[i].syncPort[0] = '\0';
+ nodeInfo[i].pnid = -1;
nodeInfo[i].creatorPNid = -1;
}
+ nodeInfo[i].creatorShellPid = -1;
+ nodeInfo[i].creatorShellVerifier = -1;
+ nodeInfo[i].creator = false;
+ nodeInfo[i].ping = false;
+ nodeInfo[i].nsPid = -1;
+ nodeInfo[i].nsPNid = -1;
}
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
diff --git a/core/sqf/monitor/linux/config.cxx b/core/sqf/monitor/linux/config.cxx
index 4ef9b72..9794199 100644
--- a/core/sqf/monitor/linux/config.cxx
+++ b/core/sqf/monitor/linux/config.cxx
@@ -1193,7 +1193,7 @@
regClusterEntry->valueLength = strlen (regClusterConfig[i].value);
if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
{
- trace_printf ("%s%d pack type %d, scope %s (%d), key %s (%d), value %s(%d)\n",method_name, __LINE__,
+ trace_printf ("%s@%d pack type %d, scope %s (%d), key %s (%d), value %s(%d)\n",method_name, __LINE__,
regClusterEntry->type, regClusterConfig[i].scope,
regClusterEntry->scopeLength,regClusterConfig[i].key,regClusterEntry->keyLength,
regClusterConfig[i].value, regClusterEntry->valueLength);
@@ -1226,7 +1226,7 @@
if (regClusterConfig)
{
- delete regClusterConfig;
+ delete [] regClusterConfig;
}
return numberOfEntries;
@@ -1258,7 +1258,7 @@
if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
{
- trace_printf ("%s%d scope length %d, key length %d, value length %d\n", method_name, __LINE__,
+ trace_printf ("%s@%d scope length %d, key length %d, value length %d\n", method_name, __LINE__,
clusterObj2->scopeLength,
clusterObj2->keyLength, clusterObj2->valueLength);
}
@@ -1317,7 +1317,7 @@
stringObj->stringLength = strlen(unique_string);
if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
{
- trace_printf ("%s%d packing nid %d, unique id %d, stringt %s (length %d)\n", method_name, __LINE__,
+ trace_printf ("%s@%d packing nid %d, unique id %d, stringt %s (length %d)\n", method_name, __LINE__,
pnid, maxId, unique_string,stringObj->stringLength );
}
stringObj->unique_id = maxId;
diff --git a/core/sqf/monitor/linux/healthcheck.cxx b/core/sqf/monitor/linux/healthcheck.cxx
index 1f344fc..203465c 100644
--- a/core/sqf/monitor/linux/healthcheck.cxx
+++ b/core/sqf/monitor/linux/healthcheck.cxx
@@ -54,7 +54,6 @@
#include "redirector.h"
#include "replicate.h"
-
extern CReqQueue ReqQueue;
extern CMonitor *Monitor;
extern CNode *MyNode;
@@ -64,6 +63,7 @@
extern CHealthCheck HealthCheck;
extern CReplicate Replicator;
extern int MyPNID;
+extern bool IsRealCluster;
// constructor
CHealthCheck::CHealthCheck()
@@ -229,7 +229,6 @@
TRACE_ENTRY;
HealthCheckStates state;
-
struct timespec ts;
if (trace_settings & TRACE_HEALTH)
diff --git a/core/sqf/monitor/linux/internal.h b/core/sqf/monitor/linux/internal.h
index 35fa32a..5dc6456 100644
--- a/core/sqf/monitor/linux/internal.h
+++ b/core/sqf/monitor/linux/internal.h
@@ -537,5 +537,10 @@
char msg[MAX_SYNC_SIZE];
};
+// Descriptor for a PtP message.
+// NOTE(review): the field meanings below are inferred from their names only;
+// confirm against the PtP sender/receiver code that fills this struct.
+typedef struct ptpMsgInfo
+{
+ int pnid; // presumably the physical node id the message is to/from
+ int size; // presumably the size of the message that follows, in bytes
+} ptpMsgInfo_t;
#endif
diff --git a/core/sqf/monitor/linux/lnode.cxx b/core/sqf/monitor/linux/lnode.cxx
index 69a186d..bbe5ac4 100644
--- a/core/sqf/monitor/linux/lnode.cxx
+++ b/core/sqf/monitor/linux/lnode.cxx
@@ -41,6 +41,7 @@
#include "lnode.h"
#include "pnode.h"
#include "mlio.h"
+#include "nameserver.h"
extern bool IsRealCluster;
extern CommType_t CommType;
@@ -50,6 +51,10 @@
extern CMonStats *MonStats;
extern bool usingCpuAffinity;
extern bool usingTseCpuAffinity;
+#ifndef NAMESERVER_PROCESS
+extern CNameServer *NameServer;
+extern bool NameServerEnabled;
+#endif
void CoreMaskString( char *str, cpu_set_t coreMask, int totalCores )
{
@@ -396,7 +401,12 @@
, method_name, __LINE__, GetNid()
, GetNode()->GetName(), msg->u.request.u.down.takeover );
}
-
+#ifndef NAMESERVER_PROCESS
+ if ( NameServerEnabled )
+ {
+ NameServer->ProcessNodeDown( Nid, msg->u.request.u.down.node_name );
+ }
+#endif
MyNode->Bcast( msg );
delete msg;
}
diff --git a/core/sqf/monitor/linux/makefile b/core/sqf/monitor/linux/makefile
index 73127e4..3d16bab 100644
--- a/core/sqf/monitor/linux/makefile
+++ b/core/sqf/monitor/linux/makefile
@@ -265,6 +265,7 @@
NSOBJS += $(OUTDIR)/nsreqdelproc.o
NSOBJS += $(OUTDIR)/nsreqstop.o
NSOBJS += $(OUTDIR)/nsreqnewproc.o
+NSOBJS += $(OUTDIR)/nsreqnodedown.o
NSOBJS += $(OUTDIR)/nsreqprocinfo.o
NSOBJS += $(OUTDIR)/nsreqprocinfons.o
NSOBJS += $(OUTDIR)/nsreqstart.o
diff --git a/core/sqf/monitor/linux/monitor.cxx b/core/sqf/monitor/linux/monitor.cxx
index d588945..2ad0528 100755
--- a/core/sqf/monitor/linux/monitor.cxx
+++ b/core/sqf/monitor/linux/monitor.cxx
@@ -1367,7 +1367,8 @@
env = getenv("SQ_NAMESERVER_ENABLED");
if ( env && isdigit(*env) )
{
- NameServerEnabled = atoi(env);
+ int val = atoi(env);
+ NameServerEnabled = (val != 0) ? true : false;
}
#endif
@@ -1605,6 +1606,7 @@
}
setlinebuf(stdout);
+#ifndef NAMESERVER_PROCESS
// Send stderr output to same file as stdout. (Note: the monitor does
// not write to stderr but perhaps there could be components included in
// the monitor build that do write to stderr.)
@@ -1612,6 +1614,10 @@
{
printf ( "dup2 failed for stderr: %s (%d)\n", strerror(errno), errno);
}
+#else
+ // Name Server is a child process of the monitor, the process create logic
+ // will establish IO redirection between the monitor process and the child.
+#endif
switch( CommType )
{
@@ -2052,13 +2058,15 @@
#ifdef NAMESERVER_PROCESS
Monitor = new CMonitor ();
#else
- Monitor = new CMonitor (procTermSig);
-#endif
-#ifndef NAMESERVER_PROCESS
if (NameServerEnabled)
{
+ PtpClient = new CPtpClient ();
+ Monitor = new CMonitor (procTermSig);
NameServer = new CNameServer ();
- PtpClient = new CPtpClient ();
+ }
+ else
+ {
+ Monitor = new CMonitor (procTermSig);
}
#endif
diff --git a/core/sqf/monitor/linux/msgdef.h b/core/sqf/monitor/linux/msgdef.h
index 639c15c..8218a8e 100644
--- a/core/sqf/monitor/linux/msgdef.h
+++ b/core/sqf/monitor/linux/msgdef.h
@@ -89,7 +89,7 @@
#define MAX_PROCINFO_LIST 64
#define MAX_PROC_CONTEXT 5
#define MAX_PROCESS_NAME MAX_KEY_NAME
-#define MAX_PROCESS_NAME_STR 12
+#define MAX_PROCESS_NAME_STR 13
#define MAX_PROCESS_PATH 256
#define MAX_PROCESSOR_NAME 128
#define MAX_RECONN_PING_WAIT_TIMEOUT 5
diff --git a/core/sqf/monitor/linux/nameserver.cxx b/core/sqf/monitor/linux/nameserver.cxx
index e9f1900..ad024f6 100644
--- a/core/sqf/monitor/linux/nameserver.cxx
+++ b/core/sqf/monitor/linux/nameserver.cxx
@@ -44,6 +44,7 @@
#include <limits.h>
#include <unistd.h>
+#include "trafconf/trafconfig.h"
#include "lnode.h"
#include "pnode.h"
#include "nameserver.h"
@@ -51,21 +52,24 @@
#include "montrace.h"
#include "nameserverconfig.h"
#include "meas.h"
+#include "reqqueue.h"
extern CNode *MyNode;
extern CProcess *NameServerProcess;
extern CNodeContainer *Nodes;
+extern CReqQueue ReqQueue;
extern bool IsRealCluster;
extern int MyPNID;
extern CNameServerConfigContainer *NameServerConfig;
extern CMeas Meas;
+#define NAMESERVER_IO_RETRIES 3
+
CNameServer::CNameServer( void )
-: mon2nsSock_(-1)
-, nsConfigInx_(-1)
-, nsStartupComplete_(false)
-, seqNum_(0)
-, shutdown_(false)
+ : mon2nsSock_(-1)
+ , nsStartupComplete_(false)
+ , seqNum_(0)
+ , shutdown_(false)
{
const char method_name[] = "CNameServer::CNameServer";
TRACE_ENTRY;
@@ -84,7 +88,7 @@
TRACE_EXIT;
}
-void CNameServer::ChooseNextNs( void )
+int CNameServer::ChooseNextNs( void )
{
const char method_name[] = "CNameServer::ChooseNextNs";
TRACE_ENTRY;
@@ -103,17 +107,58 @@
{
config = config->GetNext();
}
- strcpy( mon2nsHost_, config->GetName() );
- if ( trace_settings & TRACE_NS )
+ CNode *node = Nodes->GetNode( (char*) config->GetName() );
+ if (node && node->GetState() == State_Up)
{
- trace_printf( "%s@%d - nameserver=%s, rnd=%d, cnt=%d\n"
- , method_name, __LINE__
- , mon2nsHost_
- , rnd
- , cnt );
+ strcpy( mon2nsHost_, config->GetName() );
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - nameserver=%s, rnd=%d, cnt=%d\n"
+ , method_name, __LINE__
+ , mon2nsHost_
+ , rnd
+ , cnt );
+ }
+ }
+ else
+ {
+ config = config->GetNext()?config->GetNext():NameServerConfig->GetFirstConfig();
+ while (config)
+ {
+ node = Nodes->GetNode( (char*) config->GetName() );
+ if (node && node->GetState() != State_Up)
+ {
+ config = config->GetNext();
+ continue;
+ }
+
+ strcpy( mon2nsHost_, config->GetName() );
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - selected alternate nameserver=%s\n"
+ , method_name, __LINE__
+ , mon2nsHost_ );
+ }
+ break;
+ }
+ }
+
+ if (strlen(mon2nsHost_) == 0)
+ {
+ char la_buf[MON_STRING_BUF_SIZE];
+ sprintf( la_buf
+ , "[%s], No Name Server nodes available.\n"
+ "Scheduling shutdown (abrupt)!\n"
+ , method_name );
+ mon_log_write(NAMESERVER_CHOOSENEXTNS_1, SQ_LOG_CRIT, la_buf );
+ ReqQueue.enqueueShutdownReq( ShutdownLevel_Abrupt );
+
+ TRACE_EXIT;
+ return( -2 );
}
TRACE_EXIT;
+ return( 0 );
}
int CNameServer::ConnectToNs( bool *retry )
@@ -123,21 +168,32 @@
int err = 0;
+reconnect:
+
if ( !mon2nsPort_[0] )
- CNameServer::GetM2NPort( -1 );
- if ( !mon2nsHost_[0] )
- ChooseNextNs();
+ {
+ err = GetM2NPort( -1 );
+ }
+ if ( err == 0 && !mon2nsHost_[0] )
+ {
+ err = ChooseNextNs();
+ }
int sock = 0;
if ( shutdown_ )
+ {
err = -1;
+ }
if ( err == 0 )
{
- sock = SockCreate();
+ sock = ClientSockCreate();
if ( sock < 0 )
+ {
err = sock;
+ goto reconnect;
+ }
}
if ( err == 0 )
{
@@ -191,7 +247,7 @@
, nodeId.ping );
}
err = SockSend( ( char *) &nodeId, sizeof(nodeId) );
- if ( err == 0 )
+ if (err == 0)
{
if ( trace_settings & TRACE_NS )
{
@@ -252,7 +308,7 @@
if ( IsRealCluster )
{
CNode *node = Nodes->GetNode( nodeId.nsPNid );
- if ( node )
+ if (node && node->GetState() == State_Up)
{
strcpy( mon2nsHost_, node->GetName() );
GetM2NPort( nodeId.nsPNid );
@@ -273,50 +329,123 @@
return err;
}
-void CNameServer::GetM2NPort( int PNid )
+// Computes the monitor-to-Name-Server port and stores it in mon2nsPort_.
+//
+// The base port comes from NS_M2N_COMM_PORT (0 if unset). In a virtual
+// cluster every Name Server listens on base port + its pnid. Starting from
+// 'nsPNid' (or MyPNID when nsPNid < 0), the Name Server pnids are probed
+// round-robin and the first node in State_Up is chosen.
+//
+// Returns 0 on success. Returns -2 when no Name Server node is up (virtual
+// cluster only); in that case the event is logged, an abrupt shutdown is
+// queued, and mon2nsPort_ is left unchanged.
+int CNameServer::GetM2NPort( int nsPNid )
{
+    const char method_name[] = "CNameServer::GetM2NPort";
+    TRACE_ENTRY;
+
int port;
char *p = getenv( "NS_M2N_COMM_PORT" );
if ( p )
+    {
port = atoi(p);
+    }
else
+    {
port = 0;
+    }
if ( !IsRealCluster )
- port += PNid < 0 ? MyPNID : PNid;
+    {
+        int nsMax = NameServerConfig->GetCount();
+        int candidatePNid = nsPNid < 0 ? MyPNID : nsPNid;
+        int chosenPNid = -1;
+
+        // Guard against an empty Name Server configuration (modulo by 0).
+        if ( nsMax > 0 )
+        {
+            // Fold the candidate into the Name Server pnid range, then probe
+            // every Name Server pnid once, wrapping around, until one is up.
+            int firstPNid =
+                candidatePNid < nsMax ? candidatePNid : candidatePNid%nsMax;
+            int probePNid = firstPNid;
+            do
+            {
+                CNode *node = Nodes->GetNode( probePNid );
+                if (node && node->GetState() == State_Up)
+                {
+                    chosenPNid = probePNid;
+                    break;
+                }
+                probePNid = (probePNid+1) < nsMax ? (probePNid+1) : 0;
+            }
+            while ( probePNid != firstPNid );
+        }
+
+        if ( chosenPNid < 0 )
+        {
+            // No Name Server node is up: log event and bring my node down.
+            char la_buf[MON_STRING_BUF_SIZE];
+            snprintf( la_buf, sizeof(la_buf)
+                    , "[%s], No Name Server nodes available, "
+                      "nsMax=%d, candidatePNid=%d.\n"
+                      "Scheduling shutdown (abrupt)!\n"
+                    , method_name
+                    , nsMax, candidatePNid );
+            mon_log_write(NAMESERVER_GETM2NPORT_1, SQ_LOG_CRIT, la_buf );
+            ReqQueue.enqueueShutdownReq( ShutdownLevel_Abrupt );
+            TRACE_EXIT;
+            return( -2 );
+        }
+
+        port += chosenPNid;
+
+        if ( trace_settings & TRACE_NS )
+        {
+            trace_printf( "%s@%d - nsMax=%d, nsPNid=%d, MyPNID=%d, "
+                          "candidatePNid=%d, chosenPNid=%d, port=%d\n"
+                        , method_name, __LINE__
+                        , nsMax
+                        , nsPNid
+                        , MyPNID
+                        , candidatePNid
+                        , chosenPNid
+                        , port );
+        }
+    }
sprintf( mon2nsPort_, "%d", port );
-}
-
-void CNameServer::SetLocalHost( void )
-{
- gethostname( mon2nsHost_, MAX_PROCESSOR_NAME );
-}
-
-void CNameServer::SetShutdown( bool shutdown )
-{
- const char method_name[] = "CNameServer::SetShutdown";
- TRACE_ENTRY;
-
- if ( trace_settings & TRACE_NS )
- trace_printf( "%s@%d - set shutdown_=%d\n"
- , method_name, __LINE__, shutdown );
- shutdown_ = shutdown;
TRACE_EXIT;
+    return( 0 );
}
-void CNameServer::SockClose( void )
+// Reports whether physical node 'pnid' hosts a configured Name Server.
+//   Real cluster:    true when the node exists and its name has an entry in
+//                    NameServerConfig.
+//   Virtual cluster: true when pnid is below the NameServerConfig count.
+bool CNameServer::IsNameServerConfigured( int pnid )
{
- const char method_name[] = "CNameServer::SockClose";
+    const char method_name[] = "CNameServer::IsNameServerConfigured";
TRACE_ENTRY;
- close( mon2nsSock_ );
- mon2nsSock_ = -1;
+    bool configured;
+
+    if ( !IsRealCluster )
+    {
+        configured = ( pnid < NameServerConfig->GetCount() );
+    }
+    else
+    {
+        CNode *pnode = Nodes->GetNode( pnid );
+        configured = ( pnode != NULL )
+                  && ( NameServerConfig->GetConfig( pnode->GetName() ) != NULL );
+    }
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d - pnid=%d, configured=%s\n"
+                    , method_name, __LINE__, pnid
+                    , configured ? "True" : "False" );
+    }
+
TRACE_EXIT;
+    return( configured );
}
-int CNameServer::SockCreate( void )
+int CNameServer::ClientSockCreate( void )
{
- const char method_name[] = "CNameServer::SockCreate";
+ const char method_name[] = "CNameServer::ClientSockCreate";
TRACE_ENTRY;
int sock; // socket
@@ -363,7 +492,8 @@
snprintf( la_buf, sizeof(la_buf)
, "[%s], socket() failed! errno=%d (%s)\n"
, method_name, err, strerror(err) );
- mon_log_write( MON_NAMESERVER_MKCLTSOCK_1, SQ_LOG_ERR, la_buf );
+ mon_log_write( NAMESERVER_CLIENTSOCKCREATE_1, SQ_LOG_ERR, la_buf );
+ TRACE_EXIT;
return ( -1 );
}
@@ -375,8 +505,9 @@
snprintf( la_buf, sizeof(la_buf ),
"[%s] gethostbyname(%s) failed! errno=%d (%s)\n"
, method_name, host, err, strerror(err) );
- mon_log_write(MON_NAMESERVER_MKCLTSOCK_2, SQ_LOG_ERR, la_buf );
+ mon_log_write(NAMESERVER_CLIENTSOCKCREATE_2, SQ_LOG_ERR, la_buf );
close( sock );
+ TRACE_EXIT;
return ( -1 );
}
@@ -418,7 +549,7 @@
int err = errno;
sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n"
, method_name, err, strerror(err) );
- mon_log_write(MON_NAMESERVER_MKCLTSOCK_3, SQ_LOG_ERR, la_buf );
+ mon_log_write(NAMESERVER_CLIENTSOCKCREATE_3, SQ_LOG_ERR, la_buf );
struct timespec req, rem;
req.tv_sec = 0;
req.tv_nsec = 500000000L; // 500,000,000
@@ -439,8 +570,9 @@
char la_buf[MON_STRING_BUF_SIZE];
sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n"
, method_name, retries );
- mon_log_write(MON_NAMESERVER_MKCLTSOCK_4, SQ_LOG_ERR, la_buf );
+ mon_log_write(NAMESERVER_CLIENTSOCKCREATE_4, SQ_LOG_ERR, la_buf );
close( sock );
+ TRACE_EXIT;
return ( -1 );
}
struct timespec req, rem;
@@ -449,6 +581,8 @@
nanosleep( &req, &rem );
}
close( sock );
+ TRACE_EXIT;
+ return( -1 );
}
if ( trace_settings & TRACE_NS )
@@ -470,8 +604,9 @@
int err = errno;
sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
, method_name, err, strerror(err) );
- mon_log_write(MON_NAMESERVER_MKCLTSOCK_5, SQ_LOG_ERR, la_buf );
+ mon_log_write(NAMESERVER_CLIENTSOCKCREATE_5, SQ_LOG_ERR, la_buf );
close( sock );
+ TRACE_EXIT;
return ( -2 );
}
@@ -481,8 +616,9 @@
int err = errno;
sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
, method_name, err, strerror(err) );
- mon_log_write(MON_NAMESERVER_MKCLTSOCK_6, SQ_LOG_ERR, la_buf );
+ mon_log_write(NAMESERVER_CLIENTSOCKCREATE_6, SQ_LOG_ERR, la_buf );
close( sock );
+ TRACE_EXIT;
return ( -2 );
}
@@ -490,6 +626,19 @@
return ( sock );
}
+// Resets the client-side Name Server connection state after the Name Server
+// exits. The selected host and port are cleared, startup is marked
+// incomplete, and the socket is closed, so the next ConnectToNs() must
+// choose a Name Server again.
+void CNameServer::NameServerExited( void )
+{
+    const char method_name[] = "CNameServer::NameServerExited";
+    TRACE_ENTRY;
+
+    // Empty strings make ConnectToNs() re-select host and port.
+    *mon2nsHost_ = '\0';
+    *mon2nsPort_ = '\0';
+    nsStartupComplete_ = false;
+    SockClose();
+
+    TRACE_EXIT;
+}
+
int CNameServer::NameServerStop( struct message_def* msg )
{
const char method_name[] = "CNameServer::NameServerStop";
@@ -599,9 +748,6 @@
msgnew->unhooked = process->IsUnhooked();
msgnew->event_messages = process->IsEventMessages();
msgnew->system_messages = process->IsSystemMessages();
-// msgnew->pathStrId = process->pathStrId();
-// msgnew->ldpathStrId = process->ldPathStrId();
-// msgnew->programStrId = process->programStrId();
strcpy( msgnew->path, process->path() );
strcpy( msgnew->ldpath, process->ldpath() );
strcpy( msgnew->program, process->program() );
@@ -682,6 +828,45 @@
return error;
}
+// Forwards a node-down notification (nid, nodeName) to the Name Server with
+// a ReqType_NodeDown service request sent through SendReceive().
+//
+// The request is sent only when this node has a local Name Server process
+// (ProcessType_NameServer); otherwise nothing is sent and 0 is returned.
+//
+// Returns 0 on success, or the error returned by SendReceive().
+int CNameServer::ProcessNodeDown( int nid, char *nodeName )
+{
+    const char method_name[] = "CNameServer::ProcessNodeDown";
+    TRACE_ENTRY;
+
+    int error = 0;
+    CProcess *process = MyNode->GetProcessByType( ProcessType_NameServer );
+    if (process)
+    {
+        struct message_def msg;
+        // Keep this memset: the request is written to the socket as raw
+        // bytes, so zeroing keeps every transmitted byte defined and also
+        // provides the terminator for the bounded node_name copy below.
+        memset(&msg, 0, sizeof(msg) );
+        msg.type = MsgType_Service;
+        msg.noreply = false;
+        msg.reply_tag = seqNum_++;
+        msg.u.request.type = ReqType_NodeDown;
+        struct NodeDown_def *msgdown = &msg.u.request.u.down;
+        msgdown->nid = nid;
+        // Bounded copy: never overrun the fixed-size node_name field.
+        if ( nodeName )
+        {
+            strncpy( msgdown->node_name, nodeName
+                   , sizeof(msgdown->node_name) - 1 );
+        }
+        msgdown->takeover = 0;
+        msgdown->reason[0] = 0;
+
+        if ( trace_settings & TRACE_NS )
+        {
+            trace_printf( "%s@%d - sending node-down request to nameserver=%s:%s\n"
+                          "        msg.down.nid=%d\n"
+                          "        msg.down.node_name=%s\n"
+                        , method_name, __LINE__
+                        , mon2nsHost_, mon2nsPort_
+                        , msgdown->nid
+                        , msgdown->node_name );
+        }
+
+        error = SendReceive(&msg );
+    }
+
+    TRACE_EXIT;
+    return error;
+}
+
int CNameServer::ProcessShutdown( void )
{
const char method_name[] = "CNameServer::ProcessShutdown";
@@ -696,7 +881,6 @@
struct ShutdownNs_def *msgshutdown = &msg.u.request.u.shutdown_ns;
msgshutdown->nid = -1;
msgshutdown->pid = -1;
- //msgshutdown->level = msgIn->u.request.u.shutdown.level;
msgshutdown->level = ShutdownLevel_Normal;
int error = SendReceive(&msg );
@@ -711,16 +895,20 @@
int CNameServer::SendReceive( struct message_def* msg )
{
const char method_name[] = "CNameServer::SendReceive";
+ TRACE_ENTRY;
+
+ int retryCount = 0;
char desc[256];
char* descp;
- struct DelProcessNs_def *msgdel;
- struct NewProcessNs_def *msgnew;
- struct ShutdownNs_def *msgshutdown;
- struct NameServerStart_def *msgstart;
- struct NameServerStop_def *msgstop;
- struct ProcessInfo_def *msginfo;
-
- TRACE_ENTRY;
+ struct DelProcessNs_def* msgdel;
+ struct NameServerStart_def* msgstart;
+ struct NameServerStop_def* msgstop;
+ struct NewProcessNs_def* msgnew;
+ struct NodeDown_def* msgdown;
+ struct ProcessInfo_def* msginfo;
+ struct ShutdownNs_def* msgshutdown;
+ struct message_def msg_reply;
+ struct message_def* pmsg_reply = &msg_reply;
descp = desc;
int size = offsetof(struct message_def, u.request.u);
@@ -750,6 +938,13 @@
msgnew->nid, msgnew->pid, msgnew->verifier, msgnew->process_name );
size += sizeof(msg->u.request.u.new_process_ns);
break;
+ case ReqType_NodeDown:
+ msgdown = &msg->u.request.u.down;
+ sprintf( desc, "node-down (nid=%d, node-name=%s, takeover=%d, reason=%s)",
+ msgdown->nid, msgdown->node_name,
+ msgdown->takeover, msgdown->reason );
+ size += sizeof(msg->u.request.u.down);
+ break;
case ReqType_ProcessInfo:
msginfo = &msg->u.request.u.process_info;
sprintf( desc, "process-info (nid=%d, pid=%d, verifier=%d, name=%s)\n"
@@ -774,7 +969,7 @@
break;
case ReqType_ShutdownNs:
msgshutdown = &msg->u.request.u.shutdown_ns;
- sprintf( desc, "shutdown (nid=%d, pid=%d, level=%d)",
+ sprintf( desc, "shutdown-ns (nid=%d, pid=%d, level=%d)",
msgshutdown->nid, msgshutdown->pid, msgshutdown->level );
size += sizeof(msg->u.request.u.shutdown_ns);
break;
@@ -783,13 +978,16 @@
break;
}
+retryIO:
+
int error = SendToNs( descp, msg, size );
if ( error == 0 )
error = SockReceive( (char *) &size, sizeof(size ) );
if ( error == 0 )
- error = SockReceive( (char *) msg, size );
+ error = SockReceive( (char *) pmsg_reply, size );
if ( error == 0 )
{
+ memcpy( msg, pmsg_reply, size );
if ( trace_settings & ( TRACE_NS | TRACE_PROCESS ) )
{
char desc[2048];
@@ -827,7 +1025,6 @@
msg->u.reply.u.process_info.more_data );
break;
case ReplyType_ProcessInfoNs:
-// int argvLen = sizeof(msg->u.reply.u.process_info_ns.argv);
sprintf( desc,
"process-info-ns reply:\n"
" process_info_ns.nid=%d\n"
@@ -847,18 +1044,14 @@
" process_info_ns.path=%s\n"
" process_info_ns.ldpath=%s\n"
" process_info_ns.program=%s\n"
-// " process_info_ns.pathStrId=%d:%d\n"
-// " process_info_ns.ldpathStrId=%d:%d\n"
-// " process_info_ns.programStrId=%d:%d\n"
" process_info_ns.port_name=%s\n"
" process_info_ns.argc=%d\n"
-// " process_info_ns.argv=[%.*s]\n"
" process_info_ns.infile=%s\n"
" process_info_ns.outfile=%s\n"
-//#if 0
-// " process_info_ns.creation_time=%ld(secs)\n",
-// " process_info_ns.creation_time=%ld(secs):%ld(nsecs)\n",
-//#endif
+#if 0
+ " process_info_ns.creation_time=%ld(secs)\n",
+ " process_info_ns.creation_time=%ld(secs):%ld(nsecs)\n",
+#endif
" process_info_ns.return_code=%d"
, msg->u.reply.u.process_info_ns.nid
, msg->u.reply.u.process_info_ns.pid
@@ -877,21 +1070,14 @@
, msg->u.reply.u.process_info_ns.path
, msg->u.reply.u.process_info_ns.ldpath
, msg->u.reply.u.process_info_ns.program
-// , msg->u.reply.u.process_info_ns.pathStrId.nid
-// , msg->u.reply.u.process_info_ns.pathStrId.id
-// , msg->u.reply.u.process_info_ns.ldpathStrId.nid
-// , msg->u.reply.u.process_info_ns.ldpathStrId.id
-// , msg->u.reply.u.process_info_ns.programStrId.nid
-// , msg->u.reply.u.process_info_ns.programStrId.id
, msg->u.reply.u.process_info_ns.port_name
, msg->u.reply.u.process_info_ns.argc
-// , &msg->u.reply.u.process_info_ns.argv
, msg->u.reply.u.process_info_ns.infile
, msg->u.reply.u.process_info_ns.outfile
-//#if 0
-// , msg->u.reply.u.process_info_ns.creation_time.tv_sec
-// , msg->u.reply.u.process_info_ns.creation_time.tv_nsec
-//#endif
+#if 0
+ , msg->u.reply.u.process_info_ns.creation_time.tv_sec
+ , msg->u.reply.u.process_info_ns.creation_time.tv_nsec
+#endif
, msg->u.reply.u.process_info_ns.return_code );
break;
default:
@@ -905,7 +1091,20 @@
);
}
}
- else
+ else if ( error != -2 && retryCount < NAMESERVER_IO_RETRIES )
+ {
+ retryCount++;
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - retrying IO (%d) to nameserver=%s:%s\n"
+ , method_name, __LINE__
+ , retryCount
+ , mon2nsHost_, mon2nsPort_ );
+ }
+ goto retryIO;
+ }
+
+ if ( error )
{
// create a synthetic reply
msg->u.reply.u.generic.nid = -1;
@@ -943,9 +1142,10 @@
if ( trace_settings & TRACE_NS )
{
- trace_printf( "%s@%d - sending %s REQ to nameserver=%s:%s, sock=%d, shutdown=%d\n"
+ trace_printf( "%s@%d - sending %s\tREQ (size=%d) to nameserver=%s:%s, sock=%d, shutdown=%d\n"
, method_name, __LINE__
, reqType
+ , size
, mon2nsHost_
, mon2nsPort_
, mon2nsSock_
@@ -967,15 +1167,72 @@
error = ConnectToNs( &retry );
}
}
+
if ( error == 0 )
+ {
error = SockSend( (char *) &size, sizeof(size) );
- if ( error == 0 )
- error = SockSend( (char *) msg, size );
+ if (error)
+ {
+ int err = error;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], unable to send %s request size %d to "
+ "nameserver=%s:%s, error: %d(%s)\n"
+ , method_name, reqType, size, mon2nsHost_, mon2nsPort_, err, strerror(err) );
+ mon_log_write(NAMESERVER_SENDTONS_1, SQ_LOG_ERR, buf);
+ }
+ else
+ {
+ error = SockSend( (char *) msg, size );
+ if (error)
+ {
+ int err = error;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], unable to send %s request to "
+ "nameserver=%s:%s, error: %d(%s)\n"
+ , method_name, reqType, mon2nsHost_, mon2nsPort_, err, strerror(err) );
+ mon_log_write(NAMESERVER_SENDTONS_2, SQ_LOG_ERR, buf);
+ }
+ }
+ }
TRACE_EXIT;
return error;
}
+void CNameServer::SetLocalHost( void )
+{
+ gethostname( mon2nsHost_, MAX_PROCESSOR_NAME );
+}
+
+void CNameServer::SetShutdown( bool shutdown )
+{
+ const char method_name[] = "CNameServer::SetShutdown";
+ TRACE_ENTRY;
+
+ if ( trace_settings & TRACE_NS )
+ trace_printf( "%s@%d - set shutdown_=%d\n"
+ , method_name, __LINE__, shutdown );
+ shutdown_ = shutdown;
+
+ TRACE_EXIT;
+}
+
+void CNameServer::SockClose( void )
+{
+ const char method_name[] = "CNameServer::SockClose";
+ TRACE_ENTRY;
+
+ if (mon2nsSock_ != -1)
+ {
+ close( mon2nsSock_ );
+ mon2nsSock_ = -1;
+ }
+
+ TRACE_EXIT;
+}
+
int CNameServer::SockReceive( char *buf, int size )
{
const char method_name[] = "CNameServer::SockReceive";
@@ -1045,9 +1302,29 @@
, error, strerror(error) );
}
- if ( error )
+ if (error)
+ {
SockClose();
+ int err = error;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], unable to receive request size %d to "
+ "nameserver=%s:%s, error: %d(%s)\n"
+ , method_name, size, mon2nsHost_, mon2nsPort_, err, strerror(err) );
+ mon_log_write(NAMESERVER_SOCKRECEIVE_1, SQ_LOG_ERR, buf);
+
+ // Choose another name server on IO retry
+ if (IsRealCluster)
+ {
+ mon2nsHost_[0] = 0;
+ }
+ else
+ {
+ mon2nsPort_[0] = 0;
+ }
+ }
+
TRACE_EXIT;
return error;
}
@@ -1112,9 +1389,28 @@
, error, strerror(error) );
}
- if ( error )
+ if (error)
+ {
SockClose();
+ int err = error;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], unable to send request size %d to "
+ "nameserver=%s:%s, error: %d(%s)\n"
+ , method_name, size, mon2nsHost_, mon2nsPort_, err, strerror(err) );
+ mon_log_write(NAMESERVER_SOCKSEND_1, SQ_LOG_ERR, buf);
+ // Choose another name server on IO retry
+ if (IsRealCluster)
+ {
+ mon2nsHost_[0] = 0;
+ }
+ else
+ {
+ mon2nsPort_[0] = 0;
+ }
+ }
+
TRACE_EXIT;
return error;
}
diff --git a/core/sqf/monitor/linux/nameserver.h b/core/sqf/monitor/linux/nameserver.h
index a8ccb4b..009eced 100644
--- a/core/sqf/monitor/linux/nameserver.h
+++ b/core/sqf/monitor/linux/nameserver.h
@@ -40,12 +40,15 @@
CNameServer( void );
virtual ~CNameServer( void );
+ bool IsNameServerConfigured( int pnid );
+ void NameServerExited( void );
int NameServerStop( struct message_def* msg );
int ProcessDelete(CProcess* process );
int ProcessInfo( struct message_def* msg );
int ProcessInfoCont( struct message_def* msg );
int ProcessInfoNs( struct message_def* msg );
int ProcessNew(CProcess* process );
+ int ProcessNodeDown( int nid, char* nodeName );
int ProcessShutdown( void );
void SetLocalHost( void );
@@ -53,21 +56,20 @@
char mon2nsHost_[MAX_PROCESSOR_NAME];
char mon2nsPort_[10];
int mon2nsSock_;
- int nsConfigInx_;
bool nsStartupComplete_;
int seqNum_;
bool shutdown_;
- void ChooseNextNs( void );
- int ConnectToNs( bool *retry );
- void GetM2NPort( int PNid );
+ int ChooseNextNs( void );
+ int ClientSockCreate();
+ int ConnectToNs( bool* retry );
+ int GetM2NPort( int PNid );
int SendReceive( struct message_def* msg );
- int SendToNs( const char *reqType, struct message_def *msg, int size );
+ int SendToNs( const char* reqType, struct message_def* msg, int size );
void SetShutdown( bool shutdown );
void SockClose( void );
- int SockCreate();
- int SockReceive( char *buf, int size );
- int SockSend( char *buf, int size );
+ int SockReceive( char* buf, int size );
+ int SockSend( char* buf, int size );
};
#endif
diff --git a/core/sqf/monitor/linux/nscommacceptmon.cxx b/core/sqf/monitor/linux/nscommacceptmon.cxx
index edddca7..0857cc9 100644
--- a/core/sqf/monitor/linux/nscommacceptmon.cxx
+++ b/core/sqf/monitor/linux/nscommacceptmon.cxx
@@ -155,6 +155,36 @@
TRACE_EXIT;
}
+void CCommAcceptMon::monReqNodeDown( struct message_def* msg, int sockFd )
+{
+ const char method_name[] = "CCommAcceptMon::monReqNodeDown";
+ TRACE_ENTRY;
+
+ if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) )
+ {
+ trace_printf( "%s@%d - Received monitor node-down request.\n"
+ " msg.down.nid=%d\n"
+ " msg.down.node_name=%s\n"
+ " msg.down.takeover=%d\n"
+ " msg.down.reason=%s\n"
+ , method_name, __LINE__
+ , msg->u.request.u.down.nid
+ , msg->u.request.u.down.node_name
+ , msg->u.request.u.down.takeover
+ , msg->u.request.u.down.reason
+ );
+ }
+
+ CExternalReq::reqQueueMsg_t msgType;
+ msgType = CExternalReq::NonStartupMsg;
+ int nid = msg->u.request.u.down.nid;
+ int pid = -1;
+ // Place new request on request queue
+ ReqQueue.enqueueReq(msgType, nid, pid, sockFd, msg);
+
+ TRACE_EXIT;
+}
+
void CCommAcceptMon::monReqProcessInfo( struct message_def* msg, int sockFd )
{
const char method_name[] = "CCommAcceptMon::monReqProcessInfo";
@@ -412,12 +442,12 @@
void CCommAcceptMon::processMonReqs( int sockFd )
{
const char method_name[] = "CCommAcceptMon::processMonReqs";
+ TRACE_ENTRY;
+
int rc;
nodeId_t nodeId;
struct message_def msg;
- TRACE_ENTRY;
-
if ( trace_settings & ( TRACE_NS ) )
{
trace_printf( "%s@%d - Accepted connection sock=%d\n"
@@ -435,7 +465,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
"monitor: %s.\n", method_name, ErrorMsg(rc));
- mon_log_write(NS_COMMACCEPT_2, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_1, SQ_LOG_ERR, buf);
return;
}
@@ -462,6 +492,36 @@
, nodeId.ping );
}
+ CNode *node;
+ node = Nodes->GetNode( nodeId.pnid );
+ if ( node != NULL )
+ {
+ if ( node->GetState() != State_Up )
+ {
+ if ( trace_settings & ( TRACE_NS ) )
+ {
+ trace_printf( "%s@%d - Bringing node up, node=%s, pnid=%d\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid() );
+ }
+ rc = Monitor->HardNodeUpNs( node->GetPNid() );
+ if ( rc )
+ { // Handle error
+ close( sockFd );
+ return;
+ }
+ }
+ }
+ else
+ { // Handle error
+ close( sockFd );
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s], invalid physical node id, "
+ "pnid: %d\n", method_name, nodeId.pnid );
+ mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_2, SQ_LOG_ERR, buf);
+ return;
+ }
+
strcpy(nodeId.nodeName, MyNode->GetName());
strcpy(nodeId.commPort, MyNode->GetCommPort());
strcpy(nodeId.syncPort, MyNode->GetSyncPort());
@@ -504,7 +564,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], unable to send node id from new "
"monitor: %s.\n", method_name, ErrorMsg(rc));
- mon_log_write(NS_COMMACCEPT_3, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_3, SQ_LOG_ERR, buf);
return;
}
@@ -517,9 +577,9 @@
{ // Handle error
close( sockFd );
char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
+ snprintf(buf, sizeof(buf), "[%s], unable to obtain message size from "
"monitor: %s.\n", method_name, ErrorMsg(rc));
- mon_log_write(NS_COMMACCEPT_4, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_4, SQ_LOG_ERR, buf);
return;
}
@@ -528,9 +588,9 @@
{ // Handle error
close( sockFd );
char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
+ snprintf(buf, sizeof(buf), "[%s], unable to obtain message from "
"monitor: %s.\n", method_name, ErrorMsg(rc));
- mon_log_write(NS_COMMACCEPT_5, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_PROCESSMONREQS_5, SQ_LOG_ERR, buf);
return;
}
if ( trace_settings & ( TRACE_NS ) )
@@ -591,6 +651,10 @@
monReqNameServerStop(&msg, sockFd);
break;
+ case ReqType_NodeDown:
+ monReqNodeDown(&msg, sockFd);
+ break;
+
case ReqType_ProcessInfo:
monReqProcessInfo(&msg, sockFd);
break;
@@ -663,9 +727,9 @@
if (rc != 0)
{
char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "[%s], thread create error=%d\n",
+ snprintf(buf, sizeof(buf), "[%s], mon2nsProcess thread create error=%d\n",
method_name, rc);
- mon_log_write(NS_COMMACCEPT_6, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_PROCESSNEWSOCK_1, SQ_LOG_ERR, buf);
}
TRACE_EXIT;
@@ -743,7 +807,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
method_name, strerror(errno));
- mon_log_write(NS_COMMACCEPT_7, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_COMMACCEPTORSOCK_1, SQ_LOG_ERR, buf);
}
else
@@ -800,7 +864,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
method_name, rc);
- mon_log_write(NS_COMMACCEPT_8, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_MON2NSACCEPTMON_1, SQ_LOG_ERR, buf);
}
// Enter thread processing loop
@@ -830,7 +894,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
method_name, rc);
- mon_log_write(NS_COMMACCEPT_9, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_MON2NSPROCESS_1, SQ_LOG_ERR, buf);
}
MyNode->AddMonConnCount(1);
@@ -858,7 +922,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], thread create error=%d\n",
method_name, rc);
- mon_log_write(NS_COMMACCEPT_10, SQ_LOG_ERR, buf);
+ mon_log_write(NS_COMMACCEPT_START_1, SQ_LOG_ERR, buf);
}
TRACE_EXIT;
diff --git a/core/sqf/monitor/linux/nscommacceptmon.h b/core/sqf/monitor/linux/nscommacceptmon.h
index 41b2c9b..1749b16 100644
--- a/core/sqf/monitor/linux/nscommacceptmon.h
+++ b/core/sqf/monitor/linux/nscommacceptmon.h
@@ -46,6 +46,7 @@
void monReqExec( CExternalReq * request );
void monReqNameServerStop( struct message_def* msg, int sockFd );
void monReqNewProcess( struct message_def* msg, int sockFd );
+ void monReqNodeDown( struct message_def* msg, int sockFd );
void monReqProcessInfo( struct message_def* msg, int sockFd );
void monReqProcessInfoCont( struct message_def* msg, int sockFd );
void monReqProcessInfoNs( struct message_def* msg, int sockFd );
@@ -68,9 +69,9 @@
bool accepting_;
bool shutdown_;
- // commAccept thread's id
+ // mon2nsAcceptMon thread's id
pthread_t thread_id_;
- // commAccept thread's id
+ // mon2nsProcess thread's id
pthread_t process_thread_id_;
enum { HEURISTIC_COUNT = 10 };
diff --git a/core/sqf/monitor/linux/nsreqnodedown.cxx b/core/sqf/monitor/linux/nsreqnodedown.cxx
new file mode 100644
index 0000000..9b5ddc2
--- /dev/null
+++ b/core/sqf/monitor/linux/nsreqnodedown.cxx
@@ -0,0 +1,100 @@
+///////////////////////////////////////////////////////////////////////////////
+//
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include "nstype.h"
+
+#include <stdio.h>
+#include "reqqueue.h"
+#include "montrace.h"
+#include "monsonar.h"
+#include "monlogging.h"
+
+extern CMonStats *MonStats;
+extern CNode *MyNode;
+extern CNodeContainer *Nodes;
+extern CMonitor *Monitor;
+
+CExtNodeDownNsReq::CExtNodeDownNsReq( reqQueueMsg_t msgType
+ , int pid
+ , int sockFd
+ , struct message_def *msg )
+ : CExternalReq(msgType, -1, pid, sockFd, msg)
+{
+ // Add eyecatcher sequence as a debugging aid
+ memcpy(&eyecatcher_, "RqEJ", 4);
+
+ priority_ = High;
+}
+
+CExtNodeDownNsReq::~CExtNodeDownNsReq()
+{
+ // Alter eyecatcher sequence as a debugging aid to identify deleted object
+ memcpy(&eyecatcher_, "rQej", 4);
+}
+
+void CExtNodeDownNsReq::populateRequestString( void )
+{
+ char strBuf[MON_STRING_BUF_SIZE/2] = { 0 };
+
+ snprintf( strBuf, sizeof(strBuf),
+ "ExtReq(%s) req #=%ld requester(pid=%d) (nid=%d)"
+ , CReqQueue::svcReqType[reqType_], getId(), pid_
+ , msg_->u.request.u.down.nid );
+ requestString_.assign( strBuf );
+}
+
+void CExtNodeDownNsReq::performRequest()
+{
+ const char method_name[] = "CExtNodeDownNsReq::performRequest";
+ TRACE_ENTRY;
+
+ CNode *node = NULL;
+
+ // Record statistics (sonar counters)
+ if (sonar_verify_state(SONAR_ENABLED | SONAR_MONITOR_ENABLED))
+ MonStats->req_type_nodedown_Incr();
+
+ // Trace info about request
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf("%s@%d request #%ld: NodeDown, nid=%d\n", method_name,
+ __LINE__, id_, msg_->u.request.u.down.nid);
+ }
+
+ node = Nodes->GetLNode( msg_->u.request.u.down.nid )->GetNode();
+ Monitor->HardNodeDownNs( node->GetPNid() );
+
+ msg_->u.reply.type = ReplyType_Generic;
+ msg_->u.reply.u.generic.nid = -1;
+ msg_->u.reply.u.generic.pid = pid_;
+ msg_->u.reply.u.generic.verifier = -1;
+ msg_->u.reply.u.generic.process_name[0] = '\0';
+ msg_->u.reply.u.generic.return_code = MPI_SUCCESS;
+
+ // Send reply to monitor
+ monreply(msg_, sockFd_);
+
+ TRACE_EXIT;
+}
diff --git a/core/sqf/monitor/linux/nsreqprocinfons.cxx b/core/sqf/monitor/linux/nsreqprocinfons.cxx
index aa53437..37c09f6 100644
--- a/core/sqf/monitor/linux/nsreqprocinfons.cxx
+++ b/core/sqf/monitor/linux/nsreqprocinfons.cxx
@@ -222,18 +222,21 @@
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
- trace_printf( "%s@%d request #%ld: ProcessInfoNs, for (%d, %d:%d), "
+ trace_printf( "%s@%d request #%ld: ProcessInfoNs, for %s (%d, %d:%d), "
"process type=%s\n"
, method_name, __LINE__, id_
- , target_nid, target_pid, target_verifier
+ , target_process_name.c_str(), target_nid, target_pid, target_verifier
, ProcessTypeString(target_type));
}
if (target_process_name.size())
{ // find by name (don't check node state, don't check process state, not backup)
- process = Nodes->GetProcess( target_process_name.c_str()
- , target_verifier
- , false, false, false );
+ if (msg_->u.request.u.process_info.target_process_name[0] == '$' )
+ {
+ process = Nodes->GetProcess( target_process_name.c_str()
+ , target_verifier
+ , false, false, false );
+ }
}
else
{
diff --git a/core/sqf/monitor/linux/nsreqshutdown.cxx b/core/sqf/monitor/linux/nsreqshutdown.cxx
index e5888d2..63d9400 100644
--- a/core/sqf/monitor/linux/nsreqshutdown.cxx
+++ b/core/sqf/monitor/linux/nsreqshutdown.cxx
@@ -30,12 +30,10 @@
#include "montrace.h"
#include "monsonar.h"
#include "monlogging.h"
-#include "replicate.h"
extern CMonStats *MonStats;
extern CNode *MyNode;
extern CNodeContainer *Nodes;
-extern CReplicate Replicator;
CExtShutdownNsReq::CExtShutdownNsReq (reqQueueMsg_t msgType,
int nid, int pid, int sockFd,
@@ -43,7 +41,7 @@
: CExternalReq(msgType, nid, pid, sockFd, msg)
{
// Add eyecatcher sequence as a debugging aid
- memcpy(&eyecatcher_, "RQER", 4); // TODO
+ memcpy(&eyecatcher_, "RqER", 4);
priority_ = High;
}
@@ -51,7 +49,7 @@
CExtShutdownNsReq::~CExtShutdownNsReq()
{
// Alter eyecatcher sequence as a debugging aid to identify deleted object
- memcpy(&eyecatcher_, "rqer", 4); // TODO
+ memcpy(&eyecatcher_, "rQer", 4);
}
void CExtShutdownNsReq::populateRequestString( void )
diff --git a/core/sqf/monitor/linux/nsreqstop.cxx b/core/sqf/monitor/linux/nsreqstop.cxx
index 4ff46ce..a20e9bd 100644
--- a/core/sqf/monitor/linux/nsreqstop.cxx
+++ b/core/sqf/monitor/linux/nsreqstop.cxx
@@ -90,7 +90,7 @@
int nid = atoi( msg_->u.request.u.nameserver_stop.node_name );
node = Nodes->GetLNode( nid )->GetNode();
}
- Monitor->HardNodeDown( node->GetPNid(), true );
+ Monitor->HardNodeDownNs( node->GetPNid() );
char la_buf[MON_STRING_BUF_SIZE*2];
snprintf( la_buf, sizeof(la_buf)
diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx
index 4a4b8c4..364837b 100644
--- a/core/sqf/monitor/linux/pnode.cxx
+++ b/core/sqf/monitor/linux/pnode.cxx
@@ -1065,11 +1065,11 @@
!MyNode->IsMyNode(targetLNode->GetNid()))
{
// Forward the unique string to the target node
- int rc = PtpClient->AddUniqStr( id.nid
- , id.id
- , candidate
- , targetLNode->GetNid()
- , targetLNode->GetNode()->GetName() );
+ int rc = PtpClient->ProcessAddUniqStr( id.nid
+ , id.id
+ , candidate
+ , targetLNode->GetNid()
+ , targetLNode->GetNode()->GetName() );
if (rc)
{
char la_buf[MON_STRING_BUF_SIZE];
@@ -1110,11 +1110,11 @@
!MyNode->IsMyNode(targetLNode->GetNid()))
{
// Forward the unique string to the target node
- int rc = PtpClient->AddUniqStr( id.nid
- , id.id
- , candidate
- , targetLNode->GetNid()
- , targetLNode->GetNode()->GetName());
+ int rc = PtpClient->ProcessAddUniqStr( id.nid
+ , id.id
+ , candidate
+ , targetLNode->GetNid()
+ , targetLNode->GetNode()->GetName());
if (rc)
{
char la_buf[MON_STRING_BUF_SIZE];
@@ -1240,6 +1240,16 @@
const char method_name[] = "CNode::StartNameServerProcess";
TRACE_ENTRY;
+ if ( !NameServer->IsNameServerConfigured( MyPNID ) )
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d" " - NameServer is not configured in my node\n"
+ , method_name, __LINE__);
+ }
+ return;
+ }
+
char path[MAX_SEARCH_PATH];
char *ldpath = NULL; // = getenv("LD_LIBRARY_PATH");
char filename[MAX_PROCESS_PATH];
@@ -1250,7 +1260,9 @@
snprintf( stdout, sizeof(stdout), "stdout_TNS%d", MyNode->GetZone() );
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- trace_printf("%s@%d" " - Creating NameService Process\n", method_name, __LINE__);
+ {
+ trace_printf("%s@%d" " - Creating NameServer Process\n", method_name, __LINE__);
+ }
strcpy(path,getenv("PATH"));
strcat(path,":");
@@ -1281,12 +1293,14 @@
if ( NameServerProcess )
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- trace_printf("%s@%d" " - NameService Process created\n", method_name, __LINE__);
+ {
+ trace_printf("%s@%d" " - NameServer Process created\n", method_name, __LINE__);
+ }
}
else
{
char la_buf[MON_STRING_BUF_SIZE];
- sprintf(la_buf, "[%s], NameService Process creation failed.\n", method_name);
+ sprintf(la_buf, "[%s], NameServer Process creation failed.\n", method_name);
mon_log_write( MON_NODE_STARTNAMESERVER_1, SQ_LOG_ERR, la_buf );
}
@@ -2556,11 +2570,27 @@
}
else
{
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf),
- "[%s] ProcessInfo failed, rc=%d\n"
- , method_name, msg.u.reply.u.process_info_ns.return_code );
- mon_log_write( MON_NODE_CLONEPROCESSNS_1, SQ_LOG_ERR, buf );
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - ProcessInfoNs(%d, %d:%d) -- can't find target process\n"
+ , method_name, __LINE__
+ , msg.u.reply.u.process_info_ns.nid
+ , msg.u.reply.u.process_info_ns.pid
+ , msg.u.reply.u.process_info_ns.verifier);
+ }
+
+ if ( msg.u.reply.u.process_info_ns.return_code != MPI_ERR_NAME )
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf),
+ "[%s] ProcessInfo(%d, %d:%d) failed, rc=%d\n"
+ , method_name
+ , msg.u.reply.u.process_info_ns.nid
+ , msg.u.reply.u.process_info_ns.pid
+ , msg.u.reply.u.process_info_ns.verifier
+ , msg.u.reply.u.process_info_ns.return_code );
+ mon_log_write( MON_NODE_CLONEPROCESSNS_1, SQ_LOG_ERR, buf );
+ }
}
}
else
@@ -2625,11 +2655,25 @@
}
else
{
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf),
- "[%s] ProcessInfo failed, rc=%d\n"
- , method_name, msg.u.reply.u.process_info_ns.return_code );
- mon_log_write( MON_NODE_CLONEPROCESSNS_4, SQ_LOG_ERR, buf );
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - ProcessInfoNs(%s:%d) -- can't find target process\n"
+ , method_name, __LINE__
+ , msg.u.reply.u.process_info_ns.process_name
+ , msg.u.reply.u.process_info_ns.verifier);
+ }
+
+ if ( msg.u.reply.u.process_info_ns.return_code != MPI_ERR_NAME )
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf),
+ "[%s] ProcessInfo(%s:%d) failed, rc=%d\n"
+ , method_name
+ , msg.u.reply.u.process_info_ns.process_name
+ , msg.u.reply.u.process_info_ns.verifier
+ , msg.u.reply.u.process_info_ns.return_code );
+ mon_log_write( MON_NODE_CLONEPROCESSNS_4, SQ_LOG_ERR, buf );
+ }
}
}
else
diff --git a/core/sqf/monitor/linux/process.cxx b/core/sqf/monitor/linux/process.cxx
index 9b9ee00..a39a589 100644
--- a/core/sqf/monitor/linux/process.cxx
+++ b/core/sqf/monitor/linux/process.cxx
@@ -2589,27 +2589,46 @@
// Take fork semaphore. We need to wait until parent indicates
// it is ok to proceed. Pipes between parent and child need to
// be set up before child can continue.
+ bool sem_log_error = false;
int sem_rc;
+ int err = 0;
struct timeval logTime;
struct tm *ltime;
-
- gettimeofday(&logTime, NULL);
- ltime = localtime(&logTime.tv_sec);
-
struct timespec ts;
- ts.tv_sec = 1;
- ts.tv_nsec = 0;
+
+ if (clock_gettime(CLOCK_REALTIME, &ts) == -1)
+ {
+ err = errno;
+ gettimeofday(&logTime, NULL);
+ ltime = localtime(&logTime.tv_sec);
+ snprintf(la_buf, sizeof(la_buf),
+ "%02d/%02d/%02d-%02d:%02d:%02d "
+ "[CProcess::Create], clock_gettime(CLOCK_REALTIME),"
+ " Child can't get time, %s (%d), program %s, (pid=%d).\n"
+ , ltime->tm_mon+1, ltime->tm_mday, ltime->tm_year-100, ltime->tm_hour, ltime->tm_min, ltime->tm_sec
+ , strerror(err), err
+ , filename, getpid());
+ write (2, la_buf, strlen(la_buf));
+ }
+ ts.tv_sec += 1;
+
env = getenv( "MON_CREATE_SEM_DELAY" );
if (env && isdigit(*env))
{
ts.tv_sec = atol(env);
}
- int err;
+
+ env = getenv( "MON_CREATE_SEM_LOG_ERROR" );
+ if (env && isdigit(*env))
+ {
+ int val = atoi(env);
+ sem_log_error = (val != 0) ? true : false;
+ }
do
{
sem_rc = sem_timedwait(MyNode->GetMutex(), &ts);
err = errno;
- if ( err == ETIMEDOUT )
+ if ( sem_log_error && err == ETIMEDOUT )
{
gettimeofday(&logTime, NULL);
ltime = localtime(&logTime.tv_sec);
@@ -2625,7 +2644,7 @@
}
while (sem_rc == -1 && (err == EINTR || err == ETIMEDOUT));
- if ( sem_rc == -1 && !(err == EINTR || err == ETIMEDOUT))
+ if ( sem_log_error && sem_rc == -1 && !(err == EINTR || err == ETIMEDOUT))
{
gettimeofday(&logTime, NULL);
ltime = localtime(&logTime.tv_sec);
@@ -3319,6 +3338,10 @@
case ProcessType_NameServer:
if ( IsAbended() )
{
+ if (!Clone)
+ {
+ NameServer->NameServerExited();
+ }
if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS))
trace_printf("%s@%d" " - NameServer abended" "\n", method_name, __LINE__);
}
@@ -4095,6 +4118,7 @@
CProcessContainer::CProcessContainer (void)
:numProcs_(0)
,nodeContainer_(false)
+ ,processNameFormatLong_(true)
,nameMap_(NULL)
,pidMap_(NULL)
,head_(NULL)
@@ -4121,12 +4145,22 @@
abort();
}
+#ifndef NAMESERVER_PROCESS
+ char *env = getenv("SQ_MON_PROCESS_NAME_FORMAT_LONG");
+ if ( env && isdigit(*env) )
+ {
+ int val = atoi(env);
+ processNameFormatLong_ = (val != 0) ? true : false;
+ }
+#endif
+
TRACE_EXIT;
}
CProcessContainer::CProcessContainer( bool nodeContainer )
:numProcs_(0)
,nodeContainer_(nodeContainer)
+ ,processNameFormatLong_(true)
,nameMap_(NULL)
,pidMap_(NULL)
,head_(NULL)
@@ -4161,6 +4195,15 @@
abort();
}
+#ifndef NAMESERVER_PROCESS
+ char *env = getenv("SQ_MON_PROCESS_NAME_FORMAT_LONG");
+ if ( env && isdigit(*env) )
+ {
+ int val = atoi(env);
+ processNameFormatLong_ = (val != 0) ? true : false;
+ }
+#endif
+
if ( nodeContainer_ )
{
nameMap_ = new nameMap_t;
@@ -4775,46 +4818,85 @@
char *CProcessContainer::BuildOurName( int nid, int pid, char *name )
{
- int i;
- int rem;
- int cnt[4];
-
const char method_name[] = "CProcessContainer::BuildOurName";
TRACE_ENTRY;
- // Convert Pid into base 35 acsii
- cnt[0] = pid / 42875;
- rem = pid - ( cnt[0] * 42875 );
- cnt[1] = rem / 1225;
- rem -= ( cnt[1] * 1225 );
- cnt[2] = rem / 35;
- rem -= ( cnt[2] * 35 );
- cnt[3] = rem;
+ int i;
+ int rem;
+ int cnt[6];
- // Convert Nid into base 16 acsii
- sprintf(name,"$Z%2.2X",nid);
- for(i=3; i>=0; i--)
+ if (!processNameFormatLong_)
{
- if( cnt[i] < 10 )
+ // Convert Pid into base 35 acsii
+ cnt[0] = pid / 42875; // (35 * 35 * 35)
+ rem = pid - ( cnt[0] * 42875 );
+ cnt[1] = rem / 1225; // (35 * 35)
+ rem -= ( cnt[1] * 1225 );
+ cnt[2] = rem / 35;
+ rem -= ( cnt[2] * 35 );
+ cnt[3] = rem;
+
+ // Process name format long: '$Zxxpppp' xx = nid, pppp = pid
+
+ // Convert Nid into base 16 acsii
+ sprintf(name,"$Z%2.2X",nid);
+
+ // Convert Pid into base 36 ascii
+ for(i=3; i>=0; i--)
{
- name[i+4] = '0'+cnt[i];
- }
- else
- {
- cnt[i] -= 10;
- // we are skipping cap 'o' because it looks like zero.
- if( cnt[i] >= 14 )
+ if( cnt[i] < 10 )
{
- name[i+4] = 'P'+(cnt[i]-14);
+ name[i+4] = '0'+cnt[i];
}
else
{
- name[i+4] = 'A'+cnt[i];
+ cnt[i] -= 10;
+ // we are skipping cap 'o' because it looks like zero.
+ if( cnt[i] >= 14 )
+ {
+ name[i+4] = 'P'+(cnt[i]-14);
+ }
+ else
+ {
+ name[i+4] = 'A'+cnt[i];
+ }
}
}
+ name[8] = '\0';
}
- name[8] = '\0';
-
+ else
+ {
+ // We are skipping 'A', 'I', 'O', and 'U' to distinguish between zero
+ // and one digits, and for political correctness in generated names
+ char b32table[32] = {'0','1','2','3','4','5','6','7','8','9'
+ ,'B','C','D','E','F','G','H','J','K','L','M'
+ ,'N','P','Q','R','S','T','V','W','X','Y','Z' };
+
+ // Convert Pid into base 32 ascii
+ cnt[0] = pid / 33554432; // (32 * 32 * 32 * 32 * 32)
+ rem = pid - ( cnt[0] * 33554432 );
+ cnt[1] = rem / 1048576; // (32 * 32 * 32 * 32)
+ rem -= ( cnt[1] * 1048576 );
+ cnt[2] = rem / 32768; // (32 * 32 * 32)
+ rem -= ( cnt[2] * 32768 );
+ cnt[3] = rem / 1024; // (32 * 32)
+ rem -= ( cnt[3] * 1024 );
+ cnt[4] = rem / 32;
+ rem -= ( cnt[4] * 32 );
+ cnt[5] = rem;
+
+ // Process name format long: '$Zxxxxpppppp' xxxx = nid, pppppp = pid
+
+ // Convert Nid into base 16 ascii
+ sprintf(name,"$Z%4.4X",nid);
+
+ // Convert Pid into base 32 ascii
+ for(i=5; i>=0; i--)
+ {
+ name[i+6] = static_cast<char>(b32table[cnt[i]]);
+ }
+ name[12] = '\0';
+ }
TRACE_EXIT;
return name;
@@ -5398,6 +5480,65 @@
}
#endif
+#ifdef NAMESERVER_PROCESS
+void CProcessContainer::DeleteAllDown()
+{
+ CProcess *process = NULL;
+ int nid = -1;
+ int pid = -1;
+
+ const char method_name[] = "CProcessContainer::DeleteAllDown";
+ TRACE_ENTRY;
+
+ nameMap_t::iterator nameMapIt;
+
+ while ( true )
+ {
+ nameMapLock_.lock();
+ nameMapIt = nameMap_->begin();
+
+ if (nameMap_->size() == 0)
+ {
+ nameMapLock_.unlock();
+ break; // all done
+ }
+
+ process = nameMapIt->second;
+
+ // Delete name map entry
+ nameMap_->erase (nameMapIt);
+
+ nameMapLock_.unlock();
+
+ nid = process->GetNid();
+ pid = process->GetPid();
+
+ if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL))
+ {
+ trace_printf("%s@%d removed from nameMap %p: %s (%d, %d)\n",
+ method_name, __LINE__, nameMap_,
+ process->GetName(), nid, pid);
+ }
+
+ // Delete pid map entry
+ DelFromPidMap ( process );
+
+ if (trace_settings & (TRACE_SYNC | TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Completed delete for %s (%d, %d)\n"
+ , method_name, __LINE__
+ , process->GetName(), nid, pid);
+ }
+
+ // Remove all processes
+ // PSD will re-create persistent processes on spare node activation
+ Exit_Process( process, true, nid );
+ }
+
+ TRACE_EXIT;
+}
+#endif
+
void CProcessContainer::DeleteFromList( CProcess *process )
{
const char method_name[] = "CProcessContainer::DeleteFromList";
@@ -7086,7 +7227,9 @@
// Note: Exit_Process() will delete the process object, so
// save the process information needed before the call
+#ifndef NAMESERVER_PROCESS
PROCESSTYPE processType = process->GetType();
+#endif
string processName = process->GetName();
int processNid = process->GetNid();
int processPid = process->GetPid();
@@ -7101,6 +7244,7 @@
, processName.c_str(), processNid, processPid, processVerifier
, abend, downNode
, MyNode->IsKillingNode(), MyNode->IsDTMAborted(), MyNode->IsSMSAborted());
+#ifndef NAMESERVER_PROCESS
if ( !MyNode->IsKillingNode() )
{
switch ( processType )
@@ -7147,6 +7291,7 @@
break;
}
}
+#endif
}
break;
default:
diff --git a/core/sqf/monitor/linux/process.h b/core/sqf/monitor/linux/process.h
index 3cde3e5..736ddcc 100644
--- a/core/sqf/monitor/linux/process.h
+++ b/core/sqf/monitor/linux/process.h
@@ -139,6 +139,9 @@
, void *tag
, int & result
);
+#ifdef NAMESERVER_PROCESS
+ void DeleteAllDown();
+#endif
bool Dump_Process( CProcess *dumper, CProcess *process, char *core_path );
void DumpCallback( int nid, pid_t pid, int status );
void Exit_Process( CProcess *process, bool abend, int downNode );
@@ -185,10 +188,12 @@
inline void SetNumProcs( int numProcs ) { numProcs_ = numProcs; };
private:
- int numProcs_; // Number of processes in container
+ int numProcs_; // Number of processes in container
sem_t *Mutex;
- bool nodeContainer_; // true when physical node process container
+ bool nodeContainer_; // true when physical node process container
+ bool processNameFormatLong_; // when true process name format is:
+ // '$Zxxxxpppppp' xxxx = nid, pppppp = pid
nameMap_t *nameMap_;
pidMap_t *pidMap_;
CLock pidMapLock_;
diff --git a/core/sqf/monitor/linux/pstartd.cxx b/core/sqf/monitor/linux/pstartd.cxx
index 99343e1..74b35f3 100644
--- a/core/sqf/monitor/linux/pstartd.cxx
+++ b/core/sqf/monitor/linux/pstartd.cxx
@@ -1037,7 +1037,6 @@
case ProcessType_TMID:
case ProcessType_PERSIST:
case ProcessType_SSMP:
- case ProcessType_NameServer:
if ( persistConfig->GetRequiresDTM() && !requiresDTM )
{
if ( tracing )
@@ -1114,6 +1113,7 @@
break;
case ProcessType_DTM:
case ProcessType_PSD:
+ case ProcessType_NameServer:
case ProcessType_Watchdog:
default:
// Skip these, they are managed by DTM Lead and monitor processes
diff --git a/core/sqf/monitor/linux/ptpclient.cxx b/core/sqf/monitor/linux/ptpclient.cxx
index a88e2d2..39e4443 100644
--- a/core/sqf/monitor/linux/ptpclient.cxx
+++ b/core/sqf/monitor/linux/ptpclient.cxx
@@ -57,9 +57,13 @@
extern CNodeContainer *Nodes;
extern bool IsRealCluster;
extern CMeas Meas;
+extern int MyPNID;
+
+#define MON2MON_IO_RETRIES 3
CPtpClient::CPtpClient (void)
- : ptpSock_(0)
+ : ptpCommPort_(0)
+ , ptpClusterSocks_(NULL)
, seqNum_(0)
{
const char method_name[] = "CPtpClient::CPtpClient";
@@ -72,11 +76,10 @@
SetLocalHost();
}
-
- char * p = getenv( "MON2MON_COMM_PORT" );
- if ( p )
+ char * env = getenv( "MON2MON_COMM_PORT" );
+ if ( env )
{
- basePort_ = atoi( p );
+ ptpCommPort_ = atoi( env );
}
else
{
@@ -88,6 +91,12 @@
abort();
}
+ ptpClusterSocks_ = new int[MAX_NODES];
+ for (int i=0; i < MAX_NODES; ++i)
+ {
+ ptpClusterSocks_[i] = -1;
+ }
+
TRACE_EXIT;
}
@@ -96,17 +105,83 @@
const char method_name[] = "CPtpClient::~CPtpClient";
TRACE_ENTRY;
+ delete [] ptpClusterSocks_;
+
TRACE_EXIT;
}
-int CPtpClient::AddUniqStr( int nid
- , int id
- , const char *stringValue
- , int targetNid
- , const char *targetNodeName )
+int CPtpClient::InitializePtpClient( int pnid, char * ptpPort )
{
- const char method_name[] = "CPtpClient::AddUniqStr";
+ const char method_name[] = "CPtpClient::InitializePtpClient";
TRACE_ENTRY;
+ int err = ((pnid >= 0) && (pnid < MAX_NODES)) ? 0 : -1; // pnid indexes ptpClusterSocks_[MAX_NODES]
+ // Connect to ptpPort ("host:port") only when no socket is cached for pnid yet.
+ if ((err == 0) && (ptpClusterSocks_[pnid] == -1))
+ {
+ int sock = Monitor->MkCltSock( ptpPort );
+ if (sock < 0)
+ {
+ err = sock;
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - MkCltSock failed with error %d\n"
+ , method_name, __LINE__, err );
+ }
+ }
+ else
+ {
+ ptpClusterSocks_[pnid] = sock;
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - connected to monitor node=%d(%s), sock=%d, "
+ "ptpClusterSocks_[%d]=%d\n"
+ , method_name, __LINE__
+ , pnid
+ , ptpPort
+ , sock
+ , pnid
+ , ptpClusterSocks_[pnid] );
+ }
+ }
+ }
+ // Returns 0 if a socket is (already) cached for pnid, -1 for an invalid pnid, else the negative MkCltSock() result.
+ TRACE_EXIT;
+ return err;
+}
+
+bool CPtpClient::IsTargetRemote( int targetNid )
+{
+ const char method_name[] = "CPtpClient::IsTargetRemote";
+ TRACE_ENTRY;
+ // Returns false only when targetNid's logical node is on this physical node (MyPNID).
+ CLNode *targetLNode = Nodes->GetLNode( targetNid );
+ CNode *targetNode = (targetLNode != NULL) ? targetLNode->GetNode() : NULL;
+ bool rs = (targetNode && targetNode->GetPNid() == MyPNID) ? false : true ;
+ // An unknown nid (NULL logical or physical node) is reported as remote.
+ TRACE_EXIT;
+ return(rs);
+}
+
+int CPtpClient::ProcessAddUniqStr( int nid
+ , int id
+ , const char *stringValue
+ , int targetNid
+ , const char *targetNodeName )
+{
+ const char method_name[] = "CPtpClient::ProcessAddUniqStr";
+ TRACE_ENTRY;
+
+ if (!IsTargetRemote( targetNid ))
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Not Sending InternalType_UniqStr request to "
+ "local nid=%d\n"
+ , method_name, __LINE__
+ , targetNid );
+ }
+ return(0);
+ }
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
@@ -129,59 +204,32 @@
// Copy the string
memcpy( stringData, stringValue, stringDataLen );
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.uniqstr);
- size += stringDataLen;
-
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.uniqstr);
+ myInfo.size += stringDataLen;
+
if (trace_settings & TRACE_PROCESS_DETAIL)
{
trace_printf( "%s@%d - size_=%d, forwarding unique string [%d, %d] (%s)\n"
, method_name, __LINE__
- , size
+ , myInfo.size
, msg.u.uniqstr.nid
, msg.u.uniqstr.id
, &msg.u.uniqstr.valueData );
}
- int error = SendToMon("add-unique-string", &msg, size, targetNid, targetNodeName);
+ int error = SendToMon( "process-add-unique-string"
+ , &msg
+ , myInfo
+ , targetNid
+ , targetNodeName);
TRACE_EXIT;
return error;
}
-int CPtpClient::InitializePtpClient( char * ptpPort )
-{
- const char method_name[] = "CPtpClient::InitializePtpClient";
- TRACE_ENTRY;
- int err = 0;
-
- int sock = Monitor->MkCltSock( ptpPort );
- if (sock < 0)
- {
- err = sock;
-
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d - MkCltSock failed with error %d\n"
- , method_name, __LINE__, err );
- }
- }
- else
- {
- ptpSock_ = sock;
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d - connected to monitor node=%s, sock=%d\n"
- , method_name, __LINE__
- , ptpPort
- , ptpSock_ );
- }
- }
-
- TRACE_EXIT;
- return err;
-}
-
int CPtpClient::ProcessClone( CProcess *process )
{
const char method_name[] = "CPtpClient::ProcessClone";
@@ -209,6 +257,18 @@
return(0);
}
+ if (!IsTargetRemote( process->GetParentNid() ))
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Not Sending InternalType_Clone request to "
+ "local nid=%d\n"
+ , method_name, __LINE__
+ , process->GetParentNid() );
+ }
+ return(0);
+ }
+
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d - Sending InternalType_Clone request to %s, parentNid=%d"
@@ -281,13 +341,15 @@
msg.u.clone.argvLen = argvLen;
memcpy( stringData, process->userArgv(), argvLen );
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.clone);
- size += nameLen ;
- size += portLen ;
- size += infileLen ;
- size += outfileLen ;
- size += argvLen ;
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.clone);
+ myInfo.size += nameLen ;
+ myInfo.size += portLen ;
+ myInfo.size += infileLen ;
+ myInfo.size += outfileLen ;
+ myInfo.size += argvLen ;
if (trace_settings & TRACE_PROCESS_DETAIL)
{
@@ -299,7 +361,7 @@
"outfile=%s, strlen(outfile)=%d, "
"argc=%d, strlen(total argv)=%d, args=[%.*s]\n"
, method_name, __LINE__
- , size
+ , myInfo.size
, msg.u.clone.programStrId.nid
, msg.u.clone.programStrId.id
, msg.u.clone.pathStrId.nid
@@ -322,7 +384,7 @@
int error = SendToMon( "process-clone"
, &msg
- , size
+ , myInfo
, process->GetParentNid()
, parentLNode->GetNode()->GetName());
@@ -337,6 +399,18 @@
const char method_name[] = "CPtpClient::ProcessExit";
TRACE_ENTRY;
+ if (!IsTargetRemote( targetNid ))
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Not Sending InternalType_Exit request to "
+ "local nid=%d\n"
+ , method_name, __LINE__
+ , targetNid );
+ }
+ return(0);
+ }
+
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d - Sending InternalType_Exit request to %s, targetNid=%d"
@@ -359,15 +433,17 @@
strcpy(msg.u.exit.name, process->GetName());
msg.u.exit.abended = process->IsAbended();
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.exit);
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.exit);
if (trace_settings & TRACE_PROCESS_DETAIL)
{
trace_printf( "%s@%d - size_=%d, process %s (%d,%d:%d) "
"abended=%d\n"
, method_name, __LINE__
- , size
+ , myInfo.size
, msg.u.exit.name
, msg.u.exit.nid
, msg.u.exit.pid
@@ -375,7 +451,11 @@
, msg.u.exit.abended );
}
- int error = SendToMon("process-exit", &msg, size, targetNid, targetNodeName);
+ int error = SendToMon( "process-exit"
+ , &msg
+ , myInfo
+ , targetNid
+ , targetNodeName);
TRACE_EXIT;
return error;
@@ -411,6 +491,18 @@
return(0);
}
+ if (!IsTargetRemote( process->GetParentNid() ))
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Not Sending InternalType_ProcessInit request to "
+ "local nid=%d\n"
+ , method_name, __LINE__
+ , process->GetParentNid() );
+ }
+ return(0);
+ }
+
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d" " - Sending InternalType_ProcessInit to parent node %s, parentNid=%d"
@@ -438,12 +530,14 @@
msg.u.processInit.tag = tag;
msg.u.processInit.origNid = process->GetParentNid();
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.processInit);
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.processInit);
int error = SendToMon( "process-init"
, &msg
- , size
+ , myInfo
, parentNid
, parentLNode->GetNode()->GetName() );
@@ -460,6 +554,18 @@
const char method_name[] = "CPtpClient::ProcessKill";
TRACE_ENTRY;
+ if (!IsTargetRemote( targetNid ))
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Not Sending InternalType_Kill request to "
+ "local nid=%d\n"
+ , method_name, __LINE__
+ , targetNid );
+ }
+ return(0);
+ }
+
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d - Sending InternalType_Kill request to %s, targetNid=%d"
@@ -480,22 +586,28 @@
msg.u.kill.verifier = process->GetVerifier();
msg.u.kill.persistent_abort = abort;
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.exit);
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.kill); // size the kill payload that was filled in, not msg.u.exit
if (trace_settings & TRACE_PROCESS_DETAIL)
{
trace_printf( "%s@%d - size_=%d, process (%d,%d:%d) "
"persistent_abort=%d\n"
, method_name, __LINE__
- , size
+ , myInfo.size
, msg.u.kill.nid
, msg.u.kill.pid
, msg.u.kill.verifier
, msg.u.kill.persistent_abort );
}
- int error = SendToMon("process-kill", &msg, size, targetNid, targetNodeName);
+ int error = SendToMon( "process-kill"
+ , &msg
+ , myInfo
+ , targetNid
+ , targetNodeName);
TRACE_EXIT;
return error;
@@ -508,6 +620,18 @@
const char method_name[] = "CPtpClient::ProcessNew";
TRACE_ENTRY;
+ if (!IsTargetRemote( targetNid ))
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Not Sending InternalType_Process request to "
+ "local nid=%d\n"
+ , method_name, __LINE__
+ , targetNid );
+ }
+ return(0);
+ }
+
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d - Sending InternalType_Process request to %s, targetNid=%d"
@@ -567,12 +691,14 @@
msg.u.process.argvLen = argvLen;
memcpy( stringData, process->userArgv(), argvLen );
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.process);
- size += nameLen ;
- size += infileLen ;
- size += outfileLen ;
- size += argvLen ;
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.process);
+ myInfo.size += nameLen ;
+ myInfo.size += infileLen ;
+ myInfo.size += outfileLen ;
+ myInfo.size += argvLen ;
if (trace_settings & TRACE_PROCESS_DETAIL)
{
@@ -583,7 +709,7 @@
"outfile=%s, strlen(outfile)=%d, "
"argc=%d, strlen(total argv)=%d, args=[%.*s]\n"
, method_name, __LINE__
- , size
+ , myInfo.size
, msg.u.process.programStrId.nid
, msg.u.process.programStrId.id
, msg.u.process.pathStrId.nid
@@ -602,7 +728,11 @@
, &msg.u.process.stringData+nameLen+infileLen+outfileLen);
}
- int error = SendToMon("process-new", &msg, size, targetNid, targetNodeName);
+ int error = SendToMon( "process-new"
+ , &msg
+ , myInfo
+ , targetNid
+ , targetNodeName);
TRACE_EXIT;
return error;
@@ -620,6 +750,18 @@
const char method_name[] = "CPtpClient::ProcessNotify";
TRACE_ENTRY;
+ if (!IsTargetRemote( targetNid ))
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Not Sending InternalType_Notify request to "
+ "local nid=%d\n"
+ , method_name, __LINE__
+ , targetNid );
+ }
+ return(0);
+ }
+
if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
{
trace_printf( "%s@%d - Sending InternalType_Notify request to %s"
@@ -682,18 +824,342 @@
}
}
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.notify);
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.notify);
- int error = SendToMon("process-notify", &msg, size, targetNid, targetNodeName);
+ int error = SendToMon( "process-notify"
+ , &msg
+ , myInfo
+ , targetNid
+ , targetNodeName);
TRACE_EXIT;
return error;
}
-int CPtpClient::ReceiveSock(char *buf, int size, int sockFd)
+int CPtpClient::ProcessStdInReq( int nid
+ , int pid
+ , StdinReqType type
+ , int supplierNid
+ , int supplierPid )
{
- const char method_name[] = "CPtpClient::ReceiveSock";
+ const char method_name[] = "CPtpClient::ProcessStdInReq";
+ TRACE_ENTRY;
+ // Send an InternalType_StdinReq on behalf of requester (nid, pid) to the monitor hosting supplier (supplierNid, supplierPid).
+ if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Sending InternalType_StdinReq request type =%d "
+ "from (%d,%d), for supplier (%d,%d)\n"
+ , method_name, __LINE__
+ , type
+ , nid
+ , pid
+ , supplierNid
+ , supplierPid );
+ }
+ // The supplier's logical node and process must be found; otherwise log an error and return -1.
+ CLNode *lnode = Nodes->GetLNode( supplierNid );
+ if (lnode == NULL)
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Can't find supplier node nid=%d "
+ "for stdin data request.\n"
+ , method_name
+ , supplierNid );
+ mon_log_write(PTPCLIENT_STDINREQ_1, SQ_LOG_ERR, buf);
+
+ TRACE_EXIT;
+ return -1;
+ }
+
+ CProcess *process = lnode->GetProcessL( supplierPid );
+ if (process == NULL)
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Can't find process nid=%d, "
+ "pid=%d for stdin data request.\n"
+ , method_name
+ , supplierNid
+ , supplierPid );
+ mon_log_write(PTPCLIENT_STDINREQ_2, SQ_LOG_ERR, buf);
+
+ TRACE_EXIT;
+ return -1;
+ }
+ // Fixed-size request: only msg.u.stdin_req follows the message header.
+ struct internal_msg_def msg;
+ memset(&msg, 0, sizeof(msg));
+ msg.type = InternalType_StdinReq;
+ msg.u.stdin_req.nid = nid;
+ msg.u.stdin_req.pid = pid;
+ msg.u.stdin_req.reqType = type;
+ msg.u.stdin_req.supplier_nid = supplierNid;
+ msg.u.stdin_req.supplier_pid = supplierPid;
+ // myInfo (sender pnid + byte count) is the header SendToMon() transmits ahead of msg.
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.stdin_req);
+
+ if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
+ {
+ trace_printf( "%s@%d - size_=%d, type =%d "
+ "from (%d,%d), for supplier (%d,%d)\n"
+ , method_name, __LINE__
+ , myInfo.size
+ , msg.u.stdin_req.reqType
+ , msg.u.stdin_req.nid
+ , msg.u.stdin_req.pid
+ , msg.u.stdin_req.supplier_nid
+ , msg.u.stdin_req.supplier_pid );
+ }
+ // NOTE(review): unlike ProcessExit/ProcessKill/ProcessNotify, no IsTargetRemote() check is made here; confirm a local supplier is meant to go through SendToMon().
+ int error = SendToMon( "process-stdin"
+ , &msg
+ , myInfo
+ , process->GetNid()
+ , lnode->GetNode()->GetName());
+ // Returns 0 on success, -1 if the supplier node/process is unknown, else the SendToMon() error.
+ TRACE_EXIT;
+ return error;
+}
+
+int CPtpClient::ProcessStdIoData( int nid
+ , int pid
+ , StdIoType type
+ , ssize_t count
+ , char *data )
+{
+ const char method_name[] = "CPtpClient::ProcessStdIoData";
+ TRACE_ENTRY;
+ // Forward 'count' bytes of redirected stdio data to the monitor hosting
+ // process (nid, pid). Returns 0 on success, -1 on a lookup or count error,
+ // otherwise the SendToMon() error.
+ if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - Sending InternalType_IoData request type =%d "
+ "to (%d,%d), count=%ld\n"
+ , method_name, __LINE__
+ , type
+ , nid
+ , pid
+ , count );
+ }
+
+ CLNode *lnode = Nodes->GetLNode( nid );
+ if (lnode == NULL)
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Can't find target node nid=%d "
+ "for stdio data request.\n"
+ , method_name
+ , nid );
+ mon_log_write(PTPCLIENT_STDIODATA_1, SQ_LOG_ERR, buf);
+ TRACE_EXIT;
+ return -1;
+ }
+
+ CProcess *process = lnode->GetProcessL( pid );
+ if (process == NULL)
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Can't find process nid=%d, "
+ "pid=%d for stdio data request.\n"
+ , method_name
+ , nid
+ , pid );
+ mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf);
+ TRACE_EXIT;
+ return -1;
+ }
+
+ struct internal_msg_def msg;
+ // The payload is copied into the fixed-size msg.u.iodata.data buffer:
+ // reject negative or oversized counts instead of overrunning 'msg'.
+ if ((count < 0) || ((size_t) count > sizeof(msg.u.iodata.data)))
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], Invalid stdio data count=%ld for process (%d,%d).\n"
+ , method_name, (long) count, nid, pid );
+ mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf);
+ TRACE_EXIT;
+ return -1;
+ }
+ memset(&msg, 0, sizeof(msg));
+ msg.type = InternalType_IoData;
+ msg.u.iodata.nid = nid ;
+ msg.u.iodata.pid = pid ;
+ msg.u.iodata.ioType = type ;
+ msg.u.iodata.length = count;
+ memcpy(&msg.u.iodata.data, data, count);
+ ptpMsgInfo_t myInfo;
+ myInfo.pnid = MyPNID;
+ myInfo.size = offsetof(struct internal_msg_def, u);
+ myInfo.size += sizeof(msg.u.iodata);
+ if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
+ { // %.*s: data is not NUL-terminated when count fills the whole buffer
+ trace_printf( "%s@%d - size_=%d, type =%d to (%d,%d), count=%d\n(%.*s)"
+ , method_name, __LINE__, myInfo.size, msg.u.iodata.ioType
+ , msg.u.iodata.nid, msg.u.iodata.pid, msg.u.iodata.length
+ , (int) msg.u.iodata.length, msg.u.iodata.data );
+ }
+ int error = SendToMon( "process-stdio-data", &msg, myInfo
+ , process->GetNid(), lnode->GetNode()->GetName());
+ TRACE_EXIT;
+ return error;
+}
+
+int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg
+ , ptpMsgInfo_t &myInfo
+ , int targetNid, const char *hostName)
+{
+ const char method_name[] = "CPtpClient::SendToMon";
+ TRACE_ENTRY;
+ // Send the myInfo header (sender pnid + size), then myInfo.size bytes of msg,
+ // to the monitor on targetNid's physical node over the socket cached in
+ // ptpClusterSocks_[pnid], reconnecting up to MON2MON_IO_RETRIES times.
+ // Returns 0 on success, -1 for an unknown target, else the connect/send error.
+ char ptpHost[MAX_PROCESSOR_NAME];
+ char ptpPort[MAX_PROCESSOR_NAME];
+ int error = 0;
+ int tempPort = ptpCommPort_;
+ int pnid = 0;
+ int sendSock = -1;
+ int retryCount = 0;
+ CLNode *lnode = Nodes->GetLNode( targetNid );
+ CNode *node = (lnode != NULL) ? lnode->GetNode() : NULL;
+
+ // Validate the target before dereferencing it or indexing ptpClusterSocks_
+ if ((node == NULL) || (node->GetPNid() < 0) || (node->GetPNid() >= MAX_NODES))
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], unable to send %s request, invalid target nid=%d\n"
+ , method_name, reqType, targetNid );
+ mon_log_write(PTPCLIENT_SENDTOMON_1, SQ_LOG_ERR, buf);
+ TRACE_EXIT;
+ return -1;
+ }
+ pnid = node->GetPNid();
+
+ // For virtual env: target is the local host at port ptpCommPort_ + targetNid
+ if (!IsRealCluster)
+ {
+ tempPort += targetNid;
+ }
+ snprintf( ptpHost, sizeof(ptpHost), "%s", IsRealCluster ? hostName : ptpHost_ );
+
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - reqType=%s, hostName=%s, targetNid=%d, "
+ "ptpHost=%s, tempPort=%d, ptpCommPort_=%d\n"
+ , method_name, __LINE__, reqType, hostName
+ , targetNid, ptpHost, tempPort, ptpCommPort_ );
+ }
+
+ // Bounded formatting: sprintf() could overrun ptpPort for a long host name
+ snprintf( ptpPortBase_, sizeof(ptpPortBase_), "%s:", ptpHost );
+ snprintf( ptpPort, sizeof(ptpPort), "%s%d", ptpPortBase_, tempPort );
+
+retryIO:
+ if (ptpClusterSocks_[pnid] == -1)
+ {
+ error = InitializePtpClient( pnid, ptpPort );
+ if (error < 0)
+ {
+ ptpClusterSocks_[pnid] = -1;
+ TRACE_EXIT;
+ return error;
+ }
+ }
+
+ sendSock = ptpClusterSocks_[pnid];
+
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - sending %s REQ to Monitor=%s, sock=%d\n"
+ , method_name, __LINE__
+ , reqType
+ , ptpPort
+ , sendSock );
+ }
+
+ error = SockSend((char *) &myInfo, sizeof(ptpMsgInfo_t), sendSock);
+ if (error)
+ {
+ int err = error;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], unable to send %s request size %zu to "
+ "node %s, error: %d(%s)\n"
+ , method_name, reqType, sizeof(ptpMsgInfo_t), ptpHost, err, strerror(err) );
+ mon_log_write(PTPCLIENT_SENDTOMON_1, SQ_LOG_ERR, buf);
+ }
+ else
+ {
+ error = SockSend((char *) msg, myInfo.size, sendSock);
+ if (error)
+ {
+ int err = error;
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], unable to send %s request to "
+ "node %s, error: %d(%s)\n"
+ , method_name, reqType, ptpHost, err, strerror(err) );
+ mon_log_write(PTPCLIENT_SENDTOMON_2, SQ_LOG_ERR, buf);
+ }
+ }
+
+ if (error)
+ {
+ SockClose( pnid ); // drop the failed connection so a retry reconnects
+ if ( retryCount < MON2MON_IO_RETRIES )
+ {
+ retryCount++;
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d - retrying IO (%d) to node %s\n"
+ , method_name, __LINE__
+ , retryCount
+ , ptpHost );
+ }
+ goto retryIO;
+ }
+ }
+
+ TRACE_EXIT;
+ return error;
+}
+
+void CPtpClient::SockClose( int pnid )
+{
+ const char method_name[] = "CPtpClient::SockClose";
+ TRACE_ENTRY;
+ // Close the cached socket for physical node pnid (if open) and mark it unused.
+ int &cachedSock = ptpClusterSocks_[pnid];
+ if (cachedSock != -1)
+ {
+ close( cachedSock );
+ cachedSock = -1; // the next SendToMon() to this pnid will reconnect
+ }
+ TRACE_EXIT;
+}
+
+void CPtpClient::SetLocalHost( void )
+{
+ gethostname( ptpHost_, sizeof(ptpHost_) ); // cache this host's name in ptpHost_
+ ptpHost_[sizeof(ptpHost_) - 1] = '\0'; // POSIX leaves NUL-termination unspecified on truncation
+}
+int CPtpClient::SockReceive(char *buf, int size, int sockFd)
+{
+ const char method_name[] = "CPtpClient::SockReceive";
TRACE_ENTRY;
bool readAgain = false;
@@ -764,14 +1230,9 @@
return error;
}
-void CPtpClient::SetLocalHost( void )
+int CPtpClient::SockSend(char *buf, int size, int sockFd)
{
- gethostname( ptpHost_, MAX_PROCESSOR_NAME );
-}
-
-int CPtpClient::SendSock(char *buf, int size, int sockFd)
-{
- const char method_name[] = "CPtpClient::SendSock";
+ const char method_name[] = "CPtpClient::SockSend";
TRACE_ENTRY;
bool sendAgain = false;
@@ -833,270 +1294,3 @@
return error;
}
-int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg, int size,
- int receiveNode, const char *hostName)
-{
- const char method_name[] = "CPtpClient::SendToMon";
- TRACE_ENTRY;
-
- char monPortString[MAX_PROCESSOR_NAME];
- char ptpHost[MAX_PROCESSOR_NAME];
- char ptpPort[MAX_PROCESSOR_NAME];
- int tempPort = basePort_;
-
- ptpHost[0] = '\0';
-
- // For virtual env
- if (!IsRealCluster)
- {
- tempPort += receiveNode;
- strcat( ptpHost, ptpHost_ );
- }
- else
- {
- strcat( ptpHost, hostName );
- }
-
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d - reqType=%s, hostName=%s, receiveNode=%d, "
- "ptpHost=%s, tempPort=%d, basePort_=%d\n"
- , method_name, __LINE__
- , reqType
- , hostName
- , receiveNode
- , ptpHost
- , tempPort
- , basePort_ );
- }
-
- memset( &ptpPort, 0, MAX_PROCESSOR_NAME );
- memset( &ptpPortBase_, 0, MAX_PROCESSOR_NAME+100 );
-
- strcat( ptpPortBase_, ptpHost );
- strcat( ptpPortBase_, ":" );
- sprintf( monPortString,"%d", tempPort );
- strcat( ptpPort, ptpPortBase_ );
- strcat( ptpPort, monPortString );
-
- int error = InitializePtpClient( ptpPort );
- if (error < 0)
- {
- TRACE_EXIT;
- return error;
- }
-
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d - sending %s REQ to Monitor=%s, sock=%d\n"
- , method_name, __LINE__
- , reqType
- , ptpPort
- , ptpSock_);
- }
-
- error = SendSock((char *) &size, sizeof(size), ptpSock_);
- if (error)
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d - error sending to Monitor=%s, sock=%d, error=%d\n"
- , method_name, __LINE__
- , ptpPort
- , ptpSock_
- , error );
- }
- }
-
- error = SendSock((char *) msg, size, ptpSock_);
- if (error)
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d - error sending to nameserver=%s, sock=%d, error=%d\n"
- , method_name, __LINE__
- , ptpPort
- , ptpSock_
- , error );
- }
- }
-
- close( ptpSock_ );
-
- TRACE_EXIT;
- return error;
-}
-
-int CPtpClient::StdInReq( int nid
- , int pid
- , StdinReqType type
- , int supplierNid
- , int supplierPid )
-{
- const char method_name[] = "CPtpClient::StdInReq";
- TRACE_ENTRY;
-
- if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
- {
- trace_printf( "%s@%d - Sending InternalType_StdinReq request type =%d "
- "from (%d,%d), for supplier (%d,%d)\n"
- , method_name, __LINE__
- , type
- , nid
- , pid
- , supplierNid
- , supplierPid );
- }
-
- CLNode *lnode = Nodes->GetLNode( supplierNid );
- if (lnode == NULL)
- {
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s], Can't find supplier node nid=%d "
- "for stdin data request.\n"
- , method_name
- , supplierNid );
- mon_log_write(PTPCLIENT_STDINREQ_1, SQ_LOG_ERR, buf);
-
- TRACE_EXIT;
- return -1;
- }
-
- CProcess *process = lnode->GetProcessL( supplierPid );
- if (process == NULL)
- {
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s], Can't find process nid=%d, "
- "pid=%d for stdin data request.\n"
- , method_name
- , supplierNid
- , supplierPid );
- mon_log_write(PTPCLIENT_STDINREQ_2, SQ_LOG_ERR, buf);
-
- TRACE_EXIT;
- return -1;
- }
-
- struct internal_msg_def msg;
- memset(&msg, 0, sizeof(msg));
- msg.type = InternalType_StdinReq;
- msg.u.stdin_req.nid = nid;
- msg.u.stdin_req.pid = pid;
- msg.u.stdin_req.reqType = type;
- msg.u.stdin_req.supplier_nid = supplierNid;
- msg.u.stdin_req.supplier_pid = supplierPid;
-
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.stdin_req);
-
- if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
- {
- trace_printf( "%s@%d - size_=%d, type =%d "
- "from (%d,%d), for supplier (%d,%d)\n"
- , method_name, __LINE__
- , size
- , msg.u.stdin_req.reqType
- , msg.u.stdin_req.nid
- , msg.u.stdin_req.pid
- , msg.u.stdin_req.supplier_nid
- , msg.u.stdin_req.supplier_pid );
- }
-
- int error = SendToMon("stdin"
- , &msg
- , size
- , process->GetNid()
- , lnode->GetNode()->GetName());
-
- TRACE_EXIT;
- return error;
-}
-
-int CPtpClient::StdIoData( int nid
- , int pid
- , StdIoType type
- , ssize_t count
- , char *data )
-{
- const char method_name[] = "CPtpClient::StdIoData";
- TRACE_ENTRY;
-
- if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
- {
- trace_printf( "%s@%d - Sending InternalType_IoData request type =%d "
- "to (%d,%d), count=%ld\n"
- , method_name, __LINE__
- , type
- , nid
- , pid
- , count );
- }
-
- CLNode *lnode = Nodes->GetLNode( nid );
- if (lnode == NULL)
- {
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s], Can't find supplier node nid=%d "
- "for stdin data request.\n"
- , method_name
- , nid );
- mon_log_write(PTPCLIENT_STDIODATA_1, SQ_LOG_ERR, buf);
-
- TRACE_EXIT;
- return -1;
- }
-
- CProcess *process = lnode->GetProcessL( pid );
- if (process == NULL)
- {
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s], Can't find process nid=%d, "
- "pid=%d for stdin data request.\n"
- , method_name
- , nid
- , pid );
- mon_log_write(PTPCLIENT_STDIODATA_2, SQ_LOG_ERR, buf);
-
- TRACE_EXIT;
- return -1;
- }
-
- struct internal_msg_def msg;
- memset(&msg, 0, sizeof(msg));
- msg.type = InternalType_IoData;
- msg.u.iodata.nid = nid ;
- msg.u.iodata.pid = pid ;
- msg.u.iodata.ioType = type ;
- msg.u.iodata.length = count;
- memcpy(&msg.u.iodata.data, data, count);
-
- int size = offsetof(struct internal_msg_def, u);
- size += sizeof(msg.u.iodata);
-
- if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS_DETAIL))
- {
- trace_printf( "%s@%d - size_=%d, type =%d "
- "to (%d,%d), count=%d\n(%s)"
- , method_name, __LINE__
- , size
- , msg.u.iodata.ioType
- , msg.u.iodata.nid
- , msg.u.iodata.pid
- , msg.u.iodata.length
- , msg.u.iodata.length?msg.u.iodata.data:"\n" );
- }
-
- int error = SendToMon("stdio-data"
- , &msg
- , size
- , process->GetNid()
- , lnode->GetNode()->GetName());
-
- TRACE_EXIT;
- return error;
-}
-
diff --git a/core/sqf/monitor/linux/ptpclient.h b/core/sqf/monitor/linux/ptpclient.h
index e6ddeb4..5239c78 100644
--- a/core/sqf/monitor/linux/ptpclient.h
+++ b/core/sqf/monitor/linux/ptpclient.h
@@ -40,58 +40,66 @@
CPtpClient( void );
virtual ~CPtpClient( void );
- int AddUniqStr( int nid
- , int id
- , const char *stringValue
- , int targetNid
- , const char *targetNodeName );
- int InitializePtpClient( char * ptpPort );
- int ProcessClone( CProcess *process );
+ int InitializePtpClient( int pnid, char* ptpPort );
+ int ProcessAddUniqStr( int nid
+ , int id
+ , const char* stringValue
+ , int targetNid
+ , const char* targetNodeName );
+ int ProcessClone( CProcess* process );
int ProcessExit( CProcess* process
, int parentNid
- , const char *targetNodeName );
- int ProcessInit( CProcess *process
- , void *tag
+ , const char* targetNodeName );
+ int ProcessInit( CProcess* process
+ , void* tag
, int result
, int parentNid );
int ProcessKill( CProcess* process
, bool abort
, int targetNid
- , const char *targetNodeName );
+ , const char* targetNodeName );
int ProcessNew( CProcess* process
, int targetNid
- , const char *targetNodeName );
+ , const char* targetNodeName );
int ProcessNotify( int nid
, int pid
, Verifier_t verifier
, _TM_Txid_External transId
, bool canceled
- , CProcess *targetProcess
+ , CProcess* targetProcess
, int targetNid
- , const char *targetNodeName );
- int StdInReq( int nid
- , int pid
- , StdinReqType type
- , int supplierNid
- , int supplierPid );
- int StdIoData( int nid
- , int pid
- , StdIoType type
- , ssize_t count
- , char *data );
+ , const char* targetNodeName );
+ int ProcessStdInReq( int nid
+ , int pid
+ , StdinReqType type
+ , int supplierNid
+ , int supplierPid );
+ int ProcessStdIoData( int nid
+ , int pid
+ , StdIoType type
+ , ssize_t count
+ , char* data );
private:
- int basePort_;
+ int ptpCommPort_;
char ptpHost_[MAX_PROCESSOR_NAME];
char ptpPortBase_[MAX_PROCESSOR_NAME+100];
- int ptpSock_;
+ int *ptpClusterSocks_;
int seqNum_;
- int ReceiveSock(char *buf, int size, int sockFd);
- int SendSock(char *buf, int size, int sockFd);
- int SendToMon(const char *reqType, internal_msg_def *msg, int size, int receiveNode, const char *hostName);
+ bool IsTargetRemote( int targetNid );
+ int SendToMon( const char* reqType
+ , internal_msg_def* msg
+ , ptpMsgInfo_t &myInfo
+ , int receiveNode
+ , const char* hostName);
void SetLocalHost( void );
+ void SockClose( int pnid );
+ int SockReceive(char* buf, int size, int sockFd);
+ int SockSend( char* buf
+ , int size
+ , int sockFd);
};
#endif
diff --git a/core/sqf/monitor/linux/ptpcommaccept.cxx b/core/sqf/monitor/linux/ptpcommaccept.cxx
index d380d3a..15933dd 100644
--- a/core/sqf/monitor/linux/ptpcommaccept.cxx
+++ b/core/sqf/monitor/linux/ptpcommaccept.cxx
@@ -47,6 +47,7 @@
extern const char *StateString( STATE state);
extern CommType_t CommType;
+static void *ptpProcess( void *arg );
CPtpCommAccept::CPtpCommAccept()
: accepting_(true)
@@ -71,156 +72,206 @@
{
const char method_name[] = "CPtpCommAccept::processNewSock";
TRACE_ENTRY;
-
- struct internal_msg_def msg;
- int rc;
-
- mem_log_write(CMonLog::MON_CONNTONEWMON_2);
- int size;
- rc = Monitor->ReceiveSock( (char *) &size, sizeof(size), sockFd, method_name );
- if ( rc )
- { // Handle error
- close( sockFd );
- char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
- "monitor: %s.\n", method_name, ErrorMsg(rc));
- mon_log_write(PTP_COMMACCEPT_1, SQ_LOG_ERR, buf);
- return;
- }
- // Get info about connecting monitor
- rc = Monitor->ReceiveSock( (char *) &msg
- , size
- , sockFd
- , method_name );
-
- if ( rc )
- { // Handle error
- close( sockFd );
- char buf[MON_STRING_BUF_SIZE];
- snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
- "monitor: %s.\n", method_name, ErrorMsg(rc));
- mon_log_write(PTP_COMMACCEPT_2, SQ_LOG_ERR, buf);
- return;
- }
- else
+ int rc;
+
+ mem_log_write(CMonLog::MON_CONNTONEWMON_1);
+
+ // need to create context in case back-to-back accept is too fast
+ Context *ctx = new Context();
+ ctx->this_ = this;
+ ctx->pendingFd_ = sockFd;
+ rc = pthread_create(&process_thread_id_, NULL, ptpProcess, ctx);
+ if (rc != 0)
{
- switch ( msg.type )
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s], ptpProcess thread create error=%d\n", method_name, rc);
+ mon_log_write(PTP_COMMACCEPT_1, SQ_LOG_ERR, buf);
+ delete ctx; // ptpProcess never started, so it cannot free the context
+ close( sockFd ); // nor close the socket; the accept loop no longer closes it
+ }
+ TRACE_EXIT;
+}
+
+void CPtpCommAccept::processMonReqs( int sockFd )
+{
+ const char method_name[] = "CPtpCommAccept::processMonReqs";
+ TRACE_ENTRY;
+
+ int rc;
+ struct internal_msg_def msg;
+
+ while ( true )
+ {
+ mem_log_write(CMonLog::MON_CONNTONEWMON_2);
+ ptpMsgInfo_t remoteInfo;
+
+ // Get message size and sender pnid from the remote monitor
+ rc = Monitor->ReceiveSock( (char *) &remoteInfo
+ , sizeof(ptpMsgInfo_t)
+ , sockFd
+ , method_name );
+ if ( rc )
+ { // Handle error: leave the loop so the socket is closed below
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s], unable to obtain message size and pnid "
+ "from remote monitor: %s.\n", method_name, ErrorMsg(rc));
+ mon_log_write(PTP_COMMACCEPT_2, SQ_LOG_ERR, buf);
+ break;
+ }
+
+ // Get the message body; a peer-supplied size must fit in msg
+ rc = ( remoteInfo.size <= 0 || remoteInfo.size > (int) sizeof(msg) )
+ ? EMSGSIZE
+ : Monitor->ReceiveSock( (char *) &msg, remoteInfo.size
+ , sockFd, method_name );
+ if ( rc )
+ { // Handle error: leave the loop so the socket is closed below
+ char buf[MON_STRING_BUF_SIZE];
+ CNode *node = Nodes->GetNode(remoteInfo.pnid);
+ snprintf( buf, sizeof(buf)
+ , "[%s], unable to obtain message size (%d) from remote "
+ "monitor %d(%s), error: %s.\n"
+ , method_name
+ , remoteInfo.size
+ , remoteInfo.pnid
+ , node ? node->GetName() : ""
+ , ErrorMsg(rc));
+ mon_log_write(PTP_COMMACCEPT_3, SQ_LOG_ERR, buf);
+ break;
+ }
+ else
{
- case InternalType_UniqStr:
+ switch ( msg.type )
{
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ case InternalType_UniqStr:
{
- trace_printf( "%s@%d" " - Received InternalType_UniqStr\n"
- , method_name, __LINE__ );
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_UniqStr\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueUniqStrReq( &msg.u.uniqstr);
+ break;
}
- ReqQueue.enqueueUniqStrReq( &msg.u.uniqstr);
- break;
- }
- case InternalType_Process:
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ case InternalType_Process:
{
- trace_printf( "%s@%d" " - Received InternalType_Process\n"
- , method_name, __LINE__ );
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_Process\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueNewProcReq( &msg.u.process);
+ break;
}
- ReqQueue.enqueueNewProcReq( &msg.u.process);
- break;
- }
- case InternalType_ProcessInit:
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ case InternalType_ProcessInit:
{
- trace_printf( "%s@%d" " - Received InternalType_ProcessInit\n"
- , method_name, __LINE__ );
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_ProcessInit\n"
+ , method_name, __LINE__ );
+ }
+ if ( MyNode->IsMyNode(msg.u.processInit.origNid) )
+ { // New process request originated on this node
+ ReqQueue.enqueueProcInitReq( &msg.u.processInit);
+ }
+ else
+ {
+ abort();
+ }
+ break;
}
- if ( MyNode->IsMyNode(msg.u.processInit.origNid) )
- { // New process request originated on this node
- ReqQueue.enqueueProcInitReq( &msg.u.processInit);
- }
- else
+ case InternalType_Clone:
{
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_Clone\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueCloneReq( &msg.u.clone );
+ break;
+ }
+ case InternalType_Open:
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_Open\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueOpenReq( &msg.u.open );
+ break;
+ }
+ case InternalType_Notify:
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_Notify\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueNotifyReq( &msg.u.notify );
+ break;
+ }
+ case InternalType_Exit:
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_Exit\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueExitReq( &msg.u.exit );
+ break;
+ }
+ case InternalType_Kill:
+ {
+ if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_Kill\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueKillReq( &msg.u.kill );
+ break;
+ }
+ case InternalType_IoData:
+ {
+ if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_IoData\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueIoDataReq( &msg.u.iodata );
+ break;
+ }
+ case InternalType_StdinReq:
+ {
+ if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
+ {
+ trace_printf( "%s@%d" " - Received InternalType_StdinReq\n"
+ , method_name, __LINE__ );
+ }
+ ReqQueue.enqueueStdInReq( &msg.u.stdin_req );
+ break;
+ }
+ default:
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ CNode *node = Nodes->GetNode(remoteInfo.pnid);
+ snprintf( buf, sizeof(buf)
+ , "[%s], Invalid msg.type: %d, msg size=%d, "
+ "remote monitor %d(%s)\n"
+ , method_name
+ , msg.type
+ , remoteInfo.size
+ , remoteInfo.pnid
+ , node ? node->GetName() : "" );
+ mon_log_write(PTP_COMMACCEPT_4, SQ_LOG_ERR, buf);
abort();
}
- break;
- }
- case InternalType_Clone:
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d" " - Received InternalType_Clone\n"
- , method_name, __LINE__ );
- }
- ReqQueue.enqueueCloneReq( &msg.u.clone );
- break;
- }
- case InternalType_Open:
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d" " - Received InternalType_Open\n"
- , method_name, __LINE__ );
- }
- ReqQueue.enqueueOpenReq( &msg.u.open );
- break;
- }
- case InternalType_Notify:
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d" " - Received InternalType_Notify\n"
- , method_name, __LINE__ );
- }
- ReqQueue.enqueueNotifyReq( &msg.u.notify );
- break;
- }
- case InternalType_Exit:
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d" " - Received InternalType_Exit\n"
- , method_name, __LINE__ );
- }
- ReqQueue.enqueueExitReq( &msg.u.exit );
- break;
- }
- case InternalType_Kill:
- {
- if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
- {
- trace_printf( "%s@%d" " - Received InternalType_Kill\n"
- , method_name, __LINE__ );
- }
- ReqQueue.enqueueKillReq( &msg.u.kill );
- break;
- }
- case InternalType_IoData:
- {
- if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
- {
- trace_printf( "%s@%d" " - Received InternalType_IoData\n"
- , method_name, __LINE__ );
- }
- ReqQueue.enqueueIoDataReq( &msg.u.iodata );
- break;
- }
- case InternalType_StdinReq:
- {
- if (trace_settings & (TRACE_REDIRECTION | TRACE_PROCESS))
- {
- trace_printf( "%s@%d" " - Received InternalType_StdinReq\n"
- , method_name, __LINE__ );
- }
- ReqQueue.enqueueStdInReq( &msg.u.stdin_req );
- break;
- }
- default:
- {
- abort();
}
}
}
+ close( sockFd );
+
TRACE_EXIT;
}
@@ -285,7 +336,7 @@
continue; // Ok to accept another connection
}
}
-
+
if (shutdown_)
{ // We are being notified to exit.
break;
@@ -296,12 +347,12 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], cannot accept new monitor: %s.\n",
method_name, strerror(errno));
- mon_log_write(PTP_COMMACCEPT_6, SQ_LOG_ERR, buf);
+ mon_log_write(PTP_COMMACCEPT_5, SQ_LOG_ERR, buf);
}
else
{
processNewSock( sockFd );
- close( sockFd );
+ //close( sockFd );
}
}
@@ -334,7 +385,7 @@
TRACE_EXIT;
}
-// Initialize PtpCommAcceptor thread
+// Initialize ptpCommAcceptor thread
static void *ptpCommAccept(void *arg)
{
const char method_name[] = "ptpCommAccept";
@@ -353,7 +404,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
method_name, rc);
- mon_log_write(PTP_COMMACCEPT_7, SQ_LOG_ERR, buf);
+ mon_log_write(PTP_COMMACCEPT_6, SQ_LOG_ERR, buf);
}
// Enter thread processing loop
@@ -364,7 +415,38 @@
}
-// Create a commAcceptor thread
+// ptpProcess thread entry point: services one accepted peer-monitor socket
+static void *ptpProcess(void *arg)
+{
+ const char method_name[] = "ptpProcess";
+ TRACE_ENTRY;
+
+ // arg is the heap-allocated Context created by processNewSock; it is freed below
+ CPtpCommAccept::Context *ctx = (CPtpCommAccept::Context *) arg;
+ CPtpCommAccept *cao = ctx->this_;
+
+ // Block every signal in this thread except SIGPROF
+ sigset_t mask;
+ sigfillset(&mask);
+ sigdelset(&mask, SIGPROF); // allows profiling such as google profiler
+ int rc = pthread_sigmask(SIG_SETMASK, &mask, NULL);
+ if (rc != 0)
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n",
+ method_name, rc);
+ mon_log_write(PTP_COMMACCEPT_7, SQ_LOG_ERR, buf);
+ }
+
+ // Service requests on the accepted socket until processMonReqs returns
+ cao->processMonReqs(ctx->pendingFd_);
+ delete ctx;
+
+ TRACE_EXIT;
+ return NULL;
+}
+
+// Create a ptpCommAccept thread
void CPtpCommAccept::start()
{
const char method_name[] = "CPtpCommAccept::start";
diff --git a/core/sqf/monitor/linux/ptpcommaccept.h b/core/sqf/monitor/linux/ptpcommaccept.h
index ca58139..78e9fe0 100644
--- a/core/sqf/monitor/linux/ptpcommaccept.h
+++ b/core/sqf/monitor/linux/ptpcommaccept.h
@@ -41,12 +41,19 @@
bool isAccepting( void ) { CAutoLock lock(getLocker()); return( accepting_ ); }
void monReqExec( void *req ); //stupid compiler and circular header files
+ void processMonReqs( int sockFd );
void processNewSock( int sockFd );
void startAccepting( void );
void stopAccepting( void );
void start( void );
void shutdownWork( void );
+ typedef struct
+ {
+ CPtpCommAccept *this_;
+ int pendingFd_;
+ } Context;
+
private:
void commAcceptorSock( void );
@@ -54,9 +61,10 @@
bool accepting_;
bool shutdown_;
- // commAccept thread's id
+ // ptpCommAccept thread's id
pthread_t thread_id_;
-
+ // id of the most recently created ptpProcess thread (overwritten per accepted connection)
+ pthread_t process_thread_id_;
};
#endif
diff --git a/core/sqf/monitor/linux/redirector.cxx b/core/sqf/monitor/linux/redirector.cxx
index 43bb231..70e8f9c 100644
--- a/core/sqf/monitor/linux/redirector.cxx
+++ b/core/sqf/monitor/linux/redirector.cxx
@@ -564,7 +564,10 @@
TRACE_ENTRY;
// Delete pending buffer (if any)
- delete buffer_;
+ if (buffer_)
+ {
+ delete [] buffer_;
+ }
// Delete queued data (if any)
while (!ioDataList_.empty())
@@ -572,7 +575,7 @@
// Get first data buffer from list
buffer_ = ioDataList_.front();
ioDataList_.pop_front();
- delete buffer_;
+ delete [] buffer_;
}
// Alter eyecatcher sequence as a debugging aid to identify deleted object
@@ -646,7 +649,7 @@
retVal = -1;
bufferPos_ = 0;
- delete buffer_;
+ delete [] buffer_;
buffer_ = NULL;
reqType = STDIN_FLOW_ON;
@@ -659,7 +662,7 @@
else
{ // Have written all data, will need to get more.
bufferPos_ = 0;
- delete buffer_;
+ delete [] buffer_;
buffer_ = NULL;
reqType = STDIN_FLOW_ON;
@@ -667,11 +670,11 @@
if (NameServerEnabled)
{
- PtpClient->StdInReq( MyPNID
- , pid_
- , reqType
- , ancestorNid_
- , ancestorPid_ );
+ PtpClient->ProcessStdInReq( MyPNID
+ , pid_
+ , reqType
+ , ancestorNid_
+ , ancestorPid_ );
}
else
{
@@ -792,7 +795,7 @@
char buf[MON_STRING_BUF_SIZE];
sprintf(buf, "[%s], %s is an unsupported file type.\n",
method_name, filename);
- mon_log_write(MON_REDIR_STDINREMOTE_2, SQ_LOG_ERR, buf);
+ mon_log_write(MON_REDIR_STDINREMOTE_2, SQ_LOG_INFO, buf);
close(fd_);
fd_ = -1;
@@ -874,11 +877,11 @@
if (NameServerEnabled)
{
- PtpClient->StdIoData( requesterNid_
- , pid_
- , STDIN_DATA
- , count
- , buffer );
+ PtpClient->ProcessStdIoData( requesterNid_
+ , pid_
+ , STDIN_DATA
+ , count
+ , buffer );
}
else
{
@@ -1177,11 +1180,11 @@
if (NameServerEnabled)
{
- PtpClient->StdIoData( ancestor_nid_
- , ancestor_pid_
- , STDOUT_DATA
- , count
- , buffer );
+ PtpClient->ProcessStdIoData( ancestor_nid_
+ , ancestor_pid_
+ , STDOUT_DATA
+ , count
+ , buffer );
}
else
{
@@ -1654,11 +1657,11 @@
if (NameServerEnabled)
{
- PtpClient->StdInReq( nid
- , pid
- , STDIN_REQ_DATA
- , ancestor_nid
- , ancestor_pid );
+ PtpClient->ProcessStdInReq( nid
+ , pid
+ , STDIN_REQ_DATA
+ , ancestor_nid
+ , ancestor_pid );
}
else
{
diff --git a/core/sqf/monitor/linux/reqdump.cxx b/core/sqf/monitor/linux/reqdump.cxx
index 5d2dd5e..fda3cea 100644
--- a/core/sqf/monitor/linux/reqdump.cxx
+++ b/core/sqf/monitor/linux/reqdump.cxx
@@ -129,8 +129,11 @@
{
if ( target_process_name.size() )
{ // find by name
- targetProcess = Nodes->GetProcess( target_process_name.c_str()
- , target_verifier );
+ if (msg_->u.request.u.dump.target_process_name[0] == '$' )
+ {
+ targetProcess = Nodes->GetProcess( target_process_name.c_str()
+ , target_verifier );
+ }
}
else
{ // find by nid, pid
@@ -152,9 +155,12 @@
, target_process_name.c_str()
, target_verifier );
}
- cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
- , target_verifier );
- targetProcess = cloneProcess;
+ if (msg_->u.request.u.dump.target_process_name[0] == '$' )
+ {
+ cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+ , target_verifier );
+ targetProcess = cloneProcess;
+ }
}
else
{ // Name Server find by nid,pid:verifier
diff --git a/core/sqf/monitor/linux/reqevent.cxx b/core/sqf/monitor/linux/reqevent.cxx
index 01c9067..f86582d 100644
--- a/core/sqf/monitor/linux/reqevent.cxx
+++ b/core/sqf/monitor/linux/reqevent.cxx
@@ -163,8 +163,11 @@
if ( target_process_name.size() )
{ // find by name
- targetProcess = Nodes->GetProcess( target_process_name.c_str()
- , target_verifier );
+ if (msg_->u.request.u.event.target_process_name[0] == '$' )
+ {
+ targetProcess = Nodes->GetProcess( target_process_name.c_str()
+ , target_verifier );
+ }
if ( !targetProcess )
{
if (NameServerEnabled)
@@ -176,9 +179,12 @@
, target_process_name.c_str()
, target_verifier );
}
- cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
- , target_verifier );
- targetProcess = cloneProcess;
+ if (msg_->u.request.u.event.target_process_name[0] == '$' )
+ {
+ cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+ , target_verifier );
+ targetProcess = cloneProcess;
+ }
}
}
if ( targetProcess && trace_settings & (TRACE_REQUEST | TRACE_PROCESS))
diff --git a/core/sqf/monitor/linux/reqkill.cxx b/core/sqf/monitor/linux/reqkill.cxx
index b59cae2..a7f7b62 100644
--- a/core/sqf/monitor/linux/reqkill.cxx
+++ b/core/sqf/monitor/linux/reqkill.cxx
@@ -211,14 +211,17 @@
{
if ( target_process_name.size() )
{ // find by name (check node state, don't check process state, not backup)
- targetProcess = Nodes->GetProcess( target_process_name.c_str()
- , target_verifier
- , true, false, false );
- if ( targetProcess &&
- (msg_->u.request.u.kill.target_nid == -1 ||
- msg_->u.request.u.kill.target_pid == -1))
+ if (msg_->u.request.u.kill.target_process_name[0] == '$' )
{
- backup = targetProcess->GetBackup ();
+ targetProcess = Nodes->GetProcess( target_process_name.c_str()
+ , target_verifier
+ , true, false, false );
+ if ( targetProcess &&
+ (msg_->u.request.u.kill.target_nid == -1 ||
+ msg_->u.request.u.kill.target_pid == -1))
+ {
+ backup = targetProcess->GetBackup ();
+ }
}
}
else
@@ -256,9 +259,12 @@
, target_process_name.c_str()
, target_verifier );
}
- cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
- , target_verifier );
- targetProcess = cloneProcess;
+ if (msg_->u.request.u.kill.target_process_name[0] == '$' )
+ {
+ cloneProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+ , target_verifier );
+ targetProcess = cloneProcess;
+ }
}
else
{ // Name Server find by nid,pid:verifier
diff --git a/core/sqf/monitor/linux/reqnotify.cxx b/core/sqf/monitor/linux/reqnotify.cxx
index 4d278ce..5e69681 100644
--- a/core/sqf/monitor/linux/reqnotify.cxx
+++ b/core/sqf/monitor/linux/reqnotify.cxx
@@ -180,9 +180,12 @@
, target_process_name.c_str()
, target_verifier );
}
- targetProcess = Nodes->GetProcess( target_process_name.c_str()
- , target_verifier
- , true, false, false );
+ if (msg_->u.request.u.notify.target_process_name[0] == '$' )
+ {
+ targetProcess = Nodes->GetProcess( target_process_name.c_str()
+ , target_verifier
+ , true, false, false );
+ }
}
else
{ // find by nid (check node state, don't check process state, backup is Ok)
@@ -226,8 +229,11 @@
, target_process_name.c_str()
, target_verifier );
}
- targetProcess = Nodes->CloneProcessNs( target_process_name.c_str()
- , target_verifier );
+ if (msg_->u.request.u.notify.target_process_name[0] == '$' )
+ {
+ targetProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+ , target_verifier );
+ }
}
else
{ // Name Server find by nid,pid:verifier
@@ -319,7 +325,7 @@
{
if (trace_settings & TRACE_REQUEST)
{
- trace_printf("%s@%d" " - Can't find targerProcess" "\n", method_name, __LINE__);
+ trace_printf("%s@%d" " - Can't find targetProcess" "\n", method_name, __LINE__);
}
}
}
diff --git a/core/sqf/monitor/linux/reqopen.cxx b/core/sqf/monitor/linux/reqopen.cxx
index f131a08..f44b8d4 100644
--- a/core/sqf/monitor/linux/reqopen.cxx
+++ b/core/sqf/monitor/linux/reqopen.cxx
@@ -229,8 +229,8 @@
return false;
}
- CProcess * openerProcess;
- CProcess * openedProcess;
+ CProcess * openerProcess = NULL;
+ CProcess * openedProcess = NULL;
// Get process object for opener process
if ( msg_->u.request.u.open.process_name[0] )
@@ -263,9 +263,12 @@
// Get process object for process to open
if ( msg_->u.request.u.open.target_process_name[0] )
{ // find by name (check node state, don't check process state, backup is NOT Ok)
- openedProcess = Nodes->GetProcess( msg_->u.request.u.open.target_process_name
- , msg_->u.request.u.open.target_verifier
- , true, false, false );
+ if (msg_->u.request.u.open.target_process_name[0] == '$' )
+ {
+ openedProcess = Nodes->GetProcess( msg_->u.request.u.open.target_process_name
+ , msg_->u.request.u.open.target_verifier
+ , true, false, false );
+ }
}
else
{ // find by pid (check node state, don't check process state, backup is Ok)
@@ -291,8 +294,11 @@
, method_name, __LINE__
, target_process_name.c_str()
, target_verifier );
- openedProcess = Nodes->CloneProcessNs( target_process_name.c_str()
- , target_verifier );
+ if (msg_->u.request.u.open.target_process_name[0] == '$' )
+ {
+ openedProcess = Nodes->CloneProcessNs( target_process_name.c_str()
+ , target_verifier );
+ }
}
else
{ // Name Server find by nid,pid:verifier
diff --git a/core/sqf/monitor/linux/reqprocinfo.cxx b/core/sqf/monitor/linux/reqprocinfo.cxx
index d3f04e2..c49a877 100644
--- a/core/sqf/monitor/linux/reqprocinfo.cxx
+++ b/core/sqf/monitor/linux/reqprocinfo.cxx
@@ -417,9 +417,6 @@
requester =
Nodes->GetProcess( nid_ , pid_ , verifier_
, false, false, true );
-// CLNode *lnode = Nodes->GetLNode( nid_ );
-// CNode *node = lnode->GetNode();
-// requester = node->GetProcess( pid_, verifier_ );
#else
requester = MyNode->GetProcess( pid_
, verifier_ );
@@ -483,12 +480,16 @@
, false, false
, target_verifier == -1 ? false : true );
#else
+ CProcess *process = NULL;
// find by name (check node state, don't check process state,
// if verifier is -1, backup is NOT Ok, else is Ok)
- CProcess *process = Nodes->GetProcess( target_process_name.c_str()
- , target_verifier
- , true, false
- , target_verifier == -1 ? false : true );
+ if (msg_->u.request.u.process_info.target_process_name[0] == '$' )
+ {
+ process = Nodes->GetProcess( target_process_name.c_str()
+ , target_verifier
+ , true, false
+ , target_verifier == -1 ? false : true );
+ }
#endif
if (process)
{
diff --git a/core/sqf/monitor/linux/reqqueue.cxx b/core/sqf/monitor/linux/reqqueue.cxx
index 2238d71..3d425f2 100644
--- a/core/sqf/monitor/linux/reqqueue.cxx
+++ b/core/sqf/monitor/linux/reqqueue.cxx
@@ -64,6 +64,7 @@
extern CRedirector Redirector;
extern bool NameServerEnabled;
extern CPtpClient *PtpClient;
+extern CProcess *NameServerProcess;
extern CNameServer *NameServer;
extern CNameServerConfigContainer *NameServerConfig;
#endif
@@ -1578,15 +1579,18 @@
{
if (NameServerEnabled)
{
- if (trace_settings & TRACE_REQUEST)
- trace_printf( "%s@%d" " - Getting parent process from Name Server (%d,%d:%d)\n"
- , method_name, __LINE__
- , parentNid_
- , parentPid_
- , parentVerifier_ );
- parentProcess = Nodes->CloneProcessNs( parentNid_
- , parentPid_
- , parentVerifier_ );
+ if (parentNid_ != -1 && parentPid_ != -1)
+ {
+ if (trace_settings & TRACE_REQUEST)
+ trace_printf( "%s@%d" " - Getting parent process from Name Server (%d,%d:%d)\n"
+ , method_name, __LINE__
+ , parentNid_
+ , parentPid_
+ , parentVerifier_ );
+ parentProcess = Nodes->CloneProcessNs( parentNid_
+ , parentPid_
+ , parentVerifier_ );
+ }
}
}
}
@@ -2598,7 +2602,7 @@
, process_->GetVerifier() );
}
#ifndef NAMESERVER_PROCESS
- if ( NameServerEnabled )
+ if ( NameServerEnabled && process_ != NameServerProcess)
{
int rc = NameServer->ProcessDelete(process_); // in reqQueue thread (CIntChildDeathReq)
if (rc)
@@ -2713,9 +2717,11 @@
else
{
// Stop all processes
- Monitor->HardNodeDown( MyPNID );
#ifndef NAMESERVER_PROCESS
+ Monitor->HardNodeDown( MyPNID );
MyNode->EmptyQuiescingPids();
+#else
+ Monitor->HardNodeDownNs( MyPNID );
#endif
// now stop the Watchdog process
HealthCheck.setState(MON_NODE_DOWN);
@@ -3261,7 +3267,11 @@
if (trace_settings & (TRACE_SYNC | TRACE_REQUEST))
trace_printf("%s@%d - Node down request, pnid=%d\n",
method_name, __LINE__, pnid_);
+#ifndef NAMESERVER_PROCESS
Monitor->HardNodeDown( pnid_ );
+#else
+ Monitor->HardNodeDownNs( pnid_ );
+#endif
TRACE_EXIT;
}
@@ -4063,7 +4073,11 @@
else
{
// Stop all processes
+#ifndef NAMESERVER_PROCESS
Monitor->HardNodeDown( MyPNID );
+#else
+ Monitor->HardNodeDownNs( MyPNID );
+#endif
#ifndef NAMESERVER_PROCESS
MyNode->EmptyQuiescingPids();
#endif
@@ -4241,6 +4255,11 @@
request->setConcurrent(reqConcurrent[msg->u.request.type]);
break;
+ case ReqType_NodeDown:
+ request = new CExtNodeDownNsReq(msgType, pid, sockFd, msg);
+ request->setConcurrent(reqConcurrent[msg->u.request.type]);
+ break;
+
case ReqType_NewProcessNs:
request = new CExtNewProcNsReq(msgType, nid, pid, sockFd, msg);
request->setConcurrent(reqConcurrent[msg->u.request.type]);
@@ -5376,7 +5395,7 @@
}
}
- if (!request->isShutdown())
+ if (request && !request->isShutdown())
{
// Take request out of list
reqQueue_.erase (it);
diff --git a/core/sqf/monitor/linux/reqqueue.h b/core/sqf/monitor/linux/reqqueue.h
index 2f6f030..b600a0f 100644
--- a/core/sqf/monitor/linux/reqqueue.h
+++ b/core/sqf/monitor/linux/reqqueue.h
@@ -456,6 +456,23 @@
};
#endif
+#ifdef NAMESERVER_PROCESS
+class CExtNodeDownNsReq: public CExternalReq
+{
+public:
+ CExtNodeDownNsReq( reqQueueMsg_t msgType
+ , int pid
+ , int sockFd
+ , struct message_def *msg );
+ virtual ~CExtNodeDownNsReq();
+
+ void performRequest();
+
+private:
+ void populateRequestString( void );
+};
+#endif
+
#ifndef NAMESERVER_PROCESS
class CExtNameServerAddReq: public CExternalReq
{
@@ -1801,6 +1818,7 @@
RQEI CExtNewProcReq
RqEB CExtNewProcessNsReq
RQEJ CExtNodeDownReq
+ RqEJ CExtNodeDownNsReq
RQEK CExtNodeInfoReq
RQEK CExtPNodeInfoReq
RQEL CExtNodeUpReq
@@ -1816,6 +1834,7 @@
RQEP CExtProcInfoContReq
RQEQ CExtSetReq
RQER CExtShutdownReq
+ RqER CExtShutdownNsReq
RQES CExtStartupReq
RQET CExtTmLeaderReq
RQEV CExtTmSyncReq
diff --git a/core/sqf/monitor/linux/shell.cxx b/core/sqf/monitor/linux/shell.cxx
index 6c5f14b..0da8c32 100644
--- a/core/sqf/monitor/linux/shell.cxx
+++ b/core/sqf/monitor/linux/shell.cxx
@@ -80,7 +80,7 @@
char LDpath[MAX_SEARCH_PATH];
char Path[MAX_SEARCH_PATH];
char Wdir[MAX_SEARCH_PATH];
-char prompt[13];
+char prompt[MAX_PROCESS_NAME];
int VirtualNodes = 0;
int VirtualNid = -1;
int NumNodes = 0;
@@ -394,51 +394,41 @@
{
bool rs = true;
bool isNameServerEnabled = false;
+ bool isAgentModeEnabled = false;
char* env;
char msgString[MAX_BUFFER] = { 0 };
int val = 0;
- env = getenv("MONITOR_COMM_PORT");
- if ( env )
+ env = getenv("SQ_MON_RUN_MODE");
+ if ( env && (strcmp(env, "AGENT") == 0) )
{
- val = atoi(env);
- if ( val <= 0)
+ isAgentModeEnabled = true;
+ }
+
+ if (isAgentModeEnabled)
+ {
+ env = getenv("MONITOR_COMM_PORT");
+ if ( env )
{
- if (VirtualNodes)
+ val = atoi(env);
+ if ( val <= 0)
{
sprintf( msgString, "[%s] Warning: MONITOR_COMM_PORT value is invalid (%s)!", MyName, env );
write_startup_log( msgString );
printf("%s\n", msgString );
}
- else
- {
- sprintf( msgString, "[%s] Error: MONITOR_COMM_PORT value is invalid (%s)! Set MONITOR_COMM_PORT environment variable and try again.", MyName, env );
- write_startup_log( msgString );
- printf("%s\n", msgString );
- rs = false;
- }
}
- }
-
- env = getenv("MONITOR_SYNC_PORT");
- if ( env )
- {
- val = atoi(env);
- if ( val <= 0)
+
+ env = getenv("MONITOR_SYNC_PORT");
+ if ( env )
{
- if (VirtualNodes)
+ val = atoi(env);
+ if ( val <= 0)
{
sprintf( msgString, "[%s] Warning: MONITOR_SYNC_PORT value is invalid (%s)!", MyName, env );
write_startup_log( msgString );
printf("%s\n", msgString );
}
- else
- {
- sprintf( msgString, "[%s] Error: MONITOR_SYNC_PORT value is invalid (%s)! Set MONITOR_COMM_PORT environment variable and try again.", MyName, env );
- write_startup_log( msgString );
- printf("%s\n", msgString );
- rs = false;
- }
}
}
@@ -446,10 +436,7 @@
if ( env )
{
val = atoi(env);
- if ( val > 0)
- {
- isNameServerEnabled = (val != 0);
- }
+ isNameServerEnabled = (val != 0) ? true : false;
}
if (isNameServerEnabled)
@@ -2391,8 +2378,8 @@
{
if (displayHeader)
{
- printf("[%s] NID,PID(os) PRI TYPE STATES NAME PARENT PROGRAM\n",MyName);
- printf("[%s] ------------ --- ---- ------- ----------- ----------- ---------------\n",MyName);
+ printf("[%s] NID,PID(os) PRI TYPE STATES NAME PARENT PROGRAM\n",MyName);
+ printf("[%s] ------------ --- ---- ------- ------------ ------------ ---------------\n",MyName);
}
show_proc_info();
@@ -5257,7 +5244,7 @@
msg->u.reply.u.process_info.process[i].type
= ProcessType_Undefined;
}
- printf("%3.3d %-4s %c%c%c%c%c%c%c %-11s %-11s %-15s\n",
+ printf("%3.3d %-4s %c%c%c%c%c%c%c %-12s %-12s %-15s\n",
msg->u.reply.u.process_info.process[i].priority,
processTypeStr[msg->u.reply.u.process_info.process[i].type],
(msg->u.reply.u.process_info.process[i].event_messages?'E':'-'),
@@ -9572,14 +9559,8 @@
env = getenv("SQ_NAMESERVER_ENABLED");
if ( env && isdigit(*env) )
{
- if ( strcmp(env,"0") == 0 )
- {
- NameServerEnabled = false;
- }
- else
- {
- NameServerEnabled = true;
- }
+ int val = atoi(env);
+ NameServerEnabled = (val != 0) ? true : false;
}
if ( !VirtualNodes )
diff --git a/core/sqf/monitor/linux/tmsync.cxx b/core/sqf/monitor/linux/tmsync.cxx
index 548ae81..b56c5f8 100644
--- a/core/sqf/monitor/linux/tmsync.cxx
+++ b/core/sqf/monitor/linux/tmsync.cxx
@@ -1012,7 +1012,8 @@
}
if (NameServerEnabled)
{
- if (!MyNode->IsMyNode( tm->GetNid() ))
+ if (!MyNode->IsMyNode( tm->GetNid() )
+ && (!req->GetNext() || req->GetNext()->Nid != tm->GetNid() ) ) // keep the clone only if the next request targets the same nid
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_REQUEST | TRACE_SYNC | TRACE_TMSYNC))
{
@@ -1024,6 +1025,7 @@
, tm->GetVerifier() );
}
Nodes->DeleteCloneProcess( tm );
+ tm = NULL;
}
}
diff --git a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx
index f9bd698..19a7679 100644
--- a/core/sqf/monitor/linux/zclient.cxx
+++ b/core/sqf/monitor/linux/zclient.cxx
@@ -506,7 +506,7 @@
string masterMonitor( ss.str( ) );
// wait for 3 minutes for giving up.
- while ( (!found) && (retries < 180))
+ while ( (GetState() != ZC_SHUTDOWN) && (!found) && (retries < 180))
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh
index cd2ea37..0bac47c 100644
--- a/core/sqf/sqenvcom.sh
+++ b/core/sqf/sqenvcom.sh
@@ -681,11 +681,14 @@
# (meaning that mpirun is the parent process of the monitor process)
# AGENT - monitor process runs in agent mode versus MPI collective
#
-# Uncomment the next four environment variables
-#export SQ_MON_CREATOR=MPIRUN
-#export SQ_MON_RUN_MODE=AGENT
-#export MONITOR_COMM_PORT=23390
-#export MONITOR_SYNC_PORT=23380
+# The following variables are exported by default; apart from SQ_MON_CREATOR, each keeps any value already exported
+export SQ_MON_CREATOR=MPIRUN
+if [[ "$SQ_MON_CREATOR" == "MPIRUN" ]]; then
+ export SQ_MON_RUN_MODE=${SQ_MON_RUN_MODE:-AGENT}
+ export MONITOR_COMM_PORT=${MONITOR_COMM_PORT:-23390}
+ export MONITOR_SYNC_PORT=${MONITOR_SYNC_PORT:-23380}
+ export TRAF_SCALING_FACTOR=${TRAF_SCALING_FACTOR:-0.75}
+fi
#
# NAME-SERVER - to disable process replication and enable the name-server
@@ -743,6 +746,11 @@
# set to 0 to disable phandle verifier
export SQ_PHANDLE_VERIFIER=1
+# set to 0 to disable process name long format in clusters larger than 256 nodes
+#export SQ_MON_PROCESS_NAME_FORMAT_LONG=0
+# short format: '$Zxxpppp' xx = nid, pppp = pid
+# long format: '$Zxxxxpppppp' xxxx = nid, pppppp = pid (default)
+
# set to 0 to disable or 1 to enable configuration of DTM as a persistent process
# must re-execute 'sqgen' to effect change
export SQ_DTM_PERSISTENT_PROCESS=1
diff --git a/core/sqf/sql/scripts/gomon.cold b/core/sqf/sql/scripts/gomon.cold
index 6055490..f963e29 100755
--- a/core/sqf/sql/scripts/gomon.cold
+++ b/core/sqf/sql/scripts/gomon.cold
@@ -90,15 +90,24 @@
echo `date`" - Continuing with Startup ..."
echo
fi
+fi
+
+if (
+ [[ $TRAF_AGENT == "CM" ]] ||
+ [[ $SQ_MON_RUN_MODE == "AGENT" ]]
+ )
+then
+ export TRAF_SCALING_FACTOR=${TRAF_SCALING_FACTOR:-0.75}
# Set the number of nodes configured
let node_count=`trafconf -nid-count`
+ #echo "***"
#echo "*** node_count = ${node_count}"
- #echo "*** TRAF_SCALING_FACTOR = $TRAF_SCALING_FACTOR"
+ #echo "*** TRAF_SCALING_FACTOR = ${TRAF_SCALING_FACTOR}"
# allow time for other nodes to integrate, scaled to cluster size
# scaling factor may be non-integer, so use awk to evaluate
- start_delay=$( echo "${node_count} $TRAF_SCALING_FACTOR" | awk '{print $1 * $2}')
+ start_delay=$( echo "${node_count} ${TRAF_SCALING_FACTOR}" | awk '{print $1 * $2}')
echo "***"
echo "***" %`date`" - Waiting ${start_delay} seconds for Monitor processes to integrate"
echo "***"