Unload bundle: close topic forcefully and enable bundle on ownership removal failure (#174)
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedServiceUnit.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedServiceUnit.java
index d6dc229..23100ed 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedServiceUnit.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedServiceUnit.java
@@ -104,11 +104,25 @@
int unloadedTopics = 0;
try {
LOG.info("Disabling ownership: {}", this.suName);
- pulsar.getNamespaceService().getOwnershipCache().disableOwnership(this.suName);
+ pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.suName, false);
- // Handle unload of persistent topics
- unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(suName).get();
- pulsar.getNamespaceService().getOwnershipCache().removeOwnership(suName);
+ // close topics forcefully
+ try {
+ unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(suName).get();
+ } catch (Exception e) {
+ // ignore topic-close failure to unload bundle
+ LOG.error("Failed to close topics under namespace {}", suName.toString(), e);
+ }
+
+ // delete ownership node on zk
+ try {
+ pulsar.getNamespaceService().getOwnershipCache().removeOwnership(suName);
+ } catch (Exception e) {
+ // Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics
+ pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.suName, true);
+ throw new RuntimeException(String.format("Failed to delete ownership node %s", suName.toString()),
+ e.getCause());
+ }
} catch (Exception e) {
LOG.error(String.format("failed to unload a namespace. ns=%s", suName.toString()), e);
throw new RuntimeException(e);
@@ -126,4 +140,8 @@
public boolean isActive() {
return this.isActive.get();
}
+
+ public void setActive(boolean active) {
+ isActive.set(active);
+ }
}
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
index 18dee92..c47e990 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
@@ -151,23 +151,6 @@
}
}
- private class OwnedServiceUnitCacheRemovalListener implements RemovalListener<String, OwnedServiceUnit> {
-
- @Override
- public void onRemoval(RemovalNotification<String, OwnedServiceUnit> notification) {
- // Under the cache sync lock, removing the ZNode
- // If succeeded, we guaranteed that the cache entry is removed together w/ ZNode
- try {
- localZkCache.getZooKeeper().delete(notification.getKey(), -1);
- ownershipReadOnlyCache.invalidate(notification.getKey());
-
- LOG.info("Removed zk lock for service unit: {}", notification.getKey());
- } catch (Exception e) {
- LOG.error("Failed to delete the namespace ephemeral node. key={}", notification.getKey(), e);
- }
- }
- }
-
/**
* Constructor of <code>OwnershipCache</code>
*
@@ -187,8 +170,7 @@
this.localZkCache = pulsar.getLocalZkCache();
this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
// ownedServiceUnitsCache contains all namespaces that are owned by the local broker
- this.ownedServiceUnitsCache = CacheBuilder.newBuilder()
- .removalListener(new OwnedServiceUnitCacheRemovalListener()).build(new OwnedServiceUnitCacheLoader());
+ this.ownedServiceUnitsCache = CacheBuilder.newBuilder().build(new OwnedServiceUnitCacheLoader());
}
/**
@@ -237,9 +219,21 @@
*
* @param suId
* identifier of the <code>ServiceUnit</code>
+ * @throws Exception
*/
- public void removeOwnership(ServiceUnitId suname) {
- this.ownedServiceUnitsCache.invalidate(ServiceUnitZkUtils.path(suname));
+ public void removeOwnership(ServiceUnitId suname) throws Exception {
+ // Under the cache sync lock, removing the ZNode
+ // If succeeded, we guaranteed that the cache entry is removed together w/ ZNode
+ String key = ServiceUnitZkUtils.path(suname);
+ try {
+ localZkCache.getZooKeeper().delete(key, -1);
+ this.ownedServiceUnitsCache.invalidate(key);
+ this.ownershipReadOnlyCache.invalidate(key);
+ LOG.info("Removed zk lock for service unit: {}", key);
+ } catch (KeeperException.NoNodeException e) {
+ LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key, e.getMessage());
+ }
+
}
/**
@@ -283,4 +277,18 @@
localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfos[1]), -1);
this.ownershipReadOnlyCache.invalidate(path);
}
+
+ /**
+ * Update bundle state in a local cache
+ *
+ * @param bundle
+ * @throws Exception
+ */
+ public void updateBundleState(ServiceUnitId bundle, boolean isActive) throws Exception {
+ // Disable owned instance in local cache
+ OwnedServiceUnit ownedServiceUnit = getOwnedServiceUnit(bundle);
+ if (ownedServiceUnit != null) {
+ ownedServiceUnit.setActive(isActive);
+ }
+ }
}
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
index 0718d14..38dc8d2 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -57,6 +57,13 @@
import com.yahoo.pulsar.common.naming.ServiceUnitId;
import com.yahoo.pulsar.common.policies.data.Policies;
import com.yahoo.pulsar.common.util.ObjectMapperFactory;
+import static org.mockito.Mockito.doAnswer;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import com.yahoo.pulsar.broker.service.Topic;
+import com.yahoo.pulsar.client.api.Consumer;
+import com.yahoo.pulsar.client.api.ConsumerConfiguration;
+import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
public class NamespaceServiceTest extends BrokerTestBase {
@@ -245,4 +252,39 @@
((LoadingCache<NamespaceName, NamespaceBundles>) bCacheField.get(utilityFactory)).put(nsname, bundles);
return utilityFactory.splitBundles(targetBundle, 2);
}
+
+ @Test
+ public void testUnloadNamespaceBundleFailure() throws Exception {
+
+ final String topicName = "persistent://my-property/use/my-ns/my-topic1";
+ ConsumerConfiguration conf = new ConsumerConfiguration();
+ Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", conf);
+ ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = pulsar.getBrokerService().getTopics();
+ Topic spyTopic = spy(topics.get(topicName).get());
+ topics.clear();
+ CompletableFuture<Topic> topicFuture = CompletableFuture.completedFuture(spyTopic);
+ // add mock topic
+ topics.put(topicName, topicFuture);
+ doAnswer(new Answer<CompletableFuture<Void>>() {
+ @Override
+ public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+ CompletableFuture<Void> result = new CompletableFuture<>();
+ result.completeExceptionally(new RuntimeException("first time failed"));
+ return result;
+ }
+ }).when(spyTopic).close();
+ ServiceUnitId bundle = pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
+ try {
+ pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle);
+ } catch (Exception e) {
+ // fail
+ fail(e.getMessage());
+ }
+ try {
+ pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), null, null);
+ fail("it should fail as node is not present");
+ } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
+ // ok
+ }
+ }
}