[Test] enhance RackAwareTest by adding test for EnforceMinNumRacksPerWriteQuorum (#13703)
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
index 50f0797..b76f104 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/bookie/rackawareness/BookieRackAffinityMappingTest.java
@@ -68,17 +68,18 @@
store.put(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, data.getBytes(), Optional.empty()).join();
// Case1: ZKCache is given
- BookieRackAffinityMapping mapping1 = new BookieRackAffinityMapping();
- ClientConfiguration bkClientConf1 = new ClientConfiguration();
- bkClientConf1.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
+ BookieRackAffinityMapping mapping = new BookieRackAffinityMapping();
+ ClientConfiguration bkClientConf = new ClientConfiguration();
+ bkClientConf.setProperty(BookieRackAffinityMapping.METADATA_STORE_INSTANCE, store);
- mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
- mapping1.setConf(bkClientConf1);
- List<String> racks1 = mapping1
+ mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
+ mapping.setConf(bkClientConf);
+ List<String> racks = mapping
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
- assertEquals(racks1.get(0), "/rack0");
- assertEquals(racks1.get(1), "/rack1");
- assertNull(racks1.get(2));
+
+ assertEquals(racks.get(0), "/rack0");
+ assertEquals(racks.get(1), "/rack1");
+ assertNull(racks.get(2));
}
@Test
@@ -96,12 +97,12 @@
mapping1.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
mapping1.setConf(bkClientConf1);
- List<String> racks1 = mapping1
+ List<String> racks = mapping1
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
- assertNull(racks1.get(0));
- assertNull(racks1.get(1));
- assertNull(racks1.get(2));
+ assertNull(racks.get(0));
+ assertNull(racks.get(1));
+ assertNull(racks.get(2));
}
@Test
@@ -113,9 +114,9 @@
mapping.setBookieAddressResolver(BookieSocketAddress.LEGACY_BOOKIEID_RESOLVER);
mapping.setConf(bkClientConf);
List<String> racks = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
- assertEquals(racks.get(0), null);
- assertEquals(racks.get(1), null);
- assertEquals(racks.get(2), null);
+ assertNull(racks.get(0));
+ assertNull(racks.get(1));
+ assertNull(racks.get(2));
Map<String, Map<BookieSocketAddress, BookieInfo>> bookieMapping = new HashMap<>();
Map<BookieSocketAddress, BookieInfo> mainBookieGroup = new HashMap<>();
@@ -132,7 +133,7 @@
List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
assertEquals(r.get(0), "/rack0");
assertEquals(r.get(1), "/rack1");
- assertEquals(r.get(2), null);
+ assertNull(r.get(2));
});
}
@@ -160,7 +161,7 @@
.resolve(Lists.newArrayList(BOOKIE1.getHostName(), BOOKIE2.getHostName(), BOOKIE3.getHostName()));
assertEquals(racks.get(0), "/rack0");
assertEquals(racks.get(1), "/rack1");
- assertEquals(racks.get(2), null);
+ assertNull(racks.get(2));
// add info for BOOKIE3 and check if the mapping picks up the change
Map<BookieSocketAddress, BookieInfo> secondaryBookieGroup = new HashMap<>();
@@ -180,9 +181,9 @@
Awaitility.await().untilAsserted(() -> {
List<String> r = mapping.resolve(Lists.newArrayList("127.0.0.1", "127.0.0.2", "127.0.0.3"));
- assertEquals(r.get(0), null);
- assertEquals(r.get(1), null);
- assertEquals(r.get(2), null);
+ assertNull(r.get(0));
+ assertNull(r.get(1));
+ assertNull(r.get(2));
});
}
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
index 7f0dace..f5c09f0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BkEnsemblesTestBase.java
@@ -73,7 +73,9 @@
bkEnsemble.start();
// start pulsar service
- config = new ServiceConfiguration();
+ if (config == null) {
+ config = new ServiceConfiguration();
+ }
config.setZookeeperServers("127.0.0.1" + ":" + bkEnsemble.getZookeeperPort());
config.setAdvertisedAddress("localhost");
config.setWebServicePort(Optional.of(0));
@@ -110,6 +112,7 @@
@Override
@AfterMethod(alwaysRun = true)
protected void cleanup() throws Exception {
+ config = null;
markCurrentSetupNumberCleaned();
admin.close();
pulsar.close();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
index c8a9b44..7628600 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/RackAwareTest.java
@@ -18,7 +18,9 @@
*/
package org.apache.pulsar.broker.service;
+import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
import com.google.gson.Gson;
import java.io.File;
import java.nio.file.Files;
@@ -28,6 +30,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.stream.Collectors;
+import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.LedgerHandle;
@@ -43,6 +46,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@Test(groups = "quarantine")
@@ -56,6 +60,11 @@
super(0);
}
+ @DataProvider(name = "forceMinRackNumProvider")
+ public Object[][] forceMinRackNumProvider() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
@Override
protected void configurePulsar(ServiceConfiguration config) throws Exception {
// Start bookies with specific racks
@@ -117,7 +126,7 @@
.getData(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null);
TreeMap<String, Map<String, Map<String, String>>> rackInfoMap =
new Gson().fromJson(new String(data), TreeMap.class);
- assertTrue(rackInfoMap.get(group).size() == NUM_BOOKIES);
+ assertEquals(rackInfoMap.get(group).size(), NUM_BOOKIES);
Set<String> racks = rackInfoMap.values().stream()
.map(Map::values)
.flatMap(bookieId -> bookieId.stream().map(rackInfo -> rackInfo.get("rack")))
@@ -138,5 +147,59 @@
}
}
+ @Test(dataProvider="forceMinRackNumProvider")
+ public void testPlacementMinRackNumsPerWriteQuorum(boolean forceMinRackNums) throws Exception {
+ cleanup();
+ config = new ServiceConfiguration();
+ config.setBookkeeperClientMinNumRacksPerWriteQuorum(2);
+ config.setBookkeeperClientEnforceMinNumRacksPerWriteQuorum(forceMinRackNums);
+ setup();
+ final String group = "default";
+ for (int i = 0; i < NUM_BOOKIES; i++) {
+ String bookie = bookies.get(i).getLocalAddress().toString();
+ // All bookie in one same rack "rack-1"
+ int rackId = i == 0 ? 1 : 2;
+ BookieInfo bi = BookieInfo.builder()
+ .rack("rack-" + 1)
+ .hostname("bookie-" + (i + 1))
+ .build();
+ log.info("setting rack for bookie at {} -- {}", bookie, bi);
+ admin.bookies().updateBookieRackInfo(bookie, group, bi);
+ }
+
+ // Make sure the racks cache gets updated through the ZK watch
+ Awaitility.await().untilAsserted(() -> {
+ byte[] data = bkEnsemble.getZkClient()
+ .getData(BookieRackAffinityMapping.BOOKIE_INFO_ROOT_PATH, false, null);
+ TreeMap<String, Map<String, Map<String, String>>> rackInfoMap =
+ new Gson().fromJson(new String(data), TreeMap.class);
+ assertEquals(rackInfoMap.get(group).size(), NUM_BOOKIES);
+
+ Set<String> racks = rackInfoMap.values().stream()
+ .map(Map::values)
+ .flatMap(bookieId -> bookieId.stream().map(rackInfo -> rackInfo.get("rack")))
+ .collect(Collectors.toSet());
+ assertEquals(racks.size(), 1);
+ assertTrue(racks.contains("rack-1"));
+ });
+
+ BookKeeper bkc = this.pulsar.getBookKeeperClient();
+
+ if (forceMinRackNums) {
+ try {
+ bkc.createLedger(2, 2, DigestType.DUMMY, new byte[0]);
+ fail("Should be failed due to no enough rack can be found");
+ } catch (BKException.BKNotEnoughBookiesException e) {
+ // ignore
+ }
+ } else {
+ for (int i = 0; i < 10; i++) {
+ LedgerHandle lh = bkc.createLedger(2, 2, DigestType.DUMMY, new byte[0]);
+ log.info("Ledger: {} -- Ensemble: {}", i, lh.getLedgerMetadata().getEnsembleAt(0));
+ lh.close();
+ }
+ }
+ }
+
private static final Logger log = LoggerFactory.getLogger(RackAwareTest.class);
}