Support broker level dispatch rate limiter (#11325)

### Motivation

Fixes #7720

Current we support broker level publish rate limiter.
This PR will add broker level dispatch rate limiter.

### Modifications

Add broker level dispatch rate limiter for msgs and bytes.
Refact calculateToRead() to reduce duplicate code.

### Verifying this change

Add testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling()
diff --git a/conf/broker.conf b/conf/broker.conf
index fcf0717..a871dc3 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -355,6 +355,14 @@
 # Rate period for {subscribeThrottlingRatePerConsumer}. Default is 30s.
 subscribeRatePeriodPerConsumerInSecond=30
 
+# Default messages per second dispatch throttling-limit for whole broker. Using a value of 0, is disabling default
+# message dispatch-throttling
+dispatchThrottlingRateInMsg=0
+
+# Default bytes per second dispatch throttling-limit for whole broker. Using a value of 0, is disabling
+# default message-byte dispatch-throttling
+dispatchThrottlingRateInByte=0
+
 # Default messages per second dispatch throttling-limit for every topic. Using a value of 0, is disabling default
 # message dispatch-throttling
 dispatchThrottlingRatePerTopicInMsg=0
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index af6ec5a..1e93dc5 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -750,6 +750,20 @@
     )
     private long brokerPublisherThrottlingMaxByteRate = 0;
     @FieldContext(
+            category = CATEGORY_SERVER,
+            dynamic = true,
+            doc = "Default messages per second dispatch throttling-limit for whole broker. "
+                    + "Using a value of 0, is disabling default message-byte dispatch-throttling"
+    )
+    private int dispatchThrottlingRateInMsg = 0;
+    @FieldContext(
+            category = CATEGORY_SERVER,
+            dynamic = true,
+            doc = "Default bytes per second dispatch throttling-limit for whole broker. "
+                    + "Using a value of 0, is disabling default message-byte dispatch-throttling"
+    )
+    private long dispatchThrottlingRateInByte = 0;
+    @FieldContext(
         category = CATEGORY_SERVER,
         dynamic = true,
         doc = "Max Rate(in 1 seconds) of Message allowed to publish for a topic "
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 9d0043e..64bc611 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -35,6 +35,7 @@
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.intercept.BrokerInterceptor;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
 import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader;
@@ -270,6 +271,26 @@
         // noop
     }
 
