| /////////////////////////////////////////////////////////////////////////////// |
| // |
| // @@@ START COPYRIGHT @@@ |
| // |
| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
| // @@@ END COPYRIGHT @@@ |
| // |
| /////////////////////////////////////////////////////////////////////////////// |
| #include <unistd.h> |
| #include <stdlib.h> |
| #include <errno.h> |
| #include <sys/ioctl.h> |
| #include <sys/time.h> |
| #include <sys/resource.h> |
| #include <signal.h> |
| #include <ctype.h> |
| #include <string.h> |
| #include <ifaddrs.h> |
| #include <netdb.h> |
| #include <new> |
| #include <stdio.h> |
| #include <list> |
| #include <string> |
| |
| #include "lock.h" |
| #include "msgdef.h" |
| #include "montrace.h" |
| #include "monlogging.h" |
| #include "reqqueue.h" |
| #include "type2str.h" |
| #include "zclient.h" |
| #include "pnode.h" |
| |
| // |
| // The following specify the default values for the timers if the |
| // zclient cluster timer related environment variables are not defined. |
| // |
| // - ZCLIENT_MY_ZNODE_CHECKRATE is the rate the local monitor's znode is checked |
| #define ZCLIENT_MY_ZNODE_CHECKRATE 5 // seconds |
| #define ZCLIENT_SESSION_TIMEOUT 60 // seconds (1 minute) |
| |
| // zookeeper connection retries |
| #define ZOOKEEPER_CHILD_RETRY_COUNT 5 |
| #define ZOOKEEPER_RETRY_COUNT 3 |
| #define ZOOKEEPER_RETRY_WAIT 1 // seconds |
| |
| using namespace std; |
| |
| extern char Node_name[MAX_PROCESSOR_NAME]; |
| extern int MyPNID; |
| extern int MyNid; |
| extern int MyPid; |
| |
| extern CReqQueue ReqQueue; |
| extern CZClient *ZClient; |
| extern CMonLog *MonLog; |
| extern CNodeContainer *Nodes; |
| extern CNode *MyNode; |
| extern bool debugFlag; |
| extern bool IsAgentMode; |
| extern bool IsMaster; |
| |
| static zhandle_t *ZHandle; |
| static clientid_t MyZooId; |
| |
| void ZSessionWatcher( zhandle_t *zzh |
| , int type |
| , int state |
| , const char *path |
| , void *watcherCtx); |
| |
| void FreeStringVector( struct String_vector *v ) |
| { |
| if ( v->data ) |
| { |
| for ( int32_t i=0; i < v->count; i++ ) |
| { |
| free( v->data[i] ); |
| } |
| free( v->data ); |
| v->data = NULL; |
| v->count = 0; |
| } |
| } |
| |
| static const char *ZClientStateStr( CZClient::ZClientState_t state ) |
| { |
| switch (state) |
| { |
| case CZClient::ZC_DISABLED: |
| return "ZC_DISABLED"; |
| case CZClient::ZC_START: |
| return "ZC_START"; |
| case CZClient::ZC_WATCH: |
| return "ZC_WATCH"; |
| case CZClient::ZC_CLUSTER: |
| return "ZC_CLUSTER"; |
| case CZClient::ZC_ZNODE_CREATED: |
| return "ZC_ZNODE_CREATED"; |
| case CZClient::ZC_ZNODE_CHANGED: |
| return "ZC_ZNODE_CHANGED"; |
| case CZClient::ZC_ZNODE_CHILD: |
| return "ZC_ZNODE_CHILD"; |
| case CZClient::ZC_ZNODE_DELETED: |
| return "ZC_ZNODE_DELETED"; |
| case CZClient::ZC_MYZNODE: |
| return "ZC_MYZNODE"; |
| case CZClient::ZC_STOP: |
| return "ZC_STOP"; |
| case CZClient::ZC_SHUTDOWN: |
| return "ZC_SHUTDOWN"; |
| default: |
| break; |
| } |
| return "ZClient State Invalid"; |
| } |
| |
| // ZClientThread main |
| static void *ZClientThread(void *arg) |
| { |
| const char method_name[] = "ZClientThread"; |
| TRACE_ENTRY; |
| |
| // Parameter passed to the thread is an instance of the CZClient object |
| CZClient *zooClient = (CZClient *) arg; |
| |
| // Mask all allowed signals |
| sigset_t mask; |
| sigfillset(&mask); |
| sigdelset(&mask, SIGPROF); // allows profiling such as google profiler |
| int rc = pthread_sigmask(SIG_SETMASK, &mask, NULL); |
| if (rc != 0) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], pthread_sigmask error=%d\n", |
| method_name, rc); |
| mon_log_write(MON_ZCLIENT_ZCLIENTTHREAD_1, SQ_LOG_ERR, buf); |
| } |
| |
| // Enter thread processing loop |
| zooClient->MonitorCluster(); |
| |
| TRACE_EXIT; |
| return NULL; |
| } |
| |
| |
| void ZSessionWatcher( zhandle_t *zzh |
| , int type |
| , int state |
| , const char *path |
| , void *watcherCtx) |
| { |
| const char method_name[] = "ZSessionWatcher"; |
| TRACE_ENTRY; |
| |
| watcherCtx = watcherCtx; // Make compiler happy! |
| |
| /* |
| * Be careful using ZHandle here rather than zzh - as this may be mt code |
| * the client lib may call the watcher before zookeeper_init returns |
| */ |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| if ( path && strlen( path ) > 0 ) |
| { |
| trace_printf( "%s@%d" " - Watcher %s state = %s for path %s\n" |
| , method_name, __LINE__ |
| , ZooConnectionTypeStr( type ) |
| , ZooConnectionStateStr( state ) |
| , path ); |
| } |
| else |
| { |
| trace_printf( "%s@%d" " - Watcher %s state = %s\n" |
| , method_name, __LINE__ |
| , ZooConnectionTypeStr( type ) |
| , ZooConnectionStateStr( state ) ); |
| } |
| } |
| |
| if ( type == ZOO_SESSION_EVENT ) |
| { |
| if ( state == ZOO_CONNECTED_STATE ) |
| { |
| const clientid_t *id = zoo_client_id( zzh ); |
| if ( MyZooId.client_id == 0 || MyZooId.client_id != id->client_id ) |
| { |
| MyZooId = *id; |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d" " - Got a new session id: 0x%llx\n" |
| , method_name, __LINE__ |
| , static_cast<long long unsigned int>(MyZooId.client_id) ); |
| } |
| } |
| } |
| else if ( state == ZOO_AUTH_FAILED_STATE ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Error Zookeeper authentication failure. Node going down (terminating!) ...\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_ZSESSIONWATCHER_1, SQ_LOG_CRIT, buf); |
| |
| mon_failure_exit(); |
| } |
| else if ( state == ZOO_EXPIRED_SESSION_STATE ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Error Zookeeper session expired. Node going down (terminating!) ...\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_ZSESSIONWATCHER_2, SQ_LOG_CRIT, buf); |
| |
| mon_failure_exit(); |
| } |
| } |
| else |
| { |
| ZClient->TriggerCheck( type, path ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| CZClient::CZClient( const char *quorumHosts |
| , const char *rootNode |
| , const char *instanceNode ) |
| :threadId_(0) |
| ,state_(ZC_DISABLED) |
| ,enabled_(false) |
| ,clusterWatchEnabled_(false) |
| ,resetMyZNodeFailedTime_(true) |
| ,shutdown_(false) |
| ,zcMonitoringRate_(ZCLIENT_MY_ZNODE_CHECKRATE) // seconds |
| ,zkQuorumHosts_(quorumHosts) |
| ,zkRootNode_(rootNode) |
| ,zkRootNodeInstance_(instanceNode) |
| ,zkQuorumPort_("") |
| ,zkSessionTimeout_(ZCLIENT_SESSION_TIMEOUT) // seconds |
| { |
| const char method_name[] = "CZClient::CZClient"; |
| TRACE_ENTRY; |
| |
| memcpy(&eyecatcher_, "ZCLT", 4); |
| |
| char *zcMonitoringRateValueC; |
| int zcMonitoringRateValue; |
| if ( (zcMonitoringRateValueC = getenv( "SQ_MON_ZCLIENT_MY_ZNODE_CHECKRATE" )) ) |
| { |
| // in seconds |
| zcMonitoringRateValue = atoi( zcMonitoringRateValueC ); |
| zcMonitoringRate_ = zcMonitoringRateValue; // in seconds |
| } |
| |
| char *zkSessionTimeoutC; |
| int zkSessionTimeoutValue; |
| if ( (zkSessionTimeoutC = getenv( "SQ_MON_ZCLIENT_SESSION_TIMEOUT" )) ) |
| { |
| // in seconds |
| zkSessionTimeoutValue = atoi( zkSessionTimeoutC ); |
| zkSessionTimeout_ = zkSessionTimeoutValue; // in seconds |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d" " - ZClient monitoring rate in seconds=%ld\n" |
| , method_name, __LINE__, zcMonitoringRate_ ); |
| trace_printf( "%s@%d" " - ZClient session timeout in seconds =%d\n" |
| , method_name, __LINE__, zkSessionTimeout_ ); |
| } |
| |
| if ( zkQuorumHosts_.length() == 0 ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Zookeeper quorum port address not initialized\n" |
| , method_name); |
| mon_log_write(MON_ZCLIENT_ZCLIENT_1, SQ_LOG_ERR, buf); |
| |
| mon_failure_exit(); |
| } |
| else |
| { |
| zkQuorumPort_ << zkQuorumHosts_.c_str(); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d zkQuorumPort is: %s\n" |
| , method_name, __LINE__ |
| , zkQuorumPort_.str( ).c_str( )); |
| } |
| } |
| |
| // Initialize zookeeper |
| zoo_deterministic_conn_order( 0 ); // non-deterministic order for client connections |
| ZHandle = zookeeper_init( zkQuorumPort_.str( ).c_str( ) |
| , ZSessionWatcher |
| , zkSessionTimeout_ * 1000 |
| , &MyZooId |
| , 0 |
| , 0 ); |
| if ( ZHandle == 0 ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zookeeper_init() failed for host:port %s\n" |
| , method_name, zkQuorumPort_.str( ).c_str( )); |
| mon_log_write(MON_ZCLIENT_ZCLIENT_2, SQ_LOG_ERR, buf); |
| |
| mon_failure_exit(); |
| } |
| |
| int rc = InitializeZClient(); |
| if ( rc ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Failed ZClient initialization (%s)\n" |
| , method_name, zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZCLIENT_3, SQ_LOG_ERR, buf); |
| |
| mon_failure_exit(); |
| } |
| |
| ConfiguredZNodesDelete(); |
| ErrorZNodesDelete(); |
| |
| TRACE_EXIT; |
| } |
| |
| CZClient::~CZClient( void ) |
| { |
| const char method_name[] = "CZClient::~CZClient"; |
| TRACE_ENTRY; |
| |
| memcpy(&eyecatcher_, "zclt", 4); |
| |
| if (ZHandle) |
| { |
| ConfiguredZNodesDelete(); |
| ErrorZNodesDelete(); |
| RunningZNodeDelete( Node_name ); |
| zookeeper_close(ZHandle); |
| ZHandle = 0; |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::ClusterMonitoringStart( void ) |
| { |
| const char method_name[] = "CZClient::ClusterMonitoringStart"; |
| TRACE_ENTRY; |
| |
| if ( !IsEnabled() ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Cluster monitoring started!\n\n", method_name, __LINE__ ); |
| } |
| EnabledSet( true ); |
| StateSet( ZC_WATCH ); |
| CLock::wakeOne(); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::ClusterMonitoringStop( void ) |
| { |
| const char method_name[] = "CZClient::ClusterMonitoringStop"; |
| TRACE_ENTRY; |
| |
| if ( IsEnabled() ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "\n%s@%d Cluster monitoring stopped!\n", method_name, __LINE__ ); |
| } |
| ClusterWatchEnabledSet( false ); |
| EnabledSet( false ); |
| StateSet( ZC_DISABLED ); |
| CLock::wakeOne(); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CZClient::ConfiguredZNodeCreate( const char *nodeName ) |
| { |
| const char method_name[] = "CZClient::ConfiguredZNodeCreate"; |
| TRACE_ENTRY; |
| |
| int rc; |
| |
| lock(); |
| |
| stringstream newpath; |
| newpath.str( "" ); |
| newpath << configuredZNodePath_.c_str() << "/" |
| << nodeName; |
| string configZnode = newpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , configZnode.c_str() ); |
| } |
| |
| // Suppress error logging if error == ZNODEEXISTS |
| rc = ZNodeCreate( configZnode.c_str(), NULL, 0, true ); |
| |
| unlock(); |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::ConfiguredZNodeDelete( const char *nodeName ) |
| { |
| const char method_name[] = "CZClient::ConfiguredZNodeDelete"; |
| TRACE_ENTRY; |
| |
| int rc; |
| |
| lock(); |
| |
| stringstream newpath; |
| newpath.str( "" ); |
| newpath << configuredZNodePath_.c_str() << "/" |
| << nodeName; |
| string configZnode = newpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , configZnode.c_str() ); |
| } |
| |
| rc = ZNodeDelete( configZnode ); |
| |
| unlock(); |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| void CZClient::ConfiguredZNodesDelete( void ) |
| { |
| const char method_name[] = "CZClient::ConfiguredZNodesDelete"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| struct String_vector nodes; |
| |
| rc = ConfiguredZNodesGet( &nodes ); |
| if ( rc != ZOK && rc != ZNONODE ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ConfiguredZNodesGet() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_CONFIGZNODESDELETE_1, SQ_LOG_ERR, buf); |
| CLock::wakeOne(); |
| return; |
| } |
| |
| stringstream newpath; |
| string configznode; |
| |
| if ( nodes.count > 0 ) |
| { |
| for (int i = 0; i < nodes.count ; i++ ) |
| { |
| newpath.str( "" ); |
| newpath << configuredZNodePath_.c_str() << "/" |
| << nodes.data[i]; |
| configznode = newpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting configznode=%s\n" |
| , method_name, __LINE__ |
| , configznode.c_str() ); |
| } |
| |
| ZNodeDelete( configznode ); |
| } |
| FreeStringVector( &nodes ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CZClient::ConfiguredZNodesGet( String_vector *nodes ) |
| { |
| const char method_name[] = "CZClient::ConfiguredZNodesGet"; |
| TRACE_ENTRY; |
| |
| bool found = false; |
| int rc = -1; |
| int retries = 0; |
| Stat stat; |
| |
| string configznodes( configuredZNodePath_.c_str() ); |
| |
| nodes->count = 0; |
| nodes->data = NULL; |
| |
| while ( !found ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d configznodes=%s\n" |
| , method_name, __LINE__, configznodes.c_str() ); |
| } |
| // Verify the existence of the parent |
| rc = ZooExistRetry( ZHandle, configznodes.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE ) |
| { |
| if (retries > 10) |
| break; |
| retries++; |
| continue; |
| } |
| else if ( rc == ZOK ) |
| { |
| // Now get the list of available znodes in the cluster. |
| // |
| // This will return child znodes for each monitor process that has |
| // registered, including this process. |
| rc = zoo_get_children( ZHandle, configznodes.c_str( ), 0, nodes ); |
| if ( rc == ZOK ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d nodes.count=%d\n" |
| , method_name, __LINE__ |
| , nodes->count ); |
| } |
| found = true; |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_get_children(%s) failed with error %s\n" |
| , method_name, configznodes.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_CONFIGZNODESGET_1, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| else // error |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, configznodes.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_CONFIGZNODESGET_2, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| void CZClient::ConfiguredZNodesWatchSet( void ) |
| { |
| const char method_name[] = "CZClient::ConfiguredZNodesWatchSet"; |
| TRACE_ENTRY; |
| |
| int rc; |
| |
| stringstream configpath; |
| string confignode; |
| |
| configpath.str( "" ); |
| configpath << configuredZNodePath_.c_str(); |
| confignode = configpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Setting watch set on confignode=%s\n" |
| , method_name, __LINE__ |
| , confignode.c_str() ); |
| } |
| |
| rc = ZNodeWatchChildSet( confignode ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeWatchChildSet(%s) failed!\n" |
| , confignode.c_str() |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_CONFIGZNODESWATCHSET_1, SQ_LOG_ERR, buf); |
| |
| TRACE_EXIT; |
| return; |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Watch set on confignode=%s\n" |
| , method_name, __LINE__ |
| , confignode.c_str() ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CZClient::ConfiguredZNodeWatchAdd( void ) |
| { |
| const char method_name[] = "CZClient::ConfiguredZNodeWatchAdd"; |
| TRACE_ENTRY; |
| |
| int rc; |
| string configznode = configuredZNodePath_.c_str(); |
| |
| lock(); |
| rc = ZNodeWatchSet( configznode ); |
| unlock(); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeWatchSet(%s) failed!\n" |
| , method_name |
| , configznode.c_str() ); |
| mon_log_write(MON_ZCLIENT_CONFZNODEWATCHADD_1, SQ_LOG_ERR, buf); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Watch set on configznode=%s\n" |
| , method_name, __LINE__ |
| , configznode.c_str() ); |
| } |
| } |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::ConfiguredZNodeWatchDelete( void ) |
| { |
| const char method_name[] = "CZClient::ConfiguredZNodeWatchDelete"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| |
| string configznode = configuredZNodePath_.c_str(); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting configznode(%s)\n" |
| , method_name, __LINE__ |
| , configznode.c_str() ); |
| } |
| rc = ZNodeWatchReset( configznode ); |
| if ( rc == ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], configznode (%s) deleted!\n" |
| , method_name, configznode.c_str() ); |
| mon_log_write(MON_ZCLIENT_CONFZNODEWATCHDELETE_1, SQ_LOG_INFO, buf); |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::ErrorZNodeCreate( const char *errorNode ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodeCreate"; |
| TRACE_ENTRY; |
| |
| bool createZSequence = false; |
| int rc; |
| int zerr; |
| |
| lock(); |
| if ( IsRunningZNodeExpired( errorNode, zerr ) ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Running znode %s already expired (%s)\n" |
| , method_name, __LINE__ |
| , errorNode |
| , zerror(zerr) ); |
| } |
| unlock(); |
| return(ZOK); |
| } |
| unlock(); |
| pthread_yield(); |
| lock(); |
| |
| stringstream errorpath; |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errorNode; |
| string errorznode = errorpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Error ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| // Suppress error logging if error == ZNODEEXISTS |
| rc = ZNodeCreate( errorznode.c_str(), NULL, 0, true ); |
| if ( rc == ZNODEEXISTS ) |
| { |
| createZSequence = true; |
| } |
| |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errorNode << "/" |
| << Node_name; |
| errorznode = errorpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Error child ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| // Suppress error logging if error == ZNODEEXISTS |
| rc = ZNodeCreate( errorznode.c_str(), NULL, 0, true ); |
| if ( rc == ZNODEEXISTS ) |
| { |
| createZSequence = true; |
| } |
| |
| if ( createZSequence ) |
| { |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/ZERR" ; |
| errorznode = errorpath.str( ); |
| |
| stringstream ss; |
| ss.str( "" ); |
| ss << Node_name; |
| string zdata( ss.str( ) ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Error ZNodeCreate(%s:%s,ZOO_SEQUENCE)\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() |
| , zdata.c_str()); |
| } |
| |
| int rc1 = ZNodeCreate( errorznode.c_str(), zdata.c_str(), ZOO_SEQUENCE, true ); |
| if ( rc1 != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeCreate(%s:%s,ZOO_SEQUENCE) failed with error %s\n" |
| , method_name |
| , errorznode.c_str() |
| , zdata.c_str() |
| , zerror(rc1) ); |
| mon_log_write(MON_ZCLIENT_ERRORZNODECREATE_1, SQ_LOG_ERR, buf); |
| } |
| } |
| |
| unlock(); |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::ErrorZNodeDelete( const char *errorNode ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodeDelete"; |
| TRACE_ENTRY; |
| |
| int rc; |
| int zerr; |
| struct String_vector childnodes; |
| |
| lock(); |
| |
| stringstream errorchildpath; |
| errorchildpath.str( "" ); |
| errorchildpath << errorZNodePath_.c_str() << "/" |
| << errorNode << "/" |
| << Node_name; |
| string errorchildznode = errorchildpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Error child ZNodeDelete(%s)\n" |
| , method_name, __LINE__ |
| , errorchildznode.c_str() ); |
| } |
| |
| rc = ZNodeDelete( errorchildznode ); |
| |
| rc = ErrorZNodesGetChild( errorNode, &childnodes ); |
| if ( rc != ZOK && rc != ZNONODE ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ErrorZNodesGetChild(%s) failed!\n" |
| , method_name, errorNode ); |
| mon_log_write(MON_ZCLIENT_ERRORZNODEDELETE_1, SQ_LOG_ERR, buf); |
| CLock::wakeOne(); |
| return(rc); |
| } |
| |
| if ( childnodes.count == 0 ) |
| { |
| stringstream errorpath; |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errorNode; |
| string errorznode = errorpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Error ZNodeDelete(%s)\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| rc = ZNodeDelete( errorznode ); |
| } |
| |
| unlock(); |
| |
| FreeStringVector( &childnodes ); |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| // The errorNode is the znode which contains more than one errorChildNodes |
| // and whose corresponding running znode is deleted to bring its node down |
| // (see CZClient::HandleErrorChildZNodes()) |
| // The possibility exist that each errorChildNode is also an errorNode under |
| // errorZNodePath_ if the errorNode passed in could not communicate with |
| // one or more errorChildNodes. |
| // Therefore, the each errorChildNode that is also an errorNode and its child |
| // znode must be also be deleted. |
| // For example, if the error znode tree is as follows: |
| // o node-b is the errorNode |
| // /trafodion/1/cluster/error/node-a/node-b |
| // /trafodion/1/cluster/error/node-b/node-a |
| // /trafodion/1/cluster/error/node-b/node-c |
| // /trafodion/1/cluster/error/node-c/node-b |
| // o Therefore, |
| // ErrorZNodesDelete( node-b, errorChildNodes-of-node-b ) |
| // Delete(/trafodion/1/cluster/error/node-a/node-b) |
| // Delete(/trafodion/1/cluster/error/node-a) |
| // Delete(/trafodion/1/cluster/error/node-c/node-b) |
| // Delete(/trafodion/1/cluster/error/node-c) |
| // Delete(/trafodion/1/cluster/error/node-b/node-a) |
| // Delete(/trafodion/1/cluster/error/node-b/node-c) |
| // Delete(/trafodion/1/cluster/error/node-b) |
| int CZClient::ErrorZNodesDelete( const char *errorNode, String_vector *errorChildNodes ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodesDelete"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| struct String_vector childnodes; |
| |
| lock(); |
| |
| stringstream errorpath; |
| stringstream childpath; |
| string errorznode; |
| string childznode; |
| |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errorNode; |
| errorznode = errorpath.str( ); |
| |
| retry: |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| for (int i = 0; i < errorChildNodes->count ; i++ ) |
| { |
| trace_printf( "%s@%d errorNode=%s, errorChildNode[%d]=%s\n" |
| , method_name, __LINE__ |
| , errorNode |
| , i |
| , errorChildNodes->data[i] ); |
| } |
| trace_printf( "%s@%d Processing delete of errorznode=%s\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| if ( errorChildNodes->count > 0 ) |
| { |
| for (int j = 0; j < errorChildNodes->count ; j++ ) |
| { |
| rc = ErrorZNodesGetChild( errorChildNodes->data[j], &childnodes ); |
| if (rc == ZOK) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d errorNode=%s, errorChildNode=%s, childnodes.count=%d\n" |
| , method_name, __LINE__ |
| , errorNode |
| , errorChildNodes->data[j] |
| , childnodes.count ); |
| } |
| |
| if (strcmp( errorChildNodes->data[j], errorNode) == 0) |
| { |
| FreeStringVector( &childnodes ); |
| continue; |
| } |
| |
| if (childnodes.count == 1 ) |
| { |
| ErrorChildZNodeDelete( errorNode, errorChildNodes->data[j], &childnodes ); |
| } |
| FreeStringVector( &childnodes ); |
| } |
| |
| childpath.str( "" ); |
| childpath << errorZNodePath_ << "/" |
| << errorNode << "/" |
| << errorChildNodes->data[j]; |
| childznode = childpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting childznode=%s\n" |
| , method_name, __LINE__ |
| , childznode.c_str() ); |
| } |
| |
| ZNodeDelete( childznode ); |
| } |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting errorznode=%s\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| rc = ZNodeDelete( errorznode ); |
| if (rc == ZNOTEMPTY) |
| { |
| FreeStringVector( errorChildNodes ); |
| rc = ErrorZNodesGetChild( errorNode, errorChildNodes ); |
| if ( rc != ZOK && rc != ZNONODE) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ErrorZNodesGetChild() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_HNDLEERRORCHILDZNODES_1, SQ_LOG_ERR, buf); |
| CLock::wakeOne(); |
| return(rc); |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Retry deleting errorznode=%s\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| goto retry; |
| } |
| |
| unlock(); |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::ErrorChildZNodeDelete( const char *errorNode |
| , const char *errorChildNode |
| , String_vector *errorChildNodes ) |
| { |
| const char method_name[] = "CZClient::ErrorChildZNodeDelete"; |
| TRACE_ENTRY; |
| |
| int rc1 = -1; |
| int rc2 = -1; |
| stringstream errorpath; |
| stringstream childpath; |
| string errorznode; |
| string errorchildznode; |
| string childznode; |
| |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errorNode << "/" |
| << errorChildNode; |
| errorchildznode = errorpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| for (int i = 0; i < errorChildNodes->count ; i++ ) |
| { |
| trace_printf( "%s@%d errorNode=%s, errorChildNode=%s, errorChildNodes.count=%d, errorChildNode[%d]=%s\n" |
| , method_name, __LINE__ |
| , errorNode |
| , errorChildNode |
| , errorChildNodes->count |
| , i |
| , errorChildNodes->data[i] ); |
| } |
| trace_printf( "%s@%d Processing delete of errorchildznode=%s\n" |
| , method_name, __LINE__ |
| , errorchildznode.c_str() ); |
| } |
| |
| if ( errorChildNodes->count > 0 ) |
| { |
| for (int j = 0; j < errorChildNodes->count ; j++ ) |
| { |
| if (strcmp( errorChildNodes->data[j], errorNode) == 0) |
| { |
| childpath.str( "" ); |
| childpath << errorZNodePath_ << "/" |
| << errorChildNode << "/" |
| << errorNode; |
| childznode = childpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting childznode=%s\n" |
| , method_name, __LINE__ |
| , childznode.c_str() ); |
| } |
| |
| rc1 = ZNodeDelete( childznode ); |
| |
| childpath.str( "" ); |
| childpath << errorZNodePath_ << "/" |
| << errorChildNode; |
| childznode = childpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting childznode=%s\n" |
| , method_name, __LINE__ |
| , childznode.c_str() ); |
| } |
| |
| rc2 = ZNodeDelete( childznode ); |
| } |
| } |
| } |
| |
| TRACE_EXIT; |
| return((rc1 != ZOK)?rc1:rc2); |
| } |
| |
| int CZClient::ErrorZNodesGet( String_vector *nodes, bool doRetries ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodesGet"; |
| TRACE_ENTRY; |
| |
| bool found = false; |
| int rc = -1; |
| int retries = 0; |
| Stat stat; |
| |
| string errorznodes( errorZNodePath_.c_str() ); |
| |
| nodes->count = 0; |
| nodes->data = NULL; |
| |
| while ( !found ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d errorznode=%s\n" |
| , method_name, __LINE__, errorznodes.c_str() ); |
| } |
| |
| // Verify the existence of the parent |
| rc = ZooExistRetry( ZHandle, errorznodes.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE ) |
| { |
| if (doRetries) |
| { |
| if (retries > ZOOKEEPER_RETRY_COUNT) |
| break; |
| retries++; |
| continue; |
| } |
| } |
| else if ( rc == ZOK ) |
| { |
| // Now get the list of error znodes. |
| // |
| // This will return child znodes for each monitor process that has |
| // registered an error with another monitor process. |
| rc = zoo_get_children( ZHandle, errorznodes.c_str( ), 0, nodes ); |
| if ( rc == ZOK ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d errorznode=%s, errornodes.count=%d\n" |
| , method_name, __LINE__ |
| , errorznodes.c_str() |
| , nodes->count ); |
| for (int i = 0; i < nodes->count ; i++ ) |
| { |
| trace_printf( "%s@%d errornodes[%d]=%s\n" |
| , method_name, __LINE__ |
| , i |
| , nodes->data[i] ); |
| } |
| } |
| if (doRetries) |
| { |
| if ( nodes->count && nodes->count < 2 ) |
| { |
| unlock(); |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| lock(); |
| if (retries < ZOOKEEPER_CHILD_RETRY_COUNT) |
| { |
| // Wait a bit to see if at least one other node is |
| // having communications problems with the same node |
| retries++; |
| continue; |
| } |
| found = true; |
| } |
| else |
| { |
| unlock(); |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| lock(); |
| if (retries > ZOOKEEPER_CHILD_RETRY_COUNT) |
| break; |
| retries++; |
| continue; |
| } |
| } |
| else |
| { |
| break; |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_get_children(%s) failed with error %s\n" |
| , method_name, errorznodes.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ERRORZNODESGET_1, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| else // error |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, errorznodes.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ERRORZNODESGET_2, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::ErrorZNodesGetChild( const char *errorNode, String_vector *childnodes ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodesGetChild"; |
| TRACE_ENTRY; |
| |
| bool found = false; |
| int rc = -1; |
| int retries = 0; |
| Stat stat; |
| |
| stringstream ss; |
| ss.str( "" ); |
| ss << errorZNodePath_.c_str() << "/" |
| << errorNode; |
| string errorchildznode( ss.str( ) ); |
| |
| childnodes->count = 0; |
| childnodes->data = NULL; |
| |
| while ( !found ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d errorchildznode=%s\n" |
| , method_name, __LINE__, errorchildznode.c_str() ); |
| } |
| // Verify the existence of the parent |
| rc = ZooExistRetry( ZHandle, errorchildznode.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d errorchildznode=%s does not exist!\n" |
| , method_name, __LINE__ |
| , errorchildznode.c_str( ) ); |
| } |
| break; |
| } |
| else if ( rc == ZOK ) |
| { |
| // Now get the list of available znodes in the cluster. |
| // |
| // This will return child znodes for each monitor process that has |
| // registered, including this process. |
| rc = zoo_get_children( ZHandle, errorchildznode.c_str( ), 0, childnodes ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d errorNode=%s, childnodes.count=%d\n" |
| , method_name, __LINE__ |
| , errorNode |
| , childnodes->count ); |
| } |
| |
| if ( childnodes->count > 0 ) |
| { |
| found = true; |
| } |
| else |
| { |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| if (retries > ZOOKEEPER_CHILD_RETRY_COUNT) |
| break; |
| retries++; |
| continue; |
| } |
| } |
| else // error |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists() for %s failed with error %s\n" |
| , method_name, errorchildznode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ERRORCHILDZNODESGET_1, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| void CZClient::ErrorZNodesDelete( void ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodesDelete"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| struct String_vector errornodes; |
| struct String_vector childnodes; |
| |
| lock(); |
| rc = ErrorZNodesGet( &errornodes ); |
| unlock(); |
| if ( rc != ZOK && rc != ZNONODE ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ErrorZNodesGet() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_ERRORZNODESDELETE_1, SQ_LOG_ERR, buf); |
| CLock::wakeOne(); |
| return; |
| } |
| |
| stringstream errorpath; |
| stringstream childpath; |
| string errorznode; |
| string childznode; |
| |
| if ( errornodes.count > 0 ) |
| { |
| for (int i = 0; i < errornodes.count ; i++ ) |
| { |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errornodes.data[i]; |
| errorznode = errorpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting errorznode=%s\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| rc = ErrorZNodesGetChild( errornodes.data[i], &childnodes ); |
| if ( rc != ZOK && rc != ZNONODE ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ErrorZNodesGetChild() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_ERRORZNODESDELETE_2, SQ_LOG_ERR, buf); |
| CLock::wakeOne(); |
| return; |
| } |
| |
| if ( childnodes.count > 0 ) |
| { |
| for (int j = 0; j < childnodes.count ; j++ ) |
| { |
| childpath.str( "" ); |
| childpath << errorZNodePath_.c_str() << "/" |
| << errornodes.data[i] << "/" |
| << childnodes.data[j]; |
| childznode = childpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting childznode=%s\n" |
| , method_name, __LINE__ |
| , childznode.c_str() ); |
| } |
| |
| ZNodeDelete( childznode ); |
| } |
| } |
| |
| FreeStringVector( &childnodes ); |
| ZNodeDelete( errorznode ); |
| } |
| FreeStringVector( &errornodes ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::ErrorZNodesWatchSet( void ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodesWatchSet"; |
| TRACE_ENTRY; |
| |
| int rc; |
| |
| stringstream errorpath; |
| string errornode; |
| |
| errornode = errorZNodePath_.c_str(); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Setting watch set on errornode=%s\n" |
| , method_name, __LINE__ |
| , errornode.c_str() ); |
| } |
| |
| rc = ZNodeWatchChildSet( errornode ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeWatchChildSet(%s) failed!\n" |
| , errornode.c_str() |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_ERRORZNODESWATCHSET_1, SQ_LOG_ERR, buf); |
| |
| TRACE_EXIT; |
| return; |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Watch set on errornode=%s\n" |
| , method_name, __LINE__ |
| , errornode.c_str() ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CZClient::ErrorZNodeWatchAdd( void ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodeWatchAdd"; |
| TRACE_ENTRY; |
| |
| int rc; |
| string errorznode = errorZNodePath_.c_str(); |
| |
| lock(); |
| rc = ZNodeWatchSet( errorznode ); |
| unlock(); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeWatchSet(%s) failed!\n" |
| , method_name |
| , errorznode.c_str() ); |
| mon_log_write(MON_ZCLIENT_ERRORZNODEWATCHADD_1, SQ_LOG_ERR, buf); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Watch set on errorznode=%s\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| } |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::ErrorZNodeWatchDelete( void ) |
| { |
| const char method_name[] = "CZClient::ErrorZNodeWatchDelete"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| |
| string errorznode = errorZNodePath_.c_str(); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting errorznode(%s)\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| rc = ZNodeWatchReset( errorznode ); |
| if ( rc == ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], errorznode (%s) deleted!\n" |
| , method_name, errorznode.c_str() ); |
| mon_log_write(MON_ZCLIENT_ERRORZNODEWATCHDELETE_1, SQ_LOG_INFO, buf); |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| void CZClient::HandleChangedZNode( void ) |
| { |
| const char method_name[] = "CZClient::HandleChangedZNode"; |
| TRACE_ENTRY; |
| |
| if ( IsClusterWatchEnabled() ) |
| { |
| char pathStr[MAX_PROCESSOR_NAME] = { 0 }; |
| char nodeName[MAX_PROCESSOR_NAME] = { 0 }; |
| string znode; |
| |
| |
| while (znodeChangedQueue_.size() != 0) |
| { |
| znode.assign( znodeChangedQueue_.front() ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d" " - znodePath=%s, znodeChangedQueue_.size=%ld\n" |
| , method_name, __LINE__ |
| , znode.c_str(), znodeChangedQueue_.size() ); |
| } |
| |
| znodeChangedQueue_.pop_front(); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d nodeName=%s\n" |
| , method_name, __LINE__ |
| , strlen(nodeName) ? nodeName : "" ); |
| } |
| |
| HandleNodeChange( nodeName ); |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ClusterWatchEnabled is NOT set!\n" |
| , method_name, __LINE__ ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::HandleChildZNode( void ) |
| { |
| const char method_name[] = "CZClient::HandleChildZNode"; |
| TRACE_ENTRY; |
| |
| if ( IsClusterWatchEnabled() ) |
| { |
| char pathStr[MAX_PROCESSOR_NAME] = { 0 }; |
| char nodeName[MAX_PROCESSOR_NAME] = { 0 }; |
| char *tkn = NULL; |
| char *tknStart = pathStr; |
| char *tknLast = NULL; |
| string znode; |
| |
| while (znodeChildQueue_.size() != 0) |
| { |
| znode.assign( znodeChildQueue_.front() ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d" " - znodePath=%s, znodeChildQueue_.size=%ld\n" |
| , method_name, __LINE__ |
| , znode.c_str(), znodeChildQueue_.size() ); |
| } |
| |
| znodeChildQueue_.pop_front(); |
| |
| if (znode.compare( configuredZNodePath_ ) == 0) |
| { |
| // The configuredZNodePath_ contains child znodes of each |
| // node in the static configuration. |
| // As node are added or deleted from the static configuration |
| // a correspoding child znode is added or deleted under the |
| // configuredZNodePath_ |
| HandleConfiguredZNodes(); |
| } |
| else if (znode.compare( errorZNodePath_ ) == 0) |
| { |
| HandleErrorZNodes(); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Don't know how to handle children of znode=%s\n" |
| , method_name |
| , znode.c_str() ); |
| mon_log_write(MON_ZCLIENT_HANDLECHILDZNODE_1, SQ_LOG_ERR, buf); |
| } |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ClusterWatchEnabled is NOT set!\n" |
| , method_name, __LINE__ ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::HandleConfiguredZNodes( void ) |
| { |
| const char method_name[] = "CZClient::HandleConfiguredZNodes"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| struct String_vector confignodes; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Handling Configured ZNodes!\n" |
| , method_name, __LINE__ ); |
| } |
| |
| rc = ConfiguredZNodesGet( &confignodes ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ConfiguredZNodesGet() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_HANDLEERRORZNODES_1, SQ_LOG_ERR, buf); |
| CLock::wakeOne(); |
| return; |
| } |
| |
| stringstream configpath; |
| string configznode; |
| |
| if ( confignodes.count > 0 ) |
| { |
| for (int i = 0; i < confignodes.count ; i++ ) |
| { |
| configpath.str( "" ); |
| configpath << configuredZNodePath_.c_str() << "/" |
| << confignodes.data[i]; |
| configznode = configpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Handling configznode=%s\n" |
| , method_name, __LINE__ |
| , configznode.c_str() ); |
| } |
| } |
| HandleNodeConfigurationChange(); |
| FreeStringVector( &confignodes ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::HandleCreatedZNode( void ) |
| { |
| const char method_name[] = "CZClient::HandleCreatedZNode"; |
| TRACE_ENTRY; |
| |
| if ( IsClusterWatchEnabled() ) |
| { |
| char pathStr[MAX_PROCESSOR_NAME] = { 0 }; |
| char nodeName[MAX_PROCESSOR_NAME] = { 0 }; |
| char *tkn = NULL; |
| char *tknStart = pathStr; |
| char *tknLast = NULL; |
| string znode; |
| |
| |
| while (znodeCreatedQueue_.size() != 0) |
| { |
| znode.assign( znodeCreatedQueue_.front() ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d" " - znodePath=%s, znodeCreatedQueue_.size=%ld\n" |
| , method_name, __LINE__ |
| , znode.c_str(), znodeCreatedQueue_.size() ); |
| } |
| |
| znodeCreatedQueue_.pop_front(); |
| |
| strcpy( pathStr, znode.c_str() ); |
| |
| tknStart++; // skip the first '/' |
| tkn = strtok( tknStart, "/" ); |
| do |
| { |
| tknLast = tkn; |
| tkn = strtok( NULL, "/" ); |
| } |
| while( tkn != NULL ); |
| |
| strcpy( nodeName, tknLast ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d nodeName=%s\n" |
| , method_name, __LINE__ |
| , strlen(nodeName) ? nodeName : "" ); |
| } |
| |
| HandleNodeCreated( nodeName ); |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ClusterWatchEnabled is NOT set!\n" |
| , method_name, __LINE__ ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::HandleDeletedZNode( void ) |
| { |
| const char method_name[] = "CZClient::HandleDeletedZNode"; |
| TRACE_ENTRY; |
| |
| if ( IsClusterWatchEnabled() ) |
| { |
| char pathStr[MAX_PROCESSOR_NAME] = { 0 }; |
| char nodeName[MAX_PROCESSOR_NAME] = { 0 }; |
| char *tkn = NULL; |
| char *tknStart = pathStr; |
| char *tknLast = NULL; |
| string znode; |
| |
| |
| while (znodeDeletedQueue_.size() != 0) |
| { |
| znode.assign( znodeDeletedQueue_.front() ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d" " - znodePath=%s, znodeDeletedQueue_.size=%ld\n" |
| , method_name, __LINE__ |
| , znode.c_str(), znodeDeletedQueue_.size() ); |
| } |
| |
| znodeDeletedQueue_.pop_front(); |
| |
| strcpy( pathStr, znode.c_str() ); |
| |
| tknStart++; // skip the first '/' |
| tkn = strtok( tknStart, "/" ); |
| do |
| { |
| tknLast = tkn; |
| tkn = strtok( NULL, "/" ); |
| } |
| while( tkn != NULL ); |
| |
| strcpy( nodeName, tknLast ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d nodeName=%s\n" |
| , method_name, __LINE__ |
| , strlen(nodeName) ? nodeName : "" ); |
| } |
| |
| // Invoke the callback to handle the node expiration |
| HandleNodeExpiration( nodeName ); |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ClusterWatchEnabled is NOT set!\n" |
| , method_name, __LINE__ ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::HandleErrorZNode( const char *errorNode, const char *childNode ) |
| { |
| const char method_name[] = "CZClient::HandleErrorZNode"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| bool deleteErrorznode = false; |
| struct String_vector childnodes; |
| stringstream errorpath; |
| stringstream childpath; |
| string errorznode; |
| string childznode; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Handling errorNode=%s\n" |
| , method_name, __LINE__ |
| , errorNode ); |
| } |
| |
| rc = ErrorZNodesGetChild( errorNode, &childnodes ); |
| if ( rc != ZOK && rc != ZNONODE) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ErrorZNodesGetChild() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_HANDLEERRORZNODE_1, SQ_LOG_ERR, buf); |
| return; |
| } |
| |
| if ( childnodes.count > 0 ) |
| { |
| for (int i = 0; i < childnodes.count ; i++ ) |
| { |
| if (strcmp( childnodes.data[i], childNode ) == 0) |
| { |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errorNode; |
| errorznode = errorpath.str( ); |
| |
| childpath.str( "" ); |
| childpath << errorpath.str( ) << "/" |
| << childNode; |
| childznode = childpath.str( ); |
| |
| // Delete the parent errorznode if it only had one childznode |
| if (childnodes.count == 1) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting childznode=%s\n" |
| , method_name, __LINE__ |
| , childznode.c_str() ); |
| } |
| ZNodeDelete( childznode ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting errorznode=%s\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| ZNodeDelete( errorznode ); |
| } |
| else if (childnodes.count > 1) |
| { |
| HandleErrorChildZNodes( errorNode ); |
| } |
| } |
| } |
| } |
| |
| FreeStringVector( &childnodes ); |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::HandleErrorZNodes( void ) |
| { |
| const char method_name[] = "CZClient::HandleErrorZNodes"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Handling Error ZNodes!\n" |
| , method_name, __LINE__ ); |
| } |
| |
| int rc = -1; |
| struct String_vector errornodes; |
| |
| rc = ErrorZNodesGet( &errornodes, false ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ErrorZNodesGet() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_HANDLEERRORZNODES_1, SQ_LOG_ERR, buf); |
| return; |
| } |
| |
| stringstream errorpath; |
| string errorznode; |
| |
| if ( errornodes.count > 0 ) |
| { |
| for (int i = 0; i < errornodes.count ; i++ ) |
| { |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errornodes.data[i]; |
| errorznode = errorpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Handling errorznode=%s\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| HandleErrorChildZNodes( errornodes.data[i] ); |
| } |
| FreeStringVector( &errornodes ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::HandleErrorChildZNodes( const char *errorNode ) |
| { |
| const char method_name[] = "CZClient::HandleErrorChildZNodes"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| bool deleteErrorznode = false; |
| struct String_vector childnodes; |
| stringstream errorpath; |
| stringstream childpath; |
| string errorznode; |
| string childznode; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Handling errorNode=%s\n" |
| , method_name, __LINE__ |
| , errorNode ); |
| } |
| |
| rc = ErrorZNodesGetChild( errorNode, &childnodes ); |
| if ( rc != ZOK && rc != ZNONODE) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ErrorZNodesGetChild() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_HNDLEERRORCHILDZNODES_1, SQ_LOG_ERR, buf); |
| return; |
| } |
| |
| if ( childnodes.count > 1 ) |
| { |
| ErrorZNodesDelete( errorNode, &childnodes ); |
| // Delete the corresponding running znode which will trigger node down |
| RunningZNodeDelete( errorNode ); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Bypassing errorNode=%s, childnodes.count=%d\n" |
| , method_name, __LINE__ |
| , errorNode |
| , childnodes.count ); |
| } |
| } |
| |
| FreeStringVector( &childnodes ); |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::HandleErrorChildZNodesForZNodeChild( const char *childNode, bool doRetries ) |
| { |
| const char method_name[] = "CZClient::HandleErrorChildZNodesForZNodeChild"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| bool deleteErrorznode = false; |
| struct String_vector errornodes; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Handling childNode=%s\n" |
| , method_name, __LINE__ |
| , childNode ); |
| } |
| |
| rc = ErrorZNodesGet( &errornodes, doRetries ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ErrorZNodesGet() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_HNDLERRCHLZNFORZNCHL_1, SQ_LOG_ERR, buf); |
| return; |
| } |
| |
| stringstream errorpath; |
| string errorznode; |
| |
| if ( errornodes.count > 0 ) |
| { |
| for (int i = 0; i < errornodes.count ; i++ ) |
| { |
| errorpath.str( "" ); |
| errorpath << errorZNodePath_.c_str() << "/" |
| << errornodes.data[i]; |
| errorznode = errorpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Handling errorznode=%s\n" |
| , method_name, __LINE__ |
| , errorznode.c_str() ); |
| } |
| |
| HandleErrorZNode( errornodes.data[i], childNode ); |
| } |
| FreeStringVector( &errornodes ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CZClient::InitializeZClient( void ) |
| { |
| const char method_name[] = "CZClient::InitializeZClient"; |
| TRACE_ENTRY; |
| |
| int rc; |
| int retries = 0; |
| |
| rc = ZNodesTreeCreate(); |
| |
| while ( rc != ZOK && retries < ZOOKEEPER_RETRY_COUNT) |
| { |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| retries++; |
| rc = ZNodesTreeCreate(); |
| } |
| |
| if ( rc == ZOK ) |
| { |
| rc = MyRunningZNodeCreate(); |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| bool CZClient::IsRunningZNodeExpired( const char *nodeName, int &zerr ) |
| { |
| const char method_name[] = "CZClient::IsRunningZNodeExpired"; |
| TRACE_ENTRY; |
| |
| bool expired = false; |
| int rc = -1; |
| Stat stat; |
| stringstream newpath; |
| newpath.str( "" ); |
| newpath << runningZNodePath_.c_str() << "/" |
| << nodeName; |
| string monZnode = newpath.str( ); |
| |
| zerr = ZOK; |
| |
| rc = ZooExistRetry( ZHandle, monZnode.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE ) |
| { |
| expired = true; |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d monZnode=%s does not exist!\n" |
| , method_name, __LINE__, monZnode.c_str() ); |
| } |
| } |
| else if ( rc == ZCONNECTIONLOSS || rc == ZOPERATIONTIMEOUT ) |
| { |
| // Treat this as not expired until communication resumes |
| expired = false; |
| zerr = rc; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists() for %s failed with error %s\n" |
| , method_name, monZnode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ISZNODEEXPIRED_1, SQ_LOG_ERR, buf); |
| } |
| else if ( rc == ZOK ) |
| { |
| expired = false; |
| } |
| else |
| { |
| expired = true; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists() for %s failed with error %s\n" |
| , method_name, monZnode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ISZNODEEXPIRED_2, SQ_LOG_CRIT, buf); |
| switch ( rc ) |
| { |
| case ZSYSTEMERROR: |
| case ZRUNTIMEINCONSISTENCY: |
| case ZDATAINCONSISTENCY: |
| case ZMARSHALLINGERROR: |
| case ZUNIMPLEMENTED: |
| case ZBADARGUMENTS: |
| case ZINVALIDSTATE: |
| case ZSESSIONEXPIRED: |
| case ZCLOSING: |
| // Treat these error like a session expiration, since |
| // we can't communicate with quorum servers |
| HandleMyNodeExpiration(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( expired ); |
| } |
| |
| bool CZClient::IsZNodeMaster( const char *nodeName ) |
| { |
| const char method_name[] = "CZClient::IsZNodeMaster"; |
| TRACE_ENTRY; |
| |
| bool isMaster = false; |
| string masterZNode; |
| |
| masterZNode.assign(MasterWaitForAndReturn( true )); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d masterZNode=%s, nodeName=%s\n" |
| , method_name, __LINE__ |
| , masterZNode.c_str() |
| , nodeName ); |
| } |
| |
| isMaster = (masterZNode.compare( nodeName ) == 0) ? true : false; |
| |
| TRACE_EXIT; |
| return( isMaster ); |
| } |
| |
| const char* CZClient::MasterWaitForAndReturn( bool doWait ) |
| { |
| const char method_name[] = "CZClient::MasterWaitForAndReturn"; |
| TRACE_ENTRY; |
| |
| bool found = false; |
| int rc = -1; |
| int retries = 0; |
| Stat stat; |
| |
| struct String_vector nodes = {0, NULL}; |
| string masterMonitor( masterZNodePath_.c_str() ); |
| |
| // wait for ZCLIENT_MASTER_ZNODE_RETRY_COUNT minutes for giving up. |
| while ( (StateGet() != ZC_SHUTDOWN) && (!found) && (retries < ZCLIENT_MASTER_ZNODE_RETRY_COUNT)) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d masterMonitor path=%s\n" |
| , method_name, __LINE__, masterMonitor.c_str() ); |
| } |
| |
| if (MyNode && MyNode->IsPendingNodeDown()) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d MyNode IsPendingNodeDown=%s\n" |
| , method_name, __LINE__ |
| , MyNode->IsPendingNodeDown()?"true":"false" ); |
| } |
| break; |
| } |
| |
| // Verify the existence of the parent ZCLIENT_MASTER_ZNODE |
| rc = ZooExistRetry( ZHandle, masterMonitor.c_str( ), 0, &stat ); |
| |
| if ( rc == ZNONODE ) |
| { |
| if (doWait == false) |
| { |
| break; |
| } |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| retries++; |
| continue; |
| } |
| else if ( rc == ZOK ) |
| { |
| // Now get the master znode that registered under the masterMonitor |
| // znode. |
| // |
| // This will return one child znode for the monitor process that has |
| // registered as the current master. |
| rc = zoo_get_children( ZHandle, masterMonitor.c_str( ), 0, &nodes ); |
| if ( nodes.count > 0 ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d nodes.count=%d\n" |
| , method_name, __LINE__ |
| , nodes.count ); |
| } |
| found = true; |
| } |
| else |
| { |
| if (doWait == false) |
| { |
| break; |
| } |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| retries++; |
| continue; |
| } |
| } |
| |
| else // error |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Error (MasterMonitor) MasterWaitForAndReturn() returned rc (%d), retries %d\n" |
| , method_name, __LINE__, rc, retries ); |
| } |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZooExistRetry() for %s failed with error %s\n" |
| , method_name, masterMonitor.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_WAITFORRETURNMASTER_1, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| |
| //should we assert nodes.count == 1? |
| if (found) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d (MasterMonitor) Master Monitor found (%s/%s)\n" |
| , method_name, __LINE__, masterMonitor.c_str(), nodes.data[0] ); |
| } |
| TRACE_EXIT; |
| return nodes.data[0]; |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d (MasterMonitor) Master Monitor NOT found\n" , method_name, __LINE__); |
| } |
| } |
| |
| TRACE_EXIT; |
| return NULL; |
| } |
| |
| int CZClient::MasterZNodeCreate( const char *nodeName ) |
| { |
| const char method_name[] = "CZClient::MasterZNodeCreate"; |
| TRACE_ENTRY; |
| |
| int rc; |
| int retries = 0; |
| |
| stringstream masterpath; |
| masterpath.str( "" ); |
| masterpath << masterZNodePath_.c_str() << "/" |
| << nodeName; |
| |
| string monZnode = masterpath.str( ); |
| |
| stringstream ss; |
| ss.str( "" ); |
| ss <<nodeName << ":" << MyPNID; |
| string monData = ss.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s:%s)\n" |
| , method_name, __LINE__ |
| , monZnode.c_str() |
| , monData.c_str() ); |
| } |
| |
| rc = ZNodeCreate( monZnode.c_str(), monData.c_str(), ZOO_EPHEMERAL ); |
| while ( ((rc == ZCONNECTIONLOSS) || (rc == ZOPERATIONTIMEOUT)) && retries < ZOOKEEPER_RETRY_COUNT) |
| { |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| retries++; |
| rc = ZNodeCreate( monZnode.c_str(), monData.c_str(), ZOO_EPHEMERAL ); |
| } |
| |
| if (rc != ZOK) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeCreate(%s) failed with error %s\n" |
| , method_name, monData.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_MASTERZNODECREATE_1, SQ_LOG_ERR, buf); |
| |
| TRACE_EXIT; |
| return(rc); // Return the error |
| } |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d (MasterMonitor) Created master node for %s with rc = %d)\n" |
| , method_name, __LINE__, monZnode.c_str( ), rc); |
| } |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::MasterZNodeDelete( const char *nodeName ) |
| { |
| const char method_name[] = "CZClient::MasterZNodeDelete"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| stringstream newpath; |
| newpath.str( "" ); |
| newpath << masterZNodePath_.c_str() << "/" |
| << nodeName; |
| |
| string znode = newpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting znode(%s)\n" |
| , method_name, __LINE__ |
| , znode.c_str() ); |
| } |
| |
| rc = ZNodeDelete( znode ); |
| if ( rc == ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Master znode (%s) deleted!\n" |
| , method_name, nodeName ); |
| mon_log_write(MON_ZCLIENT_MASTERZNODEDELETE_1, SQ_LOG_INFO, buf); |
| } |
| else if ( rc == ZNONODE ) |
| { |
| // This is ok since we call it indiscriminately |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d (MasterMonitor) Master ZNode %s already deleted\n" |
| , method_name, __LINE__ |
| , nodeName ); |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| // ZClient main processing loop |
| void CZClient::MonitorCluster() |
| { |
| const char method_name[] = "CZClient::MonitorCluster"; |
| TRACE_ENTRY; |
| |
| int rc; |
| struct timespec timeout; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d thread %lx starting\n" |
| , method_name, __LINE__, threadId_); |
| } |
| |
| if (zcMonitoringRate_ >= 0) |
| { |
| TimeToWakeUpSet( timeout ); |
| } |
| |
| while ( StateGet() != ZC_SHUTDOWN ) |
| { |
| lock(); |
| if ( !IsEnabled() ) |
| { |
| // Wait until enabled |
| CLock::wait(); |
| } |
| else |
| { |
| |
| if (zcMonitoringRate_ < 0 || StateGet() == ZC_DISABLED) |
| { |
| // Wait until signaled |
| CLock::wait(); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d" " - ZCluster signaled, state_=%s\n" |
| , method_name, __LINE__ |
| , ZClientStateStr(StateGet()) ); |
| } |
| } |
| |
| if (znodeDeletedQueue_.size() != 0) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - ZCluster signaling: " |
| "ZC_ZNODE_DELETED, znodeDeletedQueue_=%ld\n" |
| , method_name, __LINE__ |
| , znodeDeletedQueue_.size() ); |
| } |
| StateSet( ZC_ZNODE_DELETED ); |
| } |
| else if (znodeChildQueue_.size() != 0) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - ZCluster signaling: " |
| "ZC_ZNODE_CHILD, znodeChildQueue_=%ld\n" |
| , method_name, __LINE__ |
| , znodeChildQueue_.size() ); |
| } |
| StateSet( ZC_ZNODE_CHILD ); |
| } |
| else if (znodeCreatedQueue_.size() != 0) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - ZCluster signaling: " |
| "ZC_ZNODE_CREATED, znodeCreatedQueue_=%ld\n" |
| , method_name, __LINE__ |
| , znodeCreatedQueue_.size() ); |
| } |
| StateSet( ZC_ZNODE_CREATED ); |
| } |
| else if (znodeChangedQueue_.size() != 0) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - ZCluster signaling: " |
| "ZC_ZNODE_CHANGED, znodeChangedQueue_=%ld\n" |
| , method_name, __LINE__ |
| , znodeChangedQueue_.size() ); |
| } |
| StateSet( ZC_ZNODE_CHANGED ); |
| } |
| else |
| { |
| // Wait until signaled or timer expires |
| rc = CLock::timedWait( &timeout ); |
| if ( rc != ETIMEDOUT ) |
| { |
| if ( rc != 0 ) |
| { |
| ClusterMonitoringStop(); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d" " - ZCluster signaled, state_=%s\n" |
| , method_name, __LINE__ |
| , ZClientStateStr(StateGet()) ); |
| } |
| } |
| } |
| } |
| } |
| |
| switch ( StateGet() ) |
| { |
| case ZC_START: |
| ClusterMonitoringStart(); |
| break; |
| case ZC_CLUSTER: |
| if ( IsClusterWatchEnabled() ) |
| { |
| RunningZNodesCheck(); |
| if (StateGet() != ZC_STOP) |
| { |
| StateSet( ZC_MYZNODE ); |
| } |
| } |
| break; |
| case ZC_WATCH: |
| if ( !IsClusterWatchEnabled() ) |
| { |
| ConfiguredZNodesWatchSet(); |
| ErrorZNodesWatchSet(); |
| RunningZNodesWatchSet(); |
| if (StateGet() != ZC_STOP) |
| { |
| ClusterWatchEnabledSet( true ); |
| StateSet( ZC_MYZNODE ); |
| } |
| } |
| break; |
| case ZC_MYZNODE: |
| if ( IsClusterWatchEnabled() ) |
| { |
| MyRunningZNodeCheck(); |
| } |
| break; |
| case ZC_ZNODE_CHANGED: |
| if ( IsClusterWatchEnabled() ) |
| { |
| HandleChangedZNode(); |
| StateSet( ZC_MYZNODE ); |
| } |
| break; |
| case ZC_ZNODE_CHILD: |
| if ( IsClusterWatchEnabled() ) |
| { |
| HandleChildZNode(); |
| StateSet( ZC_MYZNODE ); |
| } |
| break; |
| case ZC_ZNODE_CREATED: |
| if ( IsClusterWatchEnabled() ) |
| { |
| HandleCreatedZNode(); |
| StateSet( ZC_MYZNODE ); |
| } |
| break; |
| case ZC_ZNODE_DELETED: |
| if ( IsClusterWatchEnabled() ) |
| { |
| HandleDeletedZNode(); |
| StateSet( ZC_MYZNODE ); |
| } |
| break; |
| case ZC_STOP: |
| ClusterMonitoringStop(); |
| break; |
| default: |
| break; |
| } |
| |
| if (zcMonitoringRate_ >= 0) |
| { |
| TimeToWakeUpSet( timeout ); |
| } |
| unlock(); |
| } |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d thread %lx exiting\n" |
| , method_name,__LINE__, pthread_self()); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::MyRunningZNodeCheck( void ) |
| { |
| const char method_name[] = "CZClient::MyRunningZNodeCheck"; |
| TRACE_ENTRY; |
| |
| int zerr; |
| struct timespec currentTime; |
| |
| if ( IsClusterWatchEnabled() ) |
| { |
| if (resetMyZNodeFailedTime_) |
| { |
| resetMyZNodeFailedTime_ = false; |
| clock_gettime(CLOCK_REALTIME, &myZNodeFailedTime_); |
| myZNodeFailedTime_.tv_sec += (SessionTimeoutGet() * 2); |
| #if 0 |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d" " - Resetting MyZnode Fail Time %ld(secs)\n" |
| , method_name, __LINE__ |
| , myZNodeFailedTime_.tv_sec ); |
| } |
| #endif |
| } |
| if (MyNode->IsPendingNodeDown()) |
| { |
| return; |
| } |
| if ( ! IsRunningZNodeExpired( Node_name, zerr ) ) |
| { |
| if ( zerr == ZCONNECTIONLOSS || zerr == ZOPERATIONTIMEOUT ) |
| { |
| // Ignore transient errors with the quorum. |
| // However, if longer than the session |
| // timeout, handle it as a hard error. |
| clock_gettime(CLOCK_REALTIME, ¤tTime); |
| if (currentTime.tv_sec > myZNodeFailedTime_.tv_sec) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], Zookeeper quorum comm error: %s - Handling my znode (%s) as expired! Node is going down.\n" |
| , method_name, zerror(zerr), Node_name ); |
| mon_log_write(MON_ZCLIENT_MYRUNNINGZNODECHECK_1, SQ_LOG_ERR, buf); |
| HandleMyNodeExpiration(); |
| } |
| } |
| else |
| { |
| resetMyZNodeFailedTime_ = true; |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], My znode (%s) expired! Node is going down.\n" |
| , method_name, Node_name ); |
| mon_log_write(MON_ZCLIENT_MYRUNNINGZNODECHECK_2, SQ_LOG_ERR, buf); |
| HandleMyNodeExpiration(); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CZClient::MyRunningZNodeCreate( void ) |
| { |
| const char method_name[] = "CZClient::MyRunningZNodeCreate"; |
| TRACE_ENTRY; |
| |
| int rc; |
| int zerr; |
| char pnidStr[10]; |
| |
| sprintf( pnidStr, "%d", MyPNID); |
| |
| stringstream newpath; |
| newpath.str( "" ); |
| newpath << runningZNodePath_.c_str() << "/" |
| << Node_name; |
| string monZnode = newpath.str( ); |
| |
| stringstream ss; |
| ss.str( "" ); |
| ss << Node_name << ":" << pnidStr; |
| string monData = ss.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s:%s)\n" |
| , method_name, __LINE__ |
| , monZnode.c_str() |
| , monData.c_str() ); |
| } |
| |
| lock(); |
| // Clean up previous error znodes |
| HandleErrorChildZNodes( Node_name ); |
| unlock(); |
| |
| // Clean up my previous running znode, if any |
| rc = ZNodeDelete( monZnode ); |
| if ( rc == ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], My znode (%s) deleted!\n" |
| , method_name, Node_name ); |
| mon_log_write(MON_ZCLIENT_MYRUNNINGZNODECREATE_1, SQ_LOG_INFO, buf); |
| } |
| |
| rc = ZNodeCreate( monZnode.c_str(), monData.c_str(), ZOO_EPHEMERAL ); |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::RunningZNodeDelete( const char *nodeName ) |
| { |
| const char method_name[] = "CZClient::RunningZNodeDelete"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| |
| stringstream newpath; |
| newpath.str( "" ); |
| newpath << runningZNodePath_.c_str() << "/" |
| << nodeName; |
| string monZnode = newpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting znode(%s)\n" |
| , method_name, __LINE__ |
| , monZnode.c_str() ); |
| } |
| |
| rc = ZNodeDelete( monZnode ); |
| if ( rc == ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], znode (%s) deleted!\n" |
| , method_name, nodeName ); |
| mon_log_write(MON_ZCLIENT_RUNZNODEWATCHDELETE_1, SQ_LOG_INFO, buf); |
| } |
| |
| if (strcmp( Node_name, nodeName) == 0) |
| { |
| // Clean up my error znode and children |
| HandleErrorChildZNodes( Node_name ); |
| // Clean up error znodes and where I am their 'only' child |
| lock(); |
| HandleErrorChildZNodesForZNodeChild( Node_name, true ); |
| unlock(); |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::RunningZNodeWatchAdd( const char *nodeName ) |
| { |
| const char method_name[] = "CZClient::RunningZNodeWatchAdd"; |
| TRACE_ENTRY; |
| |
| int rc; |
| stringstream newpath; |
| newpath.str( "" ); |
| newpath << runningZNodePath_.c_str() << "/" |
| << nodeName; |
| string monZnode = newpath.str( ); |
| |
| lock(); |
| rc = ZNodeWatchSet( monZnode ); |
| unlock(); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeWatchSet(%s) failed!\n" |
| , method_name |
| , monZnode.c_str() ); |
| mon_log_write(MON_ZCLIENT_RUNZNODEWATCHADD_1, SQ_LOG_ERR, buf); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Watch set on monZnode=%s\n" |
| , method_name, __LINE__ |
| , monZnode.c_str() ); |
| } |
| } |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::RunningZNodeWatchDelete( const char *nodeName ) |
| { |
| const char method_name[] = "CZClient::RunningZNodeWatchDelete"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| |
| stringstream newpath; |
| newpath.str( "" ); |
| newpath << runningZNodePath_.c_str() << "/" |
| << nodeName; |
| string monZnode = newpath.str( ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting znode(%s)\n" |
| , method_name, __LINE__ |
| , monZnode.c_str() ); |
| } |
| rc = ZNodeWatchReset( monZnode ); |
| if ( rc == ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], znode (%s) deleted!\n" |
| , method_name, nodeName ); |
| mon_log_write(MON_ZCLIENT_RUNZNODEWATCHDELETE_1, SQ_LOG_INFO, buf); |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| void CZClient::RunningZNodesCheck( void ) |
| { |
| const char method_name[] = "CZClient::RunningZNodesCheck"; |
| TRACE_ENTRY; |
| |
| int rc; |
| struct String_vector nodes; |
| |
| if ( IsClusterWatchEnabled() ) |
| { |
| rc = RunningZNodesGet( &nodes ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], RunningZNodesGet() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_RUNZNODESCHECK_1, SQ_LOG_ERR, buf); |
| StateSet( CZClient::ZC_STOP ); |
| CLock::wakeOne(); |
| return; |
| } |
| |
| stringstream newpath; |
| string monZnode; |
| string nodeName; |
| int pnid = -1; |
| |
| if ( nodes.count > 0 ) |
| { |
| for (int i = 0; i < nodes.count ; i++ ) |
| { |
| newpath.str( "" ); |
| newpath << runningZNodePath_.c_str() << "/" |
| << nodes.data[i]; |
| string monZnode = newpath.str( ); |
| |
| rc = ZNodeDataGet( monZnode, nodeName, pnid ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeDataGet(%s) failed!\n" |
| , method_name |
| , monZnode.c_str() ); |
| mon_log_write(MON_ZCLIENT_RUNZNODESCHECK_2, SQ_LOG_ERR, buf); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d monZnode=%s, nodeName=%s, pnid=%d)\n" |
| , method_name, __LINE__ |
| , monZnode.c_str(), nodeName.c_str(), pnid ); |
| } |
| } |
| } |
| FreeStringVector( &nodes ); |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ClusterWatch is NOT set!\n" |
| , method_name, __LINE__ ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| void CZClient::RunningZNodesDelete( void ) |
| { |
| const char method_name[] = "CZClient::RunningZNodesDelete"; |
| TRACE_ENTRY; |
| |
| int rc; |
| struct String_vector nodes; |
| |
| rc = RunningZNodesGet( &nodes ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], RunningZNodesGet() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_RUNZNODESDELETE_1, SQ_LOG_ERR, buf); |
| CLock::wakeOne(); |
| return; |
| } |
| |
| stringstream newpath; |
| string monZnode; |
| string nodeName; |
| int pnid = -1; |
| |
| if ( nodes.count > 0 ) |
| { |
| for (int i = 0; i < nodes.count ; i++ ) |
| { |
| newpath.str( "" ); |
| newpath << runningZNodePath_.c_str() << "/" |
| << nodes.data[i]; |
| string monZnode = newpath.str( ); |
| |
| rc = ZNodeDataGet( monZnode, nodeName, pnid ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeDataGet(%s) failed!\n" |
| , method_name |
| , monZnode.c_str() ); |
| mon_log_write(MON_ZCLIENT_RUNZNODESDELETE_2, SQ_LOG_ERR, buf); |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d monZnode=%s, nodeName=%s, pnid=%d)\n" |
| , method_name, __LINE__ |
| , monZnode.c_str(), nodeName.c_str(), pnid ); |
| } |
| ZClient->RunningZNodeDelete( nodeName.c_str() ); |
| ZClient->MasterZNodeDelete( nodeName.c_str() ); |
| } |
| } |
| FreeStringVector( &nodes ); |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CZClient::RunningZNodesGet( String_vector *nodes ) |
| { |
| const char method_name[] = "CZClient::RunningZNodesGet"; |
| TRACE_ENTRY; |
| |
| bool found = false; |
| int rc = -1; |
| int retries = 0; |
| Stat stat; |
| |
| string trafCluster( runningZNodePath_.c_str() ); |
| |
| nodes->count = 0; |
| nodes->data = NULL; |
| |
| while ( !found ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d trafCluster=%s\n" |
| , method_name, __LINE__, trafCluster.c_str() ); |
| } |
| // Verify the existence of the parent |
| rc = ZooExistRetry( ZHandle, trafCluster.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE ) |
| { |
| if (retries > ZOOKEEPER_RETRY_COUNT) |
| break; |
| retries++; |
| continue; |
| } |
| else if ( rc == ZOK ) |
| { |
| // Now get the list of available znodes in the cluster. |
| // |
| // This will return child znodes for each monitor process that has |
| // registered, including this process. |
| rc = zoo_get_children( ZHandle, trafCluster.c_str( ), 0, nodes ); |
| if ( nodes->count > 0 ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d nodes.count=%d\n" |
| , method_name, __LINE__ |
| , nodes->count ); |
| } |
| found = true; |
| } |
| else |
| { |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| if (retries > ZOOKEEPER_CHILD_RETRY_COUNT) |
| break; |
| retries++; |
| continue; |
| } |
| } |
| else // error |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists() for %s failed with error %s\n" |
| , method_name, trafCluster.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_RUNZNODESGET_1, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| void CZClient::RunningZNodesWatchSet( void ) |
| { |
| const char method_name[] = "CZClient::RunningZNodesWatchSet"; |
| TRACE_ENTRY; |
| |
| int rc; |
| struct String_vector nodes; |
| |
| if ( !IsClusterWatchEnabled() ) |
| { |
| rc = RunningZNodesGet( &nodes ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], RunningZNodesGet() failed!\n" |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_RUNZNODESWATCHSET_1, SQ_LOG_ERR, buf); |
| CLock::wakeOne(); |
| return; |
| } |
| |
| stringstream runningpath; |
| string runningznode; |
| |
| if ( nodes.count > 0 ) |
| { |
| for (int i = 0; i < nodes.count ; i++ ) |
| { |
| runningpath.str( "" ); |
| runningpath << runningZNodePath_.c_str() << "/" |
| << nodes.data[i]; |
| string runningznode = runningpath.str( ); |
| |
| rc = ZNodeWatchSet( runningznode ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], ZNodeWatchSet(%s) failed!\n" |
| , runningznode.c_str() |
| , method_name ); |
| mon_log_write(MON_ZCLIENT_RUNZNODESWATCHSET_2, SQ_LOG_ERR, buf); |
| |
| FreeStringVector( &nodes ); |
| TRACE_EXIT; |
| return; |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Watch set on monZnode=%s\n" |
| , method_name, __LINE__ |
| , runningznode.c_str() ); |
| } |
| } |
| } |
| FreeStringVector( &nodes ); |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Cluster watch already enabled!\n" |
| , method_name, __LINE__ ); |
| } |
| } |
| |
| TRACE_EXIT; |
| } |
| |
| int CZClient::ShutdownWork(void) |
| { |
| const char method_name[] = "CZClient::ShutdownWork"; |
| TRACE_ENTRY; |
| |
| // Set flag that tells the commAcceptor thread to exit |
| StateSet( ZC_SHUTDOWN ); |
| CLock::wakeOne(); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d waiting for ZClient thread %lx to exit.\n" |
| , method_name, __LINE__, threadId_); |
| } |
| |
| // Wait for commAcceptor thread to exit |
| int rc = pthread_join( threadId_, NULL ); |
| if (rc != 0) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| int err = rc; |
| sprintf(buf, "[%s], Error= Can't join thread! - errno=%d (%s)\n", method_name, err, strerror(err)); |
| mon_log_write(MON_ZCLIENT_SHUTDOWNWORK_1, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| // Create the ZClientThread |
| int CZClient::StartWork( void ) |
| { |
| const char method_name[] = "CZClient::StartWork"; |
| TRACE_ENTRY; |
| |
| int rc = pthread_create(&threadId_, NULL, ZClientThread, this); |
| if (rc != 0) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf(buf, sizeof(buf), "[%s], ZClientThread create error=%d\n", |
| method_name, rc); |
| mon_log_write(MON_ZCLIENT_STARTWORK_1, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| void CZClient::StartMonitoring( void ) |
| { |
| const char method_name[] = "CZClient::StartMonitoring"; |
| TRACE_ENTRY; |
| if (ZHandle) |
| { |
| ZClient->StateSet( CZClient::ZC_START ); |
| ZClient->CLock::wakeOne(); |
| } |
| TRACE_EXIT; |
| } |
| |
| void CZClient::StateSet( ZClientState_t state ) |
| { |
| CAutoLock lock(getLocker()); |
| |
| if ( StateGet() != ZC_SHUTDOWN ) |
| { |
| if (state == ZC_SHUTDOWN) |
| { |
| shutdown_ = true; |
| } |
| state_ = state; |
| } |
| } |
| |
| void CZClient::StateSet( int type, ZClientState_t state, const char *znodePath ) |
| { |
| const char method_name[] = "CZClient::StateSet"; |
| |
| CAutoLock lock(getLocker()); |
| if ( StateGet() != ZC_SHUTDOWN ) |
| { |
| StateSet( state ); |
| if ( type == ZOO_CHANGED_EVENT ) |
| { |
| znodeChangedQueue_.push_back( znodePath ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - state_=%s, " |
| "znodeChangedQueue_=%ld\n" |
| , method_name, __LINE__ |
| , ZClientStateStr(StateGet()) |
| , znodeChangedQueue_.size() ); |
| } |
| } |
| else if ( type == ZOO_CHILD_EVENT ) |
| { |
| znodeChildQueue_.push_back( znodePath ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - state_=%s, " |
| "znodeChildQueue_=%ld\n" |
| , method_name, __LINE__ |
| , ZClientStateStr(StateGet()) |
| , znodeChildQueue_.size() ); |
| } |
| } |
| else if ( type == ZOO_CREATED_EVENT ) |
| { |
| znodeCreatedQueue_.push_back( znodePath ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - state_=%s, " |
| "znodeCreatedQueue_=%ld\n" |
| , method_name, __LINE__ |
| , ZClientStateStr(StateGet()) |
| , znodeCreatedQueue_.size() ); |
| } |
| } |
| else if ( type == ZOO_DELETED_EVENT ) |
| { |
| znodeDeletedQueue_.push_back( znodePath ); |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d - state_=%s, " |
| "znodeDeletedQueue_=%ld\n" |
| , method_name, __LINE__ |
| , ZClientStateStr(StateGet()) |
| , znodeDeletedQueue_.size() ); |
| } |
| } |
| else |
| { |
| abort(); // Programmer bonehead! |
| } |
| } |
| } |
| |
| void CZClient::StopMonitoring( void ) |
| { |
| const char method_name[] = "CZClient::StopMonitoring"; |
| TRACE_ENTRY; |
| ZClient->StateSet( CZClient::ZC_STOP ); |
| ZClient->CLock::wakeOne(); |
| TRACE_EXIT; |
| } |
| |
| char* CZClient::StrCpyLeafZNode( char* znode, const char* znodePath ) |
| { |
| char pathStr[MAX_PROCESSOR_NAME] = { 0 }; |
| char *tkn = NULL; |
| char *tknStart = pathStr; |
| char *tknLast = NULL; |
| |
| strcpy( pathStr, znodePath ); |
| |
| tknStart++; // skip the first '/' |
| tkn = strtok( tknStart, "/" ); |
| do |
| { |
| tknLast = tkn; |
| tkn = strtok( NULL, "/" ); |
| } |
| while( tkn != NULL ); |
| |
| strcpy( znode, tknLast ); |
| |
| return( znode ); |
| } |
| |
| void CZClient::TimeToWakeUpSet( struct timespec &ts ) |
| { |
| const char method_name[] = "CZClient::TimeToWakeUpSet"; |
| TRACE_ENTRY; |
| |
| clock_gettime(CLOCK_REALTIME, &ts); |
| #if 0 |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d" " - Clock time %ld(secs):%ld(nsecs)(zcMonitoringRate_=%ld)\n" |
| , method_name, __LINE__ |
| , ts.tv_sec, ts.tv_nsec, zcMonitoringRate_); |
| } |
| #endif |
| |
| ts.tv_sec += zcMonitoringRate_; |
| |
| #if 0 |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d" " - Timeout time %ld(secs):%ld(nsecs)(zcMonitoringRate_=%ld)\n" |
| , method_name, __LINE__ |
| , ts.tv_sec, ts.tv_nsec, zcMonitoringRate_); |
| } |
| #endif |
| TRACE_EXIT; |
| } |
| |
| void CZClient::TriggerCheck( int type, const char *znodePath ) |
| { |
| const char method_name[] = "CZClient::TriggerCheck"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d" " - type=%s, path=%s\n" |
| , method_name, __LINE__ |
| , ZooConnectionTypeStr( type ) |
| , znodePath ); |
| } |
| |
| CAutoLock lock(getLocker()); |
| if ( StateGet() != ZC_SHUTDOWN ) |
| { |
| if ( type == ZOO_CHANGED_EVENT ) |
| { |
| StateSet( type, ZC_ZNODE_CHANGED, znodePath ); |
| } |
| else if ( type == ZOO_CHILD_EVENT ) |
| { |
| string znode; |
| znode.assign( znodePath ); |
| |
| if (configuredZNodePath_.compare( znode ) == 0) |
| { |
| // We are here due to a configured ZC_ZNODE_CHILD so reset the watch |
| ConfiguredZNodesWatchSet(); |
| } |
| else if (errorZNodePath_.compare( znode ) == 0) |
| { |
| // We are here due to an error ZC_ZNODE_CHILD so reset the watch |
| ErrorZNodesWatchSet(); |
| } |
| |
| StateSet( type, ZC_ZNODE_CHILD, znodePath ); |
| } |
| else if ( type == ZOO_CREATED_EVENT ) |
| { |
| StateSet( type, ZC_ZNODE_CREATED, znodePath ); |
| } |
| else if ( type == ZOO_DELETED_EVENT ) |
| { |
| StateSet( type, ZC_ZNODE_DELETED, znodePath ); |
| } |
| else if ( type == ZOO_NOTWATCHING_EVENT ) |
| { |
| StateSet( ZC_CLUSTER ); |
| } |
| |
| CLock::wakeOne(); |
| } |
| TRACE_EXIT; |
| } |
| |
| int CZClient::ZNodeCreate( const char *znodePath |
| , const char *znodeData |
| , int flags |
| , bool existOk ) |
| { |
| const char method_name[] = "CZClient::ZNodeCreate"; |
| TRACE_ENTRY; |
| |
| int rc = -1; |
| char realpath[1024] = { 0 }; |
| |
| stringstream ss; |
| ss.str( "" ); |
| ss << znodePath; |
| string zpath( ss.str( ) ); |
| |
| ss.str( "" ); |
| ss << ((znodeData) ? znodeData : ""); |
| string zdata( ss.str( ) ); |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d zoo_create (%s:%s)\n" |
| , method_name, __LINE__ |
| , zpath.c_str() |
| , zdata.c_str()); |
| } |
| rc = zoo_create( ZHandle |
| , zpath.c_str( ) |
| , zdata.length() ? zdata.c_str() : NULL |
| , zdata.length() ? zdata.length() : -1 |
| , &ZOO_OPEN_ACL_UNSAFE |
| , flags |
| , realpath |
| , sizeof(realpath)-1 ); |
| if ( rc != ZOK ) |
| { |
| if ( rc != ZNODEEXISTS || |
| (rc == ZNODEEXISTS && !existOk) ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_create(%s) failed with error %s\n" |
| , method_name |
| , zpath.c_str() |
| , zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODECREATE_1, SQ_LOG_ERR, buf); |
| } |
| } |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf("%s@%d realpath=%s\n", method_name, __LINE__, realpath); |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::ZNodeDataGet( string &monZnode, string &nodeName, int &pnid ) |
| { |
| const char method_name[] = "CZClient::ZNodeDataGet"; |
| TRACE_ENTRY; |
| |
| char pnidStr[8] = { 0 }; |
| char *tkn = NULL; |
| char zkData[MAX_PROCESSOR_NAME]; |
| int rc = -1; |
| int zkDataLen = sizeof(zkData); |
| Stat stat; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d monZnode=%s\n" |
| , method_name, __LINE__, monZnode.c_str() ); |
| } |
| rc = ZooExistRetry( ZHandle, monZnode.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE ) |
| { |
| // return the error |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d monZnode=%s does not exist (ZNONODE)\n" |
| , method_name, __LINE__, monZnode.c_str() ); |
| } |
| } |
| else if ( rc == ZOK ) |
| { |
| // Get the pnid from the data part of znode |
| rc = zoo_get( ZHandle, monZnode.c_str( ), false, zkData, &zkDataLen, &stat ); |
| if ( rc == ZOK ) |
| { |
| // The first token is the node name |
| tkn = strtok( zkData, ":" ); |
| if ( tkn != NULL ) |
| { |
| nodeName = tkn; |
| } |
| tkn = strtok( NULL, ":" ); |
| if ( tkn != NULL ) |
| { |
| strcpy( pnidStr, tkn ); |
| pnid = atoi( pnidStr ); |
| } |
| // TODO: Save monZnode path in corresponding physical node object |
| // to match with when ZC_NODE is triggered |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_get() for %s failed with error %s\n" |
| , method_name, monZnode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ZNODEDATAGET_1, SQ_LOG_ERR, buf); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists() for %s failed with error %s\n" |
| , method_name, monZnode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ZNODEDATAGET_2, SQ_LOG_ERR, buf); |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::ZNodeDelete( string &znode ) |
| { |
| const char method_name[] = "CZClient::ZNodeDelete"; |
| TRACE_ENTRY; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Deleting znode=%s\n" |
| , method_name, __LINE__ |
| , znode.c_str() ); |
| } |
| |
| int rc = -1; |
| rc = zoo_delete( ZHandle |
| , znode.c_str() |
| , -1 ); |
| if ( rc == ZOK || rc == ZNONODE) |
| { |
| if ( rc == ZNONODE) |
| { |
| // This is ok since we call it indiscriminately |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d znode=%s already deleted!\n" |
| , method_name, __LINE__ |
| , znode.c_str() ); |
| } |
| } |
| else |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d znode=%s deleted!\n" |
| , method_name, __LINE__ |
| , znode.c_str() ); |
| } |
| } |
| } |
| else if ( rc == ZCONNECTIONLOSS || |
| rc == ZOPERATIONTIMEOUT ) |
| { |
| rc = ZOK; |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], znode (%s) cannot be accessed!\n" |
| , method_name, znode.c_str() ); |
| mon_log_write(MON_ZCLIENT_ZNODEDELETE_1, SQ_LOG_INFO, buf); |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_delete(%s) failed with error %s\n" |
| , method_name, znode.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODEDELETE_1, SQ_LOG_CRIT, buf); |
| switch ( rc ) |
| { |
| case ZSYSTEMERROR: |
| case ZRUNTIMEINCONSISTENCY: |
| case ZDATAINCONSISTENCY: |
| case ZMARSHALLINGERROR: |
| case ZUNIMPLEMENTED: |
| case ZBADARGUMENTS: |
| case ZINVALIDSTATE: |
| case ZSESSIONEXPIRED: |
| case ZCLOSING: |
| // Treat these error like a session expiration, since |
| // we can't communicate with quorum servers |
| HandleMyNodeExpiration(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::ZNodeWatchReset( string &znode ) |
| { |
| const char method_name[] = "CZClient::ZNodeWatchReset"; |
| TRACE_ENTRY; |
| |
| char zkData[MAX_PROCESSOR_NAME]; |
| int rc = -1; |
| int zkDataLen = sizeof(zkData); |
| Stat stat; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d znode=%s\n" |
| , method_name, __LINE__, znode.c_str() ); |
| } |
| rc = ZooExistRetry( ZHandle, znode.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE || |
| rc == ZCONNECTIONLOSS || |
| rc == ZOPERATIONTIMEOUT ) |
| { |
| // return the error |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d znode=%s does not exist or " |
| "cannot be accessed!\n" |
| , method_name, __LINE__, znode.c_str() ); |
| } |
| } |
| else if ( rc == ZOK ) |
| { |
| // Reset a watch on monZode |
| int watch = 0; |
| rc = zoo_get( ZHandle, znode.c_str( ), watch, zkData, &zkDataLen, &stat ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_get() for %s failed with error %s\n" |
| , method_name, znode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ZNODEWATCHRESET_1, SQ_LOG_ERR, buf); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists() for %s failed with error %s\n" |
| , method_name, znode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ZNODEWATCHRESET_2, SQ_LOG_CRIT, buf); |
| switch ( rc ) |
| { |
| case ZSYSTEMERROR: |
| case ZRUNTIMEINCONSISTENCY: |
| case ZDATAINCONSISTENCY: |
| case ZMARSHALLINGERROR: |
| case ZUNIMPLEMENTED: |
| case ZBADARGUMENTS: |
| case ZINVALIDSTATE: |
| case ZSESSIONEXPIRED: |
| case ZCLOSING: |
| // Treat these error like a session expiration, since |
| // we can't communicate with quorum servers |
| HandleMyNodeExpiration(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::ZNodeWatchSet( string &znode ) |
| { |
| const char method_name[] = "CZClient::ZNodeWatchSet"; |
| TRACE_ENTRY; |
| |
| char zkData[MAX_PROCESSOR_NAME]; |
| int rc = -1; |
| int zkDataLen = sizeof(zkData); |
| Stat stat; |
| |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d znode=%s\n" |
| , method_name, __LINE__, znode.c_str() ); |
| } |
| rc = ZooExistRetry( ZHandle, znode.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE || |
| rc == ZCONNECTIONLOSS || |
| rc == ZOPERATIONTIMEOUT ) |
| { |
| // return the error |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d znode=%s does not exist or " |
| "cannot be accessed!\n" |
| , method_name, __LINE__, znode.c_str() ); |
| } |
| } |
| else if ( rc == ZOK ) |
| { |
| // Set a watch on monZode |
| int watch = 1; |
| rc = zoo_get( ZHandle, znode.c_str( ), watch, zkData, &zkDataLen, &stat ); |
| if ( rc != ZOK ) |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_get() for %s failed with error %s\n" |
| , method_name, znode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ZNODEWATCHSET_1, SQ_LOG_ERR, buf); |
| } |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists() for %s failed with error %s\n" |
| , method_name, znode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ZNODEWATCHSET_2, SQ_LOG_CRIT, buf); |
| switch ( rc ) |
| { |
| case ZSYSTEMERROR: |
| case ZRUNTIMEINCONSISTENCY: |
| case ZDATAINCONSISTENCY: |
| case ZMARSHALLINGERROR: |
| case ZUNIMPLEMENTED: |
| case ZBADARGUMENTS: |
| case ZINVALIDSTATE: |
| case ZSESSIONEXPIRED: |
| case ZCLOSING: |
| // Treat these error like a session expiration, since |
| // we can't communicate with quorum servers |
| HandleMyNodeExpiration(); |
| break; |
| default: |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::ZNodeWatchChildSet( string &parentznode ) |
| { |
| const char method_name[] = "CZClient::ZNodeWatchChildSet"; |
| TRACE_ENTRY; |
| |
| bool found = false; |
| int rc = -1; |
| int retries = 0; |
| int watch = 1; |
| Stat stat; |
| struct String_vector nodes; |
| |
| nodes.count = 0; |
| nodes.data = NULL; |
| |
| while ( !found ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d parentznode=%s\n" |
| , method_name, __LINE__, parentznode.c_str() ); |
| } |
| // Verify the existence of the parent |
| rc = ZooExistRetry( ZHandle, parentznode.c_str( ), 0, &stat ); |
| if ( rc == ZNONODE ) |
| { |
| if (retries > 10) |
| break; |
| retries++; |
| continue; |
| } |
| else if ( rc == ZOK ) |
| { |
| // Now get the list of available znodes in the cluster. |
| // |
| // This will return child znodes for each monitor process that has |
| // registered, including this process. |
| rc = zoo_get_children( ZHandle, parentznode.c_str( ), watch, &nodes ); |
| if ( rc == ZOK ) |
| { |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d nodes.count=%d\n" |
| , method_name, __LINE__ |
| , nodes.count ); |
| } |
| FreeStringVector( &nodes ); |
| found = true; |
| } |
| else |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_get_children(%s) failed with error %s\n" |
| , method_name, parentznode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ZNODEWATCHCHILDSET_1, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| else // error |
| { |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists() for %s failed with error %s\n" |
| , method_name, parentznode.c_str( ), zerror(rc)); |
| mon_log_write(MON_ZCLIENT_ZNODEWATCHCHILDSET_2, SQ_LOG_ERR, buf); |
| break; |
| } |
| } |
| |
| TRACE_EXIT; |
| return( rc ); |
| } |
| |
| int CZClient::ZNodesTreeCreate( void ) |
| { |
| const char method_name[] = "CZClient::ZNodesTreeCreate"; |
| TRACE_ENTRY; |
| |
| int rc; |
| Stat stat; |
| |
| stringstream ss; |
| ss.str( "" ); |
| ss << zkRootNode_.c_str(); |
| string rootDir( ss.str( ) ); |
| |
| rc = ZooExistRetry( ZHandle, rootDir.c_str(), 0, &stat ); |
| switch (rc) |
| { |
| case ZOK: |
| break; |
| case ZNONODE: |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , rootDir.c_str() ); |
| } |
| rc = ZNodeCreate( rootDir.c_str(), NULL, 0 ); |
| if ( rc && rc != ZNODEEXISTS ) |
| { |
| return(rc); // Return the error |
| } |
| rc = ZOK; |
| break; |
| default: |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, rootDir.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODESTREECREATE_1, SQ_LOG_ERR, buf); |
| if (rc) return(rc); // Return the error |
| break; |
| } |
| |
| ss.str( "" ); |
| ss << zkRootNode_.c_str() |
| << zkRootNodeInstance_.c_str(); |
| string instanceDir( ss.str( ) ); |
| |
| rc = ZooExistRetry( ZHandle, instanceDir.c_str( ), 0, &stat ); |
| switch (rc) |
| { |
| case ZOK: |
| break; |
| case ZNONODE: |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , instanceDir.c_str() ); |
| } |
| rc = ZNodeCreate( instanceDir.c_str(), NULL, 0 ); |
| if ( rc && rc != ZNODEEXISTS ) |
| { |
| return(rc); // Return the error |
| } |
| rc = ZOK; |
| break; |
| default: |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, instanceDir.c_str( ), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODESTREECREATE_2, SQ_LOG_ERR, buf); |
| break; |
| } |
| |
| ss.str( "" ); |
| ss << zkRootNode_.c_str() |
| << zkRootNodeInstance_.c_str() |
| << ZCLIENT_CLUSTER_ZNODE; |
| clusterZNodePath_ = ss.str(); |
| |
| rc = ZooExistRetry( ZHandle, clusterZNodePath_.c_str( ), 0, &stat ); |
| switch (rc) |
| { |
| case ZOK: |
| break; |
| case ZNONODE: |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , clusterZNodePath_.c_str() ); |
| } |
| rc = ZNodeCreate( clusterZNodePath_.c_str(), NULL, 0 ); |
| if ( rc && rc != ZNODEEXISTS ) |
| { |
| return(rc); // Return the error |
| } |
| rc = ZOK; |
| break; |
| default: |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, clusterZNodePath_.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODESTREECREATE_3, SQ_LOG_ERR, buf); |
| break; |
| } |
| |
| ss.str( "" ); |
| ss << zkRootNode_.c_str() |
| << zkRootNodeInstance_.c_str() |
| << ZCLIENT_CLUSTER_ZNODE |
| << ZCLIENT_CONFIGURED_ZNODE; |
| configuredZNodePath_ = ss.str(); |
| |
| rc = ZooExistRetry( ZHandle, configuredZNodePath_.c_str( ), 0, &stat ); |
| switch (rc) |
| { |
| case ZOK: |
| break; |
| case ZNONODE: |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , configuredZNodePath_.c_str() ); |
| } |
| rc = ZNodeCreate( configuredZNodePath_.c_str(), NULL, 0 ); |
| if ( rc && rc != ZNODEEXISTS ) |
| { |
| return(rc); // Return the error |
| } |
| rc = ZOK; |
| break; |
| default: |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, configuredZNodePath_.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODESTREECREATE_4, SQ_LOG_ERR, buf); |
| break; |
| } |
| |
| ss.str( "" ); |
| ss << zkRootNode_.c_str() |
| << zkRootNodeInstance_.c_str() |
| << ZCLIENT_CLUSTER_ZNODE |
| << ZCLIENT_ERROR_ZNODE; |
| errorZNodePath_ = ss.str(); |
| |
| rc = ZooExistRetry( ZHandle, errorZNodePath_.c_str( ), 0, &stat ); |
| switch (rc) |
| { |
| case ZOK: |
| break; |
| case ZNONODE: |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , errorZNodePath_.c_str() ); |
| } |
| rc = ZNodeCreate( errorZNodePath_.c_str(), NULL, 0 ); |
| if ( rc && rc != ZNODEEXISTS ) |
| { |
| return(rc); // Return the error |
| } |
| rc = ZOK; |
| break; |
| default: |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, errorZNodePath_.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODESTREECREATE_6, SQ_LOG_ERR, buf); |
| break; |
| } |
| |
| ss.str( "" ); |
| ss << zkRootNode_.c_str() |
| << zkRootNodeInstance_.c_str() |
| << ZCLIENT_CLUSTER_ZNODE |
| << ZCLIENT_RUNNING_ZNODE; |
| runningZNodePath_ = ss.str(); |
| |
| rc = ZooExistRetry( ZHandle, runningZNodePath_.c_str( ), 0, &stat ); |
| switch (rc) |
| { |
| case ZOK: |
| break; |
| case ZNONODE: |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , runningZNodePath_.c_str() ); |
| } |
| rc = ZNodeCreate( runningZNodePath_.c_str(), NULL, 0 ); |
| if ( rc && rc != ZNODEEXISTS ) |
| { |
| return(rc); // Return the error |
| } |
| rc = ZOK; |
| break; |
| default: |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, runningZNodePath_.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODESTREECREATE_5, SQ_LOG_ERR, buf); |
| break; |
| } |
| |
| ss.str( "" ); |
| ss << zkRootNode_.c_str() |
| << zkRootNodeInstance_.c_str() |
| << ZCLIENT_MONITOR_ZNODE; |
| monitorZNodePath_ = ss.str(); |
| |
| rc = ZooExistRetry( ZHandle, monitorZNodePath_.c_str( ), 0, &stat ); |
| switch (rc) |
| { |
| case ZOK: |
| break; |
| case ZNONODE: |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , monitorZNodePath_.c_str() ); |
| } |
| rc = ZNodeCreate( monitorZNodePath_.c_str(), NULL, 0 ); |
| if ( rc && rc != ZNODEEXISTS ) |
| { |
| return(rc); // Return the error |
| } |
| rc = ZOK; |
| break; |
| default: |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, monitorZNodePath_.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODESTREECREATE_7, SQ_LOG_ERR, buf); |
| break; |
| } |
| |
| ss.str( "" ); |
| ss << zkRootNode_.c_str() |
| << zkRootNodeInstance_.c_str() |
| << ZCLIENT_MONITOR_ZNODE |
| << ZCLIENT_MASTER_ZNODE; |
| string masterDir( ss.str( ) ); |
| masterZNodePath_ = ss.str(); |
| |
| rc = ZooExistRetry( ZHandle, masterZNodePath_.c_str( ), 0, &stat ); |
| switch (rc) |
| { |
| case ZOK: |
| break; |
| case ZNONODE: |
| if (trace_settings & (TRACE_INIT | TRACE_RECOVERY)) |
| { |
| trace_printf( "%s@%d Invoking ZNodeCreate(%s)\n" |
| , method_name, __LINE__ |
| , masterZNodePath_.c_str() ); |
| } |
| rc = ZNodeCreate( masterZNodePath_.c_str(), NULL, 0 ); |
| if ( rc && rc != ZNODEEXISTS ) |
| { |
| return(rc); // Return the error |
| } |
| rc = ZOK; |
| break; |
| default: |
| char buf[MON_STRING_BUF_SIZE]; |
| snprintf( buf, sizeof(buf) |
| , "[%s], zoo_exists(%s) failed with error %s\n" |
| , method_name, masterZNodePath_.c_str(), zerror(rc) ); |
| mon_log_write(MON_ZCLIENT_ZNODESTREECREATE_8, SQ_LOG_ERR, buf); |
| break; |
| } |
| |
| TRACE_EXIT; |
| return(rc); |
| } |
| |
| int CZClient::ZooExistRetry(zhandle_t *zh, const char *path, int watch, struct Stat *stat) |
| { |
| int retries = 0; |
| int rc; |
| rc = zoo_exists(zh, path, watch, stat); |
| |
| // retry when loss zconnection or timeout, this may be caused by one zookeeper server down |
| while ( (rc == ZCONNECTIONLOSS |
| || rc == ZOPERATIONTIMEOUT |
| || rc == ZSESSIONMOVED) |
| && retries < ZOOKEEPER_RETRY_COUNT) |
| { |
| sleep(ZOOKEEPER_RETRY_WAIT); |
| retries++; |
| rc = zoo_exists(zh, path, watch, stat); |
| } |
| return rc; |
| } |
| |