Cancel offload tasks when managed ledger closed. (#14545)
### Motivation
When the user config the offloader, as the ledger close, it will trigger the ledger to offload. If there are many ledgers that need to offload, but the topic has been unloaded, the offloader will continue to offload. Because the offloader uses the shared executor pool in ManagedLedgerFactoryImpl and when the managed ledger closes, it doesn't cancel the tasks.
```
15:29:59.180 [pulsar-web-41-3] INFO org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Unloading topic persistent://public/default/UpdateNodeCharts
15:29:59.201 [pulsar-web-41-3] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Closing managed ledger
15:29:59.216 [main-EventThread] INFO org.apache.bookkeeper.mledger.impl.MetaStoreImpl - [public/default/persistent/UpdateNodeCharts] [cloud-nodes-service] Updating cursor info ledgerId=-1 mark-delete=789182:82011
15:29:59.219 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/UpdateNodeCharts][cloud-nodes-service] Closed cursor at md-position=789182:82011
15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://public/default/UpdateNodeCharts] Topic closed
15:29:59.221 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO org.apache.pulsar.broker.admin.impl.PersistentTopicsBase - [null] Successfully unloaded topic persistent://public/default/UpdateNodeCharts
15:31:05.432 [offloader-OrderedScheduler-1-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Preparing metadata to offload ledger 422142 with uuid 030267e2-a2f9-40a3-848b-482f9b007c00
15:31:05.432 [offloader-OrderedScheduler-1-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Found previous offload attempt for ledger 422142, uuid 030267e2-a2f9-40a3-848b-482f9b007c00, cleaning up
15:31:05.432 [offloader-OrderedScheduler-1-0] INFO org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [public/default/persistent/UpdateNodeCharts] Cleanup offload for ledgerId 422142 uuid 3725b3c1-1dbc-481f-a1dd-8aaffb75e603 because of the reason Previous failed offload.
```
### Modifications
- When do `offloadLoop`, check state first. if `Close`, nothing to do.
(cherry picked from commit e0687e37e137f55c6cffa263d8ac8af9169dad92)
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index a13cf68..5334544 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -2360,13 +2360,13 @@
+ ", total size = {}, already offloaded = {}, to offload = {}",
name, toOffload.stream().map(LedgerInfo::getLedgerId).collect(Collectors.toList()),
sizeSummed, alreadyOffloadedSize, toOffloadSize);
+ offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
} else {
// offloadLoop will complete immediately with an empty list to offload
log.debug("[{}] Nothing to offload, total size = {}, already offloaded = {}, threshold = {}",
name, sizeSummed, alreadyOffloadedSize, threshold);
+ unlockingPromise.complete(PositionImpl.LATEST);
}
-
- offloadLoop(unlockingPromise, toOffload, PositionImpl.LATEST, Optional.empty());
}
}
}
@@ -2929,6 +2929,11 @@
private void offloadLoop(CompletableFuture<PositionImpl> promise, Queue<LedgerInfo> ledgersToOffload,
PositionImpl firstUnoffloaded, Optional<Throwable> firstError) {
+ if (getState() == State.Closed) {
+ promise.completeExceptionally(new ManagedLedgerAlreadyClosedException(
+ String.format("managed ledger [%s] has already closed", name)));
+ return;
+ }
LedgerInfo info = ledgersToOffload.poll();
if (info == null) {
if (firstError.isPresent()) {
diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
index 982b914..c6008c76 100644
--- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
+++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java
@@ -54,6 +54,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@@ -121,6 +122,7 @@
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.metadata.api.Stat;
import org.awaitility.Awaitility;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -3449,4 +3451,42 @@
factory.shutdown();
}
+ @Test
+ public void testOffloadTaskCancelled() throws Exception {
+ ManagedLedgerFactory factory = new ManagedLedgerFactoryImpl(metadataStore, bkc);
+ ManagedLedgerConfig config = new ManagedLedgerConfig();
+ config.setMaxEntriesPerLedger(2);
+ config.setMinimumRolloverTime(0, TimeUnit.SECONDS);
+
+ OffloadPoliciesImpl offloadPolicies = new OffloadPoliciesImpl();
+ offloadPolicies.setManagedLedgerOffloadDriver("mock");
+ offloadPolicies.setManagedLedgerOffloadThresholdInBytes(0L);
+ LedgerOffloader ledgerOffloader = Mockito.mock(LedgerOffloader.class);
+ Mockito.when(ledgerOffloader.getOffloadPolicies()).thenReturn(offloadPolicies);
+ Mockito.when(ledgerOffloader.getOffloadDriverName()).thenReturn(offloadPolicies.getManagedLedgerOffloadDriver());
+ config.setLedgerOffloader(ledgerOffloader);
+
+ CompletableFuture<ReadHandle> readHandle = new CompletableFuture<>();
+ readHandle.complete(mock(ReadHandle.class));
+
+ CompletableFuture<Void> offloadFuture = new CompletableFuture<>();
+ offloadFuture.complete(null);
+ Mockito.when(ledgerOffloader.offload(any(ReadHandle.class), any(UUID.class), any(Map.class))).thenReturn(offloadFuture);
+
+ final ManagedLedgerImpl ledgerInit = (ManagedLedgerImpl) factory.open("test-offload-task-close", config);
+ final ManagedLedgerImpl ledger = spy(ledgerInit);
+ long ledgerId = 3L;
+ doReturn(readHandle).when(ledger).getLedgerHandle(ledgerId);
+ doReturn(ManagedLedgerImpl.State.Closed).when(ledger).getState();
+ ledger.addEntry("dummy-entry-1".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-2".getBytes(Encoding));
+ ledger.addEntry("dummy-entry-3".getBytes(Encoding));
+ ledger.close();
+
+ Awaitility.await().untilAsserted(() -> {
+ CompletableFuture<LedgerInfo> ledgerInfo = ledger.getLedgerInfo(ledgerId);
+ Assert.assertFalse(ledgerInfo.get(100, TimeUnit.MILLISECONDS).getOffloadContext().getComplete());
+ });
+ }
+
}