blob: 8eb0e00c5da0ba073561e78f206e9662ee2a2dac [file] [log] [blame]
package org.apache.helix;
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.helix.model.IdealState;
import org.apache.helix.tools.DefaultIdealStateCalculator;
import org.apache.helix.util.RebalanceUtil;
import org.testng.Assert;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
public class TestEspressoStorageClusterIdealState {
@Test()
public void testEspressoStorageClusterIdealState() throws Exception {
List<String> instanceNames = new ArrayList<String>();
for (int i = 0; i < 5; i++) {
instanceNames.add("localhost:123" + i);
}
int partitions = 8, replicas = 0;
Map<String, Object> result0 =
DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
Verify(result0, partitions, replicas);
partitions = 8192;
replicas = 3;
instanceNames.clear();
for (int i = 0; i < 20; i++) {
instanceNames.add("localhost:123" + i);
}
Map<String, Object> resultOriginal =
DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
Verify(resultOriginal, partitions, replicas);
printStat(resultOriginal);
Map<String, Object> result1 =
DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
List<String> instanceNames2 = new ArrayList<String>();
for (int i = 30; i < 35; i++) {
instanceNames2.add("localhost:123" + i);
}
DefaultIdealStateCalculator.calculateNextIdealState(instanceNames2, result1);
List<String> instanceNames3 = new ArrayList<String>();
for (int i = 35; i < 40; i++) {
instanceNames3.add("localhost:123" + i);
}
DefaultIdealStateCalculator.calculateNextIdealState(instanceNames3, result1);
Double masterKeepRatio = 0.0, slaveKeepRatio = 0.0;
Verify(result1, partitions, replicas);
double[] result = compareResult(resultOriginal, result1);
masterKeepRatio = result[0];
slaveKeepRatio = result[1];
Assert.assertTrue(0.66 < masterKeepRatio && 0.67 > masterKeepRatio);
Assert.assertTrue(0.49 < slaveKeepRatio && 0.51 > slaveKeepRatio);
}
@Test
public void testRebalance2() {
int partitions = 1256, replicas = 3;
List<String> instanceNames = new ArrayList<String>();
for (int i = 0; i < 10; i++) {
instanceNames.add("localhost:123" + i);
}
Map<String, Object> resultOriginal =
DefaultIdealStateCalculator.calculateInitialIdealState(instanceNames, partitions, replicas);
ZNRecord idealState1 =
DefaultIdealStateCalculator.convertToZNRecord(resultOriginal, "TestDB", "MASTER", "SLAVE");
Map<String, Object> result1 =
RebalanceUtil.buildInternalIdealState(new IdealState(idealState1));
List<String> instanceNames2 = new ArrayList<String>();
for (int i = 30; i < 35; i++) {
instanceNames2.add("localhost:123" + i);
}
Map<String, Object> result2 =
DefaultIdealStateCalculator.calculateNextIdealState(instanceNames2, result1);
Verify(resultOriginal, partitions, replicas);
Verify(result2, partitions, replicas);
Double masterKeepRatio = 0.0, slaveKeepRatio = 0.0;
double[] result = compareResult(resultOriginal, result2);
masterKeepRatio = result[0];
slaveKeepRatio = result[1];
Assert.assertTrue(0.66 < masterKeepRatio && 0.67 > masterKeepRatio);
Assert.assertTrue(0.49 < slaveKeepRatio && 0.51 > slaveKeepRatio);
}
public static void Verify(Map<String, Object> result, int partitions, int replicas) {
Map<String, List<Integer>> masterAssignmentMap =
(Map<String, List<Integer>>) (result.get("MasterAssignmentMap"));
Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap =
(Map<String, Map<String, List<Integer>>>) (result.get("SlaveAssignmentMap"));
AssertJUnit.assertTrue(partitions == (Integer) (result.get("partitions")));
// Verify master partitions covers all master partitions on each node
Map<Integer, Integer> masterCounterMap = new TreeMap<Integer, Integer>();
for (int i = 0; i < partitions; i++) {
masterCounterMap.put(i, 0);
}
int minMasters = Integer.MAX_VALUE, maxMasters = Integer.MIN_VALUE;
for (String instanceName : masterAssignmentMap.keySet()) {
List<Integer> masterList = masterAssignmentMap.get(instanceName);
// the assert needs to be changed when weighting is introduced
// AssertJUnit.assertTrue(masterList.size() == partitions /masterAssignmentMap.size() |
// masterList.size() == (partitions /masterAssignmentMap.size()+1) );
for (Integer x : masterList) {
AssertJUnit.assertTrue(masterCounterMap.get(x) == 0);
masterCounterMap.put(x, 1);
}
if (minMasters > masterList.size()) {
minMasters = masterList.size();
}
if (maxMasters < masterList.size()) {
maxMasters = masterList.size();
}
}
// Master partition should be evenly distributed most of the time
System.out.println("Masters: max: " + maxMasters + " Min:" + minMasters);
// Each master partition should occur only once
for (int i = 0; i < partitions; i++) {
AssertJUnit.assertTrue(masterCounterMap.get(i) == 1);
}
AssertJUnit.assertTrue(masterCounterMap.size() == partitions);
// for each node, verify the master partitions and the slave partition assignment map
if (replicas == 0) {
AssertJUnit.assertTrue(nodeSlaveAssignmentMap.size() == 0);
return;
}
AssertJUnit.assertTrue(masterAssignmentMap.size() == nodeSlaveAssignmentMap.size());
for (String instanceName : masterAssignmentMap.keySet()) {
AssertJUnit.assertTrue(nodeSlaveAssignmentMap.containsKey(instanceName));
Map<String, List<Integer>> slaveAssignmentMap = nodeSlaveAssignmentMap.get(instanceName);
Map<Integer, Integer> slaveCountMap = new TreeMap<Integer, Integer>();
List<Integer> masterList = masterAssignmentMap.get(instanceName);
for (Integer masterPartitionId : masterList) {
slaveCountMap.put(masterPartitionId, 0);
}
// Make sure that masterList are covered replica times by the slave assignment.
int minSlaves = Integer.MAX_VALUE, maxSlaves = Integer.MIN_VALUE;
for (String hostInstance : slaveAssignmentMap.keySet()) {
List<Integer> slaveAssignment = slaveAssignmentMap.get(hostInstance);
Set<Integer> occurenceSet = new HashSet<Integer>();
// Each slave should occur only once in the list, since the list is per-node slaves
for (Integer slavePartition : slaveAssignment) {
AssertJUnit.assertTrue(!occurenceSet.contains(slavePartition));
occurenceSet.add(slavePartition);
slaveCountMap.put(slavePartition, slaveCountMap.get(slavePartition) + 1);
}
if (minSlaves > slaveAssignment.size()) {
minSlaves = slaveAssignment.size();
}
if (maxSlaves < slaveAssignment.size()) {
maxSlaves = slaveAssignment.size();
}
}
// check if slave distribution is even
AssertJUnit.assertTrue(maxSlaves - minSlaves <= 1);
// System.out.println("Slaves: max: "+maxSlaves+" Min:"+ minSlaves);
// for each node, the slave assignment map should cover the masters for exactly replica
// times
AssertJUnit.assertTrue(slaveCountMap.size() == masterList.size());
for (Integer masterPartitionId : masterList) {
AssertJUnit.assertTrue(slaveCountMap.get(masterPartitionId) == replicas);
}
}
}
public void printStat(Map<String, Object> result) {
// print out master distribution
// print out slave distribution
}
public static double[] compareResult(Map<String, Object> result1, Map<String, Object> result2) {
double[] result = new double[2];
Map<String, List<Integer>> masterAssignmentMap1 =
(Map<String, List<Integer>>) (result1.get("MasterAssignmentMap"));
Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap1 =
(Map<String, Map<String, List<Integer>>>) (result1.get("SlaveAssignmentMap"));
Map<String, List<Integer>> masterAssignmentMap2 =
(Map<String, List<Integer>>) (result2.get("MasterAssignmentMap"));
Map<String, Map<String, List<Integer>>> nodeSlaveAssignmentMap2 =
(Map<String, Map<String, List<Integer>>>) (result2.get("SlaveAssignmentMap"));
int commonMasters = 0;
int commonSlaves = 0;
int partitions = (Integer) (result1.get("partitions"));
int replicas = (Integer) (result1.get("replicas"));
AssertJUnit.assertTrue((Integer) (result2.get("partitions")) == partitions);
AssertJUnit.assertTrue((Integer) (result2.get("replicas")) == replicas);
// masterMap1 maps from partition id to the holder instance name
Map<Integer, String> masterMap1 = new TreeMap<Integer, String>();
for (String instanceName : masterAssignmentMap1.keySet()) {
List<Integer> masterList1 = masterAssignmentMap1.get(instanceName);
for (Integer partition : masterList1) {
AssertJUnit.assertTrue(!masterMap1.containsKey(partition));
masterMap1.put(partition, instanceName);
}
}
// go through masterAssignmentMap2 and find out the common number
for (String instanceName : masterAssignmentMap2.keySet()) {
List<Integer> masterList2 = masterAssignmentMap2.get(instanceName);
for (Integer partition : masterList2) {
if (masterMap1.get(partition).equalsIgnoreCase(instanceName)) {
commonMasters++;
}
}
}
result[0] = 1.0 * commonMasters / partitions;
System.out.println(
commonMasters + " master partitions are kept, " + (partitions - commonMasters)
+ " moved, keep ratio:" + 1.0 * commonMasters / partitions);
// maps from the partition id to the instance names that holds its slave partition
Map<Integer, Set<String>> slaveMap1 = new TreeMap<Integer, Set<String>>();
for (String instanceName : nodeSlaveAssignmentMap1.keySet()) {
Map<String, List<Integer>> slaveAssignment1 = nodeSlaveAssignmentMap1.get(instanceName);
for (String slaveHostName : slaveAssignment1.keySet()) {
List<Integer> slaveList = slaveAssignment1.get(slaveHostName);
for (Integer partition : slaveList) {
if (!slaveMap1.containsKey(partition)) {
slaveMap1.put(partition, new TreeSet<String>());
}
AssertJUnit.assertTrue(!slaveMap1.get(partition).contains(slaveHostName));
slaveMap1.get(partition).add(slaveHostName);
}
}
}
for (String instanceName : nodeSlaveAssignmentMap2.keySet()) {
Map<String, List<Integer>> slaveAssignment2 = nodeSlaveAssignmentMap2.get(instanceName);
for (String slaveHostName : slaveAssignment2.keySet()) {
List<Integer> slaveList = slaveAssignment2.get(slaveHostName);
for (Integer partition : slaveList) {
if (slaveMap1.get(partition).contains(slaveHostName)) {
commonSlaves++;
}
}
}
}
result[1] = 1.0 * commonSlaves / partitions / replicas;
System.out.println(
commonSlaves + " slave partitions are kept, " + (partitions * replicas - commonSlaves)
+ " moved. keep ratio:" + 1.0 * commonSlaves / partitions / replicas);
return result;
}
}