PIP-429: Optimize Handling of Compacted Last Entry by Skipping Payload Buffer Parsing

Background knowledge

The typical reader's work flow looks like:

while (reader.hasMessageAvailable()) {
    final var msg = reader.readNext();
    handleMessage(msg);
}

hasMessageAvailable could perform the GetLastMessageId RPC to get the last message ID from broker. However, when the reader is a compacted reader, i.e. readCompacted(true) is configured when creating the reader, the server side could compute the message ID from the last entry in the compaction service.

Generally, with the built-in compaction service, when the entry represents a batch of messages, the compacted entry buffer consists of:

  1. Serialized MessageMetadata
  2. Serialized payload buffer, which can be compressed or encrypted. The uncompressed payload buffer consists of a list of SingleMessageMetadata and value buffers.

Take a typical example, when a producer that configures LZ4 as the compression type sends the following messages in a batch:

producer.newMessage().key("k0").value("v0").sendAsync();
producer.newMessage().key("k0").value("v1").sendAsync();
producer.newMessage().key("k1").value("v0").sendAsync();
producer.newMessage().key("k1").value(null).sendAsync();

After the compaction, the compacted entry buffer could be represented as follows:

metadata: # MessageMetadata
  num_messages_in_batch: 4
  compression: LZ4
payload:
  - singleMetadata: # SingleMessageMetadata
      key: k0
      compactedOut: true
    value: ""
  - singleMetadata:
      key: k0
      compactedOut: false
    value: v1
  - singleMetadata:
      key: k1
      compactedOut: true
    value: ""
  - singleMetadata:
      key: k1
      compactedOut: true
      nullValue: true
    value: ""
  • For a given key, only the latest value will be retained, so k0 => v0 will be compacted out.
  • A null value means the key will be removed, so k1 => v0 and k1 => null will be compacted out.

Prior to #18877, the hasMessageAvailable and readNext loop might encounter issues because the GetLastMessageId RPC returns {ledger, entry, batchIndex=3} as the last message ID, which represents k1 => null.

The issue occurs because the batch index of the last message ID is calculated as num_messages_in_batch - 1 without considering certain edge cases. #18877 resolves this problem by uncompressing the compacted entry buffer on the broker side and filtering out messages where the individual metadata has compactedOut set to true. This ensures that only valid messages are considered when determining the last message ID.

The compacted_out field was first introduced in the early stages of development through #1361. However, as part of the overall payload buffer, parsing a SingleMessageMetadata currently requires decompressing the compacted entry buffer. This process can be resource-intensive, particularly when handling large topics or encrypted messages, leading to potential performance bottlenecks.

Motivation

Decompressing the payload buffer solely to check whether individual messages have the compacted_out field set is both inefficient and restrictive, as it imposes constraints on the payload buffer format. Furthermore, when using a custom topic compaction service, the entry buffer in the compacted ledger may not include a SingleMessageMetadata for every single message, adding further complexity to the process.

This challenge is exacerbated when messages are encrypted, as decryption is not possible without the public key required for decryption. This limitation also impacts the current compaction service, as encrypted messages cannot be compacted. Consequently, operations such as the GetLastMessageId RPC will fail, resulting in an error similar to the following:

org.apache.pulsar.client.api.PulsarClientException$BrokerMetadataException: The subscription sub of the topic <topic> gets the last message id was failed
{"errorMsg":"Failed to read last entry of the compacted Ledger Invalid unknown tag type: 3","reqId":3317275583068061944, "remote":"localhost/127.0.0.1:50818", "local":"/127.0.0.1:50823"}

Instead, the expected behavior is to return the last message ID (e.g., k1 => null in the previous example).

Another issue arises from the assumption made by the GetLastMessageId RPC that the compacted entry's payload buffer must always contain a SingleMessageMetadata list. However, this is not always the case. For instance, a custom topic compaction service might write a payload buffer that omits the SingleMessageMetadata. In such cases, the compactedOut information could instead be stored in the properties of the MessageMetadata, but the GetLastMessageId RPC will always fail.

The custom topic compaction service has the flexibility to serialize and deserialize the payload buffer in a different format. However, it still depends on the GetLastMessageId RPC in the hasMessageAvailableAsync and getLastMessageIdAsync methods of the RawReader to compute the last message ID. This reliance creates a compatibility issue, as the GetLastMessageId RPC will fail when working with a payload buffer in a non-standard format, breaking the functionality of these methods.

Goals

In Scope

Instead of relying on the compacted_out field in SingleMessageMetadata, this PIP proposes to use MessageMetadata to determine the last message ID in the compacted last entry. Since compacted_out is no longer used, the payload buffer's format could be improved as well.

Out of Scope

High Level Design

