[Test] enhance RackAwareTest by adding test for EnforceMinNumRacksPerWriteQuorum (#13703)

diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 50f0797..b76f104 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -68,17 +68,18 @@
         store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
 
         // Case1: ZKCache is given
-        BookieRackAffinityMapping mapping1 = new BookieRackAffinityMapping();
-        ClientConfiguration bkClientConf1 = new ClientConfiguration();
-        bkClientConf1.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
+        BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+        ClientConfiguration bkClientConf = new ClientConfiguration();
+        bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
 
-        mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
-        mapping1.setConf(bkClientConf1);
-        List<String> racks1 = mapping1
+        mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+        mapping.setConf(bkClientConf);
+        List<String> racks = mapping
                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
-        assertEquals(racks1.get(0), "/rack0");
-        assertEquals(racks1.get(1), "/rack1");
-        assertNull(racks1.get(2));
+
+        assertEquals(racks.get(0), "/rack0");
+        assertEquals(racks.get(1), "/rack1");
+        assertNull(racks.get(2));
     }
 
     @Test
@@ -96,12 +97,12 @@
 
         mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         mapping1.setConf(bkClientConf1);
-        List<String> racks1 = mapping1
+        List<String> racks = mapping1
                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
 
-        assertNull(racks1.get(0));
-        assertNull(racks1.get(1));
-        assertNull(racks1.get(2));
+        assertNull(racks.get(0));
+        assertNull(racks.get(1));
+        assertNull(racks.get(2));
     }
 
     @Test
@@ -113,9 +114,9 @@
         mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
         mapping.setConf(bkClientConf);
         List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
-        assertEquals(racks.get(0), null);
-        assertEquals(racks.get(1), null);
-        assertEquals(racks.get(2), null);
+        assertNull(racks.get(0));
+        assertNull(racks.get(1));
+        assertNull(racks.get(2));
 
         Map<String, Map<BookieSocketAddress, BookieInfo>> bookieMapping = new HashMap<>();
         Map<BookieSocketAddress, BookieInfo> mainBookieGroup = new HashMap<>();
@@ -132,7 +133,7 @@
             List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
             assertEquals(r.get(0), "/rack0");
             assertEquals(r.get(1), "/rack1");
-            assertEquals(r.get(2), null);
+            assertNull(r.get(2));
         });
 
     }
@@ -160,7 +161,7 @@
                 .resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
         assertEquals(racks.get(0), "/rack0");
         assertEquals(racks.get(1), "/rack1");
-        assertEquals(racks.get(2), null);
+        assertNull(racks.get(2));
 
         // add info for BOOKIE3 and check if the mapping picks up the change
         Map<BookieSocketAddress, BookieInfo> secondaryBookieGroup = new HashMap<>();
@@ -180,9 +181,9 @@
 
         Awaitility.await().untilAsserted(() -> {
             List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
-            assertEquals(r.get(0), null);
-            assertEquals(r.get(1), null);
-            assertEquals(r.get(2), null);
+            assertNull(r.get(0));
+            assertNull(r.get(1));
+            assertNull(r.get(2));
         });
     }
 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index 7f0dace..f5c09f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -73,7 +73,9 @@
             bkEnsemble.start();
 
             // start pulsar service
-            config = new ServiceConfiguration();
+            if (config == null) {
+                config = new ServiceConfiguration();
+            }
             config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
             config.setAdvertisedAddress("localhost");
             config.setWebServicePort(Optional.of(0));
@@ -110,6 +112,7 @@
     @Override
     @AfterMethod(alwaysRun = true)
     protected void cleanup() throws Exception {
+        config = null;
         markCurrentSetupNumberCleaned();
         admin.close();
         pulsar.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index c8a9b44..7628600 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.service;
 
+import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 import com.google.gson.Gson;
 import java.io.File;
 import java.nio.file.Files;
@@ -28,6 +30,7 @@
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
 import org.apache.bookkeeper.client.BookKeeper.DigestType;
 import org.apache.bookkeeper.client.LedgerHandle;
@@ -43,6 +46,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
 @Test(groups = "quarantine")
@@ -56,6 +60,11 @@
         super(0);
     }
 
+    @DataProvider(name = "forceMinRackNumProvider")
+    public Object[][] forceMinRackNumProvider() {
+        return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+    }
+
     @Override
     protected void configurePulsar(ServiceConfiguration config) throws Exception {
         // Start bookies with specific racks
@@ -117,7 +126,7 @@
                     .getData(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null);
             TreeMap<String, Map<String, Map<String, String>>> rackInfoMap =
                     new Gson().fromJson(new String(data), TreeMap.class);
-            assertTrue(rackInfoMap.get(group).size() == NUM_BOOKIES);
+            assertEquals(rackInfoMap.get(group).size(), NUM_BOOKIES);
             Set<String> racks = rackInfoMap.values().stream()
                     .map(Map::values)
                     .flatMap(bookieId -> bookieId.stream().map(rackInfo -> rackInfo.get("rack")))
@@ -138,5 +147,59 @@
         }
     }
 
+    @Test(dataProvider="forceMinRackNumProvider")
+    public void testPlacementMinRackNumsPerWriteQuorum(boolean forceMinRackNums) throws Exception {
+        cleanup();
+        config = new ServiceConfiguration();
+        config.setBookkeeperClientMinNumRacksPerWriteQuorum(2);
+        config.setBookkeeperClientEnforceMinNumRacksPerWriteQuorum(forceMinRackNums);
+        setup();
+        final String group = "default";
+        for (int i = 0; i < NUM_BOOKIES; i++) {
+            String bookie = bookies.get(i).getLocalAddress().toString();
+            // All bookie in one same rack "rack-1"
+            int rackId = i == 0 ? 1 : 2;
+            BookieInfo bi = BookieInfo.builder()
+                    .rack("rack-" + 1)
+                    .hostname("bookie-" + (i + 1))
+                    .build();
+            log.info("setting rack for bookie at {} -- {}", bookie, bi);
+            admin.bookies().updateBookieRackInfo(bookie, group, bi);
+        }
+
+        // Make sure the racks cache gets updated through the ZK watch
+        Awaitility.await().untilAsserted(() -> {
+            byte[] data = bkEnsemble.getZkClient()
+                    .getData(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null);
+            TreeMap<String, Map<String, Map<String, String>>> rackInfoMap =
+                    new Gson().fromJson(new String(data), TreeMap.class);
+            assertEquals(rackInfoMap.get(group).size(), NUM_BOOKIES);
+
+            Set<String> racks = rackInfoMap.values().stream()
+                    .map(Map::values)
+                    .flatMap(bookieId -> bookieId.stream().map(rackInfo -> rackInfo.get("rack")))
+                    .collect(Collectors.toSet());
+            assertEquals(racks.size(), 1);
+            assertTrue(racks.contains("rack-1"));
+        });
+
+        BookKeeper bkc = this.pulsar.getBookKeeperClient();
+
+        if (forceMinRackNums) {
+            try {
+                bkc.createLedger(2, 2, DigestType.DUMMY, new byte[0]);
+                fail("Should be failed due to no enough rack can be found");
+            } catch (BKException.BKNotEnoughBookiesException e) {
+                // ignore
+            }
+        } else {
+            for (int i = 0; i < 10; i++) {
+                LedgerHandle lh = bkc.createLedger(2, 2, DigestType.DUMMY, new byte[0]);
+                log.info("Ledger: {} -- Ensemble: {}", i, lh.getLedgerMetadata().getEnsembleAt(0));
+                lh.close();
+            }
+        }
+    }
+
     private static final Logger log = LoggerFactory.getLogger(RackAwareTest.class);
 }