| # PIP-415: Support getting message ID by index |
| |
| # Background knowledge |
| |
| |
| - In [PIP-70](pip-70.md), we introduced `Broker Entry Metadata` containing an `index` field - a continuous sequence identifier for messages within a topic partition. |
| This sequence ID simplifies message sequence management compared to the composite MessageId (ledgerId+entryId+batchIndex) which becomes discontinuous across ledgers. |
| - In [PIP-90](pip-90.md), we exposed this metadata via the `Message.getIndex()` API. |
| - In [PR-6331](https://github.com/apache/pulsar/pull/6331), we added a new `get-message-by-id` command to pulsar-admin. |
| |
| With these three changes in place, we can obtain the index of a message from its message ID: |
| |
| 1. Get the message by ID using the `get-message-by-id` command. |
| 2. Get the index of the message using `Message.getIndex()`. |
| |
| **We can retrieve the message index from a message ID, but we lack the reverse capability of obtaining a message ID from an index. The mapping therefore works in only one direction, which creates operational confusion.** |
| We therefore need a new API that returns the message ID for a given index. |
| |
| # Motivation |
| |
| Our organization is currently planning a migration from RocketMQ to Pulsar. To facilitate this transition, we aim to implement a standardized abstraction layer for MQ clients that encapsulates implementation details of specific messaging systems. This abstraction layer will allow seamless engine replacement while maintaining consistent client interfaces. |
| However, one critical compatibility issue hinders the unification of message-fetching patterns between RocketMQ/Kafka and Pulsar: |
| |
| **Positioning Mechanism Mismatch**: |
| - RocketMQ/Kafka: Use a monotonically increasing numeric index for message positioning and acknowledgment |
| - Pulsar: Relies on a composite MessageID (ledgerId + entryId + batchIndex) for message identification |
| |
| We propose to add a new API to retrieve the message ID by index, enabling us to cache the mapping between message ID and index. |
| This will allow us to use the index for seek and acknowledgment operations when consuming messages through the standardized API. |
| |
| # Goals |
| |
| ## In Scope |
| |
| - Add a new API to retrieve the message ID by index |
| |
| # High Level Design |
| |
| # Detailed Design |
| The core implementation adds a new HTTP endpoint that queries a MessageId by index. The underlying implementation will leverage the existing `Broker Entry Metadata`, which contains the index values. |
| |
| ## Design & Implementation Details |
| |
| ## Public-facing Changes |
| |
| ### Public API |
| |
| Add a new admin API endpoint `getMessageIdByIndex` to `org.apache.pulsar.broker.admin.v2.PersistentTopics`. |
| #### Request Path |
| - **Path Template**: |
| ```http |
| GET /{tenant}/{namespace}/{topic}/getMessageIdByIndex |
| ``` |
| - `{tenant}`: Tenant name (string, path parameter). |
| - `{namespace}`: Namespace name (string, path parameter). |
| - `{topic}`: Topic name (URL-encoded string, path parameter). |
| |
| #### HTTP Method |
| - **Method**: `GET` |
| - **Purpose**: Retrieve the message ID associated with a specific message index in a topic. |
| |
| #### Parameters |
| |
| | Type | Name | Data Type | Required | Description | |
| |-----------------|-----------------|-----------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------| |
| | **Path Param** | `tenant` | `String` | Yes | Tenant identifier for multi-tenancy isolation. | |
| | | `namespace` | `String` | Yes | Namespace identifier to group topics under a tenant. | |
| | | `topic` | `String` | Yes | Topic Name - either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics. | |
| | **Query Param** | `index` | `long` | Yes | Zero-based index of the message in the topic. | |
| | | `authoritative` | `boolean` | No | If authoritative is true, it means the lookup had already been redirected here by a different broker. Default: `false`. | |
| |
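The path template and query parameters above can be combined into a concrete request. The following is a minimal sketch using only the JDK. The `/admin/v2/persistent` prefix, the service URL, and the class and method names are assumptions made for illustration; the proposal itself only specifies the path template and the parameters.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;

public class GetMessageIdByIndexRequest {
    // Assumed admin REST prefix for persistent topics; not stated in this proposal.
    public static final String ADMIN_PREFIX = "/admin/v2/persistent";

    /** Builds the request URI from the path parameters and query parameters. */
    public static URI buildUri(String serviceUrl, String tenant, String namespace,
                               String topic, long index, boolean authoritative) {
        // The topic is a path parameter and must be URL-encoded.
        // URLEncoder emits '+' for spaces, which is only valid in query strings.
        String encodedTopic = URLEncoder.encode(topic, StandardCharsets.UTF_8)
                .replace("+", "%20");
        return URI.create(serviceUrl + ADMIN_PREFIX
                + "/" + tenant + "/" + namespace + "/" + encodedTopic
                + "/getMessageIdByIndex"
                + "?index=" + index
                + "&authoritative=" + authoritative);
    }

    /** Wraps the URI in a GET request (the request is built, not sent). */
    public static HttpRequest buildRequest(URI uri) {
        return HttpRequest.newBuilder(uri).GET().build();
    }

    public static void main(String[] args) {
        URI uri = buildUri("http://localhost:8080", "public", "default",
                "my-topic-partition-0", 3L, false);
        HttpRequest request = buildRequest(uri);
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Note that the topic passed here is the partition name, matching the `topic` parameter description in the table.
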
| #### Response |
| |
| ##### Success Response |
| - **Status Code**: `200 OK` |
| - **Body Format**: |
| ```json |
| { |
| "ledgerId": 12345, |
| "entryId": 67890, |
| "partitionIndex": 0 |
| } |
| ``` |
| - `ledgerId`: ID of the ledger where the message is stored. |
| - `entryId`: Unique identifier of the message within the ledger. |
| - `partitionIndex`: Partition index (relevant for partitioned topics; `-1` for non-partitioned topics). |
| |
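As an illustration of the response shape, the sketch below extracts the three fields from the body shown above and renders them as `ledgerId:entryId:partitionIndex`. The class `MessageIdResponse` is hypothetical, and the regular-expression extraction is only there to keep the example free of dependencies; a real client would more likely use a JSON library.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MessageIdResponse {
    public final long ledgerId;
    public final long entryId;
    public final int partitionIndex;

    public MessageIdResponse(long ledgerId, long entryId, int partitionIndex) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
    }

    /** Extracts one integer-valued field from a flat JSON object. */
    private static long field(String json, String name) {
        Matcher m = Pattern.compile("\"" + Pattern.quote(name) + "\"\\s*:\\s*(-?\\d+)")
                .matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("missing field: " + name);
        }
        return Long.parseLong(m.group(1));
    }

    /** Parses the success response body into its three fields. */
    public static MessageIdResponse parse(String json) {
        return new MessageIdResponse(
                field(json, "ledgerId"),
                field(json, "entryId"),
                (int) field(json, "partitionIndex"));
    }

    @Override
    public String toString() {
        return ledgerId + ":" + entryId + ":" + partitionIndex;
    }

    public static void main(String[] args) {
        String body = "{ \"ledgerId\": 12345, \"entryId\": 67890, \"partitionIndex\": 0 }";
        System.out.println(parse(body)); // prints 12345:67890:0
    }
}
```
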
| **Note: If the index points to a system message, the API returns the first user message following it; if the specified message has expired and been deleted, the API returns `MessageId.earliest`.** |
| |
| When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal storage unit for messages in Pulsar's persistence layer). If message batching is enabled, a single entry may contain multiple messages with distinct indexes. |
| |
| **Example Scenario** (partition with 2 entries): |
| |
| | Entry | Ledger ID | Entry ID | Index | Messages | |
| | :--- | ---: | ---: | ---: | ---: | |
| | A | 0 | 0 | 2 | 0,1,2 | |
| | B | 0 | 1 | 4 | 3,4 | |
| |
| - Querying with index 0, 1, or 2 returns the **same MessageID** (`MessageId(0:0:*)`, Entry A); likewise, querying with index 3 or 4 returns the MessageID of Entry B (`MessageId(0:1:*)`). |
| - To achieve **precise message-level consumption**, you can first locate the entry using the MessageID, then filter messages by their indexes during client-side processing. |
| - Example: To consume from index 1, retrieve all messages in Entry A and filter out messages with indexes below 1. |
| |
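The example scenario can be modelled in a few lines. The sketch below is an in-memory illustration of entry-level resolution and client-side filtering, not the broker's implementation: the `Entry` class, `resolve`, and `indexesFrom` are hypothetical names, and it assumes, as in the table above, that each entry records the index of its last message.

```java
import java.util.ArrayList;
import java.util.List;

public class EntryLevelResolution {
    /** Hypothetical in-memory model of an entry; not a Pulsar class. */
    public static final class Entry {
        public final long ledgerId;
        public final long entryId;
        public final long lastIndex; // index of the last message in the entry
        public final int batchSize;  // number of messages in the entry

        public Entry(long ledgerId, long entryId, long lastIndex, int batchSize) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
            this.lastIndex = lastIndex;
            this.batchSize = batchSize;
        }
    }

    /** Binary search for the first entry whose last index is >= index; null if none. */
    public static Entry resolve(List<Entry> entries, long index) {
        if (index < 0) {
            return null;
        }
        int lo = 0;
        int hi = entries.size() - 1;
        Entry found = null;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (entries.get(mid).lastIndex >= index) {
                found = entries.get(mid);
                hi = mid - 1;
            } else {
                lo = mid + 1;
            }
        }
        return found;
    }

    /** Client-side filtering: indexes within the entry that are >= startIndex. */
    public static List<Long> indexesFrom(Entry entry, long startIndex) {
        List<Long> kept = new ArrayList<>();
        long firstIndex = entry.lastIndex - entry.batchSize + 1;
        for (long i = firstIndex; i <= entry.lastIndex; i++) {
            if (i >= startIndex) {
                kept.add(i);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Entry> entries = List.of(new Entry(0, 0, 2, 3), new Entry(0, 1, 4, 2));
        for (long index = 0; index <= 4; index++) {
            Entry e = resolve(entries, index);
            System.out.println("index " + index + " -> " + e.ledgerId + ":" + e.entryId);
        }
        System.out.println(indexesFrom(entries.get(0), 1)); // prints [1, 2]
    }
}
```

Indexes 0 to 2 resolve to entry `0:0` and indexes 3 to 4 resolve to entry `0:1`, which is why the endpoint cannot distinguish between messages inside one batch.
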
| **Why Precise Index Matching Isn't Implemented on the Broker Side:** |
| 1. **Metadata Parsing Constraints**: Precise matching requires parsing `MessageMetadata`, but brokers are designed to parse only `BrokerEntryMetadata`. |
| 2. **Limited Benefits**: Even if the exact `batchIndex` and `batchSize` are determined, the broker must still send the **entire entry** to the client, offering no network bandwidth savings. |
| 3. **Client-Side Efficiency**: Once the client receives the entry, users can directly filter messages by index (after parsing `MessageMetadata`, which includes unique indexes for each message). |
| |
| ##### Error Responses |
| |
| | Status Code | Description | |
| |-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------| |
| | **307 Temporary Redirect** | The current broker does not own this topic. Redirect to the correct broker. | |
| | **403 Forbidden** | Client lacks permissions to access the topic/namespace. | |
| | **404 Not Found** | Topic/namespace does not exist, or the specified `index` is invalid. An index value of -1 or one that exceeds the maximum index range is considered invalid. | |
| | **406 Not Acceptable** | The topic is not a persistent topic. | |
| | **412 Precondition Failed** | The broker is not configured to enable broker entry metadata. | |
| |
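A caller of the raw HTTP endpoint has to branch on these status codes. The following sketch shows one way to classify them; the `ErrorMapping` class and the `Outcome` enum are hypothetical and exist only for this illustration.

```java
public class ErrorMapping {
    /** Hypothetical classification of the documented status codes. */
    public enum Outcome {
        OK,
        REDIRECT,                   // 307: another broker owns the topic
        FORBIDDEN,                  // 403: missing permissions
        NOT_FOUND_OR_INVALID_INDEX, // 404: unknown topic/namespace or invalid index
        NOT_PERSISTENT,             // 406: topic is not a persistent topic
        METADATA_DISABLED,          // 412: broker entry metadata is not enabled
        OTHER                       // anything else, e.g. 500
    }

    /** Maps an HTTP status code from the endpoint to an outcome. */
    public static Outcome classify(int status) {
        switch (status) {
            case 200: return Outcome.OK;
            case 307: return Outcome.REDIRECT;
            case 403: return Outcome.FORBIDDEN;
            case 404: return Outcome.NOT_FOUND_OR_INVALID_INDEX;
            case 406: return Outcome.NOT_PERSISTENT;
            case 412: return Outcome.METADATA_DISABLED;
            default:  return Outcome.OTHER;
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(412)); // prints METADATA_DISABLED
    }
}
```
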
| #### Pulsar Admin |
| Add two APIs to `pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java`: |
| ```java |
| /** |
| * Get the message id by index. If the index points to a system message, return the first user message following it; if the specified message has expired and been deleted, return {@code MessageId.earliest}. |
| * @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics. |
| * @param index the index of a message |
| * @return the message id of the message. |
| * When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal storage unit for messages in Pulsar's persistence layer). |
| * If message batching is enabled, a single entry may contain multiple messages with distinct indexes. |
| * Example Scenario (partition with 2 entries): |
| * | Entry | Ledger ID | Entry ID | Index | Messages | |
| * | :--- | ---: | ---: | ---: | ---: | |
| * | A | 0 | 0 | 2 | 0,1,2 | |
| * | B | 0 | 1 | 4 | 3,4 | |
| * Indexes 0, 1, and 2 all return the **same MessageID** (`MessageId(0:0:*)`, Entry A); indexes 3 and 4 both return `MessageId(0:1:*)` (Entry B). |
| * @throws AuthorizationException (HTTP 403 Forbidden) Client lacks permissions to access the topic/namespace. |
| * @throws NotFoundException (HTTP 404 Not Found) Topic/namespace does not exist, or the index is invalid. |
| * @throws PulsarAdminException (HTTP 406 Not Acceptable) Specified topic is not a persistent topic. |
| * @throws PreconditionFailedException (HTTP 412 Precondition Failed) Broker entry metadata is disabled. |
| * @throws PulsarAdminException For other errors (e.g., HTTP 500 Internal Server Error). |
| */ |
| MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException; |
| |
| /** |
| * Get the message id by index asynchronously. If the index points to a system message, return the first user message following it; if the specified message has expired and been deleted, return {@code MessageId.earliest}. |
| * @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics. |
| * @param index the index of a message |
| * When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal storage unit for messages in Pulsar's persistence layer). |
| * If message batching is enabled, a single entry may contain multiple messages with distinct indexes. |
| * Example Scenario (partition with 2 entries): |
| * | Entry | Ledger ID | Entry ID | Index | Messages | |
| * | :--- | ---: | ---: | ---: | ---: | |
| * | A | 0 | 0 | 2 | 0,1,2 | |
| * | B | 0 | 1 | 4 | 3,4 | |
| * Indexes 0, 1, and 2 all return the **same MessageID** (`MessageId(0:0:*)`, Entry A); indexes 3 and 4 both return `MessageId(0:1:*)` (Entry B). |
| * @return a {@link CompletableFuture} that completes with the message id of the message. |
| * The future may complete exceptionally with: |
| * <ul> |
| * <li>{@link AuthorizationException} (HTTP 403) Permission denied for topic/namespace access.</li> |
| * <li>{@link NotFoundException} (HTTP 404) Topic/namespace does not exist, or the index is invalid.</li> |
| * <li>{@link PulsarAdminException} (HTTP 406) Specified topic is not a persistent topic.</li> |
| * <li>{@link PreconditionFailedException} (HTTP 412) Broker entry metadata is not enabled.</li> |
| * <li>{@link PulsarAdminException} (HTTP 307) Redirect required to the correct broker.</li> |
| * <li>{@link PulsarAdminException} Other errors (e.g., HTTP 500).</li> |
| * </ul> |
| */ |
| CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index); |
| ``` |
| |
| ### Binary protocol |
| |
| ### Configuration |
| |
| ### CLI |
| Add a new subcommand to `pulsar-admin`: |
| |
| ```bash |
| pulsar-admin topics get-message-id-by-index \ |
| --index 12345 \ |
| persistent://tenant/ns/topic |
| ``` |
| ### Metrics |
| |
| # Monitoring |
| |
| # Security Considerations |
| # Backward & Forward Compatibility |
| |
| ## Upgrade |
| |
| ## Downgrade / Rollback |
| |
| ## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations |
| |
| |
| # Alternatives |
| |
| # General Notes |
| |
| # Links |
| |
| * Mailing List discussion thread: https://lists.apache.org/thread/7kmo3robyx74p81891h3q8f1cq5lomfv |
| * Mailing List voting thread: https://lists.apache.org/thread/mn694h98nmdtddx4co9w0zo94jolzj68 |