Delayed delivery policy support cross multiple clusters (#13550)

Delayed delivery policy support cross multiple clusters
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2cb784e..81036d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -502,10 +502,12 @@
         });
     }
 
-    protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalSetDelayedDeliveryPolicies(DelayedDeliveryPolicies deliveryPolicies,
+                                                                         boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setIsGlobal(isGlobal);
                 topicPolicies.setDelayedDeliveryEnabled(deliveryPolicies == null ? null : deliveryPolicies.isActive());
                 topicPolicies.setDelayedDeliveryTickTimeMillis(
                         deliveryPolicies == null ? null : deliveryPolicies.getTickTime());
@@ -778,8 +780,9 @@
         }
     }
 
-    protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<DelayedDeliveryPolicies> internalGetDelayedDeliveryPolicies(boolean applied,
+                                                                                            boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> {
                 TopicPolicies policies = op.orElseGet(TopicPolicies::new);
                 DelayedDeliveryPolicies delayedDeliveryPolicies = null;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 72ef853..f641dae 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -632,12 +632,13 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @QueryParam("applied") boolean applied,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied))
+            .thenCompose(__ -> internalGetDelayedDeliveryPolicies(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
                 handleTopicPolicyException("getDelayedDeliveryPolicies", ex, asyncResponse);
@@ -655,6 +656,7 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Delayed delivery policies for the specified topic")
@@ -663,7 +665,7 @@
         validatePoliciesReadOnlyAccess();
         validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetDelayedDeliveryPolicies(deliveryPolicies))
+            .thenCompose(__ -> internalSetDelayedDeliveryPolicies(deliveryPolicies, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("setDelayedDeliveryPolicies", ex, asyncResponse);
@@ -682,13 +684,14 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         validatePoliciesReadOnlyAccess();
         validateTopicPolicyOperation(topicName, PolicyName.DELAYED_DELIVERY, PolicyOperation.WRITE);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetDelayedDeliveryPolicies(null))
+            .thenCompose(__ -> internalSetDelayedDeliveryPolicies(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("deleteDelayedDeliveryPolicies", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 4a21ed4..5bccda9 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -34,6 +34,7 @@
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -367,6 +368,26 @@
     }
 
     @Test
+    public void testReplicateDelayedDelivery() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+        init(namespace, topic);
+        DelayedDeliveryPolicies policies = DelayedDeliveryPolicies.builder().active(true).tickTime(10000L).build();
+        // set delayed delivery
+        admin1.topicPolicies(true).setDelayedDeliveryPolicy(topic, policies);
+        Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic), policies));
+        Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true).getDelayedDeliveryPolicy(topic), policies));
+        // remove delayed delivery
+        admin1.topicPolicies(true).removeDelayedDeliveryPolicy(topic);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getDelayedDeliveryPolicy(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getDelayedDeliveryPolicy(topic)));
+    }
+
+    @Test
     public void testReplicatorInactiveTopicPolicies() throws Exception {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index d8dd0dc..61a653a 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1000,6 +1000,14 @@
         cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
         verify(mockTopicsPolicies, times(1)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
 
+        cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable"));
+        verify(mockTopicsPolicies).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
+                DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
+        cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
+
         cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
@@ -1117,6 +1125,14 @@
         cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("set-delayed-delivery persistent://myprop/clust/ns1/ds1 -t 10s --enable -g"));
+        verify(mockGlobalTopicsPolicies).setDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1",
+                DelayedDeliveryPolicies.builder().tickTime(10000).active(true).build());
+        cmdTopics.run(split("remove-delayed-delivery persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeDelayedDeliveryPolicy("persistent://myprop/clust/ns1/ds1") ;
+
         cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index fc93b8d..564c661 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -31,6 +31,7 @@
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.TopicPolicies;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.DelayedDeliveryPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -97,6 +98,10 @@
         jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
         jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
 
+        jcommander.addCommand("get-delayed-delivery", new GetDelayedDelivery());
+        jcommander.addCommand("set-delayed-delivery", new SetDelayedDelivery());
+        jcommander.addCommand("remove-delayed-delivery", new RemoveDelayedDelivery());
+
         jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
         jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
         jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
@@ -561,6 +566,82 @@
         }
     }
 
+    @Parameters(commandDescription = "Get the delayed delivery policy for a topic")
+    private class GetDelayedDelivery extends CliCommand {
+        @Parameter(description = "tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topicName = validateTopicName(params);
+            print(getTopicPolicies(isGlobal).getDelayedDeliveryPolicy(topicName, applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the delayed delivery policy on a topic")
+    private class SetDelayedDelivery extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable", "-e" }, description = "Enable delayed delivery messages")
+        private boolean enable = false;
+
+        @Parameter(names = { "--disable", "-d" }, description = "Disable delayed delivery messages")
+        private boolean disable = false;
+
+        @Parameter(names = { "--time", "-t" }, description = "The tick time for when retrying on delayed delivery messages, " +
+                "affecting the accuracy of the delivery time compared to the scheduled time. (eg: 1s, 10s, 1m, 5h, 3d)")
+        private String delayedDeliveryTimeStr = "1s";
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topicName = validateTopicName(params);
+            long delayedDeliveryTimeInMills;
+            try {
+                delayedDeliveryTimeInMills = TimeUnit.SECONDS.toMillis(
+                        RelativeTimeUtil.parseRelativeTimeInSeconds(delayedDeliveryTimeStr));
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
+
+            if (enable == disable) {
+                throw new ParameterException("Need to specify either --enable or --disable");
+            }
+
+            getTopicPolicies(isGlobal).setDelayedDeliveryPolicy(topicName, DelayedDeliveryPolicies.builder()
+                    .tickTime(delayedDeliveryTimeInMills)
+                    .active(enable)
+                    .build());
+        }
+    }
+
+    @Parameters(commandDescription = "Remove the delayed delivery policy on a topic")
+    private class RemoveDelayedDelivery extends CliCommand {
+        @Parameter(description = "tenant/namespace", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String topicName = validateTopicName(params);
+            getTopicPolicies(isGlobal).removeDelayedDeliveryPolicy(topicName);
+        }
+    }
+
     @Parameters(commandDescription = "Remove max number of producers for a topic")
     private class RemoveMaxProducers extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 79e3c2c..a10bad1 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -322,6 +322,10 @@
             cmdUsageFormatter.addDeprecatedCommand("get-subscription-types-enabled");
             cmdUsageFormatter.addDeprecatedCommand("remove-subscription-types-enabled");
 
+            cmdUsageFormatter.addDeprecatedCommand("get-delayed-delivery");
+            cmdUsageFormatter.addDeprecatedCommand("set-delayed-delivery");
+            cmdUsageFormatter.addDeprecatedCommand("remove-delayed-delivery");
+
             cmdUsageFormatter.addDeprecatedCommand("get-max-producers");
             cmdUsageFormatter.addDeprecatedCommand("set-max-producers");
             cmdUsageFormatter.addDeprecatedCommand("remove-max-producers");