[improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions (#22536)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 6b20280..b1c3687 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -286,16 +286,29 @@
totalChunkedMessages, redeliveryTracker, DEFAULT_CONSUMER_EPOCH);
}
+ public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
+ EntryBatchIndexesAcks batchIndexesAcks,
+ int totalMessages, long totalBytes, long totalChunkedMessages,
+ RedeliveryTracker redeliveryTracker, long epoch) {
+ return sendMessages(entries, null, batchSizes, batchIndexesAcks, totalMessages, totalBytes,
+ totalChunkedMessages, redeliveryTracker, epoch);
+ }
+
/**
* Dispatch a list of entries to the consumer. <br/>
* <b>It is also responsible to release entries data and recycle entries object.</b>
*
* @return a SendMessageInfo object that contains the detail of what was sent to consumer
*/
- public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
+ public Future<Void> sendMessages(final List<? extends Entry> entries,
+ final List<Integer> stickyKeyHashes,
+ EntryBatchSizes batchSizes,
EntryBatchIndexesAcks batchIndexesAcks,
- int totalMessages, long totalBytes, long totalChunkedMessages,
- RedeliveryTracker redeliveryTracker, long epoch) {
+ int totalMessages,
+ long totalBytes,
+ long totalChunkedMessages,
+ RedeliveryTracker redeliveryTracker,
+ long epoch) {
this.lastConsumedTimestamp = System.currentTimeMillis();
if (entries.isEmpty() || totalMessages == 0) {
@@ -323,7 +336,7 @@
// because this consumer is possible to disconnect at this time.
if (pendingAcks != null) {
int batchSize = batchSizes.getBatchSize(i);
- int stickyKeyHash = getStickyKeyHash(entry);
+ int stickyKeyHash = stickyKeyHashes == null ? getStickyKeyHash(entry) : stickyKeyHashes.get(i);
long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i);
if (ackSet != null) {
unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 2cad253..fb7bd22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -126,6 +126,14 @@
}
};
+ private static final FastThreadLocal<Map<Consumer, List<Integer>>> localGroupedStickyKeyHashes =
+ new FastThreadLocal<Map<Consumer, List<Integer>>>() {
+ @Override
+ protected Map<Consumer, List<Integer>> initialValue() throws Exception {
+ return new HashMap<>();
+ }
+ };
+
@Override
public void sendMessages(List<Entry> entries) {
if (entries.isEmpty()) {
@@ -139,28 +147,38 @@
final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
groupedEntries.clear();
+ final Map<Consumer, List<Integer>> consumerStickyKeyHashesMap = localGroupedStickyKeyHashes.get();
+ consumerStickyKeyHashesMap.clear();
for (Entry entry : entries) {
- Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer()));
+ byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+ int stickyKeyHash = StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+
+ Consumer consumer = selector.select(stickyKeyHash);
if (consumer != null) {
- groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+ int startingSize = Math.max(10, entries.size() / (2 * consumerSet.size()));
+ groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(entry);
+ consumerStickyKeyHashesMap
+ .computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(stickyKeyHash);
} else {
entry.release();
}
}
for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : groupedEntries.entrySet()) {
- Consumer consumer = entriesByConsumer.getKey();
- List<Entry> entriesForConsumer = entriesByConsumer.getValue();
+ final Consumer consumer = entriesByConsumer.getKey();
+ final List<Entry> entriesForConsumer = entriesByConsumer.getValue();
+ final List<Integer> stickyKeysForConsumer = consumerStickyKeyHashesMap.get(consumer);
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size());
filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, false, consumer);
if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) {
- consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
+ consumer.sendMessages(entriesForConsumer, stickyKeysForConsumer, batchSizes,
+ null, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
- getRedeliveryTracker());
+ getRedeliveryTracker(), Commands.DEFAULT_CONSUMER_EPOCH);
TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
} else {
entriesForConsumer.forEach(e -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index b2638d5..6b0f48a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -128,15 +128,15 @@
assertEquals(byteBuf.toString(UTF_8), "message" + index);
};
return mockPromise;
- }).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(),
- anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+ }).when(consumerMock).sendMessages(any(List.class), any(List.class), any(EntryBatchSizes.class), any(),
+ anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), anyLong());
try {
nonpersistentDispatcher.sendMessages(entries);
} catch (Exception e) {
fail("Failed to sendMessages.", e);
}
- verify(consumerMock, times(1)).sendMessages(any(List.class), any(EntryBatchSizes.class),
- eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+ verify(consumerMock, times(1)).sendMessages(any(List.class), any(List.class), any(EntryBatchSizes.class),
+ eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), anyLong());
}
@Test(timeOut = 10000)