blob: 22cac7e9cb12b7843819a80682659f92e09debf3 [file] [log] [blame]
package org.apache.helix.controller.rebalancer.waged;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Weight-Aware Globally-Even Distribute Rebalancer.
* @see <a
* href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
* Design Document
* </a>
*/
public class WagedRebalancer {
// Class-wide logger.
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
// When a change of any of the following types is detected, the rebalancer needs to do a global
// rebalance, which consists of 1. recalculating the baseline, and 2. a partial rebalance that is
// based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
Collections
.unmodifiableSet(new HashSet<>(Arrays.asList(HelixConstants.ChangeType.RESOURCE_CONFIG,
HelixConstants.ChangeType.CLUSTER_CONFIG,
HelixConstants.ChangeType.INSTANCE_CONFIG)));
// The cluster change detector is a stateful object.
// It is held in a static ThreadLocal to avoid unnecessary reinitialization; the per-thread
// instance is created lazily by getChangeDetector().
private static final ThreadLocal<ResourceChangeDetector> CHANGE_DETECTOR_THREAD_LOCAL =
new ThreadLocal<>();
// Translates the calculated assignment into the applicable partition state mapping; see its use
// in computeNewIdealStates().
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
// --------- The following fields are placeholders and need replacement. -----------//
// TODO Shall we make the metadata store a static ThreadLocal object as well to avoid
// reinitialization?
// Source and sink of the persisted baseline and best possible assignments.
private final AssignmentMetadataStore _assignmentMetadataStore;
// Algorithm that calculates the optimal assignment from a cluster model.
private final RebalanceAlgorithm _rebalanceAlgorithm;
// ------------------------------------------------------------------------------------//
/**
 * Creates a rebalancer wired with the default components: an {@link AssignmentMetadataStore}
 * built from the given manager, the algorithm returned by
 * {@link ConstraintBasedAlgorithmFactory#getInstance()}, and a {@link DelayedAutoRebalancer} as
 * the mapping calculator.
 * @param helixManager the Helix manager handed to the assignment metadata store.
 */
public WagedRebalancer(HelixManager helixManager) {
this(
// TODO initialize the metadata store according to its requirements when integrating,
// or change to a final static method if possible.
new AssignmentMetadataStore(helixManager),
// TODO parse the cluster settings
ConstraintBasedAlgorithmFactory.getInstance(),
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
// The mapping calculator translates the best possible assignment into the applicable state
// mapping based on the current states.
// TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
new DelayedAutoRebalancer());
}
/**
 * Creates a rebalancer from explicitly supplied components. All other constructors delegate
 * here.
 * @param assignmentMetadataStore the store used to read and persist the baseline and the best
 *          possible assignments.
 * @param algorithm the algorithm that calculates the optimal assignment from a cluster model.
 * @param mappingCalculator the calculator that turns the calculated assignment into the final
 *          partition state mapping. Declared with its type argument (instead of as a raw type)
 *          so that the assignment to the typed field is checked by the compiler.
 */
private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
    RebalanceAlgorithm algorithm,
    MappingCalculator<ResourceControllerDataProvider> mappingCalculator) {
  _assignmentMetadataStore = assignmentMetadataStore;
  _rebalanceAlgorithm = algorithm;
  _mappingCalculator = mappingCalculator;
}
/**
 * Test-only constructor that lets the caller supply the metadata store and the algorithm. A
 * {@link DelayedAutoRebalancer} is still used as the mapping calculator.
 * @param assignmentMetadataStore the store used to read and persist assignments.
 * @param algorithm the algorithm that calculates the optimal assignment.
 */
@VisibleForTesting
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer());
}
/**
 * Compute the new IdealStates for all the input resources. The IdealStates include both the new
 * partition assignment (in the listFields) and the new replica state mapping (in the mapFields).
 * <p>
 * Only resources whose IdealState is in FULL_AUTO mode and is configured with this rebalancer's
 * class name are processed; all other resources are filtered out.
 * @param clusterData The Cluster status data provider.
 * @param resourceMap A map containing all the rebalancing resources.
 * @param currentStateOutput The present Current States of the resources.
 * @return A map of the new IdealStates with the resource name as key. The map is empty if none of
 *         the input resources is compatible with this rebalancer.
 * @throws HelixRebalanceException if the target assignment cannot be calculated.
 */
