| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service.persistent; |
| |
| import io.netty.util.concurrent.FastThreadLocal; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.LinkedHashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Set; |
| import java.util.concurrent.atomic.AtomicInteger; |
| |
| import org.apache.bookkeeper.mledger.Entry; |
| import org.apache.bookkeeper.mledger.ManagedCursor; |
| import org.apache.bookkeeper.mledger.Position; |
| import org.apache.bookkeeper.mledger.impl.PositionImpl; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.service.BrokerServiceException; |
| import org.apache.pulsar.broker.service.ConsistentHashingStickyKeyConsumerSelector; |
| import org.apache.pulsar.broker.service.Consumer; |
| import org.apache.pulsar.broker.service.EntryBatchIndexesAcks; |
| import org.apache.pulsar.broker.service.EntryBatchSizes; |
| import org.apache.pulsar.broker.service.HashRangeAutoSplitStickyKeyConsumerSelector; |
| import org.apache.pulsar.broker.service.HashRangeExclusiveStickyKeyConsumerSelector; |
| import org.apache.pulsar.broker.service.SendMessageInfo; |
| import org.apache.pulsar.broker.service.StickyKeyConsumerSelector; |
| import org.apache.pulsar.broker.service.Subscription; |
| import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType; |
| import org.apache.pulsar.common.api.proto.PulsarApi.KeySharedMeta; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDispatcherMultipleConsumers { |
| |
    // From KeySharedMeta: when true, strict ordering for newly joined consumers is not
    // enforced and recentlyJoinedConsumers is never allocated (stays null).
    private final boolean allowOutOfOrderDelivery;
    // Selects the target consumer for a message's sticky key (hash-range or
    // consistent-hashing based, chosen in the constructor from the key-shared mode).
    private final StickyKeyConsumerSelector selector;

    // One-shot flag set when a dispatch round sent nothing because consumers were stuck;
    // makes the next getMessagesToReplayNow() skip replays and read forward instead.
    private boolean isDispatcherStuckOnReplays = false;

    /**
     * When a consumer joins, it will be added to this map with the current read position.
     * This means that, in order to preserve ordering, new consumers can only receive old
     * messages, until the mark-delete position will move past this point.
     */
    private final LinkedHashMap<Consumer, PositionImpl> recentlyJoinedConsumers;

    // Consumers that could not accept any message (no available permits) in the previous
    // dispatch round (stuckConsumers) and in the round currently being processed
    // (nextStuckConsumers); used to decide whether a stalled round should read ahead.
    private final Set<Consumer> stuckConsumers;
    private final Set<Consumer> nextStuckConsumers;
| |
| PersistentStickyKeyDispatcherMultipleConsumers(PersistentTopic topic, ManagedCursor cursor, |
| Subscription subscription, ServiceConfiguration conf, KeySharedMeta ksm) { |
| super(topic, cursor, subscription); |
| |
| this.allowOutOfOrderDelivery = ksm.getAllowOutOfOrderDelivery(); |
| this.recentlyJoinedConsumers = allowOutOfOrderDelivery ? null : new LinkedHashMap<>(); |
| this.stuckConsumers = new HashSet<>(); |
| this.nextStuckConsumers = new HashSet<>(); |
| |
| switch (ksm.getKeySharedMode()) { |
| case AUTO_SPLIT: |
| if (conf.isSubscriptionKeySharedUseConsistentHashing()) { |
| selector = new ConsistentHashingStickyKeyConsumerSelector( |
| conf.getSubscriptionKeySharedConsistentHashingReplicaPoints()); |
| } else { |
| selector = new HashRangeAutoSplitStickyKeyConsumerSelector(); |
| } |
| break; |
| |
| case STICKY: |
| this.selector = new HashRangeExclusiveStickyKeyConsumerSelector(); |
| break; |
| |
| default: |
| throw new IllegalArgumentException("Invalid key-shared mode: " + ksm.getKeySharedMode()); |
| } |
| } |
| |
    /**
     * Registers the consumer with both the base dispatcher and the key-hash selector, and —
     * when strict ordering is enforced and there are unacked messages ahead — records the
     * current read position so the new consumer only receives messages published before it
     * joined (see {@code recentlyJoinedConsumers}).
     *
     * @throws BrokerServiceException if the selector rejects the consumer; the registration
     *         performed by {@code super.addConsumer} is rolled back before rethrowing
     */
    @Override
    public synchronized void addConsumer(Consumer consumer) throws BrokerServiceException {
        super.addConsumer(consumer);
        try {
            selector.addConsumer(consumer);
        } catch (BrokerServiceException e) {
            // Keep dispatcher and selector consistent: undo super.addConsumer's registration.
            consumerSet.removeAll(consumer);
            consumerList.remove(consumer);
            throw e;
        }

        PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition();
        consumer.setReadPositionWhenJoining(readPositionWhenJoining);
        // If this was the 1st consumer, or if all the messages are already acked, then we
        // don't need to do anything special
        if (!allowOutOfOrderDelivery
                && recentlyJoinedConsumers != null
                && consumerList.size() > 1
                && cursor.getNumberOfEntriesSinceFirstNotAckedMessage() > 1) {
            recentlyJoinedConsumers.put(consumer, readPositionWhenJoining);
        }
    }
| |
| @Override |
| public synchronized void removeConsumer(Consumer consumer) throws BrokerServiceException { |
| super.removeConsumer(consumer); |
| selector.removeConsumer(consumer); |
| if (recentlyJoinedConsumers != null) { |
| recentlyJoinedConsumers.remove(consumer); |
| if (consumerList.size() == 1) { |
| recentlyJoinedConsumers.clear(); |
| } |
| } |
| } |
| |
    // Per-thread scratch map used by sendMessagesToConsumers() to group a read batch's
    // entries by target consumer, avoiding a fresh allocation on every dispatch round.
    // Cleared before each use; safe because each dispatch round runs on a single thread.
    private static final FastThreadLocal<Map<Consumer, List<Entry>>> localGroupedEntries = new FastThreadLocal<Map<Consumer, List<Entry>>>() {
        @Override
        protected Map<Consumer, List<Entry>> initialValue() throws Exception {
            return new HashMap<>();
        }
    };
| |
| @Override |
| protected void sendMessagesToConsumers(ReadType readType, List<Entry> entries) { |
| long totalMessagesSent = 0; |
| long totalBytesSent = 0; |
| int entriesCount = entries.size(); |
| |
| // Trigger read more messages |
| if (entriesCount == 0) { |
| readMoreEntries(); |
| return; |
| } |
| |
| if (consumerSet.isEmpty()) { |
| entries.forEach(Entry::release); |
| cursor.rewind(); |
| return; |
| } |
| |
| nextStuckConsumers.clear(); |
| |
| final Map<Consumer, List<Entry>> groupedEntries = localGroupedEntries.get(); |
| groupedEntries.clear(); |
| |
| for (Entry entry : entries) { |
| Consumer c = selector.select(peekStickyKey(entry.getDataBuffer())); |
| groupedEntries.computeIfAbsent(c, k -> new ArrayList<>()).add(entry); |
| } |
| |
| AtomicInteger keyNumbers = new AtomicInteger(groupedEntries.size()); |
| |
| for (Map.Entry<Consumer, List<Entry>> current : groupedEntries.entrySet()) { |
| Consumer consumer = current.getKey(); |
| List<Entry> entriesWithSameKey = current.getValue(); |
| int entriesWithSameKeyCount = entriesWithSameKey.size(); |
| final int availablePermits = Math.max(consumer.getAvailablePermits(), 0); |
| int maxMessagesForC = Math.min(entriesWithSameKeyCount, availablePermits); |
| int messagesForC = getRestrictedMaxEntriesForConsumer(consumer, entriesWithSameKey, maxMessagesForC, readType); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] select consumer {} with messages num {}, read type is {}", |
| name, consumer.consumerName(), messagesForC, readType); |
| } |
| |
| if (messagesForC < entriesWithSameKeyCount) { |
| // We are not able to push all the messages with given key to its consumer, |
| // so we discard for now and mark them for later redelivery |
| for (int i = messagesForC; i < entriesWithSameKeyCount; i++) { |
| Entry entry = entriesWithSameKey.get(i); |
| messagesToRedeliver.add(entry.getLedgerId(), entry.getEntryId()); |
| entry.release(); |
| entriesWithSameKey.set(i, null); |
| } |
| } |
| |
| if (messagesForC > 0) { |
| // remove positions first from replay list first : sendMessages recycles entries |
| if (readType == ReadType.Replay) { |
| for (int i = 0; i < messagesForC; i++) { |
| Entry entry = entriesWithSameKey.get(i); |
| messagesToRedeliver.remove(entry.getLedgerId(), entry.getEntryId()); |
| } |
| } |
| |
| SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal(); |
| EntryBatchSizes batchSizes = EntryBatchSizes.get(messagesForC); |
| EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(messagesForC); |
| filterEntriesForConsumer(entriesWithSameKey, batchSizes, sendMessageInfo, batchIndexesAcks, cursor); |
| |
| consumer.sendMessages(entriesWithSameKey, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(), |
| sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(), |
| getRedeliveryTracker()).addListener(future -> { |
| if (future.isSuccess() && keyNumbers.decrementAndGet() == 0) { |
| readMoreEntries(); |
| } |
| }); |
| |
| TOTAL_AVAILABLE_PERMITS_UPDATER.getAndAdd(this, -(sendMessageInfo.getTotalMessages() - batchIndexesAcks.getTotalAckedIndexCount())); |
| totalMessagesSent += sendMessageInfo.getTotalMessages(); |
| totalBytesSent += sendMessageInfo.getTotalBytes(); |
| } |
| } |
| |
| // acquire message-dispatch permits for already delivered messages |
| if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) { |
| if (topic.getDispatchRateLimiter().isPresent()) { |
| topic.getDispatchRateLimiter().get().tryDispatchPermit(totalMessagesSent, totalBytesSent); |
| } |
| |
| if (dispatchRateLimiter.isPresent()) { |
| dispatchRateLimiter.get().tryDispatchPermit(totalMessagesSent, totalBytesSent); |
| } |
| } |
| |
| stuckConsumers.clear(); |
| |
| if (totalMessagesSent == 0 && recentlyJoinedConsumers != null && recentlyJoinedConsumers.isEmpty()) { |
| // This means, that all the messages we've just read cannot be dispatched right now. |
| // This condition can only happen when: |
| // 1. We have consumers ready to accept messages (otherwise the would not haven been triggered) |
| // 2. All keys in the current set of messages are routing to consumers that are currently busy |
| // and stuck is not caused by stuckConsumers |
| // |
| // The solution here is to move on and read next batch of messages which might hopefully contain |
| // also keys meant for other consumers. |
| // |
| // We do it unless that are "recently joined consumers". In that case, we would be looking |
| // ahead in the stream while the new consumers are not ready to accept the new messages, |
| // therefore would be most likely only increase the distance between read-position and mark-delete |
| // position. |
| if (!nextStuckConsumers.isEmpty()) { |
| isDispatcherStuckOnReplays = true; |
| stuckConsumers.addAll(nextStuckConsumers); |
| } |
| // readMoreEntries should run regardless whether or not stuck is caused by stuckConsumers for avoid stopping dispatch. |
| readMoreEntries(); |
| } |
| } |
| |
    /**
     * Computes how many of the given entries (at most {@code maxMessages}) may be dispatched
     * to {@code consumer} right now without violating Key_Shared ordering for recently
     * joined consumers.
     *
     * @param consumer    the consumer selected for these entries' sticky key(s)
     * @param entries     the candidate entries, in increasing position order
     * @param maxMessages upper bound, already capped by the consumer's available permits
     * @param readType    whether the entries come from a normal read or a replay
     * @return the number of leading entries that may be sent now (0..maxMessages)
     */
    private int getRestrictedMaxEntriesForConsumer(Consumer consumer, List<Entry> entries, int maxMessages, ReadType readType) {
        if (maxMessages == 0) {
            // the consumer was stuck
            nextStuckConsumers.add(consumer);
            return 0;
        }

        if (recentlyJoinedConsumers == null) {
            // Out-of-order delivery is allowed: no join-position restriction applies.
            return maxMessages;
        }

        PositionImpl maxReadPosition = recentlyJoinedConsumers.get(consumer);
        if (maxReadPosition == null) {
            // stop to dispatch by stuckConsumers
            if (stuckConsumers.contains(consumer)) {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] stop to dispatch by stuckConsumers, consumer: {}", name, consumer);
                }
                return 0;
            }
            // The consumer has not recently joined, so we can send all messages
            return maxMessages;
        }

        PositionImpl markDeletePosition = (PositionImpl) cursor.getMarkDeletedPosition();

        if (maxReadPosition.compareTo(markDeletePosition.getNext()) <= 0) {
            // At this point, all the old messages were already consumed and this consumer
            // is now ready to receive any message
            recentlyJoinedConsumers.remove(consumer);
            return maxMessages;
        }

        // If the read type is Replay, we should avoid send messages that hold by other consumer to the new consumers,
        // For example, we have 10 messages [0,1,2,3,4,5,6,7,8,9]
        // If the consumer0 get message 0 and 1, and does not acked message 0, then consumer1 joined,
        // when consumer1 get message 2,3, the broker will not dispatch messages to consumer1
        // because of the mark delete position did not move forward. So message 2,3 will stored in the redeliver tracker.
        // Now, consumer2 joined, it will read new messages from the cursor, so the recentJoinedPosition is 4 for consumer2
        // Because of there are messages need to redeliver, so the broker will read the redelivery message first [2,3]
        // message [2,3] is lower than the recentJoinedPosition 4, so the message [2,3] will dispatched to the consumer2
        // But the message [2,3] should not dispatch to consumer2.

        if (readType == ReadType.Replay) {
            // Tighten the bound to the earliest join position among all recently joined
            // consumers — the first value of the insertion-ordered LinkedHashMap.
            PositionImpl minReadPositionForRecentJoinedConsumer = recentlyJoinedConsumers.values().iterator().next();
            if (minReadPositionForRecentJoinedConsumer != null && minReadPositionForRecentJoinedConsumer.compareTo(maxReadPosition) < 0) {
                maxReadPosition = minReadPositionForRecentJoinedConsumer;
            }
        }
        // Here, the consumer is one that has recently joined, so we can only send messages that were
        // published before it has joined.
        for (int i = 0; i < maxMessages; i++) {
            if (((PositionImpl) entries.get(i).getPosition()).compareTo(maxReadPosition) >= 0) {
                // We have already crossed the divider line. All messages in the list are now
                // newer than what we can currently dispatch to this consumer
                return i;
            }
        }

        return maxMessages;
    }
