Fix CPU 100% when deleting namespace (#10337) (#10454)

When deleting the namespace, the namespace Policies will be marked as deleted.
This will trigger topic's `onPoliciesUpdate`
However, in onPoliciesUpdate, the data of the Policies node on zk will be read, such as: `checkReplicationAndRetryOnFailure`
Due to the deletion of the namespace, the zk node may no longer exist at this time.
Failure to read data will trigger infinite retries.
https://github.com/apache/pulsar/blob/e970c2947aff9231202ab72bdbad047d85c55633/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java#L1175-L1193

If there are many topics, there will be a short-term CPU spike

![image](https://user-images.githubusercontent.com/9758905/115834541-ebc32480-a447-11eb-887a-95c4a3d1adf1.png)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index d301443..d87707a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -1087,7 +1087,8 @@
         return closeFuture;
     }
 
-    private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
+    @VisibleForTesting
+    CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
         CompletableFuture<Void> result = new CompletableFuture<Void>();
         checkReplication().thenAccept(res -> {
             log.info("[{}] Policies updated successfully", topic);
@@ -2059,6 +2060,10 @@
         if (log.isDebugEnabled()) {
             log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
         }
+        if (data.deleted) {
+            log.debug("Ignore the update because it has been deleted : {}", data);
+            return CompletableFuture.completedFuture(null);
+        }
         isEncryptionRequired = data.encryption_required;
 
         setSchemaCompatibilityStrategy(data);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
new file mode 100644
index 0000000..f4d11b8
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentTopicTest.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.service.persistent;
+
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.collect.Sets;
+import org.apache.pulsar.broker.service.BrokerTestBase;
+import org.apache.pulsar.common.policies.data.Policies;
+import org.awaitility.Awaitility;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+@Test(groups = "broker")
+public class PersistentTopicTest extends BrokerTestBase {
+
+    @BeforeMethod(alwaysRun = true)
+    @Override
+    protected void setup() throws Exception {
+        super.baseSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Test
+    public void testDeleteNamespaceInfiniteRetry() throws Exception {
+        //init namespace
+        final String myNamespace = "prop/ns" + UUID.randomUUID();
+        admin.namespaces().createNamespace(myNamespace, Sets.newHashSet("test"));
+        final String topic = "persistent://" + myNamespace + "/testDeleteNamespaceInfiniteRetry";
+        //init topic and policies
+        pulsarClient.newProducer().topic(topic).create().close();
+        admin.namespaces().setMaxConsumersPerTopic(myNamespace, 0);
+        Awaitility.await().atMost(3, TimeUnit.SECONDS).until(()
+                -> admin.namespaces().getMaxConsumersPerTopic(myNamespace) == 0);
+
+        PersistentTopic persistentTopic =
+                spy((PersistentTopic) pulsar.getBrokerService().getTopicIfExists(topic).get().get());
+
+        Policies policies = new Policies();
+        policies.deleted = true;
+        persistentTopic.onPoliciesUpdate(policies);
+        verify(persistentTopic, times(0)).checkReplicationAndRetryOnFailure();
+
+        policies.deleted = false;
+        persistentTopic.onPoliciesUpdate(policies);
+        verify(persistentTopic, times(1)).checkReplicationAndRetryOnFailure();
+    }
+}
\ No newline at end of file