ReplicatorDispatchRate support cross multiple clusters (#13624)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 37d5bcb..160ca3e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -3126,8 +3126,8 @@
});
}
- protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<DispatchRateImpl> internalGetReplicatorDispatchRate(boolean applied, boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenApply(op -> op.map(TopicPolicies::getReplicatorDispatchRate)
.orElseGet(() -> {
if (applied) {
@@ -3139,11 +3139,13 @@
}));
}
- protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate) {
- return getTopicPoliciesAsyncWithRetry(topicName)
+ protected CompletableFuture<Void> internalSetReplicatorDispatchRate(DispatchRateImpl dispatchRate,
+ boolean isGlobal) {
+ return getTopicPoliciesAsyncWithRetry(topicName, isGlobal)
.thenCompose(op -> {
TopicPolicies topicPolicies = op.orElseGet(TopicPolicies::new);
topicPolicies.setReplicatorDispatchRate(dispatchRate);
+ topicPolicies.setIsGlobal(isGlobal);
return pulsar().getTopicPoliciesService().updateTopicPoliciesAsync(topicName, topicPolicies);
});
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
index 6d64f5c..876aa39 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/PersistentTopics.java
@@ -2236,12 +2236,13 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@QueryParam("applied") boolean applied,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalGetReplicatorDispatchRate(applied))
+ .thenCompose(__ -> internalGetReplicatorDispatchRate(applied, isGlobal))
.thenApply(asyncResponse::resume)
.exceptionally(ex -> {
handleTopicPolicyException("getReplicatorDispatchRate", ex, asyncResponse);
@@ -2262,16 +2263,17 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
@ApiParam(value = "Replicator dispatch rate of the topic") DispatchRateImpl dispatchRate) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate))
+ .thenCompose(__ -> internalSetReplicatorDispatchRate(dispatchRate, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully updated replicatorDispatchRate: namespace={}, topic={}"
- + ", replicatorDispatchRate={}",
- clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate);
+ + ", replicatorDispatchRate={}, isGlobal={}",
+ clientAppId(), namespaceName, topicName.getLocalName(), dispatchRate, isGlobal);
asyncResponse.resume(Response.noContent().build());
})
.exceptionally(ex -> {
@@ -2292,11 +2294,12 @@
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
+ @QueryParam("isGlobal") @DefaultValue("false") boolean isGlobal,
@ApiParam(value = "Is authentication required to perform this operation")
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateTopicName(tenant, namespace, encodedTopic);
preValidation(authoritative)
- .thenCompose(__ -> internalSetReplicatorDispatchRate(null))
+ .thenCompose(__ -> internalSetReplicatorDispatchRate(null, isGlobal))
.thenRun(() -> {
log.info("[{}] Successfully remove replicatorDispatchRate limit: namespace={}, topic={}",
clientAppId(), namespaceName, topicName.getLocalName());
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
index b0fee97..fc095df 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTopicPoliciesTest.java
@@ -457,6 +457,37 @@
Awaitility.await().untilAsserted(() ->
assertNull(admin3.topicPolicies(true).getSubscriptionDispatchRate(persistentTopicName)));
}
+
+ @Test
+ public void testReplicateReplicatorDispatchRatePolicies() throws Exception {
+ final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
+ final String persistentTopicName = "persistent://" + namespace + "/topic" + UUID.randomUUID();
+
+ init(namespace, persistentTopicName);
+ // set replicator dispatch rate
+ DispatchRate dispatchRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(1)
+ .ratePeriodInSecond(1)
+ .dispatchThrottlingRateInByte(1)
+ .relativeToPublishRate(true)
+ .build();
+ admin1.topicPolicies(true).setReplicatorDispatchRate(persistentTopicName, dispatchRate);
+ // get replicator dispatch rate
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin2.topicPolicies(true)
+ .getReplicatorDispatchRate(persistentTopicName), dispatchRate));
+ Awaitility.await().untilAsserted(() ->
+ assertEquals(admin3.topicPolicies(true)
+ .getReplicatorDispatchRate(persistentTopicName), dispatchRate));
+
+ //remove replicator dispatch rate
+ admin1.topicPolicies(true).removeReplicatorDispatchRate(persistentTopicName);
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin2.topicPolicies(true).getReplicatorDispatchRate(persistentTopicName)));
+ Awaitility.await().untilAsserted(() ->
+ assertNull(admin3.topicPolicies(true).getReplicatorDispatchRate(persistentTopicName)));
+ }
+
@Test
public void testReplicateMaxUnackedMsgPerSub() throws Exception {
final String namespace = "pulsar/partitionedNs-" + UUID.randomUUID();
diff --git a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
index afbef70..cdfc5ef 100644
--- a/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
+++ b/pulsar-client-tools-test/src/test/java/org/apache/pulsar/admin/cli/PulsarAdminToolTest.java
@@ -937,6 +937,18 @@
.ratePeriodInSecond(2)
.build());
+ cmdTopics.run(split("set-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2"));
+ verify(mockTopicsPolicies).setReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1",
+ DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(-1)
+ .ratePeriodInSecond(2)
+ .build());
+ cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1", false);
+ cmdTopics.run(split("remove-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1"));
+ verify(mockTopicsPolicies).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2"));
verify(mockTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
DispatchRate.builder()
@@ -1190,6 +1202,18 @@
cmdTopics.run(split("remove-compaction-threshold persistent://myprop/clust/ns1/ds1 -g"));
verify(mockGlobalTopicsPolicies).removeCompactionThreshold("persistent://myprop/clust/ns1/ds1");
+ cmdTopics.run(split("set-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2 -g"));
+ verify(mockGlobalTopicsPolicies).setReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1",
+ DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(-1)
+ .ratePeriodInSecond(2)
+ .build());
+ cmdTopics.run(split("get-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).getReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1", false);
+ cmdTopics.run(split("remove-replicator-dispatch-rate persistent://myprop/clust/ns1/ds1 -g"));
+ verify(mockGlobalTopicsPolicies).removeReplicatorDispatchRate("persistent://myprop/clust/ns1/ds1");
+
cmdTopics.run(split("set-subscription-dispatch-rate persistent://myprop/clust/ns1/ds1 -md -1 -bd -1 -dt 2 -g"));
verify(mockGlobalTopicsPolicies).setSubscriptionDispatchRate("persistent://myprop/clust/ns1/ds1",
DispatchRate.builder()
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
index 8ec3071..6d4b5f5 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopicPolicies.java
@@ -89,6 +89,10 @@
jcommander.addCommand("set-subscription-dispatch-rate", new SetSubscriptionDispatchRate());
jcommander.addCommand("remove-subscription-dispatch-rate", new RemoveSubscriptionDispatchRate());
+ jcommander.addCommand("get-replicator-dispatch-rate", new GetReplicatorDispatchRate());
+ jcommander.addCommand("set-replicator-dispatch-rate", new SetReplicatorDispatchRate());
+ jcommander.addCommand("remove-replicator-dispatch-rate", new RemoveReplicatorDispatchRate());
+
jcommander.addCommand("get-publish-rate", new GetPublishRate());
jcommander.addCommand("set-publish-rate", new SetPublishRate());
jcommander.addCommand("remove-publish-rate", new RemovePublishRate());
@@ -1271,6 +1275,82 @@
}
}
+ @Parameters(commandDescription = "Get replicator message-dispatch-rate for a topic")
+ private class GetReplicatorDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "-ap", "--applied" }, description = "Get the applied policy of the topic")
+ private boolean applied = false;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to get this policy globally. "
+ + "If set to true, broker returned global topic policies")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ print(getTopicPolicies(isGlobal).getReplicatorDispatchRate(persistentTopic, applied));
+ }
+ }
+
+ @Parameters(commandDescription = "Set replicator message-dispatch-rate for a topic")
+ private class SetReplicatorDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--msg-dispatch-rate",
+ "-md" }, description = "message-dispatch-rate (default -1 will be overwrite if not passed)")
+ private int msgDispatchRate = -1;
+
+ @Parameter(names = { "--byte-dispatch-rate",
+ "-bd" }, description = "byte-dispatch-rate (default -1 will be overwrite if not passed)")
+ private long byteDispatchRate = -1;
+
+ @Parameter(names = {"--dispatch-rate-period",
+ "-dt"}, description = "dispatch-rate-period in second type (default 1 second will be overwrite if not"
+ + " passed)")
+ private int dispatchRatePeriodSec = 1;
+
+ @Parameter(names = {"--relative-to-publish-rate",
+ "-rp"}, description = "dispatch rate relative to publish-rate (if publish-relative flag is enabled "
+ + "then broker will apply throttling value to (publish-rate + dispatch rate))")
+ private boolean relativeToPublishRate = false;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to set this policy globally. "
+ + "If set to true, the policy will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).setReplicatorDispatchRate(persistentTopic,
+ DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(msgDispatchRate)
+ .dispatchThrottlingRateInByte(byteDispatchRate)
+ .ratePeriodInSecond(dispatchRatePeriodSec)
+ .relativeToPublishRate(relativeToPublishRate)
+ .build());
+ }
+ }
+
+ @Parameters(commandDescription = "Remove replicator message-dispatch-rate for a topic")
+ private class RemoveReplicatorDispatchRate extends CliCommand {
+ @Parameter(description = "persistent://tenant/namespace/topic", required = true)
+ private java.util.List<String> params;
+
+ @Parameter(names = { "--global", "-g" }, description = "Whether to remove this policy globally. "
+ + "If set to true, the policy will be replicate to other clusters asynchronously")
+ private boolean isGlobal = false;
+
+ @Override
+ void run() throws PulsarAdminException {
+ String persistentTopic = validatePersistentTopic(params);
+ getTopicPolicies(isGlobal).removeReplicatorDispatchRate(persistentTopic);
+ }
+
+ }
+
@Parameters(commandDescription = "Get subscription message-dispatch-rate for a topic")
private class GetSubscriptionDispatchRate extends CliCommand {
@Parameter(description = "persistent://tenant/namespace/topic", required = true)
diff --git a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
index b9d5ad1..7883936 100644
--- a/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
+++ b/pulsar-client-tools/src/main/java/org/apache/pulsar/admin/cli/CmdTopics.java
@@ -336,6 +336,10 @@
cmdUsageFormatter.addDeprecatedCommand("set-max-producers");
cmdUsageFormatter.addDeprecatedCommand("remove-max-producers");
+ cmdUsageFormatter.addDeprecatedCommand("get-replicator-dispatch-rate");
+ cmdUsageFormatter.addDeprecatedCommand("set-replicator-dispatch-rate");
+ cmdUsageFormatter.addDeprecatedCommand("remove-replicator-dispatch-rate");
+
cmdUsageFormatter.addDeprecatedCommand("get-subscription-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("set-subscription-dispatch-rate");
cmdUsageFormatter.addDeprecatedCommand("remove-subscription-dispatch-rate");