To enhance efficiency and simplify the handling of compacted entries, a new field will be added to MessageMetadata to record the batch indexes of all retained single messages. This change allows the server to determine the last message ID directly from the new field in MessageMetadata when processing a GetLastMessageId request, rather than relying on the compacted_out field in SingleMessageMetadata.

With this update, the compacted_out field will no longer be used, and only the retained messages will be included in the payload buffer. For example, the previous representation:

metadata: # MessageMetadata
  num_messages_in_batch: 4
  compression: LZ4
  compacted_batch_indexes: [1] # Retained messages' batch indexes

The improvements are:

  1. Reduced Payload Size:

    • In the new format, only the retained messages are included in the payload buffer.
    • For the example above, the new format contains just 1 pair of SingleMessageMetadata and its corresponding value buffer, compared to 4 pairs in the original format.
  2. Efficient Batch Index Retrieval:

    • The actual batch index of retained messages can now be retrieved from the compacted_batch_indexes field in MessageMetadata.
    • In this example, the retained message's batch index is compacted_batch_indexes[0] = 1.

Public-facing Changes

Public API

Remove readLastCompactedEntry and findEntryByEntryIndex methods from TopicCompactionService interface.

  • findEntryByEntryIndex: it's never used other in tests
  • readLastCompactedEntry: it exposes the Entry to the caller, while the caller should only care about the position of the last message.

Change the return value of findEntryByPublishTime from CompletableFuture<Entry> to CompletableFuture<Position>. This change is made because the caller only needs the position of the last message, not the entire entry.

Add the following new methods:

/**
 * Retrieve the position of the last message before compaction.
 *
 * @return A future that completes with the position of the last message before compaction, or
 *         {@link MessagePosition#EARLIEST} if no such message exists.
 */
CompletableFuture<MessagePosition> getLastMessagePosition();

/**
 * Represents the position of a message.
 * <p>
 * The `ledgerId` and `entryId` together specify the exact entry to which the message belongs. For batched messages,
 * the `batchIndex` field indicates the index of the message within the batch. If the message is not part of a
 * batch, the `batchIndex` field is set to -1. The `publishTime` field corresponds to the publishing time of the
 * entry's metadata, providing a timestamp for when the entry was published.
 * </p>
 */
record MessagePosition(long ledgerId, long entryId, int batchIndex, long publishTime) {

    public static final MessagePosition EARLIEST = new MessagePosition(-1L, -1L, 0, 0);
}

After this change, the processing of the entry buffer will be handled within the TopicCompactionService instead of being managed on the caller side (ServerCnx). This adjustment enhances the flexibility of the implementation, enabling more advanced use cases. For example, it allows the position to be stored in an external metadata service, providing greater scalability and modularity.

Binary protocol

A new field will be added to MessageMetadata:

    // Indicates the indexes of messages retained in the batch after compaction. When a batch is compacted,
    // some messages may be removed (compacted out). For example, if the original batch contains:
    // `k0 => v0, k1 => v1, k2 => v2, k1 => null`, the compacted batch will retain only `k0 => v0` and `k2 => v2`.
    // In this case, this field will be set to `[0, 2]`, and the payload buffer will only include the retained messages.
    //
    // Note: Batches compacted by older versions of the compaction service do not include this field. For such batches,
    // the `compacted_out` field in `SingleMessageMetadata` must be checked to identify and filter out compacted messages
    // (e.g., `k1 => v1` and `k1 => null` in the example above).
    repeated int32 compacted_batch_indexes = 31;

Backward & Forward Compatibility

For entry buffers written by old version brokers, there is no compacted_batch_indexes field in the MessageMetadata. In this case, the GetLastMessageId RPC will still work as before, relying on the compacted_out field in SingleMessageMetadata to determine the last message ID.

Downgrading remains safe because the compacted_batch_indexes field, which is unknown to older versions, will simply be ignored when parsing MessageMetadata. Additionally, this proposal ensures backward compatibility by retaining the compacted_out field in SingleMessageMetadata.

Alternatives

Using a Property in MessageMetadata

#24431 proposed a solution to add a property to the MessageMetadata to indicate the last available message‘s batch index in the batch. However, it’s not flexiable and might have conflicts with user provided properties.

Client-Side Computation of Last Compacted Message ID

The previous solution this proposal is to pass the compacted last entry's buffer as well as the GetLastMessageId response, so the client can compute the last message ID by itself. It can even handle the encrypted messages because the client side should have the public key to decrypt the message.

However, it's unnecessarily complicated that the whole entry buffer will be sent to the client for each GetLastMessageId RPC.

Payload Buffer Format Improvement

Actually, we can reduce the payload buffer size by removing the SingleMessageMetadata and empty value buffers for messages that are compacted out.

payload:
  - singleMetadata:
      key: k0
      compactedOut: false
    value: v1

However, it would bring a compatibility issue that older clients would not be able to parse the new format unless a MessagePayloadProcessor is configured.

Links