Max unacked messages on subscription support cross multiple clusters (#13549)
* Max unacked messages per subscription support cross multiple clusters
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 28b8321..2bce311 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -894,8 +894,9 @@
});
}
- protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<Integer> internalGetMaxUnackedMessagesOnSubscription(boolean applied,
+ boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getMaxUnackedMessagesOnSubscription)
.orElseGet(() -> {
if (applied) {
@@ -907,16 +908,18 @@
}));
}
- protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum) {
+ protected CompletableFuture<Void> internalSetMaxUnackedMessagesOnSubscription(Integer maxUnackedNum,
+ boolean isGlobal) {
if (maxUnackedNum != null && maxUnackedNum < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxUnackedNum must be 0 or more");
}
- return getTopicPoliciesAsyncWithRetry(topicName)
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMaxUnackedMessagesOnSubscription(maxUnackedNum);
+ topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index fd8a3a4..938da35 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -555,11 +555,12 @@
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied))
+ .thenCompose(__ -> internalGetMaxUnackedMessagesOnSubscription(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getMaxUnackedMessagesOnSubscription", ex, asyncResponse);
@@ -577,6 +578,7 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Max unacked messages on subscription policies for the specified topic")
@@ -584,7 +586,7 @@
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
preValidation(authoritative)
- .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum))
+ .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(maxUnackedNum, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("setMaxUnackedMessagesOnSubscription", ex, asyncResponse);
@@ -603,12 +605,13 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
validateTopicPolicyOperation(topicName, PolicyName.MAX_UNACKED, PolicyOperation.WRITE);
preValidation(authoritative)
- .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(null))
+ .thenCompose(__ -> internalSetMaxUnackedMessagesOnSubscription(null, isGlobal))
.thenRun(() -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
handleTopicPolicyException("deleteMaxUnackedMessagesOnSubscription", ex, asyncResponse);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 1a6abbd..9737c46 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -352,6 +352,25 @@
assertNull(admin3.topicPolicies(true).getInactiveTopicPolicies(persistentTopicName)));
}
+ @Test
+ public void testReplicateMaxUnackedMsgPerSub() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+ init(namespace, topic);
+ // set max unacked msgs per sub
+ admin1.topicPolicies(true).setMaxUnackedMessagesOnSubscription(topic, 100);
+ Awaitility.await().ignoreExceptions().untilAsserted(() ->
+ assertEquals(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100));
+ Awaitility.await().ignoreExceptions().untilAsserted(() ->
+ assertEquals(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic).intValue(), 100));
+ // remove max unacked msgs per sub
+ admin1.topicPolicies(true).removeMaxUnackedMessagesOnSubscription(topic);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin2.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin3.topicPolicies(true).getMaxUnackedMessagesOnSubscription(topic)));
+ }
+
private void init(String namespace, String topic)
throws PulsarAdminException, PulsarClientException, PulsarServerException {
final String cluster2 = pulsar2.getConfig().getClusterName();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index a27147a..812b27e 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -966,6 +966,13 @@
cmdTopics.run(split("remove-max-consumers-per-subscription persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeMaxConsumersPerSubscription("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies, times(1)).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", false);
+ cmdTopics.run(split("remove-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies, times(1)).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99"));
+ verify(mockTopicsPolicies, times(1)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
+
cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable"));
@@ -1033,6 +1040,13 @@
cmdTopics.run(split("remove-persistence persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removePersistence("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies, times(1)).getMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", false);
+ cmdTopics.run(split("remove-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies, times(1)).removeMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-max-unacked-messages-per-subscription persistent://myprop/clust/ns1/ds1 -m 99 -g"));
+ verify(mockGlobalTopicsPolicies, times(1)).setMaxUnackedMessagesOnSubscription("persistent://myprop/clust/ns1/ds1", 99);
+
cmdTopics.run(split("get-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getInactiveTopicPolicies("persistent://myprop/clust/ns1/ds1",false);
cmdTopics.run(split("remove-inactive-topic-policies persistent://myprop/clust/ns1/ds1 -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index dd7a79f..bcecb59 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -33,7 +33,6 @@
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.client.api.SubscriptionType;
-import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
@@ -87,6 +86,10 @@
jcommander.addCommand("set-dispatch-rate", new SetDispatchRate());
jcommander.addCommand("remove-dispatch-rate", new RemoveDispatchRate());
+ jcommander.addCommand("get-max-unacked-messages-per-subscription", new GetMaxUnackedMessagesPerSubscription());
+ jcommander.addCommand("set-max-unacked-messages-per-subscription", new SetMaxUnackedMessagesPerSubscription());
+ jcommander.addCommand("remove-max-unacked-messages-per-subscription", new RemoveMaxUnackedMessagesPerSubscription());
+
jcommander.addCommand("get-inactive-topic-policies",new GetInactiveTopicPolicies());
jcommander.addCommand("set-inactive-topic-policies",new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies",new RemoveInactiveTopicPolicies());
@@ -397,6 +400,59 @@
}
}
+ @Parameters(commandDescription = "Get max unacked messages policy per subscription for a topic")
+ private class GetMaxUnackedMessagesPerSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+ private boolean applied = false;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(getTopicPolicies(isGlobal).getMaxUnackedMessagesOnSubscription(persistentTopic, applied));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove max unacked messages policy per subscription for a topic")
+ private class RemoveMaxUnackedMessagesPerSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).removeMaxUnackedMessagesOnSubscription(persistentTopic);
+ }
+ }
+
+ @Parameters(commandDescription = "Set max unacked messages policy on subscription for a topic")
+ private class SetMaxUnackedMessagesPerSubscription extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"-m", "--maxNum"}, description = "max unacked messages num on subscription", required = true)
+ private int maxNum;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ + "If set to true, the removing operation will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).setMaxUnackedMessagesOnSubscription(persistentTopic, maxNum);
+ }
+ }
@Parameters(commandDescription = "Get max number of producers for a topic")
private class GetMaxProducers extends CliCommand {
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 69b976c..dcbdc9b 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -306,6 +306,10 @@
cmdUsageFormatter.addDeprecatedCommand("set-deduplication");
cmdUsageFormatter.addDeprecatedCommand("remove-deduplication");
+ cmdUsageFormatter.addDeprecatedCommand("get-max-unacked-messages-on-subscription");
+ cmdUsageFormatter.addDeprecatedCommand("set-max-unacked-messages-on-subscription");
+ cmdUsageFormatter.addDeprecatedCommand("remove-max-unacked-messages-on-subscription");
+
cmdUsageFormatter.addDeprecatedCommand("set-subscription-types-enabled");
cmdUsageFormatter.addDeprecatedCommand("get-subscription-types-enabled");
cmdUsageFormatter.addDeprecatedCommand("remove-subscription-types-enabled");