Offload policies support cross multiple clusters (#13534)

* Offload policies support cross multiple clusters
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 1becbec..5e651cf 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -805,8 +805,8 @@
             });
     }
 
-    protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(boolean applied) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<OffloadPoliciesImpl> internalGetOffloadPolicies(boolean applied, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> {
                 OffloadPoliciesImpl offloadPolicies = op.map(TopicPolicies::getOffloadPolicies).orElse(null);
                 if (applied) {
@@ -819,11 +819,13 @@
             });
     }
 
-    protected CompletableFuture<Void> internalSetOffloadPolicies(OffloadPoliciesImpl offloadPolicies) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalSetOffloadPolicies
+            (OffloadPoliciesImpl offloadPolicies, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setOffloadPolicies(offloadPolicies);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             }).thenCompose(__ -> {
                 //The policy update is asynchronous. Cache at this step may not be updated yet.
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index aec2a6e..2bc058a 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -283,11 +283,12 @@
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetOffloadPolicies(applied))
+            .thenCompose(__ -> internalGetOffloadPolicies(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
                 handleTopicPolicyException("getOffloadPolicies", ex, asyncResponse);
@@ -306,10 +307,11 @@
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Offload policies for the specified topic") OffloadPoliciesImpl offloadPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies))
+            .thenCompose(__ -> internalSetOffloadPolicies(offloadPolicies, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("setOffloadPolicies", ex, asyncResponse);
@@ -326,11 +328,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetOffloadPolicies(null))
+            .thenCompose(__ -> internalSetOffloadPolicies(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("removeOffloadPolicies", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 7051bbe..637cb3c 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -37,6 +37,8 @@
 import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -551,6 +553,38 @@
                 admin -> assertNull(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName)));
     }
 
+    @Test
+    public void testReplicatorOffloadPolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+
+        init(namespace, persistentTopicName);
+        OffloadPoliciesImpl offloadPolicies =
+                OffloadPoliciesImpl.create("s3", "region", "bucket", "endpoint", null, null, null, null,
+                8, 9, 10L, null, OffloadedReadPriority.BOOKKEEPER_FIRST);
+
+        // set offload policies
+        try{
+            admin1.topicPolicies(true).setOffloadPolicies(persistentTopicName, offloadPolicies);
+        }catch (Exception exception){
+            // driver not found exception.
+            assertTrue(exception instanceof PulsarAdminException.ServerSideErrorException);
+        }
+        // get offload policies
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true).getOffloadPolicies(persistentTopicName), offloadPolicies));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true).getOffloadPolicies(persistentTopicName), offloadPolicies));
+
+        //remove offload policies
+        admin1.topicPolicies(true).removeOffloadPolicies(persistentTopicName);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getOffloadPolicies(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getOffloadPolicies(persistentTopicName)));
+    }
+
+
     private void init(String namespace, String topic)
             throws PulsarAdminException, PulsarClientException, PulsarServerException {
         final String cluster2 = pulsar2.getConfig().getClusterName();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 92d37f7..560c5b0 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -896,6 +896,19 @@
         cmdTopics.run(split("remove-subscription-types-enabled persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removeSubscriptionTypesEnabled("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", false);
+
+        cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
+
+        cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" +
+                " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first"));
+        verify(mockTopicsPolicies)
+                .setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
+                        OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null,
+                8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST));
+
         cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getRetention("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("set-retention persistent://myprop/clust/ns1/ds1 -t 10m -s 20M"));
@@ -1240,6 +1253,19 @@
         verify(mockGlobalTopicsPolicies).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 1024);
         cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+
+        cmdTopics.run(split("get-offload-policies persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getOffloadPolicies("persistent://myprop/clust/ns1/ds1", false);
+
+        cmdTopics.run(split("remove-offload-policies persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeOffloadPolicies("persistent://myprop/clust/ns1/ds1");
+
+        cmdTopics.run(split("set-offload-policies persistent://myprop/clust/ns1/ds1 -d s3 -r" +
+                " region -b bucket -e endpoint -m 8 -rb 9 -t 10 -orp tiered-storage-first -g"));
+        verify(mockGlobalTopicsPolicies)
+                .setOffloadPolicies("persistent://myprop/clust/ns1/ds1",
+                        OffloadPoliciesImpl.create("s3", "region", "bucket" , "endpoint", null, null, null, null,
+                                8, 9, 10L, null, OffloadedReadPriority.TIERED_STORAGE_FIRST));
     }
 
     @Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 09dd99f..f343257 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -27,6 +27,7 @@
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.TopicPolicies;
@@ -36,6 +37,8 @@
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
+import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl;
+import org.apache.pulsar.common.policies.data.OffloadedReadPriority;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -117,6 +120,10 @@
         jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
         jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
 
+        jcommander.addCommand("get-offload-policies", new GetOffloadPolicies());
+        jcommander.addCommand("set-offload-policies", new SetOffloadPolicies());
+        jcommander.addCommand("remove-offload-policies", new RemoveOffloadPolicies());
+
         jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription());
         jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription());
         jcommander.addCommand("remove-max-unacked-messages-per-subscription",
@@ -1484,6 +1491,139 @@
         }
     }
 
