[feat][broker] Add config to count filtered entries towards rate limits (#17686)

* [feat][broker] Add config to count filtered entries towards rate limits

* Make fixes for checkstyle

* Remove * import

* Fix incorrect conflict resolution in merge commit

### Motivation

Currently, when using entry filters, filtered out messages do not count against the rate limit. Therefore, a subscription that is completely filtered will never be throttled due to rate limiting. When the messages are delivered to the consumer for a filtered subscription, those messages will count against the rate limit, and in that case, the message filtering can be throttled because the check to delay `readMoreEntries()` happens before message filtering. Therefore, the rate limit will essentially be increased as a function of the percent of messages let through the filter (some quick math is that the new rate is likely `dispatchRate * (1 / percentDelivered)`, where percent delivered is a percent as a decimal).

It's possible that some use cases prefer this behavior, but in my case, I think it'd be valuable to include these filtered messages in the dispatch throttling because these messages still cost the broker network, memory, and cpu. This PR adds a configuration to count filtered out messages towards dispatch rate limits for the broker, the topic, and the subscription.

### Modifications

* Add configuration named `dispatchThrottlingForFilteredEntriesEnabled`. Default it to false so we maintain the original behavior. When true, count filtered messages against rate limits.
* Refactor the code to `acquirePermitsForDeliveredMessages` so that it is in the `AbstractBaseDispatcher`, which makes it available to the entry filtering logic.

### Verifying this change

A new test is added as part of this PR.

### Does this pull request potentially affect one of the following parts:

This PR introduces a new config while maintaining the current behavior.

### Documentation

- [x] `doc-not-needed` 
Config docs are auto-generated.

diff --git a/conf/broker.conf b/conf/broker.conf
index 30e79eb..d117d67 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -453,6 +453,12 @@
 # The directory for all the entry filter implementations
 entryFiltersDirectory=
 
+# Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled,
+# only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and
+# subscription level. When enabled, messages filtered out due to entry filter logic are counted towards
+# each relevant rate limit.
+dispatchThrottlingForFilteredEntriesEnabled=false
+
 # Whether allow topic level entry filters policies overrides broker configuration.
 allowOverrideEntryFilters=false
 
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a6e9a55..8c88304 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1050,6 +1050,16 @@
     )
     private boolean dispatcherDispatchMessagesInSubscriptionThread = true;
 
+    @FieldContext(
+        dynamic = false,
+        category = CATEGORY_SERVER,
+        doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, "
+            + "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and "
+            + "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards "
+            + "each relevant rate limit."
+    )
+    private boolean dispatchThrottlingForFilteredEntriesEnabled = false;
+
     // <-- dispatcher read settings -->
     @FieldContext(
         dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 2971006..df02bbd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -107,6 +107,9 @@
         long totalBytes = 0;
         int totalChunkedMessages = 0;
         int totalEntries = 0;
+        int filteredMessageCount = 0;
+        int filteredEntryCount = 0;
+        long filteredBytesCount = 0;
         final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters);
         List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
         List<PositionImpl> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
@@ -135,6 +138,9 @@
                 // FilterResult will be always `ACCEPTED` when there is No Filter
                 // dont need to judge whether `hasFilter` is true or not.
                 this.filterRejectedMsgs.add(entryMsgCnt);
+                filteredEntryCount++;
+                filteredMessageCount += entryMsgCnt;
+                filteredBytesCount += metadataAndPayload.readableBytes();
                 entry.release();
                 continue;
             } else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
@@ -143,6 +149,9 @@
                 // FilterResult will be always `ACCEPTED` when there is No Filter
                 // dont need to judge whether `hasFilter` is true or not.
                 this.filterRescheduledMsgs.add(entryMsgCnt);
+                filteredEntryCount++;
+                filteredMessageCount += entryMsgCnt;
+                filteredBytesCount += metadataAndPayload.readableBytes();
                 entry.release();
                 continue;
             }
@@ -231,6 +240,11 @@
 
         }
 
+        if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
+            acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
+                    filteredMessageCount, filteredBytesCount);
+        }
+
         sendMessageInfo.setTotalMessages(totalMessages);
         sendMessageInfo.setTotalBytes(totalBytes);
         sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
@@ -243,6 +257,19 @@
         }
     }
 
