Inactive topic policies support cross multiple clusters (#13509)
* inactiveTopicPolicies support cross multiple clusters
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 980ecd5..449794b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -838,8 +838,9 @@
});
}
- protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies(boolean applied) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<InactiveTopicPolicies> internalGetInactiveTopicPolicies
+ (boolean applied, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getInactiveTopicPolicies)
.orElseGet(() -> {
if (applied) {
@@ -853,10 +854,12 @@
}));
}
- protected CompletableFuture<Void> internalSetInactiveTopicPolicies(InactiveTopicPolicies inactiveTopicPolicies) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<Void> internalSetInactiveTopicPolicies
+ (InactiveTopicPolicies inactiveTopicPolicies, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
+ topicPolicies.setIsGlobal(isGlobal);
topicPolicies.setInactiveTopicPolicies(inactiveTopicPolicies);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 73b1d20..f031841 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -486,11 +486,12 @@
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalGetInactiveTopicPolicies(applied))
+ .thenCompose(__ -> internalGetInactiveTopicPolicies(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getInactiveTopicPolicies", ex, asyncResponse);
return null;
@@ -508,11 +509,12 @@
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "inactive topic policies for the specified topic")
InactiveTopicPolicies inactiveTopicPolicies) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies))
+ .thenCompose(__ -> internalSetInactiveTopicPolicies(inactiveTopicPolicies, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setInactiveTopicPolicies", ex, asyncResponse);
@@ -529,11 +531,12 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetInactiveTopicPolicies(null))
+ .thenCompose(__ -> internalSetInactiveTopicPolicies(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteInactiveTopicPolicies", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index c33c3d8..5e5d5f1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -32,6 +32,8 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
@@ -296,6 +298,30 @@
assertNull(admin3.topicPolicies(true).getDispatchRate(persistentTopicName)));
}
+ @Test
+ public void testReplicatorInactiveTopicPolicies() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+ init(namespace, persistentTopicName);
+
+ // set InactiveTopicPolicies
+ InactiveTopicPolicies inactiveTopicPolicies =
+ new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true);
+ admin1.topicPolicies(true).setInactiveTopicPolicies(persistentTopicName, inactiveTopicPolicies);
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin2.topicPolicies(true)
+ .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies));
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin3.topicPolicies(true)
+ .getInactiveTopicPolicies(persistentTopicName), inactiveTopicPolicies));
+ // remove InactiveTopicPolicies
+ admin1.topicPolicies(true).removeInactiveTopicPolicies(persistentTopicName);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin2.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
+ }
+
private void init(String namespace, String topic)
throws PulsarAdminException, PulsarClientException, PulsarServerException {
final String cluster2 = pulsar2.getConfig().getClusterName();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 40086ee..c4b4fcc 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -908,6 +908,14 @@
cmdTopics.run(split("remove-retention persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeRetention("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false);
+ cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1"
+ + " -e -t 1s -m delete_when_no_subscriptions"));
+ verify(mockTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
+ , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
cmdTopics.run(split("get-max-producers persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxProducers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-producers persistent://myprop/clust/ns1/ds1"));
@@ -1022,6 +1030,16 @@
cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false);
+ cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).removeInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-inactive-topic-policies persistent://myprop/clust/ns1/ds1"
+ + " -e -t 1s -m delete_when_no_subscriptions -g"));
+ verify(mockGlobalTopicsPolicies).setInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1"
+ , new InactiveTopicPolicies(InactiveTopicDeleteMode.delete_when_no_subscriptions, 1, true));
+
+
cmdTopics.run(split("get-dispatch-rate persistent://myprop/clust/ns1/ds1 -ap -g"));
verify(mockGlobalTopicsPolicies).getDispatchRate("persistent://myprop/clust/ns1/ds1", true);
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 8be0fb3..bcd7001 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -33,6 +33,9 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.common.policies.data.BacklogQuota;
+import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
+import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@@ -78,6 +81,11 @@
jcommander.addCommand("get-dispatch-rate", new GetDispatchRate());
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
+
+ jcommander.addCommand("get-inactive-topic-policies",new GetInactiveTopicPolicies());
+ jcommander.addCommand("set-inactive-topic-policies",new SetInactiveTopicPolicies());
+ jcommander.addCommand("remove-inactive-topic-policies",new RemoveInactiveTopicPolicies());
+
}
@Parameters(commandDescription = "Get max consumers per subscription for a topic")
@@ -733,6 +741,87 @@
}
}
+ @Parameters(commandDescription = "Get the inactive topic policies on a topic")
+ private class GetInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+ private boolean applied = false;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ + "If set to true, broker returned global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(getTopicPolicies(isGlobal).getInactiveTopicPolicies(persistentTopic, applied));
+ }
+ }
+
+ @Parameters(commandDescription = "Set the inactive topic policies on a topic")
+ private class SetInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--enable-delete-while-inactive", "-e" }, description = "Enable delete while inactive")
+ private boolean enableDeleteWhileInactive = false;
+
+ @Parameter(names = { "--disable-delete-while-inactive", "-d" }, description = "Disable delete while inactive")
+ private boolean disableDeleteWhileInactive = false;
+
+ @Parameter(names = {"--max-inactive-duration", "-t"}, description = "Max duration of topic inactivity in seconds" +
+ ",topics that are inactive for longer than this value will be deleted (eg: 1s, 10s, 1m, 5h, 3d)", required = true)
+ private String deleteInactiveTopicsMaxInactiveDuration;
+
+ @Parameter(names = { "--delete-mode", "-m" }, description = "Mode of delete inactive topic" +
+ ",Valid options are: [delete_when_no_subscriptions, delete_when_subscriptions_caught_up]", required = true)
+ private String inactiveTopicDeleteMode;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ + "If set to true, the policy will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ long maxInactiveDurationInSeconds;
+ try {
+ maxInactiveDurationInSeconds = TimeUnit.SECONDS.toSeconds(
+ RelativeTimeUtil.parseRelativeTimeInSeconds(deleteInactiveTopicsMaxInactiveDuration));
+ } catch (IllegalArgumentException exception) {
+ throw new ParameterException(exception.getMessage());
+ }
+
+ if (enableDeleteWhileInactive == disableDeleteWhileInactive) {
+ throw new ParameterException("Need to specify either enable-delete-while-inactive or disable-delete-while-inactive");
+ }
+ InactiveTopicDeleteMode deleteMode;
+ try {
+ deleteMode = InactiveTopicDeleteMode.valueOf(inactiveTopicDeleteMode);
+ } catch (IllegalArgumentException e) {
+ throw new ParameterException("delete mode can only be set to delete_when_no_subscriptions or delete_when_subscriptions_caught_up");
+ }
+ getTopicPolicies(isGlobal).setInactiveTopicPolicies(persistentTopic,
+ new InactiveTopicPolicies(deleteMode, (int) maxInactiveDurationInSeconds, enableDeleteWhileInactive));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove inactive topic policies from a topic")
+ private class RemoveInactiveTopicPolicies extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+ @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).removeInactiveTopicPolicies(persistentTopic);
+ }
+ }
+
private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 53b45ea..99ab485 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -290,6 +290,10 @@
cmdUsageFormatter.addDeprecatedCommand("set-persistence");
cmdUsageFormatter.addDeprecatedCommand("remove-persistence");
+ cmdUsageFormatter.addDeprecatedCommand("get-inactive-topic-policies");
+ cmdUsageFormatter.addDeprecatedCommand("set-inactive-topic-policies");
+ cmdUsageFormatter.addDeprecatedCommand("remove-inactive-topic-policies");
+
cmdUsageFormatter.addDeprecatedCommand("get-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("set-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-dispatch-rate");