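Update balancing algorithms: remove per-database accounting from GreedyCopySetRegionGroupAllocator and MinCostFlowLeaderBalancer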
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java index f138143..38a5215 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/ConfigNodeDescriptor.java
@@ -359,8 +359,7 @@ } else { throw new IOException( String.format( - "Unknown leader_distribution_policy: %s, " - + "please set to \"GREEDY\" or \"CFD\"", + "Unknown leader_distribution_policy: %s, " + "please set to \"GREEDY\" or \"CFD\"", leaderDistributionPolicy)); }
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java index 4623c94..22dfa1d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/region/GreedyCopySetRegionGroupAllocator.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration; import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor; import java.util.ArrayList; import java.util.Arrays; @@ -40,47 +39,36 @@ public class GreedyCopySetRegionGroupAllocator implements IRegionGroupAllocator { private static final Random RANDOM = new Random(); - private static final int GCR_MAX_OPTIMAL_PLAN_NUM = - ConfigNodeDescriptor.getInstance().getConf().getGcrMaxOptimalPlanNum(); private int replicationFactor; // Sorted available DataNodeIds private int[] dataNodeIds; // The number of allocated Regions in each DataNode private int[] regionCounter; - // The number of allocated Regions in each DataNode within the same Database - private int[] databaseRegionCounter; // The number of 2-Region combinations in current cluster private int[][] combinationCounter; - private int maxDataNodeId; - // First Key: the sum of Regions at the DataNodes within the same Database in the allocation - // result is minimal - int optimalDatabaseRegionSum; - // Second Key: the sum of Regions at the DataNodes in the allocation result is minimal + // First Key: the sum of Regions at the DataNodes in the allocation result is minimal int optimalRegionSum; - // Third Key: the sum of overlapped 2-Region combination Regions with other allocated + // Second Key: the sum of overlapped 2-Region combination Regions with other allocated // RegionGroups is minimal int optimalCombinationSum; List<int[]> optimalReplicaSets; + private static final int MAX_OPTIMAL_PLAN_NUM = 10; private static class DataNodeEntry { private final int dataNodeId; - // First key: the number of Regions in the DataNode within the same Database - private final int databaseRegionCount; - // Second key: the number of Regions in the DataNode + // First key: the number of Regions in the DataNode private final int 
regionCount; - // Third key: the scatter width of the DataNode + // Second key: the scatter width of the DataNode private final int scatterWidth; - // Forth key: a random weight + // Third key: a random weight private final int randomWeight; - public DataNodeEntry( - int dataNodeId, int databaseRegionCount, int regionCount, int scatterWidth) { + public DataNodeEntry(int dataNodeId, int regionCount, int scatterWidth) { this.dataNodeId = dataNodeId; - this.databaseRegionCount = databaseRegionCount; this.regionCount = regionCount; this.scatterWidth = scatterWidth; this.randomWeight = RANDOM.nextInt(); @@ -91,13 +79,11 @@ } public int compare(DataNodeEntry e) { - return databaseRegionCount != e.databaseRegionCount - ? Integer.compare(databaseRegionCount, e.databaseRegionCount) - : regionCount != e.regionCount - ? Integer.compare(regionCount, e.regionCount) - : scatterWidth != e.scatterWidth - ? Integer.compare(scatterWidth, e.scatterWidth) - : Integer.compare(randomWeight, e.randomWeight); + return regionCount != e.regionCount + ? Integer.compare(regionCount, e.regionCount) + : scatterWidth != e.scatterWidth + ? 
Integer.compare(scatterWidth, e.scatterWidth) + : Integer.compare(randomWeight, e.randomWeight); } } @@ -114,12 +100,8 @@ int replicationFactor, TConsensusGroupId consensusGroupId) { try { - prepare( - replicationFactor, - availableDataNodeMap, - allocatedRegionGroups, - databaseAllocatedRegionGroups); - dfs(-1, 0, new int[replicationFactor], 0, 0); + prepare(replicationFactor, availableDataNodeMap, allocatedRegionGroups); + dfs(-1, 0, new int[replicationFactor], 0); // Randomly pick one optimal plan as result Collections.shuffle(optimalReplicaSets); @@ -129,7 +111,6 @@ for (int i = 0; i < replicationFactor; i++) { result.addToDataNodeLocations(availableDataNodeMap.get(optimalReplicaSet[i]).getLocation()); } - return result; } finally { clear(); @@ -142,13 +123,11 @@ * @param replicationFactor replication factor in the cluster * @param availableDataNodeMap currently available DataNodes, ensure size() >= replicationFactor * @param allocatedRegionGroups already allocated RegionGroups in the cluster - * @param databaseAllocatedRegionGroups already allocated RegionGroups in the same Database */ private void prepare( int replicationFactor, Map<Integer, TDataNodeConfiguration> availableDataNodeMap, - List<TRegionReplicaSet> allocatedRegionGroups, - List<TRegionReplicaSet> databaseAllocatedRegionGroups) { + List<TRegionReplicaSet> allocatedRegionGroups) { this.replicationFactor = replicationFactor; // Store the maximum DataNodeId @@ -160,13 +139,10 @@ .mapToInt(TDataNodeLocation::getDataNodeId) .max() .orElse(0)); - this.maxDataNodeId = maxDataNodeId; - // Compute regionCounter, databaseRegionCounter and combinationCounter + // Compute regionCounter and combinationCounter regionCounter = new int[maxDataNodeId + 1]; Arrays.fill(regionCounter, 0); - databaseRegionCounter = new int[maxDataNodeId + 1]; - Arrays.fill(databaseRegionCounter, 0); combinationCounter = new int[maxDataNodeId + 1][maxDataNodeId + 1]; for (int i = 0; i <= maxDataNodeId; i++) { 
Arrays.fill(combinationCounter[i], 0); @@ -183,12 +159,6 @@ } } } - for (TRegionReplicaSet regionReplicaSet : databaseAllocatedRegionGroups) { - List<TDataNodeLocation> dataNodeLocations = regionReplicaSet.getDataNodeLocations(); - for (TDataNodeLocation dataNodeLocation : dataNodeLocations) { - databaseRegionCounter[dataNodeLocation.getDataNodeId()]++; - } - } // Compute the DataNodeIds through sorting the DataNodeEntryMap Map<Integer, DataNodeEntry> dataNodeEntryMap = new HashMap<>(maxDataNodeId + 1); @@ -206,11 +176,7 @@ } dataNodeEntryMap.put( dataNodeId, - new DataNodeEntry( - dataNodeId, - databaseRegionCounter[dataNodeId], - regionCounter[dataNodeId], - scatterWidth)); + new DataNodeEntry(dataNodeId, regionCounter[dataNodeId], scatterWidth)); }); dataNodeIds = dataNodeEntryMap.entrySet().stream() @@ -222,7 +188,6 @@ .toArray(); // Reset the optimal result - optimalDatabaseRegionSum = Integer.MAX_VALUE; optimalRegionSum = Integer.MAX_VALUE; optimalCombinationSum = Integer.MAX_VALUE; optimalReplicaSets = new ArrayList<>(); @@ -236,27 +201,14 @@ * @param lastIndex last decided index in dataNodeIds * @param currentReplica current replica index * @param currentReplicaSet current allocation plan - * @param databaseRegionSum the sum of Regions at the DataNodes within the same Database in the - * current allocation plan * @param regionSum the sum of Regions at the DataNodes in the current allocation plan */ - private void dfs( - int lastIndex, - int currentReplica, - int[] currentReplicaSet, - int databaseRegionSum, - int regionSum) { - if (databaseRegionSum > optimalDatabaseRegionSum) { + private void dfs(int lastIndex, int currentReplica, int[] currentReplicaSet, int regionSum) { + if (regionSum > optimalRegionSum) { // Pruning: no needs for further searching when the first key // is bigger than the historical optimal result return; } - if (databaseRegionSum == optimalDatabaseRegionSum && regionSum > optimalRegionSum) { - // Pruning: no needs for further searching 
when the first key is equal to the historical - // optimal result - // and the second key is bigger than the historical optimal result - return; - } if (currentReplica == replicationFactor) { // A complete allocation plan is found @@ -266,17 +218,14 @@ combinationSum += combinationCounter[currentReplicaSet[i]][currentReplicaSet[j]]; } } - if (databaseRegionSum == optimalDatabaseRegionSum - && regionSum == optimalRegionSum - && combinationSum > optimalCombinationSum) { + if (combinationSum > optimalCombinationSum) { + // Pruning: no needs for further searching when the second key + // is bigger than the historical optimal result return; } - if (databaseRegionSum < optimalDatabaseRegionSum - || regionSum < optimalRegionSum - || combinationSum < optimalCombinationSum) { + if (regionSum < optimalRegionSum || combinationSum < optimalCombinationSum) { // Reset the optimal result when a better one is found - optimalDatabaseRegionSum = databaseRegionSum; optimalRegionSum = regionSum; optimalCombinationSum = combinationSum; optimalReplicaSets.clear(); @@ -288,13 +237,8 @@ for (int i = lastIndex + 1; i < dataNodeIds.length; i++) { // Decide the next DataNodeId in the allocation plan currentReplicaSet[currentReplica] = dataNodeIds[i]; - dfs( - i, - currentReplica + 1, - currentReplicaSet, - databaseRegionSum + databaseRegionCounter[dataNodeIds[i]], - regionSum + regionCounter[dataNodeIds[i]]); - if (optimalReplicaSets.size() == GCR_MAX_OPTIMAL_PLAN_NUM) { + dfs(i, currentReplica + 1, currentReplicaSet, regionSum + regionCounter[dataNodeIds[i]]); + if (optimalReplicaSets.size() == MAX_OPTIMAL_PLAN_NUM) { // Pruning: no needs for further searching when // the number of optimal plans reaches the limitation return;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java index 1561862..4a28fb6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/GreedyLeaderBalancer.java
@@ -23,7 +23,6 @@ import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation; import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List;
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java index 517f35a..3bfb76d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/load/balancer/router/leader/MinCostFlowLeaderBalancer.java
@@ -25,14 +25,15 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Queue; import java.util.Set; -import java.util.TreeMap; -import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; /** Leader distribution balancer that uses minimum cost flow algorithm */ public class MinCostFlowLeaderBalancer implements ILeaderBalancer { @@ -40,9 +41,8 @@ private static final int INFINITY = Integer.MAX_VALUE; /** Input parameters */ - private final Map<String, List<TConsensusGroupId>> databaseRegionGroupMap; - private final Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap; + private final Map<TConsensusGroupId, Integer> regionLeaderMap; private final Set<Integer> disabledDataNodeSet; @@ -55,12 +55,10 @@ private int maxNode = T_NODE + 1; // Map<RegionGroupId, rNode> private final Map<TConsensusGroupId, Integer> rNodeMap; - // Map<Database, Map<DataNodeId, sDNode>> - private final Map<String, Map<Integer, Integer>> sDNodeMap; - // Map<Database, Map<sDNode, DataNodeId>> - private final Map<String, Map<Integer, Integer>> sDNodeReflect; - // Map<DataNodeId, tDNode> - private final Map<Integer, Integer> tDNodeMap; + // Map<DataNodeId, dNode> + private final Map<Integer, Integer> dNodeMap; + // Map<dNode, DataNodeId> + private final Map<Integer, Integer> dNodeReflect; /** Graph edges */ // Maximum index of graph edges @@ -77,14 +75,12 @@ private int minimumCost = 0; public MinCostFlowLeaderBalancer() { - this.databaseRegionGroupMap = new TreeMap<>(); - this.regionReplicaSetMap = new TreeMap<>(); - this.regionLeaderMap = new TreeMap<>(); - this.disabledDataNodeSet = new TreeSet<>(); - this.rNodeMap = new TreeMap<>(); - this.sDNodeMap = new TreeMap<>(); - this.sDNodeReflect = new TreeMap<>(); - this.tDNodeMap = new TreeMap<>(); + this.regionReplicaSetMap = new 
HashMap<>(); + this.regionLeaderMap = new HashMap<>(); + this.disabledDataNodeSet = new HashSet<>(); + this.rNodeMap = new HashMap<>(); + this.dNodeMap = new HashMap<>(); + this.dNodeReflect = new HashMap<>(); this.minCostFlowEdges = new ArrayList<>(); } @@ -95,7 +91,7 @@ Map<TConsensusGroupId, Integer> regionLeaderMap, Set<Integer> disabledDataNodeSet) { - initialize(databaseRegionGroupMap, regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet); + initialize(regionReplicaSetMap, regionLeaderMap, disabledDataNodeSet); Map<TConsensusGroupId, Integer> result; constructMCFGraph(); @@ -107,26 +103,21 @@ } private void initialize( - Map<String, List<TConsensusGroupId>> databaseRegionGroupMap, Map<TConsensusGroupId, TRegionReplicaSet> regionReplicaSetMap, Map<TConsensusGroupId, Integer> regionLeaderMap, Set<Integer> disabledDataNodeSet) { - this.databaseRegionGroupMap.putAll(databaseRegionGroupMap); this.regionReplicaSetMap.putAll(regionReplicaSetMap); this.regionLeaderMap.putAll(regionLeaderMap); this.disabledDataNodeSet.addAll(disabledDataNodeSet); } private void clear() { - this.databaseRegionGroupMap.clear(); this.regionReplicaSetMap.clear(); this.regionLeaderMap.clear(); this.disabledDataNodeSet.clear(); - this.rNodeMap.clear(); - this.sDNodeMap.clear(); - this.sDNodeReflect.clear(); - this.tDNodeMap.clear(); + this.dNodeMap.clear(); + this.dNodeReflect.clear(); this.minCostFlowEdges.clear(); this.nodeHeadEdge = null; @@ -143,30 +134,13 @@ this.minimumCost = 0; /* Indicate nodes in mcf */ - for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry : - databaseRegionGroupMap.entrySet()) { - String database = databaseEntry.getKey(); - sDNodeMap.put(database, new TreeMap<>()); - sDNodeReflect.put(database, new TreeMap<>()); - List<TConsensusGroupId> regionGroupIds = databaseEntry.getValue(); - for (TConsensusGroupId regionGroupId : regionGroupIds) { - rNodeMap.put(regionGroupId, maxNode++); - for (TDataNodeLocation dataNodeLocation : - 
regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - // Skip disabled DataNode - continue; - } - if (!sDNodeMap.get(database).containsKey(dataNodeId)) { - sDNodeMap.get(database).put(dataNodeId, maxNode); - sDNodeReflect.get(database).put(maxNode, dataNodeId); - maxNode += 1; - } - if (!tDNodeMap.containsKey(dataNodeId)) { - tDNodeMap.put(dataNodeId, maxNode); - maxNode += 1; - } + for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) { + rNodeMap.put(regionReplicaSet.getRegionId(), maxNode++); + for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { + if (!dNodeMap.containsKey(dataNodeLocation.getDataNodeId())) { + dNodeMap.put(dataNodeLocation.getDataNodeId(), maxNode); + dNodeReflect.put(maxNode, dataNodeLocation.getDataNodeId()); + maxNode += 1; } } } @@ -180,74 +154,57 @@ /* Construct edges: sNode -> rNodes */ for (int rNode : rNodeMap.values()) { - // Capacity: 1, Cost: 0, each RegionGroup should elect exactly 1 leader + // Cost: 0 addAdjacentEdges(S_NODE, rNode, 1, 0); } - /* Construct edges: rNodes -> sdNodes */ - for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry : - databaseRegionGroupMap.entrySet()) { - String database = databaseEntry.getKey(); - for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) { - int rNode = rNodeMap.get(regionGroupId); - for (TDataNodeLocation dataNodeLocation : - regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - // Skip disabled DataNode - continue; - } - int sDNode = sDNodeMap.get(database).get(dataNodeId); - // Capacity: 1, Cost: 1 if sDNode is the current leader of the rNode, 0 otherwise. - // Therefore, the RegionGroup will keep the leader as constant as possible. 
- int cost = regionLeaderMap.getOrDefault(regionGroupId, -1) == dataNodeId ? 0 : 1; - addAdjacentEdges(rNode, sDNode, 1, cost); - } - } - } - - /* Construct edges: sDNodes -> tDNodes */ - for (Map.Entry<String, List<TConsensusGroupId>> databaseEntry : - databaseRegionGroupMap.entrySet()) { - String database = databaseEntry.getKey(); - // Map<DataNodeId, leader number> - Map<Integer, Integer> leaderCounter = new TreeMap<>(); - for (TConsensusGroupId regionGroupId : databaseEntry.getValue()) { - for (TDataNodeLocation dataNodeLocation : - regionReplicaSetMap.get(regionGroupId).getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - // Skip disabled DataNode - continue; - } - int sDNode = sDNodeMap.get(database).get(dataNodeId); - int tDNode = tDNodeMap.get(dataNodeId); - int leaderCount = leaderCounter.merge(dataNodeId, 1, Integer::sum); - // Capacity: 1, Cost: x^2 for the x-th edge at the current sDNode. - // Thus, the leader distribution will be as balance as possible within each Database - // based on the Jensen's-Inequality. 
- addAdjacentEdges(sDNode, tDNode, 1, leaderCount * leaderCount); - } - } - } - - /* Construct edges: tDNodes -> tNode */ - // Map<DataNodeId, possible max leader> Count the possible maximum number of leader in each - // DataNode - Map<Integer, Integer> maxLeaderCounter = new TreeMap<>(); + /* Construct edges: rNodes -> dNodes */ for (TRegionReplicaSet regionReplicaSet : regionReplicaSetMap.values()) { + int rNode = rNodeMap.get(regionReplicaSet.getRegionId()); for (TDataNodeLocation dataNodeLocation : regionReplicaSet.getDataNodeLocations()) { - int dataNodeId = dataNodeLocation.getDataNodeId(); - if (disabledDataNodeSet.contains(dataNodeId)) { - // Skip disabled DataNode - continue; - } - int tDNode = tDNodeMap.get(dataNodeId); - int leaderCount = maxLeaderCounter.merge(dataNodeId, 1, Integer::sum); + int dNode = dNodeMap.get(dataNodeLocation.getDataNodeId()); + // Cost: 0 if the dNode corresponds to the current leader of the rNode, + // 1 otherwise. + // Therefore, the RegionGroup will keep the leader as constant as possible. + int cost = + regionLeaderMap.getOrDefault(regionReplicaSet.getRegionId(), -1) + == dataNodeLocation.getDataNodeId() + ?
0 + : 1; + addAdjacentEdges(rNode, dNode, 1, cost); + } + } + + /* Construct edges: dNodes -> tNode */ + // Count the possible maximum number of leaders in each DataNode + Map<Integer, AtomicInteger> maxLeaderCounter = new ConcurrentHashMap<>(); + regionReplicaSetMap + .values() + .forEach( + regionReplicaSet -> + regionReplicaSet + .getDataNodeLocations() + .forEach( + dataNodeLocation -> + maxLeaderCounter + .computeIfAbsent( + dataNodeLocation.getDataNodeId(), empty -> new AtomicInteger(0)) + .getAndIncrement())); + + for (Map.Entry<Integer, Integer> dNodeEntry : dNodeMap.entrySet()) { + int dataNodeId = dNodeEntry.getKey(); + int dNode = dNodeEntry.getValue(); + + if (disabledDataNodeSet.contains(dataNodeId)) { + // Skip disabled DataNode + continue; + } + + int maxLeaderCount = maxLeaderCounter.get(dataNodeId).get(); + for (int extraEdge = 1; extraEdge <= maxLeaderCount; extraEdge++) { // Cost: x^2 for the x-th edge at the current dNode. - // Thus, the leader distribution will be as balance as possible within the cluster - // Based on the Jensen's-Inequality. - addAdjacentEdges(tDNode, T_NODE, 1, leaderCount * leaderCount); + // Thus, the leader distribution will be as balanced as possible.
+ addAdjacentEdges(dNode, T_NODE, 1, extraEdge * extraEdge); } } } @@ -354,24 +311,22 @@ private Map<TConsensusGroupId, Integer> collectLeaderDistribution() { Map<TConsensusGroupId, Integer> result = new ConcurrentHashMap<>(); - databaseRegionGroupMap.forEach( - (database, regionGroupIds) -> - regionGroupIds.forEach( - regionGroupId -> { - boolean matchLeader = false; - for (int currentEdge = nodeHeadEdge[rNodeMap.get(regionGroupId)]; - currentEdge >= 0; - currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) { - MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge); - if (edge.destNode != S_NODE && edge.capacity == 0) { - matchLeader = true; - result.put(regionGroupId, sDNodeReflect.get(database).get(edge.destNode)); - } - } - if (!matchLeader) { - result.put(regionGroupId, regionLeaderMap.getOrDefault(regionGroupId, -1)); - } - })); + rNodeMap.forEach( + (regionGroupId, rNode) -> { + boolean matchLeader = false; + for (int currentEdge = nodeHeadEdge[rNode]; + currentEdge >= 0; + currentEdge = minCostFlowEdges.get(currentEdge).nextEdge) { + MinCostFlowEdge edge = minCostFlowEdges.get(currentEdge); + if (edge.destNode != S_NODE && edge.capacity == 0) { + matchLeader = true; + result.put(regionGroupId, dNodeReflect.get(edge.destNode)); + } + } + if (!matchLeader) { + result.put(regionGroupId, regionLeaderMap.getOrDefault(regionGroupId, -1)); + } + }); return result; }