blob: 53381458651d1773b92d9e00b9ea7e254e0a808e [file] [log] [blame]
package org.apache.helix.spectator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.api.listeners.ConfigChangeListener;
import org.apache.helix.api.listeners.CurrentStateChangeListener;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.api.listeners.RoutingTableChangeListener;
import org.apache.helix.common.ClusterEventProcessor;
import org.apache.helix.common.caches.CurrentStateSnapshot;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.ClusterEventType;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.monitoring.mbeans.RoutingTableProviderMonitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.management.JMException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class RoutingTableProvider
implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener,
LiveInstanceChangeListener, CurrentStateChangeListener {
private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class);
// Default interval between two periodic refreshes, in milliseconds.
private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000L; // 5 minutes
// Holds the routing table currently served to readers; it is replaced as a whole on refresh.
private final AtomicReference<RoutingTable> _routingTableRef;
private final HelixManager _helixManager;
private final RouterUpdater _routerUpdater;
private final PropertyType _sourceDataType;
private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap;
private final RoutingTableProviderMonitor _monitor;
// For periodic refresh
// Written by whichever thread refreshes the routing table and read by the periodic refresh
// task, so it is volatile (a plain long write is not even guaranteed to be atomic).
private volatile long _lastRefreshTimestamp;
// Cleared by the constructor after the updater thread has been started, and read on the refresh
// path, hence volatile as well.
private volatile boolean _isPeriodicRefreshEnabled = true; // Default is enabled
private long _periodRefreshInterval;
private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
// For computing intensive reporting logic
private ExecutorService _reportExecutor;
// Handle of the most recently submitted reporting task, used to skip overlapping submissions.
private Future<?> _reportingTask = null;
/**
 * Creates a provider that is not attached to any HelixManager. With a null manager no Helix
 * listeners are registered and periodic refresh stays disabled.
 */
public RoutingTableProvider() {
  this((HelixManager) null);
}
/**
 * Creates a provider that uses ExternalView as its source data, with periodic refresh requested
 * at the default interval.
 *
 * @param helixManager manager used to register listeners; may be null
 * @throws HelixException if a listener cannot be attached to the manager
 */
public RoutingTableProvider(HelixManager helixManager) throws HelixException {
  // The two-argument constructor supplies the default periodic refresh settings.
  this(helixManager, PropertyType.EXTERNALVIEW);
}
/**
 * Creates a provider for the given source data type, with periodic refresh requested at the
 * default interval.
 *
 * @param helixManager manager used to register listeners; may be null
 * @param sourceDataType type of data the routing table is built from
 * @throws HelixException if a listener cannot be attached to the manager
 */
public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType)
    throws HelixException {
  this(helixManager, sourceDataType, true, DEFAULT_PERIODIC_REFRESH_INTERVAL);
}
/**
 * Initialize an instance of RoutingTableProvider.
 *
 * Registers the monitor MBean, starts the router updater thread and, when a HelixManager is
 * given, attaches the listeners needed for the chosen source data type. Every failure path
 * calls {@link #shutdown()} first so that the resources started here are released.
 *
 * @param helixManager manager used to register listeners; may be null, in which case no
 *          listener is attached and periodic refresh is disabled
 * @param sourceDataType EXTERNALVIEW, TARGETEXTERNALVIEW or CURRENTSTATES
 * @param isPeriodicRefreshEnabled true if periodic refresh is enabled, false otherwise
 * @param periodRefreshInterval refresh interval in milliseconds; only effective if
 *          isPeriodicRefreshEnabled is true, and must then be positive
 * @throws HelixException if the source data type is unsupported, target external view is not
 *           enabled, or a listener cannot be attached
 * @throws IllegalArgumentException if periodic refresh is enabled with a non-positive interval
 */
