blob: fec428824c205a35873652134d81b1d192852d3b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.google.common.collect.Queues;
import io.netty.util.Timeout;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import lombok.Getter;
import lombok.Setter;
import org.apache.pulsar.client.api.BatchReceivePolicy;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerBuilder;
import org.apache.pulsar.client.api.ConsumerEventListener;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ConsumerName;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.NoOpLock;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Common base for {@link Consumer} implementations. It keeps the consumer configuration, the queue of incoming
 * messages and the pending receive / batch-receive requests, and implements the blocking acknowledge, reconsume,
 * unsubscribe and close variants on top of their asynchronous counterparts.
 *
 * @param <T> type of the message values delivered to this consumer
 */
public abstract class ConsumerBase<T> extends HandlerState implements Consumer<T> {
// NOTE(review): presumably the starting receiver queue size when auto-scaling is enabled; the use site is not
// visible in this part of the file -- confirm.
protected static final int INITIAL_RECEIVER_QUEUE_SIZE = 1;
// Memory usage ratio below which expectMoreIncomingMessages() is allowed to grow the receiver queue.
protected static final double MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION = 0.75;
// Subscription name taken from the configuration.
protected final String subscription;
protected final ConsumerConfigurationData<T> conf;
// Configured consumer name, or a randomly generated one when the configuration has none.
protected final String consumerName;
protected final CompletableFuture<Consumer<T>> subscribeFuture;
// Listener from the configuration; when it is non-null the receive()/batchReceive() APIs are rejected.
protected final MessageListener<T> listener;
protected final ConsumerEventListener consumerEventListener;
protected final ExecutorProvider executorProvider;
// Executor obtained from executorProvider in the constructor.
protected final ExecutorService externalPinnedExecutor;
// Internal executor obtained from the client; used here to fail pending receives.
protected final ExecutorService internalPinnedExecutor;
// Set to UNACKED_MESSAGE_TRACKER_DISABLED when the ack timeout is 0.
protected UnAckedMessageTracker unAckedMessageTracker;
// Messages received but not yet handed to the application.
final GrowableArrayBlockingQueue<Message<T>> incomingMessages;
protected ConcurrentOpenHashMap<MessageIdAdv, MessageIdImpl[]> unAckedChunkedMessageIdSequenceMap;
// Futures of receive requests that are still waiting for a message.
protected final ConcurrentLinkedQueue<CompletableFuture<Message<T>>> pendingReceives;
// Upper bound of the receiver queue size (the receiverQueueSize constructor argument).
protected final int maxReceiverQueueSize;
// Current receiver queue size; written through CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.
private volatile int currentReceiverQueueSize;
protected static final AtomicIntegerFieldUpdater<ConsumerBase> MESSAGE_LISTENER_QUEUE_SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ConsumerBase.class, "messageListenerQueueSize");
protected volatile int messageListenerQueueSize = 0;
protected static final AtomicIntegerFieldUpdater<ConsumerBase> CURRENT_RECEIVER_QUEUE_SIZE_UPDATER =
AtomicIntegerFieldUpdater.newUpdater(ConsumerBase.class, "currentReceiverQueueSize");
protected final Schema<T> schema;
// May be null; every interceptor hook in this class checks for null before delegating.
protected final ConsumerInterceptors<T> interceptors;
// Effective batch receive policy after the normalization done in the constructor.
protected final BatchReceivePolicy batchReceivePolicy;
// Batch receive requests that are still waiting for enough messages.
protected final ConcurrentLinkedQueue<OpBatchReceive<T>> pendingBatchReceives;
private static final AtomicLongFieldUpdater<ConsumerBase> INCOMING_MESSAGES_SIZE_UPDATER = AtomicLongFieldUpdater
.newUpdater(ConsumerBase.class, "incomingMessagesSize");
// Sum of the sizes of the messages added to incomingMessages; updated through INCOMING_MESSAGES_SIZE_UPDATER.
protected volatile long incomingMessagesSize = 0;
protected volatile Timeout batchReceiveTimeout = null;
// Only takes effect when the subscription type is Failover or Exclusive; a no-op lock is used otherwise.
protected final Lock incomingQueueLock;
protected static final AtomicLongFieldUpdater<ConsumerBase> CONSUMER_EPOCH =
AtomicLongFieldUpdater.newUpdater(ConsumerBase.class, "consumerEpoch");
// Error reported by the reconsumeLater APIs when retry is not enabled in the configuration.
private static final String RECONSUME_LATER_ERROR_MSG =
"reconsumeLater method not supported because retryEnabled is set to false. "
+ "You can enable it via ConsumerBuilder.";
@Setter
@Getter
protected volatile long consumerEpoch;
// Hint consumed (reset to false) by expectMoreIncomingMessages() when it doubles the receiver queue size.
protected final AtomicBoolean scaleReceiverQueueHint = new AtomicBoolean(false);
protected ConsumerBase(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
int receiverQueueSize, ExecutorProvider executorProvider,
CompletableFuture<Consumer<T>> subscribeFuture, Schema<T> schema,
ConsumerInterceptors interceptors) {
super(client, topic);
this.maxReceiverQueueSize = receiverQueueSize;
this.subscription = conf.getSubscriptionName();
this.conf = conf;
this.consumerName = conf.getConsumerName() == null ? ConsumerName.generateRandomName() : conf.getConsumerName();
this.subscribeFuture = subscribeFuture;
this.listener = conf.getMessageListener();
this.consumerEventListener = conf.getConsumerEventListener();
// Always use growable queue since items can exceed the advertised size
this.incomingMessages = new GrowableArrayBlockingQueue<>();
this.unAckedChunkedMessageIdSequenceMap =
ConcurrentOpenHashMap.<MessageIdAdv, MessageIdImpl[]>newBuilder().build();
this.executorProvider = executorProvider;
this.externalPinnedExecutor = executorProvider.getExecutor();
this.internalPinnedExecutor = client.getInternalExecutorService();
this.pendingReceives = Queues.newConcurrentLinkedQueue();
this.pendingBatchReceives = Queues.newConcurrentLinkedQueue();
this.schema = schema;
this.interceptors = interceptors;
if (conf.getBatchReceivePolicy() != null) {
BatchReceivePolicy userBatchReceivePolicy = conf.getBatchReceivePolicy();
if (userBatchReceivePolicy.getMaxNumMessages() > this.maxReceiverQueueSize) {
this.batchReceivePolicy = BatchReceivePolicy.builder()
.maxNumMessages(this.maxReceiverQueueSize)
.maxNumBytes(userBatchReceivePolicy.getMaxNumBytes())
.messagesFromMultiTopicsEnabled(userBatchReceivePolicy.isMessagesFromMultiTopicsEnabled())
.timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
.build();
log.warn("BatchReceivePolicy maxNumMessages: {} is greater than maxReceiverQueueSize: {}, "
+ "reset to maxReceiverQueueSize. batchReceivePolicy: {}",
userBatchReceivePolicy.getMaxNumMessages(), this.maxReceiverQueueSize,
this.batchReceivePolicy.toString());
} else if (userBatchReceivePolicy.getMaxNumMessages() <= 0
&& userBatchReceivePolicy.getMaxNumBytes() <= 0) {
this.batchReceivePolicy = BatchReceivePolicy.builder()
.maxNumMessages(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumMessages())
.maxNumBytes(BatchReceivePolicy.DEFAULT_POLICY.getMaxNumBytes())
.messagesFromMultiTopicsEnabled(userBatchReceivePolicy.isMessagesFromMultiTopicsEnabled())
.timeout((int) userBatchReceivePolicy.getTimeoutMs(), TimeUnit.MILLISECONDS)
.build();
log.warn("BatchReceivePolicy maxNumMessages: {} or maxNumBytes: {} is less than 0. "
+ "Reset to DEFAULT_POLICY. batchReceivePolicy: {}",
userBatchReceivePolicy.getMaxNumMessages(), userBatchReceivePolicy.getMaxNumBytes(),
this.batchReceivePolicy.toString());
} else {
this.batchReceivePolicy = conf.getBatchReceivePolicy();
}
} else {
this.batchReceivePolicy = BatchReceivePolicy.DEFAULT_POLICY;
}
if (getSubType() == CommandSubscribe.SubType.Failover || getSubType() == CommandSubscribe.SubType.Exclusive) {
incomingQueueLock = new ReentrantLock();
} else {
incomingQueueLock = new NoOpLock();
}
if (conf.getAckTimeoutMillis() != 0) {
if (conf.getAckTimeoutRedeliveryBackoff() != null) {
this.unAckedMessageTracker = new UnAckedTopicMessageRedeliveryTracker(client, this, conf);
} else {
this.unAckedMessageTracker = new UnAckedTopicMessageTracker(client, this, conf);
}
} else {
this.unAckedMessageTracker = UnAckedMessageTracker.UNACKED_MESSAGE_TRACKER_DISABLED;
}
initReceiverQueueSize();
}
/**
 * @return the tracker used for unacknowledged messages
 */
