In https://github.com/apache/pulsar/pull/11139 we support get position based on timestamp, but it doesn't work well with topic compaction enabled because the data may have been move to the compacted ledger.
In PIP-278 we introduced the pluggable topic compaction service to extend the compaction.
In order for get-message-id
to work well with topic compaction enabled we need to find the position according to publish time from topic compaction service, but TopicCompactionService
missing a method that find positions according to publish time or other metadata, so we should add it.
In addition, this method can also be used to find the position/offset according to offset/timestamp in the KoP.
We need to add a method to Topic Compaction Service
that can find the matching position and other metadata information according to publishTime/index, since the TopicCompactionService
interface already has @InterfaceStability.Evolving
annotation, so that we are able to add new methods directly.
Add findEntryByPublishTime
in the TopicCompactionService
API.
Add findEntryByEntryIndex
in the TopicCompactionService
API.
Implement them in the PulsarTopicCompactionService
using binary search.
When get messageId by timestamp, find position from topicCompactionService if we can't find position in the manageLedger.
@InterfaceAudience.Public @InterfaceStability.Evolving public interface TopicCompactionService extends AutoCloseable { /** * Find the first entry that greater or equal to target publishTime. * * @param publishTime the publish time of entry. * @return the first entry that greater or equal to target publishTime, this entry can be null. */ CompletableFuture<Entry> findEntryByPublishTime(long publishTime); /** * Find the first entry that greater or equal to target entryIndex. * * @param entryIndex the index of entry. * @return the first entry that greater or equal to target entryIndex, this entry can be null. */ CompletableFuture<Entry> findEntryByEntryIndex(long entryIndex); }