| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| package org.apache.iotdb.confignode.manager.load.balancer.router.leader; |
| |
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionStatus;
import org.apache.iotdb.confignode.manager.load.cache.node.NodeStatistics;
import org.apache.iotdb.confignode.manager.load.cache.region.RegionStatistics;

import org.apache.tsfile.utils.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Locale;
import java.util.Map;
import java.util.Objects;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
| |
| public class LeaderBalancerComparisonTest { |
| |
| // Set this field to true, and you can see the readable test results in command line |
| private static final boolean isCommandLineMode = false; |
| |
| private static final Logger LOGGER = LoggerFactory.getLogger(LeaderBalancerComparisonTest.class); |
| private static FileWriter WRITER; |
| |
| private static final GreedyLeaderBalancer GREEDY_LEADER_BALANCER = new GreedyLeaderBalancer(); |
| private static final MinCostFlowLeaderBalancer MIN_COST_FLOW_LEADER_BALANCER = |
| new MinCostFlowLeaderBalancer(); |
| |
| private static final Random RANDOM = new Random(); |
| private static final int TEST_MAX_DATA_NODE_NUM = 100; |
| private static final int TEST_CPU_CORE_NUM = 16; |
| private static final int TEST_REPLICA_NUM = 3; |
| private static final double GREEDY_INIT_RATE = 0.9; |
| private static final double DISABLE_DATA_NODE_RATE = 0.05; |
| |
| // Invoke this interface if you want to record the test result |
| public static void prepareWriter() throws IOException { |
| if (isCommandLineMode) { |
| WRITER = null; |
| } else { |
| WRITER = new FileWriter("./leaderBalancerTest.txt"); |
| } |
| } |
| |
| // Add @Test here to enable this test |
| public void leaderBalancerComparisonTest() throws IOException { |
| for (int dataNodeNum = 3; dataNodeNum <= TEST_MAX_DATA_NODE_NUM; dataNodeNum++) { |
| // Simulate each DataNode has 16 CPU cores |
| // and each RegionGroup has 3 replicas |
| int regionGroupNum = TEST_CPU_CORE_NUM * dataNodeNum / TEST_REPLICA_NUM; |
| Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap = new HashMap<>(); |
| Map<TConsensusGroupId, Integer> regionLeaderMap = new HashMap<>(); |
| generateTestData(dataNodeNum, regionGroupNum, regionReplicaSetMap, regionLeaderMap); |
| |
| if (isCommandLineMode) { |
| LOGGER.info("============================"); |
| LOGGER.info("DataNodeNum: {}, RegionGroupNum: {}", dataNodeNum, regionGroupNum); |
| } |
| |
| // Basic test |
| Map<TConsensusGroupId, Integer> greedyLeaderDistribution = new ConcurrentHashMap<>(); |
| Map<Integer, NodeStatistics> allRunningDataNodeStatistics = new TreeMap<>(); |
| for (int i = 0; i < dataNodeNum; i++) { |
| allRunningDataNodeStatistics.put(i, new NodeStatistics(NodeStatus.Running)); |
| } |
| Map<TConsensusGroupId, Map<Integer, RegionStatistics>> allRunningRegionStatistics = |
| new TreeMap<>(); |
| regionReplicaSetMap.forEach( |
| (regionGroupId, regionReplicaSet) -> { |
| Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); |
| regionReplicaSet |
| .getDataNodeLocations() |
| .forEach( |
| dataNodeLocation -> |
| regionStatistics.put( |
| dataNodeLocation.getDataNodeId(), |
| new RegionStatistics(RegionStatus.Running))); |
| allRunningRegionStatistics.put(regionGroupId, regionStatistics); |
| }); |
| Statistics greedyStatistics = |
| doBalancing( |
| dataNodeNum, |
| regionGroupNum, |
| GREEDY_LEADER_BALANCER, |
| regionReplicaSetMap, |
| regionLeaderMap, |
| allRunningDataNodeStatistics, |
| allRunningRegionStatistics, |
| greedyLeaderDistribution); |
| Map<TConsensusGroupId, Integer> mcfLeaderDistribution = new ConcurrentHashMap<>(); |
| Statistics mcfStatistics = |
| doBalancing( |
| dataNodeNum, |
| regionGroupNum, |
| MIN_COST_FLOW_LEADER_BALANCER, |
| regionReplicaSetMap, |
| regionLeaderMap, |
| allRunningDataNodeStatistics, |
| allRunningRegionStatistics, |
| mcfLeaderDistribution); |
| if (isCommandLineMode) { |
| LOGGER.info("[Basic test]"); |
| LOGGER.info("Greedy balancer: {}", greedyStatistics); |
| LOGGER.info("MinCostFlow balancer: {}", mcfStatistics); |
| } else { |
| greedyStatistics.toFile(); |
| mcfStatistics.toFile(); |
| } |
| |
| // Disaster test |
| int disabledDataNodeNum = (int) Math.ceil(dataNodeNum * DISABLE_DATA_NODE_RATE); |
| HashSet<Integer> disabledDataNodeSet = new HashSet<>(); |
| while (disabledDataNodeSet.size() < disabledDataNodeNum) { |
| int dataNodeId = RANDOM.nextInt(dataNodeNum); |
| if (disabledDataNodeSet.contains(dataNodeId)) { |
| continue; |
| } |
| disabledDataNodeSet.add(dataNodeId); |
| } |
| Map<Integer, NodeStatistics> disabledDataNodeStatistics = new TreeMap<>(); |
| for (int i = 0; i < dataNodeNum; i++) { |
| disabledDataNodeStatistics.put( |
| i, |
| disabledDataNodeSet.contains(i) |
| ? new NodeStatistics(NodeStatus.Unknown) |
| : new NodeStatistics(NodeStatus.Running)); |
| } |
| Map<TConsensusGroupId, Map<Integer, RegionStatistics>> disabledRegionStatistics = |
| new TreeMap<>(); |
| regionReplicaSetMap.forEach( |
| (regionGroupId, regionReplicaSet) -> { |
| Map<Integer, RegionStatistics> regionStatistics = new TreeMap<>(); |
| regionReplicaSet |
| .getDataNodeLocations() |
| .forEach( |
| dataNodeLocation -> |
| regionStatistics.put( |
| dataNodeLocation.getDataNodeId(), |
| disabledDataNodeSet.contains(dataNodeLocation.getDataNodeId()) |
| ? new RegionStatistics(RegionStatus.Unknown) |
| : new RegionStatistics(RegionStatus.Running))); |
| disabledRegionStatistics.put(regionGroupId, regionStatistics); |
| }); |
| greedyStatistics = |
| doBalancing( |
| dataNodeNum, |
| regionGroupNum, |
| GREEDY_LEADER_BALANCER, |
| regionReplicaSetMap, |
| greedyLeaderDistribution, |
| disabledDataNodeStatistics, |
| disabledRegionStatistics, |
| greedyLeaderDistribution); |
| mcfStatistics = |
| doBalancing( |
| dataNodeNum, |
| regionGroupNum, |
| MIN_COST_FLOW_LEADER_BALANCER, |
| regionReplicaSetMap, |
| mcfLeaderDistribution, |
| disabledDataNodeStatistics, |
| disabledRegionStatistics, |
| mcfLeaderDistribution); |
| if (isCommandLineMode) { |
| LOGGER.info("[Disaster test]"); |
| LOGGER.info("Greedy balancer: {}", greedyStatistics); |
| LOGGER.info("MinCostFlow balancer: {}", mcfStatistics); |
| } else { |
| greedyStatistics.toFile(); |
| mcfStatistics.toFile(); |
| } |
| |
| // Recovery test |
| greedyStatistics = |
| doBalancing( |
| dataNodeNum, |
| regionGroupNum, |
| GREEDY_LEADER_BALANCER, |
| regionReplicaSetMap, |
| greedyLeaderDistribution, |
| allRunningDataNodeStatistics, |
| allRunningRegionStatistics, |
| greedyLeaderDistribution); |
| mcfStatistics = |
| doBalancing( |
| dataNodeNum, |
| regionGroupNum, |
| MIN_COST_FLOW_LEADER_BALANCER, |
| regionReplicaSetMap, |
| mcfLeaderDistribution, |
| allRunningDataNodeStatistics, |
| allRunningRegionStatistics, |
| mcfLeaderDistribution); |
| if (isCommandLineMode) { |
| LOGGER.info("[Recovery test]"); |
| LOGGER.info("Greedy balancer: {}", greedyStatistics); |
| LOGGER.info("MinCostFlow balancer: {}", mcfStatistics); |
| } else { |
| greedyStatistics.toFile(); |
| mcfStatistics.toFile(); |
| } |
| } |
| } |
| |
| private void generateTestData( |
| int dataNodeNum, |
| int regionGroupNum, |
| Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, |
| Map<TConsensusGroupId, Integer> regionLeaderMap) { |
| |
| Map<Integer, AtomicInteger> regionCounter = new ConcurrentHashMap<>(); |
| Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>(); |
| for (int i = 0; i < dataNodeNum; i++) { |
| regionCounter.put(i, new AtomicInteger(0)); |
| leaderCounter.put(i, new AtomicInteger(0)); |
| } |
| |
| int greedyNum = (int) (GREEDY_INIT_RATE * regionGroupNum); |
| int randomNum = regionGroupNum - greedyNum; |
| for (int index = 0; index < regionGroupNum; index++) { |
| int leaderId = -1; |
| TConsensusGroupId regionGroupId = |
| new TConsensusGroupId(TConsensusGroupType.DataRegion, index); |
| TRegionReplicaSet regionReplicaSet = new TRegionReplicaSet().setRegionId(regionGroupId); |
| |
| int seed = RANDOM.nextInt(greedyNum + randomNum); |
| if (seed < greedyNum) { |
| // Greedy pick RegionReplicas and leader |
| int leaderWeight = Integer.MAX_VALUE; |
| PriorityQueue<Pair<Integer, Integer>> dataNodePriorityQueue = |
| new PriorityQueue<>(Comparator.comparingInt(Pair::getRight)); |
| regionCounter.forEach( |
| (dataNodeId, regionGroupCount) -> |
| dataNodePriorityQueue.offer(new Pair<>(dataNodeId, regionGroupCount.get()))); |
| for (int i = 0; i < TEST_REPLICA_NUM; i++) { |
| int dataNodeId = Objects.requireNonNull(dataNodePriorityQueue.poll()).getLeft(); |
| regionReplicaSet.addToDataNodeLocations( |
| new TDataNodeLocation().setDataNodeId(dataNodeId)); |
| if (leaderCounter.get(dataNodeId).get() < leaderWeight) { |
| leaderWeight = leaderCounter.get(dataNodeId).get(); |
| leaderId = dataNodeId; |
| } |
| } |
| greedyNum -= 1; |
| } else { |
| // Random pick RegionReplicas and leader |
| Set<Integer> randomSet = new HashSet<>(); |
| while (randomSet.size() < TEST_REPLICA_NUM) { |
| int dataNodeId = RANDOM.nextInt(dataNodeNum); |
| if (randomSet.contains(dataNodeId)) { |
| continue; |
| } |
| |
| randomSet.add(dataNodeId); |
| regionReplicaSet.addToDataNodeLocations( |
| new TDataNodeLocation().setDataNodeId(dataNodeId)); |
| } |
| leaderId = new ArrayList<>(randomSet).get(RANDOM.nextInt(TEST_REPLICA_NUM)); |
| randomNum -= 1; |
| } |
| |
| regionReplicaSetMap.put(regionGroupId, regionReplicaSet); |
| regionReplicaSet |
| .getDataNodeLocations() |
| .forEach( |
| dataNodeLocation -> |
| regionCounter.get(dataNodeLocation.getDataNodeId()).getAndIncrement()); |
| regionLeaderMap.put(regionGroupId, leaderId); |
| leaderCounter.get(leaderId).getAndIncrement(); |
| } |
| } |
| |
| private Statistics doBalancing( |
| int dataNodeNum, |
| int regionGroupNum, |
| AbstractLeaderBalancer leaderBalancer, |
| Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, |
| Map<TConsensusGroupId, Integer> regionLeaderMap, |
| Map<Integer, NodeStatistics> nodeStatisticsMap, |
| Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap, |
| Map<TConsensusGroupId, Integer> stableLeaderDistribution) { |
| |
| Statistics result = new Statistics(); |
| result.rounds = -1; |
| Map<TConsensusGroupId, Integer> lastDistribution = new ConcurrentHashMap<>(regionLeaderMap); |
| for (int rounds = 0; rounds < 1000; rounds++) { |
| Map<TConsensusGroupId, Integer> currentDistribution = |
| leaderBalancer.generateOptimalLeaderDistribution( |
| new TreeMap<>(), |
| regionReplicaSetMap, |
| lastDistribution, |
| nodeStatisticsMap, |
| regionStatisticsMap); |
| if (currentDistribution.equals(lastDistribution)) { |
| // The leader distribution is stable |
| result.rounds = rounds; |
| break; |
| } |
| |
| AtomicInteger switchTimes = new AtomicInteger(); |
| lastDistribution |
| .keySet() |
| .forEach( |
| regionGroupId -> { |
| if (!Objects.equals( |
| lastDistribution.get(regionGroupId), currentDistribution.get(regionGroupId))) { |
| switchTimes.getAndIncrement(); |
| } |
| }); |
| |
| result.switchTimes += switchTimes.get(); |
| lastDistribution.clear(); |
| lastDistribution.putAll(currentDistribution); |
| } |
| |
| stableLeaderDistribution.clear(); |
| stableLeaderDistribution.putAll(lastDistribution); |
| |
| double sum = 0; |
| double avg = (double) (regionGroupNum) / (double) (dataNodeNum); |
| int minLeaderCount = Integer.MAX_VALUE; |
| int maxLeaderCount = Integer.MIN_VALUE; |
| Map<Integer, AtomicInteger> leaderCounter = new ConcurrentHashMap<>(); |
| lastDistribution.forEach( |
| (regionGroupId, leaderId) -> |
| leaderCounter |
| .computeIfAbsent(leaderId, empty -> new AtomicInteger(0)) |
| .getAndIncrement()); |
| for (Map.Entry<Integer, AtomicInteger> entry : leaderCounter.entrySet()) { |
| int leaderCount = entry.getValue().get(); |
| sum += Math.pow((double) leaderCount - avg, 2); |
| minLeaderCount = Math.min(minLeaderCount, leaderCount); |
| maxLeaderCount = Math.max(maxLeaderCount, leaderCount); |
| } |
| result.range = maxLeaderCount - minLeaderCount; |
| result.variance = sum / (double) (dataNodeNum); |
| |
| return result; |
| } |
| |
| private static class Statistics { |
| |
| // The number of execution rounds that the output of balance algorithm is stable |
| private int rounds; |
| // The number of change leader until the output of balance algorithm is stable |
| private int switchTimes; |
| // The range of the number of cluster leaders |
| private int range; |
| // The variance of the number of cluster leaders |
| private double variance; |
| |
| private Statistics() { |
| this.rounds = 0; |
| this.switchTimes = 0; |
| this.range = 0; |
| this.variance = 0; |
| } |
| |
| private void toFile() throws IOException { |
| WRITER.write( |
| rounds + "," + switchTimes + "," + range + "," + String.format("%.6f", variance) + "\n"); |
| WRITER.flush(); |
| } |
| |
| @Override |
| public String toString() { |
| return "Statistics{" |
| + "rounds=" |
| + rounds |
| + ", switchTimes=" |
| + switchTimes |
| + ", range=" |
| + range |
| + ", variance=" |
| + variance |
| + '}'; |
| } |
| |
| @Override |
| public boolean equals(Object o) { |
| if (this == o) return true; |
| if (o == null || getClass() != o.getClass()) return false; |
| Statistics that = (Statistics) o; |
| return rounds == that.rounds |
| && switchTimes == that.switchTimes |
| && range == that.range |
| && Math.abs(variance - that.variance) <= 0.1; |
| } |
| |
| @Override |
| public int hashCode() { |
| return Objects.hash(rounds, switchTimes, range, variance); |
| } |
| } |
| } |