[improve][broker] perf: Reduce stickyHash calculations of non-persistent topics in SHARED subscriptions (#22536)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
index 6b20280..b1c3687 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/Consumer.java
@@ -286,16 +286,29 @@
                 totalChunkedMessages, redeliveryTracker, DEFAULT_CONSUMER_EPOCH);
     }
 
+    public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
+                                     EntryBatchIndexesAcks batchIndexesAcks,
+                                     int totalMessages, long totalBytes, long totalChunkedMessages,
+                                     RedeliveryTracker redeliveryTracker, long epoch) {
+        return sendMessages(entries, null, batchSizes, batchIndexesAcks, totalMessages, totalBytes,
+                totalChunkedMessages, redeliveryTracker, epoch);
+    }
+
     /**
      * Dispatch a list of entries to the consumer. <br/>
      * <b>It is also responsible to release entries data and recycle entries object.</b>
      *
      * @return a SendMessageInfo object that contains the detail of what was sent to consumer
      */
-    public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes,
+    public Future<Void> sendMessages(final List<? extends Entry> entries,
+                                     final List<Integer> stickyKeyHashes,
+                                     EntryBatchSizes batchSizes,
                                      EntryBatchIndexesAcks batchIndexesAcks,
-                                     int totalMessages, long totalBytes, long totalChunkedMessages,
-                                     RedeliveryTracker redeliveryTracker, long epoch) {
+                                     int totalMessages,
+                                     long totalBytes,
+                                     long totalChunkedMessages,
+                                     RedeliveryTracker redeliveryTracker,
+                                     long epoch) {
         this.lastConsumedTimestamp = System.currentTimeMillis();
 
         if (entries.isEmpty() || totalMessages == 0) {
@@ -323,7 +336,7 @@
                 // because this consumer is possible to disconnect at this time.
                 if (pendingAcks != null) {
                     int batchSize = batchSizes.getBatchSize(i);
-                    int stickyKeyHash = getStickyKeyHash(entry);
+                    int stickyKeyHash = stickyKeyHashes == null ? getStickyKeyHash(entry) : stickyKeyHashes.get(i);
                     long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i);
                     if (ackSet != null) {
                         unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
index 2cad253..fb7bd22 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumers.java
@@ -126,6 +126,14 @@
                 }
             };
 
+    private static final FastThreadLocal<Map<Consumer, List<Integer>>> localGroupedStickyKeyHashes =
+            new FastThreadLocal<Map<Consumer, List<Integer>>>() {
+                @Override
+                protected Map<Consumer, List<Integer>> initialValue() throws Exception {
+                    return new HashMap<>();
+                }
+            };
+
     @Override
     public void sendMessages(List<Entry> entries) {
         if (entries.isEmpty()) {
@@ -139,28 +147,38 @@
 
         final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
         groupedEntries.clear();
+        final Map<Consumer, List<Integer>> consumerStickyKeyHashesMap = localGroupedStickyKeyHashes.get();
+        consumerStickyKeyHashesMap.clear();
 
         for (Entry entry : entries) {
-            Consumer consumer = selector.select(peekStickyKey(entry.getDataBuffer()));
+            byte[] stickyKey = peekStickyKey(entry.getDataBuffer());
+            int stickyKeyHash = StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey);
+
+            Consumer consumer = selector.select(stickyKeyHash);
             if (consumer != null) {
-                groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>()).add(entry);
+                int startingSize = Math.max(10, entries.size() / (2 * consumerSet.size()));
+                groupedEntries.computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(entry);
+                consumerStickyKeyHashesMap
+                        .computeIfAbsent(consumer, k -> new ArrayList<>(startingSize)).add(stickyKeyHash);
             } else {
                 entry.release();
             }
         }
 
         for (Map.Entry<Consumer, List<Entry>> entriesByConsumer : groupedEntries.entrySet()) {
-            Consumer consumer = entriesByConsumer.getKey();
-            List<Entry> entriesForConsumer = entriesByConsumer.getValue();
+            final Consumer consumer = entriesByConsumer.getKey();
+            final List<Entry> entriesForConsumer = entriesByConsumer.getValue();
+            final List<Integer> stickyKeysForConsumer = consumerStickyKeyHashesMap.get(consumer);
 
             SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
             EntryBatchSizes batchSizes = EntryBatchSizes.get(entriesForConsumer.size());
             filterEntriesForConsumer(entriesForConsumer, batchSizes, sendMessageInfo, null, null, false, consumer);
 
             if (consumer.getAvailablePermits() > 0 && consumer.isWritable()) {
-                consumer.sendMessages(entriesForConsumer, batchSizes, null, sendMessageInfo.getTotalMessages(),
+                consumer.sendMessages(entriesForConsumer, stickyKeysForConsumer, batchSizes,
+                        null, sendMessageInfo.getTotalMessages(),
                         sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
-                        getRedeliveryTracker());
+                        getRedeliveryTracker(), Commands.DEFAULT_CONSUMER_EPOCH);
                 TOTAL_AVAILABLE_PERMITS_UPDATER.addAndGet(this, -sendMessageInfo.getTotalMessages());
             } else {
                 entriesForConsumer.forEach(e -> {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
index b2638d5..6b0f48a 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentStickyKeyDispatcherMultipleConsumersTest.java
@@ -128,15 +128,15 @@
                 assertEquals(byteBuf.toString(UTF_8), "message" + index);
             };
             return mockPromise;
-        }).when(consumerMock).sendMessages(any(List.class), any(EntryBatchSizes.class), any(),
-                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+        }).when(consumerMock).sendMessages(any(List.class), any(List.class), any(EntryBatchSizes.class), any(),
+                anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), anyLong());
         try {
             nonpersistentDispatcher.sendMessages(entries);
         } catch (Exception e) {
             fail("Failed to sendMessages.", e);
         }
-        verify(consumerMock, times(1)).sendMessages(any(List.class), any(EntryBatchSizes.class),
-                eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class));
+        verify(consumerMock, times(1)).sendMessages(any(List.class), any(List.class), any(EntryBatchSizes.class),
+                eq(null), anyInt(), anyLong(), anyLong(), any(RedeliveryTracker.class), anyLong());
     }
 
     @Test(timeOut = 10000)