protected UnAckedMessageTracker getUnAckedMessageTracker() {
    return this.unAckedMessageTracker;
}
/**
 * Schedules the batch receive timeout task on the client timer, unless a timeout is already registered or the
 * batch receive policy has no positive timeout.
 */
protected void triggerBatchReceiveTimeoutTask() {
    if (hasBatchReceiveTimeout()) {
        return;
    }
    final long timeoutMs = batchReceivePolicy.getTimeoutMs();
    if (timeoutMs > 0) {
        batchReceiveTimeout =
                client.timer().newTimeout(this::pendingBatchReceiveTask, timeoutMs, TimeUnit.MILLISECONDS);
    }
}
/**
 * Resets the current receiver queue size: to {@link #minReceiverQueueSize()} when auto-scaling is enabled,
 * otherwise to the configured maximum.
 */
public void initReceiverQueueSize() {
    final int initialSize = conf.isAutoScaledReceiverQueueSizeEnabled()
            ? minReceiverQueueSize()
            : maxReceiverQueueSize;
    CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.set(this, initialSize);
}
/**
 * @return the lower bound for the receiver queue size; used as the initial size and as the floor when the
 *         queue is shrunk while auto-scaling is enabled
 */
public abstract int minReceiverQueueSize();
/**
 * Doubles the current receiver queue size (capped at the maximum) when auto-scaling is enabled, memory usage is
 * below {@link #MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION} and the scale hint is set. The hint is
 * cleared as part of the operation.
 */
protected void expectMoreIncomingMessages() {
    if (!conf.isAutoScaledReceiverQueueSizeEnabled()) {
        return;
    }
    final double memoryUsage =
            getMemoryLimitController().map(MemoryLimitController::currentUsagePercent).orElse(0d);
    final boolean memoryAvailable = memoryUsage < MEMORY_THRESHOLD_FOR_RECEIVER_QUEUE_SIZE_EXPANSION;
    // The hint is only consumed when there is memory headroom.
    if (memoryAvailable && scaleReceiverQueueHint.compareAndSet(true, false)) {
        final int doubled = getCurrentReceiverQueueSize() * 2;
        setCurrentReceiverQueueSize(Math.min(maxReceiverQueueSize, doubled));
    }
}
/**
 * Adds the message to the unacked message tracker, but only when no listener is configured.
 * If a listener is set, unacked messages are tracked in callMessageListener instead.
 */
protected void trackUnAckedMsgIfNoListener(MessageId messageId, int redeliveryCount) {
    if (listener != null) {
        return;
    }
    unAckedMessageTracker.add(messageId, redeliveryCount);
}
/**
 * Halves the current receiver queue size, never going below {@link #minReceiverQueueSize()}. Does nothing when
 * auto-scaling is disabled or the size would not shrink.
 */
protected void reduceCurrentReceiverQueueSize() {
    if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
        final int current = getCurrentReceiverQueueSize();
        final int halved = Math.max(minReceiverQueueSize(), current / 2);
        if (halved < current) {
            setCurrentReceiverQueueSize(halved);
        }
    }
}
/**
 * Receives a single message through {@link #internalReceive()}.
 *
 * @throws PulsarClientException if a listener is configured or the consumer is not in a usable state
 */
@Override
public Message<T> receive() throws PulsarClientException {
    final boolean listenerConfigured = listener != null;
    if (listenerConfigured) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Cannot use receive() when a listener has been set");
    }

    verifyConsumerState();
    return internalReceive();
}
/**
 * Asynchronous variant of {@link #receive()}; configuration and state errors are reported through a failed
 * future instead of being thrown.
 */
@Override
public CompletableFuture<Message<T>> receiveAsync() {
    if (listener != null) {
        final PulsarClientException listenerError = new PulsarClientException.InvalidConfigurationException(
                "Cannot use receive() when a listener has been set");
        return FutureUtil.failedFuture(listenerError);
    }
    try {
        verifyConsumerState();
        return internalReceiveAsync();
    } catch (PulsarClientException stateError) {
        return FutureUtil.failedFuture(stateError);
    }
}
/** Implementation hook for {@link #receive()}; called after the listener and state checks have passed. */
protected abstract Message<T> internalReceive() throws PulsarClientException;
/** Implementation hook for {@link #receiveAsync()}; called after the listener and state checks have passed. */
protected abstract CompletableFuture<Message<T>> internalReceiveAsync();
/**
 * Receives a single message, waiting at most the given time, through
 * {@link #internalReceive(long, TimeUnit)}.
 *
 * @throws PulsarClientException if the receiver queue size is 0, a listener is configured, or the consumer is
 *                               not in a usable state
 */
@Override
public Message<T> receive(int timeout, TimeUnit unit) throws PulsarClientException {
    // The queue size check intentionally comes before the listener check.
    final boolean zeroQueue = getCurrentReceiverQueueSize() == 0;
    if (zeroQueue) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Can't use receive with timeout, if the queue size is 0");
    }
    final boolean listenerConfigured = listener != null;
    if (listenerConfigured) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Cannot use receive() when a listener has been set");
    }

    verifyConsumerState();
    return internalReceive(timeout, unit);
}
/**
 * Implementation hook for {@link #receive(int, TimeUnit)}; called after the queue size, listener and state
 * checks have passed.
 */
protected abstract Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarClientException;
/**
 * Receives a batch of messages through {@link #internalBatchReceive()}.
 *
 * @throws PulsarClientException if batch receive is not usable with the current configuration or the consumer
 *                               is not in a usable state
 */
@Override
public Messages<T> batchReceive() throws PulsarClientException {
    // Configuration is verified before the connection state.
    verifyBatchReceive();
    verifyConsumerState();

    final Messages<T> batch = internalBatchReceive();
    return batch;
}
/**
 * Asynchronous variant of {@link #batchReceive()}; configuration and state errors are reported through a
 * failed future instead of being thrown.
 */
@Override
public CompletableFuture<Messages<T>> batchReceiveAsync() {
    try {
        verifyBatchReceive();
        verifyConsumerState();
    } catch (PulsarClientException precondition) {
        return FutureUtil.failedFuture(precondition);
    }
    return internalBatchReceiveAsync();
}
/**
 * @return {@code true} when at least one receive request is queued in {@code pendingReceives}
 */
protected boolean hasNextPendingReceive() {
    final boolean nothingPending = pendingReceives.isEmpty();
    return !nothingPending;
}
/**
 * Removes and returns the next pending receive future that is not done yet.
 *
 * @return the next open future, or {@code null} when the queue is exhausted
 */
