Inactive topic policies support cross multiple clusters (#13509)

* inactiveTopicPolicies support cross multiple clusters
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 980ecd5..449794b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -838,8 +838,9 @@
             });
     }
 
-    protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies
+            (boolean applied, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenApply(op -> op.map(TopicPolicies::getInactiveTopicPolicies)
                 .orElseGet(() -> {
                     if (applied) {
@@ -853,10 +854,12 @@
                 }));
     }
 
-    protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
-        return getTopicPoliciesAsyncWithRetry(topicName)
+    protected CompletableFuture<Void> internalSetInactiveTopicPolicies
+            (InactiveTopicPolicies inactiveTopicPolicies, boolean isGlobal) {
+        return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
             .thenCompose(op -> {
                 TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+                topicPolicies.setIsGlobal(isGlobal);
                 topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
                 return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
             });
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 73b1d20..f031841 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -486,11 +486,12 @@
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
             @QueryParam("applied") boolean applied,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalGetInactiveTopicPolicies(applied))
+            .thenCompose(__ -> internalGetInactiveTopicPolicies(applied, isGlobal))
             .thenApply(asyncResponse::resume).exceptionally(ex -> {
                 handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
                 return null;
@@ -508,11 +509,12 @@
             @PathParam("topic") @Encoded String encodedTopic,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "inactive topic policies for the specified topic")
             InactiveTopicPolicies inactiveTopicPolicies) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies))
+            .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("setInactiveTopicPolicies", ex, asyncResponse);
@@ -529,11 +531,12 @@
             @PathParam("tenant") String tenant,
             @PathParam("namespace") String namespace,
             @PathParam("topic") @Encoded String encodedTopic,
+            @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
             @ApiParam(value = "Is authentication required to perform this operation")
             @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
         validateTopicName(tenant, namespace, encodedTopic);
         preValidation(authoritative)
-            .thenCompose(__ -> internalSetInactiveTopicPolicies(null))
+            .thenCompose(__ -> internalSetInactiveTopicPolicies(null, isGlobal))
             .thenRun(() -> asyncResponse.resume(Response.noContent().build()))
             .exceptionally(ex -> {
                 handleTopicPolicyException("deleteInactiveTopicPolicies", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index c33c3d8..5e5d5f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -32,6 +32,8 @@
 import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.SubscriptionType;
 import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -296,6 +298,30 @@
                 assertNull(admin3.topicPolicies(true).getDispatchRate(persistentTopicName)));
     }
 
+    @Test
+    public void testReplicatorInactiveTopicPolicies() throws Exception {
+        final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+        final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+        init(namespace, persistentTopicName);
+
+        // set InactiveTopicPolicies
+        InactiveTopicPolicies inactiveTopicPolicies =
+                new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
+        admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies);
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin2.topicPolicies(true)
+                        .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies));
+        Awaitility.await().untilAsserted(() ->
+                assertEquals(admin3.topicPolicies(true)
+                        .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies));
+        // remove InactiveTopicPolicies
+        admin1.topicPolicies(true).removeInactiveTopicPolicies(persistentTopicName);
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin2.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
+        Awaitility.await().untilAsserted(() ->
+                assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
+    }
+
     private void init(String namespace, String topic)
             throws PulsarAdminException, PulsarClientException, PulsarServerException {
         final String cluster2 = pulsar2.getConfig().getClusterName();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 40086ee..c4b4fcc 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -908,6 +908,14 @@
         cmdTopics.run(split("remove-retention persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).removeRetention("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false);
+        cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
+        verify(mockTopicsPolicies).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1"
+                + " -e -t 1s -m delete_when_no_subscriptions"));
+        verify(mockTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
+                , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
         cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1"));
         verify(mockTopicsPolicies).getMaxProducers("persistent://myprop/clust/ns1/ds1", false);
         cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1"));
@@ -1022,6 +1030,16 @@
         cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g"));
         verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
 
+        cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false);
+        cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
+        verify(mockGlobalTopicsPolicies).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
+        cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1"
+                + " -e -t 1s -m delete_when_no_subscriptions -g"));
+        verify(mockGlobalTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
+                , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
+
+
 
         cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap -g"));
         verify(mockGlobalTopicsPolicies).getDispatchRate("persistent://myprop/clust/ns1/ds1", true);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 8be0fb3..bcd7001 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -33,6 +33,9 @@
 import org.apache.pulsar.common.policies.data.BacklogQuota;
 import org.apache.pulsar.common.policies.data.DispatchRate;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
 import org.apache.pulsar.common.policies.data.PersistencePolicies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -78,6 +81,11 @@
         jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
         jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
         jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
+
+        jcommander.addCommand("get-inactive-topic-policies",new GetInactiveTopicPolicies());
+        jcommander.addCommand("set-inactive-topic-policies",new SetInactiveTopicPolicies());
+        jcommander.addCommand("remove-inactive-topic-policies",new RemoveInactiveTopicPolicies());
+
     }
 
     @Parameters(commandDescription = "Get max consumers per subscription for a topic")
