[fix][broker] Fix issue with consumer read uncommitted messages from compacted topic (#21465)
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
index 291d46d..f062ed9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherSingleActiveConsumer.java
@@ -367,8 +367,8 @@
                 }
                 havePendingRead = true;
                 if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
-                            this, consumer);
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead,
+                        topic.getMaxReadPosition(), isFirstRead, this, consumer);
                 } else {
                     ReadEntriesCtx readEntriesCtx =
                             ReadEntriesCtx.create(consumer, consumer.getConsumerEpoch());
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
index b64ad50..c0f8e40 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStreamingDispatcherSingleActiveConsumer.java
@@ -217,8 +217,8 @@
                 havePendingRead = true;
 
                 if (consumer.readCompacted()) {
-                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead, isFirstRead,
-                            this, consumer);
+                    topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, bytesToRead,
+                        PositionImpl.LATEST, isFirstRead, this, consumer);
                 } else {
                     streamingEntryReader.asyncReadEntries(messagesToRead, bytesToRead, consumer);
                 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
index 3cbccd4..ab1f1b9 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopic.java
@@ -24,6 +24,7 @@
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.Position;
+import org.apache.bookkeeper.mledger.impl.PositionImpl;
 import org.apache.pulsar.broker.service.Consumer;
 
 public interface CompactedTopic {
@@ -32,6 +33,7 @@
     void asyncReadEntriesOrWait(ManagedCursor cursor,
                                 int maxEntries,
                                 long bytesToRead,
+                                PositionImpl maxReadPosition,
                                 boolean isFirstRead,
                                 ReadEntriesCallback callback,
                                 Consumer consumer);
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index b20ff43..9b25b89 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -89,6 +89,7 @@
     public void asyncReadEntriesOrWait(ManagedCursor cursor,
                                        int maxEntries,
                                        long bytesToRead,
+                                       PositionImpl maxReadPosition,
                                        boolean isFirstRead,
                                        ReadEntriesCallback callback, Consumer consumer) {
         synchronized (this) {
@@ -103,7 +104,7 @@
             ReadEntriesCtx readEntriesCtx = ReadEntriesCtx.create(consumer, DEFAULT_CONSUMER_EPOCH);
             if (compactionHorizon == null
                 || compactionHorizon.compareTo(cursorPosition) < 0) {
-                cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, PositionImpl.LATEST);
+                cursor.asyncReadEntriesOrWait(maxEntries, bytesToRead, callback, readEntriesCtx, maxReadPosition);
             } else {
                 ManagedCursorImpl managedCursor = (ManagedCursorImpl) cursor;
                 int numberOfEntriesToRead = managedCursor.applyMaxSizeCap(maxEntries, bytesToRead);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
index 740854a..9d162e1 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java
@@ -1625,4 +1625,60 @@
         Transaction abortingTxn = transaction;
         Awaitility.await().until(() -> abortingTxn.getState() == Transaction.State.ABORTING);
     }
+
+
+    @Test
+    public void testReadCommittedWithReadCompacted() throws Exception{
+        final String namespace = "tnx/ns-prechecks";
+        final String topic = "persistent://" + namespace + "/test_transaction_topic";
+        admin.namespaces().createNamespace(namespace);
+        admin.topics().createNonPartitionedTopic(topic);
+
+        admin.topicPolicies().setCompactionThreshold(topic, 100 * 1024 * 1024);
+
+        @Cleanup
+        Consumer<String> consumer = this.pulsarClient.newConsumer(Schema.STRING)
+                .topic(topic)
+                .subscriptionName("sub")
+                .subscriptionType(SubscriptionType.Exclusive)
+                .readCompacted(true)
+                .subscribe();
+
+        @Cleanup
+        Producer<String> producer = this.pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .create();
+
+        producer.newMessage().key("K1").value("V1").send();
+
+        Transaction txn = pulsarClient.newTransaction()
+                .withTransactionTimeout(1, TimeUnit.MINUTES).build().get();
+        producer.newMessage(txn).key("K2").value("V2").send();
+        producer.newMessage(txn).key("K3").value("V3").send();
+
+        List<String> messages = new ArrayList<>();
+        while (true) {
+            Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            messages.add(message.getValue());
+        }
+
+        Assert.assertEquals(messages, List.of("V1"));
+
+        txn.commit();
+
+        messages.clear();
+
+        while (true) {
+            Message<String> message = consumer.receive(5, TimeUnit.SECONDS);
+            if (message == null) {
+                break;
+            }
+            messages.add(message.getValue());
+        }
+
+        Assert.assertEquals(messages, List.of("V2", "V3"));
+    }
 }