DeduplicationSnapshotIntervalSeconds support cross multiple clusters (#13578)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index db3c255..c009a16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -970,14 +970,15 @@
             });
     }
 
-    protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval) {
+    protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval, boolean isGlobal) {
         if (interval != null && interval < 0) {
             throw new RestException(Status.PRECONDITION_FAILED, "interval must be 0 or more");
         }
-        return getTopicPoliciesAsyncWithRetry(topicName)
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies policies = op.orElseGet(TopicPolicies::new);
                 policies.setDeduplicationSnapshotIntervalSeconds(interval);
+                policies.setIsGlobal(isGlobal);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
             });
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d096d65..db08474 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -421,11 +421,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+            .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
             .thenAccept(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
                 asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
@@ -448,11 +449,12 @@
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Interval to take deduplication snapshot for the specified topic")
                     Integer interval,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval))
+            .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("setDeduplicationSnapshotInterval", ex, asyncResponse);
@@ -469,11 +471,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null))
+            .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("deleteDeduplicationSnapshotInterval", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 637cb3c..29f0b8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -171,6 +171,25 @@
         untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getPublishRate(topic)));
     }
 
+    @Test
+    public void testReplicateDeduplicationSnapshotIntervalPolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+        init(namespace, topic);
+        // set global topic policy
+        admin1.topicPolicies(true).setDeduplicationSnapshotInterval(topic, 100);
+
+        // get global topic policy
+        untilRemoteClustersAsserted(
+                admin -> assertEquals(admin.topicPolicies(true).getDeduplicationSnapshotInterval(topic),
+                        Integer.valueOf(100)));
+
+        // remove global topic policy
+        admin1.topicPolicies(true).removeDeduplicationSnapshotInterval(topic);
+        untilRemoteClustersAsserted(
+                admin -> assertNull(admin.topicPolicies(true).getDeduplicationSnapshotInterval(topic)));
+    }
+
     private void untilRemoteClustersAsserted(ThrowingConsumer<PulsarAdmin> condition) {
         Awaitility.await().untilAsserted(() -> condition.apply(admin2));
         Awaitility.await().untilAsserted(() -> condition.apply(admin3));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/DeprecatedCommanderTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/DeprecatedCommanderTest.java
index 0a5505b..53d78dd 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/DeprecatedCommanderTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/DeprecatedCommanderTest.java
@@ -19,6 +19,12 @@
 package org.apache.pulsar.admin.cli;
 
 
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
 import com.beust.jcommander.DefaultUsageFormatter;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.Schemas;
@@ -27,13 +33,6 @@
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-
 public class DeprecatedCommanderTest {
     PulsarAdmin admin;
     Topics mockTopics;
@@ -67,7 +66,7 @@
         assertTrue(defaultOutput.contains("enable-deduplication"));
         assertFalse(outputWithFiltered.contains("get-max-unacked-messages-on-consumer"));
         assertTrue(defaultOutput.contains("get-max-unacked-messages-on-consumer"));
-        assertTrue(outputWithFiltered.contains("get-deduplication"));
+        assertFalse(outputWithFiltered.contains("get-deduplication"));
         assertTrue(defaultOutput.contains("get-deduplication"));
 
         // annotation was changed to hidden, reset it.
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 05636f5..569671d 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1062,6 +1062,12 @@
         cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -i 100"));
+        verify(mockTopicsPolicies).setDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1", 100);
+        cmdTopics.run(split("remove-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
 
         // Reset the cmd, and check global option
         cmdTopics = new CmdTopicPolicies(() -> admin);
@@ -1195,6 +1201,13 @@
         cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -i 100 -g"));
+        verify(mockGlobalTopicsPolicies).setDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1", 100);
+        cmdTopics.run(split("remove-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
+
         cmdTopics.run(split("get-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).getMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
         cmdTopics.run(split("set-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -c 5 -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index f343257..00ea7fa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -84,6 +84,10 @@
         jcommander.addCommand("get-deduplication", new GetDeduplicationStatus());
         jcommander.addCommand("remove-deduplication", new RemoveDeduplicationStatus());
 
+        jcommander.addCommand("get-deduplication-snapshot-interval", new GetDeduplicationSnapshotInterval());
+        jcommander.addCommand("set-deduplication-snapshot-interval", new SetDeduplicationSnapshotInterval());
+        jcommander.addCommand("remove-deduplication-snapshot-interval", new RemoveDeduplicationSnapshotInterval());
+
         jcommander.addCommand("get-persistence", new GetPersistence());
         jcommander.addCommand("set-persistence", new SetPersistence());
         jcommander.addCommand("remove-persistence", new RemovePersistence());
@@ -791,6 +795,62 @@
         }
     }
 
+    @Parameters(commandDescription = "Get deduplication snapshot interval for a topic")
+    private class GetDeduplicationSnapshotInterval extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+                + "If set to true, broker returns global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getDeduplicationSnapshotInterval(persistentTopic));
+        }
+    }
+
+    @Parameters(commandDescription = "Set deduplication snapshot interval for a topic")
+    private class SetDeduplicationSnapshotInterval extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"-i", "--interval"}, description =
+                "Deduplication snapshot interval for topic in second, allowed range from 0 to Integer.MAX_VALUE",
+                required = true)
+        private int interval;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            if (interval < 0) {
+                throw new ParameterException(String.format("Invalid interval '%d'. ", interval));
+            }
+
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).setDeduplicationSnapshotInterval(persistentTopic, interval);
+        }
+    }
+
+    @Parameters(commandDescription = "Remove deduplication snapshot interval for a topic")
+    private class RemoveDeduplicationSnapshotInterval extends CliCommand {
+
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeDeduplicationSnapshotInterval(persistentTopic);
+        }
+    }
+
     @Parameters(commandDescription = "Get the backlog quota policies for a topic")
     private class GetBacklogQuotaMap extends CliCommand {
         @Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index e5127f9..742b96a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -320,6 +320,10 @@
             cmdUsageFormatter.addDeprecatedCommand("set-deduplication");
             cmdUsageFormatter.addDeprecatedCommand("remove-deduplication");
 
+            cmdUsageFormatter.addDeprecatedCommand("get-deduplication-snapshot-interval");
+            cmdUsageFormatter.addDeprecatedCommand("set-deduplication-snapshot-interval");
+            cmdUsageFormatter.addDeprecatedCommand("remove-deduplication-snapshot-interval");
+
             cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-subscription");
             cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-subscription");
             cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-subscription");