Fix reader skipped remaining compacted data during the topic unloading. (#13629)

### Motivation

To fix the reader skipping remaining compacted data while the topic has been unloaded.
#11287 fixed the data skipped issue while the reader first time to read the messages
with the earliest position. But if the reader has consumed some messages from the
compacted ledger but not all, the start position will not be `earliest`, the broker
will rewind the cursor for the reader to the next valid position of the original topic.
So the remaining messages in the compacted ledger will be skipped.

Here are the logs from the broker:

```
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.pulsar.broker.service.BrokerService - Created topic persistent://xxx/product-full-prod/5126 - dedup is disabled
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.pulsar.broker.service.persistent.PersistentTopic - [persistent://xxx/product-full-prod/5126][xxx] Creating non-durable subscription at msg id 181759:14:-1:-1
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl - [xxx/product-full-prod/persistent/5126] Created non-durable cursor read-position=221199:0 mark-delete-position=181759:13
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl - [xxx/product-full-prod/persistent/5126] Opened new cursor: NonDurableCursorImpl{ledger=xxx/product-full-prod/persistent/5126, ackPos=181759:13, readPos=221199:0}
10:44:36.035 [bookkeeper-ml-scheduler-OrderedScheduler-4-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [xxx/product-full-prod/persistent/5126-xxx] Rewind from 221199:0 to 221199:0
```

There some many compacted messages after `181759:13`, but the broker will not dispatch them to the reader.
The issue also can be reproduced by the unit test that was added in this PR.

### Modification

If the cursor with `readCompacted = true`, just rewind to the next message of the mark delete position,
so that the reader can continue to read the data from the compacted ledger.

### Verification

A new test added for testing the reader can get all the compacted messages and non-compacted messages from the topic during the topic unloading.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
index 9ff2fba..74ab2ce 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedger.java
@@ -259,7 +259,8 @@
      */
     ManagedCursor newNonDurableCursor(Position startCursorPosition) throws ManagedLedgerException;
     ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException;
-    ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition) throws ManagedLedgerException;
+    ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName, InitialPosition initialPosition,
+                                      boolean isReadCompacted) throws ManagedLedgerException;
 
     /**
      * Delete a ManagedCursor asynchronously.
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
index d2fc9060..fc42fe2 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java
@@ -1033,11 +1033,12 @@
 
     @Override
     public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName) throws ManagedLedgerException {
-        return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest);
+        return newNonDurableCursor(startPosition, subscriptionName, InitialPosition.Latest, false);
     }
 
     @Override
-    public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition)
+    public ManagedCursor newNonDurableCursor(Position startCursorPosition, String cursorName, InitialPosition initialPosition,
+                                             boolean isReadCompacted)
             throws ManagedLedgerException {
         Objects.requireNonNull(cursorName, "cursor name can't be null");
         checkManagedLedgerIsOpen();
@@ -1052,7 +1053,7 @@
         }
 
         NonDurableCursorImpl cursor = new NonDurableCursorImpl(bookKeeper, config, this, cursorName,
-                (PositionImpl) startCursorPosition, initialPosition);
+                (PositionImpl) startCursorPosition, initialPosition, isReadCompacted);
         cursor.setActive();
 
         log.info("[{}] Opened new cursor: {}", name, cursor);
diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
index 0f7ffe41..1d545bd 100644
--- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
+++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/NonDurableCursorImpl.java
@@ -33,11 +33,12 @@
 
 public class NonDurableCursorImpl extends ManagedCursorImpl {
 
-    private volatile boolean readCompacted;
+    private final boolean readCompacted;
 
     NonDurableCursorImpl(BookKeeper bookkeeper, ManagedLedgerConfig config, ManagedLedgerImpl ledger, String cursorName,
-                         PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition) {
+                         PositionImpl startCursorPosition, CommandSubscribe.InitialPosition initialPosition, boolean isReadCompacted) {
         super(bookkeeper, config, ledger, cursorName);
+        this.readCompacted = isReadCompacted;
 
         // Compare with "latest" position marker by using only the ledger id. Since the C++ client is using 48bits to
         // store the entryId, it's not able to pass a Long.max() as entryId. In this case there's no point to require
@@ -67,7 +68,7 @@
 
     private void recoverCursor(PositionImpl mdPosition) {
         Pair<PositionImpl, Long> lastEntryAndCounter = ledger.getLastPositionAndCounter();
-        this.readPosition = ledger.getNextValidPosition(mdPosition);
+        this.readPosition = isReadCompacted() ? mdPosition.getNext() : ledger.getNextValidPosition(mdPosition);
         markDeletePosition = mdPosition;
 
         // Initialize the counter such that the difference between the messages written on the ML and the
@@ -118,15 +119,23 @@
         callback.deleteCursorComplete(ctx);
     }
 
-    public void setReadCompacted(boolean readCompacted) {
-        this.readCompacted = readCompacted;
-    }
-
     public boolean isReadCompacted() {
         return readCompacted;
     }
 
     @Override
+    public void rewind() {
+        // For reading the compacted data,
+        // we couldn't reset the read position to the next valid position of the original topic.
+        // Otherwise, the remaining data in the compacted ledger will be skipped.
+        if (!readCompacted) {
+            super.rewind();
+        } else {
+            readPosition = markDeletePosition.getNext();
+        }
+    }
+
+    @Override
     public synchronized String toString() {
         return MoreObjects.toStringHelper(this).add("ledger", ledger.getName()).add("ackPos", markDeletePosition)
                 .add("readPos", readPosition).toString();
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
index 4c7ea45..8cab06b 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/AbstractDispatcherSingleActiveConsumer.java
@@ -27,7 +27,6 @@
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
 import org.apache.bookkeeper.mledger.ManagedCursor;
-import org.apache.bookkeeper.mledger.impl.NonDurableCursorImpl;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
 import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
@@ -181,9 +180,6 @@
                 consumer.notifyActiveConsumerChange(currentActiveConsumer);
             }
         }
-        if (cursor != null && !cursor.isDurable() && cursor instanceof NonDurableCursorImpl) {
-            ((NonDurableCursorImpl) cursor).setReadCompacted(ACTIVE_CONSUMER_UPDATER.get(this).readCompacted());
-        }
 
     }
 
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
index efa73ed..a3d2ff3 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java
@@ -755,7 +755,7 @@
                     getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec,
                             replicatedSubscriptionState, subscriptionProperties)
                     : getNonDurableSubscription(subscriptionName, startMessageId, initialPosition,
-                    startMessageRollbackDurationSec);
+                    startMessageRollbackDurationSec, readCompacted);
 
             int maxUnackedMessages = isDurable
                     ? getMaxUnackedMessagesOnConsumer()
@@ -921,7 +921,8 @@
     }
 
     private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
-            MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec) {
+            MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec,
+            boolean isReadCompacted) {
         log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
 
         CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
@@ -954,7 +955,8 @@
                 Position startPosition = new PositionImpl(ledgerId, entryId);
                 ManagedCursor cursor = null;
                 try {
-                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition);
+                    cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition,
+                            isReadCompacted);
                 } catch (ManagedLedgerException e) {
                     return FutureUtil.failedFuture(e);
                 }
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
index 4d00d28..accce4ef 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/compaction/CompactedTopicTest.java
@@ -734,4 +734,59 @@
         Assert.assertNull(reader.readNext(3, TimeUnit.SECONDS));
     }
 
+    public void testReadCompleteMessagesDuringTopicUnloading() throws Exception {
+        String topic = "persistent://my-property/use/my-ns/testReadCompleteMessagesDuringTopicUnloading-" +
+                UUID.randomUUID();
+        final int numMessages = 1000;
+        @Cleanup
+        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
+                .topic(topic)
+                .blockIfQueueFull(true)
+                .enableBatching(false)
+                .create();
+        CompletableFuture<MessageId> lastMessage = null;
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + "").value(String.format("msg [%d]", i)).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        admin.topics().triggerCompaction(topic);
+        Awaitility.await().untilAsserted(() -> {
+            PersistentTopicInternalStats stats = admin.topics().getInternalStats(topic);
+            Assert.assertNotEquals(stats.compactedLedger.ledgerId, -1);
+            Assert.assertEquals(stats.compactedLedger.entries, numMessages);
+            Assert.assertEquals(admin.topics().getStats(topic)
+                    .getSubscriptions().get(COMPACTION_SUBSCRIPTION).getConsumers().size(), 0);
+            Assert.assertEquals(stats.lastConfirmedEntry, stats.cursors.get(COMPACTION_SUBSCRIPTION).markDeletePosition);
+        });
+        // Unload the topic to make sure the original ledger been deleted.
+        admin.topics().unload(topic);
+        // Produce more messages to the original topic
+        for (int i = 0; i < numMessages; ++i) {
+            lastMessage = producer.newMessage().key(i + numMessages + "").value(String.format("msg [%d]", i + numMessages)).sendAsync();
+        }
+        producer.flush();
+        lastMessage.join();
+        // For now the topic has 1000 messages in the compacted ledger and 1000 messages in the original topic.
+        @Cleanup
+        Reader<String> reader = pulsarClient.newReader(Schema.STRING)
+                .topic(topic)
+                .startMessageIdInclusive()
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .create();
+
+        // Unloading the topic during reading the data to make sure the reader will not miss any messages.
+        for (int i = 0; i < numMessages / 2; ++i) {
+            Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i));
+        }
+        admin.topics().unload(topic);
+        for (int i = 0; i < numMessages / 2; ++i) {
+            Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages / 2));
+        }
+        admin.topics().unload(topic);
+        for (int i = 0; i < numMessages; ++i) {
+            Assert.assertEquals(reader.readNext().getValue(), String.format("msg [%d]", i + numMessages));
+        }
+    }
 }
diff --git a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
index 74d7ec7..3cd36ce 100644
--- a/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
+++ b/tiered-storage/jcloud/src/test/java/org/apache/bookkeeper/mledger/offload/jcloud/impl/MockManagedLedger.java
@@ -122,8 +122,8 @@
 
     @Override
     public ManagedCursor newNonDurableCursor(Position startPosition, String subscriptionName,
-                                             CommandSubscribe.InitialPosition initialPosition) throws
-            ManagedLedgerException {
+                                             CommandSubscribe.InitialPosition initialPosition,
+                                             boolean isReadCompacted) throws ManagedLedgerException {
         return null;
     }