Initial commit for TableView (#12838)

* initial commit for TableView

* update doc for TableViewBuilder

* log topic name
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 30b9d8c..83097f6 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -223,6 +223,27 @@
     <T> ReaderBuilder<T> newReader(Schema<T> schema);
 
     /**
+     * Create a table view builder with a specific schema for subscribing on a specific topic.
+     *
+     * <p>The TableView provides a key-value map view of a compacted topic. Messages without keys will
+     * be ignored.
+     *
+     * <p>Example:
+     * <pre>{@code
+     *  TableView<byte[]> tableView = client.newTableView(Schema.BYTES)
+     *            .topic("my-topic")
+     *            .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+     *            .create();
+     *
+     *  tableView.forEach((k, v) -> System.out.println(k + ":" + v));
+     * }</pre>
+     *
+     * @param schema provide a way to convert between serialized data and domain objects
+     * @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
+     */
+    <T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema);
+
+    /**
      * Update the service URL this client is using.
      *
      * <p>This will force the client close all existing connections and to restart service discovery to the new service
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
new file mode 100644
index 0000000..6067621
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+public interface TableView<T> extends Closeable {
+
+    /**
+     * Returns the number of key-value mappings in the {@link TableView}.
+     *
+     * @return the number of key-value mappings in this TableView
+     */
+    int size();
+
+    /**
+     * Returns {@code true} if this {@link TableView} contains no key-value mappings.
+     *
+     * @return true if this TableView contains no key-value mappings
+     */
+    boolean isEmpty();
+
+    /**
+     * Returns {@code true} if this {@link TableView} contains a mapping for the specified
+     * key.
+     *
+     * @param key key whose presence in this map is to be tested
+     * @return true if this map contains a mapping for the specified key
+     */
+    boolean containsKey(String key);
+
+    /**
+     * Returns the value to which the specified key is mapped, or null if this map contains
+     * no mapping for the key.
+     *
+     * @param key the key whose associated value is to be returned
+     * @return the value associated with the key or null if the keys was not found
+     */
+    T get(String key);
+
+    /**
+     * Returns a Set view of the mappings contained in this map.
+     *
+     * @return a set view of the mappings contained in this map
+     */
+    Set<Map.Entry<String, T>> entrySet();
+
+    /**
+     * Returns a {@link Set} view of the keys contained in this {@link TableView}.
+     *
+     * @return a set view of the keys contained in this map
+     */
+    Set<String> keySet();
+
+    /**
+     * Returns a Collection view of the values contained in this {@link TableView}.
+     *
+     * @return a collection view of the values contained in this TableView
+     */
+    Collection<T> values();
+
+    /**
+     * Performs the given action for each entry in this map until all entries
+     * have been processed or the action throws an exception.
+     *
+     * @param action The action to be performed for each entry
+     */
+    void forEach(BiConsumer<String, T> action);
+
+    /**
+     * Performs the give action for each entry in this map until all entries
+     * have been processed or the action throws an exception.
+     *
+     * @param action The action to be performed for each entry
+     */
+    void forEachAndListen(BiConsumer<String, T> action);
+
+    /**
+     * Close the table view and releases resources allocated.
+     *
+     * @return a future that can used to track when the table view has been closed.
+     */
+    CompletableFuture<Void> closeAsync();
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java
new file mode 100644
index 0000000..d620304
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link TableViewBuilder} is used to configure and create instances of {@link TableView}.
+ *
+ * @see PulsarClient#newTableViewBuilder(Schema) ()
+ *
+ * @since 2.10.0
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TableViewBuilder<T> {
+
+    /**
+     * Load the configuration from provided <tt>config</tt> map.
+     *
+     *  <p>Example:
+     *
+     *  <pre>{@code
+     *  Map<String, Object> config = new HashMap<>();
+     *  config.put("topicName", "test-topic");
+     *  config.put("autoUpdatePartitionsSeconds", "300");
+     *
+     *  TableViewBuilder<byte[]> builder = ...;
+     *  builder = builder.loadConf(config);
+     *
+     *  TableView<byte[]> tableView = builder.create();
+     *  }</pre>
+     *
+     * @param config configuration to load
+     * @return the {@link TableViewBuilder} instance
+     */
+    TableViewBuilder<T> loadConf(Map<String, Object> config);
+
+    /**
+     * Finalize the creation of the {@link TableView} instance.
+     *
+     * <p>This method will block until the tableView is created successfully or an exception is thrown.
+     *
+     * @return the {@link TableView} instance
+     * @throws PulsarClientException
+     *              if the tableView creation fails
+     */
+    TableView<T> create() throws PulsarClientException;
+
+    /**
+     * Finalize the creation of the {@link TableView} instance in asynchronous mode.
+     *
+     *  <p>This method will return a {@link CompletableFuture} that can be used to access the instance when it's ready.
+     *
+     * @return the {@link TableView} instance
+     */
+    CompletableFuture<TableView<T>> createAsync();
+
+    /**
+     * Set the topic name of the {@link TableView}
+     *
+     * @param topic the name of the topic to create the {@link TableView}
+     * @return the {@link TableViewBuilder} builder instance
+     */
+    TableViewBuilder<T> topic(String topic);
+
+    /**
+     * Set the interval of updating partitions <i>(default: 1 minute)</i>
+     * @param interval the interval of updating partitions
+     * @param unit the time unit of the interval
+     * @return the {@link TableViewBuilder} builder instance
+     */
+    TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 0ea78c3..a39d727 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -63,6 +63,7 @@
 import org.apache.pulsar.client.api.RegexSubscriptionMode;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TableViewBuilder;
 import org.apache.pulsar.client.api.schema.KeyValueSchema;
 import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
 import org.apache.pulsar.client.api.transaction.TransactionBuilder;
@@ -259,6 +260,11 @@
         return new ReaderBuilderImpl<>(this, schema);
     }
 