+
+    @Parameters(commandDescription = "Get the offload policies for a topic")
+    private class GetOffloadPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getOffloadPolicies(persistentTopic, applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove the offload policies for a topic")
+    private class RemoveOffloadPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+                + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeOffloadPolicies(persistentTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Set the offload policies for a topic")
+    private class SetOffloadPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"-d", "--driver"}, description = "ManagedLedger offload driver", required = true)
+        private String driver;
+
+        @Parameter(names = {"-r", "--region"}
+                , description = "ManagedLedger offload region, s3 and google-cloud-storage requires this parameter")
+        private String region;
+
+        @Parameter(names = {"-b", "--bucket"}
+                , description = "ManagedLedger offload bucket, s3 and google-cloud-storage requires this parameter")
+        private String bucket;
+
+        @Parameter(names = {"-e", "--endpoint"}
+                , description = "ManagedLedger offload service endpoint, only s3 requires this parameter")
+        private String endpoint;
+
+        @Parameter(names = {"-i", "--aws-id"}
+                , description = "AWS Credential Id to use when using driver S3 or aws-s3")
+        private String awsId;
+
+        @Parameter(names = {"-s", "--aws-secret"}
+                , description = "AWS Credential Secret to use when using driver S3 or aws-s3")
+        private String awsSecret;
+
+        @Parameter(names = {"--ro", "--s3-role"}
+                , description = "S3 Role used for STSAssumeRoleSessionCredentialsProvider")
+        private String s3Role;
+
+        @Parameter(names = {"--s3-role-session-name", "-rsn"}
+                , description = "S3 role session name used for STSAssumeRoleSessionCredentialsProvider")
+        private String s3RoleSessionName;
+
+        @Parameter(names = {"-m", "--maxBlockSizeInBytes"},
+                description = "ManagedLedger offload max block Size in bytes,"
+                        + "s3 and google-cloud-storage requires this parameter")
+        private int maxBlockSizeInBytes;
+
+        @Parameter(names = {"-rb", "--readBufferSizeInBytes"},
+                description = "ManagedLedger offload read buffer size in bytes,"
+                        + "s3 and google-cloud-storage requires this parameter")
+        private int readBufferSizeInBytes;
+
+        @Parameter(names = {"-t", "--offloadThresholdInBytes"}
+                , description = "ManagedLedger offload threshold in bytes", required = true)
+        private long offloadThresholdInBytes;
+
+        @Parameter(names = {"-dl", "--offloadDeletionLagInMillis"}
+                , description = "ManagedLedger offload deletion lag in bytes")
+        private Long offloadDeletionLagInMillis;
+
+        @Parameter(
+                names = {"--offloadedReadPriority", "-orp"},
+                description = "Read priority for offloaded messages. By default, once messages are offloaded to"
+                        + " long-term storage, brokers read messages from long-term storage, but messages can still"
+                        + " exist in BookKeeper for a period depends on your configuration. For messages that exist"
+                        + " in both long-term storage and BookKeeper, you can set where to read messages from with"
+                        + " the option `tiered-storage-first` or `bookkeeper-first`.",
+                required = false
+        )
+        private String offloadReadPriorityStr;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+
+            OffloadedReadPriority offloadedReadPriority = OffloadPoliciesImpl.DEFAULT_OFFLOADED_READ_PRIORITY;
+
+            if (this.offloadReadPriorityStr != null) {
+                try {
+                    offloadedReadPriority = OffloadedReadPriority.fromString(this.offloadReadPriorityStr);
+                } catch (Exception e) {
+                    throw new ParameterException("--offloadedReadPriority parameter must be one of "
+                            + Arrays.stream(OffloadedReadPriority.values())
+                            .map(OffloadedReadPriority::toString)
+                            .collect(Collectors.joining(","))
+                            + " but got: " + this.offloadReadPriorityStr, e);
+                }
+            }
+
+            OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.create(driver, region, bucket, endpoint,
+                    s3Role, s3RoleSessionName,
+                    awsId, awsSecret,
+                    maxBlockSizeInBytes,
+                    readBufferSizeInBytes, offloadThresholdInBytes, offloadDeletionLagInMillis, offloadedReadPriority);
+
+            getTopicPolicies(isGlobal).setOffloadPolicies(persistentTopic, offloadPolicies);
+        }
+    }
+
     private TopicPolicies getTopicPolicies(boolean isGlobal) {
         return getAdmin().topicPolicies(isGlobal);
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 572c0be..e5127f9 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -347,6 +347,10 @@
             cmdUsageFormatter.addDeprecatedCommand("get-max-subscriptions-per-topic");
             cmdUsageFormatter.addDeprecatedCommand("set-max-subscriptions-per-topic");
             cmdUsageFormatter.addDeprecatedCommand("remove-max-subscriptions-per-topic");
+
+            cmdUsageFormatter.addDeprecatedCommand("get-offload-policies");
+            cmdUsageFormatter.addDeprecatedCommand("set-offload-policies");
+            cmdUsageFormatter.addDeprecatedCommand("remove-offload-policies");
         }
     }