[TRAFODION-3334] Refactored and re-implemented monitor communication. Each monitor
communication channel is managed by a dedicated object running in a thread; these
objects now all inherit from a common class, which provides blocking and
non-blocking IO methods.
diff --git a/core/rest/Makefile b/core/rest/Makefile
index c87996e..22db929 100644
--- a/core/rest/Makefile
+++ b/core/rest/Makefile
@@ -36,9 +36,9 @@
$(GENVERS) > $(VFILE)
@if [ $(GENVERS) -nt target/$(BLD_TRAFODION_REST_TARNAME) ]; then echo "update manifest"; $(RM) -f target/$(BLD_TRAFODION_REST_TARNAME); fi
@if [ $(TRAF_HOME)/export/include/SCMBuildStr.h -nt target/$(BLD_TRAFODION_REST_TARNAME) ]; then echo "update manifest"; $(RM) -f target/$(BLD_TRAFODION_REST_TARNAME); fi
- echo "$(MAVEN) site package -DskipTests"
+ echo "$(MAVEN) site package -DskipTests -P jdbc,!eclipse"
echo "### For full Maven output, see file build_rest.log"
- set -o pipefail && $(MAVEN) site package -DskipTests | tee build_rest.log | grep --line-buffered -E -e '^\[[^WId]' -e '^\[INFO\] B[Uu][Ii][Ll][Dd]' -e 'to compile'
+ set -o pipefail && $(MAVEN) site package -DskipTests -P jdbc,!eclipse | tee build_rest.log | grep --line-buffered -E -e '^\[[^WId]' -e '^\[INFO\] B[Uu][Ii][Ll][Dd]' -e 'to compile'
$(RM) $(VFILE)
diff --git a/core/sqf/export/include/common/evl_sqlog_eventnum.h b/core/sqf/export/include/common/evl_sqlog_eventnum.h
index c7e8879..b3165d6 100644
--- a/core/sqf/export/include/common/evl_sqlog_eventnum.h
+++ b/core/sqf/export/include/common/evl_sqlog_eventnum.h
@@ -370,6 +370,7 @@
#define MON_MONITOR_CREATEZCLIENT_5 101022305
#define MON_MONITOR_STARTZCLIENT_1 101022401
#define MON_MONITOR_SIGTERMSIGNALHANDLER_1 101022501
+#define MON_MONITOR_FAILURE_EXIT_1 101022601
/* Module: process.cxx = 03 */
@@ -781,6 +782,7 @@
#define MON_REQQUEUE_REVIVE_4 101180504
#define MON_REQQUEUE_REVIVE_5 101180505
#define MON_REQQUEUE_REVIVE_6 101180506
+#define MON_REQQUEUE_REVIVE_7 101180507
#define MON_REQQUEUE_REQUEST_1 101180601
#define MON_REQ_NODE_ADD_1 101180701
#define MON_REQ_NODE_ADD_2 101180702
@@ -866,6 +868,7 @@
#define MON_WATCHDOG_INITLOCALIO_1 101220301
#define MON_WATCHDOG_INITLOCALIO_2 101220302
#define MON_WATCHDOG_INITLOCALIO_3 101220303
+#define MON_WATCHDOG_FAILURE_EXIT_1 101220401
/* Module: healthcheck.cxx = 23 */
#define MON_HEALTHCHECK_BAD_STATE 101230101
@@ -962,6 +965,7 @@
#define PSTARTD_STARTPROCS_2 101301002
#define PSTARTD_STARTPROCS_3 101301003
#define PSTARTD_MAIN_1 101301101
+#define PSTARTD_FAILURE_EXIT_1 101301201
/* Module robsem.cxx = 31 */
@@ -971,26 +975,29 @@
#define MON_ROBSEM_4 101310104
/* Module commaccept.cxx = 32 */
-#define MON_COMMACCEPT_1 101320101
-#define MON_COMMACCEPT_2 101320102
-#define MON_COMMACCEPT_3 101320103
-#define MON_COMMACCEPT_4 101320104
-#define MON_COMMACCEPT_5 101320105
-#define MON_COMMACCEPT_6 101320106
-#define MON_COMMACCEPT_7 101320107
-#define MON_COMMACCEPT_8 101320108
-#define MON_COMMACCEPT_9 101320109
-#define MON_COMMACCEPT_10 101320110
-#define MON_COMMACCEPT_11 101320111
-#define MON_COMMACCEPT_12 101320112
-#define MON_COMMACCEPT_13 101320113
-#define MON_COMMACCEPT_14 101320114
-#define MON_COMMACCEPT_15 101320115
-#define MON_COMMACCEPT_16 101320116
-#define MON_COMMACCEPT_17 101320117
-#define MON_COMMACCEPT_18 101320118
-#define MON_COMMACCEPT_19 101320119
-#define MON_COMMACCEPT_20 101320120
+#define MON_COMMACCEPT_COMMACCEPT_1 101320101
+#define MON_COMMACCEPT_COMMACCEPT_2 101320102
+#define MON_COMMACCEPT_COMMACCEPT_3 101320103
+#define MON_COMMACCEPT_1 101320201
+#define MON_COMMACCEPT_1 101320201
+#define MON_COMMACCEPT_2 101320202
+#define MON_COMMACCEPT_3 101320203
+#define MON_COMMACCEPT_4 101320204
+#define MON_COMMACCEPT_5 101320205
+#define MON_COMMACCEPT_6 101320206
+#define MON_COMMACCEPT_7 101320207
+#define MON_COMMACCEPT_8 101320208
+#define MON_COMMACCEPT_9 101320209
+#define MON_COMMACCEPT_10 101320210
+#define MON_COMMACCEPT_11 101320211
+#define MON_COMMACCEPT_12 101320212
+#define MON_COMMACCEPT_13 101320213
+#define MON_COMMACCEPT_14 101320214
+#define MON_COMMACCEPT_15 101320215
+#define MON_COMMACCEPT_16 101320216
+#define MON_COMMACCEPT_17 101320217
+#define MON_COMMACCEPT_18 101320218
+#define MON_COMMACCEPT_19 101320219
/* Module: reqnodedown.cxx = 33 */
#define MON_EXT_NODEDOWN_REQ 101330101
@@ -1066,6 +1073,8 @@
#define MON_ZCLIENT_HNDLEERRORCHILDZNODES_1 101373901
#define MON_ZCLIENT_HANDLEERRORZNODE_1 101374001
#define MON_ZCLIENT_HNDLERRCHLZNFORZNCHL_1 101374101
+#define MON_ZCLIENT_ERRORZNODECREATE_1 101374201
+#define MON_ZCLIENT_MYRUNNINGZNODECREATE_1 101374301
/* Module: zconfig.cxx = 38 */
#define ZCONFIG_ZCONFIG_1 101380101
@@ -1100,10 +1109,15 @@
#define NAMESERVER_CLIENTSOCKCREATE_6 101390106
#define NAMESERVER_SENDTONS_1 101390201
#define NAMESERVER_SENDTONS_2 101390202
-#define NAMESERVER_SOCKRECEIVE_1 101390301
-#define NAMESERVER_SOCKSEND_1 101390401
+#define NAMESERVER_NAMESERVER_1 101390301
+#define NAMESERVER_SOCKSENDRECVWAIT_1 101390401
+#define NAMESERVER_SOCKSENDRECVWAIT_2 101390402
+#define NAMESERVER_SOCKSENDRECVWAIT_3 101390403
+#define NAMESERVER_SOCKSENDRECVWAIT_4 101390404
+#define NAMESERVER_SOCKSENDRECVWAIT_5 101390405
#define NAMESERVER_GETM2NPORT_1 101390501
#define NAMESERVER_CHOOSENEXTNS_1 101390601
+#define NAMESERVER_EPOLLCTL_1 101390701
/* Module nscommaccept.cxx = 40 */
#define NS_COMMACCEPT_PROCESSMONREQS_1 101400101
@@ -1116,6 +1130,8 @@
#define NS_COMMACCEPT_MON2NSACCEPTMON_1 101400401
#define NS_COMMACCEPT_MON2NSPROCESS_1 101400501
#define NS_COMMACCEPT_START_1 101400601
+#define NS_NSCOMMACCEPT_NSCOMMACCEPT_1 101400701
+#define NS_NSCOMMACCEPT_NSCOMMACCEPT_2 101400702
/* Module: reqnodedown.cxx = 41 */
#define MON_EXT_NAMESERVERDOWN_REQ 101410101
@@ -1145,26 +1161,83 @@
/* Module ptpclient.cxx = 93 */
#define PTPCLIENT_PTPCLIENT_1 101930101
+#define PTPCLIENT_PTPCLIENT_2 101930102
#define PTPCLIENT_STDINREQ_1 101930201
#define PTPCLIENT_STDINREQ_2 101930202
#define PTPCLIENT_STDIODATA_1 101930301
#define PTPCLIENT_STDIODATA_2 101930302
#define PTPCLIENT_SENDTOMON_1 101930401
#define PTPCLIENT_SENDTOMON_2 101930402
+#define PTPCLIENT_EPOLLCTL_1 101930501
+#define PTPCLIENT_SOCKSENDRECVWAIT_1 101930601
+#define PTPCLIENT_SOCKSENDRECVWAIT_2 101930602
+#define PTPCLIENT_SOCKSENDRECVWAIT_3 101930603
+#define PTPCLIENT_SOCKSENDRECVWAIT_4 101930604
+#define PTPCLIENT_SOCKSENDRECVWAIT_5 101930605
/* Module ptpcommaccept.cxx = 94 */
-#define PTP_COMMACCEPT_1 101940101
-#define PTP_COMMACCEPT_2 101940102
-#define PTP_COMMACCEPT_3 101940103
-#define PTP_COMMACCEPT_4 101940104
-#define PTP_COMMACCEPT_5 101940105
-#define PTP_COMMACCEPT_6 101940106
-#define PTP_COMMACCEPT_7 101940107
-#define PTP_COMMACCEPT_8 101940108
+#define PTP_COMMACCEPT_COMMACCEPT_1 101940101
+#define PTP_COMMACCEPT_COMMACCEPT_2 101940102
+#define PTP_COMMACCEPT_COMMACCEPT_3 101940103
+#define PTP_COMMACCEPT_1 101940201
+#define PTP_COMMACCEPT_2 101940202
+#define PTP_COMMACCEPT_3 101940203
+#define PTP_COMMACCEPT_4 101940204
+#define PTP_COMMACCEPT_5 101940205
+#define PTP_COMMACCEPT_6 101940206
+#define PTP_COMMACCEPT_7 101940207
+#define PTP_COMMACCEPT_8 101940208
/* Module notice.cxx = 95 */
#define NOTICE_NOTIFYREMOTE_1 101950101
+/* Module comm.cxx = 96 */
+#define COMM_COMM_1 101960101
+#define COMM_ACCEPT_1 101960201
+#define COMM_ACCEPT_2 101960202
+#define COMM_ACCEPT_3 101960203
+#define COMM_CONNECTLOCAL_1 101960301
+#define COMM_CONNECTLOCAL_2 101960302
+#define COMM_CONNECTLOCAL_3 101960303
+#define COMM_CONNECT_1 101960401
+#define COMM_CONNECT_2 101960402
+#define COMM_CONNECT_3 101960403
+#define COMM_CONNECT_4 101960404
+#define COMM_CONNECT_5 101960405
+#define COMM_CONNECT_6 101960406
+#define COMM_CONNECT_7 101960407
+#define COMM_CONNECT_8 101960408
+#define COMM_CONNECT_9 101960409
+#define COMM_CONNECT_10 101960410
+#define COMM_CONNECT_11 101960411
+#define COMM_CONNECT_12 101960412
+#define COMM_CONNECT_13 101960413
+#define COMM_CONNECT_14 101960414
+#define COMM_CONNECT_15 101960415
+#define COMM_CONNECT_16 101960416
+#define COMM_CONNECT_17 101960417
+#define COMM_EPOLLCTL_1 101960501
+#define COMM_EPOLLCTLDELETE_1 101960601
+#define COMM_LISTEN_1 101960701
+#define COMM_LISTEN_2 101960702
+#define COMM_LISTEN_3 101960703
+#define COMM_LISTEN_4 101960704
+#define COMM_LISTEN_5 101960705
+#define COMM_LISTEN_6 101960706
+#define COMM_RECEIVE_1 101960801
+#define COMM_SEND_1 101960901
+#define COMM_SENDRECVWAIT_1 101961001
+#define COMM_SENDRECVWAIT_2 101961002
+#define COMM_SENDRECVWAIT_3 101961003
+#define COMM_SENDRECVWAIT_4 101961004
+#define COMM_SENDRECVWAIT_5 101961005
+#define COMM_SENDRECVWAIT_6 101961006
+#define COMM_SENDRECVWAIT_7 101961007
+#define COMM_SETKEEPALIVESOCKOPT_1 101961101
+#define COMM_SETKEEPALIVESOCKOPT_2 101961102
+#define COMM_SETKEEPALIVESOCKOPT_3 101961103
+#define COMM_SETKEEPALIVESOCKOPT_4 101961104
+
/**********************************************/
/*********** Seabed ***********/
diff --git a/core/sqf/monitor/linux/cluster.cxx b/core/sqf/monitor/linux/cluster.cxx
index 1dbcb65..ed3a2ce 100644
--- a/core/sqf/monitor/linux/cluster.cxx
+++ b/core/sqf/monitor/linux/cluster.cxx
@@ -46,6 +46,7 @@
#include "localio.h"
#include "mlio.h"
+#include "comm.h"
#include "monlogging.h"
#include "monsonar.h"
#include "montrace.h"
@@ -86,7 +87,7 @@
extern char MyMPICommPort[MPI_MAX_PORT_NAME];
extern char MySyncPort[MPI_MAX_PORT_NAME];
#ifdef NAMESERVER_PROCESS
-extern CCommAcceptMon CommAcceptMon;
+extern CCommAcceptMon *CommAcceptMon;
extern char MyMon2NsPort[MPI_MAX_PORT_NAME];
#else
extern CProcess *NameServerProcess;
@@ -117,7 +118,7 @@
#endif
extern CMonLog *MonLog;
extern CHealthCheck HealthCheck;
-extern CCommAccept CommAccept;
+extern CCommAccept *CommAccept;
extern CZClient *ZClient;
extern CMeas Meas;
@@ -166,7 +167,7 @@
CNode *node;
CLNode *lnode;
- if (trace_settings & TRACE_INIT)
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
trace_printf( "%s@%d - pnid=%d, name=%s (%s) is taking over pnid=%d, name=%s (%s), check health=%d, isIntegrating=%d , integrating pnid=%d\n"
, method_name, __LINE__
@@ -839,14 +840,9 @@
:NumRanks (-1)
,socks_(NULL)
,sockPorts_(NULL)
- ,commSock_(-1)
,syncPort_(0)
,syncSock_(-1)
-#ifdef NAMESERVER_PROCESS
- ,mon2nsSock_(-1)
-#endif
- ,epollFD_(-1)
- ,epollPingFD_(-1),
+ ,epollFD_(-1),
Node (NULL),
LNode (NULL),
currentNodes_ (0),
@@ -1050,21 +1046,11 @@
const char method_name[] = "CCluster::~CCluster";
TRACE_ENTRY;
- if (epollPingFD_ != -1)
- {
- close( epollPingFD_ );
- }
-
if (epollFD_ != -1)
{
close( epollFD_ );
}
- if (commSock_ != -1)
- {
- close( commSock_ );
- }
-
if (syncSock_ != -1)
{
close( syncSock_ );
@@ -1202,10 +1188,23 @@
}
}
- if ( communicate_state && pnid != MyPNID )
+#if 1
+ if ( communicate_state )
{
// just communicate the change and let the real node handle it.
- node->SetChangeState( true );
+ if ( ZClientEnabled )
+ {
+ ZClient->RunningZNodeDelete( node->GetName() );
+ ZClient->MasterZNodeDelete( node->GetName() );
+ }
+ else
+#else
+ if ( communicate_state && pnid != MyPNID )
+ {
+#endif
+ {
+ node->SetChangeState( true );
+ }
return;
}
@@ -1270,7 +1269,7 @@
}
if( IsRealCluster )
{ // Terminate CommAccept thread, remote pings will fail
- CommAccept.shutdownWork();
+ CommAccept->shutdownWork();
}
break;
default: // in all other states
@@ -1392,6 +1391,7 @@
TRACE_ENTRY;
int err = MPI_SUCCESS;
+ int zerr = ZOK;
CNode *node;
if( !IsRealCluster )
@@ -1408,6 +1408,15 @@
node = Nodes->GetNode( pnid );
if (node)
{
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Checking remote monitor %s, "
+ "pnid=%d, peerZnodeFailTime=%ld(secs)\n"
+ , method_name, __LINE__
+ , node->GetName(), node->GetPNid()
+ , peer->znodeFailedTime.tv_sec );
+ }
+
if (node->GetState() != State_Up)
{
if (socks_[pnid] != -1)
@@ -1442,7 +1451,10 @@
struct epoll_event event;
event.data.fd = socks_[pnid];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[pnid], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[pnid]
+ , &event
+ , node?(char*)node->GetName():(char*)"remoteNode" );
shutdown( socks_[pnid], SHUT_RDWR);
close( socks_[pnid] );
socks_[pnid] = -1;
@@ -1467,7 +1479,8 @@
}
else
{
- if (node->GetState() != State_Up)
+ if ((ZClientEnabled && ZClient->IsRunningZNodeExpired( node->GetName(), zerr ))
+ || node->GetState() != State_Up)
{
if (socks_[pnid] != -1)
{
@@ -1489,7 +1502,10 @@
struct epoll_event event;
event.data.fd = socks_[pnid];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[pnid], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[pnid]
+ , &event
+ , node?(char*)node->GetName():(char*)"remoteNode" );
shutdown( socks_[pnid], SHUT_RDWR);
close( socks_[pnid] );
socks_[pnid] = -1;
@@ -1528,7 +1544,8 @@
}
else
{
- if (node->GetState() != State_Up)
+ if ((ZClientEnabled && ZClient->IsRunningZNodeExpired( node->GetName(), zerr ))
+ || node->GetState() != State_Up)
{
if (socks_[pnid] != -1)
{
@@ -1550,7 +1567,10 @@
struct epoll_event event;
event.data.fd = socks_[pnid];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[pnid], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[pnid]
+ , &event
+ , node?(char*)node->GetName():(char*)"remoteNode" );
shutdown( socks_[pnid], SHUT_RDWR);
close( socks_[pnid] );
socks_[pnid] = -1;
@@ -2371,8 +2391,10 @@
{
case InternalType_Null:
if (trace_settings & TRACE_SYNC_DETAIL)
+ {
trace_printf("%s@%d - Physical Node pnid=n%d has nothing to "
"update. \n", method_name, __LINE__, pnid);
+ }
break;
case InternalType_ActivateSpare:
@@ -3297,6 +3319,8 @@
Nodes->AddNodes( );
MyNode = Nodes->GetNode(MyPNID);
+ MyNode->SetCommPort( (char *)CommAccept->getCommPort() );
+ MyNode->SetCommSocketPort( CommAccept->getCommSocketPort() );
Nodes->SetupCluster( &Node, &LNode, &indexToPnid_ );
if ( CommType == CommType_Sockets )
@@ -3820,6 +3844,7 @@
case Reintegrate_Err14:
snprintf(buf, sizeof(buf), "[%s] Aborting.\n", method_name);
+ abortIn = true;
break;
case Reintegrate_Err15:
@@ -3837,7 +3862,7 @@
if ( abortIn )
{
- abort();
+ mon_failure_exit();
}
TRACE_EXIT;
@@ -3853,6 +3878,9 @@
nodeStatus.state = nodeState;
nodeStatus.status = initErr;
+ static int sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ static int sv_io_retry_count = EPOLL_IO_RETRY_COUNT;
+
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
trace_printf( "%s@%d - Sending reintegrate status: state=%s, error=%d\n"
@@ -3875,10 +3903,13 @@
}
break;
case CommType_Sockets:
- rc = Monitor->SendSock( (char *) &nodeStatus
- , sizeof(nodeStatus_t)
- , joinSock_
- , method_name );
+ rc = CComm::SendWait( joinSock_
+ , (char *) &nodeStatus
+ , sizeof(nodeStatus_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , IntegratingMonitorPort
+ , method_name );
if ( rc )
{
HandleReintegrateError( rc, Reintegrate_Err8, -1, NULL );
@@ -3904,6 +3935,7 @@
const char method_name[] = "CCluster::PingSockPeer";
TRACE_ENTRY;
+ int zerr = ZOK;
static int sv_connect_wait_timeout = -2;
static int sv_connect_retry_count = 1;
if ( sv_connect_wait_timeout == -2 )
@@ -3919,43 +3951,53 @@
{
sv_connect_retry_count = atoi( lv_connect_retry_count_env );
}
- if ( sv_connect_retry_count > 180 )
+ if ( sv_connect_retry_count > CONNECT_RETRY_COUNT_MAX )
{
- sv_connect_retry_count = 180;
+ sv_connect_retry_count = CONNECT_RETRY_COUNT_MAX;
}
}
else
{
- // default to 64 seconds
- sv_connect_wait_timeout = 16;
- sv_connect_retry_count = 4;
+ sv_connect_wait_timeout = CONNECT_WAIT_TIMEOUT_SEC;
+ sv_connect_retry_count = CONNECT_RETRY_COUNT;
}
}
+ // Reuse the connect timeout values for IO (wait timeout scaled by 1000)
+ static int sv_io_wait_timeout = sv_connect_wait_timeout * 1000;
+ static int sv_io_retry_count = sv_connect_retry_count;
+
bool createErrorZNode = true;
int pingSock = -1;
struct timespec currentTime;
- if (MyNode->IsPendingNodeDown())
+ if ((ZClientEnabled && ZClient->IsRunningZNodeExpired( node->GetName(), zerr ))
+ || node->GetState() != State_Up
+ || node->IsPendingNodeDown()
+ || MyNode->IsPendingNodeDown())
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d - MyNode %s (%d) is going down, "
- "socks_[%d]=%d, state=%s, pendingNodeDown=%d\n"
+ trace_printf( "%s@%d - MyNode %s (%d), "
+ "socks_[%d]=%d, state=%s, pendingNodeDown=%d,"
+ "node=%s (%d), RunningZNodeExpired=%d\n"
, method_name, __LINE__
, MyNode->GetName(), MyNode->GetPNid()
, MyNode->GetPNid(), socks_[MyNode->GetPNid()]
, StateString(MyNode->GetState())
- , MyNode->IsPendingNodeDown() );
+ , MyNode->IsPendingNodeDown()
+ , node->GetName(), node->GetPNid()
+ , ZClientEnabled?ZClient->IsRunningZNodeExpired( node->GetName(), zerr ):false );
}
return( false );
}
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
- , "[%s@%d] Pinging remote monitor %s, pnid=%d\n"
+ , "[%s@%d] Pinging remote monitor %s, pnid=%d, peerZnodeFailTime=%ld(secs)\n"
, method_name, __LINE__
- , node->GetName(), node->GetPNid() );
+ , node->GetName(), node->GetPNid()
+ , peerZnodeFailTime.tv_sec );
mon_log_write( MON_PINGSOCKPEER_1, SQ_LOG_INFO, buf );
@@ -3964,21 +4006,26 @@
for (int i = 0; i < (sv_connect_retry_count*sv_connect_wait_timeout); i++ )
{
// Disable connect internal retries
- pingSock = Monitor->Connect( node->GetCommPort(), false );
+ pingSock = CComm::Connect( node->GetCommPort(), false );
if ( pingSock < 0 )
{
clock_gettime(CLOCK_REALTIME, &currentTime);
- if (node->GetState() != State_Up || node->IsPendingNodeDown())
+ if ((ZClientEnabled && ZClient->IsRunningZNodeExpired( node->GetName(), zerr ))
+ || node->GetState() != State_Up
+ || node->IsPendingNodeDown()
+ || MyNode->IsPendingNodeDown())
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
trace_printf( "%s@%d - Node %s (%d) is not up, "
- "socks_[%d]=%d, state=%s, pendingNodeDown=%d\n"
+ "socks_[%d]=%d, state=%s, pendingNodeDown=%d"
+ "MyPendingNodeDown=%d\n"
, method_name, __LINE__
, node->GetName(), node->GetPNid()
, node->GetPNid(), socks_[node->GetPNid()]
, StateString(node->GetState())
- , node->IsPendingNodeDown() );
+ , node->IsPendingNodeDown()
+ , MyNode->IsPendingNodeDown() );
}
break;
}
@@ -4022,7 +4069,7 @@
, method_name, __LINE__
, node->GetName(), node->GetPNid(), (i+1) );
mon_log_write( MON_PINGSOCKPEER_3, SQ_LOG_INFO, buf );
- sleep( 1 );
+ sleep( CONNECT_WAIT_TIMEOUT_SEC );
}
}
else
@@ -4076,78 +4123,59 @@
, nodeInfo.creatorShellVerifier
, nodeInfo.ping );
}
-
- rc = SendSock( (char *) &nodeInfo
- , sizeof(nodeId_t)
- , pingSock
- , method_name );
+ nodeId_t remoteNodeInfo;
+ rc = CComm::SendRecvWait( pingSock
+ , (char *) &nodeInfo
+ , sizeof(nodeId_t)
+ , (char *) &remoteNodeInfo
+ , sizeof(nodeId_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) node->GetName()
+ , method_name);
if ( rc )
{
shutdown( pingSock, SHUT_RDWR);
close( (int)pingSock );
+ char ebuff[256];
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
- , "[%s], Cannot send my node info to node %s: (%s)\n"
+ , "[%s], Cannot send my node info or obtain node info from node %s: (%s)\n"
, method_name
- , node?node->GetName():"", ErrorMsg(rc));
+ , node?node->GetName():"", strerror_r( rc, ebuff, 256 ));
mon_log_write(MON_PINGSOCKPEER_4, SQ_LOG_ERR, buf);
return(false);
}
else
{
- // Get info about connecting monitor
- rc = ReceiveSock( (char *) &nodeInfo
- , sizeof(nodeId_t)
- , pingSock
- , method_name );
- if ( rc )
- { // Handle error
+ if ((ZClientEnabled && ZClient->IsRunningZNodeExpired( node->GetName(), zerr )))
+ { // Handle znode expiration
shutdown( pingSock, SHUT_RDWR);
close( (int)pingSock );
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
- , "[%s], unable to obtain node sync info from remote"
- "monitor: %s.\n"
- , method_name, ErrorMsg(rc));
- mon_log_write(MON_PINGSOCKPEER_5, SQ_LOG_ERR, buf);
+ , "[%s], Ping successful, but znode expired on "
+ "node: %s.\n"
+ , method_name, zerror(zerr));
+ mon_log_write(MON_PINGSOCKPEER_6, SQ_LOG_ERR, buf);
return(false);
}
- else
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- if (ZClientEnabled)
- {
- int zerr;
- if ( ZClient->IsRunningZNodeExpired( node->GetName(), zerr ) )
- { // Handle znode expiration
- shutdown( pingSock, SHUT_RDWR);
- close( (int)pingSock );
-
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s], Ping successful, but znode expired on "
- "node: %s.\n"
- , method_name, zerror(zerr));
- mon_log_write(MON_PINGSOCKPEER_6, SQ_LOG_ERR, buf);
- return(false);
- }
- }
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Received from nodeInfo.pnid=%d\n"
- " nodeInfo.nodeName=%s\n"
- " nodeInfo.commPort=%s\n"
- " nodeInfo.syncPort=%s\n"
- " nodeInfo.ping=%d\n"
- , method_name, __LINE__
- , nodeInfo.pnid
- , nodeInfo.nodeName
- , nodeInfo.commPort
- , nodeInfo.syncPort
- , nodeInfo.ping );
- }
+ trace_printf( "%s@%d - Received from remoteNodeInfo.pnid=%d\n"
+ " remoteNodeInfo.nodeName=%s\n"
+ " remoteNodeInfo.commPort=%s\n"
+ " remoteNodeInfo.syncPort=%s\n"
+ " remoteNodeInfo.ping=%d\n"
+ , method_name, __LINE__
+ , remoteNodeInfo.pnid
+ , remoteNodeInfo.nodeName
+ , remoteNodeInfo.commPort
+ , remoteNodeInfo.syncPort
+ , remoteNodeInfo.ping );
}
}
@@ -4161,12 +4189,6 @@
, node->GetName(), node->GetPNid() );
}
- if (ZClientEnabled)
- {
- // Clean up error znodes and where I am their 'only' child
- ZClient->HandleErrorChildZNodesForZNodeChild( Node_name, false );
- }
-
TRACE_EXIT;
return( true );
}
@@ -4452,6 +4474,9 @@
char *pch1;
char *pch2;
+ static int sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ static int sv_io_retry_count = EPOLL_IO_RETRY_COUNT;
+
// Set bit indicating my node is up
upNodes_.upNodes[MyPNID/MAX_NODE_BITMASK] |= (1ull << (MyPNID%MAX_NODE_BITMASK));
@@ -4480,7 +4505,7 @@
bool lv_did_not_connect_in_first_attempt = false;
while ( ! lv_done )
{
- joinSock_ = Monitor->Connect( IntegratingMonitorPort );
+ joinSock_ = CComm::Connect( IntegratingMonitorPort );
if ( joinSock_ < 0 )
{
if ( IsAgentMode )
@@ -4540,10 +4565,13 @@
, myNodeInfo.ping );
}
- rc = Monitor->SendSock( (char *) &myNodeInfo
- , sizeof(nodeId_t)
- , joinSock_
- , method_name );
+ rc = CComm::SendWait( joinSock_
+ , (char *) &myNodeInfo
+ , sizeof(nodeId_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , IntegratingMonitorPort
+ , method_name );
if ( rc )
{
HandleReintegrateError( rc, Reintegrate_Err9, -1, NULL );
@@ -4565,10 +4593,13 @@
nodeId_t *nodeInfo;
size_t nodeInfoSize = (sizeof(nodeId_t) * pnodeCount);
nodeInfo = (nodeId_t *) new char[nodeInfoSize];
- rc = Monitor->ReceiveSock( (char *)nodeInfo
- , nodeInfoSize
- , joinSock_
- , method_name );
+ rc = CComm::ReceiveWait( joinSock_
+ , (char *)nodeInfo
+ , nodeInfoSize
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , IntegratingMonitorPort
+ , method_name );
if ( rc )
{
HandleReintegrateError( rc, Reintegrate_Err3, -1, NULL );
@@ -4614,10 +4645,13 @@
// Get acknowledgement that creator monitor is ready to
// integrate this node.
int creatorpnid = -1;
- rc = Monitor->ReceiveSock( (char *) &creatorpnid
- , sizeof(creatorpnid)
- , joinSock_
- , method_name );
+ rc = CComm::ReceiveWait( joinSock_
+ , (char *)&creatorpnid
+ , sizeof(creatorpnid)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , IntegratingMonitorPort
+ , method_name );
if ( rc || creatorpnid != nodeInfo[i].creatorPNid )
{
HandleReintegrateError( rc, Reintegrate_Err15, i, NULL );
@@ -4643,10 +4677,12 @@
pch1 = strtok (commPort,":");
pch1 = strtok (NULL,":");
Node[nodeInfo[i].pnid]->SetCommSocketPort( atoi(pch1) );
+
Node[nodeInfo[i].pnid]->SetSyncPort( syncPort );
pch2 = strtok (syncPort,":");
pch2 = strtok (NULL,":");
Node[nodeInfo[i].pnid]->SetSyncSocketPort( atoi(pch2) );
+
sockPorts_[nodeInfo[i].pnid] = Node[nodeInfo[i].pnid]->GetSyncSocketPort();
Node[nodeInfo[i].pnid]->SetState( State_Up );
@@ -4663,10 +4699,13 @@
// Tell creator we are ready to accept its connection
int mypnid = MyPNID;
- rc = Monitor->SendSock( (char *) &mypnid
- , sizeof(mypnid)
- , joinSock_
- , method_name );
+ rc = CComm::SendWait( joinSock_
+ , (char *) &mypnid
+ , sizeof(mypnid)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , IntegratingMonitorPort
+ , method_name );
if ( rc )
{
HandleReintegrateError( rc, Reintegrate_Err4, i, &nodeInfo[i] );
@@ -4725,7 +4764,7 @@
TEST_POINT( TP013_NODE_UP );
// Connect to existing monitor
- existingCommFd = Monitor->Connect( nodeInfo[i].commPort );
+ existingCommFd = CComm::Connect( nodeInfo[i].commPort );
if ( existingCommFd < 0 )
{
HandleReintegrateError( rc, Reintegrate_Err5, i, &nodeInfo[i] );
@@ -4742,10 +4781,13 @@
// Send this nodes name and port number so other monitor
// knows who we are.
- rc = Monitor->SendSock( (char *) &myNodeInfo
- , sizeof(nodeId_t)
- , existingCommFd
- , method_name );
+ rc = CComm::SendWait( existingCommFd
+ , (char *) &myNodeInfo
+ , sizeof(nodeId_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , IntegratingMonitorPort
+ , method_name );
if ( rc )
{
HandleReintegrateError( rc, Reintegrate_Err4, i, &nodeInfo[i] );
@@ -4758,10 +4800,13 @@
// the monitors in the cluster to integrate the new node
// before one or more was ready to do the integration.
int remotepnid = -1;
- rc = Monitor->ReceiveSock( (char *) &remotepnid
- , sizeof(remotepnid)
- , existingCommFd
- , method_name );
+ rc = CComm::ReceiveWait( existingCommFd
+ , (char *)&remotepnid
+ , sizeof(remotepnid)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , IntegratingMonitorPort
+ , method_name );
if ( rc || remotepnid != nodeInfo[i].pnid )
{
HandleReintegrateError( rc, Reintegrate_Err15, i, NULL );
@@ -4953,7 +4998,7 @@
integratingPNid_ = -1;
#ifdef NAMESERVER_PROCESS
- if (!CommAcceptMon.isAccepting())
+ if (!CommAcceptMon->isAccepting())
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
@@ -4962,11 +5007,11 @@
}
// Indicate to the commAcceptor thread to begin accepting connections
- CommAcceptMon.startAccepting();
+ CommAcceptMon->startAccepting();
}
#endif
- if (!CommAccept.isAccepting())
+ if (!CommAccept->isAccepting())
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
@@ -4975,7 +5020,7 @@
}
// Indicate to the commAcceptor thread to begin accepting connections
- CommAccept.startAccepting();
+ CommAccept->startAccepting();
}
TRACE_EXIT;
@@ -5380,10 +5425,53 @@
memset( p, 0, sizeof(p) );
tag = tag; // make compiler happy
struct timespec currentTime;
+ struct timespec ioInitialTime;
// Set to twice the ZClient session timeout
static int sessionTimeout = ZClientEnabled
? (ZClient->SessionTimeoutGet() * 2) : 120;
+ static int sv_epoll_wait_timeout = -2;
+ static int sv_epoll_retry_count = 1;
+ if ( sv_epoll_wait_timeout == -2 )
+ {
+ char *lv_epoll_wait_timeout_env = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
+ if ( lv_epoll_wait_timeout_env )
+ {
+ // convert to milliseconds
+ sv_epoll_wait_timeout = atoi( lv_epoll_wait_timeout_env ) * 1000;
+ char *lv_epoll_retry_count_env = getenv( "SQ_MON_EPOLL_RETRY_COUNT" );
+ if ( lv_epoll_retry_count_env )
+ {
+ sv_epoll_retry_count = atoi( lv_epoll_retry_count_env );
+ }
+ else
+ {
+ sv_epoll_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ sv_epoll_retry_count = EPOLL_IO_RETRY_COUNT;
+ }
+ if ( sv_epoll_retry_count > EPOLL_IO_RETRY_COUNT_MAX )
+ {
+ sv_epoll_retry_count = EPOLL_IO_RETRY_COUNT_MAX;
+ }
+ }
+ else
+ {
+ // default to 64 seconds
+ sv_epoll_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ sv_epoll_retry_count = EPOLL_IO_RETRY_COUNT;
+ }
+
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s@%d] EPOLL timeout wait_timeout=%d msecs, retry_count=%d\n"
+ , method_name
+ , __LINE__
+ , sv_epoll_wait_timeout
+ , sv_epoll_retry_count );
+
+ mon_log_write( MON_CLUSTER_ALLGATHERSOCK_1, SQ_LOG_INFO, buf );
+ }
+
int nsent = 0, nrecv = 0;
for ( int iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
{
@@ -5405,10 +5493,37 @@
peer->p_n2recv = -1;
peer->p_buff = ((char *) rbuf) + (indexToPnid_[iPeer] * CommBufSize);
+ // Set the session timeout relative to now
+ clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
+ peer->znodeFailedTime.tv_sec += sessionTimeout;
+ if (trace_settings & (TRACE_SYNC_DETAIL))
+ {
+ trace_printf( "%s@%d" " - peer %d znodeFailedTime=%ld(secs)\n"
+ , method_name, __LINE__
+ , indexToPnid_[iPeer]
+ , peer->znodeFailedTime.tv_sec);
+ }
+
struct epoll_event event;
event.data.fd = socks_[indexToPnid_[iPeer]];
event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
- EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[iPeer]], &event );
+ if (trace_settings & (TRACE_SYNC_DETAIL))
+ {
+ trace_printf( "%s@%d - EPOLL state change "
+ "to/from pnid=%d, efd=%d, op=%s, sockFd=%d, event=%s\n"
+ , method_name, __LINE__
+ , indexToPnid_[iPeer]
+ , epollFD_
+ , EpollOpString(EPOLL_CTL_ADD)
+ , socks_[indexToPnid_[iPeer]]
+ , EpollEventString(event.events) );
+
+ }
+ CComm::EpollCtl( epollFD_
+ , EPOLL_CTL_ADD
+ , socks_[indexToPnid_[iPeer]]
+ , &event
+ , (char*)Node[indexToPnid_[iPeer]]->GetName() );
}
}
@@ -5434,54 +5549,12 @@
inBarrier_ = true;
MonStats->BarrierWaitIncr( );
- static int sv_epoll_wait_timeout = -2;
- static int sv_epoll_retry_count = 1;
- if ( sv_epoll_wait_timeout == -2 )
- {
- char *lv_epoll_wait_timeout_env = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
- if ( lv_epoll_wait_timeout_env )
- {
- // convert to milliseconds
- sv_epoll_wait_timeout = atoi( lv_epoll_wait_timeout_env ) * 1000;
- char *lv_epoll_retry_count_env = getenv( "SQ_MON_EPOLL_RETRY_COUNT" );
- if ( lv_epoll_retry_count_env )
- {
- sv_epoll_retry_count = atoi( lv_epoll_retry_count_env );
- }
- else
- {
- // default to 64 seconds
- sv_epoll_wait_timeout = 16000;
- sv_epoll_retry_count = 4;
- }
- if ( sv_epoll_retry_count > 180 )
- {
- sv_epoll_retry_count = 180;
- }
- }
- else
- {
- // default to 64 seconds
- sv_epoll_wait_timeout = 16000;
- sv_epoll_retry_count = 4;
- }
-
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s@%d] EPOLL timeout wait_timeout=%d msecs, retry_count=%d\n"
- , method_name
- , __LINE__
- , sv_epoll_wait_timeout
- , sv_epoll_retry_count );
-
- mon_log_write( MON_CLUSTER_ALLGATHERSOCK_1, SQ_LOG_INFO, buf );
- }
-
bool resetConnections = false;
int peerTimedoutCount = 0;
// do the work
struct epoll_event events[2*GetConfigPNodesMax() + 1];
+ clock_gettime(CLOCK_REALTIME, &ioInitialTime);
while ( 1 )
{
reconnected:
@@ -5493,6 +5566,16 @@
int nw;
peer_t *peer;
+ if (trace_settings & (TRACE_SYNC_DETAIL))
+ {
+ trace_printf( "%s@%d" " - IO (seqNum_=%lld) ioInitialTime=%ld(secs), "
+ "maxEvents=%d\n"
+ , method_name, __LINE__
+ , seqNum_
+ , ioInitialTime.tv_sec
+ , maxEvents );
+ }
+
while ( 1 )
{
nw = epoll_wait( epollFD_, events, maxEvents, sv_epoll_wait_timeout );
@@ -5515,18 +5598,18 @@
if ( (peer->p_receiving) || (peer->p_sending) )
{
if (peer->p_initial_check && !reconnecting)
- { // Set the session timeout relative to now
+ {
peer->p_initial_check = false;
- clock_gettime(CLOCK_REALTIME, &peer->znodeFailedTime);
- peer->znodeFailedTime.tv_sec += sessionTimeout;
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d" " - Znode Fail Time %ld(secs)\n"
+ trace_printf( "%s@%d - peer=%d, ioInitialTime=%ld(secs), "
+ "znodeFailedTime=%ld(secs)\n"
, method_name, __LINE__
+ , indexToPnid_[iPeer]
+ , ioInitialTime.tv_sec
, peer->znodeFailedTime.tv_sec);
}
}
-
numPeersTimedout++;
peer->p_timeout_count++;
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
@@ -5623,7 +5706,23 @@
struct epoll_event event;
event.data.fd = socks_[indexToPnid_[i]];
event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
- EpollCtl( epollFD_, EPOLL_CTL_ADD, socks_[indexToPnid_[i]], &event );
+ if (trace_settings & (TRACE_SYNC_DETAIL))
+ {
+ trace_printf( "%s@%d - EPOLL state change "
+ "to/from pnid=%d, efd=%d, op=%s, sockFd=%d, event=%s\n"
+ , method_name, __LINE__
+ , indexToPnid_[i]
+ , epollFD_
+ , EpollOpString(EPOLL_CTL_ADD)
+ , socks_[indexToPnid_[i]]
+ , EpollEventString(event.events) );
+
+ }
+ CComm::EpollCtl( epollFD_
+ , EPOLL_CTL_ADD
+ , socks_[indexToPnid_[i]]
+ , &event
+ , (char*)Node[indexToPnid_[i]]->GetName() );
}
}
} // (resetConnections)
@@ -6036,7 +6135,23 @@
}
if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD )
{
- EpollCtl( epollFD_, op, fd, &event );
+ if (trace_settings & (TRACE_SYNC_DETAIL))
+ {
+ trace_printf( "%s@%d - EPOLL state change "
+ "to/from pnid=%d, efd=%d, op=%s, sockFd=%d, event=%s\n"
+ , method_name, __LINE__
+ , indexToPnid_[iPeer]
+ , epollFD_
+ , EpollOpString(op)
+ , fd
+ , EpollEventString(event.events) );
+
+ }
+ CComm::EpollCtl( epollFD_
+ , op
+ , fd
+ , &event
+ , (char*)Node[indexToPnid_[iPeer]]->GetName() );
if (op == EPOLL_CTL_DEL
&& stats[indexToPnid_[iPeer]].MPI_ERROR == MPI_ERR_EXITED)
{
@@ -6094,6 +6209,7 @@
int zerr = ZOK;
CNode *node;
peer_t *peer;
+ struct timespec currentTime;
if( !IsRealCluster )
{ // In virtual cluster, just return success
@@ -6127,7 +6243,8 @@
idst = j;
node = Nodes->GetNode( indexToPnid_[idst] );
if (!node) continue;
- if (node->GetState() != State_Up)
+ if ((ZClientEnabled && ZClient->IsRunningZNodeExpired( node->GetName(), zerr ))
+ || node->GetState() != State_Up )
{
if (socks_[indexToPnid_[idst]] != -1)
{ // Peer socket is still active
@@ -6161,7 +6278,10 @@
struct epoll_event event;
event.data.fd = socks_[indexToPnid_[idst]];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[indexToPnid_[idst]], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[indexToPnid_[idst]]
+ , &event
+ , node?(char*)node->GetName():(char*)"remoteNode" );
shutdown( socks_[indexToPnid_[idst]], SHUT_RDWR);
close( socks_[indexToPnid_[idst]] );
socks_[indexToPnid_[idst]] = -1;
@@ -6236,7 +6356,10 @@
struct epoll_event event;
event.data.fd = socks_[indexToPnid_[idst]];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[indexToPnid_[idst]], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[indexToPnid_[idst]]
+ , &event
+ , node?(char*)node->GetName():(char*)"remoteNode" );
shutdown( socks_[indexToPnid_[idst]], SHUT_RDWR);
close( socks_[indexToPnid_[idst]] );
socks_[indexToPnid_[idst]] = -1;
@@ -6263,7 +6386,8 @@
idst = i;
node = Nodes->GetNode( indexToPnid_[idst] );
if (!node) continue;
- if (node->GetState() != State_Up)
+ if ((ZClientEnabled && ZClient->IsRunningZNodeExpired( node->GetName(), zerr ))
+ || node->GetState() != State_Up )
{
if (socks_[indexToPnid_[idst]] != -1)
{ // Peer socket is still active
@@ -6297,7 +6421,10 @@
struct epoll_event event;
event.data.fd = socks_[indexToPnid_[idst]];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[indexToPnid_[idst]], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[indexToPnid_[idst]]
+ , &event
+ , node?(char*)node->GetName():(char*)"remoteNode" );
shutdown( socks_[indexToPnid_[idst]], SHUT_RDWR);
close( socks_[indexToPnid_[idst]] );
socks_[indexToPnid_[idst]] = -1;
@@ -6359,7 +6486,10 @@
struct epoll_event event;
event.data.fd = socks_[indexToPnid_[idst]];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[indexToPnid_[idst]], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[indexToPnid_[idst]]
+ , &event
+ , node?(char*)node->GetName():(char*)"remoteNode" );
shutdown( socks_[indexToPnid_[idst]], SHUT_RDWR);
close( socks_[indexToPnid_[idst]] );
socks_[indexToPnid_[idst]] = -1;
@@ -6433,6 +6563,33 @@
int reconnectSock = -1;
struct hostent *he;
+ static int sv_io_wait_timeout = -2;
+ static int sv_io_retry_count = -1;
+ if ( sv_io_wait_timeout == -2 )
+ {
+ // Use the EPOLL timeout and retry values
+ char *lv_wait_timeout_env = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
+ if ( lv_wait_timeout_env )
+ {
+ // Environment value is in seconds; convert to milliseconds
+ sv_io_wait_timeout = atoi( lv_wait_timeout_env ) * 1000;
+ char *lv_retry_count_env = getenv( "SQ_MON_EPOLL_RETRY_COUNT" );
+ if ( lv_retry_count_env )
+ {
+ sv_io_retry_count = atoi( lv_retry_count_env );
+ }
+ if ( sv_io_retry_count > EPOLL_IO_RETRY_COUNT_MAX )
+ {
+ sv_io_retry_count = EPOLL_IO_RETRY_COUNT_MAX;
+ }
+ }
+ else
+ {
+ sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ sv_io_retry_count = EPOLL_IO_RETRY_COUNT;
+ }
+ }
+
// Get my host structure via my node name
he = gethostbyname( MyNode->GetName() );
if ( !he )
@@ -6458,11 +6615,11 @@
}
// Accept connection from peer
- reconnectSock = AcceptSock( syncSock_ );
+ reconnectSock = CComm::Accept( syncSock_ );
if (reconnectSock < 0)
{
char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s@%d] AcceptSock(%d) failed!\n",
+ snprintf( buf, sizeof(buf), "[%s@%d] Accept(%d) failed!\n",
method_name, __LINE__, syncSock_ );
mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_2, SQ_LOG_ERR, buf );
rc = -1;
@@ -6474,10 +6631,13 @@
{
nodeSyncInfo_t readSyncInfo;
// Get info about connecting monitor
- rc = ReceiveSock( (char *) &readSyncInfo
- , sizeof(nodeSyncInfo_t)
- , reconnectSock
- , method_name );
+ rc = CComm::ReceiveWait( reconnectSock
+ , (char *)&readSyncInfo
+ , sizeof(nodeSyncInfo_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *)"Unknown remote node"
+ , method_name );
if ( rc )
{ // Handle error
shutdown( reconnectSock, SHUT_RDWR);
@@ -6512,7 +6672,7 @@
close( (int)reconnectSock );
char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s@%d] AcceptSock(%d) failed!\n",
+ snprintf( buf, sizeof(buf), "[%s@%d] Accept(%d) failed!\n",
method_name, __LINE__, syncSock_ );
mon_log_write( MON_CLUSTER_ACCEPTSOCKPEER_2, SQ_LOG_ERR, buf );
return(-1);
@@ -6531,10 +6691,13 @@
writeSyncInfo.pnid = MyPNID;
writeSyncInfo.seqNum = seqNum_;
writeSyncInfo.reconnectSeqNum = reconnectSeqNum_;
- rc = SendSock( (char *) &writeSyncInfo
- , sizeof(nodeSyncInfo_t)
- , reconnectSock
- , method_name );
+ rc = CComm::SendWait( reconnectSock
+ , (char *) &writeSyncInfo
+ , sizeof(nodeSyncInfo_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) acceptedNode->GetName()
+ , method_name );
if ( rc )
{
shutdown( reconnectSock, SHUT_RDWR);
@@ -6578,7 +6741,10 @@
struct epoll_event event;
event.data.fd = socks_[acceptedNode->GetPNid()];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[acceptedNode->GetPNid()], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[acceptedNode->GetPNid()]
+ , &event
+ , (char*)acceptedNode->GetName() );
if (acceptedNode->GetState() != State_Up)
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
@@ -6640,6 +6806,33 @@
unsigned char srcaddr[4], dstaddr[4];
struct hostent *he;
+ static int sv_io_wait_timeout = -2;
+ static int sv_io_retry_count = -1;
+ if ( sv_io_wait_timeout == -2 )
+ {
+ // Use the EPOLL timeout and retry values
+ char *lv_wait_timeout_env = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
+ if ( lv_wait_timeout_env )
+ {
+ // Environment value is in seconds; convert to milliseconds
+ sv_io_wait_timeout = atoi( lv_wait_timeout_env ) * 1000;
+ char *lv_retry_count_env = getenv( "SQ_MON_EPOLL_RETRY_COUNT" );
+ if ( lv_retry_count_env )
+ {
+ sv_io_retry_count = atoi( lv_retry_count_env );
+ }
+ if ( sv_io_retry_count > EPOLL_IO_RETRY_COUNT_MAX )
+ {
+ sv_io_retry_count = EPOLL_IO_RETRY_COUNT_MAX;
+ }
+ }
+ else
+ {
+ sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ sv_io_retry_count = EPOLL_IO_RETRY_COUNT;
+ }
+ }
+
// Get my host structure via my node name
he = gethostbyname( MyNode->GetName() );
if ( !he )
@@ -6693,7 +6886,7 @@
, sockPorts_[peer] );
}
// Connect to peer
- reconnectSock = MkCltSock( srcaddr, dstaddr, sockPorts_[peer] );
+ reconnectSock = CComm::Connect( srcaddr, dstaddr, sockPorts_[peer] );
if (reconnectSock > -1)
{
if (trace_settings & TRACE_RECOVERY)
@@ -6710,7 +6903,7 @@
{
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
- , "[%s@%d] MkCltSock() src=%d.%d.%d.%d, "
+ , "[%s@%d] Connect() src=%d.%d.%d.%d, "
"dst(%s)=%d.%d.%d.%d failed!\n"
, method_name, __LINE__
, (int)((unsigned char *)srcaddr)[0]
@@ -6741,7 +6934,10 @@
struct epoll_event event;
event.data.fd = socks_[peer];
event.events = 0;
- EpollCtlDelete( epollFD_, socks_[peer], &event );
+ CComm::EpollCtlDelete( epollFD_
+ , socks_[peer]
+ , &event
+ , node?(char*)node->GetName():(char*)"remoteNode" );
if (node->GetState() != State_Up)
{
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
@@ -6767,10 +6963,13 @@
writeSyncInfo.pnid = MyPNID;
writeSyncInfo.seqNum = seqNum_;
writeSyncInfo.reconnectSeqNum = reconnectSeqNum_;
- rc = SendSock( (char *) &writeSyncInfo
- , sizeof(nodeSyncInfo_t)
- , reconnectSock
- , method_name );
+ rc = CComm::SendWait( reconnectSock
+ , (char *) &writeSyncInfo
+ , sizeof(nodeSyncInfo_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , node?(char *) node->GetName():(char *) "Remote node"
+ , method_name );
if ( rc )
{
shutdown( reconnectSock, SHUT_RDWR);
@@ -6799,10 +6998,13 @@
}
nodeSyncInfo_t readSyncInfo;
// Get info about connecting monitor
- rc = ReceiveSock( (char *) &readSyncInfo
- , sizeof(nodeSyncInfo_t)
- , reconnectSock
- , method_name );
+ rc = CComm::ReceiveWait( reconnectSock
+ , (char *)&readSyncInfo
+ , sizeof(nodeSyncInfo_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *)node->GetName()
+ , method_name );
if ( rc )
{ // Handle error
shutdown( reconnectSock, SHUT_RDWR);
@@ -6810,7 +7012,7 @@
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
- , "[%s], unable to obtain node sync infor from remote"
+ , "[%s], unable to obtain node sync info from remote "
"monitor: %s.\n"
, method_name, ErrorMsg(rc));
mon_log_write(MON_CLUSTER_CONNECTSOCKPEER_6, SQ_LOG_ERR, buf);
@@ -6830,6 +7032,7 @@
, readSyncInfo.reconnectSeqNum );
}
socks_[peer] = reconnectSock; // ConnectSockPeer
+ if ( ZClientEnabled ) ZClient->ErrorZNodeDelete( node->GetName() ); // ZClient calls are gated on ZClientEnabled elsewhere (e.g. RunningZNodeDelete)
}
}
}
@@ -7542,7 +7745,7 @@
mem_log_write(CMonLog::MON_UPDATE_CLUSTER_2, MyPNID);
if( IsRealCluster )
{ // Terminate CommAccept thread, remote pings will fail
- CommAccept.shutdownWork();
+ CommAccept->shutdownWork();
if ( ZClientEnabled )
{
ZClient->RunningZNodeDelete( MyNode->GetName() );
@@ -7846,9 +8049,21 @@
if (NameServerEnabled)
{
clusterProcCount_ = 0;
- for (int index = 0; index < GetConfigPNodesMax(); index++)
+ for (int index = 0; index < GetConfigPNodesCount(); index++)
{
- clusterProcCount_ += nodestate[index].monProcCount;
+ clusterProcCount_ += nodestate[indexToPnid_[index]].monProcCount;
+#if 0
+ // Temporary trace - debugging only
+ if (trace_settings & (TRACE_PROCESS_DETAIL | TRACE_SYNC))
+ {
+ trace_printf( "%s@%d - nodestate[%d].monProcCount=%d, "
+ "clusterProcCount_=%d\n"
+ , method_name, __LINE__
+ , indexToPnid_[index]
+ , nodestate[indexToPnid_[index]].monProcCount
+ , clusterProcCount_ );
+ }
+#endif
}
}
#endif
@@ -8582,98 +8797,6 @@
return result;
}
-void CCluster::EpollCtl( int efd, int op, int fd, struct epoll_event *event )
-{
- const char method_name[] = "CCluster::EpollCtl";
- TRACE_ENTRY;
-#if 0
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- int iPeer;
- for ( iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
- { // Find corresponding peer by matching socket fd
- if ( fd == socks_[indexToPnid_[iPeer]] ) break;
- }
- trace_printf( "%s@%d epoll_ctl( efd=%d,%s, fd=%d(%s), %s )\n"
- , method_name, __LINE__
- , efd
- , EpollOpString(op)
- , fd, Node[indexToPnid_[iPeer]]->GetName()
- , EpollEventString(event->events) );
- }
-#endif
- int rc = epoll_ctl( efd, op, fd, event );
- if ( rc == -1 )
- {
- char ebuff[256];
- char buf[MON_STRING_BUF_SIZE];
- int iPeer;
- for ( iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
- { // Find corresponding peer by matching socket fd
- if ( fd == socks_[indexToPnid_[iPeer]] ) break;
- }
- snprintf( buf, sizeof(buf), "[%s@%d] epoll_ctl(efd=%d,%s, fd=%d(%s), %s) error: %s\n"
- , method_name, __LINE__
- , efd
- , EpollOpString(op)
- , fd, Node[indexToPnid_[iPeer]]->GetName()
- , EpollEventString(event->events)
- , strerror_r( errno, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_EPOLLCTL_1, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
-
- TRACE_EXIT;
- return;
-}
-
-void CCluster::EpollCtlDelete( int efd, int fd, struct epoll_event *event )
-{
- const char method_name[] = "CCluster::EpollCtlDelete";
- TRACE_ENTRY;
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- int iPeer;
- for ( iPeer = 0; iPeer < GetConfigPNodesCount(); iPeer++ )
- { // Find corresponding peer by matching socket fd
- if ( fd == socks_[indexToPnid_[iPeer]] ) break;
- }
- trace_printf( "%s@%d epoll_ctl( efd=%d,%s, fd=%d(%s), %s )\n"
- , method_name, __LINE__
- , efd
- , EpollOpString(EPOLL_CTL_DEL)
- , fd, Node[indexToPnid_[iPeer]]->GetName()
- , EpollEventString(event->events) );
- }
-
- // Remove old socket from epoll set, it may not be there
- int rc = epoll_ctl( efd, EPOLL_CTL_DEL, fd, event );
- if ( rc == -1 )
- {
- int err = errno;
- if (err != ENOENT)
- {
- char ebuff[256];
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s@%d] epoll_ctl(efd=%d, %s, fd=%d, %s) error: %s\n"
- , method_name, __LINE__
- , efd
- , EpollOpString(EPOLL_CTL_DEL)
- , fd
- , EpollEventString(event->events)
- , strerror_r( err, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_EPOLLCTLDELETE_1, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
- }
-
- TRACE_EXIT;
- return;
-}
-
void CCluster::InitClusterSocks( int worldSize, int myRank, char *nodeNames, int *rankToPnid )
{
const char method_name[] = "CCluster::InitClusterSocks";
@@ -8793,7 +8916,7 @@
, sockPorts_[j] );
}
// Connect to peer
- socks_[rankToPnid[j]] = MkCltSock( srcaddr, dstaddr, sockPorts_[j] ); // InitClusterSocks
+ socks_[rankToPnid[j]] = CComm::Connect( srcaddr, dstaddr, sockPorts_[j] ); // InitClusterSocks
}
else if ( j == myRank )
{ // Current [j] peer my node, accept connection from peer [i] node
@@ -8810,7 +8933,7 @@
idst = i;
// Accept connection from peer [i]
- socks_[rankToPnid[i]] = AcceptSock( syncSock_ ); // InitClusterSocks
+ socks_[rankToPnid[i]] = CComm::Accept( syncSock_ ); // InitClusterSocks
}
else
{
@@ -8861,15 +8984,9 @@
{
const char method_name[] = "CCluster::InitServerSock";
TRACE_ENTRY;
- int serverCommPort = 0;
- int serverSyncPort = 0;
-#ifdef NAMESERVER_PROCESS
- int mon2nsPort = 0;
-#else
- int ptpPort = 0;
-#endif
- int val = 0;
+ int serverSyncPort = 0;
+ int val = 0;
unsigned char addr[4];
struct hostent *he;
@@ -8889,72 +9006,9 @@
memcpy( addr, he->h_addr, 4 );
#ifdef NAMESERVER_PROCESS
- char *env = getenv ("NS_COMM_PORT");
+ char *env = getenv("NS_SYNC_PORT");
#else
- char *env = getenv("MONITOR_COMM_PORT");
-#endif
- if ( env )
- {
- val = atoi(env);
- if ( val > 0)
- {
- if ( !IsRealCluster )
- {
- val += MyPNID;
- }
- serverCommPort = val;
- }
- }
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d COMM_PORT Node_name=%s, env=%s, serverCommPort=%d, val=%d\n"
- , method_name, __LINE__
- , Node_name, env, serverCommPort, val );
- }
-
- commSock_ = MkSrvSock( &serverCommPort );
- if ( commSock_ < 0 )
- {
- char ebuff[256];
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
-#ifdef NAMESERVER_PROCESS
- , "[%s@%d] MkSrvSock(NS_COMM_PORT=%d) error: %s\n"
-#else
- , "[%s@%d] MkSrvSock(MONITOR_COMM_PORT=%d) error: %s\n"
-#endif
- , method_name, __LINE__, serverCommPort
- , strerror_r( errno, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_INITSERVERSOCK_2, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
- else
- {
- snprintf( MyCommPort, sizeof(MyCommPort)
- , "%d.%d.%d.%d:%d"
- , (int)((unsigned char *)addr)[0]
- , (int)((unsigned char *)addr)[1]
- , (int)((unsigned char *)addr)[2]
- , (int)((unsigned char *)addr)[3]
- , serverCommPort );
- MyNode->SetCommSocketPort( serverCommPort );
- MyNode->SetCommPort( MyCommPort );
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- trace_printf( "%s@%d Initialized my comm socket port, "
- "pnid=%d (%s:%s) (commPort=%s)\n"
- , method_name, __LINE__
- , MyPNID, MyNode->GetName(), MyCommPort
- , MyNode->GetCommPort() );
-
- }
-
-#ifdef NAMESERVER_PROCESS
- env = getenv("NS_SYNC_PORT");
-#else
- env = getenv("MONITOR_SYNC_PORT");
+ char *env = getenv("MONITOR_SYNC_PORT");
#endif
if ( env )
{
@@ -8976,16 +9030,16 @@
, Node_name, env, syncPort_, val );
}
- syncSock_ = MkSrvSock( &serverSyncPort );
+ syncSock_ = CComm::Listen( &serverSyncPort );
if ( syncSock_ < 0 )
{
char ebuff[256];
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
#ifdef NAMESERVER_PROCESS
- , "[%s@%d] MkSrvSock(NS_SYNC_PORT=%d) error: %s\n"
+ , "[%s@%d] Listen(NS_SYNC_PORT=%d) error: %s\n"
#else
- , "[%s@%d] MkSrvSock(MONITOR_SYNC_PORT=%d) error: %s\n"
+ , "[%s@%d] Listen(MONITOR_SYNC_PORT=%d) error: %s\n"
#endif
, method_name, __LINE__, serverSyncPort
, strerror_r( errno, ebuff, 256 ) );
@@ -9013,120 +9067,6 @@
, MyNode->GetSyncPort() );
}
-#ifdef NAMESERVER_PROCESS
- env = getenv("NS_M2N_COMM_PORT");
- if ( env )
- {
- val = atoi(env);
- if ( val > 0)
- {
- if ( !IsRealCluster )
- {
- val += MyPNID;
- }
- mon2nsPort = val;
- }
- }
-
- mon2nsSock_ = MkSrvSock( &mon2nsPort );
- if ( mon2nsSock_ < 0 )
- {
- char ebuff[256];
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s@%d] MkSrvSock(NS_M2N_COMM_PORT=%d) error: %s\n"
- , method_name, __LINE__, mon2nsPort
- , strerror_r( errno, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_INITSERVERSOCK_4, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
- else
- {
- snprintf( MyMon2NsPort, sizeof(MyMon2NsPort)
- , "%d.%d.%d.%d:%d"
- , (int)((unsigned char *)addr)[0]
- , (int)((unsigned char *)addr)[1]
- , (int)((unsigned char *)addr)[2]
- , (int)((unsigned char *)addr)[3]
- , mon2nsPort );
- MyNode->SetMon2NsPort( MyMon2NsPort );
- MyNode->SetMon2NsSocketPort( mon2nsPort );
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- trace_printf( "%s@%d Initialized my mon2ns comm socket port, "
- "pnid=%d (%s:%s) (Mon2NsPort=%s, Mon2NsSocketPort=%d)\n"
- , method_name, __LINE__
- , MyPNID, MyNode->GetName(), MyMon2NsPort
- , MyNode->GetMon2NsPort()
- , MyNode->GetMon2NsSocketPort() );
-
- }
-#else
- if (NameServerEnabled)
- {
- env = getenv("MON2MON_COMM_PORT");
- if ( env )
- {
- val = atoi(env);
- if ( val > 0)
- {
- ptpPort = val;
- }
- }
- else
- {
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s@%d] MON2MON_COMM_PORT environment variable is not set!\n"
- , method_name, __LINE__ );
- mon_log_write( MON_CLUSTER_INITSERVERSOCK_5, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
-
- // For virtual env, add PNid to the port so we can still test without collisions of port numbers
- if (!IsRealCluster)
- {
- ptpPort += MyNode->GetPNid();
- }
-
- ptpSock_ = MkSrvSock( &ptpPort );
- if ( ptpSock_ < 0 )
- {
- char ebuff[MON_STRING_BUF_SIZE];
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s@%d] MkSrvSock(MON2MON_COMM_PORT=%d) error: %s\n"
- , method_name, __LINE__, ptpPort
- , strerror_r( errno, ebuff, MON_STRING_BUF_SIZE ) );
- mon_log_write( MON_CLUSTER_INITSERVERSOCK_6, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
- else
- {
- snprintf( MyPtPPort, sizeof(MyPtPPort)
- , "%d.%d.%d.%d:%d"
- , (int)((unsigned char *)addr)[0]
- , (int)((unsigned char *)addr)[1]
- , (int)((unsigned char *)addr)[2]
- , (int)((unsigned char *)addr)[3]
- , ptpPort );
- MyNode->SetPtPPort( MyPtPPort );
- MyNode->SetPtPSocketPort( ptpPort );
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- trace_printf( "%s@%d Initialized my ptp socket port, "
- "pnid=%d (%s:%s) (ptpPort=%s)\n"
- , method_name, __LINE__
- , MyPNID, MyNode->GetName(), MyPtPPort
- , MyNode->GetPtPPort() );
-
- }
- }
-#endif
-
epollFD_ = epoll_create1( EPOLL_CLOEXEC );
if ( epollFD_ < 0 )
{
@@ -9139,1073 +9079,20 @@
mon_failure_exit();
}
- epollPingFD_ = epoll_create1( EPOLL_CLOEXEC );
- if ( epollPingFD_ < 0 )
- {
- char ebuff[256];
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s@%d] epoll_create1(ping) error: %s\n",
- method_name, __LINE__, strerror_r( errno, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_INITSERVERSOCK_5, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
-
TRACE_EXIT;
}
-int CCluster::AcceptCommSock( void )
-{
- const char method_name[] = "CCluster::AcceptCommSock";
- TRACE_ENTRY;
-
- int csock = AcceptSock( commSock_ );
-
- TRACE_EXIT;
- return( csock );
-}
-
int CCluster::AcceptSyncSock( void )
{
const char method_name[] = "CCluster::AcceptSyncSock";
TRACE_ENTRY;
- int csock = AcceptSock( syncSock_ );
+ int csock = CComm::Accept( syncSock_ );
TRACE_EXIT;
return( csock );
}
-#ifndef NAMESERVER_PROCESS
-int CCluster::AcceptPtPSock( void )
-{
- const char method_name[] = "CCluster::AcceptPtPSock";
- TRACE_ENTRY;
-
- int csock = AcceptSock( ptpSock_ );
-
- TRACE_EXIT;
- return( csock );
-}
-#endif
-
-
-int CCluster::AcceptSock( int sock )
-{
- const char method_name[] = "CCluster::AcceptSock";
- TRACE_ENTRY;
-
-#if defined(_XOPEN_SOURCE_EXTENDED)
-#ifdef __LP64__
- socklen_t size; // size of socket address
-#else
- size_t size; // size of socket address
-#endif
-#else
- int size; // size of socket address
-#endif
- int csock; // connected socket
- struct sockaddr_in sockinfo; // socket address info
-
- size = sizeof(struct sockaddr *);
- if ( getsockname( sock, (struct sockaddr *) &sockinfo, &size ) )
- {
- char buf[MON_STRING_BUF_SIZE];
- int err = errno;
- snprintf(buf, sizeof(buf), "[%s], getsockname() failed, errno=%d (%s).\n",
- method_name, err, strerror(err));
- mon_log_write(MON_CLUSTER_ACCEPTSOCK_1, SQ_LOG_ERR, buf);
- return ( -1 );
- }
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
- trace_printf( "%s@%d - Accepting socket on addr=%d.%d.%d.%d, port=%d\n"
- , method_name, __LINE__
- , addrp[0]
- , addrp[1]
- , addrp[2]
- , addrp[3]
- , (int) ntohs( sockinfo.sin_port ) );
- }
-
- while ( ((csock = accept( sock
- , (struct sockaddr *) 0
- , (socklen_t *) 0 ) ) < 0) && (errno == EINTR) );
-
- if ( csock > 0 )
- {
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
- trace_printf( "%s@%d - Accepted socket on addr=%d.%d.%d.%d, port=%d, sock=%d\n"
- , method_name, __LINE__
- , addrp[0]
- , addrp[1]
- , addrp[2]
- , addrp[3]
- , (int) ntohs( sockinfo.sin_port )
- , csock );
- }
-
- int nodelay = 1;
- if ( setsockopt( csock
- , IPPROTO_TCP
- , TCP_NODELAY
- , (char *) &nodelay
- , sizeof(int) ) )
- {
- char buf[MON_STRING_BUF_SIZE];
- int err = errno;
- snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n",
- method_name, err, strerror(err));
- mon_log_write(MON_CLUSTER_ACCEPTSOCK_2, SQ_LOG_ERR, buf);
- return ( -2 );
- }
-
- int reuse = 1;
- if ( setsockopt( csock
- , SOL_SOCKET
- , SO_REUSEADDR
- , (char *) &reuse
- , sizeof(int) ) )
- {
- char buf[MON_STRING_BUF_SIZE];
- int err = errno;
- snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n",
- method_name, err, strerror(err));
- mon_log_write(MON_CLUSTER_ACCEPTSOCK_3, SQ_LOG_ERR, buf);
- return ( -2 );
- }
- }
-
- TRACE_EXIT;
- return ( csock );
-}
-
-int CCluster::Connect( const char *portName, bool doRetries )
-{
- const char method_name[] = "CCluster::Connect";
- TRACE_ENTRY;
-
- int sock; // socket
- int ret; // returned value
- int nodelay = 1; // sockopt reuse option
- int reuse = 1; // sockopt reuse option
-#if defined(_XOPEN_SOURCE_EXTENDED)
-#ifdef __LP64__
- socklen_t size; // size of socket address
-#else
- size_t size; // size of socket address
-#endif
-#else
- int size; // size of socket address
-#endif
- static int retries = 0; // # times to retry connect
- int outer_failures = 0; // # failed connect loops
- int connect_failures = 0; // # failed connects
- char *p; // getenv results
- struct sockaddr_in sockinfo; // socket address info
- struct hostent *he;
- char host[1000];
- const char *colon;
- unsigned int port;
-
- colon = strstr(portName, ":");
- strcpy(host, portName);
- int len = colon - portName;
- host[len] = '\0';
- port = atoi(&colon[1]);
- size = sizeof(sockinfo);
-
- if ( !retries )
- {
- p = getenv( "HPMP_CONNECT_RETRIES" );
- if ( p ) retries = atoi( p );
- else retries = 5;
- }
-
- for ( ;; )
- {
- sock = socket( AF_INET, SOCK_STREAM, 0 );
- if ( sock < 0 )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_CONNECT_1, SQ_LOG_CRIT, la_buf);
-
- mon_failure_exit();
- }
-
- he = gethostbyname( host );
- if ( !he )
- {
- char ebuff[256];
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n",
- method_name, __LINE__, host, strerror_r( h_errno, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_CONNECT_2, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
-
- // Connect socket.
- memset( (char *) &sockinfo, 0, size );
- memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 );
- sockinfo.sin_family = AF_INET;
- sockinfo.sin_port = htons( (unsigned short) port );
-
- // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES,
- // and has a yield between each retry, since it's more oriented
- // toward failures from network overload and putting a pause
- // between retries. This inner loop should only iterate when
- // a signal interrupts the local process, so it doesn't pause
- // or use the same HPMP_CONNECT_RETRIES count.
- connect_failures = 0;
- ret = 1;
- while ( ret != 0 && connect_failures <= 10 )
- {
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- if (doRetries)
- {
- trace_printf( "%s@%d - Connecting to %s, addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n"
- , method_name, __LINE__
- , portName
- , (int)((unsigned char *)he->h_addr)[0]
- , (int)((unsigned char *)he->h_addr)[1]
- , (int)((unsigned char *)he->h_addr)[2]
- , (int)((unsigned char *)he->h_addr)[3]
- , port
- , connect_failures );
- }
- else
- {
- trace_printf( "%s@%d - Connecting to %s, addr=%d.%d.%d.%d, port=%d\n"
- , method_name, __LINE__
- , portName
- , (int)((unsigned char *)he->h_addr)[0]
- , (int)((unsigned char *)he->h_addr)[1]
- , (int)((unsigned char *)he->h_addr)[2]
- , (int)((unsigned char *)he->h_addr)[3]
- , port );
- }
- }
-
- ret = connect( sock, (struct sockaddr *) &sockinfo, size );
- if ( ret == 0 ) break;
- if ( errno == EINTR )
- {
- ++connect_failures;
- }
-#ifdef NAMESERVER_PROCESS
- else if ( errno == ECONNREFUSED )
- {
- ++connect_failures;
- sleep( 1 );
- }
-#endif
- else
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], connect(%s) failed! errno=%d (%s)\n"
- , method_name, portName, err, strerror( err ));
- mon_log_write(MON_CLUSTER_CONNECT_3, SQ_LOG_ERR, la_buf);
- close(sock);
- return ( -1 );
- }
- }
-
- if ( ret == 0 ) break;
-
- if (doRetries == false)
- {
- close( sock );
- return( -1 );
- }
-
- // For large clusters, the connect/accept calls seem to fail occasionally,
- // no doubt do to the large number (1000's) of simultaneous connect packets
- // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES
- // number of times.
- if ( errno != EINTR )
- {
- if ( ++outer_failures > retries )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf( la_buf, "[%s], connect(%s) exceeded retries! count=%d\n"
- , method_name, portName, retries);
- mon_log_write(MON_CLUSTER_CONNECT_4, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -1 );
- }
- struct timespec req, rem;
- req.tv_sec = 0;
- req.tv_nsec = 500000;
- nanosleep( &req, &rem );
- }
- close( sock );
- }
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Connected to %s addr=%d.%d.%d.%d, port=%d, sock=%d\n"
- , method_name, __LINE__
- , host
- , (int)((unsigned char *)he->h_addr)[0]
- , (int)((unsigned char *)he->h_addr)[1]
- , (int)((unsigned char *)he->h_addr)[2]
- , (int)((unsigned char *)he->h_addr)[3]
- , port
- , sock );
- }
-
- if ( setsockopt( sock, IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_CONNECT_5, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -2 );
- }
-
- if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_CONNECT_6, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -2 );
- }
-
- TRACE_EXIT;
- return ( sock );
-}
-
-#ifdef NAMESERVER_PROCESS
-void CCluster::ConnectToMon2NsCommSelf( void )
-{
- const char method_name[] = "CCluster::ConnectToMon2NsCommSelf";
- TRACE_ENTRY;
-
- Connect( MyNode->GetMon2NsSocketPort() );
-
- TRACE_EXIT;
-}
-#else
-void CCluster::ConnectToPtPCommSelf( void )
-{
- const char method_name[] = "CCluster::ConnectToPtPCommSelf";
- TRACE_ENTRY;
-
- Connect( MyNode->GetPtPSocketPort() );
-
- TRACE_EXIT;
-}
-#endif
-
-void CCluster::ConnectToSelf( void )
-{
- const char method_name[] = "CCluster::ConnectToSelf";
- TRACE_ENTRY;
-
- Connect( MyNode->GetCommSocketPort() );
-
- TRACE_EXIT;
-}
-
-void CCluster::Connect( int socketPort )
-{
- const char method_name[] = "CCluster::Connect";
- TRACE_ENTRY;
-
- int sock; // socket
- int ret; // returned value
-#if defined(_XOPEN_SOURCE_EXTENDED)
-#ifdef __LP64__
- socklen_t size; // size of socket address
-#else
- size_t size; // size of socket address
-#endif
-#else
- int size; // size of socket address
-#endif
- static int retries = 0; // # times to retry connect
- int connect_failures = 0; // # failed connects
- char *p; // getenv results
- struct sockaddr_in sockinfo; // socket address info
- struct hostent *he;
-
- size = sizeof(sockinfo);
-
- if ( !retries )
- {
- p = getenv( "HPMP_CONNECT_RETRIES" );
- if ( p ) retries = atoi( p );
- else retries = 5;
- }
-
- sock = socket( AF_INET, SOCK_STREAM, 0 );
- if ( sock < 0 )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_CONNECTTOSELF_1, SQ_LOG_CRIT, la_buf);
-
- mon_failure_exit();
- }
-
- he = gethostbyname( "localhost" );
- if ( !he )
- {
- char ebuff[256];
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n",
- method_name, __LINE__, "localhost", strerror_r( h_errno, ebuff, 256 ) );
- mon_log_write( MON_CLUSTER_CONNECTTOSELF_2, SQ_LOG_CRIT, buf );
-
- mon_failure_exit();
- }
-
- // Connect socket.
- memset( (char *) &sockinfo, 0, size );
- memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 );
- sockinfo.sin_family = AF_INET;
- sockinfo.sin_port = htons( (unsigned short) socketPort );
-
- connect_failures = 0;
- ret = 1;
- while ( ret != 0 && connect_failures <= 10 )
- {
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Connecting to localhost addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n"
- , method_name, __LINE__
- , (int)((unsigned char *)he->h_addr)[0]
- , (int)((unsigned char *)he->h_addr)[1]
- , (int)((unsigned char *)he->h_addr)[2]
- , (int)((unsigned char *)he->h_addr)[3]
- , socketPort
- , connect_failures );
- }
-
- ret = connect( sock, (struct sockaddr *) &sockinfo, size );
- if ( ret == 0 ) break;
- if ( errno == EINTR )
- {
- ++connect_failures;
- }
- else
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_CONNECTTOSELF_3, SQ_LOG_CRIT, la_buf);
-
- mon_failure_exit();
- }
- }
-
- close( sock );
-
- TRACE_EXIT;
-}
-
-int CCluster::MkSrvSock( int *pport )
-{
- const char method_name[] = "CCluster::MkSrvSock";
- TRACE_ENTRY;
-
- int sock; // socket
- int err; // return code
-#if defined(_XOPEN_SOURCE_EXTENDED)
-#ifdef __LP64__
- socklen_t size; // size of socket address
-#else
- size_t size; // size of socket address
-#endif
-#else
- unsigned int size; // size of socket address
-#endif
- struct sockaddr_in sockinfo; // socket address info
- sock = socket( AF_INET, SOCK_STREAM, 0 );
- if ( sock < 0 )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKSRVSOCK_1, SQ_LOG_CRIT, la_buf);
- return ( -1 );
- }
-
- int nodelay = 1; // sockopt nodelay option
- if ( setsockopt( sock, IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKSRVSOCK_2, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -2 );
- }
-
- int reuse = 1; // sockopt reuse option
- if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt(SO_REUSEADDR) failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKSRVSOCK_3, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -1 );
- }
-
- // Bind socket.
- size = sizeof(sockinfo);
- memset( (char *) &sockinfo, 0, size );
- sockinfo.sin_family = AF_INET;
- sockinfo.sin_addr.s_addr = htonl( INADDR_ANY );
- sockinfo.sin_port = htons( *pport );
- int lv_bind_tries = 0;
- do
- {
- if (lv_bind_tries > 0)
- {
- sleep(5);
- }
- err = bind( sock, (struct sockaddr *) &sockinfo, size );
- sched_yield( );
- } while ( err &&
- (errno == EADDRINUSE) &&
- (++lv_bind_tries < 4) );
- if ( err )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], bind() failed! port=%d, errno=%d (%s)\n"
- , method_name, *pport, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKSRVSOCK_4, SQ_LOG_CRIT, la_buf);
- close( sock );
- return ( -1 );
- }
- if ( pport )
- {
- if ( getsockname( sock, (struct sockaddr *) &sockinfo, &size ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], getsockname() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKSRVSOCK_5, SQ_LOG_CRIT, la_buf);
- close( sock );
- return ( -1 );
- }
-
- *pport = (int) ntohs( sockinfo.sin_port );
- }
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
- trace_printf( "%s@%d listening on addr=%d.%d.%d.%d, port=%d\n"
- , method_name, __LINE__
- , addrp[0]
- , addrp[1]
- , addrp[2]
- , addrp[3]
- , pport?*pport:0);
- }
-
- int lv_retcode = SetKeepAliveSockOpt( sock );
- if ( lv_retcode != 0 )
- {
- return lv_retcode;
- }
-
- // Listen
- if ( listen( sock, SOMAXCONN ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], listen() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKSRVSOCK_6, SQ_LOG_CRIT, la_buf);
- close( sock );
- return ( -1 );
- }
- TRACE_EXIT;
- return ( sock );
-}
-
-int CCluster::MkCltSock( const char *portName )
-{
- const char method_name[] = "CCluster::MkCltSock1";
- TRACE_ENTRY;
-
- int sock; // socket
- int ret; // returned value
- int reuse = 1; // sockopt reuse option
- int nodelay = 1; // sockopt nodelay option
-#if defined(_XOPEN_SOURCE_EXTENDED)
-#ifdef __LP64__
- socklen_t size; // size of socket address
-#else
- size_t size; // size of socket address
-#endif
-#else
- int size; // size of socket address
-#endif
- static int retries = 0; // # times to retry connect
- int outer_failures = 0; // # failed connect loops
- int connect_failures = 0; // # failed connects
- char *p; // getenv results
- struct sockaddr_in sockinfo; // socket address info
- struct hostent *he;
- char host[1000];
- const char *colon;
- unsigned int port;
-
- colon = strstr(portName, ":");
- strcpy(host, portName);
- int len = colon - portName;
- host[len] = '\0';
- port = atoi(&colon[1]);
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Connecting to %s:%d\n"
- , method_name, __LINE__
- , host
- , port );
- }
- }
-
- size = sizeof(sockinfo);
-
- if ( !retries )
- {
- p = getenv( "HPMP_CONNECT_RETRIES" );
- if ( p ) retries = atoi( p );
- else retries = 5;
- }
-
- for ( ;; )
- {
- sock = socket( AF_INET, SOCK_STREAM, 0 );
- if ( sock < 0 )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- snprintf( la_buf, sizeof(la_buf)
- , "[%s], socket() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_1, SQ_LOG_ERR, la_buf);
- return ( -1 );
- }
-
- he = gethostbyname( host );
- if ( !he )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = h_errno;
- snprintf( la_buf, sizeof(la_buf),
- "[%s] gethostbyname(%s) failed! errno=%d (%s)\n"
- , method_name, host, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_2, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -1 );
- }
-
- // Connect socket.
- memset( (char *) &sockinfo, 0, size );
- memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 );
- sockinfo.sin_family = AF_INET;
- sockinfo.sin_port = htons( (unsigned short) port );
-
- // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES,
- // and has a yield between each retry, since it's more oriented
- // toward failures from network overload and putting a pause
- // between retries. This inner loop should only iterate when
- // a signal interrupts the local process, so it doesn't pause
- // or use the same HPMP_CONNECT_RETRIES count.
- connect_failures = 0;
- ret = 1;
- while ( ret != 0 && connect_failures <= 10 )
- {
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Connecting to %s addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n"
- , method_name, __LINE__
- , host
- , (int)((unsigned char *)he->h_addr)[0]
- , (int)((unsigned char *)he->h_addr)[1]
- , (int)((unsigned char *)he->h_addr)[2]
- , (int)((unsigned char *)he->h_addr)[3]
- , port
- , connect_failures );
- }
-
- ret = connect( sock, (struct sockaddr *) &sockinfo, size );
- if ( ret == 0 ) break;
- if ( errno == EINTR )
- {
- ++connect_failures;
- }
- else
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_3, SQ_LOG_ERR, la_buf);
- close(sock);
- return ( -1 );
- }
- }
-
- if ( ret == 0 ) break;
-
- // For large clusters, the connect/accept calls seem to fail occasionally,
- // no doubt do to the large number (1000's) of simultaneous connect packets
- // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES
- // number of times.
- if ( errno != EINTR )
- {
- if ( ++outer_failures > retries )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n"
- , method_name, retries);
- mon_log_write(MON_CLUSTER_MKCLTSOCK_4, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -1 );
- }
- struct timespec req, rem;
- req.tv_sec = 0;
- req.tv_nsec = 500000;
- nanosleep( &req, &rem );
- }
- close( sock );
- }
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Connected to %s addr=%d.%d.%d.%d, port=%d, sock=%d\n"
- , method_name, __LINE__
- , host
- , (int)((unsigned char *)he->h_addr)[0]
- , (int)((unsigned char *)he->h_addr)[1]
- , (int)((unsigned char *)he->h_addr)[2]
- , (int)((unsigned char *)he->h_addr)[3]
- , port
- , sock );
- }
-
- if ( setsockopt( sock, IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_5, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -2 );
- }
-
- if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt(SO_REUSEADDR) failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_6, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -2 );
- }
-
- TRACE_EXIT;
- return ( sock );
-}
-
-int CCluster::SetKeepAliveSockOpt( int sock )
-{
- const char method_name[] = "CCluster::SetKeepAliveSockOpt";
- TRACE_ENTRY;
-
- static int sv_keepalive = -1;
- static int sv_keepidle = 120;
- static int sv_keepintvl = 12;
- static int sv_keepcnt = 5;
-
- if ( sv_keepalive == -1 )
- {
- char *lv_keepalive_env = getenv( "SQ_MON_KEEPALIVE" );
- if ( lv_keepalive_env )
- {
- sv_keepalive = atoi( lv_keepalive_env );
- }
- if ( sv_keepalive == 1 )
- {
- char *lv_keepidle_env = getenv( "SQ_MON_KEEPIDLE" );
- if ( lv_keepidle_env )
- {
- sv_keepidle = atoi( lv_keepidle_env );
- }
- char *lv_keepintvl_env = getenv( "SQ_MON_KEEPINTVL" );
- if ( lv_keepintvl_env )
- {
- sv_keepintvl = atoi( lv_keepintvl_env );
- }
- char *lv_keepcnt_env = getenv( "SQ_MON_KEEPCNT" );
- if ( lv_keepcnt_env )
- {
- sv_keepcnt = atoi( lv_keepcnt_env );
- }
- }
- }
-
- if ( sv_keepalive == 1 )
- {
- if ( setsockopt( sock, SOL_SOCKET, SO_KEEPALIVE, &sv_keepalive, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt so_keepalive() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ) );
- mon_log_write( MON_CLUSTER_SETKEEPALIVESOCKOPT_1, SQ_LOG_ERR, la_buf );
- close( sock );
- return ( -2 );
- }
-
- if ( setsockopt( sock, SOL_TCP, TCP_KEEPIDLE, &sv_keepidle, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt tcp_keepidle() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ) );
- mon_log_write( MON_CLUSTER_SETKEEPALIVESOCKOPT_2, SQ_LOG_ERR, la_buf );
- close( sock );
- return ( -2 );
- }
-
- if ( setsockopt( sock, SOL_TCP, TCP_KEEPINTVL, &sv_keepintvl, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt tcp_keepintvl() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ) );
- mon_log_write( MON_CLUSTER_SETKEEPALIVESOCKOPT_3, SQ_LOG_ERR, la_buf );
- close( sock );
- return ( -2 );
- }
-
- if ( setsockopt( sock, SOL_TCP, TCP_KEEPCNT, &sv_keepcnt, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt tcp_keepcnt() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ) );
- mon_log_write( MON_CLUSTER_SETKEEPALIVESOCKOPT_4, SQ_LOG_ERR, la_buf );
- close( sock );
- return ( -2 );
- }
- }
-
- TRACE_EXIT;
- return ( 0 );
-}
-
-int CCluster::MkCltSock( unsigned char srcip[4], unsigned char dstip[4], int port )
-{
- const char method_name[] = "CCluster::MkCltSock2";
- TRACE_ENTRY;
-
- int sock; // socket
- int ret; // returned value
- int reuse = 1; // sockopt reuse option
- int nodelay = 1; // sockopt nodelay option
-#if defined(_XOPEN_SOURCE_EXTENDED)
-#ifdef __LP64__
- socklen_t size; // size of socket address
-#else
- size_t size; // size of socket address
-#endif
-#else
- int size; // size of socket address
-#endif
- static int retries = 0; // # times to retry connect
- int outer_failures = 0; // # failed connect loops
- int connect_failures = 0; // # failed connects
- char *p; // getenv results
- struct sockaddr_in sockinfo; // socket address info
-
- size = sizeof(sockinfo);
- const char * size_srcip = (const char *) srcip;
-
- if ( !retries )
- {
- p = getenv( "HPMP_CONNECT_RETRIES" );
- if ( p ) retries = atoi( p );
- else retries = 5;
- }
-
- for ( ;; )
- {
- sock = socket( AF_INET, SOCK_STREAM, 0 );
- if ( sock < 0 ) return ( -1 );
-
- // Bind local address if specified.
- if ( srcip )
- {
- memset( (char *) &sockinfo, 0, size );
- memcpy( (char *) &sockinfo.sin_addr,
- (unsigned char *) srcip, strlen(size_srcip));
-
- sockinfo.sin_family = AF_INET;
- sockinfo.sin_port = 0;
- if ( bind( sock, (struct sockaddr *) &sockinfo, size ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], bind() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_7, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -1 );
- }
- }
-
- // Connect socket.
- memset( (char *) &sockinfo, 0, size );
- memcpy( (char *) &sockinfo.sin_addr, (char *) dstip, 4 );
- sockinfo.sin_family = AF_INET;
- sockinfo.sin_port = htons( (unsigned short) port );
-
- // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES,
- // and has a yield between each retry, since it's more oriented
- // toward failures from network overload and putting a pause
- // between retries. This inner loop should only iterate when
- // a signal interrupts the local process, so it doesn't pause
- // or use the same HPMP_CONNECT_RETRIES count.
- connect_failures = 0;
- ret = 1;
- while ( ret != 0 && connect_failures <= 10 )
- {
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Connecting to addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n"
- , method_name, __LINE__
- , (int)dstip[0]
- , (int)dstip[1]
- , (int)dstip[2]
- , (int)dstip[3]
- , port
- , connect_failures );
- }
- ret = connect( sock, (struct sockaddr *) &sockinfo,
- size );
- if ( ret == 0 ) break;
- if ( errno == EINTR )
- {
- ++connect_failures;
- }
-#ifdef NAMESERVER_PROCESS
- else if ( errno == ECONNREFUSED )
- {
- ++connect_failures;
- sleep( 1 );
- }
-#endif
- else
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], connect(%d.%d.%d.%d:%d) failed! errno=%d (%s)\n"
- , method_name
- , (int)((unsigned char *)dstip)[0]
- , (int)((unsigned char *)dstip)[1]
- , (int)((unsigned char *)dstip)[2]
- , (int)((unsigned char *)dstip)[3]
- , port
- , err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_8, SQ_LOG_ERR, la_buf);
- close(sock);
- return ( -1 );
- }
- }
-
- if ( ret == 0 ) break;
-
- // For large clusters, the connect/accept calls seem to fail occasionally,
- // no doubt do to the large number (1000's) of simultaneous connect packets
- // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES
- // number of times.
- if ( errno != EINTR )
- {
- if ( ++outer_failures > retries )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n"
- , method_name, retries);
- mon_log_write(MON_CLUSTER_MKCLTSOCK_9, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -1 );
- }
- struct timespec req, rem;
- req.tv_sec = 0;
- req.tv_nsec = 500000;
- nanosleep( &req, &rem );
- }
- close( sock );
- }
-
- if ( setsockopt( sock, IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_10, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -2 );
- }
-
- if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
- , method_name, err, strerror( err ));
- mon_log_write(MON_CLUSTER_MKCLTSOCK_11, SQ_LOG_ERR, la_buf);
- close( sock );
- return ( -2 );
- }
-
- int lv_retcode = SetKeepAliveSockOpt( sock );
- if ( lv_retcode != 0 )
- {
- return lv_retcode;
- }
-
- TRACE_EXIT;
- return ( sock );
-}
-
int CCluster::ReceiveMPI(char *buf, int size, int source, MonXChngTags tag, MPI_Comm comm)
{
const char method_name[] = "CCluster::ReceiveMPI";
@@ -10278,163 +9165,3 @@
return error;
}
-int CCluster::ReceiveSock(char *buf, int size, int sockFd, const char *desc)
-{
- const char method_name[] = "CCluster::ReceiveSock";
- TRACE_ENTRY;
-
- bool readAgain = false;
- int error = 0;
- int readCount = 0;
- int received = 0;
- int sizeCount = size;
-
- do
- {
- readCount = (int) recv( sockFd
- , buf
- , sizeCount
- , 0 );
- if ( readCount > 0 ) Meas.addSockRcvdBytes( readCount );
-
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - recv(%d), sock=%d, readCount=%d, desc=%s\n"
- , method_name, __LINE__
- , sizeCount
- , sockFd
- , readCount
- , desc );
- }
-
- if ( readCount > 0 )
- { // Got data
- received += readCount;
- buf += readCount;
- if ( received == size )
- {
- readAgain = false;
- }
- else
- {
- sizeCount -= readCount;
- readAgain = true;
- }
- }
- else if ( readCount == 0 )
- { // EOF
- error = ENODATA;
- readAgain = false;
- }
- else
- { // Got an error
- if ( errno != EINTR)
- {
- error = errno;
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf( la_buf, "[%s], recv(), received=%d, sock=%d, error=%d(%s), desc=%s\n"
- , method_name
- , received
- , sockFd
- , error, strerror(error)
- , desc );
- mon_log_write(MON_CLUSTER_RECEIVESOCK_1, SQ_LOG_ERR, la_buf);
- readAgain = false;
- }
- else
- {
- readAgain = true;
- }
- }
- }
- while( readAgain );
-
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - recv(), received=%d, sock=%d, error=%d(%s), desc=%s\n"
- , method_name, __LINE__
- , received
- , sockFd
- , error, strerror(error)
- , desc );
- }
-
- TRACE_EXIT;
- return error;
-}
-
-int CCluster::SendSock(char *buf, int size, int sockFd, const char *desc)
-{
- const char method_name[] = "CCluster::SendSock";
- TRACE_ENTRY;
-
- bool sendAgain = false;
- int error = 0;
- int sendCount = 0;
- int sent = 0;
-
- do
- {
- sendCount = (int) send( sockFd
- , buf
- , size
- , 0 );
- if ( sendCount > 0 ) Meas.addSockSentBytes( sendCount );
-
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - send(), sock=%d, sendCount=%d, desc=%s\n"
- , method_name, __LINE__
- , sockFd
- , sendCount
- , desc );
- }
-
- if ( sendCount > 0 )
- { // Sent data
- sent += sendCount;
- if ( sendCount == size )
- {
- sendAgain = false;
- }
- else
- {
- sendAgain = true;
- }
- }
- else
- { // Got an error
- if ( errno != EINTR)
- {
- error = errno;
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf( la_buf, "[%s], send(), sent=%d, sock=%d, error=%d(%s), desc=%s\n"
- , method_name
- , sent
- , sockFd
- , error, strerror(error)
- , desc );
- mon_log_write(MON_CLUSTER_SENDSOCK_1, SQ_LOG_ERR, la_buf);
- sendAgain = false;
- }
- else
- {
- sendAgain = true;
- }
- }
- }
- while( sendAgain );
-
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - send(), sent=%d, sock=%d, error=%d(%s), desc=%s\n"
- , method_name, __LINE__
- , sent
- , sockFd
- , error, strerror(error)
- , desc );
- }
-
- TRACE_EXIT;
- return error;
-}
diff --git a/core/sqf/monitor/linux/cluster.h b/core/sqf/monitor/linux/cluster.h
index 2c22fbf..c2288d8 100644
--- a/core/sqf/monitor/linux/cluster.h
+++ b/core/sqf/monitor/linux/cluster.h
@@ -26,6 +26,7 @@
#ifndef CLUSTER_H_
#define CLUSTER_H_
+#include "comm.h"
#include "pnode.h"
#include "msgdef.h"
#include "internal.h"
@@ -73,7 +74,7 @@
class CNode;
class CLNode;
-class CCluster
+class CCluster : public CComm
{
protected:
int eyecatcher_; // Debuggging aid -- leave as first
@@ -110,24 +111,6 @@
int AcceptCommSock( void );
int AcceptSyncSock( void );
-#ifdef NAMESERVER_PROCESS
- int AcceptMon2NsSock( void );
-#else
- int AcceptPtPSock( void );
-#endif
- int Connect( const char *portName, bool doRetries = true );
- void Connect( int socketPort );
-#ifdef NAMESERVER_PROCESS
- void ConnectToMon2NsCommSelf( void );
-#else
- void ConnectToPtPCommSelf( void );
-#endif
-#ifdef NAMESERVER_PROCESS
- void ConnectToMonCommSelf( void );
-#endif
- void ConnectToSelf( void );
- int SetKeepAliveSockOpt( int sock );
- int MkCltSock( const char *portName );
#ifndef USE_BARRIER
void ArmWakeUpSignal (void);
#endif
@@ -205,10 +188,7 @@
bool IsNodeDownDeathNotices() { return nodeDownDeathNotices_; }
int ReceiveMPI(char *buf, int size, int source, MonXChngTags tag, MPI_Comm comm);
- int ReceiveSock(char *buf, int size, int sockFd, const char *desc);
int SendMPI(char *buf, int size, int source, MonXChngTags tag, MPI_Comm comm);
- int SendSock(char *buf, int size, int sockFd, const char *desc);
-
bool ReinitializeConfigCluster( bool nodeAdded, int pnid );
int incrGetVerifierNum();
@@ -243,16 +223,9 @@
protected:
int *socks_;
int *sockPorts_;
- int commSock_;
int syncPort_;
int syncSock_;
-#ifdef NAMESERVER_PROCESS
- int mon2nsSock_;
-#else
- int ptpSock_;
-#endif
int epollFD_;
- int epollPingFD_;
int *indexToPnid_;
int configMaster_;
@@ -434,11 +407,6 @@
void InitClusterSocks( int worldSize, int myRank, char *nodeNames,int *rankToPnid );
void InitServerSock( void );
- int AcceptSock( int sock );
- void EpollCtl( int efd, int op, int fd, struct epoll_event *event );
- void EpollCtlDelete( int efd, int fd, struct epoll_event *event );
- int MkSrvSock( int *pport );
- int MkCltSock( unsigned char srcip[4], unsigned char dstip[4], int port );
};
diff --git a/core/sqf/monitor/linux/comm.cxx b/core/sqf/monitor/linux/comm.cxx
new file mode 100644
index 0000000..88cd208
--- /dev/null
+++ b/core/sqf/monitor/linux/comm.cxx
@@ -0,0 +1,1757 @@
+///////////////////////////////////////////////////////////////////////////////
+//
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#include <iostream>
+
+using namespace std;
+
+#include <errno.h>
+#include <limits.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <netdb.h>
+#include <unistd.h>
+#include <sys/epoll.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <netinet/tcp.h>
+
+#include "monlogging.h"
+#include "montrace.h"
+#include "comm.h"
+
+const char *EpollEventString( __uint32_t events );
+const char *EpollOpString( int op );
+
+CComm::CComm( void )
+     : epollFd_( -1 )
+{
+    const char method_name[] = "CComm::CComm";
+    TRACE_ENTRY;
+    // Constructor: tags the object and creates its close-on-exec epoll instance.
+    // The "COMM" eyecatcher is a debugging aid.
+    memcpy( &eyecatcher_, "COMM", 4 );
+
+    epollFd_ = epoll_create1( EPOLL_CLOEXEC );
+    if ( epollFd_ < 0 )
+    {   // No epoll instance: log a critical event, then mon_failure_exit().
+        char errText[256];
+        char logMsg[MON_STRING_BUF_SIZE];
+        snprintf( logMsg, sizeof(logMsg), "[%s@%d] epoll_create1(sendrecv) error: %s\n"
+                , method_name, __LINE__, strerror_r( errno, errText, sizeof(errText) ) );
+        mon_log_write( COMM_COMM_1, SQ_LOG_CRIT, logMsg );
+
+        mon_failure_exit();
+    }
+
+    TRACE_EXIT;
+}
+
+CComm::~CComm( void )
+{
+    const char method_name[] = "CComm::~CComm";
+    TRACE_ENTRY;
+
+    // Release the epoll descriptor unless it still holds the initial -1.
+    const bool haveEpoll = ( epollFd_ != -1 );
+    if ( haveEpoll ) close( epollFd_ );
+
+    // Switch the eyecatcher to lower case so that a destroyed object can
+    // be told apart from a live one while debugging.
+    memcpy( &eyecatcher_, "comm", 4 );
+
+    TRACE_EXIT;
+}
+
+// Accept one pending connection on listenSock and set TCP_NODELAY and
+// SO_REUSEADDR on the new socket.
+//
+// Returns the connected descriptor on success, -1 when getsockname() or
+// accept() fails, or -2 when a socket option cannot be set (the accepted
+// descriptor is closed before returning -2 so that it is not leaked).
+int CComm::Accept( int listenSock )
+{
+    const char method_name[] = "CComm::Accept";
+    TRACE_ENTRY;
+
+    int csock;                      // connected socket
+    struct sockaddr_in sockinfo;    // local address of listenSock
+    // getsockname() must be told the capacity of sockinfo; the size of a
+    // pointer would let it truncate the returned address.
+    socklen_t size = sizeof(sockinfo);
+
+    if ( getsockname( listenSock, (struct sockaddr *) &sockinfo, &size ) )
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        snprintf(buf, sizeof(buf), "[%s], getsockname() failed, errno=%d (%s).\n",
+                 method_name, err, strerror(err));
+        mon_log_write(COMM_ACCEPT_1, SQ_LOG_ERR, buf);
+        return ( -1 );
+    }
+
+    // accept() is not asked for the peer address, so both traces below
+    // report the address and port of the listening socket.
+    unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d - Accepting socket on addr=%d.%d.%d.%d, port=%d\n"
+                    , method_name, __LINE__
+                    , addrp[0]
+                    , addrp[1]
+                    , addrp[2]
+                    , addrp[3]
+                    , (int) ntohs( sockinfo.sin_port ) );
+    }
+
+    // Retry accept() for as long as it is interrupted by a signal.
+    while ( ((csock = accept( listenSock
+                            , (struct sockaddr *) 0
+                            , (socklen_t *) 0 ) ) < 0) && (errno == EINTR) );
+
+    if ( csock >= 0 )   // descriptor 0 is valid too
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Accepted socket on addr=%d.%d.%d.%d, "
+                          "port=%d, listenSock=%d, csock=%d\n"
+                        , method_name, __LINE__
+                        , addrp[0]
+                        , addrp[1]
+                        , addrp[2]
+                        , addrp[3]
+                        , (int) ntohs( sockinfo.sin_port )
+                        , listenSock
+                        , csock );
+        }
+
+        // Configure the connected socket before handing it to the caller.
+        int nodelay = 1;
+        if ( setsockopt( csock, IPPROTO_TCP, TCP_NODELAY
+                       , (char *) &nodelay, sizeof(int) ) )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n",
+                     method_name, err, strerror(err));
+            mon_log_write(COMM_ACCEPT_2, SQ_LOG_ERR, buf);
+            // The caller only sees -2, so release the descriptor here.
+            close( csock );
+            return ( -2 );
+        }
+
+        int reuse = 1;
+        if ( setsockopt( csock, SOL_SOCKET, SO_REUSEADDR
+                       , (char *) &reuse, sizeof(int) ) )
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf(buf, sizeof(buf), "[%s], setsockopt() failed, errno=%d (%s).\n",
+                     method_name, err, strerror(err));
+            mon_log_write(COMM_ACCEPT_3, SQ_LOG_ERR, buf);
+            // The caller only sees -2, so release the descriptor here.
+            close( csock );
+            return ( -2 );
+        }
+    }
+
+    TRACE_EXIT;
+    return ( csock );
+}
+
+// Open a TCP connection to "localhost" on the given port and close it again
+// immediately; nothing is sent or received on the connection.
+// NOTE(review): presumably this wakes a local thread blocked in accept() on
+// that port - confirm against the callers.
+//
+// port - TCP port on the local host to connect to.
+//
+// socket(), gethostbyname() and non-EINTR connect() failures are logged as
+// critical and followed by mon_failure_exit(). A connect() interrupted by a
+// signal is retried; after more than 10 interruptions the attempt is
+// abandoned (reported in the trace only) and the socket is closed.
+void CComm::ConnectLocal( int port )
+{
+    const char method_name[] = "CComm::ConnectLocal";
+    TRACE_ENTRY;
+
+    int sock;                       // socket
+    int ret;                        // returned value
+    int connect_failures = 0;       // # interrupted connects
+    struct sockaddr_in sockinfo;    // socket address info
+    socklen_t size = sizeof(sockinfo);
+    struct hostent *he;
+
+    sock = socket( AF_INET, SOCK_STREAM, 0 );
+    if ( sock < 0 )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        snprintf( la_buf, sizeof(la_buf), "[%s], socket() failed! errno=%d (%s)\n"
+                , method_name, err, strerror( err ));
+        mon_log_write( COMM_CONNECTLOCAL_1, SQ_LOG_CRIT, la_buf );
+
+        mon_failure_exit();
+    }
+
+    he = gethostbyname( "localhost" );
+    if ( !he )
+    {
+        // gethostbyname() reports failures through h_errno, whose values are
+        // not errno codes; hstrerror() is the matching message lookup.
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n",
+                  method_name, __LINE__, "localhost", hstrerror( h_errno ) );
+        mon_log_write( COMM_CONNECTLOCAL_2, SQ_LOG_CRIT, buf );
+
+        mon_failure_exit();
+    }
+
+    // Connect socket.
+    memset( (char *) &sockinfo, 0, size );
+    memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 );
+    sockinfo.sin_family = AF_INET;
+    sockinfo.sin_port = htons( (unsigned short) port );
+
+    ret = 1;
+    while ( ret != 0 && connect_failures <= 10 )
+    {
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d - Connecting to localhost addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n"
+                        , method_name, __LINE__
+                        , (int)((unsigned char *)he->h_addr)[0]
+                        , (int)((unsigned char *)he->h_addr)[1]
+                        , (int)((unsigned char *)he->h_addr)[2]
+                        , (int)((unsigned char *)he->h_addr)[3]
+                        , port
+                        , connect_failures );
+        }
+
+        ret = connect( sock, (struct sockaddr *) &sockinfo, size );
+        if ( ret == 0 ) break;
+        if ( errno == EINTR )
+        {
+            ++connect_failures;
+        }
+        else
+        {
+            char la_buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf( la_buf, sizeof(la_buf), "[%s], connect() failed! errno=%d (%s)\n"
+                    , method_name, err, strerror( err ));
+            mon_log_write(COMM_CONNECTLOCAL_3, SQ_LOG_CRIT, la_buf);
+
+            mon_failure_exit();
+        }
+    }
+
+    if ( ret != 0 && (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) )
+    {   // Interrupted too many times: no connection was made.
+        trace_printf( "%s@%d - Giving up connecting to localhost, port=%d, connect_failures=%d\n"
+                    , method_name, __LINE__
+                    , port
+                    , connect_failures );
+    }
+
+    close( sock );
+
+    TRACE_EXIT;
+}
+
+// Create a TCP connection to the endpoint named by portName.
+//
+// portName  - endpoint as "<host>:<port>"; host is resolved by gethostbyname().
+// doRetries - false: return -1 on the first connect() error other than EINTR
+//             (or ECONNREFUSED in NAMESERVER_PROCESS builds). true: log, pause
+//             and retry, recreating the socket until more than
+//             HPMP_CONNECT_RETRIES (default 5) rounds have failed.
+//
+// Returns the connected descriptor with TCP_NODELAY and SO_REUSEADDR set, -1
+// when portName is malformed or no connection is made, -2 when a socket
+// option cannot be set. The socket is closed on every failure return.
+int CComm::Connect( const char *portName, bool doRetries )
+{
+    const char method_name[] = "CComm::Connect";
+    TRACE_ENTRY;
+
+    int sock;                       // socket
+    int ret;                        // returned value
+    int connErr = 0;                // errno of the last failed connect()
+    int nodelay = 1;                // sockopt nodelay option
+    int reuse = 1;                  // sockopt reuse option
+    static int retries = 0;         // # times to retry connect
+    int outer_failures = 0;         // # failed connect loops
+    int connect_failures = 0;       // # failed connects
+    struct sockaddr_in sockinfo;    // socket address info
+    socklen_t size = sizeof(sockinfo);
+    struct hostent *he;
+    char host[1000];
+    const char *colon;
+    unsigned int port;
+
+    // Split "host:port"; reject a name with no ':' or a host part too long
+    // for host[] instead of dereferencing NULL or overrunning the buffer.
+    // NOTE(review): reuses event id COMM_CONNECT_2; a dedicated id may be better.
+    colon = portName ? strstr( portName, ":" ) : NULL;
+    if ( !colon || (size_t)(colon - portName) >= sizeof(host) )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        snprintf( la_buf, sizeof(la_buf), "[%s], invalid port name (%s)\n"
+                , method_name, portName ? portName : "" );
+        mon_log_write( COMM_CONNECT_2, SQ_LOG_ERR, la_buf );
+        TRACE_EXIT;
+        return ( -1 );
+    }
+    size_t len = colon - portName;
+    memcpy( host, portName, len );
+    host[len] = '\0';
+    port = atoi(&colon[1]);
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d - Connecting to %s:%d\n"
+                    , method_name, __LINE__
+                    , host
+                    , port );
+    }
+
+    if ( !retries )
+    {
+        const char *p = getenv( "HPMP_CONNECT_RETRIES" );
+        retries = p ? atoi( p ) : 5;
+    }
+
+    for ( ;; )
+    {
+        sock = socket( AF_INET, SOCK_STREAM, 0 );
+        if ( sock < 0 )
+        {
+            char la_buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            snprintf( la_buf, sizeof(la_buf), "[%s], socket() failed! errno=%d (%s)\n"
+                    , method_name, err, strerror( err ));
+            mon_log_write( COMM_CONNECT_1, SQ_LOG_ERR, la_buf );
+            return ( -1 );
+        }
+
+        he = gethostbyname( host );
+        if ( !he )
+        {
+            // h_errno values are not errno codes; hstrerror() is the
+            // matching message lookup for gethostbyname() failures.
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf), "[%s@%d] gethostbyname(%s) error: %s\n",
+                      method_name, __LINE__, host, hstrerror( h_errno ) );
+            mon_log_write( COMM_CONNECT_2, SQ_LOG_ERR, buf );
+            close( sock );
+            return ( -1 );
+        }
+
+        memset( (char *) &sockinfo, 0, size );
+        memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 );
+        sockinfo.sin_family = AF_INET;
+        sockinfo.sin_port = htons( (unsigned short) port );
+
+        // Inner loop: retry connect() on this socket while connect_failures <= 10.
+        // Outer loop: start over with a new socket, bounded by "retries".
+        connect_failures = 0;
+        ret = 1;
+        while ( ret != 0 && connect_failures <= 10 )
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Connecting to %s, addr=%d.%d.%d.%d, "
+                              "port=%d, doRetries=%d, connect_failures=%d\n"
+                            , method_name, __LINE__
+                            , portName
+                            , (int)((unsigned char *)he->h_addr)[0]
+                            , (int)((unsigned char *)he->h_addr)[1]
+                            , (int)((unsigned char *)he->h_addr)[2]
+                            , (int)((unsigned char *)he->h_addr)[3]
+                            , port
+                            , doRetries
+                            , connect_failures );
+            }
+
+            ret = connect( sock, (struct sockaddr *) &sockinfo, size );
+            if ( ret == 0 ) break;
+
+            // Save errno: the logging and sleep calls below may change it.
+            connErr = errno;
+            ++connect_failures;
+
+#ifdef NAMESERVER_PROCESS
+            if ( connErr == ECONNREFUSED )
+            {
+                ++connect_failures;
+                sleep( 1 );
+            }
+            else
+#endif
+            if ( connErr != EINTR )
+            {
+                char la_buf[MON_STRING_BUF_SIZE];
+                snprintf( la_buf, sizeof(la_buf), "[%s], connect(%s) failed! errno=%d (%s)\n"
+                        , method_name, portName, connErr, strerror( connErr ));
+                mon_log_write( doRetries ? COMM_CONNECT_3 : COMM_CONNECT_4, SQ_LOG_ERR, la_buf );
+                if ( !doRetries )
+                {
+                    close(sock);
+                    return ( -1 );
+                }
+                struct timespec req, rem;
+                req.tv_sec = 0;
+                req.tv_nsec = 500000000L; // 500,000,000
+                nanosleep( &req, &rem );
+            }
+        } // while
+
+        if ( ret == 0 ) break;
+
+        if (doRetries == false)
+        {
+            close( sock );
+            TRACE_EXIT;
+            return( -1 );
+        }
+
+        // On large clusters connect/accept can fail occasionally when 1000's of
+        // connects flood the network at once, so retry HPMP_CONNECT_RETRIES times.
+        if ( connErr != EINTR )
+        {
+            if ( ++outer_failures > retries )
+            {
+                char la_buf[MON_STRING_BUF_SIZE];
+                snprintf( la_buf, sizeof(la_buf), "[%s], connect(%s) exceeded retries! count=%d\n"
+                        , method_name, portName, retries);
+                mon_log_write( COMM_CONNECT_5, SQ_LOG_ERR, la_buf );
+                close( sock );
+                TRACE_EXIT;
+                return ( -1 );
+            }
+            struct timespec req, rem;
+            req.tv_sec = 0;
+            req.tv_nsec = 500000; // 500,000
+            nanosleep( &req, &rem );
+        }
+        close( sock );
+        sock = -1;
+    } // for
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d - Connected to %s addr=%d.%d.%d.%d, port=%d, sock=%d\n"
+                    , method_name, __LINE__
+                    , host
+                    , (int)((unsigned char *)he->h_addr)[0]
+                    , (int)((unsigned char *)he->h_addr)[1]
+                    , (int)((unsigned char *)he->h_addr)[2]
+                    , (int)((unsigned char *)he->h_addr)[3]
+                    , port
+                    , sock );
+    }
+
+    if ( setsockopt( sock, IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(int) ) )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        snprintf( la_buf, sizeof(la_buf), "[%s], setsockopt() failed! errno=%d (%s)\n"
+                , method_name, err, strerror( err ));
+        mon_log_write( COMM_CONNECT_6, SQ_LOG_ERR, la_buf );
+        close( sock );
+        TRACE_EXIT;
+        return ( -2 );
+    }
+
+    if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        snprintf( la_buf, sizeof(la_buf), "[%s], setsockopt() failed! errno=%d (%s)\n"
+                , method_name, err, strerror( err ));
+        mon_log_write( COMM_CONNECT_7, SQ_LOG_ERR, la_buf );
+        close( sock );
+        TRACE_EXIT;
+        return ( -2 );
+    }
+
+    TRACE_EXIT;
+    return ( sock );
+}
+
+// Open a TCP connection to the IPv4 address dstip on the given port.
+//
+//   srcip - optional local IPv4 address (4 raw bytes); when non-NULL the
+//           socket is bound to it (port 0) before connecting
+//   dstip - destination IPv4 address (4 raw bytes)
+//   port  - destination port, converted with htons() before use
+//
+// Returns the connected socket descriptor on success, -1 when socket(),
+// bind() or connect() fails or the HPMP_CONNECT_RETRIES budget (default 5)
+// is exhausted, -2 when setting TCP_NODELAY or SO_REUSEADDR fails, or the
+// non-zero code returned by SetKeepAliveSockOpt(). The socket is closed on
+// every failure path that created one.
+int CComm::Connect( unsigned char srcip[4]
+                  , unsigned char dstip[4]
+                  , int port )
+{
+    const char method_name[] = "CComm::Connect";
+    TRACE_ENTRY;
+
+    int    sock;                     // socket
+    int    ret;                      // returned value
+    int    reuse = 1;                // sockopt reuse option
+    int    nodelay = 1;              // sockopt nodelay option
+#if defined(_XOPEN_SOURCE_EXTENDED)
+#ifdef __LP64__
+    socklen_t  size;                 // size of socket address
+#else
+    size_t   size;                   // size of socket address
+#endif
+#else
+    int    size;                     // size of socket address
+#endif
+    static int retries = 0;          // # times to retry connect
+    int    outer_failures = 0;       // # failed connect loops
+    int    connect_failures = 0;     // # failed connects
+    char   *p;                       // getenv results
+    struct sockaddr_in  sockinfo;    // socket address info
+
+    size = sizeof(sockinfo);
+
+    if ( !retries )
+    {
+        p = getenv( "HPMP_CONNECT_RETRIES" );
+        if ( p ) retries = atoi( p );
+        else retries = 5;
+    }
+
+    for ( ;; )
+    {
+        sock = socket( AF_INET, SOCK_STREAM, 0 );
+        if ( sock < 0 ) return ( -1 );
+
+        // Bind local address if specified.
+        if ( srcip )
+        {
+            memset( (char *) &sockinfo, 0, size );
+            // srcip holds 4 raw address bytes, not a C string: always copy
+            // exactly 4 bytes (strlen() would stop at a zero octet or run
+            // past the array when no octet is zero).
+            memcpy( (char *) &sockinfo.sin_addr, (char *) srcip, 4 );
+
+            sockinfo.sin_family = AF_INET;
+            sockinfo.sin_port = 0;
+            if ( bind( sock, (struct sockaddr *) &sockinfo, size ) )
+            {
+                char la_buf[MON_STRING_BUF_SIZE];
+                int err = errno;
+                sprintf( la_buf, "[%s], bind() failed! errno=%d (%s)\n"
+                       , method_name, err, strerror( err ));
+                mon_log_write( COMM_CONNECT_13, SQ_LOG_ERR, la_buf );
+                close( sock );
+                return ( -1 );
+            }
+        }
+
+        // Connect socket.
+        memset( (char *) &sockinfo, 0, size );
+        memcpy( (char *) &sockinfo.sin_addr, (char *) dstip, 4 );
+        sockinfo.sin_family = AF_INET;
+        sockinfo.sin_port = htons( (unsigned short) port );
+
+        // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES,
+        // and has a yield between each retry, since it's more oriented
+        // toward failures from network overload and putting a pause
+        // between retries. This inner loop should only iterate when
+        // a signal interrupts the local process, so it doesn't pause
+        // or use the same HPMP_CONNECT_RETRIES count.
+        connect_failures = 0;
+        ret = 1;
+        while ( ret != 0 && connect_failures <= 10 )
+        {
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+            {
+                trace_printf( "%s@%d - Connecting to addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n"
+                            , method_name, __LINE__
+                            , (int)dstip[0]
+                            , (int)dstip[1]
+                            , (int)dstip[2]
+                            , (int)dstip[3]
+                            , port
+                            , connect_failures );
+            }
+
+            ret = connect( sock, (struct sockaddr *) &sockinfo,
+                           size );
+            if ( ret == 0 ) break;
+            if ( errno == EINTR )
+            {
+                ++connect_failures;
+            }
+#ifdef NAMESERVER_PROCESS
+            else if ( errno == ECONNREFUSED )
+            {
+                ++connect_failures;
+                sleep( 1 );
+            }
+#endif
+            else
+            {
+                char la_buf[MON_STRING_BUF_SIZE];
+                int err = errno;
+                sprintf( la_buf, "[%s], connect(%d.%d.%d.%d:%d) failed! errno=%d (%s)\n"
+                       , method_name
+                       , (int)((unsigned char *)dstip)[0]
+                       , (int)((unsigned char *)dstip)[1]
+                       , (int)((unsigned char *)dstip)[2]
+                       , (int)((unsigned char *)dstip)[3]
+                       , port
+                       , err, strerror( err ));
+                mon_log_write( COMM_CONNECT_14, SQ_LOG_ERR, la_buf );
+                close(sock);
+                return ( -1 );
+            }
+        }
+
+        if ( ret == 0 ) break;
+
+        // For large clusters, the connect/accept calls seem to fail occasionally,
+        // no doubt due to the large number (1000's) of simultaneous connect packets
+        // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES
+        // number of times.
+        if ( errno != EINTR )
+        {
+            if ( ++outer_failures > retries )
+            {
+                char la_buf[MON_STRING_BUF_SIZE];
+                sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n"
+                       , method_name, retries);
+                mon_log_write( COMM_CONNECT_15, SQ_LOG_ERR, la_buf );
+                close( sock );
+                return ( -1 );
+            }
+            struct timespec req, rem;
+            req.tv_sec = 0;
+            req.tv_nsec = 500000;
+            nanosleep( &req, &rem );
+        }
+        // Start the next attempt with a fresh socket.
+        close( sock );
+    }
+
+    if ( setsockopt( sock, IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(int) ) )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
+               , method_name, err, strerror( err ));
+        mon_log_write( COMM_CONNECT_16, SQ_LOG_ERR, la_buf );
+        close( sock );
+        return ( -2 );
+    }
+
+    if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int err = errno;
+        sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
+               , method_name, err, strerror( err ));
+        mon_log_write( COMM_CONNECT_17, SQ_LOG_ERR, la_buf );
+        close( sock );
+        return ( -2 );
+    }
+
+    // SetKeepAliveSockOpt() closes the socket itself when it fails.
+    int lv_retcode = SetKeepAliveSockOpt( sock );
+    if ( lv_retcode != 0 )
+    {
+        return lv_retcode;
+    }
+
+    TRACE_EXIT;
+    return ( sock );
+}
+
+// Close a socket descriptor. A value of -1 is treated as "no socket" and
+// reported as success (0); otherwise the result of close() is returned.
+int CComm::Close( int sock )
+{
+    const char method_name[] = "CComm::Close";
+    TRACE_ENTRY;
+
+    const int result = (sock == -1) ? 0 : close( sock );
+
+    TRACE_EXIT;
+    return( result );
+}
+
+// Thin wrapper over epoll_ctl(efd, op, fd, event). Any failure is logged
+// as critical (COMM_EPOLLCTL_1) and followed by mon_failure_exit().
+// remoteName is only used to label the log message.
+void CComm::EpollCtl( int efd
+                    , int op
+                    , int fd
+                    , struct epoll_event *event
+                    , char *remoteName )
+{
+    const char method_name[] = "CComm::EpollCtl";
+    TRACE_ENTRY;
+#if 0
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d epoll_ctl( efd=%d,%s, fd=%d(%s), %s )\n"
+                    , method_name, __LINE__
+                    , efd
+                    , EpollOpString(op)
+                    , fd
+                    , remoteName
+                    , EpollEventString(event->events) );
+    }
+#endif
+    // Common case first: the control operation succeeded.
+    if ( epoll_ctl( efd, op, fd, event ) != -1 )
+    {
+        TRACE_EXIT;
+        return;
+    }
+
+    char errText[256];
+    char logLine[MON_STRING_BUF_SIZE];
+    snprintf( logLine, sizeof(logLine), "[%s@%d] epoll_ctl(efd=%d, %s, fd=%d(%s), %s) error: %s\n"
+            , method_name, __LINE__
+            , efd
+            , EpollOpString(op)
+            , fd
+            , remoteName
+            , EpollEventString(event->events)
+            , strerror_r( errno, errText, 256 ) );
+    mon_log_write( COMM_EPOLLCTL_1, SQ_LOG_CRIT, logLine );
+
+    mon_failure_exit();
+
+    TRACE_EXIT;
+    return;
+}
+
+// Remove fd from the epoll set efd. A descriptor that is not registered
+// (ENOENT) is silently accepted; any other epoll_ctl() failure is logged
+// as critical (COMM_EPOLLCTLDELETE_1) and followed by mon_failure_exit().
+void CComm::EpollCtlDelete( int efd
+                          , int fd
+                          , struct epoll_event *event
+                          , char *remoteName )
+{
+    const char method_name[] = "CComm::EpollCtlDelete";
+    TRACE_ENTRY;
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d epoll_ctl( efd=%d,%s, fd=%d(%s), %s )\n"
+                    , method_name, __LINE__
+                    , efd
+                    , EpollOpString(EPOLL_CTL_DEL)
+                    , fd
+                    , remoteName
+                    , EpollEventString(event->events) );
+    }
+
+    // Remove old socket from epoll set, it may not be there
+    const bool failed = ( epoll_ctl( efd, EPOLL_CTL_DEL, fd, event ) == -1 );
+    const int  reason = failed ? errno : 0;
+
+    if ( failed && reason != ENOENT )
+    {
+        char errText[256];
+        char logLine[MON_STRING_BUF_SIZE];
+        snprintf( logLine, sizeof(logLine), "[%s@%d] epoll_ctl(efd=%d, %s, fd=%d, %s) error: %s\n"
+                , method_name, __LINE__
+                , efd
+                , EpollOpString(EPOLL_CTL_DEL)
+                , fd
+                , EpollEventString(event->events)
+                , strerror_r( reason, errText, 256 ) );
+        mon_log_write( COMM_EPOLLCTLDELETE_1, SQ_LOG_CRIT, logLine );
+
+        mon_failure_exit();
+    }
+
+    TRACE_EXIT;
+    return;
+}
+
+// Create a TCP listening socket bound to INADDR_ANY.
+//
+//   port - in: port to bind (0 lets the system choose); out: the port
+//          actually bound, as reported by getsockname(). May be NULL, in
+//          which case port 0 is requested and nothing is reported back.
+//
+// bind() is retried up to 4 times, 5 seconds apart, while it fails with
+// EADDRINUSE.
+//
+// Returns the listening socket on success, -2 when setting TCP_NODELAY
+// fails, -1 for any other socket/bind/getsockname/listen failure, or the
+// non-zero code returned by SetKeepAliveSockOpt().
+int CComm::Listen( int *port )
+{
+    const char method_name[] = "CComm::Listen";
+    TRACE_ENTRY;
+
+    int  sock;        // socket
+    int  err;         // return code
+#if defined(_XOPEN_SOURCE_EXTENDED)
+#ifdef __LP64__
+    socklen_t  size;  // size of socket address
+#else
+    size_t   size;    // size of socket address
+#endif
+#else
+    unsigned int size; // size of socket address
+#endif
+    struct sockaddr_in  sockinfo;    // socket address info
+
+    // Read the requested port once, honoring a NULL pointer, so that the
+    // later "if ( port )" check is not preceded by an unchecked dereference.
+    const int requestedPort = port ? *port : 0;
+
+    sock = socket( AF_INET, SOCK_STREAM, 0 );
+    if ( sock < 0 )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int savedErrno = errno;
+        sprintf( la_buf, "[%s], socket() failed! errno=%d (%s)\n"
+               , method_name, savedErrno, strerror( savedErrno ));
+        mon_log_write( COMM_LISTEN_1, SQ_LOG_CRIT, la_buf );
+        return ( -1 );
+    }
+
+    int    nodelay = 1;   // sockopt nodelay option
+    if ( setsockopt( sock, IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(int) ) )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int savedErrno = errno;
+        sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
+               , method_name, savedErrno, strerror( savedErrno ));
+        mon_log_write( COMM_LISTEN_2, SQ_LOG_ERR, la_buf );
+        close( sock );
+        return ( -2 );
+    }
+
+    int    reuse = 1;   // sockopt reuse option
+    if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int savedErrno = errno;
+        sprintf( la_buf, "[%s], setsockopt(SO_REUSEADDR) failed! errno=%d (%s)\n"
+               , method_name, savedErrno, strerror( savedErrno ));
+        mon_log_write( COMM_LISTEN_3, SQ_LOG_ERR, la_buf );
+        close( sock );
+        return ( -1 );
+    }
+
+    // Bind socket.
+    size = sizeof(sockinfo);
+    memset( (char *) &sockinfo, 0, size );
+    sockinfo.sin_family = AF_INET;
+    sockinfo.sin_addr.s_addr = htonl( INADDR_ANY );
+    sockinfo.sin_port = htons( requestedPort );
+    int lv_bind_tries = 0;
+    int bindErrno = 0;
+    do
+    {
+        if (lv_bind_tries > 0)
+        {
+            sleep(5);
+        }
+        err = bind( sock, (struct sockaddr *) &sockinfo, size );
+        // Save errno before sched_yield() has a chance to change it.
+        bindErrno = err ? errno : 0;
+        sched_yield( );
+    } while ( err &&
+             (bindErrno == EADDRINUSE) &&
+             (++lv_bind_tries < 4) );
+    if ( err )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        sprintf( la_buf, "[%s], bind() failed! port=%d, errno=%d (%s)\n"
+               , method_name, requestedPort, bindErrno, strerror( bindErrno ));
+        mon_log_write( COMM_LISTEN_4, SQ_LOG_CRIT, la_buf );
+        close( sock );
+        return ( -1 );
+    }
+    if ( port )
+    {
+        // Report the port actually bound (relevant when 0 was requested).
+        if ( getsockname( sock, (struct sockaddr *) &sockinfo, &size ) )
+        {
+            char la_buf[MON_STRING_BUF_SIZE];
+            int savedErrno = errno;
+            sprintf( la_buf, "[%s], getsockname() failed! errno=%d (%s)\n"
+                   , method_name, savedErrno, strerror( savedErrno ));
+            mon_log_write( COMM_LISTEN_5, SQ_LOG_CRIT, la_buf );
+            close( sock );
+            return ( -1 );
+        }
+
+        *port = (int) ntohs( sockinfo.sin_port );
+    }
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        unsigned char *addrp = (unsigned char *) &sockinfo.sin_addr.s_addr;
+        trace_printf( "%s@%d listening on addr=%d.%d.%d.%d, port=%d\n"
+                    , method_name, __LINE__
+                    , addrp[0]
+                    , addrp[1]
+                    , addrp[2]
+                    , addrp[3]
+                    , port?*port:0);
+    }
+
+    // SetKeepAliveSockOpt() closes the socket itself when it fails.
+    int lv_retcode = SetKeepAliveSockOpt( sock );
+    if ( lv_retcode != 0 )
+    {
+        return lv_retcode;
+    }
+
+    // Listen
+    if ( listen( sock, SOMAXCONN ) )
+    {
+        char la_buf[MON_STRING_BUF_SIZE];
+        int savedErrno = errno;
+        sprintf( la_buf, "[%s], listen() failed! errno=%d (%s)\n"
+               , method_name, savedErrno, strerror( savedErrno ));
+        mon_log_write( COMM_LISTEN_6, SQ_LOG_CRIT, la_buf );
+        close( sock );
+        return ( -1 );
+    }
+
+    TRACE_EXIT;
+    return ( sock );
+}
+
+// Convenience overload: parse the decimal port in portName with atoi(),
+// store it in *port and delegate to Listen( int * ), which may update
+// *port with the port actually bound. Returns whatever that call returns.
+int CComm::Listen( const char *portName, int *port )
+{
+    const char method_name[] = "CComm::Listen";
+    TRACE_ENTRY;
+
+    const int parsedPort = atoi( portName );
+    *port = parsedPort;
+
+    TRACE_EXIT;
+    return( CComm::Listen( port ) );
+}
+
+// Read exactly size bytes from sockFd into buf using recv(), looping over
+// short reads and restarting after EINTR.
+//
+//   remoteName, desc - used only to label trace and log output
+//
+// Returns 0 when all size bytes were received, ENODATA when recv()
+// returned 0 (end of stream) before that, or the errno of the failing
+// recv(). Failures are logged with COMM_RECEIVE_1.
+int CComm::Receive( int sockFd
+                  , char *buf
+                  , int size
+                  , char *remoteName
+                  , const char *desc )
+{
+    const char method_name[] = "CComm::Receive";
+    TRACE_ENTRY;
+
+    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS | TRACE_NS))
+    {
+        trace_printf( "%s@%d (%s) - read from %s, sockFd=%d, size=%d\n"
+                    , method_name, __LINE__, desc
+                    , remoteName
+                    , sockFd
+                    , size );
+    }
+
+    bool    readAgain = false;
+    int     error = 0;
+    int     readCount = 0;
+    int     received = 0;
+    int     sizeCount = size;   // bytes still expected
+
+    do
+    {
+        readCount = (int) recv( sockFd
+                              , buf
+                              , sizeCount
+                              , 0 );
+        // Capture errno right away: the tracing below makes further calls
+        // that could overwrite it before it is examined.
+        const int recvErrno = errno;
+        //if ( readCount > 0 ) Meas.addSockNsRcvdBytes( readCount );
+
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS | TRACE_NS))
+        {
+            trace_printf( "%s@%d (%s) - read from %s, sockFd=%d, "
+                          "count read=%d, sizeCount=%d\n"
+                        , method_name, __LINE__, desc
+                        , remoteName, sockFd
+                        , readCount, sizeCount );
+        }
+
+        if ( readCount > 0 )
+        { // Got data
+            received += readCount;
+            buf += readCount;
+            if ( received == size )
+            {
+                readAgain = false;
+            }
+            else
+            {
+                sizeCount -= readCount;
+                readAgain = true;
+            }
+        }
+        else if ( readCount == 0 )
+        { // EOF
+            error = ENODATA;
+            readAgain = false;
+        }
+        else
+        { // Got an error
+            if ( recvErrno != EINTR)
+            {
+                error = recvErrno;
+                readAgain = false;
+            }
+            else
+            {
+                readAgain = true;
+            }
+        }
+    }
+    while( readAgain );
+
+    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS | TRACE_NS))
+    {
+        trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n"
+                    , method_name, __LINE__
+                    , received
+                    , error, strerror(error) );
+    }
+
+    if (error)
+    {
+        // Separate name so the log buffer does not shadow the buf parameter.
+        char la_buf[MON_STRING_BUF_SIZE];
+        snprintf( la_buf, sizeof(la_buf)
+                , "[%s](%s), unable to receive request from %s, size %d,"
+                  "sockFd=%d, error: %d(%s)\n"
+                , method_name, desc, remoteName, size
+                , sockFd, error, strerror(error) );
+        mon_log_write( COMM_RECEIVE_1, SQ_LOG_ERR, la_buf );
+    }
+
+    TRACE_EXIT;
+    return error;
+}
+
+// Receive size bytes into buf with a timeout: a receive-only call to
+// SendRecvWait() (no send buffer, send size 0). Returns its result.
+int CComm::ReceiveWait( int sockFd
+                      , char *buf
+                      , int size
+                      , int timeout
+                      , int retries
+                      , char *remoteName
+                      , const char *desc)
+{
+    const char method_name[] = "CComm::ReceiveWait";
+    TRACE_ENTRY;
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_SYNC))
+    {
+        trace_printf( "%s@%d (%s) - read from %s, sockFd=%d, size=%d\n"
+                    , method_name, __LINE__, desc
+                    , remoteName
+                    , sockFd
+                    , size );
+    }
+
+    char * const noSendBuf  = NULL;
+    const int    noSendSize = 0;
+    const int rc = SendRecvWait( sockFd, noSendBuf, noSendSize
+                               , buf, size
+                               , timeout, retries
+                               , remoteName, desc );
+
+    TRACE_EXIT;
+    return rc;
+}
+
+// Write exactly size bytes from buf to sockFd using send(), continuing
+// after short writes and restarting after EINTR.
+//
+//   remoteName, desc - used only to label trace and log output
+//
+// Returns 0 on success or the errno of the failing send(). Failures are
+// logged with COMM_SEND_1.
+int CComm::Send( int sockFd
+               , char *buf
+               , int size
+               , char *remoteName
+               , const char *desc )
+{
+    const char method_name[] = "CComm::Send";
+    TRACE_ENTRY;
+
+    if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS | TRACE_NS))
+    {
+        trace_printf( "%s@%d (%s) - send to %s, sockFd=%d, size=%d\n"
+                    , method_name, __LINE__, desc
+                    , remoteName
+                    , sockFd
+                    , size );
+    }
+
+    bool    sendAgain = false;
+    int     error = 0;
+    int     sendCount = 0;
+    int     sent = 0;       // bytes accepted by the socket so far
+
+    do
+    {
+        // Resume after the bytes already accepted; resending from the
+        // start of the buffer after a short write would duplicate data
+        // on the stream.
+        sendCount = (int) send( sockFd
+                              , buf + sent
+                              , size - sent
+                              , 0 );
+        const int sendErrno = errno;
+        //if ( sendCount > 0 ) Meas.addSockNsSentBytes( sendCount );
+        if ( sendCount > 0 )
+        { // Sent data
+            sent += sendCount;
+            sendAgain = ( sent < size );
+        }
+        else
+        { // Got an error
+            if ( sendErrno != EINTR)
+            {
+                error = sendErrno;
+                sendAgain = false;
+            }
+            else
+            {
+                sendAgain = true;
+            }
+        }
+        if (trace_settings & (TRACE_REQUEST | TRACE_PROCESS | TRACE_NS))
+        {
+            trace_printf( "%s@%d (%s) - send to %s, sockFd=%d, "
+                          "count size=%d, sendCount=%d, sent=%d\n"
+                        , method_name, __LINE__, desc
+                        , remoteName, sockFd
+                        , size, sendCount, sent );
+        }
+    }
+    while( sendAgain );
+
+    if ( trace_settings & TRACE_NS )
+    {
+        trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n"
+                    , method_name, __LINE__
+                    , sent
+                    , error, strerror(error) );
+    }
+
+    if (error)
+    {
+        // Separate name so the log buffer does not shadow the buf parameter.
+        char la_buf[MON_STRING_BUF_SIZE];
+        snprintf( la_buf, sizeof(la_buf)
+                , "[%s](%s), unable to send request to %s, size %d,"
+                  "sockFd=%d, error: %d(%s)\n"
+                , method_name, desc, remoteName, size
+                , sockFd, error, strerror(error) );
+        mon_log_write( COMM_SEND_1, SQ_LOG_ERR, la_buf );
+    }
+
+    TRACE_EXIT;
+    return error;
+}
+
+// Send size bytes from buf with a timeout: a send-only call to
+// SendRecvWait() (no receive buffer, receive size 0). Returns its result.
+int CComm::SendWait(int sockFd
+                   , char *buf
+                   , int size
+                   , int timeout
+                   , int retries
+                   , char *remoteName
+                   , const char *desc)
+{
+    const char method_name[] = "CComm::SendWait";
+    TRACE_ENTRY;
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_SYNC))
+    {
+        trace_printf( "%s@%d (%s) - write to %s, sockFd=%d, size=%d\n"
+                    , method_name, __LINE__, desc
+                    , remoteName
+                    , sockFd
+                    , size );
+    }
+
+    char * const noRecvBuf  = NULL;
+    const int    noRecvSize = 0;
+    const int rc = SendRecvWait( sockFd, buf, size
+                               , noRecvBuf, noRecvSize
+                               , timeout, retries
+                               , remoteName, desc );
+
+    TRACE_EXIT;
+    return rc;
+}
+
+// Blocking request/reply exchange: Send() sendsize bytes from sendbuf and,
+// only if that succeeded, Receive() recvsize bytes into recvbuf.
+// Returns 0 on success or the error code of whichever step failed.
+int CComm::SendRecv( int sockFd
+                   , char *sendbuf
+                   , int sendsize
+                   , char *recvbuf
+                   , int recvsize
+                   , char *remoteName
+                   , const char *desc)
+{
+    const char method_name[] = "CComm::SendRecv";
+    TRACE_ENTRY;
+
+    int status = Send( sockFd, sendbuf, sendsize, remoteName, desc );
+    if (status == 0)
+    {
+        // The reply is read only once the request went out completely.
+        status = Receive( sockFd, recvbuf, recvsize, remoteName, desc );
+    }
+
+    TRACE_EXIT;
+    return status;
+}
+
+// Send sendsize bytes from sendbuf and/or receive recvsize bytes into
+// recvbuf on sockFd, driven by readiness events from the epoll set
+// epollFd_. A direction whose size is not > 0 is skipped (ReceiveWait and
+// SendWait pass NULL/0 for the unused side).
+//
+//   timeout    - handed to epoll_wait() as its timeout argument
+//   retries    - the call gives up with ETIMEDOUT once the number of
+//                epoll_wait() timeouts reaches this value (the counter is
+//                not reset between events)
+//   remoteName, desc - used only to label trace and log output
+//
+// sockFd is added to epollFd_ on entry and removed again once neither
+// direction is pending.
+//
+// Returns 0 on success, ETIMEDOUT on timeout, or -1 when epoll reports an
+// error/hang-up on the socket or a recv()/send() does not make progress.
+// An epoll_wait() failure other than EINTR is logged and followed by
+// mon_failure_exit().
+int CComm::SendRecvWait( int sockFd
+                       , char *sendbuf
+                       , int sendsize
+                       , char *recvbuf
+                       , int recvsize
+                       , int timeout
+                       , int retries
+                       , char *remoteName
+                       , const char *desc)
+{
+    const char method_name[] = "CComm::SendRecvWait";
+    TRACE_ENTRY;
+
+    bool receiving = (recvsize > 0) ? true : false;
+    bool sending = (sendsize > 0) ? true : false;
+    int  error = 0;
+    int  nrecv = 0;                       // receive directions completed
+    int  nsent = 0;                       // send directions completed
+    int  num2recv = (receiving) ? 1 : 0;
+    int  num2send = (sending) ? 1 : 0;
+    int  received = 0;                    // bytes received so far
+    int  sent = 0;                        // bytes sent so far
+    int  retry = 0;                       // epoll_wait timeouts seen
+
+    struct epoll_event event;
+    event.data.fd = sockFd;
+    event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP | EPOLLERR | EPOLLHUP;
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_SYNC))
+    {
+        trace_printf( "%s@%d (%s) - EPOLL state change "
+                      "to/from %s, efd=%d, op=%s, sockFd=%d, event=%s\n"
+                    , method_name, __LINE__, desc
+                    , remoteName
+                    , epollFd_
+                    , EpollOpString(EPOLL_CTL_ADD)
+                    , sockFd
+                    , EpollEventString(event.events) );
+
+    }
+    EpollCtl( epollFd_, EPOLL_CTL_ADD, sockFd, &event, remoteName );
+
+    if (trace_settings & (TRACE_SYNC_DETAIL))
+    {
+        trace_printf( "%s@%d (%s) - write/read to/from %s, sockFd=%d, "
+                      "sending=%d, sendsize=%d, receiving=%d, recvsize=%d, "
+                      "wait_timeout=%d, retry_count=%d\n"
+                    , method_name, __LINE__, desc
+                    , remoteName
+                    , sockFd
+                    , sending
+                    , sendsize
+                    , receiving
+                    , recvsize
+                    , timeout
+                    , retries );
+    }
+
+    struct epoll_event ioEvents;
+    while(1)
+    {
+        bool stateChange = false;
+        // Number of directions still pending; 0 means the exchange is over.
+        int maxEvents = num2recv + num2send - nsent - nrecv;
+        int nw;
+
+        if (trace_settings & (TRACE_SYNC_DETAIL))
+        {
+            trace_printf( "%s@%d (%s) - write/read to/from %s, "
+                          "sockFd=%d, maxEvents=%d, nsent=%d, nrecv=%d, "
+                          "sending=%d (%d), "
+                          "sent=%d, "
+                          "receiving=%d (%d), "
+                          "received=%d\n"
+                        , method_name, __LINE__, desc, remoteName
+                        , sockFd, maxEvents, nsent, nrecv
+                        , sending, sendsize
+                        , sent
+                        , receiving, recvsize
+                        , received );
+        }
+
+do_again:
+
+        if (maxEvents == 0) break;
+
+        while(1)
+        {
+            // ioEvents is a single epoll_event, so never ask for more than
+            // one event: passing maxEvents (which can be 2) would let the
+            // kernel write past the end of the buffer.
+            nw = epoll_wait( epollFd_, &ioEvents, 1, timeout );
+            if ( nw >= 0 || errno != EINTR ) break;
+        }
+
+        if ( nw == 0 )
+        {   // Timeout, no fd's ready
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_SYNC))
+            {
+                trace_printf( "%s@%d (%s) - IO timedout! (node=%s, retry=%d, "
+                              "timeout=%d, retries=%d) - "
+                              "sending=%d (%d), "
+                              "sent=%d, "
+                              "receiving=%d (%d), "
+                              "received=%d\n"
+                            , method_name, __LINE__
+                            , desc, remoteName, retry, timeout, retries
+                            , sending, sendsize
+                            , sent
+                            , receiving, recvsize
+                            , received );
+            }
+
+            retry++;
+            if (retry < retries)
+            {
+                goto do_again;
+            }
+
+            // Retry budget used up: mark both directions finished so the
+            // socket is removed from the epoll set and the loop ends.
+            error = ETIMEDOUT;
+            if ( sending )
+            {
+                nsent++;
+                sending = false;
+            }
+            if ( receiving )
+            {
+                nrecv++;
+                receiving = false;
+            }
+            stateChange = true;
+            goto early_exit;
+        } // ( nw == 0 )
+
+        if (nw < 0)
+        {   // Got an error
+            char ebuff[256];
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf), "[%s@%d] (%s) - epoll_wait(%d, %d) error: %s\n",
+                      method_name, __LINE__, desc, epollFd_, maxEvents,
+                      strerror_r( errno, ebuff, 256 ) );
+            mon_log_write( COMM_SENDRECVWAIT_1, SQ_LOG_CRIT, buf );
+
+            mon_failure_exit();
+        }
+        else
+        {
+            if ((ioEvents.events & EPOLLERR) ||
+                (ioEvents.events & EPOLLHUP) ||
+                (!(ioEvents.events & (EPOLLIN|EPOLLOUT))))
+            {
+                // An error has occurred on this fd, or the socket is not
+                // ready for reading nor writing
+                char buf[MON_STRING_BUF_SIZE];
+                snprintf( buf, sizeof(buf)
+                        , "[%s@%d] (%s) - Error: node=%s, events.data.fd=%d, event=%s\n"
+                        , method_name, __LINE__, desc
+                        , remoteName
+                        , ioEvents.data.fd
+                        , EpollEventString(ioEvents.events) );
+                mon_log_write( COMM_SENDRECVWAIT_2, SQ_LOG_CRIT, buf );
+                error = -1;
+                if ( sending )
+                {
+                    nsent++;
+                    sending = false;
+                }
+                if ( receiving )
+                {
+                    nrecv++;
+                    receiving = false;
+                }
+                stateChange = true;
+                goto early_exit;
+            }
+            if (receiving && ioEvents.events & EPOLLIN)
+            {   // Got receive (read) completion
+                int eagain_ok = 0;
+                int recverror = 0;
+read_again:
+                char *r = &((char *)recvbuf)[received];
+                int n2get = recvsize - received;
+                int nr = 0;
+                // NOTE(review): both break conditions together make this
+                // loop run exactly once, so an interrupted recv() is
+                // treated as a failure below rather than retried --
+                // confirm whether a retry on EINTR was intended.
+                while ( 1 )
+                {
+                    if (trace_settings & (TRACE_SYNC_DETAIL))
+                    {
+                        trace_printf( "%s@%d (%s) - EPOLLIN from %s,"
+                                      " receiving=%d (%d)"
+                                      " received=%d"
+                                      " nr=%d"
+                                      " n2get=%d"
+                                      " recverror=%d(%s)\n"
+                                    , method_name, __LINE__, desc
+                                    , remoteName
+                                    , receiving, n2get
+                                    , received
+                                    , nr
+                                    , n2get
+                                    , recverror, strerror(recverror) );
+                    }
+                    nr = recv( sockFd, r, n2get, 0 );
+                    recverror = errno;
+//                    if ( nr > 0 ) Meas.addSockRcvdBytes( (nr<n2get)?nr:0 );
+                    if ( nr >= 0 || recverror == EINTR ) break;
+                    if ( nr > n2get || nr <= 0 ) break;
+                }
+
+                if (nr > n2get || nr <= 0)
+                {
+                    if (recverror == 0)
+                    {   // Timeout
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_SYNC))
+                        {
+                            trace_printf( "%s@%d (%s) - IO timeout!\n"
+                                        , method_name, __LINE__, desc );
+                        }
+                    }
+                    else
+                    {
+                        char buf[MON_STRING_BUF_SIZE];
+                        snprintf( buf, sizeof(buf)
+                                , "[%s@%d] (%s) - recv(%d) from %s error %d (%s)\n"
+                                , method_name, __LINE__, desc
+                                , nr
+                                , remoteName
+                                , recverror, strerror(recverror) );
+                        mon_log_write( COMM_SENDRECVWAIT_3, SQ_LOG_CRIT, buf );
+                    }
+                    error = -1;
+                    nrecv++;
+                    receiving = false;
+                    stateChange = true;
+                    goto early_exit;
+                }
+
+                // NOTE(review): nr <= 0 was fully handled above, so this
+                // branch (including the EAGAIN tolerance) cannot be
+                // reached as written.
+                if ( nr < 0 )
+                {
+                    if ( nr < 0 && eagain_ok && recverror == EAGAIN )
+                    {
+                        // do nothing
+                    }
+                    else
+                    {
+                        // error, down socket
+                        char buf[MON_STRING_BUF_SIZE];
+                        snprintf( buf, sizeof(buf)
+                                , "[%s@%d] (%s) - recv(%d) from %s error %d (%s)\n"
+                                , method_name, __LINE__, desc
+                                , nr
+                                , remoteName
+                                , recverror, strerror(recverror) );
+                        mon_log_write( COMM_SENDRECVWAIT_4, SQ_LOG_CRIT, buf );
+                        nrecv++;
+                        receiving = false;
+                        if ( sending )
+                        {
+                            nsent++;
+                            sending = false;
+                        }
+                        stateChange = true;
+                    }
+                }
+                else
+                {
+                    received += nr;
+                    // reading buffer, update counters
+                    n2get -= nr;
+                    if ( n2get > 0 )
+                    {
+                        // Short read: keep reading until the buffer is full.
+                        eagain_ok = 1;
+                        goto read_again;
+                    }
+                    if ( n2get < 0 )
+                    {
+                        char buf[MON_STRING_BUF_SIZE];
+                        snprintf( buf, sizeof(buf),
+                                  "[%s@%d] (%s) - error n2get=%d\n",
+                                  method_name, __LINE__, desc, n2get );
+                        mon_log_write( COMM_SENDRECVWAIT_5, SQ_LOG_CRIT, buf );
+
+                        mon_failure_exit();
+                    }
+                    if ( n2get == 0 )
+                    {
+                        // this buffer is done
+                        nrecv++;
+                        receiving = false;
+                        if (trace_settings & (TRACE_SYNC_DETAIL))
+                        {
+                            trace_printf( "%s@%d (%s) - EPOLLIN from %s,"
+                                          " receiving=%d (%d)"
+                                          " received=%d"
+                                          " n2get=%d\n"
+                                        , method_name, __LINE__, desc
+                                        , remoteName
+                                        , receiving, n2get
+                                        , received
+                                        , n2get );
+                        }
+                        stateChange = true;
+                    }
+                }
+            }
+            if (sending && ioEvents.events & EPOLLOUT)
+            {   // Got send (write) completion
+                char *s = &((char *)sendbuf)[sent];
+                int n2send = sendsize - sent;
+                int ns;
+                int senderror = 0;
+                // NOTE(review): as with the receive loop, the second break
+                // makes this loop run exactly once.
+                while ( 1 )
+                {
+                    if (trace_settings & (TRACE_SYNC_DETAIL))
+                    {
+                        trace_printf( "%s@%d (%s) - EPOLLOUT to %s,"
+                                      " sending=%d (%d),"
+                                      " sent=%d, "
+                                      " recverror=%d(%s)\n"
+                                    , method_name, __LINE__, desc
+                                    , remoteName
+                                    , sendsize, n2send
+                                    , sent
+                                    , senderror, strerror(senderror) );
+                    }
+                    ns = send( sockFd, s, n2send, 0 );
+                    // Assign the outer senderror; re-declaring it here hid
+                    // the error from the failure handling after the loop.
+                    senderror = errno;
+//                    if ( ns > 0 ) Meas.addSockSentBytes( ns );
+                    if ( ns >= 0 || senderror != EINTR ) break;
+                    if ( ns > n2send || ns <= 0 ) break;
+                }
+
+                if (ns > n2send || ns <= 0)
+                {   // Timeout
+                    if (senderror == 0)
+                    {   // Timeout
+                        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY | TRACE_SYNC))
+                        {
+                            trace_printf( "%s@%d (%s) - IO timeout!\n"
+                                        , method_name, __LINE__, desc );
+                        }
+                    }
+                    else
+                    {
+                        char buf[MON_STRING_BUF_SIZE];
+                        snprintf( buf, sizeof(buf)
+                                , "[%s@%d] (%s) - send(%d) to %s, error=%d (%s)\n"
+                                , method_name, __LINE__, desc
+                                , ns
+                                , remoteName
+                                , senderror, strerror(senderror) );
+                        mon_log_write( COMM_SENDRECVWAIT_6, SQ_LOG_CRIT, buf );
+                    }
+                    error = -1;
+                    nsent++;
+                    sending = false;
+                    stateChange = true;
+                    goto early_exit;
+                }
+
+                // NOTE(review): ns <= 0 was fully handled above, so the
+                // "ns < 0" branch cannot be reached as written.
+                if ( ns < 0 )
+                {   // error, down socket
+                    char buf[MON_STRING_BUF_SIZE];
+                    snprintf( buf, sizeof(buf)
+                            , "[%s@%d] (%s) - send(%d) to %s, error=%d (%s)\n"
+                            , method_name, __LINE__, desc
+                            , ns
+                            , remoteName
+                            , senderror, strerror(senderror) );
+                    mon_log_write( COMM_SENDRECVWAIT_7, SQ_LOG_CRIT, buf );
+                    nsent++;
+                    sending = false;
+                    if ( receiving )
+                    {
+                        nrecv++;
+                        receiving = false;
+                    }
+                    stateChange = true;
+                }
+                else
+                {
+                    sent += ns;
+                    if ( sent == sendsize )
+                    {
+                        nsent++;
+                        sending = false;
+                        // finished sending to this destination
+                        if (trace_settings & (TRACE_SYNC_DETAIL))
+                        {
+                            trace_printf( "%s@%d (%s) - EPOLLOUT to %s,"
+                                          " sending=%d (%d),"
+                                          " sent=%d\n"
+                                        , method_name, __LINE__, desc
+                                        , remoteName
+                                        , sendsize, n2send
+                                        , sent );
+                        }
+                        stateChange = true;
+                    }
+                }
+            }
+early_exit:
+            if (stateChange)
+            {
+                // A direction finished (or failed): narrow the epoll
+                // interest to what is still pending, or drop the socket
+                // from the set when nothing is.
+                struct epoll_event update;
+                update.data.fd = sockFd;
+                int op = 0;
+                if ( !sending && !receiving )
+                {
+                    op = EPOLL_CTL_DEL;
+                    update.events = 0;
+                }
+                else if (sending)
+                {
+                    op = EPOLL_CTL_MOD;
+                    update.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
+                }
+                else if (receiving)
+                {
+                    op = EPOLL_CTL_MOD;
+                    update.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
+                }
+                if ( op == EPOLL_CTL_DEL || op == EPOLL_CTL_MOD )
+                {
+                    if (trace_settings & (TRACE_SYNC_DETAIL))
+                    {
+                        trace_printf( "%s@%d (%s) - EPOLL state change "
+                                      "to/from %s, efd=%d, op=%s, sockFd=%d, event=%s\n"
+                                    , method_name, __LINE__, desc
+                                    , remoteName
+                                    , epollFd_
+                                    , EpollOpString(op)
+                                    , sockFd
+                                    , EpollEventString(update.events) );
+
+                    }
+                    EpollCtl( epollFd_, op, sockFd, &update, remoteName );
+                    if (op == EPOLL_CTL_DEL)
+                    {
+                        if (trace_settings & (TRACE_SYNC_DETAIL))
+                        {
+                            trace_printf( "%s@%d (%s) - write/read to/from %s, "
+                                          "removed socket from epoll set, "
+                                          "sockFd=%d, event=%s\n"
+                                        , method_name, __LINE__, desc
+                                        , remoteName
+                                        , sockFd
+                                        , EpollEventString(update.events) );
+
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    TRACE_EXIT;
+    return error;
+}
+
+// Apply TCP keep-alive settings to sock when the environment variable
+// SQ_MON_KEEPALIVE is 1. Idle time, probe interval and probe count come
+// from SQ_MON_KEEPIDLE, SQ_MON_KEEPINTVL and SQ_MON_KEEPCNT, defaulting to
+// 120, 12 and 5. The values are kept in function-level statics.
+//
+// Returns 0 when keep-alive is disabled or every option was set; on a
+// setsockopt() failure the error is logged, sock is closed and -2 is
+// returned.
+int CComm::SetKeepAliveSockOpt( int sock )
+{
+    const char method_name[] = "CComm::SetKeepAliveSockOpt";
+    TRACE_ENTRY;
+
+    static int sv_keepalive = -1;
+    static int sv_keepidle = 120;
+    static int sv_keepintvl = 12;
+    static int sv_keepcnt = 5;
+
+    if ( sv_keepalive == -1 )
+    {
+        const char *envValue = getenv( "SQ_MON_KEEPALIVE" );
+        if ( envValue )
+        {
+            sv_keepalive = atoi( envValue );
+        }
+        if ( sv_keepalive == 1 )
+        {
+            envValue = getenv( "SQ_MON_KEEPIDLE" );
+            if ( envValue )
+            {
+                sv_keepidle = atoi( envValue );
+            }
+            envValue = getenv( "SQ_MON_KEEPINTVL" );
+            if ( envValue )
+            {
+                sv_keepintvl = atoi( envValue );
+            }
+            envValue = getenv( "SQ_MON_KEEPCNT" );
+            if ( envValue )
+            {
+                sv_keepcnt = atoi( envValue );
+            }
+        }
+    }
+
+    // Keep-alive not requested: nothing to configure.
+    if ( sv_keepalive != 1 )
+    {
+        TRACE_EXIT;
+        return ( 0 );
+    }
+
+    // One row per socket option, applied in order; the first failure wins.
+    struct KeepAliveOption
+    {
+        int         level;
+        int         name;
+        int        *value;
+        int         eventId;
+        const char *failureFormat;
+    };
+    const KeepAliveOption options[] =
+    {
+        { SOL_SOCKET, SO_KEEPALIVE, &sv_keepalive, COMM_SETKEEPALIVESOCKOPT_1
+        , "[%s], setsockopt so_keepalive() failed! errno=%d (%s)\n" },
+        { SOL_TCP, TCP_KEEPIDLE, &sv_keepidle, COMM_SETKEEPALIVESOCKOPT_2
+        , "[%s], setsockopt tcp_keepidle() failed! errno=%d (%s)\n" },
+        { SOL_TCP, TCP_KEEPINTVL, &sv_keepintvl, COMM_SETKEEPALIVESOCKOPT_3
+        , "[%s], setsockopt tcp_keepintvl() failed! errno=%d (%s)\n" },
+        { SOL_TCP, TCP_KEEPCNT, &sv_keepcnt, COMM_SETKEEPALIVESOCKOPT_4
+        , "[%s], setsockopt tcp_keepcnt() failed! errno=%d (%s)\n" }
+    };
+
+    for ( size_t i = 0; i < sizeof(options) / sizeof(options[0]); ++i )
+    {
+        const KeepAliveOption &opt = options[i];
+        if ( setsockopt( sock, opt.level, opt.name, opt.value, sizeof(int) ) )
+        {
+            char la_buf[MON_STRING_BUF_SIZE];
+            int err = errno;
+            sprintf( la_buf, opt.failureFormat
+                   , method_name, err, strerror( err ) );
+            mon_log_write( opt.eventId, SQ_LOG_ERR, la_buf );
+            close( sock );
+            return ( -2 );
+        }
+    }
+
+    TRACE_EXIT;
+    return ( 0 );
+}
+
diff --git a/core/sqf/monitor/linux/comm.h b/core/sqf/monitor/linux/comm.h
new file mode 100644
index 0000000..54fb8b0
--- /dev/null
+++ b/core/sqf/monitor/linux/comm.h
@@ -0,0 +1,106 @@
+///////////////////////////////////////////////////////////////////////////////
+//
+// @@@ START COPYRIGHT @@@
+//
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+//
+// @@@ END COPYRIGHT @@@
+//
+///////////////////////////////////////////////////////////////////////////////
+
+#ifndef COMM_H_
+#define COMM_H_
+
+class CComm // Socket helper class: connect/listen plus blocking and epoll-timed send/receive
+{
+protected:
+ int eyecatcher_; // Debugging aid -- leave as first
+ // member variable of the class
+public:
+
+ CComm( void );
+ ~CComm( void );
+
+ int Accept( int listenSock );
+ void ConnectLocal( int port );
+ int Connect( const char *portName, bool doRetries = true );
+ int Connect( unsigned char srcip[4], unsigned char dstip[4], int port ); // returns connected socket, or -1/-2 on failure
+ int Close( int sock ); // close() unless sock is -1; returns 0 in that case
+ void EpollCtl( int efd
+ , int op
+ , int fd
+ , struct epoll_event *event
+ , char *remoteName ); // epoll_ctl() wrapper; logs and calls mon_failure_exit() on error
+ void EpollCtlDelete( int efd
+ , int fd
+ , struct epoll_event *event
+ , char *remoteName ); // EPOLL_CTL_DEL; ENOENT is tolerated, other errors call mon_failure_exit()
+ int Listen( int *pport ); // listen on INADDR_ANY:*pport; *pport receives the bound port
+ int Listen( const char *portName, int *port ); // parses portName with atoi(), then Listen( port )
+ int Receive( int sockFd
+ , char *buf
+ , int size
+ , char *remoteName
+ , const char *desc ); // recv() until size bytes read; returns 0, ENODATA on EOF, or errno
+ int ReceiveWait(int sockFd
+ , char *buf
+ , int size
+ , int timeout
+ , int retries
+ , char *remoteName
+ , const char *desc ); // receive-only SendRecvWait()
+ int Send( int sockFd
+ , char *buf
+ , int size
+ , char *remoteName
+ , const char *desc ); // send() loop; returns 0 or errno
+ int SendWait(int sockFd
+ , char *buf
+ , int size
+ , int timeout
+ , int retries
+ , char *remoteName
+ , const char *desc ); // send-only SendRecvWait()
+ int SendRecv( int sockFd
+ , char *sendbuf
+ , int sendsize
+ , char *recvbuf
+ , int recvsize
+ , char *remoteName
+ , const char *desc ); // Send() followed, on success, by Receive()
+ int SendRecvWait( int sockFd
+ , char *sendbuf
+ , int sendsize
+ , char *recvbuf
+ , int recvsize
+ , int timeout
+ , int retries
+ , char *remoteName
+ , const char *desc ); // epoll-driven send and/or receive; returns 0, ETIMEDOUT or -1
+
+protected:
+
+private:
+
+ int SetKeepAliveSockOpt( int sock ); // applies keep-alive options when SQ_MON_KEEPALIVE=1; closes sock and returns -2 on failure
+
+ int epollFd_; // epoll set used by SendRecvWait()
+ int listenSock_;
+};
+
+#endif /*COMM_H_*/
diff --git a/core/sqf/monitor/linux/commaccept.cxx b/core/sqf/monitor/linux/commaccept.cxx
index b925e9b..64de370 100644
--- a/core/sqf/monitor/linux/commaccept.cxx
+++ b/core/sqf/monitor/linux/commaccept.cxx
@@ -25,33 +25,145 @@
using namespace std;
+#include <signal.h>
+#include <unistd.h>
+#include <netdb.h>
+#include <sys/socket.h>
+
#include "commaccept.h"
#include "monlogging.h"
#include "montrace.h"
#include "monitor.h"
-#include <signal.h>
-#include <unistd.h>
-
-extern CCommAccept CommAccept;
+extern CCommAccept *CommAccept;
extern CMonitor *Monitor;
extern CNode *MyNode;
extern CNodeContainer *Nodes;
extern int MyPNID;
extern char MyCommPort[MPI_MAX_PORT_NAME];
-extern char *ErrorMsg (int error_code);
-extern const char *StateString( STATE state);
+extern char Node_name[MPI_MAX_PROCESSOR_NAME];
extern CommType_t CommType;
extern bool IsRealCluster;
+extern char *ErrorMsg (int error_code);
+extern const char *StateString( STATE state);
+
CCommAccept::CCommAccept()
: accepting_(true)
, shutdown_(false)
+ , ioWaitTimeout_(EPOLL_IO_WAIT_TIMEOUT_MSEC)
+ , ioRetryCount_(EPOLL_IO_RETRY_COUNT)
+ , commSock_(-1)
+ , commSocketPort_(-1)
+ , commPort_("")
, thread_id_(0)
{
const char method_name[] = "CCommAccept::CCommAccept";
TRACE_ENTRY;
+ // Use the EPOLL timeout and retry values
+ char *ioWaitTimeoutEnv = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
+ if ( ioWaitTimeoutEnv )
+ {
+ // NOTE(review): env value is used as-is while the default is in msec -- confirm units
+ ioWaitTimeout_ = atoi( ioWaitTimeoutEnv );
+ char *ioRetryCountEnv = getenv( "SQ_MON_EPOLL_RETRY_COUNT" ); // only read when the timeout env is set
+ if ( ioRetryCountEnv )
+ {
+ ioRetryCount_ = atoi( ioRetryCountEnv );
+ }
+ if ( ioRetryCount_ > EPOLL_IO_RETRY_COUNT_MAX )
+ {
+ ioRetryCount_ = EPOLL_IO_RETRY_COUNT_MAX;
+ }
+ }
+ // Resolve Node_name and open the comm listen socket; either failure is fatal.
+ int serverCommPort = 0;
+ int val = 0;
+ unsigned char addr[4] = {0,0,0,0};
+ struct hostent *he;
+
+ he = gethostbyname( Node_name );
+ if ( !he )
+ {
+ // gethostbyname() reports through h_errno, which is decoded by hstrerror(), not strerror()
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s@%d] gethostbyname(%s) error: %s\n"
+ , method_name, __LINE__
+ , Node_name, hstrerror( h_errno ) );
+ mon_log_write( MON_COMMACCEPT_COMMACCEPT_1, SQ_LOG_CRIT, buf );
+
+ mon_failure_exit();
+ }
+ memcpy( addr, he->h_addr, 4 );
+ // The requested port comes from the environment; serverCommPort stays 0 when it is unset.
+#ifdef NAMESERVER_PROCESS
+ char *env = getenv ("NS_COMM_PORT");
+#else
+ char *env = getenv("MONITOR_COMM_PORT");
+#endif
+ if ( env )
+ {
+ val = atoi(env);
+ if ( val > 0)
+ {
+ if ( !IsRealCluster )
+ {
+ val += MyPNID; // not a real cluster: offset the base port by this node's pnid
+ }
+ serverCommPort = val;
+ }
+ }
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+#ifdef NAMESERVER_PROCESS
+ trace_printf( "%s@%d NS_COMM_PORT Node_name=%s, env=%s, serverCommPort=%d, val=%d\n"
+#else
+ trace_printf( "%s@%d MONITOR_COMM_PORT Node_name=%s, env=%s, serverCommPort=%d, val=%d\n"
+#endif
+ , method_name, __LINE__
+ , Node_name, env ? env : "(null)", serverCommPort, val );
+ }
+
+ commSock_ = CComm::Listen( &serverCommPort );
+ if ( commSock_ < 0 )
+ {
+ char ebuff[256];
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+#ifdef NAMESERVER_PROCESS
+ , "[%s@%d] Listen(NS_COMM_PORT=%d) error: %s\n"
+#else
+ , "[%s@%d] Listen(MONITOR_COMM_PORT=%d) error: %s\n"
+#endif
+ , method_name, __LINE__, serverCommPort
+ , strerror_r( errno, ebuff, 256 ) );
+ mon_log_write( MON_COMMACCEPT_COMMACCEPT_2, SQ_LOG_CRIT, buf );
+
+ mon_failure_exit();
+ }
+ else
+ {
+ snprintf( MyCommPort, sizeof(MyCommPort)
+ , "%d.%d.%d.%d:%d"
+ , (int)((unsigned char *)addr)[0]
+ , (int)((unsigned char *)addr)[1]
+ , (int)((unsigned char *)addr)[2]
+ , (int)((unsigned char *)addr)[3]
+ , serverCommPort );
+ setCommSocketPort( serverCommPort );
+ setCommPort( MyCommPort );
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ trace_printf( "%s@%d Initialized my comm socket port, "
+ "pnid=%d (%s:%s) (commPort=%s)\n"
+ , method_name, __LINE__
+ , MyPNID, Node_name, MyCommPort
+ , getCommPort() );
+
+ }
TRACE_EXIT;
}
@@ -65,6 +177,15 @@
TRACE_EXIT;
}
+void CCommAccept::connectToCommSelf( void )
+{
+ const char method_name[] = "CCommAccept::connectToCommSelf";
+ TRACE_ENTRY;
+ // Make a local connection to this object's own comm listen port.
+ const int selfCommPort = getCommSocketPort();
+ CComm::ConnectLocal( selfCommPort );
+
+ TRACE_EXIT;
+}
struct message_def *CCommAccept::Notice( const char *msgText )
{
@@ -253,10 +374,13 @@
}
}
- rc = Monitor->SendSock( (char *) nodeInfo
- , nodeInfoSize
- , sockFd
- , method_name );
+ rc = CComm::SendWait( sockFd
+ , (char *) nodeInfo
+ , nodeInfoSize
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , (char *)"Remote joining node"
+ , method_name );
if ( rc )
{
char buf[MON_STRING_BUF_SIZE];
@@ -536,10 +660,13 @@
mem_log_write(CMonLog::MON_CONNTONEWMON_2);
// Get info about connecting monitor
- rc = Monitor->ReceiveSock( (char *) &nodeId
- , sizeof(nodeId_t)
- , joinFd
- , method_name );
+ rc = CComm::ReceiveWait( joinFd
+ , (char *) &nodeId
+ , sizeof(nodeId_t)
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , (char *) "Remote monitor"
+ , method_name );
if ( rc )
{ // Handle error
close( joinFd );
@@ -547,7 +674,7 @@
snprintf(buf, sizeof(buf), "[%s], unable to obtain node id from new "
"monitor: %s.\n", method_name, ErrorMsg(rc));
mon_log_write(MON_COMMACCEPT_8, SQ_LOG_ERR, buf);
- CommAccept.startAccepting();
+ startAccepting();
return;
}
@@ -597,7 +724,7 @@
mon_log_write(MON_COMMACCEPT_9, SQ_LOG_ERR, buf);
// Requests is complete, begin accepting connections again
- CommAccept.startAccepting();
+ startAccepting();
return;
}
@@ -629,10 +756,13 @@
, nodeId.ping );
}
- rc = Monitor->SendSock( (char *) &nodeId
- , sizeof(nodeId_t)
- , joinFd
- , method_name );
+ rc = CComm::SendWait( joinFd
+ , (char *) &nodeId
+ , sizeof(nodeId_t)
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , node?(char *)node->GetName():(char *)"Remote joining node"
+ , method_name );
if ( rc )
{
close( joinFd );
@@ -644,7 +774,7 @@
}
// Requests is complete, begin accepting connections again
- CommAccept.startAccepting();
+ startAccepting();
return;
}
@@ -662,11 +792,13 @@
if ( node->GetState() != State_Down )
{
int intdata = -1;
- rc = Monitor->SendSock( (char *) &intdata
- , 0
- , joinFd
- , method_name );
-
+ rc = CComm::SendWait( joinFd
+ , (char *) &intdata
+ , 0
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , node?(char *)node->GetName():(char *)"Remote joining node"
+ , method_name );
close( joinFd );
// This reply will terminate the other monitor
@@ -681,7 +813,7 @@
mon_log_write(MON_COMMACCEPT_10, SQ_LOG_ERR, buf);
// Requests is complete, begin accepting connections again
- CommAccept.startAccepting();
+ startAccepting();
return;
}
@@ -750,10 +882,13 @@
// Tell connecting monitor that we are ready to integrate it.
int mypnid = MyPNID;
- rc = Monitor->SendSock( (char *) &mypnid
- , sizeof(mypnid)
- , joinFd
- , method_name );
+ rc = CComm::SendWait( joinFd
+ , (char *) &mypnid
+ , sizeof(mypnid)
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , node?(char *)node->GetName():(char *)"Remote joining node"
+ , method_name );
if ( rc )
{
close( joinFd );
@@ -767,12 +902,26 @@
}
// Connect to new monitor
- integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
+ integratingFd = CComm::Connect( node->GetSyncPort() );
Monitor->addNewSock( pnid, 1, integratingFd );
node->SetState( State_Merging );
close( joinFd );
+#if 0
+ // TODO: This change may work when TmSync is no longer used
+ if (!isAccepting())
+ {
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Begin accepting connections\n",
+ method_name, __LINE__ );
+ }
+
+ // Begin accepting connections
+ startAccepting();
+ }
+#endif
}
if ( MyNode->IsCreator() )
@@ -785,10 +934,13 @@
// Sanity check, tell integrating monitor my creator pnid
int mypnid = MyPNID;
- rc = Monitor->SendSock( (char *) &mypnid
- , sizeof(mypnid)
- , joinFd
- , method_name );
+ rc = CComm::SendWait( joinFd
+ , (char *) &mypnid
+ , sizeof(mypnid)
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , node?(char *)node->GetName():(char *)"Remote joining node"
+ , method_name );
if ( rc )
{
close( joinFd );
@@ -818,10 +970,13 @@
// Get new monitor acknowledgement that creator can connect
int newpnid = -1;
- rc = Monitor->ReceiveSock( (char *) &newpnid
- , sizeof(newpnid)
- , joinFd
- , method_name );
+ rc = CComm::ReceiveWait( joinFd
+ , (char *) &newpnid
+ , sizeof(newpnid)
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , (char *) node->GetName()
+ , method_name );
if ( rc || newpnid != pnid )
{
close( joinFd );
@@ -848,7 +1003,7 @@
Monitor->SetIntegratingPNid( pnid );
// Connect to new monitor
- integratingFd = Monitor->MkCltSock( node->GetSyncPort() );
+ integratingFd = CComm::Connect( node->GetSyncPort() );
Monitor->addNewSock( pnid, 1, integratingFd );
node->SetState( State_Merging );
@@ -864,10 +1019,13 @@
// Get status from new monitor indicating whether
// it is fully connected to other monitors.
nodeStatus_t nodeStatus;
- rc = Monitor->ReceiveSock( (char *) &nodeStatus
- , sizeof(nodeStatus_t)
- , joinFd
- , method_name );
+ rc = CComm::ReceiveWait( joinFd
+ , (char *) &nodeStatus
+ , sizeof(nodeStatus_t)
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , (char *) node->GetName()
+ , method_name );
if ( rc != MPI_SUCCESS )
{ // Handle error
char buf[MON_STRING_BUF_SIZE];
@@ -980,7 +1138,7 @@
rc = MPI_Comm_accept( MyCommPort, MPI_INFO_NULL, 0, MPI_COMM_SELF,
&interComm );
// Stop accepting connections until this request completes
- CommAccept.stopAccepting();
+ stopAccepting();
}
else
{
@@ -1052,9 +1210,9 @@
}
mem_log_write(CMonLog::MON_CONNTONEWMON_1);
- joinFd = Monitor->AcceptCommSock();
+ joinFd = CComm::Accept( commSock_ );
// Stop accepting connections until this request completes
- CommAccept.stopAccepting();
+ stopAccepting();
}
else
{
@@ -1107,7 +1265,7 @@
// Set flag that tells the commAcceptor thread to exit
shutdown_ = true;
- Monitor->ConnectToSelf();
+ connectToCommSelf();
CLock::wakeOne();
if (trace_settings & TRACE_INIT)
diff --git a/core/sqf/monitor/linux/commaccept.h b/core/sqf/monitor/linux/commaccept.h
index c32d975..45e0e28 100644
--- a/core/sqf/monitor/linux/commaccept.h
+++ b/core/sqf/monitor/linux/commaccept.h
@@ -26,11 +26,14 @@
#ifndef COMMACCEPT_H
#define COMMACCEPT_H
+#include <string>
#include <pthread.h>
#include <mpi.h>
#include "lock.h"
+#include "comm.h"
class CCommAccept : public CLock
+ , public CComm
{
public:
@@ -38,9 +41,13 @@
virtual ~CCommAccept();
void commAcceptor( void );
+ inline const char *getCommPort( void ) { return commPort_.c_str(); }
+ inline int getCommSocketPort( void ) { return( commSocketPort_ ); }
bool isAccepting( void ) { CAutoLock lock(getLocker()); return( accepting_ ); }
void processNewComm( MPI_Comm interComm );
void processNewSock( int sockFd );
+ inline void setCommPort( char *commPort) { commPort_ = commPort; }
+ inline void setCommSocketPort( int commSocketPort) { commSocketPort_ = commSocketPort; }
void startAccepting( void );
void stopAccepting( void );
void start( void );
@@ -51,15 +58,18 @@
void commAcceptorIB( void );
void commAcceptorSock( void );
+ void connectToCommSelf( void );
bool sendNodeInfoMPI( MPI_Comm interComm );
bool sendNodeInfoSock( int sockFd );
- bool accepting_;
- bool shutdown_;
-
- // commAccept thread's id
- pthread_t thread_id_;
-
+ bool accepting_;
+ bool shutdown_;
+ int ioWaitTimeout_;
+ int ioRetryCount_;
+ int commSock_;
+ int commSocketPort_; // Node Re-Integration port
+ string commPort_; // Node Re-Integration port ip address
+ pthread_t thread_id_; // commAccept thread's id
};
#endif
diff --git a/core/sqf/monitor/linux/config.cxx b/core/sqf/monitor/linux/config.cxx
index 4749bd3..9e8a33c 100644
--- a/core/sqf/monitor/linux/config.cxx
+++ b/core/sqf/monitor/linux/config.cxx
@@ -787,7 +787,7 @@
int rc;
- if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
+ if (trace_settings & (TRACE_PROCESS | TRACE_REQUEST))
{
trace_printf( "%s@%d saving registry process, name=%s\n"
, method_name, __LINE__
@@ -844,7 +844,7 @@
int rc;
- if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
+ if (trace_settings & (TRACE_PROCESS | TRACE_REQUEST))
{
trace_printf( "%s@%d saving registry process data procName=%s, "
"key=%s\n"
@@ -872,7 +872,7 @@
int rc;
- if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
+ if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL))
{
trace_printf( "%s@%d saving unique string nid=%d id=%d\n"
, method_name, __LINE__
@@ -900,7 +900,7 @@
int id = 0;
int rc;
- if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
+ if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL))
{
trace_printf( "%s@%d finding max unique string id for nid=%d\n"
, method_name, __LINE__
@@ -921,7 +921,7 @@
}
else
{
- if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
+ if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL))
{
trace_printf( "%s@%d found max(id)=%d for nid=%d\n"
, method_name, __LINE__, id, nid);
@@ -941,7 +941,7 @@
int rc;
char uniqueString[TC_UNIQUE_STRING_VALUE_MAX] = { 0 };
- if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
+ if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL))
{
trace_printf( "%s@%d Get unique string, stringId(nid=%d, id=%d)\n"
, method_name, __LINE__
@@ -963,7 +963,7 @@
}
else
{
- if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
+ if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL))
{
trace_printf( "%s@%d Found unique string, stringId(nid=%d, id=%d), string=%s\n"
, method_name, __LINE__
@@ -988,7 +988,7 @@
int rc;
int id;
- if (trace_settings & (TRACE_INIT | TRACE_REQUEST))
+ if (trace_settings & (TRACE_PROCESS | TRACE_PROCESS_DETAIL))
{
trace_printf( "%s@%d finding unique string nid=%d string=%s\n"
, method_name, __LINE__
diff --git a/core/sqf/monitor/linux/healthcheck.cxx b/core/sqf/monitor/linux/healthcheck.cxx
index 50aeca7..b1521c3 100644
--- a/core/sqf/monitor/linux/healthcheck.cxx
+++ b/core/sqf/monitor/linux/healthcheck.cxx
@@ -399,6 +399,11 @@
}
else
{
+ if ( ZClientEnabled )
+ {
+ ZClient->RunningZNodeDelete( MyNode->GetName() );
+ ZClient->MasterZNodeDelete( MyNode->GetName() );
+ }
// Bring down the node by expiring the watchdog process
sendEventToWatchDog(Watchdog_Expire);
// wait forever
diff --git a/core/sqf/monitor/linux/makefile b/core/sqf/monitor/linux/makefile
index b3f97c0..4c48e7a 100644
--- a/core/sqf/monitor/linux/makefile
+++ b/core/sqf/monitor/linux/makefile
@@ -102,6 +102,7 @@
endif
MONITORSRC = monitor.cxx
+MONITORSRC += comm.cxx
MONITORSRC += cluster.cxx
MONITORSRC += open.cxx
MONITORSRC += process.cxx
@@ -160,6 +161,7 @@
MONITOROBJS = $(OUTDIR)/versmon.o
MONITOROBJS += $(OUTDIR)/monitor.o
+MONITOROBJS += $(OUTDIR)/comm.o
MONITOROBJS += $(OUTDIR)/cluster.o
MONITOROBJS += $(OUTDIR)/nameserver.o
MONITOROBJS += $(OUTDIR)/open.o
@@ -238,6 +240,7 @@
NSOBJS = $(OUTDIR)/versns.o
NSOBJS += $(OUTDIR)/nsmonitor.o
+NSOBJS += $(OUTDIR)/comm.o
NSOBJS += $(OUTDIR)/nscluster.o
NSOBJS += $(OUTDIR)/open.o
NSOBJS += $(OUTDIR)/nsprocess.o
@@ -510,8 +513,8 @@
$(BINEXPDIR)/pstartd: $(PSTARTDOBJS) $(OUTDIR)/monclio.o $(TRACE_LOG_OBJS)
@echo 'Building target: $@'
@echo 'Invoking: C++ Compile & Linker'
- @echo $(CXX) $(CDEPFLAGS) $(FLAGS) $(OPTIONS) -pthread $(INCLUDES) $^ -o $@ $(LIBS)
- @$(CXX) $(CDEPFLAGS) $(FLAGS) $(OPTIONS) -pthread $(INCLUDES) $^ -o $@ $(LIBS)
+ @echo $(CXX) $(CDEPFLAGS) $(FLAGS) $(OPTIONS) -pthread $(INCLUDES) $^ -o $@ $(LIBS)
+ @$(CXX) $(CDEPFLAGS) $(FLAGS) $(OPTIONS) -pthread $(INCLUDES) $^ -o $@ $(LIBS)
@echo 'Finished building target: $@'
@echo ' '
diff --git a/core/sqf/monitor/linux/monitor.cxx b/core/sqf/monitor/linux/monitor.cxx
index 4ed4db6..68db11b 100644
--- a/core/sqf/monitor/linux/monitor.cxx
+++ b/core/sqf/monitor/linux/monitor.cxx
@@ -101,6 +101,7 @@
bool usingCpuAffinity = false;
bool usingTseCpuAffinity = false;
bool genSnmpTrapEnabled = false;
+bool GenCoreOnFailureExit = false;
int Measure = 0;
long trace_level = 0;
char MyPath[MAX_PROCESS_PATH];
@@ -168,11 +169,11 @@
CIntProcess IntProcess;
CReqQueue ReqQueue;
CHealthCheck HealthCheck;
-CCommAccept CommAccept;
+CCommAccept *CommAccept;
#ifdef NAMESERVER_PROCESS
-CCommAcceptMon CommAcceptMon;
+CCommAcceptMon *CommAcceptMon;
#else
-CPtpCommAccept PtpCommAccept;
+CPtpCommAccept *PtpCommAccept;
#endif
extern CReplicate Replicator;
CZClient *ZClient = NULL;
@@ -276,7 +277,7 @@
, MyPNID );
mon_log_write(MON_MONITOR_SIGTERMSIGNALHANDLER_1, SQ_LOG_CRIT, la_buf);
- Monitor->HardNodeDown( MyPNID, true );
+ ReqQueue.enqueueDownReq(MyPNID);
if (trace_settings & TRACE_ENTRY_EXIT)
trace_nolock_printf("%s@%d - Exit\n", method_name, __LINE__);
@@ -367,6 +368,37 @@
}
#endif
+void mon_failure_exit( bool genCoreOnFailureExit )
+{
+ const char method_name[] = "mon_failure_exit";
+ // Fatal exit path: best-effort znode cleanup, log the abort, then abort().
+ if ( ZClientEnabled && ZClient != NULL && MyNode != NULL )
+ { // ZClient starts out NULL and this is reachable from the CCommAccept constructor
+ ZClient->RunningZNodeDelete( MyNode->GetName() );
+ ZClient->MasterZNodeDelete( MyNode->GetName() );
+ }
+ // genCoreOnFailureExit is the per-call request; GenCoreOnFailureExit is the global flag.
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s], Aborting! genCore=%d, GenCore=%d\n",
+ method_name, genCoreOnFailureExit, GenCoreOnFailureExit);
+ mon_log_write(MON_MONITOR_FAILURE_EXIT_1, SQ_LOG_CRIT, buf);
+ // Either flag being set keeps the core file; otherwise the core limit is zeroed first.
+ if (genCoreOnFailureExit || GenCoreOnFailureExit)
+ {
+ // Generate a core file, abort is intentional
+ abort();
+ }
+ else
+ {
+ // Don't generate a core file, abort is intentional
+ struct rlimit limit;
+ limit.rlim_cur = 0;
+ limit.rlim_max = 0;
+ setrlimit(RLIMIT_CORE, &limit);
+ abort();
+ }
+}
+
void monMallocStats()
{
// Log current malloc statistics in stderr
@@ -1562,13 +1594,13 @@
// Set flag to indicate whether we are operating in a real cluster
// or a virtual cluster. This is used throughout the monitor when
// behavior differs for a real vs. virtual cluster environment.
+ if ( getenv( "SQ_VIRTUAL_NODES" ) )
+ {
+ IsRealCluster = false;
+ Emulate_Down = true;
+ }
if ( !IsAgentMode )
{
- if ( getenv( "SQ_VIRTUAL_NODES" ) )
- {
- IsRealCluster = false;
- Emulate_Down = true;
- }
if ( IsRealCluster )
{
// The monitor processes may be started by MPIrun utility
@@ -1605,7 +1637,7 @@
}
#endif
- if ( IsAgentMode || IsNameServer )
+ // Always load the trace files in the local node
{
MON_Props xprops( true );
char *envfile;
@@ -2106,13 +2138,18 @@
MonStats->MonitorBusyIncr();
const char message_tag[] = "Trafodion";
- snprintf( buf, sizeof(buf), "[%s] - monitor Started!\n", message_tag );
- mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
#ifdef NAMESERVER_PROCESS
+ snprintf( buf, sizeof(buf), "[%s] - trafns Started!\n", message_tag );
+ mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
snprintf(buf, sizeof(buf), "[%s] - %s, Started! CommType: %s\n"
, message_tag
, CALL_COMP_GETVERS2(trafns), CommTypeString( CommType ));
+ mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
+ snprintf( buf, sizeof(buf), "[%s] - trafns Started!\n", message_tag );
+ mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
#else
+ snprintf( buf, sizeof(buf), "[%s] - monitor Started!\n", message_tag );
+ mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
snprintf(buf, sizeof(buf), "[%s] - %s, Started! CommType: %s (%s%s%s%s%s)\n"
, message_tag
, CALL_COMP_GETVERS2(monitor)
@@ -2122,10 +2159,10 @@
, IsAgentMode?AgentTypeString( AgentType ):""
, IsMPIChild?"/MPIChild":""
, NameServerEnabled?"/NameServerEnabled":"" );
-#endif
mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
snprintf( buf, sizeof(buf), "[%s] - monitor Started!\n", message_tag );
mon_log_write(MON_MONITOR_MAIN_3, SQ_LOG_INFO, buf);
+#endif
#ifdef DMALLOC
if (trace_settings & TRACE_INIT)
@@ -2365,16 +2402,21 @@
Nodes = new CNodeContainer ();
Config = new CConfigContainer ();
#ifdef NAMESERVER_PROCESS
+ CommAccept = new CCommAccept();
Monitor = new CMonitor ();
+ CommAcceptMon = new CCommAcceptMon();
#else
if (NameServerEnabled)
{
- PtpClient = new CPtpClient ();
+ CommAccept = new CCommAccept();
Monitor = new CMonitor (procTermSig);
+ PtpClient = new CPtpClient ();
+ PtpCommAccept = new CPtpCommAccept();
NameServer = new CNameServer ();
}
else
{
+ CommAccept = new CCommAccept();
Monitor = new CMonitor (procTermSig);
}
#endif
@@ -2560,19 +2602,19 @@
CReqWorker::startReqWorkers();
// Create thread to accept connections from other monitors
- CommAccept.start();
+ CommAccept->start();
#ifdef NAMESERVER_PROCESS
// Create thread to accept connections from other name servers
- CommAcceptMon.start();
+ CommAcceptMon->start();
if (IsMaster)
{
- CommAcceptMon.startAccepting();
+ CommAcceptMon->startAccepting();
}
#else
if (NameServerEnabled)
{
// Create thread to accept point-2-point connections from other monitors
- PtpCommAccept.start();
+ PtpCommAccept->start();
}
#endif
#ifndef NAMESERVER_PROCESS
@@ -2912,13 +2954,13 @@
SQ_theLocalIOToClient->shutdownWork();
if (NameServerEnabled)
{
- PtpCommAccept.shutdownWork();
+ PtpCommAccept->shutdownWork();
}
#endif
- CommAccept.shutdownWork();
+ CommAccept->shutdownWork();
#ifdef NAMESERVER_PROCESS
- CommAcceptMon.shutdownWork();
+ CommAcceptMon->shutdownWork();
#endif
#ifndef NAMESERVER_PROCESS
diff --git a/core/sqf/monitor/linux/monlogging.cxx b/core/sqf/monitor/linux/monlogging.cxx
index 412a4e8..f54c2ed 100644
--- a/core/sqf/monitor/linux/monlogging.cxx
+++ b/core/sqf/monitor/linux/monlogging.cxx
@@ -36,7 +36,6 @@
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/msg.h>
-#include <sys/resource.h>
#include <errno.h>
#include "seabed/logalt.h"
@@ -46,32 +45,12 @@
#define gettid() syscall(__NR_gettid)
-bool GenCoreOnFailureExit = false;
-
extern bool IsRealCluster;
extern int MyPNID;
-extern CMonLog *MonLog;
+extern CMonLog * MonLog;
pthread_mutex_t MonLogMutex = PTHREAD_MUTEX_INITIALIZER;
-void mon_failure_exit( bool genCoreOnFailureExit )
-{
- if (genCoreOnFailureExit || GenCoreOnFailureExit)
- {
- // Generate a core file, abort is intentional
- abort();
- }
- else
- {
- // Don't generate a core file, abort is intentional
- struct rlimit limit;
- limit.rlim_cur = 0;
- limit.rlim_max = 0;
- setrlimit(RLIMIT_CORE, &limit);
- abort();
- }
-}
-
int mon_log_write(int eventType, posix_sqlog_severity_t severity, char *msg)
{
if (MonLog->isUseAltLog())
@@ -178,7 +157,6 @@
, hostname);
}
}
-
CommonLogger::instance().initLog4cxx(log4cxxConfig_.c_str(), logFileSuffix);
}
diff --git a/core/sqf/monitor/linux/msgdef.h b/core/sqf/monitor/linux/msgdef.h
index e40f124..6216b44 100644
--- a/core/sqf/monitor/linux/msgdef.h
+++ b/core/sqf/monitor/linux/msgdef.h
@@ -106,6 +106,17 @@
#define MAX_VALUE_SIZE 512
#define MAX_VALUE_SIZE_INT 4096
+// Connect default 64 seconds
+#define CONNECT_WAIT_TIMEOUT_MSEC 1000
+#define CONNECT_WAIT_TIMEOUT_SEC 1
+#define CONNECT_RETRY_COUNT 64
+#define CONNECT_RETRY_COUNT_MAX 180
+// IO default 64 seconds
+#define EPOLL_IO_WAIT_TIMEOUT_MSEC 1000
+#define EPOLL_IO_WAIT_TIMEOUT_SEC 1
+#define EPOLL_IO_RETRY_COUNT 64
+#define EPOLL_IO_RETRY_COUNT_MAX 180
+
// The following defines specify the default values for the HA
// timers if the timer related environment variables are not defined.
// Defaults to 60 second Watchdog process timer expiration
diff --git a/core/sqf/monitor/linux/nameserver.cxx b/core/sqf/monitor/linux/nameserver.cxx
index ad024f6..fe83c49 100644
--- a/core/sqf/monitor/linux/nameserver.cxx
+++ b/core/sqf/monitor/linux/nameserver.cxx
@@ -54,6 +54,7 @@
#include "meas.h"
#include "reqqueue.h"
+extern CMonitor *Monitor;
extern CNode *MyNode;
extern CProcess *NameServerProcess;
extern CNodeContainer *Nodes;
@@ -66,14 +67,33 @@
#define NAMESERVER_IO_RETRIES 3
CNameServer::CNameServer( void )
- : mon2nsSock_(-1)
- , nsStartupComplete_(false)
- , seqNum_(0)
+ : nsStartupComplete_(false)
, shutdown_(false)
+ , ioWaitTimeout_(EPOLL_IO_WAIT_TIMEOUT_MSEC)
+ , ioRetryCount_(EPOLL_IO_RETRY_COUNT)
+ , mon2nsSock_(-1)
+ , seqNum_(0)
{
const char method_name[] = "CNameServer::CNameServer";
TRACE_ENTRY;
+ // Use the EPOLL timeout and retry values
+ char *ioWaitTimeoutEnv = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
+ if ( ioWaitTimeoutEnv )
+ {
+ // Timeout in seconds
+ ioWaitTimeout_ = atoi( ioWaitTimeoutEnv );
+ char *ioRetryCountEnv = getenv( "SQ_MON_EPOLL_RETRY_COUNT" );
+ if ( ioRetryCountEnv )
+ {
+ ioRetryCount_ = atoi( ioRetryCountEnv );
+ }
+ if ( ioRetryCount_ > EPOLL_IO_RETRY_COUNT_MAX )
+ {
+ ioRetryCount_ = EPOLL_IO_RETRY_COUNT_MAX;
+ }
+ }
+
mon2nsHost_[0] = '\0';
mon2nsPort_[0] = '\0';
@@ -188,7 +208,20 @@
if ( err == 0 )
{
- sock = ClientSockCreate();
+ char mon2nsPort[MAX_PROCESSOR_NAME+MAX_PROCESSOR_NAME];
+
+ //memset( &mon2nsPort, 0, sizeof(mon2nsPort) );
+ mon2nsPort[0] = 0;
+ sprintf( mon2nsPort,"%s:%s", mon2nsHost_, mon2nsPort_ );
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Connecting to %s\n"
+ , method_name, __LINE__
+ , mon2nsPort );
+ }
+
+ sock = CComm::Connect( mon2nsPort );
if ( sock < 0 )
{
err = sock;
@@ -246,7 +279,11 @@
, nodeId.creatorShellVerifier
, nodeId.ping );
}
- err = SockSend( ( char *) &nodeId, sizeof(nodeId) );
+ err = CComm::Send( mon2nsSock_
+ , (char *) &nodeId
+ , sizeof(nodeId)
+ , mon2nsHost_
+ , method_name );
if (err == 0)
{
if ( trace_settings & TRACE_NS )
@@ -258,7 +295,11 @@
, mon2nsSock_
, err );
}
- err = SockReceive( (char *) &nodeId, sizeof(nodeId ) );
+ err = CComm::Receive( mon2nsSock_
+ , (char *) &nodeId
+ , sizeof(nodeId)
+ , mon2nsHost_
+ , method_name );
if ( err )
{
if ( trace_settings & TRACE_NS )
@@ -319,7 +360,7 @@
gethostname( mon2nsHost_, MAX_PROCESSOR_NAME);
GetM2NPort( -1 );
}
- SockClose();
+ CloseNs();
}
}
}
@@ -443,189 +484,6 @@
return(rs);
}
-int CNameServer::ClientSockCreate( void )
-{
- const char method_name[] = "CNameServer::ClientSockCreate";
- TRACE_ENTRY;
-
- int sock; // socket
- int ret; // returned value
- int reuse = 1; // sockopt reuse option
- int nodelay = 1; // sockopt nodelay option
- socklen_t size; // size of socket address
- static int retries = 0; // # times to retry connect
- int outer_failures = 0; // # failed connect loops
- int connect_failures = 0; // # failed connects
- char *p; // getenv results
- struct sockaddr_in sockinfo; // socket address info
- struct hostent *he;
- char host[MAX_PROCESSOR_NAME];
- unsigned int port;
-
- strcpy( host, mon2nsHost_ );
- port = atoi( mon2nsPort_ );
-
- if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Connecting to %s:%d\n"
- , method_name, __LINE__
- , host
- , port );
- }
-
- size = sizeof(sockinfo );
-
- if ( !retries )
- {
- p = getenv( "HPMP_CONNECT_RETRIES" );
- if ( p ) retries = atoi( p );
- else retries = 5;
- }
-
- for ( ;; )
- {
- sock = socket( AF_INET, SOCK_STREAM, 0 );
- if ( sock < 0 )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- snprintf( la_buf, sizeof(la_buf)
- , "[%s], socket() failed! errno=%d (%s)\n"
- , method_name, err, strerror(err) );
- mon_log_write( NAMESERVER_CLIENTSOCKCREATE_1, SQ_LOG_ERR, la_buf );
- TRACE_EXIT;
- return ( -1 );
- }
-
- he = gethostbyname( host );
- if ( !he )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = h_errno;
- snprintf( la_buf, sizeof(la_buf ),
- "[%s] gethostbyname(%s) failed! errno=%d (%s)\n"
- , method_name, host, err, strerror(err) );
- mon_log_write(NAMESERVER_CLIENTSOCKCREATE_2, SQ_LOG_ERR, la_buf );
- close( sock );
- TRACE_EXIT;
- return ( -1 );
- }
-
- // Connect socket.
- memset( (char *) &sockinfo, 0, size );
- memcpy( (char *) &sockinfo.sin_addr, (char *) he->h_addr, 4 );
- sockinfo.sin_family = AF_INET;
- sockinfo.sin_port = htons( (unsigned short) port );
-
- // Note the outer loop uses "retries" from HPMP_CONNECT_RETRIES,
- // and has a yield between each retry, since it's more oriented
- // toward failures from network overload and putting a pause
- // between retries. This inner loop should only iterate when
- // a signal interrupts the local process, so it doesn't pause
- // or use the same HPMP_CONNECT_RETRIES count.
- connect_failures = 0;
- ret = 1;
- while ( ret != 0 && connect_failures <= 10 )
- {
- if ( trace_settings & TRACE_NS )
- {
- trace_printf( "%s@%d - Connecting to %s addr=%d.%d.%d.%d, port=%d, connect_failures=%d\n"
- , method_name, __LINE__
- , host
- , (int)((unsigned char *)he->h_addr)[0]
- , (int)((unsigned char *)he->h_addr)[1]
- , (int)((unsigned char *)he->h_addr)[2]
- , (int)((unsigned char *)he->h_addr)[3]
- , port
- , connect_failures );
- }
-
- ret = connect( sock, (struct sockaddr *) &sockinfo, size );
- if ( ret == 0 ) break;
- ++connect_failures;
- if ( errno != EINTR )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], connect() failed! errno=%d (%s)\n"
- , method_name, err, strerror(err) );
- mon_log_write(NAMESERVER_CLIENTSOCKCREATE_3, SQ_LOG_ERR, la_buf );
- struct timespec req, rem;
- req.tv_sec = 0;
- req.tv_nsec = 500000000L; // 500,000,000
- nanosleep( &req, &rem );
- }
- }
-
- if ( ret == 0 ) break;
-
- // For large clusters, the connect/accept calls seem to fail occasionally,
- // no doubt do to the large number (1000's) of simultaneous connect packets
- // flooding the network at once. So, we retry up to HPMP_CONNECT_RETRIES
- // number of times.
- if ( errno != EINTR )
- {
- if ( ++outer_failures > retries )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- sprintf( la_buf, "[%s], connect() exceeded retries! count=%d\n"
- , method_name, retries );
- mon_log_write(NAMESERVER_CLIENTSOCKCREATE_4, SQ_LOG_ERR, la_buf );
- close( sock );
- TRACE_EXIT;
- return ( -1 );
- }
- struct timespec req, rem;
- req.tv_sec = 0;
- req.tv_nsec = 500000;
- nanosleep( &req, &rem );
- }
- close( sock );
- TRACE_EXIT;
- return( -1 );
- }
-
- if ( trace_settings & TRACE_NS )
- {
- trace_printf( "%s@%d - Connected to %s addr=%d.%d.%d.%d, port=%d, sock=%d\n"
- , method_name, __LINE__
- , host
- , (int)((unsigned char *)he->h_addr)[0]
- , (int)((unsigned char *)he->h_addr)[1]
- , (int)((unsigned char *)he->h_addr)[2]
- , (int)((unsigned char *)he->h_addr)[3]
- , port
- , sock );
- }
-
- if ( setsockopt( sock, IPPROTO_TCP, TCP_NODELAY, (char *) &nodelay, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
- , method_name, err, strerror(err) );
- mon_log_write(NAMESERVER_CLIENTSOCKCREATE_5, SQ_LOG_ERR, la_buf );
- close( sock );
- TRACE_EXIT;
- return ( -2 );
- }
-
- if ( setsockopt( sock, SOL_SOCKET, SO_REUSEADDR, (char *) &reuse, sizeof(int) ) )
- {
- char la_buf[MON_STRING_BUF_SIZE];
- int err = errno;
- sprintf( la_buf, "[%s], setsockopt() failed! errno=%d (%s)\n"
- , method_name, err, strerror(err) );
- mon_log_write(NAMESERVER_CLIENTSOCKCREATE_6, SQ_LOG_ERR, la_buf );
- close( sock );
- TRACE_EXIT;
- return ( -2 );
- }
-
- TRACE_EXIT;
- return ( sock );
-}
-
void CNameServer::NameServerExited( void )
{
const char method_name[] = "CNameServer::NameServerExited";
@@ -634,7 +492,7 @@
mon2nsHost_[0] = '\0';
mon2nsPort_[0] = '\0';
nsStartupComplete_ = false;
- SockClose();
+ CloseNs();
TRACE_EXIT;
}
@@ -644,7 +502,14 @@
const char method_name[] = "CNameServer::NameServerStop";
TRACE_ENTRY;
- int error = SendReceive( msg );
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - sending nameserver-stop request to nameserver=%s:%s\n"
+ , method_name, __LINE__
+ , mon2nsHost_, mon2nsPort_ );
+ }
+
+ int error = ProcessRequest( msg );
TRACE_EXIT;
return error;
@@ -672,7 +537,19 @@
strcpy( msgdel->target_process_name, msgdel->process_name );
msgdel->target_abended = process->IsAbended();
- int error = SendReceive(&msg );
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - sending del_process_ns request to nameserver=%s:%s\n"
+ " msg.del_process_ns.process_name=%s (%d,%d:%d)\n"
+ , method_name, __LINE__
+ , mon2nsHost_, mon2nsPort_
+ , msgdel->process_name
+ , msgdel->nid
+ , msgdel->pid
+ , msgdel->verifier );
+ }
+
+ int error = ProcessRequest( &msg );
TRACE_EXIT;
return error;
@@ -683,7 +560,14 @@
const char method_name[] = "CNameServer::ProcessInfo";
TRACE_ENTRY;
- int error = SendReceive( msg );
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - sending process-info request to nameserver=%s:%s\n"
+ , method_name, __LINE__
+ , mon2nsHost_, mon2nsPort_ );
+ }
+
+ int error = ProcessRequest( msg );
TRACE_EXIT;
return error;
@@ -694,7 +578,14 @@
const char method_name[] = "CNameServer::ProcessInfoCont";
TRACE_ENTRY;
- int error = SendReceive( msg );
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - sending process-info-continue request to nameserver=%s:%s\n"
+ , method_name, __LINE__
+ , mon2nsHost_, mon2nsPort_ );
+ }
+
+ int error = ProcessRequest( msg );
TRACE_EXIT;
return error;
@@ -705,7 +596,14 @@
const char method_name[] = "CNameServer::ProcessInfoNs";
TRACE_ENTRY;
- int error = SendReceive( msg );
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - sending process-info-ns request to nameserver=%s:%s\n"
+ , method_name, __LINE__
+ , mon2nsHost_, mon2nsPort_ );
+ }
+
+ int error = ProcessRequest( msg );
TRACE_EXIT;
return error;
@@ -761,7 +659,7 @@
if ( trace_settings & ( TRACE_NS | TRACE_REQUEST) )
{
- trace_printf( "%s@%d - Received monitor request new-process data.\n"
+ trace_printf( "%s@%d - sending new_process_ns request to nameserver=%s:%s\n"
" msg.new_process_ns.nid=%d\n"
" msg.new_process_ns.pid=%d\n"
" msg.new_process_ns.verifier=%d\n"
@@ -786,6 +684,7 @@
" msg.new_process_ns.outfile=%s\n"
" msg.new_process_ns.creation_time=%ld(secs):%ld(nsecs)\n"
, method_name, __LINE__
+ , mon2nsHost_, mon2nsPort_
, msgnew->nid
, msgnew->pid
, msgnew->verifier
@@ -822,7 +721,7 @@
}
}
- int error = SendReceive(&msg );
+ int error = ProcessRequest( &msg );
TRACE_EXIT;
return error;
@@ -860,7 +759,7 @@
, msgdown->node_name );
}
- error = SendReceive(&msg );
+ error = ProcessRequest( &msg );
}
TRACE_EXIT;
@@ -883,18 +782,29 @@
msgshutdown->pid = -1;
msgshutdown->level = ShutdownLevel_Normal;
- int error = SendReceive(&msg );
+ if ( trace_settings & TRACE_NS )
+ {
+ trace_printf( "%s@%d - sending shutdown_ns request to nameserver=%s:%s\n"
+ " msg.shutdown_ns.level=%d\n"
+ , method_name, __LINE__
+ , mon2nsHost_, mon2nsPort_
+ , msgshutdown->level );
+ }
+
+ int error = ProcessRequest( &msg );
if ( error == 0 )
+ {
SetShutdown( true );
+ }
TRACE_EXIT;
return error;
}
-int CNameServer::SendReceive( struct message_def* msg )
+int CNameServer::ProcessRequest( struct message_def* msg )
{
- const char method_name[] = "CNameServer::SendReceive";
+ const char method_name[] = "CNameServer::ProcessRequest";
TRACE_ENTRY;
int retryCount = 0;
@@ -982,9 +892,34 @@
int error = SendToNs( descp, msg, size );
if ( error == 0 )
- error = SockReceive( (char *) &size, sizeof(size ) );
+ {
+ error = CComm::Receive( mon2nsSock_
+ , (char *) &size
+ , sizeof(size)
+ , mon2nsHost_
+ , method_name );
+ }
if ( error == 0 )
- error = SockReceive( (char *) pmsg_reply, size );
+ {
+ error = CComm::Receive( mon2nsSock_
+ , (char *) pmsg_reply
+ , size
+ , mon2nsHost_
+ , method_name );
+ }
+ if ( error != 0 )
+ {
+ // Choose another name server on IO retry
+ if (IsRealCluster)
+ {
+ mon2nsHost_[0] = 0;
+ }
+ else
+ {
+ mon2nsPort_[0] = 0;
+ }
+ }
+
if ( error == 0 )
{
memcpy( msg, pmsg_reply, size );
@@ -1170,7 +1105,11 @@
if ( error == 0 )
{
- error = SockSend( (char *) &size, sizeof(size) );
+ error = CComm::Send( mon2nsSock_
+ , (char *) &size
+ , sizeof(size)
+ , mon2nsHost_
+ , method_name );
if (error)
{
int err = error;
@@ -1183,7 +1122,11 @@
}
else
{
- error = SockSend( (char *) msg, size );
+ error = CComm::Send( mon2nsSock_
+ , (char *) msg
+ , size
+ , mon2nsHost_
+ , method_name );
if (error)
{
int err = error;
@@ -1219,198 +1162,17 @@
TRACE_EXIT;
}
-void CNameServer::SockClose( void )
+void CNameServer::CloseNs( void ) // closes the monitor-to-nameserver socket if one is open
{
- const char method_name[] = "CNameServer::SockClose";
+ const char method_name[] = "CNameServer::CloseNs";
TRACE_ENTRY;
if (mon2nsSock_ != -1)
{
- close( mon2nsSock_ );
+ CComm::Close( mon2nsSock_ ); // mon2nsSock_ is reset to -1 right after, so a repeated call skips the close
mon2nsSock_ = -1;
}
TRACE_EXIT;
}
-int CNameServer::SockReceive( char *buf, int size )
-{
- const char method_name[] = "CNameServer::SockReceive";
- TRACE_ENTRY;
-
- bool readAgain = false;
- int error = 0;
- int readCount = 0;
- int received = 0;
- int sizeCount = size;
-
- do
- {
- readCount = (int) recv( mon2nsSock_
- , buf
- , sizeCount
- , 0 );
- if ( readCount > 0 ) Meas.addSockNsRcvdBytes( readCount );
-
- if ( trace_settings & TRACE_NS )
- {
- trace_printf( "%s@%d - Count read %d = recv(%d)\n"
- , method_name, __LINE__
- , readCount
- , sizeCount );
- }
-
- if ( readCount > 0 )
- { // Got data
- received += readCount;
- buf += readCount;
- if ( received == size )
- {
- readAgain = false;
- }
- else
- {
- sizeCount -= readCount;
- readAgain = true;
- }
- }
- else if ( readCount == 0 )
- { // EOF
- error = ENODATA;
- readAgain = false;
- }
- else
- { // Got an error
- if ( errno != EINTR)
- {
- error = errno;
- readAgain = false;
- }
- else
- {
- readAgain = true;
- }
- }
- }
- while( readAgain );
-
- if ( trace_settings & TRACE_NS )
- {
- trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n"
- , method_name, __LINE__
- , received
- , error, strerror(error) );
- }
-
- if (error)
- {
- SockClose();
-
- int err = error;
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s], unable to receive request size %d to "
- "nameserver=%s:%s, error: %d(%s)\n"
- , method_name, size, mon2nsHost_, mon2nsPort_, err, strerror(err) );
- mon_log_write(NAMESERVER_SOCKRECEIVE_1, SQ_LOG_ERR, buf);
-
- // Choose another name server on IO retry
- if (IsRealCluster)
- {
- mon2nsHost_[0] = 0;
- }
- else
- {
- mon2nsPort_[0] = 0;
- }
- }
-
- TRACE_EXIT;
- return error;
-}
-
-int CNameServer::SockSend( char *buf, int size )
-{
- const char method_name[] = "CNameServer::SockSend";
- TRACE_ENTRY;
-
- bool sendAgain = false;
- int error = 0;
- int sendCount = 0;
- int sent = 0;
-
- do
- {
- sendCount = (int) send( mon2nsSock_
- , buf
- , size
- , 0 );
- if ( sendCount > 0 ) Meas.addSockNsSentBytes( sendCount );
-
- if ( trace_settings & TRACE_NS )
- {
- trace_printf( "%s@%d - send(), sendCount=%d\n"
- , method_name, __LINE__
- , sendCount );
- }
-
- if ( sendCount > 0 )
- { // Sent data
- sent += sendCount;
- if ( sendCount == size )
- {
- sendAgain = false;
- }
- else
- {
- sendAgain = true;
- }
- }
- else
- { // Got an error
- if ( errno != EINTR)
- {
- error = errno;
- sendAgain = false;
- }
- else
- {
- sendAgain = true;
- }
- }
- }
- while( sendAgain );
-
- if ( trace_settings & TRACE_NS )
- {
- trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n"
- , method_name, __LINE__
- , sent
- , error, strerror(error) );
- }
-
- if (error)
- {
- SockClose();
-
- int err = error;
- char buf[MON_STRING_BUF_SIZE];
- snprintf( buf, sizeof(buf)
- , "[%s], unable to send request size %d to "
- "nameserver=%s:%s, error: %d(%s)\n"
- , method_name, size, mon2nsHost_, mon2nsPort_, err, strerror(err) );
- mon_log_write(NAMESERVER_SOCKSEND_1, SQ_LOG_ERR, buf);
- // Choose another name server on IO retry
- if (IsRealCluster)
- {
- mon2nsHost_[0] = 0;
- }
- else
- {
- mon2nsPort_[0] = 0;
- }
- }
-
- TRACE_EXIT;
- return error;
-}
diff --git a/core/sqf/monitor/linux/nameserver.h b/core/sqf/monitor/linux/nameserver.h
index 009eced..805f495 100644
--- a/core/sqf/monitor/linux/nameserver.h
+++ b/core/sqf/monitor/linux/nameserver.h
@@ -28,9 +28,10 @@
#ifndef NAMESERVER_PROCESS
#include "process.h"
+#include "comm.h"
-class CNameServer
+class CNameServer : public CComm
{
protected:
int eyecatcher_; // Debuggging aid -- leave as first
@@ -53,23 +54,22 @@
void SetLocalHost( void );
private:
+ bool nsStartupComplete_;
+ bool shutdown_;
+ int ioWaitTimeout_;
+ int ioRetryCount_;
+ int mon2nsSock_;
+ int seqNum_;
char mon2nsHost_[MAX_PROCESSOR_NAME];
char mon2nsPort_[10];
- int mon2nsSock_;
- bool nsStartupComplete_;
- int seqNum_;
- bool shutdown_;
int ChooseNextNs( void );
- int ClientSockCreate();
+ void CloseNs( void );
int ConnectToNs( bool* retry );
int GetM2NPort( int PNid );
- int SendReceive( struct message_def* msg );
+ int ProcessRequest( struct message_def* msg );
int SendToNs( const char* reqType, struct message_def* msg, int size );
void SetShutdown( bool shutdown );
- void SockClose( void );
- int SockReceive( char* buf, int size );
- int SockSend( char* buf, int size );
};
#endif
diff --git a/core/sqf/monitor/linux/nscluster.cxx b/core/sqf/monitor/linux/nscluster.cxx
index d5ad9ef..5f6e641 100644
--- a/core/sqf/monitor/linux/nscluster.cxx
+++ b/core/sqf/monitor/linux/nscluster.cxx
@@ -27,14 +27,3 @@
#include "cluster.cxx"
-int CCluster::AcceptMon2NsSock( void )
-{
- const char method_name[] = "CCluster::AcceptMon2NsSock";
- TRACE_ENTRY;
-
- int csock = AcceptSock( mon2nsSock_ );
-
- TRACE_EXIT;
- return( csock );
-}
-
diff --git a/core/sqf/monitor/linux/nscommacceptmon.cxx b/core/sqf/monitor/linux/nscommacceptmon.cxx
index 0857cc9..3e91600 100644
--- a/core/sqf/monitor/linux/nscommacceptmon.cxx
+++ b/core/sqf/monitor/linux/nscommacceptmon.cxx
@@ -30,34 +30,135 @@
#include <signal.h>
#include <stdio.h>
#include <unistd.h>
+#include <netdb.h>
+#include <sys/socket.h>
#include "nscommacceptmon.h"
#include "monlogging.h"
#include "montrace.h"
#include "monitor.h"
-extern CCommAcceptMon CommAcceptMon;
+extern CCommAcceptMon *CommAcceptMon;
extern CMonitor *Monitor;
extern CNode *MyNode;
extern CNodeContainer *Nodes;
extern int MyPNID;
+extern char Node_name[MPI_MAX_PROCESSOR_NAME];
extern char MyMon2NsPort[MPI_MAX_PORT_NAME];
extern char *ErrorMsg (int error_code);
extern const char *StateString( STATE state);
extern CommType_t CommType;
extern CReqQueue ReqQueue;
+extern bool IsRealCluster;
static void *mon2nsProcess(void *arg);
CCommAcceptMon::CCommAcceptMon()
- : accepting_(false)
- , shutdown_(false)
- , thread_id_(0)
- , process_thread_id_(0)
+ : accepting_(false)
+ , shutdown_(false)
+ , ioWaitTimeout_(EPOLL_IO_WAIT_TIMEOUT_MSEC)
+ , ioRetryCount_(EPOLL_IO_RETRY_COUNT)
+ , mon2nsSock_(-1)
+ , mon2NsSocketPort_(-1)
+ , mon2NsPort_("")
+ , thread_id_(0)
+ , process_thread_id_(0)
{
const char method_name[] = "CCommAcceptMon::CCommAcceptMon";
TRACE_ENTRY;
+ // Override the I/O wait timeout and retry count defaults from the SQ_MON_EPOLL_* environment variables
+ char *ioWaitTimeoutEnv = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
+ if ( ioWaitTimeoutEnv )
+ {
+ // Timeout in seconds -- NOTE(review): the default is EPOLL_IO_WAIT_TIMEOUT_MSEC, confirm the units agree
+ ioWaitTimeout_ = atoi( ioWaitTimeoutEnv );
+ char *ioRetryCountEnv = getenv( "SQ_MON_EPOLL_RETRY_COUNT" );
+ if ( ioRetryCountEnv )
+ {
+ ioRetryCount_ = atoi( ioRetryCountEnv );
+ }
+ if ( ioRetryCount_ > EPOLL_IO_RETRY_COUNT_MAX )
+ {
+ ioRetryCount_ = EPOLL_IO_RETRY_COUNT_MAX;
+ }
+ }
+
+ int mon2nsPort = 0; // stays 0 unless NS_M2N_COMM_PORT holds a positive value
+ int val = 0;
+ unsigned char addr[4] = {0,0,0,0};
+ struct hostent *he;
+
+ he = gethostbyname( Node_name );
+ if ( !he )
+ {
+ const char *hostErr = hstrerror( h_errno ); // gethostbyname() reports failures via h_errno, not errno
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s@%d] gethostbyname(%s) error: %s\n"
+ , method_name, __LINE__
+ , Node_name, hostErr );
+ mon_log_write( NS_NSCOMMACCEPT_NSCOMMACCEPT_1, SQ_LOG_CRIT, buf );
+
+ mon_failure_exit();
+ }
+ memcpy( addr, he->h_addr, 4 ); // assumes the first resolved address is a 4-byte IPv4 address
+
+ char *env = getenv("NS_M2N_COMM_PORT");
+ if ( env )
+ {
+ val = atoi(env);
+ if ( val > 0)
+ {
+ if ( !IsRealCluster )
+ {
+ val += MyPNID; // not a real cluster: offset the configured port by this monitor's pnid
+ }
+ mon2nsPort = val;
+ }
+ }
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d NS_M2N_COMM_PORT Node_name=%s, env=%s, mon2nsPort=%d, val=%d\n"
+ , method_name, __LINE__
+ , Node_name, env ? env : "(null)", mon2nsPort, val ); // env is NULL when the variable is unset
+ }
+
+ mon2nsSock_ = CComm::Listen( &mon2nsPort );
+ if ( mon2nsSock_ < 0 )
+ {
+ char ebuff[256];
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s@%d] Listen(NS_M2N_COMM_PORT=%d) error: %s\n"
+ , method_name, __LINE__, mon2nsPort
+ , strerror_r( errno, ebuff, 256 ) );
+ mon_log_write( NS_NSCOMMACCEPT_NSCOMMACCEPT_2, SQ_LOG_CRIT, buf );
+
+ mon_failure_exit();
+ }
+ else
+ {
+ snprintf( MyMon2NsPort, sizeof(MyMon2NsPort)
+ , "%d.%d.%d.%d:%d"
+ , (int)((unsigned char *)addr)[0]
+ , (int)((unsigned char *)addr)[1]
+ , (int)((unsigned char *)addr)[2]
+ , (int)((unsigned char *)addr)[3]
+ , mon2nsPort );
+ setMon2NsPort( MyMon2NsPort );
+ setMon2NsSocketPort( mon2nsPort );
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ trace_printf( "%s@%d Initialized my mon2ns comm socket port, "
+ "pnid=%d (%s:%s) (Mon2NsPort=%s, Mon2NsSocketPort=%d)\n"
+ , method_name, __LINE__
+ , MyPNID, MyNode->GetName(), MyMon2NsPort
+ , getMon2NsPort()
+ , getMon2NsSocketPort() );
+
+ }
TRACE_EXIT;
}
@@ -448,6 +549,9 @@
nodeId_t nodeId;
struct message_def msg;
+ static int sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ static int sv_io_retry_count = EPOLL_IO_RETRY_COUNT;
+
if ( trace_settings & ( TRACE_NS ) )
{
trace_printf( "%s@%d - Accepted connection sock=%d\n"
@@ -455,10 +559,11 @@
}
// Get info about connecting monitor
- rc = Monitor->ReceiveSock( (char *) &nodeId
- , sizeof(nodeId_t)
- , sockFd
- , method_name );
+ rc = CComm::Receive( sockFd
+ , (char *) &nodeId
+ , sizeof(nodeId_t)
+ , (char *) "Remote monitor"
+ , method_name );
if ( rc )
{ // Handle error
close( sockFd );
@@ -554,10 +659,13 @@
}
// return Send info to connecting monitor
- rc = Monitor->SendSock( (char *) &nodeId
- , sizeof(nodeId_t)
- , sockFd
- , method_name );
+ rc = CComm::SendWait( sockFd
+ , (char *) &nodeId
+ , sizeof(nodeId_t)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) node->GetName()
+ , method_name );
if ( rc )
{ // Handle error
close( sockFd );
@@ -572,7 +680,11 @@
{
// Get monitor request (hdr)
int size;
- rc = Monitor->ReceiveSock( (char *) &size, sizeof(size), sockFd, method_name );
+ rc = CComm::Receive( sockFd
+ , (char *) &size
+ , sizeof(size)
+ , (char *) node->GetName()
+ , method_name );
if ( rc )
{ // Handle error
close( sockFd );
@@ -583,7 +695,11 @@
return;
}
- rc = Monitor->ReceiveSock( (char *) &msg, size, sockFd, method_name );
+ rc = CComm::Receive( sockFd
+ , (char *) &msg
+ , size
+ , (char *) node->GetName()
+ , method_name );
if ( rc )
{ // Handle error
close( sockFd );
@@ -779,7 +895,7 @@
}
mem_log_write(CMonLog::MON_NSCONNTONEWMON_2);
- joinFd = Monitor->AcceptMon2NsSock();
+ joinFd = CComm::Accept( mon2nsSock_ );
}
else
{
@@ -825,6 +941,16 @@
TRACE_EXIT;
}
+void CCommAcceptMon::connectToCommSelf( void )
+{
+ const char method_name[] = "CCommAcceptMon::connectToCommSelf";
+ TRACE_ENTRY;
+ // Connect to this object's own mon2ns listen port (the port recorded via setMon2NsSocketPort).
+ CComm::ConnectLocal( getMon2NsSocketPort() );
+ // NOTE(review): shutdownWork calls this right after setting shutdown_, presumably to unblock the acceptor thread -- confirm.
+ TRACE_EXIT;
+}
+
void CCommAcceptMon::shutdownWork(void)
{
const char method_name[] = "CCommAcceptMon::shutdownWork";
@@ -832,7 +958,7 @@
// Set flag that tells the commAcceptor thread to exit
shutdown_ = true;
- Monitor->ConnectToMon2NsCommSelf();
+ connectToCommSelf();
CLock::wakeOne();
if ( trace_settings & ( TRACE_NS ) )
diff --git a/core/sqf/monitor/linux/nscommacceptmon.h b/core/sqf/monitor/linux/nscommacceptmon.h
index 1749b16..bc4cbf8 100644
--- a/core/sqf/monitor/linux/nscommacceptmon.h
+++ b/core/sqf/monitor/linux/nscommacceptmon.h
@@ -29,8 +29,10 @@
#include <pthread.h>
#include "lock.h"
#include "reqqueue.h"
+#include "comm.h"
class CCommAcceptMon : public CLock
+ , public CComm
{
public:
@@ -38,6 +40,8 @@
virtual ~CCommAcceptMon();
void commAcceptor( void );
+ inline const char * getMon2NsPort( void ) { return mon2NsPort_.c_str(); }
+ inline int getMon2NsSocketPort( void ) { return( mon2NsSocketPort_ ); }
bool isAccepting( void ) { CAutoLock lock(getLocker()); return( accepting_ ); }
void processNewSock( int sockFd );
void processMonReqs( int sockFd );
@@ -52,6 +56,8 @@
void monReqProcessInfoNs( struct message_def* msg, int sockFd );
void monReqShutdown( struct message_def* msg, int sockFd );
void monReqUnknown( struct message_def* msg, int sockFd );
+ inline void setMon2NsPort( char *mon2NsPort) { mon2NsPort_ = mon2NsPort; }
+ inline void setMon2NsSocketPort( int mon2NsSocketPort) { mon2NsSocketPort_ = mon2NsSocketPort; }
void startAccepting( void );
void stopAccepting( void );
void start( void );
@@ -65,9 +71,15 @@
private:
void commAcceptorSock( void );
+ void connectToCommSelf( void );
- bool accepting_;
- bool shutdown_;
+ bool accepting_;
+ bool shutdown_;
+ int ioWaitTimeout_;
+ int ioRetryCount_;
+ int mon2nsSock_;
+ int mon2NsSocketPort_; // monitor to ns port
+ string mon2NsPort_; // monitor to ns port ip address
// mon2nsAcceptMon thread's id
pthread_t thread_id_;
diff --git a/core/sqf/monitor/linux/nsreqqueue.cxx b/core/sqf/monitor/linux/nsreqqueue.cxx
index d10a963..7ffa67a 100644
--- a/core/sqf/monitor/linux/nsreqqueue.cxx
+++ b/core/sqf/monitor/linux/nsreqqueue.cxx
@@ -32,8 +32,13 @@
const char method_name[] = "CRequest::monreply";
TRACE_ENTRY;
+ static int sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ static int sv_io_retry_count = EPOLL_IO_RETRY_COUNT;
+
if (error)
+ {
*error = 0;
+ }
if (!msg->noreply) // send reply
{
int size = offsetof(struct message_def, u.reply.u);
@@ -95,9 +100,12 @@
}
abort();
}
- int rc = Monitor->SendSock( (char *) &size
+ int rc = Monitor->SendWait( sockFd
+ , (char *) &size
, sizeof(size)
- , sockFd
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) "Remote node"
, method_name );
if ( rc )
{
@@ -109,9 +117,12 @@
*error = rc;
} else
{
- rc = Monitor->SendSock( (char *) msg
+ rc = Monitor->SendWait( sockFd
+ , (char *) msg
, size
- , sockFd
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) "Remote node"
, method_name );
if ( rc )
{
diff --git a/core/sqf/monitor/linux/pnode.cxx b/core/sqf/monitor/linux/pnode.cxx
index 54c6254..b54ee08 100644
--- a/core/sqf/monitor/linux/pnode.cxx
+++ b/core/sqf/monitor/linux/pnode.cxx
@@ -177,17 +177,12 @@
,wdtKeepAliveTimerValue_(WDT_KEEPALIVETIMERDEFAULT)
,zid_(pnid)
,commPort_("")
- ,syncPort_("")
-#ifdef NAMESERVER_PROCESS
- ,mon2NsPort_("")
- ,mon2NsSocketPort_(-1)
- ,monConnCount_(0)
-#else
- ,ptpPort_("")
- ,ptpSocketPort_(-1)
-#endif
,commSocketPort_(-1)
+ ,syncPort_("")
,syncSocketPort_(-1)
+#ifdef NAMESERVER_PROCESS
+ ,monConnCount_(0)
+#endif
,uniqStrId_(-1)
,procStatFile_(NULL)
,procMeminfoFile_(-1)
@@ -318,17 +313,12 @@
,wdtKeepAliveTimerValue_(WDT_KEEPALIVETIMERDEFAULT)
,zid_(-1)
,commPort_("")
- ,syncPort_("")
-#ifdef NAMESERVER_PROCESS
- ,mon2NsPort_("")
- ,mon2NsSocketPort_(-1)
- ,monConnCount_(-1)
-#else
- ,ptpPort_("")
- ,ptpSocketPort_(-1)
-#endif
,commSocketPort_(-1)
+ ,syncPort_("")
,syncSocketPort_(-1)
+#ifdef NAMESERVER_PROCESS
+ ,monConnCount_(-1)
+#endif
,uniqStrId_(-1)
,procStatFile_(NULL)
,procMeminfoFile_(-1)
@@ -553,7 +543,9 @@
if ( tmReady )
{
if (trace_settings & (TRACE_INIT | TRACE_SYNC | TRACE_TMSYNC))
+ {
trace_printf("%s@%d - Setting Phase_Ready on node %s, pnid=%d\n", method_name, __LINE__, GetName(), GetPNid());
+ }
phase_ = Phase_Ready;
HealthCheck.triggerTimeToLogHealth();
}
@@ -1734,7 +1726,7 @@
// Send local PSD process event to start persistent processes
// that don't require transactions
process = lnode->GetProcessLByType( ProcessType_PSD );
- if ( process )
+ if ( process && process->IsFirstInstance() )
{
char nidString[6];
sprintf(nidString,"%d",lnode->GetNid());
@@ -3960,7 +3952,7 @@
}
}
- if (trace_settings & (TRACE_SYNC_DETAIL | TRACE_TMSYNC))
+ if (trace_settings & (TRACE_PROCESS_DETAIL | TRACE_SYNC_DETAIL | TRACE_TMSYNC))
{
#ifdef NAMESERVER_PROCESS
trace_printf( "%s@%d - Node %s (pnid=%d) node_state=(%d)(%s), internalState=%d, change_nid=%d, seqNum_=%lld, monConnCount=%d\n"
@@ -4296,7 +4288,7 @@
char la_buf[MON_STRING_BUF_SIZE];
sprintf(la_buf, "[%s], Failed to load nameserver configuration.\n", method_name);
mon_log_write(MON_NODECONT_LOAD_CONFIG_4, SQ_LOG_CRIT, la_buf);
-
+
mon_failure_exit();
}
}
diff --git a/core/sqf/monitor/linux/pnode.h b/core/sqf/monitor/linux/pnode.h
index 402bf40..57f68f5 100644
--- a/core/sqf/monitor/linux/pnode.h
+++ b/core/sqf/monitor/linux/pnode.h
@@ -43,7 +43,7 @@
class CNode;
typedef enum {
- Phase_Undefined=0 // Node ready for use
+ Phase_Undefined=0 // Initial phase
,Phase_Ready // Node ready for use
,Phase_Activating // Spare node going active
} NodePhase;
@@ -265,17 +265,12 @@
inline int GetRank( void ) { return( rank_ ); }
inline ShutdownLevel GetShutdownLevel( void) { return( shutdownLevel_ ); }
inline const char *GetCommPort( void ) { return commPort_.c_str(); }
- inline const char *GetSyncPort( void ) { return syncPort_.c_str(); }
-#ifdef NAMESERVER_PROCESS
- inline const char *GetMon2NsPort( void ) { return mon2NsPort_.c_str(); }
- inline int GetMon2NsSocketPort( void ) { return( mon2NsSocketPort_ ); }
- inline int GetMonConnCount( void ) { return monConnCount_; }
-#else
- inline const char *GetPtPPort( void ) { return ptpPort_.c_str(); }
- inline int GetPtPSocketPort( void ) { return( ptpSocketPort_ ); }
-#endif
inline int GetCommSocketPort( void ) { return( commSocketPort_ ); }
+ inline const char *GetSyncPort( void ) { return syncPort_.c_str(); }
inline int GetSyncSocketPort( void ) { return( syncSocketPort_ ); }
+#ifdef NAMESERVER_PROCESS
+ inline int GetMonConnCount( void ) { return monConnCount_; }
+#endif
inline PNidVector &GetSparePNids( void ) { return( sparePNids_ ); }
inline STATE GetState( void ) { return( state_ ); }
@@ -340,18 +335,9 @@
inline void SetRank( int rank ) { rank_ = rank; }
inline void SetRankFailure( bool failed ) { rankFailure_ = failed;
rank_ = rankFailure_ ? -1 : rank_; }
- //inline void SetPort( char * port) { port_ = port; }
inline void SetCommPort( char *commPort) { commPort_ = commPort; }
- inline void SetSyncPort( char *syncPort) { syncPort_ = syncPort; }
-#ifdef NAMESERVER_PROCESS
- inline void SetMon2NsPort( char *mon2NsPort) { mon2NsPort_ = mon2NsPort; }
- inline void SetMon2NsSocketPort( int mon2NsSocketPort) { mon2NsSocketPort_ = mon2NsSocketPort; }
-#else
- inline void SetPtPPort( char *ptpPort) { ptpPort_ = ptpPort; }
- inline void SetPtPSocketPort( int ptpSocketPort) { ptpSocketPort_ = ptpSocketPort; }
-#endif
- //inline void SetSockPort( int sockPort ) { sockPort_ = sockPort; }
inline void SetCommSocketPort( int commSocketPort) { commSocketPort_ = commSocketPort; }
+ inline void SetSyncPort( char *syncPort) { syncPort_ = syncPort; }
inline void SetSyncSocketPort( int syncSocketPort) { syncSocketPort_ = syncSocketPort; }
inline void SetSpareNode( void ) { spareNode_ = true; }
inline void SetShutdownNameServer( bool shutdown ) { shutdownNameServer_ = shutdown; }
@@ -452,17 +438,12 @@
int zid_;
string commPort_; // monitor MPI or Integration port
- string syncPort_; // monitor socket allgather port
-#ifdef NAMESERVER_PROCESS
- string mon2NsPort_; // monitor to ns port
- int mon2NsSocketPort_; // monitor to ns socket port
- int monConnCount_; // monitor connections
-#else
- string ptpPort_;
- int ptpSocketPort_; // point-2-point socket port
-#endif
int commSocketPort_; // re-integration socket port
+ string syncPort_; // monitor socket allgather port
int syncSocketPort_; // algather socket port
+#ifdef NAMESERVER_PROCESS
+ int monConnCount_; // monitor connections
+#endif
int uniqStrId_;
diff --git a/core/sqf/monitor/linux/pstartd.cxx b/core/sqf/monitor/linux/pstartd.cxx
index eb78d91..e8d3061 100644
--- a/core/sqf/monitor/linux/pstartd.cxx
+++ b/core/sqf/monitor/linux/pstartd.cxx
@@ -26,6 +26,7 @@
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
+#include <sys/resource.h>
#include "SCMVersHelp.h"
@@ -40,6 +41,7 @@
const char *MyName;
char ga_ms_su_c_port[MPI_MAX_PORT_NAME] = {0}; // connection port - not used
+bool GenCoreOnFailureExit = false;
long trace_settings = 0;
bool IsRealCluster = true;
@@ -61,6 +63,31 @@
DEFINE_EXTERN_COMP_DOVERS(pstartd)
DEFINE_EXTERN_COMP_PRINTVERS(pstartd)
+void mon_failure_exit( bool genCoreOnFailureExit )
+{
+ const char method_name[] = "mon_failure_exit";
+ // Logs a critical PSTARTD_FAILURE_EXIT_1 event, then terminates the process with abort(); never returns.
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf(buf, sizeof(buf), "[%s], Aborting! genCore=%d, GenCore=%d\n",
+ method_name, genCoreOnFailureExit, GenCoreOnFailureExit);
+ monproc_log_write(PSTARTD_FAILURE_EXIT_1, SQ_LOG_CRIT, buf);
+
+ if (genCoreOnFailureExit || GenCoreOnFailureExit)
+ {
+ // Core file requested by the argument or the global GenCoreOnFailureExit: abort() with limits untouched
+ abort();
+ }
+ else
+ {
+ // No core file requested: zero the RLIMIT_CORE soft and hard limits, then abort()
+ struct rlimit limit;
+ limit.rlim_cur = 0;
+ limit.rlim_max = 0;
+ setrlimit(RLIMIT_CORE, &limit);
+ abort();
+ }
+}
+
const char *ProcessTypeString( PROCESSTYPE type );
const char *MessageTypeString( MSGTYPE type )
diff --git a/core/sqf/monitor/linux/ptpclient.cxx b/core/sqf/monitor/linux/ptpclient.cxx
index f2aa887..deee6b8 100644
--- a/core/sqf/monitor/linux/ptpclient.cxx
+++ b/core/sqf/monitor/linux/ptpclient.cxx
@@ -59,16 +59,38 @@
extern CMeas Meas;
extern int MyPNID;
+const char *EpollEventString( __uint32_t events );
+const char *EpollOpString( int op );
+
#define MON2MON_IO_RETRIES 3
CPtpClient::CPtpClient (void)
- : ptpCommPort_(0)
+ : ioWaitTimeout_(EPOLL_IO_WAIT_TIMEOUT_MSEC)
+ , ioRetryCount_(EPOLL_IO_RETRY_COUNT)
+ , ptpCommPort_(0)
, ptpClusterSocks_(NULL)
, seqNum_(0)
{
const char method_name[] = "CPtpClient::CPtpClient";
TRACE_ENTRY;
+ // Use the EPOLL timeout and retry values
+ char *ioWaitTimeoutEnv = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
+ if ( ioWaitTimeoutEnv )
+ {
+ // Timeout in seconds
+ ioWaitTimeout_ = atoi( ioWaitTimeoutEnv );
+ char *ioRetryCountEnv = getenv( "SQ_MON_EPOLL_RETRY_COUNT" );
+ if ( ioRetryCountEnv )
+ {
+ ioRetryCount_ = atoi( ioRetryCountEnv );
+ }
+ if ( ioRetryCount_ > EPOLL_IO_RETRY_COUNT_MAX )
+ {
+ ioRetryCount_ = EPOLL_IO_RETRY_COUNT_MAX;
+ }
+ }
+
ptpHost_[0] = '\0';
ptpPortBase_[0] = '\0';
if ( !IsRealCluster )
@@ -76,7 +98,7 @@
SetLocalHost();
}
- char * env = getenv( "MON2MON_COMM_PORT" );
+ char * env = getenv( "MON_P2P_COMM_PORT" );
if ( env )
{
ptpCommPort_ = atoi( env );
@@ -85,10 +107,10 @@
{
char buf[MON_STRING_BUF_SIZE];
snprintf( buf, sizeof(buf)
- , "[%s@%d] MON2MON_COMM_PORT environment variable is not set!\n"
+ , "[%s@%d] MON_P2P_COMM_PORT environment variable is not set!\n"
, method_name, __LINE__ );
mon_log_write( PTPCLIENT_PTPCLIENT_1, SQ_LOG_CRIT, buf );
-
+
mon_failure_exit();
}
@@ -119,7 +141,7 @@
if (ptpClusterSocks_[pnid] == -1)
{
- int sock = Monitor->MkCltSock( ptpPort );
+ int sock = CComm::Connect( ptpPort );
if (sock < 0)
{
err = sock;
@@ -1166,9 +1188,11 @@
return error;
}
-int CPtpClient::SendToMon(const char *reqType, internal_msg_def *msg
+int CPtpClient::SendToMon(const char *reqType
+ , internal_msg_def *msg
, ptpMsgInfo_t &myInfo
- , int targetNid, const char *hostName)
+ , int targetNid
+ , const char *hostName)
{
const char method_name[] = "CPtpClient::SendToMon";
TRACE_ENTRY;
@@ -1241,7 +1265,13 @@
, sendSock );
}
- error = SockSend((char *) &myInfo, sizeof(ptpMsgInfo_t), sendSock);
+ error = CComm::SendWait( sendSock
+ , (char *) &myInfo
+ , sizeof(ptpMsgInfo_t)
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , (char*)node->GetName()
+ , method_name );
if (error)
{
int err = error;
@@ -1254,7 +1284,13 @@
}
else
{
- error = SockSend((char *) msg, myInfo.size, sendSock);
+ error = CComm::SendWait( sendSock
+ , (char *) msg
+ , myInfo.size
+ , ioWaitTimeout_
+ , ioRetryCount_
+ , (char*)node->GetName()
+ , method_name);
if (error)
{
int err = error;
@@ -1269,7 +1305,7 @@
if (error)
{
- SockClose( pnid );
+ Close( pnid );
if ( retryCount < MON2MON_IO_RETRIES )
{
retryCount++;
@@ -1288,9 +1324,9 @@
return error;
}
-void CPtpClient::SockClose( int pnid )
+void CPtpClient::Close( int pnid )
{
- const char method_name[] = "CPtpClient::SockClose";
+ const char method_name[] = "CPtpClient::Close";
TRACE_ENTRY;
if (ptpClusterSocks_[pnid] != -1)
@@ -1307,140 +1343,3 @@
gethostname( ptpHost_, MAX_PROCESSOR_NAME );
}
-int CPtpClient::SockReceive(char *buf, int size, int sockFd)
-{
- const char method_name[] = "CPtpClient::SockReceive";
- TRACE_ENTRY;
-
- bool readAgain = false;
- int error = 0;
- int readCount = 0;
- int received = 0;
- int sizeCount = size;
-
- do
- {
- readCount = (int) recv( sockFd
- , buf
- , sizeCount
- , 0 );
- if ( readCount > 0 ) Meas.addSockPtpRcvdBytes( readCount );
-
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - Count read %d = recv(%d)\n"
- , method_name, __LINE__
- , readCount
- , sizeCount );
- }
-
- if ( readCount > 0 )
- { // Got data
- received += readCount;
- buf += readCount;
- if ( received == size )
- {
- readAgain = false;
- }
- else
- {
- sizeCount -= readCount;
- readAgain = true;
- }
- }
- else if ( readCount == 0 )
- { // EOF
- error = ENODATA;
- readAgain = false;
- }
- else
- { // Got an error
- if ( errno != EINTR)
- {
- error = errno;
- readAgain = false;
- }
- else
- {
- readAgain = true;
- }
- }
- }
- while( readAgain );
-
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - recv(), received=%d, error=%d(%s)\n"
- , method_name, __LINE__
- , received
- , error, strerror(error) );
- }
-
- TRACE_EXIT;
- return error;
-}
-
-int CPtpClient::SockSend(char *buf, int size, int sockFd)
-{
- const char method_name[] = "CPtpClient::SockSend";
- TRACE_ENTRY;
-
- bool sendAgain = false;
- int error = 0;
- int sendCount = 0;
- int sent = 0;
-
- do
- {
- sendCount = (int) send( sockFd
- , buf
- , size
- , 0 );
- if ( sendCount > 0 ) Meas.addSockPtpSentBytes( sendCount );
-
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - send(), sendCount=%d\n"
- , method_name, __LINE__
- , sendCount );
- }
-
- if ( sendCount > 0 )
- { // Sent data
- sent += sendCount;
- if ( sendCount == size )
- {
- sendAgain = false;
- }
- else
- {
- sendAgain = true;
- }
- }
- else
- { // Got an error
- if ( errno != EINTR)
- {
- error = errno;
- sendAgain = false;
- }
- else
- {
- sendAgain = true;
- }
- }
- }
- while( sendAgain );
-
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- {
- trace_printf( "%s@%d - send(), sent=%d, error=%d(%s)\n"
- , method_name, __LINE__
- , sent
- , error, strerror(error) );
- }
-
- TRACE_EXIT;
- return error;
-}
-
diff --git a/core/sqf/monitor/linux/ptpclient.h b/core/sqf/monitor/linux/ptpclient.h
index 95563f8..e28ffdd 100644
--- a/core/sqf/monitor/linux/ptpclient.h
+++ b/core/sqf/monitor/linux/ptpclient.h
@@ -27,10 +27,11 @@
#define PTPCLIENT_H_
#ifndef NAMESERVER_PROCESS
+#include "comm.h"
#include "process.h"
#include "internal.h"
-class CPtpClient
+class CPtpClient : public CComm
{
protected:
int eyecatcher_; // Debuggging aid -- leave as first
@@ -84,12 +85,15 @@
private:
+ int ioWaitTimeout_;
+ int ioRetryCount_;
int ptpCommPort_;
char ptpHost_[MAX_PROCESSOR_NAME];
char ptpPortBase_[MAX_PROCESSOR_NAME+100];
int *ptpClusterSocks_;
int seqNum_;
+ void Close( int pnid );
bool IsTargetRemote( int targetNid );
int SendToMon( const char* reqType
, internal_msg_def* msg
@@ -97,11 +101,6 @@
, int receiveNode
, const char* hostName);
void SetLocalHost( void );
- void SockClose( int pnid );
- int SockReceive(char* buf, int size, int sockFd);
- int SockSend( char* buf
- , int size
- , int sockFd);
};
#endif
diff --git a/core/sqf/monitor/linux/ptpcommaccept.cxx b/core/sqf/monitor/linux/ptpcommaccept.cxx
index 0972ec9..08ae576 100644
--- a/core/sqf/monitor/linux/ptpcommaccept.cxx
+++ b/core/sqf/monitor/linux/ptpcommaccept.cxx
@@ -26,6 +26,9 @@
using namespace std;
#include <stdio.h>
+#include <netdb.h>
+#include <sys/socket.h>
+
#include "redirector.h"
#include "ptpcommaccept.h"
#include "monlogging.h"
@@ -36,27 +39,145 @@
extern CRedirector Redirector;
extern CReqQueue ReqQueue;
-extern CPtpCommAccept PtpCommAccept;
+extern CPtpCommAccept *PtpCommAccept;
extern CMonitor *Monitor;
extern CNode *MyNode;
extern CNodeContainer *Nodes;
extern CRedirector Redirector;
+extern bool IsRealCluster;
+extern bool NameServerEnabled;
extern int MyPNID;
extern char MyPtPPort[MPI_MAX_PORT_NAME];
+extern char Node_name[MPI_MAX_PROCESSOR_NAME];
+extern CommType_t CommType;
+
extern char *ErrorMsg (int error_code);
extern const char *StateString( STATE state);
-extern CommType_t CommType;
static void *ptpProcess( void *arg );
+// Constructs the point-to-point (PtP) accept object.
+//
+// Steps performed:
+//   1. Reads the optional SQ_MON_EPOLL_WAIT_TIMEOUT / SQ_MON_EPOLL_RETRY_COUNT
+//      overrides for the IO wait timeout and retry count.
+//   2. Resolves Node_name to an IPv4 address with gethostbyname().
+//   3. When NameServerEnabled is set, takes the port from MON_P2P_COMM_PORT
+//      (offset by the physical node id on a virtual cluster), starts
+//      listening on it via CComm::Listen() and records "a.b.c.d:port" in
+//      MyPtPPort as well as in this object (setPtPPort/setPtPSocketPort).
+//
+// Every failure is logged as critical and followed by mon_failure_exit();
+// the code that follows each call relies on mon_failure_exit() not returning.
CPtpCommAccept::CPtpCommAccept()
- : accepting_(true)
- , shutdown_(false)
- , thread_id_(0)
+    : accepting_(true)
+    , shutdown_(false)
+    , ioWaitTimeout_(EPOLL_IO_WAIT_TIMEOUT_MSEC)
+    , ioRetryCount_(EPOLL_IO_RETRY_COUNT)
+    , ptpSock_(-1)
+    , ptpSocketPort_(-1)
+    , ptpPort_("")
+    , thread_id_(0)
{
const char method_name[] = "CPtpCommAccept::CPtpCommAccept";
TRACE_ENTRY;
+    // Use the EPOLL timeout and retry values
+    // NOTE(review): the default is EPOLL_IO_WAIT_TIMEOUT_MSEC while the
+    // environment value is documented as seconds and stored unscaled --
+    // confirm which unit the consumers of ioWaitTimeout_ expect.
+    // NOTE(review): SQ_MON_EPOLL_RETRY_COUNT is only consulted when
+    // SQ_MON_EPOLL_WAIT_TIMEOUT is also set -- confirm this is intended.
+    char *ioWaitTimeoutEnv = getenv( "SQ_MON_EPOLL_WAIT_TIMEOUT" );
+    if ( ioWaitTimeoutEnv )
+    {
+        // Timeout in seconds
+        ioWaitTimeout_ = atoi( ioWaitTimeoutEnv );
+        char *ioRetryCountEnv = getenv( "SQ_MON_EPOLL_RETRY_COUNT" );
+        if ( ioRetryCountEnv )
+        {
+            ioRetryCount_ = atoi( ioRetryCountEnv );
+        }
+        // Clamp the retry count to the supported maximum
+        if ( ioRetryCount_ > EPOLL_IO_RETRY_COUNT_MAX )
+        {
+            ioRetryCount_ = EPOLL_IO_RETRY_COUNT_MAX;
+        }
+    }
+
+    int ptpPort = 0;
+    int val = 0;
+    unsigned char addr[4] = {0,0,0,0};
+    struct hostent *he;
+
+    he = gethostbyname( Node_name );
+    if ( !he )
+    {
+        // gethostbyname() reports failures through h_errno, whose codes are
+        // not errno values; they must be decoded with hstrerror(), not
+        // strerror()/strerror_r().
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s@%d] gethostbyname(%s) error: %s\n"
+                , method_name, __LINE__
+                , Node_name, hstrerror( h_errno ) );
+        mon_log_write( PTP_COMMACCEPT_COMMACCEPT_1, SQ_LOG_CRIT, buf );
+
+        mon_failure_exit();
+    }
+    // Only the first four bytes are kept: the address is treated as IPv4
+    memcpy( addr, he->h_addr, 4 );
+
+    if (NameServerEnabled)
+    {
+        char *env = getenv("MON_P2P_COMM_PORT");
+        if ( env )
+        {
+            val = atoi(env);
+            if ( val > 0)
+            {
+                ptpPort = val;
+            }
+        }
+        else
+        {
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s@%d] MON_P2P_COMM_PORT environment variable is not set!\n"
+                    , method_name, __LINE__ );
+            mon_log_write( PTP_COMMACCEPT_COMMACCEPT_2, SQ_LOG_CRIT, buf );
+
+            mon_failure_exit();
+        }
+
+        // For virtual env, add PNid to the port so we can still test without collisions of port numbers
+        if (!IsRealCluster)
+        {
+            ptpPort += MyNode->GetPNid();
+        }
+
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d MON_P2P_COMM_PORT Node_name=%s, env=%s, ptpPort=%d, val=%d\n"
+                        , method_name, __LINE__
+                        , Node_name, env, ptpPort, val );
+        }
+
+        // Listen() receives the port by address, so ptpPort may be updated
+        // by the call; the value after the call is the one published below.
+        ptpSock_ = CComm::Listen( &ptpPort );
+        if ( ptpSock_ < 0 )
+        {
+            char ebuff[MON_STRING_BUF_SIZE];
+            char buf[MON_STRING_BUF_SIZE];
+            snprintf( buf, sizeof(buf)
+                    , "[%s@%d] Listen(MON_P2P_COMM_PORT=%d) error: %s\n"
+                    , method_name, __LINE__, ptpPort
+                    , strerror_r( errno, ebuff, MON_STRING_BUF_SIZE ) );
+            mon_log_write( PTP_COMMACCEPT_COMMACCEPT_3, SQ_LOG_CRIT, buf );
+
+            mon_failure_exit();
+        }
+        else
+        {
+            // Publish "a.b.c.d:port" as this monitor's PtP port string
+            snprintf( MyPtPPort, sizeof(MyPtPPort)
+                    , "%d.%d.%d.%d:%d"
+                    , (int)((unsigned char *)addr)[0]
+                    , (int)((unsigned char *)addr)[1]
+                    , (int)((unsigned char *)addr)[2]
+                    , (int)((unsigned char *)addr)[3]
+                    , ptpPort );
+            setPtPPort( MyPtPPort );
+            setPtPSocketPort( ptpPort );
+
+            if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+                trace_printf( "%s@%d Initialized my ptp socket port, "
+                              "pnid=%d (%s:%s) (ptpPort=%s)\n"
+                            , method_name, __LINE__
+                            , MyPNID
+                            , MyNode->GetName()
+                            , MyPtPPort
+                            , getPtPPort() );
+
+        }
+    }
+
TRACE_EXIT;
}
@@ -107,10 +228,11 @@
ptpMsgInfo_t remoteInfo;
// Get info about connecting monitor
- rc = Monitor->ReceiveSock( (char *) &remoteInfo
- , sizeof(ptpMsgInfo_t)
- , sockFd
- , method_name );
+ rc = CComm::Receive( sockFd
+ , (char *) &remoteInfo
+ , sizeof(ptpMsgInfo_t)
+ , (char *) "Remote monitor"
+ , method_name );
if ( rc )
{ // Handle error
char buf[MON_STRING_BUF_SIZE];
@@ -121,10 +243,11 @@
}
// Get info about connecting monitor
- rc = Monitor->ReceiveSock( (char *) &msg
- , remoteInfo.size
- , sockFd
- , method_name );
+ rc = CComm::Receive( sockFd
+ , (char *) &msg
+ , remoteInfo.size
+ , (char *) "Remote monitor"
+ , method_name );
if ( rc )
{ // Handle error
char buf[MON_STRING_BUF_SIZE];
@@ -319,7 +442,7 @@
}
mem_log_write(CMonLog::MON_CONNTONEWMON_1);
- sockFd = Monitor->AcceptPtPSock();
+ sockFd = CComm::Accept( ptpSock_ );
}
else
{
@@ -365,6 +488,16 @@
TRACE_EXIT;
}
+// Connects to this object's own PtP listening port, i.e. the port value
+// returned by getPtPSocketPort(), through CComm::ConnectLocal().
+// shutdownWork() calls this right after setting shutdown_; presumably the
+// self-connection unblocks the acceptor thread waiting in CComm::Accept()
+// so it can observe the flag -- confirm against commAcceptorSock().
+// NOTE(review): whatever CComm::ConnectLocal() returns is discarded here.
+void CPtpCommAccept::connectToCommSelf( void )
+{
+ const char method_name[] = "CPtpCommAccept::connectToCommSelf";
+ TRACE_ENTRY;
+
+ CComm::ConnectLocal( getPtPSocketPort() );
+
+ TRACE_EXIT;
+}
+
void CPtpCommAccept::shutdownWork(void)
{
const char method_name[] = "CPtpCommAccept::shutdownWork";
@@ -372,7 +505,7 @@
// Set flag that tells the PtpCommAccept thread to exit
shutdown_ = true;
- Monitor->ConnectToPtPCommSelf();
+ connectToCommSelf();
CLock::wakeOne();
if (trace_settings & TRACE_INIT)
@@ -391,7 +524,7 @@
const char method_name[] = "ptpCommAccept";
TRACE_ENTRY;
- // Parameter passed to the thread is an instance of the CommAccept object
+ // Parameter passed to the thread is an instance of the CPtpCommAccept object
CPtpCommAccept *cao = (CPtpCommAccept *) arg;
// Mask all allowed signals
diff --git a/core/sqf/monitor/linux/ptpcommaccept.h b/core/sqf/monitor/linux/ptpcommaccept.h
index 78e9fe0..6a5527c 100644
--- a/core/sqf/monitor/linux/ptpcommaccept.h
+++ b/core/sqf/monitor/linux/ptpcommaccept.h
@@ -28,9 +28,11 @@
#include <pthread.h>
#include "lock.h"
+#include "comm.h"
class CPtpCommAccept : public CLock
+ , public CComm
{
public:
@@ -38,11 +40,15 @@
virtual ~CPtpCommAccept();
void commAcceptor( void );
+ inline const char * getPtPPort( void ) { return ptpPort_.c_str(); }
+ inline int getPtPSocketPort( void ) { return( ptpSocketPort_ ); }
bool isAccepting( void ) { CAutoLock lock(getLocker()); return( accepting_ ); }
void monReqExec( void *req ); //stupid compiler and circular header files
void processMonReqs( int sockFd );
void processNewSock( int sockFd );
+ inline void setPtPPort( char *ptpPort) { ptpPort_ = ptpPort; }
+ inline void setPtPSocketPort( int ptpSocketPort) { ptpSocketPort_ = ptpSocketPort; }
void startAccepting( void );
void stopAccepting( void );
void start( void );
@@ -57,14 +63,18 @@
private:
void commAcceptorSock( void );
+ void connectToCommSelf( void );
- bool accepting_;
- bool shutdown_;
-
- // ptpCommAccept thread's id
- pthread_t thread_id_;
- // ptpProcess thread's id
- pthread_t process_thread_id_;
+ bool accepting_;
+ bool shutdown_;
+ int ioWaitTimeout_;
+ int ioRetryCount_;
+ int ptpSock_;
+ int ptpSocketPort_; // point-2-point socket port
+ string ptpPort_;
+
+ pthread_t thread_id_; // ptpCommAccept thread's id
+ pthread_t process_thread_id_; // ptpProcess thread's id
};
#endif
diff --git a/core/sqf/monitor/linux/reqnodedown.cxx b/core/sqf/monitor/linux/reqnodedown.cxx
index 7a04f13..a34e0f3 100644
--- a/core/sqf/monitor/linux/reqnodedown.cxx
+++ b/core/sqf/monitor/linux/reqnodedown.cxx
@@ -85,7 +85,6 @@
if ( requester )
{
node = Nodes->GetLNode( msg_->u.request.u.down.nid )->GetNode();
- Monitor->HardNodeDown( node->GetPNid(), true );
char la_buf[MON_STRING_BUF_SIZE*2];
snprintf( la_buf, sizeof(la_buf)
@@ -110,6 +109,8 @@
, msg_->u.request.u.down.reason);
genSnmpTrap( la_buf );
+ Monitor->HardNodeDown( node->GetPNid(), true );
+
if (!msg_->noreply) // client needs a reply
{
msg_->u.reply.type = ReplyType_Generic;
diff --git a/core/sqf/monitor/linux/reqqueue.cxx b/core/sqf/monitor/linux/reqqueue.cxx
index 0860c4d..6646b3b 100644
--- a/core/sqf/monitor/linux/reqqueue.cxx
+++ b/core/sqf/monitor/linux/reqqueue.cxx
@@ -3857,6 +3857,9 @@
int error;
+ static int sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ static int sv_io_retry_count = EPOLL_IO_RETRY_COUNT;
+
Monitor->EnterSyncCycle();
if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
@@ -3876,10 +3879,13 @@
, Monitor->getJoinComm());
break;
case CommType_Sockets:
- error = Monitor->ReceiveSock( (char *)&header
- , sizeof(header)
- , Monitor->getJoinSock()
- , method_name );
+ error = Monitor->ReceiveWait( Monitor->getJoinSock()
+ , (char *)&header
+ , sizeof(header)
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) "Master monitor"
+ , method_name );
break;
default:
// Programmer bonehead!
@@ -3888,43 +3894,60 @@
mem_log_write(MON_REQQUEUE_REVIVE_2, error);
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- trace_printf("%s@%d - Msg Received - header. Error = %d\n", method_name, __LINE__, error);
+ if (!error && trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d - Msg Received - header.compressedSize_=%ld\n"
+ , method_name, __LINE__, header.compressedSize_ );
+ }
if (error)
{
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- trace_printf("%s@%d - Unable to receive header. Exiting.", method_name, __LINE__);
-
- TRACE_EXIT;
- return;
- }
-
- if (header.compressedSize_ == -1)
- { // creator monitor ran into compression error, abort.
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- trace_printf("%s@%d - Creator monitor compression error. Exiting.", method_name, __LINE__);
-
char buf[MON_STRING_BUF_SIZE];
- sprintf(buf, "Creator monitor had compression error. Aborting node reintegration.\n");
- mon_log_write(MON_REQQUEUE_REVIVE_2, SQ_LOG_CRIT, buf);
+ sprintf( buf,
+ "[%s] Unable to receive header. Exiting!\n"
+ , method_name );
+ mon_log_write(MON_REQQUEUE_REVIVE_3, SQ_LOG_CRIT, buf);
// exit call below runs desctructors. Stop healthcheck thread so that its lock can be destructed.
HealthCheck.shutdownWork();
TRACE_EXIT;
- exit(0); // this will cause other monitors to disconnect from the new monitor.
+ // this will cause other monitors to disconnect from the new monitor.
+ mon_failure_exit();
+ }
+
+ if (header.compressedSize_ == -1)
+ { // creator monitor ran into compression error, abort.
+ char buf[MON_STRING_BUF_SIZE];
+ sprintf( buf,
+ "[%s] Creator monitor had compression error. Exiting!\n"
+ , method_name );
+ mon_log_write(MON_REQQUEUE_REVIVE_4, SQ_LOG_CRIT, buf);
+
+ // exit call below runs destructors. Stop healthcheck thread so that its lock can be destructed.
+ HealthCheck.shutdownWork();
+
+ TRACE_EXIT;
+ // this will cause other monitors to disconnect from the new monitor.
+ mon_failure_exit();
}
char *compBuf = (char *) malloc ( header.compressedSize_ );
if (!compBuf)
{
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- trace_printf("%s@%d - Unable to allocate buffer of size = %ld\n",
- method_name, __LINE__, header.compressedSize_);
+ char buf[MON_STRING_BUF_SIZE];
+ sprintf( buf,
+ "[%s] Unable to allocate buffer of size = %ld. Exiting!\n"
+ , method_name, header.compressedSize_ );
+ mon_log_write(MON_REQQUEUE_REVIVE_5, SQ_LOG_CRIT, buf);
+
+ // exit call below runs destructors. Stop healthcheck thread so that its lock can be destructed.
+ HealthCheck.shutdownWork();
+
TRACE_EXIT;
- return;
+ // this will cause other monitors to disconnect from the new monitor.
+ mon_failure_exit();
}
switch( CommType )
@@ -3937,9 +3960,12 @@
, Monitor->getJoinComm());
break;
case CommType_Sockets:
- error = Monitor->ReceiveSock( compBuf
+ error = Monitor->ReceiveWait( Monitor->getJoinSock()
+ , compBuf
, header.compressedSize_
- , Monitor->getJoinSock()
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) "Master monitor"
, method_name );
break;
default:
@@ -3954,27 +3980,39 @@
if (error)
{
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- trace_printf("%s@%d - Unable to receive data. Exiting.", method_name, __LINE__);
-
free( compBuf );
+ char buf[MON_STRING_BUF_SIZE];
+ sprintf( buf,
+ "[%s] Unable to receive data. Exiting!\n"
+ , method_name );
+ mon_log_write(MON_REQQUEUE_REVIVE_6, SQ_LOG_CRIT, buf);
+
+ // exit call below runs destructors. Stop healthcheck thread so that its lock can be destructed.
+ HealthCheck.shutdownWork();
+
TRACE_EXIT;
- return;
+ // this will cause other monitors to disconnect from the new monitor.
+ mon_failure_exit();
}
char *buf = (char *) malloc ( header.fullSize_ );
if (!buf)
{
- if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
- trace_printf("%s@%d - Unable to allocate buffer of size = %ld\n",
- method_name, __LINE__, header.fullSize_);
-
free( compBuf );
+ char buf[MON_STRING_BUF_SIZE];
+ sprintf( buf,
+ "[%s] Unable to allocate buffer of header.fullSize_=%ld. Exiting!\n"
+ , method_name, header.fullSize_ );
+ mon_log_write(MON_REQQUEUE_REVIVE_7, SQ_LOG_CRIT, buf);
+
+ // exit call below runs destructors. Stop healthcheck thread so that its lock can be destructed.
+ HealthCheck.shutdownWork();
TRACE_EXIT;
- return;
+ // this will cause other monitors to disconnect from the new monitor.
+ mon_failure_exit();
}
unsigned long bufLen = header.fullSize_;
@@ -4095,6 +4133,9 @@
struct timespec startTime, snapShotTime, compressTime;
int error = 0;
+ static int sv_io_wait_timeout = EPOLL_IO_WAIT_TIMEOUT_MSEC;
+ static int sv_io_retry_count = EPOLL_IO_RETRY_COUNT;
+
if (trace_settings & (TRACE_REQUEST | TRACE_INIT | TRACE_RECOVERY))
trace_printf("%s@%d - Snapshot request\n", method_name, __LINE__);
@@ -4260,9 +4301,12 @@
, Monitor->getJoinComm());
break;
case CommType_Sockets:
- error = Monitor->SendSock( (char *)&header
+ error = Monitor->SendWait( Monitor->getJoinSock()
+ , (char *)&header
, sizeof(header)
- , Monitor->getJoinSock()
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) "Remote joining node"
, method_name );
break;
default:
@@ -4307,9 +4351,12 @@
, Monitor->getJoinComm());
break;
case CommType_Sockets:
- error = Monitor->SendSock( (char *)&header
+ error = Monitor->SendWait( Monitor->getJoinSock()
+ , (char *)&header
, sizeof(header)
- , Monitor->getJoinSock()
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) "Remote joining node"
, method_name );
break;
default:
@@ -4343,9 +4390,12 @@
, Monitor->getJoinComm());
break;
case CommType_Sockets:
- error = Monitor->SendSock( compBuf
+ error = Monitor->SendWait( Monitor->getJoinSock()
+ , compBuf
, header.compressedSize_
- , Monitor->getJoinSock()
+ , sv_io_wait_timeout
+ , sv_io_retry_count
+ , (char *) "Remote joining node"
, method_name );
break;
default:
diff --git a/core/sqf/monitor/linux/shell.cxx b/core/sqf/monitor/linux/shell.cxx
index 47149c1..418b61e 100644
--- a/core/sqf/monitor/linux/shell.cxx
+++ b/core/sqf/monitor/linux/shell.cxx
@@ -90,6 +90,7 @@
int NumDown = 0;
int PNodesConfigMax = 0;
int LNodesConfigMax = 0;
+int MpiRunPid = -1; // Process id of last mpirun process started on 'node up'
bool Debug = false;
int Measure = 0;
bool Attached = false;
@@ -514,13 +515,13 @@
rs = false;
}
- env = getenv("MON2MON_COMM_PORT");
+ env = getenv("MON_P2P_COMM_PORT");
if ( env )
{
val = atoi(env);
if ( val <= 0)
{
- sprintf( msgString, "[%s] Error: Name Server is enabled and MON2MON_COMM_PORT value is invalid (%s)! Set MON2MON_COMM_PORT environment variable and try again.", MyName, env );
+ // The remedy must name the variable that is actually read above (MON_P2P_COMM_PORT).
+ sprintf( msgString, "[%s] Error: Name Server is enabled and MON_P2P_COMM_PORT value is invalid (%s)! Set MON_P2P_COMM_PORT environment variable and try again.", MyName, env );
write_startup_log( msgString );
printf("%s\n", msgString );
rs = false;
@@ -528,7 +529,7 @@
}
else
{
- sprintf( msgString, "[%s] Error: Name Server is enabled and MON2MON_COMM_PORT is undefined! Set MON2MON_COMM_PORT environment variable and try again.", MyName );
+ // The remedy must name the variable that is actually read above (MON_P2P_COMM_PORT).
+ sprintf( msgString, "[%s] Error: Name Server is enabled and MON_P2P_COMM_PORT is undefined! Set MON_P2P_COMM_PORT environment variable and try again.", MyName );
write_startup_log( msgString );
printf("%s\n", msgString );
rs = false;
@@ -1289,6 +1290,8 @@
void recv_notice_msg(struct message_def *recv_msg, int )
{
+ const char method_name[] = "recv_notice_msg";
+
switch (recv_msg->type )
{
case MsgType_Change:
@@ -1398,6 +1401,19 @@
}
}
+ // If mpirun on a node up command still running, kill it!
+ if (MpiRunPid != -1)
+ {
+ if ( trace_settings & TRACE_SHELL_CMD )
+ {
+ trace_printf( "%s@%d [%s] Killing mpirun, MpiRunPid=%d\n"
+ , method_name, __LINE__, MyName
+ , MpiRunPid );
+ }
+ kill( MpiRunPid, SIGKILL );
+ MpiRunPid = -1;
+ }
+
break;
@@ -1435,6 +1451,19 @@
nodePendingComplete();
}
}
+
+ // If mpirun on a node up command still running, kill it!
+ if (MpiRunPid != -1)
+ {
+ if ( trace_settings & TRACE_SHELL_CMD )
+ {
+ trace_printf( "%s@%d [%s] Killing mpirun, MpiRunPid=%d\n"
+ , method_name, __LINE__, MyName
+ , MpiRunPid );
+ }
+ kill( MpiRunPid, SIGKILL );
+ MpiRunPid = -1;
+ }
break;
case MsgType_ProcessCreated:
@@ -4701,6 +4730,10 @@
}
}
+ if ( trace_settings & TRACE_SHELL_CMD )
+ trace_printf( "%s@%d [%s] Creating monitor process in node %s.\n ",
+ method_name, __LINE__, MyName, node_name );
+
// remove shared segment on the node
char cmd[256];
sprintf(cmd, "pdsh -w %s \"sqipcrm %s >> $TRAF_LOG/node_up_%s.log\"", node_name, node_name, node_name);
@@ -4751,9 +4784,10 @@
if ( !VirtualNodes )
{
if ( trace_settings & TRACE_SHELL_CMD )
- trace_printf( "%s@%d [%s] %s node up successful, rtn=%d\n ",
- method_name, __LINE__, MyName, node_name,
- msg->u.reply.u.generic.return_code );
+ {
+ trace_printf( "%s@%d [%s] Monitor in node %s created and merging to existing cluster.\n ",
+ method_name, __LINE__, MyName, node_name );
+ }
sprintf( msgString, "[%s] Node %s is merging to existing cluster.",
MyName, node_name);
@@ -4763,6 +4797,12 @@
if ( ! nowait )
{
+ if ( trace_settings & TRACE_SHELL_CMD )
+ {
+ trace_printf( "%s@%d [%s] Waiting for node %s up message\n ",
+ method_name, __LINE__, MyName, node_name );
+ }
+
struct sigaction int_act, old_act;
int_act.sa_sigaction = interrupt_handler;
sigemptyset(&int_act.sa_mask);
@@ -6131,12 +6171,19 @@
if (os_pid == -1)
{
if ( trace_settings & TRACE_SHELL_CMD )
- trace_printf ("%s@%d [%s] Monitor fork() failed, errno=%d\n",
- method_name, __LINE__, MyName, errno);
+ {
+ trace_printf( "%s@%d [%s] Monitor fork() failed, errno=%d\n"
+ , method_name, __LINE__, MyName, errno);
+ }
rc = MPI_ERR_SPAWN;
}
else
{
+ if ( trace_settings & TRACE_SHELL_CMD )
+ {
+ trace_printf( "%s@%d [%s] Monitor fork() success, os_pid=%d\n"
+ , method_name, __LINE__, MyName, os_pid);
+ }
rc = MPI_SUCCESS;
}
@@ -6171,6 +6218,16 @@
else if (child == 0)
{ // mpirun has not yet changed state, delay.
// do not wait for mpirun to complete when using mpich!
+ if ( reintegrate )
+ {
+ MpiRunPid = os_pid;
+ if ( trace_settings & TRACE_SHELL_CMD )
+ {
+ trace_printf( "%s@%d [%s] No mpirun state change, child=%d, MpiRunPid=%d, os_pid=%d\n"
+ , method_name, __LINE__, MyName
+ , child, MpiRunPid, os_pid);
+ }
+ }
done = true;
}
else
@@ -6179,13 +6236,16 @@
{
if (child == -1)
{
- trace_printf("[%s] waiting for mpirun: %s (%d)\n",
- MyName, strerror(errno), errno);
+ trace_printf( "%s@%d [%s] waiting for mpirun: %s (%d)\n"
+ , method_name, __LINE__, MyName
+ , strerror(errno), errno);
}
else
{
- trace_printf("[%s] waiting for mpirun pid=%d but got "
- "pid=%d\n", MyName, os_pid, child);
+ trace_printf( "%s@%d [%s] waiting for mpirun pid=%d but got "
+ "pid=%d\n"
+ , method_name, __LINE__, MyName
+ , os_pid, child);
}
}
done = true;
@@ -6572,7 +6632,7 @@
trace_printf("%s@%d [%s] dumped process successfully. "
"error=%s\n", method_name, __LINE__, MyName,
ErrorMsg(msg->u.reply.u.dump.return_code));
- printf("dump file created@ %s\n",
+ printf("dump file created: %s\n",
msg->u.reply.u.dump.core_file);
}
else
diff --git a/core/sqf/monitor/linux/watchdog.cxx b/core/sqf/monitor/linux/watchdog.cxx
index fedb591..763004b 100644
--- a/core/sqf/monitor/linux/watchdog.cxx
+++ b/core/sqf/monitor/linux/watchdog.cxx
@@ -30,6 +30,7 @@
#include <mpi.h>
#include <unistd.h>
#include <sys/time.h>
+#include <sys/resource.h>
#include "msgdef.h"
#include "props.h"
#include "localio.h"
@@ -67,6 +68,7 @@
Verifier_t MyVerifier = -1;
int Timeout = 0;
bool genSnmpTrapEnabled = false;
+bool GenCoreOnFailureExit = false;
class CWatchdog;
@@ -77,6 +79,31 @@
DEFINE_EXTERN_COMP_DOVERS(sqwatchdog)
DEFINE_EXTERN_COMP_PRINTVERS(sqwatchdog)
+// Logs a critical "Aborting!" event and then terminates the process with
+// abort().
+//
+// genCoreOnFailureExit: per-call request for a core file.
+//
+// A core file is wanted when either the argument or the process-wide
+// GenCoreOnFailureExit flag is set. Otherwise both RLIMIT_CORE limits are
+// set to zero before aborting so that the intentional abort() does not
+// produce a core file. The result of setrlimit() is not checked.
+void mon_failure_exit( bool genCoreOnFailureExit )
+{
+    const char method_name[] = "mon_failure_exit";
+
+    char logMsg[MON_STRING_BUF_SIZE];
+    snprintf( logMsg, sizeof(logMsg), "[%s], Aborting! genCore=%d, GenCore=%d\n"
+            , method_name, genCoreOnFailureExit, GenCoreOnFailureExit );
+    mon_log_write( MON_WATCHDOG_FAILURE_EXIT_1, SQ_LOG_CRIT, logMsg );
+
+    const bool wantCore = genCoreOnFailureExit || GenCoreOnFailureExit;
+    if ( !wantCore )
+    {
+        // Suppress the core file; the abort below is intentional
+        struct rlimit noCore;
+        noCore.rlim_cur = 0;
+        noCore.rlim_max = 0;
+        setrlimit( RLIMIT_CORE, &noCore );
+    }
+
+    // Abort is intentional in both cases
+    abort();
+}
+
CWatchdog::CWatchdog( void )
:CLock()
,event_(Watchdog_Stop)
diff --git a/core/sqf/monitor/linux/zclient.cxx b/core/sqf/monitor/linux/zclient.cxx
index 38b629d..b7a34c2 100644
--- a/core/sqf/monitor/linux/zclient.cxx
+++ b/core/sqf/monitor/linux/zclient.cxx
@@ -1,4 +1,5 @@
-/**********************************************************************
+///////////////////////////////////////////////////////////////////////////////
+//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +20,8 @@
// under the License.
//
// @@@ END COPYRIGHT @@@
-********************************************************************/
+//
+///////////////////////////////////////////////////////////////////////////////
#include <unistd.h>
#include <stdlib.h>
#include <errno.h>
@@ -135,7 +137,7 @@
const char method_name[] = "ZClientThread";
TRACE_ENTRY;
- // Parameter passed to the thread is an instance of the CommAccept object
+ // Parameter passed to the thread is an instance of the CZClient object
CZClient *zooClient = (CZClient *) arg;
// Mask all allowed signals
@@ -698,6 +700,7 @@
const char method_name[] = "CZClient::ErrorZNodeCreate";
TRACE_ENTRY;
+ bool createZSequence = false;
int rc;
int zerr;
@@ -733,6 +736,10 @@
// Suppress error logging if error == ZNODEEXISTS
rc = ZNodeCreate( errorznode.c_str(), NULL, 0, true );
+ if ( rc == ZNODEEXISTS )
+ {
+ createZSequence = true;
+ }
errorpath.str( "" );
errorpath << errorZNodePath_.c_str() << "/"
@@ -749,6 +756,43 @@
// Suppress error logging if error == ZNODEEXISTS
rc = ZNodeCreate( errorznode.c_str(), NULL, 0, true );
+ if ( rc == ZNODEEXISTS )
+ {
+ createZSequence = true;
+ }
+
+ if ( createZSequence )
+ {
+ errorpath.str( "" );
+ errorpath << errorZNodePath_.c_str() << "/ZERR" ;
+ errorznode = errorpath.str( );
+
+ stringstream ss;
+ ss.str( "" );
+ ss << Node_name;
+ string zdata( ss.str( ) );
+
+ if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+ {
+ trace_printf( "%s@%d Error ZNodeCreate(%s:%s,ZOO_SEQUENCE)\n"
+ , method_name, __LINE__
+ , errorznode.c_str()
+ , zdata.c_str());
+ }
+
+ int rc1 = ZNodeCreate( errorznode.c_str(), zdata.c_str(), ZOO_SEQUENCE, true );
+ if ( rc1 != ZOK )
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], ZNodeCreate(%s:%s,ZOO_SEQUENCE) failed with error %s\n"
+ , method_name
+ , errorznode.c_str()
+ , zdata.c_str()
+ , zerror(rc1) );
+ mon_log_write(MON_ZCLIENT_ERRORZNODECREATE_1, SQ_LOG_ERR, buf);
+ }
+ }
unlock();
@@ -756,13 +800,78 @@
return(rc);
}
+// Deletes this monitor's child znode under the given error znode, i.e.
+//     <errorZNodePath_>/<errorNode>/<Node_name>
+// and, when the error znode is then left without children, deletes
+//     <errorZNodePath_>/<errorNode>
+// as well.
+//
+// errorNode: name of the error znode (relative to errorZNodePath_).
+//
+// Returns the result of ErrorZNodesGetChild() when it fails with anything
+// other than ZNONODE (logged as an error); otherwise the result of the final
+// ZNodeDelete() if the error znode was removed, else the (ZOK or ZNONODE)
+// result of ErrorZNodesGetChild().
+//
+// The object lock is held for the duration of the znode operations and is
+// released on every return path.
+int CZClient::ErrorZNodeDelete( const char *errorNode )
+{
+    const char method_name[] = "CZClient::ErrorZNodeDelete";
+    TRACE_ENTRY;
+
+    int rc;
+    // Start empty: a failed or ZNONODE lookup may leave the vector untouched,
+    // yet it is inspected and handed to FreeStringVector() below.
+    struct String_vector childnodes = { 0, NULL };
+
+    lock();
+
+    stringstream errorchildpath;
+    errorchildpath.str( "" );
+    errorchildpath << errorZNodePath_.c_str() << "/"
+                   << errorNode << "/"
+                   << Node_name;
+    string errorchildznode = errorchildpath.str( );
+
+    if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+    {
+        trace_printf( "%s@%d Error child ZNodeDelete(%s)\n"
+                    , method_name, __LINE__
+                    , errorchildznode.c_str() );
+    }
+
+    // The result of this delete is overwritten by the lookup that follows,
+    // so a failure to delete the child znode is not reported to the caller.
+    rc = ZNodeDelete( errorchildznode );
+
+    rc = ErrorZNodesGetChild( errorNode, &childnodes );
+    if ( rc != ZOK && rc != ZNONODE )
+    {
+        char buf[MON_STRING_BUF_SIZE];
+        snprintf( buf, sizeof(buf)
+                , "[%s], ErrorZNodesGetChild(%s) failed!\n"
+                , method_name, errorNode );
+        mon_log_write(MON_ZCLIENT_ERRORZNODEDELETE_1, SQ_LOG_ERR, buf);
+        CLock::wakeOne();
+        // wakeOne() does not release the lock taken above; without this
+        // unlock() the early return would leave the lock held.
+        unlock();
+
+        TRACE_EXIT;
+        return(rc);
+    }
+
+    if ( childnodes.count == 0 )
+    {
+        // No children remain under the error znode: remove it too
+        stringstream errorpath;
+        errorpath.str( "" );
+        errorpath << errorZNodePath_.c_str() << "/"
+                  << errorNode;
+        string errorznode = errorpath.str( );
+
+        if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
+        {
+            trace_printf( "%s@%d Error ZNodeDelete(%s)\n"
+                        , method_name, __LINE__
+                        , errorznode.c_str() );
+        }
+
+        rc = ZNodeDelete( errorznode );
+    }
+
+    unlock();
+
+    FreeStringVector( &childnodes );
+
+    TRACE_EXIT;
+    return(rc);
+}
+
// The errorNode is the znode which contains more than one errorChildNodes
// and whose corresponding running znode is deleted to bring its node down
// (see CZClient::HandleErrorChildZNodes())
// The possibility exist that each errorChildNode is also an errorNode under
// errorZNodePath_ if the errorNode passed in could not communicate with
// one or more errorChildNodes.
-// Therefore, the each errorChildNode that is also an errorNode and it child
+// Therefore, each errorChildNode that is also an errorNode and its child
// znode must be also be deleted.
// For example, if the error znode tree is as follows:
// o node-b is the errorNode
@@ -771,17 +880,17 @@
// /trafodion/1/cluster/error/node-b/node-c
// /trafodion/1/cluster/error/node-c/node-b
// o Therefore,
-// ErrorZNodeDelete( node-b, errorChildNodes-of-node-b )
+// ErrorZNodesDelete( node-b, errorChildNodes-of-node-b )
// Delete(/trafodion/1/cluster/error/node-a/node-b)
// Delete(/trafodion/1/cluster/error/node-a)
// Delete(/trafodion/1/cluster/error/node-c/node-b)
// Delete(/trafodion/1/cluster/error/node-c)
// Delete(/trafodion/1/cluster/error/node-b/node-a)
-// Delete(/trafodion/1/cluster/error/node-b/node-b)
+// Delete(/trafodion/1/cluster/error/node-b/node-c)
// Delete(/trafodion/1/cluster/error/node-b)
-int CZClient::ErrorZNodeDelete( const char *errorNode, String_vector *errorChildNodes )
+int CZClient::ErrorZNodesDelete( const char *errorNode, String_vector *errorChildNodes )
{
- const char method_name[] = "CZClient::ErrorZNodeDelete";
+ const char method_name[] = "CZClient::ErrorZNodesDelete";
TRACE_ENTRY;
int rc = -1;
@@ -805,10 +914,9 @@
{
for (int i = 0; i < errorChildNodes->count ; i++ )
{
- trace_printf( "%s@%d errorNode=%s, errorChildNodes.count=%d, errorChildNode[%d]=%s\n"
+ trace_printf( "%s@%d errorNode=%s, errorChildNode[%d]=%s\n"
, method_name, __LINE__
, errorNode
- , errorChildNodes->count
, i
, errorChildNodes->data[i] );
}
@@ -1019,10 +1127,10 @@
}
else if ( rc == ZOK )
{
- // Now get the list of available znodes in the cluster.
+ // Now get the list of error znodes.
//
// This will return child znodes for each monitor process that has
- // registered, including this process.
+ // registered an error with another monitor process.
rc = zoo_get_children( ZHandle, errorznodes.c_str( ), 0, nodes );
if ( rc == ZOK )
{
@@ -1823,7 +1931,7 @@
if ( childnodes.count > 1 )
{
- ErrorZNodeDelete( errorNode, &childnodes );
+ ErrorZNodesDelete( errorNode, &childnodes );
// Delete the corresponding running znode which will trigger node down
RunningZNodeDelete( errorNode );
}
@@ -2501,6 +2609,7 @@
TRACE_ENTRY;
int rc;
+ int zerr;
char pnidStr[10];
sprintf( pnidStr, "%d", MyPNID);
@@ -2529,6 +2638,17 @@
HandleErrorChildZNodes( Node_name );
unlock();
+ // Clean up my previous running znode, if any
+ rc = ZNodeDelete( monZnode );
+ if ( rc == ZOK )
+ {
+ char buf[MON_STRING_BUF_SIZE];
+ snprintf( buf, sizeof(buf)
+ , "[%s], My znode (%s) deleted!\n"
+ , method_name, Node_name );
+ mon_log_write(MON_ZCLIENT_MYRUNNINGZNODECREATE_1, SQ_LOG_INFO, buf);
+ }
+
rc = ZNodeCreate( monZnode.c_str(), monData.c_str(), ZOO_EPHEMERAL );
TRACE_EXIT;
@@ -2555,16 +2675,6 @@
, monZnode.c_str() );
}
- if (strcmp( Node_name, nodeName) == 0)
- {
- // Clean up my error znode and children
- HandleErrorChildZNodes( Node_name );
- // Clean up error znodes and where I am their 'only' child
- lock();
- HandleErrorChildZNodesForZNodeChild( Node_name, true );
- unlock();
- }
-
rc = ZNodeDelete( monZnode );
if ( rc == ZOK )
{
@@ -2575,6 +2685,16 @@
mon_log_write(MON_ZCLIENT_RUNZNODEWATCHDELETE_1, SQ_LOG_INFO, buf);
}
+ if (strcmp( Node_name, nodeName) == 0)
+ {
+ // Clean up my error znode and children
+ HandleErrorChildZNodes( Node_name );
+ // Clean up error znodes and where I am their 'only' child
+ lock();
+ HandleErrorChildZNodesForZNodeChild( Node_name, true );
+ unlock();
+ }
+
TRACE_EXIT;
return( rc );
}
@@ -3204,7 +3324,7 @@
if (trace_settings & (TRACE_INIT | TRACE_RECOVERY))
{
- trace_printf( "%s@%d zoo_create (%s : %s)\n"
+ trace_printf( "%s@%d zoo_create (%s:%s)\n"
, method_name, __LINE__
, zpath.c_str()
, zdata.c_str());
diff --git a/core/sqf/monitor/linux/zclient.h b/core/sqf/monitor/linux/zclient.h
index 46d0c45..36f3854 100644
--- a/core/sqf/monitor/linux/zclient.h
+++ b/core/sqf/monitor/linux/zclient.h
@@ -1,4 +1,5 @@
-/*********************************************************************
+///////////////////////////////////////////////////////////////////////////////
+//
// @@@ START COPYRIGHT @@@
//
// Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +20,8 @@
// under the License.
//
// @@@ END COPYRIGHT @@@
-********************************************************************/
+//
+///////////////////////////////////////////////////////////////////////////////
//
// Zookeeper Client (CZClient class)
//
@@ -208,6 +210,7 @@
void ConfiguredZNodesDelete( void );
int ConfiguredZNodesGet( String_vector *children );
int ErrorZNodeCreate( const char *errorNode );
+ int ErrorZNodeDelete( const char *errorNode );
int ErrorZNodeWatchAdd( void );
int ErrorZNodeWatchDelete( void );
void ErrorZNodesDelete( void );
@@ -237,7 +240,7 @@
void ClusterMonitoringStop( void );
void ConfiguredZNodesWatchSet( void );
void EnabledSet( bool enabled ) { CAutoLock lock(getLocker()); enabled_ = enabled; }
- int ErrorZNodeDelete( const char *errorNode, String_vector *errorChildNodes );
+ int ErrorZNodesDelete( const char *errorNode, String_vector *errorChildNodes );
int ErrorChildZNodeDelete( const char *errorNode
, const char *errorChildNode
, String_vector *errorChildNodes );
diff --git a/core/sqf/sqenvcom.sh b/core/sqf/sqenvcom.sh
index fcfc5b9..ee65050 100644
--- a/core/sqf/sqenvcom.sh
+++ b/core/sqf/sqenvcom.sh
@@ -967,7 +967,7 @@
export NS_COMM_PORT=${NS_COMM_PORT:-23370}
# export NS_SYNC_PORT=${NS_SYNC_PORT:-23360}
export NS_M2N_COMM_PORT=${NS_M2N_COMM_PORT:-23350}
- export MON2MON_COMM_PORT=${MON2MON_COMM_PORT:-23340}
+ export MON_P2P_COMM_PORT=${MON_P2P_COMM_PORT:-23340}
fi
# Alternative logging capability in monitor
@@ -995,9 +995,9 @@
export SQ_MON_KEEPCNT=5
# Monitor sync thread epoll wait timeout is in seconds
-# Currently set to 64 seconds (16 second timeout, 4 retries)
-export SQ_MON_EPOLL_WAIT_TIMEOUT=${SQ_MON_EPOLL_WAIT_TIMEOUT:-16}
-export SQ_MON_EPOLL_RETRY_COUNT=${SQ_MON_EPOLL_RETRY_COUNT:-4}
+# Currently set to 64 seconds (1 second timeout, 64 retries)
+export SQ_MON_EPOLL_WAIT_TIMEOUT=${SQ_MON_EPOLL_WAIT_TIMEOUT:-1}
+export SQ_MON_EPOLL_RETRY_COUNT=${SQ_MON_EPOLL_RETRY_COUNT:-64}
# Monitor Zookeeper client
# - A zero value disables the zclient logic in the monitor process.
@@ -1022,6 +1022,7 @@
# increase SQ_MON,ZCLIENT,WDT timeout only to jenkins env.
if [[ "$TRAF_HOME" == *"/home/jenkins"* ]]; then
export SQ_MON_EPOLL_WAIT_TIMEOUT=20
+export SQ_MON_EPOLL_RETRY_COUNT=4
export SQ_MON_ZCLIENT_SESSION_TIMEOUT=360
export SQ_WDT_KEEPALIVETIMERVALUE=360
fi
diff --git a/dcs/pom.xml b/dcs/pom.xml
index 206e424..f2f2585 100644
--- a/dcs/pom.xml
+++ b/dcs/pom.xml
@@ -673,6 +673,9 @@
<version>${jdbct2.version}</version>
</dependency>
</dependencies>
+ <properties>
+ <additionalparam>-Xdoclint:none</additionalparam>
+ </properties>
</profile>
<profile>
<id>runSmallTests</id>