+    protected abstract void reScheduleRead();
+
+    protected boolean reachDispatchRateLimit(DispatchRateLimiter dispatchRateLimiter) {
+        if (dispatchRateLimiter.isDispatchRateLimitingEnabled()) {
+            if (!dispatchRateLimiter.hasMessageDispatchPermit()) {
+                reScheduleRead();
+                return true;
+            }
+        }
+        return false;
+    }
+
+    protected Pair<Integer, Long> updateMessagesToRead(DispatchRateLimiter dispatchRateLimiter,
+                                                       int messagesToRead, long bytesToRead) {
+        // update messagesToRead according to available dispatch rate limit.
+        return computeReadLimits(messagesToRead,
+                (int) dispatchRateLimiter.getAvailableDispatchRateLimitOnMsg(),
+                bytesToRead, dispatchRateLimiter.getAvailableDispatchRateLimitOnByte());
+    }
+
     protected static Pair<Integer, Long> computeReadLimits(int messagesToRead, int availablePermitsOnMsg,
                                                            long bytesToRead, long availablePermitsOnByte) {
         if (availablePermitsOnMsg > 0) {
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
index c2a27ad..613961d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/BrokerService.java
@@ -235,6 +235,7 @@
     private ScheduledExecutorService brokerPublishRateLimiterMonitor;
     private ScheduledExecutorService deduplicationSnapshotMonitor;
     protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
+    protected volatile DispatchRateLimiter brokerDispatchRateLimiter = null;
 
     private DistributedIdGenerator producerNameGenerator;
 
@@ -495,6 +496,7 @@
         this.startConsumedLedgersMonitor();
         this.startBacklogQuotaChecker();
         this.updateBrokerPublisherThrottlingMaxRate();
+        this.updateBrokerDispatchThrottlingMaxRate();
         this.startCheckReplicationPolicies();
         this.startDeduplicationSnapshotMonitor();
     }
@@ -2146,7 +2148,13 @@
         registerConfigurationListener("brokerPublisherThrottlingMaxByteRate",
                 (brokerPublisherThrottlingMaxByteRate) ->
                         updateBrokerPublisherThrottlingMaxRate());
-
+        // add listener to notify broker dispatch-rate dynamic config
+        registerConfigurationListener("dispatchThrottlingRateInMsg",
+                (dispatchThrottlingRateInMsg) ->
+                        updateBrokerDispatchThrottlingMaxRate());
+        registerConfigurationListener("dispatchThrottlingRateInByte",
+                (dispatchThrottlingRateInByte) ->
+                        updateBrokerDispatchThrottlingMaxRate());
         // add listener to notify topic publish-rate monitoring
         if (!preciseTopicPublishRateLimitingEnable) {
             registerConfigurationListener("topicPublisherThrottlingTickTimeMillis",
@@ -2161,6 +2169,14 @@
         // add more listeners here
     }
 
+    private void updateBrokerDispatchThrottlingMaxRate() {
+        if (brokerDispatchRateLimiter == null) {
+            brokerDispatchRateLimiter = new DispatchRateLimiter(this);
+        } else {
+            brokerDispatchRateLimiter.updateDispatchRate();
+        }
+    }
+
     private void updateBrokerPublisherThrottlingMaxRate() {
         int currentMaxMessageRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate();
         long currentMaxByteRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
index 6f5fd43..ba7ad0d 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Topic.java
@@ -274,6 +274,10 @@
         return Optional.empty();
     }
 
+    default Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
+        return Optional.empty();
+    }
+
     default boolean isSystemTopic() {
         return false;
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
index fae2344..7fff7a7 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherMultipleConsumers.java
@@ -218,6 +218,11 @@
         return consumer != null && consumer.getAvailablePermits() > 0 && consumer.isWritable();
     }
 
+    @Override
+    protected void reScheduleRead() {
+        // No-op
+    }
+
     private static final Logger log = LoggerFactory.getLogger(NonPersistentDispatcherMultipleConsumers.class);
 
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
index ce6a30b..9c404d5 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentDispatcherSingleActiveConsumer.java
@@ -110,4 +110,9 @@
     protected void cancelPendingRead() {
         // No-op
     }
+
+    @Override
+    protected void reScheduleRead() {
+        // No-op
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
index 928c9d9..528f2d8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/DispatchRateLimiter.java
@@ -40,7 +40,8 @@
     public enum Type {
         TOPIC,
         SUBSCRIPTION,
-        REPLICATOR
+        REPLICATOR,
+        BROKER
     }
 
     private final PersistentTopic topic;
@@ -59,6 +60,14 @@
         updateDispatchRate();
     }
 
+    public DispatchRateLimiter(BrokerService brokerService) {
+        this.topic = null;
+        this.topicName = null;
+        this.brokerService = brokerService;
+        this.type = Type.BROKER;
+        updateDispatchRate();
+    }
+
     /**
      * returns available msg-permit if msg-dispatch-throttling is enabled else it returns -1.
      *
@@ -136,6 +145,10 @@
                 dispatchThrottlingRateInMsg = config.getDispatchThrottlingRatePerReplicatorInMsg();
                 dispatchThrottlingRateInByte = config.getDispatchThrottlingRatePerReplicatorInByte();
                 break;
+            case BROKER:
+                dispatchThrottlingRateInMsg = config.getDispatchThrottlingRateInMsg();
+                dispatchThrottlingRateInByte = config.getDispatchThrottlingRateInByte();
+                break;
             default:
                 dispatchThrottlingRateInMsg = -1;
                 dispatchThrottlingRateInByte = -1;
@@ -146,7 +159,7 @@
                 .dispatchThrottlingRateInMsg(dispatchThrottlingRateInMsg)
                 .dispatchThrottlingRateInByte(dispatchThrottlingRateInByte)
                 .ratePeriodInSecond(1)
-                .relativeToPublishRate(config.isDispatchThrottlingRateRelativeToPublishRate())
+                .relativeToPublishRate(type != Type.BROKER && config.isDispatchThrottlingRateRelativeToPublishRate())
                 .build();
     }
 
@@ -163,9 +176,12 @@
                     dispatchRateOp = Optional.of(createDispatchRate());
                 }
                 updateDispatchRate(dispatchRateOp.get());
-                log.info("[{}] configured {} message-dispatch rate at broker {}", this.topicName, type,
-                        dispatchRateOp.get());
-
+                if (type == Type.BROKER) {
+                  log.info("configured broker message-dispatch rate {}", dispatchRate.get());
+                } else {
+                  log.info("[{}] configured {} message-dispatch rate at broker {}",
+                          this.topicName, type, dispatchRate.get());
+                }
             }).exceptionally(ex -> {
                 log.error("[{}] failed to get the dispatch rate policy from the namespace resource for type {}",
                         topicName, type, ex);
@@ -180,6 +196,10 @@
     public static boolean isDispatchRateNeeded(BrokerService brokerService, Optional<Policies> policies,
             String topicName, Type type) {
         final ServiceConfiguration serviceConfig = brokerService.pulsar().getConfiguration();
+        if (type == Type.BROKER) {
+            return brokerService.getBrokerDispatchRateLimiter().isDispatchRateLimitingEnabled();
+        }
+
         Optional<DispatchRate> dispatchRate = getTopicPolicyDispatchRate(brokerService, topicName, type);
         if (dispatchRate.isPresent()) {
             return true;
@@ -318,6 +338,9 @@
      * @return
      */
     public CompletableFuture<Optional<DispatchRate>> getPoliciesDispatchRateAsync(BrokerService brokerService) {
+        if (topicName == null) {
+            return CompletableFuture.completedFuture(Optional.empty());
+        }
         final String cluster = brokerService.pulsar().getConfiguration().getClusterName();
         return getPoliciesAsync(brokerService, topicName).thenApply(policiesOp ->
                 Optional.ofNullable(getPoliciesDispatchRate(cluster, policiesOp, type)));
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 0ccbc02..0def122 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -282,6 +282,12 @@
         }
     }
 
