Compaction threshold policies support cross multiple clusters (#13513)

* Compaction threshold policies support cross multiple clusters
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 81036d5..ced3bad 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4224,8 +4224,8 @@
             });
     }
 
-    protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Long> internalGetCompactionThreshold(boolean applied, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getCompactionThreshold)
                 .orElseGet(() -> {
                     if (applied) {
@@ -4238,27 +4238,30 @@
                 }));
     }
 
-    protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold) {
+    protected CompletableFuture<Void> internalSetCompactionThreshold(Long compactionThreshold , boolean isGlobal) {
         if (compactionThreshold != null && compactionThreshold < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "Invalid value for compactionThreshold");
         }
 
-        return getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 topicPolicies.setCompactionThreshold(compactionThreshold);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
 
     }
 
-    protected CompletableFuture<Void> internalRemoveCompactionThreshold() {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalRemoveCompactionThreshold(boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 if (!op.isPresent()) {
                     return CompletableFuture.completedFuture(null);
                 }
-                op.get().setCompactionThreshold(null);
+                TopicPolicies topicPolicies = op.get();
+                topicPolicies.setCompactionThreshold(null);
+                topicPolicies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
             });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index f641dae..f2b8939 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2988,11 +2988,12 @@
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetCompactionThreshold(applied))
+            .thenCompose(__ -> internalGetCompactionThreshold(applied, isGlobal))
             .thenApply(asyncResponse::resume)
             .exceptionally(ex -> {
                 handleTopicPolicyException("getCompactionThreshold", ex, asyncResponse);
@@ -3014,10 +3015,11 @@
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Dispatch rate for the specified topic") long compactionThreshold) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold))
+            .thenCompose(__ -> internalSetCompactionThreshold(compactionThreshold, isGlobal))
             .thenRun(() -> {
                 try {
                     log.info("[{}] Successfully set topic compaction threshold:"
@@ -3048,11 +3050,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalRemoveCompactionThreshold())
+            .thenCompose(__ -> internalRemoveCompactionThreshold(isGlobal))
             .thenRun(() -> {
                 log.info("[{}] Successfully remove topic compaction threshold: tenant={}, namespace={}, topic={}",
                         clientAppId(),
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 5bccda9..39fb781 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -459,6 +459,30 @@
                 assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
     }
 
+    @Test
+    public void testReplicatorCompactionThresholdPolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+
+        init(namespace, persistentTopicName);
+        // set compaction threshold
+        admin1.topicPolicies(true).setCompactionThreshold(persistentTopicName, 1);
+        // get compaction threshold
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true)
+                        .getCompactionThreshold(persistentTopicName), Long.valueOf(1)));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true)
+                        .getCompactionThreshold(persistentTopicName), Long.valueOf(1)));
+
+        //remove compaction threshold
+        admin1.topicPolicies(true).removeCompactionThreshold(persistentTopicName);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
+    }
+
     private void init(String namespace, String topic)
             throws PulsarAdminException, PulsarClientException, PulsarServerException {
         final String cluster2 = pulsar2.getConfig().getClusterName();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 61a653a..08442b2 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -912,6 +912,13 @@
                 + " -e -t 1s -m delete_when_no_subscriptions"));
         verify(mockTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
                 , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
+        cmdTopics.run(split("get-compaction-threshold persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getCompactionThreshold("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("set-compaction-threshold persistent://myprop/clust/ns1/ds1 -t 10k"));
+        verify(mockTopicsPolicies).setCompactionThreshold("persistent://myprop/clust/ns1/ds1", 10 * 1024);
+        cmdTopics.run(split("remove-compaction-threshold persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeCompactionThreshold("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getMaxProducers("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1"));
@@ -1162,6 +1169,13 @@
         cmdTopics.run(split("set-max-consumers persistent://myprop/clust/ns1/ds1 -c 99 -g"));
         verify(mockGlobalTopicsPolicies).setMaxConsumers("persistent://myprop/clust/ns1/ds1", 99);
 
+        cmdTopics.run(split("get-compaction-threshold persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getCompactionThreshold("persistent://myprop/clust/ns1/ds1", false);
+        cmdTopics.run(split("set-compaction-threshold persistent://myprop/clust/ns1/ds1 -t 10k -g"));
+        verify(mockGlobalTopicsPolicies).setCompactionThreshold("persistent://myprop/clust/ns1/ds1", 10 * 1024);
+        cmdTopics.run(split("remove-compaction-threshold persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeCompactionThreshold("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2 -g"));
         verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
                 DispatchRate.builder()
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 564c661..9e0b463 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -90,6 +90,10 @@
         jcommander.addCommand("set-publish-rate", new SetPublishRate());
         jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
 
+        jcommander.addCommand("get-compaction-threshold", new GetCompactionThreshold());
+        jcommander.addCommand("set-compaction-threshold", new SetCompactionThreshold());
+        jcommander.addCommand("remove-compaction-threshold", new RemoveCompactionThreshold());
+
         jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
         jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
         jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());
@@ -991,6 +995,59 @@
         }
     }
 
+    @Parameters(commandDescription = "Get compaction threshold for a topic")
+    private class GetCompactionThreshold extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getCompactionThreshold(persistentTopic, applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set compaction threshold for a topic")
+    private class SetCompactionThreshold extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--threshold", "-t" },
+                description = "Maximum number of bytes in a topic backlog before compaction is triggered "
+                        + "(eg: 10M, 16G, 3T). 0 disables automatic compaction",
+                required = true)
+        private String thresholdStr = "0";
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            long threshold = validateSizeString(thresholdStr);
+            getTopicPolicies(isGlobal).setCompactionThreshold(persistentTopic, threshold);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove compaction threshold for a topic")
+    private class RemoveCompactionThreshold extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+        @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+                + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeCompactionThreshold(persistentTopic);
+        }
+    }
 
     @Parameters(commandDescription = "Get message dispatch rate for a topic")
     private class GetDispatchRate extends CliCommand {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index a10bad1..446be60 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -306,6 +306,10 @@
             cmdUsageFormatter.addDeprecatedCommand("set-inactive-topic-policies");
             cmdUsageFormatter.addDeprecatedCommand("remove-inactive-topic-policies");
 
+            cmdUsageFormatter.addDeprecatedCommand("get-compaction-threshold");
+            cmdUsageFormatter.addDeprecatedCommand("set-compaction-threshold");
+            cmdUsageFormatter.addDeprecatedCommand("remove-compaction-threshold");
+
             cmdUsageFormatter.addDeprecatedCommand("get-dispatch-rate");
             cmdUsageFormatter.addDeprecatedCommand("set-dispatch-rate");
             cmdUsageFormatter.addDeprecatedCommand("remove-dispatch-rate");