[fix][client][branch-4.0] Partitioned topics are unexpectedly created by client after deletion (#24554) (#24571)

(cherry picked from commit 16271dc888c20d3e2233f1717b37f6f13fcb8af3)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index 7a82087..3f83e91 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -3073,7 +3073,7 @@
 
         // Truncate to ensure the offloaded data is not orphaned.
         // Also ensures the BK ledgers are deleted and not just scheduled for deletion
-        CompletableFuture<Void> truncateFuture = asyncTruncate();
+        CompletableFuture<Void> truncateFuture = asyncTruncate(true);
         truncateFuture.whenComplete((ignore, exc) -> {
             if (exc != null) {
                 log.error("[{}] Error truncating ledger for deletion", name, exc);
@@ -4480,6 +4480,12 @@
 
     @Override
     public CompletableFuture<Void> asyncTruncate() {
+        return asyncTruncate(false);
+    }
+
+    // When asyncTruncate is called by asyncDelete, the argument should be true because cursors will not be accessed
+    // after the managed ledger is deleted.
+    private CompletableFuture<Void> asyncTruncate(boolean ignoreCursorFailure) {
 
         final List<CompletableFuture<Void>> futures = new ArrayList();
         for (ManagedCursor cursor : cursors) {
@@ -4492,7 +4498,12 @@
 
                 @Override
                 public void clearBacklogFailed(ManagedLedgerException exception, Object ctx) {
-                    future.completeExceptionally(exception);
+                    if (ignoreCursorFailure) {
+                        log.warn("Failed to clear backlog for cursor {}", cursor.getName(), exception);
+                        future.complete(null);
+                    } else {
+                        future.completeExceptionally(exception);
+                    }
                 }
             }, null);
             futures.add(future);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
index c90ad152..efeb429 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicAutoCreationTest.java
@@ -24,7 +24,9 @@
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import java.io.Closeable;
 import java.net.InetSocketAddress;
+import java.time.Duration;
 import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
@@ -36,18 +38,21 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.ClientBuilder;
 import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.Producer;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
+import org.apache.pulsar.client.api.PulsarClient;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.impl.LookupService;
 import org.apache.pulsar.client.impl.LookupTopicResult;
 import org.apache.pulsar.client.impl.PulsarClientImpl;
 import org.apache.pulsar.common.naming.NamespaceName;
+import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride;
 import org.apache.pulsar.common.policies.data.TopicType;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 @Test(groups = "broker-admin")
@@ -55,7 +60,7 @@
 public class TopicAutoCreationTest extends ProducerConsumerBase {
 
     @Override
-    @BeforeMethod
+    @BeforeClass
     protected void setup() throws Exception {
         conf.setAllowAutoTopicCreationType(TopicType.PARTITIONED);
         conf.setAllowAutoTopicCreation(true);
@@ -71,7 +76,7 @@
     }
 
     @Override
-    @AfterMethod(alwaysRun = true)
+    @AfterClass(alwaysRun = true)
     protected void cleanup() throws Exception {
         super.internalCleanup();
     }
@@ -87,9 +92,11 @@
                 .create();
 
         List<String> partitionedTopics = admin.topics().getPartitionedTopicList(namespaceName);
+        assertTrue(partitionedTopics.contains(topic));
         List<String> topics = admin.topics().getList(namespaceName);
-        assertEquals(partitionedTopics.size(), 1);
-        assertEquals(topics.size(), 3);
+        for (int i = 0; i < conf.getDefaultNumPartitions(); i++) {
+            assertTrue(topics.contains(topic + TopicName.PARTITIONED_TOPIC_SUFFIX + i));
+        }
 
         producer.close();
         for (String t : topics) {
@@ -248,4 +255,48 @@
         admin.namespaces().deleteNamespace(namespace, true);
     }
 
+    @Test
+    public void testPartitionsNotCreatedAfterDeletion() throws Exception {
+        @Cleanup final var client = PulsarClient.builder().serviceUrl(pulsar.getBrokerServiceUrl()).build();
+        final var topicName = TopicName.get("my-property/my-ns/testPartitionsNotCreatedAfterDeletion");
+        final var topic = topicName.toString();
+        final var interval = Duration.ofSeconds(1);
+        final ThrowableConsumer<ThrowableSupplier<Closeable>> verifier = creator -> {
+            admin.topics().createPartitionedTopic(topic, 1);
+            boolean needCleanup = false;
+            try (final var ignored = creator.get()) {
+                admin.topics().terminatePartitionedTopic(topic);
+                admin.topics().deletePartitionedTopic(topic, true);
+                Thread.sleep(interval.toMillis() + 500); // wait until the auto update partitions task has run
+
+                final var topics = admin.topics().getList(topicName.getNamespace()).stream()
+                        .filter(__ -> __.contains(topicName.getLocalName())).toList();
+                // Without https://github.com/apache/pulsar/pull/24118, the producer or consumer on partition 0 could be
+                // automatically created.
+                if (!topics.isEmpty()) {
+                    assertEquals(topics, List.of(topicName.getPartition(0).toString()));
+                    needCleanup = true;
+                }
+            }
+            if (needCleanup) {
+                admin.topics().delete(topicName.getPartition(0).toString());
+            }
+        };
+        verifier.accept(() -> client.newProducer().topic(topic)
+                .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create());
+        verifier.accept(() -> client.newConsumer().topic(topic).subscriptionName("sub")
+                .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).subscribe());
+        verifier.accept(() -> client.newReader().topic(topic).startMessageId(MessageId.earliest)
+                .autoUpdatePartitionsInterval(interval.toSecondsPart(), TimeUnit.SECONDS).create());
+    }
+
+    private interface ThrowableConsumer<T> {
+
+        void accept(T value) throws Exception;
+    }
+
+    public interface ThrowableSupplier<T> {
+
+        T get() throws Exception;
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
index 62d4e9c..10cdd97 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java
@@ -1396,7 +1396,7 @@
     private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicName) {
         int oldPartitionNumber = partitionedTopics.get(topicName);
 
-        return client.getPartitionsForTopic(topicName).thenCompose(list -> {
+        return client.getPartitionsForTopic(topicName, false).thenCompose(list -> {
             int currentPartitionNumber = Long.valueOf(list.stream()
                     .filter(t -> TopicName.get(t).isPartitioned()).count()).intValue();
 
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
index 77ed7d34..cee86de 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PartitionedProducerImpl.java
@@ -391,7 +391,7 @@
                 return future;
             }
 
-            client.getPartitionsForTopic(topic).thenCompose(list -> {
+            client.getPartitionsForTopic(topic, false).thenCompose(list -> {
                 int oldPartitionNumber = topicMetadata.numPartitions();
                 int currentPartitionNumber = list.size();