package org.apache.helix.manager.zk;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Timer;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.management.JMException;

import com.google.common.collect.Sets;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixCloudProperty;
import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerProperties;
import org.apache.helix.HelixManagerProperty;
import org.apache.helix.HelixPropertyFactory;
import org.apache.helix.HelixTimerTask;
import org.apache.helix.InstanceType;
import org.apache.helix.LiveInstanceInfoProvider;
import org.apache.helix.PreConnectCallback;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.PropertyPathBuilder;
import org.apache.helix.PropertyType;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.api.listeners.ClusterConfigChangeListener;
import org.apache.helix.api.listeners.ConfigChangeListener;
import org.apache.helix.api.listeners.ControllerChangeListener;
import org.apache.helix.api.listeners.CurrentStateChangeListener;
import org.apache.helix.api.listeners.CustomizedStateChangeListener;
import org.apache.helix.api.listeners.CustomizedStateConfigChangeListener;
import org.apache.helix.api.listeners.CustomizedStateRootChangeListener;
import org.apache.helix.api.listeners.CustomizedViewChangeListener;
import org.apache.helix.api.listeners.CustomizedViewRootChangeListener;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.IdealStateChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.MessageListener;
import org.apache.helix.api.listeners.ResourceConfigChangeListener;
import org.apache.helix.api.listeners.ScopedConfigChangeListener;
import org.apache.helix.cloud.event.CloudEventHandlerFactory;
import org.apache.helix.cloud.event.CloudEventListener;
import org.apache.helix.cloud.event.helix.CloudEventCallbackProperty;
import org.apache.helix.cloud.event.helix.HelixCloudEventListener;
import org.apache.helix.controller.GenericHelixController;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.healthcheck.ParticipantHealthReportCollector;
import org.apache.helix.healthcheck.ParticipantHealthReportCollectorImpl;
import org.apache.helix.healthcheck.ParticipantHealthReportTask;
import org.apache.helix.messaging.DefaultMessagingService;
import org.apache.helix.model.BuiltInStateModelDefinitions;
import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.ZKPathDataDumpTask;
import org.apache.helix.monitoring.mbeans.HelixCallbackMonitor;
import org.apache.helix.monitoring.mbeans.MonitorLevel;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.participant.HelixStateMachineEngine;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.store.zk.AutoFallbackPropertyStore;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.util.HelixUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.datamodel.serializer.ChainedPathZkSerializer;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.HelixZkClientFactory;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.helix.zookeeper.zkclient.serialize.PathBasedZkSerializer;
import org.apache.zookeeper.Watcher.Event.EventType;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ZKHelixManager implements HelixManager, IZkStateListener {
  private static Logger LOG = LoggerFactory.getLogger(ZKHelixManager.class);

  public static final String ALLOW_PARTICIPANT_AUTO_JOIN = "allowParticipantAutoJoin";
  private static final int FLAPPING_TIME_WINDOW = 300000; // Default to 300 sec (value is in ms)
  public static final int DEFAULT_MAX_DISCONNECT_THRESHOLD = 600; // Default to be a large number
  private static final int DEFAULT_WAIT_CONNECTED_TIMEOUT = 10 * 1000;  // wait until connected for up to 10 seconds.

  protected final String _zkAddress;
  private final String _clusterName;
  private final String _instanceName;
  private final InstanceType _instanceType;
  private final int _waitForConnectedTimeout; // wait time (ms) used by checkConnected() before giving up
  private final int _sessionTimeout; // client side session timeout, will be overridden by server timeout. Disconnect after timeout
  private final int _connectionInitTimeout; // client timeout (ms) to establish the initial session
  private final List<PreConnectCallback> _preConnectCallbacks;
  protected final List<CallbackHandler> _handlers;
  private final HelixManagerProperties _properties;
  protected final HelixManagerProperty _helixManagerProperty;
  private final HelixManagerStateListener _stateListener;

  /**
   * Helix version, read from {@link #_properties} in the constructor.
   */
  private final String _version;
  private int _reportLatency;

  // Replaced (and the previous instance closed) every time createClient() runs.
  protected RealmAwareZkClient _zkclient;
  private RealmAwareZkClient.RealmAwareZkConnectionConfig _realmAwareZkConnectionConfig;
  private final DefaultMessagingService _messagingService;
  // One monitor per ChangeType, populated in the constructor.
  private Map<ChangeType, HelixCallbackMonitor> _callbackMonitors;

  private final MonitorLevel _monitorLevel;

  // The accessors below are (re)created by createClient() on top of _zkclient.
  private BaseDataAccessor<ZNRecord> _baseDataAccessor;
  private ZKHelixDataAccessor _dataAccessor;
  private final Builder _keyBuilder;
  private ConfigAccessor _configAccessor;
  private ZkHelixPropertyStore<ZNRecord> _helixPropertyStore;
  protected LiveInstanceInfoProvider _liveInstanceInfoProvider = null;

  private volatile String _sessionId;

  /**
   * Timestamps at which the ZK state became Disconnected. If the state becomes Disconnected
   * more than {@code _maxDisconnectThreshold} times within the flapping time window
   * (see {@code _flappingTimeWindowMs}), the ZKHelixManager is disconnected.
   */
  private final List<Long> _disconnectTimeHistory = new ArrayList<Long>();
  private final int _flappingTimeWindowMs;
  private final int _maxDisconnectThreshold;

  /**
   * Participant fields. The constructor leaves {@code _stateMachineEngine} and
   * {@code _participantHealthInfoCollector} null for instance types that do not act as a
   * participant.
   */
  private final StateMachineEngine _stateMachineEngine;
  private final List<HelixTimerTask> _timerTasks = new ArrayList<>();

  private final ParticipantHealthReportCollectorImpl _participantHealthInfoCollector;
  private Long _sessionStartTime;
  private ParticipantManager _participantManager;

  /**
   * Controller fields. {@code _controller} is created lazily in connect() for controller
   * instance types.
   */
  private GenericHelixController _controller;
  // Defaults to DEFAULT and TASK; can be replaced through setEnabledControlPipelineTypes().
  private Set<Pipeline.Type> _enabledPipelineTypes;
  private CallbackHandler _leaderElectionHandler = null;
  protected final List<HelixTimerTask> _controllerTimerTasks = new ArrayList<>();

  /**
   * Cloud fields. The listener is created in connect() when cloud event callbacks are enabled.
   */
  private CloudEventListener _cloudEventListener;

  /**
   * Status dump timer task. Starting it schedules a {@link ZKPathDataDumpTask} on a daemon
   * {@link Timer} at a fixed 15-minute rate; stopping it cancels that timer.
   */
  protected static class StatusDumpTask extends HelixTimerTask {
    final HelixManager helixController;

    public StatusDumpTask(HelixManager helixController) {
      this.helixController = helixController;
    }

    /**
     * Starts the periodic dump. Does nothing when a timer is already running.
     */
    @Override
    public synchronized void start() {
      if (_timer != null) {
        // already started
        return;
      }

      final long firstRunDelayMs = 0;
      final long dumpIntervalMs = 15 * 60 * 1000;
      final long noChangeThresholdForStatusUpdatesMs = 15 * 60 * 1000; // 15 minutes
      final long noChangeThresholdForErrorsMs = 24 * 60 * 60 * 1000; // 1 day
      final int maxLeafNodesAllowed = 100;

      LOG.info("Start StatusDumpTask");
      _timer = new Timer("StatusDumpTimerTask", true);
      final ZKPathDataDumpTask dumpTask =
          new ZKPathDataDumpTask(helixController, noChangeThresholdForStatusUpdatesMs,
              noChangeThresholdForErrorsMs, maxLeafNodesAllowed);
      _timer.scheduleAtFixedRate(dumpTask, firstRunDelayMs, dumpIntervalMs);
    }

    /**
     * Cancels the timer and clears it so that {@link #start()} can be called again.
     */
    @Override
    public synchronized void stop() {
      if (_timer == null) {
        // nothing to stop
        return;
      }

      LOG.info("Stop StatusDumpTask");
      _timer.cancel();
      _timer = null;
    }
  }

  /**
   * Creates a manager without a state listener; delegates to the constructor that accepts a
   * {@link HelixManagerStateListener}, passing {@code null}.
   *
   * @param clusterName name of the cluster this manager belongs to
   * @param instanceName name of this instance; may be null, in which case a name is derived
   * @param instanceType role of this manager instance
   * @param zkAddress ZooKeeper connection address
   */
  public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
      String zkAddress) {
    this(clusterName, instanceName, instanceType, zkAddress, null);
  }

  /**
   * Creates a manager whose {@link HelixManagerProperty} is obtained from
   * {@link HelixPropertyFactory} for the given ZK address and cluster name.
   *
   * @param clusterName name of the cluster this manager belongs to
   * @param instanceName name of this instance; may be null, in which case a name is derived
   * @param instanceType role of this manager instance
   * @param zkAddress ZooKeeper connection address
   * @param stateListener listener stored on this manager; may be null
   */
  public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
      String zkAddress, HelixManagerStateListener stateListener) {
    this(clusterName, instanceName, instanceType, zkAddress, stateListener,
        HelixPropertyFactory.getInstance().getHelixManagerProperty(zkAddress, clusterName));
  }

  /**
   * Main constructor. Validates the ZK connection settings, resolves the instance name,
   * registers one {@link HelixCallbackMonitor} per {@link ChangeType}, reads tunables from
   * system properties (falling back to defaults) and performs instance-type specific setup.
   * No ZooKeeper client is created here; that happens in {@link #connect()}.
   *
   * @param clusterName name of the cluster this manager belongs to
   * @param instanceName name of this instance; when null, the canonical local host name
   *          suffixed with the instance type is used, or "UNKNOWN" if the host cannot be resolved
   * @param instanceType role of this manager instance
   * @param zkAddress ZooKeeper connection address
   * @param stateListener listener stored on this manager; may be null
   * @param helixManagerProperty manager properties; dereferenced here without a null check
   *          (NOTE(review): connect() does null-check this field -- confirm whether null is a
   *          supported input)
   * @throws IllegalArgumentException if {@code instanceType} is not a recognized type
   */
  public ZKHelixManager(String clusterName, String instanceName, InstanceType instanceType,
      String zkAddress, HelixManagerStateListener stateListener,
      HelixManagerProperty helixManagerProperty) {
    validateZkConnectionSettings(zkAddress, helixManagerProperty);

    _zkAddress = zkAddress;
    _clusterName = clusterName;
    _instanceType = instanceType;
    // Note: instanceName is logged as passed in, before the null fallback below is applied.
    LOG.info("Create a zk-based cluster manager. ZK connection: " + getZkConnectionInfo()
        + ", clusterName: " + clusterName + ", instanceName: " + instanceName + ", type: "
        + instanceType);

    if (instanceName == null) {
      try {
        instanceName =
            InetAddress.getLocalHost().getCanonicalHostName() + "-" + instanceType.toString();
      } catch (UnknownHostException e) {
        // can ignore it
        LOG.info("Unable to get host name. Will set it to UNKNOWN, mostly ignorable", e);
        instanceName = "UNKNOWN";
      }
    }

    _instanceName = instanceName;
    _enabledPipelineTypes =
        Sets.newHashSet(Pipeline.Type.DEFAULT, Pipeline.Type.TASK);
    _preConnectCallbacks = new ArrayList<>();
    _handlers = new ArrayList<>();
    _properties = new HelixManagerProperties(SystemPropertyKeys.CLUSTER_MANAGER_VERSION);
    _version = _properties.getVersion();

    _keyBuilder = new Builder(clusterName);
    _messagingService = new DefaultMessagingService(this);
    try {
      _callbackMonitors = new HashMap<>();
      for (ChangeType changeType : ChangeType.values()) {
        HelixCallbackMonitor callbackMonitor =
            new HelixCallbackMonitor(instanceType, clusterName, instanceName, changeType);
        callbackMonitor.register();
        _callbackMonitors.put(changeType, callbackMonitor);
      }
    } catch (JMException e) {
      // Best effort: on failure the map keeps only the monitors registered so far.
      LOG.error("Error in creating callback monitor.", e);
    }

    _stateListener = stateListener;
    // read cloud config from ZK and set cloudConfig in HelixManagerProperty
    _helixManagerProperty = helixManagerProperty;
    _helixManagerProperty.getHelixCloudProperty().populateFieldsWithCloudConfig(
        HelixPropertyFactory.getCloudConfig(_zkAddress, _clusterName));

    /**
     * use system property if available
     */
    _flappingTimeWindowMs = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.FLAPPING_TIME_WINDOW,
        ZKHelixManager.FLAPPING_TIME_WINDOW);

    _maxDisconnectThreshold = HelixUtil
        .getSystemPropertyAsInt(SystemPropertyKeys.MAX_DISCONNECT_THRESHOLD,
            ZKHelixManager.DEFAULT_MAX_DISCONNECT_THRESHOLD);

    _sessionTimeout = HelixUtil.getSystemPropertyAsInt(SystemPropertyKeys.ZK_SESSION_TIMEOUT,
        HelixZkClient.DEFAULT_SESSION_TIMEOUT);

    _connectionInitTimeout = HelixUtil
        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_CONNECTION_TIMEOUT,
            HelixZkClient.DEFAULT_CONNECTION_TIMEOUT);

    _waitForConnectedTimeout = HelixUtil
        .getSystemPropertyAsInt(SystemPropertyKeys.ZK_WAIT_CONNECTED_TIMEOUT,
            DEFAULT_WAIT_CONNECTED_TIMEOUT);

    _reportLatency = HelixUtil
        .getSystemPropertyAsInt(SystemPropertyKeys.PARTICIPANT_HEALTH_REPORT_LATENCY,
            ParticipantHealthReportTask.DEFAULT_REPORT_LATENCY);

    // An unrecognized monitor level name falls back to DEFAULT instead of failing construction.
    MonitorLevel configuredMonitorLevel;
    try {
      configuredMonitorLevel = MonitorLevel.valueOf(
          System.getProperty(SystemPropertyKeys.MONITOR_LEVEL, MonitorLevel.DEFAULT.name()));
    } catch (IllegalArgumentException ex) {
      LOG.warn("Unrecognizable Monitor Level configuration. Use DEFAULT monitor level.", ex);
      configuredMonitorLevel = MonitorLevel.DEFAULT;
    }
    _monitorLevel = configuredMonitorLevel;

    /**
     * instance type specific init
     */
    switch (instanceType) {
    case PARTICIPANT:
      // participant: state machine engine plus health report timer task
      _stateMachineEngine = new HelixStateMachineEngine(this);
      _participantHealthInfoCollector =
          new ParticipantHealthReportCollectorImpl(this, _instanceName);
      _timerTasks
          .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
      break;
    case CONTROLLER:
      // controller: no participant components, only the status dump timer task
      _stateMachineEngine = null;
      _participantHealthInfoCollector = null;
      _controllerTimerTasks.add(new StatusDumpTask(this));
      break;
    case CONTROLLER_PARTICIPANT:
      // both roles: participant components and the controller status dump task
      _stateMachineEngine = new HelixStateMachineEngine(this);
      _participantHealthInfoCollector =
          new ParticipantHealthReportCollectorImpl(this, _instanceName);
      _timerTasks
          .add(new ParticipantHealthReportTask(_participantHealthInfoCollector, _reportLatency));
      _controllerTimerTasks.add(new StatusDumpTask(this));
      break;
    case ADMINISTRATOR:
    case SPECTATOR:
      // neither participant nor controller components are needed
      _stateMachineEngine = null;
      _participantHealthInfoCollector = null;
      break;
    default:
      throw new IllegalArgumentException("unrecognized type: " + instanceType);
    }
  }

  /**
   * Replaces the set of pipeline types handed to the controller created in connect().
   *
   * @param types pipeline types to enable
   * @throws IllegalStateException if this manager is neither a CONTROLLER nor a
   *           CONTROLLER_PARTICIPANT
   */
  public void setEnabledControlPipelineTypes(Set<Pipeline.Type> types) {
    final boolean actsAsController = InstanceType.CONTROLLER.equals(_instanceType)
        || InstanceType.CONTROLLER_PARTICIPANT.equals(_instanceType);
    if (!actsAsController) {
      final String message =
          String.format("Cannot enable control pipeline for instance type %s", _instanceType);
      throw new IllegalStateException(message);
    }
    _enabledPipelineTypes = types;
  }

  /**
   * Removes every callback handler registered for the given key path and listener, then
   * resets each removed handler.
   *
   * @param key property key whose path identifies the handlers
   * @param listener listener instance to remove
   * @return always {@code true}
   */
  @Override
  public boolean removeListener(PropertyKey key, Object listener) {
    LOG.info("Removing listener: " + listener + " on path: " + key.getPath() + " from cluster: "
        + _clusterName + " by instance: " + _instanceName);

    synchronized (this) {
      final List<CallbackHandler> matched = new ArrayList<>();
      for (CallbackHandler candidate : _handlers) {
        // a handler matches on both its property-key path and its listener
        if (!candidate.getPath().equals(key.getPath())) {
          continue;
        }
        if (!candidate.getListener().equals(listener)) {
          continue;
        }
        matched.add(candidate);
      }

      _handlers.removeAll(matched);

      // reset() may itself modify _handlers, so it runs only after the scan above is finished
      for (CallbackHandler removed : matched) {
        removed.reset(true);
      }
    }

    return true;
  }

  /**
   * Checks the connection without waiting: passes a negative timeout, so
   * {@link #checkConnected(long)} throws immediately when the manager is not connected.
   */
  void checkConnected() {
    checkConnected(-1);
  }

  /**
   * Check if HelixManager is connected. If it is not connected and a positive timeout is
   * given, wait up to that timeout for the ZK client to connect and check again before
   * returning.
   *
   * @param timeout maximum time in milliseconds to wait for the connection; a value that is
   *          zero or negative disables waiting
   * @throws HelixException if the ZK client was never created or is closed, or if the manager
   *           is still not connected after the optional wait
   */
  void checkConnected(long timeout) {
    if (_zkclient == null || _zkclient.isClosed()) {
      throw new HelixException(
          "HelixManager (ZkClient) is not connected. Call HelixManager#connect()");
    }

    boolean isConnected = isConnected();
    if (!isConnected && timeout > 0) {
      // Honor the caller-supplied timeout; previously the parameter only gated the wait and
      // _waitForConnectedTimeout was always used, contradicting the error log below.
      LOG.warn("zkClient to " + getZkConnectionInfo() + " is not connected, wait for "
          + timeout + "ms.");
      isConnected = _zkclient.waitUntilConnected(timeout, TimeUnit.MILLISECONDS);
    }

    if (!isConnected) {
      LOG.error("zkClient is not connected after waiting " + timeout + "ms."
          + ", clusterName: " + _clusterName + ", zkAddress: " + getZkConnectionInfo());
      throw new HelixException(
          "HelixManager is not connected within retry timeout for cluster " + _clusterName);
    }
  }

  /**
   * Registers a listener on the path of the given property key by creating a
   * {@link CallbackHandler} for it. The call is a no-op when a handler with the same path and
   * listener is already registered.
   *
   * @param listener listener to register
   * @param propertyKey key whose path is watched
   * @param changeType change type, also used to pick the callback monitor
   * @param eventType ZK event types passed to the handler
   * @throws HelixException if the manager is not connected within the wait timeout
   */
  void addListener(Object listener, PropertyKey propertyKey, ChangeType changeType,
      EventType[] eventType) {
    checkConnected(_waitForConnectedTimeout);

    final PropertyType propertyType = propertyKey.getType();

    synchronized (this) {
      for (CallbackHandler existing : _handlers) {
        // a duplicate is a handler with the same property-key path and an equal listener
        final boolean samePath = existing.getPath().equals(propertyKey.getPath());
        if (!samePath || !existing.getListener().equals(listener)) {
          continue;
        }
        LOG.info("Listener: " + listener + " on path: " + propertyKey.getPath()
            + " already exists. skip add");
        return;
      }

      final HelixCallbackMonitor monitor = _callbackMonitors.get(changeType);
      final CallbackHandler created =
          new CallbackHandler(this, _zkclient, propertyKey, listener, eventType, changeType,
              monitor);

      _handlers.add(created);
      LOG.info("Added listener: " + listener + " for type: " + propertyType + " to path: "
          + created.getPath());
    }
  }

  /**
   * Registers a listener on the cluster's ideal states for child-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addIdealStateChangeListener(final IdealStateChangeListener listener) throws Exception {
    final PropertyKey idealStatesKey = new Builder(_clusterName).idealStates();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, idealStatesKey, ChangeType.IDEAL_STATE, watchedEvents);
  }

  /**
   * Registers a legacy-typed listener on the cluster's ideal states for child-change events.
   *
   * @param listener listener to register
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.IdealStateChangeListener}
   */
  @Deprecated
  @Override
  public void addIdealStateChangeListener(final org.apache.helix.IdealStateChangeListener listener)
      throws Exception {
    final PropertyKey idealStatesKey = new Builder(_clusterName).idealStates();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, idealStatesKey, ChangeType.IDEAL_STATE, watchedEvents);
  }

  /**
   * Registers a listener on the cluster's live instances for child-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addLiveInstanceChangeListener(LiveInstanceChangeListener listener) throws Exception {
    final PropertyKey liveInstancesKey = new Builder(_clusterName).liveInstances();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, liveInstancesKey, ChangeType.LIVE_INSTANCE, watchedEvents);
  }

  /**
   * Registers a legacy-typed listener on the cluster's live instances for child-change events.
   *
   * @param listener listener to register
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.LiveInstanceChangeListener}
   */
  @Deprecated
  @Override
  public void addLiveInstanceChangeListener(org.apache.helix.LiveInstanceChangeListener listener)
      throws Exception {
    final PropertyKey liveInstancesKey = new Builder(_clusterName).liveInstances();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, liveInstancesKey, ChangeType.LIVE_INSTANCE, watchedEvents);
  }

  /**
   * Registers a config listener on the cluster's instance configs for child-change events,
   * using the INSTANCE_CONFIG change type.
   *
   * @param listener listener to register
   */
  @Override
  public void addConfigChangeListener(ConfigChangeListener listener) throws Exception {
    final PropertyKey instanceConfigsKey = new Builder(_clusterName).instanceConfigs();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, instanceConfigsKey, ChangeType.INSTANCE_CONFIG, watchedEvents);
  }

  /**
   * Registers a listener on the cluster's instance configs for child-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addInstanceConfigChangeListener(InstanceConfigChangeListener listener)
      throws Exception {
    final PropertyKey instanceConfigsKey = new Builder(_clusterName).instanceConfigs();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, instanceConfigsKey, ChangeType.INSTANCE_CONFIG, watchedEvents);
  }

  /**
   * Registers a legacy-typed listener on the cluster's instance configs for child-change
   * events.
   *
   * @param listener listener to register
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.InstanceConfigChangeListener}
   */
  @Deprecated
  @Override
  public void addInstanceConfigChangeListener(org.apache.helix.InstanceConfigChangeListener listener)
      throws Exception {
    final PropertyKey instanceConfigsKey = new Builder(_clusterName).instanceConfigs();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, instanceConfigsKey, ChangeType.INSTANCE_CONFIG, watchedEvents);
  }

  /**
   * Registers a listener on the cluster's resource configs for child-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addResourceConfigChangeListener(ResourceConfigChangeListener listener) throws Exception{
    final PropertyKey resourceConfigsKey = new Builder(_clusterName).resourceConfigs();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, resourceConfigsKey, ChangeType.RESOURCE_CONFIG, watchedEvents);
  }

  /**
   * Registers a listener on the cluster's customized state config for data-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addCustomizedStateConfigChangeListener(
      CustomizedStateConfigChangeListener listener) throws Exception {
    final PropertyKey customizedStateConfigKey = new Builder(_clusterName).customizedStateConfig();
    final EventType[] watchedEvents = { EventType.NodeDataChanged };
    addListener(listener, customizedStateConfigKey, ChangeType.CUSTOMIZED_STATE_CONFIG,
        watchedEvents);
  }

  /**
   * Registers a listener on the cluster config for data-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addClusterfigChangeListener(ClusterConfigChangeListener listener) throws Exception{
    final PropertyKey clusterConfigKey = new Builder(_clusterName).clusterConfig();
    final EventType[] watchedEvents = { EventType.NodeDataChanged };
    addListener(listener, clusterConfigKey, ChangeType.CLUSTER_CONFIG, watchedEvents);
  }

  /**
   * Registers a scoped config listener for child-change events. Only the CLUSTER, PARTICIPANT
   * and RESOURCE scopes map to a property key; for any other scope an error is logged and
   * nothing is registered.
   *
   * @param listener listener to register
   * @param scope config scope selecting which configs are watched
   */
  @Override
  public void addConfigChangeListener(ScopedConfigChangeListener listener, ConfigScopeProperty scope)
      throws Exception {
    final Builder keys = new Builder(_clusterName);

    final PropertyKey scopedKey;
    switch (scope) {
    case CLUSTER:
      scopedKey = keys.clusterConfigs();
      break;
    case PARTICIPANT:
      scopedKey = keys.instanceConfigs();
      break;
    case RESOURCE:
      scopedKey = keys.resourceConfigs();
      break;
    default:
      scopedKey = null;
      break;
    }

    if (scopedKey == null) {
      LOG.error("Can't add listener to config scope: " + scope);
      return;
    }

    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, scopedKey, ChangeType.CONFIG, watchedEvents);
  }


  /**
   * Legacy-typed variant: casts the listener to
   * {@link org.apache.helix.api.listeners.ScopedConfigChangeListener} and delegates to the
   * non-deprecated overload.
   *
   * @param listener listener to register
   * @param scope config scope selecting which configs are watched
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.ScopedConfigChangeListener}
   */
  @Deprecated
  @Override
  public void addConfigChangeListener(org.apache.helix.ScopedConfigChangeListener listener, ConfigScopeProperty scope)
      throws Exception {
    final ScopedConfigChangeListener scopedListener = (ScopedConfigChangeListener) listener;
    addConfigChangeListener(scopedListener, scope);
  }

  // TODO: Decide whether this is still needed now that ClusterMessagingService is exposed
  /**
   * Registers a listener on the messages of the given instance for child-change events.
   *
   * @param listener listener to register
   * @param instanceName instance whose messages are watched
   */
  @Override
  public void addMessageListener(MessageListener listener, String instanceName) {
    final PropertyKey messagesKey = new Builder(_clusterName).messages(instanceName);
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, messagesKey, ChangeType.MESSAGE, watchedEvents);
  }

  /**
   * Registers a legacy-typed listener on the messages of the given instance for child-change
   * events.
   *
   * @param listener listener to register
   * @param instanceName instance whose messages are watched
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.MessageListener}
   */
  @Deprecated
  @Override
  public void addMessageListener(org.apache.helix.MessageListener listener, String instanceName) {
    final PropertyKey messagesKey = new Builder(_clusterName).messages(instanceName);
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, messagesKey, ChangeType.MESSAGE, watchedEvents);
  }

  /**
   * Registers a listener on the controller messages for child-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addControllerMessageListener(MessageListener listener) {
    final PropertyKey controllerMessagesKey = new Builder(_clusterName).controllerMessages();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, controllerMessagesKey, ChangeType.MESSAGES_CONTROLLER, watchedEvents);
  }

  /**
   * Registers a legacy-typed listener on the controller messages for child-change events.
   *
   * @param listener listener to register
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.MessageListener}
   */
  @Deprecated
  @Override
  public void addControllerMessageListener(org.apache.helix.MessageListener listener) {
    final PropertyKey controllerMessagesKey = new Builder(_clusterName).controllerMessages();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, controllerMessagesKey, ChangeType.MESSAGES_CONTROLLER, watchedEvents);
  }

  /**
   * Registers a listener on the current states of the given instance and session for
   * child-change events.
   *
   * @param listener listener to register
   * @param instanceName instance whose current states are watched
   * @param sessionId session of that instance
   */
  @Override
  public void addCurrentStateChangeListener(CurrentStateChangeListener listener,
      String instanceName, String sessionId) throws Exception {
    final PropertyKey currentStatesKey =
        new Builder(_clusterName).currentStates(instanceName, sessionId);
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, currentStatesKey, ChangeType.CURRENT_STATE, watchedEvents);
  }

  /**
   * Registers a legacy-typed listener on the current states of the given instance and session
   * for child-change events.
   *
   * @param listener listener to register
   * @param instanceName instance whose current states are watched
   * @param sessionId session of that instance
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.CurrentStateChangeListener}
   */
  @Deprecated
  @Override
  public void addCurrentStateChangeListener(org.apache.helix.CurrentStateChangeListener listener,
      String instanceName, String sessionId) throws Exception {
    final PropertyKey currentStatesKey =
        new Builder(_clusterName).currentStates(instanceName, sessionId);
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, currentStatesKey, ChangeType.CURRENT_STATE, watchedEvents);
  }

  /**
   * Registers a listener on the task current states of the given instance and session for
   * child-change events.
   *
   * @param listener listener to register
   * @param instanceName instance whose task current states are watched
   * @param sessionId session of that instance
   */
  @Override
  public void addTaskCurrentStateChangeListener(CurrentStateChangeListener listener,
      String instanceName, String sessionId) throws Exception {
    final PropertyKey taskCurrentStatesKey =
        new Builder(_clusterName).taskCurrentStates(instanceName, sessionId);
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, taskCurrentStatesKey, ChangeType.TASK_CURRENT_STATE, watchedEvents);
  }

  /**
   * Registers a listener on the customized states root of the given instance for child-change
   * events.
   *
   * @param listener listener to register
   * @param instanceName instance whose customized states root is watched
   */
  @Override
  public void addCustomizedStateRootChangeListener(CustomizedStateRootChangeListener listener,
      String instanceName) throws Exception {
    final PropertyKey customizedStatesRootKey =
        new Builder(_clusterName).customizedStatesRoot(instanceName);
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, customizedStatesRootKey, ChangeType.CUSTOMIZED_STATE_ROOT,
        watchedEvents);
  }

  /**
   * Registers a listener on the customized states of the given instance and customized state
   * type for child-change events.
   *
   * @param listener listener to register
   * @param instanceName instance whose customized states are watched
   * @param customizedStateType customized state type to watch
   */
  @Override
  public void addCustomizedStateChangeListener(CustomizedStateChangeListener listener,
      String instanceName, String customizedStateType) throws Exception {
    final PropertyKey customizedStatesKey =
        new Builder(_clusterName).customizedStates(instanceName, customizedStateType);
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, customizedStatesKey, ChangeType.CUSTOMIZED_STATE, watchedEvents);
  }

  /**
   * Registers a listener on the cluster's external views for child-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
    final PropertyKey externalViewsKey = new Builder(_clusterName).externalViews();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, externalViewsKey, ChangeType.EXTERNAL_VIEW, watchedEvents);
  }

  /**
   * Registers a listener on the customized view of the given customized state type for
   * child-change events.
   *
   * @param listener listener to register
   * @param customizedStateType customized state type whose view is watched
   */
  @Override
  public void addCustomizedViewChangeListener(CustomizedViewChangeListener listener, String customizedStateType)
      throws Exception {
    final PropertyKey customizedViewKey =
        new Builder(_clusterName).customizedView(customizedStateType);
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, customizedViewKey, ChangeType.CUSTOMIZED_VIEW, watchedEvents);
  }

  /**
   * Registers a listener on the cluster's customized views for child-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addCustomizedViewRootChangeListener(CustomizedViewRootChangeListener listener) throws Exception {
    final PropertyKey customizedViewsKey = new Builder(_clusterName).customizedViews();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, customizedViewsKey, ChangeType.CUSTOMIZED_VIEW_ROOT, watchedEvents);
  }

  /**
   * Registers a listener on the cluster's target external views for child-change events.
   * The key must be the target external views key so that it agrees with the
   * TARGET_EXTERNAL_VIEW change type; watching the regular external views here would deliver
   * callbacks for the wrong path.
   *
   * @param listener listener to register
   */
  @Override
  public void addTargetExternalViewChangeListener(ExternalViewChangeListener listener) throws Exception {
    addListener(listener, new Builder(_clusterName).targetExternalViews(),
        ChangeType.TARGET_EXTERNAL_VIEW, new EventType[] { EventType.NodeChildrenChanged });
  }

  /**
   * Registers a legacy-typed listener on the cluster's external views for child-change events.
   *
   * @param listener listener to register
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.ExternalViewChangeListener}
   */
  @Deprecated
  @Override
  public void addExternalViewChangeListener(org.apache.helix.ExternalViewChangeListener listener) throws Exception {
    final PropertyKey externalViewsKey = new Builder(_clusterName).externalViews();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, externalViewsKey, ChangeType.EXTERNAL_VIEW, watchedEvents);
  }

  /**
   * Registers a listener on the cluster's controller key for child-change events.
   *
   * @param listener listener to register
   */
  @Override
  public void addControllerListener(ControllerChangeListener listener) {
    final PropertyKey controllerKey = new Builder(_clusterName).controller();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, controllerKey, ChangeType.CONTROLLER, watchedEvents);
  }

  /**
   * Registers a legacy-typed listener on the cluster's controller key for child-change events.
   *
   * @param listener listener to register
   * @deprecated prefer the overload accepting
   *             {@link org.apache.helix.api.listeners.ControllerChangeListener}
   */
  @Deprecated
  @Override
  public void addControllerListener(org.apache.helix.ControllerChangeListener listener) {
    final PropertyKey controllerKey = new Builder(_clusterName).controller();
    final EventType[] watchedEvents = { EventType.NodeChildrenChanged };
    addListener(listener, controllerKey, ChangeType.CONTROLLER, watchedEvents);
  }

  /**
   * Returns the data accessor after verifying the connection, waiting up to
   * {@code _waitForConnectedTimeout} ms if currently disconnected.
   *
   * @return the data accessor created in createClient()
   * @throws HelixException if the manager is not connected
   */
  @Override
  public HelixDataAccessor getHelixDataAccessor() {
    checkConnected(_waitForConnectedTimeout);
    return _dataAccessor;
  }

  /**
   * Returns the config accessor after verifying the connection, waiting up to
   * {@code _waitForConnectedTimeout} ms if currently disconnected.
   *
   * @return the config accessor created in createClient()
   * @throws HelixException if the manager is not connected
   */
  @Override
  public ConfigAccessor getConfigAccessor() {
    checkConnected(_waitForConnectedTimeout);
    return _configAccessor;
  }

  /**
   * @return the cluster name this manager was constructed with
   */
  @Override
  public String getClusterName() {
    return _clusterName;
  }

  /**
   * Deprecated because ZKHelixManager shouldn't expose the ZK address in realm-aware mode.
   *
   * Returns a string that can be used to connect to the metadata store for this HelixManager
   * instance, i.e. for ZKHelixManager, this will have the format "{zookeeper-address}:{port}".
   * @return a string used to connect to the metadata store
   */
  @Deprecated
  @Override
  public String getMetadataStoreConnectionString() {
    return _zkAddress;
  }

  /**
   * @return the RealmAwareZkConnectionConfig used to create a realm-aware ZkClient; may be
   *         null, since nothing in the constructor assigns it
   */
  public RealmAwareZkClient.RealmAwareZkConnectionConfig getRealmAwareZkConnectionConfig() {
    return _realmAwareZkConnectionConfig;
  }

  /**
   * @return the instance name resolved in the constructor (the supplied name, or the derived
   *         host-based name / "UNKNOWN" when none was supplied)
   */
  @Override
  public String getInstanceName() {
    return _instanceName;
  }

  /**
   * Creates a ZK-backed base data accessor on top of the current ZK client.
   *
   * @return a new {@link ZkBaseDataAccessor} wrapping {@code _zkclient}
   */
  BaseDataAccessor<ZNRecord> createBaseDataAccessor() {
    return new ZkBaseDataAccessor<ZNRecord>(_zkclient);
  }

  /**
   * Creates every Helix built-in state model definition through the data accessor; existing
   * definitions are left alone because creation only succeeds when the definition is absent.
   */
  private void addBuiltInStateModelDefinitions() {
    final BuiltInStateModelDefinitions[] builtIns = BuiltInStateModelDefinitions.values();
    for (int i = 0; i < builtIns.length; i++) {
      // creation succeeds only if not exist
      _dataAccessor.createStateModelDef(builtIns[i].getStateModelDefinition());
    }
  }

  /**
   * Decides from the configured monitor level whether only the root path is monitored.
   *
   * @return false for ALL, true for AGGREGATED_ONLY; for any other level, true unless this
   *         manager is a CONTROLLER or CONTROLLER_PARTICIPANT
   */
  private boolean isMonitorRootPathOnly() {
    final boolean rootPathOnly;
    switch (_monitorLevel) {
    case ALL:
      rootPathOnly = false;
      break;
    case AGGREGATED_ONLY:
      rootPathOnly = true;
      break;
    default: {
      // Default policy: emit full metrics for controllers only.
      final boolean actsAsController = _instanceType.equals(InstanceType.CONTROLLER)
          || _instanceType.equals(InstanceType.CONTROLLER_PARTICIPANT);
      rootPathOnly = !actsAsController;
      break;
    }
    }
    return rootPathOnly;
  }

  /**
   * Creates a new ZK client and rebuilds the accessors on top of it, then waits for an
   * established session and runs the connected / new-session handling for this manager.
   * Waiting and handling are attempted up to 3 times; a {@link HelixException} aborts
   * immediately without retrying.
   *
   * @throws Exception the HelixException raised by an attempt, or the exception from the
   *           third failed attempt
   */
  void createClient() throws Exception {
    // Create the new client before taking the lock; only the swap happens under it.
    final RealmAwareZkClient newClient = createSingleRealmZkClient();

    synchronized (this) {
      if (_zkclient != null) {
        _zkclient.close();
      }
      _zkclient = newClient;

      _baseDataAccessor = createBaseDataAccessor();

      _dataAccessor = new ZKHelixDataAccessor(_clusterName, _baseDataAccessor);
      _configAccessor = new ConfigAccessor(_zkclient);

      // Only controller-type instances create the built-in state model definitions.
      if (_instanceType == InstanceType.CONTROLLER
          || _instanceType == InstanceType.CONTROLLER_PARTICIPANT) {
        addBuiltInStateModelDefinitions();
      }
    }

    // subscribe to state change
    _zkclient.subscribeStateChanges(this);
    int retryCount = 0;
    while (retryCount < 3) {
      try {
        final long sessionId =
            _zkclient.waitForEstablishedSession(_connectionInitTimeout, TimeUnit.MILLISECONDS);
        handleStateChanged(KeeperState.SyncConnected);
        /*
         * This listener is subscribed after SyncConnected and firing new session events,
         * which means this listener has not yet handled new session, so we have to handle new
         * session here just for this listener.
         */
        handleNewSession(ZKUtil.toHexSessionId(sessionId));
        break;
      } catch (HelixException e) {
        // HelixException is not retried.
        LOG.error("fail to createClient.", e);
        throw e;
      } catch (Exception e) {
        retryCount++;
        LOG.error("fail to createClient. retry " + retryCount, e);
        // Give up and propagate after the third failed attempt.
        if (retryCount == 3) {
          throw e;
        }
      }
    }
  }

  /**
   * Connects this manager. Returns immediately if already connected. For controller instance
   * types the controller is created lazily first; then the ZK client is created and the
   * messaging service is notified. On failure the manager is disconnected (best effort) and
   * the original exception is rethrown. Finally, if cloud event callbacks are enabled, a cloud
   * event listener is registered.
   *
   * @throws Exception if creating the client or notifying the messaging service fails
   */
  @Override
  public void connect() throws Exception {
    LOG.info("ClusterManager.connect()");
    if (isConnected()) {
      LOG.warn("Cluster manager: " + _instanceName + " for cluster: " + _clusterName
          + " already connected. skip connect");
      return;
    }

    switch (_instanceType) {
    case CONTROLLER:
    case CONTROLLER_PARTICIPANT:
      // Create the controller once and reuse it across reconnects.
      if (_controller == null) {
        _controller = new GenericHelixController(_clusterName, _enabledPipelineTypes);
        _messagingService.getExecutor().setController(_controller);
      }
      break;
    default:
      break;
    }

    try {
      createClient();
      _messagingService.onConnected();
    } catch (Exception e) {
      LOG.error("fail to connect " + _instanceName, e);
      try {
        disconnect();
      } catch (Exception ex) {
        // if zk connection fails to be created, disconnect may throw exception about reporting disconnect to ZK.
        // Deliberately ignored so that the original failure 'e' is the one propagated.
      }
      throw e;
    }

    if (_helixManagerProperty != null) {
      HelixCloudProperty helixCloudProperty = _helixManagerProperty.getHelixCloudProperty();
      if (helixCloudProperty != null && helixCloudProperty.isCloudEventCallbackEnabled()) {
        // Register a listener with the handler named by the cloud property.
        _cloudEventListener =
            new HelixCloudEventListener(helixCloudProperty.getCloudEventCallbackProperty(), this);
        CloudEventHandlerFactory.getInstance(
            _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
            .registerCloudEventListener(_cloudEventListener);
        LOG.info("Using handler: " + helixCloudProperty.getCloudEventHandlerClassName());
      }
    }
  }

  /**
   * Disconnects this manager from the cluster and releases the resources it holds.
   *
   * <p>The call is a no-op when the ZK client is absent or already closed. Otherwise timer tasks
   * are stopped, the messaging executor is shut down and the callback handlers are cleaned up.
   * Regardless of whether those steps succeed, the {@code finally} section shuts down the
   * controller, unregisters callback monitors and the cloud event listener, clears the cached
   * property store, controller, leader election handler and participant manager, and closes the
   * ZK client.
   */
  @Override
  public void disconnect() {
    if (_zkclient == null || _zkclient.isClosed()) {
      LOG.info("instanceName: " + _instanceName + " already disconnected");
      return;
    }

    LOG.info("disconnect " + _instanceName + "(" + _instanceType + ") from " + _clusterName);

    try {
      // stop all timer tasks
      stopTimerTasks();

      // shutdown thread pool first to avoid reset() being invoked in the middle of state
      // transition
      _messagingService.getExecutor().shutdown();

      if (!cleanupCallbackHandlers()) {
        // The cleanup did NOT complete; warn and carry on with the remaining teardown.
        LOG.warn(
            "The callback handler cleanup has not been cleanly done. "
                + "Some callback handlers might not be reset properly. "
                + "Continue to finish the other Helix Manager disconnect tasks.");
      }
    } finally {
      // Work on a local copy: the field is cleared in the synchronized section below.
      GenericHelixController controller = _controller;
      if (controller != null) {
        try {
          controller.shutdown();
        } catch (InterruptedException e) {
          // NOTE(review): the interrupt status is not restored here; restoring it might affect
          // the ZkClient close below - confirm before changing.
          LOG.info("Interrupted shutting down GenericHelixController", e);
        }
      }

      for (HelixCallbackMonitor callbackMonitor : _callbackMonitors.values()) {
        callbackMonitor.unregister();
      }

      _helixPropertyStore = null;

      if (_cloudEventListener != null) {
        try {
          CloudEventHandlerFactory.getInstance(
              _helixManagerProperty.getHelixCloudProperty().getCloudEventHandlerClassName())
              .unregisterCloudEventListener(_cloudEventListener);
        } catch (Exception e) {
          // Best effort: a failure to unregister must not prevent closing the ZK client.
          LOG.error("Failed to unregister cloudEventListener.", e);
        }
        _cloudEventListener = null;
      }

      synchronized (this) {
        if (_controller != null) {
          _controller = null;
          _leaderElectionHandler = null;
        }

        if (_participantManager != null) {
          _participantManager = null;
        }

        if (_zkclient != null) {
          _zkclient.close();
        }
      }
      _sessionStartTime = null;
      LOG.info("Cluster manager: " + _instanceName + " disconnected");
    }
  }

  /**
   * The callback handler cleanup operations that require an active ZkClient connection.
   * If ZkClient is not connected, Helix Manager shall skip the cleanup.
   *
   * <p>The cleanup runs on a dedicated thread so that it can be interrupted when the ZK
   * connection state changes to anything other than {@code SyncConnected}; the calling thread
   * waits for that thread to finish.
   *
   * @return true if the cleanup has been done successfully.
   */
  private boolean cleanupCallbackHandlers() {
    AtomicBoolean cleanupDone = new AtomicBoolean(false);

    if (_zkclient.waitUntilConnected(_waitForConnectedTimeout, TimeUnit.MILLISECONDS)) {
      // Create a separate thread for executing cleanup task to avoid forever retry.
      Thread cleanupThread = new Thread(String
          .format("Cleanup thread for %s-%s-%s", _clusterName, _instanceName, _instanceType)) {
        @Override
        public void run() {
          // TODO reset user defined handlers only
          resetHandlers(true);

          // Read the field once: disconnect() may null it while this thread is still running
          // (e.g. after the join below was interrupted), so a check-then-use on the field itself
          // could throw a NullPointerException.
          CallbackHandler leaderElectionHandler = _leaderElectionHandler;
          if (leaderElectionHandler != null) {
            leaderElectionHandler.reset(true);
          }

          ParticipantManager participantManager = _participantManager;
          if (participantManager != null) {
            participantManager.disconnect();
          }

          cleanupDone.set(true);
        }
      };

      // Define the state listener to terminate the cleanup thread when the ZkConnection breaks.
      IZkStateListener stateListener = new IZkStateListener() {
        @Override
        public void handleStateChanged(KeeperState state) {
          // If the connection breaks during the cleanup, then stop the cleanup thread.
          if (state != KeeperState.SyncConnected) {
            cleanupThread.interrupt();
          }
        }

        @Override
        public void handleNewSession(String sessionId) {
          // nothing
        }

        @Override
        public void handleSessionEstablishmentError(Throwable error) {
          // nothing
        }
      };

      cleanupThread.start();
      try {
        // Subscribe and check the connection status one more time to ensure the thread is running
        // with an active ZkConnection.
        _zkclient.subscribeStateChanges(stateListener);
        if (!_zkclient.waitUntilConnected(0, TimeUnit.MILLISECONDS)) {
          cleanupThread.interrupt();
        }

        try {
          cleanupThread.join();
        } catch (InterruptedException ex) {
          // The caller was interrupted while waiting: ask the cleanup thread to stop as well.
          cleanupThread.interrupt();
        }
      } finally {
        _zkclient.unsubscribeStateChanges(stateListener);
      }
    } else {
      LOG.warn(
          "ZkClient is not connected to the Zookeeper. Skip the cleanup work that requires accessing Zookeeper.");
    }

    return cleanupDone.get();
  }

  /**
   * Returns the session id cached by this manager, after running the connection check with the
   * configured wait-for-connected timeout.
   *
   * @return the cached session id
   */
  @Override
  public String getSessionId() {
    // TODO: session id should be updated after zk client is connected.
    // Otherwise, this session id might be an expired one.
    checkConnected(_waitForConnectedTimeout);
    return _sessionId;
  }

  /**
   * Reports whether the underlying ZkClient exists, is not closed and is currently connected.
   *
   * @return {@code true} only if the client reports a connected state without waiting;
   *         {@code false} if there is no usable client or the check is interrupted
   */
  @Override
  public boolean isConnected() {
    final boolean clientUsable = _zkclient != null && !_zkclient.isClosed();
    if (!clientUsable) {
      return false;
    }

    // The ZkClient watcher state is consulted on purpose rather than the ZkConnection state:
    // the latter is the connection's internal state and may diverge from the client state
    // because of internal thread/retry logic.
    boolean connected;
    try {
      connected = _zkclient.waitUntilConnected(0, TimeUnit.MILLISECONDS);
    } catch (ZkInterruptedException ex) {
      connected = false;
    }
    return connected;
  }

  /**
   * Returns the last notification time; this implementation always returns {@code 0}.
   *
   * @return {@code 0}
   */
  @Override
  public long getLastNotificationTime() {
    return 0;
  }

  /**
   * Registers a callback in the list of pre-connect callbacks kept by this manager.
   *
   * @param callback the callback to add
   */
  @Override
  public void addPreConnectCallback(PreConnectCallback callback) {
    final String logLine = "Adding preconnect callback: " + callback;
    LOG.info(logLine);
    _preConnectCallbacks.add(callback);
  }

  /**
   * Tells whether this manager currently holds cluster leadership.
   *
   * @return {@code true} if {@link #getSessionIdIfLead()} yields a session id
   */
  @Override
  public boolean isLeader() {
    final Optional<String> leaderSession = getSessionIdIfLead();
    return leaderSession.isPresent();
  }

  /**
   * Returns the leader's session id if this manager is the current controller leader.
   *
   * <p>Leadership requires that the instance type is CONTROLLER or CONTROLLER_PARTICIPANT, that
   * the manager is connected, and that the controller leader node names this instance and
   * carries the same session id as this manager. Every failed condition is logged at WARN level.
   *
   * @return the session id read from the leader node when this instance is the leader,
   *         otherwise an empty {@code Optional}
   */
  @Override
  public Optional<String> getSessionIdIfLead() {
    String warnLogPrefix = String
        .format("Instance %s is not leader of cluster %s due to", _instanceName, _clusterName);
    if (_instanceType != InstanceType.CONTROLLER
        && _instanceType != InstanceType.CONTROLLER_PARTICIPANT) {
      LOG.warn(String
          .format("%s instance type %s does not match to CONTROLLER/CONTROLLER_PARTICIPANT",
              warnLogPrefix, _instanceType.name()));
      return Optional.empty();
    }

    if (!isConnected()) {
      LOG.warn(String.format("%s HelixManager is not connected", warnLogPrefix));
      return Optional.empty();
    }

    try {
      LiveInstance leader = _dataAccessor.getProperty(_keyBuilder.controllerLeader());
      if (leader == null) {
        LOG.warn(String.format("%s leader ZNode is null", warnLogPrefix));
        return Optional.empty();
      }

      String leaderName = leader.getInstanceName();
      String sessionId = leader.getEphemeralOwner();
      // Null-safe comparison: a leader node without an ephemeral owner is reported as a session
      // mismatch below instead of surfacing as a NullPointerException.
      if (leaderName != null && leaderName.equals(_instanceName) && sessionId != null
          && sessionId.equals(_sessionId)) {
        // Ensure the same leader session is set and returned. If we get _session from helix
        // manager, _session might change after the check. This guarantees the session is
        // leader's session we checked.
        return Optional.of(sessionId);
      }
      LOG.warn(String
          .format("%s current session %s does not match leader session %s", warnLogPrefix,
              _sessionId, sessionId));
    } catch (Exception e) {
      LOG.warn(String.format("%s exception happen when session check", warnLogPrefix), e);
    }
    return Optional.empty();
  }

  /**
   * Returns the property store of this cluster, creating it on first use.
   *
   * <p>The store is cached in a field; a new one is built only when the field is {@code null}.
   *
   * @return the cached or newly created property store
   */
  @Override
  public synchronized ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() {
    checkConnected(_waitForConnectedTimeout);

    if (_helixPropertyStore != null) {
      return _helixPropertyStore;
    }

    // Primary location plus the fallback location under the cluster root.
    final String primaryPath = PropertyPathBuilder.propertyStore(_clusterName);
    final String secondaryPath = String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE");
    _helixPropertyStore = new AutoFallbackPropertyStore<>(new ZkBaseDataAccessor<>(_zkclient),
        primaryPath, secondaryPath);
    return _helixPropertyStore;
  }

  /**
   * Creates a new {@code ZKHelixAdmin} backed by this manager's ZkClient.
   *
   * @return a new admin instance, or {@code null} if the ZkClient is absent or closed
   */
  @Override
  public synchronized HelixAdmin getClusterManagmentTool() {
    checkConnected(_waitForConnectedTimeout);

    final boolean clientUnavailable = _zkclient == null || _zkclient.isClosed();
    if (clientUnavailable) {
      LOG.error("Couldn't get ZKClusterManagementTool because zkclient is null");
      return null;
    }
    return new ZKHelixAdmin(_zkclient);
  }

  /**
   * Returns the messaging service of this manager.
   *
   * <p>No connection check is performed here, because callers may register message handler
   * factories on the messaging service before the helix manager is connected.
   *
   * @return the messaging service
   */
  @Override
  public ClusterMessagingService getMessagingService() {
    return _messagingService;
  }

  /**
   * Returns the instance type this manager is held with.
   *
   * @return the instance type
   */
  @Override
  public InstanceType getInstanceType() {
    return _instanceType;
  }

  /**
   * Returns the version string held by this manager.
   *
   * @return the version
   */
  @Override
  public String getVersion() {
    return _version;
  }

  /**
   * Returns the manager properties held by this manager.
   *
   * @return the manager properties
   */
  @Override
  public HelixManagerProperties getProperties() {
    return _properties;
  }

  /**
   * Returns the state machine engine held by this manager.
   *
   * @return the state machine engine
   */
  @Override
  public StateMachineEngine getStateMachineEngine() {
    return _stateMachineEngine;
  }

  /**
   * Starts every timer task registered with this manager, in iteration order.
   */
  // TODO: rename this and not expose this function as part of interface
  @Override
  public void startTimerTasks() {
    for (HelixTimerTask timerTask : _timerTasks) {
      timerTask.start();
    }
  }

  /**
   * Stops every timer task registered with this manager, in iteration order.
   */
  @Override
  public void stopTimerTasks() {
    for (HelixTimerTask timerTask : _timerTasks) {
      timerTask.stop();
    }
  }

  /**
   * Sets the provider that is passed along when a new session is handled as a participant.
   *
   * @param liveInstanceInfoProvider the provider to store
   */
  @Override
  public void setLiveInstanceInfoProvider(LiveInstanceInfoProvider liveInstanceInfoProvider) {
    _liveInstanceInfoProvider = liveInstanceInfoProvider;
  }

  /**
   * Blocks until the ZkClient is connected and a non-zero session id has been read into
   * {@code _sessionId}.
   *
   * <p>The connection may be lost again right after the session id is read. A stale session id
   * is acceptable here because another handle-new-session callback will correct it.
   */
  void waitUntilConnected() {
    while (true) {
      final boolean connected = _zkclient
          .waitUntilConnected(HelixZkClient.DEFAULT_CONNECTION_TIMEOUT, TimeUnit.MILLISECONDS);
      if (!connected) {
        LOG.error("fail to connect zkserver: " + getZkConnectionInfo() + " in "
            + HelixZkClient.DEFAULT_CONNECTION_TIMEOUT + "ms. expiredSessionId: " + _sessionId
            + ", clusterName: " + _clusterName);
        continue;
      }

      _sessionId = ZKUtil.toHexSessionId(_zkclient.getSessionId());

      // The connection might have dropped again while the session id was being read, in which
      // case the id reads as "0"; keep waiting until a non-zero session id is obtained.
      if (!"0".equals(_sessionId)) {
        break;
      }
    }

    LOG.info("Handling new session, session id: " + _sessionId + ", instance: " + _instanceName
        + ", instanceTye: " + _instanceType + ", cluster: " + _clusterName);
  }

  /**
   * Initializes each of the given callback handlers while holding this manager's monitor.
   *
   * @param handlers the handlers to initialize; {@code null} is tolerated and ignored
   */
  void initHandlers(List<CallbackHandler> handlers) {
    synchronized (this) {
      if (handlers == null) {
        return;
      }
      // Iterate over a snapshot, since handler.init() may modify the original handler list.
      List<CallbackHandler> snapshot = new ArrayList<>(handlers);
      for (CallbackHandler callbackHandler : snapshot) {
        callbackHandler.init();
        LOG.info(
            "init handler: " + callbackHandler.getPath() + ", " + callbackHandler.getListener());
      }
    }
  }

  /**
   * Resets every registered callback handler while holding this manager's monitor.
   *
   * @param isShutdown forwarded unchanged to each handler's {@code reset}
   */
  void resetHandlers(boolean isShutdown) {
    synchronized (this) {
      if (_handlers == null) {
        return;
      }
      // Iterate over a snapshot, since handler.reset() may modify the original handler list.
      List<CallbackHandler> snapshot = new ArrayList<>(_handlers);
      for (CallbackHandler callbackHandler : snapshot) {
        callbackHandler.reset(isShutdown);
        LOG.info(
            "reset handler: " + callbackHandler.getPath() + ", " + callbackHandler.getListener());
      }
    }
  }

  /**
   * Decides whether the ZK connection is flapping.
   *
   * <p>Disconnect timestamps older than the flapping time window (measured back from the most
   * recent disconnect) are dropped from the history. The connection counts as flapping when
   * more than the maximum allowed number of disconnects remain, in which case the caller
   * disconnects the manager from zk.
   *
   * @return {@code true} if the remaining disconnect count exceeds the threshold
   */
  boolean isFlapping() {
    if (_disconnectTimeHistory.isEmpty()) {
      return false;
    }

    final int newestIndex = _disconnectTimeHistory.size() - 1;
    final long newestDisconnect = _disconnectTimeHistory.get(newestIndex);

    // Evict entries from the front that fall outside the window ending at the newest disconnect.
    while (_disconnectTimeHistory.get(0) + _flappingTimeWindowMs < newestDisconnect) {
      _disconnectTimeHistory.remove(0);
    }

    final int disconnectsInWindow = _disconnectTimeHistory.size();
    return disconnectsInWindow > _maxDisconnectThreshold;
  }

  /**
   * Reacts to a ZooKeeper connection state change.
   *
   * <p>Every state is logged. A {@code Disconnected} event is additionally recorded in the
   * disconnect history; if the history shows flapping, the manager disconnects itself and
   * notifies the state listener.
   *
   * @param state the new keeper state
   */
  @Override
  public void handleStateChanged(KeeperState state) {
    switch (state) {
    case SyncConnected:
      LOG.info("KeeperState: " + state + ", instance: " + _instanceName + ", type: " + _instanceType);
      break;
    case Disconnected:
      // Track the time stamp at which the disconnect happened, then check the history to see
      // whether the helix-manager should be disconnected.
      _disconnectTimeHistory.add(System.currentTimeMillis());
      if (isFlapping()) {
        disconnectFlappingInstance();
        break;
      }
    // if not flapping, share the continuous logic with Expired case
    case Expired:
      LOG.warn("KeeperState:" + state + ", SessionId: " + _sessionId + ", instance: "
          + _instanceName + ", type: " + _instanceType);
      break;
    default:
      LOG.info("KeeperState:" + state + ", currentSessionId: " + _sessionId + ", instance: "
          + _instanceName + ", type: " + _instanceType);
      break;
    }
  }

  /**
   * Handles a flapping connection: logs the condition, disables the instance if it is a pure
   * participant, disconnects the manager and finally notifies the state listener, if any.
   */
  private void disconnectFlappingInstance() {
    final String flappingMsg = "instanceName: " + _instanceName + " is flapping. disconnect it. "
        + " maxDisconnectThreshold: " + _maxDisconnectThreshold + " disconnects in "
        + _flappingTimeWindowMs + "ms.";
    LOG.error(flappingMsg);

    // Only disable the instance when its instance type is PARTICIPANT
    if (_instanceType.equals(InstanceType.PARTICIPANT)) {
      LOG.warn("instanceName: " + _instanceName
          + " is flapping. Since it is a participant, disable it.");
      try {
        getClusterManagmentTool().enableInstance(_clusterName, _instanceName, false);
      } catch (Exception e) {
        LOG.error("Failed to disable participant before disconnecting participant.", e);
      }
    }

    try {
      // TODO Call disconnect in another thread.
      // handleStateChanged is triggered in ZkClient eventThread. The disconnect logic will
      // interrupt this thread. This issue prevents the ZkClient.close() from complete. So the
      // client is left in a strange state.
      disconnect();
    } catch (Exception ex) {
      LOG.error("Disconnect HelixManager is not completely done.", ex);
    }

    if (_stateListener == null) {
      return;
    }
    try {
      _stateListener.onDisconnected(this, new HelixException(flappingMsg));
    } catch (Exception e) {
      LOG.warn("stateListener.onDisconnected callback fails", e);
    }
  }

  /**
   * Invoked once a zookeeper session has expired and a replacement session is established.
   *
   * <p>This variant simply delegates to {@link #handleNewSession(String)} with a {@code null}
   * session id. As a consequence ephemeral nodes are later created in the session of the latest
   * zk connection, which is not necessarily the session the caller expected - this is the
   * session race condition.
   *
   * @deprecated
   * This method may cause a session race condition when creating ephemeral nodes. It is kept
   * only for backward compatibility in case a user class extends this class. Please use
   * {@link #handleNewSession(String)} instead, which takes care of the race condition.
   *
   * @throws Exception If any error occurs.
   */
  @Deprecated
  public void handleNewSession() throws Exception {
    // A null id marks the call as non-session-aware.
    handleNewSession(null);
  }

  /**
   * Called after the zookeeper session has expired and a new session has been established. This
   * method handles a new session with its session id passed in. Before handling, this method
   * waits until zk client is connected to zk service and gets a non-zero session id (the current
   * actual session id). If the passed-in (expected) session id does not match the current actual
   * session id, the expected session id is expired and will NOT be handled.
   *
   * <p>For a session that is handled, the sequence is: stop timer tasks, reset the leader
   * election handler and all callback handlers, reset the base data accessor, verify the cluster
   * is set up, run the instance-type specific handling, restart timer tasks, re-initialize the
   * handlers and finally notify the state listener. When the live instance status is FROZEN the
   * method instead delegates to the management-mode handling and returns early.
   *
   * @param sessionId the new session's id. The ephemeral nodes are expected to be created in this
   *                  session. If this session id is expired, ephemeral nodes should not be created.
   *                  A {@code null} value means "use the current session" (backward compatibility).
   * @throws Exception if any error occurs during handling new session, including a
   *                   {@code HelixException} when the cluster structure is not set up
   */
  @Override
  public void handleNewSession(String sessionId) throws Exception {
    /*
     * TODO: after removing I0ItecIZkStateListenerImpl, null session should be checked and
     *  discarded.
     * Null session is still a special case here, which is treated as non-session aware operation.
     * This special case could still potentially cause race condition, so null session should NOT
     * be acceptable, once I0ItecIZkStateListenerImpl is removed. Currently this special case
     * is kept for backward compatibility.
     */

    // Wait until we get a non-zero session id. Otherwise, getSessionId() might be null.
    waitUntilConnected();

    /*
     * Filter out stale sessions. If a session id is not null and not the same as current session
     * id, this session is expired. With this filtering, expired sessions are NOT handled,
     * so performance is expected to improve.
     */
    if (sessionId != null && !getSessionId().equals(sessionId)) {
      LOG.warn("Session is expired and not handled. Expected: {}. Actual: {}.", sessionId,
          getSessionId());
      return;
    }

    /*
     * When a null session id is passed in, we will take current session's id for following
     * operations. Please note that current session might not be the one we expect to handle,
     * because the one we expect might be already expired when the zk event is waiting in the
     * event queue. Why we use current session here is for backward compatibility with the old
     * method handleNewSession().
     */
    if (sessionId == null) {
      sessionId = getSessionId();
      LOG.debug("Session id: <null> is passed in. Current session id: {} will be used.", sessionId);
    }

    LOG.info("Handle new session, instance: {}, type: {}, session id: {}.", _instanceName,
        _instanceType,  sessionId);

    // When the live instance status is FROZEN, the regular reset/init flow below is skipped and
    // only the management-mode handling runs.
    LiveInstance.LiveInstanceStatus liveInstanceStatus =
        _messagingService.getExecutor().getLiveInstanceStatus();
    if (LiveInstance.LiveInstanceStatus.FROZEN
        .equals(liveInstanceStatus)) {
      handleNewSessionInManagementMode(sessionId, liveInstanceStatus);
      return;
    }

    /**
     * stop all timer tasks, reset all handlers, make sure cleanup completed for previous session
     * disconnect if fail to cleanup
     */
    stopTimerTasks();
    if (_leaderElectionHandler != null) {
      _leaderElectionHandler.reset(false);
    }
    resetHandlers(false);

    /**
     * clean up write-through cache
     */
    _baseDataAccessor.reset();

    /**
     * from here on, we are dealing with new session
     */
    if (!ZKUtil.isClusterSetup(_clusterName, _zkclient)) {
      throw new HelixException("Cluster structure is not set up for cluster: " + _clusterName);
    }

    // Record when handling of this session started.
    _sessionStartTime = System.currentTimeMillis();

    // Instance-type specific handling; ADMINISTRATOR and SPECTATOR need nothing extra here.
    switch (_instanceType) {
    case PARTICIPANT:
      handleNewSessionAsParticipant(sessionId, _liveInstanceInfoProvider);
      break;
    case CONTROLLER:
      handleNewSessionAsController();
      break;
    case CONTROLLER_PARTICIPANT:
      // Participant handling runs first, then controller handling.
      handleNewSessionAsParticipant(sessionId, _liveInstanceInfoProvider);
      handleNewSessionAsController();
      break;
    case ADMINISTRATOR:
    case SPECTATOR:
    default:
      break;
    }

    startTimerTasks();

    /**
     * init handlers
     * ok to init message handler and data-accessor twice
     * the second init will be skipped (see CallbackHandler)
     */
    initHandlers(_handlers);

    // Notify the optional state listener; a failing listener is only logged.
    if (_stateListener != null) {
      try {
        _stateListener.onConnected(this);
      } catch (Exception e) {
        LOG.warn("stateListener.onConnected callback fails", e);
      }
    }
  }

  /**
   * Handles a new session without the regular reset. Only the participant instance types
   * (PARTICIPANT and CONTROLLER_PARTICIPANT) do any work; every other type returns at once.
   *
   * @param sessionId the session to handle
   * @param liveInstanceStatus the status to publish through the live instance info provider
   * @throws Exception if handling the session as participant fails
   */
  private void handleNewSessionInManagementMode(String sessionId,
      LiveInstance.LiveInstanceStatus liveInstanceStatus) throws Exception {
    LOG.info("Skip reset because instance is in {} status", LiveInstance.LiveInstanceStatus.FROZEN);

    final boolean actsAsParticipant = InstanceType.PARTICIPANT.equals(_instanceType)
        || InstanceType.CONTROLLER_PARTICIPANT.equals(_instanceType);
    if (!actsAsParticipant) {
      return;
    }

    // Add STATUS to info provider so the new live instance will have STATUS field
    LiveInstanceInfoProvider statusProvider =
        new LiveInstanceStatusInfoProvider(liveInstanceStatus);
    handleNewSessionAsParticipant(sessionId, statusProvider);
  }

  /**
   * Handles a new session for a participant: resets the previous participant manager, if any,
   * then creates a new one bound to the given session and lets it handle the new session.
   *
   * @param sessionId the session the new participant manager is created for
   * @param provider supplies additional live instance info to the participant manager
   * @throws Exception if the participant manager fails to handle the new session
   */
  void handleNewSessionAsParticipant(final String sessionId, LiveInstanceInfoProvider provider)
      throws Exception {
    // Work on locals rather than re-reading the field: disconnect() may set
    // _participantManager to null from another thread between a check and the following use.
    ParticipantManager previousManager = _participantManager;
    if (previousManager != null) {
      previousManager.reset();
    }

    ParticipantManager newManager =
        new ParticipantManager(this, _zkclient, _sessionTimeout, provider,
            _preConnectCallbacks, sessionId, _helixManagerProperty);
    _participantManager = newManager;
    newManager.handleNewSession();
  }

  /**
   * Handles a new session for a controller: creates the leader election callback handler on
   * first use, otherwise re-initializes the existing one.
   */
  // TODO: pass in session id and make this method session aware to avoid potential session race
  //  condition.
  void handleNewSessionAsController() {
    if (_leaderElectionHandler == null) {
      final EventType[] watchedEvents = new EventType[] {
          EventType.NodeChildrenChanged, EventType.NodeDeleted, EventType.NodeCreated
      };
      _leaderElectionHandler =
          new CallbackHandler(this, _zkclient, _keyBuilder.controller(),
              new DistributedLeaderElection(this, _controller, _controllerTimerTasks),
              watchedEvents, ChangeType.CONTROLLER, _callbackMonitors.get(ChangeType.CONTROLLER));
      return;
    }
    _leaderElectionHandler.init();
  }

  /**
   * Returns the participant health info collector held by this manager.
   *
   * @return the health report collector
   */
  @Override
  public ParticipantHealthReportCollector getHealthReportCollector() {
    return _participantHealthInfoCollector;
  }

  /**
   * Reacts to a failure to establish a ZK session by disconnecting this manager and then
   * informing the state listener, if one is set.
   *
   * @param error the cause of the session establishment failure
   * @throws Exception if the state listener callback throws
   */
  @Override
  public void handleSessionEstablishmentError(Throwable error) throws Exception {
    LOG.warn("Handling Session Establishment Error. Disconnect Helix Manager.", error);
    disconnect();

    if (_stateListener == null) {
      return;
    }
    _stateListener.onDisconnected(this, error);
  }

  /**
   * Returns the time recorded when the current session started being handled.
   *
   * @return the session start time in milliseconds, or {@code null} after a disconnect
   */
  @Override
  public Long getSessionStartTime() {
    return _sessionStartTime;
  }

  /**
   * Resolves the cloud event callback property from the manager property.
   *
   * @return the callback property, or {@code null} when there is no manager property, no cloud
   *         property, or cloud event callbacks are not enabled
   */
  private CloudEventCallbackProperty getCloudEventListenerCallbackProperty() {
    // The manager property is optional (other methods of this class null-check it as well), so
    // guard against it being absent instead of failing with a NullPointerException.
    if (_helixManagerProperty == null) {
      return null;
    }
    HelixCloudProperty cloudProperty = _helixManagerProperty.getHelixCloudProperty();
    if (cloudProperty == null || !cloudProperty.isCloudEventCallbackEnabled()) {
      return null;
    }
    return cloudProperty.getCloudEventCallbackProperty();
  }

  /*
   * Builds the connection config and client config from the parameters this HelixManager was
   * given and uses them to obtain a ZkClient. When connecting as an ADMINISTRATOR the shared
   * client factory is used, to minimize the cost of creating ZkConnections; every other instance
   * type uses the dedicated client factory.
   */
  private RealmAwareZkClient createSingleRealmZkClient() {
    final String shardingKey = HelixUtil.clusterNameToShardingKey(_clusterName);
    PathBasedZkSerializer pathSerializer =
        ChainedPathZkSerializer.builder(new ZNRecordSerializer()).build();

    // A user-supplied RealmAwareZkConnectionConfig takes precedence. Only when none was given is
    // a single-realm config created, keyed by the cluster name.
    if (_realmAwareZkConnectionConfig == null) {
      _realmAwareZkConnectionConfig = new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
          .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
          .setZkRealmShardingKey(shardingKey).setSessionTimeout(_sessionTimeout).build();
    }

    RealmAwareZkClient.RealmAwareZkClientConfig zkClientConfig =
        new RealmAwareZkClient.RealmAwareZkClientConfig();
    zkClientConfig.setZkSerializer(pathSerializer)
        .setConnectInitTimeout(_connectionInitTimeout)
        .setMonitorType(_instanceType.name())
        .setMonitorKey(_clusterName)
        .setMonitorInstanceName(_instanceName)
        .setMonitorRootPathOnly(isMonitorRootPathOnly());

    // Pick the factory by instance type, then resolve the concrete client through it.
    final HelixZkClientFactory clientFactory;
    if (_instanceType == InstanceType.ADMINISTRATOR) {
      clientFactory = SharedZkClientFactory.getInstance();
    } else {
      clientFactory = DedicatedZkClientFactory.getInstance();
    }
    return resolveZkClient(clientFactory, _realmAwareZkConnectionConfig, zkClientConfig);
  }

  /*
   * Decides which kind of ZkClient this HelixManager uses and builds it with the given factory:
   * 1) If the MULTI_ZK_ENABLED system property is true or zkAddress is null, a RealmAwareZkClient
   * is built from the realm-aware connection config and client config.
   * 2) Otherwise a HelixZkClient is built that plainly connects to the given ZK address, reusing
   * the session timeout of the realm-aware connection config.
   * A failure to build the realm-aware client is rethrown as a HelixException.
   */
  private RealmAwareZkClient resolveZkClient(HelixZkClientFactory zkClientFactory,
      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig,
      RealmAwareZkClient.RealmAwareZkClientConfig clientConfig) {
    final boolean useRealmAwareClient =
        Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || _zkAddress == null;

    if (!useRealmAwareClient) {
      // Multi-zk mode is not enabled: create a HelixZkClient with the provided zk address.
      HelixZkClient.ZkClientConfig plainClientConfig = clientConfig.createHelixZkClientConfig();
      HelixZkClient.ZkConnectionConfig plainConnectionConfig =
          new HelixZkClient.ZkConnectionConfig(_zkAddress)
              .setSessionTimeout(connectionConfig.getSessionTimeout());
      return zkClientFactory.buildZkClient(plainConnectionConfig, plainClientConfig);
    }

    try {
      // Create realm-aware ZkClient.
      return zkClientFactory.buildZkClient(connectionConfig, clientConfig);
    } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) {
      throw new HelixException("Not able to connect on realm-aware mode for sharding key: "
          + connectionConfig.getZkRealmShardingKey(), e);
    }
  }

  /**
   * Validates the ZK connection settings and, when a ZkConnectionConfig is supplied, adopts it
   * as this manager's realm-aware connection config.
   *
   * <p>Supplying both a zkAddress and a ZkConnectionConfig is rejected. When only the
   * ZkConnectionConfig is supplied it must carry a non-empty ZK path sharding key, because
   * HelixManager must work on single-realm mode.
   *
   * @param zkAddress the ZK address given to the manager, possibly {@code null}
   * @param helixManagerProperty the manager property, possibly {@code null}
   * @throws HelixException if both settings are given or the sharding key is missing or empty
   */
  private void validateZkConnectionSettings(String zkAddress,
      HelixManagerProperty helixManagerProperty) {
    // Nothing to validate unless a connection config was actually supplied.
    if (helixManagerProperty == null || helixManagerProperty.getZkConnectionConfig() == null) {
      return;
    }

    if (zkAddress != null) {
      throw new HelixException(
          "ZKHelixManager: cannot have both ZkAddress and ZkConnectionConfig set!");
    }

    RealmAwareZkClient.RealmAwareZkConnectionConfig suppliedConfig =
        helixManagerProperty.getZkConnectionConfig();
    final String realmShardingKey = suppliedConfig.getZkRealmShardingKey();
    if (realmShardingKey == null || realmShardingKey.isEmpty()) {
      throw new HelixException(
          "ZKHelixManager::ZK path sharding key must be set for ZKHelixManager! ZKHelixManager "
              + "is only available on single-realm mode.");
    }
    _realmAwareZkConnectionConfig = suppliedConfig;
  }

  /**
   * Resolves a description of the ZK connection for logging purposes.
   *
   * @return the ZK address if one is set; otherwise the string form of the ZkConnectionConfig
   *         from the manager property; otherwise {@code "None"}
   */
  private String getZkConnectionInfo() {
    if (_zkAddress != null) {
      return _zkAddress;
    }
    final boolean hasConnectionConfig =
        _helixManagerProperty != null && _helixManagerProperty.getZkConnectionConfig() != null;
    if (hasConnectionConfig) {
      return _helixManagerProperty.getZkConnectionConfig().toString();
    }
    return "None";
  }

  /*
   * Live instance info provider that contributes a single piece of additional info: the live
   * instance status, stored under the STATUS property name.
   */
  private static class LiveInstanceStatusInfoProvider implements LiveInstanceInfoProvider {
    // Built once in the constructor and returned on every request.
    private final ZNRecord _record;

    public LiveInstanceStatusInfoProvider(LiveInstance.LiveInstanceStatus status) {
      final ZNRecord statusRecord = new ZNRecord("STATUS_PROVIDER");
      final String statusField = LiveInstance.LiveInstanceProperty.STATUS.name();
      statusRecord.setEnumField(statusField, status);
      _record = statusRecord;
    }

    @Override
    public ZNRecord getAdditionalLiveInstanceInfo() {
      return _record;
    }
  }
}
