[fix][client][branch-4.0] Partitioned topics are unexpectedly created by client after deletion (#24554) (#24571)
(cherry picked from commit 16271dc888c20d3e2233f1717b37f6f13fcb8af3)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a82087..3f83e91 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3073,7 +3073,7 @@
// Truncate to ensure the offloaded data is not orphaned.
// Also ensures the BK ledgers are deleted and not just scheduled for deletion
- CompletableFuture<Void> truncateFuture = asyncTruncate();
+ CompletableFuture<Void> truncateFuture = asyncTruncate(true);
truncateFuture.whenComplete((ignore, exc) -> {
if (exc != null) {
log.error("[{}] Error truncating ledger for deletion", name, exc);
@@ -4480,6 +4480,12 @@
@Override
public CompletableFuture<Void> asyncTruncate() {
+ return asyncTruncate(false);
+ }
+
+ // When asyncTruncate is called by asyncDelete, the argument should be true because cursors will not be accessed
+ // after the managed ledger is deleted.
+ private CompletableFuture<Void> asyncTruncate(boolean ignoreCursorFailure) {
final List<CompletableFuture<Void>> futures = new ArrayList();
for (ManagedCursor cursor : cursors) {
@@ -4492,7 +4498,12 @@
@Override
public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
- future.completeExceptionally(exception);
+ if (ignoreCursorFailure) {
+ log.warn("Failed to clear backlog for cursor {}", cursor.getName(), exception);
+ future.complete(null);
+ } else {
+ future.completeExceptionally(exception);
+ }
}
}, null);
futures.add(future);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index c90ad152..efeb429 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -24,7 +24,9 @@
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
+import java.io.Closeable;
import java.net.InetSocketAddress;
+import java.time.Duration;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
@@ -36,18 +38,21 @@
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.LookupService;
import org.apache.pulsar.client.impl.LookupTopicResult;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
import org.apache.pulsar.common.policies.data.TopicType;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@Test(groups = "broker-admin")
@@ -55,7 +60,7 @@
public class TopicAutoCreationTest extends ProducerConsumerBase {
@Override
- @BeforeMethod
+ @BeforeClass
protected void setup() throws Exception {
conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
conf.setAllowAutoTopicCreation(true);
@@ -71,7 +76,7 @@
}
@Override
- @AfterMethod(alwaysRun = true)
+ @AfterClass(alwaysRun = true)
protected void cleanup() throws Exception {
super.internalCleanup();
}
@@ -87,9 +92,11 @@
.create();
List<String> partitionedTopics = admin.topics().getPartitionedTopicList(namespaceName);
+ assertTrue(partitionedTopics.contains(topic));
List<String> topics = admin.topics().getList(namespaceName);
- assertEquals(partitionedTopics.size(), 1);
- assertEquals(topics.size(), 3);
+ for (int i = 0; i < conf.getDefaultNumPartitions(); i++) {
+ assertTrue(topics.contains(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+ }
producer.close();
for (String t : topics) {
@@ -248,4 +255,48 @@
admin.namespaces().deleteNamespace(namespace, true);
}
+ @Test
+ public void testPartitionsNotCreatedAfterDeletion() throws Exception {
+ @Cleanup final var client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+ final var topicName = TopicName.get("my-property/my-ns/testPartitionsNotCreatedAfterDeletion");
+ final var topic = topicName.toString();
+ final var interval = Duration.ofSeconds(1);
+ final ThrowableConsumer<ThrowableSupplier<Closeable>> verifier = creator -> {
+ admin.topics().createPartitionedTopic(topic, 1);
+ boolean needCleanup = false;
+ try (final var ignored = creator.get()) {
+ admin.topics().terminatePartitionedTopic(topic);
+ admin.topics().deletePartitionedTopic(topic, true);
+ Thread.sleep(interval.toMillis() + 500); // wait until the auto update partitions task has run
+
+ final var topics = admin.topics().getList(topicName.getNamespace()).stream()
+ .filter(__ -> __.contains(topicName.getLocalName())).toList();
+ // Without https://github.com/apache/pulsar/pull/24118, the producer or consumer on partition 0 could be
+ // automatically created.
+ if (!topics.isEmpty()) {
+ assertEquals(topics, List.of(topicName.getPartition(0).toString()));
+ needCleanup = true;
+ }
+ }
+ if (needCleanup) {
+ admin.topics().delete(topicName.getPartition(0).toString());
+ }
+ };
+ verifier.accept(() -> client.newProducer().topic(topic)
+ .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create());
+ verifier.accept(() -> client.newConsumer().topic(topic).subscriptionName("sub")
+ .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).subscribe());
+ verifier.accept(() -> client.newReader().topic(topic).startMessageId(MessageId.earliest)
+ .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create());
+ }
+
+ private interface ThrowableConsumer<T> {
+
+ void accept(T value) throws Exception;
+ }
+
+ public interface ThrowableSupplier<T> {
+
+ T get() throws Exception;
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 62d4e9c..10cdd97 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1396,7 +1396,7 @@
private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicName) {
int oldPartitionNumber = partitionedTopics.get(topicName);
- return client.getPartitionsForTopic(topicName).thenCompose(list -> {
+ return client.getPartitionsForTopic(topicName, false).thenCompose(list -> {
int currentPartitionNumber = Long.valueOf(list.stream()
.filter(t -> TopicName.get(t).isPartitioned()).count()).intValue();
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 77ed7d34..cee86de 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -391,7 +391,7 @@
return future;
}
- client.getPartitionsForTopic(topic).thenCompose(list -> {
+ client.getPartitionsForTopic(topic, false).thenCompose(list -> {
int oldPartitionNumber = topicMetadata.numPartitions();
int currentPartitionNumber = list.size();