fix always select the same region set bug for RegionAwareEnsemblePlacementPolicy (#2658)

* fix always select the same region bug for RegionAwareEnsemblePlacementPolicy

* format code

* add test for regionAwareEnsemblePlacementPolicy

* update code

* format code
diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
index dee4187..5c1b0a1 100644
--- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/RegionAwareEnsemblePlacementPolicy.java
@@ -77,6 +77,7 @@
     protected boolean enableValidation = true;
     protected boolean enforceDurabilityInReplace = false;
     protected Feature disableDurabilityFeature;
+    private int lastRegionIndex = 0;
 
     RegionAwareEnsemblePlacementPolicy() {
         super();
@@ -263,7 +264,7 @@
             Set<BookieId> comprehensiveExclusionBookiesSet = addDefaultRackBookiesIfMinNumRacksIsEnforced(
                     excludedBookies);
             Set<Node> excludeNodes = convertBookiesToNodes(comprehensiveExclusionBookiesSet);
-            Set<String> availableRegions = new HashSet<String>();
+            List<String> availableRegions = new ArrayList<>();
             for (String region: perRegionPlacement.keySet()) {
                 if ((null == disallowBookiePlacementInRegionFeatureName)
                         || !featureProvider.scope(region).getFeature(disallowBookiePlacementInRegionFeatureName)
@@ -330,9 +331,11 @@
                         effectiveMinRegionsForDurability, minNumRacksPerWriteQuorum);
                 remainingEnsembleBeforeIteration = remainingEnsemble;
                 int regionsToAllocate = numRemainingRegions;
-                for (Map.Entry<String, Pair<Integer, Integer>> regionEntry: regionsWiseAllocation.entrySet()) {
-                    String region = regionEntry.getKey();
-                    final Pair<Integer, Integer> currentAllocation = regionEntry.getValue();
+                int startRegionIndex = lastRegionIndex % numRegionsAvailable;
+                for (int i = 0; i < numRegionsAvailable; ++i) {
+                    String region = availableRegions.get(startRegionIndex % numRegionsAvailable);
+                    startRegionIndex++;
+                    final Pair<Integer, Integer> currentAllocation = regionsWiseAllocation.get(region);
                     TopologyAwareEnsemblePlacementPolicy policyWithinRegion = perRegionPlacement.get(region);
                     if (!regionsReachedMaxAllocation.contains(region)) {
                         if (numRemainingRegions <= 0) {
@@ -364,6 +367,7 @@
                                 regionsWiseAllocation.put(region, Pair.of(newEnsembleSize, newWriteQuorumSize));
                                 success = true;
                                 regionsToAllocate--;
+                                lastRegionIndex = startRegionIndex;
                                 LOG.info("Region {} allocating bookies with ensemble size {} "
                                         + "and write quorum size {} : {}",
                                         region, newEnsembleSize, newWriteQuorumSize, allocated);
diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
index bc0805a..b96d3a4 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/TestRegionAwareEnsemblePlacementPolicy.java
@@ -1480,4 +1480,48 @@
         assertEquals(ensemble.get(reoderSet.get(7)), addr4.toBookieId());
     }
 
+    public void testNewEnsembleSetWithFiveRegions() throws Exception {
+        repp.uninitalize();
+        repp = new RegionAwareEnsemblePlacementPolicy();
+        repp.initialize(conf, Optional.empty(), timer, DISABLE_ALL,
+            NullStatsLogger.INSTANCE, BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        BookieSocketAddress addr1 = new BookieSocketAddress("127.0.0.2", 3181);
+        BookieSocketAddress addr2 = new BookieSocketAddress("127.0.0.3", 3181);
+        BookieSocketAddress addr3 = new BookieSocketAddress("127.0.0.4", 3181);
+        BookieSocketAddress addr4 = new BookieSocketAddress("127.0.0.5", 3181);
+        BookieSocketAddress addr5 = new BookieSocketAddress("127.0.0.6", 3181);
+
+        // Update dns mapping
+        StaticDNSResolver.addNodeToRack(addr1.getHostName(), "/region1/r1");
+        StaticDNSResolver.addNodeToRack(addr2.getHostName(), "/region2/r2");
+        StaticDNSResolver.addNodeToRack(addr3.getHostName(), "/region3/r3");
+        StaticDNSResolver.addNodeToRack(addr4.getHostName(), "/region4/r4");
+        StaticDNSResolver.addNodeToRack(addr5.getHostName(), "/region5/r5");
+
+        // Update cluster
+        Set<BookieId> addrs = new HashSet<>();
+        addrs.add(addr1.toBookieId());
+        addrs.add(addr2.toBookieId());
+        addrs.add(addr3.toBookieId());
+        addrs.add(addr4.toBookieId());
+        addrs.add(addr5.toBookieId());
+
+        repp.onClusterChanged(addrs, new HashSet<>());
+        try {
+            List<BookieId> ensemble1 = repp.newEnsemble(3, 3, 2,
+                null, new HashSet<>()).getResult();
+            assertEquals(ensemble1.size(), 3);
+            List<BookieId> ensemble2 = repp.newEnsemble(3, 3, 2,
+                null, new HashSet<>()).getResult();
+            ensemble1.retainAll(ensemble2);
+            assert(ensemble1.size() >= 1);
+
+            List<BookieId> ensemble3 = repp.newEnsemble(3, 3, 2,
+                null, new HashSet<>()).getResult();
+            ensemble2.removeAll(ensemble3);
+            assert(ensemble2.size() >= 1);
+        } catch (BKNotEnoughBookiesException bnebe) {
+            fail("Should not get not enough bookies exception even there is only one rack.");
+        }
+    }
 }