PIP-416: Add a new topic method to implement trigger offload by size threshold

Background Knowledge

Apache Pulsar supports offloading historical data from BookKeeper to long-term storage (such as AWS S3 or GCS). Currently, the Pulsar admin client provides a MessageId-based offload API that lets users offload the data preceding a specific message ID to cold storage. The Pulsar CLI additionally provides a command that triggers offload based on a storage size threshold.

Motivation

The current client offload method requires users to specify a particular MessageId as the offload point. In real-world scenarios, however, users are typically more concerned with storage size than with specific message IDs. They want to trigger offload by giving a size threshold, so that historical data beyond that threshold is moved to cold storage without having to look up a message ID first.

Goals

Provide a new client method for topic offloading based on size thresholds, enabling users to manage topic storage more conveniently.

Scope

Enable clients to trigger topic data offloading by specifying a storage size threshold.

Public-facing Changes

Add a new method, with its implementation, to the admin client to support topic offloading based on a size threshold. No new broker REST endpoint is needed: the implementation will follow the approach of the Offload command in the CLI (org.apache.pulsar.admin.cli), which converts the sizeThreshold into a specific MessageId:

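static MessageId findFirstLedgerWithinThreshold(List<PersistentTopicInternalStats.LedgerInfo> ledgers,
                                                long sizeThreshold) {
    long suffixSize = 0L;

    // Walk the ledgers from newest to oldest, accumulating the size of the data to keep.
    // Note: get(0) throws IndexOutOfBoundsException if the list is empty.
    ledgers = Lists.reverse(ledgers);
    long previousLedger = ledgers.get(0).ledgerId;
    for (PersistentTopicInternalStats.LedgerInfo l : ledgers) {
        suffixSize += l.size;
        if (suffixSize > sizeThreshold) {
            // Threshold exceeded: data before the previously visited ledger is offloaded.
            return new MessageIdImpl(previousLedger, 0L, -1);
        }
        previousLedger = l.ledgerId;
    }
    // All ledgers together fit within the threshold, so there is nothing to offload.
    return null;
}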

The resulting MessageId is then passed to the existing triggerOffload API in PersistentTopics, so the broker side remains unchanged.
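
To make the conversion concrete, the following is a minimal, self-contained sketch of the same backward walk. It is not the CLI code: `OffloadThresholdSketch`, `Ledger`, and `findBoundaryLedger` are hypothetical stand-ins for illustration, the method returns a bare ledger ID (or -1) instead of a MessageId (or null), and the empty-list guard is an added assumption rather than behavior of the original method.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class OffloadThresholdSketch {
    /** Hypothetical stand-in for PersistentTopicInternalStats.LedgerInfo. */
    static final class Ledger {
        final long ledgerId;
        final long size; // bytes

        Ledger(long ledgerId, long size) {
            this.ledgerId = ledgerId;
            this.size = size;
        }
    }

    /**
     * Returns the ID of the ledger at which the offload boundary falls, or -1
     * when nothing needs to be offloaded. Ledgers are ordered oldest to newest.
     */
    static long findBoundaryLedger(List<Ledger> ledgers, long sizeThreshold) {
        if (ledgers.isEmpty()) {
            return -1L; // assumption: treat an empty list as "nothing to offload"
        }
        // Copy before reversing so the caller's list is left untouched.
        List<Ledger> newestFirst = new ArrayList<>(ledgers);
        Collections.reverse(newestFirst);

        long suffixSize = 0L;
        long previousLedger = newestFirst.get(0).ledgerId;
        for (Ledger l : newestFirst) {
            suffixSize += l.size;
            if (suffixSize > sizeThreshold) {
                // Data before the previously visited (newer) ledger gets offloaded.
                return previousLedger;
            }
            previousLedger = l.ledgerId;
        }
        return -1L; // total size is within the threshold
    }
}
```

For ledgers 1 to 4 with sizes 100, 200, 300, and 50 bytes and a threshold of 400, the walk accumulates 50, then 350, then 550, and so returns ledger 3: ledgers 3 and 4 (350 bytes) stay in BookKeeper, while ledgers 1 and 2 precede the boundary and are offloaded. With a threshold of 1000 the total of 650 bytes never exceeds it, and the result is -1.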

Public API

Topics.java (Interface)

Add new interface declarations to org.apache.pulsar.client.admin.Topics:

/**
 * Trigger offload of data to long-term storage based on a size threshold.
 *
 * @param topic
 *            Topic name
 * @param sizeThreshold
 *            Maximum amount of data, in bytes, to keep in BookKeeper; older data beyond it is offloaded
 * @throws PulsarAdminException
 */
void triggerOffload(String topic, long sizeThreshold) throws PulsarAdminException;

/**
 * Trigger offload of data to long-term storage based on a size threshold, asynchronously.
 *
 * @param topic
 *            Topic name
 * @param sizeThreshold
 *            Maximum amount of data, in bytes, to keep in BookKeeper; older data beyond it is offloaded
 * @return Future that completes once the offload operation has started
 */
CompletableFuture<Void> triggerOffloadAsync(String topic, long sizeThreshold);
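
One way the two new methods could be wired on the client side is sketched below, assuming the asynchronous variant fetches the ledger list, derives the boundary, and delegates to the existing offload call. Everything except the two method signatures is hypothetical: `ExistingTopicsApi`, `getLedgersAsync`, `Ledger`, and the `boundaryLedgerId` parameter are stand-ins for the admin client's internal stats and MessageId-based offload calls, and the synchronous variant simply joins the future instead of translating failures into PulsarAdminException.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

class SizeThresholdOffloadSketch {
    /** Hypothetical stand-in for a ledger entry in the topic's internal stats. */
    static final class Ledger {
        final long ledgerId;
        final long size; // bytes

        Ledger(long ledgerId, long size) {
            this.ledgerId = ledgerId;
            this.size = size;
        }
    }

    /** Hypothetical stand-in for the existing admin calls that would be reused. */
    interface ExistingTopicsApi {
        // Ledgers ordered oldest to newest.
        CompletableFuture<List<Ledger>> getLedgersAsync(String topic);

        // Stand-in for the MessageId-based offload; takes the boundary ledger ID only.
        CompletableFuture<Void> triggerOffloadAsync(String topic, long boundaryLedgerId);
    }

    private final ExistingTopicsApi topics;

    SizeThresholdOffloadSketch(ExistingTopicsApi topics) {
        this.topics = topics;
    }

    CompletableFuture<Void> triggerOffloadAsync(String topic, long sizeThreshold) {
        return topics.getLedgersAsync(topic).thenCompose(ledgers -> {
            long suffixSize = 0L;
            // Walk from newest to oldest, summing the data that stays in BookKeeper.
            for (int i = ledgers.size() - 1; i >= 0; i--) {
                suffixSize += ledgers.get(i).size;
                if (suffixSize > sizeThreshold) {
                    // The boundary is the next newer ledger, or the newest one itself.
                    int boundary = Math.min(i + 1, ledgers.size() - 1);
                    return topics.triggerOffloadAsync(topic, ledgers.get(boundary).ledgerId);
                }
            }
            // Within the threshold (or no ledgers): nothing to offload.
            return CompletableFuture.<Void>completedFuture(null);
        });
    }

    void triggerOffload(String topic, long sizeThreshold) {
        // Simplification: a real client would map failures to PulsarAdminException.
        triggerOffloadAsync(topic, sizeThreshold).join();
    }
}
```

In this sketch the existing offload call is made at most once per invocation and is skipped entirely when the retained data already fits within the threshold, which corresponds to the null result of the conversion method.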

Backward & Forward Compatibility

Fully compatible. This is a new client method implementation that does not affect existing functionality.

General Notes

Links