blob: 39a197bff50e53092675732e06b45fe697c1d36b [file] [log] [blame]
package org.apache.helix.controller.rebalancer.waged;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import com.google.common.collect.ImmutableList;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import com.google.common.collect.ImmutableMap;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.changedetector.ResourceChangeDetector;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
import org.apache.helix.controller.rebalancer.StatefulRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.rebalancer.util.DelayedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedRebalanceUtil;
import org.apache.helix.controller.rebalancer.util.WagedValidationUtil;
import org.apache.helix.controller.rebalancer.waged.constraints.ConstraintBasedAlgorithmFactory;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModelProvider;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.ResourceConfig;
import org.apache.helix.monitoring.metrics.MetricCollector;
import org.apache.helix.monitoring.metrics.WagedRebalancerMetricCollector;
import org.apache.helix.monitoring.metrics.model.CountMetric;
import org.apache.helix.monitoring.metrics.model.LatencyMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Weight-Aware Globally-Even Distribute Rebalancer.
* @see <a
* href="https://github.com/apache/helix/wiki/Weight-Aware-Globally-Even-Distribute-Rebalancer">
* Design Document
* </a>
*/
public class WagedRebalancer implements StatefulRebalancer<ResourceControllerDataProvider> {
  private static final Logger LOG = LoggerFactory.getLogger(WagedRebalancer.class);

  // Sentinel used to identify whether the preference has been configured or not.
  // While _preference equals this sentinel, updateRebalancePreference() is intentionally a no-op.
  private static final Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer>
      NOT_CONFIGURED_PREFERENCE = ImmutableMap
      .of(ClusterConfig.GlobalRebalancePreferenceKey.EVENNESS, -1,
          ClusterConfig.GlobalRebalancePreferenceKey.LESS_MOVEMENT, -1);

  // The default algorithm to use when there is no preference configured.
  private static final RebalanceAlgorithm DEFAULT_REBALANCE_ALGORITHM =
      ConstraintBasedAlgorithmFactory
          .getInstance(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);

  // These failure types should be propagated to the caller of computeNewIdealStates(). Any other
  // failure type falls back to the last known-good best possible assignment instead of throwing.
  private static final List<HelixRebalanceException.Type> FAILURE_TYPES_TO_PROPAGATE =
      ImmutableList.of(HelixRebalanceException.Type.INVALID_REBALANCER_STATUS,
          HelixRebalanceException.Type.UNKNOWN_FAILURE);

  private final HelixManager _manager;
  private final MappingCalculator<ResourceControllerDataProvider> _mappingCalculator;
  private final AssignmentMetadataStore _assignmentMetadataStore;
  private final MetricCollector _metricCollector;
  private final CountMetric _rebalanceFailureCount;
  private final LatencyMetric _writeLatency;
  private final CountMetric _emergencyRebalanceCounter;
  private final LatencyMetric _emergencyRebalanceLatency;
  private final CountMetric _rebalanceOverwriteCounter;
  private final LatencyMetric _rebalanceOverwriteLatency;
  private final AssignmentManager _assignmentManager;
  private final PartialRebalanceRunner _partialRebalanceRunner;
  private final GlobalRebalanceRunner _globalRebalanceRunner;

  // Note, the rebalance algorithm field is mutable so it should not be directly referred except for
  // the public method computeNewIdealStates.
  private RebalanceAlgorithm _rebalanceAlgorithm;
  private Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> _preference =
      NOT_CONFIGURED_PREFERENCE;

  /**
   * Construct an AssignmentMetadataStore, or return null when either the metadata store address or
   * the cluster name is missing. With a null store the rebalancer never reads or writes persisted
   * assignments and operates purely on CurrentState-based data.
   */
  private static AssignmentMetadataStore constructAssignmentStore(String metadataStoreAddrs,
      String clusterName) {
    if (metadataStoreAddrs != null && clusterName != null) {
      return new AssignmentMetadataStore(metadataStoreAddrs, clusterName);
    }
    return null;
  }

