package org.apache.helix.spectator;

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.api.listeners.ConfigChangeListener;
import org.apache.helix.api.listeners.CurrentStateChangeListener;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.api.listeners.RoutingTableChangeListener;
import org.apache.helix.common.ClusterEventProcessor;
import org.apache.helix.common.caches.CurrentStateSnapshot;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.management.JMException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/**
 * Keeps an in-memory {@link RoutingTable} for a Helix cluster and exposes lookups against it.
 *
 * <p>The routing table is rebuilt from one of three data sources, selected by the
 * {@link PropertyType} given at construction time: EXTERNALVIEW, TARGETEXTERNALVIEW or
 * CURRENTSTATES. Helix change callbacks received by this class are converted into events that are
 * queued on an internal {@link RouterUpdater}; handling an event refreshes the data cache and
 * replaces the routing table held in an {@link AtomicReference}. Optionally a scheduled task
 * queues a refresh event whenever no refresh happened within the configured interval.
 *
 * <p>Call {@link #shutdown()} when the provider is no longer needed; a provider that has been shut
 * down must not be reused.
 */
public class RoutingTableProvider
    implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener,
               LiveInstanceChangeListener, CurrentStateChangeListener {
  private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
  private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000L; // 5 minutes
  private final AtomicReference<RoutingTable> _routingTableRef;
  private final HelixManager _helixManager;
  private final RouterUpdater _routerUpdater;
  private final PropertyType _sourceDataType;
  private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap;
  private final RoutingTableProviderMonitor _monitor;

  // Sessions of the live instances seen in the last LiveInstance change; also used as the lock
  // guarding current-state listener bookkeeping.
  final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions = new AtomicReference<>();

  // For periodic refresh
  // Written by the RouterUpdater thread and read by the periodic refresh thread, hence volatile.
  private volatile long _lastRefreshTimestamp;
  // Written by the constructing thread after the RouterUpdater thread has been started.
  private volatile boolean _isPeriodicRefreshEnabled = true; // Default is enabled
  private long _periodRefreshInterval;
  private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
  // For computing intensive reporting logic
  private final ExecutorService _reportExecutor;
  // Only touched from RouterUpdater.handleEvent(); see recordPropagationLatency().
  private Future<?> _reportingTask = null;


  /**
   * Create a provider that is not attached to any HelixManager. No Helix listeners are registered
   * and periodic refresh is disabled.
   */
  public RoutingTableProvider() {
    this(null);
  }

  /**
   * Create a provider backed by ExternalView with periodic refresh enabled at the default
   * interval.
   *
   * @param helixManager the manager to attach listeners to; may be null
   * @throws HelixException if a listener cannot be attached
   */
  public RoutingTableProvider(HelixManager helixManager) throws HelixException {
    this(helixManager, PropertyType.EXTERNALVIEW, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
  }

  /**
   * Create a provider backed by the given source data type with periodic refresh enabled at the
   * default interval.
   *
   * @param helixManager the manager to attach listeners to; may be null
   * @param sourceDataType EXTERNALVIEW, TARGETEXTERNALVIEW or CURRENTSTATES
   * @throws HelixException if the source type is unsupported or a listener cannot be attached
   */
  public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType)
      throws HelixException {
    this(helixManager, sourceDataType, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
  }

  /**
   * Initialize an instance of RoutingTableProvider
   *
   * @param helixManager the manager to attach listeners to; when null no listener is registered
   *          and periodic refresh is disabled
   * @param sourceDataType EXTERNALVIEW, TARGETEXTERNALVIEW or CURRENTSTATES
   * @param isPeriodicRefreshEnabled true if periodic refresh is enabled, false otherwise
   * @param periodRefreshInterval refresh interval in milliseconds; only effective if
   *          isPeriodicRefreshEnabled is true
   * @throws HelixException if the source type is unsupported, target external view is not
   *           enabled, or a listener cannot be attached. Resources started by this constructor
   *           are released via {@link #shutdown()} before the exception is thrown.
   */
  public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType,
      boolean isPeriodicRefreshEnabled, long periodRefreshInterval) throws HelixException {
    _routingTableRef = new AtomicReference<>(new RoutingTable());
    _helixManager = helixManager;
    _sourceDataType = sourceDataType;
    _routingTableChangeListenerMap = new ConcurrentHashMap<>();
    String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;

    _monitor = new RoutingTableProviderMonitor(_sourceDataType, clusterName);
    try {
      _monitor.register();
    } catch (JMException e) {
      // Monitoring is best effort; the provider keeps working without the MBean.
      logger.error("Failed to register RoutingTableProvider monitor MBean.", e);
    }
    _reportExecutor = Executors.newSingleThreadExecutor();

    _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
    _routerUpdater.start();

    if (_helixManager != null) {
      switch (_sourceDataType) {
        case EXTERNALVIEW:
          try {
            _helixManager.addExternalViewChangeListener(this);
          } catch (Exception e) {
            shutdown();
            logger.error("Failed to attach ExternalView Listener to HelixManager!");
            throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
          }
          break;

        case TARGETEXTERNALVIEW:
          // Check whether target external has been enabled or not
          if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
              _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) {
            shutdown();
            throw new HelixException("Target External View is not enabled!");
          }

          try {
            _helixManager.addTargetExternalViewChangeListener(this);
          } catch (Exception e) {
            shutdown();
            logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
            throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!",
                e);
          }
          break;

        case CURRENTSTATES:
          // CurrentState change listeners will be added later in LiveInstanceChange call.
          break;

        default:
          // Release the already started updater thread, report executor and MBean before failing,
          // consistent with the other failure paths.
          shutdown();
          throw new HelixException(String.format("Unsupported source data type: %s", sourceDataType));
      }

      try {
        _helixManager.addInstanceConfigChangeListener(this);
        _helixManager.addLiveInstanceChangeListener(this);
      } catch (Exception e) {
        shutdown();
        logger.error(
            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!");
        throw new HelixException(
            "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!",
            e);
      }
    }

    // For periodic refresh
    if (isPeriodicRefreshEnabled && _helixManager != null) {
      _lastRefreshTimestamp = System.currentTimeMillis(); // Initialize timestamp with current time
      _periodRefreshInterval = periodRefreshInterval;
      // Construct a periodic refresh context
      final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager);
      periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
      // Create a thread that runs at specified interval
      _periodicRefreshExecutor = new ScheduledThreadPoolExecutor(1);
      _periodicRefreshExecutor.scheduleAtFixedRate(new Runnable() {
        @Override
        public void run() {
          // If enough time has elapsed since last refresh, queue a refresh event
          if (_lastRefreshTimestamp + _periodRefreshInterval < System.currentTimeMillis()) {
            // changeType is irrelevant for NotificationContext.Type.PERIODIC_REFRESH
            _routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance,
                null);
          }
        }
      }, _periodRefreshInterval, _periodRefreshInterval, TimeUnit.MILLISECONDS);
    } else {
      _isPeriodicRefreshEnabled = false;
    }
  }

  /**
   * Shutdown current RoutingTableProvider. Once it is shutdown, it should never be reused.
   *
   * <p>Stops the periodic refresh and reporting executors and the event processor, unregisters the
   * monitor, and removes the Helix listeners this provider registered.
   */
  public void shutdown() {
    if (_periodicRefreshExecutor != null) {
      _periodicRefreshExecutor.purge();
      _periodicRefreshExecutor.shutdown();
    }
    // Let an in-flight latency report finish, but release the worker thread afterwards.
    _reportExecutor.shutdown();
    _routerUpdater.shutdown();

    _monitor.unregister();

    if (_helixManager != null) {
      PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
      // Detach the listeners added unconditionally by the constructor. This is done before the
      // source specific cleanup so that a late LiveInstance callback cannot re-add current-state
      // listeners after they have been removed below.
      _helixManager.removeListener(keyBuilder.instanceConfigs(), this);
      _helixManager.removeListener(keyBuilder.liveInstances(), this);

      switch (_sourceDataType) {
        case EXTERNALVIEW:
          _helixManager.removeListener(keyBuilder.externalViews(), this);
          break;
        case TARGETEXTERNALVIEW:
          _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
          break;
        case CURRENTSTATES:
          // A FINALIZE context makes updateCurrentStatesListeners() drop every listener.
          NotificationContext context = new NotificationContext(_helixManager);
          context.setType(NotificationContext.Type.FINALIZE);
          updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
          break;
        default:
          break;
      }
    }
  }

  /**
   * Get a snapshot of current RoutingTable information. The snapshot wraps the routing table that
   * is current at the time this method is called; later refreshes install a new table and do not
   * affect it.
   * @return snapshot of current routing table.
   */
  public RoutingTableSnapshot getRoutingTableSnapshot() {
    return new RoutingTableSnapshot(_routingTableRef.get());
  }

  /**
   * Add RoutingTableChangeListener with user defined context. Registering the same listener again
   * replaces its context.
   * @param routingTableChangeListener listener to notify after each routing table refresh
   * @param context user defined context, handed back to the listener on every notification; may
   *          be null
   */
  public void addRoutingTableChangeListener(
      final RoutingTableChangeListener routingTableChangeListener, Object context) {
    _routingTableChangeListenerMap.put(routingTableChangeListener, new ListenerContext(context));
    logger.info("Attach RoutingTableProviderChangeListener {}",
        routingTableChangeListener.getClass().getName());
  }

  /**
   * Remove RoutingTableChangeListener
   * @param routingTableChangeListener listener to detach
   * @return the internal entry that was registered for the listener (a wrapper around the user
   *         context), or null if the listener was not registered
   */
  public Object removeRoutingTableChangeListener(
      final RoutingTableChangeListener routingTableChangeListener) {
    logger.info("Detach RoutingTableProviderChangeListener {}",
        routingTableChangeListener.getClass().getName());
    return _routingTableChangeListenerMap.remove(routingTableChangeListener);
  }

  /**
   * returns the instances for {resource,partition} pair that are in a specific
   * {state}
   * This method will be deprecated, please use the
   * {@link #getInstancesForResource(String, String, String) getInstancesForResource} method.
   * @param resourceName
   *          -
   * @param partitionName
   * @param state
   * @return empty list if there is no instance in a given state
   */
  public List<InstanceConfig> getInstances(String resourceName, String partitionName,
      String state) {
    return getInstancesForResource(resourceName, partitionName, state);
  }

  /**
   * returns the instances for {resource,partition} pair that are in a specific
   * {state}
   * @param resourceName
   *          -
   * @param partitionName
   * @param state
   * @return empty list if there is no instance in a given state
   */
  public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
      String state) {
    return _routingTableRef.get().getInstancesForResource(resourceName, partitionName, state);
  }

  /**
   * returns the instances for {resource group,partition} pair in all resources belongs to the given
   * resource group that are in a specific {state}.
   * The return results aggregate all partition states from all the resources in the given resource
   * group.
   * @param resourceGroupName
   * @param partitionName
   * @param state
   * @return empty list if there is no instance in a given state
   */
  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
      String partitionName, String state) {
    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName,
        state);
  }

  /**
   * returns the instances for {resource group,partition} pair contains any of the given tags
   * that are in a specific {state}.
   * Find all resources belongs to the given resource group that have any of the given resource tags
   * and return the aggregated partition states from all these resources.
   * @param resourceGroupName
   * @param partitionName
   * @param state
   * @param resourceTags
   * @return empty list if there is no instance in a given state
   */
  public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
      String partitionName, String state, List<String> resourceTags) {
    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName,
        state, resourceTags);
  }

  /**
   * returns all instances for {resource} that are in a specific {state}
   * This method will be deprecated, please use the
   * {@link #getInstancesForResource(String, String) getInstancesForResource} method.
   * @param resourceName
   * @param state
   * @return empty set if there is no instance in a given state
   */
  public Set<InstanceConfig> getInstances(String resourceName, String state) {
    return getInstancesForResource(resourceName, state);
  }

  /**
   * returns all instances for {resource} that are in a specific {state}.
   * @param resourceName
   * @param state
   * @return empty set if there is no instance in a given state
   */
  public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
    return _routingTableRef.get().getInstancesForResource(resourceName, state);
  }

  /**
   * returns all instances for all resources in {resource group} that are in a specific {state}
   * @param resourceGroupName
   * @param state
   * @return empty set if there is no instance in a given state
   */
  public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state);
  }

  /**
   * returns all instances for resources contains any given tags in {resource group} that are in a
   * specific {state}
   * @param resourceGroupName
   * @param state
   * @param resourceTags
   * @return empty set if there is no instance in a given state
   */
  public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
      List<String> resourceTags) {
    return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state,
        resourceTags);
  }

  /**
   * Return all liveInstances in the cluster now.
   * @return live instances known to the current routing table
   */
  public Collection<LiveInstance> getLiveInstances() {
    return _routingTableRef.get().getLiveInstances();
  }

  /**
   * Return all instance's config in this cluster.
   * @return instance configs known to the current routing table
   */
  public Collection<InstanceConfig> getInstanceConfigs() {
    return _routingTableRef.get().getInstanceConfigs();
  }

  /**
   * Return names of all resources (shown in ExternalView) in this cluster.
   */
  public Collection<String> getResources() {
    return _routingTableRef.get().getResources();
  }

  /**
   * Handle an ExternalView (or TargetExternalView) change.
   *
   * <p>A non-empty {@code externalViewList} refreshes the routing table synchronously from that
   * list; otherwise a refresh event is queued. Callbacks whose change type does not match the
   * configured source data type are ignored with a warning.
   */
  @Override
  @PreFetch(enabled = false)
  public void onExternalViewChange(List<ExternalView> externalViewList,
      NotificationContext changeContext) {
    HelixConstants.ChangeType changeType = changeContext.getChangeType();
    if (changeType != null && !changeType.getPropertyType().equals(_sourceDataType)) {
      logger.warn(
          "onExternalViewChange called with mismatched change types. Source data type {}, changed data type: {}",
          _sourceDataType, changeType);
      return;
    }
    // Refresh with full list of external view.
    if (externalViewList != null && externalViewList.size() > 0) {
      // keep this here for back-compatibility, application can call onExternalViewChange directly
      // with externalview list supplied.
      refresh(externalViewList, changeContext);
    } else {
      ClusterEventType eventType;
      if (_sourceDataType.equals(PropertyType.EXTERNALVIEW)) {
        eventType = ClusterEventType.ExternalViewChange;
      } else if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW)) {
        eventType = ClusterEventType.TargetExternalViewChange;
      } else {
        logger.warn(
            "onExternalViewChange called with mismatched change types. Source data type {}, change type: {}",
            _sourceDataType, changeType);
        return;
      }
      _routerUpdater.queueEvent(changeContext, eventType, changeType);
    }
  }

  /** Queue a routing table refresh in response to an InstanceConfig change. */
  @Override
  @PreFetch(enabled = false)
  public void onInstanceConfigChange(List<InstanceConfig> configs,
      NotificationContext changeContext) {
    _routerUpdater.queueEvent(changeContext, ClusterEventType.InstanceConfigChange,
        HelixConstants.ChangeType.INSTANCE_CONFIG);
  }

  /** Delegates to {@link #onInstanceConfigChange(List, NotificationContext)}. */
  @Override
  @PreFetch(enabled = false)
  public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
    onInstanceConfigChange(configs, changeContext);
  }

  /**
   * Handle a LiveInstance change: when CurrentStates is the data source, first reconcile the
   * per-instance current-state listeners, then queue a routing table refresh.
   */
  @Override
  @PreFetch(enabled = true)
  public void onLiveInstanceChange(List<LiveInstance> liveInstances,
      NotificationContext changeContext) {
    if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
      // Go though the live instance list and update CurrentState listeners
      updateCurrentStatesListeners(liveInstances, changeContext);
    }

    _routerUpdater.queueEvent(changeContext, ClusterEventType.LiveInstanceChange,
        HelixConstants.ChangeType.LIVE_INSTANCE);
  }

  /**
   * Queue a routing table refresh in response to a CurrentState change. Ignored with a warning
   * unless CurrentStates is the configured data source.
   */
  @Override
  @PreFetch(enabled = false)
  public void onStateChange(String instanceName, List<CurrentState> statesInfo,
      NotificationContext changeContext) {
    if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
      _routerUpdater.queueEvent(changeContext, ClusterEventType.CurrentStateChange,
          HelixConstants.ChangeType.CURRENT_STATE);
    } else {
      logger.warn(
          "RoutingTableProvider does not use CurrentStates as source, ignore CurrentState changes!");
    }
  }

  /**
   * Go through all live instances in the cluster, add CurrentStateChange listener to
   * them if they are newly added, and remove CurrentStateChange listener if instance is offline.
   * With a FINALIZE context the live instance list is treated as empty, so every previously added
   * listener is removed.
   */
  private void updateCurrentStatesListeners(List<LiveInstance> liveInstances,
      NotificationContext changeContext) {
    HelixManager manager = changeContext.getManager();
    PropertyKey.Builder keyBuilder = new PropertyKey.Builder(manager.getClusterName());

    if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
      // on finalize, should remove all current-state listeners
      logger.info("remove current-state listeners. lastSeenSessions: {}", _lastSeenSessions);
      liveInstances = Collections.emptyList();
    }

    // Index the current live instances by session id
    Map<String, LiveInstance> curSessions = new HashMap<>();
    for (LiveInstance liveInstance : liveInstances) {
      curSessions.put(liveInstance.getSessionId(), liveInstance);
    }

    // Go though the live instance list and update CurrentState listeners
    synchronized (_lastSeenSessions) {
      Map<String, LiveInstance> lastSessions = _lastSeenSessions.get();
      if (lastSessions == null) {
        lastSessions = Collections.emptyMap();
      }

      // add listeners to new live instances
      for (Map.Entry<String, LiveInstance> current : curSessions.entrySet()) {
        String session = current.getKey();
        if (!lastSessions.containsKey(session)) {
          String instanceName = current.getValue().getInstanceName();
          try {
            // add current-state listeners for new sessions
            manager.addCurrentStateChangeListener(this, instanceName, session);
            logger.info("{} added current-state listener for instance: {}, session: {}, listener: {}",
                manager.getInstanceName(), instanceName, session, this);
          } catch (Exception e) {
            logger.error("Fail to add current state listener for instance: {} with session: {}", instanceName, session,
                e);
          }
        }
      }

      // remove current-state listener for expired session
      for (Map.Entry<String, LiveInstance> previous : lastSessions.entrySet()) {
        String session = previous.getKey();
        if (!curSessions.containsKey(session)) {
          String instanceName = previous.getValue().getInstanceName();
          manager.removeListener(keyBuilder.currentStates(instanceName, session), this);
          logger.info("remove current-state listener for instance: {}, session: {}", instanceName,
              session);
        }
      }

      // update last-seen
      _lastSeenSessions.set(curSessions);
    }
  }

  /** Replace the routing table with an empty one. Listeners are not notified. */
  private void reset() {
    logger.info("Resetting the routing table.");
    RoutingTable newRoutingTable = new RoutingTable();
    _routingTableRef.set(newRoutingTable);
  }

  /**
   * Rebuild the routing table from the given external views, reading instance configs and live
   * instances through the data accessor of the manager in {@code changeContext}.
   */
  protected void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) {
    HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor();
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();

    List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs());
    List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances());
    refresh(externalViewList, configList, liveInstances);
  }

  /** Rebuild the routing table from external views and notify the registered listeners. */
  protected void refresh(Collection<ExternalView> externalViews,
      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
    long startTime = System.currentTimeMillis();
    RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances);
    resetRoutingTableAndNotify(startTime, newRoutingTable);
  }

  /** Rebuild the routing table from current states and notify the registered listeners. */
  protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
      Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
    long startTime = System.currentTimeMillis();
    RoutingTable newRoutingTable =
        new RoutingTable(currentStateMap, instanceConfigs, liveInstances);
    resetRoutingTableAndNotify(startTime, newRoutingTable);
  }

  /**
   * Install the new routing table, log how long the rebuild took, notify listeners and, when
   * periodic refresh is enabled, record the refresh time.
   */
  private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable) {
    _routingTableRef.set(newRoutingTable);
    logger.info("Refresh the RoutingTable for cluster {}, takes {} ms.",
        (_helixManager != null ? _helixManager.getClusterName() : null),
        (System.currentTimeMillis() - startTime));
    notifyRoutingTableChange();

    // Update timestamp for last refresh
    if (_isPeriodicRefreshEnabled) {
      _lastRefreshTimestamp = System.currentTimeMillis();
    }
  }

  /** Hand every registered listener a fresh snapshot together with its user context. */
  private void notifyRoutingTableChange() {
    for (Map.Entry<RoutingTableChangeListener, ListenerContext> entry : _routingTableChangeListenerMap
        .entrySet()) {
      entry.getKey().onRoutingTableChange(new RoutingTableSnapshot(_routingTableRef.get()),
          entry.getValue().getContext());
    }
  }

  /**
   * Event processor that turns queued change events into routing table refreshes, backed by its
   * own {@link RoutingDataCache}.
   */
  private class RouterUpdater extends ClusterEventProcessor {
    private final RoutingDataCache _dataCache;

    public RouterUpdater(String clusterName, PropertyType sourceDataType) {
      super(clusterName, "Helix-RouterUpdater-event_process");
      _dataCache = new RoutingDataCache(clusterName, sourceDataType);
    }

    /**
     * Process one queued event: a FINALIZE context clears the routing table; anything else
     * refreshes the data cache and rebuilds the routing table from the configured source.
     *
     * @throws HelixException if the event carries no manager or the manager is not connected
     */
    @Override
    protected void handleEvent(ClusterEvent event) {
      NotificationContext changeContext = event.getAttribute(AttributeName.changeContext.name());
      // session has expired clean up the routing table
      if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
        reset();
      } else {
        // refresh routing table.
        HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
        if (manager == null) {
          logger.error("HelixManager is null for router update event: {}", event);
          throw new HelixException("HelixManager is null for router update event.");
        }
        if (!manager.isConnected()) {
          logger.error("HelixManager is not connected for router update event: {}", event);
          throw new HelixException("HelixManager is not connected for router update event.");
        }

        long startTime = System.currentTimeMillis();

        _dataCache.refresh(manager.getHelixDataAccessor());
        switch (_sourceDataType) {
        case EXTERNALVIEW:
          refresh(_dataCache.getExternalViews().values(),
              _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
          break;
        case TARGETEXTERNALVIEW:
          refresh(_dataCache.getTargetExternalViews().values(),
              _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
          break;
        case CURRENTSTATES:
          refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
              _dataCache.getLiveInstances().values());

          recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot());
          break;
        default:
          logger.warn("Unsupported source data type: {}, stop refreshing the routing table!",
              _sourceDataType);
        }

        _monitor.increaseDataRefreshCounters(startTime);
      }
    }

    /**
     * Report current state to routing table propagation latency
     * This method is not threadsafe. Take care of _reportingTask atomicity if use in multi-threads.
     */
    private void recordPropagationLatency(final long currentTime, final CurrentStateSnapshot currentStateSnapshot) {
      // Note that due to the extra mem footprint introduced by currentStateSnapshot ref, we restrict running report task count to be 1.
      // Any parallel tasks will be skipped. So the reporting metric data is sampled.
      if (_reportingTask == null || _reportingTask.isDone()) {
        _reportingTask = _reportExecutor.submit(new Callable<Object>() {
          @Override public Object call() {
            // getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to avoid performance impact.
            Map<PropertyKey, Map<String, Long>> currentStateEndTimeMap =
                currentStateSnapshot.getNewCurrentStateEndTimes();
            for (Map.Entry<PropertyKey, Map<String, Long>> nodeEntry : currentStateEndTimeMap
                .entrySet()) {
              PropertyKey key = nodeEntry.getKey();
              for (Map.Entry<String, Long> partitionEntry : nodeEntry.getValue().entrySet()) {
                String partition = partitionEntry.getKey();
                long endTime = partitionEntry.getValue();
                if (currentTime >= endTime) {
                  _monitor.recordStatePropagationLatency(currentTime - endTime);
                  logger.debug(
                      "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
                      key.toString(), partition, endTime, currentTime - endTime);
                } else {
                  // Verbose log in case currentTime < endTime. This could be the case that Router clock is slower than the participant clock.
                  logger.trace(
                      "CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
                      key.toString(), partition, endTime, currentTime - endTime);
                }
              }
            }
            return null;
          }
        });
      }
    }

    /**
     * Queue a refresh event for the given notification.
     *
     * <p>Only CALLBACK notifications are applied to the data cache as selective changes; every
     * other notification type (including PERIODIC_REFRESH) requests a full refresh.
     *
     * @param context notification context of the change; expected to be non-null, since its
     *          manager is read when the event is built
     * @param eventType type to tag the queued event with
     * @param changeType kind of data that changed; only used for CALLBACK notifications
     */
    public void queueEvent(NotificationContext context, ClusterEventType eventType,
        HelixConstants.ChangeType changeType) {
      ClusterEvent event = new ClusterEvent(_clusterName, eventType);
      if (context == null || context.getType() != NotificationContext.Type.CALLBACK) {
        _dataCache.requireFullRefresh();
      } else {
        _dataCache.notifyDataChange(changeType, context.getPathChanged());
      }

      // Null check for manager in the following line is done in handleEvent()
      event.addAttribute(AttributeName.helixmanager.name(), context.getManager());
      event.addAttribute(AttributeName.changeContext.name(), context);
      queueEvent(event);

      _monitor.increaseCallbackCounters(_eventQueue.size());
    }
  }

  /**
   * Wrapper around the user supplied listener context, so that a null context can be stored as a
   * value in the ConcurrentHashMap of listeners.
   */
  private static final class ListenerContext {
    private final Object _context;

    public ListenerContext(Object context) {
      _context = context;
    }

    public Object getContext() {
      return _context;
    }
  }
}
