blob: d55a6eae83e2ec8d7c4f2fca9ae2048d8b998a14 [file] [log] [blame]
package org.apache.helix.controller.rebalancer;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.helix.HelixDefinedState;
import org.apache.helix.api.config.StateTransitionThrottleConfig;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.constraint.MonitoredAbnormalResolver;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This is the Full-Auto rebalancer that supports delayed partition movement.
*/
public class DelayedAutoRebalancer extends AbstractRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(DelayedAutoRebalancer.class);
/**
 * Computes the IdealState (per-partition preference lists) for a resource, taking the
 * configured rebalance delay into account.
 * <p>
 * Steps, as implemented below:
 * <ol>
 * <li>Return the cached IdealState when {@code getCachedIdealState} provides one.</li>
 * <li>Resolve the candidate instances, narrowed by the resource's instance group tag when set.</li>
 * <li>Derive the "active" nodes through {@code DelayedRebalanceUtil.getActiveNodes} and, when
 * delayed rebalance is enabled, pass the active-but-not-live-enabled instances to
 * {@code DelayedRebalanceUtil.setRebalanceScheduler}.</li>
 * <li>Compute the assignment against the live and enabled nodes. When delayed rebalance is
 * enabled, or the active node count differs from the live-enabled node count, also compute an
 * assignment against the active nodes and combine both via {@code getFinalDelayedMapping}.</li>
 * <li>Overlay the user-defined preference lists from the ResourceConfig, then store the result
 * through {@code setCachedIdealMapping}.</li>
 * </ol>
 * An IdealState holding an empty preference list for every partition is returned (and not
 * cached by this method) when there are no partitions, no assignable or active instances, or
 * the replica count is 0.
 *
 * @param resourceName name of the resource being rebalanced
 * @param currentIdealState IdealState currently configured for the resource
 * @param currentStateOutput current states used to build the current partition mapping
 * @param clusterData data provider for cluster config, instances and cached mappings
 * @return the newly computed (or cached) IdealState
 */
