package org.apache.helix.controller.rebalancer.waged;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.helix.HelixConstants;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.StatefulRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.implementation.BaselineDivergenceGauge;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.apache.helix.util.RebalanceUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Weight-Aware Globally-Even Distribute Rebalancer.
* @see <a
* href="https://github.com/apache/helix/wiki/Design-Proposal---Weight-Aware-Globally-Even-Distribute-Rebalancer">
* Design Document
* </a>
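* <p>
* A minimal usage sketch (hypothetical variable names; in production the controller pipeline
* drives these calls, so this is illustrative only):
* <pre>{@code
* WagedRebalancer rebalancer = new WagedRebalancer(helixManager);
* try {
*   Map<String, IdealState> newIdealStates =
*       rebalancer.computeNewIdealStates(clusterData, resourceMap, currentStateOutput);
* } catch (HelixRebalanceException e) {
*   // handle the rebalance failure
* } finally {
*   rebalancer.close();
* }
* }</pre>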
*/
public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDataProvider> {
private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);
// When any of the following changes happens, the rebalancer needs to do a global rebalance,
// which consists of 1. recalculating the baseline, and 2. a partial rebalance based on the new baseline.
private static final Set<HelixConstants.ChangeType> GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES =
ImmutableSet
.of(HelixConstants.ChangeType.RESOURCE_CONFIG, HelixConstants.ChangeType.IDEAL_STATE,
HelixConstants.ChangeType.CLUSTER_CONFIG, HelixConstants.ChangeType.INSTANCE_CONFIG);
// Sentinel preference value used to identify whether the preference has been configured.
private static final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer>
NOT_CONFIGURED_PREFERENCE = ImmutableMap
.of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);
// The default algorithm to use when there is no preference configured.
private static final RebalanceAlgorithm DEFAULT_REBALANCE_ALGORITHM =
ConstraintBasedAlgorithmFactory
.getInstance(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
// To calculate the baseline asynchronously
private final ExecutorService _baselineCalculateExecutor;
private final ResourceChangeDetector _changeDetector;
private final HelixManager _manager;
private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
private final AssignmentMetadataStore _assignmentMetadataStore;
private final MetricCollector _metricCollector;
private final CountMetric _rebalanceFailureCount;
private final CountMetric _baselineCalcCounter;
private final LatencyMetric _baselineCalcLatency;
private final LatencyMetric _writeLatency;
private final CountMetric _partialRebalanceCounter;
private final LatencyMetric _partialRebalanceLatency;
private final LatencyMetric _stateReadLatency;
private final BaselineDivergenceGauge _baselineDivergenceGauge;
private boolean _asyncGlobalRebalanceEnabled;
// Note: the rebalance algorithm field is mutable, so it should not be referenced directly
// except in the public method computeNewIdealStates.
private RebalanceAlgorithm _rebalanceAlgorithm;
private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
NOT_CONFIGURED_PREFERENCE;
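// Construct the assignment metadata store from the metadata store address and the cluster name.
// Returns null if either is missing, in which case the rebalancer runs without a persisted
// assignment store.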
private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs,
String clusterName) {
if (metadataStoreAddrs != null && clusterName != null) {
return new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
}
return null;
}
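/**
* Construct a WagedRebalancer with the given HelixManager. The manager's metadata store
* connection string and cluster name are used to build the assignment metadata store, the
* default constraint-based algorithm is used, and DelayedAutoRebalancer serves as the mapping
* calculator for the final state mapping.
*/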
public WagedRebalancer(HelixManager helixManager) {
this(helixManager == null ? null
: constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
helixManager.getClusterName()),
DEFAULT_REBALANCE_ALGORITHM,
// Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
// Mapping calculator will translate the best possible assignment into the applicable state
// mapping based on the current states.
// TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
new DelayedAutoRebalancer(),
// Helix Manager is required for the rebalancer scheduler
helixManager,
// If HelixManager is null, we just pass in a non-functioning WagedRebalancerMetricCollector
// that will not be registered to MBean.
// This is to handle two cases: 1. HelixManager is null for non-testing cases. In this case,
// WagedRebalancer will not read/write to metadata store and just use CurrentState-based
// rebalancing. 2. Tests that require instrumenting the rebalancer for verifying whether the
// cluster has converged.
helixManager == null ? new WagedRebalancerMetricCollector()
: new WagedRebalancerMetricCollector(helixManager.getClusterName()),
ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED);
_preference = ImmutableMap.copyOf(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
}
/**
* This constructor uses a null HelixManager. With a null HelixManager, the rebalancer will not
* schedule a future delayed rebalance.
* @param assignmentMetadataStore
* @param algorithm
* @param metricCollectorOptional
*/
protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, Optional<MetricCollector> metricCollectorOptional) {
this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null,
// If metricCollector is not provided, instantiate a version that does not register metrics
// in order to allow the rebalancer to proceed.
metricCollectorOptional.orElse(new WagedRebalancerMetricCollector()),
false);
}
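/**
* The shared constructor that wires up the assignment metadata store, rebalance algorithm,
* mapping calculator, HelixManager, metric collector, and the async global rebalance flag,
* and registers all the rebalance metrics.
*/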
private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled) {
if (assignmentMetadataStore == null) {
LOG.warn("Assignment Metadata Store is not configured properly."
+ " The rebalancer will not access the assignment store during the rebalance.");
}
_assignmentMetadataStore = assignmentMetadataStore;
_rebalanceAlgorithm = algorithm;
_mappingCalculator = mappingCalculator;
if (manager == null) {
LOG.warn("HelixManager is not provided. The rebalancer is not going to schedule for a future "
+ "rebalance even when delayed rebalance is enabled.");
}
_manager = manager;
_metricCollector = metricCollector;
_rebalanceFailureCount = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
CountMetric.class);
_baselineCalcCounter = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcCounter.name(),
CountMetric.class);
_baselineCalcLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.GlobalBaselineCalcLatencyGauge
.name(),
LatencyMetric.class);
_partialRebalanceCounter = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceCounter.name(),
CountMetric.class);
_partialRebalanceLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.PartialRebalanceLatencyGauge
.name(),
LatencyMetric.class);
_writeLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
LatencyMetric.class);
_stateReadLatency = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
LatencyMetric.class);
_baselineDivergenceGauge = _metricCollector.getMetric(
WagedRebalancerMetricCollector.WagedRebalancerMetricNames.BaselineDivergenceGauge.name(),
BaselineDivergenceGauge.class);
_changeDetector = new ResourceChangeDetector(true);
_baselineCalculateExecutor = Executors.newSingleThreadExecutor();
_asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
}
// Update the global rebalance mode to be asynchronous or synchronous
public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) {
_asyncGlobalRebalanceEnabled = isAsyncGlobalRebalanceEnabled;
}
// Update the rebalancer preference if the new options are different from the current preference.
public synchronized void updateRebalancePreference(
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
// 1. If the preference was not configured during construction, there is no need to update.
// 2. If the current preference equals the new preference, there is no need to update.
if (!_preference.equals(NOT_CONFIGURED_PREFERENCE) && !_preference.equals(newPreference)) {
_rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
_preference = ImmutableMap.copyOf(newPreference);
}
}
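/**
* Reset the persisted assignment metadata (if the store is configured) and the change detector
* snapshots so that the next rebalance pipeline recomputes from a clean state.
*/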
@Override
public void reset() {
if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.reset();
}
_changeDetector.resetSnapshots();
}
// TODO the rebalancer should reject any other computing request after being closed.
@Override
public void close() {
if (_baselineCalculateExecutor != null) {
_baselineCalculateExecutor.shutdownNow();
}
if (_assignmentMetadataStore != null) {
_assignmentMetadataStore.close();
}
_metricCollector.unregister();
}
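/**
* Compute the new IdealStates for all the input resources. The IdealStates are calculated by
* first computing the best possible assignment (falling back to the last persisted best
* possible assignment on recoverable failures), and then letting the mapping calculator derive
* the per-partition state maps from the current states.
*/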
@Override
public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
throws HelixRebalanceException {
if (resourceMap.isEmpty()) {
LOG.warn("There is no resource to be rebalanced by {}", this.getClass().getSimpleName());
return Collections.emptyMap();
}
LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
validateInput(clusterData, resourceMap);
Map<String, IdealState> newIdealStates;
try {
// Calculate the target assignment based on the current cluster status.
newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput,
_rebalanceAlgorithm);
} catch (HelixRebalanceException ex) {
LOG.error("Failed to calculate the new assignments.", ex);
// Record the failure in metrics.
_rebalanceFailureCount.increment(1L);
HelixRebalanceException.Type failureType = ex.getFailureType();
if (failureType.equals(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS) || failureType
.equals(HelixRebalanceException.Type.UNKNOWN_FAILURE)) {
// If the failure is unknown or because of assignment store access failure, throw the
// rebalance exception.
throw ex;
} else { // return the previously calculated assignment.
LOG.warn(
"Returning the last known-good best possible assignment from metadata store due to "
+ "rebalance failure of type: {}", failureType);
// Note that this fallback logic does not return an assignment based on the current state if
// there is no previously calculated result.
Map<String, ResourceAssignment> assignmentRecord =
getBestPossibleAssignment(_assignmentMetadataStore, new CurrentStateOutput(),
resourceMap.keySet());
newIdealStates = convertResourceAssignment(clusterData, assignmentRecord);
}
}
// Construct the new best possible states according to the current state and target assignment.
// Note that the new ideal state might be an intermediate state between the current state and
// the target assignment.
newIdealStates.values().parallelStream().forEach(idealState -> {
String resourceName = idealState.getResourceName();
// Adjust the states according to the current state.
ResourceAssignment finalAssignment = _mappingCalculator
.computeBestPossiblePartitionState(clusterData, idealState, resourceMap.get(resourceName),
currentStateOutput);
// Clean up the state mapping fields. Use the final assignment that is calculated by the
// mapping calculator to replace them.
idealState.getRecord().getMapFields().clear();
for (Partition partition : finalAssignment.getMappedPartitions()) {
Map<String, String> newStateMap = finalAssignment.getReplicaMap(partition);
// If the final states cannot be generated, override the best possible state with an empty map.
idealState.setInstanceStateMap(partition.getPartitionName(),
newStateMap == null ? Collections.emptyMap() : newStateMap);
}
});
LOG.info("Finish computing new ideal states for resources: {}",
resourceMap.keySet().toString());
return newIdealStates;
}
// Coordinate global rebalance and partial rebalance according to the cluster changes.
private Map<String, IdealState> computeBestPossibleStates(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
Set<String> activeNodes = DelayedRebalanceUtil
.getActiveNodes(clusterData.getAllInstances(), clusterData.getEnabledLiveInstances(),
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), clusterData.getClusterConfig());
// Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
Map<String, IdealState> newIdealStates = convertResourceAssignment(clusterData,
computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput,
algorithm));
// The additional rebalance overwrite is required since the calculated mapping may contain
// some delayed rebalanced assignments.
if (!activeNodes.equals(clusterData.getEnabledLiveInstances())) {
applyRebalanceOverwrite(newIdealStates, clusterData, resourceMap,
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet()),
algorithm);
}
// Replace the assignment if user-defined preference list is configured.
// Note the user-defined list is intentionally applied to the final mapping after calculation.
// This is to avoid persisting it into the assignment store, which impacts the long term
// assignment evenness and partition movements.
newIdealStates.entrySet().stream().forEach(idealStateEntry -> applyUserDefinedPreferenceList(
clusterData.getResourceConfig(idealStateEntry.getKey()), idealStateEntry.getValue()));
return newIdealStates;
}
// Perform the global rebalance and the partial rebalance to calculate the new best possible assignment.
protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
// Perform global rebalance for a new baseline assignment
globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
// Perform partial rebalance for a new best possible assignment
Map<String, ResourceAssignment> newAssignment =
partialRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
return newAssignment;
}
/**
* Convert the resource assignment map into an IdealState map.
*/
private Map<String, IdealState> convertResourceAssignment(
ResourceControllerDataProvider clusterData, Map<String, ResourceAssignment> assignments)
throws HelixRebalanceException {
// Convert the assignments into IdealState for the following state mapping calculation.
Map<String, IdealState> finalIdealStateMap = new HashMap<>();
for (String resourceName : assignments.keySet()) {
try {
IdealState currentIdealState = clusterData.getIdealState(resourceName);
Map<String, Integer> statePriorityMap =
clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
.getStatePriorityMap();
// Create a new IdealState instance which contains the new calculated assignment in the
// preference list.
IdealState newIdealState = new IdealState(resourceName);
// Copy the simple fields
newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
// Sort the preference list according to state priority.
newIdealState.setPreferenceLists(
getPreferenceLists(assignments.get(resourceName), statePriorityMap));
// Note the state mapping in the new assignment won't directly propagate to the map fields.
// The rebalancer will calculate for the final state mapping considering the current states.
finalIdealStateMap.put(resourceName, newIdealState);
} catch (Exception ex) {
throw new HelixRebalanceException(
"Failed to calculate the new IdealState for resource: " + resourceName,
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
}
return finalIdealStateMap;
}
/**
* The global rebalance calculates a new baseline assignment.
* The new baseline assignment is persisted and then leveraged by the partial rebalance.
* @param clusterData
* @param resourceMap
* @param currentStateOutput
* @param algorithm
* @throws HelixRebalanceException
*/
private void globalRebalance(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
_changeDetector.updateSnapshots(clusterData);
// Get all the changed items' information. Filter for the items that have content changed.
final Map<HelixConstants.ChangeType, Set<String>> clusterChanges =
_changeDetector.getAllChanges();
if (clusterChanges.keySet().stream()
.anyMatch(GLOBAL_REBALANCE_REQUIRED_CHANGE_TYPES::contains)) {
// Build the cluster model for rebalance calculation.
// Note, for a Baseline calculation,
// 1. Ignore node status (disable/offline).
// 2. Use the previous Baseline as the only parameter about the previous assignment.
Map<String, ResourceAssignment> currentBaseline =
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
ClusterModel clusterModel;
try {
clusterModel = ClusterModelProvider
.generateClusterModelForBaseline(clusterData, resourceMap,
clusterData.getAllInstances(), clusterChanges, currentBaseline);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model for global rebalance.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
final boolean waitForGlobalRebalance = !_asyncGlobalRebalanceEnabled;
final String clusterName = clusterData.getClusterName();
// Calculate the Baseline assignment for global rebalance.
Future<Boolean> result = _baselineCalculateExecutor.submit(() -> {
try {
// Note that we should schedule a new partial rebalance for a future rebalance pipeline if
// the planned partial rebalance in the current rebalance pipeline won't wait for the new
// baseline to be calculated.
// So set shouldSchedulePartialRebalance to !waitForGlobalRebalance.
calculateAndUpdateBaseline(clusterModel, algorithm, !waitForGlobalRebalance, clusterName);
} catch (HelixRebalanceException e) {
LOG.error("Failed to calculate baseline assignment!", e);
return false;
}
return true;
});
if (waitForGlobalRebalance) {
try {
if (!result.get()) {
throw new HelixRebalanceException("Failed to calculate for the new Baseline.",
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
} catch (InterruptedException | ExecutionException e) {
throw new HelixRebalanceException("Failed to execute new Baseline calculation.",
HelixRebalanceException.Type.FAILED_TO_CALCULATE, e);
}
}
}
}
/**
* Calculate and update the Baseline assignment
* @param clusterModel
* @param algorithm
* @param shouldSchedulePartialRebalance True if the call should trigger a subsequent partial rebalance
* so the new Baseline can be applied to the cluster.
* @param clusterName
* @throws HelixRebalanceException
*/
private void calculateAndUpdateBaseline(ClusterModel clusterModel, RebalanceAlgorithm algorithm,
boolean shouldSchedulePartialRebalance, String clusterName)
throws HelixRebalanceException {
LOG.info("Start calculating the new baseline.");
_baselineCalcCounter.increment(1L);
_baselineCalcLatency.startMeasuringLatency();
boolean isBaselineChanged = false;
Map<String, ResourceAssignment> newBaseline = calculateAssignment(clusterModel, algorithm);
// Write the new baseline to metadata store
if (_assignmentMetadataStore != null) {
try {
_writeLatency.startMeasuringLatency();
isBaselineChanged = _assignmentMetadataStore.persistBaseline(newBaseline);
_writeLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to persist the new baseline assignment.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
}
} else {
LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
}
_baselineCalcLatency.endMeasuringLatency();
LOG.info("Global baseline calculation completed and has been persisted into metadata store.");
if (isBaselineChanged && shouldSchedulePartialRebalance) {
LOG.info("Schedule a new rebalance after the new baseline calculation has finished.");
RebalanceUtil.scheduleOnDemandPipeline(clusterName, 0L, false);
}
}
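/**
* Partial rebalance calculates a new best possible assignment based on the persisted baseline
* and the previous best possible assignment, restricted to the currently active nodes. The
* result is persisted to the assignment metadata store, and the baseline divergence metric is
* reported asynchronously.
*/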
private Map<String, ResourceAssignment> partialRebalance(
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
LOG.info("Start calculating the new best possible assignment.");
_partialRebalanceCounter.increment(1L);
_partialRebalanceLatency.startMeasuringLatency();
// TODO: Consider combining the metrics for both baseline/best possible?
// Read the baseline from metadata store
Map<String, ResourceAssignment> currentBaseline =
getBaselineAssignment(_assignmentMetadataStore, currentStateOutput, resourceMap.keySet());
// Read the best possible assignment from metadata store
Map<String, ResourceAssignment> currentBestPossibleAssignment =
getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
resourceMap.keySet());
ClusterModel clusterModel;
try {
clusterModel = ClusterModelProvider
.generateClusterModelForPartialRebalance(clusterData, resourceMap, activeNodes,
currentBaseline, currentBestPossibleAssignment);
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to generate cluster model for partial rebalance.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
Map<String, ResourceAssignment> newAssignment = calculateAssignment(clusterModel, algorithm);
// Asynchronously report the baseline divergence metric before persisting to the metadata store,
// so the metric is still recorded even if persisting fails.
// To avoid modification of the new assignment while it is being used to measure baseline
// divergence, pass a deep copy of the new assignment.
Map<String, ResourceAssignment> newAssignmentCopy = new HashMap<>();
for (Map.Entry<String, ResourceAssignment> entry : newAssignment.entrySet()) {
newAssignmentCopy.put(entry.getKey(), new ResourceAssignment(entry.getValue().getRecord()));
}
_baselineDivergenceGauge.asyncMeasureAndUpdateValue(clusterData.getAsyncTasksThreadPool(),
currentBaseline, newAssignmentCopy);
if (_assignmentMetadataStore != null) {
try {
_writeLatency.startMeasuringLatency();
_assignmentMetadataStore.persistBestPossibleAssignment(newAssignment);
_writeLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
}
} else {
LOG.debug("Assignment Metadata Store is null. Skip persisting the baseline assignment.");
}
_partialRebalanceLatency.endMeasuringLatency();
LOG.info("Finish calculating the new best possible assignment.");
return newAssignment;
}
/**
* @param clusterModel the cluster model that contains all the cluster status for the purpose of
* rebalancing.
* @param algorithm the rebalance algorithm used to calculate the optimal assignment.
* @return the new optimal assignment for the resources.
*/
private Map<String, ResourceAssignment> calculateAssignment(ClusterModel clusterModel,
RebalanceAlgorithm algorithm) throws HelixRebalanceException {
long startTime = System.currentTimeMillis();
LOG.info("Start calculating for an assignment with algorithm {}",
algorithm.getClass().getSimpleName());
OptimalAssignment optimalAssignment = algorithm.calculate(clusterModel);
Map<String, ResourceAssignment> newAssignment =
optimalAssignment.getOptimalResourceAssignment();
LOG.info("Finish calculating an assignment with algorithm {}. Took: {} ms.",
algorithm.getClass().getSimpleName(), System.currentTimeMillis() - startTime);
return newAssignment;
}
// Generate the preference lists from the state mapping based on state priority.
private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
Map<String, Integer> statePriorityMap) {
Map<String, List<String>> preferenceList = new HashMap<>();
for (Partition partition : newAssignment.getMappedPartitions()) {
List<String> nodes = new ArrayList<>(newAssignment.getReplicaMap(partition).keySet());
// To ensure backward compatibility, sort the preference list according to state priority.
nodes.sort((node1, node2) -> {
int statePriority1 =
statePriorityMap.get(newAssignment.getReplicaMap(partition).get(node1));
int statePriority2 =
statePriorityMap.get(newAssignment.getReplicaMap(partition).get(node2));
if (statePriority1 == statePriority2) {
return node1.compareTo(node2);
} else {
return statePriority1 - statePriority2;
}
});
preferenceList.put(partition.getPartitionName(), nodes);
}
return preferenceList;
}
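// Validate that every input resource has an IdealState in FULL_AUTO mode with this class
// configured as its rebalancer; otherwise fail with INVALID_INPUT.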
private void validateInput(ResourceControllerDataProvider clusterData,
Map<String, Resource> resourceMap) throws HelixRebalanceException {
Set<String> nonCompatibleResources = resourceMap.entrySet().stream().filter(resourceEntry -> {
IdealState is = clusterData.getIdealState(resourceEntry.getKey());
return is == null || !is.getRebalanceMode().equals(IdealState.RebalanceMode.FULL_AUTO)
|| !WagedRebalancer.class.getName().equals(is.getRebalancerClassName());
}).map(Map.Entry::getKey).collect(Collectors.toSet());
if (!nonCompatibleResources.isEmpty()) {
throw new HelixRebalanceException(String.format(
"Input contains invalid resource(s) that cannot be rebalanced by the WAGED rebalancer. %s",
nonCompatibleResources.toString()), HelixRebalanceException.Type.INVALID_INPUT);
}
}
/**
* @param assignmentMetadataStore
* @param currentStateOutput
* @param resources
* @return The current baseline assignment. If the record does not exist in the
* assignmentMetadataStore, return the current state assignment.
* @throws HelixRebalanceException
*/
private Map<String, ResourceAssignment> getBaselineAssignment(
AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
Set<String> resources) throws HelixRebalanceException {
Map<String, ResourceAssignment> currentBaseline = Collections.emptyMap();
if (assignmentMetadataStore != null) {
try {
_stateReadLatency.startMeasuringLatency();
currentBaseline = assignmentMetadataStore.getBaseline();
_stateReadLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException(
"Failed to get the current baseline assignment because of unexpected error.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
}
}
if (currentBaseline.isEmpty()) {
LOG.warn("The current baseline assignment record is empty. Use the current states instead.");
currentBaseline = currentStateOutput.getAssignment(resources);
}
currentBaseline.keySet().retainAll(resources);
return currentBaseline;
}
/**
* @param assignmentMetadataStore
* @param currentStateOutput
* @param resources
* @return The current best possible assignment. If the record does not exist in the
* assignmentMetadataStore, return the current state assignment.
* @throws HelixRebalanceException
*/
protected Map<String, ResourceAssignment> getBestPossibleAssignment(
AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
Set<String> resources) throws HelixRebalanceException {
Map<String, ResourceAssignment> currentBestAssignment = Collections.emptyMap();
if (assignmentMetadataStore != null) {
try {
_stateReadLatency.startMeasuringLatency();
currentBestAssignment = assignmentMetadataStore.getBestPossibleAssignment();
_stateReadLatency.endMeasuringLatency();
} catch (Exception ex) {
throw new HelixRebalanceException(
"Failed to get the current best possible assignment because of unexpected error.",
HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
}
}
if (currentBestAssignment.isEmpty()) {
LOG.warn(
"The current best possible assignment record is empty. Use the current states instead.");
currentBestAssignment = currentStateOutput.getAssignment(resources);
}
currentBestAssignment.keySet().retainAll(resources);
return currentBestAssignment;
}
/**
* Schedule rebalance according to the delayed rebalance logic.
* @param clusterData the current cluster data cache
* @param delayedActiveNodes the active nodes set that is calculated with the delay time window
* @param resourceSet the rebalanced resourceSet
*/
private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData,
Set<String> delayedActiveNodes, Set<String> resourceSet) {
if (_manager != null) {
// Schedule for the next delayed rebalance in case no cluster change event happens.
ClusterConfig clusterConfig = clusterData.getClusterConfig();
boolean delayedRebalanceEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(clusterConfig);
Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
for (String resource : resourceSet) {
DelayedRebalanceUtil
.setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
clusterData.getInstanceOfflineTimeMap(), clusterData.getLiveInstances().keySet(),
clusterData.getInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
clusterConfig, _manager);
}
} else {
LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified.");
}
}
/**
* Update the rebalanced ideal states according to the real active nodes.
* Since the rebalancing might be done with the delayed logic, the rebalanced ideal states
* might include inactive nodes.
* This overwrite will adjust the final mapping, so as to ensure the result is completely valid.
* @param idealStateMap the calculated ideal states.
* @param clusterData the cluster data cache.
* @param resourceMap the rebalanced resource map.
* @param baseline the baseline assignment.
* @param algorithm the rebalance algorithm.
*/
private void applyRebalanceOverwrite(Map<String, IdealState> idealStateMap,
ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
Map<String, ResourceAssignment> baseline, RebalanceAlgorithm algorithm)
throws HelixRebalanceException {
ClusterModel clusterModel;
try {
// Note this calculation uses the baseline as the best possible assignment input here.
// This is for minimizing unnecessary partition movement.
clusterModel = ClusterModelProvider
.generateClusterModelFromExistingAssignment(clusterData, resourceMap, baseline);
} catch (Exception ex) {
throw new HelixRebalanceException(
"Failed to generate cluster model for delayed rebalance overwrite.",
HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
}
Map<String, IdealState> activeIdealStates =
convertResourceAssignment(clusterData, calculateAssignment(clusterModel, algorithm));
for (String resourceName : idealStateMap.keySet()) {
// The new calculated ideal state before overwrite
IdealState newIdealState = idealStateMap.get(resourceName);
if (!activeIdealStates.containsKey(resourceName)) {
throw new HelixRebalanceException(
"Failed to calculate the complete partition assignment with all active nodes. Cannot find the resource assignment for "
+ resourceName, HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
// The ideal state that is calculated based on the real alive/enabled instances list
IdealState newActiveIdealState = activeIdealStates.get(resourceName);
// The current ideal state that exists in the IdealState znode
IdealState currentIdealState = clusterData.getIdealState(resourceName);
Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
int minActiveReplica =
DelayedRebalanceUtil.getMinActiveReplica(currentIdealState, numReplica);
Map<String, List<String>> finalPreferenceLists = DelayedRebalanceUtil
.getFinalDelayedMapping(newActiveIdealState.getPreferenceLists(),
newIdealState.getPreferenceLists(), enabledLiveInstances,
Math.min(minActiveReplica, numReplica));
newIdealState.setPreferenceLists(finalPreferenceLists);
}
}
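// If the resource config contains user-defined preference lists, overwrite the calculated
// preference lists in the IdealState with the user-defined ones.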
private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
IdealState idealState) {
if (resourceConfig != null) {
Map<String, List<String>> userDefinedPreferenceList = resourceConfig.getPreferenceLists();
if (!userDefinedPreferenceList.isEmpty()) {
LOG.info("Using user defined preference list for partitions.");
for (String partition : userDefinedPreferenceList.keySet()) {
idealState.setPreferenceList(partition, userDefinedPreferenceList.get(partition));
}
}
}
}
protected AssignmentMetadataStore getAssignmentMetadataStore() {
return _assignmentMetadataStore;
}
protected MetricCollector getMetricCollector() {
return _metricCollector;
}
@Override
protected void finalize()
throws Throwable {
super.finalize();
close();
}
}