@@ -733,6 +741,87 @@
         }
     }
 
+    @Parameters(commandDescription = "Get the inactive topic policies on a topic")
+    private class GetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+        private boolean applied = false;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+                + "If set to true, broker returned global topic policies")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            print(getTopicPolicies(isGlobal).getInactiveTopicPolicies(persistentTopic, applied));
+        }
+    }
+
+    @Parameters(commandDescription = "Set the inactive topic policies on a topic")
+    private class SetInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+
+        @Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive")
+        private boolean enableDeleteWhileInactive = false;
+
+        @Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive")
+        private boolean disableDeleteWhileInactive = false;
+
+        @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" +
+                ",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+        private String deleteInactiveTopicsMaxInactiveDuration;
+
+        @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" +
+                ",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true)
+        private String inactiveTopicDeleteMode;
+
+        @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+                + "If set to true, the policy will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            long maxInactiveDurationInSeconds;
+            try {
+                maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(
+                        RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+            } catch (IllegalArgumentException exception) {
+                throw new ParameterException(exception.getMessage());
+            }
+
+            if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
+                throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
+            }
+            InactiveTopicDeleteMode deleteMode;
+            try {
+                deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
+            } catch (IllegalArgumentException e) {
+                throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
+            }
+            getTopicPolicies(isGlobal).setInactiveTopicPolicies(persistentTopic,
+                    new InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive));
+        }
+    }
+
+    @Parameters(commandDescription = "Remove inactive topic policies from a topic")
+    private class RemoveInactiveTopicPolicies extends CliCommand {
+        @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+        private java.util.List<String> params;
+        @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+                + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+        private boolean isGlobal = false;
+        @Override
+        void run() throws PulsarAdminException {
+            String persistentTopic = validatePersistentTopic(params);
+            getTopicPolicies(isGlobal).removeInactiveTopicPolicies(persistentTopic);
+        }
+    }
+
     private TopicPolicies getTopicPolicies(boolean isGlobal) {
         return getAdmin().topicPolicies(isGlobal);
     }
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 53b45ea..99ab485 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -290,6 +290,10 @@
             cmdUsageFormatter.addDeprecatedCommand("set-persistence");
             cmdUsageFormatter.addDeprecatedCommand("remove-persistence");
 
+            cmdUsageFormatter.addDeprecatedCommand("get-inactive-topic-policies");
+            cmdUsageFormatter.addDeprecatedCommand("set-inactive-topic-policies");
+            cmdUsageFormatter.addDeprecatedCommand("remove-inactive-topic-policies");
+
             cmdUsageFormatter.addDeprecatedCommand("get-dispatch-rate");
             cmdUsageFormatter.addDeprecatedCommand("set-dispatch-rate");
             cmdUsageFormatter.addDeprecatedCommand("remove-dispatch-rate");