PIP-415: Support getting message ID by index

Background knowledge

  • In PIP-70, we introduced Broker Entry Metadata containing an index field - a continuous sequence identifier for messages within a topic partition. This sequence ID simplifies message sequence management compared to the composite MessageId (ledgerId+entryId+batchIndex) which becomes discontinuous across ledgers.
  • In PIP-90, we exposed this metadata via the Message.getIndex() API.
  • In PR-6331, we added a new get-message-by-id command to pulsar-admin.

With these three changes in place, the index of a message can be obtained from its message ID:

  1. Get the message by ID using the get-message-by-id command.
  2. Get the index of the message using Message.getIndex().

The reverse lookup is missing: given an index, there is no way to obtain the corresponding message ID. The mapping therefore works in one direction only, which causes operational confusion. This PIP adds a new API to get the message ID by index.

Motivation

Our organization is currently planning a migration from RocketMQ to Pulsar. To facilitate this transition, we aim to implement a standardized abstraction layer for MQ clients that encapsulates the implementation details of specific messaging systems. This abstraction layer will allow seamless engine replacement while maintaining consistent client interfaces. However, one critical compatibility issue hinders the unification of message fetching patterns between RocketMQ/Kafka and Pulsar:

Positioning Mechanism Mismatch:

  • RocketMQ/Kafka: Use a monotonically increasing numeric index for message positioning and acknowledgment.
  • Pulsar: Relies on a composite MessageId (ledgerId + entryId + batchIndex) for message identification.

We propose to add a new API to retrieve the message ID by index, enabling us to cache the mapping between message ID and index. This will allow us to use index for seek and acknowledgment operations when consuming messages through the standardized API.
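The caching idea can be sketched with the standard library alone. The block below is a minimal illustration, not part of the proposal: the class IndexPositionCacheSketch, the Position type, and the lookup function (which stands in for a call to the proposed API) are all hypothetical names, and the LRU eviction policy is an assumption.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;

/** Hypothetical bounded cache from message index to entry position (ledgerId, entryId). */
final class IndexPositionCacheSketch {

    /** Stand-in for a MessageId at entry resolution. */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }
    }

    private final Map<Long, Position> cache;
    // Assumption: this function wraps the remote index-to-message-ID lookup.
    private final LongFunction<Position> lookup;

    IndexPositionCacheSketch(final int maxEntries, LongFunction<Position> lookup) {
        this.lookup = lookup;
        // Access-ordered LinkedHashMap that drops the least recently used mapping when full.
        this.cache = new LinkedHashMap<Long, Position>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, Position> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached position for the index, calling the lookup only on a miss. */
    Position resolve(long index) {
        Position position = cache.get(index);
        if (position == null) {
            position = lookup.apply(index);
            cache.put(index, position);
        }
        return position;
    }

    public static void main(String[] args) {
        // Stub lookup: pretends every index lives in ledger 1 at entryId == index.
        IndexPositionCacheSketch cache =
                new IndexPositionCacheSketch(1000, index -> new Position(1L, index));
        Position p = cache.resolve(42L);
        System.out.println(p.ledgerId + ":" + p.entryId);
    }
}
```

With such a cache, the abstraction layer would translate an index passed to a seek or acknowledgment call into a position before invoking the Pulsar client.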

Goals

In Scope

  • Add a new API to retrieve the message ID by index

High Level Design

Detailed Design

The core implementation adds a new HTTP endpoint to query a MessageId by index. The underlying implementation will use the existing Broker Entry Metadata, which contains the index values.

Design & Implementation Details

Public-facing Changes

Public API

Add a new admin API endpoint getMessageIdByIndex to org.apache.pulsar.broker.admin.v2.PersistentTopics.

Request Path

  • Path Template:
    GET /{tenant}/{namespace}/{topic}/getMessageIdByIndex
    
  • {tenant}: Tenant name (string, path parameter).
  • {namespace}: Namespace name (string, path parameter).
  • {topic}: Topic name (URL-encoded string, path parameter).

HTTP Method

  • Method: GET
  • Purpose: Retrieve the message ID associated with a specific message index in a topic.

Parameters

| Type | Name | Data Type | Required | Description |
| :--- | :--- | :--- | :--- | :--- |
| Path Param | tenant | String | Yes | Tenant identifier for multi-tenancy isolation. |
| Path Param | namespace | String | Yes | Namespace identifier to group topics under a tenant. |
| Path Param | topic | String | Yes | Topic Name - either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics. |
| Query Param | index | long | Yes | Zero-based index of the message in the topic. |
| Query Param | authoritative | boolean | No | If authoritative is true, it means the lookup had already been redirected here by a different broker. Default: false. |
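
As an illustration of how the path template and query parameters combine, the following sketch builds the request URI with the standard library only. The /admin/v2/persistent prefix, the service URL, and the class and method names are assumptions made for this example; the PIP text specifies only the path template and the parameters.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper that assembles the getMessageIdByIndex request URI. */
final class GetMessageIdByIndexRequestSketch {

    static URI buildUri(String serviceUrl, String tenant, String namespace,
                        String topic, long index, boolean authoritative) {
        // The topic is a URL-encoded path parameter; URLEncoder applies form encoding,
        // which is sufficient for typical topic names such as "my-topic-partition-0".
        String encodedTopic = URLEncoder.encode(topic, StandardCharsets.UTF_8);
        // Assumption: the endpoint is mounted under /admin/v2/persistent.
        String path = String.format("/admin/v2/persistent/%s/%s/%s/getMessageIdByIndex",
                tenant, namespace, encodedTopic);
        String query = "index=" + index + "&authoritative=" + authoritative;
        return URI.create(serviceUrl + path + "?" + query);
    }

    public static void main(String[] args) {
        System.out.println(buildUri("http://localhost:8080", "public", "default",
                "my-topic-partition-0", 42L, false));
    }
}
```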

Response

Success Response
  • Status Code: 200 OK
  • Body Format:
    {
      "ledgerId": 12345,
      "entryId": 67890,
      "partitionIndex": 0
    }
    
    • ledgerId: ID of the ledger where the message is stored.
    • entryId: Unique identifier of the message within the ledger.
    • partitionIndex: Partition index (relevant for partitioned topics; -1 for non-partitioned topics).

Notes: If the index points to a system message, the first user message following it is returned; if the specified message has expired and been deleted, MessageId.earliest is returned.

When retrieving a message ID by index, the resolution is limited to the entry level (an entry is the minimal storage unit for messages in Pulsar's persistence layer). If message batching is enabled, a single entry may contain multiple messages with distinct indexes.

Example Scenario (partition with 2 entries):

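| Entry | Ledger ID | Entry ID | Index | Messages |
| :--- | ---: | ---: | ---: | ---: |
| A | 0 | 0 | 2 | 0,1,2 |
| B | 0 | 1 | 4 | 3,4 |

  • All indexes that fall within one entry return the same MessageId: indexes 0, 1, and 2 return MessageId(0:0:*) for Entry A, and indexes 3 and 4 return MessageId(0:1:*) for Entry B.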
  • To achieve precise message-level consumption, you can first locate the entry using the MessageID, then filter messages by their indexes during client-side processing.
    • Example: To consume from index 1, retrieve all messages in Entry A and filter out messages with indexes below 1.
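
The entry-level resolution and the client-side filtering step can be modeled with a small, self-contained sketch. It uses only the standard library and mirrors the two-entry scenario above; the class EntryLevelLookupSketch and its methods are hypothetical and do not reflect the broker's actual data structures.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Stdlib-only model of entry-level index resolution. All names here are hypothetical. */
final class EntryLevelLookupSketch {

    /** One stored entry and the index range of the messages batched into it. */
    static final class Entry {
        final long ledgerId;
        final long entryId;
        final long firstIndex;
        final long lastIndex;

        Entry(long ledgerId, long entryId, long firstIndex, long lastIndex) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.firstIndex = firstIndex;
            this.lastIndex = lastIndex;
        }
    }

    // Entries keyed by the index of their last message (the "Index" column above).
    private final TreeMap<Long, Entry> entriesByLastIndex = new TreeMap<>();

    void add(Entry entry) {
        entriesByLastIndex.put(entry.lastIndex, entry);
    }

    /** Returns the entry containing the index, or null if the index is out of range. */
    Entry findEntry(long index) {
        if (index < 0) {
            return null;
        }
        Map.Entry<Long, Entry> hit = entriesByLastIndex.ceilingEntry(index);
        return hit == null ? null : hit.getValue();
    }

    /** Client-side step: keep only the indexes in the entry that are >= startIndex. */
    static List<Long> indexesFrom(Entry entry, long startIndex) {
        List<Long> kept = new ArrayList<>();
        for (long i = Math.max(entry.firstIndex, startIndex); i <= entry.lastIndex; i++) {
            kept.add(i);
        }
        return kept;
    }

    static EntryLevelLookupSketch exampleScenario() {
        EntryLevelLookupSketch partition = new EntryLevelLookupSketch();
        partition.add(new Entry(0, 0, 0, 2)); // Entry A: messages 0,1,2
        partition.add(new Entry(0, 1, 3, 4)); // Entry B: messages 3,4
        return partition;
    }

    public static void main(String[] args) {
        EntryLevelLookupSketch partition = exampleScenario();
        Entry entry = partition.findEntry(1); // resolves to Entry A
        System.out.println(entry.ledgerId + ":" + entry.entryId);
        System.out.println(indexesFrom(entry, 1)); // indexes kept after filtering
    }
}
```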

Why Precise Index Matching Isn't Implemented on the Broker Side:

  1. Metadata Parsing Constraints: Precise matching requires parsing MessageMetadata, but brokers are designed to parse only BrokerEntryMetadata.
  2. Limited Benefits: Even if the exact batchIndex and batchSize are determined, the broker must still send the entire entry to the client, offering no network bandwidth savings.
  3. Client-Side Efficiency: Once the client receives the entry, users can directly filter messages by index (after parsing MessageMetadata, which includes unique indexes for each message).

Error Responses

| Status Code | Description |
| :--- | :--- |
| 307 Temporary Redirect | The current broker does not own this topic. Redirect to the correct broker. |
| 403 Forbidden | Client lacks permissions to access the topic/namespace. |
| 404 Not Found | Topic/namespace does not exist, or the specified index is invalid. An index value of -1 or one that exceeds the maximum index range is considered invalid. |
| 406 Not Acceptable | The topic is not a persistent topic. |
| 412 Precondition Failed | The broker is not configured to enable broker entry metadata. |

Pulsar Admin

Add two APIs to pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java:

    /**
     * Get the message id by index. If the index points to a system message, return the first user message following it; if the specified message has expired and been deleted, return MessageId.earliest.
     * <p>
     * When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal storage unit for messages in Pulsar's persistence layer).
     * If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
     * Example Scenario (partition with 2 entries):
     * | Entry | Ledger ID | Entry ID | Index | Messages |
     * | :--- | ---: | ---: | ---: | ---: |
     * | A | 0 | 0 | 2 | 0,1,2 |
     * | B | 0 | 1 | 4 | 3,4 |
     * All indexes within one entry (0,1,2 or 3,4) return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A).
     * @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics.
     * @param index the index of a message
     * @return the message id of the entry that contains the message.
     * @throws AuthorizationException      (HTTP 403 Forbidden) Client lacks permissions to access the topic/namespace.
     * @throws NotFoundException           (HTTP 404 Not Found) Topic/namespace does not exist, or invalid index.
     * @throws PulsarAdminException        (HTTP 406 Not Acceptable) Specified topic is not a persistent topic.
     * @throws PreconditionFailedException (HTTP 412 Precondition Failed) Broker entry metadata is disabled.
     * @throws PulsarAdminException        For other errors (e.g., HTTP 500 Internal Server Error).
     */
    MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException;

    /**
     * Get the message id by index asynchronously. If the index points to a system message, return the first user message following it; if the specified message has expired and been deleted, return MessageId.earliest.
     * <p>
     * When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal storage unit for messages in Pulsar's persistence layer).
     * If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
     * Example Scenario (partition with 2 entries):
     * | Entry | Ledger ID | Entry ID | Index | Messages |
     * | :--- | ---: | ---: | ---: | ---: |
     * | A | 0 | 0 | 2 | 0,1,2 |
     * | B | 0 | 1 | 4 | 3,4 |
     * All indexes within one entry (0,1,2 or 3,4) return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A).
     * @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics.
     * @param index the index of a message
     * @return a {@link CompletableFuture} that completes with the message id of the entry that contains the message.
     *         The future may complete exceptionally with:
     *         <ul>
     *             <li>{@link AuthorizationException} (HTTP 403) Permission denied for topic/namespace access.</li>
     *             <li>{@link NotFoundException} (HTTP 404) Topic/namespace does not exist or invalid index.</li>
     *             <li>{@link PulsarAdminException} (HTTP 406) Topic is not a persistent topic.</li>
     *             <li>{@link PreconditionFailedException} (HTTP 412) Broker entry metadata is not enabled.</li>
     *             <li>{@link PulsarAdminException} (HTTP 307) Redirect required to the correct broker.</li>
     *             <li>{@link PulsarAdminException} Other errors (e.g., HTTP 500).</li>
     *         </ul>
     */
    CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index);

Binary protocol

Configuration

CLI

Add a new subcommand to pulsar-admin:

    pulsar-admin topics get-message-id-by-index \
    --index 12345 \
    persistent://tenant/ns/topic

Metrics

Monitoring

Security Considerations

Backward & Forward Compatibility

Upgrade

Downgrade / Rollback

Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations

Alternatives

General Notes

Links