Support broker level dispatch rate limiter (#11325)
### Motivation
Fixes #7720
Current we support broker level publish rate limiter.
This PR will add broker level dispatch rate limiter.
### Modifications
Add broker level dispatch rate limiter for msgs and bytes.
Refact calculateToRead() to reduce duplicate code.
### Verifying this change
Add testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling()
diff --git a/conf/broker.conf b/conf/broker.conf
index fcf0717..a871dc3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -355,6 +355,14 @@
# Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
subscribeRatePeriodPerConsumerInSecond=30
+# Default messages per second dispatch throttling-limit for whole broker. Using a value of 0, is disabling default
+# message dispatch-throttling
+dispatchThrottlingRateInMsg=0
+
+# Default bytes per second dispatch throttling-limit for whole broker. Using a value of 0, is disabling
+# default message-byte dispatch-throttling
+dispatchThrottlingRateInByte=0
+
# Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
# message dispatch-throttling
dispatchThrottlingRatePerTopicInMsg=0
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index af6ec5a..1e93dc5 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -750,6 +750,20 @@
)
private long brokerPublisherThrottlingMaxByteRate = 0;
@FieldContext(
+ category = CATEGORY_SERVER,
+ dynamic = true,
+ doc = "Default messages per second dispatch throttling-limit for whole broker. "
+ + "Using a value of 0, is disabling default message-byte dispatch-throttling"
+ )
+ private int dispatchThrottlingRateInMsg = 0;
+ @FieldContext(
+ category = CATEGORY_SERVER,
+ dynamic = true,
+ doc = "Default bytes per second dispatch throttling-limit for whole broker. "
+ + "Using a value of 0, is disabling default message-byte dispatch-throttling"
+ )
+ private long dispatchThrottlingRateInByte = 0;
+ @FieldContext(
category = CATEGORY_SERVER,
dynamic = true,
doc = "Max Rate(in 1 seconds) of Message allowed to publish for a topic "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 9d0043e..64bc611 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -35,6 +35,7 @@
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
@@ -270,6 +271,26 @@
// noop
}
+ protected abstract void reScheduleRead();
+
+ protected boolean reachDispatchRateLimit(DispatchRateLimiter dispatchRateLimiter) {
+ if (dispatchRateLimiter.isDispatchRateLimitingEnabled()) {
+ if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
+ reScheduleRead();
+ return true;
+ }
+ }
+ return false;
+ }
+
+ protected Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter dispatchRateLimiter,
+ int messagesToRead, long bytesToRead) {
+ // update messagesToRead according to available dispatch rate limit.
+ return computeReadLimits(messagesToRead,
+ (int) dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+ bytesToRead, dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
+ }
+
protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg,
long bytesToRead, long availablePermitsOnByte) {
if (availablePermitsOnMsg > 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c2a27ad..613961d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -235,6 +235,7 @@
private ScheduledExecutorService brokerPublishRateLimiterMonitor;
private ScheduledExecutorService deduplicationSnapshotMonitor;
protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
+ protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null;
private DistributedIdGenerator producerNameGenerator;
@@ -495,6 +496,7 @@
this.startConsumedLedgersMonitor();
this.startBacklogQuotaChecker();
this.updateBrokerPublisherThrottlingMaxRate();
+ this.updateBrokerDispatchThrottlingMaxRate();
this.startCheckReplicationPolicies();
this.startDeduplicationSnapshotMonitor();
}
@@ -2146,7 +2148,13 @@
registerConfigurationListener("brokerPublisherThrottlingMaxByteRate",
(brokerPublisherThrottlingMaxByteRate) ->
updateBrokerPublisherThrottlingMaxRate());
-
+ // add listener to notify broker dispatch-rate dynamic config
+ registerConfigurationListener("dispatchThrottlingRateInMsg",
+ (dispatchThrottlingRateInMsg) ->
+ updateBrokerDispatchThrottlingMaxRate());
+ registerConfigurationListener("dispatchThrottlingRateInByte",
+ (dispatchThrottlingRateInByte) ->
+ updateBrokerDispatchThrottlingMaxRate());
// add listener to notify topic publish-rate monitoring
if (!preciseTopicPublishRateLimitingEnable) {
registerConfigurationListener("topicPublisherThrottlingTickTimeMillis",
@@ -2161,6 +2169,14 @@
// add more listeners here
}
+ private void updateBrokerDispatchThrottlingMaxRate() {
+ if (brokerDispatchRateLimiter == null) {
+ brokerDispatchRateLimiter = new DispatchRateLimiter(this);
+ } else {
+ brokerDispatchRateLimiter.updateDispatchRate();
+ }
+ }
+
private void updateBrokerPublisherThrottlingMaxRate() {
int currentMaxMessageRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate();
long currentMaxByteRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 6f5fd43..ba7ad0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -274,6 +274,10 @@
return Optional.empty();
}
+ default Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
+ return Optional.empty();
+ }
+
default boolean isSystemTopic() {
return false;
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index fae2344..7fff7a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -218,6 +218,11 @@
return consumer != null && consumer.getAvailablePermits() > 0 && consumer.isWritable();
}
+ @Override
+ protected void reScheduleRead() {
+ // No-op
+ }
+
private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherMultipleConsumers.class);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index ce6a30b..9c404d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -110,4 +110,9 @@
protected void cancelPendingRead() {
// No-op
}
+
+ @Override
+ protected void reScheduleRead() {
+ // No-op
+ }
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 928c9d9..528f2d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -40,7 +40,8 @@
public enum Type {
TOPIC,
SUBSCRIPTION,
- REPLICATOR
+ REPLICATOR,
+ BROKER
}
private final PersistentTopic topic;
@@ -59,6 +60,14 @@
updateDispatchRate();
}
+ public DispatchRateLimiter(BrokerService brokerService) {
+ this.topic = null;
+ this.topicName = null;
+ this.brokerService = brokerService;
+ this.type = Type.BROKER;
+ updateDispatchRate();
+ }
+
/**
* returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1.
*
@@ -136,6 +145,10 @@
dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerReplicatorInMsg();
dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerReplicatorInByte();
break;
+ case BROKER:
+ dispatchThrottlingRateInMsg = config.getDispatchThrottlingRateInMsg();
+ dispatchThrottlingRateInByte = config.getDispatchThrottlingRateInByte();
+ break;
default:
dispatchThrottlingRateInMsg = -1;
dispatchThrottlingRateInByte = -1;
@@ -146,7 +159,7 @@
.dispatchThrottlingRateInMsg(dispatchThrottlingRateInMsg)
.dispatchThrottlingRateInByte(dispatchThrottlingRateInByte)
.ratePeriodInSecond(1)
- .relativeToPublishRate(config.isDispatchThrottlingRateRelativeToPublishRate())
+ .relativeToPublishRate(type != Type.BROKER && config.isDispatchThrottlingRateRelativeToPublishRate())
.build();
}
@@ -163,9 +176,12 @@
dispatchRateOp = Optional.of(createDispatchRate());
}
updateDispatchRate(dispatchRateOp.get());
- log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type,
- dispatchRateOp.get());
-
+ if (type == Type.BROKER) {
+ log.info("configured broker message-dispatch rate {}", dispatchRate.get());
+ } else {
+ log.info("[{}] configured {} message-dispatch rate at broker {}",
+ this.topicName, type, dispatchRate.get());
+ }
}).exceptionally(ex -> {
log.error("[{}] failed to get the dispatch rate policy from the namespace resource for type {}",
topicName, type, ex);
@@ -180,6 +196,10 @@
public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
String topicName, Type type) {
final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
+ if (type == Type.BROKER) {
+ return brokerService.getBrokerDispatchRateLimiter().isDispatchRateLimitingEnabled();
+ }
+
Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
if (dispatchRate.isPresent()) {
return true;
@@ -318,6 +338,9 @@
* @return
*/
public CompletableFuture<Optional<DispatchRate>> getPoliciesDispatchRateAsync(BrokerService brokerService) {
+ if (topicName == null) {
+ return CompletableFuture.completedFuture(Optional.empty());
+ }
final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
return getPoliciesAsync(brokerService, topicName).thenApply(policiesOp ->
Optional.ofNullable(getPoliciesDispatchRate(cluster, policiesOp, type)));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0ccbc02..0def122 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -282,6 +282,12 @@
}
}
+ @Override
+ protected void reScheduleRead() {
+ topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
+ TimeUnit.MILLISECONDS);
+ }
+
// left pair is messagesToRead, right pair is bytesToRead
protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits) {
int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
@@ -307,51 +313,56 @@
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
- if (topic.getDispatchRateLimiter().isPresent()
- && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+ if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+ DispatchRateLimiter brokerRateLimiter = topic.getBrokerDispatchRateLimiter().get();
+ if (reachDispatchRateLimit(brokerRateLimiter)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", name,
+ brokerRateLimiter.getDispatchRateOnMsg(), brokerRateLimiter.getDispatchRateOnByte(),
+ MESSAGE_RATE_BACKOFF_MS);
+ }
+ return Pair.of(-1, -1L);
+ } else {
+ Pair<Integer, Long> calculateToRead =
+ updateMessagesToRead(brokerRateLimiter, messagesToRead, bytesToRead);
+ messagesToRead = calculateToRead.getLeft();
+ bytesToRead = calculateToRead.getRight();
+ }
+ }
+
+ if (topic.getDispatchRateLimiter().isPresent()) {
DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
- if (!topicRateLimiter.hasMessageDispatchPermit()) {
+ if (reachDispatchRateLimit(topicRateLimiter)) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
- topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
- TimeUnit.MILLISECONDS);
return Pair.of(-1, -1L);
} else {
- Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
- (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
- bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());
-
- messagesToRead = calculateResult.getLeft();
- bytesToRead = calculateResult.getRight();
-
+ Pair<Integer, Long> calculateToRead =
+ updateMessagesToRead(topicRateLimiter, messagesToRead, bytesToRead);
+ messagesToRead = calculateToRead.getLeft();
+ bytesToRead = calculateToRead.getRight();
}
}
- if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
- if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+ if (dispatchRateLimiter.isPresent()) {
+ if (reachDispatchRateLimit(dispatchRateLimiter.get())) {
if (log.isDebugEnabled()) {
- log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
- + " schedule after a {}", name,
- dispatchRateLimiter.get().getDispatchRateOnMsg(),
+ log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}",
+ name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
dispatchRateLimiter.get().getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
- topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
- TimeUnit.MILLISECONDS);
return Pair.of(-1, -1L);
} else {
- Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
- (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
- bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
-
- messagesToRead = calculateResult.getLeft();
- bytesToRead = calculateResult.getRight();
+ Pair<Integer, Long> calculateToRead =
+ updateMessagesToRead(dispatchRateLimiter.get(), messagesToRead, bytesToRead);
+ messagesToRead = calculateToRead.getLeft();
+ bytesToRead = calculateToRead.getRight();
}
}
-
}
if (havePendingReplayRead) {
@@ -362,7 +373,9 @@
}
// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
- return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
+ messagesToRead = Math.max(messagesToRead, 1);
+ bytesToRead = Math.max(bytesToRead, 1);
+ return Pair.of(messagesToRead, bytesToRead);
}
protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
@@ -578,6 +591,9 @@
// acquire message-dispatch permits for already delivered messages
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+ if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+ topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
+ }
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 043f64d..4937e86 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -218,6 +218,11 @@
: sendMessageInfo.getTotalMessages();
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+ if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+ topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
+ sendMessageInfo.getTotalBytes());
+ }
+
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
sendMessageInfo.getTotalBytes());
@@ -356,6 +361,23 @@
}
}
+ @Override
+ protected void reScheduleRead() {
+ topic.getBrokerService().executor().schedule(() -> {
+ Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+ if (currentConsumer != null && !havePendingRead) {
+ readMoreEntries(currentConsumer);
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
+ + " havePendingRead {}",
+ topic.getName(), currentConsumer, havePendingRead);
+ }
+ }
+ }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+
+ }
+
protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
int availablePermits = consumer.getAvailablePermits();
if (!consumer.isWritable()) {
@@ -378,75 +400,62 @@
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
- if (topic.getDispatchRateLimiter().isPresent()
- && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+ if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+ DispatchRateLimiter brokerRateLimiter = topic.getBrokerDispatchRateLimiter().get();
+ if (reachDispatchRateLimit(brokerRateLimiter)) {
+ if (log.isDebugEnabled()) {
+ log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", name,
+ brokerRateLimiter.getDispatchRateOnMsg(), brokerRateLimiter.getDispatchRateOnByte(),
+ MESSAGE_RATE_BACKOFF_MS);
+ }
+ return Pair.of(-1, -1L);
+ } else {
+ Pair<Integer, Long> calculateToRead =
+ updateMessagesToRead(brokerRateLimiter, messagesToRead, bytesToRead);
+ messagesToRead = calculateToRead.getLeft();
+ bytesToRead = calculateToRead.getRight();
+ }
+ }
+
+ if (topic.getDispatchRateLimiter().isPresent()) {
DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
- if (!topicRateLimiter.hasMessageDispatchPermit()) {
+ if (reachDispatchRateLimit(topicRateLimiter)) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
- topic.getBrokerService().executor().schedule(() -> {
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- if (currentConsumer != null && !havePendingRead) {
- readMoreEntries(currentConsumer);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
- + " havePendingRead {}",
- topic.getName(), currentConsumer, havePendingRead);
- }
- }
- }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return Pair.of(-1, -1L);
} else {
-
- Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
- (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
- bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());
-
- messagesToRead = calculateResult.getLeft();
- bytesToRead = calculateResult.getRight();
-
+ Pair<Integer, Long> calculateToRead =
+ updateMessagesToRead(topicRateLimiter, messagesToRead, bytesToRead);
+ messagesToRead = calculateToRead.getLeft();
+ bytesToRead = calculateToRead.getRight();
}
}
- if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
- if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+ if (dispatchRateLimiter.isPresent()) {
+ if (reachDispatchRateLimit(dispatchRateLimiter.get())) {
if (log.isDebugEnabled()) {
- log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
- + " schedule after a {}",
+ log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}",
name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
dispatchRateLimiter.get().getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
- topic.getBrokerService().executor().schedule(() -> {
- Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
- if (currentConsumer != null && !havePendingRead) {
- readMoreEntries(currentConsumer);
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}",
- topic.getName(), currentConsumer, havePendingRead);
- }
- }
- }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return Pair.of(-1, -1L);
} else {
-
- Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
- (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
- bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
-
- messagesToRead = calculateResult.getLeft();
- bytesToRead = calculateResult.getRight();
+ Pair<Integer, Long> calculateToRead =
+ updateMessagesToRead(dispatchRateLimiter.get(), messagesToRead, bytesToRead);
+ messagesToRead = calculateToRead.getLeft();
+ bytesToRead = calculateToRead.getRight();
}
}
}
// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
- return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
+ messagesToRead = Math.max(messagesToRead, 1);
+ bytesToRead = Math.max(bytesToRead, 1);
+ return Pair.of(messagesToRead, bytesToRead);
}
@Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 116a9eb..1a9ed78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -285,6 +285,9 @@
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
+ if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+ topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
+ }
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cd8c83e..104575e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2699,6 +2699,11 @@
return this.dispatchRateLimiter;
}
+ @Override
+ public Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
+ return Optional.ofNullable(this.brokerService.getBrokerDispatchRateLimiter());
+ }
+
public Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
return this.subscribeRateLimiter;
}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index faa86fb..9e65eec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -20,6 +20,7 @@
import com.google.common.collect.Sets;
+import java.time.Duration;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
@@ -31,6 +32,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
@@ -309,6 +311,206 @@
log.info("-- Exiting {} test --", methodName);
}
+ private void testDispatchRate(SubscriptionType subscription,
+ int brokerRate, int topicRate, int subRate, int expectRate) throws Exception {
+
+ final String namespace = "my-property/throttling_ns";
+ final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+ final String subName = "my-subscriber-name-" + subscription;
+
+ DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(subRate)
+ .ratePeriodInSecond(1)
+ .build();
+ DispatchRate topicDispatchRate = DispatchRate.builder()
+ .dispatchThrottlingRateInMsg(-1)
+ .dispatchThrottlingRateInByte(topicRate)
+ .ratePeriodInSecond(1)
+ .build();
+ admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+ admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+ admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+ final int numProducedMessages = 30;
+ final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+ final AtomicInteger totalReceived = new AtomicInteger(0);
+ // enable throttling for nonBacklog consumers
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(subscription).messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in the listener", receivedMessage);
+ totalReceived.incrementAndGet();
+ latch.countDown();
+ }).subscribe();
+
+ Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+ PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+ DispatchRateLimiter subRateLimiter = null;
+ Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+ if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+ subRateLimiter = subDispatcher.getRateLimiter().get();
+ } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+ subRateLimiter = subDispatcher.getRateLimiter().get();
+ } else {
+ Assert.fail("Should only have PersistentDispatcher in this test");
+ }
+ final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+ Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+ DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+ Assert.assertTrue(brokerDispatchRateLimiter != null
+ && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+ Assert.assertTrue(topicDispatchRateLimiter != null
+ && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ Assert.assertTrue(subDispatchRateLimiter != null
+ && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+ });
+
+ Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+ .getDispatchThrottlingRateInByte(), subRate);
+ Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+ .getDispatchThrottlingRateInByte(), topicRate);
+
+ long start = System.currentTimeMillis();
+ // Asynchronously produce messages
+ for (int i = 0; i < numProducedMessages; i++) {
+ producer.send(new byte[expectRate / 10]);
+ }
+ latch.await();
+ Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+ long end = System.currentTimeMillis();
+ log.info("-- end - start: {} ", end - start);
+
+ // first 10 messages, which equals receiverQueueSize, will not wait.
+ Assert.assertTrue((end - start) >= 2500);
+ Assert.assertTrue((end - start) <= 8000);
+
+ consumer.close();
+ producer.close();
+ admin.topics().delete(topicName, true);
+ admin.namespaces().deleteNamespace(namespace);
+ }
+
+ /**
+ * Verify whether rate-limiting works well when different levels rate-limiting enabled.
+ *
+ * <pre>
+ * 1. Set broker level, topic level and subscription level dispatch-byte-rate with different limit rate value.
+ * 2. Start one consumer for one topics.
+ * 3. the expect dispatch rate should be the minimum value of different limit rate.
+ * </pre>
+ *
+ * @param subscription
+ * @throws Exception
+ */
+ @Test(dataProvider = "subscriptions")
+ public void testMultiLevelDispatch(SubscriptionType subscription) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ testDispatchRate(subscription, 1000, 5000, 10000, 1000);
+
+ testDispatchRate(subscription, 10000, 1000, 5000, 1000);
+
+ testDispatchRate(subscription, 5000, 10000, 1000, 1000);
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+ /**
+ * Verify whether the broker level rate-limiting is throttle message-dispatching based on byte-rate or not
+ *
+ * <pre>
+ * 1. Broker level dispatch-byte-rate is equal to 1000 bytes per second.
+ * 2. Start two consumers for two topics.
+ * 3. Send 15 msgs to each of the two topics. Each msgs with 100 bytes, thus 3000 bytes in total.
+ * 4. It should take up to 2 seconds to receive all messages of the two topics.
+ * </pre>
+ *
+ * @param subscription
+ * @throws Exception
+ */
+ @Test(dataProvider = "subscriptions", timeOut = 8000)
+ public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
+ log.info("-- Starting {} test --", methodName);
+
+ final String namespace1 = "my-property/throttling_ns1";
+ final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/throttlingAll");
+ final String namespace2 = "my-property/throttling_ns2";
+ final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll");
+ final String subName = "my-subscriber-name-" + subscription;
+
+ final int byteRate = 1000;
+ admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate);
+ admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
+ admin.namespaces().createNamespace(namespace2, Sets.newHashSet("test"));
+
+ final int numProducedMessagesEachTopic = 15;
+ final int numProducedMessages = numProducedMessagesEachTopic * 2;
+ final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+ final AtomicInteger totalReceived = new AtomicInteger(0);
+ // enable throttling for nonBacklog consumers
+ conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+ Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(subscription).messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in topic1", receivedMessage);
+ totalReceived.incrementAndGet();
+ latch.countDown();
+ }).subscribe();
+
+ Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
+ .receiverQueueSize(10)
+ .subscriptionType(subscription).messageListener((c1, msg) -> {
+ Assert.assertNotNull(msg, "Message cannot be null");
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message [{}] in topic2", receivedMessage);
+ totalReceived.incrementAndGet();
+ latch.countDown();
+ }).subscribe();
+
+ Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1).create();
+ Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();
+
+ boolean isMessageRateUpdate = false;
+ DispatchRateLimiter dispatchRateLimiter;
+
+ Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+ DispatchRateLimiter rateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+ Assert.assertTrue(rateLimiter != null
+ && rateLimiter.getDispatchRateOnByte() > 0);
+ });
+
+ long start = System.currentTimeMillis();
+ // Asynchronously produce messages
+ for (int i = 0; i < numProducedMessagesEachTopic; i++) {
+ producer1.send(new byte[byteRate / 10]);
+ producer2.send(new byte[byteRate / 10]);
+ }
+ latch.await();
+ Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+ long end = System.currentTimeMillis();
+ log.info("-- time to receive all messages: {} ", end - start);
+
+ // first 10 messages, which equals receiverQueueSize, will not wait.
+ Assert.assertTrue((end - start) >= 2000);
+
+ consumer1.close();
+ consumer2.close();
+ producer1.close();
+ producer2.close();
+ log.info("-- Exiting {} test --", methodName);
+ }
+
/**
* verify message-rate on multiple consumers with shared-subscription
*