public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType,
    boolean isPeriodicRefreshEnabled, long periodRefreshInterval) throws HelixException {
  _routingTableRef = new AtomicReference<>(new RoutingTable());
  _helixManager = helixManager;
  _sourceDataType = sourceDataType;
  _routingTableChangeListenerMap = new ConcurrentHashMap<>();
  String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
  _monitor = new RoutingTableProviderMonitor(_sourceDataType, clusterName);
  try {
    _monitor.register();
  } catch (JMException e) {
    // Monitoring is best effort: the provider keeps working without the MBean.
    logger.error("Failed to register RoutingTableProvider monitor MBean.", e);
  }
  _reportExecutor = Executors.newSingleThreadExecutor();
  _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
  _routerUpdater.start();
  if (_helixManager != null) {
    switch (_sourceDataType) {
    case EXTERNALVIEW:
      try {
        _helixManager.addExternalViewChangeListener(this);
      } catch (Exception e) {
        shutdown();
        logger.error("Failed to attach ExternalView Listener to HelixManager!");
        throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e);
      }
      break;
    case TARGETEXTERNALVIEW:
      // Check whether target external has been enabled or not
      if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
          _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) {
        shutdown();
        throw new HelixException("Target External View is not enabled!");
      }
      try {
        _helixManager.addTargetExternalViewChangeListener(this);
      } catch (Exception e) {
        shutdown();
        logger.error("Failed to attach TargetExternalView Listener to HelixManager!");
        throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!",
            e);
      }
      break;
    case CURRENTSTATES:
      // CurrentState change listeners will be added later in LiveInstanceChange call.
      break;
    default:
      // Release the updater thread, report executor and monitor started above; without this
      // they would outlive the failed construction.
      shutdown();
      throw new HelixException(String.format("Unsupported source data type: %s", sourceDataType));
    }
    try {
      _helixManager.addInstanceConfigChangeListener(this);
      _helixManager.addLiveInstanceChangeListener(this);
    } catch (Exception e) {
      shutdown();
      logger.error(
          "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!");
      throw new HelixException(
          "Failed to attach InstanceConfig and LiveInstance Change listeners to HelixManager!",
          e);
    }
  }
  // For periodic refresh
  if (isPeriodicRefreshEnabled && _helixManager != null) {
    if (periodRefreshInterval <= 0) {
      // scheduleAtFixedRate rejects a non-positive period with IllegalArgumentException; fail
      // with the same exception type, but only after releasing what has already been started.
      shutdown();
      throw new IllegalArgumentException(String.format(
          "Periodic refresh interval must be positive, but was: %d", periodRefreshInterval));
    }
    _lastRefreshTimestamp = System.currentTimeMillis(); // Initialize timestamp with current time
    _periodRefreshInterval = periodRefreshInterval;
    // Construct a periodic refresh context
    final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager);
    periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
    // Create a thread that runs at specified interval
    _periodicRefreshExecutor = new ScheduledThreadPoolExecutor(1);
    _periodicRefreshExecutor.scheduleAtFixedRate(new Runnable() {
      @Override
      public void run() {
        // If enough time has elapsed since last refresh, queue a refresh event
        if (_lastRefreshTimestamp + _periodRefreshInterval < System.currentTimeMillis()) {
          // changeType is irrelevant for NotificationContext.Type.PERIODIC_REFRESH
          _routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance,
              null);
        }
      }
    }, _periodRefreshInterval, _periodRefreshInterval, TimeUnit.MILLISECONDS);
  } else {
    _isPeriodicRefreshEnabled = false;
  }
}
/**
 * Shutdown current RoutingTableProvider. Once it is shutdown, it should never be reused.
 *
 * Stops the periodic refresh executor, the router updater and the report executor, unregisters
 * the monitor, and detaches the listeners this provider attached to the HelixManager.
 */
