[fix][broker] Fix issue with GetMessageIdByTimestamp can't find match messageId from compacted ledger (#21600)

diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
index 4856e52..0dbd4f8 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java
@@ -2882,28 +2882,62 @@
                     throw new RestException(Status.METHOD_NOT_ALLOWED,
                         "Get message ID by timestamp on a non-persistent topic is not allowed");
                 }
-                ManagedLedger ledger = ((PersistentTopic) topic).getManagedLedger();
-                return ledger.asyncFindPosition(entry -> {
-                    try {
-                        long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
-                        return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
-                    } catch (Exception e) {
-                        log.error("[{}] Error deserializing message for message position find", topicName, e);
-                    } finally {
-                        entry.release();
+                final PersistentTopic persistentTopic = (PersistentTopic) topic;
+
+                return persistentTopic.getTopicCompactionService().readLastCompactedEntry().thenCompose(lastEntry -> {
+                    if (lastEntry == null) {
+                        return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
                     }
-                    return false;
-                }).thenApply(position -> {
-                    if (position == null) {
-                        return null;
+                    MessageMetadata metadata;
+                    Position position = lastEntry.getPosition();
+                    try {
+                        metadata = Commands.parseMessageMetadata(lastEntry.getDataBuffer());
+                    } finally {
+                        lastEntry.release();
+                    }
+                    if (timestamp == metadata.getPublishTime()) {
+                        return CompletableFuture.completedFuture(new MessageIdImpl(position.getLedgerId(),
+                                position.getEntryId(), topicName.getPartitionIndex()));
+                    } else if (timestamp < metadata.getPublishTime()) {
+                        return persistentTopic.getTopicCompactionService().findEntryByPublishTime(timestamp)
+                                .thenApply(compactedEntry -> {
+                                    try {
+                                        return new MessageIdImpl(compactedEntry.getLedgerId(),
+                                                compactedEntry.getEntryId(), topicName.getPartitionIndex());
+                                    } finally {
+                                        compactedEntry.release();
+                                    }
+                                });
                     } else {
-                        return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
-                            topicName.getPartitionIndex());
+                        return findMessageIdByPublishTime(timestamp, persistentTopic.getManagedLedger());
                     }
                 });
             });
     }
 
+    private CompletableFuture<MessageId> findMessageIdByPublishTime(long timestamp, ManagedLedger managedLedger) {
+        return managedLedger.asyncFindPosition(entry -> {
+            try {
+                long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());
+                return MessageImpl.isEntryPublishedEarlierThan(entryTimestamp, timestamp);
+            } catch (Exception e) {
+                log.error("[{}] Error deserializing message for message position find",
+                    topicName,
+                    e);
+            } finally {
+                entry.release();
+            }
+            return false;
+        }).thenApply(position -> {
+            if (position == null) {
+                return null;
+            } else {
+                return new MessageIdImpl(position.getLedgerId(), position.getEntryId(),
+                    topicName.getPartitionIndex());
+            }
+        });
+    }
+
     protected CompletableFuture<Response> internalPeekNthMessageAsync(String subName, int messagePosition,
                                                                       boolean authoritative) {
         CompletableFuture<Void> ret;
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
index a8e124c..d13ce61 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/CompactedTopicImpl.java
@@ -32,6 +32,7 @@
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.function.Predicate;
 import javax.annotation.Nullable;
 import org.apache.bookkeeper.client.BKException;
 import org.apache.bookkeeper.client.BookKeeper;
@@ -320,6 +321,55 @@
         });
     }
 
