Publish rate support cross multiple clusters (#13496)

Publish rate support cross multiple clusters (#13496)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 449794b..28b8321 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4251,19 +4251,20 @@
             });
     }
 
-    protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Optional<PublishRate>> internalGetPublishRate(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getPublishRate));
     }
 
-    protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate) {
+    protected CompletableFuture<Void> internalSetPublishRate(PublishRate publishRate, boolean isGlobal) {
         if (publishRate == null) {
             return CompletableFuture.completedFuture(null);
         }
-        return getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setPublishRate(publishRate);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
     }
@@ -4298,13 +4299,14 @@
                 });
     }
 
-    protected CompletableFuture<Void> internalRemovePublishRate() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalRemovePublishRate(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 if (!op.isPresent()) {
                     return CompletableFuture.completedFuture(null);
                 }
                 op.get().setPublishRate(null);
+                op.get().setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
             });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f031841..fd8a3a4 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3164,11 +3164,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetPublishRate())
+            .thenCompose(__ -> internalGetPublishRate(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
             .exceptionally(ex -> {
@@ -3189,20 +3190,22 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Dispatch rate for the specified topic") PublishRate publishRate) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetPublishRate(publishRate))
+            .thenCompose(__ -> internalSetPublishRate(publishRate, isGlobal))
             .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic publish rate:"
-                                    + " tenant={}, namespace={}, topic={}, publishRate={}",
+                                    + " tenant={}, namespace={}, topic={}, isGlobal={}, publishRate={}",
                             clientAppId(),
                             tenant,
                             namespace,
                             topicName.getLocalName(),
+                            isGlobal,
                             jsonMapper().writeValueAsString(publishRate));
                 } catch (JsonProcessingException ignore) {}
                 asyncResponse.resume(Response.noContent().build());
@@ -3225,17 +3228,19 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalRemovePublishRate())
+            .thenCompose(__ -> internalRemovePublishRate(isGlobal))
             .thenRun(() -> {
-                log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}",
+                log.info("[{}] Successfully remove topic publish rate: tenant={}, namespace={}, topic={}, isGlobal={}",
                         clientAppId(),
                         tenant,
                         namespace,
-                        topicName.getLocalName());
+                        topicName.getLocalName(),
+                        isGlobal);
                 asyncResponse.resume(Response.noContent().build());
             })
             .exceptionally(ex -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 5e5d5f1..1a6abbd 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -28,6 +28,7 @@
 import java.util.Set;
 import java.util.UUID;
 import org.apache.pulsar.broker.PulsarServerException;
+import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
@@ -36,6 +37,7 @@
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
 import org.awaitility.Awaitility;
@@ -113,6 +115,34 @@
     }
 
     @Test
+    public void testReplicatePublishRatePolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+        init(namespace, topic);
+        // set global topic policy
+        PublishRate publishRate = new PublishRate(100, 10000);
+        admin1.topicPolicies(true).setPublishRate(topic, publishRate);
+
+        // get global topic policy
+        untilRemoteClustersAsserted(
+                admin -> assertEquals(admin.topicPolicies(true).getPublishRate(topic), publishRate));
+
+        // remove global topic policy
+        admin1.topicPolicies(true).removePublishRate(topic);
+        untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getPublishRate(topic)));
+    }
+
+    private void untilRemoteClustersAsserted(ThrowingConsumer<PulsarAdmin> condition) {
+        Awaitility.await().untilAsserted(() -> condition.apply(admin2));
+        Awaitility.await().untilAsserted(() -> condition.apply(admin3));
+    }
+
+    private interface ThrowingConsumer<I> {
+        void apply(I input) throws Throwable;
+    }
+
+
+    @Test
     public void testReplicatePersistentPolicies() throws Exception {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index c4b4fcc..a27147a 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -27,17 +27,14 @@
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNull;
-
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Sets;
-
 import java.io.File;
 import java.lang.reflect.Field;
 import java.net.URL;
 import java.net.URLClassLoader;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -45,7 +42,6 @@
 import java.util.Properties;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
-
 import org.apache.pulsar.admin.cli.utils.SchemaExtractor;
 import org.apache.pulsar.client.admin.Bookies;
 import org.apache.pulsar.client.admin.BrokerStats;
@@ -942,6 +938,13 @@
         cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-publish-rate persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getPublishRate("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-publish-rate persistent://myprop/clust/ns1/ds1 -m 10 -b 100"));