public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
    Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
    throws HelixRebalanceException {
  LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());

  // Find the compatible resources: 1. FULL_AUTO 2. Configured to use the WAGED rebalancer.
  // The class literal is used instead of getClass() so that subclass instances (for example,
  // test doubles created through the protected constructor) still recognize resources that are
  // configured with the WagedRebalancer class name.
  final String rebalancerClassName = WagedRebalancer.class.getName();
  resourceMap = resourceMap.entrySet().stream().filter(resourceEntry -> {
    IdealState is = clusterData.getIdealState(resourceEntry.getKey());
    return is != null && is.getRebalanceMode() == IdealState.RebalanceMode.FULL_AUTO
        && rebalancerClassName.equals(is.getRebalancerClassName());
  }).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

  if (resourceMap.isEmpty()) {
    LOG.warn("There is no valid resource to be rebalanced by {}",
        this.getClass().getSimpleName());
    return Collections.emptyMap();
  } else {
    LOG.info("Valid resources that will be rebalanced by {}: {}", this.getClass().getSimpleName(),
        resourceMap.keySet().toString());
  }

  // Calculate the target assignment based on the current cluster status.
  Map<String, IdealState> newIdealStates = computeBestPossibleStates(clusterData, resourceMap);

  // Construct the new best possible states according to the current state and target assignment.
  // Note that the new ideal state might be an intermediate state between the current state and
  // the target assignment.
  for (IdealState is : newIdealStates.values()) {
    String resourceName = is.getResourceName();
    // Adjust the states according to the current state.
    ResourceAssignment finalAssignment = _mappingCalculator.computeBestPossiblePartitionState(
        clusterData, is, resourceMap.get(resourceName), currentStateOutput);

    // Clean up the state mapping fields. Use the final assignment that is calculated by the
    // mapping calculator to replace them.
    is.getRecord().getMapFields().clear();
    for (Partition partition : finalAssignment.getMappedPartitions()) {
      Map<String, String> newStateMap = finalAssignment.getReplicaMap(partition);
      // If the final states cannot be generated, override the best possible state with an empty
      // map.
      is.setInstanceStateMap(partition.getPartitionName(),
          newStateMap == null ? Collections.emptyMap() : newStateMap);
    }
  }

  LOG.info("Finish computing new ideal states for resources: {}",
      resourceMap.keySet().toString());
  return newIdealStates;
}
// Coordinate baseline recalculation and partial rebalance according to the cluster changes.
// 1. Refresh the change detector and collect the added/changed item keys per change type.
// 2. If any change type requires a global rebalance, recalculate the baseline first.
// 3. Run the partial rebalance and wrap each resulting assignment into a new IdealState.
// Throws HelixRebalanceException if any of the steps fails.
private Map<String, IdealState> computeBestPossibleStates(
    ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap)
    throws HelixRebalanceException {
  ResourceChangeDetector changeDetector = getChangeDetector();
  changeDetector.updateSnapshots(clusterData);

  // Get all the modified and new items' information.
  // The map is created explicitly as a HashMap because it is modified below, and the map
  // returned by the two-argument Collectors.toMap() carries no mutability guarantee.
  Map<HelixConstants.ChangeType, Set<String>> clusterChanges = new HashMap<>();
  for (HelixConstants.ChangeType changeType : changeDetector.getChangeTypes()) {
    Set<String> itemKeys = clusterChanges.computeIfAbsent(changeType, type -> new HashSet<>());
    itemKeys.addAll(changeDetector.getAdditionsByType(changeType));
    itemKeys.addAll(changeDetector.getChangesByType(changeType));
  }

  if (!Collections.disjoint(clusterChanges.keySet(), GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES)) {
    refreshBaseline(clusterData, clusterChanges, resourceMap);
    // Inject a cluster config change for large scale partial rebalance once the baseline changed.
    clusterChanges.putIfAbsent(HelixConstants.ChangeType.CLUSTER_CONFIG, Collections.emptySet());
  }

  Map<String, ResourceAssignment> newAssignment =
      partialRebalance(clusterData, clusterChanges, resourceMap);

  // Convert the assignments into IdealState for the following state mapping calculation.
  Map<String, IdealState> finalIdealState = new HashMap<>();
  for (Map.Entry<String, ResourceAssignment> assignmentEntry : newAssignment.entrySet()) {
    String resourceName = assignmentEntry.getKey();
    IdealState newIdealState;
    try {
      IdealState currentIdealState = clusterData.getIdealState(resourceName);
      Map<String, Integer> statePriorityMap = clusterData
          .getStateModelDef(currentIdealState.getStateModelDefRef()).getStatePriorityMap();
      // Create a new IdealState instance that contains the newly calculated assignment in the
      // preference list.
      newIdealState = generateIdealStateWithAssignment(resourceName, currentIdealState,
          assignmentEntry.getValue(), statePriorityMap);
    } catch (Exception ex) {
      // Any failure here (including a missing IdealState or state model definition) is reported
      // as an invalid cluster status, with the original exception kept as the cause.
      throw new HelixRebalanceException(
          "Fail to calculate the new IdealState for resource: " + resourceName,
          HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
    }
    finalIdealState.put(resourceName, newIdealState);
  }
  return finalIdealState;
}
// TODO make the Baseline calculation async if complicated algorithm is used for the Baseline
// Recalculates the baseline assignment and persists it to the assignment metadata store.
// Any failure to read or write the store is rethrown as an INVALID_REBALANCER_STATUS
// HelixRebalanceException that carries the original cause.
private void refreshBaseline(ResourceControllerDataProvider clusterData,
    Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap)
    throws HelixRebalanceException {
  LOG.info("Start calculating the new baseline.");

  Map<String, ResourceAssignment> persistedBaseline;
  try {
    persistedBaseline = _assignmentMetadataStore.getBaseline();
  } catch (Exception readFailure) {
    throw new HelixRebalanceException("Failed to get the current baseline assignment.",
        HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, readFailure);
  }

  // The baseline calculation differs from a regular calculation in two ways:
  // 1. Node status (disabled/offline) is ignored, so all the instances are candidates.
  // 2. There is no "baseline" for the baseline, so an empty map is passed as the baseline and
  // the persisted baseline plays the role of the previous best possible assignment.
  Set<String> allInstances = clusterData.getAllInstances();
  Map<String, ResourceAssignment> noBaseline = Collections.emptyMap();
  Map<String, ResourceAssignment> recalculatedBaseline = calculateAssignment(clusterData,
      clusterChanges, resourceMap, allInstances, noBaseline, persistedBaseline);

  try {
    _assignmentMetadataStore.persistBaseline(recalculatedBaseline);
  } catch (Exception writeFailure) {
    throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
        HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, writeFailure);
  }

  LOG.info("Finish calculating the new baseline.");
}
// Calculates the new best possible assignment on the enabled live instances, using the persisted
// baseline and the previously persisted best possible assignment as references. The result is
// persisted before it is returned. Any failure to read or write the store is rethrown as an
// INVALID_REBALANCER_STATUS HelixRebalanceException that carries the original cause.
private Map<String, ResourceAssignment> partialRebalance(
    ResourceControllerDataProvider clusterData,
    Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap)
    throws HelixRebalanceException {
  LOG.info("Start calculating the new best possible assignment.");

  Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();

  Map<String, ResourceAssignment> persistedBaseline;
  Map<String, ResourceAssignment> persistedBestPossible;
  try {
    persistedBaseline = _assignmentMetadataStore.getBaseline();
    persistedBestPossible = _assignmentMetadataStore.getBestPossibleAssignment();
  } catch (Exception readFailure) {
    throw new HelixRebalanceException("Failed to get the persisted assignment records.",
        HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, readFailure);
  }

  Map<String, ResourceAssignment> bestPossible = calculateAssignment(clusterData, clusterChanges,
      resourceMap, enabledLiveInstances, persistedBaseline, persistedBestPossible);

  try {
    // TODO Test to confirm if persisting the final assignment (with final partition states)
    // would be a better option.
    _assignmentMetadataStore.persistBestPossibleAssignment(bestPossible);
  } catch (Exception writeFailure) {
    throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
        HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, writeFailure);
  }

  LOG.info("Finish calculating the new best possible assignment.");
  return bestPossible;
}
/**
 * Build a cluster model from the inputs and run the rebalance algorithm on it.
 * @param clusterData the cluster data cache.
 * @param clusterChanges the detected cluster changes, keyed by change type.
 * @param resourceMap the rebalancing resources.
 * @param activeNodes the nodes that the cluster model is generated with.
 * @param baseline the baseline assignment that the algorithm uses as a reference.
 * @param prevBestPossibleAssignment the previous best possible assignment that the algorithm
 *          uses as a reference.
 * @return the new optimal assignment for the resources.
 * @throws HelixRebalanceException with type INVALID_CLUSTER_STATUS if the cluster model cannot
 *           be generated.
 */