+    CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> predicate) {
+        var compactedTopicContextFuture = this.getCompactedTopicContextFuture();
+
+        if (compactedTopicContextFuture == null) {
+            return CompletableFuture.completedFuture(null);
+        }
+        return compactedTopicContextFuture.thenCompose(compactedTopicContext -> {
+            LedgerHandle lh = compactedTopicContext.getLedger();
+            CompletableFuture<Long> promise = new CompletableFuture<>();
+            findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh);
+            return promise.thenCompose(index -> {
+                if (index == null) {
+                    return CompletableFuture.completedFuture(null);
+                }
+                return readEntries(lh, index, index).thenApply(entries -> entries.get(0));
+            });
+        });
+    }
+    private static void findFirstMatchIndexLoop(final Predicate<Entry> predicate,
+                                                final long start, final long end,
+                                                final CompletableFuture<Long> promise,
+                                                final Long lastMatchIndex,
+                                                final LedgerHandle lh) {
+        if (start > end) {
+            promise.complete(lastMatchIndex);
+            return;
+        }
+
+        long mid = (start + end) / 2;
+        readEntries(lh, mid, mid).thenAccept(entries -> {
+            Entry entry = entries.get(0);
+            final boolean isMatch;
+            try {
+                isMatch = predicate.test(entry);
+            } finally {
+                entry.release();
+            }
+
+            if (isMatch) {
+                findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh);
+            } else {
+                findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh);
+            }
+        }).exceptionally(ex -> {
+            promise.completeExceptionally(ex);
+            return null;
+        });
+    }
+
     private static int comparePositionAndMessageId(PositionImpl p, MessageIdData m) {
         return ComparisonChain.start()
             .compare(p.getLedgerId(), m.getLedgerId())
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
index 1d3f94d..16543bc 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/PulsarTopicCompactionService.java
@@ -22,7 +22,6 @@
 import static org.apache.pulsar.compaction.CompactedTopicImpl.COMPACT_LEDGER_EMPTY;
 import static org.apache.pulsar.compaction.CompactedTopicImpl.NEWER_THAN_COMPACTED;
 import static org.apache.pulsar.compaction.CompactedTopicImpl.findStartPoint;
-import static org.apache.pulsar.compaction.CompactedTopicImpl.readEntries;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
@@ -33,7 +32,6 @@
 import java.util.function.Supplier;
 import javax.annotation.Nonnull;
 import org.apache.bookkeeper.client.BookKeeper;
-import org.apache.bookkeeper.client.LedgerHandle;
 import org.apache.bookkeeper.mledger.Entry;
 import org.apache.bookkeeper.mledger.Position;
 import org.apache.bookkeeper.mledger.impl.PositionImpl;
@@ -116,7 +114,7 @@
         final Predicate<Entry> predicate = entry -> {
             return Commands.parseMessageMetadata(entry.getDataBuffer()).getPublishTime() >= publishTime;
         };
-        return findFirstMatchEntry(predicate);
+        return compactedTopic.findFirstMatchEntry(predicate);
     }
 
     @Override
@@ -128,57 +126,7 @@
             }
             return brokerEntryMetadata.getIndex() >= entryIndex;
         };
-        return findFirstMatchEntry(predicate);
-    }
-
-    private CompletableFuture<Entry> findFirstMatchEntry(final Predicate<Entry> predicate) {
-        var compactedTopicContextFuture = compactedTopic.getCompactedTopicContextFuture();
-
-        if (compactedTopicContextFuture == null) {
-            return CompletableFuture.completedFuture(null);
-        }
-        return compactedTopicContextFuture.thenCompose(compactedTopicContext -> {
-            LedgerHandle lh = compactedTopicContext.getLedger();
-            CompletableFuture<Long> promise = new CompletableFuture<>();
-            findFirstMatchIndexLoop(predicate, 0L, lh.getLastAddConfirmed(), promise, null, lh);
-            return promise.thenCompose(index -> {
-                if (index == null) {
-                    return CompletableFuture.completedFuture(null);
-                }
-                return readEntries(lh, index, index).thenApply(entries -> entries.get(0));
-            });
-        });
-    }
-
-    private static void findFirstMatchIndexLoop(final Predicate<Entry> predicate,
-                                           final long start, final long end,
-                                           final CompletableFuture<Long> promise,
-                                           final Long lastMatchIndex,
-                                           final LedgerHandle lh) {
-        if (start > end) {
-            promise.complete(lastMatchIndex);
-            return;
-        }
-
-        long mid = (start + end) / 2;
-        readEntries(lh, mid, mid).thenAccept(entries -> {
-            Entry entry = entries.get(0);
-            final boolean isMatch;
-            try {
-                isMatch = predicate.test(entry);
-            } finally {
-                entry.release();
-            }
-
-            if (isMatch) {
-                findFirstMatchIndexLoop(predicate, start, mid - 1, promise, mid, lh);
-            } else {
-                findFirstMatchIndexLoop(predicate, mid + 1, end, promise, lastMatchIndex, lh);
-            }
-        }).exceptionally(ex -> {
-            promise.completeExceptionally(ex);
-            return null;
-        });
+        return compactedTopic.findFirstMatchEntry(predicate);
     }
 
     public CompactedTopicImpl getCompactedTopic() {
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
index 7939b19..6a81ffe 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/PersistentTopicsTest.java
@@ -31,6 +31,8 @@
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
 import static org.testng.Assert.assertTrue;
 import java.lang.reflect.Field;
 import java.util.ArrayList;
@@ -65,6 +67,7 @@
 import org.apache.pulsar.broker.service.Topic;
 import org.apache.pulsar.broker.web.PulsarWebResource;
 import org.apache.pulsar.broker.web.RestException;
+import org.apache.pulsar.client.admin.LongRunningProcessStatus;
 import org.apache.pulsar.client.admin.PulsarAdmin;
 import org.apache.pulsar.client.admin.PulsarAdminException;
 import org.apache.pulsar.client.admin.Topics;
@@ -87,6 +90,7 @@
 import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
 import org.apache.pulsar.common.policies.data.AuthAction;
 import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
 import org.apache.pulsar.common.policies.data.Policies;
 import org.apache.pulsar.common.policies.data.RetentionPolicies;
 import org.apache.pulsar.common.policies.data.TenantInfoImpl;
@@ -1460,6 +1464,69 @@
     }
 
     @Test
