| package org.apache.helix.manager.zk; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.io.IOException; |
| import java.net.InetAddress; |
| import java.net.UnknownHostException; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.Timer; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import javax.management.JMException; |
| |
| import com.google.common.collect.Sets; |
| import org.apache.helix.BaseDataAccessor; |
| import org.apache.helix.ClusterMessagingService; |
| import org.apache.helix.ConfigAccessor; |
| import org.apache.helix.HelixAdmin; |
| import org.apache.helix.HelixConstants.ChangeType; |
| import org.apache.helix.HelixDataAccessor; |
| import org.apache.helix.HelixException; |
| import org.apache.helix.HelixManager; |
| import org.apache.helix.HelixManagerProperties; |
| import org.apache.helix.HelixManagerProperty; |
| import org.apache.helix.HelixPropertyFactory; |
| import org.apache.helix.HelixTimerTask; |
| import org.apache.helix.InstanceType; |
| import org.apache.helix.LiveInstanceInfoProvider; |
| import org.apache.helix.PreConnectCallback; |
| import org.apache.helix.PropertyKey; |
| import org.apache.helix.PropertyKey.Builder; |
| import org.apache.helix.PropertyPathBuilder; |
| import org.apache.helix.PropertyType; |
| import org.apache.helix.SystemPropertyKeys; |
| import org.apache.helix.api.listeners.ClusterConfigChangeListener; |
| import org.apache.helix.api.listeners.ConfigChangeListener; |
| import org.apache.helix.api.listeners.ControllerChangeListener; |
| import org.apache.helix.api.listeners.CurrentStateChangeListener; |
| import org.apache.helix.api.listeners.CustomizedStateChangeListener; |
| import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener; |
| import org.apache.helix.api.listeners.CustomizedStateRootChangeListener; |
| import org.apache.helix.api.listeners.CustomizedViewChangeListener; |
| import org.apache.helix.api.listeners.CustomizedViewRootChangeListener; |
| import org.apache.helix.api.listeners.ExternalViewChangeListener; |
| import org.apache.helix.api.listeners.IdealStateChangeListener; |
| import org.apache.helix.api.listeners.InstanceConfigChangeListener; |
| import org.apache.helix.api.listeners.LiveInstanceChangeListener; |
| import org.apache.helix.api.listeners.MessageListener; |
| import org.apache.helix.api.listeners.ResourceConfigChangeListener; |
| import org.apache.helix.api.listeners.ScopedConfigChangeListener; |
| import org.apache.helix.controller.GenericHelixController; |
| import org.apache.helix.controller.pipeline.Pipeline; |
| import org.apache.helix.healthcheck.ParticipantHealthReportCollector; |
| import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl; |
| import org.apache.helix.healthcheck.ParticipantHealthReportTask; |
| import org.apache.helix.messaging.DefaultMessagingService; |
| import org.apache.helix.model.BuiltInStateModelDefinitions; |
| import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; |
| import org.apache.helix.model.LiveInstance; |
| import org.apache.helix.monitoring.ZKPathDataDumpTask; |
| import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor; |
| import org.apache.helix.monitoring.mbeans.MonitorLevel; |
| import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; |
| import org.apache.helix.participant.HelixStateMachineEngine; |
| import org.apache.helix.participant.StateMachineEngine; |
| import org.apache.helix.store.zk.AutoFallbackPropertyStore; |
| import org.apache.helix.store.zk.ZkHelixPropertyStore; |
| import org.apache.helix.util.HelixUtil; |
| import org.apache.helix.zookeeper.api.client.HelixZkClient; |
| import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.zookeeper.datamodel.serializer.ChainedPathZkSerializer; |
| import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; |
| import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; |
| import org.apache.helix.zookeeper.impl.factory.HelixZkClientFactory; |
| import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; |
| import org.apache.helix.zookeeper.zkclient.IZkStateListener; |
| import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; |
| import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer; |
| import org.apache.zookeeper.Watcher.Event.EventType; |
| import org.apache.zookeeper.Watcher.Event.KeeperState; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class ZKHelixManager implements HelixManager, IZkStateListener { |
| private static Logger LOG = LoggerFactory.getLogger(ZKHelixManager.class); |
| |
| public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin"; |
| private static final int FLAPPING_TIME_WINDOW = 300000; // Default to 300 sec |
| public static final int DEFAULT_MAX_DISCONNECT_THRESHOLD = 600; // Default to be a large number |
| private static final int DEFAULT_WAIT_CONNECTED_TIMEOUT = 10 * 1000; // wait until connected for up to 10 seconds. |
| |
| protected final String _zkAddress; |
| private final String _clusterName; |
| private final String _instanceName; |
| private final InstanceType _instanceType; |
| private final int _waitForConnectedTimeout; // wait time for testing connect |
| private final int _sessionTimeout; // client side session timeout, will be overridden by server timeout. Disconnect after timeout |
| private final int _connectionInitTimeout; // client timeout to init connect |
| private final List<PreConnectCallback> _preConnectCallbacks; |
| protected final List<CallbackHandler> _handlers; |
| private final HelixManagerProperties _properties; |
| private final HelixManagerProperty _helixManagerProperty; |
| private final HelixManagerStateListener _stateListener; |
| |
| /** |
| * helix version# |
| */ |
| private final String _version; |
| private int _reportLatency; |
| |
| protected RealmAwareZkClient _zkclient; |
| private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig; |
| private final DefaultMessagingService _messagingService; |
| private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors; |
| |
| private final MonitorLevel _monitorLevel; |
| |
| private BaseDataAccessor<ZNRecord> _baseDataAccessor; |
| private ZKHelixDataAccessor _dataAccessor; |
| private final Builder _keyBuilder; |
| private ConfigAccessor _configAccessor; |
| private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore; |
| protected LiveInstanceInfoProvider _liveInstanceInfoProvider = null; |
| |
| private volatile String _sessionId; |
| |
| /** |
| * Keep track of timestamps that zk State has become Disconnected If in a _timeWindowLengthMs |
| * window zk State has become Disconnected for more than_maxDisconnectThreshold times disconnect |
| * the zkHelixManager |
| */ |
| private final List<Long> _disconnectTimeHistory = new ArrayList<Long>(); |
| private final int _flappingTimeWindowMs; |
| private final int _maxDisconnectThreshold; |
| |
| /** |
| * participant fields |
| */ |
| private final StateMachineEngine _stateMachineEngine; |
| private final List<HelixTimerTask> _timerTasks = new ArrayList<>(); |
| |
| private final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector; |
| private Long _sessionStartTime; |
| private ParticipantManager _participantManager; |
| |
| /** |
| * controller fields |
| */ |
| private GenericHelixController _controller; |
| private Set<Pipeline.Type> _enabledPipelineTypes; |
| private CallbackHandler _leaderElectionHandler = null; |
| protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<>(); |
| |
| /** |
| * status dump timer-task |
| */ |
| protected static class StatusDumpTask extends HelixTimerTask { |
| final HelixManager helixController; |
| |
| public StatusDumpTask(HelixManager helixController) { |
| this.helixController = helixController; |
| } |
| |
| @Override |
| public synchronized void start() { |
| long initialDelay = 0; |
| long period = 15 * 60 * 1000; |
| long timeThresholdNoChangeForStatusUpdates = 15 * 60 * 1000; // 15 minutes |
| long timeThresholdNoChangeForErrors = 24 * 60 * 60 * 1000; // 1 day |
| int maximumNumberOfLeafNodesAllowed = 100; |
| |
| if (_timer == null) { |
| LOG.info("Start StatusDumpTask"); |
| _timer = new Timer("StatusDumpTimerTask", true); |
| _timer.scheduleAtFixedRate( |
| new ZKPathDataDumpTask(helixController, timeThresholdNoChangeForStatusUpdates, |
| timeThresholdNoChangeForErrors, maximumNumberOfLeafNodesAllowed), initialDelay, |
| period); |
| } |
| } |
| |
| @Override |
| public synchronized void stop() { |
| if (_timer != null) { |
| LOG.info("Stop StatusDumpTask"); |
| _timer.cancel(); |
| _timer = null; |
| } |
| } |
| } |
| |
/**
 * Creates a manager without a state listener. The manager property is obtained from
 * {@link HelixPropertyFactory} for the given ZK address and cluster.
 */
public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
    String zkAddress) {
  this(clusterName, instanceName, instanceType, zkAddress, (HelixManagerStateListener) null);
}