protected CompletableFuture<Message<T>> nextPendingReceive() {
    CompletableFuture<Message<T>> candidate = pendingReceives.poll();
    // Skip futures that are already done (cancelling a future marks it as done).
    while (candidate != null && candidate.isDone()) {
        candidate = pendingReceives.poll();
    }
    return candidate;
}
/**
 * Completes the given receive future with the message on the executor returned by
 * {@code getInternalExecutor(message)}. A warning is logged when the future was already completed.
 */
protected void completePendingReceive(CompletableFuture<Message<T>> receivedFuture, Message<T> message) {
    final Runnable completion = () -> {
        if (receivedFuture.complete(message)) {
            return;
        }
        log.warn("Race condition detected. receive future was already completed (cancelled={}) and message was "
                        + "dropped. message={}",
                receivedFuture.isCancelled(), message);
    };
    getInternalExecutor(message).execute(completion);
}
/**
 * Fails every pending receive and batch receive request.
 *
 * @return a future completed once the pending requests have been failed
 */
protected CompletableFuture<Void> failPendingReceive() {
    if (!internalPinnedExecutor.isShutdown()) {
        final CompletableFuture<Void> done = new CompletableFuture<>();
        internalPinnedExecutor.execute(() -> {
            try {
                failPendingReceives();
                failPendingBatchReceives();
            } finally {
                done.complete(null);
            }
        });
        return done;
    }
    // The executor is gone: fail the pending receives on the calling thread no matter what,
    // to avoid blocking user code.
    failPendingReceives();
    failPendingBatchReceives();
    return CompletableFuture.completedFuture(null);
}
/**
 * Drains {@code pendingReceives}, completing every future that is not done yet with an
 * {@code AlreadyClosedException}.
 */
private void failPendingReceives() {
    CompletableFuture<Message<T>> pending;
    while ((pending = pendingReceives.poll()) != null) {
        if (pending.isDone()) {
            continue;
        }
        final String reason = String.format("The consumer which subscribes the topic %s with subscription name %s "
                        + "was already closed when cleaning and closing the consumers",
                topic, subscription);
        pending.completeExceptionally(new PulsarClientException.AlreadyClosedException(reason));
    }
}
/**
 * Drains the pending batch receive requests, completing every future that is not done yet with an
 * {@code AlreadyClosedException}.
 */
private void failPendingBatchReceives() {
    while (hasNextBatchReceive()) {
        final OpBatchReceive<T> pending = nextBatchReceive();
        final boolean exhausted = pending == null || pending.future == null;
        if (exhausted) {
            return;
        }
        if (pending.future.isDone()) {
            continue;
        }
        final String reason = String.format("The consumer which subscribes the topic %s with subscription name %s was"
                        + " already closed when cleaning and closing the consumers",
                topic, subscription);
        pending.future.completeExceptionally(new PulsarClientException.AlreadyClosedException(reason));
    }
}
/** Implementation hook for {@link #batchReceive()}; called after the configuration and state checks. */
protected abstract Messages<T> internalBatchReceive() throws PulsarClientException;
/** Implementation hook for {@link #batchReceiveAsync()}; called after the configuration and state checks. */
protected abstract CompletableFuture<Messages<T>> internalBatchReceiveAsync();
/**
 * Ensures that the message and its message id are both non-null.
 *
 * @throws PulsarClientException ({@code InvalidMessageException}) when either of them is {@code null}
 */
private static void validateMessageId(Message<?> message) throws PulsarClientException {
    if (message != null) {
        final MessageId id = message.getMessageId();
        if (id == null) {
            throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
        }
        return;
    }
    throw new PulsarClientException.InvalidMessageException("Non-null message is required");
}
/**
 * Ensures that the message id is non-null.
 *
 * @throws PulsarClientException ({@code InvalidMessageException}) when it is {@code null}
 */
private static void validateMessageId(MessageId messageId) throws PulsarClientException {
    final boolean present = messageId != null;
    if (!present) {
        throw new PulsarClientException.InvalidMessageException("Cannot handle message with null messageId");
    }
}
/**
 * Validates the message and acknowledges it by its message id.
 */
@Override
public void acknowledge(Message<?> message) throws PulsarClientException {
    validateMessageId(message);
    final MessageId id = message.getMessageId();
    acknowledge(id);
}
/**
 * Blocking wrapper around {@link #acknowledgeAsync(MessageId)}.
 */
