Subscription dispatch rate support cross multiple clusters (#13511)

* Subscription dispatch rate support cross multiple clusters
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 2bce311..c47c4a0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4147,8 +4147,8 @@
             });
     }
 
-    protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<DispatchRate> internalGetSubscriptionDispatchRate(boolean applied, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getSubscriptionDispatchRate)
                 .orElseGet(() -> {
                     if (applied) {
@@ -4160,25 +4160,29 @@
                 }));
     }
 
-    protected CompletableFuture<Void> internalSetSubscriptionDispatchRate(DispatchRateImpl dispatchRate) {
+    protected CompletableFuture<Void> internalSetSubscriptionDispatchRate
+            (DispatchRateImpl dispatchRate, boolean isGlobal) {
         if (dispatchRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        return getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setSubscriptionDispatchRate(dispatchRate);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
     }
 
-    protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalRemoveSubscriptionDispatchRate(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 if (!op.isPresent()) {
                     return CompletableFuture.completedFuture(null);
                 }
-                op.get().setSubscriptionDispatchRate(null);
+                TopicPolicies topicPolicies = op.get();
+                topicPolicies.setSubscriptionDispatchRate(null);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
             });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 938da35..9c8cb25 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2885,11 +2885,12 @@
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetSubscriptionDispatchRate(applied))
+            .thenCompose(__ -> internalGetSubscriptionDispatchRate(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
                 handleTopicPolicyException("getSubscriptionDispatchRate", ex, asyncResponse);
@@ -2912,11 +2913,12 @@
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Subscription message dispatch rate for the specified topic")
                     DispatchRateImpl dispatchRate) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate))
+            .thenCompose(__ -> internalSetSubscriptionDispatchRate(dispatchRate, isGlobal))
             .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic subscription dispatch rate:"
@@ -2947,11 +2949,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalRemoveSubscriptionDispatchRate())
+            .thenCompose(__ -> internalRemoveSubscriptionDispatchRate(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic subscription dispatch rate: tenant={}, namespace={}, topic={}",
                         clientAppId(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 9737c46..4bc7543 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -353,6 +353,35 @@
     }
 
     @Test
+    public void testReplicatorSubscriptionDispatchRatePolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+
+        init(namespace, persistentTopicName);
+        // set subscription dispatch rate
+        DispatchRate dispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(1)
+                .ratePeriodInSecond(1)
+                .dispatchThrottlingRateInByte(1)
+                .relativeToPublishRate(true)
+                .build();
+        admin1.topicPolicies(true).setSubscriptionDispatchRate(persistentTopicName, dispatchRate);
+        // get subscription dispatch rate
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true)
+                        .getSubscriptionDispatchRate(persistentTopicName), dispatchRate));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true)
+                        .getSubscriptionDispatchRate(persistentTopicName), dispatchRate));
+
+        //remove subscription dispatch rate
+        admin1.topicPolicies(true).removeSubscriptionDispatchRate(persistentTopicName);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName)));
+    }
+    @Test
     public void testReplicateMaxUnackedMsgPerSub() throws Exception {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 812b27e..7502860 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -930,6 +930,18 @@
                 .ratePeriodInSecond(2)
                 .build());
 
+        cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2"));
+        verify(mockTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
+                DispatchRate.builder()
+                        .dispatchThrottlingRateInMsg(-1)
+                        .dispatchThrottlingRateInByte(-1)
+                        .ratePeriodInSecond(2)
+                        .build());
+        cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-persistence persistent://myprop/clust/ns1/ds1 -e 2 -w 1 -a 1 -r 100.0"));
@@ -1104,6 +1116,18 @@
         verify(mockGlobalTopicsPolicies).removeMaxConsumers("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-max-consumers persistent://myprop/clust/ns1/ds1 -c 99 -g"));
         verify(mockGlobalTopicsPolicies).setMaxConsumers("persistent://myprop/clust/ns1/ds1", 99);
+
+        cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2 -g"));
+        verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
+                DispatchRate.builder()
+                        .dispatchThrottlingRateInMsg(-1)
+                        .dispatchThrottlingRateInByte(-1)
+                        .ratePeriodInSecond(2)
+                        .build());
+        cmdTopics.run(split("get-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
     }
 
     @Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index bcecb59..0003d83 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -32,6 +32,7 @@
 import org.apache.pulsar.client.admin.TopicPolicies;
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
@@ -74,6 +75,10 @@
         jcommander.addCommand("set-persistence", new SetPersistence());
         jcommander.addCommand("remove-persistence", new RemovePersistence());
 
+        jcommander.addCommand("get-subscription-dispatch-rate", new GetSubscriptionDispatchRate());
+        jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
+        jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());
+      
         jcommander.addCommand("get-publish-rate", new GetPublishRate());
         jcommander.addCommand("set-publish-rate", new SetPublishRate());
         jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
@@ -938,6 +943,80 @@
         }
     }
 
+    @Parameters(commandDescription = "Get subscription message-dispatch-rate for a topic")
+    private class GetSubscriptionDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getSubscriptionDispatchRate(persistentTopic, applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set subscription message-dispatch-rate for a topic")
+    private class SetSubscriptionDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--msg-dispatch-rate",
+                "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)")
+        private int msgDispatchRate = -1;
+
+        @Parameter(names = { "--byte-dispatch-rate",
+                "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)")
+        private long byteDispatchRate = -1;
+
+        @Parameter(names = { "--dispatch-rate-period",
+                "-dt" }, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not passed)")
+        private int dispatchRatePeriodSec = 1;
+
+        @Parameter(names = { "--relative-to-publish-rate",
+                "-rp" }, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled then broker will apply throttling value to (publish-rate + dispatch rate))")
+        private boolean relativeToPublishRate = false;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).setSubscriptionDispatchRate(persistentTopic,
+                    DispatchRate.builder()
+                            .dispatchThrottlingRateInMsg(msgDispatchRate)
+                            .dispatchThrottlingRateInByte(byteDispatchRate)
+                            .ratePeriodInSecond(dispatchRatePeriodSec)
+                            .relativeToPublishRate(relativeToPublishRate)
+                            .build());
+        }
+    }
+
+    @Parameters(commandDescription = "Remove subscription message-dispatch-rate for a topic")
+    private class RemoveSubscriptionDispatchRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeSubscriptionDispatchRate(persistentTopic);
+        }
+
+    }
+
     private TopicPolicies getTopicPolicies(boolean isGlobal) {
         return getAdmin().topicPolicies(isGlobal);
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index dcbdc9b..9e25332 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -317,6 +317,10 @@
             cmdUsageFormatter.addDeprecatedCommand("get-max-producers");
             cmdUsageFormatter.addDeprecatedCommand("set-max-producers");
             cmdUsageFormatter.addDeprecatedCommand("remove-max-producers");
+
+            cmdUsageFormatter.addDeprecatedCommand("get-subscription-dispatch-rate");
+            cmdUsageFormatter.addDeprecatedCommand("set-subscription-dispatch-rate");
+            cmdUsageFormatter.addDeprecatedCommand("remove-subscription-dispatch-rate");
         }
     }