PIP-104: Add new consumer type: TableView

Many applications use Pulsar consumers or readers to fetch every update from a topic and build a map holding the latest value received for each key. This pattern is especially common when building a local cache of the data.

We want to offer support for this access pattern directly in the Pulsar client API, as a way to encapsulate the complexities of setting this up.

Goal

Provide a view of the topic data in the form of a read-only map that is constantly updated with the latest version of each key.

Additionally, let the application register a listener so that it can scan the current contents of the map and then be notified as new messages are received and applied.

API Changes

This proposal only adds new APIs on the client side.

A new type of consumer will be added, the TableView.

Example:

TableView<Integer> tableView = pulsarClient.newTableView(Schema.INT32)
    .topic(topic)
    .create();

tableView.get("my-key"); // --> 5
tableView.get("my-other-key"); // --> 7

When a TableView instance is created, it is guaranteed to already hold the latest value for each key as of the time of creation.

API additions

interface PulsarClient {
    // ....
    <T> TableViewBuilder<T> newTableView(Schema<T> schema);
}

interface TableViewBuilder<T> {
    TableViewBuilder<T> loadConf(Map<String, Object> config);
    TableView<T> create() throws PulsarClientException;
    CompletableFuture<TableView<T>> createAsync();
    TableViewBuilder<T> topic(String topic);
    TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
}

interface TableView<T> extends Closeable {

    // Methods similar to those of java.util.Map
    int size();
    boolean isEmpty();
    boolean containsKey(String key);
    T get(String key);
    Set<Map.Entry<String, T>> entrySet();
    Set<String> keySet();
    Collection<T> values();
    void forEach(BiConsumer<String, T> action);

    /**
     * Performs the given action for each entry in this map until all entries
     * have been processed or the action throws an exception.
     *
     * When all the entries have been processed, the action will be invoked
     * for every new update that is received from the topic.
     *
     * @param action The action to be performed for each entry
     */
    void forEachAndListen(BiConsumer<String, T> action);

    /**
     * Closes the table view and releases the allocated resources.
     *
     * @return a future that can be used to track when the table view has been closed
     */
    CompletableFuture<Void> closeAsync();
}
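
The scan-then-listen contract of forEachAndListen can be illustrated with a small in-memory stand-in. This is only a sketch under stated assumptions: ListenableTable and applyUpdate are hypothetical names that are not part of the proposed API, and using a single lock is just one possible way to make sure no update slips in between the scan and the listener registration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in for a TableView; not the Pulsar implementation.
class ListenableTable<T> {
    private final Map<String, T> data = new ConcurrentHashMap<>();
    private final List<BiConsumer<String, T>> listeners = new ArrayList<>();

    // Assumed entry point: called once for every message read from the topic.
    synchronized void applyUpdate(String key, T value) {
        data.put(key, value);
        for (BiConsumer<String, T> listener : listeners) {
            listener.accept(key, value);
        }
    }

    // Scan the current entries, then register the action for future updates.
    // Holding the lock across both steps means no update can fall between them.
    synchronized void forEachAndListen(BiConsumer<String, T> action) {
        data.forEach(action);
        listeners.add(action);
    }

    T get(String key) {
        return data.get(key);
    }

    public static void main(String[] args) {
        ListenableTable<Integer> table = new ListenableTable<>();
        table.applyUpdate("my-key", 5);
        table.applyUpdate("my-other-key", 7);

        List<String> seen = new ArrayList<>();
        table.forEachAndListen((k, v) -> seen.add(k + "=" + v)); // sees both existing entries
        table.applyUpdate("my-key", 6);                          // delivered to the listener
        System.out.println(seen);
    }
}
```

In this sketch the action first sees the two existing entries (in no particular order) and then the later update to "my-key".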

Implementation

The TableView will be implemented using multiple Reader instances, one per partition, and will always read starting from the compacted view of the topic.
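
The one-reader-per-partition approach can be sketched with a simulation that avoids the Pulsar client entirely. Each inner list below stands in for one partition, each asynchronous task stands in for a Reader, and all tasks write into one shared map. PartitionedLoad and load are hypothetical names, and the sketch assumes that all messages for a given key land in the same partition, so per-key ordering is preserved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical simulation; each inner list stands in for one partition of the topic.
class PartitionedLoad {

    // Start one "reader" task per partition; all tasks write into the same map.
    static Map<String, Integer> load(List<List<Map.Entry<String, Integer>>> partitions) {
        Map<String, Integer> view = new ConcurrentHashMap<>();
        List<CompletableFuture<Void>> readers = new ArrayList<>();
        for (List<Map.Entry<String, Integer>> partition : partitions) {
            readers.add(CompletableFuture.runAsync(() -> {
                for (Map.Entry<String, Integer> msg : partition) {
                    // Later messages for a key overwrite earlier ones.
                    view.put(msg.getKey(), msg.getValue());
                }
            }));
        }
        // Hand out the view only after every partition has been read to its end.
        CompletableFuture.allOf(readers.toArray(new CompletableFuture[0])).join();
        return view;
    }

    public static void main(String[] args) {
        List<List<Map.Entry<String, Integer>>> partitions = List.of(
                List.of(Map.entry("my-key", 1), Map.entry("my-key", 5)),
                List.of(Map.entry("my-other-key", 7)));
        Map<String, Integer> view = load(partitions);
        System.out.println(view.get("my-key") + " " + view.get("my-other-key")); // prints "5 7"
    }
}
```

Waiting for all reader tasks before returning mirrors the guarantee that a newly created TableView already holds the latest value for each key.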

The creation time of a table view can be controlled by configuring the compaction policies for the given topic or namespace. More frequent compaction can lead to very short startup times, because less data has to be replayed to reconstruct the TableView of the topic.
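
The effect of compaction on startup can be illustrated with a simplified model. It assumes a new table view replays the compacted prefix of the topic (one message per key) followed by every message written after the last compaction. CompactionSketch, compact, and replayCount are hypothetical names used only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of compaction: keep only the latest message per key.
class CompactionSketch {

    static Map<String, Integer> compact(List<Map.Entry<String, Integer>> log) {
        Map<String, Integer> latest = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> msg : log) {
            latest.put(msg.getKey(), msg.getValue());
        }
        return latest;
    }

    // Messages replayed at startup: the compacted prefix plus the uncompacted tail.
    // compactedUpTo is the number of messages covered by the last compaction run.
    static int replayCount(List<Map.Entry<String, Integer>> log, int compactedUpTo) {
        int prefix = compact(log.subList(0, compactedUpTo)).size();
        int tail = log.size() - compactedUpTo;
        return prefix + tail;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> log = List.of(
                Map.entry("a", 1), Map.entry("b", 2), Map.entry("a", 3),
                Map.entry("a", 4), Map.entry("b", 5), Map.entry("a", 6));
        System.out.println(replayCount(log, 0)); // never compacted: prints 6
        System.out.println(replayCount(log, 4)); // compacted after 4 messages: prints 4
        System.out.println(replayCount(log, 6)); // fully compacted: prints 2
    }
}
```

Under this model, the more recently compaction has run, the closer the replay size gets to the number of distinct keys.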