Max unacked messages per consumer support cross multiple clusters (#13547)

Max unacked messages per consumer support cross multiple clusters 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index c47c4a0..694f554 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -924,8 +924,8 @@
             });
     }
 
-    protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnConsumer(boolean applied, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getMaxUnackedMessagesOnConsumer)
                 .orElseGet(() -> {
                     if (applied) {
@@ -936,7 +936,8 @@
                 }));
     }
 
-    protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum) {
+    protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnConsumer(Integer maxUnackedNum,
+                                                                              boolean isGlobal) {
         if (maxUnackedNum != null && maxUnackedNum < 0) {
             throw new RestException(Status.PRECONDITION_FAILED,
                     "maxUnackedNum must be 0 or more");
@@ -946,6 +947,7 @@
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setMaxUnackedMessagesOnConsumer(maxUnackedNum);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 9c8cb25..e213278 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -349,11 +349,12 @@
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied))
+            .thenCompose(__ -> internalGetMaxUnackedMessagesOnConsumer(applied, isGlobal))
             .thenApply(asyncResponse::resume).exceptionally(ex -> {
                 handleTopicPolicyException("getMaxUnackedMessagesOnConsumer", ex, asyncResponse);
                 return null;
@@ -370,13 +371,14 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
             @ApiParam(value = "Max unacked messages on consumer policies for the specified topic")
                     Integer maxUnackedNum) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum))
+            .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(maxUnackedNum, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("setMaxUnackedMessagesOnConsumer", ex, asyncResponse);
@@ -393,11 +395,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null))
+            .thenCompose(__ -> internalSetMaxUnackedMessagesOnConsumer(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("deleteMaxUnackedMessagesOnConsumer", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 4bc7543..365fe00 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -231,6 +231,25 @@
     }
 
     @Test
+    public void testReplicateMaxUnackedMsgPerConsumer() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+        init(namespace, topic);
+        // set max unacked msgs per consumers
+        admin1.topicPolicies(true).setMaxUnackedMessagesOnConsumer(topic, 100);
+        Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100));
+        Awaitility.await().ignoreExceptions().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic).intValue(), 100));
+        // remove max unacked msgs per consumers
+        admin1.topicPolicies(true).removeMaxUnackedMessagesOnConsumer(topic);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnConsumer(topic)));
+    }
+
+    @Test
     public void testReplicatorTopicPolicies() throws Exception {
         final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
         final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 7502860..f6a68e9 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -964,6 +964,14 @@
         cmdTopics.run(split("set-max-consumers persistent://myprop/clust/ns1/ds1 -c 99"));
         verify(mockTopicsPolicies).setMaxConsumers("persistent://myprop/clust/ns1/ds1", 99);
 
+        cmdTopics.run(split("remove-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies, times(1)).removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("get-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies, times(1))
+                .getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("set-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -m 999"));
+        verify(mockTopicsPolicies, times(1)).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);
+
         cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10"));
@@ -1037,6 +1045,15 @@
         cmdTopics.run(split("set-max-producers persistent://myprop/clust/ns1/ds1 -p 99 -g"));
         verify(mockGlobalTopicsPolicies).setMaxProducers("persistent://myprop/clust/ns1/ds1", 99);
 
+        cmdTopics.run(split("remove-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies, times(1))
+                .removeMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("get-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies, times(1))
+                .getMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("set-max-unacked-messages-per-consumer persistent://myprop/clust/ns1/ds1 -m 999 -g"));
+        verify(mockGlobalTopicsPolicies, times(1)).setMaxUnackedMessagesOnConsumer("persistent://myprop/clust/ns1/ds1", 999);
+
         cmdTopics.run(split("get-message-ttl persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).getMessageTTL("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("set-message-ttl persistent://myprop/clust/ns1/ds1 -t 10 -g"));
@@ -1068,8 +1085,6 @@
         verify(mockGlobalTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
                 , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
 
-
-
         cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap -g"));
         verify(mockGlobalTopicsPolicies).getDispatchRate("persistent://myprop/clust/ns1/ds1", true);
         cmdTopics.run(split("remove-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 0003d83..aa906ab 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -50,6 +50,11 @@
         jcommander.addCommand("get-message-ttl", new GetMessageTTL());
         jcommander.addCommand("set-message-ttl", new SetMessageTTL());
         jcommander.addCommand("remove-message-ttl", new RemoveMessageTTL());
+
+        jcommander.addCommand("get-max-unacked-messages-per-consumer", new GetMaxUnackedMessagesPerConsumer());
+        jcommander.addCommand("set-max-unacked-messages-per-consumer", new SetMaxUnackedMessagesPerConsumer());
+        jcommander.addCommand("remove-max-unacked-messages-per-consumer", new RemoveMaxUnackedMessagesPerConsumer());
+
         jcommander.addCommand("get-max-consumers-per-subscription", new GetMaxConsumersPerSubscription());
         jcommander.addCommand("set-max-consumers-per-subscription", new SetMaxConsumersPerSubscription());
         jcommander.addCommand("remove-max-consumers-per-subscription", new RemoveMaxConsumersPerSubscription());
@@ -152,6 +157,60 @@
         }
     }
 
+    @Parameters(commandDescription = "Get max unacked messages policy per consumer for a topic")
+    private class GetMaxUnackedMessagesPerConsumer extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getMaxUnackedMessagesOnConsumer(persistentTopic, applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove max unacked messages policy per consumer for a topic")
+    private class RemoveMaxUnackedMessagesPerConsumer extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeMaxUnackedMessagesOnConsumer(persistentTopic);
+        }
+    }
+
+    @Parameters(commandDescription = "Set max unacked messages policy per consumer for a topic")
+    private class SetMaxUnackedMessagesPerConsumer extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on consumer", required = true)
+        private int maxNum;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).setMaxUnackedMessagesOnConsumer(persistentTopic, maxNum);
+        }
+    }
+
     @Parameters(commandDescription = "Get the message TTL for a topic")
     private class GetMessageTTL extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 9e25332..5876165 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -258,6 +258,10 @@
             cmdUsageFormatter.addDeprecatedCommand("set-max-consumers");
             cmdUsageFormatter.addDeprecatedCommand("remove-max-consumers");
 
+            cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-per-consumer");
+            cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-per-consumer");
+            cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-per-consumer");
+
             cmdUsageFormatter.addDeprecatedCommand("get-message-ttl");
             cmdUsageFormatter.addDeprecatedCommand("set-message-ttl");
             cmdUsageFormatter.addDeprecatedCommand("remove-message-ttl");