+    protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
+                                                      long totalMessagesSent, long totalBytesSent) {
+        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
+                || (cursor != null && !cursor.isActive())) {
+            long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
+            topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
+                    rateLimiter.tryDispatchPermit(permits, totalBytesSent));
+            topic.getDispatchRateLimiter().ifPresent(rateLimter ->
+                    rateLimter.tryDispatchPermit(permits, totalBytesSent));
+            getRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, totalBytesSent));
+        }
+    }
+
     /**
      * Determine whether the number of consumers on the subscription reaches the threshold.
      * @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 02d2e72..15b42fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -684,7 +684,7 @@
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
-        acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
+        acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
 
         if (entriesToDispatch > 0) {
             if (log.isDebugEnabled()) {
@@ -700,23 +700,6 @@
         return true;
     }
 
-    private void acquirePermitsForDeliveredMessages(long totalEntries, long totalMessagesSent, long totalBytesSent) {
-        // acquire message-dispatch permits for already delivered messages
-        long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
-        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
-                topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
-            }
-            if (topic.getDispatchRateLimiter().isPresent()) {
-                topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
-            }
-
-            if (dispatchRateLimiter.isPresent()) {
-                dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
-            }
-        }
-    }
-
     private boolean sendChunkedMessagesToConsumers(ReadType readType,
                                                    List<Entry> entries,
                                                    MessageMetadata[] metadataArray) {
@@ -775,7 +758,7 @@
             totalBytesSent += sendMessageInfo.getTotalBytes();
         }
 
-        acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
+        acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
 
         return numConsumers.get() == 0; // trigger a new readMoreEntries() call
     }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index accab20..3ba7a82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -221,23 +221,8 @@
                     redeliveryTracker, epoch)
             .addListener(future -> {
                 if (future.isSuccess()) {
-                    int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size()
-                            : sendMessageInfo.getTotalMessages();
-                    // acquire message-dispatch permits for already delivered messages
-                    if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-                        if (topic.getBrokerDispatchRateLimiter().isPresent()) {
-                            topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
-                                    sendMessageInfo.getTotalBytes());
-                        }
-
-                        if (topic.getDispatchRateLimiter().isPresent()) {
-                            topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
-                                    sendMessageInfo.getTotalBytes());
-                        }
-                        dispatchRateLimiter.ifPresent(rateLimiter ->
-                                rateLimiter.tryDispatchPermit(permits,
-                                        sendMessageInfo.getTotalBytes()));
-                    }
+                    acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
+                            sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
 
                     // Schedule a new read batch operation only after the previous batch has been written to the socket.
                     topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 024ed85..5eb5531 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -296,19 +296,7 @@
         }
 
         // acquire message-dispatch permits for already delivered messages
-        if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
-            long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
-            if (topic.getBrokerDispatchRateLimiter().isPresent()) {
-                topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
-            }
-            if (topic.getDispatchRateLimiter().isPresent()) {
-                topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
-            }
-
-            if (dispatchRateLimiter.isPresent()) {
-                dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
-            }
-        }
+        acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
 
         stuckConsumers.clear();
 
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index b129995..cba15b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -22,6 +22,7 @@
 import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import io.netty.buffer.ByteBuf;
@@ -29,11 +30,14 @@
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.impl.EntryImpl;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
 import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
 import org.apache.pulsar.broker.service.persistent.PersistentTopic;
 import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -60,8 +64,9 @@
     @BeforeMethod
     public void setup() throws Exception {
         this.svcConfig = mock(ServiceConfiguration.class);
+        when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(true);
         this.subscriptionMock = mock(PersistentSubscription.class);
-        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null);
     }
 
     @Test
@@ -89,17 +94,24 @@
                 EntryFilter.FilterResult.REJECT);
         Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key", mockFilter);
         when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
+        DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class);
 
-        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+        this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig,
+                subscriptionDispatchRateLimiter);
 
         List<Entry> entries = new ArrayList<>();
 
-        entries.add(EntryImpl.create(1, 2, createMessage("message1", 1)));
+        Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
+        long expectedBytePermits = e.getLength();
+        entries.add(e);
         SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
         EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
-        //
-        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
+
+        ManagedCursor cursor = mock(ManagedCursor.class);
+
+        int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, cursor, false, null);
         assertEquals(size, 0);
+        verify(subscriptionDispatchRateLimiter).tryDispatchPermit(1, expectedBytePermits);
     }
 
     @Test
@@ -201,9 +213,18 @@
 
     private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {
 
+        private final Optional<DispatchRateLimiter> dispatchRateLimiter;
+
         protected AbstractBaseDispatcherTestHelper(Subscription subscription,
-                                                   ServiceConfiguration serviceConfig) {
+                                                   ServiceConfiguration serviceConfig,
+                                                   DispatchRateLimiter rateLimiter) {
             super(subscription, serviceConfig);
+            dispatchRateLimiter = Optional.ofNullable(rateLimiter);
+        }
+
+        @Override
+        public Optional<DispatchRateLimiter> getRateLimiter() {
+            return dispatchRateLimiter;
         }
 
         @Override