| package org.apache.helix.manager.zk; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.lang.management.ManagementFactory; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Timer; |
| import java.util.concurrent.TimeUnit; |
| |
| import org.I0Itec.zkclient.ZkConnection; |
| import org.apache.helix.BaseDataAccessor; |
| import org.apache.helix.ClusterMessagingService; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.ConfigChangeListener; |
| import org.apache.helix.AccessOption; |
| import org.apache.helix.ControllerChangeListener; |
| import org.apache.helix.CurrentStateChangeListener; |
| import org.apache.helix.ExternalViewChangeListener; |
| import org.apache.helix.HealthStateChangeListener; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixConstants.ChangeType; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.HelixTimerTask; |
| import org.apache.helix.IdealStateChangeListener; |
| import org.apache.helix.InstanceConfigChangeListener; |
| import org.apache.helix.InstanceType; |
| import org.apache.helix.LiveInstanceChangeListener; |
| import org.apache.helix.LiveInstanceInfoProvider; |
| import org.apache.helix.MessageListener; |
| import org.apache.helix.PreConnectCallback; |
| import org.apache.helix.HelixManagerProperties; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.PropertyKey.Builder; |
| import org.apache.helix.PropertyPathConfig; |
| import org.apache.helix.PropertyType; |
| import org.apache.helix.ScopedConfigChangeListener; |
| import org.apache.helix.ZNRecord; |
| import org.apache.helix.controller.restlet.ZKPropertyTransferServer; |
| import org.apache.helix.healthcheck.HealthStatsAggregationTask; |
| import org.apache.helix.healthcheck.ParticipantHealthReportCollector; |
| import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl; |
| import org.apache.helix.messaging.DefaultMessagingService; |
| import org.apache.helix.messaging.handling.MessageHandlerFactory; |
| import org.apache.helix.model.ConfigScope; |
| import org.apache.helix.model.CurrentState; |
| import org.apache.helix.model.HelixConfigScope; |
| import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.LiveInstance; |
| import org.apache.helix.model.Message.MessageType; |
| import org.apache.helix.model.builder.ConfigScopeBuilder; |
| import org.apache.helix.model.builder.HelixConfigScopeBuilder; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.monitoring.ZKPathDataDumpTask; |
| import org.apache.helix.participant.DistClusterControllerElection; |
| import org.apache.helix.participant.HelixStateMachineEngine; |
| import org.apache.helix.participant.StateMachineEngine; |
| import org.apache.helix.participant.statemachine.ScheduledTaskStateModelFactory; |
| import org.apache.helix.store.zk.ZkHelixPropertyStore; |
| import org.apache.log4j.Logger; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.Watcher.Event.KeeperState; |
| |
| |
| public class ZKHelixManager implements HelixManager |
| { |
| private static Logger logger = |
| Logger.getLogger(ZKHelixManager.class); |
| private static final int RETRY_LIMIT = 3; |
| private static final int CONNECTIONTIMEOUT = 60 * 1000; |
| private final String _clusterName; |
| private final String _instanceName; |
| private final String _zkConnectString; |
| private static final int DEFAULT_SESSION_TIMEOUT = 30 * 1000; |
| private ZKHelixDataAccessor _helixAccessor; |
| private ConfigAccessor _configAccessor; |
| protected ZkClient _zkClient; |
| protected final List<CallbackHandler> _handlers = new ArrayList<CallbackHandler>(); |
| private final ZkStateChangeListener _zkStateChangeListener; |
| private final InstanceType _instanceType; |
| volatile String _sessionId; |
| private Timer _timer; |
| private CallbackHandler _leaderElectionHandler; |
| private ParticipantHealthReportCollectorImpl _participantHealthCheckInfoCollector; |
| private final DefaultMessagingService _messagingService; |
| private ZKHelixAdmin _managementTool; |
| private final String _version; |
| private final HelixManagerProperties _properties; |
| private final StateMachineEngine _stateMachEngine; |
| private int _sessionTimeout; |
| private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore; |
| private final List<HelixTimerTask> _controllerTimerTasks; |
| private BaseDataAccessor<ZNRecord> _baseDataAccessor; |
| List<PreConnectCallback> _preConnectCallbacks = |
| new LinkedList<PreConnectCallback>(); |
| ZKPropertyTransferServer _transferServer = null; |
| int _flappingTimeWindowMs; |
| int _maxDisconnectThreshold; |
| public static final int FLAPPING_TIME_WINDIOW = 300000; // Default to 300 sec |
| public static final int MAX_DISCONNECT_THRESHOLD = 5; |
| LiveInstanceInfoProvider _liveInstanceInfoProvider = null; |
| public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin"; |
| |
| public ZKHelixManager(String clusterName, |
| String instanceName, |
| InstanceType instanceType, |
| String zkConnectString) |
| { |
| logger.info("Create a zk-based cluster manager. clusterName:" + clusterName |
| + ", instanceName:" + instanceName + ", type:" + instanceType + ", zkSvr:" |
| + zkConnectString); |
| _flappingTimeWindowMs = FLAPPING_TIME_WINDIOW; |
| try |
| { |
| _flappingTimeWindowMs = |
| Integer.parseInt(System.getProperty("helixmanager.flappingTimeWindow", "" |
| + FLAPPING_TIME_WINDIOW)); |
| } |
| catch (NumberFormatException e) |
| { |
| logger.warn("Exception while parsing helixmanager.flappingTimeWindow: " |
| + System.getProperty("helixmanager.flappingTimeWindow", "" + FLAPPING_TIME_WINDIOW)); |
| } |
| _maxDisconnectThreshold = MAX_DISCONNECT_THRESHOLD; |
| try |
| { |
| _maxDisconnectThreshold = |
| Integer.parseInt(System.getProperty("helixmanager.maxDisconnectThreshold", "" |
| + MAX_DISCONNECT_THRESHOLD)); |
| } |
| catch (NumberFormatException e) |
| { |
| logger.warn("Exception while parsing helixmanager.maxDisconnectThreshold: " |
| + System.getProperty("helixmanager.maxDisconnectThreshold", "" + MAX_DISCONNECT_THRESHOLD)); |
| } |
| int sessionTimeoutInt = -1; |
| try |
| { |
| sessionTimeoutInt = |
| Integer.parseInt(System.getProperty("zk.session.timeout", "" |
| + DEFAULT_SESSION_TIMEOUT)); |
| } |
| catch (NumberFormatException e) |
| { |
| logger.warn("Exception while parsing session timeout: " |
| + System.getProperty("zk.session.timeout", "" + DEFAULT_SESSION_TIMEOUT)); |
| } |
| if (sessionTimeoutInt > 0) |
| { |
| _sessionTimeout = sessionTimeoutInt; |
| } |
| else |
| { |
| _sessionTimeout = DEFAULT_SESSION_TIMEOUT; |
| } |
| if (instanceName == null) |
| { |
| try |
| { |
| instanceName = |
| InetAddress.getLocalHost().getCanonicalHostName() + "-" |
| + instanceType.toString(); |
| } |
| catch (UnknownHostException e) |
| { |
| // can ignore it |
| logger.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", |
| e); |
| instanceName = "UNKNOWN"; |
| } |
| } |
| |
| _clusterName = clusterName; |
| _instanceName = instanceName; |
| _instanceType = instanceType; |
| _zkConnectString = zkConnectString; |
| _zkStateChangeListener = new ZkStateChangeListener(this, _flappingTimeWindowMs, _maxDisconnectThreshold); |
| _timer = null; |
| |
| _messagingService = new DefaultMessagingService(this); |
| |
| _properties = |
| new HelixManagerProperties("cluster-manager-version.properties"); |
| _version = _properties.getVersion(); |
| |
| _stateMachEngine = new HelixStateMachineEngine(this); |
| |
| // add all timer tasks |
| _controllerTimerTasks = new ArrayList<HelixTimerTask>(); |
| if (_instanceType == InstanceType.CONTROLLER) |
| { |
| _controllerTimerTasks.add(new HealthStatsAggregationTask(this)); |
| } |
| } |
| |
| @Override |
| public boolean removeListener(PropertyKey key, Object listener) |
| { |
| logger.info("Removing listener: " + listener + " on path: " + key.getPath() |
| + " from cluster: " + _clusterName + " by instance: " + _instanceName); |
| |
| synchronized (this) |
| { |
| List<CallbackHandler> toRemove = new ArrayList<CallbackHandler>(); |
| for (CallbackHandler handler : _handlers) |
| { |
| // compare property-key path and listener reference |
| if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) |
| { |
| toRemove.add(handler); |
| } |
| } |
| |
| _handlers.removeAll(toRemove); |
| |
| // handler.reset() may modify the handlers list, so do it outside the iteration |
| for (CallbackHandler handler : toRemove) { |
| handler.reset(); |
| } |
| } |
| |
| return true; |
| } |
| |
| private void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, EventType[] eventType) |
| { |
| checkConnected(); |
| |
| PropertyType type = propertyKey.getType(); |
| |
| synchronized (this) |
| { |
| for (CallbackHandler handler : _handlers) |
| { |
| // compare property-key path and listener reference |
| if (handler.getPath().equals(propertyKey.getPath()) && handler.getListener().equals(listener)) |
| { |
| logger.info("Listener: " + listener + " on path: " + propertyKey.getPath() + " already exists. skip adding it"); |
| return; |
| } |
| } |
| |
| CallbackHandler newHandler = |
| createCallBackHandler(propertyKey, listener, eventType, changeType); |
| _handlers.add(newHandler); |
| logger.info("Add listener: " + listener + " for type: " + type + " to path: " + newHandler.getPath()); |
| } |
| } |
| |
| @Override |
| public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception |
| { |
| addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE, |
| new EventType[] { EventType.NodeDataChanged, EventType.NodeDeleted, EventType.NodeCreated }); |
| } |
| |
| @Override |
| public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception |
| { |
| addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE, |
| new EventType[] { EventType.NodeDataChanged, EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated }); |
| } |
| |
| @Override |
| public void addConfigChangeListener(ConfigChangeListener listener) |
| { |
| addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener) |
| { |
| addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope) |
| { |
| Builder keyBuilder = new Builder(_clusterName); |
| |
| PropertyKey propertyKey = null; |
| switch(scope) |
| { |
| case CLUSTER: |
| propertyKey = keyBuilder.clusterConfigs(); |
| break; |
| case PARTICIPANT: |
| propertyKey = keyBuilder.instanceConfigs(); |
| break; |
| case RESOURCE: |
| propertyKey = keyBuilder.resourceConfigs(); |
| break; |
| default: |
| break; |
| } |
| |
| if (propertyKey != null) |
| { |
| addListener(listener, propertyKey, ChangeType.CONFIG, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } else |
| { |
| logger.error("Can't add listener to config scope: " + scope); |
| } |
| } |
| |
| // TODO: Decide if do we still need this since we are exposing |
| // ClusterMessagingService |
| @Override |
| public void addMessageListener(MessageListener listener, String instanceName) |
| { |
| addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE, |
| new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated }); |
| } |
| |
| void addControllerMessageListener(MessageListener listener) |
| { |
| addListener(listener, new Builder(_clusterName).controllerMessages(), ChangeType.MESSAGES_CONTROLLER, |
| new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated }); |
| } |
| |
| @Override |
| public void addCurrentStateChangeListener(CurrentStateChangeListener listener, |
| String instanceName, |
| String sessionId) |
| { |
| addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId), ChangeType.CURRENT_STATE, |
| new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated }); |
| } |
| |
| @Override |
| public void addHealthStateChangeListener(HealthStateChangeListener listener, |
| String instanceName) |
| { |
| addListener(listener, new Builder(_clusterName).healthReports(instanceName), ChangeType.HEALTH, |
| new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated }); |
| } |
| |
| @Override |
| public void addExternalViewChangeListener(ExternalViewChangeListener listener) |
| { |
| addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW, |
| new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated }); |
| } |
| |
| @Override |
| public void addControllerListener(ControllerChangeListener listener) |
| { |
| addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER, |
| new EventType[] { EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated }); |
| } |
| |
| @Override |
| public HelixDataAccessor getHelixDataAccessor() |
| { |
| checkConnected(); |
| return _helixAccessor; |
| } |
| |
| @Override |
| public ConfigAccessor getConfigAccessor() |
| { |
| checkConnected(); |
| return _configAccessor; |
| } |
| |
| @Override |
| public String getClusterName() |
| { |
| return _clusterName; |
| } |
| |
| @Override |
| public String getInstanceName() |
| { |
| return _instanceName; |
| } |
| |
| @Override |
| public void connect() throws Exception |
| { |
| logger.info("ClusterManager.connect()"); |
| if (_zkStateChangeListener.isConnected()) |
| { |
| logger.warn("Cluster manager " + _clusterName + " " + _instanceName |
| + " already connected"); |
| return; |
| } |
| |
| try |
| { |
| createClient(_zkConnectString); |
| _messagingService.onConnected(); |
| } |
| catch (Exception e) |
| { |
| logger.error(e); |
| disconnect(); |
| throw e; |
| } |
| } |
| |
| @Override |
| public void disconnect() |
| { |
| if (!isConnected()) |
| { |
| logger.error("ClusterManager " + _instanceName + " already disconnected"); |
| return; |
| } |
| disconnectInternal(); |
| } |
| |
| void disconnectInternal() |
| { |
| // This function can be called when the connection are in bad state(e.g. flapping), |
| // in which isConnected() could be false and we want to disconnect from cluster. |
| logger.info("disconnect " + _instanceName + "(" + _instanceType + ") from " |
| + _clusterName); |
| |
| /** |
| * shutdown thread pool first to avoid reset() being invoked in the middle of state |
| * transition |
| */ |
| _messagingService.getExecutor().shutdown(); |
| resetHandlers(); |
| |
| _helixAccessor.shutdown(); |
| |
| if (_leaderElectionHandler != null) |
| { |
| _leaderElectionHandler.reset(); |
| } |
| |
| if (_participantHealthCheckInfoCollector != null) |
| { |
| _participantHealthCheckInfoCollector.stop(); |
| } |
| |
| if (_timer != null) |
| { |
| _timer.cancel(); |
| _timer = null; |
| } |
| |
| if (_instanceType == InstanceType.CONTROLLER) |
| { |
| stopTimerTasks(); |
| } |
| |
| // unsubscribe accessor from controllerChange |
| _zkClient.unsubscribeAll(); |
| |
| _zkClient.close(); |
| |
| // HACK seems that zkClient is not sending DISCONNECT event |
| _zkStateChangeListener.disconnect(); |
| logger.info("Cluster manager: " + _instanceName + " disconnected"); |
| |
| } |
| |
| @Override |
| public String getSessionId() |
| { |
| checkConnected(); |
| return _sessionId; |
| } |
| |
| @Override |
| public boolean isConnected() |
| { |
| return _zkStateChangeListener.isConnected(); |
| } |
| |
| @Override |
| public long getLastNotificationTime() |
| { |
| return -1; |
| } |
| |
| private void addLiveInstance() |
| { |
| LiveInstance liveInstance = new LiveInstance(_instanceName); |
| liveInstance.setSessionId(_sessionId); |
| liveInstance.setHelixVersion(_version); |
| liveInstance.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName()); |
| |
| if(_liveInstanceInfoProvider != null) |
| { |
| logger.info("invoking _liveInstanceInfoProvider"); |
| ZNRecord additionalLiveInstanceInfo = _liveInstanceInfoProvider.getAdditionalLiveInstanceInfo(); |
| if(additionalLiveInstanceInfo != null) |
| { |
| additionalLiveInstanceInfo.merge(liveInstance.getRecord()); |
| ZNRecord mergedLiveInstance = new ZNRecord(additionalLiveInstanceInfo, _instanceName); |
| liveInstance = new LiveInstance(mergedLiveInstance); |
| logger.info("liveInstance content :" + _instanceName + " " + liveInstance.toString()); |
| } |
| } |
| |
| logger.info("Add live instance: InstanceName: " + _instanceName + " Session id:" |
| + _sessionId); |
| Builder keyBuilder = _helixAccessor.keyBuilder(); |
| if (!_helixAccessor.createProperty(keyBuilder.liveInstance(_instanceName), |
| liveInstance)) |
| { |
| String errorMsg = |
| "Fail to create live instance node after waiting, so quit. instance:" |
| + _instanceName; |
| logger.warn(errorMsg); |
| throw new HelixException(errorMsg); |
| |
| } |
| String currentStatePathParent = |
| PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, |
| _clusterName, |
| _instanceName, |
| getSessionId()); |
| |
| if (!_zkClient.exists(currentStatePathParent)) |
| { |
| _zkClient.createPersistent(currentStatePathParent); |
| logger.info("Creating current state path " + currentStatePathParent); |
| } |
| } |
| |
| private void startStatusUpdatedumpTask() |
| { |
| long initialDelay = 30 * 60 * 1000; |
| long period = 120 * 60 * 1000; |
| int timeThresholdNoChange = 180 * 60 * 1000; |
| |
| if (_timer == null) |
| { |
| _timer = new Timer(true); |
| _timer.scheduleAtFixedRate(new ZKPathDataDumpTask(this, |
| _zkClient, |
| timeThresholdNoChange), |
| initialDelay, |
| period); |
| } |
| } |
| |
| private void createClient(String zkServers) throws Exception |
| { |
| String propertyStorePath = |
| PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName); |
| |
| // by default use ZNRecordStreamingSerializer except for paths within the property |
| // store which expects raw byte[] serialization/deserialization |
| PathBasedZkSerializer zkSerializer = |
| ChainedPathZkSerializer.builder(new ZNRecordStreamingSerializer()) |
| .serialize(propertyStorePath, new ByteArraySerializer()) |
| .build(); |
| |
| _zkClient = new ZkClient(zkServers, _sessionTimeout, CONNECTIONTIMEOUT, zkSerializer); |
| |
| ZkBaseDataAccessor<ZNRecord> baseDataAccessor = |
| new ZkBaseDataAccessor<ZNRecord>(_zkClient); |
| if (_instanceType == InstanceType.PARTICIPANT) |
| { |
| String curStatePath = |
| PropertyPathConfig.getPath(PropertyType.CURRENTSTATES, |
| _clusterName, |
| _instanceName); |
| _baseDataAccessor = |
| new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, |
| Arrays.asList(curStatePath)); |
| } |
| else if (_instanceType == InstanceType.CONTROLLER) |
| { |
| String extViewPath = PropertyPathConfig.getPath(PropertyType.EXTERNALVIEW, |
| |
| _clusterName); |
| _baseDataAccessor = |
| new ZkCacheBaseDataAccessor<ZNRecord>(baseDataAccessor, |
| Arrays.asList(extViewPath)); |
| |
| } |
| else |
| { |
| _baseDataAccessor = baseDataAccessor; |
| } |
| |
| _helixAccessor = |
| new ZKHelixDataAccessor(_clusterName, _instanceType, _baseDataAccessor); |
| _configAccessor = new ConfigAccessor(_zkClient); |
| int retryCount = 0; |
| |
| _zkClient.subscribeStateChanges(_zkStateChangeListener); |
| while (retryCount < RETRY_LIMIT) |
| { |
| try |
| { |
| _zkClient.waitUntilConnected(_sessionTimeout, TimeUnit.MILLISECONDS); |
| _zkStateChangeListener.handleStateChanged(KeeperState.SyncConnected); |
| _zkStateChangeListener.handleNewSession(); |
| break; |
| } |
| catch (HelixException e) |
| { |
| logger.error("fail to createClient.", e); |
| throw e; |
| } |
| catch (Exception e) |
| { |
| retryCount++; |
| |
| logger.error("fail to createClient. retry " + retryCount, e); |
| if (retryCount == RETRY_LIMIT) |
| { |
| throw e; |
| } |
| } |
| } |
| } |
| |
| private CallbackHandler createCallBackHandler(PropertyKey propertyKey, |
| Object listener, |
| EventType[] eventTypes, |
| ChangeType changeType) |
| { |
| if (listener == null) |
| { |
| throw new HelixException("Listener cannot be null"); |
| } |
| return new CallbackHandler(this, _zkClient, propertyKey, listener, eventTypes, changeType); |
| } |
| |
| /** |
| * This will be invoked when ever a new session is created<br/> |
| * |
| * case 1: the cluster manager was a participant carry over current state, add live |
| * instance, and invoke message listener; case 2: the cluster manager was controller and |
| * was a leader before do leader election, and if it becomes leader again, invoke ideal |
| * state listener, current state listener, etc. if it fails to become leader in the new |
| * session, then becomes standby; case 3: the cluster manager was controller and was NOT |
| * a leader before do leader election, and if it becomes leader, instantiate and invoke |
| * ideal state listener, current state listener, etc. if if fails to become leader in |
| * the new session, stay as standby |
| */ |
| |
| protected void handleNewSession() |
| { |
| boolean isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS); |
| while (!isConnected) |
| { |
| logger.error("Could NOT connect to zk server in " + CONNECTIONTIMEOUT + "ms. zkServer: " |
| + _zkConnectString + ", expiredSessionId: " + _sessionId + ", clusterName: " |
| + _clusterName); |
| isConnected = _zkClient.waitUntilConnected(CONNECTIONTIMEOUT, TimeUnit.MILLISECONDS); |
| } |
| |
| ZkConnection zkConnection = ((ZkConnection) _zkClient.getConnection()); |
| |
| synchronized (this) |
| { |
| _sessionId = Long.toHexString(zkConnection.getZookeeper().getSessionId()); |
| } |
| _baseDataAccessor.reset(); |
| |
| // reset all handlers so they have a chance to unsubscribe zk changes from zkclient |
| // abandon all callback-handlers added in expired session |
| resetHandlers(); |
| |
| logger.info("Handling new session, session id:" + _sessionId + ", instance:" |
| + _instanceName + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName); |
| |
| logger.info(zkConnection.getZookeeper()); |
| |
| if (!ZKUtil.isClusterSetup(_clusterName, _zkClient)) |
| { |
| throw new HelixException("Initial cluster structure is not set up for cluster:" |
| + _clusterName); |
| } |
| // Read cluster config and see if instance can auto join the cluster |
| boolean autoJoin = false; |
| try |
| { |
| HelixConfigScope scope = |
| new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(getClusterName()) |
| .build(); |
| autoJoin = Boolean.parseBoolean(getConfigAccessor().get(scope, ALLOW_PARTICIPANT_AUTO_JOIN)); |
| logger.info("Auto joining " + _clusterName +" is true"); |
| } |
| catch(Exception e) |
| { |
| } |
| if (!ZKUtil.isInstanceSetup(_zkClient, _clusterName, _instanceName, _instanceType)) |
| { |
| if(!autoJoin) |
| { |
| throw new HelixException("Initial cluster structure is not set up for instance:" |
| + _instanceName + " instanceType:" + _instanceType); |
| } |
| else |
| { |
| logger.info("Auto joining instance " + _instanceName); |
| InstanceConfig instanceConfig = new InstanceConfig(_instanceName); |
| String hostName = _instanceName; |
| String port = ""; |
| int lastPos = _instanceName.lastIndexOf("_"); |
| if (lastPos > 0) |
| { |
| hostName = _instanceName.substring(0, lastPos); |
| port = _instanceName.substring(lastPos + 1); |
| } |
| instanceConfig.setHostName(hostName); |
| instanceConfig.setPort(port); |
| instanceConfig.setInstanceEnabled(true); |
| getClusterManagmentTool().addInstance(_clusterName, instanceConfig); |
| } |
| } |
| |
| if (_instanceType == InstanceType.PARTICIPANT |
| || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) |
| { |
| handleNewSessionAsParticipant(); |
| } |
| |
| if (_instanceType == InstanceType.CONTROLLER |
| || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) |
| { |
| addControllerMessageListener(_messagingService.getExecutor()); |
| MessageHandlerFactory defaultControllerMsgHandlerFactory = |
| new DefaultControllerMessageHandlerFactory(); |
| _messagingService.getExecutor() |
| .registerMessageHandlerFactory(defaultControllerMsgHandlerFactory.getMessageType(), |
| defaultControllerMsgHandlerFactory); |
| MessageHandlerFactory defaultSchedulerMsgHandlerFactory = |
| new DefaultSchedulerMessageHandlerFactory(this); |
| _messagingService.getExecutor() |
| .registerMessageHandlerFactory(defaultSchedulerMsgHandlerFactory.getMessageType(), |
| defaultSchedulerMsgHandlerFactory); |
| MessageHandlerFactory defaultParticipantErrorMessageHandlerFactory = |
| new DefaultParticipantErrorMessageHandlerFactory(this); |
| _messagingService.getExecutor() |
| .registerMessageHandlerFactory(defaultParticipantErrorMessageHandlerFactory.getMessageType(), |
| defaultParticipantErrorMessageHandlerFactory); |
| |
| if (_leaderElectionHandler != null) { |
| _leaderElectionHandler.reset(); |
| _leaderElectionHandler.init(); |
| } else { |
| _leaderElectionHandler = |
| createCallBackHandler(new Builder(_clusterName).controller(), |
| new DistClusterControllerElection(_zkConnectString), |
| new EventType[] { EventType.NodeChildrenChanged, |
| EventType.NodeDeleted, EventType.NodeCreated }, |
| ChangeType.CONTROLLER); |
| } |
| } |
| |
| if (_instanceType == InstanceType.PARTICIPANT |
| || _instanceType == InstanceType.CONTROLLER_PARTICIPANT |
| || (_instanceType == InstanceType.CONTROLLER && isLeader())) |
| { |
| initHandlers(); |
| } |
| } |
| |
| private void handleNewSessionAsParticipant() |
| { |
| // In case there is a live instance record on zookeeper |
| Builder keyBuilder = _helixAccessor.keyBuilder(); |
| |
| if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null) |
| { |
| logger.warn("Found another instance with same instanceName: " + _instanceName |
| + " in cluster " + _clusterName); |
| // Wait for a while, in case previous storage node exits unexpectedly |
| // and its liveinstance |
| // still hangs around until session timeout happens |
| try |
| { |
| Thread.sleep(_sessionTimeout + 5000); |
| } |
| catch (InterruptedException e) |
| { |
| logger.warn("Sleep interrupted while waiting for previous liveinstance to go away.", |
| e); |
| } |
| |
| if (_helixAccessor.getProperty(keyBuilder.liveInstance(_instanceName)) != null) |
| { |
| String errorMessage = |
| "instance " + _instanceName + " already has a liveinstance in cluster " |
| + _clusterName; |
| logger.error(errorMessage); |
| throw new HelixException(errorMessage); |
| } |
| } |
| // Invoke the PreConnectCallbacks |
| for (PreConnectCallback callback : _preConnectCallbacks) |
| { |
| callback.onPreConnect(); |
| } |
| addLiveInstance(); |
| carryOverPreviousCurrentState(); |
| |
| // In case the cluster manager is running as a participant, setup message |
| // listener |
| _messagingService.registerMessageHandlerFactory(MessageType.STATE_TRANSITION.toString(), |
| _stateMachEngine); |
| addMessageListener(_messagingService.getExecutor(), _instanceName); |
| addControllerListener(_helixAccessor); |
| |
| ScheduledTaskStateModelFactory stStateModelFactory = new ScheduledTaskStateModelFactory(_messagingService.getExecutor()); |
| _stateMachEngine.registerStateModelFactory(DefaultSchedulerMessageHandlerFactory.SCHEDULER_TASK_QUEUE, stStateModelFactory); |
| |
| if (_participantHealthCheckInfoCollector == null) |
| { |
| _participantHealthCheckInfoCollector = |
| new ParticipantHealthReportCollectorImpl(this, _instanceName); |
| _participantHealthCheckInfoCollector.start(); |
| } |
| // start the participant health check timer, also create zk path for health |
| // check info |
| String healthCheckInfoPath = |
| _helixAccessor.keyBuilder().healthReports(_instanceName).getPath(); |
| if (!_zkClient.exists(healthCheckInfoPath)) |
| { |
| _zkClient.createPersistent(healthCheckInfoPath, true); |
| logger.info("Creating healthcheck info path " + healthCheckInfoPath); |
| } |
| } |
| |
| @Override |
| public void addPreConnectCallback(PreConnectCallback callback) |
| { |
| logger.info("Adding preconnect callback"); |
| _preConnectCallbacks.add(callback); |
| } |
| |
| private void resetHandlers() |
| { |
| synchronized (this) |
| { |
| if (_handlers != null) |
| { |
| // get a copy of the list and iterate over the copy list |
| // in case handler.reset() will modify the original handler list |
| List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>(); |
| tmpHandlers.addAll(_handlers); |
| |
| for (CallbackHandler handler : tmpHandlers) |
| { |
| handler.reset(); |
| logger.info("reset handler: " + handler.getPath() + ", " + handler.getListener()); |
| } |
| } |
| } |
| } |
| |
| private void initHandlers() |
| { |
| synchronized (this) |
| { |
| if (_handlers != null) |
| { |
| // may add new currentState and message listeners during init() |
| // so make a copy and iterate over the copy |
| List<CallbackHandler> tmpHandlers = new ArrayList<CallbackHandler>(); |
| tmpHandlers.addAll(_handlers); |
| for (CallbackHandler handler : tmpHandlers) |
| { |
| handler.init(); |
| logger.info("init handler: " + handler.getPath() + ", " + handler.getListener()); |
| } |
| } |
| } |
| } |
| |
| @Override |
| public boolean isLeader() |
| { |
| if (!isConnected()) |
| { |
| return false; |
| } |
| |
| if (_instanceType != InstanceType.CONTROLLER) |
| { |
| return false; |
| } |
| |
| Builder keyBuilder = _helixAccessor.keyBuilder(); |
| LiveInstance leader = _helixAccessor.getProperty(keyBuilder.controllerLeader()); |
| if (leader == null) |
| { |
| return false; |
| } |
| else |
| { |
| String leaderName = leader.getInstanceName(); |
| // TODO need check sessionId also, but in distributed mode, leader's |
| // sessionId is |
| // not equal to |
| // the leader znode's sessionId field which is the sessionId of the |
| // controller_participant that |
| // successfully creates the leader node |
| if (leaderName == null || !leaderName.equals(_instanceName)) |
| { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| /** |
| * carry over current-states from last sessions |
| * set to initial state for current session only when the state doesn't exist in current session |
| */ |
| private void carryOverPreviousCurrentState() |
| { |
| Builder keyBuilder = _helixAccessor.keyBuilder(); |
| List<String> sessions = _helixAccessor.getChildNames(keyBuilder.sessions(_instanceName)); |
| |
| // carry-over |
| for (String session : sessions) { |
| if (session.equals(_sessionId)) { |
| continue; |
| } |
| |
| List<CurrentState> lastCurStates = |
| _helixAccessor.getChildValues(keyBuilder.currentStates(_instanceName, session)); |
| |
| for (CurrentState lastCurState : lastCurStates) { |
| logger.info("Carrying over old session: " + session + ", resource: " |
| + lastCurState.getId() + " to current session: " + _sessionId); |
| String stateModelDefRef = lastCurState.getStateModelDefRef(); |
| if (stateModelDefRef == null) |
| { |
| logger.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: " |
| + lastCurState); |
| continue; |
| } |
| StateModelDefinition stateModel = |
| _helixAccessor.getProperty(keyBuilder.stateModelDef(stateModelDefRef)); |
| |
| String curStatePath = keyBuilder.currentState(_instanceName, _sessionId, lastCurState.getResourceName()).getPath(); |
| _helixAccessor.getBaseDataAccessor().update(curStatePath, |
| new CurStateCarryOverUpdater(_sessionId, stateModel.getInitialState(), lastCurState), AccessOption.PERSISTENT); |
| } |
| } |
| |
| // remove previous current states |
| for (String session : sessions) |
| { |
| if (session.equals(_sessionId)) { |
| continue; |
| } |
| |
| String path = |
| _helixAccessor.keyBuilder() |
| .currentStates(_instanceName, session) |
| .getPath(); |
| logger.info("Removing current states from previous sessions. path: " + path); |
| _zkClient.deleteRecursive(path); |
| } |
| } |
| |
| @Override |
| public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() |
| { |
| checkConnected(); |
| |
| if (_helixPropertyStore == null) |
| { |
| String path = |
| PropertyPathConfig.getPath(PropertyType.HELIX_PROPERTYSTORE, _clusterName); |
| |
| _helixPropertyStore = |
| new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkClient), |
| path, |
| null); |
| } |
| |
| return _helixPropertyStore; |
| } |
| |
| @Override |
| public synchronized HelixAdmin getClusterManagmentTool() |
| { |
| checkConnected(); |
| if (_zkClient != null) |
| { |
| _managementTool = new ZKHelixAdmin(_zkClient); |
| } |
| else |
| { |
| logger.error("Couldn't get ZKClusterManagementTool because zkClient is null"); |
| } |
| |
| return _managementTool; |
| } |
| |
| @Override |
| public ClusterMessagingService getMessagingService() |
| { |
| // The caller can register message handler factories on messaging service before the |
| // helix manager is connected. Thus we do not do connected check here. |
| return _messagingService; |
| } |
| |
| @Override |
| public ParticipantHealthReportCollector getHealthReportCollector() |
| { |
| checkConnected(); |
| return _participantHealthCheckInfoCollector; |
| } |
| |
| @Override |
| public InstanceType getInstanceType() |
| { |
| return _instanceType; |
| } |
| |
| private void checkConnected() |
| { |
| if (!isConnected()) |
| { |
| throw new HelixException("ClusterManager not connected. Call clusterManager.connect()"); |
| } |
| } |
| |
| @Override |
| public String getVersion() |
| { |
| return _version; |
| } |
| |
| @Override |
| public HelixManagerProperties getProperties() { |
| return _properties; |
| } |
| |
| @Override |
| public StateMachineEngine getStateMachineEngine() |
| { |
| return _stateMachEngine; |
| } |
| |
| // TODO: rename this and not expose this function as part of interface |
| @Override |
| public void startTimerTasks() |
| { |
| for (HelixTimerTask task : _controllerTimerTasks) |
| { |
| task.start(); |
| } |
| startStatusUpdatedumpTask(); |
| } |
| |
| @Override |
| public void stopTimerTasks() |
| { |
| for (HelixTimerTask task : _controllerTimerTasks) |
| { |
| task.stop(); |
| } |
| } |
| |
| @Override |
| public void setLiveInstanceInfoProvider( |
| LiveInstanceInfoProvider liveInstanceInfoProvider) |
| { |
| _liveInstanceInfoProvider = liveInstanceInfoProvider; |
| } |
| } |