// blob: db9ada93c4694d3d5282fae86bea0f0fdb514e97 [file] [log] [blame]
package org.apache.helix.spectator;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.PropertyType;
import org.apache.helix.common.caches.BasicClusterDataCache;
import org.apache.helix.common.caches.CurrentStateCache;
import org.apache.helix.common.caches.CurrentStateSnapshot;
import org.apache.helix.common.caches.CustomizedViewCache;
import org.apache.helix.common.caches.TargetExternalViewCache;
import org.apache.helix.constants.InstanceConstants;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Cache the cluster data that are needed by RoutingTableProvider.
 *
 * <p>On top of the data held by {@link BasicClusterDataCache}, this cache keeps the source data
 * selected at construction time (target external views, current states and/or customized views)
 * as well as the subset of instance configs and live instances that are routable, i.e. whose
 * instance operation is not listed in {@link InstanceConstants#UNSERVABLE_INSTANCE_OPERATIONS}.
 *
 * <p>{@link #refresh(HelixDataAccessor)} is synchronized on this instance; the getters are not.
 */
class RoutingDataCache extends BasicClusterDataCache {
  private static final Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName());

  // Source data types to cache. For CUSTOMIZEDVIEW the value lists the customized state types.
  private final Map<PropertyType, List<String>> _sourceDataTypeMap;
  private final CurrentStateCache _currentStateCache;
  // TODO: CustomizedCache needs to be migrated to propertyCache. Once we migrate all cache to
  // propertyCache, this hardcoded list of fields won't be necessary.
  private final Map<String, CustomizedViewCache> _customizedViewCaches;
  private final TargetExternalViewCache _targetExternalViewCache;
  // Both routable maps are replaced wholesale (never mutated in place) on refresh.
  private Map<String, LiveInstance> _routableLiveInstanceMap;
  private Map<String, InstanceConfig> _routableInstanceConfigMap;

  /**
   * Initialize empty RoutingDataCache for a single source data type that needs no customized
   * state types.
   *
   * @param clusterName name of the cluster this cache belongs to
   * @param sourceDataType the property type used as routing source data
   */
  public RoutingDataCache(String clusterName, PropertyType sourceDataType) {
    this(clusterName, ImmutableMap.of(sourceDataType, Collections.emptyList()));
  }

  /**
   * Initialize empty RoutingDataCache with clusterName and the source data types to cache.
   *
   * @param clusterName name of the cluster this cache belongs to
   * @param sourceDataTypeMap source property types to cache; the list mapped to
   *          {@link PropertyType#CUSTOMIZEDVIEW} (if any) names the customized state types, and
   *          one {@link CustomizedViewCache} is created per entry of that list
   */
  public RoutingDataCache(String clusterName, Map<PropertyType, List<String>> sourceDataTypeMap) {
    super(clusterName);
    _sourceDataTypeMap = sourceDataTypeMap;
    _currentStateCache = new CurrentStateCache(clusterName);
    _customizedViewCaches = new HashMap<>();
    for (String customizedStateType : sourceDataTypeMap
        .getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) {
      _customizedViewCaches.put(customizedStateType,
          new CustomizedViewCache(clusterName, customizedStateType));
    }
    _targetExternalViewCache = new TargetExternalViewCache(clusterName);
    _routableInstanceConfigMap = new HashMap<>();
    _routableLiveInstanceMap = new HashMap<>();
    requireFullRefresh();
  }

  /**
   * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way:
   * only the data whose change flag is set in {@code _propertyDataChangedMap} is reloaded.
   *
   * @param accessor the data accessor used to read the cluster data
   */
  @Override
  public synchronized void refresh(HelixDataAccessor accessor) {
    LOG.info("START: RoutingDataCache.refresh() for cluster {}", _clusterName);
    long startTime = System.currentTimeMillis();

    // Store whether a refresh for routable instances is necessary, as the super.refresh() call will
    // set the _propertyDataChangedMap values for the instance config and live instance change types
    // to false.
    boolean refreshRoutableInstanceConfigs =
        hasPendingChange(HelixConstants.ChangeType.INSTANCE_CONFIG);
    // If there is an InstanceConfig change, update the routable instance configs and live instances.
    // Must also do live instances because whether an instance is routable is based off of the
    // instance config.
    boolean refreshRoutableLiveInstances =
        hasPendingChange(HelixConstants.ChangeType.LIVE_INSTANCE) || refreshRoutableInstanceConfigs;

    super.refresh(accessor);

    if (refreshRoutableInstanceConfigs) {
      updateRoutableInstanceConfigMap(_instanceConfigPropertyCache.getPropertyMap());
    }
    if (refreshRoutableLiveInstances) {
      updateRoutableLiveInstanceMap(getRoutableInstanceConfigMap(),
          _liveInstancePropertyCache.getPropertyMap());
    }

    for (PropertyType propertyType : _sourceDataTypeMap.keySet()) {
      switch (propertyType) {
        case TARGETEXTERNALVIEW:
          reloadTargetExternalViews(accessor);
          break;
        case CURRENTSTATES:
          reloadCurrentStates(accessor);
          break;
        case CUSTOMIZEDVIEW:
          reloadCustomizedViews(accessor);
          break;
        default:
          break;
      }
    }

    LOG.info("END: RoutingDataCache.refresh() for cluster {}, took {} ms", _clusterName,
        System.currentTimeMillis() - startTime);

    // Guard all debug dumps so the cached maps are not even fetched when debug logging is off.
    if (LOG.isDebugEnabled()) {
      LOG.debug("CurrentStates: {}", _currentStateCache);
      LOG.debug("TargetExternalViews: {}", _targetExternalViewCache.getExternalViewMap());
      for (String customizedStateType : _sourceDataTypeMap
          .getOrDefault(PropertyType.CUSTOMIZEDVIEW, Collections.emptyList())) {
        LOG.debug("CustomizedViews customizedStateType: {} {}", customizedStateType,
            _customizedViewCaches.get(customizedStateType).getCustomizedViewMap());
      }
    }
  }

  /**
   * Tells whether the given change type is flagged as changed. A change type that has no entry in
   * {@code _propertyDataChangedMap} is treated as unchanged instead of failing on unboxing null.
   */
  private boolean hasPendingChange(HelixConstants.ChangeType changeType) {
    return _propertyDataChangedMap.getOrDefault(changeType, false);
  }

  /** Reloads the target external views if they are flagged as changed. */
  private void reloadTargetExternalViews(HelixDataAccessor accessor) {
    long start = System.currentTimeMillis();
    if (!hasPendingChange(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW)) {
      return;
    }
    _propertyDataChangedMap.put(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW, false);
    _targetExternalViewCache.refresh(accessor);
    LOG.info("Reload {} TargetExternalViews. Takes {} ms",
        _targetExternalViewCache.getExternalViewMap().size(), System.currentTimeMillis() - start);
  }

  /** Reloads the current states of the routable live instances if flagged as changed. */
  private void reloadCurrentStates(HelixDataAccessor accessor) {
    long start = System.currentTimeMillis();
    if (!hasPendingChange(HelixConstants.ChangeType.CURRENT_STATE)) {
      return;
    }
    _propertyDataChangedMap.put(HelixConstants.ChangeType.CURRENT_STATE, false);
    /*
     * Workaround of https://github.com/apache/helix/issues/919.
     * Why it is workaround?
     * 1. Before a larger scale refactoring, to minimize the impact on cache logic, this change
     *    introduces extra read to update the liveInstance list before processing current states.
     * 2. This change does not handle the corresponding callback handlers, which should also be
     *    registered when a new liveInstance node is found.
     * TODO: Refactor cache processing logic and also refine the callback handler registration
     * TODO: logic.
     */
    _liveInstancePropertyCache.refresh(accessor);
    updateRoutableLiveInstanceMap(getRoutableInstanceConfigMap(),
        _liveInstancePropertyCache.getPropertyMap());
    _currentStateCache.refresh(accessor, getRoutableLiveInstances());
    LOG.info("Reload CurrentStates. Takes {} ms", System.currentTimeMillis() - start);
  }

  /** Reloads every configured customized view cache if customized views are flagged as changed. */
  private void reloadCustomizedViews(HelixDataAccessor accessor) {
    long start = System.currentTimeMillis();
    if (!hasPendingChange(HelixConstants.ChangeType.CUSTOMIZED_VIEW)) {
      return;
    }
    // Clear the flag BEFORE reloading, like the other source data types do. Clearing it afterwards
    // would erase a change that gets flagged while the reload is in progress, leaving the cache
    // stale until the next change.
    // NOTE(review): presumably change flags can be set without holding this monitor - confirm.
    _propertyDataChangedMap.put(HelixConstants.ChangeType.CUSTOMIZED_VIEW, false);
    List<String> customizedStateTypes = _sourceDataTypeMap.get(PropertyType.CUSTOMIZEDVIEW);
    try {
      for (String customizedStateType : customizedStateTypes) {
        _customizedViewCaches.get(customizedStateType).refresh(accessor);
      }
    } catch (RuntimeException e) {
      // Keep the flag set when the reload fails so that the next refresh() retries it.
      _propertyDataChangedMap.put(HelixConstants.ChangeType.CUSTOMIZED_VIEW, true);
      throw e;
    }
    LOG.info("Reload CustomizedView for types {} Takes {} ms", customizedStateTypes,
        System.currentTimeMillis() - start);
  }

  /**
   * Tells whether an instance is routable, i.e. its instance operation is not one of
   * {@link InstanceConstants#UNSERVABLE_INSTANCE_OPERATIONS}.
   */
  private static boolean isRoutable(InstanceConfig instanceConfig) {
    return !InstanceConstants.UNSERVABLE_INSTANCE_OPERATIONS.contains(
        instanceConfig.getInstanceOperation());
  }

  /**
   * Replaces the routable instance config map with the routable entries of the given map.
   *
   * @param instanceConfigMap instance configs keyed by instance name
   */
  private void updateRoutableInstanceConfigMap(Map<String, InstanceConfig> instanceConfigMap) {
    _routableInstanceConfigMap = instanceConfigMap.entrySet().stream()
        .filter(instanceConfigEntry -> isRoutable(instanceConfigEntry.getValue()))
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /**
   * Replaces the routable live instance map with the live instances that have an entry in the given
   * instance config map and whose instance config is routable.
   *
   * @param instanceConfigMap instance configs keyed by instance name
   * @param liveInstanceMap live instances keyed by instance name
   */
  private void updateRoutableLiveInstanceMap(Map<String, InstanceConfig> instanceConfigMap,
      Map<String, LiveInstance> liveInstanceMap) {
    _routableLiveInstanceMap = liveInstanceMap.entrySet().stream()
        .filter(liveInstanceEntry -> {
          // Single lookup: a live instance without an instance config is never routable.
          InstanceConfig instanceConfig = instanceConfigMap.get(liveInstanceEntry.getKey());
          return instanceConfig != null && isRoutable(instanceConfig);
        })
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
  }

  /**
   * Returns the LiveInstances for each of the routable instances that are currently up and
   * running.
   *
   * @return an unmodifiable view of the routable LiveInstances keyed by instance name
   */
  public Map<String, LiveInstance> getRoutableLiveInstances() {
    return Collections.unmodifiableMap(_routableLiveInstanceMap);
  }

  /**
   * Returns the instance config map for all the routable instances that are in the cluster.
   *
   * @return an unmodifiable view of the routable InstanceConfigs keyed by instance name
   */
  public Map<String, InstanceConfig> getRoutableInstanceConfigMap() {
    return Collections.unmodifiableMap(_routableInstanceConfigMap);
  }

  /**
   * Retrieves the TargetExternalView for all resources
   *
   * @return the external view map held by the target external view cache
   */
  public Map<String, ExternalView> getTargetExternalViews() {
    return _targetExternalViewCache.getExternalViewMap();
  }

  /**
   * Retrieves the CustomizedView for all resources of the given customized state type
   *
   * @param customizedStateType the customized state type to look up
   * @return the customized view map cached for the given customized state type
   * @throws HelixException if no cache exists for the given customized state type
   */
  public Map<String, CustomizedView> getCustomizedView(String customizedStateType) {
    CustomizedViewCache customizedViewCache = _customizedViewCaches.get(customizedStateType);
    if (customizedViewCache == null) {
      throw new HelixException(String.format(
          "customizedStateType %s does not exist in customizedViewCaches.", customizedStateType));
    }
    return customizedViewCache.getCustomizedViewMap();
  }

  /**
   * Get map of current states in cluster. {InstanceName -> {SessionId -> {ResourceName ->
   * CurrentState}}}
   *
   * @return the participant states map held by the current state cache
   */
  public Map<String, Map<String, Map<String, CurrentState>>> getCurrentStatesMap() {
    return _currentStateCache.getParticipantStatesMap();
  }

  /**
   * Returns the snapshot exposed by the current state cache.
   *
   * @return the current state snapshot
   */
  public CurrentStateSnapshot getCurrentStateSnapshot() {
    return _currentStateCache.getSnapshot();
  }
}