blob: 847109c5234a318c7b4a97e5283f35725b83ed54 [file] [log] [blame]
package org.apache.helix.controller.rebalancer.strategy;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import org.apache.helix.HelixException;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import org.apache.helix.controller.rebalancer.strategy.crushMapping.CRUSHPlacementAlgorithm;
import org.apache.helix.controller.rebalancer.topology.InstanceNode;
import org.apache.helix.controller.rebalancer.topology.Node;
import org.apache.helix.controller.rebalancer.topology.Topology;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.util.JenkinsHash;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Multi-round CRUSH partition mapping strategy.
* This gives more even partition distribution in case of small number of partitions,
* but number of partitions to be reshuffled during node outage could be higher than CrushRebalanceStrategy.
*/
public class MultiRoundCrushRebalanceStrategy implements RebalanceStrategy<ResourceControllerDataProvider> {
private static final Logger Log =
LoggerFactory.getLogger(MultiRoundCrushRebalanceStrategy.class.getName());
private String _resourceName;
private List<String> _partitions;
private Topology _clusterTopo;
private int _replicas;
private LinkedHashMap<String, Integer> _stateCountMap;
private final int MAX_ITERATION = 3;
@Override
public void init(String resourceName, final List<String> partitions,
final LinkedHashMap<String, Integer> states, int maximumPerNode) {
_resourceName = resourceName;
_partitions = partitions;
_replicas = countStateReplicas(states);
_stateCountMap = states;
}
/**
* Compute the preference lists and (optional partition-state mapping) for the given resource.
*
* @param allNodes All instances
* @param liveNodes List of live instances
* @param currentMapping current replica mapping
* @param clusterData cluster data
* @return
* @throws HelixException if a map can not be found
*/
@Override
public ZNRecord computePartitionAssignment(final List<String> allNodes,
final List<String> liveNodes, final Map<String, Map<String, String>> currentMapping,
ResourceControllerDataProvider clusterData) throws HelixException {
Map<String, InstanceConfig> instanceConfigMap = clusterData.getInstanceConfigMap();
_clusterTopo =
new Topology(allNodes, liveNodes, instanceConfigMap, clusterData.getClusterConfig());
Node root = _clusterTopo.getRootNode();
Map<String, List<Node>> zoneMapping = new HashMap<>();
for (int i = 0; i < _partitions.size(); i++) {
String partitionName = _partitions.get(i);
long pData = partitionName.hashCode();
// select zones for this partition
List<Node> zones = select(root, _clusterTopo.getFaultZoneType(), pData, _replicas);
zoneMapping.put(partitionName, zones);
}
/* map the position in preference list to the state */
Map<Integer, String> idxStateMap = new HashMap<>();
int i = 0;
for (Map.Entry<String, Integer> e : _stateCountMap.entrySet()) {
String state = e.getKey();
int count = e.getValue();
for (int j = 0; j < count; j++) {
idxStateMap.put(i + j, state);
}
i += count;
}
// Final mapping <partition, state> -> list(node)
Map<String, Map<String, List<Node>>> partitionStateMapping = new HashMap<>();
for (Node zone : _clusterTopo.getFaultZones()) {
// partition state -> list(partitions)
LinkedHashMap<String, List<String>> statePartitionMap = new LinkedHashMap<>();
// TODO: move this outside?
for (Map.Entry<String, List<Node>> e : zoneMapping.entrySet()) {
String partition = e.getKey();
List<Node> zones = e.getValue();
for (int k = 0; k < zones.size(); k++) {
if (zones.get(k).equals(zone)) {
String state = idxStateMap.get(k);
if (!statePartitionMap.containsKey(state)) {
statePartitionMap.put(state, new ArrayList<String>());
}
statePartitionMap.get(state).add(partition);
}
}
}
for (String state : _stateCountMap.keySet()) {
List<String> partitions = statePartitionMap.get(state);
if (partitions != null && !partitions.isEmpty()) {
Map<String, Node> assignments = singleZoneMapping(zone, partitions);
for (String partition : assignments.keySet()) {
Node node = assignments.get(partition);
if (!partitionStateMapping.containsKey(partition)) {
partitionStateMapping.put(partition, new HashMap<String, List<Node>>());
}
Map<String, List<Node>> stateMapping = partitionStateMapping.get(partition);
if (!stateMapping.containsKey(state)) {
stateMapping.put(state, new ArrayList<Node>());
}
stateMapping.get(state).add(node);
}
}
}
}
return generateZNRecord(_resourceName, _partitions, partitionStateMapping,
clusterData.getClusterEventId());
}
private ZNRecord generateZNRecord(String resource, List<String> partitions,
Map<String, Map<String, List<Node>>> partitionStateMapping, String eventId) {
Map<String, List<String>> newPreferences = new HashMap<>();
for (int i = 0; i < partitions.size(); i++) {
String partitionName = partitions.get(i);
Map<String, List<Node>> stateNodeMap = partitionStateMapping.get(partitionName);
for (String state : _stateCountMap.keySet()) {
List<Node> nodes = stateNodeMap.get(state);
List<String> nodeList = new ArrayList<>();
for (int j = 0; j < nodes.size(); j++) {
Node selectedNode = nodes.get(j);
if (selectedNode instanceof InstanceNode) {
nodeList.add(((InstanceNode) selectedNode).getInstanceName());
} else {
LogUtil.logError(Log, eventId,
"Selected node is not associated with an instance: " + selectedNode.toString());
}
}
if (!newPreferences.containsKey(partitionName)) {
newPreferences.put(partitionName, new ArrayList<String>());
}
newPreferences.get(partitionName).addAll(nodeList);
}
}
ZNRecord result = new ZNRecord(resource);
result.setListFields(newPreferences);
return result;
}
/**
* Compute mapping of partition to node in a single zone.
* Assumption: A partition should have only one replica at one zone.
* Will apply CRUSH multiple times until all partitions are mostly even distributed.
*
* @param zone the zone
* @param partitions partitions to be assigned to nodes in the given zone.
* @return partition to node mapping in this zone.
*/
private Map<String, Node> singleZoneMapping(Node zone, List<String> partitions) {
if (zone.isFailed() || zone.getWeight() == 0 || partitions.isEmpty()) {
return Collections.emptyMap();
}
long totalWeight = zone.getWeight();
int totalPartition = partitions.size();
// node to all its assigned partitions.
Map<Node, List<String>> nodePartitionsMap = new HashMap<>();
Map<Node, List<String>> prevNodePartitionsMap = new HashMap<>();
List<String> partitionsToAssign = new ArrayList<>(partitions);
Map<Node, List<String>> toRemovedMap = new HashMap<>();
int iteration = 0;
Node root = zone;
boolean noAssignmentFound = false;
while (iteration++ < MAX_ITERATION && !noAssignmentFound) {
copyAssignment(nodePartitionsMap, prevNodePartitionsMap);
for (Map.Entry<Node, List<String>> e : toRemovedMap.entrySet()) {
List<String> curAssignedPartitions = nodePartitionsMap.get(e.getKey());
List<String> toRemoved = e.getValue();
curAssignedPartitions.removeAll(toRemoved);
partitionsToAssign.addAll(toRemoved);
}
for (String p : partitionsToAssign) {
long pData = p.hashCode();
List<Node> nodes;
try {
nodes = select(root, _clusterTopo.getEndNodeType(), pData, 1);
} catch (IllegalStateException e) {
nodePartitionsMap = prevNodePartitionsMap;
noAssignmentFound = true;
break;
}
for (Node n : nodes) {
if (!nodePartitionsMap.containsKey(n)) {
nodePartitionsMap.put(n, new ArrayList<String>());
}
nodePartitionsMap.get(n).add(p);
}
root = recalculateWeight(zone, totalWeight, totalPartition, nodePartitionsMap, partitions,
toRemovedMap);
}
partitionsToAssign.clear();
}
Map<String, Node> partitionMap = new HashMap<>();
for (Map.Entry<Node, List<String>> e : nodePartitionsMap.entrySet()) {
Node n = e.getKey();
List<String> assigned = e.getValue();
for (String p : assigned) {
partitionMap.put(p, n);
}
}
return partitionMap;
}
private void copyAssignment(Map<Node, List<String>> nodePartitionsMap,
Map<Node, List<String>> prevNodePartitionMap) {
if (nodePartitionsMap.size() > 0) {
for (Node node : nodePartitionsMap.keySet()) {
prevNodePartitionMap.put(node, new ArrayList<>(nodePartitionsMap.get(node)));
}
}
}
private Node recalculateWeight(Node zone, long totalWeight, int totalPartition,
Map<Node, List<String>> nodePartitionsMap, List<String> partitions,
Map<Node, List<String>> toRemovedMap) {
Map<Node, Integer> newNodeWeight = new HashMap<>();
Set<Node> completedNodes = new HashSet<>();
for (Node node : Topology.getAllLeafNodes(zone)) {
if (node.isFailed()) {
completedNodes.add(node);
continue;
}
long weight = node.getWeight();
double ratio = ((double) weight) / (double) totalWeight;
int target = (int) Math.floor(ratio * totalPartition);
List<String> assignedPatitions = nodePartitionsMap.get(node);
int numPartitions = 0;
if (assignedPatitions != null) {
numPartitions = assignedPatitions.size();
}
if (numPartitions > target + 1) {
int remove = numPartitions - target - 1;
Collections.sort(partitions);
List<String> toRemoved = new ArrayList<>(assignedPatitions.subList(0, remove));
toRemovedMap.put(node, toRemoved);
}
int missing = target - numPartitions;
if (missing > 0) {
newNodeWeight.put(node, missing * 10);
} else {
completedNodes.add(node);
}
}
if (!newNodeWeight.isEmpty()) {
// iterate more
return _clusterTopo.clone(zone, newNodeWeight, completedNodes);
}
// If the newNodeWeight map is empty, it will use old root tree to calculate it.
return zone;
}
/**
* Number of retries for finding an appropriate instance for a replica.
*/
private static final int MAX_RETRY = 100;
private final JenkinsHash hashFun = new JenkinsHash();
private CRUSHPlacementAlgorithm placementAlgorithm = new CRUSHPlacementAlgorithm();
/**
* For given input, select a number of children with given type.
* The caller will either get the expected number of selected nodes as a result,
* or an exception will be thrown.
*/
private List<Node> select(Node topNode, String nodeType, long data, int rf)
throws HelixException {
List<Node> nodes = new ArrayList<>();
long input = data;
int count = rf;
int tries = 0;
while (nodes.size() < rf) {
List<Node> selected = placementAlgorithm
.select(topNode, input, rf, nodeType, nodeAlreadySelected(new HashSet<>(nodes)));
// add the racks to the selected racks
nodes.addAll(selected);
count = rf - nodes.size();
if (count > 0) {
input = hashFun.hash(input); // create a different hash value for retrying
tries++;
if (tries >= MAX_RETRY) {
throw new HelixException(
String.format("could not find all mappings after %d tries", tries));
}
}
}
return nodes;
}
/**
* Use the predicate to reject already selected zones or nodes.
*/
private Predicate<Node> nodeAlreadySelected(Set<Node> selectedNodes) {
return Predicates.not(Predicates.in(selectedNodes));
}
/**
* Counts the total number of replicas given a state-count mapping
* @return
*/
private int countStateReplicas(Map<String, Integer> stateCountMap) {
int total = 0;
for (Integer count : stateCountMap.values()) {
total += count;
}
return total;
}
}