fix always select the same region set bug for RegionAwareEnsemblePlacementPolicy (#2658)
* fix always select the same region bug for RegionAwareEnsemblePlacementPolicy
* format code
* add test for regionAwareEnsemblePlacementPolicy
* update code
* format code
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index dee4187..5c1b0a1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -77,6 +77,7 @@
protected boolean enableValidation = true;
protected boolean enforceDurabilityInReplace = false;
protected Feature disableDurabilityFeature;
+ private int lastRegionIndex = 0;
RegionAwareEnsemblePlacementPolicy() {
super();
@@ -263,7 +264,7 @@
Set<BookieId> comprehensiveExclusionBookiesSet = addDefaultRackBookiesIfMinNumRacksIsEnforced(
excludedBookies);
Set<Node> excludeNodes = convertBookiesToNodes(comprehensiveExclusionBookiesSet);
- Set<String> availableRegions = new HashSet<String>();
+ List<String> availableRegions = new ArrayList<>();
for (String region: perRegionPlacement.keySet()) {
if ((null == disallowBookiePlacementInRegionFeatureName)
|| !featureProvider.scope(region).getFeature(disallowBookiePlacementInRegionFeatureName)
@@ -330,9 +331,11 @@
effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum);
remainingEnsembleBeforeIteration = remainingEnsemble;
int regionsToAllocate = numRemainingRegions;
- for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: regionsWiseAllocation.entrySet()) {
- String region = regionEntry.getKey();
- final Pair<Integer, Integer> currentAllocation = regionEntry.getValue();
+ int startRegionIndex = lastRegionIndex % numRegionsAvailable;
+ for (int i = 0; i < numRegionsAvailable; ++i) {
+ String region = availableRegions.get(startRegionIndex % numRegionsAvailable);
+ startRegionIndex++;
+ final Pair<Integer, Integer> currentAllocation = regionsWiseAllocation.get(region);
TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
if (!regionsReachedMaxAllocation.contains(region)) {
if (numRemainingRegions <= 0) {
@@ -364,6 +367,7 @@
regionsWiseAllocation.put(region, Pair.of(newEnsembleSize, newWriteQuorumSize));
success = true;
regionsToAllocate--;
+ lastRegionIndex = startRegionIndex;
LOG.info("Region {} allocating bookies with ensemble size {} "
+ "and write quorum size {} : {}",
region, newEnsembleSize, newWriteQuorumSize, allocated);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index bc0805a..b96d3a4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -1480,4 +1480,48 @@
assertEquals(ensemble.get(reoderSet.get(7)), addr4.toBookieId());
}
+ public void testNewEnsembleSetWithFiveRegions() throws Exception {
+ repp.uninitalize();
+ repp = new RegionAwareEnsemblePlacementPolicy();
+ repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL,
+ NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+ BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+ BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+ BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
+ BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
+
+ // Update dns mapping
+ StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
+ StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region2/r2");
+ StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region3/r3");
+ StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region4/r4");
+ StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region5/r5");
+
+ // Update cluster
+ Set<BookieId> addrs = new HashSet<>();
+ addrs.add(addr1.toBookieId());
+ addrs.add(addr2.toBookieId());
+ addrs.add(addr3.toBookieId());
+ addrs.add(addr4.toBookieId());
+ addrs.add(addr5.toBookieId());
+
+ repp.onClusterChanged(addrs, new HashSet<>());
+ try {
+ List<BookieId> ensemble1 = repp.newEnsemble(3, 3, 2,
+ null, new HashSet<>()).getResult();
+ assertEquals(ensemble1.size(), 3);
+ List<BookieId> ensemble2 = repp.newEnsemble(3, 3, 2,
+ null, new HashSet<>()).getResult();
+ ensemble1.retainAll(ensemble2);
+ assert(ensemble1.size() >= 1);
+
+ List<BookieId> ensemble3 = repp.newEnsemble(3, 3, 2,
+ null, new HashSet<>()).getResult();
+ ensemble2.removeAll(ensemble3);
+ assert(ensemble2.size() >= 1);
+ } catch (BKNotEnoughBookiesException bnebe) {
+ fail("Should not get not enough bookies exception even there is only one rack.");
+ }
+ }
}