Previous work has introduced three building blocks:

- Broker Entry Metadata containing an `index` field: a continuous sequence identifier for messages within a topic partition. This sequence ID simplifies message sequence management compared to the composite MessageId (ledgerId + entryId + batchIndex), which becomes discontinuous across ledgers.
- The `Message.getIndex()` API.
- The `get-message-by-id` command in `pulsar-admin`.

With these three in place, we can obtain the index of a message from its message ID: fetch the message with the `get-message-by-id` command, then read its index with `Message.getIndex()`.
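The ID-to-index direction rests on simple arithmetic. Assuming, as the entry table in this section illustrates, that the `index` stored in Broker Entry Metadata belongs to the last message of each entry, a message's own index can be derived from its position inside the batch. The sketch below shows that calculation; `IndexMath` and `messageIndex` are hypothetical names, and this is an illustration of the idea rather than Pulsar's actual `Message.getIndex()` code.

```java
/**
 * Hypothetical helper: derives a single message's index from entry-level data.
 * Assumption: entryIndex is the index of the LAST message stored in the entry.
 */
final class IndexMath {
    private IndexMath() {}

    /**
     * @param entryIndex index recorded in Broker Entry Metadata for the whole entry
     * @param batchSize  number of messages in the entry (1 if batching is disabled)
     * @param batchIndex zero-based position of the message inside the batch
     * @return the message's own index within the topic partition
     */
    static long messageIndex(long entryIndex, int batchSize, int batchIndex) {
        if (batchSize < 1 || batchIndex < 0 || batchIndex >= batchSize) {
            throw new IllegalArgumentException("batchIndex must be in [0, batchSize)");
        }
        // The last message (batchIndex == batchSize - 1) carries entryIndex itself;
        // earlier messages in the batch count backwards from it.
        return entryIndex - (batchSize - 1 - batchIndex);
    }
}
```

For an entry with index 2 holding three messages, batch positions 0, 1, and 2 map to indexes 0, 1, and 2; for an entry with index 4 holding two messages, positions 0 and 1 map to 3 and 4.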
We can retrieve the message index from a message ID, but we lack the reverse capability: obtaining the message ID from an index. This one-way mapping breaks the symmetry between the two identifiers and creates operational confusion, so we need a new API that returns the message ID for a given index.
Our organization is currently planning a migration from RocketMQ to Pulsar. To facilitate this transition, we aim to implement a standardized abstraction layer for MQ clients that encapsulates the implementation details of specific messaging systems. This abstraction layer will allow seamless engine replacement while maintaining consistent client interfaces. However, one critical compatibility issue hinders the unification of message fetching patterns between RocketMQ/Kafka and Pulsar:
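Positioning Mechanism Mismatch: RocketMQ and Kafka position and acknowledge messages by a numeric offset, whereas Pulsar uses the composite MessageId.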
We propose adding a new API that retrieves the message ID for a given index, which lets us cache the mapping between message IDs and indexes. We can then use the index for seek and acknowledgment operations when consuming messages through the standardized API.
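A minimal sketch of the caching idea follows, assuming the abstraction layer wraps the proposed lookup behind a plain function. `IndexToMessageIdCache`, its `resolver` argument, and the `ledgerId:entryId:partitionIndex` string form of the ID are all hypothetical. A bounded LRU map keeps repeated seeks to the same offset from calling the broker again, while `remember` lets the consumer record mappings it already learns from `Message.getIndex()` on delivered messages.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongFunction;

/**
 * Hypothetical offset-style adapter for an MQ abstraction layer.
 * Resolves a numeric index to an opaque message id string and caches
 * the mapping in a size-bounded LRU map.
 */
final class IndexToMessageIdCache {
    private final LongFunction<String> resolver; // e.g. wraps the proposed admin call
    private final Map<Long, String> cache;

    IndexToMessageIdCache(LongFunction<String> resolver, int maxEntries) {
        this.resolver = resolver;
        // accessOrder = true makes iteration order least-recently-used first,
        // so removeEldestEntry evicts the LRU mapping once the bound is exceeded.
        this.cache = new LinkedHashMap<>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<Long, String> eldest) {
                return size() > maxEntries;
            }
        };
    }

    /** Returns the cached id for {@code index}, calling the resolver only on a miss. */
    synchronized String messageIdFor(long index) {
        return cache.computeIfAbsent(index, resolver::apply);
    }

    /** Records a mapping learned elsewhere, e.g. from Message.getIndex() on receive. */
    synchronized void remember(long index, String messageId) {
        cache.put(index, messageId);
    }
}
```

Keeping the resolver as a plain function means the cache does not care whether the lookup goes through the Java admin client or raw HTTP.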
The core implementation adds a new HTTP endpoint that queries the MessageId by index. The underlying implementation leverages the existing Broker Entry Metadata, which contains the index values.
Add a new admin API endpoint `getMessageIdByIndex` to `org.apache.pulsar.broker.admin.v2.PersistentTopics`.
`GET /{tenant}/{namespace}/{topic}/getMessageIdByIndex`

- `{tenant}`: Tenant name (string, path parameter).
- `{namespace}`: Namespace name (string, path parameter).
- `{topic}`: Topic name (URL-encoded string, path parameter).

Request parameters:
Type | Name | Data Type | Required | Description |
---|---|---|---|---|
Path Param | tenant | String | Yes | Tenant identifier for multi-tenancy isolation. |
Path Param | namespace | String | Yes | Namespace identifier to group topics under a tenant. |
Path Param | topic | String | Yes | Topic name: either the specific partition name of a partitioned topic (e.g. my-topic-partition-0) or the original topic name for non-partitioned topics. |
Query Param | index | long | Yes | Zero-based index of the message in the topic. |
Query Param | authoritative | boolean | No | If `true`, the lookup has already been redirected here by a different broker. Default: `false`. |
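The endpoint is a plain HTTP GET, so an abstraction layer that prefers raw REST over the Java admin client could build the request as sketched below. The class name, the `baseUrl` parameter (which would carry whatever admin path prefix the broker exposes), and the choice to reject negative indexes before sending are assumptions of this sketch, not part of the proposal.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

/**
 * Hypothetical helper that builds the request URI for
 * GET /{tenant}/{namespace}/{topic}/getMessageIdByIndex?index=...&authoritative=...
 * baseUrl (scheme, host, and any admin path prefix) is supplied by the caller.
 */
final class GetMessageIdByIndexRequest {
    private GetMessageIdByIndexRequest() {}

    static URI build(String baseUrl, String tenant, String namespace, String topic,
                     long index, boolean authoritative) {
        if (index < 0) {
            // This sketch rejects negative indexes client-side; the broker would
            // answer such requests with HTTP 404 anyway.
            throw new IllegalArgumentException("index must be >= 0");
        }
        String path = String.join("/",
                encode(tenant), encode(namespace), encode(topic), "getMessageIdByIndex");
        return URI.create(baseUrl + "/" + path
                + "?index=" + index + "&authoritative=" + authoritative);
    }

    // URLEncoder produces form encoding; swap '+' for '%20' so spaces are path-safe.
    private static String encode(String segment) {
        return URLEncoder.encode(segment, StandardCharsets.UTF_8).replace("+", "%20");
    }
}
```

The resulting URI can be sent with any HTTP client, for example `java.net.http.HttpClient`.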
Response (200 OK):

    { "ledgerId": 12345, "entryId": 67890, "partitionIndex": 0 }

- `ledgerId`: ID of the ledger where the message is stored.
- `entryId`: ID of the entry (the storage unit holding the message) within the ledger.
- `partitionIndex`: Partition index (relevant for partitioned topics; `-1` for non-partitioned topics).

Notes: If the index points to a system message, the first user message following it is returned. If the specified message has expired and been deleted, `MessageId.earliest` is returned.
When retrieving a message ID by index, the resolution is limited to the entry level (an entry is the minimal storage unit for messages in Pulsar's persistence layer). If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
Example Scenario (partition with 2 entries):
Entry | Ledger ID | Entry ID | Index | Messages |
---|---|---|---|---|
A | 0 | 0 | 2 | 0,1,2 |
B | 0 | 1 | 4 | 3,4 |
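Here the Index column holds the index of the last message in each entry. Querying index 0, 1, or 2 returns the same MessageId (`MessageId(0:0:*)` for Entry A), and querying index 3 or 4 returns the MessageId of Entry B (`MessageId(0:1:*)`).

Why Precise Index Matching Isn't Implemented on the Broker Side: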
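- Locating a specific message inside a batch requires parsing `MessageMetadata`, but brokers are designed to parse only `BrokerEntryMetadata`.
- Even if `batchIndex` and `batchSize` were determined, the broker would still have to send the entire entry to the client, offering no network bandwidth savings.
- The client can perform precise matching itself after receiving the entry (by parsing `MessageMetadata`, which includes unique indexes for each message).

Error responses:

Status Code | Description |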
---|---|
307 Temporary Redirect | The current broker does not own this topic. Redirect to the correct broker. |
403 Forbidden | Client lacks permissions to access the topic/namespace. |
404 Not Found | Topic/namespace does not exist, or the specified index is invalid. An index value of -1 or one that exceeds the maximum index range is considered invalid. |
406 Not Acceptable | The topic is not a persistent topic. |
412 Precondition Failed | The broker is not configured to enable broker entry metadata. |
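To make the entry-level resolution and its error cases concrete, here is a self-contained model of the lookup. It assumes the broker can see, for each retained entry, its ledger ID, entry ID, and the index of its last message (as in the example table), and it returns an empty result for an invalid index, which a REST handler would turn into HTTP 404. The system-message skip rule is omitted, and `EntryIndexResolver`, `Entry`, and `Position` are hypothetical names; a real broker would read Broker Entry Metadata from the managed ledger rather than from an in-memory list.

```java
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical model of the broker-side lookup: resolve an index to the entry
 * that contains it, using only entry-level (Broker Entry Metadata) indexes.
 */
final class EntryIndexResolver {
    /** One stored entry; lastIndex is the index of the last message in the entry. */
    record Entry(long ledgerId, long entryId, long lastIndex) {}

    /** Stand-in for MessageId at entry resolution; EARLIEST marks expired/deleted data. */
    record Position(long ledgerId, long entryId) {
        static final Position EARLIEST = new Position(-1, -1);
    }

    private final List<Entry> entries;     // retained entries, sorted by lastIndex
    private final long firstRetainedIndex; // index of the first message still stored

    EntryIndexResolver(List<Entry> entries, long firstRetainedIndex) {
        this.entries = List.copyOf(entries);
        this.firstRetainedIndex = firstRetainedIndex;
    }

    /** An empty result corresponds to HTTP 404 (invalid index). */
    Optional<Position> resolve(long index) {
        if (index < 0 || entries.isEmpty()
                || index > entries.get(entries.size() - 1).lastIndex()) {
            return Optional.empty();
        }
        if (index < firstRetainedIndex) {
            return Optional.of(Position.EARLIEST); // message expired and was deleted
        }
        // Binary search for the first entry whose lastIndex >= index.
        int lo = 0;
        int hi = entries.size() - 1;
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (entries.get(mid).lastIndex() < index) {
                lo = mid + 1;
            } else {
                hi = mid;
            }
        }
        Entry e = entries.get(lo);
        return Optional.of(new Position(e.ledgerId(), e.entryId()));
    }
}
```

Because entry indexes grow monotonically, a binary search over them keeps each lookup logarithmic in the number of retained entries.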
Add two APIs to `pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/Topics.java`:
    /**
     * Get the message id by index. If the index points to a system message, return the first
     * user message following it; if the specified message has expired and been deleted,
     * return {@link MessageId#earliest}.
     *
     * <p>When retrieving a message ID by index, the resolution is limited to the <b>entry</b> level
     * (an entry is the minimal storage unit for messages in Pulsar's persistence layer).
     * If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
     * Example scenario (partition with 2 entries):
     * <pre>
     * | Entry | Ledger ID | Entry ID | Index | Messages |
     * | :---  |      ---: |     ---: |  ---: |     ---: |
     * | A     |         0 |        0 |     2 |    0,1,2 |
     * | B     |         0 |        1 |     4 |      3,4 |
     * </pre>
     * Indexes 0, 1, and 2 all return the <b>same MessageId</b> ({@code MessageId(0:0:*)} for Entry A);
     * indexes 3 and 4 both return the MessageId of Entry B.
     *
     * @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0)
     *                  or the original topic name for non-partitioned topics.
     * @param index the index of a message
     * @return the message id of the message.
     * @throws AuthorizationException (HTTP 403 Forbidden) Client lacks permissions to access the topic/namespace.
     * @throws NotFoundException (HTTP 404 Not Found) Topic/namespace does not exist, or invalid index.
     * @throws PulsarAdminException (HTTP 406 Not Acceptable) Specified topic is not a persistent topic.
     * @throws PreconditionFailedException (HTTP 412 Precondition Failed) Broker entry metadata is disabled.
     * @throws PulsarAdminException For other errors (e.g., HTTP 500 Internal Server Error).
     */
    MessageId getMessageIdByIndex(String topicName, long index) throws PulsarAdminException;

    /**
     * Get the message id by index asynchronously. If the index points to a system message, return
     * the first user message following it; if the specified message has expired and been deleted,
     * return {@link MessageId#earliest}.
     *
     * <p>When retrieving a message ID by index, the resolution is limited to the <b>entry</b> level
     * (an entry is the minimal storage unit for messages in Pulsar's persistence layer).
     * If message batching is enabled, a single entry may contain multiple messages with distinct indexes.
     * Example scenario (partition with 2 entries):
     * <pre>
     * | Entry | Ledger ID | Entry ID | Index | Messages |
     * | :---  |      ---: |     ---: |  ---: |     ---: |
     * | A     |         0 |        0 |     2 |    0,1,2 |
     * | B     |         0 |        1 |     4 |      3,4 |
     * </pre>
     * Indexes 0, 1, and 2 all return the <b>same MessageId</b> ({@code MessageId(0:0:*)} for Entry A);
     * indexes 3 and 4 both return the MessageId of Entry B.
     *
     * @param topicName either the specific partition name of a partitioned topic (e.g. my-topic-partition-0)
     *                  or the original topic name for non-partitioned topics.
     * @param index the index of a message
     * @return a {@link CompletableFuture} that completes with the message id of the message.
     *         The future may complete exceptionally with:
     *         <ul>
     *           <li>{@link AuthorizationException} (HTTP 403) Permission denied for topic/namespace access.</li>
     *           <li>{@link NotFoundException} (HTTP 404) Topic/namespace does not exist or invalid index.</li>
     *           <li>{@link PulsarAdminException} (HTTP 406) Topic is not a persistent topic.</li>
     *           <li>{@link PreconditionFailedException} (HTTP 412) Broker entry metadata is not enabled.</li>
     *           <li>{@link PulsarAdminException} (HTTP 307) Redirect required to the correct broker.</li>
     *           <li>{@link PulsarAdminException} Other errors (e.g., HTTP 500).</li>
     *         </ul>
     */
    CompletableFuture<MessageId> getMessageIdByIndexAsync(String topicName, long index);
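Because both methods resolve only to the entry level, a consumer that seeks to the returned MessageId may first receive earlier messages from the same batch. A client-side filter that skips them by comparing each message's `Message.getIndex()` value against the target could look like the sketch below. `ExactIndexSeek` and `Received` are hypothetical stand-ins for a consumer's delivered messages, and whether skipped messages should also be acknowledged is left to the abstraction layer.

```java
import java.util.List;
import java.util.Optional;

/**
 * Hypothetical client-side step: after seeking to the entry-level MessageId,
 * skip the leading messages of that entry until the target index is reached.
 */
final class ExactIndexSeek {
    /** Minimal stand-in for a received message and its Message.getIndex() value. */
    record Received(String payload, long index) {}

    /** Returns the first delivered message whose index is >= target, or empty if none. */
    static Optional<Received> firstAtOrAfter(List<Received> delivered, long target) {
        for (Received m : delivered) {
            if (m.index() >= target) {
                return Optional.of(m); // earlier messages in the same entry are skipped
            }
        }
        return Optional.empty();
    }
}
```

Using `>=` rather than `==` also covers the case where the target index pointed to a system message, since the next user message is accepted instead.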
Add a new subcommand to `pulsar-admin`:
    pulsar-admin topics get-message-id-by-index \
      --index 12345 \
      persistent://tenant/ns/topic