+        verify(mockTopicsPolicies).setPublishRate("persistent://myprop/clust/ns1/ds1", new PublishRate(10, 100));
+        cmdTopics.run(split("remove-publish-rate persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removePublishRate("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("get-max-consumers persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getMaxConsumers("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("remove-max-consumers persistent://myprop/clust/ns1/ds1"));
@@ -1016,11 +1019,11 @@
         verify(mockGlobalTopicsPolicies).setMaxProducers("persistent://myprop/clust/ns1/ds1", 99);
 
         cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -g"));
-        verify(mockTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
+        verify(mockGlobalTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10 -g"));
-        verify(mockTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10);
+        verify(mockGlobalTopicsPolicies).setMessageTTL("persistent://myprop/clust/ns1/ds1", 10);
         cmdTopics.run(split("remove-message-ttl persistent://myprop/clust/ns1/ds1 -g"));
-        verify(mockTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
+        verify(mockGlobalTopicsPolicies).removeMessageTTL("persistent://myprop/clust/ns1/ds1");
 
         cmdTopics.run(split("get-persistence persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).getPersistence("persistent://myprop/clust/ns1/ds1");
@@ -1052,6 +1055,13 @@
                 .ratePeriodInSecond(2)
                 .build());
 
+        cmdTopics.run(split("get-publish-rate persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getPublishRate("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-publish-rate persistent://myprop/clust/ns1/ds1 -m 10 -b 100 -g"));
+        verify(mockGlobalTopicsPolicies).setPublishRate("persistent://myprop/clust/ns1/ds1", new PublishRate(10, 100));
+        cmdTopics.run(split("remove-publish-rate persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removePublishRate("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index bcd7001..dd7a79f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -37,6 +37,7 @@
 import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
 import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
+import org.apache.pulsar.common.policies.data.PublishRate;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
 
@@ -74,6 +75,10 @@
         jcommander.addCommand("set-persistence", new SetPersistence());
         jcommander.addCommand("remove-persistence", new RemovePersistence());
 
+        jcommander.addCommand("get-publish-rate", new GetPublishRate());
+        jcommander.addCommand("set-publish-rate", new SetPublishRate());
+        jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
+
         jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
         jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
         jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
@@ -601,6 +606,61 @@
         }
     }
 
+    @Parameters(commandDescription = "Get publish rate for a topic")
+    private class GetPublishRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+                + "If set to true, broker returns global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getPublishRate(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set publish rate for a topic")
+    private class SetPublishRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--msg-publish-rate", "-m"}, description = "message-publish-rate (default -1 will be "
+                + "overwrite if not passed)", required = false)
+        private int msgPublishRate = -1;
+
+        @Parameter(names = {"--byte-publish-rate", "-b"}, description = "byte-publish-rate "
+                + "(default -1 will be overwrite if not passed)", required = false)
+        private long bytePublishRate = -1;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).setPublishRate(persistentTopic,
+                    new PublishRate(msgPublishRate, bytePublishRate));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove publish rate for a topic")
+    private class RemovePublishRate extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removePublishRate(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get the persistence policies for a topic")
     private class GetPersistence extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 99ab485..69b976c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -274,6 +274,10 @@
             cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-subscription");
             cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-subscription");
 
+            cmdUsageFormatter.addDeprecatedCommand("get-publish-rate");
+            cmdUsageFormatter.addDeprecatedCommand("set-publish-rate");
+            cmdUsageFormatter.addDeprecatedCommand("remove-publish-rate");
+
             cmdUsageFormatter.addDeprecatedCommand("get-maxProducers");
             cmdUsageFormatter.addDeprecatedCommand("set-maxProducers");
             cmdUsageFormatter.addDeprecatedCommand("remove-maxProducers");