[improve] [broker] Trigger offload on topic load (#22652)

diff --git a/conf/broker.conf b/conf/broker.conf
index 1ef68a0..2a9641b 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -1211,6 +1211,9 @@
 # (default is -1, which is disabled)
 managedLedgerOffloadThresholdInSeconds=-1
 
+# Trigger offload on topic load or not. Default is false.
+# triggerOffloadOnTopicLoad=false
+
 # Max number of entries to append to a cursor ledger
 managedLedgerCursorMaxEntriesPerLedger=50000
 
diff --git a/conf/standalone.conf b/conf/standalone.conf
index a8615b7..7c6aeb6 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -835,6 +835,9 @@
 # Whether trace managed ledger task execution time
 managedLedgerTraceTaskExecution=true
 
+# Trigger offload on topic load or not. Default is false.
+# triggerOffloadOnTopicLoad=false
+
 # If you want to custom bookie ID or use a dynamic network address for the bookie,
 # you can set this option.
 # Bookie advertises itself using bookieId rather than
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
index 0c93a5b..fb2c6de 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
@@ -85,6 +85,7 @@
     private int minimumBacklogCursorsForCaching = 0;
     private int minimumBacklogEntriesForCaching = 1000;
     private int maxBacklogBetweenCursorsForCaching = 1000;
+    private boolean triggerOffloadOnTopicLoad = false;
 
     @Getter
     @Setter
@@ -748,6 +749,22 @@
         this.maxBacklogBetweenCursorsForCaching = maxBacklogBetweenCursorsForCaching;
     }
 
+    /**
+     * Trigger offload on topic load.
+     * @return
+     */
+    public boolean isTriggerOffloadOnTopicLoad() {
+        return triggerOffloadOnTopicLoad;
+    }
+
+    /**
+     * Set trigger offload on topic load.
+     * @param triggerOffloadOnTopicLoad
+     */
+    public void setTriggerOffloadOnTopicLoad(boolean triggerOffloadOnTopicLoad) {
+        this.triggerOffloadOnTopicLoad = triggerOffloadOnTopicLoad;
+    }
+
     public String getShadowSource() {
         return MapUtils.getString(properties, PROPERTY_SOURCE_TOPIC_KEY);
     }
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
index 5ce84b3..d867f2f 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerFactoryImpl.java
@@ -20,6 +20,7 @@
 
 import static com.google.common.base.Preconditions.checkArgument;
 import static org.apache.bookkeeper.mledger.ManagedLedgerException.getManagedLedgerException;
+import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.NULL_OFFLOAD_PROMISE;
 import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Maps;
@@ -395,6 +396,10 @@
 
                     // May need to update the cursor position
                     newledger.maybeUpdateCursorBeforeTrimmingConsumedLedger();
+                    // May need to trigger offloading
+                    if (config.isTriggerOffloadOnTopicLoad()) {
+                        newledger.maybeOffloadInBackground(NULL_OFFLOAD_PROMISE);
+                    }
                 }
 
                 @Override
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index b342669..681441b 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -212,7 +212,7 @@
     private final CallbackMutex trimmerMutex = new CallbackMutex();
 
     private final CallbackMutex offloadMutex = new CallbackMutex();
-    private static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
+    public static final CompletableFuture<PositionImpl> NULL_OFFLOAD_PROMISE = CompletableFuture
             .completedFuture(PositionImpl.LATEST);
     protected volatile LedgerHandle currentLedger;
     protected volatile long currentLedgerEntries = 0;
@@ -2469,7 +2469,7 @@
                 100, TimeUnit.MILLISECONDS);
     }
 
-    private void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
+    public void maybeOffloadInBackground(CompletableFuture<PositionImpl> promise) {
         if (config.getLedgerOffloader() == null || config.getLedgerOffloader() == NullLedgerOffloader.INSTANCE
                 || config.getLedgerOffloader().getOffloadPolicies() == null) {
             return;
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 9efe185..6f03bef 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -2090,11 +2090,16 @@
     )
     private long managedLedgerOffloadAutoTriggerSizeThresholdBytes = -1L;
     @FieldContext(
-            category = CATEGORY_STORAGE_OFFLOADING,
-            doc = "The threshold to triggering automatic offload to long term storage"
+        category = CATEGORY_STORAGE_OFFLOADING,
+        doc = "The threshold to triggering automatic offload to long term storage"
     )
     private long managedLedgerOffloadThresholdInSeconds = -1L;
     @FieldContext(
+        category = CATEGORY_STORAGE_OFFLOADING,
+        doc = "Trigger offload on topic load or not. Default is false"
+    )
+    private boolean triggerOffloadOnTopicLoad = false;
+    @FieldContext(
         category = CATEGORY_STORAGE_ML,
         doc = "Max number of entries to append to a cursor ledger"
     )
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index 6603e24..9a08578 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -1986,6 +1986,7 @@
                             .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies));
                 }
             }
+            managedLedgerConfig.setTriggerOffloadOnTopicLoad(serviceConfig.isTriggerOffloadOnTopicLoad());
 
             managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled(
                     serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled());