maxSubscriptionsPerTopic support cross multiple clusters (#13623)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 160ca3e..1becbec 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3107,21 +3107,23 @@
}
- protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic() {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<Optional<Integer>> internalGetMaxSubscriptionsPerTopic(boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getMaxSubscriptionsPerTopic));
}
- protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic) {
+ protected CompletableFuture<Void> internalSetMaxSubscriptionsPerTopic(Integer maxSubscriptionsPerTopic,
+ boolean isGlobal) {
if (maxSubscriptionsPerTopic != null && maxSubscriptionsPerTopic < 0) {
throw new RestException(Status.PRECONDITION_FAILED,
"maxSubscriptionsPerTopic must be 0 or more");
}
- return getTopicPoliciesAsyncWithRetry(topicName)
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic);
+ topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 876aa39..aec2a6e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2152,11 +2152,12 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic())
+ .thenCompose(__ -> internalGetMaxSubscriptionsPerTopic(isGlobal))
.thenAccept(op -> asyncResponse.resume(op.isPresent() ? op.get()
: Response.noContent().build()))
.exceptionally(ex -> {
@@ -2178,16 +2179,17 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "The max subscriptions of the topic") int maxSubscriptionsPerTopic) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic))
+ .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(maxSubscriptionsPerTopic, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated maxSubscriptionsPerTopic: namespace={}, topic={}"
- + ", maxSubscriptions={}"
- , clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic);
+ + ", maxSubscriptions={}, isGlobal={}"
+ , clientAppId(), namespaceName, topicName.getLocalName(), maxSubscriptionsPerTopic, isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
@@ -2208,11 +2210,12 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null))
+ .thenCompose(__ -> internalSetMaxSubscriptionsPerTopic(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove maxSubscriptionsPerTopic: namespace={}, topic={}",
clientAppId(), namespaceName, topicName.getLocalName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index fc095df..7051bbe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -531,6 +531,26 @@
assertNull(admin3.topicPolicies(true).getCompactionThreshold(persistentTopicName)));
}
+ @Test
+ public void testReplicateMaxSubscriptionsPerTopic() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+ init(namespace, persistentTopicName);
+
+ //set max subscriptions per topic
+ admin1.topicPolicies(true).setMaxSubscriptionsPerTopic(persistentTopicName, 1024);
+
+ //get max subscriptions per topic
+ untilRemoteClustersAsserted(
+ admin -> assertEquals(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName),
+ Integer.valueOf(1024)));
+
+ //remove
+ admin1.topicPolicies(true).removeMaxSubscriptionsPerTopic(persistentTopicName);
+ untilRemoteClustersAsserted(
+ admin -> assertNull(admin.topicPolicies(true).getMaxSubscriptionsPerTopic(persistentTopicName)));
+ }
+
private void init(String namespace, String topic)
throws PulsarAdminException, PulsarClientException, PulsarServerException {
final String cluster2 = pulsar2.getConfig().getClusterName();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index cdfc5ef..92d37f7 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -1041,6 +1041,14 @@
cmdTopics.run(split("remove-deduplication persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removeDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024"));
+ verify(mockTopicsPolicies).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 1024);
+ cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+
+
// Reset the cmd, and check global option
cmdTopics = new CmdTopicPolicies(() -> admin);
cmdTopics.run(split("get-retention persistent://myprop/clust/ns1/ds1 -g"));
@@ -1225,6 +1233,13 @@
verify(mockGlobalTopicsPolicies).getSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1");
+
+ cmdTopics.run(split("get-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).getMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -s 1024 -g"));
+ verify(mockGlobalTopicsPolicies).setMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1", 1024);
+ cmdTopics.run(split("remove-max-subscriptions-per-topic persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).removeMaxSubscriptionsPerTopic("persistent://myprop/clust/ns1/ds1");
}
@Test
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 6d4b5f5..09dd99f 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -126,6 +126,10 @@
jcommander.addCommand("set-inactive-topic-policies", new SetInactiveTopicPolicies());
jcommander.addCommand("remove-inactive-topic-policies", new RemoveInactiveTopicPolicies());
+ jcommander.addCommand("get-max-subscriptions-per-topic", new GetMaxSubscriptionsPerTopic());
+ jcommander.addCommand("set-max-subscriptions-per-topic", new SetMaxSubscriptionsPerTopic());
+ jcommander.addCommand("remove-max-subscriptions-per-topic", new RemoveMaxSubscriptionsPerTopic());
+
}
@Parameters(commandDescription = "Get max consumers per subscription for a topic")
@@ -1427,6 +1431,59 @@
}
+ @Parameters(commandDescription = "Get max subscriptions for a topic")
+ private class GetMaxSubscriptionsPerTopic extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ + "If set to true, broker returned global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(getTopicPolicies(isGlobal).getMaxSubscriptionsPerTopic(persistentTopic));
+ }
+ }
+
+ @Parameters(commandDescription = "Set max subscriptions for a topic")
+ private class SetMaxSubscriptionsPerTopic extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--max-subscriptions-per-topic",
+ "-s"}, description = "max subscriptions for a topic (default -1 will be overwrite if not passed)",
+ required = true)
+ private int maxSubscriptionPerTopic;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally. "
+ + "If set to true, the policy will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).setMaxSubscriptionsPerTopic(persistentTopic, maxSubscriptionPerTopic);
+ }
+ }
+
+ @Parameters(commandDescription = "Remove max subscriptions for a topic")
+ private class RemoveMaxSubscriptionsPerTopic extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. "
+ + "If set to true, the policy will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).removeMaxSubscriptionsPerTopic(persistentTopic);
+ }
+ }
+
private TopicPolicies getTopicPolicies(boolean isGlobal) {
return getAdmin().topicPolicies(isGlobal);
}
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 7883936..572c0be 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -343,6 +343,10 @@
cmdUsageFormatter.addDeprecatedCommand("get-subscription-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("set-subscription-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-subscription-dispatch-rate");
+
+ cmdUsageFormatter.addDeprecatedCommand("get-max-subscriptions-per-topic");
+ cmdUsageFormatter.addDeprecatedCommand("set-max-subscriptions-per-topic");
+ cmdUsageFormatter.addDeprecatedCommand("remove-max-subscriptions-per-topic");
}
}