finish
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java index 2239bfd..05803a4 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/RouteBalancer.java
@@ -316,6 +316,14 @@ CnToDnRequestType.UPDATE_REGION_ROUTE_MAP, new TRegionRouteReq(broadcastTime, tmpPriorityMap), dataNodeLocationMap); + for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> routeEntry : tmpPriorityMap.entrySet()) { + LOGGER.info( + "[RouteMap] {}: {}", + routeEntry.getKey(), + routeEntry.getValue().getDataNodeLocations().stream() + .mapToInt(TDataNodeLocation::getDataNodeId) + .toArray()); + } CnToDnInternalServiceAsyncRequestManager.getInstance().sendAsyncRequestWithRetry(clientHandler); } @@ -437,7 +445,7 @@ @Override public void onConsensusGroupStatisticsChanged(ConsensusGroupStatisticsChangeEvent event) { - balanceRegionLeader(); + // balanceRegionLeader(); balanceRegionPriority(); } }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java index c8a3584..4446025 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -114,6 +114,17 @@ databaseAllocatedRegionGroups); dfs(-1, 0, new int[replicationFactor], 0, 0, 0); + if (optimalReplicaSets.isEmpty()) { + GreedyRegionGroupAllocator tmpAllocator = new GreedyRegionGroupAllocator(); + return tmpAllocator.generateOptimalRegionReplicasDistribution( + availableDataNodeMap, + freeDiskSpaceMap, + allocatedRegionGroups, + databaseAllocatedRegionGroups, + replicationFactor, + consensusGroupId); + } + // Randomly pick one optimal plan as result Collections.shuffle(optimalReplicaSets); int[] optimalReplicaSet = optimalReplicaSets.get(0);
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PGRA.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PGRA.java new file mode 100644 index 0000000..1339ca2 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PGRA.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Allocates RegionGroups on a partite-graph view of the cluster.
+ *
+ * <p>Every available DataNode gets a dense "fake" id; the fake id modulo {@code subGraphCount}
+ * decides which sub-graph the DataNode belongs to. A plan consists of {@code subDataNodeNum}
+ * DataNodes taken from a single sub-graph plus one DataNode from every other sub-graph. When no
+ * such plan exists the request is delegated to {@link GreedyRegionGroupAllocator}.
+ *
+ * <p>The working state of one allocation is kept in instance fields, so calls on the same instance
+ * must not run concurrently.
+ */
+public class PGRA implements IRegionGroupAllocator {
+
+  /** A candidate DataNode, ordered by regionCount, then scatterWidth, then randomWeight. */
+  private static class DataNodeEntry {
+
+    private final int fakeId;
+    private final int regionCount;
+    private final int scatterWidth;
+    private final int randomWeight;
+
+    public DataNodeEntry(int fakeId, int regionCount, int scatterWidth, int randomWeight) {
+      this.fakeId = fakeId;
+      this.regionCount = regionCount;
+      this.scatterWidth = scatterWidth;
+      this.randomWeight = randomWeight;
+    }
+
+    public int compare(PGRA.DataNodeEntry e) {
+      return regionCount != e.regionCount
+          ? Integer.compare(regionCount, e.regionCount)
+          : scatterWidth != e.scatterWidth
+              ? Integer.compare(scatterWidth, e.scatterWidth)
+              : Integer.compare(randomWeight, e.randomWeight);
+    }
+  }
+
+  private static final Random RANDOM = new Random();
+
+  private int subGraphCount;
+  private int regionPerDataNode;
+
+  private int dataNodeNum;
+  // The number of allocated Regions in each DataNode
+  private int[] regionCounter;
+  // The scatter width of each DataNode
+  private int[] scatterWidthCounter;
+  // combinationCounter[i][j] is 1 when DataNodes i and j already share a RegionGroup
+  private int[][] combinationCounter;
+  private Map<Integer, Integer> fakeToRealIdMap;
+  private Map<Integer, Integer> realToFakeIdMap;
+
+  private int subDataNodeNum;
+  // First Key: the sum of overlapped 2-Region combination Regions with
+  // other allocated RegionGroups is minimal
+  private int optimalCombinationSum;
+  // Second Key: the sum of DataRegions in selected DataNodes is minimal
+  private int optimalRegionSum;
+  private int[] optimalSubDataNodes;
+
+  /**
+   * Picks {@code replicationFactor} DataNodes for a new RegionGroup.
+   *
+   * @param availableDataNodeMap candidate DataNodes, looked up by DataNodeId
+   * @param freeDiskSpaceMap only forwarded to the greedy fallback
+   * @param allocatedRegionGroups existing RegionGroups used to build the load and pair counters
+   * @param databaseAllocatedRegionGroups only forwarded to the greedy fallback
+   * @param replicationFactor the number of replicas to place
+   * @param consensusGroupId id of the new RegionGroup; its type selects the per-DataNode limit
+   * @return the replica set found by the partite-graph search, or the result of the greedy
+   *     fallback when that search finds no plan
+   */
+  @Override
+  public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+
+    this.regionPerDataNode =
+        (int)
+            (consensusGroupId.getType().equals(TConsensusGroupType.DataRegion)
+                ? ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode()
+                : ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode());
+    prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+    for (int i = 0; i < subGraphCount; i++) {
+      subGraphSearch(i);
+    }
+    if (optimalCombinationSum == Integer.MAX_VALUE) {
+      // No sub-graph holds enough non-full DataNodes
+      return fallbackToGreedy(
+          availableDataNodeMap,
+          freeDiskSpaceMap,
+          allocatedRegionGroups,
+          databaseAllocatedRegionGroups,
+          replicationFactor,
+          consensusGroupId);
+    }
+    List<Integer> partiteNodes = partiteGraphSearch(optimalSubDataNodes[0] % subGraphCount);
+    if (partiteNodes.size() < replicationFactor - subDataNodeNum) {
+      // Some other sub-graph has no non-full DataNode left
+      return fallbackToGreedy(
+          availableDataNodeMap,
+          freeDiskSpaceMap,
+          allocatedRegionGroups,
+          databaseAllocatedRegionGroups,
+          replicationFactor,
+          consensusGroupId);
+    }
+
+    TRegionReplicaSet result = new TRegionReplicaSet();
+    result.setRegionId(consensusGroupId);
+    for (int i = 0; i < subDataNodeNum; i++) {
+      result.addToDataNodeLocations(
+          availableDataNodeMap.get(fakeToRealIdMap.get(optimalSubDataNodes[i])).getLocation());
+    }
+    for (int i = 0; i < replicationFactor - subDataNodeNum; i++) {
+      result.addToDataNodeLocations(
+          availableDataNodeMap.get(fakeToRealIdMap.get(partiteNodes.get(i))).getLocation());
+    }
+    return result;
+  }
+
+  /** Delegates the request unchanged to a {@link GreedyRegionGroupAllocator}. */
+  private static TRegionReplicaSet fallbackToGreedy(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+    return new GreedyRegionGroupAllocator()
+        .generateOptimalRegionReplicasDistribution(
+            availableDataNodeMap,
+            freeDiskSpaceMap,
+            allocatedRegionGroups,
+            databaseAllocatedRegionGroups,
+            replicationFactor,
+            consensusGroupId);
+  }
+
+  /**
+   * Rebuilds the id mappings, the counters and the search state for one allocation.
+   *
+   * <p>Replicas of allocated RegionGroups that sit on DataNodes missing from {@code
+   * availableDataNodeMap} are ignored: such DataNodes cannot be selected anyway, and unboxing
+   * their missing fake id would otherwise throw a NullPointerException.
+   */
+  private void prepare(
+      int replicationFactor,
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      List<TRegionReplicaSet> allocatedRegionGroups) {
+
+    // ceil(replicationFactor / 2)
+    this.subGraphCount = replicationFactor / 2 + (replicationFactor % 2 == 0 ? 0 : 1);
+
+    this.fakeToRealIdMap = new TreeMap<>();
+    this.realToFakeIdMap = new TreeMap<>();
+    this.dataNodeNum = availableDataNodeMap.size();
+    // Sorted so that the fake ids (and thus the sub-graph of each DataNode) do not depend on
+    // the iteration order of the given map
+    List<Integer> dataNodeIdList =
+        availableDataNodeMap.values().stream()
+            .map(c -> c.getLocation().getDataNodeId())
+            .sorted()
+            .collect(Collectors.toList());
+    for (int i = 0; i < dataNodeNum; i++) {
+      fakeToRealIdMap.put(i, dataNodeIdList.get(i));
+      realToFakeIdMap.put(dataNodeIdList.get(i), i);
+    }
+
+    // Compute regionCounter, combinationCounter and scatterWidthCounter
+    // (freshly allocated int arrays are already zero-filled)
+    this.regionCounter = new int[dataNodeNum];
+    this.combinationCounter = new int[dataNodeNum][dataNodeNum];
+    for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) {
+      List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations();
+      for (int i = 0; i < dataNodeLocations.size(); i++) {
+        Integer fakeIId = realToFakeIdMap.get(dataNodeLocations.get(i).getDataNodeId());
+        if (fakeIId == null) {
+          continue;
+        }
+        regionCounter[fakeIId]++;
+        for (int j = i + 1; j < dataNodeLocations.size(); j++) {
+          Integer fakeJId = realToFakeIdMap.get(dataNodeLocations.get(j).getDataNodeId());
+          if (fakeJId == null) {
+            continue;
+          }
+          combinationCounter[fakeIId][fakeJId] = 1;
+          combinationCounter[fakeJId][fakeIId] = 1;
+        }
+      }
+    }
+    this.scatterWidthCounter = new int[dataNodeNum];
+    for (int i = 0; i < dataNodeNum; i++) {
+      for (int j = 0; j < dataNodeNum; j++) {
+        scatterWidthCounter[i] += combinationCounter[i][j];
+      }
+    }
+
+    // Reset the optimal result
+    this.subDataNodeNum = replicationFactor / 2 + 1;
+    this.optimalCombinationSum = Integer.MAX_VALUE;
+    this.optimalRegionSum = Integer.MAX_VALUE;
+    this.optimalSubDataNodes = new int[subDataNodeNum];
+  }
+
+  /**
+   * Builds one candidate for the sub-graph that starts at {@code firstIndex} and keeps it when it
+   * beats, or randomly when it ties, the best candidate found so far.
+   */
+  private void subGraphSearch(int firstIndex) {
+    List<DataNodeEntry> entryList = new ArrayList<>();
+    for (int i = firstIndex; i < dataNodeNum; i += subGraphCount) {
+      if (regionCounter[i] >= regionPerDataNode) {
+        continue;
+      }
+      entryList.add(
+          new DataNodeEntry(i, regionCounter[i], scatterWidthCounter[i], RANDOM.nextInt()));
+    }
+    if (entryList.size() < subDataNodeNum) {
+      return;
+    }
+    entryList.sort(DataNodeEntry::compare);
+    int[] subDataNodes = new int[subDataNodeNum];
+    // Pick replicationFactor / 2 DataNodes with the smallest regionCount first
+    for (int i = 0; i < subDataNodeNum - 1; i++) {
+      subDataNodes[i] = entryList.get(i).fakeId;
+    }
+    int curCombinationSum = Integer.MAX_VALUE;
+    int curRegionSum = Integer.MAX_VALUE;
+    // Select the last DataNode
+    for (int i = subDataNodeNum - 1; i < entryList.size(); i++) {
+      int tmpCombinationSum = 0;
+      for (int j = 0; j < subDataNodeNum - 1; j++) {
+        tmpCombinationSum += combinationCounter[subDataNodes[j]][entryList.get(i).fakeId];
+      }
+      if (tmpCombinationSum < curCombinationSum) {
+        curCombinationSum = tmpCombinationSum;
+        curRegionSum = entryList.get(i).regionCount;
+        subDataNodes[subDataNodeNum - 1] = entryList.get(i).fakeId;
+      } else if (tmpCombinationSum == curCombinationSum
+          && entryList.get(i).regionCount < curRegionSum) {
+        curRegionSum = entryList.get(i).regionCount;
+        subDataNodes[subDataNodeNum - 1] = entryList.get(i).fakeId;
+      }
+    }
+    // Add the contribution of the DataNodes that were picked first
+    for (int i = 0; i < subDataNodeNum - 1; i++) {
+      curRegionSum += regionCounter[subDataNodes[i]];
+      for (int j = i + 1; j < subDataNodeNum - 1; j++) {
+        curCombinationSum += combinationCounter[subDataNodes[i]][subDataNodes[j]];
+      }
+    }
+    if (curCombinationSum < optimalCombinationSum
+        || (curCombinationSum == optimalCombinationSum && curRegionSum < optimalRegionSum)) {
+      optimalCombinationSum = curCombinationSum;
+      optimalRegionSum = curRegionSum;
+      optimalSubDataNodes = Arrays.copyOf(subDataNodes, subDataNodeNum);
+    } else if (curCombinationSum == optimalCombinationSum
+        && curRegionSum == optimalRegionSum
+        && RANDOM.nextBoolean()) {
+      optimalSubDataNodes = Arrays.copyOf(subDataNodes, subDataNodeNum);
+    }
+  }
+
+  /**
+   * Selects one non-full DataNode from every sub-graph except {@code selected}, preferring the
+   * one that shares the fewest RegionGroups with {@code optimalSubDataNodes} and then the one
+   * holding the fewest Regions.
+   *
+   * @return the selected fake ids, or an empty list as soon as one sub-graph has no candidate
+   */
+  private List<Integer> partiteGraphSearch(int selected) {
+    List<Integer> partiteNodes = new ArrayList<>();
+    for (int partiteIndex = 0; partiteIndex < subGraphCount; partiteIndex++) {
+      if (partiteIndex == selected) {
+        continue;
+      }
+      int selectedDataNode = -1;
+      int bestScatterWidth = 0;
+      int bestRegionSum = Integer.MAX_VALUE;
+      for (int i = partiteIndex; i < dataNodeNum; i += subGraphCount) {
+        if (regionCounter[i] >= regionPerDataNode) {
+          continue;
+        }
+        int scatterWidth = subDataNodeNum;
+        for (int k = 0; k < subDataNodeNum; k++) {
+          scatterWidth -= combinationCounter[i][optimalSubDataNodes[k]];
+        }
+        if (scatterWidth < bestScatterWidth) {
+          continue;
+        }
+        if (scatterWidth > bestScatterWidth) {
+          bestScatterWidth = scatterWidth;
+          bestRegionSum = regionCounter[i];
+          selectedDataNode = i;
+        } else if (regionCounter[i] < bestRegionSum) {
+          bestRegionSum = regionCounter[i];
+          selectedDataNode = i;
+        } else if (regionCounter[i] == bestRegionSum && RANDOM.nextBoolean()) {
+          selectedDataNode = i;
+        }
+      }
+      if (selectedDataNode == -1) {
+        return new ArrayList<>();
+      }
+      partiteNodes.add(selectedDataNode);
+    }
+    return partiteNodes;
+  }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphRegionGroupAllocator.java new file mode 100644 index 0000000..6d743f3 --- /dev/null +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/PartiteGraphRegionGroupAllocator.java
@@ -0,0 +1,311 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.manager.load.balancer.region;
+
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+/**
+ * Allocates RegionGroups on a partite-graph view of the cluster, enumerating the candidate
+ * DataNode combinations of every sub-graph by depth-first search.
+ *
+ * <p>Every available DataNode gets a dense "fake" id; the fake id modulo {@code subGraphCount}
+ * decides which sub-graph the DataNode belongs to. A plan consists of {@code subDataNodeNum}
+ * DataNodes taken from a single sub-graph plus one DataNode from every other sub-graph. When no
+ * such plan exists the request is delegated to {@link GreedyRegionGroupAllocator}.
+ *
+ * <p>The working state of one allocation is kept in instance fields, so calls on the same instance
+ * must not run concurrently.
+ */
+public class PartiteGraphRegionGroupAllocator implements IRegionGroupAllocator {
+
+  private static final Random RANDOM = new Random();
+  private static final GreedyRegionGroupAllocator GREEDY_ALLOCATOR =
+      new GreedyRegionGroupAllocator();
+
+  private int subGraphCount;
+  private int regionPerDataNode;
+
+  private int dataNodeNum;
+  // The number of allocated Regions in each DataNode
+  private int[] regionCounter;
+  // combinationCounter[i][j] is 1 when DataNodes i and j already share a RegionGroup
+  private int[][] combinationCounter;
+  private Map<Integer, Integer> fakeToRealIdMap;
+  private Map<Integer, Integer> realToFakeIdMap;
+
+  private int subDataNodeNum;
+  // First Key: the sum of overlapped 2-Region combination Regions with
+  // other allocated RegionGroups is minimal
+  private int optimalCombinationSum;
+  // Second Key: the sum of DataRegions in selected DataNodes is minimal
+  private int optimalRegionSum;
+  private int[] optimalSubDataNodes;
+
+  /**
+   * Picks {@code replicationFactor} DataNodes for a new RegionGroup.
+   *
+   * @param availableDataNodeMap candidate DataNodes, looked up by DataNodeId
+   * @param freeDiskSpaceMap only forwarded to the greedy fallback
+   * @param allocatedRegionGroups existing RegionGroups used to build the load and pair counters
+   * @param databaseAllocatedRegionGroups only forwarded to the greedy fallback
+   * @param replicationFactor the number of replicas to place
+   * @param consensusGroupId id of the new RegionGroup; its type selects the per-DataNode limit
+   * @return the replica set found by the partite-graph search, or the result of the greedy
+   *     fallback when that search finds no plan
+   */
+  @Override
+  public TRegionReplicaSet generateOptimalRegionReplicasDistribution(
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      Map<Integer, Double> freeDiskSpaceMap,
+      List<TRegionReplicaSet> allocatedRegionGroups,
+      List<TRegionReplicaSet> databaseAllocatedRegionGroups,
+      int replicationFactor,
+      TConsensusGroupId consensusGroupId) {
+
+    this.regionPerDataNode =
+        (int)
+            (consensusGroupId.getType().equals(TConsensusGroupType.DataRegion)
+                ? ConfigNodeDescriptor.getInstance().getConf().getDataRegionPerDataNode()
+                : ConfigNodeDescriptor.getInstance().getConf().getSchemaRegionPerDataNode());
+    prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups);
+
+    for (int i = 0; i < subGraphCount; i++) {
+      subGraphSearch(i, 0, subDataNodeNum, 0, 0, new int[subDataNodeNum]);
+    }
+    if (optimalCombinationSum == Integer.MAX_VALUE) {
+      // No sub-graph holds enough non-full DataNodes
+      return GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+          availableDataNodeMap,
+          freeDiskSpaceMap,
+          allocatedRegionGroups,
+          databaseAllocatedRegionGroups,
+          replicationFactor,
+          consensusGroupId);
+    }
+
+    List<Integer> partiteNodes = partiteGraphSearch(optimalSubDataNodes[0] % subGraphCount);
+    if (partiteNodes.size() < replicationFactor - subDataNodeNum) {
+      // Some other sub-graph has no non-full DataNode left
+      return GREEDY_ALLOCATOR.generateOptimalRegionReplicasDistribution(
+          availableDataNodeMap,
+          freeDiskSpaceMap,
+          allocatedRegionGroups,
+          databaseAllocatedRegionGroups,
+          replicationFactor,
+          consensusGroupId);
+    }
+
+    TRegionReplicaSet result = new TRegionReplicaSet();
+    result.setRegionId(consensusGroupId);
+    for (int i = 0; i < subDataNodeNum; i++) {
+      result.addToDataNodeLocations(
+          availableDataNodeMap.get(fakeToRealIdMap.get(optimalSubDataNodes[i])).getLocation());
+    }
+    for (int i = 0; i < replicationFactor - subDataNodeNum; i++) {
+      result.addToDataNodeLocations(
+          availableDataNodeMap.get(fakeToRealIdMap.get(partiteNodes.get(i))).getLocation());
+    }
+    return result;
+  }
+
+  /**
+   * Rebuilds the id mappings, the counters and the search state for one allocation.
+   *
+   * <p>Replicas of allocated RegionGroups that sit on DataNodes missing from {@code
+   * availableDataNodeMap} are ignored: such DataNodes cannot be selected anyway, and unboxing
+   * their missing fake id would otherwise throw a NullPointerException.
+   */
+  private void prepare(
+      int replicationFactor,
+      Map<Integer, TDataNodeConfiguration> availableDataNodeMap,
+      List<TRegionReplicaSet> allocatedRegionGroups) {
+
+    // ceil(replicationFactor / 2)
+    this.subGraphCount = replicationFactor / 2 + (replicationFactor % 2 == 0 ? 0 : 1);
+
+    this.fakeToRealIdMap = new TreeMap<>();
+    this.realToFakeIdMap = new TreeMap<>();
+    this.dataNodeNum = availableDataNodeMap.size();
+    // Sorted so that the fake ids (and thus the sub-graph of each DataNode) do not depend on
+    // the iteration order of the given map
+    List<Integer> dataNodeIdList =
+        availableDataNodeMap.values().stream()
+            .map(c -> c.getLocation().getDataNodeId())
+            .sorted()
+            .collect(Collectors.toList());
+    for (int i = 0; i < dataNodeNum; i++) {
+      fakeToRealIdMap.put(i, dataNodeIdList.get(i));
+      realToFakeIdMap.put(dataNodeIdList.get(i), i);
+    }
+
+    // Compute regionCounter and combinationCounter
+    // (freshly allocated int arrays are already zero-filled)
+    this.regionCounter = new int[dataNodeNum];
+    this.combinationCounter = new int[dataNodeNum][dataNodeNum];
+    for (TRegionReplicaSet regionReplicaSet : allocatedRegionGroups) {
+      List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations();
+      for (int i = 0; i < dataNodeLocations.size(); i++) {
+        Integer fakeIId = realToFakeIdMap.get(dataNodeLocations.get(i).getDataNodeId());
+        if (fakeIId == null) {
+          continue;
+        }
+        regionCounter[fakeIId]++;
+        for (int j = i + 1; j < dataNodeLocations.size(); j++) {
+          Integer fakeJId = realToFakeIdMap.get(dataNodeLocations.get(j).getDataNodeId());
+          if (fakeJId == null) {
+            continue;
+          }
+          combinationCounter[fakeIId][fakeJId] = 1;
+          combinationCounter[fakeJId][fakeIId] = 1;
+        }
+      }
+    }
+
+    // Reset the optimal result
+    this.subDataNodeNum = replicationFactor / 2 + 1;
+    this.optimalCombinationSum = Integer.MAX_VALUE;
+    this.optimalRegionSum = Integer.MAX_VALUE;
+    this.optimalSubDataNodes = new int[subDataNodeNum];
+  }
+
+  /**
+   * Enumerates, by depth-first search, every way to take {@code replicaNum} non-full DataNodes
+   * from one sub-graph and records the best combination in {@code optimalSubDataNodes}.
+   *
+   * @param firstIndex the first fake id that may still be chosen at this depth
+   * @param currentReplica the number of DataNodes chosen so far
+   * @param replicaNum the number of DataNodes to choose in total
+   * @param combinationSum the number of already-sharing pairs among the chosen DataNodes
+   * @param regionSum the number of Regions held by the chosen DataNodes
+   * @param currentReplicaSet the fake ids chosen so far, valid up to {@code currentReplica}
+   */
+  private void subGraphSearch(
+      int firstIndex,
+      int currentReplica,
+      int replicaNum,
+      int combinationSum,
+      int regionSum,
+      int[] currentReplicaSet) {
+
+    if (currentReplica == replicaNum) {
+      if (combinationSum < optimalCombinationSum
+          || (combinationSum == optimalCombinationSum && regionSum < optimalRegionSum)) {
+        // Reset the optimal result when a better one is found
+        optimalCombinationSum = combinationSum;
+        optimalRegionSum = regionSum;
+        optimalSubDataNodes = Arrays.copyOf(currentReplicaSet, subDataNodeNum);
+      } else if (combinationSum == optimalCombinationSum
+          && regionSum == optimalRegionSum
+          && RANDOM.nextBoolean()) {
+        optimalSubDataNodes = Arrays.copyOf(currentReplicaSet, subDataNodeNum);
+      }
+      return;
+    }
+
+    for (int i = firstIndex; i < dataNodeNum; i += subGraphCount) {
+      if (regionCounter[i] >= regionPerDataNode) {
+        // Pruning: skip full DataNodes
+        continue;
+      }
+      int nxtCombinationSum = combinationSum;
+      for (int j = 0; j < currentReplica; j++) {
+        nxtCombinationSum += combinationCounter[i][currentReplicaSet[j]];
+      }
+      if (nxtCombinationSum > optimalCombinationSum) {
+        // Pruning: both keys only grow with depth, so this candidate cannot
+        // lead to a result whose first key reaches the historical optimum
+        continue;
+      }
+      int nxtRegionSum = regionSum + regionCounter[i];
+      if (nxtCombinationSum == optimalCombinationSum && nxtRegionSum > optimalRegionSum) {
+        // Pruning: the first key can at best tie, and the second key is
+        // already bigger than the historical optimal result
+        continue;
+      }
+      currentReplicaSet[currentReplica] = i;
+      subGraphSearch(
+          i + subGraphCount,
+          currentReplica + 1,
+          replicaNum,
+          nxtCombinationSum,
+          nxtRegionSum,
+          currentReplicaSet);
+    }
+  }
+
+  /**
+   * Selects one non-full DataNode from every sub-graph except {@code selected}, preferring the
+   * one that shares the fewest RegionGroups with {@code optimalSubDataNodes} and then the one
+   * holding the fewest Regions.
+   *
+   * @return the selected fake ids, or an empty list as soon as one sub-graph has no candidate
+   */
+  private List<Integer> partiteGraphSearch(int selected) {
+    List<Integer> partiteNodes = new ArrayList<>();
+    for (int partiteIndex = 0; partiteIndex < subGraphCount; partiteIndex++) {
+      if (partiteIndex == selected) {
+        continue;
+      }
+      int selectedDataNode = -1;
+      int bestScatterWidth = 0;
+      int bestRegionSum = Integer.MAX_VALUE;
+      for (int i = partiteIndex; i < dataNodeNum; i += subGraphCount) {
+        if (regionCounter[i] >= regionPerDataNode) {
+          continue;
+        }
+        int scatterWidth = subDataNodeNum;
+        for (int k = 0; k < subDataNodeNum; k++) {
+          scatterWidth -= combinationCounter[i][optimalSubDataNodes[k]];
+        }
+        if (scatterWidth < bestScatterWidth) {
+          continue;
+        }
+        if (scatterWidth > bestScatterWidth) {
+          bestScatterWidth = scatterWidth;
+          bestRegionSum = regionCounter[i];
+          selectedDataNode = i;
+        } else if (regionCounter[i] < bestRegionSum) {
+          bestRegionSum = regionCounter[i];
+          selectedDataNode = i;
+        } else if (regionCounter[i] == bestRegionSum && RANDOM.nextBoolean()) {
+          selectedDataNode = i;
+        }
+      }
+      if (selectedDataNode == -1) {
+        return new ArrayList<>();
+      }
+      partiteNodes.add(selectedDataNode);
+    }
+    return partiteNodes;
+  }
+}
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java index 7b5914d..2afcb92 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/AbstractLeaderBalancer.java
@@ -71,6 +71,39 @@ Map<Integer, NodeStatistics> dataNodeStatisticsMap, Map<TConsensusGroupId, Map<Integer, RegionStatistics>> regionStatisticsMap) { + // LOGGER.info( + // "[LeaderDebug] databaseRegionGroupMap: {}", + // Arrays.stream( + // databaseRegionGroupMap.values().stream() + // .flatMap(List::stream) + // .collect(Collectors.toSet()) + // .stream() + // .mapToInt(TConsensusGroupId::getId) + // .toArray()) + // .sorted() + // .toArray()); + // LOGGER.info( + // "[LeaderDebug] regionLocationMap: {}", + // Arrays.stream( + // + // regionLocationMap.keySet().stream().mapToInt(TConsensusGroupId::getId).toArray()) + // .sorted() + // .toArray()); + // LOGGER.info( + // "[LeaderDebug] regionLeaderMap: {}", + // Arrays.stream( + // + // regionLeaderMap.keySet().stream().mapToInt(TConsensusGroupId::getId).toArray()) + // .sorted() + // .toArray()); + // LOGGER.info( + // "[LeaderDebug] regionStatisticsMap: {}", + // Arrays.stream( + // + // regionStatisticsMap.keySet().stream().mapToInt(TConsensusGroupId::getId).toArray()) + // .sorted() + // .toArray()); + this.databaseRegionGroupMap.putAll(databaseRegionGroupMap); this.regionLocationMap.putAll(regionLocationMap); this.regionLeaderMap.putAll(regionLeaderMap); @@ -86,6 +119,18 @@ regionGroupUnionSet.addAll(regionLocationMap.keySet()); regionGroupUnionSet.addAll(regionLeaderMap.keySet()); regionGroupUnionSet.addAll(regionStatisticsMap.keySet()); + // LOGGER.info( + // "[LeaderDebug] regionGroupIntersection: {}", + // + // Arrays.stream(regionGroupIntersection.stream().mapToInt(TConsensusGroupId::getId).toArray()) + // .sorted() + // .toArray()); + // LOGGER.info( + // "[LeaderDebug] regionGroupUnionSet: {}", + // + // Arrays.stream(regionGroupUnionSet.stream().mapToInt(TConsensusGroupId::getId).toArray()) + // .sorted() + // .toArray()); Set<TConsensusGroupId> differenceSet = regionGroupUnionSet.stream() .filter(e -> !regionGroupIntersection.contains(e))
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java index d166d4b..4232c38 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/cache/region/RegionGroupCache.java
@@ -125,7 +125,7 @@ // all Regions are in the Running status return RegionGroupStatus.Running; } else if (readonlyCount == 0) { - return unknownCount <= ((regionCacheMap.size() - 1) / 2) + return unknownCount <= (regionCacheMap.size() / 2) // The RegionGroup is considered as Available when the number of Unknown Regions is less // than half ? RegionGroupStatus.Available
diff --git a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java index c2f7566..f42516d 100644 --- a/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java +++ b/iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/manager/load/balancer/region/RegionGroupAllocatorSimulation.java
@@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.FileWriter; import java.io.IOException; import java.util.ArrayList; import java.util.BitSet; @@ -46,11 +47,11 @@ private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf(); private static final int TEST_LOOP = 1; // private static final double EXAM_LOOP = 100000; - private static final int MIN_DATA_NODE_NUM = 1; - private static final int MAX_DATA_NODE_NUM = 100; - private static final int MIN_DATA_REGION_PER_DATA_NODE = 1; - private static final int MAX_DATA_REGION_PER_DATA_NODE = 10; - private static final int DATA_REPLICATION_FACTOR = 2; + private static final int MIN_DATA_NODE_NUM = 7; + private static final int MAX_DATA_NODE_NUM = 7; + private static final int MIN_DATA_REGION_PER_DATA_NODE = 3; + private static final int MAX_DATA_REGION_PER_DATA_NODE = 3; + private static final int DATA_REPLICATION_FACTOR = 3; private static final Map<Integer, TDataNodeConfiguration> AVAILABLE_DATA_NODE_MAP = new TreeMap<>(); @@ -59,7 +60,7 @@ public static class DataEntry { public final Integer N; public final Integer W; - public final Integer minScatterWidth; + public final Double minScatterRatio; // public final List<Double> disabledPercent; @@ -69,10 +70,10 @@ // this.minScatterWidth = minScatterWidth; // this.disabledPercent = disabledPercent; // } - private DataEntry(int N, int W, int minScatterWidth) { + private DataEntry(int N, int W, double minScatterRatio) { this.N = N; this.W = W; - this.minScatterWidth = minScatterWidth; + this.minScatterRatio = minScatterRatio; } } @@ -81,7 +82,7 @@ List<DataEntry> testResult = new ArrayList<>(); for (int dataNodeNum = MIN_DATA_NODE_NUM; dataNodeNum <= MAX_DATA_NODE_NUM; dataNodeNum++) { for (int dataRegionPerDataNode = MIN_DATA_REGION_PER_DATA_NODE; - dataRegionPerDataNode <= Math.min(MAX_DATA_REGION_PER_DATA_NODE, dataNodeNum); + dataRegionPerDataNode <= MAX_DATA_REGION_PER_DATA_NODE; dataRegionPerDataNode++) 
{ CONF.setDataRegionPerDataNode(dataRegionPerDataNode); testResult.add(singleTest(dataNodeNum, dataRegionPerDataNode)); @@ -89,21 +90,21 @@ // LOGGER.info("{}, finish", dataNodeNum); } - // FileWriter scatterW = - // new FileWriter( - // "/Users/yongzaodan/Desktop/simulation/psr-simulate/scatter/r=" - // + DATA_REPLICATION_FACTOR - // + ".log"); - // for (DataEntry entry : testResult) { - // scatterW.write(entry.minScatterWidth + "\n"); - // scatterW.flush(); - // } - // scatterW.close(); +// FileWriter scatterW = +// new FileWriter( +// "/Users/yongzaodan/Desktop/simulation/psr-simulate/scatter/r=" +// + DATA_REPLICATION_FACTOR +// + ".log"); +// for (DataEntry entry : testResult) { +// scatterW.write(entry.minScatterRatio + "\n"); +// scatterW.flush(); +// } +// scatterW.close(); } private DataEntry singleTest(int N, int W) { if (N < DATA_REPLICATION_FACTOR) { - return new DataEntry(N, W, 0); + return new DataEntry(N, W, 1.0); } // Construct N DataNodes Random random = new Random(); @@ -119,9 +120,10 @@ final int dataRegionGroupNum = W * N / DATA_REPLICATION_FACTOR; List<Integer> regionCountList = new ArrayList<>(); List<Integer> scatterWidthList = new ArrayList<>(); + double minScatterRatio = 1.0; for (int loop = 1; loop <= TEST_LOOP; loop++) { List<TRegionReplicaSet> allocateResult = new ArrayList<>(); - IRegionGroupAllocator ALLOCATOR = new TieredReplicationAllocator(); + IRegionGroupAllocator ALLOCATOR = new PGRA(); for (int index = 0; index < dataRegionGroupNum; index++) { allocateResult.add( ALLOCATOR.generateOptimalRegionReplicasDistribution( @@ -166,6 +168,10 @@ for (int i = 1; i <= N; i++) { int scatterWidth = scatterWidthMap.containsKey(i) ? 
scatterWidthMap.get(i).cardinality() : 0; + if (regionCounter.getOrDefault(i, 0) > 0) { + int expMaxScatter = Math.min(regionCounter.get(i) * (DATA_REPLICATION_FACTOR - 1), N - 1); + minScatterRatio = Math.min(minScatterRatio, (double) scatterWidth / expMaxScatter); + } int expScatter = Math.min(Math.max(regionCounter.getOrDefault(i, 0) - 1, 0) * u, N - 1); if (scatterWidth < expScatter) { passScatter = false; @@ -177,11 +183,12 @@ scatterWidthList.add(scatterWidth); } - // for (TRegionReplicaSet regionReplicaSet : allocateResult) { - // LOGGER.info("{}", - // - // regionReplicaSet.getDataNodeLocations().stream().mapToInt(TDataNodeLocation::getDataNodeId).toArray()); - // } + for (TRegionReplicaSet regionReplicaSet : allocateResult) { + LOGGER.info("{}", + + + regionReplicaSet.getDataNodeLocations().stream().mapToInt(TDataNodeLocation::getDataNodeId).toArray()); + } } int regionRange = @@ -204,6 +211,6 @@ // regionCountList.stream().mapToInt(Integer::intValue).min().orElse(0), // regionCountList.stream().mapToInt(Integer::intValue).max().orElse(0), // minScatter); - return new DataEntry(N, W, minScatter); + return new DataEntry(N, W, minScatterRatio); } }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java index 166fca9..9c2fbe1 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/cache/partition/PartitionCache.java
@@ -20,6 +20,7 @@ package org.apache.iotdb.db.queryengine.plan.analyze.cache.partition; import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId; +import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; import org.apache.iotdb.common.rpc.thrift.TSStatus; import org.apache.iotdb.common.rpc.thrift.TSeriesPartitionSlot; @@ -483,6 +484,14 @@ if (result) { groupIdToReplicaSetMap.clear(); groupIdToReplicaSetMap.putAll(map); + for (Map.Entry<TConsensusGroupId, TRegionReplicaSet> routeEntry : map.entrySet()) { + logger.info( + "[RouteMap] {}: {}", + routeEntry.getKey(), + routeEntry.getValue().getDataNodeLocations().stream() + .mapToInt(TDataNodeLocation::getDataNodeId) + .toArray()); + } } return result; } finally {