maxSubscriptionsPerTopic support cross multiple clusters (#13623)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 160ca3e..1becbec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3107,21 +3107,23 @@
 
     }
 
-    protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
                 .thenApply(op -> op.map(TopicPolicies::getMaxSubscriptionsPerTopic));
     }
 
-    protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) {
+    protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic,
+                                                                          boolean isGlobal) {
         if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxSubscriptionsPerTopic must be 0 or more");
         }
 
-        return getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 876aa39..aec2a6e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2152,11 +2152,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic())
+            .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal))
             .thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
                     : Response.noContent().build()))
             .exceptionally(ex -> {
@@ -2178,16 +2179,17 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic))
+            .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
-                                + ", maxSubscriptions={}"
-                        , clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic);
+                                + ", maxSubscriptions={}, isGlobal={}"
+                        , clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic, isGlobal);
                 asyncResponse.resume(Response.noContent().build());
             })
             .exceptionally(ex -> {
@@ -2208,11 +2210,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null))
+            .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null, isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
                         clientAppId(), namespaceName, topicName.getLocalName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index fc095df..7051bbe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -531,6 +531,26 @@
                 assertNull(admin3.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
     }
 
+    @Test
+    public void testReplicateMaxSubscriptionsPerTopic() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+        init(namespace, persistentTopicName);
+
+        //set max subscriptions per topic
+        admin1.topicPolicies(true).setMaxSubscriptionsPerTopic(persistentTopicName, 1024);
+
+        //get max subscriptions per topic
+        untilRemoteClustersAsserted(
+                admin -> assertEquals(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName),
+                        Integer.valueOf(1024)));
+
+        //remove
+        admin1.topicPolicies(true).removeMaxSubscriptionsPerTopic(persistentTopicName);
+        untilRemoteClustersAsserted(
+                admin -> assertNull(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName)));
+    }
+
     private void init(String namespace, String topic)
             throws PulsarAdminException, PulsarClientException, PulsarServerException {
         final String cluster2 = pulsar2.getConfig().getClusterName();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index cdfc5ef..92d37f7 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1041,6 +1041,14 @@
         cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024"));
+        verify(mockTopicsPolicies).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 1024);
+        cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+
+
         // Reset the cmd, and check global option
         cmdTopics = new CmdTopicPolicies(() -> admin);
         cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1 -g"));
@@ -1225,6 +1233,13 @@
         verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
+
+        cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024 -g"));
+        verify(mockGlobalTopicsPolicies).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 1024);
+        cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
     }
 
     @Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 6d4b5f5..09dd99f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -126,6 +126,10 @@
         jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
         jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
 
+        jcommander.addCommand("get-max-subscriptions-per-topic", new GetMaxSubscriptionsPerTopic());
+        jcommander.addCommand("set-max-subscriptions-per-topic", new SetMaxSubscriptionsPerTopic());
+        jcommander.addCommand("remove-max-subscriptions-per-topic", new RemoveMaxSubscriptionsPerTopic());
+
     }
 
     @Parameters(commandDescription = "Get max consumers per subscription for a topic")
@@ -1427,6 +1431,59 @@
 
     }
 
+    @Parameters(commandDescription = "Get max subscriptions for a topic")
+    private class GetMaxSubscriptionsPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getMaxSubscriptionsPerTopic(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set max subscriptions for a topic")
+    private class SetMaxSubscriptionsPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--max-subscriptions-per-topic",
+                "-s"}, description = "max subscriptions for a topic (default -1 will be overwrite if not passed)",
+                required = true)
+        private int maxSubscriptionPerTopic;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).setMaxSubscriptionsPerTopic(persistentTopic, maxSubscriptionPerTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max subscriptions for a topic")
+    private class RemoveMaxSubscriptionsPerTopic extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeMaxSubscriptionsPerTopic(persistentTopic);
+        }
+    }
+
     private TopicPolicies getTopicPolicies(boolean isGlobal) {
         return getAdmin().topicPolicies(isGlobal);
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 7883936..572c0be 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -343,6 +343,10 @@
             cmdUsageFormatter.addDeprecatedCommand("get-subscription-dispatch-rate");
             cmdUsageFormatter.addDeprecatedCommand("set-subscription-dispatch-rate");
             cmdUsageFormatter.addDeprecatedCommand("remove-subscription-dispatch-rate");
+
+            cmdUsageFormatter.addDeprecatedCommand("get-max-subscriptions-per-topic");
+            cmdUsageFormatter.addDeprecatedCommand("set-max-subscriptions-per-topic");
+            cmdUsageFormatter.addDeprecatedCommand("remove-max-subscriptions-per-topic");
         }
     }