blob: 6e28a3cf47efb2d04ead1ab6985c2312eb233ba4 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import static org.apache.pulsar.broker.service.persistent.PersistentTopic.MESSAGE_RATE_BACKOFF_MS;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.AsyncCallbacks.ReadEntriesCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.NoMoreEntriesToReadException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.TooManyRequestsException;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.util.SafeRun;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractDispatcherSingleActiveConsumer;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.EntryBatchIndexesAcks;
import org.apache.pulsar.broker.service.EntryBatchSizes;
import org.apache.pulsar.broker.service.RedeliveryTracker;
import org.apache.pulsar.broker.service.RedeliveryTrackerDisabled;
import org.apache.pulsar.broker.service.SendMessageInfo;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.transaction.buffer.exceptions.TransactionNotSealedException;
import org.apache.pulsar.client.impl.Backoff;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.DispatchRate;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.util.Codec;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PersistentDispatcherSingleActiveConsumer extends AbstractDispatcherSingleActiveConsumer
implements Dispatcher, ReadEntriesCallback {
protected final PersistentTopic topic;
protected final ManagedCursor cursor;
protected final String name;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
protected volatile boolean havePendingRead = false;
protected volatile int readBatchSize;
protected final Backoff readFailureBackoff = new Backoff(15, TimeUnit.SECONDS,
1, TimeUnit.MINUTES, 0, TimeUnit.MILLISECONDS);
protected final ServiceConfiguration serviceConfig;
private volatile ScheduledFuture<?> readOnActiveConsumerTask = null;
private final RedeliveryTracker redeliveryTracker;
public PersistentDispatcherSingleActiveConsumer(ManagedCursor cursor, SubType subscriptionType, int partitionIndex,
PersistentTopic topic, Subscription subscription) {
super(subscriptionType, partitionIndex, topic.getName(), subscription);
this.topic = topic;
this.name = topic.getName() + " / " + (cursor.getName() != null ? Codec.decode(cursor.getName())
: ""/* NonDurableCursor doesn't have name */);
this.cursor = cursor;
this.serviceConfig = topic.getBrokerService().pulsar().getConfiguration();
this.readBatchSize = serviceConfig.getDispatcherMaxReadBatchSize();
this.redeliveryTracker = RedeliveryTrackerDisabled.REDELIVERY_TRACKER_DISABLED;
this.initializeDispatchRateLimiterIfNeeded(Optional.empty());
}
protected void scheduleReadOnActiveConsumer() {
cancelPendingRead();
if (havePendingRead) {
return;
}
// When a new consumer is chosen, start delivery from unacked message.
// If there is any pending read operation, let it finish and then rewind
if (subscriptionType != SubType.Failover || serviceConfig.getActiveConsumerFailoverDelayTimeMillis() <= 0) {
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries without delay", name);
}
cursor.rewind();
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
return;
}
// If subscription type is Failover, delay rewinding cursor and
// reading more entries in order to prevent message duplication
if (readOnActiveConsumerTask != null) {
return;
}
readOnActiveConsumerTask = topic.getBrokerService().executor().schedule(() -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Rewind cursor and read more entries after {} ms delay", name,
serviceConfig.getActiveConsumerFailoverDelayTimeMillis());
}
cursor.rewind();
Consumer activeConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
readOnActiveConsumerTask = null;
}, serviceConfig.getActiveConsumerFailoverDelayTimeMillis(), TimeUnit.MILLISECONDS);
}
protected boolean isConsumersExceededOnSubscription() {
Policies policies = null;
Integer maxConsumersPerSubscription = null;
try {
maxConsumersPerSubscription = Optional.ofNullable(topic.getBrokerService()
.getTopicPolicies(TopicName.get(topicName)))
.map(TopicPolicies::getMaxConsumersPerSubscription)
.orElse(null);
if (maxConsumersPerSubscription == null) {
// Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks in addConsumer
policies = topic.getBrokerService().pulsar().getConfigurationCache().policiesCache()
.getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic.getName()).getNamespace()));
if (policies == null) {
policies = new Policies();
}
}
} catch (Exception e) {
policies = new Policies();
}
if (maxConsumersPerSubscription == null) {
maxConsumersPerSubscription = policies.max_consumers_per_subscription > 0
? policies.max_consumers_per_subscription : serviceConfig.getMaxConsumersPerSubscription();
}
if (maxConsumersPerSubscription > 0 && maxConsumersPerSubscription <= consumers.size()) {
return true;
}
return false;
}
@Override
protected void cancelPendingRead() {
if (havePendingRead && cursor.cancelPendingReadRequest()) {
havePendingRead = false;
}
}
@Override
public void readEntriesComplete(final List<Entry> entries, Object obj) {
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
internalReadEntriesComplete(entries, obj);
}));
}
public synchronized void internalReadEntriesComplete(final List<Entry> entries, Object obj) {
Consumer readConsumer = (Consumer) obj;
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Got messages: {}", name, readConsumer, entries.size());
}
havePendingRead = false;
if (readBatchSize < serviceConfig.getDispatcherMaxReadBatchSize()) {
int newReadBatchSize = Math.min(readBatchSize * 2, serviceConfig.getDispatcherMaxReadBatchSize());
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Increasing read batch size from {} to {}", name, readConsumer, readBatchSize,
newReadBatchSize);
}
readBatchSize = newReadBatchSize;
}
readFailureBackoff.reduceToHalf();
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (isKeyHashRangeFiltered) {
Iterator<Entry> iterator = entries.iterator();
while (iterator.hasNext()) {
Entry entry = iterator.next();
byte[] key = peekStickyKey(entry.getDataBuffer());
Consumer consumer = stickyKeyConsumerSelector.select(key);
if (consumer == null || currentConsumer != consumer) {
iterator.remove();
}
}
}
if (currentConsumer == null || readConsumer != currentConsumer) {
// Active consumer has changed since the read request has been issued. We need to rewind the cursor and
// re-issue the read request for the new consumer
if (log.isDebugEnabled()) {
log.debug("[{}] rewind because no available consumer found", name);
}
entries.forEach(Entry::release);
cursor.rewind();
if (currentConsumer != null) {
notifyActiveConsumerChanged(currentConsumer);
readMoreEntries(currentConsumer);
}
} else {
EntryBatchSizes batchSizes = EntryBatchSizes.get(entries.size());
SendMessageInfo sendMessageInfo = SendMessageInfo.getThreadLocal();
EntryBatchIndexesAcks batchIndexesAcks = EntryBatchIndexesAcks.get(entries.size());
filterEntriesForConsumer(entries, batchSizes, sendMessageInfo, batchIndexesAcks, cursor, false);
dispatchEntriesToConsumer(currentConsumer, entries, batchSizes, batchIndexesAcks, sendMessageInfo);
}
}
protected void dispatchEntriesToConsumer(Consumer currentConsumer, List<Entry> entries,
EntryBatchSizes batchSizes, EntryBatchIndexesAcks batchIndexesAcks,
SendMessageInfo sendMessageInfo) {
currentConsumer
.sendMessages(entries, batchSizes, batchIndexesAcks, sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes(), sendMessageInfo.getTotalChunkedMessages(),
redeliveryTracker)
.addListener(future -> {
if (future.isSuccess()) {
// acquire message-dispatch permits for already delivered messages
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()) {
topic.getDispatchRateLimiter().get().tryDispatchPermit(sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes());
}
dispatchRateLimiter.ifPresent(rateLimiter ->
rateLimiter.tryDispatchPermit(sendMessageInfo.getTotalMessages(),
sendMessageInfo.getTotalBytes()));
}
// Schedule a new read batch operation only after the previous batch has been written to the socket.
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName,
SafeRun.safeRun(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer newConsumer = getActiveConsumer();
if (newConsumer != null && !havePendingRead) {
readMoreEntries(newConsumer);
} else {
log.debug(
"[{}-{}] Ignoring write future complete."
+ " consumerAvailable={} havePendingRead={}",
name, newConsumer, newConsumer != null, havePendingRead);
}
}
}));
}
});
}
@Override
public void consumerFlow(Consumer consumer, int additionalNumberOfMessages) {
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
internalConsumerFlow(consumer, additionalNumberOfMessages);
}));
}
private synchronized void internalConsumerFlow(Consumer consumer, int additionalNumberOfMessages) {
if (havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Ignoring flow control message since we already have a pending read req", name,
consumer);
}
} else if (ACTIVE_CONSUMER_UPDATER.get(this) != consumer) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Ignoring flow control message since consumer is not active partition consumer", name,
consumer);
}
} else if (readOnActiveConsumerTask != null) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Ignoring flow control message since consumer is waiting for cursor to be rewinded",
name, consumer);
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Trigger new read after receiving flow control message", name, consumer);
}
readMoreEntries(consumer);
}
}
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer) {
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
internalRedeliverUnacknowledgedMessages(consumer);
}));
}
private synchronized void internalRedeliverUnacknowledgedMessages(Consumer consumer) {
if (consumer != ACTIVE_CONSUMER_UPDATER.get(this)) {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: Only the active consumer can call resend",
name, consumer);
return;
}
if (readOnActiveConsumerTask != null) {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: consumer is waiting for cursor to be rewinded",
name, consumer);
return;
}
cancelPendingRead();
if (!havePendingRead) {
cursor.rewind();
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Cursor rewinded, redelivering unacknowledged messages. ", name, consumer);
}
readMoreEntries(consumer);
} else {
log.info("[{}-{}] Ignoring reDeliverUnAcknowledgedMessages: cancelPendingRequest on cursor failed", name,
consumer);
}
}
@Override
public void redeliverUnacknowledgedMessages(Consumer consumer, List<PositionImpl> positions) {
// We cannot redeliver single messages to single consumers to preserve ordering.
positions.forEach(redeliveryTracker::addIfAbsent);
redeliverUnacknowledgedMessages(consumer);
}
@Override
protected void readMoreEntries(Consumer consumer) {
// consumer can be null when all consumers are disconnected from broker.
// so skip reading more entries if currently there is no active consumer.
if (null == consumer) {
return;
}
if (consumer.getAvailablePermits() > 0) {
int messagesToRead = calculateNumOfMessageToRead(consumer);
if (-1 == messagesToRead) {
// Skip read as topic/dispatcher has exceed the dispatch rate.
return;
}
// Schedule read
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Schedule read of {} messages", name, consumer, messagesToRead);
}
havePendingRead = true;
if (consumer.readCompacted()) {
topic.getCompactedTopic().asyncReadEntriesOrWait(cursor, messagesToRead, this, consumer);
} else {
cursor.asyncReadEntriesOrWait(messagesToRead,
serviceConfig.getDispatcherMaxReadSizeBytes(), this, consumer, topic.getMaxReadPosition());
}
} else {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Consumer buffer is full, pause reading", name, consumer);
}
}
}
protected int calculateNumOfMessageToRead(Consumer consumer) {
int availablePermits = consumer.getAvailablePermits();
if (!consumer.isWritable()) {
// If the connection is not currently writable, we issue the read request anyway, but for a single
// message. The intent here is to keep use the request as a notification mechanism while avoiding to
// read and dispatch a big batch of messages which will need to wait before getting written to the
// socket.
availablePermits = 1;
}
int messagesToRead = Math.min(availablePermits, readBatchSize);
// if turn of precise dispatcher flow control, adjust the records to read
if (consumer.isPreciseDispatcherFlowControl()) {
int avgMessagesPerEntry = consumer.getAvgMessagesPerEntry();
messagesToRead = Math.min((int) Math.ceil(availablePermits * 1.0 / avgMessagesPerEntry), readBatchSize);
}
// throttle only if: (1) cursor is not active (or flag for throttle-nonBacklogConsumer is enabled) bcz
// active-cursor reads message from cache rather from bookkeeper (2) if topic has reached message-rate
// threshold: then schedule the read after MESSAGE_RATE_BACKOFF_MS
if (serviceConfig.isDispatchThrottlingOnNonBacklogConsumerEnabled() || !cursor.isActive()) {
if (topic.getDispatchRateLimiter().isPresent()
&& topic.getDispatchRateLimiter().get().isDispatchRateLimitingEnabled()) {
DispatchRateLimiter topicRateLimiter = topic.getDispatchRateLimiter().get();
if (!topicRateLimiter.hasMessageDispatchPermit()) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded topic message-rate {}/{}, schedule after a {}", name,
topicRateLimiter.getDispatchRateOnMsg(), topicRateLimiter.getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
topic.getBrokerService().executor().schedule(() -> {
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (currentConsumer != null && !havePendingRead) {
readMoreEntries(currentConsumer);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read retry for topic: Current Consumer {},"
+ " havePendingRead {}",
topic.getName(), currentConsumer, havePendingRead);
}
}
}, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return -1;
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long availablePermitsOnMsg = topicRateLimiter.getAvailableDispatchRateLimitOnMsg();
if (availablePermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) availablePermitsOnMsg);
}
}
}
if (dispatchRateLimiter.isPresent() && dispatchRateLimiter.get().isDispatchRateLimitingEnabled()) {
if (!dispatchRateLimiter.get().hasMessageDispatchPermit()) {
if (log.isDebugEnabled()) {
log.debug("[{}] message-read exceeded subscription message-rate {}/{},"
+ " schedule after a {}",
name, dispatchRateLimiter.get().getDispatchRateOnMsg(),
dispatchRateLimiter.get().getDispatchRateOnByte(),
MESSAGE_RATE_BACKOFF_MS);
}
topic.getBrokerService().executor().schedule(() -> {
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
if (currentConsumer != null && !havePendingRead) {
readMoreEntries(currentConsumer);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}] Skipping read retry: Current Consumer {}, havePendingRead {}",
topic.getName(), currentConsumer, havePendingRead);
}
}
}, MESSAGE_RATE_BACKOFF_MS, TimeUnit.MILLISECONDS);
return -1;
} else {
// if dispatch-rate is in msg then read only msg according to available permit
long subPermitsOnMsg = dispatchRateLimiter.get().getAvailableDispatchRateLimitOnMsg();
if (subPermitsOnMsg > 0) {
messagesToRead = Math.min(messagesToRead, (int) subPermitsOnMsg);
}
}
}
}
// If messagesToRead is 0 or less, correct it to 1 to prevent IllegalArgumentException
return Math.max(messagesToRead, 1);
}
@Override
public void readEntriesFailed(ManagedLedgerException exception, Object ctx) {
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
internalReadEntriesFailed(exception, ctx);
}));
}
private synchronized void internalReadEntriesFailed(ManagedLedgerException exception, Object ctx) {
havePendingRead = false;
Consumer c = (Consumer) ctx;
long waitTimeMillis = readFailureBackoff.next();
if (exception instanceof NoMoreEntriesToReadException) {
if (cursor.getNumberOfEntriesInBacklog(false) == 0) {
// Topic has been terminated and there are no more entries to read
// Notify the consumer only if all the messages were already acknowledged
consumers.forEach(Consumer::reachedEndOfTopic);
}
} else if (exception.getCause() instanceof TransactionNotSealedException) {
waitTimeMillis = 1;
if (log.isDebugEnabled()) {
log.debug("[{}] Error reading transaction entries : {}, - Retrying to read in {} seconds", name,
exception.getMessage(), waitTimeMillis / 1000.0);
}
} else if (!(exception instanceof TooManyRequestsException)) {
log.error("[{}-{}] Error reading entries at {} : {} - Retrying to read in {} seconds", name, c,
cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0);
} else {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Got throttled by bookies while reading at {} : {} - Retrying to read in {} seconds",
name, c, cursor.getReadPosition(), exception.getMessage(), waitTimeMillis / 1000.0);
}
}
checkNotNull(c);
// Reduce read batch size to avoid flooding bookies with retries
readBatchSize = serviceConfig.getDispatcherMinReadBatchSize();
topic.getBrokerService().executor().schedule(() -> {
// Jump again into dispatcher dedicated thread
topic.getBrokerService().getTopicOrderedExecutor().executeOrdered(topicName, SafeRun.safeRun(() -> {
synchronized (PersistentDispatcherSingleActiveConsumer.this) {
Consumer currentConsumer = ACTIVE_CONSUMER_UPDATER.get(this);
// we should retry the read if we have an active consumer and there is no pending read
if (currentConsumer != null && !havePendingRead) {
if (log.isDebugEnabled()) {
log.debug("[{}-{}] Retrying read operation", name, c);
}
if (currentConsumer != c) {
notifyActiveConsumerChanged(currentConsumer);
}
readMoreEntries(currentConsumer);
} else {
log.info("[{}-{}] Skipping read retry: Current Consumer {}, havePendingRead {}", name, c,
currentConsumer, havePendingRead);
}
}
}));
}, waitTimeMillis, TimeUnit.MILLISECONDS);
}
@Override
public void addUnAckedMessages(int unAckMessages) {
// No-op
}
@Override
public RedeliveryTracker getRedeliveryTracker() {
return redeliveryTracker;
}
@Override
public Optional<DispatchRateLimiter> getRateLimiter() {
return dispatchRateLimiter;
}
@Override
public void updateRateLimiter(DispatchRate dispatchRate) {
if (!this.dispatchRateLimiter.isPresent() && dispatchRate != null) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
}
this.dispatchRateLimiter.ifPresent(limiter -> {
if (dispatchRate != null) {
this.dispatchRateLimiter.get().updateDispatchRate(dispatchRate);
} else {
this.dispatchRateLimiter.get().updateDispatchRate();
}
});
}
@Override
public void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(topic.getBrokerService(), policies, topic.getName(), Type.SUBSCRIPTION)) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(topic, Type.SUBSCRIPTION));
}
}
@Override
public CompletableFuture<Void> close() {
IS_CLOSED_UPDATER.set(this, TRUE);
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
return disconnectAllConsumers();
}
@Override
public boolean checkAndUnblockIfStuck() {
if (cursor.checkAndUpdateReadPositionChanged()) {
return false;
}
Consumer consumer = ACTIVE_CONSUMER_UPDATER.get(this);
int totalAvailablePermits = consumer.getAvailablePermits();
// consider dispatch is stuck if : dispatcher has backlog, available-permits and there is no pending read
if (totalAvailablePermits > 0 && !havePendingRead && cursor.getNumberOfEntriesInBacklog(false) > 0) {
log.warn("{}-{} Dispatcher is stuck and unblocking by issuing reads", topic.getName(), name);
readMoreEntries(consumer);
return true;
}
return false;
}
private static final Logger log = LoggerFactory.getLogger(PersistentDispatcherSingleActiveConsumer.class);
}