[fix][ml] Make mlOwnershipChecker asynchronous so that it doesn't block/deadlock threads (#21333)

diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index b1427ba..e09fd84 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -90,7 +90,7 @@
      *            opaque context
      */
     void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback,
-            Supplier<Boolean> mlOwnershipChecker, Object ctx);
+            Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, Object ctx);
 
     /**
      * Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7eccd70..39f56a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2685,32 +2685,47 @@
                     }
 
                     @Override
-                    public void operationFailed(MetaStoreException e) {
-                        if (e instanceof MetaStoreException.BadVersionException) {
+                    public void operationFailed(MetaStoreException topLevelException) {
+                        if (topLevelException instanceof MetaStoreException.BadVersionException) {
                             log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}",
-                                    ledger.name, name, e.getMessage());
+                                    ledger.name, name, topLevelException.getMessage());
                             // it means previous owner of the ml might have updated the version incorrectly. So, check
                             // the ownership and refresh the version again.
-                            if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) {
-                                ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
-                                        new MetaStoreCallback<ManagedCursorInfo>() {
-                                            @Override
-                                            public void operationComplete(ManagedCursorInfo info, Stat stat) {
-                                                updateCursorLedgerStat(info, stat);
-                                            }
+                            if (ledger.mlOwnershipChecker != null) {
+                                ledger.mlOwnershipChecker.get().whenComplete((hasOwnership, t) -> {
+                                    if (t == null && hasOwnership) {
+                                        ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
+                                                new MetaStoreCallback<>() {
+                                                    @Override
+                                                    public void operationComplete(ManagedCursorInfo info, Stat stat) {
+                                                        updateCursorLedgerStat(info, stat);
+                                                        // fail the top level call so that the caller can retry
+                                                        callback.operationFailed(topLevelException);
+                                                    }
 
-                                            @Override
-                                            public void operationFailed(MetaStoreException e) {
-                                                if (log.isDebugEnabled()) {
-                                                    log.debug(
-                                                            "[{}] Failed to refresh cursor metadata-version for {} due "
-                                                            + "to {}", ledger.name, name, e.getMessage());
-                                                }
-                                            }
-                                        });
+                                                    @Override
+                                                    public void operationFailed(MetaStoreException e) {
+                                                        if (log.isDebugEnabled()) {
+                                                            log.debug(
+                                                                    "[{}] Failed to refresh cursor metadata-version "
+                                                                            + "for {} due to {}", ledger.name, name,
+                                                                    e.getMessage());
+                                                        }
+                                                        // fail the top level call so that the caller can retry
+                                                        callback.operationFailed(topLevelException);
+                                                    }
+                                                });
+                                    } else {
+                                        // fail the top level call so that the caller can retry
+                                        callback.operationFailed(topLevelException);
+                                    }
+                                });
+                            } else {
+                                callback.operationFailed(topLevelException);
                             }
+                        } else {
+                            callback.operationFailed(topLevelException);
                         }
-                        callback.operationFailed(e);
                     }
                 });
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 9107b76..03605bf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -330,7 +330,7 @@
 
     @Override
     public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback,
-            Supplier<Boolean> mlOwnershipChecker, final Object ctx) {
+            Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, final Object ctx) {
         if (closed) {
             callback.openLedgerFailed(new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
             return;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index cc0020d..e349bf5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -232,7 +232,7 @@
 
     private static final Random random = new Random(System.currentTimeMillis());
     private long maximumRolloverTimeMs;
-    protected final Supplier<Boolean> mlOwnershipChecker;
+    protected final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker;
 
     volatile PositionImpl lastConfirmedEntry;
 
@@ -336,7 +336,7 @@
     }
     public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
             ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
-            final String name, final Supplier<Boolean> mlOwnershipChecker) {
+            final String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
         this.factory = factory;
         this.bookKeeper = bookKeeper;
         this.config = config;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index b33dd87..8b2742d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.Supplier;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.bookkeeper.client.AsyncCallback;
@@ -50,7 +51,7 @@
     public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper,
                                    MetaStore store, ManagedLedgerConfig config,
                                    OrderedScheduler scheduledExecutor,
-                                   String name, final Supplier<Boolean> mlOwnershipChecker) {
+                                   String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
         super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
         this.sourceMLName = config.getShadowSourceName();
     }
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 5fc2da2..cd61e00 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3389,7 +3389,7 @@
             @Override
             public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
             }
-        }, checkOwnershipFlag ? () -> true : null, null);
+        }, checkOwnershipFlag ? () -> CompletableFuture.completedFuture(true) : null, null);
         latch.await();
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 388d9de..d03a94a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1748,7 +1748,7 @@
                                 topicFuture.completeExceptionally(new PersistenceException(exception));
                             }
                         }
-                    }, () -> isTopicNsOwnedByBroker(topicName), null);
+                    }, () -> isTopicNsOwnedByBrokerAsync(topicName), null);
 
         }).exceptionally((exception) -> {
             log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
@@ -2136,13 +2136,16 @@
         });
     }
 
-    public boolean isTopicNsOwnedByBroker(TopicName topicName) {
-        try {
-            return pulsar.getNamespaceService().isServiceUnitOwned(topicName);
-        } catch (Exception e) {
-            log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage());
-        }
-        return false;
+    public CompletableFuture<Boolean> isTopicNsOwnedByBrokerAsync(TopicName topicName) {
+        return pulsar.getNamespaceService().isServiceUnitOwnedAsync(topicName)
+                .handle((hasOwnership, t) -> {
+                    if (t == null) {
+                        return hasOwnership;
+                    } else {
+                        log.warn("Failed to check the ownership of the topic: {}, {}", topicName, t.getMessage());
+                        return false;
+                    }
+                });
     }
 
     public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index ecc6599c..5308648 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -159,7 +159,7 @@
                                                 , originPersistentTopic.getName(), subscription.getName(), exception);
                                         pendingAckStoreFuture.completeExceptionally(exception);
                                     }
-                                }, () -> true, null);
+                                }, () -> CompletableFuture.completedFuture(true), null);
                     }).exceptionally(e -> {
                         Throwable t = FutureUtil.unwrapCompletionException(e);
                         log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index faf141a..4e510a5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -176,7 +176,7 @@
 
         //load the nameserver, but topic is not init.
         log.info("lookup:{}",admin.lookups().lookupTopic(topic));
-        assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
+        assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBrokerAsync(topicName).join());
         assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
         //make sure namespace policy reader is fully started.
         Awaitility.await().untilAsserted(()-> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
index e53d5c2..22fa2c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
@@ -76,7 +76,7 @@
         int verifiedBrokerNum = 0;
         for (PulsarService pulsarService : this.getPulsarServiceList()) {
             BrokerService bs = pulsarService.getBrokerService();
-            if (bs.isTopicNsOwnedByBroker(TopicName.get(topicName))) {
+            if (bs.isTopicNsOwnedByBrokerAsync(TopicName.get(topicName)).join()) {
                 continue;
             }
             verifiedBrokerNum ++;