| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.client.impl; |
| |
| import static org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.util.concurrent.FastThreadLocal; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.Collections; |
| import java.util.HashSet; |
| import java.util.Iterator; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentSkipListSet; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.Function; |
| import javax.annotation.Nullable; |
| import lombok.Getter; |
| import lombok.extern.slf4j.Slf4j; |
| import org.apache.commons.lang3.tuple.Triple; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.MessageIdAdv; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData; |
| import org.apache.pulsar.client.util.TimedCompletableFuture; |
| import org.apache.pulsar.common.api.proto.CommandAck.AckType; |
| import org.apache.pulsar.common.protocol.Commands; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.common.util.collections.BitSetRecyclable; |
| import org.apache.pulsar.common.util.collections.ConcurrentBitSetRecyclable; |
| |
| /** |
| * Groups acknowledgements for a certain time and then sends them out in a single protobuf command. |
| */ |
| @Slf4j |
| public class PersistentAcknowledgmentsGroupingTracker implements AcknowledgmentsGroupingTracker { |
| |
| /** |
| * When reaching the max group size, an ack command is sent out immediately. |
| */ |
| private final int maxAckGroupSize; |
| |
| private final ConsumerImpl<?> consumer; |
| |
| private final long acknowledgementGroupTimeMicros; |
| |
| private volatile CompletableFuture<Void> currentIndividualAckFuture; |
| private volatile CompletableFuture<Void> currentCumulativeAckFuture; |
| |
| private final LastCumulativeAck lastCumulativeAck = new LastCumulativeAck(); |
| |
| // When we flush the command, we should ensure current ack request will send correct |
| private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| |
| /** |
| * This is a set of all the individual acks that the application has issued and that were not already sent to |
| * broker. |
| */ |
| private final ConcurrentSkipListSet<MessageIdAdv> pendingIndividualAcks; |
| private final ConcurrentHashMap<MessageIdAdv, ConcurrentBitSetRecyclable> pendingIndividualBatchIndexAcks; |
| |
| private final ScheduledFuture<?> scheduledTask; |
| private final boolean batchIndexAckEnabled; |
| private final boolean ackReceiptEnabled; |
| |
| public PersistentAcknowledgmentsGroupingTracker(ConsumerImpl<?> consumer, ConsumerConfigurationData<?> conf, |
| EventLoopGroup eventLoopGroup) { |
| this.consumer = consumer; |
| this.pendingIndividualAcks = new ConcurrentSkipListSet<>(); |
| this.pendingIndividualBatchIndexAcks = new ConcurrentHashMap<>(); |
| this.acknowledgementGroupTimeMicros = conf.getAcknowledgementsGroupTimeMicros(); |
| this.maxAckGroupSize = conf.getMaxAcknowledgmentGroupSize(); |
| this.batchIndexAckEnabled = conf.isBatchIndexAckEnabled(); |
| this.ackReceiptEnabled = conf.isAckReceiptEnabled(); |
| this.currentIndividualAckFuture = new TimedCompletableFuture<>(); |
| this.currentCumulativeAckFuture = new TimedCompletableFuture<>(); |
| |
| if (acknowledgementGroupTimeMicros > 0) { |
| scheduledTask = eventLoopGroup.next().scheduleWithFixedDelay(catchingAndLoggingThrowables(this::flush), |
| acknowledgementGroupTimeMicros, |
| acknowledgementGroupTimeMicros, TimeUnit.MICROSECONDS); |
| } else { |
| scheduledTask = null; |
| } |
| } |
| |
| /** |
| * Since the ack are delayed, we need to do some best-effort duplicate check to discard messages that are being |
| * resent after a disconnection and for which the user has already sent an acknowledgement. |
| */ |
| @Override |
| public boolean isDuplicate(MessageId messageId) { |
| if (!(messageId instanceof MessageIdAdv)) { |
| throw new IllegalArgumentException("isDuplicated cannot accept " |
| + messageId.getClass().getName() + ": " + messageId); |
| } |
| final MessageIdAdv messageIdAdv = (MessageIdAdv) messageId; |
| if (lastCumulativeAck.compareTo(messageIdAdv) >= 0) { |
| // Already included in a cumulative ack |
| return true; |
| } else { |
| return pendingIndividualAcks.contains(MessageIdAdvUtils.discardBatch(messageIdAdv)); |
| } |
| } |
| |
| @Override |
| public CompletableFuture<Void> addListAcknowledgment(List<MessageId> messageIds, |
| AckType ackType, Map<String, Long> properties) { |
| if (AckType.Cumulative.equals(ackType)) { |
| if (consumer.isAckReceiptEnabled()) { |
| Set<CompletableFuture<Void>> completableFutureSet = new HashSet<>(); |
| messageIds.forEach(messageId -> |
| completableFutureSet.add(addAcknowledgment(messageId, ackType, properties))); |
| return FutureUtil.waitForAll(new ArrayList<>(completableFutureSet)); |
| } else { |
| messageIds.forEach(messageId -> addAcknowledgment(messageId, ackType, properties)); |
| return CompletableFuture.completedFuture(null); |
| } |
| } else { |
| Optional<Lock> readLock = acquireReadLock(); |
| try { |
| if (messageIds.size() != 0) { |
| addListAcknowledgment(messageIds); |
| return readLock.map(__ -> currentIndividualAckFuture) |
| .orElse(CompletableFuture.completedFuture(null)); |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| } finally { |
| readLock.ifPresent(Lock::unlock); |
| if (acknowledgementGroupTimeMicros == 0 || pendingIndividualAcks.size() >= maxAckGroupSize) { |
| flush(); |
| } |
| } |
| } |
| } |
| |
| private void addListAcknowledgment(List<MessageId> messageIds) { |
| for (MessageId messageId : messageIds) { |
| MessageIdAdv messageIdAdv = (MessageIdAdv) messageId; |
| if (MessageIdAdvUtils.isBatch(messageIdAdv)) { |
| addIndividualAcknowledgment(MessageIdAdvUtils.discardBatch(messageIdAdv), |
| messageIdAdv, |
| this::doIndividualAckAsync, |
| this::doIndividualBatchAckAsync); |
| } else { |
| addIndividualAcknowledgment(messageIdAdv, |
| null, |
| this::doIndividualAckAsync, |
| this::doIndividualBatchAckAsync); |
| } |
| } |
| } |
| |
| @Override |
| public CompletableFuture<Void> addAcknowledgment(MessageId msgId, AckType ackType, |
| Map<String, Long> properties) { |
| MessageIdAdv msgIdAdv = (MessageIdAdv) msgId; |
| if (MessageIdAdvUtils.isBatch(msgIdAdv)) { |
| return addAcknowledgment(MessageIdAdvUtils.discardBatch(msgId), ackType, properties, msgIdAdv); |
| } else { |
| return addAcknowledgment(msgIdAdv, ackType, properties, null); |
| } |
| } |
| |
| private CompletableFuture<Void> addIndividualAcknowledgment( |
| MessageIdAdv msgId, |
| @Nullable MessageIdAdv batchMessageId, |
| Function<MessageIdAdv, CompletableFuture<Void>> individualAckFunction, |
| Function<MessageIdAdv, CompletableFuture<Void>> batchAckFunction) { |
| if (batchMessageId != null) { |
| consumer.onAcknowledge(batchMessageId, null); |
| } else { |
| consumer.onAcknowledge(msgId, null); |
| } |
| if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, true)) { |
| consumer.getStats().incrementNumAcksSent((batchMessageId != null) ? batchMessageId.getBatchSize() : 1); |
| consumer.getUnAckedMessageTracker().remove(msgId); |
| if (consumer.getPossibleSendToDeadLetterTopicMessages() != null) { |
| consumer.getPossibleSendToDeadLetterTopicMessages().remove(msgId); |
| } |
| return individualAckFunction.apply(msgId); |
| } else if (batchIndexAckEnabled) { |
| return batchAckFunction.apply(batchMessageId); |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| } |
| |
| private CompletableFuture<Void> addAcknowledgment(MessageIdAdv msgId, |
| AckType ackType, |
| Map<String, Long> properties, |
| @Nullable MessageIdAdv batchMessageId) { |
| switch (ackType) { |
| case Individual: |
| return addIndividualAcknowledgment(msgId, |
| batchMessageId, |
| __ -> doIndividualAck(__, properties), |
| __ -> doIndividualBatchAck(__, properties)); |
| case Cumulative: |
| if (batchMessageId != null) { |
| consumer.onAcknowledgeCumulative(batchMessageId, null); |
| } else { |
| consumer.onAcknowledgeCumulative(msgId, null); |
| } |
| if (batchMessageId == null || MessageIdAdvUtils.acknowledge(batchMessageId, false)) { |
| return doCumulativeAck(msgId, properties, null); |
| } else if (batchIndexAckEnabled) { |
| return doCumulativeBatchIndexAck(batchMessageId, properties); |
| } else { |
| doCumulativeAck(MessageIdAdvUtils.prevMessageId(batchMessageId), properties, null); |
| return CompletableFuture.completedFuture(null); |
| } |
| default: |
| throw new IllegalStateException("Unknown AckType: " + ackType); |
| } |
| } |
| |
| private CompletableFuture<Void> doIndividualAck(MessageIdAdv messageId, Map<String, Long> properties) { |
| if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { |
| // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an |
| // uncommon condition since it's only used for the compaction subscription. |
| return doImmediateAck(messageId, AckType.Individual, properties, null); |
| } else { |
| Optional<Lock> readLock = acquireReadLock(); |
| try { |
| doIndividualAckAsync(messageId); |
| return readLock.map(__ -> currentIndividualAckFuture).orElse(CompletableFuture.completedFuture(null)); |
| } finally { |
| readLock.ifPresent(Lock::unlock); |
| if (pendingIndividualAcks.size() >= maxAckGroupSize) { |
| flush(); |
| } |
| } |
| } |
| } |
| |
| |
| private CompletableFuture<Void> doIndividualAckAsync(MessageIdAdv messageId) { |
| pendingIndividualAcks.add(messageId); |
| pendingIndividualBatchIndexAcks.remove(messageId); |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv batchMessageId, |
| Map<String, Long> properties) { |
| if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { |
| return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(), |
| batchMessageId.getBatchSize(), AckType.Individual, properties); |
| } else { |
| return doIndividualBatchAck(batchMessageId); |
| } |
| } |
| |
| private CompletableFuture<Void> doIndividualBatchAck(MessageIdAdv batchMessageId) { |
| Optional<Lock> readLock = acquireReadLock(); |
| try { |
| doIndividualBatchAckAsync(batchMessageId); |
| return readLock.map(__ -> currentIndividualAckFuture).orElse(CompletableFuture.completedFuture(null)); |
| } finally { |
| readLock.ifPresent(Lock::unlock); |
| if (pendingIndividualBatchIndexAcks.size() >= maxAckGroupSize) { |
| flush(); |
| } |
| } |
| } |
| |
| private CompletableFuture<Void> doCumulativeAck(MessageIdAdv messageId, Map<String, Long> properties, |
| BitSetRecyclable bitSet) { |
| consumer.getStats().incrementNumAcksSent(consumer.getUnAckedMessageTracker().removeMessagesTill(messageId)); |
| if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { |
| // We cannot group acks if the delay is 0 or when there are properties attached to it. Fortunately that's an |
| // uncommon condition since it's only used for the compaction subscription. |
| return doImmediateAck(messageId, AckType.Cumulative, properties, bitSet); |
| } else { |
| Optional<Lock> readLock = acquireReadLock(); |
| try { |
| doCumulativeAckAsync(messageId, bitSet); |
| return readLock.map(__ -> currentCumulativeAckFuture).orElse(CompletableFuture.completedFuture(null)); |
| } finally { |
| readLock.ifPresent(Lock::unlock); |
| } |
| } |
| } |
| |
| private CompletableFuture<Void> doIndividualBatchAckAsync(MessageIdAdv msgId) { |
| ConcurrentBitSetRecyclable bitSet = pendingIndividualBatchIndexAcks.computeIfAbsent( |
| MessageIdAdvUtils.discardBatch(msgId), __ -> { |
| final BitSet ackSet = msgId.getAckSet(); |
| final ConcurrentBitSetRecyclable value; |
| if (ackSet != null && !ackSet.isEmpty()) { |
| value = ConcurrentBitSetRecyclable.create(ackSet); |
| } else { |
| value = ConcurrentBitSetRecyclable.create(); |
| value.set(0, msgId.getBatchSize()); |
| } |
| return value; |
| }); |
| bitSet.clear(msgId.getBatchIndex()); |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| private void doCumulativeAckAsync(MessageIdAdv msgId, BitSetRecyclable bitSet) { |
| // Handle concurrent updates from different threads |
| lastCumulativeAck.update(msgId, bitSet); |
| } |
| |
| private CompletableFuture<Void> doCumulativeBatchIndexAck(MessageIdAdv batchMessageId, |
| Map<String, Long> properties) { |
| if (acknowledgementGroupTimeMicros == 0 || (properties != null && !properties.isEmpty())) { |
| return doImmediateBatchIndexAck(batchMessageId, batchMessageId.getBatchIndex(), |
| batchMessageId.getBatchSize(), AckType.Cumulative, properties); |
| } else { |
| BitSetRecyclable bitSet = BitSetRecyclable.create(); |
| bitSet.set(0, batchMessageId.getBatchSize()); |
| bitSet.clear(0, batchMessageId.getBatchIndex() + 1); |
| return doCumulativeAck(batchMessageId, null, bitSet); |
| } |
| } |
| |
| private CompletableFuture<Void> doImmediateAck(MessageIdAdv msgId, AckType ackType, Map<String, Long> properties, |
| BitSetRecyclable bitSet) { |
| ClientCnx cnx = consumer.getClientCnx(); |
| |
| if (cnx == null) { |
| return FutureUtil.failedFuture(new PulsarClientException |
| .ConnectException("Consumer connect fail! consumer state:" + consumer.getState())); |
| } |
| return newImmediateAckAndFlush(consumer.consumerId, msgId, bitSet, ackType, properties, cnx); |
| } |
| |
| private CompletableFuture<Void> doImmediateBatchIndexAck(MessageIdAdv msgId, int batchIndex, int batchSize, |
| AckType ackType, Map<String, Long> properties) { |
| ClientCnx cnx = consumer.getClientCnx(); |
| |
| if (cnx == null) { |
| return FutureUtil.failedFuture(new PulsarClientException |
| .ConnectException("Consumer connect fail! consumer state:" + consumer.getState())); |
| } |
| BitSetRecyclable bitSet; |
| if (msgId.getAckSet() != null) { |
| bitSet = BitSetRecyclable.valueOf(msgId.getAckSet().toLongArray()); |
| } else { |
| bitSet = BitSetRecyclable.create(); |
| bitSet.set(0, batchSize); |
| } |
| if (ackType == AckType.Cumulative) { |
| bitSet.clear(0, batchIndex + 1); |
| } else { |
| bitSet.clear(batchIndex); |
| } |
| |
| CompletableFuture<Void> completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId, |
| msgId.getLedgerId(), msgId.getEntryId(), bitSet, ackType, properties, true, null, null); |
| bitSet.recycle(); |
| return completableFuture; |
| } |
| |
| /** |
| * Flush all the pending acks and send them to the broker. |
| */ |
| @Override |
| public void flush() { |
| ClientCnx cnx = consumer.getClientCnx(); |
| |
| if (cnx == null) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Cannot flush pending acks since we're not connected to broker", consumer); |
| } |
| return; |
| } |
| |
| Optional<Lock> writeLock = acquireWriteLock(); |
| try { |
| flushAsync(cnx); |
| } finally { |
| writeLock.ifPresent(Lock::unlock); |
| } |
| } |
| |
| private void flushAsync(ClientCnx cnx) { |
| final LastCumulativeAck lastCumulativeAckToFlush = lastCumulativeAck.flush(); |
| boolean shouldFlush = false; |
| if (lastCumulativeAckToFlush != null) { |
| shouldFlush = true; |
| final MessageIdAdv messageId = lastCumulativeAckToFlush.getMessageId(); |
| newMessageAckCommandAndWrite(cnx, consumer.consumerId, messageId.getLedgerId(), messageId.getEntryId(), |
| lastCumulativeAckToFlush.getBitSetRecyclable(), AckType.Cumulative, |
| Collections.emptyMap(), false, |
| (TimedCompletableFuture<Void>) this.currentCumulativeAckFuture, null); |
| this.consumer.unAckedChunkedMessageIdSequenceMap.remove(messageId); |
| } |
| |
| // Flush all individual acks |
| List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = |
| new ArrayList<>(pendingIndividualAcks.size() + pendingIndividualBatchIndexAcks.size()); |
| if (!pendingIndividualAcks.isEmpty()) { |
| if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { |
| // We can send 1 single protobuf command with all individual acks |
| while (true) { |
| MessageIdAdv msgId = pendingIndividualAcks.pollFirst(); |
| if (msgId == null) { |
| break; |
| } |
| |
| // if messageId is checked then all the chunked related to that msg also processed so, ack all of |
| // them |
| MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.get(msgId); |
| if (chunkMsgIds != null && chunkMsgIds.length > 1) { |
| for (MessageIdImpl cMsgId : chunkMsgIds) { |
| if (cMsgId != null) { |
| entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); |
| } |
| } |
| // messages will be acked so, remove checked message sequence |
| this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId); |
| } else { |
| entriesToAck.add(Triple.of(msgId.getLedgerId(), msgId.getEntryId(), null)); |
| } |
| } |
| } else { |
| // When talking to older brokers, send the acknowledgements individually |
| while (true) { |
| MessageIdAdv msgId = pendingIndividualAcks.pollFirst(); |
| if (msgId == null) { |
| break; |
| } |
| newMessageAckCommandAndWrite(cnx, consumer.consumerId, msgId.getLedgerId(), msgId.getEntryId(), |
| null, AckType.Individual, Collections.emptyMap(), false, |
| null, null); |
| shouldFlush = true; |
| } |
| } |
| } |
| |
| if (!pendingIndividualBatchIndexAcks.isEmpty()) { |
| Iterator<Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable>> iterator = |
| pendingIndividualBatchIndexAcks.entrySet().iterator(); |
| |
| while (iterator.hasNext()) { |
| Map.Entry<MessageIdAdv, ConcurrentBitSetRecyclable> entry = iterator.next(); |
| entriesToAck.add(Triple.of( |
| entry.getKey().getLedgerId(), entry.getKey().getEntryId(), entry.getValue())); |
| iterator.remove(); |
| } |
| } |
| |
| if (entriesToAck.size() > 0) { |
| |
| newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L, |
| null, AckType.Individual, null, true, |
| (TimedCompletableFuture<Void>) currentIndividualAckFuture, entriesToAck); |
| shouldFlush = true; |
| } |
| |
| if (shouldFlush) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Flushing pending acks to broker: last-cumulative-ack: {} -- individual-acks: {}" |
| + " -- individual-batch-index-acks: {}", |
| consumer, lastCumulativeAck, pendingIndividualAcks, entriesToAck); |
| } |
| cnx.ctx().flush(); |
| } |
| |
| } |
| |
| @Override |
| public void flushAndClean() { |
| flush(); |
| lastCumulativeAck.reset(); |
| pendingIndividualAcks.clear(); |
| } |
| |
| @Override |
| public void close() { |
| flush(); |
| if (scheduledTask != null && !scheduledTask.isCancelled()) { |
| scheduledTask.cancel(true); |
| } |
| } |
| |
| private CompletableFuture<Void> newImmediateAckAndFlush(long consumerId, MessageIdAdv msgId, |
| BitSetRecyclable bitSet, AckType ackType, |
| Map<String, Long> map, ClientCnx cnx) { |
| MessageIdImpl[] chunkMsgIds = this.consumer.unAckedChunkedMessageIdSequenceMap.remove(msgId); |
| final CompletableFuture<Void> completableFuture; |
| // cumulative ack chunk by the last messageId |
| if (chunkMsgIds != null && ackType != AckType.Cumulative) { |
| if (Commands.peerSupportsMultiMessageAcknowledgment(cnx.getRemoteEndpointProtocolVersion())) { |
| List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck = new ArrayList<>(chunkMsgIds.length); |
| for (MessageIdImpl cMsgId : chunkMsgIds) { |
| if (cMsgId != null && chunkMsgIds.length > 1) { |
| entriesToAck.add(Triple.of(cMsgId.getLedgerId(), cMsgId.getEntryId(), null)); |
| } |
| } |
| completableFuture = newMessageAckCommandAndWrite(cnx, consumer.consumerId, 0L, 0L, |
| null, ackType, null, true, null, entriesToAck); |
| } else { |
| // if don't support multi message ack, it also support ack receipt, so we should not think about the |
| // ack receipt in this logic |
| for (MessageIdImpl cMsgId : chunkMsgIds) { |
| newMessageAckCommandAndWrite(cnx, consumerId, cMsgId.getLedgerId(), cMsgId.getEntryId(), |
| bitSet, ackType, map, true, null, null); |
| } |
| completableFuture = CompletableFuture.completedFuture(null); |
| } |
| } else { |
| completableFuture = newMessageAckCommandAndWrite(cnx, consumerId, msgId.getLedgerId(), msgId.getEntryId(), |
| bitSet, ackType, map, true, null, null); |
| } |
| return completableFuture; |
| } |
| |
| private CompletableFuture<Void> newMessageAckCommandAndWrite( |
| ClientCnx cnx, long consumerId, long ledgerId, |
| long entryId, BitSetRecyclable ackSet, AckType ackType, |
| Map<String, Long> properties, boolean flush, |
| TimedCompletableFuture<Void> timedCompletableFuture, |
| List<Triple<Long, Long, ConcurrentBitSetRecyclable>> entriesToAck) { |
| if (consumer.isAckReceiptEnabled()) { |
| final long requestId = consumer.getClient().newRequestId(); |
| final ByteBuf cmd; |
| if (entriesToAck == null) { |
| cmd = Commands.newAck(consumerId, ledgerId, entryId, ackSet, |
| ackType, null, properties, requestId); |
| } else { |
| cmd = Commands.newMultiMessageAck(consumerId, entriesToAck, requestId); |
| } |
| if (timedCompletableFuture == null) { |
| return cnx.newAckForReceipt(cmd, requestId); |
| } else { |
| if (ackType == AckType.Individual) { |
| this.currentIndividualAckFuture = new TimedCompletableFuture<>(); |
| } else { |
| this.currentCumulativeAckFuture = new TimedCompletableFuture<>(); |
| } |
| cnx.newAckForReceiptWithFuture(cmd, requestId, timedCompletableFuture); |
| return timedCompletableFuture; |
| } |
| } else { |
| // client cnx don't support ack receipt, if we don't complete the future, the client will block. |
| if (ackReceiptEnabled) { |
| synchronized (PersistentAcknowledgmentsGroupingTracker.this) { |
| if (!this.currentCumulativeAckFuture.isDone()) { |
| this.currentCumulativeAckFuture.complete(null); |
| } |
| |
| if (!this.currentIndividualAckFuture.isDone()) { |
| this.currentIndividualAckFuture.complete(null); |
| } |
| } |
| } |
| final ByteBuf cmd; |
| if (entriesToAck == null) { |
| cmd = Commands.newAck(consumerId, ledgerId, entryId, ackSet, |
| ackType, null, properties, -1); |
| } else { |
| cmd = Commands.newMultiMessageAck(consumerId, entriesToAck, -1); |
| } |
| if (flush) { |
| cnx.ctx().writeAndFlush(cmd, cnx.ctx().voidPromise()); |
| } else { |
| cnx.ctx().write(cmd, cnx.ctx().voidPromise()); |
| } |
| return CompletableFuture.completedFuture(null); |
| } |
| } |
| |
| public Optional<Lock> acquireReadLock() { |
| Optional<Lock> optionalLock = Optional.ofNullable(consumer.isAckReceiptEnabled() ? lock.readLock() : null); |
| optionalLock.ifPresent(Lock::lock); |
| return optionalLock; |
| } |
| |
| public Optional<Lock> acquireWriteLock() { |
| Optional<Lock> optionalLock = Optional.ofNullable(consumer.isAckReceiptEnabled() ? lock.writeLock() : null); |
| optionalLock.ifPresent(Lock::lock); |
| return optionalLock; |
| } |
| } |
| |
| @Getter |
| class LastCumulativeAck { |
| |
| // It's used as a returned value by `flush()` to avoid creating a new instance each time `flush()` is called |
| public static final FastThreadLocal<LastCumulativeAck> LOCAL_LAST_CUMULATIVE_ACK = |
| new FastThreadLocal<LastCumulativeAck>() { |
| |
| @Override |
| protected LastCumulativeAck initialValue() { |
| return new LastCumulativeAck(); |
| } |
| }; |
| public static final MessageIdAdv DEFAULT_MESSAGE_ID = (MessageIdAdv) MessageId.earliest; |
| |
| private volatile MessageIdAdv messageId = DEFAULT_MESSAGE_ID; |
| private BitSetRecyclable bitSetRecyclable = null; |
| private boolean flushRequired = false; |
| |
| public synchronized void update(final MessageIdAdv messageId, final BitSetRecyclable bitSetRecyclable) { |
| if (compareTo(messageId) < 0) { |
| if (this.bitSetRecyclable != null && this.bitSetRecyclable != bitSetRecyclable) { |
| this.bitSetRecyclable.recycle(); |
| } |
| set(messageId, bitSetRecyclable); |
| flushRequired = true; |
| } |
| } |
| |
| public synchronized LastCumulativeAck flush() { |
| if (flushRequired) { |
| final LastCumulativeAck localLastCumulativeAck = LOCAL_LAST_CUMULATIVE_ACK.get(); |
| if (bitSetRecyclable != null) { |
| localLastCumulativeAck.set(messageId, BitSetRecyclable.valueOf(bitSetRecyclable.toLongArray())); |
| } else { |
| localLastCumulativeAck.set(this.messageId, null); |
| } |
| flushRequired = false; |
| return localLastCumulativeAck; |
| } else { |
| // Return null to indicate nothing to be flushed |
| return null; |
| } |
| } |
| |
| public synchronized void reset() { |
| if (bitSetRecyclable != null) { |
| bitSetRecyclable.recycle(); |
| } |
| messageId = DEFAULT_MESSAGE_ID; |
| bitSetRecyclable = null; |
| flushRequired = false; |
| } |
| |
| public synchronized int compareTo(MessageIdAdv messageId) { |
| int result = Long.compare(this.messageId.getLedgerId(), messageId.getLedgerId()); |
| if (result != 0) { |
| return result; |
| } |
| result = Long.compare(this.messageId.getEntryId(), messageId.getEntryId()); |
| if (result != 0) { |
| return result; |
| } |
| return Integer.compare( |
| (this.messageId.getBatchIndex() >= 0) ? this.messageId.getBatchIndex() : Integer.MAX_VALUE, |
| (messageId.getBatchIndex() >= 0) ? messageId.getBatchIndex() : Integer.MAX_VALUE |
| ); |
| } |
| |
| private synchronized void set(final MessageIdAdv messageId, final BitSetRecyclable bitSetRecyclable) { |
| this.messageId = messageId; |
| this.bitSetRecyclable = bitSetRecyclable; |
| } |
| |
| @Override |
| public String toString() { |
| String s = messageId.toString(); |
| if (bitSetRecyclable != null) { |
| s += " (bit set: " + bitSetRecyclable + ")"; |
| } |
| return s; |
| } |
| } |