Background knowledge

In https://github.com/apache/pulsar/pull/11139 we support get position based on timestamp, but it doesn't work well with topic compaction enabled because the data may have been move to the compacted ledger.

In PIP-278 we introduced the pluggable topic compaction service to extend the compaction.

Motivation

In order for get-message-id to work well with topic compaction enabled we need to find the position according to publish time from topic compaction service, but TopicCompactionService missing a method that find positions according to publish time or other metadata, so we should add it.

In addition, this method can also be used to find the position/offset according to offset/timestamp in the KoP.

Goals

High Level Design

We need to add a method to Topic Compaction Service that can find the matching position and other metadata information according to publishTime/index, since the TopicCompactionService interface already has @InterfaceStability.Evolving annotation, so that we are able to add new methods directly.

Detailed Design

Add findEntryByPublishTime in the TopicCompactionService API.

Add findEntryByEntryIndex in the TopicCompactionService API.

Implement them in the PulsarTopicCompactionService using binary search.

When get messageId by timestamp, find position from topicCompactionService if we can't find position in the manageLedger.

Public-facing Changes

 @InterfaceAudience.Public
 @InterfaceStability.Evolving
 public interface TopicCompactionService extends AutoCloseable { 

  /**
   * Find the first entry that greater or equal to target publishTime.
   *
   * @param publishTime  the publish time of entry.
   * @return the first entry that greater or equal to target publishTime, this entry can be null.
   */
   CompletableFuture<Entry> findEntryByPublishTime(long publishTime);
   
   /**
   * Find the first entry that greater or equal to target entryIndex.
   *
   * @param entryIndex  the index of entry.
   * @return the first entry that greater or equal to target entryIndex, this entry can be null.
   */
   CompletableFuture<Entry> findEntryByEntryIndex(long entryIndex);
 }

Links