+    @Override
+    protected void reScheduleRead() {
+        topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
+                TimeUnit.MILLISECONDS);
+    }
+
     // left pair is messagesToRead, right pair is bytesToRead
     protected Pair<Integer, Long> calculateToRead(int currentTotalAvailablePermits) {
         int messagesToRead = Math.min(currentTotalAvailablePermits, readBatchSize);
@@ -307,51 +313,56 @@
         // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
         // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
         if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-            if (topic.getDispatchRateLimiter().isPresent()
-                    && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+                DispatchRateLimiter brokerRateLimiter = topic.getBrokerDispatchRateLimiter().get();
+                if (reachDispatchRateLimit(brokerRateLimiter)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", name,
+                                brokerRateLimiter.getDispatchRateOnMsg(), brokerRateLimiter.getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    return Pair.of(-1, -1L);
+                } else {
+                    Pair<Integer, Long> calculateToRead =
+                            updateMessagesToRead(brokerRateLimiter, messagesToRead, bytesToRead);
+                    messagesToRead = calculateToRead.getLeft();
+                    bytesToRead = calculateToRead.getRight();
+                }
+            }
+
+            if (topic.getDispatchRateLimiter().isPresent()) {
                 DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
-                if (!topicRateLimiter.hasMessageDispatchPermit()) {
+                if (reachDispatchRateLimit(topicRateLimiter)) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
                                 topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
                                 MESSAGE_RATE_BACKOFF_MS);
                     }
-                    topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
-                            TimeUnit.MILLISECONDS);
                     return Pair.of(-1, -1L);
                 } else {
-                    Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
-                            (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
-                            bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());
-
-                    messagesToRead = calculateResult.getLeft();
-                    bytesToRead = calculateResult.getRight();
-
+                    Pair<Integer, Long> calculateToRead =
+                            updateMessagesToRead(topicRateLimiter, messagesToRead, bytesToRead);
+                    messagesToRead = calculateToRead.getLeft();
+                    bytesToRead = calculateToRead.getRight();
                 }
             }
 