| |
| @Override |
| public synchronized void acknowledgementWasProcessed() { |
| if (recentlyJoinedConsumers != null && !recentlyJoinedConsumers.isEmpty()) { |
| // After we process acks, we need to check whether the mark-delete position was advanced and we can finally |
| // read more messages. It's safe to call readMoreEntries() multiple times. |
| readMoreEntries(); |
| } |
| } |
| |
| protected synchronized Set<PositionImpl> getMessagesToReplayNow(int maxMessagesToRead) { |
| if (isDispatcherStuckOnReplays) { |
| // If we're stuck on replay, we want to move forward reading on the topic (until the overall max-unacked |
| // messages kicks in), instead of keep replaying the same old messages, since the consumer that these |
| // messages are routing to might be busy at the moment |
| this.isDispatcherStuckOnReplays = false; |
| return Collections.emptySet(); |
| } else { |
| return super.getMessagesToReplayNow(maxMessagesToRead); |
| } |
| } |
| |
    /**
     * @return always {@link SubType#Key_Shared} — this dispatcher implementation serves
     *         only key-shared subscriptions
     */
    @Override
    public SubType getType() {
        return SubType.Key_Shared;
    }
| |
    /**
     * Replays the given positions through the cursor, passing {@code ReadType.Replay} as the
     * callback context so dispatch can distinguish replays from normal reads.
     */
    // NOTE(review): the trailing boolean presumably requests sorted replay — confirm
    // against ManagedCursor#asyncReplayEntries.
    @Override
    protected Set<? extends Position> asyncReplayEntries(Set<? extends Position> positions) {
        return cursor.asyncReplayEntries(positions, this, ReadType.Replay, true);
    }
| |
| private static final Logger log = LoggerFactory.getLogger(PersistentStickyKeyDispatcherMultipleConsumers.class); |
| |
| } |