// blob: 560636f94622b9ff3a282ede7f9bbea2d10acff1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static org.apache.pulsar.common.topics.TopicCompactionStrategy.TABLE_VIEW_TAG;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import lombok.extern.slf4j.Slf4j;
import org.apache.pulsar.client.api.CryptoKeyReader;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.apache.pulsar.client.api.ReaderBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TableView;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.topics.TopicCompactionStrategy;
@Slf4j
public class TableViewImpl<T> implements TableView<T> {
// Configuration supplied to the constructor (topic name, subscription name, crypto settings, ...).
private final TableViewConfigurationData conf;
// Backing key -> value map; entries are put/removed as keyed messages are applied in handleMessage.
private final ConcurrentMap<String, T> data;
// Unmodifiable view over `data`, handed out by entrySet(), keySet() and values().
private final Map<String, T> immutableData;
// Future for the reader created asynchronously by the constructor.
private final CompletableFuture<Reader<T>> reader;
// Callbacks invoked with (key, value) for every applied update; accessed while holding listenersMutex.
private final List<BiConsumer<String, T>> listeners;
// Held while registering listeners and while applying an update and notifying listeners.
private final ReentrantLock listenersMutex;
// True when the configured topic name starts with the "persistent" topic-domain string.
private final boolean isPersistentTopic;
// Strategy loaded in the constructor from the configured class name; handleMessage treats null as "no strategy".
private TopicCompactionStrategy<T> compactionStrategy;
/**
 * Initializes the in-memory state of the table view and starts creating the underlying reader
 * asynchronously. No messages are read until {@link #start()} is called.
 *
 * <p>NOTE(review): the persistence check is a plain prefix match on the configured topic name,
 * so a topic name given without an explicit domain prefix is treated as non-persistent here —
 * confirm this is intended.
 *
 * @param client the client used to build the reader
 * @param schema the schema used by the reader to decode message values
 * @param conf   the table view configuration
 */
TableViewImpl(PulsarClientImpl client, Schema<T> schema, TableViewConfigurationData conf) {
    this.conf = conf;
    final String topicName = conf.getTopicName();
    this.isPersistentTopic = topicName.startsWith(TopicDomain.persistent.toString());
    this.data = new ConcurrentHashMap<>();
    this.immutableData = Collections.unmodifiableMap(data);
    this.listeners = new ArrayList<>();
    this.listenersMutex = new ReentrantLock();
    this.compactionStrategy =
            TopicCompactionStrategy.load(TABLE_VIEW_TAG, conf.getTopicCompactionStrategyClassName());

    // Always replay from the earliest available message so the view can be rebuilt from scratch.
    ReaderBuilder<T> builder = client.newReader(schema)
            .topic(topicName)
            .startMessageId(MessageId.earliest)
            .autoUpdatePartitions(true)
            .autoUpdatePartitionsInterval((int) conf.getAutoUpdatePartitionsSeconds(), TimeUnit.SECONDS)
            .poolMessages(true)
            .subscriptionName(conf.getSubscriptionName());

    // Compacted reads are only requested for topics detected as persistent.
    if (isPersistentTopic) {
        builder.readCompacted(true);
    }

    // The key reader is optional; the failure action is always forwarded.
    CryptoKeyReader keyReader = conf.getCryptoKeyReader();
    if (keyReader != null) {
        builder.cryptoKeyReader(keyReader);
    }
    builder.cryptoFailureAction(conf.getCryptoFailureAction());

    this.reader = builder.createAsync();
}
/**
 * Starts populating the table view once the reader has been created.
 *
 * <p>For a persistent topic the returned future completes after all existing messages have been
 * replayed. Otherwise tail reading is started immediately and the future completes right away.
 *
 * @return a future that completes with this table view
 */
CompletableFuture<TableView<T>> start() {
    return reader.thenCompose(createdReader -> {
        if (isPersistentTopic) {
            return this.readAllExistingMessages(createdReader);
        }
        readTailMessages(createdReader);
        return CompletableFuture.completedFuture(createdReader);
    }).thenApply(ignored -> this);
}
/**
 * Returns the number of entries currently held in the backing map.
 *
 * @return the current entry count
 */
@Override
public int size() {
    return this.data.size();
}
/**
 * Tells whether the backing map currently holds no entries.
 *
 * @return {@code true} if the backing map is empty
 */
@Override
public boolean isEmpty() {
    return this.data.isEmpty();
}
/**
 * Tells whether the backing map currently holds an entry for the given key.
 *
 * @param key the key to look up
 * @return {@code true} if an entry exists for {@code key}
 */
@Override
public boolean containsKey(String key) {
    return this.data.containsKey(key);
}
/**
 * Looks up the value currently stored for the given key in the backing map.
 *
 * @param key the key to look up
 * @return the stored value, or {@code null} if the backing map has no entry for {@code key}
 */
@Override
public T get(String key) {
    return this.data.get(key);
}
/**
 * Returns the entry set of the unmodifiable view over the backing map.
 *
 * @return the entries exposed through the unmodifiable view
 */
@Override
public Set<Map.Entry<String, T>> entrySet() {
    return this.immutableData.entrySet();
}
/**
 * Returns the key set of the unmodifiable view over the backing map.
 *
 * @return the keys exposed through the unmodifiable view
 */
@Override
public Set<String> keySet() {
    return this.immutableData.keySet();
}
/**
 * Returns the value collection of the unmodifiable view over the backing map.
 *
 * @return the values exposed through the unmodifiable view
 */
@Override
public Collection<T> values() {
    return this.immutableData.values();
}
/**
 * Runs the given action once for each entry currently in the backing map.
 *
 * @param action the callback invoked with each key and its value
 */
@Override
public void forEach(BiConsumer<String, T> action) {
    this.data.forEach(action);
}
/**
 * Registers a listener that is invoked with the key and value of every update applied after
 * registration.
 *
 * @param action the callback to register
 */
@Override
public void listen(BiConsumer<String, T> action) {
    // Acquire the lock before entering the try block: if lock() itself fails, the finally
    // clause must not attempt to unlock a lock this thread does not hold.
    listenersMutex.lock();
    try {
        listeners.add(action);
    } finally {
        listenersMutex.unlock();
    }
}
/**
 * Runs the action over every existing entry and then registers it as a listener, all while
 * holding the listeners lock. Updates are applied under the same lock, so the listener starts
 * receiving updates from exactly the next applied message.
 *
 * @param action the callback to run over existing entries and to register for future updates
 */
@Override
public void forEachAndListen(BiConsumer<String, T> action) {
    // Acquire the lock before entering the try block: if lock() itself fails, the finally
    // clause must not attempt to unlock a lock this thread does not hold.
    listenersMutex.lock();
    try {
        // Execute the action over the existing entries first ...
        forEach(action);
        // ... then subscribe it, with no update able to slip in between.
        listeners.add(action);
    } finally {
        listenersMutex.unlock();
    }
}
/**
 * Closes the underlying reader asynchronously once its creation has completed.
 *
 * @return a future that completes when the reader has been closed
 */
@Override
public CompletableFuture<Void> closeAsync() {
    return reader.thenCompose(createdReader -> createdReader.closeAsync());
}
/**
 * Closes the table view, blocking until {@link #closeAsync()} has completed.
 *
 * @throws PulsarClientException if closing fails or the calling thread is interrupted while waiting
 */
@Override
public void close() throws PulsarClientException {
    try {
        closeAsync().get();
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up the stack can still observe it.
        Thread.currentThread().interrupt();
        throw PulsarClientException.unwrap(e);
    } catch (Exception e) {
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * Applies a single message to the table view and always releases the message afterwards.
 *
 * <p>Messages without a key are ignored. A message with an empty payload is treated as a
 * removal of its key (listeners are notified with a {@code null} value). When a compaction
 * strategy is configured and it decides to keep the previous value, the message is skipped
 * and reported to the strategy instead of being applied.
 *
 * @param msg the message to apply; released before this method returns
 */
private void handleMessage(Message<T> msg) {
    try {
        if (!msg.hasKey()) {
            return;
        }

        String key = msg.getKey();
        // An empty payload is interpreted as "no value", i.e. a removal of the key.
        T cur = msg.size() > 0 ? msg.getValue() : null;
        if (log.isDebugEnabled()) {
            log.debug("Applying message from topic {}. key={} value={}",
                    conf.getTopicName(),
                    key,
                    cur);
        }

        if (compactionStrategy != null) {
            T prev = data.get(key);
            if (compactionStrategy.shouldKeepLeft(prev, cur)) {
                log.info("Skipped the message from topic {}. key={} value={} prev={}",
                        conf.getTopicName(),
                        key,
                        cur,
                        prev);
                compactionStrategy.handleSkippedMessage(key, cur);
                return;
            }
        }

        // Acquire the lock before entering the try block: if lock() itself fails, the finally
        // clause must not attempt to unlock a lock this thread does not hold.
        listenersMutex.lock();
        try {
            if (null == cur) {
                data.remove(key);
            } else {
                data.put(key, cur);
            }
            // Notify under the lock so listener registration cannot interleave with an update.
            for (BiConsumer<String, T> listener : listeners) {
                try {
                    listener.accept(key, cur);
                } catch (Throwable t) {
                    // A failing listener must not prevent the remaining listeners from running.
                    log.error("Table view listener raised an exception", t);
                }
            }
        } finally {
            listenersMutex.unlock();
        }
    } finally {
        msg.release();
    }
}
/**
 * Starts replaying the messages already available on the reader.
 *
 * @param reader the reader to replay from
 * @return a future completed with the reader once the replay reaches the end, or completed
 *         exceptionally if reading a message fails
 */
private CompletableFuture<Reader<T>> readAllExistingMessages(Reader<T> reader) {
    CompletableFuture<Reader<T>> replayDone = new CompletableFuture<>();
    // Start time and message counter are only used for the summary logged at the end of the replay.
    readAllExistingMessages(reader, replayDone, System.nanoTime(), new AtomicLong());
    return replayDone;
}
/**
 * Reads one existing message (if any) and re-invokes itself until the reader reports that no
 * more messages are available; then completes {@code future} and switches to tail reading.
 *
 * <p>Any failure — while checking for available messages, while reading, or while applying a
 * message — completes {@code future} exceptionally so that callers waiting on it are never
 * left hanging.
 *
 * @param reader       the reader to replay from
 * @param future       completed with the reader at the end of the replay, or exceptionally on failure
 * @param startTime    {@link System#nanoTime()} value taken when the replay started
 * @param messagesRead running count of replayed messages
 */
private void readAllExistingMessages(Reader<T> reader, CompletableFuture<Reader<T>> future, long startTime,
                                     AtomicLong messagesRead) {
    reader.hasMessageAvailableAsync()
            .thenAccept(hasMessage -> {
                if (hasMessage) {
                    reader.readNextAsync()
                            .thenAccept(msg -> {
                                messagesRead.incrementAndGet();
                                handleMessage(msg);
                                readAllExistingMessages(reader, future, startTime, messagesRead);
                            }).exceptionally(ex -> {
                                if (ex.getCause() instanceof PulsarClientException.AlreadyClosedException) {
                                    log.error("Reader {} was closed while reading existing messages.",
                                            reader.getTopic(), ex);
                                } else {
                                    log.warn("Reader {} was interrupted while reading existing messages. ",
                                            reader.getTopic(), ex);
                                }
                                future.completeExceptionally(ex);
                                return null;
                            });
                } else {
                    // Reached the end
                    long endTime = System.nanoTime();
                    long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime);
                    log.info("Started table view for topic {} - Replayed {} messages in {} seconds",
                            reader.getTopic(),
                            messagesRead,
                            durationMillis / 1000.0);
                    future.complete(reader);
                    readTailMessages(reader);
                }
            }).exceptionally(ex -> {
                // Without this handler a failed availability check (or a synchronous failure in
                // the callback above) would leave `future` incomplete forever.
                // completeExceptionally is a no-op if the future has already been completed.
                log.warn("Reader {} failed while reading existing messages.", reader.getTopic(), ex);
                future.completeExceptionally(ex);
                return null;
            });
}
/**
 * Continuously reads new messages from the reader, applying each one and then scheduling the
 * next read.
 *
 * <p>If a read or the handling of a message fails because the reader was already closed, the
 * loop stops. Any other failure is logged and the read is retried.
 *
 * <p>NOTE(review): the retry is issued immediately, with no delay or attempt limit — confirm
 * this cannot spin when reads fail repeatedly.
 *
 * @param reader the reader to tail
 */
private void readTailMessages(Reader<T> reader) {
    reader.readNextAsync()
            .thenAccept(next -> {
                handleMessage(next);
                readTailMessages(reader);
            })
            .exceptionally(failure -> {
                boolean readerClosed =
                        failure.getCause() instanceof PulsarClientException.AlreadyClosedException;
                if (readerClosed) {
                    log.error("Reader {} was closed while reading tail messages.",
                            reader.getTopic(), failure);
                    return null;
                }
                log.warn("Reader {} was interrupted while reading tail messages. "
                        + "Retrying..", reader.getTopic(), failure);
                readTailMessages(reader);
                return null;
            });
}
}