/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMap.Builder;
import com.google.common.collect.Lists;
import com.google.common.collect.Queues;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.NotSupportedException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandAck.AckType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkState;
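/**
 * Consumer implementation that aggregates several topics, and the partitions of partitioned topics,
 * behind a single {@link Consumer} facade. One internal {@link ConsumerImpl} is created per topic
 * partition; their messages are funnelled into a shared incoming queue, and per-partition flow control
 * (pausing and resuming individual consumers) keeps that queue bounded.
 * <p>
 * Instances are normally obtained through the client API, for example by subscribing with
 * {@code ConsumerBuilder#topics(List)} or a topics pattern, rather than being constructed directly.
 */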
public class MultiTopicsConsumerImpl<T> extends ConsumerBase<T> {
public static final String DUMMY_TOPIC_NAME_PREFIX = "MultiTopicsConsumer-";
    // Map <topic+partition, consumer>: when an ack is performed, the owning consumer is looked up by topic name
private final ConcurrentHashMap<String, ConsumerImpl<T>> consumers;
    // Map <topic, numPartitions>: stores the partition count for each topic
protected final ConcurrentHashMap<String, Integer> topics;
// Queue of partition consumers on which we have stopped calling receiveAsync() because the
// shared incoming queue was full
private final ConcurrentLinkedQueue<ConsumerImpl<T>> pausedConsumers;
// Threshold for the shared queue. When the size of the shared queue goes below the threshold, we are going to
// resume receiving from the paused consumer partitions
private final int sharedQueueResumeThreshold;
    // Sum of partitions across all topics: a non-partitioned topic counts as 1, a partitioned topic counts its number of partitions.
AtomicInteger allTopicPartitionsNumber;
    // Timeout for the periodic task that checks for, and subscribes to, newly added partitions
private volatile Timeout partitionsAutoUpdateTimeout = null;
TopicsPartitionChangedListener topicsPartitionChangedListener;
CompletableFuture<Void> partitionsAutoUpdateFuture = null;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final ConsumerStatsRecorder stats;
private final UnAckedMessageTracker unAckedMessageTracker;
private final ConsumerConfigurationData<T> internalConfig;
MultiTopicsConsumerImpl(PulsarClientImpl client, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
this(client, DUMMY_TOPIC_NAME_PREFIX + ConsumerName.generateRandomName(), conf, listenerExecutor,
subscribeFuture, schema, interceptors, createTopicIfDoesNotExist);
}
MultiTopicsConsumerImpl(PulsarClientImpl client, String singleTopic, ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor, CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors<T> interceptors, boolean createTopicIfDoesNotExist) {
super(client, singleTopic, conf, Math.max(2, conf.getReceiverQueueSize()), listenerExecutor, subscribeFuture,
schema, interceptors);
checkArgument(conf.getReceiverQueueSize() > 0,
"Receiver queue size needs to be greater than 0 for Topics Consumer");
this.topics = new ConcurrentHashMap<>();
this.consumers = new ConcurrentHashMap<>();
this.pausedConsumers = new ConcurrentLinkedQueue<>();
this.sharedQueueResumeThreshold = maxReceiverQueueSize / 2;
this.allTopicPartitionsNumber = new AtomicInteger(0);
if (conf.getAckTimeoutMillis() != 0) {
if (conf.getTickDurationMillis() > 0) {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis(), conf.getTickDurationMillis());
} else {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf.getAckTimeoutMillis());
}
} else {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
this.internalConfig = getInternalConsumerConfig();
this.stats = client.getConfiguration().getStatsIntervalSeconds() > 0 ? new ConsumerStatsRecorderImpl() : null;
        // Start tracking partition count increases and automatically subscribe to new partitions
if (conf.isAutoUpdatePartitions()) {
topicsPartitionChangedListener = new TopicsPartitionChangedListener();
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
if (conf.getTopicNames().isEmpty()) {
setState(State.Ready);
subscribeFuture().complete(MultiTopicsConsumerImpl.this);
return;
}
        checkArgument(conf.getTopicNames().isEmpty() || topicNamesValid(conf.getTopicNames()), "Topics are empty or invalid.");
List<CompletableFuture<Void>> futures = conf.getTopicNames().stream().map(t -> subscribeAsync(t, createTopicIfDoesNotExist))
.collect(Collectors.toList());
FutureUtil.waitForAll(futures)
.thenAccept(finalFuture -> {
if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
}
setState(State.Ready);
// We have successfully created N consumers, so we can start receiving messages now
startReceivingMessages(new ArrayList<>(consumers.values()));
log.info("[{}] [{}] Created topics consumer with {} sub-consumers",
topic, subscription, allTopicPartitionsNumber.get());
subscribeFuture().complete(MultiTopicsConsumerImpl.this);
})
.exceptionally(ex -> {
log.warn("[{}] Failed to subscribe topics: {}", topic, ex.getMessage());
subscribeFuture.completeExceptionally(ex);
return null;
});
}
    // Check that the topics are valid:
    // - each topic name is valid,
    // - topic names are unique.
    private static boolean topicNamesValid(Collection<String> topics) {
        checkState(topics != null && topics.size() >= 1,
            "topics should contain at least 1 topic");
Optional<String> result = topics.stream()
.filter(topic -> !TopicName.isValid(topic))
.findFirst();
if (result.isPresent()) {
log.warn("Received invalid topic name: {}", result.get());
return false;
}
// check topic names are unique
HashSet<String> set = new HashSet<>(topics);
if (set.size() == topics.size()) {
return true;
} else {
log.warn("Topic names not unique. unique/all : {}/{}", set.size(), topics.size());
return false;
}
}
private void startReceivingMessages(List<ConsumerImpl<T>> newConsumers) {
if (log.isDebugEnabled()) {
log.debug("[{}] startReceivingMessages for {} new consumers in topics consumer, state: {}",
topic, newConsumers.size(), getState());
}
if (getState() == State.Ready) {
newConsumers.forEach(consumer -> {
consumer.increaseAvailablePermits(consumer.getConnectionHandler().cnx(), conf.getReceiverQueueSize());
receiveMessageFromConsumer(consumer);
});
}
}
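    // Pull a single message from the given partition consumer. After the message is handed over,
    // either pause this consumer (when the shared queue is full or other consumers are already paused)
    // or schedule the next receiveAsync() on the internal executor to avoid deep recursion.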
private void receiveMessageFromConsumer(ConsumerImpl<T> consumer) {
consumer.receiveAsync().thenAccept(message -> {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Receive message from sub consumer:{}",
topic, subscription, consumer.getTopic());
}
// Process the message, add to the queue and trigger listener or async callback
messageReceived(consumer, message);
int size = incomingMessages.size();
if (size >= maxReceiverQueueSize
|| (size > sharedQueueResumeThreshold && !pausedConsumers.isEmpty())) {
                // Mark this consumer to be resumed later: either no more space is left in the shared queue,
                // or some consumer is already paused (to give already-paused consumers a fair chance)
pausedConsumers.add(consumer);
} else {
// Schedule next receiveAsync() if the incoming queue is not full. Use a different thread to avoid
// recursion and stack overflow
client.getInternalExecutorService().execute(() -> receiveMessageFromConsumer(consumer));
}
});
}
private void messageReceived(ConsumerImpl<T> consumer, Message<T> message) {
checkArgument(message instanceof MessageImpl);
TopicMessageImpl<T> topicMessage = new TopicMessageImpl<>(
consumer.getTopic(), consumer.getTopicNameWithoutPartition(), message);
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message from topics-consumer {}",
topic, subscription, message.getMessageId());
}
        // If an asyncReceive is waiting, return the message directly to that callback without adding it to the incomingMessages queue
CompletableFuture<Message<T>> receivedFuture = pollPendingReceive();
if (receivedFuture != null) {
unAckedMessageTracker.add(topicMessage.getMessageId());
completePendingReceive(receivedFuture, topicMessage);
} else if (enqueueMessageAndCheckBatchReceive(topicMessage) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
if (listener != null) {
// Trigger the notification on the message listener in a separate thread to avoid blocking the networking
// thread while the message processing happens
listenerExecutor.execute(() -> {
Message<T> msg;
try {
msg = internalReceive(0, TimeUnit.MILLISECONDS);
if (msg == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
}
return;
}
} catch (PulsarClientException e) {
log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
return;
}
try {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Calling message listener for message {}",
topic, subscription, message.getMessageId());
}
listener.received(MultiTopicsConsumerImpl.this, msg);
} catch (Throwable t) {
log.error("[{}][{}] Message listener error in processing message: {}",
topic, subscription, message, t);
}
});
}
}
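    // Track the message for ack timeout and release its share of the incoming-queue size.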
protected synchronized void messageProcessed(Message<?> msg) {
unAckedMessageTracker.add(msg.getMessageId());
decreaseIncomingMessageSize(msg);
}
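    // Once the shared queue has drained to the resume threshold, restart the receive loop for all
    // currently paused partition consumers.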
private void resumeReceivingFromPausedConsumersIfNeeded() {
if (incomingMessages.size() <= sharedQueueResumeThreshold && !pausedConsumers.isEmpty()) {
while (true) {
ConsumerImpl<T> consumer = pausedConsumers.poll();
if (consumer == null) {
break;
}
client.getInternalExecutorService().execute(() -> {
receiveMessageFromConsumer(consumer);
});
}
}
}
@Override
protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.take();
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
protected Message<T> internalReceive(int timeout, TimeUnit unit) throws PulsarClientException {
Message<T> message;
try {
message = incomingMessages.poll(timeout, unit);
if (message != null) {
decreaseIncomingMessageSize(message);
checkArgument(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
}
resumeReceivingFromPausedConsumersIfNeeded();
return message;
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
protected Messages<T> internalBatchReceive() throws PulsarClientException {
try {
return internalBatchReceiveAsync().get();
} catch (InterruptedException | ExecutionException e) {
State state = getState();
if (state != State.Closing && state != State.Closed) {
                if (stats != null) {
                    stats.incrementNumBatchReceiveFailed();
                }
throw PulsarClientException.unwrap(e);
} else {
return null;
}
}
}
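    // Complete a batch receive immediately from the shared queue if enough messages are buffered,
    // otherwise register a pending batch-receive operation to be completed as messages arrive.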
@Override
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
try {
lock.writeLock().lock();
if (pendingBatchReceives == null) {
pendingBatchReceives = Queues.newConcurrentLinkedQueue();
}
if (hasEnoughMessagesForBatchReceive()) {
MessagesImpl<T> messages = getNewMessagesImpl();
Message<T> msgPeeked = incomingMessages.peek();
while (msgPeeked != null && messages.canAdd(msgPeeked)) {
Message<T> msg = incomingMessages.poll();
if (msg != null) {
decreaseIncomingMessageSize(msg);
Message<T> interceptMsg = beforeConsume(msg);
messages.add(interceptMsg);
}
msgPeeked = incomingMessages.peek();
}
result.complete(messages);
} else {
OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
pendingBatchReceives.add(opBatchReceive);
cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
}
resumeReceivingFromPausedConsumersIfNeeded();
} finally {
lock.writeLock().unlock();
}
return result;
}
@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
Message<T> message = incomingMessages.poll();
if (message == null) {
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
decreaseIncomingMessageSize(message);
checkState(message instanceof TopicMessageImpl);
unAckedMessageTracker.add(message.getMessageId());
resumeReceivingFromPausedConsumersIfNeeded();
result.complete(message);
}
return result;
}
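    // Acknowledgments are routed to the internal consumer that owns the topic partition of the message id.
    // Cumulative acks are forwarded as-is; individual acks also remove the id from the un-acked message
    // tracker once they complete.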
@Override
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String,Long> properties,
TransactionImpl txnImpl) {
checkArgument(messageId instanceof TopicMessageIdImpl);
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
}
if (ackType == AckType.Cumulative) {
            ConsumerImpl<T> individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
if (individualConsumer != null) {
MessageId innerId = topicMessageId.getInnerMessageId();
return individualConsumer.acknowledgeCumulativeAsync(innerId);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doAcknowledgeWithTxn(innerId, ackType, properties, txnImpl)
.thenRun(() ->
unAckedMessageTracker.remove(topicMessageId));
}
}
@Override
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Map<String,Long> properties,
long delayTime,
TimeUnit unit) {
MessageId messageId = message.getMessageId();
checkArgument(messageId instanceof TopicMessageIdImpl);
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
if (getState() != State.Ready) {
return FutureUtil.failedFuture(new PulsarClientException("Consumer already closed"));
}
if (ackType == AckType.Cumulative) {
            ConsumerImpl<T> individualConsumer = consumers.get(topicMessageId.getTopicPartitionName());
            if (individualConsumer != null) {
                return individualConsumer.reconsumeLaterCumulativeAsync(message, delayTime, unit);
} else {
return FutureUtil.failedFuture(new PulsarClientException.NotConnectedException());
}
} else {
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
MessageId innerId = topicMessageId.getInnerMessageId();
return consumer.doReconsumeLater(message, ackType, properties, delayTime, unit)
.thenRun(() ->unAckedMessageTracker.remove(topicMessageId));
}
}
@Override
public void negativeAcknowledge(MessageId messageId) {
checkArgument(messageId instanceof TopicMessageIdImpl);
TopicMessageIdImpl topicMessageId = (TopicMessageIdImpl) messageId;
ConsumerImpl<T> consumer = consumers.get(topicMessageId.getTopicPartitionName());
consumer.negativeAcknowledge(topicMessageId.getInnerMessageId());
}
@Override
public CompletableFuture<Void> unsubscribeAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
}
setState(State.Closing);
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futureList = consumers.values().stream()
.map(c -> c.unsubscribeAsync()).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
.whenComplete((r, ex) -> {
if (ex == null) {
setState(State.Closed);
unAckedMessageTracker.close();
unsubscribeFuture.complete(null);
log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer",
topic, subscription, consumerName);
} else {
setState(State.Failed);
unsubscribeFuture.completeExceptionally(ex);
log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer",
topic, subscription, consumerName, ex.getCause());
}
});
return unsubscribeFuture;
}
@Override
public CompletableFuture<Void> closeAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
unAckedMessageTracker.close();
return CompletableFuture.completedFuture(null);
}
setState(State.Closing);
if (partitionsAutoUpdateTimeout != null) {
partitionsAutoUpdateTimeout.cancel();
partitionsAutoUpdateTimeout = null;
}
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futureList = consumers.values().stream()
.map(c -> c.closeAsync()).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
.whenComplete((r, ex) -> {
if (ex == null) {
setState(State.Closed);
unAckedMessageTracker.close();
closeFuture.complete(null);
log.info("[{}] [{}] Closed Topics Consumer", topic, subscription);
client.cleanupConsumer(this);
// fail all pending-receive futures to notify application
failPendingReceive();
} else {
setState(State.Failed);
closeFuture.completeExceptionally(ex);
log.error("[{}] [{}] Could not close Topics Consumer", topic, subscription,
ex.getCause());
}
});
return closeFuture;
}
private void failPendingReceive() {
if (listenerExecutor != null && !listenerExecutor.isShutdown()) {
failPendingReceives(pendingReceives);
failPendingBatchReceives(pendingBatchReceives);
}
}
@Override
public boolean isConnected() {
return consumers.values().stream().allMatch(consumer -> consumer.isConnected());
}
@Override
String getHandlerName() {
return subscription;
}
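    // Clone the user configuration for the internal per-partition consumers. The message listener is
    // cleared so that listener callbacks are dispatched only by this multi-topics consumer.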
private ConsumerConfigurationData<T> getInternalConsumerConfig() {
ConsumerConfigurationData<T> internalConsumerConfig = conf.clone();
internalConsumerConfig.setSubscriptionName(subscription);
internalConsumerConfig.setConsumerName(consumerName);
internalConsumerConfig.setMessageListener(null);
return internalConsumerConfig;
}
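    // Redeliver everything: ask every partition consumer to redeliver its unacknowledged messages,
    // discard anything already buffered in the shared queue, and reset the un-acked tracker.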
@Override
public void redeliverUnacknowledgedMessages() {
lock.writeLock().lock();
try {
consumers.values().stream().forEach(consumer -> {
consumer.redeliverUnacknowledgedMessages();
consumer.unAckedChunckedMessageIdSequenceMap.clear();
});
incomingMessages.clear();
resetIncomingMessageSize();
unAckedMessageTracker.clear();
} finally {
lock.writeLock().unlock();
}
resumeReceivingFromPausedConsumersIfNeeded();
}
@Override
public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
if (messageIds.isEmpty()) {
return;
}
checkArgument(messageIds.stream().findFirst().get() instanceof TopicMessageIdImpl);
if (conf.getSubscriptionType() != SubscriptionType.Shared) {
// We cannot redeliver single messages if subscription type is not Shared
redeliverUnacknowledgedMessages();
return;
}
removeExpiredMessagesFromQueue(messageIds);
messageIds.stream().map(messageId -> (TopicMessageIdImpl)messageId)
.collect(Collectors.groupingBy(TopicMessageIdImpl::getTopicPartitionName, Collectors.toSet()))
.forEach((topicName, messageIds1) ->
consumers.get(topicName)
.redeliverUnacknowledgedMessages(messageIds1.stream()
.map(mid -> mid.getInnerMessageId()).collect(Collectors.toSet())));
resumeReceivingFromPausedConsumersIfNeeded();
}
@Override
protected void completeOpBatchReceive(OpBatchReceive<T> op) {
notifyPendingBatchReceivedCallBack(op);
resumeReceivingFromPausedConsumersIfNeeded();
}
@Override
public void seek(MessageId messageId) throws PulsarClientException {
try {
seekAsync(messageId).get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
public void seek(long timestamp) throws PulsarClientException {
try {
seekAsync(timestamp).get();
        } catch (Exception e) {
            throw PulsarClientException.unwrap(e);
        }
}
@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
return FutureUtil.failedFuture(new PulsarClientException("Seek operation not supported on topics consumer"));
}
@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
List<CompletableFuture<Void>> futures = new ArrayList<>(consumers.size());
consumers.values().forEach(consumer -> futures.add(consumer.seekAsync(timestamp)));
return FutureUtil.waitForAll(futures);
}
@Override
public int getAvailablePermits() {
return consumers.values().stream().mapToInt(ConsumerImpl::getAvailablePermits).sum();
}
@Override
public boolean hasReachedEndOfTopic() {
return consumers.values().stream().allMatch(Consumer::hasReachedEndOfTopic);
}
@Override
public int numMessagesInQueue() {
return incomingMessages.size() + consumers.values().stream().mapToInt(ConsumerImpl::numMessagesInQueue).sum();
}
@Override
public synchronized ConsumerStats getStats() {
if (stats == null) {
return null;
}
stats.reset();
consumers.values().stream().forEach(consumer -> stats.updateCumulativeStats(consumer.getStats()));
return stats;
}
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
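    // Remove messages at the head of the shared queue whose ids are in the given set. The scan stops at
    // the first id not in the set; since that message has already been polled off the queue, its id is
    // added to the set so the caller redelivers it as well.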
private void removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
Message<T> peek = incomingMessages.peek();
if (peek != null) {
if (!messageIds.contains(peek.getMessageId())) {
                // If the first message is not expired, then no message in the queue is expired.
return;
}
// try not to remove elements that are added while we remove
Message<T> message = incomingMessages.poll();
checkState(message instanceof TopicMessageImpl);
while (message != null) {
decreaseIncomingMessageSize(message);
MessageId messageId = message.getMessageId();
if (!messageIds.contains(messageId)) {
messageIds.add(messageId);
break;
}
message = incomingMessages.poll();
}
}
}
private TopicName getTopicName(String topic) {
try {
return TopicName.get(topic);
} catch (Exception ignored) {
return null;
}
}
private String getFullTopicName(String topic) {
TopicName topicName = getTopicName(topic);
return (topicName != null) ? topicName.toString() : null;
}
private void removeTopic(String topic) {
String fullTopicName = getFullTopicName(topic);
if (fullTopicName != null) {
topics.remove(topic);
}
}
    // Subscribe to one additional topic
public CompletableFuture<Void> subscribeAsync(String topicName, boolean createTopicIfDoesNotExist) {
TopicName topicNameInstance = getTopicName(topicName);
if (topicNameInstance == null) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topic name not valid"));
}
String fullTopicName = topicNameInstance.toString();
if (topics.containsKey(fullTopicName) || topics.containsKey(topicNameInstance.getPartitionedTopicName())) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName));
}
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
}
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
client.getPartitionedTopicMetadata(topicName)
.thenAccept(metadata -> subscribeTopicPartitions(subscribeResult, fullTopicName, metadata.partitions,
createTopicIfDoesNotExist))
.exceptionally(ex1 -> {
log.warn("[{}] Failed to get partitioned topic metadata: {}", fullTopicName, ex1.getMessage());
subscribeResult.completeExceptionally(ex1);
return null;
});
return subscribeResult;
}
    // Create a consumer for a single partitioned topic whose partition count is already known.
    // First create a consumer with no topic, then subscribe to the known partitioned topic.
public static <T> MultiTopicsConsumerImpl<T> createPartitionedConsumer(PulsarClientImpl client,
ConsumerConfigurationData<T> conf,
ExecutorService listenerExecutor,
CompletableFuture<Consumer<T>> subscribeFuture,
int numPartitions,
Schema<T> schema, ConsumerInterceptors<T> interceptors) {
checkArgument(conf.getTopicNames().size() == 1, "Should have only 1 topic for partitioned consumer");
        // Get the topic name, then remove it from conf, so the constructor creates a consumer with no topic.
ConsumerConfigurationData cloneConf = conf.clone();
String topicName = cloneConf.getSingleTopic();
cloneConf.getTopicNames().remove(topicName);
CompletableFuture<Consumer> future = new CompletableFuture<>();
MultiTopicsConsumerImpl consumer = new MultiTopicsConsumerImpl(client, topicName, cloneConf, listenerExecutor,
future, schema, interceptors, true /* createTopicIfDoesNotExist */);
future.thenCompose(c -> ((MultiTopicsConsumerImpl)c).subscribeAsync(topicName, numPartitions))
.thenRun(()-> subscribeFuture.complete(consumer))
.exceptionally(e -> {
log.warn("Failed subscription for createPartitionedConsumer: {} {}, e:{}",
topicName, numPartitions, e);
subscribeFuture.completeExceptionally(
PulsarClientException.wrap(((Throwable) e).getCause(), String.format("Failed to subscribe %s with %d partitions", topicName, numPartitions)));
return null;
});
return consumer;
}
    // Subscribe to one additional topic whose number of partitions is already known
@VisibleForTesting
CompletableFuture<Void> subscribeAsync(String topicName, int numberPartitions) {
TopicName topicNameInstance = getTopicName(topicName);
if (topicNameInstance == null) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topic name not valid"));
}
String fullTopicName = topicNameInstance.toString();
if (topics.containsKey(fullTopicName) || topics.containsKey(topicNameInstance.getPartitionedTopicName())) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Already subscribed to " + topicName));
}
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
}
CompletableFuture<Void> subscribeResult = new CompletableFuture<>();
subscribeTopicPartitions(subscribeResult, fullTopicName, numberPartitions, true /* createTopicIfDoesNotExist */);
return subscribeResult;
}
private void subscribeTopicPartitions(CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
boolean createIfDoesNotExist) {
client.preProcessSchemaBeforeSubscribe(client, schema, topicName).whenComplete((schema, cause) -> {
if (null == cause) {
doSubscribeTopicPartitions(schema, subscribeResult, topicName, numPartitions, createIfDoesNotExist);
} else {
subscribeResult.completeExceptionally(cause);
}
});
}
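    // Create one internal ConsumerImpl per partition (or a single consumer for a non-partitioned topic),
    // then start receiving from the new consumers once all of their subscriptions complete.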
private void doSubscribeTopicPartitions(Schema<T> schema,
CompletableFuture<Void> subscribeResult, String topicName, int numPartitions,
boolean createIfDoesNotExist) {
if (log.isDebugEnabled()) {
log.debug("Subscribe to topic {} metadata.partitions: {}", topicName, numPartitions);
}
List<CompletableFuture<Consumer<T>>> futureList;
if (numPartitions > 0) {
            // The condition below is true if subscribeAsync() has been invoked a second time with the same
            // topicName before the first invocation reached this point.
boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, numPartitions) != null;
if (isTopicBeingSubscribedForInOtherThread) {
                String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. "
                    + "Topic is already being subscribed to in another thread.", topic, topicName);
log.warn(errorMessage);
subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
return;
}
allTopicPartitionsNumber.addAndGet(numPartitions);
int receiverQueueSize = Math.min(conf.getReceiverQueueSize(),
conf.getMaxTotalReceiverQueueSizeAcrossPartitions() / numPartitions);
ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
configurationData.setReceiverQueueSize(receiverQueueSize);
futureList = IntStream
.range(0, numPartitions)
.mapToObj(
partitionIndex -> {
String partitionName = TopicName.get(topicName).getPartition(partitionIndex).toString();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, partitionName,
configurationData, client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture,
null, schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
return subFuture;
})
.collect(Collectors.toList());
} else {
boolean isTopicBeingSubscribedForInOtherThread = this.topics.putIfAbsent(topicName, 1) != null;
if (isTopicBeingSubscribedForInOtherThread) {
                String errorMessage = String.format("[%s] Failed to subscribe for topic [%s] in topics consumer. "
                    + "Topic is already being subscribed to in another thread.", topic, topicName);
log.warn(errorMessage);
subscribeResult.completeExceptionally(new PulsarClientException(errorMessage));
return;
}
allTopicPartitionsNumber.incrementAndGet();
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(client, topicName, internalConfig,
client.externalExecutorProvider().getExecutor(), -1, true, subFuture, null,
schema, interceptors,
createIfDoesNotExist);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
futureList = Collections.singletonList(subFuture);
}
FutureUtil.waitForAll(futureList)
.thenAccept(finalFuture -> {
if (allTopicPartitionsNumber.get() > maxReceiverQueueSize) {
setMaxReceiverQueueSize(allTopicPartitionsNumber.get());
}
int numTopics = this.topics.values().stream().mapToInt(Integer::intValue).sum();
int currentAllTopicsPartitionsNumber = allTopicPartitionsNumber.get();
checkState(currentAllTopicsPartitionsNumber == numTopics,
"allTopicPartitionsNumber " + currentAllTopicsPartitionsNumber
+ " not equals expected: " + numTopics);
// We have successfully created new consumers, so we can start receiving messages for them
startReceivingMessages(
consumers.values().stream()
.filter(consumer1 -> {
String consumerTopicName = consumer1.getTopic();
if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(
TopicName.get(topicName).getPartitionedTopicName())) {
return true;
} else {
return false;
}
})
.collect(Collectors.toList()));
subscribeResult.complete(null);
log.info("[{}] [{}] Success subscribe new topic {} in topics consumer, partitions: {}, allTopicPartitionsNumber: {}",
topic, subscription, topicName, numPartitions, allTopicPartitionsNumber.get());
return;
})
.exceptionally(ex -> {
handleSubscribeOneTopicError(topicName, ex, subscribeResult);
return null;
});
}
    // Handle a failure while subscribing to a new topic: close the partition consumers that were already created
private void handleSubscribeOneTopicError(String topicName, Throwable error, CompletableFuture<Void> subscribeFuture) {
log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer {}", topic, topicName, error.getMessage());
client.externalExecutorProvider().getExecutor().submit(() -> {
AtomicInteger toCloseNum = new AtomicInteger(0);
consumers.values().stream().filter(consumer1 -> {
String consumerTopicName = consumer1.getTopic();
if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(TopicName.get(topicName).getPartitionedTopicName())) {
toCloseNum.incrementAndGet();
return true;
} else {
return false;
}
}).collect(Collectors.toList()).forEach(consumer2 -> {
consumer2.closeAsync().whenComplete((r, ex) -> {
consumer2.subscribeFuture().completeExceptionally(error);
allTopicPartitionsNumber.decrementAndGet();
consumers.remove(consumer2.getTopic());
if (toCloseNum.decrementAndGet() == 0) {
log.warn("[{}] Failed to subscribe for topic [{}] in topics consumer, subscribe error: {}",
topic, topicName, error.getMessage());
removeTopic(topicName);
subscribeFuture.completeExceptionally(error);
}
return;
});
});
});
}
    // Unsubscribe from a given topic
public CompletableFuture<Void> unsubscribeAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
}
if (partitionsAutoUpdateTimeout != null) {
partitionsAutoUpdateTimeout.cancel();
partitionsAutoUpdateTimeout = null;
}
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
String topicPartName = TopicName.get(topicName).getPartitionedTopicName();
List<ConsumerImpl<T>> consumersToUnsub = consumers.values().stream()
.filter(consumer -> {
String consumerTopicName = consumer.getTopic();
if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
return true;
} else {
return false;
}
}).collect(Collectors.toList());
List<CompletableFuture<Void>> futureList = consumersToUnsub.stream()
.map(ConsumerImpl::unsubscribeAsync).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
.whenComplete((r, ex) -> {
if (ex == null) {
consumersToUnsub.forEach(consumer1 -> {
consumers.remove(consumer1.getTopic());
pausedConsumers.remove(consumer1);
allTopicPartitionsNumber.decrementAndGet();
});
removeTopic(topicName);
((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
unsubscribeFuture.complete(null);
log.info("[{}] [{}] [{}] Unsubscribed Topics Consumer, allTopicPartitionsNumber: {}",
topicName, subscription, consumerName, allTopicPartitionsNumber);
} else {
unsubscribeFuture.completeExceptionally(ex);
setState(State.Failed);
log.error("[{}] [{}] [{}] Could not unsubscribe Topics Consumer",
topicName, subscription, consumerName, ex.getCause());
}
});
return unsubscribeFuture;
}
// Remove a consumer for a topic
public CompletableFuture<Void> removeConsumerAsync(String topicName) {
checkArgument(TopicName.isValid(topicName), "Invalid topic name:" + topicName);
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil.failedFuture(
new PulsarClientException.AlreadyClosedException("Topics Consumer was already closed"));
}
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
String topicPartName = TopicName.get(topicName).getPartitionedTopicName();
List<ConsumerImpl<T>> consumersToClose = consumers.values().stream()
.filter(consumer -> {
String consumerTopicName = consumer.getTopic();
if (TopicName.get(consumerTopicName).getPartitionedTopicName().equals(topicPartName)) {
return true;
} else {
return false;
}
}).collect(Collectors.toList());
List<CompletableFuture<Void>> futureList = consumersToClose.stream()
.map(ConsumerImpl::closeAsync).collect(Collectors.toList());
FutureUtil.waitForAll(futureList)
.whenComplete((r, ex) -> {
if (ex == null) {
consumersToClose.forEach(consumer1 -> {
consumers.remove(consumer1.getTopic());
pausedConsumers.remove(consumer1);
allTopicPartitionsNumber.decrementAndGet();
});
removeTopic(topicName);
((UnAckedTopicMessageTracker) unAckedMessageTracker).removeTopicMessages(topicName);
unsubscribeFuture.complete(null);
log.info("[{}] [{}] [{}] Removed Topics Consumer, allTopicPartitionsNumber: {}",
topicName, subscription, consumerName, allTopicPartitionsNumber);
} else {
unsubscribeFuture.completeExceptionally(ex);
setState(State.Failed);
log.error("[{}] [{}] [{}] Could not remove Topics Consumer",
topicName, subscription, consumerName, ex.getCause());
}
});
return unsubscribeFuture;
}
    // Get the subscribed topic names
public List<String> getTopics() {
return topics.keySet().stream().collect(Collectors.toList());
}
    // Get the topic+partition names of all internal consumers
public List<String> getPartitionedTopics() {
return consumers.keySet().stream().collect(Collectors.toList());
}
    // Get all internal per-partition consumers
public List<ConsumerImpl<T>> getConsumers() {
return consumers.values().stream().collect(Collectors.toList());
}
@Override
public void pause() {
consumers.forEach((name, consumer) -> consumer.pause());
}
@Override
public void resume() {
consumers.forEach((name, consumer) -> consumer.resume());
}
@Override
public long getLastDisconnectedTimestamp() {
long lastDisconnectedTimestamp = 0;
Optional<ConsumerImpl<T>> c = consumers.values().stream().max(Comparator.comparingLong(ConsumerImpl::getLastDisconnectedTimestamp));
if (c.isPresent()) {
lastDisconnectedTimestamp = c.get().getLastDisconnectedTimestamp();
}
return lastDisconnectedTimestamp;
}
    // This listener is triggered when the partitions of subscribed topics are updated.
private class TopicsPartitionChangedListener implements PartitionsChangedListener {
        // Check for partition changes on the given topics and subscribe to newly added partitions.
@Override
public CompletableFuture<Void> onTopicsExtended(Collection<String> topicsExtended) {
CompletableFuture<Void> future = new CompletableFuture<>();
if (topicsExtended.isEmpty()) {
future.complete(null);
return future;
}
if (log.isDebugEnabled()) {
log.debug("[{}] run onTopicsExtended: {}, size: {}",
topic, topicsExtended.toString(), topicsExtended.size());
}
List<CompletableFuture<Void>> futureList = Lists.newArrayListWithExpectedSize(topicsExtended.size());
topicsExtended.forEach(topic -> futureList.add(subscribeIncreasedTopicPartitions(topic)));
FutureUtil.waitForAll(futureList)
.thenAccept(finalFuture -> future.complete(null))
.exceptionally(ex -> {
log.warn("[{}] Failed to subscribe increased topics partitions: {}", topic, ex.getMessage());
future.completeExceptionally(ex);
return null;
});
return future;
}
}
    // Subscribe to newly added partitions of a given topic
private CompletableFuture<Void> subscribeIncreasedTopicPartitions(String topicName) {
CompletableFuture<Void> future = new CompletableFuture<>();
client.getPartitionsForTopic(topicName).thenCompose(list -> {
int oldPartitionNumber = topics.get(topicName);
int currentPartitionNumber = list.size();
if (log.isDebugEnabled()) {
log.debug("[{}] partitions number. old: {}, new: {}",
topicName, oldPartitionNumber, currentPartitionNumber);
}
if (oldPartitionNumber == currentPartitionNumber) {
// topic partition number not changed
future.complete(null);
return future;
} else if (oldPartitionNumber < currentPartitionNumber) {
allTopicPartitionsNumber.compareAndSet(oldPartitionNumber, currentPartitionNumber);
List<String> newPartitions = list.subList(oldPartitionNumber, currentPartitionNumber);
                // Subscribe to the newly added partitions
List<CompletableFuture<Consumer<T>>> futureList = newPartitions
.stream()
.map(partitionName -> {
int partitionIndex = TopicName.getPartitionIndex(partitionName);
CompletableFuture<Consumer<T>> subFuture = new CompletableFuture<>();
ConsumerConfigurationData<T> configurationData = getInternalConsumerConfig();
ConsumerImpl<T> newConsumer = ConsumerImpl.newConsumerImpl(
client, partitionName, configurationData,
client.externalExecutorProvider().getExecutor(),
partitionIndex, true, subFuture, null, schema, interceptors,
true /* createTopicIfDoesNotExist */);
consumers.putIfAbsent(newConsumer.getTopic(), newConsumer);
if (log.isDebugEnabled()) {
log.debug("[{}] create consumer {} for partitionName: {}",
topicName, newConsumer.getTopic(), partitionName);
}
return subFuture;
})
.collect(Collectors.toList());
                // Wait for all partition subscribe futures to complete, then start receiving messages
FutureUtil.waitForAll(futureList)
.thenAccept(finalFuture -> {
List<ConsumerImpl<T>> newConsumerList = newPartitions.stream()
.map(partitionTopic -> consumers.get(partitionTopic))
.collect(Collectors.toList());
startReceivingMessages(newConsumerList);
future.complete(null);
})
.exceptionally(ex -> {
log.warn("[{}] Failed to subscribe {} partition: {} - {} : {}",
topic, topicName, oldPartitionNumber, currentPartitionNumber, ex);
future.completeExceptionally(ex);
return null;
});
} else {
log.error("[{}] not support shrink topic partitions. old: {}, new: {}",
topicName, oldPartitionNumber, currentPartitionNumber);
future.completeExceptionally(new NotSupportedException("not support shrink topic partitions"));
}
return future;
});
return future;
}
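    // Periodic timer task that re-checks partition metadata for every subscribed topic and, on each run,
    // reschedules itself at the configured auto-update interval.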
private TimerTask partitionsAutoUpdateTimerTask = new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
if (timeout.isCancelled() || getState() != State.Ready) {
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] run partitionsAutoUpdateTimerTask", topic);
}
            // If the previous auto-update has not completed yet, skip this run; otherwise trigger a new check.
if (partitionsAutoUpdateFuture == null || partitionsAutoUpdateFuture.isDone()) {
partitionsAutoUpdateFuture = topicsPartitionChangedListener.onTopicsExtended(topics.keySet());
}
// schedule the next re-check task
partitionsAutoUpdateTimeout = client.timer()
.newTimeout(partitionsAutoUpdateTimerTask, conf.getAutoUpdatePartitionsIntervalSeconds(), TimeUnit.SECONDS);
}
};
@VisibleForTesting
public Timeout getPartitionsAutoUpdateTimeout() {
return partitionsAutoUpdateTimeout;
}
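    // Collect the last message id of every internal consumer and combine them into a single
    // MultiMessageIdImpl keyed by topic partition name.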
@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
CompletableFuture<MessageId> returnFuture = new CompletableFuture<>();
Map<String, CompletableFuture<MessageId>> messageIdFutures = consumers.entrySet().stream()
.map(entry -> Pair.of(entry.getKey(),entry.getValue().getLastMessageIdAsync()))
.collect(Collectors.toMap(Pair::getKey, Pair::getValue));
CompletableFuture
.allOf(messageIdFutures.entrySet().stream().map(Map.Entry::getValue).toArray(CompletableFuture<?>[]::new))
.whenComplete((ignore, ex) -> {
Builder<String, MessageId> builder = ImmutableMap.builder();
messageIdFutures.forEach((key, future) -> {
MessageId messageId;
try {
messageId = future.get();
} catch(Exception e) {
log.warn("[{}] Exception when topic {} getLastMessageId.", key, e);
messageId = MessageId.earliest;
}
builder.put(key, messageId);
});
returnFuture.complete(new MultiMessageIdImpl(builder.build()));
});
return returnFuture;
}
private static final Logger log = LoggerFactory.getLogger(MultiTopicsConsumerImpl.class);
}