public void shutdown() {
  if (_periodicRefreshExecutor != null) {
    _periodicRefreshExecutor.purge();
    _periodicRefreshExecutor.shutdown();
  }
  _routerUpdater.shutdown();
  // The report executor is created in the constructor and must be stopped here, otherwise its
  // worker thread outlives the provider.
  if (_reportExecutor != null) {
    _reportExecutor.shutdown();
  }
  _monitor.unregister();
  if (_helixManager != null) {
    PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder();
    switch (_sourceDataType) {
    case EXTERNALVIEW:
      _helixManager.removeListener(keyBuilder.externalViews(), this);
      break;
    case TARGETEXTERNALVIEW:
      _helixManager.removeListener(keyBuilder.targetExternalViews(), this);
      break;
    case CURRENTSTATES:
      // A FINALIZE context makes updateCurrentStatesListeners drop every CurrentState listener.
      NotificationContext context = new NotificationContext(_helixManager);
      context.setType(NotificationContext.Type.FINALIZE);
      updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context);
      break;
    default:
      break;
    }
    // Detach the InstanceConfig and LiveInstance listeners that the constructor attached for
    // every source data type, so the manager stops calling back into a shut-down provider.
    _helixManager.removeListener(keyBuilder.instanceConfigs(), this);
    _helixManager.removeListener(keyBuilder.liveInstances(), this);
  }
}
/**
 * Get a snapshot of the current RoutingTable information. The snapshot wraps the routing table
 * held by this provider at the time this method is called.
 * @return snapshot of current routing table.
 */
public RoutingTableSnapshot getRoutingTableSnapshot() {
  final RoutingTable currentTable = _routingTableRef.get();
  return new RoutingTableSnapshot(currentTable);
}
/**
 * Register a RoutingTableChangeListener together with a user defined context. Registering the
 * same listener again replaces the context stored for it.
 * @param routingTableChangeListener listener to register
 * @param context user defined context handed back to the listener on each notification
 */
public void addRoutingTableChangeListener(
    final RoutingTableChangeListener routingTableChangeListener, Object context) {
  final ListenerContext wrappedContext = new ListenerContext(context);
  _routingTableChangeListenerMap.put(routingTableChangeListener, wrappedContext);
  final String listenerClassName = routingTableChangeListener.getClass().getName();
  logger.info("Attach RoutingTableProviderChangeListener {}", listenerClassName);
}
/**
 * Remove a RoutingTableChangeListener.
 * @param routingTableChangeListener listener to unregister
 * @return the entry that was stored for the listener (the internal wrapper around the user
 *         context), or null if the listener was not registered
 */
public Object removeRoutingTableChangeListener(
    final RoutingTableChangeListener routingTableChangeListener) {
  final String listenerClassName = routingTableChangeListener.getClass().getName();
  logger.info("Detach RoutingTableProviderChangeListener {}", listenerClassName);
  final Object removedEntry = _routingTableChangeListenerMap.remove(routingTableChangeListener);
  return removedEntry;
}
/**
 * Returns the instances for a {resource,partition} pair that are in a specific {state}.
 * This method will be deprecated, please use the
 * {@link #getInstancesForResource(String, String, String) getInstancesForResource} method.
 * @param resourceName name of the resource
 * @param partitionName name of the partition
 * @param state state the instances must be in
 * @return whatever {@link #getInstancesForResource(String, String, String)} returns
 */
public List<InstanceConfig> getInstances(String resourceName, String partitionName,
    String state) {
  final List<InstanceConfig> matchingInstances =
      getInstancesForResource(resourceName, partitionName, state);
  return matchingInstances;
}
/**
 * Returns the instances for a {resource,partition} pair that are in a specific {state}, as
 * reported by the routing table currently held by this provider.
 * @param resourceName name of the resource
 * @param partitionName name of the partition
 * @param state state the instances must be in
 * @return the matching instances; documented as an empty list if there is no instance in the
 *         given state
 */
public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName,
    String state) {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getInstancesForResource(resourceName, partitionName, state);
}
/**
 * Returns the instances for a {resource group,partition} pair, across the resources belonging
 * to the given resource group, that are in a specific {state}. The lookup is delegated to the
 * routing table currently held by this provider.
 * @param resourceGroupName name of the resource group
 * @param partitionName name of the partition
 * @param state state the instances must be in
 * @return the matching instances; documented as an empty list if there is no instance in the
 *         given state
 */
public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
    String partitionName, String state) {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getInstancesForResourceGroup(resourceGroupName, partitionName, state);
}
/**
 * Returns the instances for a {resource group,partition} pair that are in a specific {state},
 * restricted to the resources of the group that carry any of the given resource tags. The
 * lookup is delegated to the routing table currently held by this provider.
 * @param resourceGroupName name of the resource group
 * @param partitionName name of the partition
 * @param state state the instances must be in
 * @param resourceTags tags used to select resources within the group
 * @return the matching instances; documented as an empty list if there is no instance in the
 *         given state
 */
