Subscribe rate support cross multiple clusters (#13561)
Subscribe rate support cross multiple clusters
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 694f554..2cb784e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -4320,8 +4320,8 @@
});
}
- protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean applied) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<SubscribeRate> internalGetSubscribeRate(boolean applied, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getSubscribeRate)
.orElseGet(() -> {
if (applied) {
@@ -4333,25 +4333,27 @@
}));
}
- protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate) {
+ protected CompletableFuture<Void> internalSetSubscribeRate(SubscribeRate subscribeRate, boolean isGlobal) {
if (subscribeRate == null) {
return CompletableFuture.completedFuture(null);
}
- return getTopicPoliciesAsyncWithRetry(topicName)
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setSubscribeRate(subscribeRate);
+ topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
- protected CompletableFuture<Void> internalRemoveSubscribeRate() {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<Void> internalRemoveSubscribeRate(boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
if (!op.isPresent()) {
return CompletableFuture.completedFuture(null);
}
op.get().setSubscribeRate(null);
+ op.get().setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, op.get());
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index e213278..72ef853 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -3368,11 +3368,12 @@
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("applied") boolean applied,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalGetSubscribeRate(applied))
+ .thenCompose(__ -> internalGetSubscribeRate(applied, isGlobal))
.thenApply(asyncResponse::resume).exceptionally(ex -> {
handleTopicPolicyException("getSubscribeRate", ex, asyncResponse);
return null;
@@ -3392,20 +3393,22 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetSubscribeRate(subscribeRate))
+ .thenCompose(__ -> internalSetSubscribeRate(subscribeRate, isGlobal))
.thenRun(() -> {
try {
log.info("[{}] Successfully set topic subscribe rate:"
- + " tenant={}, namespace={}, topic={}, subscribeRate={}",
+ + " tenant={}, namespace={}, topic={}, isGlobal={} subscribeRate={}",
clientAppId(),
tenant,
namespace,
topicName.getLocalName(),
+ isGlobal,
jsonMapper().writeValueAsString(subscribeRate));
} catch (JsonProcessingException ignore) {}
asyncResponse.resume(Response.noContent().build());
@@ -3428,17 +3431,20 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Subscribe rate for the specified topic") SubscribeRate subscribeRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalRemoveSubscribeRate())
+ .thenCompose(__ -> internalRemoveSubscribeRate(isGlobal))
.thenRun(() -> {
- log.info("[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}",
+ log.info(
+ "[{}] Successfully remove topic subscribe rate: tenant={}, namespace={}, topic={}, isGlobal={}",
clientAppId(),
tenant,
namespace,
- topicName.getLocalName());
+ topicName.getLocalName(),
+ isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index 365fe00..4a21ed4 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -33,12 +33,13 @@
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.common.naming.TopicName;
+import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
-import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.policies.data.impl.BacklogQuotaImpl;
import org.awaitility.Awaitility;
import org.testng.annotations.AfterClass;
@@ -115,6 +116,24 @@
}
@Test
+ public void testReplicateSubscribeRatePolicies() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+ init(namespace, topic);
+ // set global topic policy
+ SubscribeRate subscribeRate = new SubscribeRate(100, 10000);
+ admin1.topicPolicies(true).setSubscribeRate(topic, subscribeRate);
+
+ // get global topic policy
+ untilRemoteClustersAsserted(
+ admin -> assertEquals(admin.topicPolicies(true).getSubscribeRate(topic), subscribeRate));
+
+ // remove global topic policy
+ admin1.topicPolicies(true).removeSubscribeRate(topic);
+ untilRemoteClustersAsserted(admin -> assertNull(admin.topicPolicies(true).getSubscribeRate(topic)));
+ }
+
+ @Test
public void testReplicatePublishRatePolicies() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
final String topic = "persistent://" + namespace + "/topic" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index f6a68e9..d8dd0dc 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -957,6 +957,13 @@
cmdTopics.run(split("remove-publish-rate persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).removePublishRate("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-subscribe-rate persistent://myprop/clust/ns1/ds1 -ap"));
+ verify(mockTopicsPolicies).getSubscribeRate("persistent://myprop/clust/ns1/ds1", true);
+ cmdTopics.run(split("set-subscribe-rate persistent://myprop/clust/ns1/ds1 -sr 10 -st 100"));
+ verify(mockTopicsPolicies).setSubscribeRate("persistent://myprop/clust/ns1/ds1", new SubscribeRate(10, 100));
+ cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("get-max-consumers persistent://myprop/clust/ns1/ds1"));
verify(mockTopicsPolicies).getMaxConsumers("persistent://myprop/clust/ns1/ds1", false);
cmdTopics.run(split("remove-max-consumers persistent://myprop/clust/ns1/ds1"));
@@ -1103,6 +1110,13 @@
cmdTopics.run(split("remove-publish-rate persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removePublishRate("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("get-subscribe-rate persistent://myprop/clust/ns1/ds1 -ap -g"));
+ verify(mockGlobalTopicsPolicies).getSubscribeRate("persistent://myprop/clust/ns1/ds1", true);
+ cmdTopics.run(split("set-subscribe-rate persistent://myprop/clust/ns1/ds1 -sr 10 -st 100 -g"));
+ verify(mockGlobalTopicsPolicies).setSubscribeRate("persistent://myprop/clust/ns1/ds1", new SubscribeRate(10, 100));
+ cmdTopics.run(split("remove-subscribe-rate persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).removeSubscribeRate("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("get-deduplication persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).getDeduplicationStatus("persistent://myprop/clust/ns1/ds1");
cmdTopics.run(split("set-deduplication persistent://myprop/clust/ns1/ds1 --disable -g"));
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index aa906ab..fc93b8d 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -39,6 +39,7 @@
import org.apache.pulsar.common.policies.data.PersistencePolicies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
+import org.apache.pulsar.common.policies.data.SubscribeRate;
import org.apache.pulsar.common.util.RelativeTimeUtil;
@Parameters(commandDescription = "Operations on persistent topics")
@@ -88,6 +89,10 @@
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
+ jcommander.addCommand("get-subscribe-rate", new GetSubscribeRate());
+ jcommander.addCommand("set-subscribe-rate", new SetSubscribeRate());
+ jcommander.addCommand("remove-subscribe-rate", new RemoveSubscribeRate());
+
jcommander.addCommand("get-max-consumers", new GetMaxConsumers());
jcommander.addCommand("set-max-consumers", new SetMaxConsumers());
jcommander.addCommand("remove-max-consumers", new RemoveMaxConsumers());
@@ -781,6 +786,64 @@
}
}
+ @Parameters(commandDescription = "Get consumer subscribe rate for a topic")
+ private class GetSubscribeRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+ private boolean applied = false;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to get this policy globally. "
+ + "If set to true, broker returns global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(getTopicPolicies(isGlobal).getSubscribeRate(persistentTopic, applied));
+ }
+ }
+
+ @Parameters(commandDescription = "Set consumer subscribe rate for a topic")
+ private class SetSubscribeRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--subscribe-rate",
+ "-sr" }, description = "subscribe-rate (default -1 will be overwrite if not passed)", required = false)
+ private int subscribeRate = -1;
+
+ @Parameter(names = { "--subscribe-rate-period",
+ "-st" }, description = "subscribe-rate-period in second type (default 30 second will be overwrite if not passed)", required = false)
+ private int subscribeRatePeriodSec = 30;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to set this policy globally.")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).setSubscribeRate(persistentTopic,
+ new SubscribeRate(subscribeRate, subscribeRatePeriodSec));
+ }
+ }
+
+ @Parameters(commandDescription = "Remove consumer subscribe rate for a topic")
+ private class RemoveSubscribeRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = {"--global", "-g"}, description = "Whether to remove this policy globally. ")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).removeSubscribeRate(persistentTopic);
+ }
+ }
+
@Parameters(commandDescription = "Get the persistence policies for a topic")
private class GetPersistence extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index 5876165..79e3c2c 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -282,6 +282,10 @@
cmdUsageFormatter.addDeprecatedCommand("set-publish-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-publish-rate");
+ cmdUsageFormatter.addDeprecatedCommand("get-subscribe-rate");
+ cmdUsageFormatter.addDeprecatedCommand("set-subscribe-rate");
+ cmdUsageFormatter.addDeprecatedCommand("remove-subscribe-rate");
+
cmdUsageFormatter.addDeprecatedCommand("get-maxProducers");
cmdUsageFormatter.addDeprecatedCommand("set-maxProducers");
cmdUsageFormatter.addDeprecatedCommand("remove-maxProducers");