private Map<String, ResourceAssignment> calculateAssignment(
    ResourceControllerDataProvider clusterData,
    Map<HelixConstants.ChangeType, Set<String>> clusterChanges, Map<String, Resource> resourceMap,
    Set<String> activeNodes, Map<String, ResourceAssignment> baseline,
    Map<String, ResourceAssignment> prevBestPossibleAssignment) throws HelixRebalanceException {
  long beginMs = System.currentTimeMillis();
  LOG.info("Start calculating for an assignment");

  ClusterModel model;
  try {
    model = ClusterModelProvider.generateClusterModel(clusterData, resourceMap, activeNodes,
        clusterChanges, baseline, prevBestPossibleAssignment);
  } catch (Exception modelFailure) {
    throw new HelixRebalanceException("Failed to generate cluster model.",
        HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, modelFailure);
  }

  Map<String, ResourceAssignment> result =
      _rebalanceAlgorithm.calculate(model).getOptimalResourceAssignment();

  long elapsedMs = System.currentTimeMillis() - beginMs;
  LOG.info("Finish calculating. Time spent: {}ms.", elapsedMs);
  return result;
}
// Returns the change detector bound to the calling thread, creating and registering it on the
// first call from that thread.
private ResourceChangeDetector getChangeDetector() {
  ResourceChangeDetector detector = CHANGE_DETECTOR_THREAD_LOCAL.get();
  if (detector == null) {
    detector = new ResourceChangeDetector();
    CHANGE_DETECTOR_THREAD_LOCAL.set(detector);
  }
  return detector;
}
// Build a fresh IdealState for the resource out of the input newAssignment.
// The simple fields are taken over from the current IdealState, and the assignment is propagated
// to the preference lists only. The map fields are intentionally left untouched: the rebalancer
// recalculates the final state mapping afterwards with the current states taken into account.
private IdealState generateIdealStateWithAssignment(String resourceName,
    IdealState currentIdealState, ResourceAssignment newAssignment,
    Map<String, Integer> statePriorityMap) {
  IdealState result = new IdealState(resourceName);

  // Carry over the simple fields of the current IdealState.
  result.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());

  // The preference lists are ordered by state priority.
  Map<String, List<String>> orderedPreferenceLists =
      getPreferenceLists(newAssignment, statePriorityMap);
  result.setPreferenceLists(orderedPreferenceLists);

  return result;
}
// Generate the preference lists from the state mapping based on state priority.
// For every mapped partition, the nodes of its replica map are ordered by the priority value of
// their assigned state (ascending); nodes with the same priority value are ordered by name.
// The result maps each partition name to its ordered node list.
// Note that a state that is missing from statePriorityMap fails with a NullPointerException when
// the priority is unboxed.
private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
    Map<String, Integer> statePriorityMap) {
  Map<String, List<String>> preferenceList = new HashMap<>();
  for (Partition partition : newAssignment.getMappedPartitions()) {
    // Fetch the replica map once per partition instead of on every comparison.
    Map<String, String> replicaMap = newAssignment.getReplicaMap(partition);
    List<String> nodes = new ArrayList<>(replicaMap.keySet());
    // To ensure backward compatibility, sort the preference list according to state priority.
    nodes.sort((node1, node2) -> {
      // Integer.compare avoids the overflow that an int subtraction could cause.
      int byPriority = Integer.compare(statePriorityMap.get(replicaMap.get(node1)),
          statePriorityMap.get(replicaMap.get(node2)));
      return byPriority != 0 ? byPriority : node1.compareTo(node2);
    });
    preferenceList.put(partition.getPartitionName(), nodes);
  }
  return preferenceList;
}
}