[feat][broker] Add config to count filtered entries towards rate limits (#17686)
* [feat][broker] Add config to count filtered entries towards rate limits
* Make fixes for checkstyle
* Remove * import
* Fix incorrect conflict resolution in merge commit
### Motivation
Currently, when using entry filters, filtered out messages do not count against the rate limit. Therefore, a subscription that is completely filtered will never be throttled due to rate limiting. When the messages are delivered to the consumer for a filtered subscription, those messages will count against the rate limit, and in that case, the message filtering can be throttled because the check to delay `readMoreEntries()` happens before message filtering. Therefore, the rate limit will essentially be increased as a function of the percent of messages let through the filter (some quick math is that the new rate is likely `dispatchRate * (1 / percentDelivered)`, where percent delivered is a percent as a decimal).
It's possible that some use cases prefer this behavior, but in my case, I think it'd be valuable to include these filtered messages in the dispatch throttling because these messages still cost the broker network, memory, and cpu. This PR adds a configuration to count filtered out messages towards dispatch rate limits for the broker, the topic, and the subscription.
### Modifications
* Add configuration named `dispatchThrottlingForFilteredEntriesEnabled`. Default it to false so we maintain the original behavior. When true, count filtered messages against rate limits.
* Refactor the code to `acquirePermitsForDeliveredMessages` so that it is in the `AbstractBaseDispatcher`, which makes it available to the entry filtering logic.
### Verifying this change
A new test is added as part of this PR.
### Does this pull request potentially affect one of the following parts:
This PR introduces a new config while maintaining the current behavior.
### Documentation
- [x] `doc-not-needed`
Config docs are auto-generated.
diff --git a/conf/broker.conf b/conf/broker.conf
index 30e79eb..d117d67 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -453,6 +453,12 @@
# The directory for all the entry filter implementations
entryFiltersDirectory=
+# Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled,
+# only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and
+# subscription level. When enabled, messages filtered out due to entry filter logic are counted towards
+# each relevant rate limit.
+dispatchThrottlingForFilteredEntriesEnabled=false
+
# Whether allow topic level entry filters policies overrides broker configuration.
allowOverrideEntryFilters=false
diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index a6e9a55..8c88304 100644
--- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -1050,6 +1050,16 @@
)
private boolean dispatcherDispatchMessagesInSubscriptionThread = true;
+ @FieldContext(
+ dynamic = false,
+ category = CATEGORY_SERVER,
+ doc = "Whether the broker should count filtered entries in dispatch rate limit calculations. When disabled, "
+ + "only messages sent to a consumer count towards a dispatch rate limit at the broker, topic, and "
+ + "subscription level. When enabled, messages filtered out due to entry filter logic are counted towards "
+ + "each relevant rate limit."
+ )
+ private boolean dispatchThrottlingForFilteredEntriesEnabled = false;
+
// <-- dispatcher read settings -->
@FieldContext(
dynamic = true,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
index 2971006..df02bbd 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractBaseDispatcher.java
@@ -107,6 +107,9 @@
long totalBytes = 0;
int totalChunkedMessages = 0;
int totalEntries = 0;
+ int filteredMessageCount = 0;
+ int filteredEntryCount = 0;
+ long filteredBytesCount = 0;
final boolean hasFilter = CollectionUtils.isNotEmpty(entryFilters);
List<Position> entriesToFiltered = hasFilter ? new ArrayList<>() : null;
List<PositionImpl> entriesToRedeliver = hasFilter ? new ArrayList<>() : null;
@@ -135,6 +138,9 @@
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRejectedMsgs.add(entryMsgCnt);
+ filteredEntryCount++;
+ filteredMessageCount += entryMsgCnt;
+ filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
} else if (filterResult == EntryFilter.FilterResult.RESCHEDULE) {
@@ -143,6 +149,9 @@
// FilterResult will be always `ACCEPTED` when there is No Filter
// dont need to judge whether `hasFilter` is true or not.
this.filterRescheduledMsgs.add(entryMsgCnt);
+ filteredEntryCount++;
+ filteredMessageCount += entryMsgCnt;
+ filteredBytesCount += metadataAndPayload.readableBytes();
entry.release();
continue;
}
@@ -231,6 +240,11 @@
}
+ if (serviceConfig.isDispatchThrottlingForFilteredEntriesEnabled()) {
+ acquirePermitsForDeliveredMessages(subscription.getTopic(), cursor, filteredEntryCount,
+ filteredMessageCount, filteredBytesCount);
+ }
+
sendMessageInfo.setTotalMessages(totalMessages);
sendMessageInfo.setTotalBytes(totalBytes);
sendMessageInfo.setTotalChunkedMessages(totalChunkedMessages);
@@ -243,6 +257,19 @@
}
}
+ protected void acquirePermitsForDeliveredMessages(Topic topic, ManagedCursor cursor, long totalEntries,
+ long totalMessagesSent, long totalBytesSent) {
+ if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled()
+ || (cursor != null && !cursor.isActive())) {
+ long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
+ topic.getBrokerDispatchRateLimiter().ifPresent(rateLimiter ->
+ rateLimiter.tryDispatchPermit(permits, totalBytesSent));
+ topic.getDispatchRateLimiter().ifPresent(rateLimter ->
+ rateLimter.tryDispatchPermit(permits, totalBytesSent));
+ getRateLimiter().ifPresent(rateLimiter -> rateLimiter.tryDispatchPermit(permits, totalBytesSent));
+ }
+ }
+
/**
* Determine whether the number of consumers on the subscription reaches the threshold.
* @return
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
index 02d2e72..15b42fe 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java
@@ -684,7 +684,7 @@
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
+ acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
if (entriesToDispatch > 0) {
if (log.isDebugEnabled()) {
@@ -700,23 +700,6 @@
return true;
}
- private void acquirePermitsForDeliveredMessages(long totalEntries, long totalMessagesSent, long totalBytesSent) {
- // acquire message-dispatch permits for already delivered messages
- long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
- if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
- if (topic.getBrokerDispatchRateLimiter().isPresent()) {
- topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
- }
- if (topic.getDispatchRateLimiter().isPresent()) {
- topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
- }
-
- if (dispatchRateLimiter.isPresent()) {
- dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
- }
- }
- }
-
private boolean sendChunkedMessagesToConsumers(ReadType readType,
List<Entry> entries,
MessageMetadata[] metadataArray) {
@@ -775,7 +758,7 @@
totalBytesSent += sendMessageInfo.getTotalBytes();
}
- acquirePermitsForDeliveredMessages(totalEntries, totalMessagesSent, totalBytesSent);
+ acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
return numConsumers.get() == 0; // trigger a new readMoreEntries() call
}
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index accab20..3ba7a82 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -221,23 +221,8 @@
redeliveryTracker, epoch)
.addListener(future -> {
if (future.isSuccess()) {
- int permits = dispatchThrottlingOnBatchMessageEnabled ? entries.size()
- : sendMessageInfo.getTotalMessages();
- // acquire message-dispatch permits for already delivered messages
- if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
- if (topic.getBrokerDispatchRateLimiter().isPresent()) {
- topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits,
- sendMessageInfo.getTotalBytes());
- }
-
- if (topic.getDispatchRateLimiter().isPresent()) {
- topic.getDispatchRateLimiter().get().tryDispatchPermit(permits,
- sendMessageInfo.getTotalBytes());
- }
- dispatchRateLimiter.ifPresent(rateLimiter ->
- rateLimiter.tryDispatchPermit(permits,
- sendMessageInfo.getTotalBytes()));
- }
+ acquirePermitsForDeliveredMessages(topic, cursor, entries.size(),
+ sendMessageInfo.getTotalMessages(), sendMessageInfo.getTotalBytes());
// Schedule a new read batch operation only after the previous batch has been written to the socket.
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 024ed85..5eb5531 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -296,19 +296,7 @@
}
// acquire message-dispatch permits for already delivered messages
- if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
- long permits = dispatchThrottlingOnBatchMessageEnabled ? totalEntries : totalMessagesSent;
- if (topic.getBrokerDispatchRateLimiter().isPresent()) {
- topic.getBrokerDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
- }
- if (topic.getDispatchRateLimiter().isPresent()) {
- topic.getDispatchRateLimiter().get().tryDispatchPermit(permits, totalBytesSent);
- }
-
- if (dispatchRateLimiter.isPresent()) {
- dispatchRateLimiter.get().tryDispatchPermit(permits, totalBytesSent);
- }
- }
+ acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
stuckConsumers.clear();
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
index b129995..cba15b0 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/AbstractBaseDispatcherTest.java
@@ -22,6 +22,7 @@
import static org.apache.pulsar.common.protocol.Commands.serializeMetadataAndPayload;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import io.netty.buffer.ByteBuf;
@@ -29,11 +30,14 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.Entry;
+import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.EntryImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.service.plugin.EntryFilter;
@@ -60,8 +64,9 @@
@BeforeMethod
public void setup() throws Exception {
this.svcConfig = mock(ServiceConfiguration.class);
+ when(svcConfig.isDispatchThrottlingForFilteredEntriesEnabled()).thenReturn(true);
this.subscriptionMock = mock(PersistentSubscription.class);
- this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+ this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig, null);
}
@Test
@@ -89,17 +94,24 @@
EntryFilter.FilterResult.REJECT);
Map<String, EntryFilterWithClassLoader> entryFilters = Map.of("key", mockFilter);
when(mockTopic.getEntryFilters()).thenReturn(entryFilters);
+ DispatchRateLimiter subscriptionDispatchRateLimiter = mock(DispatchRateLimiter.class);
- this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig);
+ this.helper = new AbstractBaseDispatcherTestHelper(this.subscriptionMock, this.svcConfig,
+ subscriptionDispatchRateLimiter);
List<Entry> entries = new ArrayList<>();
- entries.add(EntryImpl.create(1, 2, createMessage("message1", 1)));
+ Entry e = EntryImpl.create(1, 2, createMessage("message1", 1));
+ long expectedBytePermits = e.getLength();
+ entries.add(e);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
- //
- int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, null, false, null);
+
+ ManagedCursor cursor = mock(ManagedCursor.class);
+
+ int size = this.helper.filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, null, cursor, false, null);
assertEquals(size, 0);
+ verify(subscriptionDispatchRateLimiter).tryDispatchPermit(1, expectedBytePermits);
}
@Test
@@ -201,9 +213,18 @@
private static class AbstractBaseDispatcherTestHelper extends AbstractBaseDispatcher {
+ private final Optional<DispatchRateLimiter> dispatchRateLimiter;
+
protected AbstractBaseDispatcherTestHelper(Subscription subscription,
- ServiceConfiguration serviceConfig) {
+ ServiceConfiguration serviceConfig,
+ DispatchRateLimiter rateLimiter) {
super(subscription, serviceConfig);
+ dispatchRateLimiter = Optional.ofNullable(rateLimiter);
+ }
+
+ @Override
+ public Optional<DispatchRateLimiter> getRateLimiter() {
+ return dispatchRateLimiter;
}
@Override