Unload bundle: close topic forcefully and enable bundle on ownership removal failure (#174)

diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedServiceUnit.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedServiceUnit.java
index d6dc229..23100ed 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedServiceUnit.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnedServiceUnit.java
@@ -104,11 +104,25 @@
         int unloadedTopics = 0;
         try {
             LOG.info("Disabling ownership: {}", this.suName);
-            pulsar.getNamespaceService().getOwnershipCache().disableOwnership(this.suName);
+            pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.suName, false);
 
-            // Handle unload of persistent topics
-            unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(suName).get();
-            pulsar.getNamespaceService().getOwnershipCache().removeOwnership(suName);
+            // close topics forcefully
+            try {
+                unloadedTopics = pulsar.getBrokerService().unloadServiceUnit(suName).get();
+            } catch (Exception e) {
+                // ignore topic-close failure to unload bundle
+                LOG.error("Failed to close topics under namespace {}", suName.toString(), e);
+            }
+            
+            // delete ownership node on zk
+            try {
+                pulsar.getNamespaceService().getOwnershipCache().removeOwnership(suName);
+            } catch (Exception e) {
+                // Failed to remove ownership node: enable namespace-bundle again so, it can serve new topics
+                pulsar.getNamespaceService().getOwnershipCache().updateBundleState(this.suName, true);
+                throw new RuntimeException(String.format("Failed to delete ownership node %s", suName.toString()),
+                        e.getCause());
+            }
         } catch (Exception e) {
             LOG.error(String.format("failed to unload a namespace. ns=%s", suName.toString()), e);
             throw new RuntimeException(e);
@@ -126,4 +140,8 @@
     public boolean isActive() {
         return this.isActive.get();
     }
+    
+    public void setActive(boolean active) {
+        isActive.set(active);
+    }
 }
diff --git a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
index 18dee92..c47e990 100644
--- a/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
+++ b/pulsar-broker/src/main/java/com/yahoo/pulsar/broker/namespace/OwnershipCache.java
@@ -151,23 +151,6 @@
         }
     }
 
-    private class OwnedServiceUnitCacheRemovalListener implements RemovalListener<String, OwnedServiceUnit> {
-
-        @Override
-        public void onRemoval(RemovalNotification<String, OwnedServiceUnit> notification) {
-            // Under the cache sync lock, removing the ZNode
-            // If succeeded, we guaranteed that the cache entry is removed together w/ ZNode
-            try {
-                localZkCache.getZooKeeper().delete(notification.getKey(), -1);
-                ownershipReadOnlyCache.invalidate(notification.getKey());
-
-                LOG.info("Removed zk lock for service unit: {}", notification.getKey());
-            } catch (Exception e) {
-                LOG.error("Failed to delete the namespace ephemeral node. key={}", notification.getKey(), e);
-            }
-        }
-    }
-
     /**
      * Constructor of <code>OwnershipCache</code>
      *
@@ -187,8 +170,7 @@
         this.localZkCache = pulsar.getLocalZkCache();
         this.ownershipReadOnlyCache = pulsar.getLocalZkCacheService().ownerInfoCache();
         // ownedServiceUnitsCache contains all namespaces that are owned by the local broker
-        this.ownedServiceUnitsCache = CacheBuilder.newBuilder()
-                .removalListener(new OwnedServiceUnitCacheRemovalListener()).build(new OwnedServiceUnitCacheLoader());
+        this.ownedServiceUnitsCache = CacheBuilder.newBuilder().build(new OwnedServiceUnitCacheLoader());
     }
 
     /**
@@ -237,9 +219,21 @@
      *
      * @param suId
      *            identifier of the <code>ServiceUnit</code>
+     * @throws Exception 
      */