public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName,
    String partitionName, String state, List<String> resourceTags) {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getInstancesForResourceGroup(resourceGroupName, partitionName, state,
      resourceTags);
}
/**
 * Returns all instances for {resource} that are in a specific {state}.
 * This method will be deprecated, please use the
 * {@link #getInstancesForResource(String, String) getInstancesForResource} method.
 * @param resourceName name of the resource
 * @param state state the instances must be in
 * @return whatever {@link #getInstancesForResource(String, String)} returns
 */
public Set<InstanceConfig> getInstances(String resourceName, String state) {
  final Set<InstanceConfig> matchingInstances = getInstancesForResource(resourceName, state);
  return matchingInstances;
}
/**
 * Returns all instances for {resource} that are in a specific {state}, as reported by the
 * routing table currently held by this provider.
 * @param resourceName name of the resource
 * @param state state the instances must be in
 * @return the set of matching instances
 */
public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getInstancesForResource(resourceName, state);
}
/**
 * Returns all instances for all resources in {resource group} that are in a specific {state},
 * as reported by the routing table currently held by this provider.
 * @param resourceGroupName name of the resource group
 * @param state state the instances must be in
 * @return the set of matching instances
 */
public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getInstancesForResourceGroup(resourceGroupName, state);
}
/**
 * Returns all instances that are in a specific {state} for the resources of {resource group}
 * that carry any of the given tags, as reported by the routing table currently held by this
 * provider.
 * @param resourceGroupName name of the resource group
 * @param state state the instances must be in
 * @param resourceTags tags used to select resources within the group
 * @return the set of matching instances
 */
public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state,
    List<String> resourceTags) {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getInstancesForResourceGroup(resourceGroupName, state, resourceTags);
}
/**
 * Return the live instances known to the routing table currently held by this provider.
 * @return live instances of the cluster
 */
public Collection<LiveInstance> getLiveInstances() {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getLiveInstances();
}
/**
 * Return the instance configs known to the routing table currently held by this provider.
 * @return instance configs of the cluster
 */
public Collection<InstanceConfig> getInstanceConfigs() {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getInstanceConfigs();
}
/**
 * Return the names of all resources known to the routing table currently held by this provider.
 * @return resource names
 */
public Collection<String> getResources() {
  final RoutingTable currentTable = _routingTableRef.get();
  return currentTable.getResources();
}
/**
 * Handles an (Target)ExternalView change. A non-empty view list is applied right away; an
 * absent or empty list queues an event so that the router updater refreshes the table.
 * Notifications whose change type does not match the configured source data type are ignored.
 */
@Override
@PreFetch(enabled = false)
public void onExternalViewChange(List<ExternalView> externalViewList,
    NotificationContext changeContext) {
  final HelixConstants.ChangeType changeType = changeContext.getChangeType();
  final boolean matchesSource =
      changeType == null || changeType.getPropertyType().equals(_sourceDataType);
  if (!matchesSource) {
    logger.warn(
        "onExternalViewChange called with mismatched change types. Source data type {}, changed data type: {}",
        _sourceDataType, changeType);
    return;
  }

  // keep this here for back-compatibility, application can call onExternalViewChange directly
  // with externalview list supplied.
  final boolean hasViews = externalViewList != null && !externalViewList.isEmpty();
  if (hasViews) {
    // Refresh with full list of external view.
    refresh(externalViewList, changeContext);
    return;
  }

  final ClusterEventType eventType;
  switch (_sourceDataType) {
  case EXTERNALVIEW:
    eventType = ClusterEventType.ExternalViewChange;
    break;
  case TARGETEXTERNALVIEW:
    eventType = ClusterEventType.TargetExternalViewChange;
    break;
  default:
    logger.warn(
        "onExternalViewChange called with mismatched change types. Source data type {}, change type: {}",
        _sourceDataType, changeType);
    return;
  }
  _routerUpdater.queueEvent(changeContext, eventType, changeType);
}
/**
 * Queues an InstanceConfigChange event for the router updater. The supplied config list is not
 * used here.
 */