/**
 * Creates a manager with the given state listener. The manager property is obtained from
 * {@link HelixPropertyFactory} for the given ZK address and cluster.
 */
public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
    String zkAddress, HelixManagerStateListener stateListener) {
  this(clusterName, instanceName, instanceType, zkAddress, stateListener,
      HelixPropertyFactory.getInstance()
          .getHelixManagerProperty(zkAddress, clusterName));
}
| |
| public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType, |
| String zkAddress, HelixManagerStateListener stateListener, |
| HelixManagerProperty helixManagerProperty) { |
| validateZkConnectionSettings(zkAddress, helixManagerProperty); |
| |
| _zkAddress = zkAddress; |
| _clusterName = clusterName; |
| _instanceType = instanceType; |
| LOG.info("Create a zk-based cluster manager. ZK connection: " + getZkConnectionInfo() |
| + ", clusterName: " + clusterName + ", instanceName: " + instanceName + ", type: " |
| + instanceType); |
| |
| if (instanceName == null) { |
| try { |
| instanceName = |
| InetAddress.getLocalHost().getCanonicalHostName() + "-" + instanceType.toString(); |
| } catch (UnknownHostException e) { |
| // can ignore it |
| LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e); |
| instanceName = "UNKNOWN"; |
| } |
| } |
| |
| _instanceName = instanceName; |
| _enabledPipelineTypes = |
| Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK); |
| _preConnectCallbacks = new ArrayList<>(); |
| _handlers = new ArrayList<>(); |
| _properties = new HelixManagerProperties(SystemPropertyKeys.CLUSTER_MANAGER_VERSION); |
| _version = _properties.getVersion(); |
| |
| _keyBuilder = new Builder(clusterName); |
| _messagingService = new DefaultMessagingService(this); |
| try { |
| _callbackMonitors = new HashMap<>(); |
| for (ChangeType changeType : ChangeType.values()) { |
| HelixCallbackMonitor callbackMonitor = |
| new HelixCallbackMonitor(instanceType, clusterName, instanceName, changeType); |
| callbackMonitor.register(); |
| _callbackMonitors.put(changeType, callbackMonitor); |
| } |
| } catch (JMException e) { |
| LOG.error("Error in creating callback monitor.", e); |
| } |
| |
| _stateListener = stateListener; |
| _helixManagerProperty = helixManagerProperty; |
| |
| /** |
| * use system property if available |
| */ |
| _flappingTimeWindowMs = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.FLAPPING_TIME_WINDOW, |
| ZKHelixManager.FLAPPING_TIME_WINDOW); |
| |
| _maxDisconnectThreshold = HelixUtil |
| .getSystemPropertyAsInt(SystemPropertyKeys.MAX_DISCONNECT_THRESHOLD, |
| ZKHelixManager.DEFAULT_MAX_DISCONNECT_THRESHOLD); |
| |
| _sessionTimeout = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.ZK_SESSION_TIMEOUT, |
| HelixZkClient.DEFAULT_SESSION_TIMEOUT); |
| |
| _connectionInitTimeout = HelixUtil |
| .getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT, |
| HelixZkClient.DEFAULT_CONNECTION_TIMEOUT); |
| |
| _waitForConnectedTimeout = HelixUtil |
| .getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT, |
| DEFAULT_WAIT_CONNECTED_TIMEOUT); |
| |
| _reportLatency = HelixUtil |
| .getSystemPropertyAsInt(SystemPropertyKeys.PARTICIPANT_HEALTH_REPORT_LATENCY, |
| ParticipantHealthReportTask.DEFAULT_REPORT_LATENCY); |
| |
| MonitorLevel configuredMonitorLevel; |
| try { |
| configuredMonitorLevel = MonitorLevel.valueOf( |
| System.getProperty(SystemPropertyKeys.MONITOR_LEVEL, MonitorLevel.DEFAULT.name())); |
| } catch (IllegalArgumentException ex) { |
| LOG.warn("Unrecognizable Monitor Level configuration. Use DEFAULT monitor level.", ex); |
| configuredMonitorLevel = MonitorLevel.DEFAULT; |
| } |
| _monitorLevel = configuredMonitorLevel; |
| |
| /** |
| * instance type specific init |
| */ |
| switch (instanceType) { |
| case PARTICIPANT: |
| _stateMachineEngine = new HelixStateMachineEngine(this); |
| _participantHealthInfoCollector = |
| new ParticipantHealthReportCollectorImpl(this, _instanceName); |
| _timerTasks |
| .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency)); |
| break; |
| case CONTROLLER: |
| _stateMachineEngine = null; |
| _participantHealthInfoCollector = null; |
| _controllerTimerTasks.add(new StatusDumpTask(this)); |
| break; |
| case CONTROLLER_PARTICIPANT: |
| _stateMachineEngine = new HelixStateMachineEngine(this); |
| _participantHealthInfoCollector = |
| new ParticipantHealthReportCollectorImpl(this, _instanceName); |
| _timerTasks |
| .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency)); |
| _controllerTimerTasks.add(new StatusDumpTask(this)); |
| break; |
| case ADMINISTRATOR: |
| case SPECTATOR: |
| _stateMachineEngine = null; |
| _participantHealthInfoCollector = null; |
| break; |
| default: |
| throw new IllegalArgumentException("unrecognized type: " + instanceType); |
| } |
| } |
| |
| public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) { |
| if (!InstanceType.CONTROLLER.equals(_instanceType) && !InstanceType.CONTROLLER_PARTICIPANT |
| .equals(_instanceType)) { |
| throw new IllegalStateException( |
| String.format("Cannot enable control pipeline for instance type %s", _instanceType)); |
| } |
| _enabledPipelineTypes = types; |
| } |
| |
| @Override |
| public boolean removeListener(PropertyKey key, Object listener) { |
| LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: " |
| + _clusterName + " by instance: " + _instanceName); |
| |
| synchronized (this) { |
| List<CallbackHandler> toRemove = new ArrayList<>(); |
| for (CallbackHandler handler : _handlers) { |
| // compare property-key path and listener reference |
| if (handler.getPath().equals(key.getPath()) && handler.getListener().equals(listener)) { |
| toRemove.add(handler); |
| } |
| } |
| |
| _handlers.removeAll(toRemove); |
| |
| // handler.reset() may modify the handlers list, so do it outside the iteration |
| for (CallbackHandler handler : toRemove) { |
| handler.reset(true); |
| } |
| } |
| |
| return true; |
| } |
| |
| void checkConnected() { |
| checkConnected(-1); |
| } |
| |
| /** |
| * Check if HelixManager is connected, if it is not connected, |
| * wait for the specified timeout and check again before return. |
| * |
| * @param timeout |
| */ |
| void checkConnected(long timeout) { |
| if (_zkclient == null || _zkclient.isClosed()) { |
| throw new HelixException( |
| "HelixManager (ZkClient) is not connected. Call HelixManager#connect()"); |
| } |
| |
| boolean isConnected = isConnected(); |
| if (!isConnected && timeout > 0) { |
| LOG.warn("zkClient to " + getZkConnectionInfo() + " is not connected, wait for " |
| + _waitForConnectedTimeout + "ms."); |
| isConnected = _zkclient.waitUntilConnected(_waitForConnectedTimeout, TimeUnit.MILLISECONDS); |
| } |
| |
| if (!isConnected) { |
| LOG.error("zkClient is not connected after waiting " + timeout + "ms." |
| + ", clusterName: " + _clusterName + ", zkAddress: " + getZkConnectionInfo()); |
| throw new HelixException( |
| "HelixManager is not connected within retry timeout for cluster " + _clusterName); |
| } |
| } |
| |
| void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType, |
| EventType[] eventType) { |
| checkConnected(_waitForConnectedTimeout); |
| |
| PropertyType type = propertyKey.getType(); |
| |
| synchronized (this) { |
| for (CallbackHandler handler : _handlers) { |
| // compare property-key path and listener reference |
| if (handler.getPath().equals(propertyKey.getPath()) |
| && handler.getListener().equals(listener)) { |
| LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath() |
| + " already exists. skip add"); |
| |
| return; |
| } |
| } |
| |
| CallbackHandler newHandler = |
| new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType, |
| _callbackMonitors.get(changeType)); |
| |
| _handlers.add(newHandler); |
| LOG.info("Added listener: " + listener + " for type: " + type + " to path: " |
| + newHandler.getPath()); |
| } |
| } |
| |
| @Override |
| public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception { |
| addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Deprecated |
| @Override |
| public void addIdealStateChangeListener(final org.apache.helix.IdealStateChangeListener listener) |
| throws Exception { |
| addListener(listener, new Builder(_clusterName).idealStates(), ChangeType.IDEAL_STATE, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception { |
| addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Deprecated |
| @Override |
| public void addLiveInstanceChangeListener(org.apache.helix.LiveInstanceChangeListener listener) |
| throws Exception { |
| addListener(listener, new Builder(_clusterName).liveInstances(), ChangeType.LIVE_INSTANCE, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public void addConfigChangeListener(ConfigChangeListener listener) throws Exception { |
| addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, |
| new EventType[] { EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Override |
| public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener) |
| throws Exception { |
| addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, |
| new EventType[] { EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Deprecated |
| @Override |
| public void addInstanceConfigChangeListener(org.apache.helix.InstanceConfigChangeListener listener) |
| throws Exception { |
| addListener(listener, new Builder(_clusterName).instanceConfigs(), ChangeType.INSTANCE_CONFIG, |
| new EventType[] { EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Override |
| public void addResourceConfigChangeListener(ResourceConfigChangeListener listener) throws Exception{ |
| addListener(listener, new Builder(_clusterName).resourceConfigs(), ChangeType.RESOURCE_CONFIG, |
| new EventType[] { EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Override |
| public void addCustomizedStateConfigChangeListener( |
| CustomizedStateConfigChangeListener listener) throws Exception { |
| addListener(listener, new Builder(_clusterName).customizedStateConfig(), |
| ChangeType.CUSTOMIZED_STATE_CONFIG, new EventType[] { |
| EventType.NodeDataChanged |
| }); |
| } |
| |
| @Override |
| public void addClusterfigChangeListener(ClusterConfigChangeListener listener) throws Exception{ |
| addListener(listener, new Builder(_clusterName).clusterConfig(), ChangeType.CLUSTER_CONFIG, |
| new EventType[] { EventType.NodeDataChanged |
| }); |
| } |
| |
| @Override |
| public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope) |
| throws Exception { |
| Builder keyBuilder = new Builder(_clusterName); |
| |
| PropertyKey propertyKey = null; |
| switch (scope) { |
| case CLUSTER: |
| propertyKey = keyBuilder.clusterConfigs(); |
| break; |
| case PARTICIPANT: |
| propertyKey = keyBuilder.instanceConfigs(); |
| break; |
| case RESOURCE: |
| propertyKey = keyBuilder.resourceConfigs(); |
| break; |
| default: |
| break; |
| } |
| |
| if (propertyKey != null) { |
| addListener(listener, propertyKey, ChangeType.CONFIG, new EventType[] { |
| EventType.NodeChildrenChanged |
| }); |
| } else { |
| LOG.error("Can't add listener to config scope: " + scope); |
| } |
| } |
| |
| |
| @Deprecated |
| @Override |
| public void addConfigChangeListener(org.apache.helix.ScopedConfigChangeListener listener, ConfigScopeProperty scope) |
| throws Exception { |
| addConfigChangeListener((ScopedConfigChangeListener) listener, scope); |
| } |
| |
| // TODO: Decide if do we still need this since we are exposing |
| // ClusterMessagingService |
| @Override |
| public void addMessageListener(MessageListener listener, String instanceName) { |
| addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Deprecated |
| @Override |
| public void addMessageListener(org.apache.helix.MessageListener listener, String instanceName) { |
| addListener(listener, new Builder(_clusterName).messages(instanceName), ChangeType.MESSAGE, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public void addControllerMessageListener(MessageListener listener) { |
| addListener(listener, new Builder(_clusterName).controllerMessages(), |
| ChangeType.MESSAGES_CONTROLLER, new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Deprecated |
| @Override |
| public void addControllerMessageListener(org.apache.helix.MessageListener listener) { |
| addListener(listener, new Builder(_clusterName).controllerMessages(), |
| ChangeType.MESSAGES_CONTROLLER, new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public void addCurrentStateChangeListener(CurrentStateChangeListener listener, |
| String instanceName, String sessionId) throws Exception { |
| addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId), |
| ChangeType.CURRENT_STATE, new EventType[] { EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Deprecated |
| @Override |
| public void addCurrentStateChangeListener(org.apache.helix.CurrentStateChangeListener listener, |
| String instanceName, String sessionId) throws Exception { |
| addListener(listener, new Builder(_clusterName).currentStates(instanceName, sessionId), |
| ChangeType.CURRENT_STATE, new EventType[] { EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Override |
| public void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener, |
| String instanceName, String sessionId) throws Exception { |
| addListener(listener, new Builder(_clusterName).taskCurrentStates(instanceName, sessionId), |
| ChangeType.TASK_CURRENT_STATE, new EventType[] { EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Override |
| public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener, |
| String instanceName) throws Exception { |
| addListener(listener, new Builder(_clusterName).customizedStatesRoot(instanceName), |
| ChangeType.CUSTOMIZED_STATE_ROOT, new EventType[]{EventType.NodeChildrenChanged}); |
| } |
| |
| @Override |
| public void addCustomizedStateChangeListener(CustomizedStateChangeListener listener, |
| String instanceName, String customizedStateType) throws Exception { |
| addListener(listener, |
| new Builder(_clusterName).customizedStates(instanceName, customizedStateType), |
| ChangeType.CUSTOMIZED_STATE, new EventType[]{EventType.NodeChildrenChanged}); |
| } |
| |
| @Override |
| public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception { |
| addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType) |
| throws Exception { |
| addListener(listener, new Builder(_clusterName).customizedView(customizedStateType), |
| ChangeType.CUSTOMIZED_VIEW, new EventType[] { |
| EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Override |
| public void addCustomizedViewRootChangeListener(CustomizedViewRootChangeListener listener) throws Exception { |
| addListener(listener, new Builder(_clusterName).customizedViews(), |
| ChangeType.CUSTOMIZED_VIEW_ROOT, new EventType[] { |
| EventType.NodeChildrenChanged |
| }); |
| } |
| |
| @Override |
| public void addTargetExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception { |
| addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.TARGET_EXTERNAL_VIEW, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Deprecated |
| @Override |
| public void addExternalViewChangeListener(org.apache.helix.ExternalViewChangeListener listener) throws Exception { |
| addListener(listener, new Builder(_clusterName).externalViews(), ChangeType.EXTERNAL_VIEW, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public void addControllerListener(ControllerChangeListener listener) { |
| addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Deprecated |
| @Override |
| public void addControllerListener(org.apache.helix.ControllerChangeListener listener) { |
| addListener(listener, new Builder(_clusterName).controller(), ChangeType.CONTROLLER, |
| new EventType[] { EventType.NodeChildrenChanged }); |
| } |
| |
| @Override |
| public HelixDataAccessor getHelixDataAccessor() { |
| checkConnected(_waitForConnectedTimeout); |
| return _dataAccessor; |
| } |
| |
| @Override |
| public ConfigAccessor getConfigAccessor() { |
| checkConnected(_waitForConnectedTimeout); |
| return _configAccessor; |
| } |
| |
| @Override |
| public String getClusterName() { |
| return _clusterName; |
| } |
| |
| /** |
| * Deprecated becuase ZKHelixManager shouldn't expose ZkAddress in realm-aware mode. |
| * |
| * Returns a string that can be used to connect to metadata store for this HelixManager instance |
| * i.e. for ZkHelixManager, this will have format "{zookeeper-address}:{port}" |
| * @return a string used to connect to metadata store |
| */ |
| @Deprecated |
| @Override |
| public String getMetadataStoreConnectionString() { |
| return _zkAddress; |
| } |
| |
| /** |
| * @return the RealmAwareZkConnectionConfig used to create a realm aware ZkClient |
| */ |
| public RealmAwareZkClient.RealmAwareZkConnectionConfig getRealmAwareZkConnectionConfig() { |
| return _realmAwareZkConnectionConfig; |
| } |
| |
| @Override |
| public String getInstanceName() { |
| return _instanceName; |
| } |
| |
| BaseDataAccessor<ZNRecord> createBaseDataAccessor() { |
| ZkBaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<>(_zkclient); |
| return baseDataAccessor; |
| } |
| |
| /** |
| * Add Helix built-in state model definitions if not exist |
| */ |
| private void addBuiltInStateModelDefinitions() { |
| for (BuiltInStateModelDefinitions def : BuiltInStateModelDefinitions.values()) { |
| // creation succeeds only if not exist |
| _dataAccessor.createStateModelDef(def.getStateModelDefinition()); |
| } |
| } |
| |
| private boolean isMonitorRootPathOnly() { |
| switch (_monitorLevel) { |
| case ALL: |
| return false; |
| case AGGREGATED_ONLY: |
| return true; |
| default: |
| // Otherwise, apply the default policy. Emitting full metrics for controllers only. |
| return !_instanceType.equals(InstanceType.CONTROLLER) && !_instanceType |
| .equals(InstanceType.CONTROLLER_PARTICIPANT); |
| } |
| } |
| |
| void createClient() throws Exception { |
| final RealmAwareZkClient newClient = createSingleRealmZkClient(); |
| |
| synchronized (this) { |
| if (_zkclient != null) { |
| _zkclient.close(); |
| } |
| _zkclient = newClient; |
| |
| _baseDataAccessor = createBaseDataAccessor(); |
| |
| _dataAccessor = new ZKHelixDataAccessor(_clusterName, _baseDataAccessor); |
| _configAccessor = new ConfigAccessor(_zkclient); |
| |
| if (_instanceType == InstanceType.CONTROLLER |
| || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) { |
| addBuiltInStateModelDefinitions(); |
| } |
| } |
| |
| // subscribe to state change |
| _zkclient.subscribeStateChanges(this); |
| int retryCount = 0; |
| while (retryCount < 3) { |
| try { |
| final long sessionId = |
| _zkclient.waitForEstablishedSession(_connectionInitTimeout, TimeUnit.MILLISECONDS); |
| handleStateChanged(KeeperState.SyncConnected); |
| /* |
| * This listener is subscribed after SyncConnected and firing new session events, |
| * which means this listener has not yet handled new session, so we have to handle new |
| * session here just for this listener. |
| */ |
| handleNewSession(ZKUtil.toHexSessionId(sessionId)); |
| break; |
| } catch (HelixException e) { |
| LOG.error("fail to createClient.", e); |
| throw e; |
| } catch (Exception e) { |
| retryCount++; |
| LOG.error("fail to createClient. retry " + retryCount, e); |
| if (retryCount == 3) { |
| throw e; |
| } |
| } |
| } |
| } |
| |
| @Override |
| public void connect() throws Exception { |
| LOG.info("ClusterManager.connect()"); |
| if (isConnected()) { |
| LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName |
| + " already connected. skip connect"); |
| return; |
| } |
| |
| switch (_instanceType) { |
| case CONTROLLER: |
| case CONTROLLER_PARTICIPANT: |
| if (_controller == null) { |
| _controller = new GenericHelixController(_clusterName, _enabledPipelineTypes); |
| _messagingService.getExecutor().setController(_controller); |
| } |
| break; |
| default: |
| break; |
| } |
| |
| try { |
| createClient(); |
| _messagingService.onConnected(); |
| } catch (Exception e) { |
| LOG.error("fail to connect " + _instanceName, e); |
| try { |
| disconnect(); |
| } catch (Exception ex) { |
| // if zk connection fails to be created, disconnect may throw exception about reporting disconnect to ZK. |
| } |
| throw e; |
| } |
| } |
| |
| @Override |
| public void disconnect() { |
| if (_zkclient == null || _zkclient.isClosed()) { |
| LOG.info("instanceName: " + _instanceName + " already disconnected"); |
| return; |
| } |
| |
| LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName); |
| |
| try { |
| /** |
| * stop all timer tasks |
| */ |
| stopTimerTasks(); |
| |
| /** |
| * shutdown thread pool first to avoid reset() being invoked in the middle of state |
| * transition |
| */ |
| _messagingService.getExecutor().shutdown(); |
| |
| if (!cleanupCallbackHandlers()) { |
| LOG.warn( |
| "The callback handler cleanup has been cleanly done. " |
| + "Some callback handlers might not be reset properly. " |
| + "Continue to finish the other Helix Mananger disconnect tasks."); |
| } |
| } finally { |
| GenericHelixController controller = _controller; |
| if (controller != null) { |
| try { |
| controller.shutdown(); |
| } catch (InterruptedException e) { |
| LOG.info("Interrupted shutting down GenericHelixController", e); |
| } |
| } |
| |
| for (HelixCallbackMonitor callbackMonitor : _callbackMonitors.values()) { |
| callbackMonitor.unregister(); |
| } |
| |
| _helixPropertyStore = null; |
| |
| synchronized (this) { |
| if (_controller != null) { |
| _controller = null; |
| _leaderElectionHandler = null; |
| } |
| |
| if (_participantManager != null) { |
| _participantManager = null; |
| } |
| |
| if (_zkclient != null) { |
| _zkclient.close(); |
| } |
| } |
| _sessionStartTime = null; |
| LOG.info("Cluster manager: " + _instanceName + " disconnected"); |
| } |
| } |
| |
/**
 * Runs the callback-handler cleanup steps that require an active ZkClient connection:
 * resetting all callback handlers (shutdown mode), resetting the leader election handler, and
 * disconnecting the participant manager.
 *
 * <p>If the ZkClient does not become connected within {@code _waitForConnectedTimeout} ms, the
 * cleanup is skipped entirely. Otherwise the work runs on a dedicated thread. A temporary state
 * listener interrupts that thread as soon as the connection leaves SyncConnected. Whether the
 * interrupted work actually stops depends on the reset/disconnect implementations; that is not
 * visible here.
 *
 * <p>NOTE(review): if the calling thread is interrupted while joining the cleanup thread, the
 * cleanup thread is interrupted and this method returns, but the caller's interrupt status is
 * not restored.
 *
 * @return true only if the cleanup thread ran to its last statement, i.e. every step finished
 *     without throwing; false if the cleanup was skipped or aborted.
 */
private boolean cleanupCallbackHandlers() {
  // Set by the cleanup thread as its very last action; stays false on skip, exception or abort.
  AtomicBoolean cleanupDone = new AtomicBoolean(false);

  if (_zkclient.waitUntilConnected(_waitForConnectedTimeout, TimeUnit.MILLISECONDS)) {
    // Create a separate thread for executing cleanup task to avoid forever retry.
    Thread cleanupThread = new Thread(String
        .format("Cleanup thread for %s-%s-%s", _clusterName, _instanceName, _instanceType)) {
      @Override
      public void run() {
        // TODO reset user defined handlers only
        resetHandlers(true);

        if (_leaderElectionHandler != null) {
          _leaderElectionHandler.reset(true);
        }

        // Read the field once so the null check and the call use the same instance.
        ParticipantManager participantManager = _participantManager;
        if (participantManager != null) {
          participantManager.disconnect();
        }

        cleanupDone.set(true);
      }
    };

    // Define the state listener to terminate the cleanup thread when the ZkConnection breaks.
    IZkStateListener stateListener = new IZkStateListener() {
      @Override
      public void handleStateChanged(KeeperState state) {
        // If the connection breaks during the cleanup, then stop the cleanup thread.
        if (state != KeeperState.SyncConnected) {
          cleanupThread.interrupt();
        }
      }

      @Override
      public void handleNewSession(String sessionId) {
        // nothing
      }

      @Override
      public void handleSessionEstablishmentError(Throwable error) {
        // nothing
      }
    };

    cleanupThread.start();
    try {
      // Subscribe and check the connection status one more time to ensure the thread is running
      // with an active ZkConnection. The re-check covers a disconnect that happened before the
      // listener was registered.
      _zkclient.subscribeStateChanges(stateListener);
      if (!_zkclient.waitUntilConnected(0, TimeUnit.MILLISECONDS)) {
        cleanupThread.interrupt();
      }

      try {
        // Block until the cleanup thread finishes (no timeout).
        cleanupThread.join();
      } catch (InterruptedException ex) {
        // Propagate the interruption to the cleanup thread; the caller's interrupt flag is
        // intentionally or accidentally not restored here (see method Javadoc).
        cleanupThread.interrupt();
      }
    } finally {
      // Always remove the temporary listener, whatever happened above.
      _zkclient.unsubscribeStateChanges(stateListener);
    }
  } else {
    LOG.warn(
        "ZkClient is not connected to the Zookeeper. Skip the cleanup work that requires accessing Zookeeper.");
  }

  return cleanupDone.get();
}
| |
| @Override |
| public String getSessionId() { |
| checkConnected(_waitForConnectedTimeout); |
| // TODO: session id should be updated after zk client is connected. |
| // Otherwise, this session id might be an expired one. |
| return _sessionId; |
| } |
| |
| @Override |
| public boolean isConnected() { |
| if (_zkclient == null || _zkclient.isClosed()) { |
| return false; |
| } |
| // Don't check ZkConnection state, which is different from ZkClient's watcher state. |
| // ZkConnection state is the internal state of the connection, which can be different from the |
| // ZkClient state due to internal thread/retry logic. |
| try { |
| return _zkclient.waitUntilConnected(0, TimeUnit.MILLISECONDS); |
| } catch (ZkInterruptedException ex) { |
| return false; |
| } |
| } |
| |
| @Override |
| public long getLastNotificationTime() { |
| return 0; |
| } |
| |
| @Override |
| public void addPreConnectCallback(PreConnectCallback callback) { |
| LOG.info("Adding preconnect callback: " + callback); |
| _preConnectCallbacks.add(callback); |
| } |
| |
| @Override |
| public boolean isLeader() { |
| return getSessionIdIfLead().isPresent(); |
| } |
| |
| @Override |
| public Optional<String> getSessionIdIfLead() { |
| String warnLogPrefix = String |
| .format("Instance %s is not leader of cluster %s due to", _instanceName, _clusterName); |
| if (_instanceType != InstanceType.CONTROLLER |
| && _instanceType != InstanceType.CONTROLLER_PARTICIPANT) { |
| LOG.warn(String |
| .format("%s instance type %s does not match to CONTROLLER/CONTROLLER_PARTICIPANT", |
| warnLogPrefix, _instanceType.name())); |
| return Optional.empty(); |
| } |
| |
| if (!isConnected()) { |
| LOG.warn(String.format("%s HelixManager is not connected", warnLogPrefix)); |
| return Optional.empty(); |
| } |
| |
| try { |
| LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader()); |
| if (leader != null) { |
| String leaderName = leader.getInstanceName(); |
| String sessionId = leader.getEphemeralOwner(); |
| if (leaderName != null && leaderName.equals(_instanceName) && sessionId |
| .equals(_sessionId)) { |
| // Ensure the same leader session is set and returned. If we get _session from helix |
| // manager, _session might change after the check. This guarantees the session is |
| // leader's session we checked. |
| return Optional.of(sessionId); |
| } |
| LOG.warn(String |
| .format("%s current session %s does not match leader session %s", warnLogPrefix, |
| _sessionId, sessionId)); |
| } else { |
| LOG.warn(String.format("%s leader ZNode is null", warnLogPrefix)); |
| } |
| } catch (Exception e) { |
| LOG.warn(String.format("%s exception happen when session check", warnLogPrefix), e); |
| } |
| return Optional.empty(); |
| } |
| |
| @Override |
| public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() { |
| checkConnected(_waitForConnectedTimeout); |
| |
| if (_helixPropertyStore == null) { |
| String path = PropertyPathBuilder.propertyStore(_clusterName); |
| String fallbackPath = String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE"); |
| _helixPropertyStore = |
| new AutoFallbackPropertyStore<>(new ZkBaseDataAccessor<>(_zkclient), |
| path, fallbackPath); |
| } |
| |
| return _helixPropertyStore; |
| } |
| |
| @Override |
| public synchronized HelixAdmin getClusterManagmentTool() { |
| checkConnected(_waitForConnectedTimeout); |
| if (_zkclient != null && !_zkclient.isClosed()) { |
| return new ZKHelixAdmin(_zkclient); |
| } |
| |
| LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null"); |
| return null; |
| } |
| |
| @Override |
| public ClusterMessagingService getMessagingService() { |
| // The caller can register message handler factories on messaging service before the |
| // helix manager is connected. Thus we do not do connected check here. |
| return _messagingService; |
| } |
| |
| @Override |
| public InstanceType getInstanceType() { |
| return _instanceType; |
| } |
| |
| @Override |
| public String getVersion() { |
| return _version; |
| } |
| |
| @Override |
| public HelixManagerProperties getProperties() { |
| return _properties; |
| } |
| |
| @Override |
| public StateMachineEngine getStateMachineEngine() { |
| return _stateMachineEngine; |
| } |
| |
| // TODO: rename this and not expose this function as part of interface |
| @Override |
| public void startTimerTasks() { |
| for (HelixTimerTask task : _timerTasks) { |
| task.start(); |
| } |
| } |
| |
| @Override |
| public void stopTimerTasks() { |
| for (HelixTimerTask task : _timerTasks) { |
| task.stop(); |
| } |
| } |
| |
| @Override |
| public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) { |
| _liveInstanceInfoProvider = liveInstanceInfoProvider; |
| } |
| |
| /** |
| * wait until we get a non-zero session-id. note that we might lose zkconnection |
| * right after we read session-id. but it's ok to get stale session-id and we will have |
| * another handle-new-session callback to correct this. |
| */ |
| void waitUntilConnected() { |
| boolean isConnected; |
| do { |
| isConnected = |
| _zkclient.waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS); |
| if (!isConnected) { |
| LOG.error("fail to connect zkserver: " + getZkConnectionInfo() + " in " |
| + HelixZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId |
| + ", clusterName: " + _clusterName); |
| continue; |
| } |
| |
| _sessionId = ZKUtil.toHexSessionId(_zkclient.getSessionId()); |
| |
| /** |
| * at the time we read session-id, zkconnection might be lost again |
| * wait until we get a non-zero session-id |
| */ |
| } while (!isConnected || "0".equals(_sessionId)); |
| |
| LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName |
| + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName); |
| } |
| |
| void initHandlers(List<CallbackHandler> handlers) { |
| synchronized (this) { |
| if (handlers != null) { |
| // get a copy of the list and iterate over the copy list |
| // in case handler.init() modifies the original handler list |
| List<CallbackHandler> tmpHandlers = new ArrayList<>(handlers); |
| for (CallbackHandler handler : tmpHandlers) { |
| handler.init(); |
| LOG.info("init handler: " + handler.getPath() + ", " + handler.getListener()); |
| } |
| } |
| } |
| } |
| |
| void resetHandlers(boolean isShutdown) { |
| synchronized (this) { |
| if (_handlers != null) { |
| // get a copy of the list and iterate over the copy list |
| // in case handler.reset() modifies the original handler list |
| List<CallbackHandler> tmpHandlers = new ArrayList<>(_handlers); |
| for (CallbackHandler handler : tmpHandlers) { |
| handler.reset(isShutdown); |
| LOG.info("reset handler: " + handler.getPath() + ", " + handler.getListener()); |
| } |
| } |
| } |
| } |
| |
| /** |
| * If zk state has changed into Disconnected for maxDisconnectThreshold times during previous |
| * timeWindowLengthMs Ms |
| * time window, we think that there are something wrong going on and disconnect the zkHelixManager |
| * from zk. |
| */ |
| boolean isFlapping() { |
| if (_disconnectTimeHistory.size() == 0) { |
| return false; |
| } |
| long mostRecentTimestamp = _disconnectTimeHistory.get(_disconnectTimeHistory.size() - 1); |
| |
| // Remove disconnect history timestamp that are older than flappingTimeWindowMs ago |
| while ((_disconnectTimeHistory.get(0) + _flappingTimeWindowMs) < mostRecentTimestamp) { |
| _disconnectTimeHistory.remove(0); |
| } |
| return _disconnectTimeHistory.size() > _maxDisconnectThreshold; |
| } |
| |
/**
 * Reacts to ZooKeeper keeper-state transitions.
 *
 * <ul>
 *   <li>SyncConnected: logged at INFO.</li>
 *   <li>Disconnected: the timestamp is recorded in the disconnect history and
 *   {@link #isFlapping()} is consulted. If flapping, a plain PARTICIPANT instance is disabled
 *   (best effort) and the manager disconnects. The state listener, if any, is then notified via
 *   {@code onDisconnected}. If not flapping, execution deliberately falls through to the Expired
 *   case.</li>
 *   <li>Expired (or non-flapping Disconnected): logged at WARN.</li>
 *   <li>Any other state: logged at INFO.</li>
 * </ul>
 *
 * @param state the new keeper state reported by the ZkClient
 */
@Override
public void handleStateChanged(KeeperState state) {
  switch (state) {
    case SyncConnected:
      LOG.info("KeeperState: " + state + ", instance: " + _instanceName + ", type: " + _instanceType);
      break;
    case Disconnected:
      /**
       * Track the time stamp that the disconnected happens, then check history and see if
       * we should disconnect the helix-manager
       */
      _disconnectTimeHistory.add(System.currentTimeMillis());
      if (isFlapping()) {
        String errorMsg = "instanceName: " + _instanceName + " is flapping. disconnect it. "
            + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
            + _flappingTimeWindowMs + "ms.";
        LOG.error(errorMsg);

        // Only disable the instance when its instance type is exactly PARTICIPANT
        // (CONTROLLER_PARTICIPANT instances are not disabled here).
        if (_instanceType.equals(InstanceType.PARTICIPANT)) {
          LOG.warn("instanceName: " + _instanceName
              + " is flapping. Since it is a participant, disable it.");
          try {
            getClusterManagmentTool().enableInstance(_clusterName, _instanceName, false);
          } catch (Exception e) {
            LOG.error("Failed to disable participant before disconnecting participant.", e);
          }
        }

        try {
          // TODO Call disconnect in another thread.
          // handleStateChanged is triggered in ZkClient eventThread. The disconnect logic will
          // interrupt this thread. This issue prevents the ZkClient.close() from complete. So the
          // client is left in a strange state.
          disconnect();
        } catch (Exception ex) {
          LOG.error("Disconnect HelixManager is not completely done.", ex);
        }

        // Notify the state listener even if disconnect() did not complete.
        if (_stateListener != null) {
          try {
            _stateListener.onDisconnected(this, new HelixException(errorMsg));
          } catch (Exception e) {
            LOG.warn("stateListener.onDisconnected callback fails", e);
          }
        }
        break;
      }
      // Intentional fall-through: if not flapping, share the logging with the Expired case.
    case Expired:
      LOG.warn("KeeperState:" + state + ", SessionId: " + _sessionId + ", instance: "
          + _instanceName + ", type: " + _instanceType);
      break;
    default:
      LOG.info("KeeperState:" + state + ", currentSessionId: " + _sessionId + ", instance: "
          + _instanceName + ", type: " + _instanceType);
      break;
  }
}
| |
| /** |
| * Called after zookeeper session has expired and a new session has been established. This method |
| * may cause session race condition when creating ephemeral nodes. Internally, this method calls |
| * {@link #handleNewSession(String)} with a null value as the sessionId parameter, which results |
| * in later creating the ephemeral node in the session of the latest zk connection. |
| * But please note that the session of the latest zk connection might not be the expected session. |
| * This is the session race condition issue. |
| * |
| * To avoid the race condition issue, please use {@link #handleNewSession(String)}. |
| * |
| * @deprecated |
| * This method is deprecated, because it may cause session race condition when creating ephemeral |
| * nodes. It is kept for backward compatibility in case a user class extends this class. |
| * |
| * Please use {@link #handleNewSession(String)} instead, which takes care of race condition. |
| * |
| * @throws Exception If any error occurs. |
| */ |
| @Deprecated |
| public void handleNewSession() throws Exception { |
| handleNewSession(null); |
| } |
| |
/**
 * Handles a newly established zookeeper session, identified by its session id.
 *
 * <p>Steps:
 * <ol>
 *   <li>Wait until the zk client is connected with a non-zero session id.</li>
 *   <li>If {@code sessionId} is non-null and differs from the current session id, the expected
 *   session has already expired: log a WARN and return without doing anything.</li>
 *   <li>If the messaging executor reports the PAUSED live-instance status, delegate to
 *   {@code handleNewSessionInManagementMode} and return, skipping the reset below.</li>
 *   <li>Otherwise stop timer tasks, reset the leader election handler and callback handlers,
 *   reset the base data accessor, verify the cluster is set up, record the session start time,
 *   run the participant and/or controller session setup for this instance type, restart timer
 *   tasks, re-init handlers, and notify the state listener via {@code onConnected}.</li>
 * </ol>
 *
 * @param sessionId the new session's id. The ephemeral nodes are expected to be created in this
 *     session. If this session id is expired, ephemeral nodes should not be created. A null value
 *     means "use whatever the current session is" (legacy behavior).
 * @throws HelixException if the cluster structure is not set up
 * @throws Exception if any other error occurs during handling new session
 */
@Override
public void handleNewSession(String sessionId) throws Exception {
  /*
   * TODO: after removing I0ItecIZkStateListenerImpl, null session should be checked and
   * discarded.
   * Null session is still a special case here, which is treated as non-session aware operation.
   * This special case could still potentially cause race condition, so null session should NOT
   * be acceptable, once I0ItecIZkStateListenerImpl is removed. Currently this special case
   * is kept for backward compatibility.
   */

  // Wait until we get a non-zero session id. Otherwise, getSessionId() might be null.
  waitUntilConnected();

  /*
   * Filter out stale sessions. If a session id is not null and not the same as current session
   * id, this session is expired. With this filtering, expired sessions are NOT handled,
   * so performance is expected to improve.
   */
  if (sessionId != null && !getSessionId().equals(sessionId)) {
    LOG.warn("Session is expired and not handled. Expected: {}. Actual: {}.", sessionId,
        getSessionId());
    return;
  }

  /*
   * When a null session id is passed in, we will take current session's id for following
   * operations. Please note that current session might not be the one we expect to handle,
   * because the one we expect might be already expired when the zk event is waiting in the
   * event queue. Why we use current session here is for backward compatibility with the old
   * method handleNewSession().
   */
  if (sessionId == null) {
    sessionId = getSessionId();
    LOG.debug("Session id: <null> is passed in. Current session id: {} will be used.", sessionId);
  }

  LOG.info("Handle new session, instance: {}, type: {}, session id: {}.", _instanceName,
      _instanceType, sessionId);

  // In PAUSED status, skip the normal reset; only the participant live instance is recreated
  // (see handleNewSessionInManagementMode).
  LiveInstance.LiveInstanceStatus liveInstanceStatus =
      _messagingService.getExecutor().getLiveInstanceStatus();
  if (LiveInstance.LiveInstanceStatus.PAUSED
      .equals(liveInstanceStatus)) {
    handleNewSessionInManagementMode(sessionId, liveInstanceStatus);
    return;
  }

  /**
   * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
   * disconnect if fail to cleanup
   */
  stopTimerTasks();
  if (_leaderElectionHandler != null) {
    _leaderElectionHandler.reset(false);
  }
  resetHandlers(false);

  /**
   * clean up write-through cache
   */
  _baseDataAccessor.reset();

  /**
   * from here on, we are dealing with new session
   */
  if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
    throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
  }

  _sessionStartTime = System.currentTimeMillis();

  // Per-role session setup; CONTROLLER_PARTICIPANT runs both, participant first.
  switch (_instanceType) {
    case PARTICIPANT:
      handleNewSessionAsParticipant(sessionId, _liveInstanceInfoProvider);
      break;
    case CONTROLLER:
      handleNewSessionAsController();
      break;
    case CONTROLLER_PARTICIPANT:
      handleNewSessionAsParticipant(sessionId, _liveInstanceInfoProvider);
      handleNewSessionAsController();
      break;
    case ADMINISTRATOR:
    case SPECTATOR:
    default:
      break;
  }

  startTimerTasks();

  /**
   * init handlers
   * ok to init message handler and data-accessor twice
   * the second init will be skipped (see CallbackHandler)
   */
  initHandlers(_handlers);

  // Listener failures are logged and do not fail session handling.
  if (_stateListener != null) {
    try {
      _stateListener.onConnected(this);
    } catch (Exception e) {
      LOG.warn("stateListener.onConnected callback fails", e);
    }
  }
}
| |
| private void handleNewSessionInManagementMode(String sessionId, |
| LiveInstance.LiveInstanceStatus liveInstanceStatus) throws Exception { |
| LOG.info("Skip reset because instance is in {} status", LiveInstance.LiveInstanceStatus.PAUSED); |
| if (!InstanceType.PARTICIPANT.equals(_instanceType) |
| && !InstanceType.CONTROLLER_PARTICIPANT.equals(_instanceType)) { |
| return; |
| } |
| // Add STATUS to info provider so the new live instance will have STATUS field |
| handleNewSessionAsParticipant(sessionId, |
| new LiveInstanceStatusInfoProvider(liveInstanceStatus)); |
| } |
| |
| void handleNewSessionAsParticipant(final String sessionId, LiveInstanceInfoProvider provider) |
| throws Exception { |
| if (_participantManager != null) { |
| _participantManager.reset(); |
| } |
| _participantManager = |
| new ParticipantManager(this, _zkclient, _sessionTimeout, provider, |
| _preConnectCallbacks, sessionId, _helixManagerProperty); |
| _participantManager.handleNewSession(); |
| } |
| |
| // TODO: pass in session id and make this method session aware to avoid potential session race |
| // condition. |
| void handleNewSessionAsController() { |
| if (_leaderElectionHandler != null) { |
| _leaderElectionHandler.init(); |
| } else { |
| _leaderElectionHandler = |
| new CallbackHandler(this, _zkclient, _keyBuilder.controller(), |
| new DistributedLeaderElection(this, _controller, _controllerTimerTasks), |
| new EventType[] { |
| EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated |
| }, ChangeType.CONTROLLER, _callbackMonitors.get(ChangeType.CONTROLLER)); |
| } |
| } |
| |
| @Override |
| public ParticipantHealthReportCollector getHealthReportCollector() { |
| return _participantHealthInfoCollector; |
| } |
| |
| @Override |
| public void handleSessionEstablishmentError(Throwable error) throws Exception { |
| LOG.warn("Handling Session Establishment Error. Disconnect Helix Manager.", error); |
| disconnect(); |
| |
| if (_stateListener != null) { |
| _stateListener.onDisconnected(this, error); |
| } |
| } |
| |
| @Override |
| public Long getSessionStartTime() { |
| return _sessionStartTime; |
| } |
| |
| /* |
| * Prepares connection config and client config based on the internal parameters given to |
| * HelixManager in order to create a ZkClient instance to use. Note that a shared ZkClient |
| * instance will be created if connecting as an ADMINISTRATOR to minimize the cost of creating |
| * ZkConnections. |
| */ |
| private RealmAwareZkClient createSingleRealmZkClient() { |
| final String shardingKey = HelixUtil.clusterNameToShardingKey(_clusterName); |
| PathBasedZkSerializer zkSerializer = |
| ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build(); |
| |
| // If the user supplied RealmAwareZkConnectionConfig, then use it. Only create the connection |
| // config if nothing is given |
| if (_realmAwareZkConnectionConfig == null) { |
| // If no connection config is given, use the single realm mode with the cluster name as the |
| // key |
| _realmAwareZkConnectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder() |
| .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM) |
| .setZkRealmShardingKey(shardingKey).setSessionTimeout(_sessionTimeout).build(); |
| } |
| |
| RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = |
| new RealmAwareZkClient.RealmAwareZkClientConfig(); |
| |
| clientConfig.setZkSerializer(zkSerializer) |
| .setConnectInitTimeout(_connectionInitTimeout) |
| .setMonitorType(_instanceType.name()) |
| .setMonitorKey(_clusterName) |
| .setMonitorInstanceName(_instanceName) |
| .setMonitorRootPathOnly(isMonitorRootPathOnly()); |
| |
| if (_instanceType == InstanceType.ADMINISTRATOR) { |
| return resolveZkClient(SharedZkClientFactory.getInstance(), _realmAwareZkConnectionConfig, |
| clientConfig); |
| } |
| |
| return resolveZkClient(DedicatedZkClientFactory.getInstance(), _realmAwareZkConnectionConfig, |
| clientConfig); |
| } |
| |
| /* |
| * Resolves what type of ZkClient this HelixManager should use based on whether MULTI_ZK_ENABLED |
| * System config is set or not. Two types of ZkClients are available: |
| * 1) If MULTI_ZK_ENABLED is set to true or zkAddress is null, we create a dedicated |
| * RealmAwareZkClient that provides full ZkClient functionalities and connects to the correct ZK |
| * by querying MetadataStoreDirectoryService. |
| * 2) Otherwise, we create a dedicated HelixZkClient which plainly connects to |
| * the ZK address given. |
| */ |
| private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory, |
| RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig, |
| RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) { |
| if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || _zkAddress == null) { |
| try { |
| // Create realm-aware ZkClient. |
| return zkClientFactory.buildZkClient(connectionConfig, clientConfig); |
| } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) { |
| throw new HelixException("Not able to connect on realm-aware mode for sharding key: " |
| + connectionConfig.getZkRealmShardingKey(), e); |
| } |
| } |
| |
| // If multi-zk mode is not enabled, create HelixZkClient with the provided zk address. |
| HelixZkClient.ZkClientConfig helixZkClientConfig = clientConfig.createHelixZkClientConfig(); |
| HelixZkClient.ZkConnectionConfig helixZkConnectionConfig = |
| new HelixZkClient.ZkConnectionConfig(_zkAddress) |
| .setSessionTimeout(connectionConfig.getSessionTimeout()); |
| |
| return zkClientFactory.buildZkClient(helixZkConnectionConfig, helixZkClientConfig); |
| } |
| |
| /** |
| * Check that not both zkAddress and ZkConnectionConfig are set. |
| * If zkAddress is not given and ZkConnectionConfig is given, check that ZkConnectionConfig has |
| * a ZK path sharding key set because HelixManager must work on single-realm mode. |
| * @param zkAddress |
| * @param helixManagerProperty |
| */ |
| private void validateZkConnectionSettings(String zkAddress, |
| HelixManagerProperty helixManagerProperty) { |
| if (helixManagerProperty != null && helixManagerProperty.getZkConnectionConfig() != null) { |
| if (zkAddress != null) { |
| throw new HelixException( |
| "ZKHelixManager: cannot have both ZkAddress and ZkConnectionConfig set!"); |
| } |
| RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig = |
| helixManagerProperty.getZkConnectionConfig(); |
| if (connectionConfig.getZkRealmShardingKey() == null || connectionConfig |
| .getZkRealmShardingKey().isEmpty()) { |
| throw new HelixException( |
| "ZKHelixManager::ZK path sharding key must be set for ZKHelixManager! ZKHelixManager " |
| + "is only available on single-realm mode."); |
| } |
| _realmAwareZkConnectionConfig = connectionConfig; |
| } |
| } |
| |
| /** |
| * Resolve ZK connection info for logging purposes. |
| * @return |
| */ |
| private String getZkConnectionInfo() { |
| String zkConnectionInfo; |
| if (_zkAddress == null) { |
| if (_helixManagerProperty != null && _helixManagerProperty.getZkConnectionConfig() != null) { |
| zkConnectionInfo = _helixManagerProperty.getZkConnectionConfig().toString(); |
| } else { |
| zkConnectionInfo = "None"; |
| } |
| } else { |
| zkConnectionInfo = _zkAddress; |
| } |
| return zkConnectionInfo; |
| } |
| |
| /* |
| * Provides live instance status as additional live instance info in the info provider. |
| */ |
| private static class LiveInstanceStatusInfoProvider implements LiveInstanceInfoProvider { |
| private final ZNRecord _record; |
| |
| public LiveInstanceStatusInfoProvider(LiveInstance.LiveInstanceStatus status) { |
| _record = new ZNRecord("STATUS_PROVIDER"); |
| _record.setEnumField(LiveInstance.LiveInstanceProperty.STATUS.name(), status); |
| } |
| |
| @Override |
| public ZNRecord getAdditionalLiveInstanceInfo() { |
| return _record; |
| } |
| } |
| } |