/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import static org.apache.pulsar.common.protocol.Commands.hasChecksum;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import com.google.common.collect.Iterables;
import com.scurrilous.circe.checksum.Crc32cIntChecksum;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Recycler;
import io.netty.util.Recycler.Handle;
import io.netty.util.ReferenceCountUtil;
import io.netty.util.Timeout;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import java.util.stream.Collectors;
import lombok.AccessLevel;
import lombok.Getter;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.ConsumerCryptoFailureAction;
import org.apache.pulsar.client.api.DeadLetterPolicy;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageCrypto;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.Messages;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.PulsarClientException.TopicDoesNotExistException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.client.api.SubscriptionMode;
import org.apache.pulsar.client.api.SubscriptionType;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.impl.crypto.MessageCryptoBc;
import org.apache.pulsar.client.impl.schema.AutoConsumeSchema;
import org.apache.pulsar.client.impl.transaction.TransactionImpl;
import org.apache.pulsar.client.util.ExecutorProvider;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.EncryptionContext;
import org.apache.pulsar.common.api.EncryptionContext.EncryptionKey;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.api.proto.CommandAck.ValidationError;
import org.apache.pulsar.common.api.proto.CommandMessage;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CompressionType;
import org.apache.pulsar.common.api.proto.EncryptionKeys;
import org.apache.pulsar.common.api.proto.KeyValue;
import org.apache.pulsar.common.api.proto.MessageIdData;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.api.proto.ProtocolVersion;
import org.apache.pulsar.common.api.proto.SingleMessageMetadata;
import org.apache.pulsar.common.compression.CompressionCodec;
import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.schema.SchemaInfo;
import org.apache.pulsar.common.schema.SchemaType;
import org.apache.pulsar.common.util.CompletableFutureCancellationHandler;
import org.apache.pulsar.common.util.ExceptionHandler;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.SafeCollectionUtils;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.GrowableArrayBlockingQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ConsumerImpl<T> extends ConsumerBase<T> implements ConnectionHandler.Connection {
private static final int MAX_REDELIVER_UNACKNOWLEDGED = 1000;
final long consumerId;
// Number of messages that have been delivered to the application. Every once in a while, this number will be
// sent to the broker to notify that we are ready to get (and store in the incoming messages queue) more messages
@SuppressWarnings("rawtypes")
private static final AtomicIntegerFieldUpdater<ConsumerImpl> AVAILABLE_PERMITS_UPDATER = AtomicIntegerFieldUpdater
.newUpdater(ConsumerImpl.class, "availablePermits");
@SuppressWarnings("unused")
private volatile int availablePermits = 0;
protected volatile MessageId lastDequeuedMessageId = MessageId.earliest;
private volatile MessageId lastMessageIdInBroker = MessageId.earliest;
private final long lookupDeadline;
@SuppressWarnings("rawtypes")
private static final AtomicLongFieldUpdater<ConsumerImpl> SUBSCRIBE_DEADLINE_UPDATER = AtomicLongFieldUpdater
.newUpdater(ConsumerImpl.class, "subscribeDeadline");
@SuppressWarnings("unused")
private volatile long subscribeDeadline = 0; // gets set on first successful connection
private final int partitionIndex;
private final boolean hasParentConsumer;
private final boolean parentConsumerHasListener;
private final AcknowledgmentsGroupingTracker acknowledgmentsGroupingTracker;
private final NegativeAcksTracker negativeAcksTracker;
protected final ConsumerStatsRecorder stats;
@Getter(AccessLevel.PACKAGE)
private final int priorityLevel;
private final SubscriptionMode subscriptionMode;
private volatile MessageIdAdv startMessageId;
private volatile MessageIdAdv seekMessageId;
private final AtomicBoolean duringSeek;
private final MessageIdAdv initialStartMessageId;
private final long startMessageRollbackDurationInSec;
private volatile boolean hasReachedEndOfTopic;
private final MessageCrypto msgCrypto;
private final Map<String, String> metadata;
private final boolean readCompacted;
private final boolean resetIncludeHead;
private final SubscriptionInitialPosition subscriptionInitialPosition;
private final ConnectionHandler connectionHandler;
private final TopicName topicName;
private final String topicNameWithoutPartition;
private final Map<MessageIdAdv, List<MessageImpl<T>>> possibleSendToDeadLetterTopicMessages;
private final DeadLetterPolicy deadLetterPolicy;
private volatile CompletableFuture<Producer<byte[]>> deadLetterProducer;
private volatile Producer<byte[]> retryLetterProducer;
private final ReadWriteLock createProducerLock = new ReentrantReadWriteLock();
protected volatile boolean paused;
protected ConcurrentOpenHashMap<String, ChunkedMessageCtx> chunkedMessagesMap =
ConcurrentOpenHashMap.<String, ChunkedMessageCtx>newBuilder().build();
private int pendingChunkedMessageCount = 0;
protected long expireTimeOfIncompleteChunkedMessageMillis = 0;
private final AtomicBoolean expireChunkMessageTaskScheduled = new AtomicBoolean(false);
private final int maxPendingChunkedMessage;
// If the queue size is reasonable (most of the time equal to the number of producers trying to publish messages
// concurrently on the topic), then it guards against broken chunked messages that were not fully published
private final boolean autoAckOldestChunkedMessageOnQueueFull;
// it will be used to manage N outstanding chunked message buffers
private final BlockingQueue<String> pendingChunkedMessageUuidQueue;
private final boolean createTopicIfDoesNotExist;
private final boolean poolMessages;
private final AtomicReference<ClientCnx> clientCnxUsedForConsumerRegistration = new AtomicReference<>();
private final List<Throwable> previousExceptions = new CopyOnWriteArrayList<Throwable>();
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider,
int partitionIndex,
boolean hasParentConsumer,
CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
return newConsumerImpl(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer, false,
subscribeFuture, startMessageId, schema, interceptors, createTopicIfDoesNotExist, 0);
}
static <T> ConsumerImpl<T> newConsumerImpl(PulsarClientImpl client,
String topic,
ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider,
int partitionIndex,
boolean hasParentConsumer,
boolean parentConsumerHasListener,
CompletableFuture<Consumer<T>> subscribeFuture,
MessageId startMessageId,
Schema<T> schema,
ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist,
long startMessageRollbackDurationInSec) {
if (conf.getReceiverQueueSize() == 0) {
return new ZeroQueueConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
subscribeFuture,
startMessageId, schema, interceptors,
createTopicIfDoesNotExist);
} else {
return new ConsumerImpl<>(client, topic, conf, executorProvider, partitionIndex, hasParentConsumer,
parentConsumerHasListener,
subscribeFuture, startMessageId,
startMessageRollbackDurationInSec /* rollback time in sec to start msgId */,
schema, interceptors, createTopicIfDoesNotExist);
}
}
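// Editor's sketch (hedged): this factory is reached indirectly when an application subscribes through the
// public API. A hypothetical usage that would select ZeroQueueConsumerImpl via the branch above:
//
//     PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
//     Consumer<byte[]> consumer = client.newConsumer()
//             .topic("my-topic")
//             .subscriptionName("my-sub")
//             .receiverQueueSize(0) // receiver queue size 0 -> ZeroQueueConsumerImpl
//             .subscribe();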
protected ConsumerImpl(PulsarClientImpl client, String topic, ConsumerConfigurationData<T> conf,
ExecutorProvider executorProvider, int partitionIndex, boolean hasParentConsumer,
boolean parentConsumerHasListener, CompletableFuture<Consumer<T>> subscribeFuture, MessageId startMessageId,
long startMessageRollbackDurationInSec, Schema<T> schema, ConsumerInterceptors<T> interceptors,
boolean createTopicIfDoesNotExist) {
super(client, topic, conf, conf.getReceiverQueueSize(), executorProvider, subscribeFuture, schema,
interceptors);
this.consumerId = client.newConsumerId();
TopicName topicName = TopicName.get(topic);
if (!topicName.isPersistent() && conf.getSubscriptionMode().equals(SubscriptionMode.Durable)) {
conf.setSubscriptionMode(SubscriptionMode.NonDurable);
log.warn("[{}] Cannot create a [Durable] subscription for a NonPersistentTopic, "
+ "will use [NonDurable] to subscribe. Subscription name: {}", topic, conf.getSubscriptionName());
}
this.subscriptionMode = conf.getSubscriptionMode();
if (startMessageId != null) {
MessageIdAdv firstChunkMessageId = ((MessageIdAdv) startMessageId).getFirstChunkMessageId();
this.startMessageId = (firstChunkMessageId == null) ? (MessageIdAdv) startMessageId : firstChunkMessageId;
}
this.initialStartMessageId = this.startMessageId;
this.startMessageRollbackDurationInSec = startMessageRollbackDurationInSec;
AVAILABLE_PERMITS_UPDATER.set(this, 0);
this.lookupDeadline = System.currentTimeMillis() + client.getConfiguration().getLookupTimeoutMs();
this.partitionIndex = partitionIndex;
this.hasParentConsumer = hasParentConsumer;
this.parentConsumerHasListener = parentConsumerHasListener;
this.priorityLevel = conf.getMatchingTopicConfiguration(topic).getPriorityLevel();
this.readCompacted = conf.isReadCompacted();
this.subscriptionInitialPosition = conf.getSubscriptionInitialPosition();
this.negativeAcksTracker = new NegativeAcksTracker(this, conf);
this.resetIncludeHead = conf.isResetIncludeHead();
this.createTopicIfDoesNotExist = createTopicIfDoesNotExist;
this.maxPendingChunkedMessage = conf.getMaxPendingChunkedMessage();
this.pendingChunkedMessageUuidQueue = new GrowableArrayBlockingQueue<>();
this.expireTimeOfIncompleteChunkedMessageMillis = conf.getExpireTimeOfIncompleteChunkedMessageMillis();
this.autoAckOldestChunkedMessageOnQueueFull = conf.isAutoAckOldestChunkedMessageOnQueueFull();
this.poolMessages = conf.isPoolMessages();
this.paused = conf.isStartPaused();
if (client.getConfiguration().getStatsIntervalSeconds() > 0) {
stats = new ConsumerStatsRecorderImpl(client, conf, this);
} else {
stats = ConsumerStatsDisabled.INSTANCE;
}
duringSeek = new AtomicBoolean(false);
// Create msgCrypto if not created already
if (conf.getCryptoKeyReader() != null) {
if (conf.getMessageCrypto() != null) {
this.msgCrypto = conf.getMessageCrypto();
} else {
// default to MessageCryptoBc
MessageCrypto msgCryptoBc;
try {
msgCryptoBc = new MessageCryptoBc(
String.format("[%s] [%s]", topic, subscription),
false);
} catch (Exception e) {
log.error("MessageCryptoBc may not included in the jar. e:", e);
msgCryptoBc = null;
}
this.msgCrypto = msgCryptoBc;
}
} else {
this.msgCrypto = null;
}
if (conf.getProperties().isEmpty()) {
metadata = Collections.emptyMap();
} else {
metadata = Collections.unmodifiableMap(new HashMap<>(conf.getProperties()));
}
this.connectionHandler = new ConnectionHandler(this,
new BackoffBuilder()
.setInitialTime(client.getConfiguration().getInitialBackoffIntervalNanos(),
TimeUnit.NANOSECONDS)
.setMax(client.getConfiguration().getMaxBackoffIntervalNanos(), TimeUnit.NANOSECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create(),
this);
this.topicName = TopicName.get(topic);
if (this.topicName.isPersistent()) {
this.acknowledgmentsGroupingTracker =
new PersistentAcknowledgmentsGroupingTracker(this, conf, client.eventLoopGroup());
} else {
this.acknowledgmentsGroupingTracker =
NonPersistentAcknowledgmentGroupingTracker.of();
}
if (conf.getDeadLetterPolicy() != null) {
possibleSendToDeadLetterTopicMessages = new ConcurrentHashMap<>();
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getDeadLetterTopic())) {
this.deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
.deadLetterTopic(conf.getDeadLetterPolicy().getDeadLetterTopic())
.build();
} else {
this.deadLetterPolicy = DeadLetterPolicy.builder()
.maxRedeliverCount(conf.getDeadLetterPolicy().getMaxRedeliverCount())
.deadLetterTopic(String.format("%s-%s" + RetryMessageUtil.DLQ_GROUP_TOPIC_SUFFIX,
topic, subscription))
.build();
}
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getRetryLetterTopic())) {
this.deadLetterPolicy.setRetryLetterTopic(conf.getDeadLetterPolicy().getRetryLetterTopic());
} else {
this.deadLetterPolicy.setRetryLetterTopic(String.format(
"%s-%s" + RetryMessageUtil.RETRY_GROUP_TOPIC_SUFFIX,
topic, subscription));
}
if (StringUtils.isNotBlank(conf.getDeadLetterPolicy().getInitialSubscriptionName())) {
this.deadLetterPolicy.setInitialSubscriptionName(
conf.getDeadLetterPolicy().getInitialSubscriptionName());
}
} else {
deadLetterPolicy = null;
possibleSendToDeadLetterTopicMessages = null;
}
topicNameWithoutPartition = topicName.getPartitionedTopicName();
grabCnx();
}
public ConnectionHandler getConnectionHandler() {
return connectionHandler;
}
@Override
public UnAckedMessageTracker getUnAckedMessageTracker() {
return unAckedMessageTracker;
}
@VisibleForTesting
NegativeAcksTracker getNegativeAcksTracker() {
return negativeAcksTracker;
}
@Override
public CompletableFuture<Void> unsubscribeAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException("Consumer was already closed"));
}
final CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
if (isConnected()) {
setState(State.Closing);
long requestId = client.newRequestId();
ByteBuf unsubscribe = Commands.newUnsubscribe(consumerId, requestId);
ClientCnx cnx = cnx();
cnx.sendRequestWithId(unsubscribe, requestId).thenRun(() -> {
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
log.info("[{}][{}] Successfully unsubscribed from topic", topic, subscription);
setState(State.Closed);
unsubscribeFuture.complete(null);
}).exceptionally(e -> {
log.error("[{}][{}] Failed to unsubscribe: {}", topic, subscription, e.getCause().getMessage());
setState(State.Ready);
unsubscribeFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to unsubscribe the subscription %s of topic %s",
topicName.toString(), subscription)));
return null;
});
} else {
unsubscribeFuture.completeExceptionally(
new PulsarClientException(
String.format("The client is not connected to the broker when unsubscribing the "
+ "subscription %s of the topic %s", subscription, topicName.toString())));
}
return unsubscribeFuture;
}
@Override
public int minReceiverQueueSize() {
int size = Math.min(INITIAL_RECEIVER_QUEUE_SIZE, maxReceiverQueueSize);
if (batchReceivePolicy.getMaxNumMessages() > 0) {
// ConsumerImpl may hold up to (queueSize / 2 - 1) permits locally, leaving only (queueSize / 2 + 1)
// messages guaranteed in the queue, so the queue needs 2 * maxNumMessages - 2 slots to satisfy
// a full batch receive.
size = Math.max(size, 2 * batchReceivePolicy.getMaxNumMessages() - 2);
}
return size;
}
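// Worked example (editor's note, hedged; numbers are illustrative): if min(INITIAL_RECEIVER_QUEUE_SIZE,
// maxReceiverQueueSize) = 1000 and batchReceivePolicy.getMaxNumMessages() = 600, the result is
// max(1000, 2 * 600 - 2) = 1198, so a full 600-message batch receive can still be satisfied even when up to
// (1198 / 2 - 1) = 598 permits are held locally.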
@Override
protected Message<T> internalReceive() throws PulsarClientException {
Message<T> message;
try {
if (incomingMessages.isEmpty()) {
expectMoreIncomingMessages();
}
message = incomingMessages.take();
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
ExceptionHandler.handleInterruptedException(e);
stats.incrementNumReceiveFailed();
throw PulsarClientException.unwrap(e);
}
}
@Override
protected CompletableFuture<Message<T>> internalReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Message<T>> result = cancellationHandler.createFuture();
internalPinnedExecutor.execute(() -> {
Message<T> message = incomingMessages.poll();
if (message == null) {
expectMoreIncomingMessages();
pendingReceives.add(result);
cancellationHandler.setCancelAction(() -> pendingReceives.remove(result));
} else {
messageProcessed(message);
result.complete(beforeConsume(message));
}
});
return result;
}
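// Hedged usage sketch: this method backs the public Consumer#receiveAsync(), e.g.
//
//     consumer.receiveAsync().thenAccept(msg -> {
//         // process msg, then consumer.acknowledgeAsync(msg)
//     });
//
// Cancelling the returned future removes it from pendingReceives via the cancel action set above.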
@Override
protected Message<T> internalReceive(long timeout, TimeUnit unit) throws PulsarClientException {
Message<T> message;
try {
if (incomingMessages.isEmpty()) {
expectMoreIncomingMessages();
}
message = incomingMessages.poll(timeout, unit);
if (message == null) {
return null;
}
messageProcessed(message);
return beforeConsume(message);
} catch (InterruptedException e) {
ExceptionHandler.handleInterruptedException(e);
State state = getState();
if (state != State.Closing && state != State.Closed) {
stats.incrementNumReceiveFailed();
throw PulsarClientException.unwrap(e);
} else {
return null;
}
}
}
@Override
protected Messages<T> internalBatchReceive() throws PulsarClientException {
try {
return internalBatchReceiveAsync().get();
} catch (InterruptedException | ExecutionException e) {
ExceptionHandler.handleInterruptedException(e);
State state = getState();
if (state != State.Closing && state != State.Closed) {
stats.incrementNumBatchReceiveFailed();
throw PulsarClientException.unwrap(e);
} else {
return null;
}
}
}
@Override
protected CompletableFuture<Messages<T>> internalBatchReceiveAsync() {
CompletableFutureCancellationHandler cancellationHandler = new CompletableFutureCancellationHandler();
CompletableFuture<Messages<T>> result = cancellationHandler.createFuture();
internalPinnedExecutor.execute(() -> {
if (hasEnoughMessagesForBatchReceive()) {
notifyPendingBatchReceivedCallBack(result);
} else {
expectMoreIncomingMessages();
OpBatchReceive<T> opBatchReceive = OpBatchReceive.of(result);
pendingBatchReceives.add(opBatchReceive);
triggerBatchReceiveTimeoutTask();
cancellationHandler.setCancelAction(() -> pendingBatchReceives.remove(opBatchReceive));
}
});
return result;
}
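// Hedged usage sketch: the public Consumer#batchReceive() path ends up here; Messages is iterable, e.g.
//
//     Messages<byte[]> msgs = consumer.batchReceive();
//     for (Message<byte[]> m : msgs) {
//         consumer.acknowledge(m);
//     }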
@Override
protected CompletableFuture<Void> doAcknowledge(MessageId messageId, AckType ackType,
Map<String, Long> properties,
TransactionImpl txn) {
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
if (AckType.Individual.equals(ackType)) {
onAcknowledge(messageId, exception);
} else if (AckType.Cumulative.equals(ackType)) {
onAcknowledgeCumulative(messageId, exception);
}
return FutureUtil.failedFuture(exception);
}
if (txn != null) {
return doTransactionAcknowledgeForResponse(messageId, ackType, null, properties,
new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
}
return acknowledgmentsGroupingTracker.addAcknowledgment(messageId, ackType, properties);
}
@Override
protected CompletableFuture<Void> doAcknowledge(List<MessageId> messageIdList, AckType ackType,
Map<String, Long> properties, TransactionImpl txn) {
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
if (AckType.Individual.equals(ackType)) {
onAcknowledge(messageIdList, exception);
} else if (AckType.Cumulative.equals(ackType)) {
onAcknowledgeCumulative(messageIdList, exception);
}
return FutureUtil.failedFuture(exception);
}
if (txn != null) {
return doTransactionAcknowledgeForResponse(messageIdList, ackType,
properties, new TxnID(txn.getTxnIdMostBits(), txn.getTxnIdLeastBits()));
} else {
return this.acknowledgmentsGroupingTracker.addListAcknowledgment(messageIdList, ackType, properties);
}
}
@SuppressWarnings("unchecked")
@Override
protected CompletableFuture<Void> doReconsumeLater(Message<?> message, AckType ackType,
Map<String, String> customProperties,
long delayTime,
TimeUnit unit) {
MessageId messageId = message.getMessageId();
if (messageId == null) {
return FutureUtil.failedFuture(new PulsarClientException
.InvalidMessageException("Cannot handle message with null messageId"));
}
if (getState() != State.Ready && getState() != State.Connecting) {
stats.incrementNumAcksFailed();
PulsarClientException exception = new PulsarClientException("Consumer not ready. State: " + getState());
if (AckType.Individual.equals(ackType)) {
onAcknowledge(messageId, exception);
} else if (AckType.Cumulative.equals(ackType)) {
onAcknowledgeCumulative(messageId, exception);
}
return FutureUtil.failedFuture(exception);
}
if (delayTime < 0) {
delayTime = 0;
}
if (retryLetterProducer == null) {
createProducerLock.writeLock().lock();
try {
if (retryLetterProducer == null) {
retryLetterProducer = client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema))
.topic(this.deadLetterPolicy.getRetryLetterTopic())
.enableBatching(false)
.blockIfQueueFull(false)
.create();
}
} catch (Exception e) {
log.error("Create retry letter producer exception with topic: {}",
deadLetterPolicy.getRetryLetterTopic(), e);
return FutureUtil.failedFuture(e);
} finally {
createProducerLock.writeLock().unlock();
}
}
CompletableFuture<Void> result = new CompletableFuture<>();
if (retryLetterProducer != null) {
try {
MessageImpl<T> retryMessage = (MessageImpl<T>) getMessageImpl(message);
String originMessageIdStr = message.getMessageId().toString();
String originTopicNameStr = getOriginTopicNameStr(message);
SortedMap<String, String> propertiesMap =
getPropertiesMap(message, originMessageIdStr, originTopicNameStr);
if (customProperties != null) {
propertiesMap.putAll(customProperties);
}
int reconsumeTimes = 1;
if (propertiesMap.containsKey(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES)) {
reconsumeTimes = Integer.parseInt(
propertiesMap.get(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES));
reconsumeTimes = reconsumeTimes + 1;
}
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES, String.valueOf(reconsumeTimes));
propertiesMap.put(RetryMessageUtil.SYSTEM_PROPERTY_DELAY_TIME,
String.valueOf(unit.toMillis(delayTime)));
MessageId finalMessageId = messageId;
if (reconsumeTimes > this.deadLetterPolicy.getMaxRedeliverCount()
&& StringUtils.isNotBlank(deadLetterPolicy.getDeadLetterTopic())) {
initDeadLetterProducerIfNeeded();
deadLetterProducer.thenAcceptAsync(dlqProducer -> {
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
dlqProducer.newMessage(Schema.AUTO_PRODUCE_BYTES(retryMessage.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
typedMessageBuilderNew.sendAsync().thenAccept(msgId -> {
doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null).thenAccept(v -> {
result.complete(null);
}).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}).exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}, internalPinnedExecutor).exceptionally(ex -> {
result.completeExceptionally(ex);
deadLetterProducer = null;
return null;
});
} else {
assert retryMessage != null;
TypedMessageBuilder<byte[]> typedMessageBuilderNew = retryLetterProducer
.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(retryMessage.getData())
.properties(propertiesMap);
if (delayTime > 0) {
typedMessageBuilderNew.deliverAfter(delayTime, unit);
}
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
typedMessageBuilderNew.sendAsync()
.thenCompose(__ -> doAcknowledge(finalMessageId, ackType, Collections.emptyMap(), null))
.thenAccept(v -> result.complete(null))
.exceptionally(ex -> {
result.completeExceptionally(ex);
return null;
});
}
} catch (Exception e) {
result.completeExceptionally(e);
}
}
MessageId finalMessageId = messageId;
result.exceptionally(ex -> {
log.error("Send to retry letter topic exception with topic: {}, messageId: {}",
retryLetterProducer.getTopic(), finalMessageId, ex);
Set<MessageId> messageIds = Collections.singleton(finalMessageId);
unAckedMessageTracker.remove(finalMessageId);
redeliverUnacknowledgedMessages(messageIds);
return null;
});
return result;
}
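// Hedged usage sketch: applications reach this path via the retry API (requires enableRetry(true) on the
// consumer builder), e.g.
//
//     consumer.reconsumeLater(msg, 10, TimeUnit.SECONDS);
//
// which republishes msg to the retry letter topic with SYSTEM_PROPERTY_RECONSUMETIMES incremented, and
// routes it to the dead letter topic once maxRedeliverCount is exceeded (handled above).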
private SortedMap<String, String> getPropertiesMap(Message<?> message,
String originMessageIdStr,
String originTopicNameStr) {
SortedMap<String, String> propertiesMap = new TreeMap<>();
if (message.getProperties() != null) {
propertiesMap.putAll(message.getProperties());
}
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC, originTopicNameStr);
// Kept for compatibility with old versions; will be removed in the future
propertiesMap.putIfAbsent(RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
propertiesMap.putIfAbsent(RetryMessageUtil.PROPERTY_ORIGIN_MESSAGE_ID, originMessageIdStr);
return propertiesMap;
}
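// Illustrative property map (editor's note, hedged; values are hypothetical) after one reconsumeLater pass:
//     SYSTEM_PROPERTY_REAL_TOPIC        -> "persistent://public/default/my-topic"
//     SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID -> "123:4:-1"
//     SYSTEM_PROPERTY_RECONSUMETIMES    -> "1"     (set by the caller above)
//     SYSTEM_PROPERTY_DELAY_TIME        -> "10000" (set by the caller above)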
private String getOriginTopicNameStr(Message<?> message) {
MessageId messageId = message.getMessageId();
if (messageId instanceof TopicMessageId) {
String topic = ((TopicMessageId) messageId).getOwnerTopic();
int index = topic.lastIndexOf(TopicName.PARTITIONED_TOPIC_SUFFIX);
if (index < 0) {
return topic;
} else {
return topic.substring(0, index);
}
} else {
return message.getTopicName();
}
}
private MessageImpl<?> getMessageImpl(Message<?> message) {
if (message instanceof TopicMessageImpl) {
return (MessageImpl<?>) ((TopicMessageImpl<?>) message).getMessage();
} else if (message instanceof MessageImpl) {
return (MessageImpl<?>) message;
}
return null;
}
@Override
public void negativeAcknowledge(MessageId messageId) {
negativeAcksTracker.add(messageId);
// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
unAckedMessageTracker.remove(messageId);
}
@Override
public void negativeAcknowledge(Message<?> message) {
negativeAcksTracker.add(message);
// Ensure the message is not redelivered for ack-timeout, since we did receive an "ack"
unAckedMessageTracker.remove(MessageIdAdvUtils.discardBatch(message.getMessageId()));
}
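// Hedged usage sketch: a typical application-side flow that lands in the methods above:
//
//     Message<byte[]> msg = consumer.receive();
//     try {
//         process(msg); // hypothetical application handler
//         consumer.acknowledge(msg);
//     } catch (Exception e) {
//         consumer.negativeAcknowledge(msg); // redelivered after the configured negative-ack redelivery delay
//     }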
@Override
public CompletableFuture<Void> connectionOpened(final ClientCnx cnx) {
previousExceptions.clear();
final State state = getState();
if (state == State.Closing || state == State.Closed) {
setState(State.Closed);
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
clearReceiverQueue();
return CompletableFuture.completedFuture(null);
}
log.info("[{}][{}] Subscribing to topic on cnx {}, consumerId {}",
topic, subscription, cnx.ctx().channel(), consumerId);
long requestId = client.newRequestId();
if (duringSeek.get()) {
acknowledgmentsGroupingTracker.flushAndClean();
}
SUBSCRIBE_DEADLINE_UPDATER
.compareAndSet(this, 0L, System.currentTimeMillis()
+ client.getConfiguration().getOperationTimeoutMs());
int currentSize;
synchronized (this) {
currentSize = incomingMessages.size();
startMessageId = clearReceiverQueue();
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.clear();
}
}
boolean isDurable = subscriptionMode == SubscriptionMode.Durable;
final MessageIdData startMessageIdData;
// For regular durable subscriptions, the message id from where to restart will be determined by the broker.
// For non-durable we are going to restart from the next entry.
if (!isDurable && startMessageId != null) {
startMessageIdData = new MessageIdData()
.setLedgerId(startMessageId.getLedgerId())
.setEntryId(startMessageId.getEntryId())
.setBatchIndex(startMessageId.getBatchIndex());
} else {
startMessageIdData = null;
}
SchemaInfo si = schema.getSchemaInfo();
if (si != null && (SchemaType.BYTES == si.getType() || SchemaType.NONE == si.getType())) {
// don't set schema for Schema.BYTES
si = null;
} else {
if (schema instanceof AutoConsumeSchema
&& Commands.peerSupportsCarryAutoConsumeSchemaToBroker(cnx.getRemoteEndpointProtocolVersion())) {
si = AutoConsumeSchema.SCHEMA_INFO;
}
}
// startMessageRollbackDurationInSec should be considered only once, when the consumer connects for the first time
long startMessageRollbackDuration = (startMessageRollbackDurationInSec > 0
&& startMessageId != null
&& startMessageId.equals(initialStartMessageId)) ? startMessageRollbackDurationInSec : 0;
// Synchronized on this to eliminate the epoch inconsistency between redelivering unacknowledged messages
// and subscribing
final CompletableFuture<Void> future = new CompletableFuture<>();
synchronized (this) {
setClientCnx(cnx);
ByteBuf request = Commands.newSubscribe(topic, subscription, consumerId, requestId, getSubType(),
priorityLevel, consumerName, isDurable, startMessageIdData, metadata, readCompacted,
conf.isReplicateSubscriptionState(),
InitialPosition.valueOf(subscriptionInitialPosition.getValue()),
startMessageRollbackDuration, si, createTopicIfDoesNotExist, conf.getKeySharedPolicy(),
// Use the current epoch to subscribe.
conf.getSubscriptionProperties(), CONSUMER_EPOCH.get(this));
cnx.sendRequestWithId(request, requestId).thenRun(() -> {
synchronized (ConsumerImpl.this) {
if (changeToReadyState()) {
consumerIsReconnectedToBroker(cnx, currentSize);
} else {
// Consumer was closed while reconnecting, close the connection to make sure the broker
// drops the consumer on its side
setState(State.Closed);
deregisterFromClientCnx();
client.cleanupConsumer(this);
cnx.channel().close();
future.complete(null);
return;
}
}
resetBackoff();
boolean firstTimeConnect = subscribeFuture.complete(this);
// If the consumer is not partitioned, or it is partitioned and this is a reconnection, send the flow
// command to start receiving messages.
if (!(firstTimeConnect && hasParentConsumer) && getCurrentReceiverQueueSize() != 0) {
increaseAvailablePermits(cnx, getCurrentReceiverQueueSize());
}
future.complete(null);
}).exceptionally((e) -> {
deregisterFromClientCnx();
if (getState() == State.Closing || getState() == State.Closed) {
// Consumer was closed while reconnecting, close the connection to make sure the broker
// drops the consumer on its side
cnx.channel().close();
future.complete(null);
return null;
}
log.warn("[{}][{}] Failed to subscribe to topic on {}", topic,
subscription, cnx.channel().remoteAddress());
if (e.getCause() instanceof PulsarClientException.TimeoutException) {
// Creating the consumer has timed out. We need to ensure the broker closes the consumer
// in case it was indeed created, otherwise it might prevent new create consumer operation,
// since we are not necessarily closing the connection.
long closeRequestId = client.newRequestId();
ByteBuf cmd = Commands.newCloseConsumer(consumerId, closeRequestId);
cnx.sendRequestWithId(cmd, closeRequestId);
}
if (e.getCause() instanceof PulsarClientException
&& PulsarClientException.isRetriableError(e.getCause())
&& System.currentTimeMillis() < SUBSCRIBE_DEADLINE_UPDATER.get(ConsumerImpl.this)) {
future.completeExceptionally(e.getCause());
} else if (!subscribeFuture.isDone()) {
// unable to create new consumer, fail operation
setState(State.Failed);
closeConsumerTasks();
subscribeFuture.completeExceptionally(
PulsarClientException.wrap(e, String.format("Failed to subscribe the topic %s "
+ "with subscription name %s when connecting to the broker",
topicName.toString(), subscription)));
client.cleanupConsumer(this);
} else if (e.getCause() instanceof TopicDoesNotExistException) {
// The topic was deleted after the consumer was created, and we're
// not allowed to recreate the topic. This can happen in few cases:
// * Regex consumer getting error after topic gets deleted
// * Regular consumer after topic is manually delete and with
// auto-topic-creation set to false
// No more retries are needed in this case.
setState(State.Failed);
closeConsumerTasks();
client.cleanupConsumer(this);
log.warn("[{}][{}] Closed consumer because topic does not exist anymore {}",
topic, subscription, cnx.channel().remoteAddress());
} else {
// consumer was subscribed and connected but we got some error, keep trying
future.completeExceptionally(e.getCause());
}
if (!future.isDone()) {
future.complete(null);
}
return null;
});
}
return future;
}
protected void consumerIsReconnectedToBroker(ClientCnx cnx, int currentQueueSize) {
log.info("[{}][{}] Subscribed to topic on {} -- consumer: {}", topic, subscription,
cnx.channel().remoteAddress(), consumerId);
AVAILABLE_PERMITS_UPDATER.set(this, 0);
}
/**
* Clears the internal receiver queue and returns the message id of the first message in the queue that had
* not yet been seen by the application.
*/
private MessageIdAdv clearReceiverQueue() {
List<Message<?>> currentMessageQueue = new ArrayList<>(incomingMessages.size());
incomingMessages.drainTo(currentMessageQueue);
resetIncomingMessageSize();
if (duringSeek.compareAndSet(true, false)) {
return seekMessageId;
} else if (subscriptionMode == SubscriptionMode.Durable) {
return startMessageId;
}
if (!currentMessageQueue.isEmpty()) {
MessageIdAdv nextMessageInQueue = (MessageIdAdv) currentMessageQueue.get(0).getMessageId();
MessageIdAdv previousMessage;
if (MessageIdAdvUtils.isBatch(nextMessageInQueue)) {
// Get on the previous message within the current batch
previousMessage = new BatchMessageIdImpl(nextMessageInQueue.getLedgerId(),
nextMessageInQueue.getEntryId(), nextMessageInQueue.getPartitionIndex(),
nextMessageInQueue.getBatchIndex() - 1);
} else {
// Get on previous message in previous entry
previousMessage = MessageIdAdvUtils.prevMessageId(nextMessageInQueue);
}
// release messages if they are pooled messages
currentMessageQueue.forEach(Message::release);
return previousMessage;
} else if (!lastDequeuedMessageId.equals(MessageId.earliest)) {
// If the queue was empty we need to restart from the message just after the last one that has been dequeued
// in the past
return new BatchMessageIdImpl((MessageIdImpl) lastDequeuedMessageId);
} else {
// No message was received or dequeued by this consumer. Next message would still be the startMessageId
return startMessageId;
}
}
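// Worked example (editor's note): if the first undelivered message in the drained queue has id
// (ledgerId=10, entryId=7, batchIndex=3), the consumer restarts from BatchMessageIdImpl(10, 7, partition, 2),
// i.e. the previous index within the same batch; for a non-batch id it rolls back to the previous entry via
// MessageIdAdvUtils.prevMessageId(...).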
/**
* Send the flow command to have the broker start pushing messages.
*/
private void sendFlowPermitsToBroker(ClientCnx cnx, int numMessages) {
if (cnx != null && numMessages > 0) {
if (log.isDebugEnabled()) {
    log.debug("[{}] [{}] Adding {} additional permits", topic, subscription, numMessages);
    cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages))
            .addListener(writeFuture -> {
                if (!writeFuture.isSuccess()) {
                    log.debug("Consumer {} failed to send {} permits to broker: {}",
                            consumerId, numMessages, writeFuture.cause().getMessage());
                } else {
                    log.debug("Consumer {} sent {} permits to broker", consumerId, numMessages);
                }
            });
} else {
    cnx.ctx().writeAndFlush(Commands.newFlow(consumerId, numMessages), cnx.ctx().voidPromise());
}
}
}
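// Flow-control sketch (editor's note, hedged): permits accumulate in availablePermits as messages are handed
// to the application; once the count crosses a threshold (roughly half the receiver queue size, per
// increaseAvailablePermits(...); the exact fraction is an assumption here), the accumulated total is flushed
// to the broker as a single Commands.newFlow(consumerId, numMessages) write like the one above.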
@Override
public void connectionFailed(PulsarClientException exception) {
boolean nonRetriableError = !PulsarClientException.isRetriableError(exception);
boolean timeout = System.currentTimeMillis() > lookupDeadline;
if (nonRetriableError || timeout) {
exception.setPreviousExceptions(previousExceptions);
if (subscribeFuture.completeExceptionally(exception)) {
setState(State.Failed);
if (nonRetriableError) {
log.info("[{}] Consumer creation failed for consumer {} with unretriableError {}",
topic, consumerId, exception.getMessage());
} else {
log.info("[{}] Consumer creation failed for consumer {} after timeout", topic, consumerId);
}
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
}
} else {
previousExceptions.add(exception);
}
}
@Override
public CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
if (getState() == State.Closing || getState() == State.Closed) {
closeConsumerTasks();
failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
return closeFuture;
}
if (!isConnected()) {
log.info("[{}] [{}] Closed Consumer (not connected)", topic, subscription);
setState(State.Closed);
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
return closeFuture;
}
stats.getStatTimeout().ifPresent(Timeout::cancel);
setState(State.Closing);
closeConsumerTasks();
long requestId = client.newRequestId();
ClientCnx cnx = cnx();
if (null == cnx) {
cleanupAtClose(closeFuture, null);
} else {
ByteBuf cmd = Commands.newCloseConsumer(consumerId, requestId);
cnx.sendRequestWithId(cmd, requestId).handle((v, exception) -> {
final ChannelHandlerContext ctx = cnx.ctx();
boolean ignoreException = ctx == null || !ctx.channel().isActive();
if (ignoreException && exception != null) {
log.debug("Exception ignored in closing consumer", exception);
}
cleanupAtClose(closeFuture, ignoreException ? null : exception);
return null;
});
}
ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
closeFutures.add(closeFuture);
if (retryLetterProducer != null) {
closeFutures.add(retryLetterProducer.closeAsync().whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing retryLetterProducer of consumer", ex);
}
}));
}
if (deadLetterProducer != null) {
closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing deadLetterProducer of consumer", ex);
}
}));
}
return FutureUtil.waitForAll(closeFutures);
}
private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable exception) {
log.info("[{}] [{}] Closed consumer", topic, subscription);
setState(State.Closed);
closeConsumerTasks();
deregisterFromClientCnx();
client.cleanupConsumer(this);
// fail all pending-receive futures to notify application
failPendingReceive().whenComplete((r, t) -> {
if (exception != null) {
closeFuture.completeExceptionally(exception);
} else {
closeFuture.complete(null);
}
});
}
private void closeConsumerTasks() {
unAckedMessageTracker.close();
if (possibleSendToDeadLetterTopicMessages != null) {
possibleSendToDeadLetterTopicMessages.clear();
}
acknowledgmentsGroupingTracker.close();
if (batchReceiveTimeout != null) {
batchReceiveTimeout.cancel();
}
negativeAcksTracker.close();
stats.getStatTimeout().ifPresent(Timeout::cancel);
if (poolMessages) {
releasePooledMessagesAndStopAcceptNew();
}
}
/**
* If pooled messages are enabled, release the messages after closing the consumer and stop accepting new
* messages.
*/
private void releasePooledMessagesAndStopAcceptNew() {
incomingMessages.terminate(message -> message.release());
clearIncomingMessages();
}
void activeConsumerChanged(boolean isActive) {
if (consumerEventListener == null) {
return;
}
externalPinnedExecutor.execute(() -> {
if (isActive) {
consumerEventListener.becameActive(this, partitionIndex);
} else {
consumerEventListener.becameInactive(this, partitionIndex);
}
});
}
protected boolean isBatch(MessageMetadata messageMetadata) {
// If the message is not decryptable, it can't be parsed as a batch message; so add the EncryptionCtx to the
// message and return the undecrypted payload
return !isMessageUndecryptable(messageMetadata)
&& (messageMetadata.hasNumMessagesInBatch() || messageMetadata.getNumMessagesInBatch() != 1);
}
protected <V> MessageImpl<V> newSingleMessage(final int index,
final int numMessages,
final BrokerEntryMetadata brokerEntryMetadata,
final MessageMetadata msgMetadata,
final SingleMessageMetadata singleMessageMetadata,
final ByteBuf payload,
final MessageIdImpl messageId,
final Schema<V> schema,
final boolean containMetadata,
final BitSetRecyclable ackBitSet,
final BitSet ackSetInMessageId,
final int redeliveryCount,
final long consumerEpoch) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] processing message num - {} in batch", subscription, consumerName, index);
}
ByteBuf singleMessagePayload = null;
try {
if (containMetadata) {
singleMessagePayload =
Commands.deSerializeSingleMessageInBatch(payload, singleMessageMetadata, index, numMessages);
}
// If the topic is non-persistent, we should not ignore any messages.
if (this.topicName.isPersistent() && isSameEntry(messageId) && isPriorBatchIndex(index)) {
// If we are receiving a batch message, we need to discard messages that were prior
// to the startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
consumerName, startMessageId);
}
return null;
}
if (singleMessageMetadata != null && singleMessageMetadata.isCompactedOut()) {
// message has been compacted out, so don't send to the user
return null;
}
if (ackBitSet != null && !ackBitSet.get(index)) {
return null;
}
BatchMessageIdImpl batchMessageIdImpl = new BatchMessageIdImpl(messageId.getLedgerId(),
messageId.getEntryId(), getPartitionIndex(), index, numMessages, ackSetInMessageId);
final ByteBuf payloadBuffer = (singleMessagePayload != null) ? singleMessagePayload : payload;
final MessageImpl<V> message = MessageImpl.create(topicName.toString(), batchMessageIdImpl,
msgMetadata, singleMessageMetadata, payloadBuffer,
createEncryptionContext(msgMetadata), cnx(), schema, redeliveryCount, poolMessages, consumerEpoch);
message.setBrokerEntryMetadata(brokerEntryMetadata);
return message;
} catch (IOException | IllegalStateException e) {
throw new IllegalStateException(e);
} finally {
if (singleMessagePayload != null) {
singleMessagePayload.release();
}
}
}
protected <V> MessageImpl<V> newMessage(final MessageIdImpl messageId,
final BrokerEntryMetadata brokerEntryMetadata,
final MessageMetadata messageMetadata,
final ByteBuf payload,
final Schema<V> schema,
final int redeliveryCount,
final long consumerEpoch) {
final MessageImpl<V> message = MessageImpl.create(topicName.toString(), messageId, messageMetadata, payload,
createEncryptionContext(messageMetadata), cnx(), schema, redeliveryCount, poolMessages, consumerEpoch);
message.setBrokerEntryMetadata(brokerEntryMetadata);
return message;
}
private void executeNotifyCallback(final MessageImpl<T> message) {
// Enqueue the message so that it can be retrieved when the application calls receive().
// If conf.getReceiverQueueSize() is 0, discard the message if no one is waiting for it.
// If an asyncReceive is waiting, notify the callback without adding to the incomingMessages queue.
internalPinnedExecutor.execute(() -> {
if (!isValidConsumerEpoch(message)) {
increaseAvailablePermits(cnx());
return;
}
if (hasNextPendingReceive()) {
notifyPendingReceivedCallback(message, null);
} else if (enqueueMessageAndCheckBatchReceive(message) && hasPendingBatchReceive()) {
notifyPendingBatchReceivedCallBack();
}
});
}
private void processPayloadByProcessor(final BrokerEntryMetadata brokerEntryMetadata,
final MessageMetadata messageMetadata,
final ByteBuf byteBuf,
final MessageIdImpl messageId,
final Schema<T> schema,
final int redeliveryCount,
final List<Long> ackSet,
long consumerEpoch) {
final MessagePayloadImpl payload = MessagePayloadImpl.create(byteBuf);
final MessagePayloadContextImpl entryContext = MessagePayloadContextImpl.get(
brokerEntryMetadata, messageMetadata, messageId, this, redeliveryCount, ackSet, consumerEpoch);
final AtomicInteger skippedMessages = new AtomicInteger(0);
try {
conf.getPayloadProcessor().process(payload, entryContext, schema, message -> {
if (message != null) {
executeNotifyCallback((MessageImpl<T>) message);
} else {
skippedMessages.incrementAndGet();
}
});
} catch (Throwable throwable) {
log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, throwable);
discardCorruptedMessage(messageId, cnx(), ValidationError.BatchDeSerializeError);
} finally {
entryContext.recycle();
payload.release(); // byteBuf.release() is called in this method
}
if (skippedMessages.get() > 0) {
increaseAvailablePermits(cnx(), skippedMessages.get());
}
tryTriggerListener();
}
void messageReceived(CommandMessage cmdMessage, ByteBuf headersAndPayload, ClientCnx cnx) {
List<Long> ackSet = Collections.emptyList();
if (cmdMessage.getAckSetsCount() > 0) {
ackSet = new ArrayList<>(cmdMessage.getAckSetsCount());
for (int i = 0; i < cmdMessage.getAckSetsCount(); i++) {
ackSet.add(cmdMessage.getAckSetAt(i));
}
}
int redeliveryCount = cmdMessage.getRedeliveryCount();
MessageIdData messageId = cmdMessage.getMessageId();
long consumerEpoch = DEFAULT_CONSUMER_EPOCH;
// If the broker sends messages to the client with a consumerEpoch, set that consumerEpoch on the message
if (cmdMessage.hasConsumerEpoch()) {
consumerEpoch = cmdMessage.getConsumerEpoch();
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Received message: {}/{}", topic, subscription, messageId.getLedgerId(),
messageId.getEntryId());
}
if (!verifyChecksum(headersAndPayload, messageId)) {
// discard message with checksum error
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
BrokerEntryMetadata brokerEntryMetadata;
MessageMetadata msgMetadata;
try {
brokerEntryMetadata = Commands.parseBrokerEntryMetadataIfExist(headersAndPayload);
msgMetadata = Commands.parseMessageMetadata(headersAndPayload);
} catch (Throwable t) {
discardCorruptedMessage(messageId, cnx, ValidationError.ChecksumMismatch);
return;
}
final int numMessages = msgMetadata.getNumMessagesInBatch();
final int numChunks = msgMetadata.hasNumChunksFromMsg() ? msgMetadata.getNumChunksFromMsg() : 0;
final boolean isChunkedMessage = numChunks > 1;
MessageIdImpl msgId = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), getPartitionIndex());
if (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch()
&& acknowledgmentsGroupingTracker.isDuplicate(msgId)) {
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message as it was already being acked earlier by same consumer {}/{}",
topic, subscription, consumerName, msgId);
}
increaseAvailablePermits(cnx, numMessages);
return;
}
ByteBuf decryptedPayload = decryptPayloadIfNeeded(messageId, redeliveryCount, msgMetadata, headersAndPayload,
cnx);
boolean isMessageUndecryptable = isMessageUndecryptable(msgMetadata);
if (decryptedPayload == null) {
// Message was discarded or CryptoKeyReader isn't implemented
return;
}
// uncompress decryptedPayload and release decryptedPayload-ByteBuf
ByteBuf uncompressedPayload = (isMessageUndecryptable || isChunkedMessage) ? decryptedPayload.retain()
: uncompressPayloadIfNeeded(messageId, msgMetadata, decryptedPayload, cnx, true);
decryptedPayload.release();
if (uncompressedPayload == null) {
// Message was discarded on decompression error
return;
}
if (conf.getPayloadProcessor() != null) {
// uncompressedPayload is released in this method so we don't need to call release() again
processPayloadByProcessor(brokerEntryMetadata, msgMetadata,
uncompressedPayload, msgId, schema, redeliveryCount, ackSet, consumerEpoch);
return;
}
// If the message is not decryptable, it can't be parsed as a batch message; so add the EncryptionCtx to the
// message and return the undecrypted payload
if (isMessageUndecryptable || (numMessages == 1 && !msgMetadata.hasNumMessagesInBatch())) {
// right now, chunked messages are only supported by non-shared subscription
if (isChunkedMessage) {
uncompressedPayload = processMessageChunk(uncompressedPayload, msgMetadata, msgId, messageId, cnx);
if (uncompressedPayload == null) {
return;
}
// last chunk received: so, stitch chunked-messages and clear up chunkedMsgBuffer
if (log.isDebugEnabled()) {
log.debug("Chunked message completed chunkId {}, total-chunks {}, msgId {} sequenceId {}",
msgMetadata.getChunkId(), msgMetadata.getNumChunksFromMsg(), msgId,
msgMetadata.getSequenceId());
}
// remove buffer from the map, set the chunk message id
ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.remove(msgMetadata.getUuid());
if (chunkedMsgCtx.chunkedMessageIds.length > 0) {
msgId = new ChunkMessageIdImpl(chunkedMsgCtx.chunkedMessageIds[0],
chunkedMsgCtx.chunkedMessageIds[chunkedMsgCtx.chunkedMessageIds.length - 1]);
}
// add chunked messageId to unack-message tracker, and reduce pending-chunked-message count
unAckedChunkedMessageIdSequenceMap.put(msgId, chunkedMsgCtx.chunkedMessageIds);
pendingChunkedMessageCount--;
chunkedMsgCtx.recycle();
}
// If the topic is non-persistent, we should not ignore any messages.
if (this.topicName.isPersistent() && isSameEntry(msgId) && isPriorEntryIndex(messageId.getEntryId())) {
// We need to discard entries that were prior to startMessageId
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Ignoring message from before the startMessageId: {}", subscription,
consumerName, startMessageId);
}
uncompressedPayload.release();
return;
}
final MessageImpl<T> message =
newMessage(msgId, brokerEntryMetadata, msgMetadata, uncompressedPayload,
schema, redeliveryCount, consumerEpoch);
uncompressedPayload.release();
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put((MessageIdImpl) message.getMessageId(),
Collections.singletonList(message));
if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages(Collections.singleton(message.getMessageId()));
// The message is skipped due to reaching the max redelivery count,
// so we need to increase the available permits
increaseAvailablePermits(cnx);
return;
}
}
}
executeNotifyCallback(message);
} else {
// handle batch message enqueuing; uncompressed payload has all messages in batch
receiveIndividualMessagesFromBatch(brokerEntryMetadata, msgMetadata, redeliveryCount, ackSet,
uncompressedPayload, messageId, cnx, consumerEpoch);
uncompressedPayload.release();
}
tryTriggerListener();
}
private ByteBuf processMessageChunk(ByteBuf compressedPayload, MessageMetadata msgMetadata, MessageIdImpl msgId,
MessageIdData messageId, ClientCnx cnx) {
// Lazily schedule the task that expires incomplete chunked messages
if (expireTimeOfIncompleteChunkedMessageMillis > 0 && expireChunkMessageTaskScheduled.compareAndSet(false,
true)) {
((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).scheduleAtFixedRate(
() -> internalPinnedExecutor
.execute(catchingAndLoggingThrowables(this::removeExpireIncompleteChunkedMessages)),
expireTimeOfIncompleteChunkedMessageMillis, expireTimeOfIncompleteChunkedMessageMillis,
TimeUnit.MILLISECONDS
);
}
ChunkedMessageCtx chunkedMsgCtx = chunkedMessagesMap.get(msgMetadata.getUuid());
if (msgMetadata.getChunkId() == 0 && chunkedMsgCtx == null) {
pendingChunkedMessageCount++;
if (maxPendingChunkedMessage > 0 && pendingChunkedMessageCount > maxPendingChunkedMessage) {
removeOldestPendingChunkedMessage();
}
int totalChunks = msgMetadata.getNumChunksFromMsg();
ByteBuf chunkedMsgBuffer = PulsarByteBufAllocator.DEFAULT.buffer(msgMetadata.getTotalChunkMsgSize(),
msgMetadata.getTotalChunkMsgSize());
chunkedMsgCtx = chunkedMessagesMap.computeIfAbsent(msgMetadata.getUuid(),
(key) -> ChunkedMessageCtx.get(totalChunks, chunkedMsgBuffer));
pendingChunkedMessageUuidQueue.add(msgMetadata.getUuid());
}
// discard message if chunk is out-of-order
if (chunkedMsgCtx == null || chunkedMsgCtx.chunkedMsgBuffer == null
|| msgMetadata.getChunkId() != (chunkedMsgCtx.lastChunkedMessageId + 1)) {
// this means we lost the first chunk; should never happen
log.info("Received unexpected chunk messageId {}, last-chunk-id {}, chunkId = {}", msgId,
(chunkedMsgCtx != null ? chunkedMsgCtx.lastChunkedMessageId : null), msgMetadata.getChunkId());
if (chunkedMsgCtx != null) {
if (chunkedMsgCtx.chunkedMsgBuffer != null) {
ReferenceCountUtil.safeRelease(chunkedMsgCtx.chunkedMsgBuffer);
}
chunkedMsgCtx.recycle();
}
chunkedMessagesMap.remove(msgMetadata.getUuid());
compressedPayload.release();
increaseAvailablePermits(cnx);
if (expireTimeOfIncompleteChunkedMessageMillis > 0
&& System.currentTimeMillis() > (msgMetadata.getPublishTime()
+ expireTimeOfIncompleteChunkedMessageMillis)) {
doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
} else {
trackMessage(msgId);
}
return null;
}
chunkedMsgCtx.chunkedMessageIds[msgMetadata.getChunkId()] = msgId;
// append the chunked payload and update lastChunkedMessage-id
chunkedMsgCtx.chunkedMsgBuffer.writeBytes(compressedPayload);
chunkedMsgCtx.lastChunkedMessageId = msgMetadata.getChunkId();
// if final chunk is not received yet then release payload and return
if (msgMetadata.getChunkId() != (msgMetadata.getNumChunksFromMsg() - 1)) {
compressedPayload.release();
increaseAvailablePermits(cnx);
return null;
}
compressedPayload.release();
compressedPayload = chunkedMsgCtx.chunkedMsgBuffer;
ByteBuf uncompressedPayload = uncompressPayloadIfNeeded(messageId, msgMetadata, compressedPayload, cnx, false);
compressedPayload.release();
return uncompressedPayload;
}
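// Hedged walk-through: a message split into numChunksFromMsg = 3 arrives as chunkId 0, 1, 2 sharing one uuid.
// Chunk 0 allocates chunkedMsgBuffer with capacity totalChunkMsgSize; chunks must arrive strictly in order
// (chunkId == lastChunkedMessageId + 1) or the context is discarded above; on the final chunk the stitched
// buffer is decompressed and returned to messageReceived(...).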
/**
* Notify a waiting asyncReceive request with the received message.
*
* @param message the received message to hand to the pending receive future
* @param exception if non-null, the pending receive future is completed exceptionally instead
*/
void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
if (pendingReceives.isEmpty()) {
return;
}
// fetch receivedCallback from queue
final CompletableFuture<Message<T>> receivedFuture = nextPendingReceive();
if (receivedFuture == null) {
return;
}
if (exception != null) {
internalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
return;
}
if (message == null) {
IllegalStateException e = new IllegalStateException("received message can't be null");
internalPinnedExecutor.execute(() -> receivedFuture.completeExceptionally(e));
return;
}
if (getCurrentReceiverQueueSize() == 0) {
// call interceptor and complete received callback
trackMessage(message);
interceptAndComplete(message, receivedFuture);
return;
}
// increase permits for available message-queue
messageProcessed(message);
// call interceptor and complete received callback
interceptAndComplete(message, receivedFuture);
}
private void interceptAndComplete(final Message<T> message, final CompletableFuture<Message<T>> receivedFuture) {
// call proper interceptor
final Message<T> interceptMessage = beforeConsume(message);
// return message to receivedCallback
completePendingReceive(receivedFuture, interceptMessage);
}
void receiveIndividualMessagesFromBatch(BrokerEntryMetadata brokerEntryMetadata, MessageMetadata msgMetadata,
int redeliveryCount, List<Long> ackSet, ByteBuf uncompressedPayload,
MessageIdData messageId, ClientCnx cnx, long consumerEpoch) {
int batchSize = msgMetadata.getNumMessagesInBatch();
// the message id of the entry that contains the whole batch; used to track acks for the batch
MessageIdImpl batchMessage = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(),
getPartitionIndex());
List<MessageImpl<T>> possibleToDeadLetter = null;
if (deadLetterPolicy != null && redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleToDeadLetter = new ArrayList<>();
}
BitSet ackSetInMessageId = BatchMessageIdImpl.newAckSet(batchSize);
BitSetRecyclable ackBitSet = null;
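// ackBitSet mirrors the broker's ack set for this batch: bits still set mark batch indexes that
// have not been acknowledged yet, so cleared indexes are skipped when building single messages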
if (ackSet != null && ackSet.size() > 0) {
ackBitSet = BitSetRecyclable.valueOf(SafeCollectionUtils.longListToArray(ackSet));
}
SingleMessageMetadata singleMessageMetadata = new SingleMessageMetadata();
int skippedMessages = 0;
try {
for (int i = 0; i < batchSize; ++i) {
final MessageImpl<T> message = newSingleMessage(i, batchSize, brokerEntryMetadata, msgMetadata,
singleMessageMetadata, uncompressedPayload, batchMessage, schema, true,
ackBitSet, ackSetInMessageId, redeliveryCount, consumerEpoch);
if (message == null) {
skippedMessages++;
continue;
}
if (possibleToDeadLetter != null) {
possibleToDeadLetter.add(message);
// Skip messages that have exceeded the max redelivery count.
if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
skippedMessages++;
continue;
}
}
if (acknowledgmentsGroupingTracker.isDuplicate(message.getMessageId())) {
skippedMessages++;
continue;
}
executeNotifyCallback(message);
}
if (ackBitSet != null) {
ackBitSet.recycle();
}
} catch (IllegalStateException e) {
log.warn("[{}] [{}] unable to obtain message in batch", subscription, consumerName, e);
discardCorruptedMessage(messageId, cnx, ValidationError.BatchDeSerializeError);
}
if (deadLetterPolicy != null && possibleSendToDeadLetterTopicMessages != null) {
if (redeliveryCount >= deadLetterPolicy.getMaxRedeliverCount()) {
possibleSendToDeadLetterTopicMessages.put(batchMessage,
possibleToDeadLetter);
if (redeliveryCount > deadLetterPolicy.getMaxRedeliverCount()) {
redeliverUnacknowledgedMessages(Collections.singleton(batchMessage));
}
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] enqueued messages in batch. queue size - {}, available queue size - {}", subscription,
consumerName, incomingMessages.size(), incomingMessages.remainingCapacity());
}
if (skippedMessages > 0) {
increaseAvailablePermits(cnx, skippedMessages);
}
}
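// When resetIncludeHead is set, the start position itself must still be delivered, so the "prior"
// checks below use a strict comparison; otherwise the start position is excluded as well.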
private boolean isPriorEntryIndex(long idx) {
return resetIncludeHead ? idx < startMessageId.getEntryId() : idx <= startMessageId.getEntryId();
}
private boolean isPriorBatchIndex(long idx) {
return resetIncludeHead ? idx < startMessageId.getBatchIndex() : idx <= startMessageId.getBatchIndex();
}
private boolean isSameEntry(MessageIdImpl messageId) {
return startMessageId != null
&& messageId.getLedgerId() == startMessageId.getLedgerId()
&& messageId.getEntryId() == startMessageId.getEntryId();
}
/**
 * Record the event that one message has been processed by the application.
 *
 * <p>Periodically, it sends a Flow command to notify the broker that it can push more messages.
 */
@Override
protected synchronized void messageProcessed(Message<?> msg) {
ClientCnx currentCnx = cnx();
ClientCnx msgCnx = ((MessageImpl<?>) msg).getCnx();
lastDequeuedMessageId = msg.getMessageId();
if (msgCnx != currentCnx) {
// The processed message belonged to an old queue that was cleared after reconnection,
// so no permit is sent for it on the current connection.
} else {
if (listener == null && !parentConsumerHasListener) {
increaseAvailablePermits(currentCnx);
}
stats.updateNumMsgsReceived(msg);
trackMessage(msg);
}
decreaseIncomingMessageSize(msg);
}
protected void trackMessage(Message<?> msg) {
if (msg != null) {
trackMessage(msg.getMessageId(), msg.getRedeliveryCount());
}
}
protected void trackMessage(MessageId messageId) {
trackMessage(messageId, 0);
}
protected void trackMessage(MessageId messageId, int redeliveryCount) {
if (conf.getAckTimeoutMillis() > 0 && messageId instanceof MessageIdImpl) {
MessageId id = MessageIdAdvUtils.discardBatch(messageId);
if (hasParentConsumer) {
// TODO: check parent consumer here
// we should no longer track this message; the parent TopicsConsumer will take care of it from now on
unAckedMessageTracker.remove(id);
} else {
trackUnAckedMsgIfNoListener(id, redeliveryCount);
}
}
}
void increaseAvailablePermits(MessageImpl<?> msg) {
ClientCnx currentCnx = cnx();
ClientCnx msgCnx = msg.getCnx();
if (msgCnx == currentCnx) {
increaseAvailablePermits(currentCnx);
}
}
void increaseAvailablePermits(ClientCnx currentCnx) {
increaseAvailablePermits(currentCnx, 1);
}
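// Permits are accumulated locally and flushed in batches: once the accumulated count reaches half
// of the receiver queue size (and the consumer is not paused), the counter is CAS-reset to zero
// and a single Flow command carrying the whole count is sent to the broker.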
protected void increaseAvailablePermits(ClientCnx currentCnx, int delta) {
int available = AVAILABLE_PERMITS_UPDATER.addAndGet(this, delta);
while (available >= getCurrentReceiverQueueSize() / 2 && !paused) {
if (AVAILABLE_PERMITS_UPDATER.compareAndSet(this, available, 0)) {
sendFlowPermitsToBroker(currentCnx, available);
break;
} else {
available = AVAILABLE_PERMITS_UPDATER.get(this);
}
}
}
public void increaseAvailablePermits(int delta) {
increaseAvailablePermits(cnx(), delta);
}
@Override
protected void setCurrentReceiverQueueSize(int newSize) {
checkArgument(newSize > 0, "receiver queue size should be larger than 0");
int oldSize = CURRENT_RECEIVER_QUEUE_SIZE_UPDATER.getAndSet(this, newSize);
int delta = newSize - oldSize;
if (log.isDebugEnabled()) {
log.debug("[{}][{}] update currentReceiverQueueSize from {} to {}, increaseAvailablePermits by {}",
topic, subscription, oldSize, newSize, delta);
}
increaseAvailablePermits(delta);
}
@Override
public void pause() {
paused = true;
}
@Override
public void resume() {
if (paused) {
paused = false;
increaseAvailablePermits(cnx(), 0);
}
}
@Override
public long getLastDisconnectedTimestamp() {
return connectionHandler.lastConnectionClosedTimestamp;
}
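// Decrypt the payload when encryption keys are present. Failures are handled according to the
// configured ConsumerCryptoFailureAction: CONSUME delivers the still-encrypted bytes, DISCARD
// drops the message and acks it with a validation error, FAIL leaves it to the unacked-message
// tracker so it gets redelivered.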
private ByteBuf decryptPayloadIfNeeded(MessageIdData messageId, int redeliveryCount, MessageMetadata msgMetadata,
ByteBuf payload, ClientCnx currentCnx) {
if (msgMetadata.getEncryptionKeysCount() == 0) {
return payload.retain();
}
// If no KeyReader is configured, handle the message according to the configured crypto failure action
if (conf.getCryptoKeyReader() == null) {
switch (conf.getCryptoFailureAction()) {
case CONSUME:
log.warn("[{}][{}][{}] CryptoKeyReader interface is not implemented. Consuming encrypted message.",
topic, subscription, consumerName);
return payload.retain();
case DISCARD:
log.warn(
"[{}][{}][{}] Skipping decryption since CryptoKeyReader interface is not implemented and"
+ " config is set to discard",
topic, subscription, consumerName);
discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
return null;
case FAIL:
MessageId m = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), partitionIndex);
log.error(
"[{}][{}][{}][{}] Message delivery failed since CryptoKeyReader interface is not"
+ " implemented to consume encrypted message",
topic, subscription, consumerName, m);
unAckedMessageTracker.add(m, redeliveryCount);
return null;
}
}
int maxDecryptedSize = msgCrypto.getMaxOutputSize(payload.readableBytes());
ByteBuf decryptedData = PulsarByteBufAllocator.DEFAULT.buffer(maxDecryptedSize);
ByteBuffer nioDecryptedData = decryptedData.nioBuffer(0, maxDecryptedSize);
if (msgCrypto.decrypt(() -> msgMetadata, payload.nioBuffer(), nioDecryptedData, conf.getCryptoKeyReader())) {
decryptedData.writerIndex(nioDecryptedData.limit());
return decryptedData;
}
decryptedData.release();
switch (conf.getCryptoFailureAction()) {
case CONSUME:
// Note: batch messages will still fail to be consumed even when the config is set to consume
log.warn("[{}][{}][{}][{}] Decryption failed. Consuming encrypted message since config is set to"
+ " consume.",
topic, subscription, consumerName, messageId);
return payload.retain();
case DISCARD:
log.warn("[{}][{}][{}][{}] Discarding message since decryption failed and config is set to discard",
topic, subscription, consumerName, messageId);
discardMessage(messageId, currentCnx, ValidationError.DecryptionError);
return null;
case FAIL:
MessageId m = new MessageIdImpl(messageId.getLedgerId(), messageId.getEntryId(), partitionIndex);
log.error(
"[{}][{}][{}][{}] Message delivery failed since unable to decrypt incoming message",
topic, subscription, consumerName, m);
unAckedMessageTracker.add(m, redeliveryCount);
return null;
}
return null;
}
private ByteBuf uncompressPayloadIfNeeded(MessageIdData messageId, MessageMetadata msgMetadata, ByteBuf payload,
ClientCnx currentCnx, boolean checkMaxMessageSize) {
CompressionType compressionType = msgMetadata.getCompression();
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(compressionType);
int uncompressedSize = msgMetadata.getUncompressedSize();
int payloadSize = payload.readableBytes();
if (checkMaxMessageSize && payloadSize > ClientCnx.getMaxMessageSize()) {
// the payload size itself is corrupted, since it can never be bigger than the MaxMessageSize
log.error("[{}][{}] Got corrupted payload message size {} at {}", topic, subscription, payloadSize,
messageId);
discardCorruptedMessage(messageId, currentCnx, ValidationError.UncompressedSizeCorruption);
return null;
}
try {
ByteBuf uncompressedPayload = codec.decode(payload, uncompressedSize);
return uncompressedPayload;
} catch (IOException e) {
log.error("[{}][{}] Failed to decompress message with {} at {}: {}", topic, subscription, compressionType,
messageId, e.getMessage(), e);
discardCorruptedMessage(messageId, currentCnx, ValidationError.DecompressionError);
return null;
}
}
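// Verify the CRC32C checksum when the magic number indicates one is present; on a mismatch the
// message is treated as corrupted by the caller.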
private boolean verifyChecksum(ByteBuf headersAndPayload, MessageIdData messageId) {
if (hasChecksum(headersAndPayload)) {
int checksum = Commands.readChecksum(headersAndPayload);
int computedChecksum = Crc32cIntChecksum.computeChecksum(headersAndPayload);
if (checksum != computedChecksum) {
log.error(
"[{}][{}] Checksum mismatch for message at {}:{}. Received checksum: 0x{},"
+ " Computed checksum: 0x{}",
topic, subscription, messageId.getLedgerId(), messageId.getEntryId(),
Integer.toHexString(checksum), Integer.toHexString(computedChecksum));
return false;
}
}
return true;
}
private void discardCorruptedMessage(MessageIdImpl messageId, ClientCnx currentCnx,
ValidationError validationError) {
log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(),
messageId.getEntryId());
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null,
AckType.Individual, validationError, Collections.emptyMap(), -1);
currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
increaseAvailablePermits(currentCnx);
stats.incrementNumReceiveFailed();
}
private void discardCorruptedMessage(MessageIdData messageId, ClientCnx currentCnx,
ValidationError validationError) {
log.error("[{}][{}] Discarding corrupted message at {}:{}", topic, subscription, messageId.getLedgerId(),
messageId.getEntryId());
discardMessage(messageId, currentCnx, validationError);
}
private void discardMessage(MessageIdData messageId, ClientCnx currentCnx, ValidationError validationError) {
ByteBuf cmd = Commands.newAck(consumerId, messageId.getLedgerId(), messageId.getEntryId(), null,
AckType.Individual, validationError, Collections.emptyMap(), -1);
currentCnx.ctx().writeAndFlush(cmd, currentCnx.ctx().voidPromise());
increaseAvailablePermits(currentCnx);
stats.incrementNumReceiveFailed();
}
@Override
String getHandlerName() {
return subscription;
}
@Override
public boolean isConnected() {
return getClientCnx() != null && (getState() == State.Ready);
}
public boolean isConnected(ClientCnx cnx) {
return cnx != null && (getState() == State.Ready);
}
int getPartitionIndex() {
return partitionIndex;
}
@Override
public int getAvailablePermits() {
return AVAILABLE_PERMITS_UPDATER.get(this);
}
@Override
public int numMessagesInQueue() {
return incomingMessages.size();
}
@Override
public void redeliverUnacknowledgedMessages() {
// First: synchronize in order to handle the race between consumer reconnect and redelivery. If the
// broker receives redeliverUnacknowledgedMessages before the consumer has been recreated and then
// processes the reconnect with a smaller epoch, the client epoch would stay ahead of the broker
// epoch forever and the client would never receive messages again.
// Second: `ClientCnx cnx = cnx()` must be read inside the synchronized block to avoid using a stale
// cnx to send redeliverUnacknowledgedMessages to an old broker.
synchronized (ConsumerImpl.this) {
ClientCnx cnx = cnx();
// protocol version 1 does not support redeliverUnacknowledgedMessages
if (cnx != null && cnx.getRemoteEndpointProtocolVersion() < ProtocolVersion.v2.getValue()) {
if ((getState() == State.Connecting)) {
log.warn("[{}] Client Connection needs to be established "
+ "for redelivery of unacknowledged messages", this);
} else {
log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
cnx.ctx().close();
}
return;
}
// clear local message
int currentSize;
incomingQueueLock.lock();
try {
// we should increase the epoch every time because MultiTopicsConsumerImpl also increases it;
// both epochs need to stay in sync
if (conf.getSubscriptionType() == SubscriptionType.Failover
|| conf.getSubscriptionType() == SubscriptionType.Exclusive) {
CONSUMER_EPOCH.incrementAndGet(this);
}
// clear local message
currentSize = incomingMessages.size();
clearIncomingMessages();
unAckedMessageTracker.clear();
} finally {
incomingQueueLock.unlock();
}
// if the channel is connected, send the redeliver command to the broker
if (cnx != null && isConnected(cnx)) {
cnx.ctx().writeAndFlush(Commands.newRedeliverUnacknowledgedMessages(
consumerId, CONSUMER_EPOCH.get(this)), cnx.ctx().voidPromise());
if (currentSize > 0) {
increaseAvailablePermits(cnx, currentSize);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Redeliver unacked messages and send {} permits", subscription, topic,
consumerName, currentSize);
}
} else {
log.warn("[{}] Send redeliver messages command but the client is reconnect or close, "
+ "so don't need to send redeliver command to broker", this);
}
}
}
@Override
public void redeliverUnacknowledgedMessages(Set<MessageId> messageIds) {
if (messageIds.isEmpty()) {
return;
}
if (conf.getSubscriptionType() != SubscriptionType.Shared
&& conf.getSubscriptionType() != SubscriptionType.Key_Shared) {
// Individual message redelivery is only supported for Shared and Key_Shared subscriptions
redeliverUnacknowledgedMessages();
return;
}
ClientCnx cnx = cnx();
if (isConnected() && cnx.getRemoteEndpointProtocolVersion() >= ProtocolVersion.v2.getValue()) {
int messagesFromQueue = removeExpiredMessagesFromQueue(messageIds);
Iterables.partition(messageIds, MAX_REDELIVER_UNACKNOWLEDGED).forEach(ids -> {
getRedeliveryMessageIdData(ids).thenAccept(messageIdData -> {
if (!messageIdData.isEmpty()) {
ByteBuf cmd = Commands.newRedeliverUnacknowledgedMessages(consumerId, messageIdData);
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
}
});
});
if (messagesFromQueue > 0) {
increaseAvailablePermits(cnx, messagesFromQueue);
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Redeliver unacked messages and increase {} permits", subscription, topic,
consumerName, messagesFromQueue);
}
return;
}
if (cnx == null || (getState() == State.Connecting)) {
log.warn("[{}] Client Connection needs to be established for redelivery of unacknowledged messages", this);
} else {
log.warn("[{}] Reconnecting the client to redeliver the messages.", this);
cnx.ctx().close();
}
}
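// The hint flips to true when the consumer is using its whole permit budget, i.e. outstanding
// permits plus locally queued messages reach the current receiver queue size, signalling that the
// receiver queue could be scaled up.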
@Override
protected void updateAutoScaleReceiverQueueHint() {
boolean prev = scaleReceiverQueueHint.getAndSet(
getAvailablePermits() + incomingMessages.size() >= getCurrentReceiverQueueSize());
if (log.isDebugEnabled() && prev != scaleReceiverQueueHint.get()) {
log.debug("updateAutoScaleReceiverQueueHint {} -> {}", prev, scaleReceiverQueueHint.get());
}
}
@Override
protected void completeOpBatchReceive(OpBatchReceive<T> op) {
notifyPendingBatchReceivedCallBack(op.future);
}
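// For every message id, first try to route it to the dead letter topic; only the ids that were
// not diverted to the DLQ are turned into MessageIdData entries for redelivery.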
private CompletableFuture<List<MessageIdData>> getRedeliveryMessageIdData(List<MessageId> messageIds) {
if (messageIds == null || messageIds.isEmpty()) {
return CompletableFuture.completedFuture(Collections.emptyList());
}
List<CompletableFuture<MessageIdData>> futures = messageIds.stream().map(originalMessageId -> {
final MessageIdAdv messageId = (MessageIdAdv) originalMessageId;
CompletableFuture<Boolean> future = processPossibleToDLQ(messageId);
return future.thenApply(sendToDLQ -> {
if (!sendToDLQ) {
return new MessageIdData()
.setPartition(messageId.getPartitionIndex())
.setLedgerId(messageId.getLedgerId())
.setEntryId(messageId.getEntryId());
}
return null;
});
}).collect(Collectors.toList());
return FutureUtil.waitForAll(futures).thenApply(v ->
futures.stream().map(CompletableFuture::join).filter(Objects::nonNull).collect(Collectors.toList()));
}
private CompletableFuture<Boolean> processPossibleToDLQ(MessageIdAdv messageId) {
List<MessageImpl<T>> deadLetterMessages = null;
if (possibleSendToDeadLetterTopicMessages != null) {
deadLetterMessages = possibleSendToDeadLetterTopicMessages.get(MessageIdAdvUtils.discardBatch(messageId));
}
CompletableFuture<Boolean> result = new CompletableFuture<>();
if (deadLetterMessages != null) {
initDeadLetterProducerIfNeeded();
List<MessageImpl<T>> finalDeadLetterMessages = deadLetterMessages;
deadLetterProducer.thenAcceptAsync(producerDLQ -> {
for (MessageImpl<T> message : finalDeadLetterMessages) {
String originMessageIdStr = message.getMessageId().toString();
String originTopicNameStr = getOriginTopicNameStr(message);
TypedMessageBuilder<byte[]> typedMessageBuilderNew =
producerDLQ.newMessage(Schema.AUTO_PRODUCE_BYTES(message.getReaderSchema().get()))
.value(message.getData())
.properties(getPropertiesMap(message, originMessageIdStr, originTopicNameStr));
if (message.hasKey()) {
typedMessageBuilderNew.key(message.getKey());
}
typedMessageBuilderNew.sendAsync()
.thenAccept(messageIdInDLQ -> {
possibleSendToDeadLetterTopicMessages.remove(messageId);
acknowledgeAsync(messageId).whenComplete((v, ex) -> {
if (ex != null) {
log.warn("[{}] [{}] [{}] Failed to acknowledge the message {} of the original"
+ " topic but send to the DLQ successfully.",
topicName, subscription, consumerName, messageId, ex);
result.complete(false);
} else {
result.complete(true);
}
});
}).exceptionally(ex -> {
if (ex instanceof PulsarClientException.ProducerQueueIsFullError) {
log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}: {}",
topicName, subscription, consumerName,
deadLetterPolicy.getDeadLetterTopic(), messageId, ex.getMessage());
} else {
log.warn("[{}] [{}] [{}] Failed to send DLQ message to {} for message id {}",
topicName, subscription, consumerName,
deadLetterPolicy.getDeadLetterTopic(), messageId, ex);
}
result.complete(false);
return null;
});
}
}, internalPinnedExecutor).exceptionally(ex -> {
log.error("Dead letter producer exception with topic: {}", deadLetterPolicy.getDeadLetterTopic(), ex);
deadLetterProducer = null;
result.complete(false);
return null;
});
} else {
result.complete(false);
}
return result;
}
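// Lazily create the DLQ producer, double-checking under the write lock so that only a single
// producer is ever built. AUTO_PRODUCE_BYTES forwards the original raw payload while still
// validating it against the dead letter topic's schema.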
private void initDeadLetterProducerIfNeeded() {
if (deadLetterProducer == null) {
createProducerLock.writeLock().lock();
try {
if (deadLetterProducer == null) {
deadLetterProducer =
((ProducerBuilderImpl<byte[]>) client.newProducer(Schema.AUTO_PRODUCE_BYTES(schema)))
.initialSubscriptionName(this.deadLetterPolicy.getInitialSubscriptionName())
.topic(this.deadLetterPolicy.getDeadLetterTopic())
.blockIfQueueFull(false)
.createAsync();
}
} finally {
createProducerLock.writeLock().unlock();
}
}
}
@Override
public void seek(MessageId messageId) throws PulsarClientException {
try {
seekAsync(messageId).get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
public void seek(long timestamp) throws PulsarClientException {
try {
seekAsync(timestamp).get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
public void seek(Function<String, Object> function) throws PulsarClientException {
try {
seekAsync(function).get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
@Override
public CompletableFuture<Void> seekAsync(Function<String, Object> function) {
if (function == null) {
return FutureUtil.failedFuture(new PulsarClientException("Function must be set"));
}
Object seekPosition = function.apply(topic);
if (seekPosition == null) {
return CompletableFuture.completedFuture(null);
}
if (seekPosition instanceof MessageId) {
return seekAsync((MessageId) seekPosition);
} else if (seekPosition instanceof Long) {
return seekAsync((long) seekPosition);
}
return FutureUtil.failedFuture(
new PulsarClientException("Only support seek by messageId or timestamp"));
}
private Optional<CompletableFuture<Void>> seekAsyncCheckState(String seekBy) {
if (getState() == State.Closing || getState() == State.Closed) {
return Optional.of(FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when seeking the subscription %s of the"
+ " topic %s to %s", consumerName, subscription, topicName.toString(), seekBy))));
}
if (!isConnected()) {
return Optional.of(FutureUtil.failedFuture(new PulsarClientException(
String.format("The client is not connected to the broker when seeking the subscription %s of the "
+ "topic %s to %s", subscription, topicName.toString(), seekBy))));
}
return Optional.empty();
}
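// Only one seek may be in flight at a time: duringSeek acts as the guard flag, and on failure the
// previous seekMessageId is restored so a later reconnect does not resume from the wrong position.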
private CompletableFuture<Void> seekAsyncInternal(long requestId, ByteBuf seek, MessageId seekId, String seekBy) {
final CompletableFuture<Void> seekFuture = new CompletableFuture<>();
ClientCnx cnx = cnx();
if (!duringSeek.compareAndSet(false, true)) {
final String message = String.format(
"[%s][%s] attempted to seek while another seek operation is already in progress (seek by %s)",
topic, subscription, seekBy);
log.warn("[{}][{}] Rejecting seek request since another seek is already in progress (seek by {})",
topic, subscription, seekBy);
seekFuture.completeExceptionally(new IllegalStateException(message));
return seekFuture;
}
MessageIdAdv originSeekMessageId = seekMessageId;
seekMessageId = (MessageIdAdv) seekId;
log.info("[{}][{}] Seeking subscription to {}", topic, subscription, seekBy);
cnx.sendRequestWithId(seek, requestId).thenRun(() -> {
log.info("[{}][{}] Successfully reset subscription to {}", topic, subscription, seekBy);
acknowledgmentsGroupingTracker.flushAndClean();
lastDequeuedMessageId = MessageId.earliest;
clearIncomingMessages();
seekFuture.complete(null);
}).exceptionally(e -> {
// reset duringSeek and restore the previous seekMessageId if the seek failed
seekMessageId = originSeekMessageId;
duringSeek.set(false);
log.error("[{}][{}] Failed to reset subscription: {}", topic, subscription, e.getCause().getMessage());
seekFuture.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("Failed to seek the subscription %s of the topic %s to %s",
subscription, topicName.toString(), seekBy)));
return null;
});
return seekFuture;
}
@Override
public CompletableFuture<Void> seekAsync(long timestamp) {
String seekBy = String.format("the timestamp %d", timestamp);
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
return seekAsyncInternal(requestId, Commands.newSeek(consumerId, requestId, timestamp),
MessageId.earliest, seekBy);
});
}
@Override
public CompletableFuture<Void> seekAsync(MessageId messageId) {
String seekBy = String.format("the message %s", messageId.toString());
return seekAsyncCheckState(seekBy).orElseGet(() -> {
long requestId = client.newRequestId();
final MessageIdAdv msgId = (MessageIdAdv) messageId;
final MessageIdAdv firstChunkMsgId = msgId.getFirstChunkMessageId();
final ByteBuf seek;
if (firstChunkMsgId != null) {
seek = Commands.newSeek(consumerId, requestId, firstChunkMsgId.getLedgerId(),
firstChunkMsgId.getEntryId(), new long[0]);
} else {
final long[] ackSetArr;
if (MessageIdAdvUtils.isBatch(msgId)) {
final BitSetRecyclable ackSet = BitSetRecyclable.create();
ackSet.set(0, msgId.getBatchSize());
ackSet.clear(0, Math.max(msgId.getBatchIndex(), 0));
ackSetArr = ackSet.toLongArray();
ackSet.recycle();
} else {
ackSetArr = new long[0];
}
seek = Commands.newSeek(consumerId, requestId, msgId.getLedgerId(), msgId.getEntryId(), ackSetArr);
}
return seekAsyncInternal(requestId, seek, messageId, seekBy);
});
}
public boolean hasMessageAvailable() throws PulsarClientException {
try {
return hasMessageAvailableAsync().get();
} catch (Exception e) {
throw PulsarClientException.unwrap(e);
}
}
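// Availability is decided by comparing against the broker's last message id: before the first
// dequeue the comparison uses startMessageId (honouring resetIncludeHead), afterwards it uses the
// last dequeued message id.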
public CompletableFuture<Boolean> hasMessageAvailableAsync() {
final CompletableFuture<Boolean> booleanFuture = new CompletableFuture<>();
// we haven't read yet. use startMessageId for comparison
if (lastDequeuedMessageId == MessageId.earliest) {
// if we are starting from latest, we should seek to the actual last message first,
// allowing the last message itself to be read when resetIncludeHead is set.
if (MessageId.latest.equals(startMessageId)) {
CompletableFuture<GetLastMessageIdResponse> future = internalGetLastMessageIdAsync();
// if the consumer is configured to read inclusive then we need to seek to the last message
if (resetIncludeHead) {
future = future.thenCompose((lastMessageIdResponse) ->
seekAsync(lastMessageIdResponse.lastMessageId)
.thenApply((ignore) -> lastMessageIdResponse));
}
future.thenAccept(response -> {
MessageIdAdv lastMessageId = (MessageIdAdv) response.lastMessageId;
MessageIdAdv markDeletePosition = (MessageIdAdv) response.markDeletePosition;
if (markDeletePosition != null && !(markDeletePosition.getEntryId() < 0
&& markDeletePosition.getLedgerId() > lastMessageId.getLedgerId())) {
// we only care about comparing ledger ids and entry ids as mark delete position doesn't have
// other ids such as batch index
int result = ComparisonChain.start()
.compare(markDeletePosition.getLedgerId(), lastMessageId.getLedgerId())
.compare(markDeletePosition.getEntryId(), lastMessageId.getEntryId())
.result();
if (lastMessageId.getEntryId() < 0) {
completeHasMessageAvailableWithValue(booleanFuture, false);
} else {
completeHasMessageAvailableWithValue(booleanFuture,
resetIncludeHead ? result <= 0 : result < 0);
}
} else if (lastMessageId == null || lastMessageId.getEntryId() < 0) {
completeHasMessageAvailableWithValue(booleanFuture, false);
} else {
completeHasMessageAvailableWithValue(booleanFuture, resetIncludeHead);
}
}).exceptionally(ex -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription, ex);
booleanFuture.completeExceptionally(ex.getCause());
return null;
});
return booleanFuture;
}
if (hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead)) {
completeHasMessageAvailableWithValue(booleanFuture, true);
return booleanFuture;
}
getLastMessageIdAsync().thenAccept(messageId -> {
lastMessageIdInBroker = messageId;
completeHasMessageAvailableWithValue(booleanFuture,
hasMoreMessages(lastMessageIdInBroker, startMessageId, resetIncludeHead));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
booleanFuture.completeExceptionally(e.getCause());
return null;
});
} else {
// we have read before; use lastDequeuedMessageId for comparison
if (hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false)) {
completeHasMessageAvailableWithValue(booleanFuture, true);
return booleanFuture;
}
getLastMessageIdAsync().thenAccept(messageId -> {
lastMessageIdInBroker = messageId;
completeHasMessageAvailableWithValue(booleanFuture,
hasMoreMessages(lastMessageIdInBroker, lastDequeuedMessageId, false));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
booleanFuture.completeExceptionally(e.getCause());
return null;
});
}
return booleanFuture;
}
private void completeHasMessageAvailableWithValue(CompletableFuture<Boolean> future, boolean value) {
internalPinnedExecutor.execute(() -> {
future.complete(value);
});
}
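// An entry id of -1 denotes a position in a ledger with no message written yet, so it can never
// indicate an available message regardless of how the comparison turns out.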
private boolean hasMoreMessages(MessageId lastMessageIdInBroker, MessageId messageId, boolean inclusive) {
if (inclusive && lastMessageIdInBroker.compareTo(messageId) >= 0
&& ((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1) {
return true;
}
return !inclusive && lastMessageIdInBroker.compareTo(messageId) > 0
&& ((MessageIdImpl) lastMessageIdInBroker).getEntryId() != -1;
}
private static final class GetLastMessageIdResponse {
final MessageId lastMessageId;
final MessageId markDeletePosition;
GetLastMessageIdResponse(MessageId lastMessageId, MessageId markDeletePosition) {
this.lastMessageId = lastMessageId;
this.markDeletePosition = markDeletePosition;
}
}
@Deprecated
@Override
public CompletableFuture<MessageId> getLastMessageIdAsync() {
return internalGetLastMessageIdAsync().thenApply(r -> r.lastMessageId);
}
@Override
public CompletableFuture<List<TopicMessageId>> getLastMessageIdsAsync() {
return getLastMessageIdAsync()
.thenApply(msgId -> Collections.singletonList(new TopicMessageIdImpl(topic, (MessageIdAdv) msgId)));
}
public CompletableFuture<GetLastMessageIdResponse> internalGetLastMessageIdAsync() {
if (getState() == State.Closing || getState() == State.Closed) {
return FutureUtil
.failedFuture(new PulsarClientException.AlreadyClosedException(
String.format("The consumer %s was already closed when the subscription %s of the topic %s "
+ "getting the last message id", consumerName, subscription, topicName.toString())));
}
AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
Backoff backoff = new BackoffBuilder()
.setInitialTime(100, TimeUnit.MILLISECONDS)
.setMax(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
.setMandatoryStop(0, TimeUnit.MILLISECONDS)
.create();
CompletableFuture<GetLastMessageIdResponse> getLastMessageIdFuture = new CompletableFuture<>();
internalGetLastMessageIdAsync(backoff, opTimeoutMs, getLastMessageIdFuture);
return getLastMessageIdFuture;
}
private void internalGetLastMessageIdAsync(final Backoff backoff,
final AtomicLong remainingTime,
CompletableFuture<GetLastMessageIdResponse> future) {
ClientCnx cnx = cnx();
if (isConnected() && cnx != null) {
if (!Commands.peerSupportsGetLastMessageId(cnx.getRemoteEndpointProtocolVersion())) {
future.completeExceptionally(
new PulsarClientException.NotSupportedException(
String.format("The command `GetLastMessageId` is not supported for the protocol version %d. "
+ "The consumer is %s, topic %s, subscription %s",
cnx.getRemoteEndpointProtocolVersion(),
consumerName, topicName.toString(), subscription)));
return;
}
long requestId = client.newRequestId();
ByteBuf getLastIdCmd = Commands.newGetLastMessageId(consumerId, requestId);
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Get topic last message Id", topic, subscription);
}
cnx.sendGetLastMessageId(getLastIdCmd, requestId).thenAccept(cmd -> {
MessageIdData lastMessageId = cmd.getLastMessageId();
MessageIdImpl markDeletePosition = null;
if (cmd.hasConsumerMarkDeletePosition()) {
markDeletePosition = new MessageIdImpl(cmd.getConsumerMarkDeletePosition().getLedgerId(),
cmd.getConsumerMarkDeletePosition().getEntryId(), -1);
}
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Successfully getLastMessageId {}:{}",
topic, subscription, lastMessageId.getLedgerId(), lastMessageId.getEntryId());
}
MessageId lastMsgId = lastMessageId.getBatchIndex() <= 0
? new MessageIdImpl(lastMessageId.getLedgerId(),
lastMessageId.getEntryId(), lastMessageId.getPartition())
: new BatchMessageIdImpl(lastMessageId.getLedgerId(), lastMessageId.getEntryId(),
lastMessageId.getPartition(), lastMessageId.getBatchIndex());
future.complete(new GetLastMessageIdResponse(lastMsgId, markDeletePosition));
}).exceptionally(e -> {
log.error("[{}][{}] Failed getLastMessageId command", topic, subscription);
future.completeExceptionally(
PulsarClientException.wrap(e.getCause(),
String.format("The subscription %s of the topic %s gets the last message id was failed",
subscription, topicName.toString())));
return null;
});
} else {
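// not connected yet: retry with exponential backoff until the remaining operation timeout
// budget is exhausted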
long nextDelay = Math.min(backoff.next(), remainingTime.get());
if (nextDelay <= 0) {
future.completeExceptionally(
new PulsarClientException.TimeoutException(
String.format("The subscription %s of the topic %s could not get the last message id "
+ "withing configured timeout", subscription, topicName.toString())));
return;
}
((ScheduledExecutorService) client.getScheduledExecutorProvider().getExecutor()).schedule(() -> {
log.warn("[{}] [{}] Could not get connection while getLastMessageId -- Will try again in {} ms",
topic, getHandlerName(), nextDelay);
remainingTime.addAndGet(-nextDelay);
internalGetLastMessageIdAsync(backoff, remainingTime, future);
}, nextDelay, TimeUnit.MILLISECONDS);
}
}
private boolean isMessageUndecryptable(MessageMetadata msgMetadata) {
return (msgMetadata.getEncryptionKeysCount() > 0 && conf.getCryptoKeyReader() == null
&& conf.getCryptoFailureAction() == ConsumerCryptoFailureAction.CONSUME);
}
/**
 * Create EncryptionContext if the message payload is encrypted.
 *
 * @param msgMetadata the message metadata carrying the encryption keys and parameters
 * @return {@link Optional}<{@link EncryptionContext}>, empty if the payload is not encrypted
 */
private Optional<EncryptionContext> createEncryptionContext(MessageMetadata msgMetadata) {
EncryptionContext encryptionCtx = null;
if (msgMetadata.getEncryptionKeysCount() > 0) {
encryptionCtx = new EncryptionContext();
Map<String, EncryptionKey> keys = msgMetadata.getEncryptionKeysList().stream()
.collect(
Collectors.toMap(EncryptionKeys::getKey,
e -> new EncryptionKey(e.getValue(),
e.getMetadatasList().stream().collect(
Collectors.toMap(KeyValue::getKey, KeyValue::getValue)))));
byte[] encParam = msgMetadata.getEncryptionParam();
Optional<Integer> batchSize = Optional
.ofNullable(msgMetadata.hasNumMessagesInBatch() ? msgMetadata.getNumMessagesInBatch() : null);
encryptionCtx.setKeys(keys);
encryptionCtx.setParam(encParam);
if (msgMetadata.hasEncryptionAlgo()) {
encryptionCtx.setAlgorithm(msgMetadata.getEncryptionAlgo());
}
encryptionCtx
.setCompressionType(CompressionCodecProvider.convertFromWireProtocol(msgMetadata.getCompression()));
encryptionCtx.setUncompressedMessageSize(msgMetadata.getUncompressedSize());
encryptionCtx.setBatchSize(batchSize);
}
return Optional.ofNullable(encryptionCtx);
}
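// Drain redelivered messages from the head of the incoming queue. If a polled message turns out
// not to be in the redelivery set, it has already been taken off the queue, so its id is added to
// the set to make sure it gets redelivered as well.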
private int removeExpiredMessagesFromQueue(Set<MessageId> messageIds) {
int messagesFromQueue = 0;
Message<T> peek = incomingMessages.peek();
if (peek != null) {
MessageIdAdv messageId = MessageIdAdvUtils.discardBatch(peek.getMessageId());
if (!messageIds.contains(messageId)) {
// if the first message is not expired, then no message in the queue is expired.
return 0;
}
// try not to remove elements that are added while we remove
Message<T> message = incomingMessages.poll();
while (message != null) {
decreaseIncomingMessageSize(message);
messagesFromQueue++;
MessageIdAdv id = MessageIdAdvUtils.discardBatch(message.getMessageId());
if (!messageIds.contains(id)) {
messageIds.add(id);
break;
}
message.release();
message = incomingMessages.poll();
}
}
return messagesFromQueue;
}
@Override
public ConsumerStatsRecorder getStats() {
return stats;
}
void setTerminated() {
log.info("[{}] [{}] [{}] Consumer has reached the end of topic", subscription, topic, consumerName);
hasReachedEndOfTopic = true;
if (listener != null) {
// Propagate notification to listener
listener.reachedEndOfTopic(this);
}
}
@Override
public boolean hasReachedEndOfTopic() {
return hasReachedEndOfTopic;
}
@Override
public int hashCode() {
return Objects.hash(topic, subscription, consumerName);
}
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ConsumerImpl)) {
return false;
}
ConsumerImpl<?> consumer = (ConsumerImpl<?>) o;
return consumerId == consumer.consumerId;
}
// wrapper for connection methods
ClientCnx cnx() {
return this.connectionHandler.cnx();
}
void resetBackoff() {
this.connectionHandler.resetBackoff();
}
void connectionClosed(ClientCnx cnx) {
this.connectionHandler.connectionClosed(cnx);
}
public ClientCnx getClientCnx() {
return this.connectionHandler.cnx();
}
void setClientCnx(ClientCnx clientCnx) {
if (clientCnx != null) {
this.connectionHandler.setClientCnx(clientCnx);
clientCnx.registerConsumer(consumerId, this);
if (conf.isAckReceiptEnabled()
&& !Commands.peerSupportsAckReceipt(clientCnx.getRemoteEndpointProtocolVersion())) {
log.warn("Server don't support ack for receipt! "
+ "ProtoVersion >=17 support! nowVersion : {}", clientCnx.getRemoteEndpointProtocolVersion());
}
}
ClientCnx previousClientCnx = clientCnxUsedForConsumerRegistration.getAndSet(clientCnx);
if (previousClientCnx != null && previousClientCnx != clientCnx) {
previousClientCnx.removeConsumer(consumerId);
}
}
void deregisterFromClientCnx() {
setClientCnx(null);
}
void grabCnx() {
this.connectionHandler.grabCnx();
}
@Deprecated
public String getTopicNameWithoutPartition() {
return topicNameWithoutPartition;
}
static class ChunkedMessageCtx {
protected int totalChunks = -1;
protected ByteBuf chunkedMsgBuffer;
protected int lastChunkedMessageId = -1;
protected MessageIdImpl[] chunkedMessageIds;
protected long receivedTime = 0;
static ChunkedMessageCtx get(int numChunksFromMsg, ByteBuf chunkedMsgBuffer) {
ChunkedMessageCtx ctx = RECYCLER.get();
ctx.totalChunks = numChunksFromMsg;
ctx.chunkedMsgBuffer = chunkedMsgBuffer;
ctx.chunkedMessageIds = new MessageIdImpl[numChunksFromMsg];
ctx.receivedTime = System.currentTimeMillis();
return ctx;
}
private final Handle<ChunkedMessageCtx> recyclerHandle;
private ChunkedMessageCtx(Handle<ChunkedMessageCtx> recyclerHandle) {
this.recyclerHandle = recyclerHandle;
}
private static final Recycler<ChunkedMessageCtx> RECYCLER = new Recycler<ChunkedMessageCtx>() {
protected ChunkedMessageCtx newObject(Recycler.Handle<ChunkedMessageCtx> handle) {
return new ChunkedMessageCtx(handle);
}
};
public void recycle() {
this.totalChunks = -1;
this.chunkedMsgBuffer = null;
this.lastChunkedMessageId = -1;
recyclerHandle.recycle(this);
}
}
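// Evict the oldest incomplete chunked message once the pending limit is hit: depending on
// autoAckOldestChunkedMessageOnQueueFull its chunks are either acknowledged away or handed to the
// unacked-message tracker for redelivery.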
private void removeOldestPendingChunkedMessage() {
ChunkedMessageCtx chunkedMsgCtx = null;
String firstPendingMsgUuid = null;
while (chunkedMsgCtx == null && !pendingChunkedMessageUuidQueue.isEmpty()) {
// remove oldest pending chunked-message group and free memory
firstPendingMsgUuid = pendingChunkedMessageUuidQueue.poll();
chunkedMsgCtx = StringUtils.isNotBlank(firstPendingMsgUuid) ? chunkedMessagesMap.get(firstPendingMsgUuid)
: null;
}
removeChunkMessage(firstPendingMsgUuid, chunkedMsgCtx, this.autoAckOldestChunkedMessageOnQueueFull);
}
protected void removeExpireIncompleteChunkedMessages() {
if (expireTimeOfIncompleteChunkedMessageMillis <= 0) {
return;
}
ChunkedMessageCtx chunkedMsgCtx = null;
String messageUUID;
while ((messageUUID = pendingChunkedMessageUuidQueue.peek()) != null) {
chunkedMsgCtx = StringUtils.isNotBlank(messageUUID) ? chunkedMessagesMap.get(messageUUID) : null;
if (chunkedMsgCtx != null && System
.currentTimeMillis() > (chunkedMsgCtx.receivedTime + expireTimeOfIncompleteChunkedMessageMillis)) {
pendingChunkedMessageUuidQueue.remove(messageUUID);
removeChunkMessage(messageUUID, chunkedMsgCtx, true);
} else {
return;
}
}
}
private void removeChunkMessage(String msgUUID, ChunkedMessageCtx chunkedMsgCtx, boolean autoAck) {
if (chunkedMsgCtx == null) {
return;
}
// clean up the pending chunked message
chunkedMessagesMap.remove(msgUUID);
if (chunkedMsgCtx.chunkedMessageIds != null) {
for (MessageIdImpl msgId : chunkedMsgCtx.chunkedMessageIds) {
if (msgId == null) {
continue;
}
if (autoAck) {
log.info("Removing chunk message-id {}", msgId);
doAcknowledge(msgId, AckType.Individual, Collections.emptyMap(), null);
} else {
trackMessage(msgId);
}
}
}
if (chunkedMsgCtx.chunkedMsgBuffer != null) {
chunkedMsgCtx.chunkedMsgBuffer.release();
}
chunkedMsgCtx.recycle();
pendingChunkedMessageCount--;
}
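// Build a transactional ack and wait for the broker's receipt. For batch message ids the ack set
// is encoded as a bit set where cleared bits mark acknowledged positions: cumulative acks clear
// every bit up to and including the batch index, individual acks clear only that single bit.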
private CompletableFuture<Void> doTransactionAcknowledgeForResponse(MessageId messageId, AckType ackType,
ValidationError validationError,
Map<String, Long> properties, TxnID txnID) {
long requestId = client.newRequestId();
final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
final long ledgerId = messageIdAdv.getLedgerId();
final long entryId = messageIdAdv.getEntryId();
final ByteBuf cmd;
if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
if (ackType == AckType.Cumulative) {
MessageIdAdvUtils.acknowledge(messageIdAdv, false);
bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
} else {
bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
}
cmd = Commands.newAck(consumerId, ledgerId, entryId, bitSetRecyclable, ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId, messageIdAdv.getBatchSize());
bitSetRecyclable.recycle();
} else {
cmd = Commands.newAck(consumerId, ledgerId, entryId, null, ackType, validationError, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
}
if (ackType == AckType.Cumulative) {
unAckedMessageTracker.removeMessagesTill(messageId);
} else {
unAckedMessageTracker.remove(messageId);
}
ClientCnx cnx = cnx();
if (cnx == null) {
return FutureUtil.failedFuture(new PulsarClientException
.ConnectException("Failed to ack message [" + messageId + "] "
+ "for transaction [" + txnID + "] due to consumer connect fail, consumer state: " + getState()));
} else {
return cnx.newAckForReceipt(cmd, requestId);
}
}
private CompletableFuture<Void> doTransactionAcknowledgeForResponse(List<MessageId> messageIds, AckType ackType,
Map<String, Long> properties, TxnID txnID) {
long requestId = client.newRequestId();
List<MessageIdData> messageIdDataList = new LinkedList<>();
for (MessageId messageId : messageIds) {
final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId;
final MessageIdData messageIdData = new MessageIdData();
messageIdData.setLedgerId(messageIdAdv.getLedgerId());
messageIdData.setEntryId(messageIdAdv.getEntryId());
if (MessageIdAdvUtils.isBatch(messageIdAdv)) {
final BitSetRecyclable bitSetRecyclable = BitSetRecyclable.create();
bitSetRecyclable.set(0, messageIdAdv.getBatchSize());
if (ackType == AckType.Cumulative) {
MessageIdAdvUtils.acknowledge(messageIdAdv, false);
bitSetRecyclable.clear(0, messageIdAdv.getBatchIndex() + 1);
} else {
bitSetRecyclable.clear(messageIdAdv.getBatchIndex());
}
for (long x : bitSetRecyclable.toLongArray()) {
messageIdData.addAckSet(x);
}
bitSetRecyclable.recycle();
}
messageIdDataList.add(messageIdData);
if (ackType == AckType.Cumulative) {
unAckedMessageTracker.removeMessagesTill(messageId);
} else {
unAckedMessageTracker.remove(messageId);
}
}
final ByteBuf cmd = Commands.newAck(consumerId, messageIdDataList, ackType, null, properties,
txnID.getLeastSigBits(), txnID.getMostSigBits(), requestId);
return cnx().newAckForReceipt(cmd, requestId);
}
public Map<MessageIdAdv, List<MessageImpl<T>>> getPossibleSendToDeadLetterTopicMessages() {
return possibleSendToDeadLetterTopicMessages;
}
boolean isAckReceiptEnabled() {
ClientCnx cnx = getClientCnx();
return conf.isAckReceiptEnabled() && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}
private static final Logger log = LoggerFactory.getLogger(ConsumerImpl.class);
}