| package org.apache.helix.controller.rebalancer; |
| |
| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collection; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Map.Entry; |
| import java.util.Random; |
| import java.util.Set; |
| import java.util.TreeMap; |
| import java.util.TreeSet; |
| |
| import com.google.common.collect.ImmutableList; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Sets; |
| import org.apache.helix.HelixDefinedState; |
| import org.apache.helix.MockAccessor; |
| import org.apache.helix.PropertyKey.Builder; |
| import org.apache.helix.ZNRecord; |
| import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; |
| import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy; |
| import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; |
| import org.apache.helix.controller.stages.CurrentStateOutput; |
| import org.apache.helix.model.ClusterConfig; |
| import org.apache.helix.model.IdealState; |
| import org.apache.helix.model.LiveInstance; |
| import org.apache.helix.model.Partition; |
| import org.apache.helix.model.StateModelDefinition; |
| import org.apache.helix.tools.StateModelConfigGenerator; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| import org.testng.Assert; |
| import org.testng.annotations.Test; |
| |
| public class TestAutoRebalanceStrategy { |
| private static Logger logger = LoggerFactory.getLogger(TestAutoRebalanceStrategy.class); |
| |
| /** |
| * Sanity test for a basic Master-Slave model |
| */ |
| @Test |
| public void simpleMasterSlaveTest() { |
| final int NUM_ITERATIONS = 10; |
| final int NUM_PARTITIONS = 10; |
| final int NUM_LIVE_NODES = 12; |
| final int NUM_TOTAL_NODES = 20; |
| final int MAX_PER_NODE = 5; |
| |
| final String[] STATE_NAMES = { |
| "MASTER", "SLAVE" |
| }; |
| final int[] STATE_COUNTS = { |
| 1, 2 |
| }; |
| |
| runTest("BasicMasterSlave", NUM_ITERATIONS, NUM_PARTITIONS, NUM_LIVE_NODES, NUM_TOTAL_NODES, |
| MAX_PER_NODE, STATE_NAMES, STATE_COUNTS); |
| } |
| |
| /** |
| * Run a test for an arbitrary state model. |
| * @param name Name of the test state model |
| * @param numIterations Number of rebalance tasks to run |
| * @param numPartitions Number of partitions for the resource |
| * @param numLiveNodes Number of live nodes in the cluster |
| * @param numTotalNodes Number of nodes in the cluster, must be greater than or equal to |
| * numLiveNodes |
| * @param maxPerNode Maximum number of replicas a node can serve |
| * @param stateNames States ordered by preference |
| * @param stateCounts Number of replicas that should be in each state |
| */ |
| private void runTest(String name, int numIterations, int numPartitions, int numLiveNodes, |
| int numTotalNodes, int maxPerNode, String[] stateNames, int[] stateCounts) { |
| List<String> partitions = new ArrayList<String>(); |
| for (int i = 0; i < numPartitions; i++) { |
| partitions.add("p_" + i); |
| } |
| |
| List<String> liveNodes = new ArrayList<String>(); |
| List<String> allNodes = new ArrayList<String>(); |
| for (int i = 0; i < numTotalNodes; i++) { |
| allNodes.add("n_" + i); |
| if (i < numLiveNodes) { |
| liveNodes.add("n_" + i); |
| } |
| } |
| |
| Map<String, Map<String, String>> currentMapping = new TreeMap<String, Map<String, String>>(); |
| |
| LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(); |
| for (int i = 0; i < Math.min(stateNames.length, stateCounts.length); i++) { |
| states.put(stateNames[i], stateCounts[i]); |
| } |
| |
| StateModelDefinition stateModelDef = getIncompleteStateModelDef(name, stateNames[0], states); |
| |
| new AutoRebalanceTester(partitions, states, liveNodes, currentMapping, allNodes, maxPerNode, |
| stateModelDef).runRepeatedly(numIterations); |
| } |
| |
| /** |
| * Get a StateModelDefinition without transitions. The auto rebalancer doesn't take transitions |
| * into account when computing mappings, so this is acceptable. |
| * @param modelName name to give the model |
| * @param initialState initial state for all nodes |
| * @param states ordered map of state to count |
| * @return incomplete StateModelDefinition for rebalancing |
| */ |
| private StateModelDefinition getIncompleteStateModelDef(String modelName, String initialState, |
| LinkedHashMap<String, Integer> states) { |
| StateModelDefinition.Builder builder = new StateModelDefinition.Builder(modelName); |
| builder.initialState(initialState); |
| int i = states.size(); |
| for (String state : states.keySet()) { |
| builder.addState(state, i); |
| builder.upperBound(state, states.get(state)); |
| i--; |
| } |
| return builder.build(); |
| } |
| |
| class AutoRebalanceTester { |
| private static final double P_KILL = 0.45; |
| private static final double P_ADD = 0.1; |
| private static final double P_RESURRECT = 0.45; |
| private static final String RESOURCE_NAME = "resource"; |
| |
| private List<String> _partitions; |
| private LinkedHashMap<String, Integer> _states; |
| private List<String> _liveNodes; |
| private Set<String> _liveSet; |
| private Set<String> _removedSet; |
| private Set<String> _nonLiveSet; |
| private Map<String, Map<String, String>> _currentMapping; |
| private List<String> _allNodes; |
| private int _maxPerNode; |
| private StateModelDefinition _stateModelDef; |
| private Random _random; |
| |
| public AutoRebalanceTester(List<String> partitions, LinkedHashMap<String, Integer> states, |
| List<String> liveNodes, Map<String, Map<String, String>> currentMapping, |
| List<String> allNodes, int maxPerNode, StateModelDefinition stateModelDef) { |
| _partitions = partitions; |
| _states = states; |
| _liveNodes = liveNodes; |
| _liveSet = new TreeSet<String>(); |
| for (String node : _liveNodes) { |
| _liveSet.add(node); |
| } |
| _removedSet = new TreeSet<String>(); |
| _nonLiveSet = new TreeSet<String>(); |
| _currentMapping = currentMapping; |
| _allNodes = allNodes; |
| for (String node : allNodes) { |
| if (!_liveSet.contains(node)) { |
| _nonLiveSet.add(node); |
| } |
| } |
| _maxPerNode = maxPerNode; |
| _stateModelDef = stateModelDef; |
| _random = new Random(); |
| } |
| |
| /** |
| * Repeatedly randomly select a task to run and report the result |
| * @param numIterations |
| * Number of random tasks to run in sequence |
| */ |
| public void runRepeatedly(int numIterations) { |
| logger.info("~~~~ Initial State ~~~~~"); |
| RebalanceStrategy strategy = |
| new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode); |
| ZNRecord initialResult = |
| strategy.computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); |
| _currentMapping = getMapping(initialResult.getListFields()); |
| logger.info(_currentMapping.toString()); |
| getRunResult(_currentMapping, initialResult.getListFields()); |
| for (int i = 0; i < numIterations; i++) { |
| logger.info("~~~~ Iteration " + i + " ~~~~~"); |
| ZNRecord znRecord = runOnceRandomly(); |
| if (znRecord != null) { |
| final Map<String, List<String>> listResult = znRecord.getListFields(); |
| final Map<String, Map<String, String>> mapResult = getMapping(listResult); |
| logger.info(mapResult.toString()); |
| logger.info(listResult.toString()); |
| getRunResult(mapResult, listResult); |
| _currentMapping = mapResult; |
| } |
| } |
| } |
| |
    /**
     * Convert a preference-list assignment into a (partition -> node -> state) mapping by
     * running the AutoRebalancer's per-partition state assignment against a mocked cluster
     * containing only the currently-live instances.
     * @param listResult map of partition name to its preference list of nodes
     * @return map of partition name to the node-to-state assignment chosen by the rebalancer
     */
    private Map<String, Map<String, String>> getMapping(final Map<String, List<String>> listResult) {
      final Map<String, Map<String, String>> mapResult = new HashMap<String, Map<String, String>>();
      // Build a mock cluster snapshot: a cluster config plus one LiveInstance per live node.
      ResourceControllerDataProvider cache = new ResourceControllerDataProvider();
      MockAccessor accessor = new MockAccessor();
      Builder keyBuilder = accessor.keyBuilder();
      ClusterConfig clusterConfig = new ClusterConfig("TestCluster");
      accessor.setProperty(keyBuilder.clusterConfig(), clusterConfig);
      for (String node : _liveNodes) {
        LiveInstance liveInstance = new LiveInstance(node);
        liveInstance.setSessionId("testSession");
        accessor.setProperty(keyBuilder.liveInstance(node), liveInstance);
      }
      cache.refresh(accessor);

      IdealState is = new IdealState("resource");
      for (String partition : _partitions) {
        List<String> preferenceList = listResult.get(partition);
        // May be null when this partition has never been assigned before.
        Map<String, String> currentStateMap = _currentMapping.get(partition);
        Set<String> disabled = Collections.emptySet();
        Partition p = new Partition(partition);
        // Seed the current-state view with the existing replica placement, if any.
        CurrentStateOutput currentStateOutput = new CurrentStateOutput();
        if (currentStateMap != null) {
          for (String instance : currentStateMap.keySet()) {
            currentStateOutput
                .setCurrentState("resource", p, instance, currentStateMap.get(instance));
          }
        }
        // Let the rebalancer pick states for this partition given the preference list.
        Map<String, String> assignment = new AutoRebalancer()
            .computeBestPossibleStateForPartition(cache.getLiveInstances().keySet(), _stateModelDef,
                preferenceList, currentStateOutput, disabled, is, clusterConfig, p);
        mapResult.put(partition, assignment);
      }
      return mapResult;
    }