-            if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
-                if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+            if (dispatchRateLimiter.isPresent()) {
+                if (reachDispatchRateLimit(dispatchRateLimiter.get())) {
                     if (log.isDebugEnabled()) {
-                        log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
-                                        + " schedule after a {}", name,
-                                dispatchRateLimiter.get().getDispatchRateOnMsg(),
+                        log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}",
+                                name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
                                 dispatchRateLimiter.get().getDispatchRateOnByte(),
                                 MESSAGE_RATE_BACKOFF_MS);
                     }
-                    topic.getBrokerService().executor().schedule(() -> readMoreEntries(), MESSAGE_RATE_BACKOFF_MS,
-                            TimeUnit.MILLISECONDS);
                     return Pair.of(-1, -1L);
                 } else {
-                    Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
-                            (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
-                            bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
-
-                    messagesToRead = calculateResult.getLeft();
-                    bytesToRead = calculateResult.getRight();
+                    Pair<Integer, Long> calculateToRead =
+                            updateMessagesToRead(dispatchRateLimiter.get(), messagesToRead, bytesToRead);
+                    messagesToRead = calculateToRead.getLeft();
+                    bytesToRead = calculateToRead.getRight();
                 }
             }
-
         }
 
         if (havePendingReplayRead) {
@@ -362,7 +373,9 @@
         }
 
         // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
-        return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
+        messagesToRead = Math.max(messagesToRead, 1);
+        bytesToRead = Math.max(bytesToRead, 1);
+        return Pair.of(messagesToRead, bytesToRead);
     }
 
     protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
@@ -578,6 +591,9 @@
         // acquire message-dispatch permits for already delivered messages
         long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
         if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+                topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
+            }
             if (topic.getDispatchRateLimiter().isPresent()) {
                 topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 043f64d..4937e86 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -218,6 +218,11 @@
                             : sendMessageInfo.getTotalMessages();
                     // acquire message-dispatch permits for already delivered messages
                     if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
+                        if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+                            topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
+                                    sendMessageInfo.getTotalBytes());
+                        }
+
                         if (topic.getDispatchRateLimiter().isPresent()) {
                             topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
                                     sendMessageInfo.getTotalBytes());
@@ -356,6 +361,23 @@
         }
     }
 
+    @Override
+    protected void reScheduleRead() {
+        topic.getBrokerService().executor().schedule(() -> {
+            Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
+            if (currentConsumer != null && !havePendingRead) {
+                readMoreEntries(currentConsumer);
+            } else {
+                if (log.isDebugEnabled()) {
+                    log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
+                                    + " havePendingRead {}",
+                            topic.getName(), currentConsumer, havePendingRead);
+                }
+            }
+        }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
+
+    }
+
     protected Pair<Integer, Long> calculateToRead(Consumer consumer) {
         int availablePermits = consumer.getAvailablePermits();
         if (!consumer.isWritable()) {
@@ -378,75 +400,62 @@
         // active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
         // threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
         if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-            if (topic.getDispatchRateLimiter().isPresent()
-                    && topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
+            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+                DispatchRateLimiter brokerRateLimiter = topic.getBrokerDispatchRateLimiter().get();
+                if (reachDispatchRateLimit(brokerRateLimiter)) {
+                    if (log.isDebugEnabled()) {
+                        log.debug("[{}] message-read exceeded broker message-rate {}/{}, schedule after a {}", name,
+                                brokerRateLimiter.getDispatchRateOnMsg(), brokerRateLimiter.getDispatchRateOnByte(),
+                                MESSAGE_RATE_BACKOFF_MS);
+                    }
+                    return Pair.of(-1, -1L);
+                } else {
+                    Pair<Integer, Long> calculateToRead =
+                            updateMessagesToRead(brokerRateLimiter, messagesToRead, bytesToRead);
+                    messagesToRead = calculateToRead.getLeft();
+                    bytesToRead = calculateToRead.getRight();
+                }
+            }
+
+            if (topic.getDispatchRateLimiter().isPresent()) {
                 DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
-                if (!topicRateLimiter.hasMessageDispatchPermit()) {
+                if (reachDispatchRateLimit(topicRateLimiter)) {
                     if (log.isDebugEnabled()) {
                         log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
                                 topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
                                 MESSAGE_RATE_BACKOFF_MS);
                     }
-                    topic.getBrokerService().executor().schedule(() -> {
-                        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                        if (currentConsumer != null && !havePendingRead) {
-                            readMoreEntries(currentConsumer);
-                        } else {
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
-                                                + " havePendingRead {}",
-                                        topic.getName(), currentConsumer, havePendingRead);
-                            }
-                        }
-                    }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
                     return Pair.of(-1, -1L);
                 } else {
-
-                    Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
-                            (int) topicRateLimiter.getAvailableDispatchRateLimitOnMsg(),
-                            bytesToRead, topicRateLimiter.getAvailableDispatchRateLimitOnByte());
-
-                    messagesToRead = calculateResult.getLeft();
-                    bytesToRead = calculateResult.getRight();
-
+                    Pair<Integer, Long> calculateToRead =
+                            updateMessagesToRead(topicRateLimiter, messagesToRead, bytesToRead);
+                    messagesToRead = calculateToRead.getLeft();
+                    bytesToRead = calculateToRead.getRight();
                 }
             }
 
