blob: 65737f75aa683a1b75f97c9dd32c6c680f8402c9 [file] [log] [blame]
package org.apache.helix.controller.rebalancer.waged.constraints;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import com.google.common.collect.Maps;
import org.apache.helix.HelixRebalanceException;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.rebalancer.waged.model.AssignableNode;
import org.apache.helix.controller.rebalancer.waged.model.AssignableReplica;
import org.apache.helix.controller.rebalancer.waged.model.ClusterContext;
import org.apache.helix.controller.rebalancer.waged.model.ClusterModel;
import org.apache.helix.controller.rebalancer.waged.model.OptimalAssignment;
import org.apache.helix.model.ResourceAssignment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The algorithm is based on a given set of constraints
* - HardConstraint: Approve or deny the assignment given its condition, any assignment cannot
* bypass any "hard constraint"
* - SoftConstraint: Evaluate the assignment by points/rewards/scores, a higher point means a better
* assignment
* The goal is to accumulate the most points(rewards) from "soft constraints" while avoiding any
* "hard constraints"
*/
class ConstraintBasedAlgorithm implements RebalanceAlgorithm {
private static final Logger LOG = LoggerFactory.getLogger(ConstraintBasedAlgorithm.class);
private final List<HardConstraint> _hardConstraints;
private final Map<SoftConstraint, Float> _softConstraints;
ConstraintBasedAlgorithm(List<HardConstraint> hardConstraints,
Map<SoftConstraint, Float> softConstraints) {
_hardConstraints = hardConstraints;
_softConstraints = softConstraints;
}
@Override
public OptimalAssignment calculate(ClusterModel clusterModel) throws HelixRebalanceException {
OptimalAssignment optimalAssignment = new OptimalAssignment();
List<AssignableNode> nodes = new ArrayList<>(clusterModel.getAssignableNodes().values());
// Sort the replicas so the input is stable for the greedy algorithm.
// For the other algorithm implementation, this sorting could be unnecessary.
for (AssignableReplica replica : getOrderedAssignableReplica(clusterModel)) {
Optional<AssignableNode> maybeBestNode =
getNodeWithHighestPoints(replica, nodes, clusterModel.getContext(), optimalAssignment);
// stop immediately if any replica cannot find best assignable node
if (optimalAssignment.hasAnyFailure()) {
String errorMessage = String.format(
"Unable to find any available candidate node for partition %s; Fail reasons: %s",
replica.getPartitionName(), optimalAssignment.getFailures());
throw new HelixRebalanceException(errorMessage,
HelixRebalanceException.Type.FAILED_TO_CALCULATE);
}
maybeBestNode.ifPresent(node -> clusterModel
.assign(replica.getResourceName(), replica.getPartitionName(), replica.getReplicaState(),
node.getInstanceName()));
}
optimalAssignment.updateAssignments(clusterModel);
return optimalAssignment;
}
private Optional<AssignableNode> getNodeWithHighestPoints(AssignableReplica replica,
List<AssignableNode> assignableNodes, ClusterContext clusterContext,
OptimalAssignment optimalAssignment) {
Map<AssignableNode, List<HardConstraint>> hardConstraintFailures = new HashMap<>();
List<AssignableNode> candidateNodes = assignableNodes.stream().filter(candidateNode -> {
boolean isValid = true;
// need to record all the failure reasons and it gives us the ability to debug/fix the runtime
// cluster environment
for (HardConstraint hardConstraint : _hardConstraints) {
if (!hardConstraint.isAssignmentValid(candidateNode, replica, clusterContext)) {
hardConstraintFailures.computeIfAbsent(candidateNode, node -> new ArrayList<>())
.add(hardConstraint);
isValid = false;
}
}
return isValid;
}).collect(Collectors.toList());
if (candidateNodes.isEmpty()) {
optimalAssignment.recordAssignmentFailure(replica,
Maps.transformValues(hardConstraintFailures, this::convertFailureReasons));
return Optional.empty();
}
Function<AssignableNode, Double> calculatePoints =
(candidateNode) -> getAssignmentNormalizedScore(candidateNode, replica, clusterContext);
return candidateNodes.stream().max(Comparator.comparing(calculatePoints));
}
private double getAssignmentNormalizedScore(AssignableNode node, AssignableReplica replica,
ClusterContext clusterContext) {
double sum = 0;
for (Map.Entry<SoftConstraint, Float> softConstraintEntry : _softConstraints.entrySet()) {
SoftConstraint softConstraint = softConstraintEntry.getKey();
float weight = softConstraintEntry.getValue();
if (weight != 0) {
// Skip calculating zero weighted constraints.
sum += weight * softConstraint.getAssignmentNormalizedScore(node, replica, clusterContext);
}
}
return sum;
}
private List<String> convertFailureReasons(List<HardConstraint> hardConstraints) {
return hardConstraints.stream().map(HardConstraint::getDescription)
.collect(Collectors.toList());
}
private List<AssignableReplica> getOrderedAssignableReplica(ClusterModel clusterModel) {
Map<String, Set<AssignableReplica>> replicasByResource = clusterModel.getAssignableReplicaMap();
List<AssignableReplica> orderedAssignableReplicas =
replicasByResource.values().stream().flatMap(replicas -> replicas.stream())
.collect(Collectors.toList());
Map<String, ResourceAssignment> bestPossibleAssignment =
clusterModel.getContext().getBestPossibleAssignment();
Map<String, ResourceAssignment> baselineAssignment =
clusterModel.getContext().getBaselineAssignment();
// 1. Sort according if the assignment exists in the best possible and/or baseline assignment
// 2. Sort according to the state priority. Note that prioritizing the top state is required.
// Or the greedy algorithm will unnecessarily shuffle the states between replicas.
// 3. Sort according to the resource/partition name.
orderedAssignableReplicas.sort((replica1, replica2) -> {
String resourceName1 = replica1.getResourceName();
String resourceName2 = replica2.getResourceName();
if (bestPossibleAssignment.containsKey(resourceName1) == bestPossibleAssignment
.containsKey(resourceName2)) {
if (baselineAssignment.containsKey(resourceName1) == baselineAssignment
.containsKey(resourceName2)) {
// If both assignment states have/not have the resource assignment the same,
// compare for additional dimensions.
int statePriority1 = replica1.getStatePriority();
int statePriority2 = replica2.getStatePriority();
if (statePriority1 == statePriority2) {
// If state priorities are the same, try to randomize the replicas order. Otherwise,
// the same replicas might always be moved in each rebalancing. This is because their
// placement calculating will always happen at the critical moment while the cluster is
// almost close to the expected utilization.
//
// Note that to ensure the algorithm is deterministic with the same inputs, do not use
// Random functions here. Use hashcode based on the cluster topology information to get
// a controlled randomized order is good enough.
Long replicaHash1 = (long) Objects
.hash(replica1.toString(), clusterModel.getAssignableNodes().keySet());
Long replicaHash2 = (long) Objects
.hash(replica2.toString(), clusterModel.getAssignableNodes().keySet());
if (!replicaHash1.equals(replicaHash2)) {
return replicaHash1.compareTo(replicaHash2);
} else {
// In case of hash collision, return order according to the name.
return replica1.toString().compareTo(replica2.toString());
}
} else {
// Note we shall prioritize the replica with a higher state priority,
// the smaller priority number means higher priority.
return statePriority1 - statePriority2;
}
} else {
// If the baseline assignment contains the assignment, prioritize the replica.
return baselineAssignment.containsKey(resourceName1) ? -1 : 1;
}
} else {
// If the best possible assignment contains the assignment, prioritize the replica.
return bestPossibleAssignment.containsKey(resourceName1) ? -1 : 1;
}
});
return orderedAssignableReplicas;
}
}