| |
| /** |
| * Output various statistics and correctness check results |
| * @param mapFields |
| * The map-map assignment generated by the rebalancer |
| * @param listFields |
| * The map-list assignment generated by the rebalancer |
| */ |
| public void getRunResult(final Map<String, Map<String, String>> mapFields, |
| final Map<String, List<String>> listFields) { |
| logger.info("***** Statistics *****"); |
| dumpStatistics(mapFields); |
| verifyCorrectness(mapFields, listFields); |
| } |
| |
| /** |
| * Output statistics about the assignment |
| * @param mapFields |
| * The map-map assignment generated by the rebalancer |
| */ |
| public void dumpStatistics(final Map<String, Map<String, String>> mapFields) { |
| Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields); |
| int nodeCount = _liveNodes.size(); |
| logger.info("Total number of nodes: " + nodeCount); |
| logger.info("Nodes: " + _liveNodes); |
| int sumPartitions = getSum(partitionsPerNode.values()); |
| logger.info("Total number of partitions: " + sumPartitions); |
| double averagePartitions = getAverage(partitionsPerNode.values()); |
| logger.info("Average number of partitions per node: " + averagePartitions); |
| double stdevPartitions = getStdev(partitionsPerNode.values(), averagePartitions); |
| logger.info("Standard deviation of partitions: " + stdevPartitions); |
| |
| // Statistics about each state |
| Map<String, Map<String, Integer>> statesPerNode = getStateBucketsForNode(mapFields); |
| for (String state : _states.keySet()) { |
| Map<String, Integer> nodeStateCounts = new TreeMap<String, Integer>(); |
| for (Entry<String, Map<String, Integer>> nodeStates : statesPerNode.entrySet()) { |
| Map<String, Integer> stateCounts = nodeStates.getValue(); |
| if (stateCounts.containsKey(state)) { |
| nodeStateCounts.put(nodeStates.getKey(), stateCounts.get(state)); |
| } else { |
| nodeStateCounts.put(nodeStates.getKey(), 0); |
| } |
| } |
| int sumStates = getSum(nodeStateCounts.values()); |
| logger.info("Total number of state " + state + ": " + sumStates); |
| double averageStates = getAverage(nodeStateCounts.values()); |
| logger.info("Average number of state " + state + " per node: " + averageStates); |
| double stdevStates = getStdev(nodeStateCounts.values(), averageStates); |
| logger.info("Standard deviation of state " + state + " per node: " + stdevStates); |
| } |
| } |
| |
| /** |
| * Run a set of correctness tests, reporting success or failure |
| * @param mapFields |
| * The map-map assignment generated by the rebalancer |
| * @param listFields |
| * The map-list assignment generated by the rebalancer |
| */ |
| public void verifyCorrectness(final Map<String, Map<String, String>> mapFields, |
| final Map<String, List<String>> listFields) { |
| final Map<String, Integer> partitionsPerNode = getPartitionBucketsForNode(mapFields); |
| boolean maxConstraintMet = maxNotExceeded(partitionsPerNode); |
| assert maxConstraintMet : "Max per node constraint: FAIL"; |
| logger.info("Max per node constraint: PASS"); |
| |
| boolean liveConstraintMet = onlyLiveAssigned(partitionsPerNode); |
| assert liveConstraintMet : "Only live nodes have partitions constraint: FAIL"; |
| logger.info("Only live nodes have partitions constraint: PASS"); |
| |
| boolean stateAssignmentPossible = correctStateAssignmentCount(mapFields); |
| assert stateAssignmentPossible : "State replica constraint: FAIL"; |
| logger.info("State replica constraint: PASS"); |
| |
| boolean nodesUniqueForPartitions = atMostOnePartitionReplicaPerNode(listFields); |
| assert nodesUniqueForPartitions : "Node uniqueness per partition constraint: FAIL"; |
| logger.info("Node uniqueness per partition constraint: PASS"); |
| } |
| |
| private boolean maxNotExceeded(final Map<String, Integer> partitionsPerNode) { |
| for (String node : partitionsPerNode.keySet()) { |
| Integer value = partitionsPerNode.get(node); |
| if (value > _maxPerNode) { |
| logger.error("ERROR: Node " + node + " has " + value |
| + " partitions despite a maximum of " + _maxPerNode); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private boolean onlyLiveAssigned(final Map<String, Integer> partitionsPerNode) { |
| for (final Entry<String, Integer> nodeState : partitionsPerNode.entrySet()) { |
| boolean isLive = _liveSet.contains(nodeState.getKey()); |
| boolean isEmpty = nodeState.getValue() == 0; |
| if (!isLive && !isEmpty) { |
| logger.error("ERROR: Node " + nodeState.getKey() + " is not live, but has " |
| + nodeState.getValue() + " replicas!"); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private boolean correctStateAssignmentCount(final Map<String, Map<String, String>> assignment) { |
| for (final Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) { |
| final Map<String, String> nodeMap = partitionEntry.getValue(); |
| final Map<String, Integer> stateCounts = new TreeMap<String, Integer>(); |
| for (String state : nodeMap.values()) { |
| if (!stateCounts.containsKey(state)) { |
| stateCounts.put(state, 1); |
| } else { |
| stateCounts.put(state, stateCounts.get(state) + 1); |
| } |
| } |
| for (String state : stateCounts.keySet()) { |
| if (state.equals(HelixDefinedState.DROPPED.toString())) { |
| continue; |
| } |
| int count = stateCounts.get(state); |
| int maximumCount = _states.get(state); |
| if (count > maximumCount) { |
| logger.error("ERROR: State " + state + " for partition " + partitionEntry.getKey() |
| + " has " + count + " replicas when " + maximumCount + " is allowed!"); |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
| private boolean atMostOnePartitionReplicaPerNode(final Map<String, List<String>> listFields) { |
| for (final Entry<String, List<String>> partitionEntry : listFields.entrySet()) { |
| Set<String> nodeSet = new HashSet<String>(partitionEntry.getValue()); |
| int numUniques = nodeSet.size(); |
| int total = partitionEntry.getValue().size(); |
| if (numUniques < total) { |
| logger.error("ERROR: Partition " + partitionEntry.getKey() + " is assigned to " + total |
| + " nodes, but only " + numUniques + " are unique!"); |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| private double getAverage(final Collection<Integer> values) { |
| double sum = 0.0; |
| for (Integer value : values) { |
| sum += value; |
| } |
| if (values.size() != 0) { |
| return sum / values.size(); |
| } else { |
| return -1.0; |
| } |
| } |
| |
| private int getSum(final Collection<Integer> values) { |
| int sum = 0; |
| for (Integer value : values) { |
| sum += value; |
| } |
| return sum; |
| } |
| |
| private double getStdev(final Collection<Integer> values, double mean) { |
| double sum = 0.0; |
| for (Integer value : values) { |
| double deviation = mean - value; |
| sum += Math.pow(deviation, 2.0); |
| } |
| if (values.size() != 0) { |
| sum /= values.size(); |
| return Math.pow(sum, 0.5); |
| } else { |
| return -1.0; |
| } |
| } |
| |
| private Map<String, Integer> getPartitionBucketsForNode( |
| final Map<String, Map<String, String>> assignment) { |
| Map<String, Integer> partitionsPerNode = new TreeMap<String, Integer>(); |
| for (String node : _liveNodes) { |
| partitionsPerNode.put(node, 0); |
| } |
| for (Entry<String, Map<String, String>> partitionEntry : assignment.entrySet()) { |
| final Map<String, String> nodeMap = partitionEntry.getValue(); |
| for (String node : nodeMap.keySet()) { |
| String state = nodeMap.get(node); |
| if (state.equals(HelixDefinedState.DROPPED.toString())) { |
| continue; |
| } |
| // add 1 for every occurrence of a node |
| if (!partitionsPerNode.containsKey(node)) { |
| partitionsPerNode.put(node, 1); |
| } else { |
| partitionsPerNode.put(node, partitionsPerNode.get(node) + 1); |
| } |
| } |
| } |
| return partitionsPerNode; |
| } |
| |
| private Map<String, Map<String, Integer>> getStateBucketsForNode( |
| final Map<String, Map<String, String>> assignment) { |
| Map<String, Map<String, Integer>> result = new TreeMap<String, Map<String, Integer>>(); |
| for (String n : _liveNodes) { |
| result.put(n, new TreeMap<String, Integer>()); |
| } |
| for (Map<String, String> nodeStateMap : assignment.values()) { |
| for (Entry<String, String> nodeState : nodeStateMap.entrySet()) { |
| if (!result.containsKey(nodeState.getKey())) { |
| result.put(nodeState.getKey(), new TreeMap<String, Integer>()); |
| } |
| Map<String, Integer> stateMap = result.get(nodeState.getKey()); |
| if (!stateMap.containsKey(nodeState.getValue())) { |
| stateMap.put(nodeState.getValue(), 1); |
| } else { |
| stateMap.put(nodeState.getValue(), stateMap.get(nodeState.getValue()) + 1); |
| } |
| } |
| } |
| return result; |
| } |
| |
| /** |
| * Randomly choose between killing, adding, or resurrecting a single node |
| * @return (Partition -> (Node -> State)) ZNRecord |
| */ |
| public ZNRecord runOnceRandomly() { |
| double choose = _random.nextDouble(); |
| ZNRecord result = null; |
| if (choose < P_KILL) { |
| result = removeSingleNode(null); |
| } else if (choose < P_KILL + P_ADD) { |
| result = addSingleNode(null); |
| } else if (choose < P_KILL + P_ADD + P_RESURRECT) { |
| result = resurrectSingleNode(null); |
| } |
| return result; |
| } |
| |
| /** |
| * Run rebalancer trying to add a never-live node |
| * @param node |
| * Optional String to add |
| * @return ZNRecord result returned by the rebalancer |
| */ |
| public ZNRecord addSingleNode(String node) { |
| logger.info("=================== add node ================="); |
| if (_nonLiveSet.size() == 0) { |
| logger.warn("Cannot add node because there are no nodes left to add."); |
| return null; |
| } |
| |
| // Get a random never-live node |
| if (node == null || !_nonLiveSet.contains(node)) { |
| node = getRandomSetElement(_nonLiveSet); |
| } |
| logger.info("Adding " + node); |
| _liveNodes.add(node); |
| _liveSet.add(node); |
| _nonLiveSet.remove(node); |
| |
| return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode). |
| computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); |
| } |
| |
| /** |
| * Run rebalancer trying to remove a live node |
| * @param node |
| * Optional String to remove |
| * @return ZNRecord result returned by the rebalancer |
| */ |
| public ZNRecord removeSingleNode(String node) { |
| logger.info("=================== remove node ================="); |
| if (_liveSet.size() == 0) { |
| logger.warn("Cannot remove node because there are no nodes left to remove."); |
| return null; |
| } |
| |
| // Get a random never-live node |
| if (node == null || !_liveSet.contains(node)) { |
| node = getRandomSetElement(_liveSet); |
| } |
| logger.info("Removing " + node); |
| _removedSet.add(node); |
| _liveNodes.remove(node); |
| _liveSet.remove(node); |
| |
| // the rebalancer expects that the current mapping doesn't contain deleted |
| // nodes |
| for (Map<String, String> nodeMap : _currentMapping.values()) { |
| if (nodeMap.containsKey(node)) { |
| nodeMap.remove(node); |
| } |
| } |
| |
| return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) |
| .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); |
| } |
| |
| /** |
| * Run rebalancer trying to add back a removed node |
| * @param node |
| * Optional String to resurrect |
| * @return ZNRecord result returned by the rebalancer |
| */ |
| public ZNRecord resurrectSingleNode(String node) { |
| logger.info("=================== resurrect node ================="); |
| if (_removedSet.size() == 0) { |
| logger.warn("Cannot remove node because there are no nodes left to resurrect."); |
| return null; |
| } |
| |
| // Get a random never-live node |
| if (node == null || !_removedSet.contains(node)) { |
| node = getRandomSetElement(_removedSet); |
| } |
| logger.info("Resurrecting " + node); |
| _removedSet.remove(node); |
| _liveNodes.add(node); |
| _liveSet.add(node); |
| |
| return new AutoRebalanceStrategy(RESOURCE_NAME, _partitions, _states, _maxPerNode) |
| .computePartitionAssignment(_allNodes, _liveNodes, _currentMapping, null); |
| } |
| |
| private <T> T getRandomSetElement(Set<T> source) { |
| int element = _random.nextInt(source.size()); |
| int i = 0; |
| for (T node : source) { |
| if (i == element) { |
| return node; |
| } |
| i++; |
| } |
| return null; |
| } |
| } |
| |
  /**
   * Tests the following scenario: nodes come up one by one, then one node is taken down. Preference
   * lists should prefer nodes in the current mapping at all times, but when all nodes are in the
   * current mapping, then it should distribute states as evenly as possible.
   */
  @Test
  public void testOrphansNotPreferred() {
    final String RESOURCE_NAME = "resource";
    final String[] PARTITIONS = {
        "resource_0", "resource_1", "resource_2"
    };
    final StateModelDefinition STATE_MODEL =
        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
    final int REPLICA_COUNT = 2;
    final String[] NODES = {
        "n0", "n1", "n2"
    };

    // initial state, one node, no mapping
    List<String> allNodes = Lists.newArrayList(NODES[0]);
    List<String> liveNodes = Lists.newArrayList(NODES[0]);
    Map<String, Map<String, String>> currentMapping = Maps.newHashMap();
    for (String partition : PARTITIONS) {
      currentMapping.put(partition, new HashMap<String, String>());
    }

    // make sure that when the first node joins, a single replica is assigned fairly
    List<String> partitions = ImmutableList.copyOf(PARTITIONS);
    LinkedHashMap<String, Integer> stateCount =
        STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT);
    ZNRecord znRecord =
        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
    Map<String, List<String>> preferenceLists = znRecord.getListFields();
    for (String partition : currentMapping.keySet()) {
      // make sure these are all MASTER
      // (with one live node, each preference list should contain exactly that node)
      List<String> preferenceList = preferenceLists.get(partition);
      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
      Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition);
    }

    // now assign a replica to the first node in the current mapping, and add a second node
    allNodes.add(NODES[1]);
    liveNodes.add(NODES[1]);
    stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT);
    for (String partition : PARTITIONS) {
      currentMapping.get(partition).put(NODES[0], "MASTER");
    }
    znRecord =
        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
    preferenceLists = znRecord.getListFields();
    for (String partition : currentMapping.keySet()) {
      // NODES[0] already holds every replica, so it must stay at the head of each list;
      // the new node should only be appended.
      List<String> preferenceList = preferenceLists.get(partition);
      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
      Assert.assertEquals(preferenceList.get(0), NODES[0], "invalid preference list for "
          + partition);
      Assert.assertEquals(preferenceList.get(1), NODES[1], "invalid preference list for "
          + partition);
    }

    // now set the current mapping to reflect this update and make sure that it distributes masters
    for (String partition : PARTITIONS) {
      currentMapping.get(partition).put(NODES[1], "SLAVE");
    }
    znRecord =
        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
    preferenceLists = znRecord.getListFields();
    Set<String> firstNodes = Sets.newHashSet();
    for (String partition : currentMapping.keySet()) {
      List<String> preferenceList = preferenceLists.get(partition);
      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
      firstNodes.add(preferenceList.get(0));
    }
    // both nodes should head at least one preference list, i.e. masters are spread out
    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");

    // set a mapping corresponding to a valid mapping for 2 nodes, add a third node, check that the
    // new node is never the most preferred
    allNodes.add(NODES[2]);
    liveNodes.add(NODES[2]);
    stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT);

    // recall that the other two partitions are [MASTER, SLAVE], which is fine, just reorder one
    currentMapping.get(PARTITIONS[1]).put(NODES[0], "SLAVE");
    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
    znRecord =
        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
    preferenceLists = znRecord.getListFields();
    boolean newNodeUsed = false;
    for (String partition : currentMapping.keySet()) {
      List<String> preferenceList = preferenceLists.get(partition);
      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
      if (preferenceList.contains(NODES[2])) {
        newNodeUsed = true;
        // an orphaned (not-yet-mapped) node may be used, but only at the list tail
        Assert.assertEquals(preferenceList.get(1), NODES[2],
            "newly added node not at preference list tail for " + partition);
      }
    }
    Assert.assertTrue(newNodeUsed, "not using " + NODES[2]);

    // now remap this to take the new node into account, should go back to balancing masters, slaves
    // evenly across all nodes
    for (String partition : PARTITIONS) {
      currentMapping.get(partition).clear();
    }
    currentMapping.get(PARTITIONS[0]).put(NODES[0], "MASTER");
    currentMapping.get(PARTITIONS[0]).put(NODES[1], "SLAVE");
    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
    currentMapping.get(PARTITIONS[2]).put(NODES[0], "MASTER");
    currentMapping.get(PARTITIONS[2]).put(NODES[2], "SLAVE");
    znRecord =
        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
    preferenceLists = znRecord.getListFields();
    firstNodes.clear();
    Set<String> secondNodes = Sets.newHashSet();
    for (String partition : currentMapping.keySet()) {
      List<String> preferenceList = preferenceLists.get(partition);
      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
      firstNodes.add(preferenceList.get(0));
      secondNodes.add(preferenceList.get(1));
    }
    // with 3 nodes and 3 partitions, each node should hold exactly one master and one slave
    Assert.assertEquals(firstNodes.size(), 3, "masters not distributed evenly");
    Assert.assertEquals(secondNodes.size(), 3, "slaves not distributed evenly");

    // remove a node now, but use the current mapping with everything balanced just prior
    liveNodes.remove(0);
    stateCount = STATE_MODEL.getStateCountMap(liveNodes.size(), REPLICA_COUNT);

    // remove all references of n0 from the mapping, keep everything else in a legal state
    for (String partition : PARTITIONS) {
      currentMapping.get(partition).clear();
    }
    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
    currentMapping.get(PARTITIONS[1]).put(NODES[1], "MASTER");
    currentMapping.get(PARTITIONS[1]).put(NODES[2], "SLAVE");
    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
    znRecord =
        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
    preferenceLists = znRecord.getListFields();
    for (String partition : currentMapping.keySet()) {
      List<String> preferenceList = preferenceLists.get(partition);
      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
      Map<String, String> stateMap = currentMapping.get(partition);
      // every currently-mapped participant must be retained (minimal movement)
      for (String participant : stateMap.keySet()) {
        Assert.assertTrue(preferenceList.contains(participant), "minimal movement violated for "
            + partition);
      }
      // any replica that had to move must not land at the head (master) position
      for (String participant : preferenceList) {
        if (!stateMap.containsKey(participant)) {
          Assert.assertNotSame(preferenceList.get(0), participant,
              "newly moved replica should not be master for " + partition);
        }
      }
    }

    // finally, adjust the current mapping to reflect 2 nodes and make sure everything's even again
    for (String partition : PARTITIONS) {
      currentMapping.get(partition).clear();
    }
    currentMapping.get(PARTITIONS[0]).put(NODES[1], "MASTER");
    currentMapping.get(PARTITIONS[0]).put(NODES[2], "SLAVE");
    currentMapping.get(PARTITIONS[1]).put(NODES[1], "SLAVE");
    currentMapping.get(PARTITIONS[1]).put(NODES[2], "MASTER");
    currentMapping.get(PARTITIONS[2]).put(NODES[1], "SLAVE");
    currentMapping.get(PARTITIONS[2]).put(NODES[2], "MASTER");
    znRecord =
        new AutoRebalanceStrategy(RESOURCE_NAME, partitions, stateCount)
            .computePartitionAssignment(allNodes, liveNodes, currentMapping, null);
    preferenceLists = znRecord.getListFields();
    firstNodes.clear();
    for (String partition : currentMapping.keySet()) {
      List<String> preferenceList = preferenceLists.get(partition);
      Assert.assertNotNull(preferenceList, "invalid preference list for " + partition);
      Assert.assertEquals(preferenceList.size(), 2, "invalid preference list for " + partition);
      firstNodes.add(preferenceList.get(0));
    }
    Assert.assertEquals(firstNodes.size(), 2, "masters not evenly distributed");
  }
| |
| |
| @Test public void test() { |
| int nPartitions = 16; |
| final String resourceName = "something"; |
| final List<String> instanceNames = |
| Arrays.asList("node-1", "node-2", "node-3", "node-4"); // Initialize to 4 unique strings |
| |
| final int nReplicas = 3; |
| |
| List<String> partitions = new ArrayList<String>(nPartitions); |
| for (int i = 0; i < nPartitions; i++) { |
| partitions.add(Integer.toString(i)); |
| } |
| |
| LinkedHashMap<String, Integer> states = new LinkedHashMap<String, Integer>(2); |
| states.put("OFFLINE", 0); |
| states.put("ONLINE", nReplicas); |
| |
| AutoRebalanceStrategy strategy = new AutoRebalanceStrategy(resourceName, partitions, states); |
| ZNRecord znRecord = strategy.computePartitionAssignment(instanceNames, instanceNames, |
| new HashMap<String, Map<String, String>>(0), null); |
| |
| for (List p : znRecord.getListFields().values()) { |
| Assert.assertEquals(p.size(), nReplicas); |
| } |
| } |
| |
| /** |
| * Tests the following scenario: there is only a single partition for a resource. Two nodes up, |
| * partition should |
| * be assigned to one of them. Take down that node, partition should move. Bring back up that |
| * node, partition should not move unnecessarily. |
| */ |
| @Test |
| public void testWontMoveSinglePartitionUnnecessarily() { |
| final String RESOURCE = "resource"; |
| final String partition = "resource_0"; |
| final StateModelDefinition STATE_MODEL = |
| new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline()); |
| LinkedHashMap<String, Integer> stateCount = Maps.newLinkedHashMap(); |
| stateCount.put("ONLINE", 1); |
| final String[] NODES = {"n0", "n1"}; |
| |
| // initial state, one node, no mapping |
| List<String> allNodes = Lists.newArrayList(NODES); |
| List<String> liveNodes = Lists.newArrayList(NODES); |
| Map<String, Map<String, String>> currentMapping = Maps.newHashMap(); |
| currentMapping.put(partition, new HashMap<String, String>()); |
| |
| // Both nodes there |
| List<String> partitions = Lists.newArrayList(partition); |
| Map<String, String> upperBounds = Maps.newHashMap(); |
| for (String state : STATE_MODEL.getStatesPriorityList()) { |
| upperBounds.put(state, STATE_MODEL.getNumInstancesPerState(state)); |
| } |
| |
| ZNRecord znRecord = |
| new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE) |
| .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); |
| Map<String, List<String>> preferenceLists = znRecord.getListFields(); |
| List<String> preferenceList = preferenceLists.get(partition.toString()); |
| Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); |
| Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); |
| String state = znRecord.getMapField(partition.toString()).get(preferenceList.get(0)); |
| Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition); |
| String preferredNode = preferenceList.get(0); |
| String otherNode = preferredNode.equals(NODES[0]) ? NODES[1] : NODES[0]; |
| // ok, see what happens if we've got the partition on the other node (e.g. due to the preferred |
| // node being down). |
| currentMapping.get(partition).put(otherNode, state); |
| |
| znRecord = |
| new AutoRebalanceStrategy(RESOURCE, partitions, stateCount, Integer.MAX_VALUE) |
| .computePartitionAssignment(allNodes, liveNodes, currentMapping, null); |
| |
| preferenceLists = znRecord.getListFields(); |
| preferenceList = preferenceLists.get(partition.toString()); |
| Assert.assertNotNull(preferenceList, "invalid preference list for " + partition); |
| Assert.assertEquals(preferenceList.size(), 1, "invalid preference list for " + partition); |
| state = znRecord.getMapField(partition.toString()).get(preferenceList.get(0)); |
| Assert.assertEquals(state, "ONLINE", "Invalid state for " + partition); |
| String finalPreferredNode = preferenceList.get(0); |
| // finally, make sure we haven't moved it. |
| Assert.assertEquals(finalPreferredNode, otherNode); |
| } |
| } |