[improve] [broker] Trigger offload on topic load (#22652)
diff --git a/conf/broker.conf b/conf/broker.conf
index 1ef68a0..2a9641b 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1211,6 +1211,9 @@
# (default is -1, which is disabled)
managedLedgerOffloadThresholdInSeconds=-1
+# Trigger offload on topic load or not. Default is false.
+# triggerOffloadOnTopicLoad=false
+
# Max number of entries to append to a cursor ledger
managedLedgerCursorMaxEntriesPerLedger=50000
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a8615b7..7c6aeb6 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -835,6 +835,9 @@
# Whether trace managed ledger task execution time
managedLedgerTraceTaskExecution=true
+# Trigger offload on topic load or not. Default is false.
+# triggerOffloadOnTopicLoad=false
+
# If you want to custom bookie ID or use a dynamic network address for the bookie,
# you can set this option.
# Bookie advertises itself using bookieId rather than
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 0c93a5b..fb2c6de 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -85,6 +85,7 @@
private int minimumBacklogCursorsForCaching = 0;
private int minimumBacklogEntriesForCaching = 1000;
private int maxBacklogBetweenCursorsForCaching = 1000;
+ private boolean triggerOffloadOnTopicLoad = false;
@Getter
@Setter
@@ -748,6 +749,22 @@
this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
}
+ /**
+ * Trigger offload on topic load.
+ * @return
+ */
+ public boolean isTriggerOffloadOnTopicLoad() {
+ return triggerOffloadOnTopicLoad;
+ }
+
+ /**
+ * Set trigger offload on topic load.
+ * @param triggerOffloadOnTopicLoad
+ */
+ public void setTriggerOffloadOnTopicLoad(boolean triggerOffloadOnTopicLoad) {
+ this.triggerOffloadOnTopicLoad = triggerOffloadOnTopicLoad;
+ }
+
public String getShadowSource() {
return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
}
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 5ce84b3..d867f2f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Predicates;
import com.google.common.collect.Maps;
@@ -395,6 +396,10 @@
// May need to update the cursor position
newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+ // May need to trigger offloading
+ if (config.isTriggerOffloadOnTopicLoad()) {
+ newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+ }
}
@Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b342669..681441b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -212,7 +212,7 @@
private final CallbackMutex trimmerMutex = new CallbackMutex();
private final CallbackMutex offloadMutex = new CallbackMutex();
- private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
+ public static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionImpl.LATEST);
protected volatile LedgerHandle currentLedger;
protected volatile long currentLedgerEntries = 0;
@@ -2469,7 +2469,7 @@
100, TimeUnit.MILLISECONDS);
}
- private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
+ public void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
|| config.getLedgerOffloader().getOffloadPolicies() == null) {
return;
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9efe185..6f03bef 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2090,11 +2090,16 @@
)
private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
@FieldContext(
- category = CATEGORY_STORAGE_OFFLOADING,
- doc = "The threshold to triggering automatic offload to long term storage"
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "The threshold to triggering automatic offload to long term storage"
)
private long managedLedgerOffloadThresholdInSeconds = -1L;
@FieldContext(
+ category = CATEGORY_STORAGE_OFFLOADING,
+ doc = "Trigger offload on topic load or not. Default is false"
+ )
+ private boolean triggerOffloadOnTopicLoad = false;
+ @FieldContext(
category = CATEGORY_STORAGE_ML,
doc = "Max number of entries to append to a cursor ledger"
)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6603e24..9a08578 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1986,6 +1986,7 @@
.setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
}
}
+ managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad());
managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());