blob: 7a9b5204089c77fb5ab0959be7fae9b430822074 [file] [log] [blame]
package org.apache.helix.controller.dataproviders;
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.common.caches.AbstractDataCache;
import org.apache.helix.common.caches.CustomizedStateCache;
import org.apache.helix.common.caches.CustomizedViewCache;
import org.apache.helix.common.caches.PropertyCache;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.stages.MissingTopStateRecord;
import org.apache.helix.model.CustomizedState;
import org.apache.helix.model.CustomizedStateConfig;
import org.apache.helix.model.CustomizedView;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* Data provider for resource controller.
* This class will be moved to helix-resource-controller module in the future
public class ResourceControllerDataProvider extends BaseControllerDataProvider {
private static final Logger logger =
private static final String PIPELINE_NAME =;
// Resource control specific property caches
private final PropertyCache<ExternalView> _externalViewCache;
private final PropertyCache<ExternalView> _targetExternalViewCache;
private final CustomizedStateCache _customizedStateCache;
// a map from customized state type to customized view cache
private final Map<String, CustomizedViewCache> _customizedViewCacheMap;
// maintain a cache of bestPossible assignment across pipeline runs
// TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache.
private Map<String, ResourceAssignment> _resourceAssignmentCache;
// maintain a cache of idealmapping (preference list) for full-auto resource across pipeline runs
private Map<String, ZNRecord> _idealMappingCache;
// records for top state handoff
private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap;
private Map<String, Map<String, String>> _lastTopStateLocationMap;
// Maintain a set of all ChangeTypes for change detection
private Set<HelixConstants.ChangeType> _refreshedChangeTypes;
private Set<String> _aggregationEnabledTypes = new HashSet<>();
// CrushEd strategy needs to have a stable partition list input. So this cached list persist the
// previous seen partition lists. If the members in a list are not modified, the old list will be
// used in the algorithm to ensure it calculates the same result.
// Refer to
// TODO: Sorting the partition list in CrushEd strategy instead of using the cache to reduce
// TODO: dependency. Note that this will change the cluster partition assignment and potentially
// TODO: cause shuffling. So it is not backward compatible.
private final Map<String, List<String>> _stablePartitionListCache = new HashMap<>();
public ResourceControllerDataProvider() {
public ResourceControllerDataProvider(String clusterName) {
super(clusterName, PIPELINE_NAME);
_externalViewCache = new PropertyCache<>(this, "ExternalView", new PropertyCache.PropertyCacheKeyFuncs<ExternalView>() {
public PropertyKey getRootKey(HelixDataAccessor accessor) {
return accessor.keyBuilder().externalViews();
public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) {
return accessor.keyBuilder().externalView(objName);
public String getObjName(ExternalView obj) {
return obj.getResourceName();
}, true);
_targetExternalViewCache = new PropertyCache<>(this, "TargetExternalView", new PropertyCache.PropertyCacheKeyFuncs<ExternalView>() {
public PropertyKey getRootKey(HelixDataAccessor accessor) {
return accessor.keyBuilder().targetExternalViews();
public PropertyKey getObjPropertyKey(HelixDataAccessor accessor, String objName) {
return accessor.keyBuilder().targetExternalView(objName);
public String getObjName(ExternalView obj) {
return obj.getResourceName();
}, true);
_resourceAssignmentCache = new HashMap<>();
_idealMappingCache = new HashMap<>();
_missingTopStateMap = new HashMap<>();
_lastTopStateLocationMap = new HashMap<>();
_refreshedChangeTypes = ConcurrentHashMap.newKeySet();
_customizedStateCache = new CustomizedStateCache(this, _aggregationEnabledTypes);
_customizedViewCacheMap = new HashMap<>();
public synchronized void refresh(HelixDataAccessor accessor) {
long startTime = System.currentTimeMillis();
// Refresh base
Set<HelixConstants.ChangeType> changedTypes = super.doRefresh(accessor);
// Invalidate cached information if any of the important data has been refreshed
if (changedTypes.contains(HelixConstants.ChangeType.IDEAL_STATE)
|| changedTypes.contains(HelixConstants.ChangeType.LIVE_INSTANCE)
|| changedTypes.contains(HelixConstants.ChangeType.INSTANCE_CONFIG)
|| changedTypes.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)
|| changedTypes.contains((HelixConstants.ChangeType.CLUSTER_CONFIG))) {
// Refresh resource controller specific property caches
_customizedStateCache.refresh(accessor, getLiveInstanceCache().getPropertyMap());
// This is part of the backward compatible workaround to fix
// TODO: remove the workaround once we are able to apply the simple fix without majorly
// TODO: impacting user's clusters.
LogUtil.logInfo(logger, getClusterEventId(), String.format(
"END: ResourceControllerDataProvider.refresh() for cluster %s, started at %d took %d for %s pipeline",
getClusterName(), startTime, System.currentTimeMillis() - startTime, getPipelineName()));
protected void dumpDebugInfo() {
if (logger.isTraceEnabled()) {
logger.trace("Cache content: " + toString());
private void refreshCustomizedStateConfig(final HelixDataAccessor accessor) {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CUSTOMIZED_STATE_CONFIG)
.getAndSet(false)) {
CustomizedStateConfig customizedStateConfig =
if (customizedStateConfig != null) {
_aggregationEnabledTypes =
new HashSet<>(customizedStateConfig.getAggregationEnabledTypes());
} else {
LogUtil.logInfo(logger, getClusterEventId(), String
.format("Reloaded CustomizedStateConfig for cluster %s, %s pipeline.",
getClusterName(), getPipelineName()));
} else {
LogUtil.logInfo(logger, getClusterEventId(), String
.format("No customized state config change for %s cluster, %s pipeline",
getClusterName(), getPipelineName()));
private void refreshExternalViews(final HelixDataAccessor accessor) {
// As we are not listening on external view change, external view will be
// refreshed once during the cache's first refresh() call, or when full refresh is required
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW).getAndSet(false)) {
synchronized (_externalViewCache) {
private void refreshTargetExternalViews(final HelixDataAccessor accessor) {
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.TARGET_EXTERNAL_VIEW).getAndSet(false)) {
if (getClusterConfig() != null && getClusterConfig().isTargetExternalViewEnabled()) {
// Only refresh with data accessor for the first time
public void refreshCustomizedViewMap(final HelixDataAccessor accessor) {
// As we are not listening on customized view change, customized view will be
// refreshed once during the cache's first refresh() call, or when full refresh is required
if (_propertyDataChangedMap.get(HelixConstants.ChangeType.CUSTOMIZED_VIEW).getAndSet(false)) {
for (String stateType : _aggregationEnabledTypes) {
if (!_customizedViewCacheMap.containsKey(stateType)) {
CustomizedViewCache newCustomizedViewCache =
new CustomizedViewCache(getClusterName(), stateType);
_customizedViewCacheMap.put(stateType, newCustomizedViewCache);
Set<String> previousCachedStateTypes = new HashSet<>(_customizedViewCacheMap.keySet());
previousCachedStateTypes.removeAll(_aggregationEnabledTypes);"Remove customizedView for state: " + previousCachedStateTypes);
* Provides the customized state of the node for a given state type (resource -> customizedState)
* @param instanceName
* @param customizedStateType
* @return
public Map<String, CustomizedState> getCustomizedState(String instanceName,
String customizedStateType) {
return _customizedStateCache.getParticipantState(instanceName, customizedStateType);
public Set<String> getAggregationEnabledCustomizedStateTypes() {
return _aggregationEnabledTypes;
protected void setAggregationEnabledCustomizedStateTypes(Set<String> aggregationEnabledTypes) {
_aggregationEnabledTypes = aggregationEnabledTypes;
public ExternalView getTargetExternalView(String resourceName) {
return _targetExternalViewCache.getPropertyByName(resourceName);
public void updateTargetExternalView(String resourceName, ExternalView targetExternalView) {
* Get local cached external view map
* @return
public Map<String, ExternalView> getExternalViews() {
synchronized (_externalViewCache) {
return _externalViewCache.getPropertyMap();
* Update the cached external view map
* @param externalViews
public void updateExternalViews(List<ExternalView> externalViews) {
synchronized (_externalViewCache) {
for (ExternalView ev : externalViews) {
* Update the cached customized view map
* @param customizedViews
public void updateCustomizedViews(String customizedStateType,
List<CustomizedView> customizedViews) {
if (!_customizedViewCacheMap.containsKey(customizedStateType)) {
CustomizedViewCache customizedViewCache =
new CustomizedViewCache(getClusterName(), customizedStateType);
_customizedViewCacheMap.put(customizedStateType, customizedViewCache);
for (CustomizedView cv : customizedViews) {
* Get local cached customized view map
* @return
public Map<String, CustomizedViewCache> getCustomizedViewCacheMap() {
return _customizedViewCacheMap;
* Remove dead external views from map
* @param resourceNames
public void removeExternalViews(List<String> resourceNames) {
synchronized (_externalViewCache) {
for (String resourceName : resourceNames) {
* Remove dead customized views for certain state types from map
* @param stateTypeNames
public void removeCustomizedViewTypes(Set<String> stateTypeNames) {
for (String stateType : stateTypeNames) {
* Remove dead customized views for a certain state type from customized view cache
* @param stateType a specific customized state type
* @param resourceNames the names of resources whose customized view is stale
public void removeCustomizedViews(String stateType, List<String> resourceNames) {
if (!_customizedViewCacheMap.containsKey(stateType)) {
logger.warn(String.format("The customized state type : %s is not in the cache", stateType));
for (String resourceName : resourceNames) {
public Map<String, Map<String, MissingTopStateRecord>> getMissingTopStateMap() {
return _missingTopStateMap;
public Map<String, Map<String, String>> getLastTopStateLocationMap() {
return _lastTopStateLocationMap;
* Get cached resourceAssignment (bestPossible mapping) for a resource
* @param resource
* @return
public ResourceAssignment getCachedResourceAssignment(String resource) {
return _resourceAssignmentCache.get(resource);
* Get cached resourceAssignments
* @return
public Map<String, ResourceAssignment> getCachedResourceAssignments() {
return Collections.unmodifiableMap(_resourceAssignmentCache);
* Cache resourceAssignment (bestPossible mapping) for a resource
* @param resource
* @return
public void setCachedResourceAssignment(String resource, ResourceAssignment resourceAssignment) {
_resourceAssignmentCache.put(resource, resourceAssignment);
* Get cached resourceAssignment (ideal mapping) for a resource
* @param resource
* @return
public ZNRecord getCachedIdealMapping(String resource) {
return _idealMappingCache.get(resource);
* Invalidate the cached resourceAssignment (ideal mapping) for a resource
* @param resource
public void invalidateCachedIdealStateMapping(String resource) {
* Get cached idealmapping
* @return
public Map<String, ZNRecord> getCachedIdealMapping() {
return Collections.unmodifiableMap(_idealMappingCache);
* Cache resourceAssignment (ideal mapping) for a resource
* @param resource
* @return
public void setCachedIdealMapping(String resource, ZNRecord mapping) {
_idealMappingCache.put(resource, mapping);
* Return the set of all PropertyTypes that changed prior to this round of rebalance. The caller
* should clear this set by calling {@link #clearRefreshedChangeTypes()}.
* @return
public Set<HelixConstants.ChangeType> getRefreshedChangeTypes() {
return _refreshedChangeTypes;
* Clears the set of all PropertyTypes that changed. The caller will have consumed all change
* types by calling {@link #getRefreshedChangeTypes()}.
public void clearRefreshedChangeTypes() {
public void clearCachedResourceAssignments() {
public void clearMonitoringRecords() {
* This is for a backward compatible workaround to fix
* @param resourceName
* @return the cached stable partition list of the specified resource. If no such cached item,
* return null.
public List<String> getStablePartitionList(String resourceName) {
return _stablePartitionListCache.get(resourceName);
* Refresh the stable partition list cache items and remove the non-exist resources' cache.
* This is for a backward compatible workaround to fix
* @param idealStateMap
final void refreshStablePartitionList(Map<String, IdealState> idealStateMap) {
for (String resourceName : idealStateMap.keySet()) {
Set<String> newPartitionSet = idealStateMap.get(resourceName).getPartitionSet();
List<String> cachedPartitionList = getStablePartitionList(resourceName);
if (cachedPartitionList == null || cachedPartitionList.size() != newPartitionSet.size()
|| !newPartitionSet.containsAll(cachedPartitionList)) {
_stablePartitionListCache.put(resourceName, new ArrayList<>(newPartitionSet));