[improve][admin][branch-2.10] Unset namespace policy to improve deleting namespace (#17033) (#19865)
Co-authored-by: Jiwei Guo <technoboy@apache.org>
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 2269b22..6e9ecf2 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -310,22 +310,28 @@
return;
}
// remove from owned namespace map and ephemeral node from ZK
- final List<CompletableFuture<Void>> futures = Lists.newArrayList();
+ final CompletableFuture<Void> deleteSystemTopicFuture;
// remove system topics first.
Set<String> noPartitionedTopicPolicySystemTopic = new HashSet<>();
Set<String> partitionedTopicPolicySystemTopic = new HashSet<>();
+ Set<String> partitionSystemTopic = new HashSet<>();
+ Set<String> noPartitionSystemTopic = new HashSet<>();
if (!topics.isEmpty()) {
for (String topic : topics) {
try {
+ TopicName topicName = TopicName.get(topic);
if (EventsTopicNames.isTopicPoliciesSystemTopic(topic)) {
- TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
- partitionedTopicPolicySystemTopic.add(topic);
+ partitionedTopicPolicySystemTopic.add(topicName.getPartitionedTopicName());
} else {
noPartitionedTopicPolicySystemTopic.add(topic);
}
} else {
- futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
+ if (topicName.isPartitioned()) {
+ partitionSystemTopic.add(topicName.getPartitionedTopicName());
+ } else {
+ noPartitionSystemTopic.add(topic);
+ }
}
} catch (Exception ex) {
log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
@@ -333,8 +339,15 @@
return;
}
}
+ deleteSystemTopicFuture = internalDeleteTopicsAsync(noPartitionSystemTopic)
+ .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionSystemTopic))
+ .thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
+ .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic));
+ } else {
+ deleteSystemTopicFuture = CompletableFuture.completedFuture(null);
}
- FutureUtil.waitForAll(futures)
+
+ deleteSystemTopicFuture
.thenCompose(ignore -> internalDeleteTopicsAsync(noPartitionedTopicPolicySystemTopic))
.thenCompose(ignore -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicySystemTopic))
.thenCompose(__ -> {
@@ -510,9 +523,6 @@
}
String partitionedTopic = topicName.getPartitionedTopicName();
if (!partitionedTopics.contains(partitionedTopic)) {
- // Distinguish partitioned topic to avoid duplicate deletion of the same schema
- topicFutures.add(pulsar().getAdminClient().topics().deletePartitionedTopicAsync(
- partitionedTopic, true, true));
partitionedTopics.add(partitionedTopic);
}
} else {
@@ -524,10 +534,10 @@
}
continue;
}
- topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
- topic, true, true));
nonPartitionedTopics.add(topic);
}
+ topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
+ topic, true, true));
} catch (Exception e) {
String errorMessage = String.format("Failed to force delete topic %s, "
+ "but the previous deletion command of partitioned-topics:%s "
@@ -540,6 +550,11 @@
}
}
+ for (String partitionedTopic : partitionedTopics) {
+ topicFutures.add(namespaceResources().getPartitionedTopicResources()
+ .deletePartitionedTopicAsync(TopicName.get(partitionedTopic)));
+ }
+
if (log.isDebugEnabled()) {
log.debug("Successfully send deletion command of partitioned-topics:{} "
+ "and non-partitioned-topics:{} in namespace:{}.",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
index fcc2f14..a928101 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApi2Test.java
@@ -125,6 +125,7 @@
@BeforeMethod
@Override
public void setup() throws Exception {
+ conf.setForceDeleteNamespaceAllowed(true);
conf.setLoadBalancerEnabled(true);
conf.setEnableNamespaceIsolationUpdateOnTime(true);
super.internalSetup();
@@ -2646,4 +2647,15 @@
assertEquals(topicStats.getPublishers().size(), 2);
topicStats.getPublishers().forEach(p -> assertTrue(p.isSupportsPartialProducer()));
}
+
+ @Test
+ private void testDeleteNamespaceForciblyWithManyTopics() throws Exception {
+ final String ns = "prop-xyz/ns-testDeleteNamespaceForciblyWithManyTopics";
+ admin.namespaces().createNamespace(ns, 2);
+ for (int i = 0; i < 100; i++) {
+ admin.topics().createPartitionedTopic(String.format("persistent://%s", ns + "/topic" + i), 3);
+ }
+ admin.namespaces().deleteNamespace(ns, true);
+ Assert.assertFalse(admin.namespaces().getNamespaces("prop-xyz").contains(ns));
+ }
}