[improve][admin][branch-2.10] Unset namespace policy to improve deleting namespace (#17033) (#19865)

Co-authored-by: Jiwei Guo <technoboy@apache.org>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2269b22..6e9ecf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -310,22 +310,28 @@
             return;
         }
         // remove from owned namespace map and ephemeral node from ZK
-        final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+        final CompletableFuture<Void> deleteSystemTopicFuture;
         // remove system topics first.
         Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
         Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
+        Set<String> partitionSystemTopic = new HashSet<>();
+        Set<String> noPartitionSystemTopic = new HashSet<>();
         if (!topics.isEmpty()) {
             for (String topic : topics) {
                 try {
+                    TopicName topicName = TopicName.get(topic);
                     if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) {
-                        TopicName topicName = TopicName.get(topic);
                         if (topicName.isPartitioned()) {
-                            partitionedTopicPolicySystemTopic.add(topic);
+                            partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
                         } else {
                             noPartitionedTopicPolicySystemTopic.add(topic);
                         }
                     } else {
-                        futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+                        if (topicName.isPartitioned()) {
+                            partitionSystemTopic.add(topicName.getPartitionedTopicName());
+                        } else {
+                            noPartitionSystemTopic.add(topic);
+                        }
                     }
                 } catch (Exception ex) {
                     log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
@@ -333,8 +339,15 @@
                     return;
                 }
             }
+            deleteSystemTopicFuture = internalDeleteTopicsAsync(noPartitionSystemTopic)
+                    .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionSystemTopic))
+                    .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
+                    .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic));
+        } else {
+            deleteSystemTopicFuture = CompletableFuture.completedFuture(null);
         }
-        FutureUtil.waitForAll(futures)
+
+        deleteSystemTopicFuture
                 .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
                 .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
                 .thenCompose(__ -> {
@@ -510,9 +523,6 @@
                             }
                             String partitionedTopic = topicName.getPartitionedTopicName();
                             if (!partitionedTopics.contains(partitionedTopic)) {
-                                // Distinguish partitioned topic to avoid duplicate deletion of the same schema
-                                topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
-                                        partitionedTopic, true, true));
                                 partitionedTopics.add(partitionedTopic);
                             }
                         } else {
@@ -524,10 +534,10 @@
                                 }
                                 continue;
                             }
-                            topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
-                                    topic, true, true));
                             nonPartitionedTopics.add(topic);
                         }
+                        topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
+                                topic, true, true));
                     } catch (Exception e) {
                         String errorMessage = String.format("Failed to force delete topic %s, "
                                         + "but the previous deletion command of partitioned-topics:%s "
@@ -540,6 +550,11 @@
                     }
                 }
 
+                for (String partitionedTopic : partitionedTopics) {
+                    topicFutures.add(namespaceResources().getPartitionedTopicResources()
+                            .deletePartitionedTopicAsync(TopicName.get(partitionedTopic)));
+                }
+
                 if (log.isDebugEnabled()) {
                     log.debug("Successfully send deletion command of partitioned-topics:{} "
                                     + "and non-partitioned-topics:{} in namespace:{}.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index fcc2f14..a928101 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -125,6 +125,7 @@
     @BeforeMethod
     @Override
     public void setup() throws Exception {
+        conf.setForceDeleteNamespaceAllowed(true);
         conf.setLoadBalancerEnabled(true);
         conf.setEnableNamespaceIsolationUpdateOnTime(true);
         super.internalSetup();
@@ -2646,4 +2647,15 @@
         assertEquals(topicStats.getPublishers().size(), 2);
         topicStats.getPublishers().forEach(p -> assertTrue(p.isSupportsPartialProducer()));
     }
+
+    @Test
+    private void testDeleteNamespaceForciblyWithManyTopics() throws Exception {
+        final String ns = "prop-xyz/ns-testDeleteNamespaceForciblyWithManyTopics";
+        admin.namespaces().createNamespace(ns, 2);
+        for (int i = 0; i < 100; i++) {
+            admin.topics().createPartitionedTopic(String.format("persistent://%s", ns + "/topic" + i), 3);
+        }
+        admin.namespaces().deleteNamespace(ns, true);
+        Assert.assertFalse(admin.namespaces().getNamespaces("prop-xyz").contains(ns));
+    }
 }