@Override
public IdealState computeNewIdealState(String resourceName,
    IdealState currentIdealState, CurrentStateOutput currentStateOutput,
    ResourceControllerDataProvider clusterData) {
  IdealState cachedIdealState = getCachedIdealState(resourceName, clusterData);
  if (cachedIdealState != null) {
    LOG.debug("Use cached IdealState for {}", resourceName);
    return cachedIdealState;
  }

  LOG.info("Computing IdealState for {}", resourceName);

  List<String> allPartitions = getStablePartitionList(clusterData, currentIdealState);
  if (allPartitions.isEmpty()) {
    LOG.info("Partition count is 0 for resource {}, stop calculate ideal mapping for the resource.",
        resourceName);
    return generateNewIdealState(resourceName, currentIdealState,
        emptyMapping(currentIdealState));
  }

  Map<String, List<String>> userDefinedPreferenceList = new HashMap<>();
  ClusterConfig clusterConfig = clusterData.getClusterConfig();
  ResourceConfig resourceConfig = clusterData.getResourceConfig(resourceName);
  boolean delayRebalanceEnabled =
      DelayedRebalanceUtil.isDelayRebalanceEnabled(currentIdealState, clusterConfig);

  if (resourceConfig != null) {
    userDefinedPreferenceList = resourceConfig.getPreferenceLists();
    if (!userDefinedPreferenceList.isEmpty()) {
      LOG.info("Using user defined preference list for partitions: {}",
          userDefinedPreferenceList.keySet());
    }
  }

  Set<String> assignableLiveEnabledNodes;
  Set<String> assignableNodes;
  String instanceTag = currentIdealState.getInstanceGroupTag();
  if (instanceTag != null) {
    assignableLiveEnabledNodes = clusterData.getEnabledLiveInstancesWithTag(instanceTag);
    assignableNodes = clusterData.getAssignableInstancesWithTag(instanceTag);
    LOG.info("Found the following participants with tag {} for {}: "
            + "instances: {}, liveEnabledInstances: {}",
        instanceTag, resourceName, assignableNodes, assignableLiveEnabledNodes);
  } else {
    assignableLiveEnabledNodes = clusterData.getEnabledLiveInstances();
    assignableNodes = clusterData.getAssignableInstances();
  }

  long delay = DelayedRebalanceUtil.getRebalanceDelay(currentIdealState, clusterConfig);
  Set<String> activeNodes =
      DelayedRebalanceUtil.getActiveNodes(assignableNodes, currentIdealState, assignableLiveEnabledNodes,
          clusterData.getInstanceOfflineTimeMap(),
          clusterData.getAssignableLiveInstances().keySet(),
          clusterData.getAssignableInstanceConfigMap(), delay, clusterConfig);

  if (delayRebalanceEnabled) {
    // Active nodes that are not live-and-enabled are the ones the scheduler is told about.
    Set<String> offlineOrDisabledInstances = new HashSet<>(activeNodes);
    offlineOrDisabledInstances.removeAll(assignableLiveEnabledNodes);
    DelayedRebalanceUtil.setRebalanceScheduler(currentIdealState.getResourceName(), true,
        offlineOrDisabledInstances, clusterData.getInstanceOfflineTimeMap(),
        clusterData.getAssignableLiveInstances().keySet(),
        clusterData.getAssignableInstanceConfigMap(), delay,
        clusterConfig, _manager);
  }

  if (assignableNodes.isEmpty() || activeNodes.isEmpty()) {
    LOG.error("No instances or active instances available for resource {}, "
            + "allInstances: {}, liveInstances: {}, activeInstances: {}",
        resourceName, assignableNodes, assignableLiveEnabledNodes, activeNodes);
    return generateNewIdealState(resourceName, currentIdealState,
        emptyMapping(currentIdealState));
  }

  StateModelDefinition stateModelDef =
      clusterData.getStateModelDef(currentIdealState.getStateModelDefRef());

  int replicaCount = currentIdealState.getReplicaCount(activeNodes.size());
  if (replicaCount == 0) {
    LOG.error("Replica count is 0 for resource {}, stop calculate ideal mapping for the resource.",
        resourceName);
    return generateNewIdealState(resourceName, currentIdealState,
        emptyMapping(currentIdealState));
  }

  LinkedHashMap<String, Integer> stateCountMap =
      stateModelDef.getStateCountMap(activeNodes.size(), replicaCount);
  Map<String, Map<String, String>> currentMapping =
      currentMapping(currentStateOutput, resourceName, allPartitions, stateCountMap);

  int maxPartition = currentIdealState.getMaxPartitionsPerInstance();
  _rebalanceStrategy =
      getRebalanceStrategy(currentIdealState.getRebalanceStrategy(), allPartitions, resourceName,
          stateCountMap, maxPartition);

  List<String> allNodeList = new ArrayList<>(assignableNodes);
  List<String> liveEnabledAssignableNodeList = new ArrayList<>(assignableLiveEnabledNodes);
  // sort node lists to ensure consistent preferred assignments
  Collections.sort(allNodeList);
  Collections.sort(liveEnabledAssignableNodeList);

  ZNRecord newIdealMapping =
      _rebalanceStrategy.computePartitionAssignment(allNodeList, liveEnabledAssignableNodeList,
          currentMapping, clusterData);
  ZNRecord finalMapping = newIdealMapping;

  // Reuse the flag computed above rather than evaluating isDelayRebalanceEnabled a second time
  // with the same arguments.
  if (delayRebalanceEnabled
      || liveEnabledAssignableNodeList.size() != activeNodes.size()) {
    List<String> activeNodeList = new ArrayList<>(activeNodes);
    Collections.sort(activeNodeList);
    int minActiveReplicas = DelayedRebalanceUtil.getMinActiveReplica(
        ResourceConfig.mergeIdealStateWithResourceConfig(resourceConfig, currentIdealState),
        currentIdealState, replicaCount);

    ZNRecord newActiveMapping =
        _rebalanceStrategy.computePartitionAssignment(allNodeList, activeNodeList, currentMapping,
            clusterData);
    finalMapping = getFinalDelayedMapping(currentIdealState, newIdealMapping, newActiveMapping,
        assignableLiveEnabledNodes, replicaCount, minActiveReplicas);
  }

  // User-defined preference lists override the computed ones for the partitions they name.
  finalMapping.getListFields().putAll(userDefinedPreferenceList);

  LOG.debug("currentMapping: {}", currentMapping);
  LOG.debug("stateCountMap: {}", stateCountMap);
  LOG.debug("assignableLiveEnabledNodes: {}", assignableLiveEnabledNodes);
  LOG.debug("activeNodes: {}", activeNodes);
  LOG.debug("assignableNodes: {}", assignableNodes);
  LOG.debug("maxPartition: {}", maxPartition);
  LOG.debug("newIdealMapping: {}", newIdealMapping);
  LOG.debug("finalMapping: {}", finalMapping);

  IdealState idealState = generateNewIdealState(resourceName, currentIdealState, finalMapping);
  clusterData.setCachedIdealMapping(resourceName, idealState.getRecord());
  return idealState;
}
/**
 * Builds a fresh IdealState for the resource: simple fields and rebalance mode are taken from
 * the current IdealState, list fields (preference lists) from the given mapping.
 *
 * @param resourceName name given to the new IdealState
 * @param currentIdealState source of the simple fields and the rebalance mode
 * @param newMapping record whose list fields become the new preference lists
 * @return the assembled IdealState
 */
