Release old bundle from ownership cache when operator split bundle (#13678)
Fixes #13677
Modifications
release the old bundle from ownership and temporary znode cache when we split the old bundle;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
index 5e2ee96..a8a808c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java
@@ -894,6 +894,8 @@
// update bundled_topic cache for load-report-generation
pulsar.getBrokerService().refreshTopicToStatsMaps(bundle);
loadManager.get().setLoadReportForceUpdateFlag();
+ // release old bundle from ownership cache
+ pulsar.getNamespaceService().getOwnershipCache().removeOwnership(bundle);
completionFuture.complete(null);
if (unload) {
// Unload new split bundles, in background. This will not
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
index 2d75e21..0b66653 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -73,6 +73,7 @@
import org.apache.pulsar.policies.data.loadbalancer.LoadReport;
import org.apache.pulsar.policies.data.loadbalancer.LocalBrokerData;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
+import org.awaitility.Awaitility;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -496,6 +497,48 @@
}
}
+
+ @Test
+ public void testSplitBundleAndRemoveOldBundleFromOwnerShipCache() throws Exception {
+ OwnershipCache ownershipCache = spy(pulsar.getNamespaceService().getOwnershipCache());
+ doReturn(CompletableFuture.completedFuture(null)).when(ownershipCache).disableOwnership(any(NamespaceBundle.class));
+
+ Field ownership = NamespaceService.class.getDeclaredField("ownershipCache");
+ ownership.setAccessible(true);
+ ownership.set(pulsar.getNamespaceService(), ownershipCache);
+
+ NamespaceService namespaceService = pulsar.getNamespaceService();
+ NamespaceName nsname = NamespaceName.get("pulsar/global/ns1");
+ TopicName topicName = TopicName.get("persistent://pulsar/global/ns1/topic-1");
+ NamespaceBundles bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+
+ NamespaceBundle splitBundle1 = bundles.findBundle(topicName);
+ ownershipCache.tryAcquiringOwnership(splitBundle1);
+ CompletableFuture<Void> result1 = namespaceService.splitAndOwnBundle(splitBundle1, false, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
+ try {
+ result1.get();
+ } catch (Exception e) {
+ fail("split bundle failed", e);
+ }
+ Awaitility.await().untilAsserted(()
+ -> assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle1)));
+
+ //unload split
+ bundles = namespaceService.getNamespaceBundleFactory().getBundles(nsname);
+ assertNotNull(bundles);
+ NamespaceBundle splitBundle2 = bundles.findBundle(topicName);
+ CompletableFuture<Void> result2 = namespaceService.splitAndOwnBundle(splitBundle2, true, NamespaceBundleSplitAlgorithm.RANGE_EQUALLY_DIVIDE_ALGO);
+ try {
+ result2.get();
+ } catch (Exception e) {
+ // make sure: NPE does not occur
+ fail("split bundle failed", e);
+ }
+ Awaitility.await().untilAsserted(()
+ -> assertNull(namespaceService.getOwnershipCache().getOwnedBundles().get(splitBundle2)));
+ }
+
+
@Test
public void testSplitLargestBundle() throws Exception {
String namespace = "prop/test/ns-abc2";