  public WagedRebalancer(HelixManager helixManager) {
    this(helixManager == null ? null
            : constructAssignmentStore(helixManager.getMetadataStoreConnectionString(),
                helixManager.getClusterName()),
        DEFAULT_REBALANCE_ALGORITHM,
        // Use DelayedAutoRebalancer as the mapping calculator for the final assignment output.
        // Mapping calculator will translate the best possible assignment into the applicable state
        // mapping based on the current states.
        // TODO abstract and separate the main mapping calculator logic from DelayedAutoRebalancer
        new DelayedAutoRebalancer(),
        // Helix Manager is required for the rebalancer scheduler
        helixManager,
        // If HelixManager is null, we just pass in a non-functioning WagedRebalancerMetricCollector
        // that will not be registered to MBean.
        // This is to handle two cases: 1. HelixManager is null for non-testing cases. In this case,
        // WagedRebalancer will not read/write to metadata store and just use CurrentState-based
        // rebalancing. 2. Tests that require instrumenting the rebalancer for verifying whether the
        // cluster has converged.
        helixManager == null ? new WagedRebalancerMetricCollector()
            : new WagedRebalancerMetricCollector(helixManager.getClusterName()),
        ClusterConfig.DEFAULT_GLOBAL_REBALANCE_ASYNC_MODE_ENABLED,
        ClusterConfig.DEFAULT_PARTIAL_REBALANCE_ASYNC_MODE_ENABLED);
    // Mark the preference as configured (not the sentinel) so updateRebalancePreference() works.
    _preference = ImmutableMap.copyOf(ClusterConfig.DEFAULT_GLOBAL_REBALANCE_PREFERENCE);
  }