private IdealState generateNewIdealState(String resourceName, IdealState currentIdealState,
    ZNRecord newMapping) {
  IdealState rebuilt = new IdealState(resourceName);
  ZNRecord rebuiltRecord = rebuilt.getRecord();

  // Simple fields first, then the rebalance mode on top of them.
  rebuiltRecord.setSimpleFields(currentIdealState.getRecord().getSimpleFields());
  rebuilt.setRebalanceMode(currentIdealState.getRebalanceMode());

  rebuilt.getRecord().setListFields(newMapping.getListFields());
  return rebuilt;
}
/**
 * Chooses the mapping to publish when delayed rebalance is in play.
 * <p>
 * When {@code minActiveReplica} is at least {@code numReplica} the ideal mapping is returned
 * unchanged. Otherwise a new record named after the resource is created, with list fields
 * produced by {@code DelayedRebalanceUtil.getFinalDelayedMapping} from the ideal and active
 * preference lists.
 *
 * @param idealState supplies the resource name for the new record
 * @param newIdealMapping assignment computed against the live and enabled nodes
 * @param newActiveMapping assignment computed against the active nodes
 * @param liveInstances live instances handed to the utility
 * @param numReplica configured replica count
 * @param minActiveReplica minimum number of active replicas to keep
 * @return either {@code newIdealMapping} itself or the newly built record
 */
private ZNRecord getFinalDelayedMapping(IdealState idealState, ZNRecord newIdealMapping,
    ZNRecord newActiveMapping, Set<String> liveInstances, int numReplica, int minActiveReplica) {
  if (minActiveReplica < numReplica) {
    ZNRecord combined = new ZNRecord(idealState.getResourceName());
    Map<String, List<String>> idealLists = newIdealMapping.getListFields();
    Map<String, List<String>> activeLists = newActiveMapping.getListFields();
    combined.setListFields(
        DelayedRebalanceUtil.getFinalDelayedMapping(idealLists, activeLists, liveInstances,
            minActiveReplica));
    return combined;
  }
  return newIdealMapping;
}
/**
 * Creates a record, named after the resource, that maps every partition of the given
 * IdealState to an empty preference list.
 *
 * @param idealState supplies the resource name and the partition set
 * @return record with one empty list field per partition
 */
private ZNRecord emptyMapping(IdealState idealState) {
  ZNRecord blank = new ZNRecord(idealState.getResourceName());
  idealState.getPartitionSet()
      .forEach(partitionName -> blank.setListField(partitionName, new ArrayList<String>()));
  return blank;
}
/**
 * Computes the best possible state for every partition of a resource.
 * <p>
 * The active nodes are derived once via {@code DelayedRebalanceUtil.getActiveNodes}; each
 * partition's preference list is then resolved against them and handed to
 * {@code computeBestPossibleStateForPartition}. Subclasses whose per-partition logic differs
 * should override this method.
 *
 * @param cache data provider for instances, cluster config and state model definitions
 * @param idealState IdealState of the resource
 * @param resource resource whose partitions are processed
 * @param currentStateOutput Provides the current state and pending state transitions for all partitions
 * @return assignment holding one replica map per partition of the resource
 */