@Override
@PreFetch(enabled = false)
public void onInstanceConfigChange(List<InstanceConfig> configs,
    NotificationContext changeContext) {
  final ClusterEventType eventType = ClusterEventType.InstanceConfigChange;
  final HelixConstants.ChangeType changeType = HelixConstants.ChangeType.INSTANCE_CONFIG;
  _routerUpdater.queueEvent(changeContext, eventType, changeType);
}
/**
 * ConfigChangeListener callback; handled exactly like an instance config change.
 */
@Override
@PreFetch(enabled = false)
public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) {
  // Delegate so that both listener interfaces share a single code path.
  this.onInstanceConfigChange(configs, changeContext);
}
/**
 * Handles a LiveInstance change: when CurrentStates is the source data type the per-instance
 * CurrentState listeners are updated first, then a LiveInstanceChange event is queued for the
 * router updater.
 */
@Override
@PreFetch(enabled = true)
public void onLiveInstanceChange(List<LiveInstance> liveInstances,
    NotificationContext changeContext) {
  final boolean usesCurrentStates = _sourceDataType.equals(PropertyType.CURRENTSTATES);
  if (usesCurrentStates) {
    // Go though the live instance list and update CurrentState listeners
    updateCurrentStatesListeners(liveInstances, changeContext);
  }
  final HelixConstants.ChangeType changeType = HelixConstants.ChangeType.LIVE_INSTANCE;
  _routerUpdater.queueEvent(changeContext, ClusterEventType.LiveInstanceChange, changeType);
}
/**
 * Handles a CurrentState change by queueing a CurrentStateChange event. Changes are ignored,
 * with a warning, unless CurrentStates is the configured source data type.
 */
@Override
@PreFetch(enabled = false)
public void onStateChange(String instanceName, List<CurrentState> statesInfo,
    NotificationContext changeContext) {
  if (!_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
    logger.warn(
        "RoutingTableProvider does not use CurrentStates as source, ignore CurrentState changes!");
    return;
  }
  final HelixConstants.ChangeType changeType = HelixConstants.ChangeType.CURRENT_STATE;
  _routerUpdater.queueEvent(changeContext, ClusterEventType.CurrentStateChange, changeType);
}
// Live instances seen by the previous call, keyed by session id; also used as the lock that
// serializes listener updates.
final AtomicReference<Map<String, LiveInstance>> _lastSeenSessions = new AtomicReference<>();

/**
 * Reconciles the CurrentStateChange listeners with the given live instances: a listener is
 * added for every session not seen by the previous call and removed for every session that is
 * no longer present. With a FINALIZE context the live instance list is treated as empty, so
 * all previously added listeners are removed.
 */
private void updateCurrentStatesListeners(List<LiveInstance> liveInstances,
    NotificationContext changeContext) {
  final HelixManager helixManager = changeContext.getManager();
  final PropertyKey.Builder keys = new PropertyKey.Builder(helixManager.getClusterName());

  List<LiveInstance> instancesToTrack = liveInstances;
  if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
    // on finalize, should remove all current-state listeners
    logger.info("remove current-state listeners. lastSeenSessions: {}", _lastSeenSessions);
    instancesToTrack = Collections.emptyList();
  }

  final Map<String, LiveInstance> currentBySession = new HashMap<>();
  for (LiveInstance instance : instancesToTrack) {
    currentBySession.put(instance.getSessionId(), instance);
  }

  // Go though the live instance list and update CurrentState listeners
  synchronized (_lastSeenSessions) {
    Map<String, LiveInstance> previousBySession = _lastSeenSessions.get();
    if (previousBySession == null) {
      previousBySession = Collections.emptyMap();
    }

    // add listeners to new live instances
    for (Map.Entry<String, LiveInstance> current : currentBySession.entrySet()) {
      final String sessionId = current.getKey();
      if (previousBySession.containsKey(sessionId)) {
        continue;
      }
      final String instanceName = current.getValue().getInstanceName();
      try {
        // add current-state listeners for new sessions
        helixManager.addCurrentStateChangeListener(this, instanceName, sessionId);
        logger.info("{} added current-state listener for instance: {}, session: {}, listener: {}",
            helixManager.getInstanceName(), instanceName, sessionId, this);
      } catch (Exception e) {
        logger.error("Fail to add current state listener for instance: {} with session: {}", instanceName, sessionId,
            e);
      }
    }

    // remove current-state listener for expired session
    for (Map.Entry<String, LiveInstance> previous : previousBySession.entrySet()) {
      final String sessionId = previous.getKey();
      if (currentBySession.containsKey(sessionId)) {
        continue;
      }
      final String instanceName = previous.getValue().getInstanceName();
      helixManager.removeListener(keys.currentStates(instanceName, sessionId), this);
      logger.info("remove current-state listener for instance: {}, session: {}", instanceName,
          sessionId);
    }

    // update last-seen
    _lastSeenSessions.set(currentBySession);
  }
}
/**
 * Replaces the current routing table with a newly constructed, default RoutingTable.
 */
