blob: 10b65fed431748b0a801fd2115c199c8ea9b82d9 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.client.impl;
import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import io.netty.buffer.ByteBuf;
import io.netty.channel.EventLoopGroup;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;
import javax.annotation.Nullable;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Triple;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
import org.apache.pulsar.client.util.TimedCompletableFuture;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandAck.AckType;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.BitSetRecyclable;
import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable;
/**
* Group the acknowledgements for a certain time and then sends them out in a single protobuf command.
*/
@Slf4j
public class PersistentAcknowledgmentsGroupingTracker implements AcknowledgmentsGroupingTracker {
/**
* When reaching the max group size, an ack command is sent out immediately.
*/
private static final int MAX_ACK_GROUP_SIZE = 1000;
private final ConsumerImpl<?> consumer;
private final long acknowledgementGroupTimeMicros;
private volatile TimedCompletableFuture<Void> currentIndividualAckFuture;
private volatile TimedCompletableFuture<Void> currentCumulativeAckFuture;
private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck();
// When we flush the command, we should ensure current ack request will send correct
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
/**
* This is a set of all the individual acks that the application has issued and that were not already sent to
* broker.
*/
private final ConcurrentSkipListSet<MessageIdImpl> pendingIndividualAcks;
private final ConcurrentHashMap<MessageIdImpl, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks;
private final ScheduledFuture<?> scheduledTask;
private final boolean batchIndexAckEnabled;
private final boolean ackReceiptEnabled;
public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, ConsumerConfigurationData<?> conf,
EventLoopGroup eventLoopGroup) {
this.consumer = consumer;
this.pendingIndividualAcks = new ConcurrentSkipListSet<>();
this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>();
this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros();
this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled();
this.ackReceiptEnabled = conf.isAckReceiptEnabled();
this.currentIndividualAckFuture = new TimedCompletableFuture<>();
this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
if (acknowledgementGroupTimeMicros > 0) {
scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(catchingAndLoggingThrowables(this::flush),
acknowledgementGroupTimeMicros,
acknowledgementGroupTimeMicros, TimeUnit.MICROSECONDS);
} else {
scheduledTask = null;
}
}
/**
* Since the ack are delayed, we need to do some best-effort duplicate check to discard messages that are being
* resent after a disconnection and for which the user has already sent an acknowledgement.
*/
@Override
public boolean isDuplicate(MessageId messageId) {
if (!(messageId instanceof MessageIdImpl)) {
throw new IllegalArgumentException("isDuplicated cannot accept "
+ messageId.getClass().getName() + ": " + messageId);
}
if (lastCumulativeAck.compareTo(messageId) >= 0) {
// Already included in a cumulative ack
return true;
} else {
final MessageIdImpl messageIdImpl = (messageId instanceof BatchMessageIdImpl)
? ((BatchMessageIdImpl) messageId).toMessageIdImpl()
: (MessageIdImpl) messageId;
return pendingIndividualAcks.contains(messageIdImpl);
}
}
@Override
public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds,
AckType ackType, Map<String, Long> properties) {
if (AckType.Cumulative.equals(ackType)) {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>();
messageIds.forEach(messageId ->
completableFutureSet.add(addAcknowledgment((MessageIdImpl) messageId, ackType, properties)));
return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet));
} else {
messageIds.forEach(messageId -> addAcknowledgment((MessageIdImpl) messageId, ackType, properties));
return CompletableFuture.completedFuture(null);
}
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
if (messageIds.size() != 0) {
addListAcknowledgment(messageIds);
return this.currentIndividualAckFuture;
} else {
return CompletableFuture.completedFuture(null);
}
} finally {
this.lock.readLock().unlock();
if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
}
} else {
addListAcknowledgment(messageIds);
if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
}
private void addListAcknowledgment(List<MessageId> messageIds) {
for (MessageId messageId : messageIds) {
if (messageId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) messageId;
addIndividualAcknowledgment(batchMessageId.toMessageIdImpl(),
batchMessageId,
this::doIndividualAckAsync,
this::doIndividualBatchAckAsync);
} else if (messageId instanceof MessageIdImpl) {
addIndividualAcknowledgment((MessageIdImpl) messageId,
null,
this::doIndividualAckAsync,
this::doIndividualBatchAckAsync);
} else {
throw new IllegalStateException("Unsupported message id type in addListAcknowledgement: "
+ messageId.getClass().getCanonicalName());
}
}
}
@Override
public CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId, AckType ackType,
Map<String, Long> properties) {
if (msgId instanceof BatchMessageIdImpl) {
BatchMessageIdImpl batchMessageId = (BatchMessageIdImpl) msgId;
return addAcknowledgment(batchMessageId.toMessageIdImpl(), ackType, properties, batchMessageId);
} else {
return addAcknowledgment(msgId, ackType, properties, null);
}
}
private CompletableFuture<Void> addIndividualAcknowledgment(
MessageIdImpl msgId,
@Nullable BatchMessageIdImpl batchMessageId,
Function<MessageIdImpl, CompletableFuture<Void>> individualAckFunction,
Function<BatchMessageIdImpl, CompletableFuture<Void>> batchAckFunction) {
if (batchMessageId != null) {
consumer.onAcknowledge(batchMessageId, null);
} else {
consumer.onAcknowledge(msgId, null);
}
if (batchMessageId == null || batchMessageId.ackIndividual()) {
consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1);
consumer.getUnAckedMessageTracker().remove(msgId);
if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) {
consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId);
}
return individualAckFunction.apply(msgId);
} else if (batchIndexAckEnabled) {
return batchAckFunction.apply(batchMessageId);
} else {
return CompletableFuture.completedFuture(null);
}
}
private CompletableFuture<Void> addAcknowledgment(MessageIdImpl msgId,
AckType ackType,
Map<String, Long> properties,
@Nullable BatchMessageIdImpl batchMessageId) {
switch (ackType) {
case Individual:
return addIndividualAcknowledgment(msgId,
batchMessageId,
__ -> doIndividualAck(__, properties),
__ -> doIndividualBatchAck(__, properties));
case Cumulative:
if (batchMessageId != null) {
consumer.onAcknowledgeCumulative(batchMessageId, null);
} else {
consumer.onAcknowledgeCumulative(msgId, null);
}
if (batchMessageId == null || batchMessageId.ackCumulative()) {
return doCumulativeAck(msgId, properties, null);
} else if (batchIndexAckEnabled) {
return doCumulativeBatchIndexAck(batchMessageId, properties);
} else {
if (!batchMessageId.getAcker().isPrevBatchCumulativelyAcked()) {
doCumulativeAck(batchMessageId.prevBatchMessageId(), properties, null);
batchMessageId.getAcker().setPrevBatchCumulativelyAcked(true);
}
return CompletableFuture.completedFuture(null);
}
default:
throw new IllegalStateException("Unknown AckType: " + ackType);
}
}
private CompletableFuture<Void> doIndividualAck(MessageIdImpl messageId, Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.
return doImmediateAck(messageId, AckType.Individual, properties, null);
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
doIndividualAckAsync(messageId);
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
}
} else {
doIndividualAckAsync(messageId);
if (pendingIndividualAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
}
private CompletableFuture<Void> doIndividualAckAsync(MessageIdImpl messageId) {
pendingIndividualAcks.add(messageId);
pendingIndividualBatchIndexAcks.remove(messageId);
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId,
Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
batchMessageId.getBatchSize(), AckType.Individual, properties);
} else {
return doIndividualBatchAck(batchMessageId);
}
}
private CompletableFuture<Void> doIndividualBatchAck(BatchMessageIdImpl batchMessageId) {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
doIndividualBatchAckAsync(batchMessageId);
return this.currentIndividualAckFuture;
} finally {
this.lock.readLock().unlock();
}
} else {
doIndividualBatchAckAsync(batchMessageId);
return CompletableFuture.completedFuture(null);
}
}
private CompletableFuture<Void> doCumulativeAck(MessageIdImpl messageId, Map<String, Long> properties,
BitSetRecyclable bitSet) {
consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId));
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
// We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an
// uncommon condition since it's only used for the compaction subscription.
return doImmediateAck(messageId, AckType.Cumulative, properties, bitSet);
} else {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
// when flush the ack, we should bind the this ack in the currentFuture, during this time we can't
// change currentFuture. but we can lock by the read lock, because the currentFuture is not change
// any ack operation is allowed.
this.lock.readLock().lock();
try {
doCumulativeAckAsync(messageId, bitSet);
return this.currentCumulativeAckFuture;
} finally {
this.lock.readLock().unlock();
if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
}
} else {
doCumulativeAckAsync(messageId, bitSet);
if (pendingIndividualBatchIndexAcks.size() >= MAX_ACK_GROUP_SIZE) {
flush();
}
return CompletableFuture.completedFuture(null);
}
}
}
private CompletableFuture<Void> doIndividualBatchAckAsync(BatchMessageIdImpl batchMessageId) {
ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent(
batchMessageId.toMessageIdImpl(), __ -> {
ConcurrentBitSetRecyclable value;
if (batchMessageId.getAcker() != null
&& !(batchMessageId.getAcker() instanceof BatchMessageAckerDisabled)) {
value = ConcurrentBitSetRecyclable.create(batchMessageId.getAcker().getBitSet());
} else {
value = ConcurrentBitSetRecyclable.create();
value.set(0, batchMessageId.getOriginalBatchSize());
}
return value;
});
bitSet.clear(batchMessageId.getBatchIndex());
return CompletableFuture.completedFuture(null);
}
private void doCumulativeAckAsync(MessageIdImpl msgId, BitSetRecyclable bitSet) {
// Handle concurrent updates from different threads
lastCumulativeAck.update(msgId, bitSet);
}
private CompletableFuture<Void> doCumulativeBatchIndexAck(BatchMessageIdImpl batchMessageId,
Map<String, Long> properties) {
if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) {
return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(),
batchMessageId.getBatchSize(), AckType.Cumulative, properties);
} else {
BitSetRecyclable bitSet = BitSetRecyclable.create();
bitSet.set(0, batchMessageId.getBatchSize());
bitSet.clear(0, batchMessageId.getBatchIndex() + 1);
return doCumulativeAck(batchMessageId, null, bitSet);
}
}
private CompletableFuture<Void> doImmediateAck(MessageIdImpl msgId, AckType ackType, Map<String, Long> properties,
BitSetRecyclable bitSet) {
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null) {
return FutureUtil.failedFuture(new PulsarClientException
.ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
}
return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet, ackType, properties, cnx);
}
private CompletableFuture<Void> doImmediateBatchIndexAck(BatchMessageIdImpl msgId, int batchIndex, int batchSize,
AckType ackType, Map<String, Long> properties) {
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null) {
return FutureUtil.failedFuture(new PulsarClientException
.ConnectException("Consumer connect fail! consumer state:" + consumer.getState()));
}
BitSetRecyclable bitSet;
if (msgId.getAcker() != null && !(msgId.getAcker() instanceof BatchMessageAckerDisabled)) {
bitSet = BitSetRecyclable.valueOf(msgId.getAcker().getBitSet().toLongArray());
} else {
bitSet = BitSetRecyclable.create();
bitSet.set(0, batchSize);
}
if (ackType == AckType.Cumulative) {
bitSet.clear(0, batchIndex + 1);
} else {
bitSet.clear(batchIndex);
}
CompletableFuture<Void> completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId,
msgId.ledgerId, msgId.entryId, bitSet, ackType, null, properties, true, null, null);
bitSet.recycle();
return completableFuture;
}
/**
* Flush all the pending acks and send them to the broker.
*/
@Override
public void flush() {
ClientCnx cnx = consumer.getClientCnx();
if (cnx == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] Cannot flush pending acks since we're not connected to broker", consumer);
}
return;
}
if (isAckReceiptEnabled(consumer.getClientCnx())) {
this.lock.writeLock().lock();
try {
flushAsync(cnx);
} finally {
this.lock.writeLock().unlock();
}
} else {
flushAsync(cnx);
}
}
private void flushAsync(ClientCnx cnx) {
final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush();
boolean shouldFlush = false;
if (lastCumulativeAckToFlush != null) {
shouldFlush = true;
final MessageIdImpl messageId = lastCumulativeAckToFlush.getMessageId();
newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(),
lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, null,
Collections.emptyMap(), false, this.currentCumulativeAckFuture, null);
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId);
}
// Flush all individual acks
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck =
new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size());
if (!pendingIndividualAcks.isEmpty()) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
// We can send 1 single protobuf command with all individual acks
while (true) {
MessageIdImpl msgId = pendingIndividualAcks.pollFirst();
if (msgId == null) {
break;
}
// if messageId is checked then all the chunked related to that msg also processed so, ack all of
// them
MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId);
if (chunkMsgIds != null && chunkMsgIds.length > 1) {
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
}
}
// messages will be acked so, remove checked message sequence
this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
} else {
entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null));
}
}
} else {
// When talking to older brokers, send the acknowledgements individually
while (true) {
MessageIdImpl msgId = pendingIndividualAcks.pollFirst();
if (msgId == null) {
break;
}
newMessageAckCommandAndWrite(cnx, consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(),
null, AckType.Individual, null, Collections.emptyMap(), false,
null, null);
shouldFlush = true;
}
}
}
if (!pendingIndividualBatchIndexAcks.isEmpty()) {
Iterator<Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable>> iterator =
pendingIndividualBatchIndexAcks.entrySet().iterator();
while (iterator.hasNext()) {
Map.Entry<MessageIdImpl, ConcurrentBitSetRecyclable> entry = iterator.next();
entriesToAck.add(Triple.of(entry.getKey().ledgerId, entry.getKey().entryId, entry.getValue()));
iterator.remove();
}
}
if (entriesToAck.size() > 0) {
newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L,
null, AckType.Individual, null, null, true,
currentIndividualAckFuture, entriesToAck);
shouldFlush = true;
}
if (shouldFlush) {
if (log.isDebugEnabled()) {
log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}"
+ " -- individual-batch-index-acks: {}",
consumer, lastCumulativeAck, pendingIndividualAcks, entriesToAck);
}
cnx.ctx().flush();
}
}
@Override
public void flushAndClean() {
flush();
lastCumulativeAck.reset();
pendingIndividualAcks.clear();
}
@Override
public void close() {
flush();
if (scheduledTask != null && !scheduledTask.isCancelled()) {
scheduledTask.cancel(true);
}
}
private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdImpl msgId,
BitSetRecyclable bitSet, AckType ackType,
Map<String, Long> map, ClientCnx cnx) {
MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId);
final CompletableFuture<Void> completableFuture;
// cumulative ack chunk by the last messageId
if (chunkMsgIds != null && ackType != AckType.Cumulative) {
if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) {
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length);
for (MessageIdImpl cMsgId : chunkMsgIds) {
if (cMsgId != null && chunkMsgIds.length > 1) {
entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null));
}
}
completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L,
null, ackType, null, null, true, null, entriesToAck);
} else {
// if don't support multi message ack, it also support ack receipt, so we should not think about the
// ack receipt in this logic
for (MessageIdImpl cMsgId : chunkMsgIds) {
newMessageAckCommandAndWrite(cnx, consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(),
bitSet, ackType, null, map, true, null, null);
}
completableFuture = CompletableFuture.completedFuture(null);
}
} else {
completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, msgId.ledgerId, msgId.getEntryId(),
bitSet, ackType, null, map, true, null, null);
}
return completableFuture;
}
private CompletableFuture<Void> newMessageAckCommandAndWrite(
ClientCnx cnx, long consumerId, long ledgerId,
long entryId, BitSetRecyclable ackSet, AckType ackType,
CommandAck.ValidationError validationError,
Map<String, Long> properties, boolean flush,
TimedCompletableFuture<Void> timedCompletableFuture,
List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) {
if (isAckReceiptEnabled(consumer.getClientCnx())) {
final long requestId = consumer.getClient().newRequestId();
final ByteBuf cmd;
if (entriesToAck == null) {
cmd = Commands.newAck(consumerId, ledgerId, entryId, ackSet,
ackType, null, properties, requestId);
} else {
cmd = Commands.newMultiMessageAck(consumerId, entriesToAck, requestId);
}
if (timedCompletableFuture == null) {
return cnx.newAckForReceipt(cmd, requestId);
} else {
if (ackType == AckType.Individual) {
this.currentIndividualAckFuture = new TimedCompletableFuture<>();
} else {
this.currentCumulativeAckFuture = new TimedCompletableFuture<>();
}
cnx.newAckForReceiptWithFuture(cmd, requestId, timedCompletableFuture);
return timedCompletableFuture;
}
} else {
// client cnx don't support ack receipt, if we don't complete the future, the client will block.
if (ackReceiptEnabled) {
synchronized (PersistentAcknowledgmentsGroupingTracker.this) {
if (!this.currentCumulativeAckFuture.isDone()) {
this.currentCumulativeAckFuture.complete(null);
}
if (!this.currentIndividualAckFuture.isDone()) {
this.currentIndividualAckFuture.complete(null);
}
}
}
final ByteBuf cmd;
if (entriesToAck == null) {
cmd = Commands.newAck(consumerId, ledgerId, entryId, ackSet,
ackType, null, properties, -1);
} else {
cmd = Commands.newMultiMessageAck(consumerId, entriesToAck, -1);
}
if (flush) {
cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise());
} else {
cnx.ctx().write(cmd, cnx.ctx().voidPromise());
}
return CompletableFuture.completedFuture(null);
}
}
private boolean isAckReceiptEnabled(ClientCnx cnx) {
return ackReceiptEnabled && cnx != null
&& Commands.peerSupportsAckReceipt(cnx.getRemoteEndpointProtocolVersion());
}
}
@Getter
class LastCumulativeAck {
// It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called
public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK =
new FastThreadLocal<LastCumulativeAck>() {
@Override
protected LastCumulativeAck initialValue() {
return new LastCumulativeAck();
}
};
public static final MessageIdImpl DEFAULT_MESSAGE_ID = (MessageIdImpl) MessageIdImpl.earliest;
private volatile MessageIdImpl messageId = DEFAULT_MESSAGE_ID;
private BitSetRecyclable bitSetRecyclable = null;
private boolean flushRequired = false;
public synchronized void update(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
if (compareTo(messageId) < 0) {
if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) {
this.bitSetRecyclable.recycle();
}
set(messageId, bitSetRecyclable);
flushRequired = true;
}
}
public synchronized LastCumulativeAck flush() {
if (flushRequired) {
final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get();
if (bitSetRecyclable != null) {
localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray()));
} else {
localLastCumulativeAck.set(this.messageId, null);
}
flushRequired = false;
return localLastCumulativeAck;
} else {
// Return null to indicate nothing to be flushed
return null;
}
}
public synchronized void reset() {
if (bitSetRecyclable != null) {
bitSetRecyclable.recycle();
}
messageId = DEFAULT_MESSAGE_ID;
bitSetRecyclable = null;
flushRequired = false;
}
public synchronized int compareTo(MessageId messageId) {
if (this.messageId instanceof BatchMessageIdImpl && (!(messageId instanceof BatchMessageIdImpl))) {
final BatchMessageIdImpl lhs = (BatchMessageIdImpl) this.messageId;
final MessageIdImpl rhs = (MessageIdImpl) messageId;
return MessageIdImpl.messageIdCompare(
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), lhs.getBatchIndex(),
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), Integer.MAX_VALUE);
} else if (messageId instanceof BatchMessageIdImpl && (!(this.messageId instanceof BatchMessageIdImpl))){
final MessageIdImpl lhs = this.messageId;
final BatchMessageIdImpl rhs = (BatchMessageIdImpl) messageId;
return MessageIdImpl.messageIdCompare(
lhs.getLedgerId(), lhs.getEntryId(), lhs.getPartitionIndex(), Integer.MAX_VALUE,
rhs.getLedgerId(), rhs.getEntryId(), rhs.getPartitionIndex(), rhs.getBatchIndex());
} else {
return this.messageId.compareTo(messageId);
}
}
private synchronized void set(final MessageIdImpl messageId, final BitSetRecyclable bitSetRecyclable) {
this.messageId = messageId;
this.bitSetRecyclable = bitSetRecyclable;
}
@Override
public String toString() {
String s = messageId.toString();
if (bitSetRecyclable != null) {
s += " (bit set: " + bitSetRecyclable + ")";
}
return s;
}
}