@Override
public ResourceAssignment computeBestPossiblePartitionState(ResourceControllerDataProvider cache,
    IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Processing resource:" + resource.getResourceName());
  }

  final Set<String> enabledNodes = cache.getEnabledInstances();
  final Set<String> assignableLiveNodes = cache.getAssignableLiveInstances().keySet();
  final ClusterConfig config = cache.getClusterConfig();
  final long rebalanceDelay = DelayedRebalanceUtil.getRebalanceDelay(idealState, config);
  final Set<String> activeNodes = DelayedRebalanceUtil.getActiveNodes(
      enabledNodes, idealState, assignableLiveNodes,
      cache.getInstanceOfflineTimeMap(),
      cache.getAssignableLiveInstances().keySet(),
      cache.getAssignableInstanceConfigMap(),
      rebalanceDelay, config);

  final String stateModelDefName = idealState.getStateModelDefRef();
  final StateModelDefinition stateModel = cache.getStateModelDef(stateModelDefName);

  final ResourceAssignment assignment = new ResourceAssignment(resource.getResourceName());
  for (Partition current : resource.getPartitions()) {
    Set<String> disabledForPartition =
        cache.getDisabledInstancesForPartition(resource.getResourceName(), current.toString());
    List<String> preferred = getPreferenceList(current, idealState, activeNodes);

    // We use cache.getLiveInstances().keySet() to make sure we gracefully handle n -> n + 1 replicas if possible
    // when the one of the current nodes holding the replica is no longer considered assignable. (ex: EVACUATE)
    Set<String> allLiveNodes = cache.getLiveInstances().keySet();
    Map<String, String> replicaStates =
        computeBestPossibleStateForPartition(allLiveNodes, stateModel, preferred,
            currentStateOutput, disabledForPartition, idealState, config, current,
            cache.getAbnormalStateResolver(stateModelDefName), cache);
    assignment.addReplicaMap(current, replicaStates);
  }

  if (LOG.isDebugEnabled()) {
    LOG.debug("Best possible mapping for resource " + resource.getResourceName() + ": "
        + assignment);
  }
  return assignment;
}
/**
 * Overload without a data provider: forwards every argument to the overload that also takes a
 * {@code ResourceControllerDataProvider}, supplying {@code null} for it.
 *
 * @return whatever the delegated overload computes for the partition
 */
@Override
protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
    StateModelDefinition stateModelDef, List<String> preferenceList,
    CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
    IdealState idealState, ClusterConfig clusterConfig, Partition partition,
    MonitoredAbnormalResolver monitoredResolver) {
  // No data provider is available on this path.
  final ResourceControllerDataProvider absentCache = null;
  return computeBestPossibleStateForPartition(
      liveInstances,
      stateModelDef,
      preferenceList,
      currentStateOutput,
      disabledInstancesForPartition,
      idealState,
      clusterConfig,
      partition,
      monitoredResolver,
      absentCache);
}
/**
 * Computes the target state of every replica of a single partition.
 * <p>
 * Outline of the logic below:
 * <ol>
 * <li>If {@code computeStatesOverwriteForPartition} yields a result, it is returned as is.</li>
 * <li>The current state map is extended with instances that only have a pending state; those
 * are recorded with the state model's initial state.</li>
 * <li>Instances whose current state is {@code null} are taken out of the current-instance list
 * and are marked DROPPED in the result.</li>
 * <li>A combined preference list is built from the current instances plus as many new
 * instances from {@code preferenceList} as the replica count and the allowed number of extra
 * replicas permit, then sorted with {@code PreferenceListNodeComparator}.</li>
 * <li>When WAGED is enabled for the IdealState and {@code cache} is non-null, new instances
 * failing {@code cache.checkAndReduceCapacity} are removed from the combined list.</li>
 * <li>States are assigned through {@code computeBestPossibleMap}. Surplus instances at the tail
 * of the combined list are marked DROPPED once {@code readyToDrop} agrees and no instance in
 * the preference list is in ERROR.</li>
 * <li>Instances currently in ERROR are reported as ERROR.</li>
 * </ol>
 *
 * @param liveInstances live instances passed on to {@code computeBestPossibleMap}
 * @param stateModelDef state model definition of the resource
 * @param preferenceList preferred instances for the partition; {@code null} is treated as empty
 * @param currentStateOutput source of current and pending states
 * @param disabledInstancesForPartition instances on which the partition is disabled
 * @param idealState IdealState of the resource
 * @param clusterConfig cluster config, read for the partition-level load-balance throttle
 * @param partition partition being computed
 * @param monitoredResolver resolver passed to {@code computeStatesOverwriteForPartition}
 * @param cache data provider used for the capacity check; may be {@code null}, in which case
 * the capacity check is skipped
 * @return map from instance name to its best possible state
 */