-            if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
-                if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
+            if (dispatchRateLimiter.isPresent()) {
+                if (reachDispatchRateLimit(dispatchRateLimiter.get())) {
                     if (log.isDebugEnabled()) {
-                        log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
-                                        + " schedule after a {}",
+                        log.debug("[{}] message-read exceeded subscription message-rate {}/{}, schedule after a {}",
                                 name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
                                 dispatchRateLimiter.get().getDispatchRateOnByte(),
                                 MESSAGE_RATE_BACKOFF_MS);
                     }
-                    topic.getBrokerService().executor().schedule(() -> {
-                        Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
-                        if (currentConsumer != null && !havePendingRead) {
-                            readMoreEntries(currentConsumer);
-                        } else {
-                            if (log.isDebugEnabled()) {
-                                log.debug("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}",
-                                        topic.getName(), currentConsumer, havePendingRead);
-                            }
-                        }
-                    }, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
                     return Pair.of(-1, -1L);
                 } else {
-
-                    Pair<Integer, Long> calculateResult = computeReadLimits(messagesToRead,
-                            (int) dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg(),
-                            bytesToRead, dispatchRateLimiter.get().getAvailableDispatchRateLimitOnByte());
-
-                    messagesToRead = calculateResult.getLeft();
-                    bytesToRead = calculateResult.getRight();
+                    Pair<Integer, Long> calculateToRead =
+                            updateMessagesToRead(dispatchRateLimiter.get(), messagesToRead, bytesToRead);
+                    messagesToRead = calculateToRead.getLeft();
+                    bytesToRead = calculateToRead.getRight();
                 }
             }
         }
 
         // If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
-        return Pair.of(Math.max(messagesToRead, 1), Math.max(bytesToRead, 1));
+        messagesToRead = Math.max(messagesToRead, 1);
+        bytesToRead = Math.max(bytesToRead, 1);
+        return Pair.of(messagesToRead, bytesToRead);
     }
 
     @Override
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 116a9eb..1a9ed78 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -285,6 +285,9 @@
         // acquire message-dispatch permits for already delivered messages
         if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
             long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
+            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
+                topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
+            }
             if (topic.getDispatchRateLimiter().isPresent()) {
                 topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
             }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index cd8c83e..104575e 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -2699,6 +2699,11 @@
         return this.dispatchRateLimiter;
     }
 
+    @Override
+    public Optional<DispatchRateLimiter> getBrokerDispatchRateLimiter() {
+        return Optional.ofNullable(this.brokerService.getBrokerDispatchRateLimiter());
+    }
+
     public Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
         return this.subscribeRateLimiter;
     }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
index faa86fb..9e65eec 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SubscriptionMessageDispatchThrottlingTest.java
@@ -20,6 +20,7 @@
 
 import com.google.common.collect.Sets;
 
