| package org.apache.helix.controller.rebalancer.waged.constraints; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.stream.Collectors; |
| |
| import com.google.common.collect.Maps; |
| import org.apache.helix.HelixRebalanceException; |
| import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm; |
| import org.apache.helix.controller.rebalancer.waged.model.AssignableNode; |
| import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica; |
| import org.apache.helix.controller.rebalancer.waged.model.ClusterContext; |
| import org.apache.helix.controller.rebalancer.waged.model.ClusterModel; |
| import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment; |
| import org.apache.helix.model.ResourceAssignment; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * The algorithm is based on a given set of constraints |
| * - HardConstraint: Approve or deny the assignment given its condition, any assignment cannot |
| * bypass any "hard constraint" |
| * - SoftConstraint: Evaluate the assignment by points/rewards/scores, a higher point means a better |
| * assignment |
| * The goal is to accumulate the most points(rewards) from "soft constraints" while avoiding any |
| * "hard constraints" |
| */ |
| class ConstraintBasedAlgorithm implements RebalanceAlgorithm { |
| private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class); |
| private final List<HardConstraint> _hardConstraints; |
| private final Map<SoftConstraint, Float> _softConstraints; |
| |
| ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints, |
| Map<SoftConstraint, Float> softConstraints) { |
| _hardConstraints = hardConstraints; |
| _softConstraints = softConstraints; |
| } |
| |
| @Override |
| public OptimalAssignment calculate(ClusterModel clusterModel) |
| throws HelixRebalanceException { |
| OptimalAssignment optimalAssignment = new OptimalAssignment(); |
| List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values()); |
| Set<String> busyInstances = |
| getBusyInstances(clusterModel.getContext().getBestPossibleAssignment().values()); |
| // Sort the replicas so the input is stable for the greedy algorithm. |
| // For the other algorithm implementation, this sorting could be unnecessary. |
| for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) { |
| Optional<AssignableNode> maybeBestNode = |
| getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), busyInstances, |
| optimalAssignment); |
| // stop immediately if any replica cannot find best assignable node |
| if (optimalAssignment.hasAnyFailure()) { |
| String errorMessage = String |
| .format("Unable to find any available candidate node for partition %s; Fail reasons: %s", |
| replica.getPartitionName(), optimalAssignment.getFailures()); |
| throw new HelixRebalanceException(errorMessage, |
| HelixRebalanceException.Type.FAILED_TO_CALCULATE); |
| } |
| maybeBestNode.ifPresent(node -> clusterModel |
| .assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(), |
| node.getInstanceName())); |
| } |
| optimalAssignment.updateAssignments(clusterModel); |
| return optimalAssignment; |
| } |
| |
| private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica replica, |
| List<AssignableNode> assignableNodes, ClusterContext clusterContext, |
| Set<String> busyInstances, OptimalAssignment optimalAssignment) { |
| Map<AssignableNode, List<HardConstraint>> hardConstraintFailures = new ConcurrentHashMap<>(); |
| List<AssignableNode> candidateNodes = assignableNodes.parallelStream().filter(candidateNode -> { |
| boolean isValid = true; |
| // need to record all the failure reasons and it gives us the ability to debug/fix the runtime |
| // cluster environment |
| for (HardConstraint hardConstraint : _hardConstraints) { |
| if (!hardConstraint.isAssignmentValid(candidateNode, replica, clusterContext)) { |
| hardConstraintFailures.computeIfAbsent(candidateNode, node -> new ArrayList<>()) |
| .add(hardConstraint); |
| isValid = false; |
| } |
| } |
| return isValid; |
| }).collect(Collectors.toList()); |
| |
| if (candidateNodes.isEmpty()) { |
| optimalAssignment.recordAssignmentFailure(replica, |
| Maps.transformValues(hardConstraintFailures, this::convertFailureReasons)); |
| return Optional.empty(); |
| } |
| |
| return candidateNodes.parallelStream().map(node -> new HashMap.SimpleEntry<>(node, |
| getAssignmentNormalizedScore(node, replica, clusterContext))) |
| .max((nodeEntry1, nodeEntry2) -> { |
| int scoreCompareResult = nodeEntry1.getValue().compareTo(nodeEntry2.getValue()); |
| if (scoreCompareResult == 0) { |
| // If the evaluation scores of 2 nodes are the same, the algorithm assigns the replica |
| // to the idle node first. |
| int idleScore1 = busyInstances.contains(nodeEntry1.getKey().getInstanceName()) ? 0 : 1; |
| int idleScore2 = busyInstances.contains(nodeEntry2.getKey().getInstanceName()) ? 0 : 1; |
| return idleScore1 - idleScore2; |
| } else { |
| return scoreCompareResult; |
| } |
| }).map(Map.Entry::getKey); |
| } |
| |
| private double getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica, |
| ClusterContext clusterContext) { |
| double sum = 0; |
| for (Map.Entry<SoftConstraint, Float> softConstraintEntry : _softConstraints.entrySet()) { |
| SoftConstraint softConstraint = softConstraintEntry.getKey(); |
| float weight = softConstraintEntry.getValue(); |
| if (weight != 0) { |
| // Skip calculating zero weighted constraints. |
| sum += weight * softConstraint.getAssignmentNormalizedScore(node, replica, clusterContext); |
| } |
| } |
| return sum; |
| } |
| |
| private List<String> convertFailureReasons(List<HardConstraint> hardConstraints) { |
| return hardConstraints.stream().map(HardConstraint::getDescription) |
| .collect(Collectors.toList()); |
| } |
| |
| private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) { |
| Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap(); |
| List<AssignableReplica> orderedAssignableReplicas = |
| replicasByResource.values().stream().flatMap(replicas -> replicas.stream()) |
| .collect(Collectors.toList()); |
| |
| Map<String, ResourceAssignment> bestPossibleAssignment = |
| clusterModel.getContext().getBestPossibleAssignment(); |
| Map<String, ResourceAssignment> baselineAssignment = |
| clusterModel.getContext().getBaselineAssignment(); |
| |
| Map<String, Integer> replicaHashCodeMap = orderedAssignableReplicas.parallelStream().collect( |
| Collectors.toMap(AssignableReplica::toString, |
| replica -> Objects.hash(replica.toString(), clusterModel.getAssignableNodes().keySet()), |
| (hash1, hash2) -> hash2)); |
| |
| // 1. Sort according if the assignment exists in the best possible and/or baseline assignment |
| // 2. Sort according to the state priority. Note that prioritizing the top state is required. |
| // Or the greedy algorithm will unnecessarily shuffle the states between replicas. |
| // 3. Sort according to the resource/partition name. |
| orderedAssignableReplicas.sort((replica1, replica2) -> { |
| String resourceName1 = replica1.getResourceName(); |
| String resourceName2 = replica2.getResourceName(); |
| if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment |
| .containsKey(resourceName2)) { |
| if (baselineAssignment.containsKey(resourceName1) == baselineAssignment |
| .containsKey(resourceName2)) { |
| // If both assignment states have/not have the resource assignment the same, |
| // compare for additional dimensions. |
| int statePriority1 = replica1.getStatePriority(); |
| int statePriority2 = replica2.getStatePriority(); |
| if (statePriority1 == statePriority2) { |
| // If state priorities are the same, try to randomize the replicas order. Otherwise, |
| // the same replicas might always be moved in each rebalancing. This is because their |
| // placement calculating will always happen at the critical moment while the cluster is |
| // almost close to the expected utilization. |
| // |
| // Note that to ensure the algorithm is deterministic with the same inputs, do not use |
| // Random functions here. Use hashcode based on the cluster topology information to get |
| // a controlled randomized order is good enough. |
| Integer replicaHash1 = replicaHashCodeMap.get(replica1.toString()); |
| Integer replicaHash2 = replicaHashCodeMap.get(replica2.toString()); |
| if (!replicaHash1.equals(replicaHash2)) { |
| return replicaHash1.compareTo(replicaHash2); |
| } else { |
| // In case of hash collision, return order according to the name. |
| return replica1.toString().compareTo(replica2.toString()); |
| } |
| } else { |
| // Note we shall prioritize the replica with a higher state priority, |
| // the smaller priority number means higher priority. |
| return statePriority1 - statePriority2; |
| } |
| } else { |
| // If the baseline assignment contains the assignment, prioritize the replica. |
| return baselineAssignment.containsKey(resourceName1) ? -1 : 1; |
| } |
| } else { |
| // If the best possible assignment contains the assignment, prioritize the replica. |
| return bestPossibleAssignment.containsKey(resourceName1) ? -1 : 1; |
| } |
| }); |
| return orderedAssignableReplicas; |
| } |
| |
| /** |
| * @param assignments A collection of resource replicas assignment. |
| * @return A set of instance names that have at least one replica assigned in the input assignments. |
| */ |
| private Set<String> getBusyInstances(Collection<ResourceAssignment> assignments) { |
| return assignments.stream().flatMap( |
| resourceAssignment -> resourceAssignment.getRecord().getMapFields().values().stream() |
| .flatMap(instanceStateMap -> instanceStateMap.keySet().stream()) |
| .collect(Collectors.toSet()).stream()).collect(Collectors.toSet()); |
| } |
| } |