@Override
protected Map<String, String> computeBestPossibleStateForPartition(Set<String> liveInstances,
    StateModelDefinition stateModelDef, List<String> preferenceList,
    CurrentStateOutput currentStateOutput, Set<String> disabledInstancesForPartition,
    IdealState idealState, ClusterConfig clusterConfig, Partition partition,
    MonitoredAbnormalResolver monitoredResolver, ResourceControllerDataProvider cache) {
  Optional<Map<String, String>> optionalOverwrittenStates =
      computeStatesOverwriteForPartition(stateModelDef, preferenceList, currentStateOutput,
          idealState, partition, monitoredResolver);
  if (optionalOverwrittenStates.isPresent()) {
    return optionalOverwrittenStates.get();
  }

  Map<String, String> currentStateMap = new HashMap<>(
      currentStateOutput.getCurrentStateMap(idealState.getResourceName(), partition));
  // Instances not in preference list but still have active replica, retain to avoid zero replica during movement
  List<String> currentInstances = new ArrayList<>(currentStateMap.keySet());
  Collections.sort(currentInstances);

  // An instance with only a pending transition is treated as being in the initial state.
  Map<String, String> pendingStates =
      new HashMap<>(currentStateOutput.getPendingStateMap(idealState.getResourceName(), partition));
  for (String instance : pendingStates.keySet()) {
    if (!currentStateMap.containsKey(instance)) {
      currentStateMap.put(instance, stateModelDef.getInitialState());
      currentInstances.add(instance);
    }
  }

  Set<String> instancesToDrop = new HashSet<>();
  Iterator<String> it = currentInstances.iterator();
  while (it.hasNext()) {
    String instance = it.next();
    String state = currentStateMap.get(instance);
    // TODO: This may never be a possible case, figure out if we can safely remove this.
    if (state == null) {
      it.remove();
      instancesToDrop.add(instance); // These instances should be set to DROPPED after we get bestPossibleStateMap;
    }
  }

  if (preferenceList == null) {
    preferenceList = Collections.emptyList();
  }
  boolean isPreferenceListEmpty = preferenceList.isEmpty();

  int numExtraReplicas = getNumExtraReplicas(clusterConfig);

  // TODO : Keep the behavior consistent with existing state count, change back to read from idealstate
  // replicas
  int numReplicas = preferenceList.size();
  List<String> instanceToAdd = new ArrayList<>(preferenceList);
  instanceToAdd.removeAll(currentInstances);

  // Only bring in as many new instances as the replica count plus the extra-replica allowance permits.
  List<String> combinedPreferenceList = new ArrayList<>();
  if (currentInstances.size() <= numReplicas
      && numReplicas + numExtraReplicas - currentInstances.size() > 0) {
    int subListSize = numReplicas + numExtraReplicas - currentInstances.size();
    combinedPreferenceList.addAll(instanceToAdd
        .subList(0, Math.min(subListSize, instanceToAdd.size())));
  }

  // Make all initial state instance not in preference list to be dropped.
  Map<String, String> currentMapWithPreferenceList = new HashMap<>(currentStateMap);
  currentMapWithPreferenceList.keySet().retainAll(preferenceList);

  combinedPreferenceList.addAll(currentInstances);
  // Sort the combined list by the instances' current partition state.
  // Reason: because the states are assigned to instances in the order appeared in preferenceList, if we have
  // [node1:Slave, node2:Master], we want to keep it that way, instead of assigning Master to node1.
  combinedPreferenceList.sort(new PreferenceListNodeComparator(currentStateMap, stateModelDef, preferenceList));

  // If the preference list is not empty and we do have new instanceToAdd, we
  // should check if it has capacity to hold the partition.
  boolean isWaged = WagedValidationUtil.isWagedEnabled(idealState) && cache != null;
  if (isWaged && !isPreferenceListEmpty && !instanceToAdd.isEmpty()) {
    // check instanceToAdd instance appears in combinedPreferenceList
    for (String instance : instanceToAdd) {
      if (combinedPreferenceList.contains(instance)) {
        if (!cache.checkAndReduceCapacity(instance, idealState.getResourceName(),
            partition.getPartitionName())) {
          // if instanceToAdd instance has no capacity to hold the partition, we should
          // remove it from combinedPreferenceList
          LOG.info("Instance: {} has no capacity to hold resource: {}, partition: {}, removing "
                  + "it from combinedPreferenceList.", instance, idealState.getResourceName(),
              partition.getPartitionName());
          combinedPreferenceList.remove(instance);
        }
      }
    }
  }

  // Assign states to instances with the combined preference list.
  Map<String, String> bestPossibleStateMap =
      computeBestPossibleMap(combinedPreferenceList, stateModelDef, currentStateMap,
          liveInstances, disabledInstancesForPartition);

  for (String instance : instancesToDrop) {
    bestPossibleStateMap.put(instance, HelixDefinedState.DROPPED.name());
  }

  // If the load-balance finishes (all replica are migrated to new instances),
  // we should drop all partitions from previous assigned instances.
  if (!currentMapWithPreferenceList.containsValue(HelixDefinedState.ERROR.name())
      && bestPossibleStateMap.size() > numReplicas && readyToDrop(currentStateMap,
      bestPossibleStateMap, preferenceList, combinedPreferenceList)) {
    // The surplus instances sit at the tail of the sorted combined list.
    for (int i = 0; i < combinedPreferenceList.size() - numReplicas; i++) {
      String instanceToDrop = combinedPreferenceList.get(combinedPreferenceList.size() - i - 1);
      bestPossibleStateMap.put(instanceToDrop, HelixDefinedState.DROPPED.name());
    }
  }

  // TODO: This may not be necessary, all of the instances bestPossibleStateMap should be set to ERROR
  // if necessary in the call to computeBestPossibleMap.
  // Adding ERROR replica mapping to best possible.
  // ERROR assignment should be mutually exclusive with DROPPED assignment because
  // once there is an ERROR replica in the mapping, bestPossibleStateMap.size() > numReplicas prevents
  // code entering the DROPPING stage.
  final String errorState = HelixDefinedState.ERROR.name();
  for (String instance : combinedPreferenceList) {
    // Constant-first comparison: an instance from the preference list can be present in
    // currentStateMap with a null state (see instancesToDrop above); that must not throw.
    if (errorState.equals(currentStateMap.get(instance))) {
      bestPossibleStateMap.put(instance, errorState);
    }
  }

  return bestPossibleStateMap;
}
/**
 * Tells whether surplus replicas may be dropped: every instance that appears in both
 * {@code preferenceList} and {@code combinedPreferenceList} must already be in the state that
 * {@code bestPossibleMap} assigns to it.
 * <p>
 * An instance with no current state, whether missing from {@code currentStateMap} or mapped to
 * {@code null}, counts as not ready.
 *
 * @param currentStateMap current state per instance
 * @param bestPossibleMap computed target state per instance
 * @param preferenceList preferred instances for the partition
 * @param combinedPreferenceList preferred plus currently hosting instances
 * @return {@code true} when all checked instances have reached their target state
 */
