[cherry-pick][branch-2.9] Fix delete system topic clean topic policy (#18823) (#18831)
cherry-pick #18823
### Motivation
If users set topic policy for system topic, then delete this system topic, the topic policy should be deleted.
### Modification
Only change_events topic do not need to clear topic policies.
### Matching PR in forked repository
PR in forked repository: https://github.com/liangyepianzhou/pulsar/pull/16
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
index 7e2fa1a..358085c 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java
@@ -20,6 +20,7 @@
package org.apache.pulsar.broker.admin.impl;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.service.BrokerService.isTopicPoliciesSystemTopic;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.defaultBundle;
import static org.apache.pulsar.common.policies.data.PoliciesUtil.getBundles;
import com.google.common.collect.Lists;
@@ -279,21 +280,46 @@
return;
}
+ CompletableFuture<Void> deleteSystemTopicFuture = null;
if (!isEmpty) {
if (log.isDebugEnabled()) {
log.debug("Found topics on namespace {}", namespaceName);
}
+ List<String> allSystemTopics = new ArrayList<>();
+ List<String> allPartitionedSystemTopics = new ArrayList<>();
+ List<String> topicPolicy = new ArrayList<>();
+ List<String> partitionedTopicPolicy = new ArrayList<>();
boolean hasNonSystemTopic = false;
for (String topic : topics) {
if (!pulsar().getBrokerService().isSystemTopic(TopicName.get(topic))) {
hasNonSystemTopic = true;
break;
}
+ TopicName topicName = TopicName.get(topic);
+ if (topicName.isPartitioned()) {
+ if (isTopicPoliciesSystemTopic(topic)) {
+ partitionedTopicPolicy.add(topic);
+ } else {
+ allPartitionedSystemTopics.add(topic);
+ }
+ } else {
+ if (isTopicPoliciesSystemTopic(topic)) {
+ topicPolicy.add(topic);
+ } else {
+ allSystemTopics.add(topic);
+ }
+ }
}
if (hasNonSystemTopic) {
asyncResponse.resume(new RestException(Status.CONFLICT, "Cannot delete non empty namespace"));
return;
}
+ deleteSystemTopicFuture = internalDeleteTopicsAsync(allSystemTopics)
+ .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .thenCompose(ignore -> internalDeleteTopicsAsync(topicPolicy))
+ .thenCompose(ignore__ -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicy));
+ } else {
+ deleteSystemTopicFuture = CompletableFuture.completedFuture(null);
}
// set the policies to deleted so that somebody else cannot acquire this namespace
@@ -308,21 +334,7 @@
return;
}
- // remove from owned namespace map and ephemeral node from ZK
- final List<CompletableFuture<Void>> futures = Lists.newArrayList();
- // remove system topics first.
- if (!topics.isEmpty()) {
- for (String topic : topics) {
- try {
- futures.add(pulsar().getAdminClient().topics().deleteAsync(topic, true, true));
- } catch (Exception ex) {
- log.error("[{}] Failed to delete system topic {}", clientAppId(), topic, ex);
- asyncResponse.resume(new RestException(Status.INTERNAL_SERVER_ERROR, ex));
- return;
- }
- }
- }
- FutureUtil.waitForAll(futures).thenCompose(__ -> {
+ deleteSystemTopicFuture.thenCompose(__ -> {
List<CompletableFuture<Void>> deleteBundleFutures = Lists.newArrayList();
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
@@ -473,17 +485,23 @@
try {
// firstly remove all topics including system topics
if (!topics.isEmpty()) {
- Set<String> partitionedTopics = new HashSet<>();
- Set<String> nonPartitionedTopics = new HashSet<>();
- Set<String> allSystemTopics = new HashSet<>();
- Set<String> allPartitionedSystemTopics = new HashSet<>();
+ List<String> partitionedTopics = new ArrayList<>();
+ List<String> nonPartitionedTopics = new ArrayList<>();
+ List<String> allSystemTopics = new ArrayList<>();
+ List<String> allPartitionedSystemTopics = new ArrayList<>();
+ List<String> topicPolicy = new ArrayList<>();
+ List<String> partitionedTopicPolicy = new ArrayList<>();
for (String topic : topics) {
try {
TopicName topicName = TopicName.get(topic);
if (topicName.isPartitioned()) {
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
- allPartitionedSystemTopics.add(topic);
+ if (isTopicPoliciesSystemTopic(topic)) {
+ partitionedTopicPolicy.add(topic);
+ } else {
+ allPartitionedSystemTopics.add(topic);
+ }
continue;
}
String partitionedTopic = topicName.getPartitionedTopicName();
@@ -495,7 +513,11 @@
}
} else {
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
- allSystemTopics.add(topic);
+ if (isTopicPoliciesSystemTopic(topic)) {
+ topicPolicy.add(topic);
+ } else {
+ allSystemTopics.add(topic);
+ }
continue;
}
topicFutures.add(pulsar().getAdminClient().topics().deleteAsync(
@@ -522,8 +544,10 @@
final CompletableFuture<Throwable> topicFutureEx =
FutureUtil.waitForAll(topicFutures)
- .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
.thenCompose(ignore -> internalDeleteTopicsAsync(allSystemTopics))
+ .thenCompose(ignore -> internalDeletePartitionedTopicsAsync(allPartitionedSystemTopics))
+ .thenCompose(ignore -> internalDeleteTopicsAsync(topicPolicy))
+ .thenCompose(ignore__ -> internalDeletePartitionedTopicsAsync(partitionedTopicPolicy))
.handle((result, exception) -> {
if (exception != null) {
if (exception.getCause() instanceof PulsarAdminException) {
@@ -582,7 +606,7 @@
});
}
- private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(Set<String> topicNames) {
+ private CompletableFuture<Void> internalDeletePartitionedTopicsAsync(List<String> topicNames) {
if (CollectionUtils.isEmpty(topicNames)) {
return CompletableFuture.completedFuture(null);
}
@@ -601,7 +625,7 @@
}
return FutureUtil.waitForAll(futures);
}
- private CompletableFuture<Void> internalDeleteTopicsAsync(Set<String> topicNames) {
+ private CompletableFuture<Void> internalDeleteTopicsAsync(List<String> topicNames) {
if (CollectionUtils.isEmpty(topicNames)) {
return CompletableFuture.completedFuture(null);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index b251b0d..10ae4fa 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -24,6 +24,7 @@
import static org.apache.commons.collections.CollectionUtils.isEmpty;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic;
+import static org.apache.pulsar.common.events.EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
@@ -2846,6 +2847,13 @@
return false;
}
+ public static boolean isTopicPoliciesSystemTopic(String topic) {
+ if (topic == null) {
+ return false;
+ }
+ return TopicName.get(topic).getLocalName().equals(NAMESPACE_EVENTS_LOCAL_NAME);
+ }
+
/**
* Get {@link TopicPolicies} for the parameterized topic.
* @param topicName
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index bcf81fb..cc90770 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
+import static org.apache.pulsar.broker.service.BrokerService.isTopicPoliciesSystemTopic;
import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames;
import static org.apache.pulsar.compaction.Compactor.COMPACTION_SUBSCRIPTION;
import com.carrotsearch.hppc.ObjectObjectHashMap;
@@ -1182,8 +1183,7 @@
deleteTopicAuthenticationFuture.thenCompose(ignore -> deleteSchema ? deleteSchema() :
CompletableFuture.completedFuture(null))
.thenCompose(ignore -> {
- if (!this.getBrokerService().getPulsar().getBrokerService()
- .isSystemTopic(TopicName.get(topic))) {
+ if (!isTopicPoliciesSystemTopic(topic)) {
return deleteTopicPolicies();
} else {
return CompletableFuture.completedFuture(null);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
index b5b5776..55441d9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin;
+import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
@@ -33,6 +34,8 @@
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
+import static org.testng.AssertJUnit.assertNull;
+
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.lang.reflect.Field;
@@ -57,6 +60,7 @@
import javax.ws.rs.core.Response.Status;
import javax.ws.rs.core.UriBuilder;
import javax.ws.rs.core.UriInfo;
+import lombok.Cleanup;
import org.apache.bookkeeper.client.api.ReadHandle;
import org.apache.bookkeeper.mledger.LedgerOffloader;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
@@ -80,6 +84,7 @@
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.events.EventsTopicNames;
import org.apache.pulsar.common.naming.NamespaceBundle;
@@ -99,6 +104,7 @@
import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.policies.data.TenantInfoImpl;
+import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.metadata.cache.impl.MetadataCacheImpl;
import org.apache.pulsar.metadata.impl.AbstractMetadataStore;
import org.apache.zookeeper.KeeperException.Code;
@@ -1816,7 +1822,7 @@
}
@Test
- public void testNotClearTopicPolicesWhenDeleteSystemTopic() throws Exception {
+ public void testNotClearTopicPolicesWhenDeleteTopicPolicyTopic() throws Exception {
String namespace = this.testTenant + "/delete-systemTopic";
String topic = TopicName.get(TopicDomain.persistent.toString(), this.testTenant, "delete-systemTopic",
"testNotClearTopicPolicesWhenDeleteSystemTopic").toString();
@@ -1836,4 +1842,33 @@
// 4. delete the policies topic and the topic wil not to clear topic polices
admin.topics().delete(namespace + "/" + EventsTopicNames.NAMESPACE_EVENTS_LOCAL_NAME, true);
}
+ @Test
+ public void testDeleteTopicPolicyWhenDeleteSystemTopic() throws Exception {
+ conf.setTopicLevelPoliciesEnabled(true);
+ conf.setSystemTopicEnabled(true);
+ Field field = PulsarService.class.getDeclaredField("topicPoliciesService");
+ field.setAccessible(true);
+ field.set(pulsar, new SystemTopicBasedTopicPoliciesService(pulsar));
+
+ String systemTopic = SYSTEM_NAMESPACE.toString() + "/" + "testDeleteTopicPolicyWhenDeleteSystemTopic";
+ admin.tenants().createTenant(SYSTEM_NAMESPACE.getTenant(),
+ new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("use", "usc", "usw")));
+
+ admin.namespaces().createNamespace(SYSTEM_NAMESPACE.toString());
+ @Cleanup
+ Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+ .topic(systemTopic).create();
+ admin.topicPolicies().setMaxConsumers(systemTopic, 5);
+
+ Integer maxConsumerPerTopic = pulsar
+ .getTopicPoliciesService()
+ .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get()
+ .getMaxConsumerPerTopic();
+
+ assertEquals(maxConsumerPerTopic.intValue(), 5);
+ admin.topics().delete(systemTopic, true);
+ TopicPolicies topicPolicies = pulsar.getTopicPoliciesService()
+ .getTopicPoliciesBypassCacheAsync(TopicName.get(systemTopic)).get(5, TimeUnit.SECONDS);
+ assertNull(topicPolicies);
+ }
}