private void reset() {
  logger.info("Resetting the routing table.");
  _routingTableRef.set(new RoutingTable());
}
/**
 * Rebuilds the routing table from the supplied external views, reading instance configs and
 * live instances through the data accessor of the manager carried by the change context.
 */
protected void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) {
  final HelixManager contextManager = changeContext.getManager();
  final HelixDataAccessor dataAccessor = contextManager.getHelixDataAccessor();
  final PropertyKey.Builder keys = dataAccessor.keyBuilder();
  final List<InstanceConfig> instanceConfigs = dataAccessor.getChildValues(keys.instanceConfigs());
  final List<LiveInstance> liveInstanceList = dataAccessor.getChildValues(keys.liveInstances());
  refresh(externalViewList, instanceConfigs, liveInstanceList);
}
/**
 * Builds a new routing table from external views, instance configs and live instances, then
 * installs it and notifies the registered listeners.
 */
protected void refresh(Collection<ExternalView> externalViews,
    Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
  // Taken before the table is built so the logged duration includes construction.
  final long refreshStart = System.currentTimeMillis();
  final RoutingTable rebuiltTable =
      new RoutingTable(externalViews, instanceConfigs, liveInstances);
  resetRoutingTableAndNotify(refreshStart, rebuiltTable);
}
/**
 * Builds a new routing table from a current state map, instance configs and live instances,
 * then installs it and notifies the registered listeners.
 */
protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
    Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) {
  // Taken before the table is built so the logged duration includes construction.
  final long refreshStart = System.currentTimeMillis();
  final RoutingTable rebuiltTable =
      new RoutingTable(currentStateMap, instanceConfigs, liveInstances);
  resetRoutingTableAndNotify(refreshStart, rebuiltTable);
}
/**
 * Installs the new routing table, logs how long the refresh took since startTime, notifies the
 * registered listeners and, when periodic refresh is enabled, records the refresh time.
 */
private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable) {
  _routingTableRef.set(newRoutingTable);

  final String clusterName = _helixManager != null ? _helixManager.getClusterName() : null;
  final long elapsedMs = System.currentTimeMillis() - startTime;
  logger.info("Refresh the RoutingTable for cluster {}, takes {} ms.", clusterName, elapsedMs);

  notifyRoutingTableChange();

  // Update timestamp for last refresh
  if (!_isPeriodicRefreshEnabled) {
    return;
  }
  _lastRefreshTimestamp = System.currentTimeMillis();
}
/**
 * Calls every registered RoutingTableChangeListener with a fresh snapshot of the current
 * routing table and the user context stored for that listener.
 */