private boolean readyToDrop(Map<String, String> currentStateMap,
    Map<String, String> bestPossibleMap, List<String> preferenceList,
    List<String> combinedPreferenceList) {
  Set<String> preferenceWithActiveState = new HashSet<>(preferenceList);
  preferenceWithActiveState.retainAll(combinedPreferenceList);

  for (String instance : preferenceWithActiveState) {
    // A single lookup covers both "key absent" and "key mapped to null", so a null state
    // can no longer cause a NullPointerException on equals().
    String currentState = currentStateMap.get(instance);
    if (currentState == null || !currentState.equals(bestPossibleMap.get(instance))) {
      return false;
    }
  }
  return true;
}
/**
 * Determines the number of extra replicas to allow for a partition.
 * <p>
 * Starts from {@code StateTransitionThrottleConfig.DEFAULT_NUM_TRANSIT_REPLICAS} and lowers it
 * to the smallest {@code getMaxPartitionInTransition()} found among the cluster's throttle
 * configs with scope PARTITION and rebalance type LOAD_BALANCE.
 *
 * @param clusterConfig cluster config supplying the state transition throttle configs
 * @return the resulting extra-replica allowance
 */
private int getNumExtraReplicas(ClusterConfig clusterConfig) {
  int allowance = StateTransitionThrottleConfig.DEFAULT_NUM_TRANSIT_REPLICAS;

  for (StateTransitionThrottleConfig candidate : clusterConfig.getStateTransitionThrottleConfigs()) {
    if (!StateTransitionThrottleConfig.ThrottleScope.PARTITION
        .equals(candidate.getThrottleScope())) {
      continue;
    }
    if (!StateTransitionThrottleConfig.RebalanceType.LOAD_BALANCE
        .equals(candidate.getRebalanceType())) {
      continue;
    }
    allowance = (int) Math.min(allowance, candidate.getMaxPartitionInTransition());
  }

  return allowance;
}
}