-    public void removeOwnership(ServiceUnitId suname) {
-        this.ownedServiceUnitsCache.invalidate(ServiceUnitZkUtils.path(suname));
+    public void removeOwnership(ServiceUnitId suname) throws Exception {
+        // Under the cache sync lock, removing the ZNode
+        // If succeeded, we guaranteed that the cache entry is removed together w/ ZNode
+        String key = ServiceUnitZkUtils.path(suname);
+        try {
+            localZkCache.getZooKeeper().delete(key, -1);
+            this.ownedServiceUnitsCache.invalidate(key);
+            this.ownershipReadOnlyCache.invalidate(key);
+            LOG.info("Removed zk lock for service unit: {}", key);
+        } catch (KeeperException.NoNodeException e) {
+            LOG.warn("[{}] Failed to delete the namespace ephemeral node. key={}", key, e.getMessage());
+        }
+
     }
 
     /**
@@ -283,4 +277,18 @@
         localZkCache.getZooKeeper().setData(path, jsonMapper.writeValueAsBytes(selfOwnerInfos[1]), -1);
         this.ownershipReadOnlyCache.invalidate(path);
     }
+    
+    /**
+     * Update bundle state in a local cache
+     * 
+     * @param bundle
+     * @throws Exception
+     */
+    public void updateBundleState(ServiceUnitId bundle, boolean isActive) throws Exception {
+        // Disable owned instance in local cache
+        OwnedServiceUnit ownedServiceUnit = getOwnedServiceUnit(bundle);
+        if (ownedServiceUnit != null) {
+            ownedServiceUnit.setActive(isActive);
+        }
+    }
 }
diff --git a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
index 0718d14..38dc8d2 100644
--- a/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
+++ b/pulsar-broker/src/test/java/com/yahoo/pulsar/broker/namespace/NamespaceServiceTest.java
@@ -57,6 +57,13 @@
 import com.yahoo.pulsar.common.naming.ServiceUnitId;
 import com.yahoo.pulsar.common.policies.data.Policies;
 import com.yahoo.pulsar.common.util.ObjectMapperFactory;
+import static org.mockito.Mockito.doAnswer;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import com.yahoo.pulsar.broker.service.Topic;
+import com.yahoo.pulsar.client.api.Consumer;
+import com.yahoo.pulsar.client.api.ConsumerConfiguration;
+import com.yahoo.pulsar.common.util.collections.ConcurrentOpenHashMap;
 
 public class NamespaceServiceTest extends BrokerTestBase {
 
@@ -245,4 +252,39 @@
         ((LoadingCache<NamespaceName, NamespaceBundles>) bCacheField.get(utilityFactory)).put(nsname, bundles);
         return utilityFactory.splitBundles(targetBundle, 2);
     }
+    
+    @Test
+    public void testUnloadNamespaceBundleFailure() throws Exception {
+
+        final String topicName = "persistent://my-property/use/my-ns/my-topic1";
+        ConsumerConfiguration conf = new ConsumerConfiguration();
+        Consumer consumer = pulsarClient.subscribe(topicName, "my-subscriber-name", conf);
+        ConcurrentOpenHashMap<String, CompletableFuture<Topic>> topics = pulsar.getBrokerService().getTopics();
+        Topic spyTopic = spy(topics.get(topicName).get());
+        topics.clear();
+        CompletableFuture<Topic> topicFuture = CompletableFuture.completedFuture(spyTopic);
+        // add mock topic
+        topics.put(topicName, topicFuture);
+        doAnswer(new Answer<CompletableFuture<Void>>() {
+            @Override
+            public CompletableFuture<Void> answer(InvocationOnMock invocation) throws Throwable {
+                CompletableFuture<Void> result = new CompletableFuture<>();
+                result.completeExceptionally(new RuntimeException("first time failed"));
+                return result;
+            }
+        }).when(spyTopic).close();
+        ServiceUnitId bundle = pulsar.getNamespaceService().getBundle(DestinationName.get(topicName));
+        try {
+            pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) bundle);
+        } catch (Exception e) {
+            // fail
+            fail(e.getMessage());
+        }
+        try {
+            pulsar.getLocalZkCache().getZooKeeper().getData(ServiceUnitZkUtils.path(bundle), null, null);
+            fail("it should fail as node is not present");
+        } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
+            // ok
+        }
+    }
 }