private void notifyRoutingTableChange() {
  for (Map.Entry<RoutingTableChangeListener, ListenerContext> registration :
      _routingTableChangeListenerMap.entrySet()) {
    final RoutingTableChangeListener listener = registration.getKey();
    // A separate snapshot object is created for each listener.
    final RoutingTableSnapshot snapshot = new RoutingTableSnapshot(_routingTableRef.get());
    final Object userContext = registration.getValue().getContext();
    listener.onRoutingTableChange(snapshot, userContext);
  }
}
private class RouterUpdater extends ClusterEventProcessor {
private final RoutingDataCache _dataCache;
public RouterUpdater(String clusterName, PropertyType sourceDataType) {
super(clusterName, "Helix-RouterUpdater-event_process");
_dataCache = new RoutingDataCache(clusterName, sourceDataType);
}
@Override
protected void handleEvent(ClusterEvent event) {
NotificationContext changeContext = event.getAttribute(AttributeName.changeContext.name());
// session has expired clean up the routing table
if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
reset();
} else {
// refresh routing table.
HelixManager manager = event.getAttribute(AttributeName.helixmanager.name());
if (manager == null) {
logger.error(String.format("HelixManager is null for router update event: %s", event));
throw new HelixException("HelixManager is null for router update event.");
}
if (!manager.isConnected()) {
logger.error(String.format("HelixManager is not connected for router update event: %s", event));
throw new HelixException("HelixManager is not connected for router update event.");
}
long startTime = System.currentTimeMillis();
_dataCache.refresh(manager.getHelixDataAccessor());
switch (_sourceDataType) {
case EXTERNALVIEW:
refresh(_dataCache.getExternalViews().values(),
_dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
break;
case TARGETEXTERNALVIEW:
refresh(_dataCache.getTargetExternalViews().values(),
_dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values());
break;
case CURRENTSTATES:
refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(),
_dataCache.getLiveInstances().values());
recordPropagationLatency(System.currentTimeMillis(), _dataCache.getCurrentStateSnapshot());
break;
default:
logger.warn("Unsupported source data type: {}, stop refreshing the routing table!",
_sourceDataType);
}
_monitor.increaseDataRefreshCounters(startTime);
}
}
/**
* Report current state to routing table propagation latency
* This method is not threadsafe. Take care of _reportingTask atomicity if use in multi-threads.
*/
private void recordPropagationLatency(final long currentTime, final CurrentStateSnapshot currentStateSnapshot) {
// Note that due to the extra mem footprint introduced by currentStateSnapshot ref, we restrict running report task count to be 1.
// Any parallel tasks will be skipped. So the reporting metric data is sampled.
if (_reportingTask == null || _reportingTask.isDone()) {
_reportingTask = _reportExecutor.submit(new Callable<Object>() {
@Override public Object call() {
// getNewCurrentStateEndTimes() needs to iterate all current states. Make it async to avoid performance impact.
Map<PropertyKey, Map<String, Long>> currentStateEndTimeMap =
currentStateSnapshot.getNewCurrentStateEndTimes();
for (PropertyKey key : currentStateEndTimeMap.keySet()) {
Map<String, Long> partitionStateEndTimes = currentStateEndTimeMap.get(key);
for (String partition : partitionStateEndTimes.keySet()) {
long endTime = partitionStateEndTimes.get(partition);
if (currentTime >= endTime) {
_monitor.recordStatePropagationLatency(currentTime - endTime);
logger.debug(
"CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
key.toString(), partition, endTime, currentTime - endTime);
} else {
// Verbose log in case currentTime < endTime. This could be the case that Router clock is slower than the participant clock.
logger.trace(
"CurrentState updated in the routing table. Node Key {}, Partition {}, end time {}, Propagation latency {}",
key.toString(), partition, endTime, currentTime - endTime);
}
}
}
return null;
}
});
}
}
public void queueEvent(NotificationContext context, ClusterEventType eventType,
HelixConstants.ChangeType changeType) {
ClusterEvent event = new ClusterEvent(_clusterName, eventType);
if (context == null || context.getType() != NotificationContext.Type.CALLBACK
|| context.getType() == NotificationContext.Type.PERIODIC_REFRESH) {
_dataCache.requireFullRefresh();
} else {
_dataCache.notifyDataChange(changeType, context.getPathChanged());
}
// Null check for manager in the following line is done in handleEvent()
event.addAttribute(AttributeName.helixmanager.name(), context.getManager());
event.addAttribute(AttributeName.changeContext.name(), context);
queueEvent(event);
_monitor.increaseCallbackCounters(_eventQueue.size());
}
}
private class ListenerContext {
private Object _context;
public ListenerContext(Object context) {
_context = context;
}
public Object getContext() {
return _context;
}
}
}