@Override
public void acknowledge(MessageId messageId) throws PulsarClientException {
    validateMessageId(messageId);
    try {
        acknowledgeAsync(messageId).get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * Blocking wrapper around {@link #acknowledgeAsync(List)}.
 */
@Override
public void acknowledge(List<MessageId> messageIdList) throws PulsarClientException {
    try {
        acknowledgeAsync(messageIdList).get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * Blocking wrapper around {@link #acknowledgeAsync(Messages)}.
 */
@Override
public void acknowledge(Messages<?> messages) throws PulsarClientException {
    try {
        acknowledgeAsync(messages).get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * Same as {@link #reconsumeLater(Message, Map, long, TimeUnit)} without custom properties.
 */
@Override
public void reconsumeLater(Message<?> message, long delayTime, TimeUnit unit) throws PulsarClientException {
    // null means "no custom properties"
    reconsumeLater(message, null, delayTime, unit);
}
/**
 * Blocking wrapper around {@link #reconsumeLaterAsync(Message, Map, long, TimeUnit)}.
 *
 * @throws PulsarClientException if retry is not enabled in the configuration or the operation fails
 */
@Override
public void reconsumeLater(Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit)
        throws PulsarClientException {
    final boolean retryEnabled = conf.isRetryEnable();
    if (!retryEnabled) {
        throw new PulsarClientException(RECONSUME_LATER_ERROR_MSG);
    }
    try {
        reconsumeLaterAsync(message, customProperties, delayTime, unit).get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * Blocking wrapper around {@link #reconsumeLaterAsync(Messages, long, TimeUnit)}.
 */
@Override
public void reconsumeLater(Messages<?> messages, long delayTime, TimeUnit unit) throws PulsarClientException {
    try {
        reconsumeLaterAsync(messages, delayTime, unit).get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * Validates the message and cumulatively acknowledges it by its message id.
 */
@Override
public void acknowledgeCumulative(Message<?> message) throws PulsarClientException {
    validateMessageId(message);
    final MessageId id = message.getMessageId();
    acknowledgeCumulative(id);
}
/**
 * Blocking wrapper around {@link #acknowledgeCumulativeAsync(MessageId)}.
 */
@Override
public void acknowledgeCumulative(MessageId messageId) throws PulsarClientException {
    validateMessageId(messageId);
    try {
        acknowledgeCumulativeAsync(messageId).get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * Blocking wrapper around {@link #reconsumeLaterCumulativeAsync(Message, long, TimeUnit)}.
 */
@Override
public void reconsumeLaterCumulative(Message<?> message, long delayTime, TimeUnit unit)
        throws PulsarClientException {
    try {
        reconsumeLaterCumulativeAsync(message, delayTime, unit).get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * Acknowledges a single message asynchronously; an invalid message yields a failed future.
 */
@Override
public CompletableFuture<Void> acknowledgeAsync(Message<?> message) {
    try {
        validateMessageId(message);
    } catch (PulsarClientException invalid) {
        return FutureUtil.failedFuture(invalid);
    }
    final MessageId id = message.getMessageId();
    return acknowledgeAsync(id);
}
/**
 * Acknowledges all the given messages asynchronously, outside of any transaction.
 */
@Override
public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages) {
    // null transaction: plain acknowledgement
    return acknowledgeAsync(messages, null);
}
/**
 * Acknowledges all the given messages asynchronously, optionally as part of a transaction. The first invalid
 * message makes the whole call return a failed future without acknowledging anything.
 */
@Override
public CompletableFuture<Void> acknowledgeAsync(Messages<?> messages, Transaction txn) {
    final List<MessageId> ids = new ArrayList<>(messages.size());
    for (Message<?> msg : messages) {
        try {
            validateMessageId(msg);
        } catch (PulsarClientException invalid) {
            return FutureUtil.failedFuture(invalid);
        }
        ids.add(msg.getMessageId());
    }
    return txn == null ? acknowledgeAsync(ids) : acknowledgeAsync(ids, txn);
}
/**
 * Individually acknowledges the given message ids, outside of any transaction.
 */
@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList) {
    final Map<String, Long> noProperties = Collections.emptyMap();
    return doAcknowledgeWithTxn(messageIdList, AckType.Individual, noProperties, null);
}
/**
 * Individually acknowledges the given message ids as part of the given transaction. The transaction is cast
 * to {@code TransactionImpl}.
 */
@Override
public CompletableFuture<Void> acknowledgeAsync(List<MessageId> messageIdList, Transaction txn) {
    final TransactionImpl txnImpl = (TransactionImpl) txn;
    final Map<String, Long> noProperties = Collections.emptyMap();
    return doAcknowledgeWithTxn(messageIdList, AckType.Individual, noProperties, txnImpl);
}
/**
 * Same as {@link #reconsumeLaterAsync(Message, Map, long, TimeUnit)} without custom properties.
 */
@Override
public CompletableFuture<Void> reconsumeLaterAsync(Message<?> message, long delayTime, TimeUnit unit) {
    // null means "no custom properties"
    return reconsumeLaterAsync(message, null, delayTime, unit);
}
/**
 * Asks for the message to be consumed again after the given delay. Returns a failed future when retry is not
 * enabled in the configuration or the message is invalid.
 */
@Override
public CompletableFuture<Void> reconsumeLaterAsync(
        Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
    final boolean retryEnabled = conf.isRetryEnable();
    if (!retryEnabled) {
        return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG));
    }
    try {
        validateMessageId(message);
    } catch (PulsarClientException invalid) {
        return FutureUtil.failedFuture(invalid);
    }
    return doReconsumeLater(message, AckType.Individual, customProperties, delayTime, unit);
}
/**
 * Asks for every message of the batch to be consumed again after the given delay.
 *
 * <p>All messages are validated up front, so nothing is submitted when any of them is invalid. The returned
 * future completes only after every per-message operation has completed, and completes exceptionally when any
 * of them fails (for example when retry is not enabled), so that callers -- including the blocking
 * {@link #reconsumeLater(Messages, long, TimeUnit)} -- actually observe the outcome.
 *
 * @param messages  messages to reconsume
 * @param delayTime redelivery delay
 * @param unit      unit of {@code delayTime}
 * @return a future tracking the completion of all per-message reconsume operations
 */
@Override
public CompletableFuture<Void> reconsumeLaterAsync(Messages<?> messages, long delayTime, TimeUnit unit) {
    for (Message<?> message : messages) {
        try {
            validateMessageId(message);
        } catch (PulsarClientException e) {
            return FutureUtil.failedFuture(e);
        }
    }
    List<CompletableFuture<Void>> reconsumeFutures = new ArrayList<>(messages.size());
    for (Message<?> message : messages) {
        reconsumeFutures.add(reconsumeLaterAsync(message, delayTime, unit));
    }
    // allOf() with an empty array is already completed, which keeps the empty-batch behavior unchanged.
    return CompletableFuture.allOf(reconsumeFutures.toArray(new CompletableFuture<?>[0]));
}
/**
 * Cumulatively acknowledges up to the given message; an invalid message yields a failed future.
 */
@Override
public CompletableFuture<Void> acknowledgeCumulativeAsync(Message<?> message) {
    try {
        validateMessageId(message);
    } catch (PulsarClientException invalid) {
        return FutureUtil.failedFuture(invalid);
    }
    final MessageId id = message.getMessageId();
    return acknowledgeCumulativeAsync(id);
}
/**
 * Same as {@link #reconsumeLaterCumulativeAsync(Message, Map, long, TimeUnit)} without custom properties.
 */
@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(Message<?> message, long delayTime, TimeUnit unit) {
    // null means "no custom properties"
    return reconsumeLaterCumulativeAsync(message, null, delayTime, unit);
}
/**
 * Cumulative variant of {@link #reconsumeLaterAsync(Message, Map, long, TimeUnit)}.
 *
 * <p>Returns a failed future when retry is not enabled, when the subscription type does not allow cumulative
 * acknowledgement (Shared and Key_Shared), or when the message or its message id is {@code null}. The message
 * is validated here, like in the other asynchronous entry points, so that an invalid argument is reported
 * through the returned future.
 *
 * @param message          message up to which messages are acknowledged and reconsumed
 * @param customProperties custom properties forwarded to the reconsume operation, may be {@code null}
 * @param delayTime        redelivery delay
 * @param unit             unit of {@code delayTime}
 * @return a future tracking the reconsume operation
 */
@Override
public CompletableFuture<Void> reconsumeLaterCumulativeAsync(
        Message<?> message, Map<String, String> customProperties, long delayTime, TimeUnit unit) {
    if (!conf.isRetryEnable()) {
        return FutureUtil.failedFuture(new PulsarClientException(RECONSUME_LATER_ERROR_MSG));
    }
    if (!isCumulativeAcknowledgementAllowed(conf.getSubscriptionType())) {
        // Same wording as acknowledgeCumulativeAsync: Failover subscriptions are allowed as well.
        return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                "Cannot use cumulative acks on a non-exclusive/non-failover subscription"));
    }
    try {
        validateMessageId(message);
    } catch (PulsarClientException e) {
        return FutureUtil.failedFuture(e);
    }
    return doReconsumeLater(message, AckType.Cumulative, customProperties, delayTime, unit);
}
/**
 * Individually acknowledges the given message id, outside of any transaction.
 */
@Override
public CompletableFuture<Void> acknowledgeAsync(MessageId messageId) {
    // null transaction: plain acknowledgement
    return acknowledgeAsync(messageId, null);
}
/**
 * Individually acknowledges the given message id, optionally as part of a transaction. When a transaction is
 * given it must be a {@code TransactionImpl}; if {@code checkIfOpen} reports it as not open, the future handed
 * to that check is returned and no acknowledgement is attempted.
 */
@Override
public CompletableFuture<Void> acknowledgeAsync(MessageId messageId,
                                                Transaction txn) {
    if (txn == null) {
        return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), null);
    }
    checkArgument(txn instanceof TransactionImpl);
    final TransactionImpl txnImpl = (TransactionImpl) txn;
    final CompletableFuture<Void> openCheck = new CompletableFuture<>();
    if (!txnImpl.checkIfOpen(openCheck)) {
        return openCheck;
    }
    return doAcknowledgeWithTxn(messageId, AckType.Individual, Collections.emptyMap(), txnImpl);
}
/**
 * Cumulatively acknowledges up to the given message id, outside of any transaction.
 */
@Override
public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId) {
    // null transaction: plain acknowledgement
    return acknowledgeCumulativeAsync(messageId, null);
}
/**
 * Cumulatively acknowledges up to the given message id, optionally as part of a transaction. Returns a failed
 * future for subscription types that do not allow cumulative acknowledgement (Shared and Key_Shared). When a
 * transaction is given it must be a {@code TransactionImpl}.
 */
@Override
public CompletableFuture<Void> acknowledgeCumulativeAsync(MessageId messageId, Transaction txn) {
    final boolean cumulativeAllowed = isCumulativeAcknowledgementAllowed(conf.getSubscriptionType());
    if (!cumulativeAllowed) {
        return FutureUtil.failedFuture(new PulsarClientException.InvalidConfigurationException(
                "Cannot use cumulative acks on a non-exclusive/non-failover subscription"));
    }

    if (txn == null) {
        return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), null);
    }
    checkArgument(txn instanceof TransactionImpl);
    final TransactionImpl txnImpl = (TransactionImpl) txn;
    return doAcknowledgeWithTxn(messageId, AckType.Cumulative, Collections.emptyMap(), txnImpl);
}
/**
 * Negatively acknowledges the message by its message id.
 */
@Override
public void negativeAcknowledge(Message<?> message) {
    final MessageId id = message.getMessageId();
    negativeAcknowledge(id);
}
/**
 * Acknowledges a list of message ids. When a transaction is given and this instance is a {@code ConsumerImpl},
 * the topic/subscription is first registered with the transaction and the resulting acknowledgement future is
 * registered as an ack operation of the transaction; otherwise the acknowledgement is issued directly.
 */
protected CompletableFuture<Void> doAcknowledgeWithTxn(List<MessageId> messageIdList, AckType ackType,
                                                       Map<String, Long> properties,
                                                       TransactionImpl txn) {
    final boolean registerWithTxn = txn != null && this instanceof ConsumerImpl;
    if (!registerWithTxn) {
        return doAcknowledge(messageIdList, ackType, properties, txn);
    }
    final CompletableFuture<Void> txnAck = txn.registerAckedTopic(getTopic(), subscription)
            .thenCompose(ignored -> doAcknowledge(messageIdList, ackType, properties, txn));
    // register the ack future as part of the transaction
    txn.registerAckOp(txnAck);
    return txnAck;
}
/**
 * Acknowledges a single message id. When a transaction is given and this instance is a {@code ConsumerImpl},
 * the topic/subscription is first registered with the transaction and the resulting acknowledgement future is
 * registered as an ack operation of the transaction; otherwise the acknowledgement is issued directly.
 */
protected CompletableFuture<Void> doAcknowledgeWithTxn(MessageId messageId, AckType ackType,
                                                       Map<String, Long> properties,
                                                       TransactionImpl txn) {
    final boolean registerWithTxn = txn != null && (this instanceof ConsumerImpl);
    if (!registerWithTxn) {
        return doAcknowledge(messageId, ackType, properties, txn);
    }
    final CompletableFuture<Void> txnAck = txn.registerAckedTopic(getTopic(), subscription)
            .thenCompose(ignored -> doAcknowledge(messageId, ackType, properties, txn));
    // register the ack future as part of the transaction
    txn.registerAckOp(txnAck);
    return txnAck;
}
/**
 * Implementation hook that acknowledges a single message id.
 *
 * @param txn transaction the acknowledgement belongs to, or {@code null}
 */
protected abstract CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String, Long> properties,
TransactionImpl txn);
/**
 * Implementation hook that acknowledges a list of message ids.
 *
 * @param txn transaction the acknowledgement belongs to, or {@code null}
 */
protected abstract CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
Map<String, Long> properties,
TransactionImpl txn);
/**
 * Implementation hook behind the reconsumeLater APIs; called with {@code AckType.Individual} or
 * {@code AckType.Cumulative} depending on the entry point.
 *
 * @param customProperties custom properties passed by the caller, may be {@code null}
 */
protected abstract CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Map<String, String> customProperties,
long delayTime,
TimeUnit unit);
/**
 * Negatively acknowledges every message of the batch.
 */
@Override
public void negativeAcknowledge(Messages<?> messages) {
    for (Message<?> message : messages) {
        negativeAcknowledge(message);
    }
}
/**
 * Blocking wrapper around {@link #unsubscribeAsync()}.
 */
@Override
public void unsubscribe() throws PulsarClientException {
    try {
        unsubscribeAsync().get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/** Asynchronous unsubscribe; {@link #unsubscribe()} blocks on the returned future. */
@Override
public abstract CompletableFuture<Void> unsubscribeAsync();
/**
 * Blocking wrapper around {@link #closeAsync()}.
 */
@Override
public void close() throws PulsarClientException {
    try {
        closeAsync().get();
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/** Asynchronous close; {@link #close()} blocks on the returned future. */
@Override
public abstract CompletableFuture<Void> closeAsync();
/**
 * Blocking wrapper around {@link #getLastMessageIdAsync()}.
 */
@Deprecated
@Override
public MessageId getLastMessageId() throws PulsarClientException {
    try {
        final MessageId last = getLastMessageIdAsync().get();
        return last;
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/** Asynchronous lookup of the last message id; {@link #getLastMessageId()} blocks on the returned future. */
@Deprecated
@Override
public abstract CompletableFuture<MessageId> getLastMessageIdAsync();
/**
 * Blocking wrapper around {@code getLastMessageIdsAsync()}.
 */
@Override
public List<TopicMessageId> getLastMessageIds() throws PulsarClientException {
    try {
        final List<TopicMessageId> lastIds = getLastMessageIdsAsync().get();
        return lastIds;
    } catch (InterruptedException | ExecutionException e) {
        if (e instanceof InterruptedException) {
            // Preserve the interrupt status for the caller.
            Thread.currentThread().interrupt();
        }
        throw PulsarClientException.unwrap(e);
    }
}
/**
 * @return {@code true} unless the subscription type is Shared or Key_Shared
 */
private boolean isCumulativeAcknowledgementAllowed(SubscriptionType type) {
    final boolean sharedFlavor = SubscriptionType.Shared == type || SubscriptionType.Key_Shared == type;
    return !sharedFlavor;
}
/**
 * Maps the configured {@link SubscriptionType} to the protocol level {@link SubType}.
 *
 * @return the matching sub type, or {@code null} for a subscription type not covered by the mapping
 */
protected SubType getSubType() {
    final SubType mapped;
    switch (conf.getSubscriptionType()) {
        case Exclusive:
            mapped = SubType.Exclusive;
            break;
        case Shared:
            mapped = SubType.Shared;
            break;
        case Failover:
            mapped = SubType.Failover;
            break;
        case Key_Shared:
            mapped = SubType.Key_Shared;
            break;
        default:
            // Should not happen since all known subscription types are covered above
            mapped = null;
            break;
    }
    return mapped;
}
/** @return the number of available permits (NOTE(review): exact semantics are defined by the subclasses) */
public abstract int getAvailablePermits();
/** @return the number of messages currently queued (NOTE(review): exact semantics defined by the subclasses) */
public abstract int numMessagesInQueue();
/**
 * @return the subscribe future handed to the constructor
 */
public CompletableFuture<Consumer<T>> subscribeFuture() {
    return this.subscribeFuture;
}
/**
 * @return the topic this consumer was created for
 */
@Override
public String getTopic() {
    return this.topic;
}
/**
 * @return the subscription name taken from the configuration
 */
@Override
public String getSubscription() {
    return this.subscription;
}
/**
 * @return the configured consumer name, or the randomly generated one when none was configured
 */
@Override
public String getConsumerName() {
    return consumerName;
}
/**
 * Redelivers the given unacknowledged messages. In Failover mode, the request is ignored if the consumer is not
 * active for the given topic. In Shared mode, the consumer's messages to be redelivered are distributed across
 * all the connected consumers. This is a non-blocking call and does not throw an exception. In case the
 * connection breaks, the messages are redelivered after reconnect.
 *
 * @param messageIds ids of the messages to redeliver
 */
protected abstract void redeliverUnacknowledgedMessages(Set<MessageId> messageIds);
/**
 * @return a short description made of the subscription, consumer name and topic
 */
@Override
public String toString() {
    final StringBuilder text = new StringBuilder("ConsumerBase{");
    text.append("subscription='").append(subscription).append('\'');
    text.append(", consumerName='").append(consumerName).append('\'');
    text.append(", topic='").append(topic).append('\'');
    text.append('}');
    return text.toString();
}
/**
 * Runs the message through the interceptors, if any.
 *
 * @return the message returned by the interceptors, or the message itself when there are none
 */
protected Message<T> beforeConsume(Message<T> message) {
    if (interceptors == null) {
        return message;
    }
    return interceptors.beforeConsume(this, message);
}
/**
 * Notifies the interceptors, if any, about an individual acknowledgement.
 */
protected void onAcknowledge(MessageId messageId, Throwable exception) {
    if (interceptors == null) {
        return;
    }
    interceptors.onAcknowledge(this, messageId, exception);
}
/**
 * Notifies the interceptors, if any, about an individual acknowledgement of each of the given message ids.
 */
protected void onAcknowledge(List<MessageId> messageIds, Throwable exception) {
    if (interceptors == null) {
        return;
    }
    for (MessageId id : messageIds) {
        interceptors.onAcknowledge(this, id, exception);
    }
}
/**
 * Notifies the interceptors, if any, about a cumulative acknowledgement.
 */
protected void onAcknowledgeCumulative(MessageId messageId, Throwable exception) {
    if (interceptors == null) {
        return;
    }
    interceptors.onAcknowledgeCumulative(this, messageId, exception);
}
/**
 * Notifies the interceptors, if any, about a cumulative acknowledgement of each of the given message ids.
 */
protected void onAcknowledgeCumulative(List<MessageId> messageIds, Throwable exception) {
    if (interceptors == null) {
        return;
    }
    for (MessageId id : messageIds) {
        interceptors.onAcknowledgeCumulative(this, id, exception);
    }
}
/**
 * Notifies the interceptors, if any, that negative acknowledgements were sent for the given message ids.
 */
protected void onNegativeAcksSend(Set<MessageId> messageIds) {
    if (interceptors == null) {
        return;
    }
    interceptors.onNegativeAcksSend(this, messageIds);
}
/**
 * Notifies the interceptors, if any, about an ack timeout for the given message ids.
 */
protected void onAckTimeoutSend(Set<MessageId> messageIds) {
    if (interceptors == null) {
        return;
    }
    interceptors.onAckTimeoutSend(this, messageIds);
}
/**
 * Notifies the interceptors, if any, about a change in the number of partitions of a topic.
 */
protected void onPartitionsChange(String topicName, int partitions) {
    if (interceptors == null) {
        return;
    }
    interceptors.onPartitionsChange(topicName, partitions);
}
/**
 * Tells whether the message may be added to the incoming queue. The base implementation accepts every
 * message; subclasses can override it.
 */
protected boolean canEnqueueMessage(Message<T> message) {
    final boolean accepted = true;
    return accepted;
}
/**
 * Adds the message to the incoming queue (if {@link #canEnqueueMessage(Message)} allows it) and updates the
 * size accounting, the memory limiter and the auto-scale hint.
 *
 * @param message message to enqueue
 * @return {@code true} when the queue now holds enough messages to satisfy a batch receive
 */
protected boolean enqueueMessageAndCheckBatchReceive(Message<T> message) {
// Read the size before the message is offered: it must not be touched once it is in the queue (see below).
int messageSize = message.size();
// synchronize redeliverUnacknowledgedMessages().
incomingQueueLock.lock();
try {
if (canEnqueueMessage(message) && incomingMessages.offer(message)) {
// After we have enqueued the messages on `incomingMessages` queue, we cannot touch the message
// instance anymore, since for pooled messages, this instance was possibly already been released
// and recycled.
INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, messageSize);
getMemoryLimitController().ifPresent(limiter -> limiter.forceReserveMemory(messageSize));
updateAutoScaleReceiverQueueHint();
}
} finally {
incomingQueueLock.unlock();
}
// Evaluated outside of the lock, whether or not the message was actually enqueued.
return hasEnoughMessagesForBatchReceive();
}
/**
 * Implementation hook called, while holding {@code incomingQueueLock}, after a message has been added to the
 * incoming queue. NOTE(review): presumably updates {@code scaleReceiverQueueHint} -- confirm in subclasses.
 */
protected abstract void updateAutoScaleReceiverQueueHint();
/**
 * Tells whether the incoming queue satisfies one of the limits of the batch receive policy. A non-positive
 * limit is ignored; when both limits are non-positive the answer is always {@code false}.
 */
protected boolean hasEnoughMessagesForBatchReceive() {
    final long maxMessages = batchReceivePolicy.getMaxNumMessages();
    final long maxBytes = batchReceivePolicy.getMaxNumBytes();
    if (maxMessages <= 0 && maxBytes <= 0) {
        return false;
    }
    if (maxMessages > 0 && incomingMessages.size() >= maxMessages) {
        return true;
    }
    return maxBytes > 0 && getIncomingMessageSize() >= maxBytes;
}
/**
 * Checks that the consumer is in a state in which it can be used.
 *
 * @throws PulsarClientException {@code AlreadyClosedException} when closing, closed or terminated,
 *                               {@code NotConnectedException} when failed or uninitialized
 */
private void verifyConsumerState() throws PulsarClientException {
    switch (getState()) {
        case Closing:
        case Closed:
            throw new PulsarClientException.AlreadyClosedException("Consumer already closed");
        case Terminated:
            throw new PulsarClientException.AlreadyClosedException("Topic was terminated");
        case Failed:
        case Uninitialized:
            throw new PulsarClientException.NotConnectedException();
        case Ready:
        case Connecting:
        default:
            // Ok
            return;
    }
}
/**
 * Checks that batch receive can be used with the current configuration.
 *
 * @throws PulsarClientException {@code InvalidConfigurationException} when a listener is configured or the
 *                               current receiver queue size is 0
 */
private void verifyBatchReceive() throws PulsarClientException {
    final boolean listenerConfigured = listener != null;
    if (listenerConfigured) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Cannot use receive() when a listener has been set");
    }
    final boolean zeroQueue = getCurrentReceiverQueueSize() == 0;
    if (zeroQueue) {
        throw new PulsarClientException.InvalidConfigurationException(
                "Can't use batch receive, if the queue size is 0");
    }
}
/**
 * A pending batch receive request: the future to complete and the {@link System#nanoTime()} reading taken when
 * the request was created.
 */
protected static final class OpBatchReceive<T> {

    final CompletableFuture<Messages<T>> future;
    final long createdAt;

    /** Wraps the given future into a new pending request stamped with the current nano time. */
    static <T> OpBatchReceive<T> of(CompletableFuture<Messages<T>> future) {
        return new OpBatchReceive<>(future);
    }

    private OpBatchReceive(CompletableFuture<Messages<T>> future) {
        this.createdAt = System.nanoTime();
        this.future = future;
    }
}
/**
 * Serves the next still-pending batch-receive request, if there is one, with the messages
 * currently available in the incoming queue.
 */
protected void notifyPendingBatchReceivedCallBack() {
    final OpBatchReceive<T> pending = nextBatchReceive();
    if (pending != null) {
        notifyPendingBatchReceivedCallBack(pending.future);
    }
}
/** @return {@code true} if at least one batch-receive request is queued in {@code pendingBatchReceives} */
private boolean hasNextBatchReceive() {
    final boolean noneQueued = pendingBatchReceives.isEmpty();
    return !noneQueued;
}
/**
 * Dequeues pending batch-receive requests until one is found that can still be completed.
 *
 * <p>Entries whose future is {@code null} or already done (cancelled / timed out) are discarded.
 *
 * @return the first usable request, or {@code null} if the queue ran empty
 */
private OpBatchReceive<T> nextBatchReceive() {
    for (OpBatchReceive<T> candidate = pendingBatchReceives.poll();
            candidate != null;
            candidate = pendingBatchReceives.poll()) {
        if (candidate.future != null && !candidate.future.isDone()) {
            return candidate;
        }
    }
    // no entry available
    return null;
}
/**
 * Drains messages from the incoming queue into a fresh batch and completes the given future
 * with it.
 *
 * <p>Draining stops when the queue is empty, when the batch cannot accept the next message, or,
 * unless messages from multiple topics are enabled in the batch-receive policy, when the next
 * message belongs to a different topic than the first message of the batch.
 *
 * @param batchReceiveFuture the future of the batch-receive request being served
 */
protected final void notifyPendingBatchReceivedCallBack(CompletableFuture<Messages<T>> batchReceiveFuture) {
    final MessagesImpl<T> batch = getNewMessagesImpl();
    String firstTopic = null;
    for (Message<T> head = incomingMessages.peek();
            head != null && batch.canAdd(head);
            head = incomingMessages.peek()) {
        // A single batch-receive request may only carry messages of one topic partition,
        // so that a cumulative ack is not lost.
        if (!this.batchReceivePolicy.isMessagesFromMultiTopicsEnabled()) {
            // Remember the topic of the first message once it is in the batch.
            if (batch.size() == 1) {
                firstTopic = batch.getMessageList().get(0).getTopicName();
            }
            // The head of the queue is from another topic: hand out what has been collected.
            if (firstTopic != null && !firstTopic.equals(head.getTopicName())) {
                break;
            }
        }
        final Message<T> taken = incomingMessages.poll();
        if (taken != null) {
            messageProcessed(taken);
            batch.add(beforeConsume(taken));
        }
    }
    completePendingBatchReceive(batchReceiveFuture, batch);
}
/**
 * Completes a batch-receive future with the collected messages and logs a warning when the
 * future had already been completed, since those messages are not delivered through it.
 *
 * @param future   the batch-receive future to complete
 * @param messages the messages collected for this request
 */
protected void completePendingBatchReceive(CompletableFuture<Messages<T>> future, Messages<T> messages) {
    final boolean delivered = future.complete(messages);
    if (delivered) {
        return;
    }
    log.warn("Race condition detected. batch receive future was already completed (cancelled={}) and messages"
            + " were dropped. messages={}",
            future.isCancelled(), messages);
}
/**
 * Subclass hook invoked for a message that has just been taken out of the incoming queue.
 *
 * <p>In this class it is called for each message drained into a batch, before the message is
 * passed through {@code beforeConsume}.
 *
 * @param msg the message that was dequeued
 */
protected abstract void messageProcessed(Message<?> msg);
/**
 * Timer callback for the batch-receive timeout; hands the actual work over to
 * {@code internalPinnedExecutor}.
 *
 * @param timeout the timer handle that fired
 */
private void pendingBatchReceiveTask(Timeout timeout) {
    final Runnable expiryCheck = () -> doPendingBatchReceiveTask(timeout);
    internalPinnedExecutor.execute(expiryCheck);
}
/**
 * Expires batch-receive requests that have waited at least the policy timeout and re-arms the
 * timer for the oldest request that has not expired yet.
 *
 * <p>Submitted to {@code internalPinnedExecutor} by {@code pendingBatchReceiveTask}. Does nothing
 * when the timeout was cancelled or the consumer is closing/closed.
 *
 * @param timeout the timer handle that triggered this run
 */
private void doPendingBatchReceiveTask(Timeout timeout) {
if (timeout.isCancelled()) {
return;
}
long timeToWaitMs;
boolean hasPendingReceives = false;
synchronized (this) {
// If it's closing/closed we need to ignore this timeout and not schedule next timeout.
if (getState() == State.Closing || getState() == State.Closed) {
return;
}
// Default delay; overwritten below with the remaining time of the first non-expired request.
timeToWaitMs = batchReceivePolicy.getTimeoutMs();
OpBatchReceive<T> opBatchReceive = pendingBatchReceives.peek();
// Walk the queue from its head: expire requests until the first one that still has time left.
while (opBatchReceive != null) {
// If there is at least one batch receive, calculate the diff between the batch receive timeout
// and the elapsed time since the operation was created.
long diff = batchReceivePolicy.getTimeoutMs()
- TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - opBatchReceive.createdAt);
if (diff <= 0) {
// The request has timed out: let the subclass complete it.
completeOpBatchReceive(opBatchReceive);
// remove the peeked item from the queue
OpBatchReceive<T> removed = pendingBatchReceives.poll();
if (removed != opBatchReceive) {
// regression check, if this were to happen due to incorrect code changes in the future,
// (allowing multi-threaded calls to poll()), then ensure that the polled item is completed
// to avoid blocking user code
log.error("Race condition in consumer {} (should not cause data loss). "
+ " Concurrent operations on pendingBatchReceives is not safe", this.consumerName);
if (removed != null && !removed.future.isDone()) {
completeOpBatchReceive(removed);
}
}
} else {
// The diff is greater than zero, set the timeout to the diff value
timeToWaitMs = diff;
hasPendingReceives = true;
break;
}
opBatchReceive = pendingBatchReceives.peek();
}
if (hasPendingReceives) {
// Re-arm the timer so it fires when the oldest remaining request reaches its timeout.
batchReceiveTimeout = client.timer().newTimeout(this::pendingBatchReceiveTask,
timeToWaitMs, TimeUnit.MILLISECONDS);
} else {
// Nothing left to wait for: clear the handle (hasBatchReceiveTimeout() reports false).
batchReceiveTimeout = null;
}
}
}
/** Starts listener delivery of queued messages, but only when a message listener is configured. */
protected void tryTriggerListener() {
    if (listener == null) {
        return;
    }
    triggerListener();
}
/**
 * Drains the receiver queue and hands every message to the message listener.
 *
 * <p>The drain runs on {@code internalPinnedExecutor}; the listener itself is invoked on a
 * different executor so that user code does not block the internal thread.
 */
private void triggerListener() {
    // The messages are added into the receiver queue by the internal pinned executor,
    // so the same executor is used here to avoid a race in which a message has been
    // added to the receiver queue but cannot be read yet.
    internalPinnedExecutor.execute(() -> {
        try {
            while (true) {
                final Message<T> next = internalReceive(0, TimeUnit.MILLISECONDS);
                if (next == null) {
                    if (log.isDebugEnabled()) {
                        log.debug("[{}] [{}] Message has been cleared from the queue", topic, subscription);
                    }
                    break;
                }
                // Notify the listener on a separate thread so the internal pinned executor
                // is not blocked while the message is being processed.
                MESSAGE_LISTENER_QUEUE_SIZE_UPDATER.incrementAndGet(this);
                final Runnable deliver = () -> callMessageListener(next);
                if (SubscriptionType.Key_Shared == conf.getSubscriptionType()) {
                    executorProvider.getExecutor(peekMessageKey(next)).execute(deliver);
                } else {
                    getExternalExecutor(next).execute(deliver);
                }
            }
        } catch (PulsarClientException e) {
            log.warn("[{}] [{}] Failed to dequeue the message for listener", topic, subscription, e);
        }
    });
}
/**
 * Delivers one message to the configured message listener.
 *
 * <p>Before the listener runs, permits are increased on the consumer that received the message
 * and the message id is registered with {@code unAckedMessageTracker}. Anything thrown along the
 * way, including by the listener, is logged and not propagated. The listener queue size counter
 * is decremented in all cases.
 *
 * @param msg the message to hand to the listener
 */
protected void callMessageListener(Message<T> msg) {
try {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Calling message listener for message {}", topic, subscription,
msg.getMessageId());
}
// For a TopicMessageImpl the permits belong to the consumer recorded on the message,
// otherwise to this consumer itself.
ConsumerImpl receivedConsumer = (msg instanceof TopicMessageImpl)
? ((TopicMessageImpl<T>) msg).receivedByconsumer : (ConsumerImpl) this;
// Increase the permits here since we will not increase permits while receive messages from consumer
// after enabled message listener.
receivedConsumer.increaseAvailablePermits((MessageImpl<?>) (msg instanceof TopicMessageImpl
? ((TopicMessageImpl<T>) msg).getMessage() : msg));
MessageId id;
// A ConsumerImpl tracks the id with its batch part discarded; any other consumer type
// tracks the message id unchanged.
if (this instanceof ConsumerImpl) {
id = MessageIdAdvUtils.discardBatch(msg.getMessageId());
} else {
id = msg.getMessageId();
}
unAckedMessageTracker.add(id, msg.getRedeliveryCount());
listener.received(ConsumerBase.this, msg);
} catch (Throwable t) {
// Catches everything so that a failing listener does not propagate to the calling executor.
log.error("[{}][{}] Message listener error in processing message: {}", topic, subscription,
msg.getMessageId(), t);
} finally {
// Balances the increment done in triggerListener() before the message was dispatched.
MESSAGE_LISTENER_QUEUE_SIZE_UPDATER.decrementAndGet(this);
}
}
/** Fallback key returned by {@code peekMessageKey} for messages with neither a key nor an ordering key. */
static final byte[] NONE_KEY = "NONE_KEY".getBytes(StandardCharsets.UTF_8);
/**
 * Picks the key bytes that identify a message for keyed dispatch.
 *
 * <p>The ordering key wins over the message key; {@link #NONE_KEY} is used when the message
 * has neither.
 *
 * @param msg the message to inspect
 * @return the ordering key, else the message key bytes, else {@link #NONE_KEY}
 */
protected byte[] peekMessageKey(Message<T> msg) {
    final byte[] keyOrDefault = msg.hasKey() ? msg.getKeyBytes() : NONE_KEY;
    return msg.hasOrderingKey() ? msg.getOrderingKey() : keyOrDefault;
}
/**
 * Creates an empty batch container bounded by the message-count and byte limits of
 * {@code batchReceivePolicy}.
 *
 * @return a new, empty {@code MessagesImpl}
 */
protected MessagesImpl<T> getNewMessagesImpl() {
    final MessagesImpl<T> emptyBatch = new MessagesImpl<>(
            batchReceivePolicy.getMaxNumMessages(),
            batchReceivePolicy.getMaxNumBytes());
    return emptyBatch;
}
/** @return {@code true} if the pending batch-receive queue exists and is not empty */
protected boolean hasPendingBatchReceive() {
    if (pendingBatchReceives == null) {
        return false;
    }
    return hasNextBatchReceive();
}
/**
 * Gives access to the client's memory limit controller.
 *
 * @return the controller when auto-scaled receiver queue size is enabled in the consumer
 *         configuration; otherwise an empty {@code Optional}, which disables memory limiting
 */
Optional<MemoryLimitController> getMemoryLimitController() {
    if (conf.isAutoScaledReceiverQueueSizeEnabled()) {
        return Optional.of(client.getMemoryLimitController());
    }
    // disable memory limit.
    return Optional.empty();
}
/**
 * Zeroes the incoming-size counter and, if a memory limit controller is available, releases
 * the amount that had been counted.
 */
protected void resetIncomingMessageSize() {
    final long previouslyCounted = INCOMING_MESSAGES_SIZE_UPDATER.getAndSet(this, 0);
    final Optional<MemoryLimitController> memoryLimiter = getMemoryLimitController();
    if (memoryLimiter.isPresent()) {
        memoryLimiter.get().releaseMemory(previouslyCounted);
    }
}
/**
 * Subtracts a message's size from the incoming-size counter and, if a memory limit controller
 * is available, releases the same amount there.
 *
 * @param message the message leaving the incoming queue
 */
protected void decreaseIncomingMessageSize(final Message<?> message) {
    INCOMING_MESSAGES_SIZE_UPDATER.addAndGet(this, -message.size());
    final Optional<MemoryLimitController> memoryLimiter = getMemoryLimitController();
    if (memoryLimiter.isPresent()) {
        memoryLimiter.get().releaseMemory(message.size());
    }
}
/** @return the current value of the incoming-size counter */
public long getIncomingMessageSize() {
    final long countedSize = INCOMING_MESSAGES_SIZE_UPDATER.get(this);
    return countedSize;
}
/** @return the number of messages currently held in the incoming queue */
public int getTotalIncomingMessages() {
    final int queuedCount = incomingMessages.size();
    return queuedCount;
}
/**
 * Empties the incoming queue: releases every queued message, clears the queue and resets the
 * incoming-size accounting.
 */
protected void clearIncomingMessages() {
    // Pooled messages must be released before they are dropped from the queue.
    incomingMessages.forEach(queued -> queued.release());
    incomingMessages.clear();
    resetIncomingMessageSize();
}
/**
 * Updates the size of the consumer receiver queue.
 * See {@link ConsumerBuilder#receiverQueueSize(int)}.
 *
 * @param newSize the new size of the receiver queue
 */
protected abstract void setCurrentReceiverQueueSize(int newSize);
/** @return the receiver queue size currently in effect for this consumer */
public int getCurrentReceiverQueueSize() {
    final int currentSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.get(this);
    return currentSize;
}
/**
 * Subclass hook that completes the given pending batch-receive operation.
 *
 * <p>In this class it is called from the batch-receive timeout task for operations whose
 * timeout has elapsed, while the monitor of this consumer is held.
 *
 * @param op the pending operation to complete
 */
protected abstract void completeOpBatchReceive(OpBatchReceive<T> op);
/**
 * Chooses the external executor on which the listener is invoked for a message.
 *
 * @param msg the message about to be delivered
 * @return the external pinned executor of the consumer recorded on a {@code TopicMessageImpl},
 *         when that consumer and its executor are set; otherwise this consumer's own
 */
private ExecutorService getExternalExecutor(Message<T> msg) {
    if (msg instanceof TopicMessageImpl) {
        final ConsumerImpl owner = ((TopicMessageImpl) msg).receivedByconsumer;
        if (owner != null && owner.externalPinnedExecutor != null) {
            return owner.externalPinnedExecutor;
        }
    }
    return externalPinnedExecutor;
}
/**
 * Chooses the internal executor associated with a message.
 *
 * @param msg the message being handled
 * @return the internal pinned executor of the consumer recorded on a {@code TopicMessageImpl},
 *         when that consumer and its executor are set; otherwise this consumer's own
 */
private ExecutorService getInternalExecutor(Message<T> msg) {
    if (msg instanceof TopicMessageImpl) {
        final ConsumerImpl owner = ((TopicMessageImpl) msg).receivedByconsumer;
        if (owner != null && owner.internalPinnedExecutor != null) {
            return owner.internalPinnedExecutor;
        }
    }
    return internalPinnedExecutor;
}
/**
 * Filters out messages that carry an outdated consumer epoch.
 *
 * <p>A message whose consumer epoch is smaller than the consumer's epoch was sent to the client
 * before the user called redeliverUnacknowledgedMessages, so it is invalid: it is released and
 * recycled here and has to be received again. The check only applies to Failover and Exclusive
 * subscriptions and to messages that carry a non-default epoch.
 *
 * @param message the received message
 * @return {@code false} if the message was stale and has been released; {@code true} otherwise
 */
protected boolean isValidConsumerEpoch(MessageImpl<T> message) {
    final boolean epochChecked = getSubType() == CommandSubscribe.SubType.Failover
            || getSubType() == CommandSubscribe.SubType.Exclusive;
    if (!epochChecked) {
        return true;
    }
    if (message.getConsumerEpoch() == DEFAULT_CONSUMER_EPOCH) {
        return true;
    }
    final boolean stale = message.getConsumerEpoch() < CONSUMER_EPOCH.get(this);
    if (stale) {
        log.info("Consumer filter old epoch message, topic : [{}], messageId : [{}], messageConsumerEpoch : [{}], "
                + "consumerEpoch : [{}]", topic, message.getMessageId(), message.getConsumerEpoch(), consumerEpoch);
        message.release();
        message.recycle();
    }
    return !stale;
}
/** @return {@code true} while a batch-receive timeout handle is set on this consumer */
public boolean hasBatchReceiveTimeout() {
    final boolean timerArmed = batchReceiveTimeout != null;
    return timerArmed;
}
/** Class-wide logger for {@code ConsumerBase}. */
private static final Logger log = LoggerFactory.getLogger(ConsumerBase.class);
}