blob: 8f05530f58bfa0c7467bd56c9c756ff0103417c4 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.Range;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.api.proto.KeySharedMode;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers {
private final boolean allowOutOfOrderDelivery;
private final StickyKeyConsumerSelector selector;
private boolean isDispatcherStuckOnReplays = false;
private final KeySharedMode keySharedMode;
* When a consumer joins, it will be added to this map with the current read position.
* This means that, in order to preserve ordering, new consumers can only receive old
* messages, until the mark-delete position will move past this point.
private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;
PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor,
Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) {
super(topic, cursor, subscription, ksm.isAllowOutOfOrderDelivery());
this.allowOutOfOrderDelivery = ksm.isAllowOutOfOrderDelivery();
this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>();
this.keySharedMode = ksm.getKeySharedMode();
switch (this.keySharedMode) {
if (conf.isSubscriptionKeySharedUseConsistentHashing()) {
selector = new ConsistentHashingStickyKeyConsumerSelector(
} else {
selector = new HashRangeAutoSplitStickyKeyConsumerSelector();
case STICKY:
this.selector = new HashRangeExclusiveStickyKeyConsumerSelector();
throw new IllegalArgumentException("Invalid key-shared mode: " + keySharedMode);
public StickyKeyConsumerSelector getSelector() {
return selector;
public synchronized CompletableFuture<Void> addConsumer(Consumer consumer) {
if (IS_CLOSED_UPDATER.get(this) == TRUE) {
log.warn("[{}] Dispatcher is already closed. Closing consumer {}", name, consumer);
return CompletableFuture.completedFuture(null);
return super.addConsumer(consumer).thenCompose(__ ->
selector.addConsumer(consumer).handle((result, ex) -> {
if (ex != null) {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
throw FutureUtil.wrapToCompletionException(ex);
return result;
).thenRun(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition();
// If this was the 1st consumer, or if all the messages are already acked, then we
// don't need to do anything special
if (!allowOutOfOrderDelivery
&& recentlyJoinedConsumers != null
&& consumerList.size() > 1
&& cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException {
// The consumer must be removed from the selector before calling the superclass removeConsumer method.
// In the superclass removeConsumer method, the pending acks that the consumer has are added to
// redeliveryMessages. If the consumer has not been removed from the selector at this point,
// the broker will try to redeliver the messages to the consumer that has already been closed.
// As a result, the messages are not redelivered to any consumer, and the mark-delete position does not move,
// eventually causing all consumers to get stuck.
if (recentlyJoinedConsumers != null) {
if (consumerList.size() == 1) {
if (removeConsumersFromRecentJoinedConsumers() || !redeliveryMessages.isEmpty()) {
private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries =
new FastThreadLocal<Map<Consumer, List<Entry>>>() {
protected Map<Consumer, List<Entry>> initialValue() throws Exception {
return new HashMap<>();
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) {
long totalMessagesSent = 0;
long totalBytesSent = 0;
long totalEntries = 0;
int entriesCount = entries.size();
// Trigger read more messages
if (entriesCount == 0) {
return true;
if (consumerSet.isEmpty()) {
return false;
// A corner case that we have to retry a readMoreEntries in order to preserver order delivery.
// This may happen when consumer closed. See issue #12885 for details.
if (!allowOutOfOrderDelivery) {
NavigableSet<PositionImpl> messagesToReplayNow = this.getMessagesToReplayNow(1);
if (messagesToReplayNow != null && !messagesToReplayNow.isEmpty()) {
PositionImpl replayPosition = messagesToReplayNow.first();
// We have received a message potentially from the delayed tracker and, since we're not using it
// right now, it needs to be added to the redelivery tracker or we won't attempt anymore to
// resend it (until we disconnect consumer).
redeliveryMessages.add(replayPosition.getLedgerId(), replayPosition.getEntryId());
if (this.minReplayedPosition != null) {
// If relayPosition is a new entry wither smaller position is inserted for redelivery during this
// async read, it is possible that this relayPosition should dispatch to consumer first. So in
// order to preserver order delivery, we need to discard this read result, and try to trigger a
// replay read, that containing "relayPosition", by calling readMoreEntries.
if (replayPosition.compareTo(minReplayedPosition) < 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Position {} (<{}) is inserted for relay during current {} read, "
+ "discard this read and retry with readMoreEntries.",
name, replayPosition, minReplayedPosition, readType);
if (readType == ReadType.Normal) {
entries.forEach(entry -> {
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
} else if (readType == ReadType.Replay) {
return true;
final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get();
final Map<Consumer, Set<Integer>> consumerStickyKeyHashesMap = new HashMap<>();
for (Entry entry : entries) {
int stickyKeyHash = getStickyKeyHash(entry);
Consumer c =;
if (c != null) {
groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry);
consumerStickyKeyHashesMap.computeIfAbsent(c, k -> new HashSet<>()).add(stickyKeyHash);
} else {
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size());
int currentThreadKeyNumber = groupedEntries.size();
if (currentThreadKeyNumber == 0) {
currentThreadKeyNumber = -1;
for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) {
Consumer consumer = current.getKey();
assert consumer != null; // checked when added to groupedEntries
List<Entry> entriesWithSameKey = current.getValue();
int entriesWithSameKeyCount = entriesWithSameKey.size();
int availablePermits = Math.max(consumer.getAvailablePermits(), 0);
if (consumer.getMaxUnackedMessages() > 0) {
int remainUnAckedMessages =
// Avoid negative number
Math.max(consumer.getMaxUnackedMessages() - consumer.getUnackedMessages(), 0);
availablePermits = Math.min(availablePermits, remainUnAckedMessages);
int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits);
int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC,
readType, consumerStickyKeyHashesMap.get(consumer));
if (log.isDebugEnabled()) {
log.debug("[{}] select consumer {} with messages num {}, read type is {}",
name, consumer.consumerName(), messagesForC, readType);
if (messagesForC < entriesWithSameKeyCount) {
// We are not able to push all the messages with given key to its consumer,
// so we discard for now and mark them for later redelivery
for (int i = messagesForC; i < entriesWithSameKeyCount; i++) {
Entry entry = entriesWithSameKey.get(i);
long stickyKeyHash = getStickyKeyHash(entry);
addMessageToReplay(entry.getLedgerId(), entry.getEntryId(), stickyKeyHash);
entriesWithSameKey.set(i, null);
if (messagesForC > 0) {
// remove positions first from replay list first : sendMessages recycles entries
if (readType == ReadType.Replay) {
for (int i = 0; i < messagesForC; i++) {
Entry entry = entriesWithSameKey.get(i);
redeliveryMessages.remove(entry.getLedgerId(), entry.getEntryId());
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC);
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC);
totalEntries += filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo,
batchIndexesAcks, cursor, readType == ReadType.Replay, consumer);
consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks,
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
getRedeliveryTracker()).addListener(future -> {
if (future.isDone() && keyNumbers.decrementAndGet() == 0) {
-(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount()));
totalMessagesSent += sendMessageInfo.getTotalMessages();
totalBytesSent += sendMessageInfo.getTotalBytes();
} else {
currentThreadKeyNumber = keyNumbers.decrementAndGet();
// acquire message-dispatch permits for already delivered messages
acquirePermitsForDeliveredMessages(topic, cursor, totalEntries, totalMessagesSent, totalBytesSent);
if (totalMessagesSent == 0 && (recentlyJoinedConsumers == null || recentlyJoinedConsumers.isEmpty())) {
// This means, that all the messages we've just read cannot be dispatched right now.
// This condition can only happen when:
// 1. We have consumers ready to accept messages (otherwise the would not haven been triggered)
// 2. All keys in the current set of messages are routing to consumers that are currently busy
// The solution here is to move on and read next batch of messages which might hopefully contain
// also keys meant for other consumers.
// We do it unless that are "recently joined consumers". In that case, we would be looking
// ahead in the stream while the new consumers are not ready to accept the new messages,
// therefore would be most likely only increase the distance between read-position and mark-delete
// position.
isDispatcherStuckOnReplays = true;
return true;
} else if (currentThreadKeyNumber == 0) {
return true;
return false;
private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages,
ReadType readType, Set<Integer> stickyKeyHashes) {
if (maxMessages == 0) {
return 0;
if (readType == ReadType.Normal && stickyKeyHashes != null
&& redeliveryMessages.containsStickyKeyHashes(stickyKeyHashes)) {
// If redeliveryMessages contains messages that correspond to the same hash as the messages
// that the dispatcher is trying to send, do not send those messages for order guarantee
return 0;
if (recentlyJoinedConsumers == null) {
return maxMessages;
PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
// At this point, all the old messages were already consumed and this consumer
// is now ready to receive any message
if (maxReadPosition == null) {
// The consumer has not recently joined, so we can send all messages
return maxMessages;
// If the read type is Replay, we should avoid send messages that hold by other consumer to the new consumers,
// For example, we have 10 messages [0,1,2,3,4,5,6,7,8,9]
// If the consumer0 get message 0 and 1, and does not acked message 0, then consumer1 joined,
// when consumer1 get message 2,3, the broker will not dispatch messages to consumer1
// because of the mark delete position did not move forward.
// So message 2,3 will stored in the redeliver tracker.
// Now, consumer2 joined, it will read new messages from the cursor,
// so the recentJoinedPosition is 4 for consumer2
// Because of there are messages need to redeliver, so the broker will read the redelivery message first [2,3]
// message [2,3] is lower than the recentJoinedPosition 4,
// so the message [2,3] will dispatched to the consumer2
// But the message [2,3] should not dispatch to consumer2.
if (readType == ReadType.Replay) {
PositionImpl minReadPositionForRecentJoinedConsumer = recentlyJoinedConsumers.values().iterator().next();
if (minReadPositionForRecentJoinedConsumer != null
&& minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
maxReadPosition = minReadPositionForRecentJoinedConsumer;
// Here, the consumer is one that has recently joined, so we can only send messages that were
// published before it has joined.
for (int i = 0; i < maxMessages; i++) {
if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
// We have already crossed the divider line. All messages in the list are now
// newer than what we can currently dispatch to this consumer
return i;
return maxMessages;
public void markDeletePositionMoveForward() {
// Execute the notification in different thread to avoid a mutex chain here
// from the delete operation that was completed
topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()
&& removeConsumersFromRecentJoinedConsumers()) {
// After we process acks, we need to check whether the mark-delete position was advanced and we
// can finally read more messages. It's safe to call readMoreEntries() multiple times.
private boolean removeConsumersFromRecentJoinedConsumers() {
Iterator<Map.Entry<Consumer, PositionImpl>> itr = recentlyJoinedConsumers.entrySet().iterator();
boolean hasConsumerRemovedFromTheRecentJoinedConsumers = false;
PositionImpl mdp = (PositionImpl) cursor.getMarkDeletedPosition();
if (mdp != null) {
PositionImpl nextPositionOfTheMarkDeletePosition =
((ManagedLedgerImpl) cursor.getManagedLedger()).getNextValidPosition(mdp);
while (itr.hasNext()) {
Map.Entry<Consumer, PositionImpl> entry =;
if (entry.getValue().compareTo(nextPositionOfTheMarkDeletePosition) <= 0) {
hasConsumerRemovedFromTheRecentJoinedConsumers = true;
} else {
return hasConsumerRemovedFromTheRecentJoinedConsumers;
protected synchronized NavigableSet<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) {
if (isDispatcherStuckOnReplays) {
// If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked
// messages kicks in), instead of keep replaying the same old messages, since the consumer that these
// messages are routing to might be busy at the moment
this.isDispatcherStuckOnReplays = false;
return Collections.emptyNavigableSet();
} else {
return super.getMessagesToReplayNow(maxMessagesToRead);
public SubType getType() {
return SubType.Key_Shared;
protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
public KeySharedMode getKeySharedMode() {
return this.keySharedMode;
public boolean isAllowOutOfOrderDelivery() {
return this.allowOutOfOrderDelivery;
public boolean hasSameKeySharedPolicy(KeySharedMeta ksm) {
return (ksm.getKeySharedMode() == this.keySharedMode
&& ksm.isAllowOutOfOrderDelivery() == this.allowOutOfOrderDelivery);
public LinkedHashMap<Consumer, PositionImpl> getRecentlyJoinedConsumers() {
return recentlyJoinedConsumers;
public Map<Consumer, List<Range>> getConsumerKeyHashRanges() {
return selector.getConsumerKeyHashRanges();
private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class);