+import java.time.Duration;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -31,6 +32,7 @@
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.common.policies.data.DispatchRate;
+import org.awaitility.Awaitility;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
@@ -309,6 +311,206 @@
         log.info("-- Exiting {} test --", methodName);
     }
 
+    private void testDispatchRate(SubscriptionType subscription,
+                                  int brokerRate, int topicRate, int subRate, int expectRate) throws Exception {
+
+        final String namespace = "my-property/throttling_ns";
+        final String topicName = BrokerTestUtil.newUniqueName("persistent://" + namespace + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        DispatchRate subscriptionDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(subRate)
+                .ratePeriodInSecond(1)
+                .build();
+        DispatchRate topicDispatchRate = DispatchRate.builder()
+                .dispatchThrottlingRateInMsg(-1)
+                .dispatchThrottlingRateInByte(topicRate)
+                .ratePeriodInSecond(1)
+                .build();
+        admin.namespaces().createNamespace(namespace, Sets.newHashSet("test"));
+        admin.namespaces().setSubscriptionDispatchRate(namespace, subscriptionDispatchRate);
+        admin.namespaces().setDispatchRate(namespace, topicDispatchRate);
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + brokerRate);
+
+        final int numProducedMessages = 30;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in the listener", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
+        PersistentTopic topic = (PersistentTopic) pulsar.getBrokerService().getOrCreateTopic(topicName).get();
+
+        DispatchRateLimiter subRateLimiter = null;
+        Dispatcher subDispatcher = topic.getSubscription(subName).getDispatcher();
+        if (subDispatcher instanceof PersistentDispatcherMultipleConsumers) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else if (subDispatcher instanceof PersistentDispatcherSingleActiveConsumer) {
+            subRateLimiter = subDispatcher.getRateLimiter().get();
+        } else {
+            Assert.fail("Should only have PersistentDispatcher in this test");
+        }
+        final DispatchRateLimiter subDispatchRateLimiter = subRateLimiter;
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter brokerDispatchRateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(brokerDispatchRateLimiter != null
+                    && brokerDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            DispatchRateLimiter topicDispatchRateLimiter = topic.getDispatchRateLimiter().orElse(null);
+            Assert.assertTrue(topicDispatchRateLimiter != null
+                    && topicDispatchRateLimiter.getDispatchRateOnByte() > 0);
+            Assert.assertTrue(subDispatchRateLimiter != null
+                    && subDispatchRateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        Assert.assertEquals(admin.namespaces().getSubscriptionDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), subRate);
+        Assert.assertEquals(admin.namespaces().getDispatchRate(namespace)
+                .getDispatchThrottlingRateInByte(), topicRate);
+
+        long start = System.currentTimeMillis();
+        // Asynchronously produce messages
+        for (int i = 0; i < numProducedMessages; i++) {
+            producer.send(new byte[expectRate / 10]);
+        }
+        latch.await();
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+        long end = System.currentTimeMillis();
+        log.info("-- end - start: {} ", end - start);
+
+        // first 10 messages, which equals receiverQueueSize, will not wait.
+        Assert.assertTrue((end - start) >= 2500);
+        Assert.assertTrue((end - start) <= 8000);
+
+        consumer.close();
+        producer.close();
+        admin.topics().delete(topicName, true);
+        admin.namespaces().deleteNamespace(namespace);
+    }
+
+    /**
+     * Verify whether rate-limiting works well when different levels rate-limiting enabled.
+     *
+     * <pre>
+     *  1. Set broker level, topic level and subscription level dispatch-byte-rate with different limit rate value.
+     *  2. Start one consumer for one topics.
+     *  3. the expect dispatch rate should be the minimum value of different limit rate.
+     * </pre>
+     *
+     * @param subscription
+     * @throws Exception
+     */
+    @Test(dataProvider = "subscriptions")
+    public void testMultiLevelDispatch(SubscriptionType subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        testDispatchRate(subscription, 1000, 5000, 10000, 1000);
+
+        testDispatchRate(subscription, 10000, 1000, 5000, 1000);
+
+        testDispatchRate(subscription, 5000, 10000, 1000, 1000);
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+    /**
+     * Verify whether the broker level rate-limiting is throttle message-dispatching based on byte-rate or not
+     *
+     * <pre>
+     *  1. Broker level dispatch-byte-rate is equal to 1000 bytes per second.
+     *  2. Start two consumers for two topics.
+     *  3. Send 15 msgs to each of the two topics. Each msgs with 100 bytes, thus 3000 bytes in total.
+     *  4. It should take up to 2 seconds to receive all messages of the two topics.
+     * </pre>
+     *
+     * @param subscription
+     * @throws Exception
+     */
+    @Test(dataProvider = "subscriptions", timeOut = 8000)
+    public void testBrokerBytesRateLimitingReceiveAllMessagesAfterThrottling(SubscriptionType subscription) throws Exception {
+        log.info("-- Starting {} test --", methodName);
+
+        final String namespace1 = "my-property/throttling_ns1";
+        final String topicName1 = BrokerTestUtil.newUniqueName("persistent://" + namespace1 + "/throttlingAll");
+        final String namespace2 = "my-property/throttling_ns2";
+        final String topicName2 = BrokerTestUtil.newUniqueName("persistent://" + namespace2 + "/throttlingAll");
+        final String subName = "my-subscriber-name-" + subscription;
+
+        final int byteRate = 1000;
+        admin.brokers().updateDynamicConfiguration("dispatchThrottlingRateInByte", "" + byteRate);
+        admin.namespaces().createNamespace(namespace1, Sets.newHashSet("test"));
+        admin.namespaces().createNamespace(namespace2, Sets.newHashSet("test"));
+
+        final int numProducedMessagesEachTopic = 15;
+        final int numProducedMessages = numProducedMessagesEachTopic * 2;
+        final CountDownLatch latch = new CountDownLatch(numProducedMessages);
+        final AtomicInteger totalReceived = new AtomicInteger(0);
+        // enable throttling for nonBacklog consumers
+        conf.setDispatchThrottlingOnNonBacklogConsumerEnabled(true);
+
+        Consumer<byte[]> consumer1 = pulsarClient.newConsumer().topic(topicName1).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic1", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Consumer<byte[]> consumer2 = pulsarClient.newConsumer().topic(topicName2).subscriptionName(subName)
+                .receiverQueueSize(10)
+                .subscriptionType(subscription).messageListener((c1, msg) -> {
+                    Assert.assertNotNull(msg, "Message cannot be null");
+                    String receivedMessage = new String(msg.getData());
+                    log.debug("Received message [{}] in topic2", receivedMessage);
+                    totalReceived.incrementAndGet();
+                    latch.countDown();
+                }).subscribe();
+
+        Producer<byte[]> producer1 = pulsarClient.newProducer().topic(topicName1).create();
+        Producer<byte[]> producer2 = pulsarClient.newProducer().topic(topicName2).create();
+
+        boolean isMessageRateUpdate = false;
+        DispatchRateLimiter dispatchRateLimiter;
+
+        Awaitility.await().atMost(Duration.ofMillis(500)).untilAsserted(() -> {
+            DispatchRateLimiter rateLimiter = pulsar.getBrokerService().getBrokerDispatchRateLimiter();
+            Assert.assertTrue(rateLimiter != null
+                    && rateLimiter.getDispatchRateOnByte() > 0);
+        });
+
+        long start = System.currentTimeMillis();
+        // Asynchronously produce messages
+        for (int i = 0; i < numProducedMessagesEachTopic; i++) {
+            producer1.send(new byte[byteRate / 10]);
+            producer2.send(new byte[byteRate / 10]);
+        }
+        latch.await();
+        Assert.assertEquals(totalReceived.get(), numProducedMessages, 10);
+        long end = System.currentTimeMillis();
+        log.info("-- time to receive all messages: {} ", end - start);
+
+        // first 10 messages, which equals receiverQueueSize, will not wait.
+        Assert.assertTrue((end - start) >= 2000);
+
+        consumer1.close();
+        consumer2.close();
+        producer1.close();
+        producer2.close();
+        log.info("-- Exiting {} test --", methodName);
+    }
+
     /**
      * verify message-rate on multiple consumers with shared-subscription
      *