| package org.apache.helix.controller.rebalancer.strategy; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Random; |
| |
| import org.apache.helix.HelixException; |
| import org.apache.helix.zookeeper.datamodel.ZNRecord; |
| import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceHardConstraint; |
| import org.apache.helix.api.rebalancer.constraint.AbstractRebalanceSoftConstraint; |
| import org.apache.helix.api.rebalancer.constraint.dataprovider.CapacityProvider; |
| import org.apache.helix.api.rebalancer.constraint.dataprovider.PartitionWeightProvider; |
| import org.apache.helix.controller.LogUtil; |
| import org.apache.helix.controller.common.ResourcesStateMap; |
| import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; |
| import org.apache.helix.controller.rebalancer.constraint.PartitionWeightAwareEvennessConstraint; |
| import org.apache.helix.controller.rebalancer.strategy.crushMapping.CardDealingAdjustmentAlgorithmV2; |
| import org.apache.helix.controller.rebalancer.topology.Topology; |
| import org.apache.helix.model.InstanceConfig; |
| import org.apache.helix.model.Partition; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Constraints based rebalance strategy. |
| * Assignment is calculated according to the specified constraints. |
| */ |
| public class ConstraintRebalanceStrategy extends AbstractEvenDistributionRebalanceStrategy { |
| private static final Logger _logger = LoggerFactory.getLogger(ConstraintRebalanceStrategy.class); |
| // For the instances that are restricted by soft constraints, the minimum weight for assigning partitions. |
| private static final int MIN_INSTANCE_WEIGHT = 1; |
| |
| // CRUSH ensures deterministic and evenness |
| private final RebalanceStrategy<ResourceControllerDataProvider> _baseStrategy = |
| new CrushRebalanceStrategy(); |
| |
| protected RebalanceStrategy<ResourceControllerDataProvider> getBaseRebalanceStrategy() { |
| return _baseStrategy; |
| } |
| |
| private final List<AbstractRebalanceHardConstraint> _hardConstraints = new ArrayList<>(); |
| private final List<AbstractRebalanceSoftConstraint> _softConstraints = new ArrayList<>(); |
| |
| private List<String> _partitions; |
| private int _maxPerNode; |
| |
| // resource replica state requirement |
| private LinkedHashMap<String, Integer> _states; |
| // extend state requirement to a ordered list |
| private List<String> _orderedStateList; |
| |
| public ConstraintRebalanceStrategy( |
| List<? extends AbstractRebalanceHardConstraint> hardConstraints, |
| List<? extends AbstractRebalanceSoftConstraint> softConstraints) { |
| if (hardConstraints != null) { |
| _hardConstraints.addAll(hardConstraints); |
| } |
| if (softConstraints != null) { |
| _softConstraints.addAll(softConstraints); |
| } |
| if (_hardConstraints.isEmpty() && _softConstraints.isEmpty()) { |
| throw new HelixException( |
| "Failed to construct ConstraintRebalanceStrategy since no constraint is provided."); |
| } |
| } |
| |
| /** |
| * This strategy is currently for rebalance tool only. |
| * For the constructor defined for AutoRebalancer, use a simplified default constraint to ensure balance. |
| * Note this strategy will flip-flop almost for sure if directly used in the existing rebalancer. |
| * TODO Enable different constraints for automatic rebalance process in the controller later. |
| */ |
| public ConstraintRebalanceStrategy() { |
| _logger.debug("Init constraint rebalance strategy using the default even constraint."); |
| PartitionWeightAwareEvennessConstraint defaultConstraint = |
| new PartitionWeightAwareEvennessConstraint(new PartitionWeightProvider() { |
| @Override |
| public int getPartitionWeight(String resource, String partition) { |
| return 1; |
| } |
| }, new CapacityProvider() { |
| @Override |
| public int getParticipantCapacity(String participant) { |
| return MIN_INSTANCE_WEIGHT; |
| } |
| |
| @Override |
| public int getParticipantUsage(String participant) { |
| return 0; |
| } |
| }); |
| _softConstraints.add(defaultConstraint); |
| } |
| |
| protected CardDealingAdjustmentAlgorithmV2 getCardDealingAlgorithm(Topology topology) { |
| // For constraint based strategy, need more fine-grained assignment for each partition. |
| // So evenness is more important. |
| return new CardDealingAdjustmentAlgorithmV2(topology, _replica, |
| CardDealingAdjustmentAlgorithmV2.Mode.EVENNESS); |
| } |
| |
| @Override |
| public void init(String resourceName, final List<String> partitions, |
| final LinkedHashMap<String, Integer> states, int maximumPerNode) { |
| _resourceName = resourceName; |
| _partitions = new ArrayList<>(partitions); |
| _maxPerNode = maximumPerNode; |
| _states = states; |
| _orderedStateList = new ArrayList<>(); |
| for (String state : states.keySet()) { |
| for (int i = 0; i < states.get(state); i++) { |
| _orderedStateList.add(state); |
| } |
| } |
| } |
| |
| /** |
| * Generate assignment based on the constraints. |
| * |
| * @param allNodes All instances |
| * @param liveNodes List of live instances |
| * @param currentMapping current replica mapping. Will directly use this mapping if it meets state model requirement |
| * @param clusterData cluster data |
| * @return IdeaState node that contains both preference list and a proposed state mapping. |
| * @throws HelixException |
| */ |
| @Override |
| public ZNRecord computePartitionAssignment(final List<String> allNodes, |
| final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping, |
| ResourceControllerDataProvider clusterData) throws HelixException { |
| // Since instance weight will be replaced by constraint evaluation, record it in advance to avoid |
| // overwriting. |
| Map<String, Integer> instanceWeightRecords = new HashMap<>(); |
| for (InstanceConfig instanceConfig : clusterData.getInstanceConfigMap().values()) { |
| if (instanceConfig.getWeight() != InstanceConfig.WEIGHT_NOT_SET) { |
| instanceWeightRecords.put(instanceConfig.getInstanceName(), instanceConfig.getWeight()); |
| } |
| } |
| |
| List<String> candidates = new ArrayList<>(allNodes); |
| // Only calculate for configured nodes. |
| // Remove all non-configured nodes. |
| candidates.retainAll(clusterData.getAllInstances()); |
| |
| // For generating the IdealState ZNRecord |
| Map<String, List<String>> preferenceList = new HashMap<>(); |
| Map<String, Map<String, String>> idealStateMap = new HashMap<>(); |
| |
| for (String partition : _partitions) { |
| if (currentMapping.containsKey(partition)) { |
| Map<String, String> partitionMapping = currentMapping.get(partition); |
| // check for the preferred assignment |
| partitionMapping = validateStateMap(partitionMapping); |
| if (partitionMapping != null) { |
| LogUtil.logDebug(_logger, clusterData.getClusterEventId(), |
| "The provided preferred partition assignment meets state model requirements. Skip rebalance."); |
| preferenceList.put(partition, new ArrayList<>(partitionMapping.keySet())); |
| idealStateMap.put(partition, partitionMapping); |
| updateConstraints(partition, partitionMapping, clusterData.getClusterEventId()); |
| continue; |
| } |
| } // else, recalculate the assignment |
| List<String> assignment = |
| computeSinglePartitionAssignment(partition, candidates, liveNodes, clusterData); |
| |
| // Shuffle the list. |
| // Note that single partition assignment won't have enough inputs to ensure state evenness. |
| // So need to shuffle again, here |
| Collections.shuffle(assignment, |
| new Random(String.format("%s.%s", _resourceName, partition).hashCode())); |
| |
| // Calculate for replica states |
| Map<String, String> stateMap = new HashMap<>(); |
| for (int i = 0; i < assignment.size(); i++) { |
| stateMap.put(assignment.get(i), _orderedStateList.get(i)); |
| } |
| |
| // Update idea states & preference list |
| idealStateMap.put(partition, stateMap); |
| preferenceList.put(partition, assignment); |
| // Note, only update with the new pending assignment |
| updateConstraints(partition, stateMap, clusterData.getClusterEventId()); |
| } |
| |
| // recover the original weight |
| for (String instanceName : instanceWeightRecords.keySet()) { |
| clusterData.getInstanceConfigMap().get(instanceName) |
| .setWeight(instanceWeightRecords.get(instanceName)); |
| } |
| |
| ZNRecord result = new ZNRecord(_resourceName); |
| result.setListFields(preferenceList); |
| result.setMapFields(idealStateMap); |
| return result; |
| } |
| |
| /** |
| * @param actualMapping |
| * @return a filtered state mapping that fit state model definition. |
| * Or null if the input mapping is conflict with state model. |
| */ |
| private Map<String, String> validateStateMap(Map<String, String> actualMapping) { |
| Map<String, String> filteredStateMapping = new HashMap<>(); |
| |
| Map<String, Integer> tmpStates = new HashMap<>(_states); |
| for (String partition : actualMapping.keySet()) { |
| String state = actualMapping.get(partition); |
| if (tmpStates.containsKey(state)) { |
| int count = tmpStates.get(state); |
| if (count > 0) { |
| filteredStateMapping.put(partition, state); |
| tmpStates.put(state, count - 1); |
| } |
| } |
| } |
| |
| for (String state : tmpStates.keySet()) { |
| if (tmpStates.get(state) > 0) { |
| return null; |
| } |
| } |
| return filteredStateMapping; |
| } |
| |
| /** |
| * Calculate for a fine-grained assignment for all replicas of a single partition. |
| * |
| * @param partitionName |
| * @param allNodes |
| * @param liveNodes |
| * @param clusterData |
| * @return |
| */ |
| private List<String> computeSinglePartitionAssignment(String partitionName, |
| final List<String> allNodes, final List<String> liveNodes, |
| ResourceControllerDataProvider clusterData) { |
| List<String> qualifiedNodes = new ArrayList<>(allNodes); |
| |
| // do hard constraints check and find all qualified instances |
| for (AbstractRebalanceHardConstraint hardConstraint : _hardConstraints) { |
| Map<String, String[]> proposedAssignment = Collections |
| .singletonMap(partitionName, qualifiedNodes.toArray(new String[qualifiedNodes.size()])); |
| boolean[] validateResults = |
| hardConstraint.isValid(_resourceName, proposedAssignment).get(partitionName); |
| for (int i = 0; i < validateResults.length; i++) { |
| if (!validateResults[i]) { |
| qualifiedNodes.remove(proposedAssignment.get(partitionName)[i]); |
| } |
| } |
| } |
| |
| int[] instancePriority = new int[qualifiedNodes.size()]; |
| Map<String, String[]> proposedAssignment = Collections |
| .singletonMap(partitionName, qualifiedNodes.toArray(new String[qualifiedNodes.size()])); |
| for (AbstractRebalanceSoftConstraint softConstraint : _softConstraints) { |
| if (softConstraint.getConstraintWeight() == 0) { |
| continue; |
| } |
| |
| int[] evaluateResults = |
| softConstraint.evaluate(_resourceName, proposedAssignment).get(partitionName); |
| for (int i = 0; i < evaluateResults.length; i++) { |
| // accumulate all evaluate results |
| instancePriority[i] += evaluateResults[i] * softConstraint.getConstraintWeight(); |
| } |
| } |
| |
| // Since the evaluated result can be a negative number, get the min result as the baseline for normalizing all priorities to set weight. |
| int baseline = Integer.MAX_VALUE; |
| for (int priority : instancePriority) { |
| if (baseline > priority) { |
| baseline = priority; |
| } |
| } |
| // Limit the weight to be at least MIN_INSTANCE_WEIGHT |
| for (int i = 0; i < instancePriority.length; i++) { |
| clusterData.getInstanceConfigMap().get(qualifiedNodes.get(i)) |
| .setWeight(instancePriority[i] - baseline + MIN_INSTANCE_WEIGHT); |
| } |
| |
| // Trigger rebalance only for a single partition. |
| // Note that if we do it for the whole resource, |
| // the result won't be accurate since the pending assignment won't be updated to constraints. |
| super.init(_resourceName, Collections.singletonList(partitionName), _states, _maxPerNode); |
| ZNRecord partitionAssignment = super |
| .computePartitionAssignment(qualifiedNodes, liveNodes, Collections.EMPTY_MAP, clusterData); |
| |
| return partitionAssignment.getListFields().get(partitionName); |
| } |
| |
| private void updateConstraints(String partition, Map<String, String> pendingAssignment, |
| String eventId) { |
| if (pendingAssignment.isEmpty()) { |
| LogUtil.logWarn(_logger, eventId, |
| "No pending assignment needs to update. Skip constraint update."); |
| return; |
| } |
| |
| ResourcesStateMap tempStateMap = new ResourcesStateMap(); |
| tempStateMap.setState(_resourceName, new Partition(partition), pendingAssignment); |
| LogUtil.logDebug(_logger, eventId, |
| "Update constraints with pending assignment: " + tempStateMap.toString()); |
| |
| for (AbstractRebalanceHardConstraint hardConstraint : _hardConstraints) { |
| hardConstraint.updateAssignment(tempStateMap); |
| } |
| for (AbstractRebalanceSoftConstraint softConstraint : _softConstraints) { |
| softConstraint.updateAssignment(tempStateMap); |
| } |
| } |
| } |