+    public void testGetMessageIdByTimestampWithCompaction() throws Exception {
+        TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
+        admin.tenants().createTenant("tenant-xyz", tenantInfo);
+        admin.namespaces().createNamespace("tenant-xyz/ns-abc", Set.of("test"));
+        final String topicName = "persistent://tenant-xyz/ns-abc/testGetMessageIdByTimestampWithCompaction";
+        admin.topics().createNonPartitionedTopic(topicName);
+
+        Map<MessageId, Long> publishTimeMap = new ConcurrentHashMap<>();
+        @Cleanup
+        ProducerBase<byte[]> producer = (ProducerBase<byte[]>) pulsarClient.newProducer().topic(topicName)
+                .enableBatching(false)
+                .intercept(new ProducerInterceptor() {
+                    @Override
+                    public void close() {
+
+                    }
+
+                    @Override
+                    public boolean eligible(Message message) {
+                        return true;
+                    }
+
+                    @Override
+                    public Message beforeSend(Producer producer, Message message) {
+                        return message;
+                    }
+
+                    @Override
+                    public void onSendAcknowledgement(Producer producer, Message message, MessageId msgId,
+                                                      Throwable exception) {
+                        publishTimeMap.put(message.getMessageId(), message.getPublishTime());
+                    }
+                })
+                .create();
+
+        MessageId id1 = producer.newMessage().key("K1").value("test1".getBytes()).send();
+        MessageId id2 = producer.newMessage().key("K2").value("test2".getBytes()).send();
+
+        long publish1 = publishTimeMap.get(id1);
+        long publish2 = publishTimeMap.get(id2);
+        Assert.assertTrue(publish1 < publish2);
+
+        admin.topics().triggerCompaction(topicName);
+        Awaitility.await().untilAsserted(() ->
+            assertSame(admin.topics().compactionStatus(topicName).status,
+                LongRunningProcessStatus.Status.SUCCESS));
+
+        admin.topics().unload(topicName);
+        Awaitility.await().untilAsserted(() -> {
+                PersistentTopicInternalStats internalStats = admin.topics().getInternalStats(topicName, false);
+                assertEquals(internalStats.ledgers.size(), 1);
+                assertEquals(internalStats.ledgers.get(0).entries, 0);
+        });
+
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 - 1), id1);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1), id1);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish1 + 1), id2);
+        Assert.assertEquals(admin.topics().getMessageIdByTimestamp(topicName, publish2), id2);
+        Assert.assertTrue(admin.topics().getMessageIdByTimestamp(topicName, publish2 + 1)
+                .compareTo(id2) > 0);
+    }
+
+    @Test
     public void testGetBatchMessageIdByTimestamp() throws Exception {
         TenantInfoImpl tenantInfo = new TenantInfoImpl(Set.of("role1", "role2"), Set.of("test"));
         admin.tenants().createTenant("tenant-xyz", tenantInfo);