+    @Override
+    public <T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema) {
+        return new TableViewBuilderImpl<>(this, schema);
+    }
+
     public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData conf) {
         return createProducerAsync(conf, Schema.BYTES, null);
     }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java
new file mode 100644
index 0000000..f82a746
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.client.api.TableViewBuilder;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class TableViewBuilderImpl<T> implements TableViewBuilder<T> {
+
+    private final PulsarClientImpl client;
+    private final Schema<T> schema;
+    private TableViewConfigurationData conf;
+
+    TableViewBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
+        this.client = client;
+        this.schema = schema;
+        this.conf = new TableViewConfigurationData();
+    }
+
+    @Override
+    public TableViewBuilder<T> loadConf(Map<String, Object> config) {
+        conf = ConfigurationDataUtils.loadData(
+                config, conf, TableViewConfigurationData.class);
+        return this;
+    }
+
+    @Override
+    public TableView<T> create() throws PulsarClientException {
+       try {
+           return createAsync().get();
+       } catch (Exception e) {
+           throw PulsarClientException.unwrap(e);
+       }
+    }
+
+    @Override
+    public CompletableFuture<TableView<T>> createAsync() {
+       return new TableViewImpl<>(client, schema, conf).start();
+    }
+
+    @Override
+    public TableViewBuilder<T> topic(String topic) {
+       checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
+       conf.setTopicName(StringUtils.trim(topic));
+       return this;
+    }
+
+    @Override
+    public TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
+       checkArgument(unit.toSeconds(interval) >= 1, "minimum is 1 second");
+       conf.setAutoUpdatePartitionsSeconds(unit.toSeconds(interval));
+       return this;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
new file mode 100644
index 0000000..f5a51d0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+public class TableViewConfigurationData implements Serializable, Cloneable {
+    private static final long serialVersionUID = 1L;
+
+    private String topicName = null;
+    private long autoUpdatePartitionsSeconds = 60;
+
+    @Override
+    public TableViewConfigurationData clone() {
+        try {
+            TableViewConfigurationData clone = (TableViewConfigurationData) super.clone();
+            clone.setTopicName(topicName);
+            clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds);
+            return clone;
+        } catch (CloneNotSupportedException e) {
+            throw new AssertionError();
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
new file mode 100644
index 0000000..bfcc972
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import io.netty.util.Timeout;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class TableViewImpl<T> implements TableView<T> {
+
+    private final PulsarClientImpl client;
+    private final Schema<T> schema;
+    private final TableViewConfigurationData conf;
+
+    private final ConcurrentMap<String, T> data;
+
+    private final ConcurrentMap<String, Reader<T>> readers;
+
+    private final List<BiConsumer<String, T>> listeners;
+    private final ReentrantLock listenersMutex;
+
+    TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
+        this.client = client;
+        this.schema = schema;
+        this.conf = conf;
+        this.data = new ConcurrentHashMap<>();
+        this.readers = new ConcurrentHashMap<>();
+        this.listeners = new ArrayList<>();
+        this.listenersMutex = new ReentrantLock();
+    }
+
+    CompletableFuture<TableView<T>> start() {
+        return client.getPartitionsForTopic(conf.getTopicName())
+                .thenCompose(partitions -> {
+                    Set<String> partitionsSet = new HashSet<>(partitions);
+                    List<CompletableFuture<?>> futures = new ArrayList<>();
+
+                    // Add new Partitions
+                    partitions.forEach(partition -> {
+                        if (!readers.containsKey(partition)) {
+                            futures.add(newReader(partition));
+                        }
+                    });
+
+                    // Remove partitions that are not used anymore
+                    readers.forEach((existingPartition, existingReader) -> {
+                        if (!partitionsSet.contains(existingPartition)) {
+                            futures.add(existingReader.closeAsync()
+                                    .thenRun(() -> readers.remove(existingPartition, existingReader)));
+                        }
+                    });
+
+                    return FutureUtil.waitForAll(futures)
+                            .thenRun(() -> schedulePartitionsCheck());
+                }).thenApply(__ -> this);
+    }
+
+    private void schedulePartitionsCheck() {
+        client.timer()
+                .newTimeout(this::checkForPartitionsChanges, conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS);
+    }
+
+    private void checkForPartitionsChanges(Timeout timeout) {
+        if (timeout.isCancelled()) {
+            return ;
+        }
+
+        start().whenComplete((tw, ex) -> {
+           if (ex != null) {
+               log.warn("Failed to check for changes in number of partitions");
+           }
+        });
+    }
+
+    @Override
+    public int size() {
+        return data.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return data.isEmpty();
+    }
+
+    @Override
+    public boolean containsKey(String key) {
+        return data.containsKey(key);
+    }
+
+    @Override
+    public T get(String key) {
+       return data.get(key);
+    }
+
+    @Override
+    public Set<Map.Entry<String, T>> entrySet() {
+       return data.entrySet();
+    }
+
+    @Override
+    public Set<String> keySet() {
+        return data.keySet();
+    }
+
+    @Override
+    public Collection<T> values() {
+        return data.values();
+    }
+
+    @Override
+    public void forEach(BiConsumer<String, T> action) {
+        data.forEach(action);
+    }
+
+    @Override
+    public void forEachAndListen(BiConsumer<String, T> action) {
+        // Ensure we iterate over all the existing entry _and_ start the listening from the exact next message
+        try {
+            listenersMutex.lock();
+
+            // Execute the action over existing entries
+            forEach(action);
+
+            listeners.add(action);
+        } finally {
+            listenersMutex.unlock();
+        }
+    }
+
+    @Override
+    public CompletableFuture<Void> closeAsync() {
+        return FutureUtil.waitForAll(
+                readers.values().stream()
+                        .map(Reader::closeAsync)
+                        .collect(Collectors.toList())
+        );
+    }
+
+    @Override
+    public void close() throws PulsarClientException {
+        try {
+            closeAsync().get();
+        } catch (Exception e) {
+            throw PulsarClientException.unwrap(e);
+        }
+    }
+
+    private void handleMessage(Message<T> msg) {
+        try {
+            if (msg.hasKey()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Applying message from topic {}. key={} value={}",
+                            conf.getTopicName(),
+                            msg.getKey(),
+                            msg.getValue());
+                }
+
+                try {
+                    listenersMutex.lock();
+                    data.put(msg.getKey(), msg.getValue());
+
+                    for (BiConsumer<String, T> listener : listeners) {
+                        try {
+                            listener.accept(msg.getKey(), msg.getValue());
+                        } catch (Throwable t) {
+                            log.error("Table view listener raised an exception", t);
+                        }
+                    }
+                } finally {
+                    listenersMutex.unlock();
+                }
+            }
+        } finally {
+            msg.release();
+        }
+    }
+
+    private CompletableFuture<Reader<T>> newReader(String partition) {
+        return client.newReader(schema)
+                .topic(partition)
+                .startMessageId(MessageId.earliest)
+                .readCompacted(true)
+                .poolMessages(true)
+                .createAsync()
+                .thenCompose(this::readAllExistingMessages);
+    }
+
+    private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
+        long startTime = System.nanoTime();
+        AtomicLong messagesRead = new AtomicLong();
+
+        CompletableFuture<Reader<T>> future = new CompletableFuture<>();
+        readAllExistingMessages(reader, future, startTime, messagesRead);
+        return future;
+    }
+
+    private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime,
+                                         AtomicLong messagesRead) {
+        reader.hasMessageAvailableAsync()
+                .thenAccept(hasMessage -> {
+                   if (hasMessage) {
+                       reader.readNextAsync()
+                               .thenAccept(msg -> {
+                                  messagesRead.incrementAndGet();
+                                  handleMessage(msg);
+                                  readAllExistingMessages(reader, future, startTime, messagesRead);
+                               }).exceptionally(ex -> {
+                                   future.completeExceptionally(ex);
+                                   return null;
+                               });
+                   } else {
+                       // Reached the end
+                       long endTime = System.nanoTime();
+                       long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
+                       log.info("Started table view for topic {} - Replayed {} messages in {} seconds",
+                               reader.getTopic(),
+                               messagesRead,
+                               durationMillis / 1000.0);
+                       future.complete(reader);
+                       readTailMessages(reader);
+                   }
+                });
+    }
+
+    private void readTailMessages(Reader<T> reader) {
+        reader.readNextAsync()
+                .thenAccept(msg -> {
+                    handleMessage(msg);
+                    readTailMessages(reader);
+                }).exceptionally(ex -> {
+                    log.info("Reader {} was interrupted", reader.getTopic());
+                    return null;
+                });
+    }
+}