blob: b16ef6a0fb0f40cc45235a657751bcaa01d04fc4 [file] [log] [blame] [view]
# PIP-415: Support getting message ID by index
# Background knowledge
- In [PIP-70](pip-70.md), we introduced `Broker Entry Metadata` containing an `index` field - a continuous sequence identifier for messages within a topic partition.
This sequence ID simplifies message sequence management compared to the composite MessageId (ledgerId+entryId+batchIndex) which becomes discontinuous across ledgers.
- In [PIP-90](pip-90.md), we exposed this metadata via the `Message.getIndex()` API.
- In [PR-6331](https://github.com/apache/pulsar/pull/6331), we added a new `get-message-by-id` cmd into pulsar-admin.
After these three works, we can now obtain the index of a message by its message id:
1. Get the message by id using `get-message-by-id` cmd
2. Get the index of the message using `Message.getIndex()`
**We can retrieve the message index using a message ID, but lack the reverse capability to obtain message IDs by index. This breaks the logical circularity and creates operational confusion.**
Then we need to add a new API to get the message id by index
# Motivation
Our organization is currently planning a migration from RocketMQ to Pulsar. To facilitate this transition, we aim to implement a standardized abstraction layer for MQ clients that encapsulates implementation details of specific messaging systems. This abstraction layer will allow seamless engine replacement while maintaining consistent client interfaces.
However, one critical compatibility issues hinder the unification of message fetching pat terns between RocketMQ/Kafka and Pulsar:
**Positioning Mechanism Mismatch**:
- RocketMQ/Kafka: Utilize monotonically increasing numerical index for message positioning and acknowledgment
- Pulsar: Relies on composite MessageID (ledgerId + entryId + batchIndex) for message identification
We propose to add a new API to retrieve the message ID by index, enabling us to cache the mapping between message ID and index.
This will allow us to use index for seek and acknowledgment operations when consuming messages through the standardized API.
# Goals
## In Scope
- Add a new API to retrieve the message ID by index
# High Level Design
# Detailed Design
The core implementation involves adding a new HTTP endpoint to query MessageId by index. Underlying implementation will leverage existing `Broker Entry Metadata` containing index values.
## Design & Implementation Details
## Public-facing Changes
### Public API
Add a new admin API endpoint `getMessageIdByIndex` to `org.apache.pulsar.broker.admin.v2.PersistentTopics`.
#### Request Path
- **Path Template**:
```http
GET /{tenant}/{namespace}/{topic}/getMessageIdByIndex
```
- `{tenant}`: Tenant name (string, path parameter).
- `{namespace}`: Namespace name (string, path parameter).
- `{topic}`: Topic name (URL-encoded string, path parameter).
#### HTTP Method
- **Method**: `GET`
- **Purpose**: Retrieve the message ID associated with a specific message index in a topic.
#### Parameters
| Type | Name | Data Type | Required | Description |
|-----------------|-----------------|-----------|----------|-----------------------------------------------------------------------------------------------------------------------------------------------------------|
| **Path Param** | `tenant` | `String` | Yes | Tenant identifier for multi-tenancy isolation. |
| | `namespace` | `String` | Yes | Namespace identifier to group topics under a tenant. |
| | `topic` | `String` | Yes | Topic Name - either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics. |
| **Query Param** | `index` | `long` | Yes | Zero-based index of the message in the topic. |
| | `authoritative` | `boolean` | No | If authoritative is true, it means the lookup had already been redirected here by a different broker. Default: `false`. |
#### Response
##### Success Response
- **Status Code**: `200 OK`
- **Body Format**:
```json
{
"ledgerId": 12345,
"entryId": 67890,
"partitionIndex": 0
}
```
- `ledgerId`: ID of the ledger where the message is stored.
- `entryId`: Unique identifier of the message within the ledger.
- `partitionIndex`: Partition index (relevant for partitioned topics; `-1` for non-partitioned topics).
**notes: If the index points to a system message, return the first user message following it; if the specified message has expired and been deleted, return MessageId.Earliest.**
When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal storage unit for messages in Pulsar's persistence layer). If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
**Example Scenario** (partition with 2 entries):
| Entry | Ledger ID | Entry ID | Index | Messages |
| :--- | ---: | ---: | ---: | ---: |
| A | 0 | 0 | 2 | 0,1,2 |
| B | 0 | 1 | 4 | 3,4 |
- Param indexes 0,1,2 or 3,4 will return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A).
- To achieve **precise message-level consumption**, you can first locate the entry using the MessageID, then filter messages by their indexes during client-side processing.
- Example: To consume from index 1, retrieve all messages in Entry A and filter out messages with indexes below 1.
**Why Precise Index Matching Isn't Implemented on the Broker Side:**
1. **Metadata Parsing Constraints**: Precise matching requires parsing `MessageMetadata`, but brokers are designed to parse only `BrokerEntryMetadata`.
2. **Limited Benefits**: Even if the exact `batchIndex` and `batchSize` are determined, the broker must still send the **entire entry** to the client, offering no network bandwidth savings.
3. **Client-Side Efficiency**: Once the client receives the entry, users can directly filter messages by index (after parsing `MessageMetadata`, which includes unique indexes for each message).
##### Error Responses
| Status Code | Description |
|-----------------------------|--------------------------------------------------------------------------------------------------------------------------------------------------------------|
| **307 Temporary Redirect** | The current broker does not own this topic. Redirect to the correct broker. |
| **403 Forbidden** | Client lacks permissions to access the topic/namespace. |
| **404 Not Found** | Topic/namespace does not exist, or the specified `index` is invalid. An index value of -1 or one that exceeds the maximum index range is considered invalid. |
| **406 Not Acceptable** | The topic is not a persistent topic. |
| **412 Precondition Failed** | The broker is not configured to enable broker entry metadata. |
#### Pulsar Admin
Add two api in `pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java`
```java
/**
* Get the message id by index. If the index points to a system message, return the first user message following it; if the specified message has expired and been deleted, return MessageId.Earliest.
* @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics.
* @param index the index of a message
* @return the message id of the message.
* When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal storage unit for messages in Pulsar's persistence layer).
* If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
* Example Scenario (partition with 2 entries):
* | Entry | Ledger ID | Entry ID | Index | Messages |
* | :--- | ---: | ---: | ---: | ---: |
* | A | 0 | 0 | 2 | 0,1,2 |
* | B | 0 | 1 | 4 | 3,4 |
* Param with indexes 0,1,2 or 3,4 will return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A).
* @throws AuthorizationException (HTTP 403 Forbidden) Client lacks permissions to access the topic/namespace.
* @throws NotFoundException (HTTP 404 Not Found) Source topic/namespace does not exist, or invalid index.
* @throws PulsarAdminException (HTTP 406 Not Acceptable) Specified topic is not a persistent topic.
* @throws PreconditionFailedException (HTTP 412 Precondition Failed) Broker entry metadata is disabled.
* @throws PulsarAdminException For other errors (e.g., HTTP 500 Internal Server Error).
*/
MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException;
/**
* Get the message id by index asynchronously. If the index points to a system message, return the first user message following it; if the specified message has expired and been deleted, return MessageId.Earliest.
* @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics.
* @param index the index of a message
* When retrieving a message ID by index, the resolution is limited to the **entry** level (an entry is the minimal storage unit for messages in Pulsar's persistence layer).
* If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
* Example Scenario (partition with 2 entries):
* | Entry | Ledger ID | Entry ID | Index | Messages |
* | :--- | ---: | ---: | ---: | ---: |
* | A | 0 | 0 | 2 | 0,1,2 |
* | B | 0 | 1 | 4 | 3,4 |
* Param with indexes 0,1,2 or 3,4 will return the **same MessageID** (e.g., `MessageId(0:0:*)` for Entry A).
* @implNote The return {@link CompletableFuture<MessageId>} that completes with the message id of the message.
* The future may complete exceptionally with:
* <ul>
* <li>{@link AuthorizationException} (HTTP 403) Permission denied for topic/namespace access.</li>
* <li>{@link NotFoundException} (HTTP 404) Shadow topic/namespace does not exist or invalid index.</li>
* <li>{@link PulsarAdminException} (HTTP 406) Shadow topic is not a persistent topic.</li>
* <li>{@link PreconditionFailedException} (HTTP 412) Broker entry metadata is not enabled.</li>
* <li>{@link PulsarAdminException} (HTTP 307) Redirect required to the correct broker.</li>
* <li>{@link PulsarAdminException} Other errors (e.g., HTTP 500).</li>
* </ul>
*/
CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index);
```
### Binary protocol
### Configuration
### CLI
Add new subcommand to pulsar-admin:
```bash
pulsar-admin topics get-message-id-by-index \
--index 12345 \
persistent://tenant/ns/topic
```
### Metrics
# Monitoring
# Security Considerations
# Backward & Forward Compatibility
## Upgrade
## Downgrade / Rollback
## Pulsar Geo-Replication Upgrade & Downgrade/Rollback Considerations
# Alternatives
# General Notes
# Links
* Mailing List discussion thread: https://lists.apache.org/thread/7kmo3robyx74p81891h3q8f1cq5lomfv
* Mailing List voting thread: https://lists.apache.org/thread/mn694h98nmdtddx4co9w0zo94jolzj68