[fix][ml] Make mlOwnershipChecker asynchronous so that it doesn't block/deadlock threads (#21333)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
index b1427ba..e09fd84 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerFactory.java
@@ -90,7 +90,7 @@
* opaque context
*/
void asyncOpen(String name, ManagedLedgerConfig config, OpenLedgerCallback callback,
- Supplier<Boolean> mlOwnershipChecker, Object ctx);
+ Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, Object ctx);
/**
* Open a {@link ReadOnlyCursor} positioned to the earliest entry for the specified managed ledger.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
index 7eccd70..39f56a1 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
@@ -2685,32 +2685,47 @@
}
@Override
- public void operationFailed(MetaStoreException e) {
- if (e instanceof MetaStoreException.BadVersionException) {
+ public void operationFailed(MetaStoreException topLevelException) {
+ if (topLevelException instanceof MetaStoreException.BadVersionException) {
log.warn("[{}] Failed to update cursor metadata for {} due to version conflict {}",
- ledger.name, name, e.getMessage());
+ ledger.name, name, topLevelException.getMessage());
// it means previous owner of the ml might have updated the version incorrectly. So, check
// the ownership and refresh the version again.
- if (ledger.mlOwnershipChecker != null && ledger.mlOwnershipChecker.get()) {
- ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
- new MetaStoreCallback<ManagedCursorInfo>() {
- @Override
- public void operationComplete(ManagedCursorInfo info, Stat stat) {
- updateCursorLedgerStat(info, stat);
- }
+ if (ledger.mlOwnershipChecker != null) {
+ ledger.mlOwnershipChecker.get().whenComplete((hasOwnership, t) -> {
+ if (t == null && hasOwnership) {
+ ledger.getStore().asyncGetCursorInfo(ledger.getName(), name,
+ new MetaStoreCallback<>() {
+ @Override
+ public void operationComplete(ManagedCursorInfo info, Stat stat) {
+ updateCursorLedgerStat(info, stat);
+ // fail the top level call so that the caller can retry
+ callback.operationFailed(topLevelException);
+ }
- @Override
- public void operationFailed(MetaStoreException e) {
- if (log.isDebugEnabled()) {
- log.debug(
- "[{}] Failed to refresh cursor metadata-version for {} due "
- + "to {}", ledger.name, name, e.getMessage());
- }
- }
- });
+ @Override
+ public void operationFailed(MetaStoreException e) {
+ if (log.isDebugEnabled()) {
+ log.debug(
+ "[{}] Failed to refresh cursor metadata-version "
+ + "for {} due to {}", ledger.name, name,
+ e.getMessage());
+ }
+ // fail the top level call so that the caller can retry
+ callback.operationFailed(topLevelException);
+ }
+ });
+ } else {
+ // fail the top level call so that the caller can retry
+ callback.operationFailed(topLevelException);
+ }
+ });
+ } else {
+ callback.operationFailed(topLevelException);
}
+ } else {
+ callback.operationFailed(topLevelException);
}
- callback.operationFailed(e);
}
});
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 9107b76..03605bf 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -330,7 +330,7 @@
@Override
public void asyncOpen(final String name, final ManagedLedgerConfig config, final OpenLedgerCallback callback,
- Supplier<Boolean> mlOwnershipChecker, final Object ctx) {
+ Supplier<CompletableFuture<Boolean>> mlOwnershipChecker, final Object ctx) {
if (closed) {
callback.openLedgerFailed(new ManagedLedgerException.ManagedLedgerFactoryClosedException(), ctx);
return;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index cc0020d..e349bf5 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -232,7 +232,7 @@
private static final Random random = new Random(System.currentTimeMillis());
private long maximumRolloverTimeMs;
- protected final Supplier<Boolean> mlOwnershipChecker;
+ protected final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker;
volatile PositionImpl lastConfirmedEntry;
@@ -336,7 +336,7 @@
}
public ManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper, MetaStore store,
ManagedLedgerConfig config, OrderedScheduler scheduledExecutor,
- final String name, final Supplier<Boolean> mlOwnershipChecker) {
+ final String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
this.factory = factory;
this.bookKeeper = bookKeeper;
this.config = config;
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
index b33dd87..8b2742d 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ShadowManagedLedgerImpl.java
@@ -24,6 +24,7 @@
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.client.AsyncCallback;
@@ -50,7 +51,7 @@
public ShadowManagedLedgerImpl(ManagedLedgerFactoryImpl factory, BookKeeper bookKeeper,
MetaStore store, ManagedLedgerConfig config,
OrderedScheduler scheduledExecutor,
- String name, final Supplier<Boolean> mlOwnershipChecker) {
+ String name, final Supplier<CompletableFuture<Boolean>> mlOwnershipChecker) {
super(factory, bookKeeper, store, config, scheduledExecutor, name, mlOwnershipChecker);
this.sourceMLName = config.getShadowSourceName();
}
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 5fc2da2..cd61e00 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -3389,7 +3389,7 @@
@Override
public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
- }, checkOwnershipFlag ? () -> true : null, null);
+ }, checkOwnershipFlag ? () -> CompletableFuture.completedFuture(true) : null, null);
latch.await();
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 388d9de..d03a94a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1748,7 +1748,7 @@
topicFuture.completeExceptionally(new PersistenceException(exception));
}
}
- }, () -> isTopicNsOwnedByBroker(topicName), null);
+ }, () -> isTopicNsOwnedByBrokerAsync(topicName), null);
}).exceptionally((exception) -> {
log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception);
@@ -2136,13 +2136,16 @@
});
}
- public boolean isTopicNsOwnedByBroker(TopicName topicName) {
- try {
- return pulsar.getNamespaceService().isServiceUnitOwned(topicName);
- } catch (Exception e) {
- log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage());
- }
- return false;
+ public CompletableFuture<Boolean> isTopicNsOwnedByBrokerAsync(TopicName topicName) {
+ return pulsar.getNamespaceService().isServiceUnitOwnedAsync(topicName)
+ .handle((hasOwnership, t) -> {
+ if (t == null) {
+ return hasOwnership;
+ } else {
+ log.warn("Failed to check the ownership of the topic: {}, {}", topicName, t.getMessage());
+ return false;
+ }
+ });
}
public CompletableFuture<Void> checkTopicNsOwnership(final String topic) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
index ecc6599c..5308648 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java
@@ -159,7 +159,7 @@
, originPersistentTopic.getName(), subscription.getName(), exception);
pendingAckStoreFuture.completeExceptionally(exception);
}
- }, () -> true, null);
+ }, () -> CompletableFuture.completedFuture(true), null);
}).exceptionally(e -> {
Throwable t = FutureUtil.unwrapCompletionException(e);
log.error("[{}] [{}] Failed to get managedLedger config when init pending ack store!",
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
index faf141a..4e510a5 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/TopicPoliciesTest.java
@@ -176,7 +176,7 @@
//load the nameserver, but topic is not init.
log.info("lookup:{}",admin.lookups().lookupTopic(topic));
- assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBroker(topicName));
+ assertTrue(pulsar.getBrokerService().isTopicNsOwnedByBrokerAsync(topicName).join());
assertFalse(pulsar.getBrokerService().getTopics().containsKey(topic));
//make sure namespace policy reader is fully started.
Awaitility.await().untilAsserted(()-> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
index e53d5c2..22fa2c3 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/namespace/OwnerShipCacheForCurrentServerTest.java
@@ -76,7 +76,7 @@
int verifiedBrokerNum = 0;
for (PulsarService pulsarService : this.getPulsarServiceList()) {
BrokerService bs = pulsarService.getBrokerService();
- if (bs.isTopicNsOwnedByBroker(TopicName.get(topicName))) {
+ if (bs.isTopicNsOwnedByBrokerAsync(TopicName.get(topicName)).join()) {
continue;
}
verifiedBrokerNum ++;