DeduplicationSnapshotIntervalSeconds support cross multiple clusters (#13578)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index db3c255..c009a16 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -970,14 +970,15 @@
});
}
- protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval) {
+ protected CompletableFuture<Void> internalSetDeduplicationSnapshotInterval(Integer interval, boolean isGlobal) {
if (interval != null && interval < 0) {
throw new RestException(Status.PRECONDITION_FAILED, "interval must be 0 or more");
}
- return getTopicPoliciesAsyncWithRetry(topicName)
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies policies = op.orElseGet(TopicPolicies::new);
policies.setDeduplicationSnapshotIntervalSeconds(interval);
+ policies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, policies);
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index d096d65..db08474 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -421,11 +421,12 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName))
+ .thenCompose(__ -> getTopicPoliciesAsyncWithRetry(topicName, isGlobal))
.thenAccept(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
asyncResponse.resume(topicPolicies.getDeduplicationSnapshotIntervalSeconds());
@@ -448,11 +449,12 @@
@PathParam("topic") @Encoded String encodedTopic,
@ApiParam(value = "Interval to take deduplication snapshot for the specified topic")
Integer interval,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval))
+ .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(interval, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setDeduplicationSnapshotInterval", ex, asyncResponse);
@@ -469,11 +471,12 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null))
+ .thenCompose(__ -> internalSetDeduplicationSnapshotInterval(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteDeduplicationSnapshotInterval", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 637cb3c..29f0b8e 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -171,6 +171,25 @@
untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getPublishRate(topic)));
}
+ @Test
+ public void testReplicateDeduplicationSnapshotIntervalPolicies() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+ init(namespace, topic);
+ // set global topic policy
+ admin1.topicPolicies(true).setDeduplicationSnapshotInterval(topic, 100);
+
+ // get global topic policy
+ untilRemoteClustersAsserted(
+ admin -> assertEquals(admin.topicPolicies(true).getDeduplicationSnapshotInterval(topic),
+ Integer.valueOf(100)));
+
+ // remove global topic policy
+ admin1.topicPolicies(true).removeDeduplicationSnapshotInterval(topic);
+ untilRemoteClustersAsserted(
+ admin -> assertNull(admin.topicPolicies(true).getDeduplicationSnapshotInterval(topic)));
+ }
+
private void untilRemoteClustersAsserted(ThrowingConsumer<PulsarAdmin> condition) {
Awaitility.await().untilAsserted(() -> condition.apply(admin2));
Awaitility.await().untilAsserted(() -> condition.apply(admin3));
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/DeprecatedCommanderTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/DeprecatedCommanderTest.java
index 0a5505b..53d78dd 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/DeprecatedCommanderTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/DeprecatedCommanderTest.java
@@ -19,6 +19,12 @@
package org.apache.pulsar.admin.cli;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotEquals;
+import static org.testng.Assert.assertTrue;
import com.beust.jcommander.DefaultUsageFormatter;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Schemas;
@@ -27,13 +33,6 @@
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotEquals;
-import static org.testng.Assert.assertTrue;
-
public class DeprecatedCommanderTest {
PulsarAdmin admin;
Topics mockTopics;
@@ -67,7 +66,7 @@
assertTrue(defaultOutput.contains("enable-deduplication"));
assertFalse(outputWithFiltered.contains("get-max-unacked-messages-on-consumer"));
assertTrue(defaultOutput.contains("get-max-unacked-messages-on-consumer"));
- assertTrue(outputWithFiltered.contains("get-deduplication"));
+ assertFalse(outputWithFiltered.contains("get-deduplication"));
assertTrue(defaultOutput.contains("get-deduplication"));
// annotation was changed to hidden, reset it.
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index 05636f5..569671d 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1062,6 +1062,12 @@
cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).getDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -i 100"));
+ verify(mockTopicsPolicies).setDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1", 100);
+ cmdTopics.run(split("remove-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).removeDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
// Reset the cmd, and check global option
cmdTopics = new CmdTopicPolicies(() -> admin);
@@ -1195,6 +1201,13 @@
cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).getDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -i 100 -g"));
+ verify(mockGlobalTopicsPolicies).setDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1", 100);
+ cmdTopics.run(split("remove-deduplication-snapshot-interval persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).removeDeduplicationSnapshotInterval("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("get-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1 -c 5 -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index f343257..00ea7fa 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -84,6 +84,10 @@
jcommander.addCommand("get-deduplication", new GetDeduplicationStatus());
jcommander.addCommand("remove-deduplication", new RemoveDeduplicationStatus());
+ jcommander.addCommand("get-deduplication-snapshot-interval", new GetDeduplicationSnapshotInterval());
+ jcommander.addCommand("set-deduplication-snapshot-interval", new SetDeduplicationSnapshotInterval());
+ jcommander.addCommand("remove-deduplication-snapshot-interval", new RemoveDeduplicationSnapshotInterval());
+
jcommander.addCommand("get-persistence", new GetPersistence());
jcommander.addCommand("set-persistence", new SetPersistence());
jcommander.addCommand("remove-persistence", new RemovePersistence());
@@ -791,6 +795,62 @@
}
}
+ @Parameters(commandDescription = "Get deduplication snapshot interval for a topic")
+ private class GetDeduplicationSnapshotInterval extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ + "If set to true, broker returns global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(getTopicPolicies(isGlobal).getDeduplicationSnapshotInterval(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set deduplication snapshot interval for a topic")
+ private class SetDeduplicationSnapshotInterval extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"-i", "--interval"}, description =
+ "Deduplication snapshot interval for topic in second, allowed range from 0 to Integer.MAX_VALUE",
+ required = true)
+ private int interval;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ if (interval < 0) {
+ throw new ParameterException(String.format("Invalid interval '%d'. ", interval));
+ }
+
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).setDeduplicationSnapshotInterval(persistentTopic, interval);
+ }
+ }
+
+ @Parameters(commandDescription = "Remove deduplication snapshot interval for a topic")
+ private class RemoveDeduplicationSnapshotInterval extends CliCommand {
+
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).removeDeduplicationSnapshotInterval(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get the backlog quota policies for a topic")
private class GetBacklogQuotaMap extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index e5127f9..742b96a 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -320,6 +320,10 @@
cmdUsageFormatter.addDeprecatedCommand("set-deduplication");
cmdUsageFormatter.addDeprecatedCommand("remove-deduplication");
+ cmdUsageFormatter.addDeprecatedCommand("get-deduplication-snapshot-interval");
+ cmdUsageFormatter.addDeprecatedCommand("set-deduplication-snapshot-interval");
+ cmdUsageFormatter.addDeprecatedCommand("remove-deduplication-snapshot-interval");
+
cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-subscription");
cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-subscription");
cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-subscription");