| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.base.MoreObjects; |
| import com.google.common.util.concurrent.AtomicDouble; |
| import io.netty.util.concurrent.Future; |
| import io.netty.util.concurrent.Promise; |
| import java.util.ArrayList; |
| import java.util.BitSet; |
| import java.util.Collections; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; |
| import java.util.concurrent.atomic.LongAdder; |
| import java.util.stream.Collectors; |
| import lombok.Getter; |
| import lombok.Setter; |
| import org.apache.bookkeeper.mledger.Entry; |
| import org.apache.bookkeeper.mledger.Position; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.commons.lang3.mutable.MutableInt; |
| import org.apache.commons.lang3.tuple.MutablePair; |
| import org.apache.pulsar.broker.authentication.AuthenticationDataSubscription; |
| import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; |
| import org.apache.pulsar.broker.service.persistent.PersistentSubscription; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.client.api.MessageId; |
| import org.apache.pulsar.client.api.transaction.TxnID; |
| import org.apache.pulsar.common.api.proto.CommandAck; |
| import org.apache.pulsar.common.api.proto.CommandAck.AckType; |
| import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; |
| import org.apache.pulsar.common.api.proto.CommandTopicMigrated.ResourceType; |
| import org.apache.pulsar.common.api.proto.KeyLongValue; |
| import org.apache.pulsar.common.api.proto.KeySharedMeta; |
| import org.apache.pulsar.common.api.proto.MessageIdData; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.ClusterPolicies.ClusterUrl; |
| import org.apache.pulsar.common.policies.data.TopicOperation; |
| import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl; |
| import org.apache.pulsar.common.protocol.Commands; |
| import org.apache.pulsar.common.schema.SchemaType; |
| import org.apache.pulsar.common.stats.Rate; |
| import org.apache.pulsar.common.util.DateFormatter; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.common.util.collections.BitSetRecyclable; |
| import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap; |
| import org.apache.pulsar.common.util.collections.ConcurrentLongLongPairHashMap.LongPair; |
| import org.apache.pulsar.transaction.common.exception.TransactionConflictException; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * A Consumer is a consumer currently connected and associated with a Subscription. |
| */ |
| public class Consumer { |
| private final Subscription subscription; |
| private final SubType subType; |
| private final TransportCnx cnx; |
| private final String appId; |
| private final String topicName; |
| private final int partitionIdx; |
| |
| private final long consumerId; |
| private final int priorityLevel; |
| private final boolean readCompacted; |
| private final String consumerName; |
| private final Rate msgOut; |
| private final Rate msgRedeliver; |
| private final LongAdder msgOutCounter; |
| private final LongAdder bytesOutCounter; |
| private final Rate messageAckRate; |
| |
| private volatile long lastConsumedTimestamp; |
| private volatile long lastAckedTimestamp; |
| private volatile long lastConsumedFlowTimestamp; |
| private Rate chunkedMessageRate; |
| |
| // Represents how many messages we can safely send to the consumer without |
| // overflowing its receiving queue. The consumer will use Flow commands to |
| // increase its availability |
| private static final AtomicIntegerFieldUpdater<Consumer> MESSAGE_PERMITS_UPDATER = |
| AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "messagePermits"); |
| private volatile int messagePermits = 0; |
| // It starts keep tracking of messagePermits once consumer gets blocked, as consumer needs two separate counts: |
| // messagePermits (1) before and (2) after being blocked: to dispatch only blockedPermit number of messages at the |
| // time of redelivery |
| private static final AtomicIntegerFieldUpdater<Consumer> PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER = |
| AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "permitsReceivedWhileConsumerBlocked"); |
| private volatile int permitsReceivedWhileConsumerBlocked = 0; |
| |
| private final ConcurrentLongLongPairHashMap pendingAcks; |
| |
| private final ConsumerStatsImpl stats; |
| |
| private final boolean isDurable; |
| |
| private final boolean isPersistentTopic; |
| |
| private static final AtomicIntegerFieldUpdater<Consumer> UNACKED_MESSAGES_UPDATER = |
| AtomicIntegerFieldUpdater.newUpdater(Consumer.class, "unackedMessages"); |
| private volatile int unackedMessages = 0; |
| private volatile boolean blockedConsumerOnUnackedMsgs = false; |
| |
| private final Map<String, String> metadata; |
| |
| private final KeySharedMeta keySharedMeta; |
| |
| /** |
| * It starts keep tracking the average messages per entry. |
| * The initial value is 0, when new value comes, it will update with |
| * avgMessagesPerEntry = avgMessagePerEntry * avgPercent + (1 - avgPercent) * new Value. |
| */ |
| private final AtomicDouble avgMessagesPerEntry = new AtomicDouble(0); |
| private static final long [] EMPTY_ACK_SET = new long[0]; |
| |
| private static final double avgPercent = 0.9; |
| private boolean preciseDispatcherFlowControl; |
| private PositionImpl readPositionWhenJoining; |
| private final String clientAddress; // IP address only, no port number included |
| private final MessageId startMessageId; |
| private final boolean isAcknowledgmentAtBatchIndexLevelEnabled; |
| |
| @Getter |
| @Setter |
| private volatile long consumerEpoch; |
| |
| private long negtiveUnackedMsgsTimestamp; |
| |
| @Getter |
| private final SchemaType schemaType; |
| |
| public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, |
| int priorityLevel, String consumerName, |
| boolean isDurable, TransportCnx cnx, String appId, |
| Map<String, String> metadata, boolean readCompacted, |
| KeySharedMeta keySharedMeta, MessageId startMessageId, long consumerEpoch) { |
| this(subscription, subType, topicName, consumerId, priorityLevel, consumerName, isDurable, cnx, appId, |
| metadata, readCompacted, keySharedMeta, startMessageId, consumerEpoch, null); |
| } |
| |
| public Consumer(Subscription subscription, SubType subType, String topicName, long consumerId, |
| int priorityLevel, String consumerName, |
| boolean isDurable, TransportCnx cnx, String appId, |
| Map<String, String> metadata, boolean readCompacted, |
| KeySharedMeta keySharedMeta, MessageId startMessageId, |
| long consumerEpoch, SchemaType schemaType) { |
| this.subscription = subscription; |
| this.subType = subType; |
| this.topicName = topicName; |
| this.partitionIdx = TopicName.getPartitionIndex(topicName); |
| this.consumerId = consumerId; |
| this.priorityLevel = priorityLevel; |
| this.readCompacted = readCompacted; |
| this.consumerName = consumerName; |
| this.isDurable = isDurable; |
| this.isPersistentTopic = subscription.getTopic() instanceof PersistentTopic; |
| this.keySharedMeta = keySharedMeta; |
| this.cnx = cnx; |
| this.msgOut = new Rate(); |
| this.chunkedMessageRate = new Rate(); |
| this.msgRedeliver = new Rate(); |
| this.bytesOutCounter = new LongAdder(); |
| this.msgOutCounter = new LongAdder(); |
| this.messageAckRate = new Rate(); |
| this.appId = appId; |
| |
| // Ensure we start from compacted view |
| this.startMessageId = (readCompacted && startMessageId == null) ? MessageId.earliest : startMessageId; |
| |
| this.preciseDispatcherFlowControl = cnx.isPreciseDispatcherFlowControl(); |
| PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.set(this, 0); |
| MESSAGE_PERMITS_UPDATER.set(this, 0); |
| UNACKED_MESSAGES_UPDATER.set(this, 0); |
| |
| this.metadata = metadata != null ? metadata : Collections.emptyMap(); |
| |
| stats = new ConsumerStatsImpl(); |
| stats.setAddress(cnx.clientSourceAddressAndPort()); |
| stats.consumerName = consumerName; |
| stats.setConnectedSince(DateFormatter.now()); |
| stats.setClientVersion(cnx.getClientVersion()); |
| stats.metadata = this.metadata; |
| |
| if (Subscription.isIndividualAckMode(subType)) { |
| this.pendingAcks = ConcurrentLongLongPairHashMap.newBuilder() |
| .autoShrink(subscription.getTopic().getBrokerService() |
| .getPulsar().getConfiguration().isAutoShrinkForConsumerPendingAcksMap()) |
| .expectedItems(256) |
| .concurrencyLevel(1) |
| .build(); |
| } else { |
| // We don't need to keep track of pending acks if the subscription is not shared |
| this.pendingAcks = null; |
| } |
| |
| this.clientAddress = cnx.clientSourceAddress(); |
| this.consumerEpoch = consumerEpoch; |
| this.isAcknowledgmentAtBatchIndexLevelEnabled = subscription.getTopic().getBrokerService() |
| .getPulsar().getConfiguration().isAcknowledgmentAtBatchIndexLevelEnabled(); |
| |
| this.schemaType = schemaType; |
| } |
| |
| @VisibleForTesting |
| Consumer(String consumerName, int availablePermits) { |
| this.subscription = null; |
| this.subType = null; |
| this.cnx = null; |
| this.appId = null; |
| this.topicName = null; |
| this.partitionIdx = 0; |
| this.consumerId = 0L; |
| this.priorityLevel = 0; |
| this.readCompacted = false; |
| this.consumerName = consumerName; |
| this.msgOut = null; |
| this.msgRedeliver = null; |
| this.msgOutCounter = null; |
| this.bytesOutCounter = null; |
| this.messageAckRate = null; |
| this.pendingAcks = null; |
| this.stats = null; |
| this.isDurable = false; |
| this.isPersistentTopic = false; |
| this.metadata = null; |
| this.keySharedMeta = null; |
| this.clientAddress = null; |
| this.startMessageId = null; |
| this.isAcknowledgmentAtBatchIndexLevelEnabled = false; |
| this.schemaType = null; |
| MESSAGE_PERMITS_UPDATER.set(this, availablePermits); |
| } |
| |
| public SubType subType() { |
| return subType; |
| } |
| |
| public long consumerId() { |
| return consumerId; |
| } |
| |
| public String consumerName() { |
| return consumerName; |
| } |
| |
| void notifyActiveConsumerChange(Consumer activeConsumer) { |
| if (log.isDebugEnabled()) { |
| log.debug("notify consumer {} - that [{}] for subscription {} has new active consumer : {}", |
| consumerId, topicName, subscription.getName(), activeConsumer); |
| } |
| cnx.getCommandSender().sendActiveConsumerChange(consumerId, this == activeConsumer); |
| } |
| |
| public boolean readCompacted() { |
| return readCompacted; |
| } |
| |
| public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes, |
| EntryBatchIndexesAcks batchIndexesAcks, |
| int totalMessages, long totalBytes, long totalChunkedMessages, |
| RedeliveryTracker redeliveryTracker) { |
| return sendMessages(entries, batchSizes, batchIndexesAcks, totalMessages, totalBytes, |
| totalChunkedMessages, redeliveryTracker, DEFAULT_CONSUMER_EPOCH); |
| } |
| |
| /** |
| * Dispatch a list of entries to the consumer. <br/> |
| * <b>It is also responsible to release entries data and recycle entries object.</b> |
| * |
| * @return a SendMessageInfo object that contains the detail of what was sent to consumer |
| */ |
| public Future<Void> sendMessages(final List<? extends Entry> entries, EntryBatchSizes batchSizes, |
| EntryBatchIndexesAcks batchIndexesAcks, |
| int totalMessages, long totalBytes, long totalChunkedMessages, |
| RedeliveryTracker redeliveryTracker, long epoch) { |
| this.lastConsumedTimestamp = System.currentTimeMillis(); |
| |
| if (entries.isEmpty() || totalMessages == 0) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] List of messages is empty, triggering write future immediately for consumerId {}", |
| topicName, subscription, consumerId); |
| } |
| batchSizes.recyle(); |
| if (batchIndexesAcks != null) { |
| batchIndexesAcks.recycle(); |
| } |
| final Promise<Void> writePromise = cnx.newPromise(); |
| writePromise.setSuccess(null); |
| return writePromise; |
| } |
| int unackedMessages = totalMessages; |
| int totalEntries = 0; |
| |
| for (int i = 0; i < entries.size(); i++) { |
| Entry entry = entries.get(i); |
| if (entry != null) { |
| totalEntries++; |
| // Note |
| // Must ensure that the message is written to the pendingAcks before sent is first, |
| // because this consumer is possible to disconnect at this time. |
| if (pendingAcks != null) { |
| int batchSize = batchSizes.getBatchSize(i); |
| int stickyKeyHash = getStickyKeyHash(entry); |
| long[] ackSet = batchIndexesAcks == null ? null : batchIndexesAcks.getAckSet(i); |
| if (ackSet != null) { |
| unackedMessages -= (batchSize - BitSet.valueOf(ackSet).cardinality()); |
| } |
| pendingAcks.put(entry.getLedgerId(), entry.getEntryId(), batchSize, stickyKeyHash); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] Added {}:{} ledger entry with batchSize of {} to pendingAcks in" |
| + " broker.service.Consumer for consumerId: {}", |
| topicName, subscription, entry.getLedgerId(), entry.getEntryId(), batchSize, |
| consumerId); |
| } |
| } |
| } |
| } |
| |
| // calculate avg message per entry |
| if (avgMessagesPerEntry.get() < 1) { //valid avgMessagesPerEntry should always >= 1 |
| // set init value. |
| avgMessagesPerEntry.set(1.0 * totalMessages / totalEntries); |
| } else { |
| avgMessagesPerEntry.set(avgMessagesPerEntry.get() * avgPercent |
| + (1 - avgPercent) * totalMessages / totalEntries); |
| } |
| |
| // reduce permit and increment unackedMsg count with total number of messages in batch-msgs |
| int ackedCount = batchIndexesAcks == null ? 0 : batchIndexesAcks.getTotalAckedIndexCount(); |
| MESSAGE_PERMITS_UPDATER.addAndGet(this, ackedCount - totalMessages); |
| if (log.isDebugEnabled()){ |
| log.debug("[{}-{}] Added {} minus {} messages to MESSAGE_PERMITS_UPDATER in broker.service.Consumer" |
| + " for consumerId: {}; avgMessagesPerEntry is {}", |
| topicName, subscription, ackedCount, totalMessages, consumerId, avgMessagesPerEntry.get()); |
| } |
| incrementUnackedMessages(unackedMessages); |
| Future<Void> writeAndFlushPromise = |
| cnx.getCommandSender().sendMessagesToConsumer(consumerId, topicName, subscription, partitionIdx, |
| entries, batchSizes, batchIndexesAcks, redeliveryTracker, epoch); |
| writeAndFlushPromise.addListener(status -> { |
| // only increment counters after the messages have been successfully written to the TCP/IP connection |
| if (status.isSuccess()) { |
| msgOut.recordMultipleEvents(totalMessages, totalBytes); |
| msgOutCounter.add(totalMessages); |
| bytesOutCounter.add(totalBytes); |
| chunkedMessageRate.recordMultipleEvents(totalChunkedMessages, 0); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] Sent messages to client fail by IO exception[{}], close the connection" |
| + " immediately. Consumer: {}", topicName, subscription, |
| status.cause() == null ? "" : status.cause().getMessage(), this.toString()); |
| } |
| } |
| }); |
| return writeAndFlushPromise; |
| } |
| |
| private void incrementUnackedMessages(int unackedMessages) { |
| if (Subscription.isIndividualAckMode(subType) |
| && addAndGetUnAckedMsgs(this, unackedMessages) >= getMaxUnackedMessages() |
| && getMaxUnackedMessages() > 0) { |
| blockedConsumerOnUnackedMsgs = true; |
| } |
| } |
| |
| public boolean isWritable() { |
| return cnx.isWritable(); |
| } |
| |
| /** |
| * Close the consumer if: a. the connection is dropped b. connection is open (graceful close) and there are no |
| * pending message acks |
| */ |
| public void close() throws BrokerServiceException { |
| close(false); |
| } |
| |
| public void close(boolean isResetCursor) throws BrokerServiceException { |
| subscription.removeConsumer(this, isResetCursor); |
| cnx.removedConsumer(this); |
| } |
| |
| public void disconnect() { |
| disconnect(false); |
| } |
| |
| public void disconnect(boolean isResetCursor) { |
| disconnect(isResetCursor, Optional.empty()); |
| } |
| |
| public void disconnect(boolean isResetCursor, Optional<BrokerLookupData> assignedBrokerLookupData) { |
| log.info("Disconnecting consumer: {}", this); |
| cnx.closeConsumer(this, assignedBrokerLookupData); |
| try { |
| close(isResetCursor); |
| } catch (BrokerServiceException e) { |
| log.warn("Consumer {} was already closed: {}", this, e.getMessage(), e); |
| } |
| } |
| |
| public void doUnsubscribe(final long requestId, boolean force) { |
| subscription.doUnsubscribe(this, force).thenAccept(v -> { |
| log.info("Unsubscribed successfully from {}", subscription); |
| cnx.removedConsumer(this); |
| cnx.getCommandSender().sendSuccessResponse(requestId); |
| }).exceptionally(exception -> { |
| log.warn("Unsubscribe failed for {}", subscription, exception); |
| cnx.getCommandSender().sendErrorResponse(requestId, BrokerServiceException.getClientErrorCode(exception), |
| exception.getCause().getMessage()); |
| return null; |
| }); |
| } |
| |
| public CompletableFuture<Void> messageAcked(CommandAck ack) { |
| CompletableFuture<Long> future; |
| |
| this.lastAckedTimestamp = System.currentTimeMillis(); |
| Map<String, Long> properties = Collections.emptyMap(); |
| if (ack.getPropertiesCount() > 0) { |
| properties = ack.getPropertiesList().stream() |
| .collect(Collectors.toMap(KeyLongValue::getKey, KeyLongValue::getValue)); |
| } |
| |
| if (ack.getAckType() == AckType.Cumulative) { |
| if (ack.getMessageIdsCount() != 1) { |
| log.warn("[{}] [{}] Received multi-message ack", subscription, consumerId); |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| if (Subscription.isIndividualAckMode(subType)) { |
| log.warn("[{}] [{}] Received cumulative ack on shared subscription, ignoring", |
| subscription, consumerId); |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| PositionImpl position; |
| MessageIdData msgId = ack.getMessageIdAt(0); |
| if (msgId.getAckSetsCount() > 0) { |
| long[] ackSets = new long[msgId.getAckSetsCount()]; |
| for (int j = 0; j < msgId.getAckSetsCount(); j++) { |
| ackSets[j] = msgId.getAckSetAt(j); |
| } |
| position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets); |
| } else { |
| position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); |
| } |
| |
| if (ack.hasTxnidMostBits() && ack.hasTxnidLeastBits()) { |
| List<PositionImpl> positionsAcked = Collections.singletonList(position); |
| future = transactionCumulativeAcknowledge(ack.getTxnidMostBits(), |
| ack.getTxnidLeastBits(), positionsAcked) |
| .thenApply(unused -> 1L); |
| } else { |
| List<Position> positionsAcked = Collections.singletonList(position); |
| subscription.acknowledgeMessage(positionsAcked, AckType.Cumulative, properties); |
| future = CompletableFuture.completedFuture(1L); |
| } |
| } else { |
| if (ack.hasTxnidLeastBits() && ack.hasTxnidMostBits()) { |
| future = individualAckWithTransaction(ack); |
| } else { |
| future = individualAckNormal(ack, properties); |
| } |
| } |
| |
| return future |
| .thenApply(v -> { |
| this.messageAckRate.recordEvent(v); |
| return null; |
| }); |
| } |
| |
| //this method is for individual ack not carry the transaction |
| private CompletableFuture<Long> individualAckNormal(CommandAck ack, Map<String, Long> properties) { |
| List<Position> positionsAcked = new ArrayList<>(); |
| long totalAckCount = 0; |
| for (int i = 0; i < ack.getMessageIdsCount(); i++) { |
| MessageIdData msgId = ack.getMessageIdAt(i); |
| PositionImpl position; |
| long ackedCount = 0; |
| long batchSize = getBatchSize(msgId); |
| Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId()); |
| if (msgId.getAckSetsCount() > 0) { |
| long[] ackSets = new long[msgId.getAckSetsCount()]; |
| for (int j = 0; j < msgId.getAckSetsCount(); j++) { |
| ackSets[j] = msgId.getAckSetAt(j); |
| } |
| position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId(), ackSets); |
| ackedCount = getAckedCountForBatchIndexLevelEnabled(position, batchSize, ackSets, ackOwnerConsumer); |
| if (isTransactionEnabled()) { |
| //sync the batch position bit set point, in order to delete the position in pending acks |
| if (Subscription.isIndividualAckMode(subType)) { |
| ((PersistentSubscription) subscription) |
| .syncBatchPositionBitSetForPendingAck(position); |
| } |
| } |
| addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); |
| } else { |
| position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); |
| ackedCount = getAckedCountForMsgIdNoAckSets(batchSize, position, ackOwnerConsumer); |
| if (checkCanRemovePendingAcksAndHandle(position, msgId)) { |
| addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); |
| } |
| } |
| |
| positionsAcked.add(position); |
| |
| checkAckValidationError(ack, position); |
| |
| totalAckCount += ackedCount; |
| } |
| subscription.acknowledgeMessage(positionsAcked, AckType.Individual, properties); |
| CompletableFuture<Long> completableFuture = new CompletableFuture<>(); |
| completableFuture.complete(totalAckCount); |
| if (isTransactionEnabled() && Subscription.isIndividualAckMode(subType)) { |
| completableFuture.whenComplete((v, e) -> positionsAcked.forEach(position -> { |
| //check if the position can remove from the consumer pending acks. |
| // the bit set is empty in pending ack handle. |
| if (((PositionImpl) position).getAckSet() != null) { |
| if (((PersistentSubscription) subscription) |
| .checkIsCanDeleteConsumerPendingAck((PositionImpl) position)) { |
| removePendingAcks((PositionImpl) position); |
| } |
| } |
| })); |
| } |
| return completableFuture; |
| } |
| |
| |
| //this method is for individual ack carry the transaction |
| private CompletableFuture<Long> individualAckWithTransaction(CommandAck ack) { |
| // Individual ack |
| List<MutablePair<PositionImpl, Integer>> positionsAcked = new ArrayList<>(); |
| if (!isTransactionEnabled()) { |
| return FutureUtil.failedFuture( |
| new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); |
| } |
| |
| LongAdder totalAckCount = new LongAdder(); |
| for (int i = 0; i < ack.getMessageIdsCount(); i++) { |
| MessageIdData msgId = ack.getMessageIdAt(i); |
| PositionImpl position = PositionImpl.get(msgId.getLedgerId(), msgId.getEntryId()); |
| // acked count at least one |
| long ackedCount = 0; |
| long batchSize = 0; |
| if (msgId.hasBatchSize()) { |
| batchSize = msgId.getBatchSize(); |
| // ack batch messages set ackeCount = batchSize |
| ackedCount = msgId.getBatchSize(); |
| positionsAcked.add(new MutablePair<>(position, msgId.getBatchSize())); |
| } else { |
| // ack no batch message set ackedCount = 1 |
| ackedCount = 1; |
| positionsAcked.add(new MutablePair<>(position, (int) batchSize)); |
| } |
| Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId()); |
| if (msgId.getAckSetsCount() > 0) { |
| long[] ackSets = new long[msgId.getAckSetsCount()]; |
| for (int j = 0; j < msgId.getAckSetsCount(); j++) { |
| ackSets[j] = msgId.getAckSetAt(j); |
| } |
| position.setAckSet(ackSets); |
| ackedCount = getAckedCountForTransactionAck(batchSize, ackSets); |
| } |
| |
| addAndGetUnAckedMsgs(ackOwnerConsumer, -(int) ackedCount); |
| |
| checkCanRemovePendingAcksAndHandle(position, msgId); |
| |
| checkAckValidationError(ack, position); |
| |
| totalAckCount.add(ackedCount); |
| } |
| |
| CompletableFuture<Void> completableFuture = transactionIndividualAcknowledge(ack.getTxnidMostBits(), |
| ack.getTxnidLeastBits(), positionsAcked); |
| if (Subscription.isIndividualAckMode(subType)) { |
| completableFuture.whenComplete((v, e) -> |
| positionsAcked.forEach(positionLongMutablePair -> { |
| if (positionLongMutablePair.getLeft().getAckSet() != null) { |
| if (((PersistentSubscription) subscription) |
| .checkIsCanDeleteConsumerPendingAck(positionLongMutablePair.left)) { |
| removePendingAcks(positionLongMutablePair.left); |
| } |
| } |
| })); |
| } |
| return completableFuture.thenApply(__ -> totalAckCount.sum()); |
| } |
| |
| private long getBatchSize(MessageIdData msgId) { |
| long batchSize = 1; |
| if (Subscription.isIndividualAckMode(subType)) { |
| LongPair longPair = pendingAcks.get(msgId.getLedgerId(), msgId.getEntryId()); |
| // Consumer may ack the msg that not belongs to it. |
| if (longPair == null) { |
| Consumer ackOwnerConsumer = getAckOwnerConsumer(msgId.getLedgerId(), msgId.getEntryId()); |
| longPair = ackOwnerConsumer.getPendingAcks().get(msgId.getLedgerId(), msgId.getEntryId()); |
| if (longPair != null) { |
| batchSize = longPair.first; |
| } |
| } else { |
| batchSize = longPair.first; |
| } |
| } |
| return batchSize; |
| } |
| |
| private long getAckedCountForMsgIdNoAckSets(long batchSize, PositionImpl position, Consumer consumer) { |
| if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType)) { |
| long[] cursorAckSet = getCursorAckSet(position); |
| if (cursorAckSet != null) { |
| return getAckedCountForBatchIndexLevelEnabled(position, batchSize, EMPTY_ACK_SET, consumer); |
| } |
| } |
| return batchSize; |
| } |
| |
| private long getAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize, long[] ackSets, |
| Consumer consumer) { |
| long ackedCount = 0; |
| if (isAcknowledgmentAtBatchIndexLevelEnabled && Subscription.isIndividualAckMode(subType) |
| && consumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()) != null) { |
| long[] cursorAckSet = getCursorAckSet(position); |
| if (cursorAckSet != null) { |
| BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); |
| int lastCardinality = cursorBitSet.cardinality(); |
| BitSetRecyclable givenBitSet = BitSetRecyclable.create().resetWords(ackSets); |
| cursorBitSet.and(givenBitSet); |
| givenBitSet.recycle(); |
| int currentCardinality = cursorBitSet.cardinality(); |
| ackedCount = lastCardinality - currentCardinality; |
| cursorBitSet.recycle(); |
| } else { |
| ackedCount = batchSize - BitSet.valueOf(ackSets).cardinality(); |
| } |
| } |
| return ackedCount; |
| } |
| |
| private long getAckedCountForTransactionAck(long batchSize, long[] ackSets) { |
| BitSetRecyclable bitset = BitSetRecyclable.create().resetWords(ackSets); |
| long ackedCount = batchSize - bitset.cardinality(); |
| bitset.recycle(); |
| return ackedCount; |
| } |
| |
| private long getUnAckedCountForBatchIndexLevelEnabled(PositionImpl position, long batchSize) { |
| long unAckedCount = batchSize; |
| if (isAcknowledgmentAtBatchIndexLevelEnabled) { |
| long[] cursorAckSet = getCursorAckSet(position); |
| if (cursorAckSet != null) { |
| BitSetRecyclable cursorBitSet = BitSetRecyclable.create().resetWords(cursorAckSet); |
| unAckedCount = cursorBitSet.cardinality(); |
| cursorBitSet.recycle(); |
| } |
| } |
| return unAckedCount; |
| } |
| |
| private void checkAckValidationError(CommandAck ack, PositionImpl position) { |
| if (ack.hasValidationError()) { |
| log.error("[{}] [{}] Received ack for corrupted message at {} - Reason: {}", subscription, |
| consumerId, position, ack.getValidationError()); |
| } |
| } |
| |
| private boolean checkCanRemovePendingAcksAndHandle(PositionImpl position, MessageIdData msgId) { |
| if (Subscription.isIndividualAckMode(subType) && msgId.getAckSetsCount() == 0) { |
| return removePendingAcks(position); |
| } |
| return false; |
| } |
| |
| private Consumer getAckOwnerConsumer(long ledgerId, long entryId) { |
| Consumer ackOwnerConsumer = this; |
| if (Subscription.isIndividualAckMode(subType)) { |
| if (!getPendingAcks().containsKey(ledgerId, entryId)) { |
| for (Consumer consumer : subscription.getConsumers()) { |
| if (consumer != this && consumer.getPendingAcks().containsKey(ledgerId, entryId)) { |
| ackOwnerConsumer = consumer; |
| break; |
| } |
| } |
| } |
| } |
| return ackOwnerConsumer; |
| } |
| |
| private long[] getCursorAckSet(PositionImpl position) { |
| if (!(subscription instanceof PersistentSubscription)) { |
| return null; |
| } |
| return (((PersistentSubscription) subscription).getCursor()).getDeletedBatchIndexesAsLongArray(position); |
| } |
| |
| private boolean isTransactionEnabled() { |
| return subscription instanceof PersistentSubscription |
| && ((PersistentTopic) subscription.getTopic()) |
| .getBrokerService().getPulsar().getConfig().isTransactionCoordinatorEnabled(); |
| } |
| |
| private CompletableFuture<Void> transactionIndividualAcknowledge( |
| long txnidMostBits, |
| long txnidLeastBits, |
| List<MutablePair<PositionImpl, Integer>> positionList) { |
| if (subscription instanceof PersistentSubscription) { |
| TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits); |
| return ((PersistentSubscription) subscription).transactionIndividualAcknowledge(txnID, positionList); |
| } else { |
| String error = "Transaction acknowledge only support the `PersistentSubscription`."; |
| log.error(error); |
| return FutureUtil.failedFuture(new TransactionConflictException(error)); |
| } |
| } |
| |
| private CompletableFuture<Void> transactionCumulativeAcknowledge(long txnidMostBits, long txnidLeastBits, |
| List<PositionImpl> positionList) { |
| if (!isTransactionEnabled()) { |
| return FutureUtil.failedFuture( |
| new BrokerServiceException.NotAllowedException("Server don't support transaction ack!")); |
| } |
| if (subscription instanceof PersistentSubscription) { |
| TxnID txnID = new TxnID(txnidMostBits, txnidLeastBits); |
| return ((PersistentSubscription) subscription).transactionCumulativeAcknowledge(txnID, positionList); |
| } else { |
| String error = "Transaction acknowledge only support the `PersistentSubscription`."; |
| log.error(error); |
| return FutureUtil.failedFuture(new TransactionConflictException(error)); |
| } |
| } |
| |
| public void flowPermits(int additionalNumberOfMessages) { |
| checkArgument(additionalNumberOfMessages > 0); |
| this.lastConsumedFlowTimestamp = System.currentTimeMillis(); |
| |
| // block shared consumer when unacked-messages reaches limit |
| if (shouldBlockConsumerOnUnackMsgs() && unackedMessages >= getMaxUnackedMessages()) { |
| blockedConsumerOnUnackedMsgs = true; |
| } |
| int oldPermits; |
| if (!blockedConsumerOnUnackedMsgs) { |
| oldPermits = MESSAGE_PERMITS_UPDATER.getAndAdd(this, additionalNumberOfMessages); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] Added {} message permits in broker.service.Consumer before updating dispatcher " |
| + "for consumer {}", topicName, subscription, additionalNumberOfMessages, consumerId); |
| } |
| subscription.consumerFlow(this, additionalNumberOfMessages); |
| } else { |
| oldPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndAdd(this, additionalNumberOfMessages); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] Added more flow control message permits {} (old was: {}), blocked = {} ", topicName, |
| subscription, additionalNumberOfMessages, oldPermits, blockedConsumerOnUnackedMsgs); |
| } |
| |
| } |
| |
| /** |
| * Triggers dispatcher to dispatch {@code blockedPermits} number of messages and adds same number of permits to |
| * {@code messagePermits} as it maintains count of actual dispatched message-permits. |
| * |
| * @param consumer: |
| * Consumer whose blockedPermits needs to be dispatched |
| */ |
| void flowConsumerBlockedPermits(Consumer consumer) { |
| int additionalNumberOfPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(consumer, 0); |
| // add newly flow permits to actual consumer.messagePermits |
| MESSAGE_PERMITS_UPDATER.getAndAdd(consumer, additionalNumberOfPermits); |
| if (log.isDebugEnabled()){ |
| log.debug("[{}-{}] Added {} blocked permits to broker.service.Consumer for consumer {}", topicName, |
| subscription, additionalNumberOfPermits, consumerId); |
| } |
| // dispatch pending permits to flow more messages: it will add more permits to dispatcher and consumer |
| subscription.consumerFlow(consumer, additionalNumberOfPermits); |
| } |
| |
| public int getAvailablePermits() { |
| return MESSAGE_PERMITS_UPDATER.get(this); |
| } |
| |
| /** |
| * return 0 if there is no entry dispatched yet. |
| */ |
| public int getAvgMessagesPerEntry() { |
| return (int) Math.round(avgMessagesPerEntry.get()); |
| } |
| |
| public boolean isBlocked() { |
| return blockedConsumerOnUnackedMsgs; |
| } |
| |
| public void reachedEndOfTopic() { |
| cnx.getCommandSender().sendReachedEndOfTopic(consumerId); |
| } |
| |
| public void topicMigrated(Optional<ClusterUrl> clusterUrl) { |
| if (clusterUrl.isPresent()) { |
| ClusterUrl url = clusterUrl.get(); |
| cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(), |
| url.getBrokerServiceUrlTls()); |
| // disconnect consumer after sending migrated cluster url |
| disconnect(); |
| } |
| } |
| |
| public boolean checkAndApplyTopicMigration() { |
| if (subscription.isSubscriptionMigrated()) { |
| Optional<ClusterUrl> clusterUrl = AbstractTopic.getMigratedClusterUrl(cnx.getBrokerService().getPulsar(), |
| topicName); |
| if (clusterUrl.isPresent()) { |
| ClusterUrl url = clusterUrl.get(); |
| cnx.getCommandSender().sendTopicMigrated(ResourceType.Consumer, consumerId, url.getBrokerServiceUrl(), |
| url.getBrokerServiceUrlTls()); |
| // disconnect consumer after sending migrated cluster url |
| disconnect(); |
| return true; |
| } |
| } |
| return false; |
| } |
| /** |
| * Checks if consumer-blocking on unAckedMessages is allowed for below conditions:<br/> |
| * a. consumer must have Shared-subscription<br/> |
| * b. {@link this#getMaxUnackedMessages()} value > 0 |
| * |
| * @return |
| */ |
| private boolean shouldBlockConsumerOnUnackMsgs() { |
| return Subscription.isIndividualAckMode(subType) && getMaxUnackedMessages() > 0; |
| } |
| |
| public void updateRates() { |
| msgOut.calculateRate(); |
| chunkedMessageRate.calculateRate(); |
| msgRedeliver.calculateRate(); |
| messageAckRate.calculateRate(); |
| |
| stats.msgRateOut = msgOut.getRate(); |
| stats.msgThroughputOut = msgOut.getValueRate(); |
| stats.msgRateRedeliver = msgRedeliver.getRate(); |
| stats.messageAckRate = messageAckRate.getValueRate(); |
| stats.chunkedMessageRate = chunkedMessageRate.getRate(); |
| } |
| |
| public void updateStats(ConsumerStatsImpl consumerStats) { |
| msgOutCounter.add(consumerStats.msgOutCounter); |
| bytesOutCounter.add(consumerStats.bytesOutCounter); |
| msgOut.recordMultipleEvents(consumerStats.msgOutCounter, consumerStats.bytesOutCounter); |
| lastAckedTimestamp = consumerStats.lastAckedTimestamp; |
| lastConsumedTimestamp = consumerStats.lastConsumedTimestamp; |
| lastConsumedFlowTimestamp = consumerStats.lastConsumedFlowTimestamp; |
| MESSAGE_PERMITS_UPDATER.set(this, consumerStats.availablePermits); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] Setting broker.service.Consumer's messagePermits to {} for consumer {}", topicName, |
| subscription, consumerStats.availablePermits, consumerId); |
| } |
| unackedMessages = consumerStats.unackedMessages; |
| blockedConsumerOnUnackedMsgs = consumerStats.blockedConsumerOnUnackedMsgs; |
| avgMessagesPerEntry.set(consumerStats.avgMessagesPerEntry); |
| } |
| |
| public ConsumerStatsImpl getStats() { |
| stats.msgOutCounter = msgOutCounter.longValue(); |
| stats.bytesOutCounter = bytesOutCounter.longValue(); |
| stats.lastAckedTimestamp = lastAckedTimestamp; |
| stats.lastConsumedTimestamp = lastConsumedTimestamp; |
| stats.lastConsumedFlowTimestamp = lastConsumedFlowTimestamp; |
| stats.availablePermits = getAvailablePermits(); |
| stats.unackedMessages = unackedMessages; |
| stats.blockedConsumerOnUnackedMsgs = blockedConsumerOnUnackedMsgs; |
| stats.avgMessagesPerEntry = getAvgMessagesPerEntry(); |
| if (readPositionWhenJoining != null) { |
| stats.readPositionWhenJoining = readPositionWhenJoining.toString(); |
| } |
| return stats; |
| } |
| |
| public long getMsgOutCounter() { |
| return msgOutCounter.longValue(); |
| } |
| |
| public long getBytesOutCounter() { |
| return bytesOutCounter.longValue(); |
| } |
| |
| public int getUnackedMessages() { |
| return unackedMessages; |
| } |
| |
| public KeySharedMeta getKeySharedMeta() { |
| return keySharedMeta; |
| } |
| |
| @Override |
| public String toString() { |
| if (subscription != null && cnx != null) { |
| return MoreObjects.toStringHelper(this).add("subscription", subscription).add("consumerId", consumerId) |
| .add("consumerName", consumerName).add("address", this.cnx.clientAddress()).toString(); |
| } else { |
| return MoreObjects.toStringHelper(this).add("consumerId", consumerId) |
| .add("consumerName", consumerName).toString(); |
| } |
| } |
| |
| public CompletableFuture<Void> checkPermissionsAsync() { |
| TopicName topicName = TopicName.get(subscription.getTopicName()); |
| if (cnx.getBrokerService().getAuthorizationService() != null) { |
| AuthenticationDataSubscription authData = |
| new AuthenticationDataSubscription(cnx.getAuthenticationData(), subscription.getName()); |
| return cnx.getBrokerService().getAuthorizationService() |
| .allowTopicOperationAsync(topicName, TopicOperation.CONSUME, appId, authData) |
| .handle((ok, e) -> { |
| if (e != null) { |
| log.warn("[{}] Get unexpected error while authorizing [{}] {}", appId, |
| subscription.getTopicName(), e.getMessage(), e); |
| } |
| |
| if (ok == null || !ok) { |
| log.info("[{}] is not allowed to consume from topic [{}] anymore", appId, |
| subscription.getTopicName()); |
| disconnect(); |
| } |
| return null; |
| }); |
| } |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| @Override |
| public boolean equals(Object obj) { |
| if (obj instanceof Consumer) { |
| Consumer other = (Consumer) obj; |
| return consumerId == other.consumerId && Objects.equals(cnx.clientAddress(), other.cnx.clientAddress()); |
| } |
| return false; |
| } |
| |
| @Override |
| public int hashCode() { |
| return consumerName.hashCode() + 31 * cnx.hashCode(); |
| } |
| |
| /** |
| * first try to remove ack-position from the current_consumer's pendingAcks. |
| * if ack-message doesn't present into current_consumer's pendingAcks |
| * a. try to remove from other connected subscribed consumers (It happens when client |
| * tries to acknowledge message through different consumer under the same subscription) |
| * |
| * |
| * @param position |
| */ |
| private boolean removePendingAcks(PositionImpl position) { |
| Consumer ackOwnedConsumer = null; |
| if (pendingAcks.get(position.getLedgerId(), position.getEntryId()) == null) { |
| for (Consumer consumer : subscription.getConsumers()) { |
| if (!consumer.equals(this) && consumer.getPendingAcks().containsKey(position.getLedgerId(), |
| position.getEntryId())) { |
| ackOwnedConsumer = consumer; |
| break; |
| } |
| } |
| } else { |
| ackOwnedConsumer = this; |
| } |
| |
| // remove pending message from appropriate consumer and unblock unAckMsg-flow if requires |
| LongPair ackedPosition = ackOwnedConsumer != null |
| ? ackOwnedConsumer.getPendingAcks().get(position.getLedgerId(), position.getEntryId()) |
| : null; |
| if (ackedPosition != null) { |
| if (!ackOwnedConsumer.getPendingAcks().remove(position.getLedgerId(), position.getEntryId())) { |
| // Message was already removed by the other consumer |
| return false; |
| } |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] consumer {} received ack {}", topicName, subscription, consumerId, position); |
| } |
| // unblock consumer-throttling when limit check is disabled or receives half of maxUnackedMessages => |
| // consumer can start again consuming messages |
| int unAckedMsgs = UNACKED_MESSAGES_UPDATER.get(ackOwnedConsumer); |
| if ((((unAckedMsgs <= getMaxUnackedMessages() / 2) && ackOwnedConsumer.blockedConsumerOnUnackedMsgs) |
| && ackOwnedConsumer.shouldBlockConsumerOnUnackMsgs()) |
| || !shouldBlockConsumerOnUnackMsgs()) { |
| ackOwnedConsumer.blockedConsumerOnUnackedMsgs = false; |
| flowConsumerBlockedPermits(ackOwnedConsumer); |
| } |
| return true; |
| } |
| return false; |
| } |
| |
| public ConcurrentLongLongPairHashMap getPendingAcks() { |
| return pendingAcks; |
| } |
| |
| public int getPriorityLevel() { |
| return priorityLevel; |
| } |
| |
| public void redeliverUnacknowledgedMessages(long consumerEpoch) { |
| // cleanup unackedMessage bucket and redeliver those unack-msgs again |
| clearUnAckedMsgs(); |
| blockedConsumerOnUnackedMsgs = false; |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] consumer {} received redelivery", topicName, subscription, consumerId); |
| } |
| |
| if (pendingAcks != null) { |
| List<PositionImpl> pendingPositions = new ArrayList<>((int) pendingAcks.size()); |
| MutableInt totalRedeliveryMessages = new MutableInt(0); |
| pendingAcks.forEach((ledgerId, entryId, batchSize, stickyKeyHash) -> { |
| int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(PositionImpl.get(ledgerId, entryId), |
| batchSize); |
| totalRedeliveryMessages.add(unAckedCount); |
| pendingPositions.add(new PositionImpl(ledgerId, entryId)); |
| }); |
| |
| for (PositionImpl p : pendingPositions) { |
| pendingAcks.remove(p.getLedgerId(), p.getEntryId()); |
| } |
| |
| msgRedeliver.recordMultipleEvents(totalRedeliveryMessages.intValue(), totalRedeliveryMessages.intValue()); |
| subscription.redeliverUnacknowledgedMessages(this, pendingPositions); |
| } else { |
| subscription.redeliverUnacknowledgedMessages(this, consumerEpoch); |
| } |
| |
| flowConsumerBlockedPermits(this); |
| } |
| |
| public void redeliverUnacknowledgedMessages(List<MessageIdData> messageIds) { |
| int totalRedeliveryMessages = 0; |
| List<PositionImpl> pendingPositions = new ArrayList<>(); |
| for (MessageIdData msg : messageIds) { |
| PositionImpl position = PositionImpl.get(msg.getLedgerId(), msg.getEntryId()); |
| LongPair longPair = pendingAcks.get(position.getLedgerId(), position.getEntryId()); |
| if (longPair != null) { |
| int unAckedCount = (int) getUnAckedCountForBatchIndexLevelEnabled(position, longPair.first); |
| pendingAcks.remove(position.getLedgerId(), position.getEntryId()); |
| totalRedeliveryMessages += unAckedCount; |
| pendingPositions.add(position); |
| } |
| } |
| |
| addAndGetUnAckedMsgs(this, -totalRedeliveryMessages); |
| blockedConsumerOnUnackedMsgs = false; |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] consumer {} received {} msg-redelivery {}", topicName, subscription, consumerId, |
| totalRedeliveryMessages, pendingPositions.size()); |
| } |
| |
| subscription.redeliverUnacknowledgedMessages(this, pendingPositions); |
| msgRedeliver.recordMultipleEvents(totalRedeliveryMessages, totalRedeliveryMessages); |
| |
| int numberOfBlockedPermits = PERMITS_RECEIVED_WHILE_CONSUMER_BLOCKED_UPDATER.getAndSet(this, 0); |
| |
| // if permitsReceivedWhileConsumerBlocked has been accumulated then pass it to Dispatcher to flow messages |
| if (numberOfBlockedPermits > 0) { |
| MESSAGE_PERMITS_UPDATER.getAndAdd(this, numberOfBlockedPermits); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}-{}] Added {} blockedPermits to broker.service.Consumer's messagePermits for consumer {}", |
| topicName, subscription, numberOfBlockedPermits, consumerId); |
| } |
| subscription.consumerFlow(this, numberOfBlockedPermits); |
| } |
| } |
| |
| public Subscription getSubscription() { |
| return subscription; |
| } |
| |
| private int addAndGetUnAckedMsgs(Consumer consumer, int ackedMessages) { |
| int unackedMsgs = 0; |
| if (isPersistentTopic && Subscription.isIndividualAckMode(subType)) { |
| subscription.addUnAckedMessages(ackedMessages); |
| unackedMsgs = UNACKED_MESSAGES_UPDATER.addAndGet(consumer, ackedMessages); |
| } |
| if (unackedMsgs < 0 && System.currentTimeMillis() - negtiveUnackedMsgsTimestamp >= 10_000) { |
| negtiveUnackedMsgsTimestamp = System.currentTimeMillis(); |
| log.warn("unackedMsgs is : {}, ackedMessages : {}, consumer : {}", unackedMsgs, ackedMessages, consumer); |
| } |
| return unackedMsgs; |
| } |
| |
| private void clearUnAckedMsgs() { |
| int unaAckedMsgs = UNACKED_MESSAGES_UPDATER.getAndSet(this, 0); |
| subscription.addUnAckedMessages(-unaAckedMsgs); |
| } |
| |
| public boolean isPreciseDispatcherFlowControl() { |
| return preciseDispatcherFlowControl; |
| } |
| |
| public void setReadPositionWhenJoining(PositionImpl readPositionWhenJoining) { |
| this.readPositionWhenJoining = readPositionWhenJoining; |
| } |
| |
| public int getMaxUnackedMessages() { |
| //Unacked messages check is disabled for non-durable subscriptions. |
| if (isDurable && subscription != null) { |
| return subscription.getTopic().getHierarchyTopicPolicies().getMaxUnackedMessagesOnConsumer().get(); |
| } else { |
| return 0; |
| } |
| } |
| |
| |
| public TransportCnx cnx() { |
| return cnx; |
| } |
| |
| public String getClientAddress() { |
| return clientAddress; |
| } |
| |
| public MessageId getStartMessageId() { |
| return startMessageId; |
| } |
| |
| public Map<String, String> getMetadata() { |
| return metadata; |
| } |
| |
| private int getStickyKeyHash(Entry entry) { |
| final byte[] stickyKey; |
| if (entry instanceof EntryAndMetadata) { |
| stickyKey = ((EntryAndMetadata) entry).getStickyKey(); |
| } else { |
| stickyKey = Commands.peekStickyKey(entry.getDataBuffer(), topicName, subscription.getName()); |
| } |
| return StickyKeyConsumerSelector.makeStickyKeyHash(stickyKey); |
| } |
| |
| private static final Logger log = LoggerFactory.getLogger(Consumer.class); |
| } |