Initial commit for TableView (#12838)
* initial commit for TableView
* update doc for TableViewBuilder
* log topic name
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
index 30b9d8c..83097f6 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClient.java
@@ -223,6 +223,27 @@
<T> ReaderBuilder<T> newReader(Schema<T> schema);
/**
+ * Create a table view builder with a specific schema for subscribing on a specific topic.
+ *
+ * <p>The TableView provides a key-value map view of a compacted topic. Messages without keys will
+ * be ignored.
+ *
+ * <p>Example:
+ * <pre>{@code
+ * TableView<byte[]> tableView = client.newTableView(Schema.BYTES)
+ * .topic("my-topic")
+ * .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
+ * .create();
+ *
+ * tableView.forEach((k, v) -> System.out.println(k + ":" + v));
+ * }</pre>
+ *
+ * @param schema provide a way to convert between serialized data and domain objects
+ * @return a {@link TableViewBuilder} object to configure and construct the {@link TableView} instance
+ */
+ <T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema);
+
+ /**
* Update the service URL this client is using.
*
* <p>This will force the client close all existing connections and to restart service discovery to the new service
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
new file mode 100644
index 0000000..6067621
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableView.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+import java.io.Closeable;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BiConsumer;
+
+public interface TableView<T> extends Closeable {
+
+ /**
+ * Returns the number of key-value mappings in the {@link TableView}.
+ *
+ * @return the number of key-value mappings in this TableView
+ */
+ int size();
+
+ /**
+ * Returns {@code true} if this {@link TableView} contains no key-value mappings.
+ *
+ * @return true if this TableView contains no key-value mappings
+ */
+ boolean isEmpty();
+
+ /**
+ * Returns {@code true} if this {@link TableView} contains a mapping for the specified
+ * key.
+ *
+ * @param key key whose presence in this map is to be tested
+ * @return true if this map contains a mapping for the specified key
+ */
+ boolean containsKey(String key);
+
+ /**
+ * Returns the value to which the specified key is mapped, or null if this map contains
+ * no mapping for the key.
+ *
+ * @param key the key whose associated value is to be returned
+ * @return the value associated with the key or null if the keys was not found
+ */
+ T get(String key);
+
+ /**
+ * Returns a Set view of the mappings contained in this map.
+ *
+ * @return a set view of the mappings contained in this map
+ */
+ Set<Map.Entry<String, T>> entrySet();
+
+ /**
+ * Returns a {@link Set} view of the keys contained in this {@link TableView}.
+ *
+ * @return a set view of the keys contained in this map
+ */
+ Set<String> keySet();
+
+ /**
+ * Returns a Collection view of the values contained in this {@link TableView}.
+ *
+ * @return a collection view of the values contained in this TableView
+ */
+ Collection<T> values();
+
+ /**
+ * Performs the given action for each entry in this map until all entries
+ * have been processed or the action throws an exception.
+ *
+ * @param action The action to be performed for each entry
+ */
+ void forEach(BiConsumer<String, T> action);
+
+ /**
+ * Performs the give action for each entry in this map until all entries
+ * have been processed or the action throws an exception.
+ *
+ * @param action The action to be performed for each entry
+ */
+ void forEachAndListen(BiConsumer<String, T> action);
+
+ /**
+ * Close the table view and releases resources allocated.
+ *
+ * @return a future that can used to track when the table view has been closed.
+ */
+ CompletableFuture<Void> closeAsync();
+}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java
new file mode 100644
index 0000000..d620304
--- /dev/null
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TableViewBuilder.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.api;
+
+import org.apache.pulsar.common.classification.InterfaceAudience;
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * {@link TableViewBuilder} is used to configure and create instances of {@link TableView}.
+ *
+ * @see PulsarClient#newTableViewBuilder(Schema) ()
+ *
+ * @since 2.10.0
+ */
+@InterfaceAudience.Public
+@InterfaceStability.Evolving
+public interface TableViewBuilder<T> {
+
+ /**
+ * Load the configuration from provided <tt>config</tt> map.
+ *
+ * <p>Example:
+ *
+ * <pre>{@code
+ * Map<String, Object> config = new HashMap<>();
+ * config.put("topicName", "test-topic");
+ * config.put("autoUpdatePartitionsSeconds", "300");
+ *
+ * TableViewBuilder<byte[]> builder = ...;
+ * builder = builder.loadConf(config);
+ *
+ * TableView<byte[]> tableView = builder.create();
+ * }</pre>
+ *
+ * @param config configuration to load
+ * @return the {@link TableViewBuilder} instance
+ */
+ TableViewBuilder<T> loadConf(Map<String, Object> config);
+
+ /**
+ * Finalize the creation of the {@link TableView} instance.
+ *
+ * <p>This method will block until the tableView is created successfully or an exception is thrown.
+ *
+ * @return the {@link TableView} instance
+ * @throws PulsarClientException
+ * if the tableView creation fails
+ */
+ TableView<T> create() throws PulsarClientException;
+
+ /**
+ * Finalize the creation of the {@link TableView} instance in asynchronous mode.
+ *
+ * <p>This method will return a {@link CompletableFuture} that can be used to access the instance when it's ready.
+ *
+ * @return the {@link TableView} instance
+ */
+ CompletableFuture<TableView<T>> createAsync();
+
+ /**
+ * Set the topic name of the {@link TableView}
+ *
+ * @param topic the name of the topic to create the {@link TableView}
+ * @return the {@link TableViewBuilder} builder instance
+ */
+ TableViewBuilder<T> topic(String topic);
+
+ /**
+ * Set the interval of updating partitions <i>(default: 1 minute)</i>
+ * @param interval the interval of updating partitions
+ * @param unit the time unit of the interval
+ * @return the {@link TableViewBuilder} builder instance
+ */
+ TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit);
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index 0ea78c3..a39d727 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -63,6 +63,7 @@
import org.apache.pulsar.client.api.RegexSubscriptionMode;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
+import org.apache.pulsar.client.api.TableViewBuilder;
import org.apache.pulsar.client.api.schema.KeyValueSchema;
import org.apache.pulsar.client.api.schema.SchemaInfoProvider;
import org.apache.pulsar.client.api.transaction.TransactionBuilder;
@@ -259,6 +260,11 @@
return new ReaderBuilderImpl<>(this, schema);
}
+ @Override
+ public <T> TableViewBuilder<T> newTableViewBuilder(Schema<T> schema) {
+ return new TableViewBuilderImpl<>(this, schema);
+ }
+
public CompletableFuture<Producer<byte[]>> createProducerAsync(ProducerConfigurationData conf) {
return createProducerAsync(conf, Schema.BYTES, null);
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java
new file mode 100644
index 0000000..f82a746
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewBuilderImpl.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.client.api.TableViewBuilder;
+import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils;
+
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+public class TableViewBuilderImpl<T> implements TableViewBuilder<T> {
+
+ private final PulsarClientImpl client;
+ private final Schema<T> schema;
+ private TableViewConfigurationData conf;
+
+ TableViewBuilderImpl(PulsarClientImpl client, Schema<T> schema) {
+ this.client = client;
+ this.schema = schema;
+ this.conf = new TableViewConfigurationData();
+ }
+
+ @Override
+ public TableViewBuilder<T> loadConf(Map<String, Object> config) {
+ conf = ConfigurationDataUtils.loadData(
+ config, conf, TableViewConfigurationData.class);
+ return this;
+ }
+
+ @Override
+ public TableView<T> create() throws PulsarClientException {
+ try {
+ return createAsync().get();
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
+ }
+ }
+
+ @Override
+ public CompletableFuture<TableView<T>> createAsync() {
+ return new TableViewImpl<>(client, schema, conf).start();
+ }
+
+ @Override
+ public TableViewBuilder<T> topic(String topic) {
+ checkArgument(StringUtils.isNotBlank(topic), "topic cannot be blank");
+ conf.setTopicName(StringUtils.trim(topic));
+ return this;
+ }
+
+ @Override
+ public TableViewBuilder<T> autoUpdatePartitionsInterval(int interval, TimeUnit unit) {
+ checkArgument(unit.toSeconds(interval) >= 1, "minimum is 1 second");
+ conf.setAutoUpdatePartitionsSeconds(unit.toSeconds(interval));
+ return this;
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
new file mode 100644
index 0000000..f5a51d0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewConfigurationData.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.io.Serializable;
+
+@Data
+@NoArgsConstructor
+public class TableViewConfigurationData implements Serializable, Cloneable {
+ private static final long serialVersionUID = 1L;
+
+ private String topicName = null;
+ private long autoUpdatePartitionsSeconds = 60;
+
+ @Override
+ public TableViewConfigurationData clone() {
+ try {
+ TableViewConfigurationData clone = (TableViewConfigurationData) super.clone();
+ clone.setTopicName(topicName);
+ clone.setAutoUpdatePartitionsSeconds(autoUpdatePartitionsSeconds);
+ return clone;
+ } catch (CloneNotSupportedException e) {
+ throw new AssertionError();
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
new file mode 100644
index 0000000..bfcc972
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/TableViewImpl.java
@@ -0,0 +1,274 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pulsar.client.impl;
+
+import io.netty.util.Timeout;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.MessageId;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.Reader;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.TableView;
+import org.apache.pulsar.common.util.FutureUtil;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiConsumer;
+import java.util.stream.Collectors;
+
+@Slf4j
+public class TableViewImpl<T> implements TableView<T> {
+
+ private final PulsarClientImpl client;
+ private final Schema<T> schema;
+ private final TableViewConfigurationData conf;
+
+ private final ConcurrentMap<String, T> data;
+
+ private final ConcurrentMap<String, Reader<T>> readers;
+
+ private final List<BiConsumer<String, T>> listeners;
+ private final ReentrantLock listenersMutex;
+
+ TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
+ this.client = client;
+ this.schema = schema;
+ this.conf = conf;
+ this.data = new ConcurrentHashMap<>();
+ this.readers = new ConcurrentHashMap<>();
+ this.listeners = new ArrayList<>();
+ this.listenersMutex = new ReentrantLock();
+ }
+
+ CompletableFuture<TableView<T>> start() {
+ return client.getPartitionsForTopic(conf.getTopicName())
+ .thenCompose(partitions -> {
+ Set<String> partitionsSet = new HashSet<>(partitions);
+ List<CompletableFuture<?>> futures = new ArrayList<>();
+
+ // Add new Partitions
+ partitions.forEach(partition -> {
+ if (!readers.containsKey(partition)) {
+ futures.add(newReader(partition));
+ }
+ });
+
+ // Remove partitions that are not used anymore
+ readers.forEach((existingPartition, existingReader) -> {
+ if (!partitionsSet.contains(existingPartition)) {
+ futures.add(existingReader.closeAsync()
+ .thenRun(() -> readers.remove(existingPartition, existingReader)));
+ }
+ });
+
+ return FutureUtil.waitForAll(futures)
+ .thenRun(() -> schedulePartitionsCheck());
+ }).thenApply(__ -> this);
+ }
+
+ private void schedulePartitionsCheck() {
+ client.timer()
+ .newTimeout(this::checkForPartitionsChanges, conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS);
+ }
+
+ private void checkForPartitionsChanges(Timeout timeout) {
+ if (timeout.isCancelled()) {
+ return ;
+ }
+
+ start().whenComplete((tw, ex) -> {
+ if (ex != null) {
+ log.warn("Failed to check for changes in number of partitions");
+ }
+ });
+ }
+
+ @Override
+ public int size() {
+ return data.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return data.isEmpty();
+ }
+
+ @Override
+ public boolean containsKey(String key) {
+ return data.containsKey(key);
+ }
+
+ @Override
+ public T get(String key) {
+ return data.get(key);
+ }
+
+ @Override
+ public Set<Map.Entry<String, T>> entrySet() {
+ return data.entrySet();
+ }
+
+ @Override
+ public Set<String> keySet() {
+ return data.keySet();
+ }
+
+ @Override
+ public Collection<T> values() {
+ return data.values();
+ }
+
+ @Override
+ public void forEach(BiConsumer<String, T> action) {
+ data.forEach(action);
+ }
+
+ @Override
+ public void forEachAndListen(BiConsumer<String, T> action) {
+ // Ensure we iterate over all the existing entry _and_ start the listening from the exact next message
+ try {
+ listenersMutex.lock();
+
+ // Execute the action over existing entries
+ forEach(action);
+
+ listeners.add(action);
+ } finally {
+ listenersMutex.unlock();
+ }
+ }
+
+ @Override
+ public CompletableFuture<Void> closeAsync() {
+ return FutureUtil.waitForAll(
+ readers.values().stream()
+ .map(Reader::closeAsync)
+ .collect(Collectors.toList())
+ );
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ try {
+ closeAsync().get();
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
+ }
+ }
+
+ private void handleMessage(Message<T> msg) {
+ try {
+ if (msg.hasKey()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Applying message from topic {}. key={} value={}",
+ conf.getTopicName(),
+ msg.getKey(),
+ msg.getValue());
+ }
+
+ try {
+ listenersMutex.lock();
+ data.put(msg.getKey(), msg.getValue());
+
+ for (BiConsumer<String, T> listener : listeners) {
+ try {
+ listener.accept(msg.getKey(), msg.getValue());
+ } catch (Throwable t) {
+ log.error("Table view listener raised an exception", t);
+ }
+ }
+ } finally {
+ listenersMutex.unlock();
+ }
+ }
+ } finally {
+ msg.release();
+ }
+ }
+
+ private CompletableFuture<Reader<T>> newReader(String partition) {
+ return client.newReader(schema)
+ .topic(partition)
+ .startMessageId(MessageId.earliest)
+ .readCompacted(true)
+ .poolMessages(true)
+ .createAsync()
+ .thenCompose(this::readAllExistingMessages);
+ }
+
+ private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
+ long startTime = System.nanoTime();
+ AtomicLong messagesRead = new AtomicLong();
+
+ CompletableFuture<Reader<T>> future = new CompletableFuture<>();
+ readAllExistingMessages(reader, future, startTime, messagesRead);
+ return future;
+ }
+
+ private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime,
+ AtomicLong messagesRead) {
+ reader.hasMessageAvailableAsync()
+ .thenAccept(hasMessage -> {
+ if (hasMessage) {
+ reader.readNextAsync()
+ .thenAccept(msg -> {
+ messagesRead.incrementAndGet();
+ handleMessage(msg);
+ readAllExistingMessages(reader, future, startTime, messagesRead);
+ }).exceptionally(ex -> {
+ future.completeExceptionally(ex);
+ return null;
+ });
+ } else {
+ // Reached the end
+ long endTime = System.nanoTime();
+ long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
+ log.info("Started table view for topic {} - Replayed {} messages in {} seconds",
+ reader.getTopic(),
+ messagesRead,
+ durationMillis / 1000.0);
+ future.complete(reader);
+ readTailMessages(reader);
+ }
+ });
+ }
+
+ private void readTailMessages(Reader<T> reader) {
+ reader.readNextAsync()
+ .thenAccept(msg -> {
+ handleMessage(msg);
+ readTailMessages(reader);
+ }).exceptionally(ex -> {
+ log.info("Reader {} was interrupted", reader.getTopic());
+ return null;
+ });
+ }
+}