  /**
   * This constructor will use null for HelixManager. With null HelixManager, the rebalancer will
   * not schedule for a future delayed rebalance.
   * Note the preference stays at the NOT_CONFIGURED sentinel, so updateRebalancePreference() is a
   * no-op on instances created through this constructor.
   * @param assignmentMetadataStore the persisted-assignment store; may be null
   * @param algorithm the rebalance algorithm to apply
   * @param metricCollectorOptional optional metric collector; a non-registering collector is used
   *          when absent
   */
  protected WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
      RebalanceAlgorithm algorithm, Optional<MetricCollector> metricCollectorOptional) {
    this(assignmentMetadataStore, algorithm, new DelayedAutoRebalancer(), null,
        // If metricCollector is not provided, instantiate a version that does not register metrics
        // in order to allow rebalancer to proceed
        metricCollectorOptional.orElse(new WagedRebalancerMetricCollector()),
        false, false);
  }

  private WagedRebalancer(AssignmentMetadataStore assignmentMetadataStore,
      RebalanceAlgorithm algorithm, MappingCalculator mappingCalculator, HelixManager manager,
      MetricCollector metricCollector, boolean isAsyncGlobalRebalanceEnabled,
      boolean isAsyncPartialRebalanceEnabled) {
    if (assignmentMetadataStore == null) {
      LOG.warn("Assignment Metadata Store is not configured properly."
          + " The rebalancer will not access the assignment store during the rebalance.");
    }
    _assignmentMetadataStore = assignmentMetadataStore;
    _rebalanceAlgorithm = algorithm;
    _mappingCalculator = mappingCalculator;
    if (manager == null) {
      LOG.warn("HelixManager is not provided. The rebalancer is not going to schedule for a future "
          + "rebalance even when delayed rebalance is enabled.");
    }
    _manager = manager;
    _metricCollector = metricCollector;
    // Cache the individual metric handles up front so the rebalance hot path avoids repeated
    // metric lookups by name.
    _rebalanceFailureCount = _metricCollector.getMetric(
        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceFailureCounter.name(),
        CountMetric.class);
    _emergencyRebalanceCounter = _metricCollector.getMetric(
        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceCounter.name(),
        CountMetric.class);
    _emergencyRebalanceLatency = _metricCollector.getMetric(
        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.EmergencyRebalanceLatencyGauge
            .name(),
        LatencyMetric.class);
    _rebalanceOverwriteCounter = _metricCollector.getMetric(
        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteCounter.name(),
        CountMetric.class);
    _rebalanceOverwriteLatency = _metricCollector.getMetric(
        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.RebalanceOverwriteLatencyGauge
            .name(),
        LatencyMetric.class);
    _writeLatency = _metricCollector.getMetric(
        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateWriteLatencyGauge.name(),
        LatencyMetric.class);
    _assignmentManager = new AssignmentManager(_metricCollector.getMetric(
        WagedRebalancerMetricCollector.WagedRebalancerMetricNames.StateReadLatencyGauge.name(),
        LatencyMetric.class));
    _partialRebalanceRunner =
        new PartialRebalanceRunner(_assignmentManager, assignmentMetadataStore, metricCollector,
            _rebalanceFailureCount, isAsyncPartialRebalanceEnabled);
    _globalRebalanceRunner =
        new GlobalRebalanceRunner(_assignmentManager, assignmentMetadataStore, metricCollector,
            _writeLatency, _rebalanceFailureCount, isAsyncGlobalRebalanceEnabled);
  }

  // Update the global rebalance mode to be asynchronous or synchronous
  public void setGlobalRebalanceAsyncMode(boolean isAsyncGlobalRebalanceEnabled) {
    _globalRebalanceRunner.setGlobalRebalanceAsyncMode(isAsyncGlobalRebalanceEnabled);
  }

  // Update the partial rebalance mode to be asynchronous or synchronous
  public void setPartialRebalanceAsyncMode(boolean isAsyncPartialRebalanceEnabled) {
    _partialRebalanceRunner.setPartialRebalanceAsyncMode(isAsyncPartialRebalanceEnabled);
  }

  // Update the rebalancer preference if the new options are different from the current preference.
  public synchronized void updateRebalancePreference(
      Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> newPreference) {
    // 1. if the preference was not configured during constructing, no need to update.
    // 2. if the preference equals to the new preference, no need to update.
    if (!_preference.equals(NOT_CONFIGURED_PREFERENCE) && !_preference.equals(newPreference)) {
      _rebalanceAlgorithm = ConstraintBasedAlgorithmFactory.getInstance(newPreference);
      // Defensive copy so later caller-side mutation cannot change the cached preference.
      _preference = ImmutableMap.copyOf(newPreference);
    }
  }

  /**
   * Reset the persisted assignment metadata (when a store is configured) and the cluster change
   * detector, so the next rebalance starts from a clean state.
   */
  @Override
  public void reset() {
    if (_assignmentMetadataStore != null) {
      _assignmentMetadataStore.reset();
    }
    _globalRebalanceRunner.resetChangeDetector();
  }

  // TODO the rebalancer should reject any other computing request after being closed.
  @Override
  public void close() {
    _partialRebalanceRunner.close();
    _globalRebalanceRunner.close();
    if (_assignmentMetadataStore != null) {
      _assignmentMetadataStore.close();
    }
    _metricCollector.unregister();
  }

  /**
   * Compute the new IdealStates for all the input resources. The IdealStates include both the new
   * partition assignment (in the preference lists) and the new replica state mapping (in the map
   * fields).
   * On a recoverable rebalance failure, falls back to the last known-good best possible assignment
   * from the metadata store; failure types in {@link #failureTypesToPropagate()} are rethrown.
   * @param clusterData the cluster data cache
   * @param resourceMap the resources to be rebalanced
   * @param currentStateOutput the current states of all the partitions
   * @return a map of resource name to its new IdealState
   * @throws HelixRebalanceException when the rebalance fails with a type that must be propagated
   */
  @Override
  public Map<String, IdealState> computeNewIdealStates(ResourceControllerDataProvider clusterData,
      Map<String, Resource> resourceMap, final CurrentStateOutput currentStateOutput)
      throws HelixRebalanceException {
    LOG.info("Start computing new ideal states for resources: {}", resourceMap.keySet().toString());
    validateInput(clusterData, resourceMap);
    Map<String, IdealState> newIdealStates;
    try {
      // Calculate the target assignment based on the current cluster status.
      newIdealStates = computeBestPossibleStates(clusterData, resourceMap, currentStateOutput,
          _rebalanceAlgorithm);
    } catch (HelixRebalanceException ex) {
      LOG.error("Failed to calculate the new assignments.", ex);
      // Record the failure in metrics.
      _rebalanceFailureCount.increment(1L);
      HelixRebalanceException.Type failureType = ex.getFailureType();
      if (failureTypesToPropagate().contains(failureType)) {
        // If the failure is unknown or because of assignment store access failure, throw the
        // rebalance exception.
        throw ex;
      } else {
        // return the previously calculated assignment.
        LOG.warn(
            "Returning the last known-good best possible assignment from metadata store due to "
                + "rebalance failure of type: {}", failureType);
        // Note that don't return an assignment based on the current state if there is no previously
        // calculated result in this fallback logic.
        Map<String, ResourceAssignment> assignmentRecord =
            _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore,
                new CurrentStateOutput(), resourceMap.keySet());
        newIdealStates = convertResourceAssignment(clusterData, assignmentRecord);
      }
    }
    // Construct the new best possible states according to the current state and target assignment.
    // Note that the new ideal state might be an intermediate state between the current state and
    // the target assignment.
    newIdealStates.values().parallelStream().forEach(idealState -> {
      String resourceName = idealState.getResourceName();
      // Adjust the states according to the current state.
      ResourceAssignment finalAssignment = _mappingCalculator
          .computeBestPossiblePartitionState(clusterData, idealState, resourceMap.get(resourceName),
              currentStateOutput);
      // Clean up the state mapping fields. Use the final assignment that is calculated by the
      // mapping calculator to replace them.
      idealState.getRecord().getMapFields().clear();
      for (Partition partition : finalAssignment.getMappedPartitions()) {
        Map<String, String> newStateMap = finalAssignment.getReplicaMap(partition);
        // if the final states cannot be generated, override the best possible state with empty map.
        idealState.setInstanceStateMap(partition.getPartitionName(),
            newStateMap == null ? Collections.emptyMap() : newStateMap);
      }
    });
    LOG.info("Finish computing new ideal states for resources: {}",
        resourceMap.keySet().toString());
    return newIdealStates;
  }

  // Coordinate global rebalance and partial rebalance according to the cluster changes.
  private Map<String, IdealState> computeBestPossibleStates(
      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
      final CurrentStateOutput currentStateOutput, RebalanceAlgorithm algorithm)
      throws HelixRebalanceException {
    Set<String> activeNodes =
        DelayedRebalanceUtil.getActiveNodes(clusterData.getAssignableInstances(),
            clusterData.getEnabledLiveInstances(),
            clusterData.getInstanceOfflineTimeMap(),
            clusterData.getAssignableLiveInstances().keySet(),
            clusterData.getAssignableInstanceConfigMap(), clusterData.getClusterConfig());
    // Schedule (or unschedule) delayed rebalance according to the delayed rebalance config.
    delayedRebalanceSchedule(clusterData, activeNodes, resourceMap.keySet());
    Map<String, ResourceAssignment> newBestPossibleAssignment =
        computeBestPossibleAssignment(clusterData, resourceMap, activeNodes, currentStateOutput,
            algorithm);
    Map<String, IdealState> newIdealStates =
        convertResourceAssignment(clusterData, newBestPossibleAssignment);
    // Replace the assignment if user-defined preference list is configured.
    // Note the user-defined list is intentionally applied to the final mapping after calculation.
    // This is to avoid persisting it into the assignment store, which impacts the long term
    // assignment evenness and partition movements.
    newIdealStates.forEach(
        (resourceName, idealState) -> applyUserDefinedPreferenceList(
            clusterData.getResourceConfig(resourceName), idealState));
    return newIdealStates;
  }

  // Coordinate global rebalance and partial rebalance according to the cluster changes.
  protected Map<String, ResourceAssignment> computeBestPossibleAssignment(
      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
      Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
      RebalanceAlgorithm algorithm)
      throws HelixRebalanceException {
    // Perform global rebalance for a new baseline assignment
    _globalRebalanceRunner.globalRebalance(clusterData, resourceMap, currentStateOutput, algorithm);
    // Perform emergency rebalance for a new best possible assignment
    return emergencyRebalance(clusterData, resourceMap, activeNodes, currentStateOutput, algorithm);
  }

  /**
   * Convert the resource assignment map into an IdealState map.
   */
  private Map<String, IdealState> convertResourceAssignment(
      ResourceControllerDataProvider clusterData, Map<String, ResourceAssignment> assignments)
      throws HelixRebalanceException {
    // Convert the assignments into IdealState for the following state mapping calculation.
    Map<String, IdealState> finalIdealStateMap = new HashMap<>();
    for (String resourceName : assignments.keySet()) {
      try {
        IdealState currentIdealState = clusterData.getIdealState(resourceName);
        Map<String, Integer> statePriorityMap =
            clusterData.getStateModelDef(currentIdealState.getStateModelDefRef())
                .getStatePriorityMap();
        // Create a new IdealState instance which contains the new calculated assignment in the
        // preference list.
        IdealState newIdealState = new IdealState(resourceName);
        // Copy the simple fields
        newIdealState.getRecord().setSimpleFields(currentIdealState.getRecord().getSimpleFields());
        // Sort the preference list according to state priority.
        newIdealState.setPreferenceLists(
            getPreferenceLists(assignments.get(resourceName), statePriorityMap));
        // Note the state mapping in the new assignment won't directly propagate to the map fields.
        // The rebalancer will calculate for the final state mapping considering the current states.
        finalIdealStateMap.put(resourceName, newIdealState);
      } catch (Exception ex) {
        throw new HelixRebalanceException(
            "Failed to calculate the new IdealState for resource: " + resourceName,
            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
      }
    }
    return finalIdealStateMap;
  }

  // Overridable by subclasses that need to propagate a different set of failure types.
  protected List<HelixRebalanceException.Type> failureTypesToPropagate() {
    return FAILURE_TYPES_TO_PROPAGATE;
  }

  /**
   * Some partition may fail to meet minActiveReplica due to delayed rebalance, because some instances are offline yet
   * active. In this case, additional replicas have to be brought up -- until either the instance gets back, or timeout,
   * at which we have a more permanent resolution.
   * The term "overwrite" is inherited from historical approach, however, it's no longer technically an overwrite.
   * It's a formal rebalance process that goes through the algorithm and all constraints.
   * @param clusterData Cluster data cache
   * @param resourceMap The map of resource to calculate
   * @param activeNodes All active nodes (live nodes plus offline-yet-active nodes) while considering cluster's
   *                    delayed rebalance config
   * @param currentResourceAssignment The current resource assignment or the best possible assignment computed from last
   *                                  emergency rebalance.
   * @param algorithm The rebalance algorithm
   * @return The resource assignment with delayed rebalance minActiveReplica
   */
  private Map<String, ResourceAssignment> handleDelayedRebalanceMinActiveReplica(
      ResourceControllerDataProvider clusterData,
      Map<String, Resource> resourceMap,
      Set<String> activeNodes,
      Map<String, ResourceAssignment> currentResourceAssignment,
      RebalanceAlgorithm algorithm) throws HelixRebalanceException {
    // the "real" live nodes at the time
    final Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
    if (activeNodes.equals(enabledLiveInstances)
        || !requireRebalanceOverwrite(clusterData, currentResourceAssignment)) {
      // no need for additional process, return the current resource assignment
      return currentResourceAssignment;
    }
    _rebalanceOverwriteCounter.increment(1L);
    _rebalanceOverwriteLatency.startMeasuringLatency();
    LOG.info("Start delayed rebalance overwrites in emergency rebalance.");
    try {
      // use the "real" live and enabled instances for calculation
      ClusterModel clusterModel = ClusterModelProvider.generateClusterModelForDelayedRebalanceOverwrites(
          clusterData, resourceMap, enabledLiveInstances, currentResourceAssignment);
      Map<String, ResourceAssignment> assignment =
          WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
      // keep only the resource entries requiring changes for minActiveReplica
      assignment.keySet().retainAll(clusterModel.getAssignableReplicaMap().keySet());
      DelayedRebalanceUtil.mergeAssignments(assignment, currentResourceAssignment);
      return currentResourceAssignment;
    } catch (HelixRebalanceException e) {
      // Pass the exception to the logger so the stack trace is not lost before rethrowing.
      LOG.error("Failed to compute for delayed rebalance overwrites in cluster {}",
          clusterData.getClusterName(), e);
      throw e;
    } catch (Exception e) {
      LOG.error("Failed to compute for delayed rebalance overwrites in cluster {}",
          clusterData.getClusterName(), e);
      // Use the cluster name (not the whole ClusterConfig) in the exception message.
      throw new HelixRebalanceException("Failed to compute for delayed rebalance overwrites in cluster "
          + clusterData.getClusterName(), HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, e);
    } finally {
      _rebalanceOverwriteLatency.endMeasuringLatency();
    }
  }

  /**
   * Emergency rebalance is scheduled to quickly handle urgent cases like reassigning partitions from inactive nodes
   * and addressing for partitions failing to meet minActiveReplicas.
   * The scope of the computation here should be limited to handling urgency only and shouldn't be blocking.
   * @param clusterData Cluster data cache
   * @param resourceMap resource map
   * @param activeNodes All active nodes (live nodes plus offline-yet-active nodes) while considering cluster's
   *                    delayed rebalance config
   * @param currentStateOutput Current state output from pipeline
   * @param algorithm The rebalance algorithm
   * @return The new resource assignment
   */
  protected Map<String, ResourceAssignment> emergencyRebalance(
      ResourceControllerDataProvider clusterData, Map<String, Resource> resourceMap,
      Set<String> activeNodes, final CurrentStateOutput currentStateOutput,
      RebalanceAlgorithm algorithm)
      throws HelixRebalanceException {
    LOG.info("Start emergency rebalance.");
    _emergencyRebalanceCounter.increment(1L);
    _emergencyRebalanceLatency.startMeasuringLatency();
    Map<String, ResourceAssignment> currentBestPossibleAssignment =
        _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore, currentStateOutput,
            resourceMap.keySet());
    // Step 1: Check for permanent node down, i.e. any assigned replica placed on a non-active node.
    AtomicBoolean allNodesActive = new AtomicBoolean(true);
    currentBestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
      resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
        for (String instance : resourceAssignment.getReplicaMap(partition).keySet()) {
          if (!activeNodes.contains(instance)) {
            allNodesActive.set(false);
            break;
          }
        }
      });
    }));
    // Step 2: if there are permanent node downs, calculate for a new one best possible
    Map<String, ResourceAssignment> newAssignment;
    if (!allNodesActive.get()) {
      LOG.info("Emergency rebalance responding to permanent node down.");
      ClusterModel clusterModel;
      try {
        clusterModel =
            ClusterModelProvider.generateClusterModelForEmergencyRebalance(clusterData, resourceMap,
                activeNodes, currentBestPossibleAssignment);
      } catch (Exception ex) {
        throw new HelixRebalanceException("Failed to generate cluster model for emergency rebalance.",
            HelixRebalanceException.Type.INVALID_CLUSTER_STATUS, ex);
      }
      newAssignment = WagedRebalanceUtil.calculateAssignment(clusterModel, algorithm);
    } else {
      newAssignment = currentBestPossibleAssignment;
    }
    // Step 3: persist result to metadata store
    persistBestPossibleAssignment(newAssignment);
    // Step 4: handle delayed rebalance minActiveReplica
    // Note this result is one step branching from the main calculation and SHOULD NOT be persisted -- it is temporary,
    // and only apply during the delayed window of those offline yet active nodes, a definitive resolution will happen
    // once the node comes back of remain offline after the delayed window.
    Map<String, ResourceAssignment> assignmentWithDelayedRebalanceAdjust = newAssignment;
    if (_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {
      assignmentWithDelayedRebalanceAdjust =
          handleDelayedRebalanceMinActiveReplica(clusterData, resourceMap, activeNodes,
              newAssignment, algorithm);
    }
    _emergencyRebalanceLatency.endMeasuringLatency();
    LOG.info("Finish emergency rebalance");
    _partialRebalanceRunner.partialRebalance(clusterData, resourceMap, activeNodes,
        currentStateOutput, algorithm);
    if (!_partialRebalanceRunner.isAsyncPartialRebalanceEnabled()) {
      // With synchronous partial rebalance, re-read the best possible assignment it produced and
      // persist it before applying the temporary delayed rebalance adjustment.
      newAssignment = _assignmentManager.getBestPossibleAssignment(_assignmentMetadataStore,
          currentStateOutput, resourceMap.keySet());
      persistBestPossibleAssignment(newAssignment);
      // delayed rebalance handling result is temporary, shouldn't be persisted
      assignmentWithDelayedRebalanceAdjust =
          handleDelayedRebalanceMinActiveReplica(clusterData, resourceMap, activeNodes,
              newAssignment, algorithm);
    }
    return assignmentWithDelayedRebalanceAdjust;
  }

  // Generate the preference lists from the state mapping based on state priority.
  private Map<String, List<String>> getPreferenceLists(ResourceAssignment newAssignment,
      Map<String, Integer> statePriorityMap) {
    Map<String, List<String>> preferenceList = new HashMap<>();
    for (Partition partition : newAssignment.getMappedPartitions()) {
      List<String> nodes = new ArrayList<>(newAssignment.getReplicaMap(partition).keySet());
      // To ensure backward compatibility, sort the preference list according to state priority.
      nodes.sort((node1, node2) -> {
        int statePriority1 =
            statePriorityMap.get(newAssignment.getReplicaMap(partition).get(node1));
        int statePriority2 =
            statePriorityMap.get(newAssignment.getReplicaMap(partition).get(node2));
        if (statePriority1 == statePriority2) {
          // Tie-break by node name for a deterministic ordering.
          return node1.compareTo(node2);
        } else {
          return statePriority1 - statePriority2;
        }
      });
      preferenceList.put(partition.getPartitionName(), nodes);
    }
    return preferenceList;
  }

  /**
   * Reject the request when any input resource is not configured for the WAGED rebalancer.
   * @throws HelixRebalanceException of type INVALID_INPUT when an incompatible resource is found
   */
  private void validateInput(ResourceControllerDataProvider clusterData,
      Map<String, Resource> resourceMap) throws HelixRebalanceException {
    Set<String> nonCompatibleResources = resourceMap.keySet().stream()
        .filter(resource -> !WagedValidationUtil.isWagedEnabled(clusterData.getIdealState(resource)))
        .collect(Collectors.toSet());
    if (!nonCompatibleResources.isEmpty()) {
      throw new HelixRebalanceException(String.format(
          "Input contains invalid resource(s) that cannot be rebalanced by the WAGED rebalancer. %s",
          nonCompatibleResources.toString()), HelixRebalanceException.Type.INVALID_INPUT);
    }
  }

  /**
   * @param assignmentMetadataStore the store holding the persisted best possible assignment
   * @param currentStateOutput the current states used as a fallback source
   * @param resources the resource names to look up
   * @return The current best possible assignment. If record does not exist in the
   *         assignmentMetadataStore, return the current state assignment.
   * @throws HelixRebalanceException when the assignment cannot be read
   */
  protected Map<String, ResourceAssignment> getBestPossibleAssignment(
      AssignmentMetadataStore assignmentMetadataStore, CurrentStateOutput currentStateOutput,
      Set<String> resources) throws HelixRebalanceException {
    return _assignmentManager.getBestPossibleAssignment(assignmentMetadataStore, currentStateOutput,
        resources);
  }

  private void persistBestPossibleAssignment(Map<String, ResourceAssignment> bestPossibleAssignment)
      throws HelixRebalanceException {
    // It is only persisted if the assignment is different from the currently cached assignment or the
    // matching version of this assignment is not yet persisted. Partial Rebalance will not directly persist
    // the assignment to the metadata store, it will only be cached. Instead, it will be persisted by the
    // main thread on the next pipeline run, hence the check isBestPossibleChanged will be false.
    if (_assignmentMetadataStore != null && (
        _assignmentMetadataStore.isBestPossibleChanged(bestPossibleAssignment)
            || !_assignmentMetadataStore.hasPersistedLatestBestPossibleAssignment())) {
      try {
        _writeLatency.startMeasuringLatency();
        _assignmentMetadataStore.persistBestPossibleAssignment(bestPossibleAssignment);
        _writeLatency.endMeasuringLatency();
      } catch (Exception ex) {
        throw new HelixRebalanceException("Failed to persist the new best possible assignment.",
            HelixRebalanceException.Type.INVALID_REBALANCER_STATUS, ex);
      }
    } else {
      // This branch covers both a missing store and an unchanged, already-persisted assignment.
      LOG.debug("Skip persisting the best possible assignment: the Assignment Metadata Store is "
          + "not configured, or the assignment is unchanged and already persisted.");
    }
  }

  /**
   * Schedule rebalance according to the delayed rebalance logic.
   * @param clusterData the current cluster data cache
   * @param delayedActiveNodes the active nodes set that is calculated with the delay time window
   * @param resourceSet the rebalanced resourceSet
   */
  private void delayedRebalanceSchedule(ResourceControllerDataProvider clusterData,
      Set<String> delayedActiveNodes, Set<String> resourceSet) {
    if (_manager != null) {
      // Schedule for the next delayed rebalance in case no cluster change event happens.
      ClusterConfig clusterConfig = clusterData.getClusterConfig();
      boolean delayedRebalanceEnabled = DelayedRebalanceUtil.isDelayRebalanceEnabled(clusterConfig);
      // Offline-yet-active nodes = delayed active nodes minus the currently enabled live ones.
      Set<String> offlineOrDisabledInstances = new HashSet<>(delayedActiveNodes);
      offlineOrDisabledInstances.removeAll(clusterData.getEnabledLiveInstances());
      for (String resource : resourceSet) {
        DelayedRebalanceUtil
            .setRebalanceScheduler(resource, delayedRebalanceEnabled, offlineOrDisabledInstances,
                clusterData.getInstanceOfflineTimeMap(),
                clusterData.getAssignableLiveInstances().keySet(),
                clusterData.getAssignableInstanceConfigMap(), clusterConfig.getRebalanceDelayTime(),
                clusterConfig, _manager);
      }
    } else {
      LOG.warn("Skip scheduling a delayed rebalancer since HelixManager is not specified.");
    }
  }

  /**
   * Check whether any partition in the assignment fails to place at least
   * min(minActiveReplica, numReplica) replicas on enabled live instances, in which case delayed
   * rebalance overwrites are required.
   */
  protected boolean requireRebalanceOverwrite(ResourceControllerDataProvider clusterData,
      Map<String, ResourceAssignment> bestPossibleAssignment) {
    AtomicBoolean allMinActiveReplicaMet = new AtomicBoolean(true);
    bestPossibleAssignment.values().parallelStream().forEach((resourceAssignment -> {
      String resourceName = resourceAssignment.getResourceName();
      IdealState currentIdealState = clusterData.getIdealState(resourceName);
      Set<String> enabledLiveInstances = clusterData.getEnabledLiveInstances();
      int numReplica = currentIdealState.getReplicaCount(enabledLiveInstances.size());
      int minActiveReplica = DelayedRebalanceUtil.getMinActiveReplica(ResourceConfig
          .mergeIdealStateWithResourceConfig(clusterData.getResourceConfig(resourceName),
              currentIdealState), currentIdealState, numReplica);
      resourceAssignment.getMappedPartitions().parallelStream().forEach(partition -> {
        // Count how many replicas of this partition are placed on enabled live instances.
        int enabledLivePlacementCounter = 0;
        for (String instance : resourceAssignment.getReplicaMap(partition).keySet()) {
          if (enabledLiveInstances.contains(instance)) {
            enabledLivePlacementCounter++;
          }
        }
        if (enabledLivePlacementCounter < Math.min(minActiveReplica, numReplica)) {
          allMinActiveReplicaMet.set(false);
        }
      });
    }));
    return !allMinActiveReplicaMet.get();
  }

  // Overwrite the calculated preference lists with any user-defined lists in the resource config.
  private void applyUserDefinedPreferenceList(ResourceConfig resourceConfig,
      IdealState idealState) {
    if (resourceConfig != null) {
      Map<String, List<String>> userDefinedPreferenceList = resourceConfig.getPreferenceLists();
      if (!userDefinedPreferenceList.isEmpty()) {
        LOG.info("Using user defined preference list for partitions.");
        for (String partition : userDefinedPreferenceList.keySet()) {
          idealState.setPreferenceList(partition, userDefinedPreferenceList.get(partition));
        }
      }
    }
  }

  protected AssignmentMetadataStore getAssignmentMetadataStore() {
    return _assignmentMetadataStore;
  }

  protected MetricCollector getMetricCollector() {
    return _metricCollector;
  }

  protected ResourceChangeDetector getChangeDetector() {
    return _globalRebalanceRunner.getChangeDetector();
  }

  // Safety net to release resources if the rebalancer is garbage collected without close().
  // NOTE(review): Object.finalize() is deprecated; kept for interface compatibility, but callers
  // should invoke close() explicitly.
  @Override
  protected void finalize()
      throws Throwable {
    super.finalize();
    close();
  }
}