blob: d87707abddfad55bda4e520da3944151e55272ee [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.persistent;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static org.apache.commons.lang3.StringUtils.isBlank;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.AsyncCallbacks.AddEntryCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.CloseCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OffloadCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenCursorCallback;
import org.apache.bookkeeper.mledger.AsyncCallbacks.TerminateCallback;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedCursor.IndividualDeletedEntries;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerAlreadyClosedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerFencedException;
import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerTerminatedException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.net.BookieId;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.AlreadyRunningException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicClosedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Dispatcher;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter.Type;
import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.broker.stats.ReplicationMetrics;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.client.admin.LongRunningProcessStatus;
import org.apache.pulsar.client.admin.OffloadProcessStatus;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.BatchMessageIdImpl;
import org.apache.pulsar.client.impl.MessageIdImpl;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.api.proto.PulsarApi.MessageIdData;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.LedgerInfo;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.ReplicatorStats;
import org.apache.pulsar.common.policies.data.RetentionPolicies;
import org.apache.pulsar.common.policies.data.SubscriptionStats;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.Codec;
import org.apache.pulsar.common.util.DateFormatter;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.compaction.CompactedTopic;
import org.apache.pulsar.compaction.CompactedTopicImpl;
import org.apache.pulsar.compaction.Compactor;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class PersistentTopic extends AbstractTopic implements Topic, AddEntryCallback, TopicPolicyListener<TopicPolicies> {
// Managed ledger associated with the topic
protected final ManagedLedger ledger;
// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, PersistentSubscription> subscriptions;
private final ConcurrentOpenHashMap<String, Replicator> replicators;
protected static final AtomicLongFieldUpdater<PersistentTopic> USAGE_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(PersistentTopic.class, "usageCount");
@SuppressWarnings("unused")
private volatile long usageCount = 0;
static final String DEDUPLICATION_CURSOR_NAME = "pulsar.dedup";
private static final double MESSAGE_EXPIRY_THRESHOLD = 1.5;
private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
// topic has every published chunked message since topic is loaded
public boolean msgChunkPublished;
private Optional<DispatchRateLimiter> dispatchRateLimiter = Optional.empty();
private Optional<SubscribeRateLimiter> subscribeRateLimiter = Optional.empty();
public volatile long delayedDeliveryTickTimeMillis = 1000;
private final long backloggedCursorThresholdEntries;
public volatile boolean delayedDeliveryEnabled = false;
public static final int MESSAGE_RATE_BACKOFF_MS = 1000;
protected final MessageDeduplication messageDeduplication;
private static final long COMPACTION_NEVER_RUN = -0xfebecffeL;
private CompletableFuture<Long> currentCompaction = CompletableFuture.completedFuture(COMPACTION_NEVER_RUN);
private final CompactedTopic compactedTopic;
private CompletableFuture<MessageIdImpl> currentOffload = CompletableFuture.completedFuture(
(MessageIdImpl)MessageId.earliest);
private volatile Optional<ReplicatedSubscriptionsController> replicatedSubscriptionsController = Optional.empty();
private static final FastThreadLocal<TopicStatsHelper> threadLocalTopicStats = new FastThreadLocal<TopicStatsHelper>() {
@Override
protected TopicStatsHelper initialValue() {
return new TopicStatsHelper();
}
};
private final AtomicLong pendingWriteOps = new AtomicLong(0);
private volatile double lastUpdatedAvgPublishRateInMsg = 0;
private volatile double lastUpdatedAvgPublishRateInByte = 0;
private volatile int maxUnackedMessagesOnSubscription = -1;
private volatile boolean isClosingOrDeleting = false;
private ScheduledFuture<?> fencedTopicMonitoringTask = null;
private static class TopicStatsHelper {
public double averageMsgSize;
public double aggMsgRateIn;
public double aggMsgThroughputIn;
public double aggMsgThrottlingFailure;
public double aggMsgRateOut;
public double aggMsgThroughputOut;
public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats;
public TopicStatsHelper() {
remotePublishersStats = new ObjectObjectHashMap<>();
reset();
}
public void reset() {
averageMsgSize = 0;
aggMsgRateIn = 0;
aggMsgThroughputIn = 0;
aggMsgRateOut = 0;
aggMsgThrottlingFailure = 0;
aggMsgThroughputOut = 0;
remotePublishersStats.clear();
}
}
public PersistentTopic(String topic, ManagedLedger ledger, BrokerService brokerService) throws NamingException {
super(topic, brokerService);
this.ledger = ledger;
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
USAGE_COUNT_UPDATER.set(this, 0);
this.delayedDeliveryEnabled = brokerService.pulsar().getConfiguration().isDelayedDeliveryEnabled();
this.delayedDeliveryTickTimeMillis = brokerService.pulsar().getConfiguration().getDelayedDeliveryTickTimeMillis();
this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
initializeDispatchRateLimiterIfNeeded(Optional.empty());
registerTopicPolicyListener();
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
for (ManagedCursor cursor : ledger.getCursors()) {
if (cursor.getName().startsWith(replicatorPrefix)) {
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
String remoteCluster = PersistentReplicator.getRemoteCluster(cursor.getName());
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, this, cursor.getName(), localCluster);
if (!isReplicatorStarted) {
throw new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster);
}
} else if (cursor.getName().equals(DEDUPLICATION_CURSOR_NAME)) {
// This is not a regular subscription, we are going to ignore it for now and let the message dedup logic
// to take care of it
} else {
final String subscriptionName = Codec.decode(cursor.getName());
subscriptions.put(subscriptionName, createPersistentSubscription(subscriptionName, cursor,
PersistentSubscription.isCursorFromReplicatedSubscription(cursor)));
// subscription-cursor gets activated by default: deactivate as there is no active subscription right
// now
subscriptions.get(subscriptionName).deactivateCursor();
}
}
this.messageDeduplication = new MessageDeduplication(brokerService.pulsar(), this, ledger);
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
this.isEncryptionRequired = policies.encryption_required;
setSchemaCompatibilityStrategy(policies);
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
schemaValidationEnforced = policies.schema_validation_enforced;
if (policies.inactive_topic_policies != null) {
inactiveTopicPolicies = policies.inactive_topic_policies;
}
maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(policies);
maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(policies);
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
}
checkReplicatedSubscriptionControllerState();
}
// for testing purposes
@VisibleForTesting
PersistentTopic(String topic, BrokerService brokerService, ManagedLedger ledger, MessageDeduplication messageDeduplication) {
super(topic, brokerService);
this.ledger = ledger;
this.messageDeduplication = messageDeduplication;
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.compactedTopic = new CompactedTopicImpl(brokerService.pulsar().getBookKeeperClient());
this.backloggedCursorThresholdEntries = brokerService.pulsar().getConfiguration().getManagedLedgerCursorBackloggedThreshold();
}
private void initializeDispatchRateLimiterIfNeeded(Optional<Policies> policies) {
synchronized (dispatchRateLimiter) {
// dispatch rate limiter for topic
if (!dispatchRateLimiter.isPresent() && DispatchRateLimiter
.isDispatchRateNeeded(brokerService, policies, topic, Type.TOPIC)) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
if (!subscribeRateLimiter.isPresent() && SubscribeRateLimiter
.isDispatchRateNeeded(brokerService, policies, topic)) {
this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
}
// dispatch rate limiter for each subscription
subscriptions.forEach((name, subscription) -> {
Dispatcher dispatcher = subscription.getDispatcher();
if (dispatcher != null) {
dispatcher.initializeDispatchRateLimiterIfNeeded(policies);
}
});
// dispatch rate limiter for each replicator
replicators.forEach((name, replicator) ->
replicator.initializeDispatchRateLimiterIfNeeded(policies));
}
}
private PersistentSubscription createPersistentSubscription(String subscriptionName, ManagedCursor cursor,
boolean replicated) {
checkNotNull(compactedTopic);
if (subscriptionName.equals(Compactor.COMPACTION_SUBSCRIPTION)) {
return new CompactorSubscription(this, compactedTopic, subscriptionName, cursor);
} else {
return new PersistentSubscription(this, subscriptionName, cursor, replicated);
}
}
@Override
public void publishMessage(ByteBuf headersAndPayload, PublishContext publishContext) {
pendingWriteOps.incrementAndGet();
if (isFenced) {
publishContext.completed(new TopicFencedException("fenced"), -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
publishContext.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
switch (status) {
case NotDup:
ledger.asyncAddEntry(headersAndPayload, this, publishContext);
break;
case Dup:
// Immediately acknowledge duplicated message
publishContext.completed(null, -1, -1);
decrementPendingWriteOpsAndCheck();
break;
default:
publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
decrementPendingWriteOpsAndCheck();
}
}
public void asyncReadEntry(PositionImpl position, AsyncCallbacks.ReadEntryCallback callback, Object ctx) {
if (ledger instanceof ManagedLedgerImpl) {
((ManagedLedgerImpl)ledger).asyncReadEntry(position, callback, ctx);
} else {
callback.readEntryFailed(new ManagedLedgerException("Unexpected managedledger implementation, doesn't support " +
"direct read entry operation.") ,ctx);
}
}
public PositionImpl getPositionAfterN(PositionImpl startPosition, long n) throws ManagedLedgerException {
if (ledger instanceof ManagedLedgerImpl) {
return ((ManagedLedgerImpl)ledger).getPositionAfterN(startPosition, n, ManagedLedgerImpl.PositionBound.startExcluded);
} else {
throw new ManagedLedgerException("Unexpected managedledger implementation, doesn't support " +
"getPositionAfterN operation.");
}
}
public PositionImpl getFirstPosition() throws ManagedLedgerException {
if (ledger instanceof ManagedLedgerImpl) {
return ((ManagedLedgerImpl)ledger).getFirstPosition();
} else {
throw new ManagedLedgerException("Unexpected managedledger implementation, doesn't support " +
"getFirstPosition operation.");
}
}
public long getNumberOfEntries() {
return ledger.getNumberOfEntries();
}
private void decrementPendingWriteOpsAndCheck() {
long pending = pendingWriteOps.decrementAndGet();
if (pending == 0 && isFenced && !isClosingOrDeleting) {
synchronized (this) {
if (isFenced && !isClosingOrDeleting) {
messageDeduplication.resetHighestSequenceIdPushed();
log.info("[{}] Un-fencing topic...", topic);
// signal to managed ledger that we are ready to resume by creating a new ledger
ledger.readyToCreateNewLedger();
unfence();
}
}
}
}
@Override
public void addComplete(Position pos, Object ctx) {
PublishContext publishContext = (PublishContext) ctx;
PositionImpl position = (PositionImpl) pos;
// Message has been successfully persisted
messageDeduplication.recordMessagePersisted(publishContext, position);
publishContext.completed(null, position.getLedgerId(), position.getEntryId());
decrementPendingWriteOpsAndCheck();
}
@Override
public synchronized void addFailed(ManagedLedgerException exception, Object ctx) {
if (exception instanceof ManagedLedgerFencedException) {
// If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
close();
} else {
// fence topic when failed to write a message to BK
fence();
// close all producers
List<CompletableFuture<Void>> futures = Lists.newArrayList();
producers.values().forEach(producer -> futures.add(producer.disconnect()));
FutureUtil.waitForAll(futures).handle((BiFunction<Void, Throwable, Void>) (aVoid, throwable) -> {
decrementPendingWriteOpsAndCheck();
return null;
});
PublishContext callback = (PublishContext) ctx;
if (exception instanceof ManagedLedgerAlreadyClosedException) {
if (log.isDebugEnabled()) {
log.debug("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
}
callback.completed(new TopicClosedException(exception), -1, -1);
return;
} else {
log.warn("[{}] Failed to persist msg in store: {}", topic, exception.getMessage());
}
if (exception instanceof ManagedLedgerTerminatedException) {
// Signal the producer that this topic is no longer available
callback.completed(new TopicTerminatedException(exception), -1, -1);
} else {
// Use generic persistence exception
callback.completed(new PersistenceException(exception), -1, -1);
}
}
}
@Override
public void addProducer(Producer producer) throws BrokerServiceException {
checkArgument(producer.getTopic() == this);
lock.readLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
checkTopicFenced();
if (ledger.isTerminated()) {
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), USAGE_COUNT_UPDATER.get(this));
}
messageDeduplication.producerAdded(producer.getProducerName());
// Start replication producers if not already
startReplProducers();
} finally {
lock.readLock().unlock();
}
}
private boolean hasRemoteProducers() {
AtomicBoolean foundRemote = new AtomicBoolean(false);
producers.values().forEach(producer -> {
if (producer.isRemote()) {
foundRemote.set(true);
}
});
return foundRemote.get();
}
public void startReplProducers() {
// read repl-cluster from policies to avoid restart of replicator which are in process of disconnect and close
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
if (policies.replication_clusters != null) {
Set<String> configuredClusters = Sets.newTreeSet(policies.replication_clusters);
replicators.forEach((region, replicator) -> {
if (configuredClusters.contains(region)) {
replicator.startProducer();
}
});
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies while starting repl-producers {}", topic, e.getMessage());
}
replicators.forEach((region, replicator) -> replicator.startProducer());
}
}
public CompletableFuture<Void> stopReplProducers() {
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
return FutureUtil.waitForAll(closeFutures);
}
private synchronized CompletableFuture<Void> closeReplProducersIfNoBacklog() {
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect(true)));
return FutureUtil.waitForAll(closeFutures);
}
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
if (producers.remove(producer.getProducerName(), producer)) {
handleProducerRemoved(producer);
}
}
@Override
protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
USAGE_COUNT_UPDATER.decrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
lastActive = System.nanoTime();
messageDeduplication.producerRemoved(producer.getProducerName());
}
@Override
public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition,
long startMessageRollbackDurationSec, boolean replicatedSubscriptionState, PulsarApi.KeySharedMeta keySharedMeta) {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
try {
brokerService.checkTopicNsOwnership(getName());
} catch (Exception e) {
future.completeExceptionally(e);
return future;
}
if (readCompacted && !(subType == SubType.Failover || subType == SubType.Exclusive)) {
future.completeExceptionally(
new NotAllowedException("readCompacted only allowed on failover or exclusive subscriptions"));
return future;
}
if (replicatedSubscriptionState
&& !brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions()) {
future.completeExceptionally(
new NotAllowedException("Replicated Subscriptions is disabled by broker.")
);
return future;
}
if (subType == SubType.Key_Shared
&& !brokerService.pulsar().getConfiguration().isSubscriptionKeySharedEnable()) {
future.completeExceptionally(
new NotAllowedException("Key_Shared subscription is disabled by broker.")
);
return future;
}
if (isBlank(subscriptionName)) {
if (log.isDebugEnabled()) {
log.debug("[{}] Empty subscription name", topic);
}
future.completeExceptionally(new NamingException("Empty subscription name"));
return future;
}
if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
}
future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message"));
return future;
}
if (subscriptionName.startsWith(replicatorPrefix) || subscriptionName.equals(DEDUPLICATION_CURSOR_NAME)) {
log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted"));
return future;
}
if (cnx.clientAddress() != null && cnx.clientAddress().toString().contains(":")) {
SubscribeRateLimiter.ConsumerIdentifier consumer = new SubscribeRateLimiter.ConsumerIdentifier(
cnx.clientAddress().toString().split(":")[0], consumerName, consumerId);
if (subscribeRateLimiter.isPresent() && !subscribeRateLimiter.get().subscribeAvailable(consumer) || !subscribeRateLimiter.get().tryAcquire(consumer)) {
log.warn("[{}] Failed to create subscription for {} {} limited by {}, available {}",
topic, subscriptionName, consumer, subscribeRateLimiter.get().getSubscribeRate(),
subscribeRateLimiter.get().getAvailableSubscribeRateLimit(consumer));
future.completeExceptionally(new NotAllowedException("Subscribe limited by subscribe rate limit per consumer."));
return future;
}
}
lock.readLock().lock();
try {
if (isFenced) {
log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
future.completeExceptionally(new TopicFencedException("Topic is temporarily unavailable"));
return future;
}
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName, consumerName,
USAGE_COUNT_UPDATER.get(this));
}
} finally {
lock.readLock().unlock();
}
CompletableFuture<? extends Subscription> subscriptionFuture = isDurable ? //
getDurableSubscription(subscriptionName, initialPosition, startMessageRollbackDurationSec, replicatedSubscriptionState) //
: getNonDurableSubscription(subscriptionName, startMessageId, initialPosition, startMessageRollbackDurationSec);
int maxUnackedMessages = isDurable
? getMaxUnackedMessagesOnConsumer()
: 0;
subscriptionFuture.thenAccept(subscription -> {
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
maxUnackedMessages, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta);
addConsumerToSubscription(subscription, consumer);
checkBackloggedCursors();
if (!cnx.isActive()) {
consumer.close();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
consumer.consumerName(), USAGE_COUNT_UPDATER.get(PersistentTopic.this));
}
USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
future.completeExceptionally(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
checkReplicatedSubscriptionControllerState();
log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
future.complete(consumer);
}
} catch (BrokerServiceException e) {
if (e instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
consumerName);
} else if (e instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
}
USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
future.completeExceptionally(e);
}
}).exceptionally(ex -> {
log.error("[{}] Failed to create subscription: {} error: {}", topic, subscriptionName, ex);
USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
future.completeExceptionally(new PersistenceException(ex));
if (ex.getCause() instanceof NotAllowedException) {
future.completeExceptionally(ex.getCause());
} else {
future.completeExceptionally(new PersistenceException(ex));
}
return null;
});
return future;
}
private int unackedMessagesExceededOnSubscription(Policies data) {
final int maxUnackedMessages = data.max_unacked_messages_per_subscription > -1 ?
data.max_unacked_messages_per_subscription :
brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerSubscription();
return maxUnackedMessages;
}
private int unackedMessagesExceededOnConsumer(Policies data) {
final int maxUnackedMessages = data.max_unacked_messages_per_consumer > -1 ?
data.max_unacked_messages_per_consumer :
brokerService.pulsar().getConfiguration().getMaxUnackedMessagesPerConsumer();
return maxUnackedMessages;
}
private CompletableFuture<Subscription> getDurableSubscription(String subscriptionName,
InitialPosition initialPosition, long startMessageRollbackDurationSec, boolean replicated) {
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
subscriptionFuture.completeExceptionally(new NotAllowedException(
"Exceed the maximum number of subscriptions of the topic: " + topic));
return subscriptionFuture;
}
Map<String, Long> properties = PersistentSubscription.getBaseCursorProperties(replicated);
ledger.asyncOpenCursor(Codec.encode(subscriptionName), initialPosition, properties, new OpenCursorCallback() {
@Override
public void openCursorComplete(ManagedCursor cursor, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Opened cursor", topic, subscriptionName);
}
PersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> createPersistentSubscription(subscriptionName, cursor, replicated));
if (replicated && !subscription.isReplicated()) {
// Flip the subscription state
subscription.setReplicated(replicated);
}
subscriptionFuture.complete(subscription);
}
@Override
public void openCursorFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed to create subscription for {}: {}", topic, subscriptionName, exception.getMessage());
USAGE_COUNT_UPDATER.decrementAndGet(PersistentTopic.this);
subscriptionFuture.completeExceptionally(new PersistenceException(exception));
if (exception instanceof ManagedLedgerFencedException) {
// If the managed ledger has been fenced, we cannot continue using it. We need to close and reopen
close();
}
}
}, null);
return subscriptionFuture;
}
private CompletableFuture<? extends Subscription> getNonDurableSubscription(String subscriptionName,
MessageId startMessageId, InitialPosition initialPosition, long startMessageRollbackDurationSec) {
log.info("[{}][{}] Creating non-durable subscription at msg id {}", topic, subscriptionName, startMessageId);
CompletableFuture<Subscription> subscriptionFuture = new CompletableFuture<>();
if (checkMaxSubscriptionsPerTopicExceed(subscriptionName)) {
subscriptionFuture.completeExceptionally(new NotAllowedException(
"Exceed the maximum number of subscriptions of the topic: " + topic));
return subscriptionFuture;
}
synchronized (ledger) {
// Create a new non-durable cursor only for the first consumer that connects
PersistentSubscription subscription = subscriptions.get(subscriptionName);
if (subscription == null) {
MessageIdImpl msgId = startMessageId != null ? (MessageIdImpl) startMessageId
: (MessageIdImpl) MessageId.latest;
long ledgerId = msgId.getLedgerId();
long entryId = msgId.getEntryId();
// Ensure that the start message id starts from a valid entry.
if (ledgerId >= 0 && entryId >= 0
&& msgId instanceof BatchMessageIdImpl) {
// When the start message is relative to a batch, we need to take one step back on the previous
// message,
// because the "batch" might not have been consumed in its entirety.
// The client will then be able to discard the first messages if needed.
entryId = msgId.getEntryId() - 1;
}
Position startPosition = new PositionImpl(ledgerId, entryId);
ManagedCursor cursor = null;
try {
cursor = ledger.newNonDurableCursor(startPosition, subscriptionName, initialPosition);
} catch (ManagedLedgerException e) {
return FutureUtil.failedFuture(e);
}
subscription = new PersistentSubscription(this, subscriptionName, cursor, false);
subscriptions.put(subscriptionName, subscription);
}
if (startMessageRollbackDurationSec > 0) {
long timestamp = System.currentTimeMillis()
- TimeUnit.SECONDS.toMillis(startMessageRollbackDurationSec);
final Subscription finalSubscription = subscription;
subscription.resetCursor(timestamp).handle((s, ex) -> {
if (ex != null) {
log.warn("[{}] Failed to reset cursor {} position at timestamp {}", topic, subscriptionName,
startMessageRollbackDurationSec);
}
subscriptionFuture.complete(finalSubscription);
return null;
});
return subscriptionFuture;
} else {
return CompletableFuture.completedFuture(subscription);
}
}
}
@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition, boolean replicateSubscriptionState) {
return getDurableSubscription(subscriptionName, initialPosition, 0 /*avoid reseting cursor*/, replicateSubscriptionState);
}
/**
* Delete the cursor ledger for a given subscription
*
* @param subscriptionName
* Subscription for which the cursor ledger is to be deleted
* @return Completable future indicating completion of unsubscribe operation Completed exceptionally with:
* ManagedLedgerException if cursor ledger delete fails
*/
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
CompletableFuture<Void> unsubscribeFuture = new CompletableFuture<>();
ledger.asyncDeleteCursor(Codec.encode(subscriptionName), new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Cursor deleted successfully", topic, subscriptionName);
}
subscriptions.remove(subscriptionName);
unsubscribeFuture.complete(null);
lastActive = System.nanoTime();
}
@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
if (log.isDebugEnabled()) {
log.debug("[{}][{}] Error deleting cursor for subscription", topic, subscriptionName, exception);
}
unsubscribeFuture.completeExceptionally(new PersistenceException(exception));
}
}, null);
return unsubscribeFuture;
}
void removeSubscription(String subscriptionName) {
subscriptions.remove(subscriptionName);
}
/**
* Delete the managed ledger associated with this topic
*
* @return Completable future indicating completion of delete operation Completed exceptionally with:
* IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails
*/
@Override
public CompletableFuture<Void> delete() {
return delete(false, false, false);
}
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean failIfHasBacklogs, boolean deleteSchema) {
return delete(failIfHasSubscriptions, failIfHasBacklogs, false, deleteSchema);
}
/**
* Forcefully close all producers/consumers/replicators and deletes the topic. this function is used when local
* cluster is removed from global-namespace replication list. Because broker doesn't allow lookup if local cluster
* is not part of replication cluster list.
*
* @return
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, false, true, false);
}
/**
* Delete the managed ledger associated with this topic
*
* @param failIfHasSubscriptions
* Flag indicating whether delete should succeed if topic still has unconnected subscriptions. Set to
* false when called from admin API (it will delete the subs too), and set to true when called from GC
* thread
* @param closeIfClientsConnected
* Flag indicate whether explicitly close connected producers/consumers/replicators before trying to delete topic. If
* any client is connected to a topic and if this flag is disable then this operation fails.
* @param deleteSchema
* Flag indicating whether delete the schema defined for topic if exist.
*
* @return Completable future indicating completion of delete operation Completed exceptionally with:
* IllegalStateException if topic is still active ManagedLedgerException if ledger delete operation fails
*/
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions,
boolean failIfHasBacklogs,
boolean closeIfClientsConnected,
boolean deleteSchema) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
lock.writeLock().lock();
try {
if (isClosingOrDeleting) {
log.warn("[{}] Topic is already being closed or deleted", topic);
return FutureUtil.failedFuture(new TopicFencedException("Topic is already fenced"));
} else if (failIfHasSubscriptions && !subscriptions.isEmpty()) {
return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions"));
} else if (failIfHasBacklogs && hasBacklogs()) {
return FutureUtil.failedFuture(new TopicBusyException("Topic has subscriptions did not catch up"));
}
fenceTopicToCloseOrDelete(); // Avoid clients reconnections while deleting
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
unfenceTopicToResume();
closeClientFuture.completeExceptionally(ex);
return null;
});
} else {
closeClientFuture.complete(null);
}
closeClientFuture.thenAccept(delete -> {
// We can proceed with the deletion if either:
// 1. No one is connected
// 2. We want to kick out everyone and forcefully delete the topic.
// In this case, we shouldn't care if the usageCount is 0 or not, just proceed
if (USAGE_COUNT_UPDATER.get(this) == 0 || (closeIfClientsConnected && !failIfHasSubscriptions)) {
CompletableFuture<SchemaVersion> deleteSchemaFuture = deleteSchema ?
deleteSchema()
: CompletableFuture.completedFuture(null);
deleteSchemaFuture.whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
unfenceTopicToResume();
deleteFuture.completeExceptionally(ex);
} else {
ledger.asyncDelete(new AsyncCallbacks.DeleteLedgerCallback() {
@Override
public void deleteLedgerComplete(Object ctx) {
brokerService.removeTopicFromCache(topic);
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
brokerService.pulsar().getTopicPoliciesService().unregisterListener(TopicName.get(topic), getPersistentTopic());
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
}
@Override
public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) {
if (exception.getCause() instanceof KeeperException.NoNodeException) {
log.info("[{}] Topic is already deleted {}", topic, exception.getMessage());
deleteLedgerComplete(ctx);
} else {
unfenceTopicToResume();
log.error("[{}] Error deleting topic", topic, exception);
deleteFuture.completeExceptionally(new PersistenceException(exception));
}
}
}, null);
}
});
} else {
unfenceTopicToResume();
deleteFuture.completeExceptionally(new TopicBusyException(
"Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
}
}).exceptionally(ex->{
unfenceTopicToResume();
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic."));
return null;
});
} finally {
lock.writeLock().unlock();
}
return deleteFuture;
}
public CompletableFuture<Void> close() {
return close(false);
}
/**
* Close this topic - close all producers and subscriptions associated with this topic
*
* @param closeWithoutWaitingClientDisconnect don't wait for client disconnect and forcefully close managed-ledger
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
lock.writeLock().lock();
try {
// closing managed-ledger waits until all producers/consumers/replicators get closed. Sometimes, broker
// forcefully wants to close managed-ledger without waiting all resources to be closed.
if (!isClosingOrDeleting || closeWithoutWaitingClientDisconnect) {
fenceTopicToCloseOrDelete();
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
return closeFuture;
}
} finally {
lock.writeLock().unlock();
}
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
CompletableFuture<Void> clientCloseFuture = closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);
clientCloseFuture.thenRun(() -> {
// After having disconnected all producers/consumers, close the managed ledger
ledger.asyncClose(new CloseCallback() {
@Override
public void closeComplete(Object ctx) {
// Everything is now closed, remove the topic from map
brokerService.removeTopicFromCache(topic);
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
dispatchRateLimiter.ifPresent(DispatchRateLimiter::close);
subscribeRateLimiter.ifPresent(SubscribeRateLimiter::close);
brokerService.pulsar().getTopicPoliciesService().unregisterListener(TopicName.get(topic), getPersistentTopic());
log.info("[{}] Topic closed", topic);
closeFuture.complete(null);
}
@Override
public void closeFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to close managed ledger, proceeding anyway.", topic, exception);
brokerService.removeTopicFromCache(topic);
closeFuture.complete(null);
}
}, null);
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
unfenceTopicToResume();
closeFuture.completeExceptionally(exception);
return null;
});
return closeFuture;
}
@VisibleForTesting
CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th);
if (!(th.getCause() instanceof TopicFencedException)) {
// retriable exception
brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure,
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
}
result.completeExceptionally(th);
return null;
});
return result;
}
public CompletableFuture<Void> checkDeduplicationStatus() {
return messageDeduplication.checkStatus();
}
private CompletableFuture<Void> checkPersistencePolicies() {
TopicName topicName = TopicName.get(topic);
CompletableFuture<Void> future = new CompletableFuture<>();
brokerService.getManagedLedgerConfig(topicName).thenAccept(config -> {
// update managed-ledger config and managed-cursor.markDeleteRate
this.ledger.setConfig(config);
future.complete(null);
}).exceptionally(ex -> {
log.warn("[{}] Failed to update persistence-policies {}", topic, ex.getMessage());
future.completeExceptionally(ex);
return null;
});
return future;
}
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal()) {
return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
}
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
} catch (Exception e) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new ServerMetadataException(e));
return future;
}
//Ignore current broker's config for messageTTL for replication.
final int newMessageTTLinSeconds;
try {
newMessageTTLinSeconds = getMessageTTL();
} catch (Exception e) {
return FutureUtil.failedFuture(new ServerMetadataException(e));
}
Set<String> configuredClusters;
if (policies.replication_clusters != null) {
configuredClusters = Sets.newTreeSet(policies.replication_clusters);
} else {
configuredClusters = Collections.emptySet();
}
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
// if local cluster is removed from global namespace cluster-list : then delete topic forcefully because pulsar
// doesn't serve global topic without local repl-cluster configured.
if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
log.info("Deleting topic [{}] because local cluster is not part of global namespace repl list {}",
topic, configuredClusters);
return deleteForcefully();
}
List<CompletableFuture<Void>> futures = Lists.newArrayList();
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
// Update message TTL
((PersistentReplicator) replicator).updateMessageTTL(newMessageTTLinSeconds);
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
return FutureUtil.waitForAll(futures);
}
@Override
public void checkMessageExpiry() {
try {
//If topic level policy or message ttl is not set, fall back to namespace level config.
int message_ttl_in_seconds = getMessageTTL();
if (message_ttl_in_seconds != 0) {
subscriptions.forEach((subName, sub) -> sub.expireMessages(message_ttl_in_seconds));
replicators.forEach((region, replicator) -> ((PersistentReplicator)replicator).expireMessages(message_ttl_in_seconds));
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
}
}
@Override
public void checkMessageDeduplicationInfo() {
messageDeduplication.purgeInactiveProducers();
}
public void checkCompaction() {
TopicName name = TopicName.get(topic);
try {
Long compactionThreshold = Optional.ofNullable(getTopicPolicies(name))
.map(TopicPolicies::getCompactionThreshold)
.orElse(null);
if (compactionThreshold == null) {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
compactionThreshold = policies.compaction_threshold;
}
if (isSystemTopic() || compactionThreshold != 0
&& currentCompaction.isDone()) {
long backlogEstimate = 0;
PersistentSubscription compactionSub = subscriptions.get(Compactor.COMPACTION_SUBSCRIPTION);
if (compactionSub != null) {
backlogEstimate = compactionSub.estimateBacklogSize();
} else {
// compaction has never run, so take full backlog size,
// or total size if we have no durable subs yet.
backlogEstimate = subscriptions.isEmpty()
? ledger.getTotalSize()
: ledger.getEstimatedBacklogSize();
}
if (backlogEstimate > compactionThreshold) {
try {
triggerCompaction();
} catch (AlreadyRunningException are) {
log.debug("[{}] Compaction already running, so don't trigger again, "
+ "even though backlog({}) is over threshold({})",
name, backlogEstimate, compactionThreshold);
}
}
}
} catch (Exception e) {
log.debug("[{}] Error getting policies", topic);
}
}
CompletableFuture<Void> startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
String replicatorName = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
boolean isReplicatorStarted = addReplicationCluster(remoteCluster, PersistentTopic.this, replicatorName, localCluster);
if (isReplicatorStarted) {
future.complete(null);
} else {
future.completeExceptionally(new NamingException(
PersistentTopic.this.getName() + " Failed to start replicator " + remoteCluster));
}
return future;
}
protected boolean addReplicationCluster(String remoteCluster, PersistentTopic persistentTopic, String replicatorName,
String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new PersistentReplicator(PersistentTopic.this, replicatorName, localCluster, remoteCluster,
brokerService, ledger);
} catch (NamingException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
return null;
});
// clean up replicator if startup is failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
}
return isReplicatorStarted.get();
}
CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
String name = PersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
replicators.get(remoteCluster).disconnect().thenRun(() -> {
ledger.asyncDeleteCursor(name, new DeleteCursorCallback() {
@Override
public void deleteCursorComplete(Object ctx) {
replicators.remove(remoteCluster);
future.complete(null);
}
@Override
public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}] Failed to delete cursor {} {}", topic, name, exception.getMessage(), exception);
future.completeExceptionally(new PersistenceException(exception));
}
}, null);
}).exceptionally(e -> {
log.error("[{}] Failed to close replication producer {} {}", topic, name, e.getMessage(), e);
future.completeExceptionally(e);
return null;
});
return future;
}
public boolean isDeduplicationEnabled() {
return messageDeduplication.isEnabled();
}
@Override
public int getNumberOfConsumers() {
int count = 0;
for (PersistentSubscription subscription : subscriptions.values()) {
count += subscription.getConsumers().size();
}
return count;
}
@Override
public ConcurrentOpenHashMap<String, PersistentSubscription> getSubscriptions() {
return subscriptions;
}
@Override
public PersistentSubscription getSubscription(String subscriptionName) {
return subscriptions.get(subscriptionName);
}
@Override
public ConcurrentOpenHashMap<String, Replicator> getReplicators() {
return replicators;
}
public Replicator getPersistentReplicator(String remoteCluster) {
return replicators.get(remoteCluster);
}
public ManagedLedger getManagedLedger() {
return ledger;
}
@Override
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
TopicStatsHelper topicStatsHelper = threadLocalTopicStats.get();
topicStatsHelper.reset();
replicators.forEach((region, replicator) -> replicator.updateRates());
nsStats.producerCount += producers.size();
bundleStats.producerCount += producers.size();
topicStatsStream.startObject(topic);
// start publisher stats
topicStatsStream.startList("publishers");
producers.values().forEach(producer -> {
producer.updateRates();
PublisherStats publisherStats = producer.getStats();
topicStatsHelper.aggMsgRateIn += publisherStats.msgRateIn;
topicStatsHelper.aggMsgThroughputIn += publisherStats.msgThroughputIn;
if (producer.isRemote()) {
topicStatsHelper.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}
// Populate consumer specific stats here
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
});
topicStatsStream.endList();
// if publish-rate increases (eg: 0 to 1K) then pick max publish-rate and if publish-rate decreases then keep
// average rate.
lastUpdatedAvgPublishRateInMsg = topicStatsHelper.aggMsgRateIn > lastUpdatedAvgPublishRateInMsg
? topicStatsHelper.aggMsgRateIn
: (topicStatsHelper.aggMsgRateIn + lastUpdatedAvgPublishRateInMsg) / 2;
lastUpdatedAvgPublishRateInByte = topicStatsHelper.aggMsgThroughputIn > lastUpdatedAvgPublishRateInByte
? topicStatsHelper.aggMsgThroughputIn
: (topicStatsHelper.aggMsgThroughputIn + lastUpdatedAvgPublishRateInByte) / 2;
// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStatsHelper.remotePublishersStats.size();
replicators.forEach((cluster, replicator) -> {
// Update replicator cursor state
try {
((PersistentReplicator) replicator).updateCursorState();
} catch (Exception e) {
log.warn("[{}] Failed to update cursro state ", topic, e);
}
// Update replicator stats
ReplicatorStats rStat = replicator.getStats();
// Add incoming msg rates
PublisherStats pubStats = topicStatsHelper.remotePublishersStats.get(replicator.getRemoteCluster());
rStat.msgRateIn = pubStats != null ? pubStats.msgRateIn : 0;
rStat.msgThroughputIn = pubStats != null ? pubStats.msgThroughputIn : 0;
rStat.inboundConnection = pubStats != null ? pubStats.getAddress() : null;
rStat.inboundConnectedSince = pubStats != null ? pubStats.getConnectedSince() : null;
topicStatsHelper.aggMsgRateOut += rStat.msgRateOut;
topicStatsHelper.aggMsgThroughputOut += rStat.msgThroughputOut;
// Populate replicator specific stats here
topicStatsStream.startObject(cluster);
topicStatsStream.writePair("connected", rStat.connected);
topicStatsStream.writePair("msgRateExpired", rStat.msgRateExpired);
topicStatsStream.writePair("msgRateIn", rStat.msgRateIn);
topicStatsStream.writePair("msgRateOut", rStat.msgRateOut);
topicStatsStream.writePair("msgThroughputIn", rStat.msgThroughputIn);
topicStatsStream.writePair("msgThroughputOut", rStat.msgThroughputOut);
topicStatsStream.writePair("replicationBacklog", rStat.replicationBacklog);
topicStatsStream.writePair("replicationDelayInSeconds", rStat.replicationDelayInSeconds);
topicStatsStream.writePair("inboundConnection", rStat.inboundConnection);
topicStatsStream.writePair("inboundConnectedSince", rStat.inboundConnectedSince);
topicStatsStream.writePair("outboundConnection", rStat.outboundConnection);
topicStatsStream.writePair("outboundConnectedSince", rStat.outboundConnectedSince);
topicStatsStream.endObject();
nsStats.msgReplBacklog += rStat.replicationBacklog;
if (replStats.isMetricsEnabled()) {
String namespaceClusterKey = replStats.getKeyName(namespace, cluster);
ReplicationMetrics replicationMetrics = replStats.get(namespaceClusterKey);
boolean update = false;
if (replicationMetrics == null) {
replicationMetrics = ReplicationMetrics.get();
update = true;
}
replicationMetrics.connected += rStat.connected ? 1 : 0;
replicationMetrics.msgRateOut += rStat.msgRateOut;
replicationMetrics.msgThroughputOut += rStat.msgThroughputOut;
replicationMetrics.msgReplBacklog += rStat.replicationBacklog;
if (update) {
replStats.put(namespaceClusterKey, replicationMetrics);
}
// replication delay for a namespace is the max repl-delay among all the topics under this namespace
if (rStat.replicationDelayInSeconds > replicationMetrics.maxMsgReplDelayInSeconds) {
replicationMetrics.maxMsgReplDelayInSeconds = rStat.replicationDelayInSeconds;
}
}
});
// Close replication
topicStatsStream.endObject();
// Start subscription stats
topicStatsStream.startObject("subscriptions");
nsStats.subsCount += subscriptions.size();
subscriptions.forEach((subscriptionName, subscription) -> {
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
// Start subscription name & consumers
try {
topicStatsStream.startObject(subscriptionName);
topicStatsStream.startList("consumers");
for (Consumer consumer : subscription.getConsumers()) {
++nsStats.consumerCount;
++bundleStats.consumerCount;
consumer.updateRates();
ConsumerStats consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
}
// Close Consumer stats
topicStatsStream.endList();
// Populate subscription specific stats here
topicStatsStream.writePair("msgBacklog",
subscription.getNumberOfEntriesInBacklog(true));
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
topicStatsStream.writePair("numberOfEntriesSinceFirstNotAckedMessage", subscription.getNumberOfEntriesSinceFirstNotAckedMessage());
topicStatsStream.writePair("totalNonContiguousDeletedMessagesRange", subscription.getTotalNonContiguousDeletedMessagesRange());
topicStatsStream.writePair("type", subscription.getTypeString());
if (Subscription.isIndividualAckMode(subscription.getType())) {
if(subscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) {
PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers)subscription.getDispatcher();
topicStatsStream.writePair("blockedSubscriptionOnUnackedMsgs", dispatcher.isBlockedDispatcherOnUnackedMsgs());
topicStatsStream.writePair("unackedMessages", dispatcher.getTotalUnackedMessages());
}
}
// Close consumers
topicStatsStream.endObject();
topicStatsHelper.aggMsgRateOut += subMsgRateOut;
topicStatsHelper.aggMsgThroughputOut += subMsgThroughputOut;
nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
} catch (Exception e) {
log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
e.getMessage(), e);
}
});
// Close subscription
topicStatsStream.endObject();
// Remaining dest stats.
topicStatsHelper.averageMsgSize = topicStatsHelper.aggMsgRateIn == 0.0 ? 0.0
: (topicStatsHelper.aggMsgThroughputIn / topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producers.size());
topicStatsStream.writePair("averageMsgSize", topicStatsHelper.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStatsHelper.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStatsHelper.aggMsgRateOut);
topicStatsStream.writePair("msgInCount", getMsgInCounter());
topicStatsStream.writePair("bytesInCount", getBytesInCounter());
topicStatsStream.writePair("msgOutCount", getMsgOutCounter());
topicStatsStream.writePair("bytesOutCount", getBytesOutCounter());
topicStatsStream.writePair("msgThroughputIn", topicStatsHelper.aggMsgThroughputIn);
topicStatsStream.writePair("msgThroughputOut", topicStatsHelper.aggMsgThroughputOut);
topicStatsStream.writePair("storageSize", ledger.getTotalSize());
topicStatsStream.writePair("backlogSize", ledger.getEstimatedBacklogSize());
topicStatsStream.writePair("pendingAddEntriesCount", ((ManagedLedgerImpl) ledger).getPendingAddEntriesCount());
nsStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
nsStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
nsStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
nsStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
nsStats.storageSize += ledger.getEstimatedBacklogSize();
bundleStats.msgRateIn += topicStatsHelper.aggMsgRateIn;
bundleStats.msgRateOut += topicStatsHelper.aggMsgRateOut;
bundleStats.msgThroughputIn += topicStatsHelper.aggMsgThroughputIn;
bundleStats.msgThroughputOut += topicStatsHelper.aggMsgThroughputOut;
bundleStats.cacheSize += ((ManagedLedgerImpl) ledger).getCacheSize();
// Close topic object
topicStatsStream.endObject();
// add publish-latency metrics
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
this.addEntryLatencyStatsUsec.reset();
}
public double getLastUpdatedAvgPublishRateInMsg() {
return lastUpdatedAvgPublishRateInMsg;
}
public double getLastUpdatedAvgPublishRateInByte() {
return lastUpdatedAvgPublishRateInByte;
}
@Override
public TopicStats getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize) {
TopicStats stats = new TopicStats();
ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();
producers.values().forEach(producer -> {
PublisherStats publisherStats = producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
stats.msgThroughputIn += publisherStats.msgThroughputIn;
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.publishers.add(publisherStats);
}
});
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.msgChunkPublished = this.msgChunkPublished;
subscriptions.forEach((name, subscription) -> {
SubscriptionStats subStats = subscription.getStats(getPreciseBacklog, subscriptionBacklogSize);
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.subscriptions.put(name, subStats);
stats.nonContiguousDeletedMessagesRanges += subStats.nonContiguousDeletedMessagesRanges;
stats.nonContiguousDeletedMessagesRangesSerializedSize +=
subStats.nonContiguousDeletedMessagesRangesSerializedSize;
});
replicators.forEach((cluster, replicator) -> {
ReplicatorStats replicatorStats = replicator.getStats();
// Add incoming msg rates
PublisherStats pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
if (pubStats != null) {
replicatorStats.msgRateIn = pubStats.msgRateIn;
replicatorStats.msgThroughputIn = pubStats.msgThroughputIn;
replicatorStats.inboundConnection = pubStats.getAddress();
replicatorStats.inboundConnectedSince = pubStats.getConnectedSince();
}
stats.msgRateOut += replicatorStats.msgRateOut;
stats.msgThroughputOut += replicatorStats.msgThroughputOut;
stats.replication.put(replicator.getRemoteCluster(), replicatorStats);
});
stats.storageSize = ledger.getTotalSize();
stats.backlogSize = ledger.getEstimatedBacklogSize();
stats.deduplicationStatus = messageDeduplication.getStatus().toString();
stats.offloadedStorageSize = ledger.getOffloadedSize();
return stats;
}
@Override
public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
CompletableFuture<PersistentTopicInternalStats> statFuture = new CompletableFuture<>();
PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
ManagedLedgerImpl ml = (ManagedLedgerImpl) ledger;
stats.entriesAddedCounter = ml.getEntriesAddedCounter();
stats.numberOfEntries = ml.getNumberOfEntries();
stats.totalSize = ml.getTotalSize();
stats.currentLedgerEntries = ml.getCurrentLedgerEntries();
stats.currentLedgerSize = ml.getCurrentLedgerSize();
stats.lastLedgerCreatedTimestamp = DateFormatter.format(ml.getLastLedgerCreatedTimestamp());
if (ml.getLastLedgerCreationFailureTimestamp() != 0) {
stats.lastLedgerCreationFailureTimestamp = DateFormatter.format(ml.getLastLedgerCreationFailureTimestamp());
}
stats.waitingCursorsCount = ml.getWaitingCursorsCount();
stats.pendingAddEntriesCount = ml.getPendingAddEntriesCount();
stats.lastConfirmedEntry = ml.getLastConfirmedEntry().toString();
stats.state = ml.getState();
stats.ledgers = Lists.newArrayList();
List<CompletableFuture<String>> futures = includeLedgerMetadata ? Lists.newArrayList() : null;
CompletableFuture<Set<String>> availableBookiesFuture = brokerService.pulsar().getAvailableBookiesAsync();
availableBookiesFuture.whenComplete((bookies, e) -> {
if (e != null) {
log.error("[{}] Failed to fetch available bookies.", topic, e);
statFuture.completeExceptionally(e);
} else {
ml.getLedgersInfo().forEach((id, li) -> {
LedgerInfo info = new LedgerInfo();
info.ledgerId = li.getLedgerId();
info.entries = li.getEntries();
info.size = li.getSize();
info.offloaded = li.hasOffloadContext() && li.getOffloadContext().getComplete();
stats.ledgers.add(info);
if (futures != null) {
futures.add(ml.getLedgerMetadata(li.getLedgerId()).handle((lMetadata, ex) -> {
if (ex == null) {
info.metadata = lMetadata;
}
return null;
}));
futures.add(ml.getEnsemblesAsync(li.getLedgerId()).handle((ensembles, ex) -> {
if (ex == null) {
info.underReplicated = !bookies.containsAll(ensembles.stream().map(BookieId::toString)
.collect(Collectors.toList()));
}
return null;
}));
}
});
}
});
// Add ledger info for compacted topic ledger if exist.
LedgerInfo info = new LedgerInfo();
info.ledgerId = -1;
info.entries = -1;
info.size = -1;
try {
Optional<CompactedTopicImpl.CompactedTopicContext> compactedTopicContext = ((CompactedTopicImpl) compactedTopic)
.getCompactedTopicContext();
if (compactedTopicContext.isPresent()) {
CompactedTopicImpl.CompactedTopicContext ledgerContext = compactedTopicContext.get();
info.ledgerId = ledgerContext.getLedger().getId();
info.entries = ledgerContext.getLedger().getLastAddConfirmed() + 1;
info.size = ledgerContext.getLedger().getLength();
}
} catch (ExecutionException | InterruptedException e) {
log.warn("[{}]Fail to get ledger information for compacted topic.", topic);
}
stats.compactedLedger = info;
stats.cursors = Maps.newTreeMap();
ml.getCursors().forEach(c -> {
ManagedCursorImpl cursor = (ManagedCursorImpl) c;
CursorStats cs = new CursorStats();
cs.markDeletePosition = cursor.getMarkDeletedPosition().toString();
cs.readPosition = cursor.getReadPosition().toString();
cs.waitingReadOp = cursor.hasPendingReadRequest();
cs.pendingReadOps = cursor.getPendingReadOpsCount();
cs.messagesConsumedCounter = cursor.getMessagesConsumedCounter();
cs.cursorLedger = cursor.getCursorLedger();
cs.cursorLedgerLastEntry = cursor.getCursorLedgerLastEntry();
cs.individuallyDeletedMessages = cursor.getIndividuallyDeletedMessages();
cs.lastLedgerSwitchTimestamp = DateFormatter.format(cursor.getLastLedgerSwitchTimestamp());
cs.state = cursor.getState();
cs.numberOfEntriesSinceFirstNotAckedMessage = cursor.getNumberOfEntriesSinceFirstNotAckedMessage();
cs.totalNonContiguousDeletedMessagesRange = cursor.getTotalNonContiguousDeletedMessagesRange();
cs.properties = cursor.getProperties();
stats.cursors.put(cursor.getName(), cs);
});
//Schema store ledgers
String schemaId;
try {
schemaId = TopicName.get(topic).getSchemaName();
} catch (Throwable t) {
statFuture.completeExceptionally(t);
return statFuture;
}
CompletableFuture<Void> schemaStoreLedgersFuture = new CompletableFuture<>();
stats.schemaLedgers = Collections.synchronizedList(new ArrayList<>());
if (brokerService.getPulsar().getSchemaStorage() != null
&& brokerService.getPulsar().getSchemaStorage() instanceof BookkeeperSchemaStorage) {
((BookkeeperSchemaStorage) brokerService.getPulsar().getSchemaStorage())
.getStoreLedgerIdsBySchemaId(schemaId)
.thenAccept(ledgers -> {
List<CompletableFuture<Void>> getLedgerMetadataFutures = new ArrayList<>();
ledgers.forEach(ledgerId -> {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
getLedgerMetadataFutures.add(completableFuture);
brokerService.getPulsar().getBookKeeperClient()
.getLedgerMetadata(ledgerId)
.thenAccept(metadata -> {
LedgerInfo schemaLedgerInfo = new LedgerInfo();
schemaLedgerInfo.ledgerId = metadata.getLedgerId();
schemaLedgerInfo.entries = metadata.getLastEntryId() + 1;
schemaLedgerInfo.size = metadata.getLength();
if (includeLedgerMetadata) {
info.metadata = metadata.toSafeString();
}
stats.schemaLedgers.add(schemaLedgerInfo);
completableFuture.complete(null);
}).exceptionally(e -> {
completableFuture.completeExceptionally(e);
return null;
});
});
FutureUtil.waitForAll(getLedgerMetadataFutures).thenRun(() -> {
schemaStoreLedgersFuture.complete(null);
}).exceptionally(e -> {
schemaStoreLedgersFuture.completeExceptionally(e);
return null;
});
}).exceptionally(e -> {
schemaStoreLedgersFuture.completeExceptionally(e);
return null;
});
} else {
schemaStoreLedgersFuture.complete(null);
}
schemaStoreLedgersFuture.thenRun(() -> {
if (futures != null) {
FutureUtil.waitForAll(futures).handle((res, ex) -> {
statFuture.complete(stats);
return null;
});
} else {
statFuture.complete(stats);
}
}).exceptionally(e -> {
statFuture.completeExceptionally(e);
return null;
});
return statFuture;
}
public long getBacklogSize() {
return ledger.getEstimatedBacklogSize();
}
public boolean isActive(InactiveTopicDeleteMode deleteMode) {
switch (deleteMode) {
case delete_when_no_subscriptions:
if (!subscriptions.isEmpty()) {
return true;
}
break;
case delete_when_subscriptions_caught_up:
if (hasBacklogs()) {
return true;
}
break;
}
if (TopicName.get(topic).isGlobal()) {
// no local producers
return hasLocalProducers();
} else {
return USAGE_COUNT_UPDATER.get(this) != 0;
}
}
private boolean hasBacklogs() {
return subscriptions.values().stream().anyMatch(sub -> sub.getNumberOfEntriesInBacklog(false) > 0);
}
@Override
public void checkGC() {
if (!isDeleteWhileInactive()) {
// This topic is not included in GC
return;
}
InactiveTopicDeleteMode deleteMode = inactiveTopicPolicies.getInactiveTopicDeleteMode();
int maxInactiveDurationInSec = inactiveTopicPolicies.getMaxInactiveDurationSeconds();
if (isActive(deleteMode)) {
lastActive = System.nanoTime();
} else if (System.nanoTime() - lastActive < TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
// Gc interval did not expire yet
return;
} else if (shouldTopicBeRetained()) {
// Topic activity is still within the retention period
return;
} else {
CompletableFuture<Void> replCloseFuture = new CompletableFuture<>();
if (TopicName.get(topic).isGlobal()) {
// For global namespace, close repl producers first.
// Once all repl producers are closed, we can delete the topic,
// provided no remote producers connected to the broker.
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
maxInactiveDurationInSec);
}
closeReplProducersIfNoBacklog().thenRun(() -> {
if (hasRemoteProducers()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic has connected remote producers. Not a candidate for GC",
topic);
}
replCloseFuture
.completeExceptionally(new TopicBusyException("Topic has connected remote producers"));
} else {
log.info("[{}] Global topic inactive for {} seconds, closed repl producers", topic,
maxInactiveDurationInSec);
replCloseFuture.complete(null);
}
}).exceptionally(e -> {
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic has replication backlog. Not a candidate for GC", topic);
}
replCloseFuture.completeExceptionally(e.getCause());
return null;
});
} else {
replCloseFuture.complete(null);
}
replCloseFuture.thenCompose(v -> delete(deleteMode == InactiveTopicDeleteMode.delete_when_no_subscriptions,
deleteMode == InactiveTopicDeleteMode.delete_when_subscriptions_caught_up, true))
.thenApply((res) -> tryToDeletePartitionedMetadata())
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
if (e.getCause() instanceof TopicBusyException) {
// topic became active again
if (log.isDebugEnabled()) {
log.debug("[{}] Did not delete busy topic: {}", topic, e.getCause().getMessage());
}
} else {
log.warn("[{}] Inactive topic deletion failed", topic, e);
}
return null;
});
}
}
private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
if (TopicName.get(topic).isPartitioned() && !deletePartitionedTopicMetadataWhileInactive()) {
return CompletableFuture.completedFuture(null);
}
TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
String path = AdminResource.path(AdminResource.PARTITIONED_TOPIC_PATH_ZNODE, topicName.getNamespace()
, topicName.getDomain().value(), topicName.getEncodedLocalName());
try {
if (topicName.isPartitioned() && !getBrokerService().pulsar().getGlobalZkCache().exists(path)) {
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Void> deleteMetadataFuture = new CompletableFuture<>();
getBrokerService().fetchPartitionedTopicMetadataAsync(TopicName.get(topicName.getPartitionedTopicName()))
.thenAccept((metadata -> {
// make sure all sub partitions were deleted
String managedPath = String.format("/managed-ledgers/%s/%s", topicName.getNamespace()
, topicName.getDomain().value());
Set<String> cache = null;
try {
cache = brokerService.pulsar().getLocalZkCacheService().managedLedgerListCache().get(managedPath);
} catch (Exception e) {
deleteMetadataFuture.completeExceptionally(e);
}
if (cache == null) {
return;
}
for (int i = 0; i < metadata.partitions; i++) {
if (cache.contains(topicName.getPartition(i).getLocalName())) {
throw new UnsupportedOperationException();
}
}
}))
.thenAccept((res) -> getBrokerService().pulsar().getGlobalZkCache().getZooKeeper().delete(path, -1
, (rc, s, o) -> {
if (KeeperException.Code.OK.intValue() == rc
|| KeeperException.Code.NONODE.intValue() == rc) {
getBrokerService().pulsar().getGlobalZkCache().invalidate(path);
deleteMetadataFuture.complete(null);
} else {
deleteMetadataFuture.completeExceptionally(
KeeperException.create(KeeperException.Code.get(rc)));
}
}, null))
.exceptionally((e) -> {
if (!(e.getCause() instanceof UnsupportedOperationException)) {
log.error("delete metadata fail", e);
}
deleteMetadataFuture.complete(null);
return null;
});
return deleteMetadataFuture;
} catch (Exception e) {
return FutureUtil.failedFuture(e);
}
}
@Override
public void checkInactiveSubscriptions() {
TopicName name = TopicName.get(topic);
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
final int defaultExpirationTime = brokerService.pulsar().getConfiguration()
.getSubscriptionExpirationTimeMinutes();
final long expirationTimeMillis = TimeUnit.MINUTES
.toMillis((policies.subscription_expiration_time_minutes <= 0 && defaultExpirationTime > 0)
? defaultExpirationTime
: policies.subscription_expiration_time_minutes);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() || sub.isReplicated()) {
return;
}
if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) {
sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration",
topic, subName));
}
});
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
}
}
@Override
public void checkBackloggedCursors() {
// activate caught up cursors which include consumers
subscriptions.forEach((subName, subscription) -> {
if (!subscription.getConsumers().isEmpty()
&& subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
subscription.getCursor().setActive();
} else {
subscription.getCursor().setInactive();
}
});
}
@Override
public void checkDeduplicationSnapshot() {
messageDeduplication.takeSnapshot();
}
/**
* Check whether the topic should be retained (based on time), even tough there are no producers/consumers and it's
* marked as inactive.
*/
private boolean shouldTopicBeRetained() {
TopicName name = TopicName.get(topic);
RetentionPolicies retentionPolicies = null;
try {
retentionPolicies = Optional.ofNullable(getTopicPolicies(name))
.map(TopicPolicies::getRetentionPolicies)
.orElse(null);
if (retentionPolicies == null){
retentionPolicies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.map(p -> p.retention_policies)
.orElse(null);
}
if (retentionPolicies == null){
// If no policies, the default is to have no retention and delete the inactive topic
retentionPolicies = new RetentionPolicies(
brokerService.pulsar().getConfiguration().getDefaultRetentionTimeInMinutes(),
brokerService.pulsar().getConfiguration().getDefaultRetentionSizeInMB());
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
// Don't delete in case we cannot get the policies
return true;
}
long retentionTime = TimeUnit.MINUTES.toNanos(retentionPolicies.getRetentionTimeInMinutes());
// Negative retention time means the topic should be retained indefinitely,
// because its own data has to be retained
return retentionTime < 0 || (System.nanoTime() - lastActive) < retentionTime;
}
@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
if (data.deleted) {
log.debug("Ignore the update because it has been deleted : {}", data);
return CompletableFuture.completedFuture(null);
}
isEncryptionRequired = data.encryption_required;
setSchemaCompatibilityStrategy(data);
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
schemaValidationEnforced = data.schema_validation_enforced;
maxUnackedMessagesOnConsumer = unackedMessagesExceededOnConsumer(data);
maxUnackedMessagesOnSubscription = unackedMessagesExceededOnSubscription(data);
maxSubscriptionsPerTopic = data.max_subscriptions_per_topic;
if (data.delayed_delivery_policies != null) {
delayedDeliveryTickTimeMillis = data.delayed_delivery_policies.getTickTime();
delayedDeliveryEnabled = data.delayed_delivery_policies.isActive();
}
//If the topic-level policy already exists, the namespace-level policy cannot override the topic-level policy.
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (data.inactive_topic_policies != null) {
if (topicPolicies == null || !topicPolicies.isInactiveTopicPoliciesSet()) {
this.inactiveTopicPolicies = data.inactive_topic_policies;
}
} else {
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}
initializeDispatchRateLimiterIfNeeded(Optional.ofNullable(data));
this.updateMaxPublishRate(data);
producers.values().forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
// If the topic-level policy already exists, the namespace-level policy cannot override
// the topic-level policy.
if (dispatcher != null && (topicPolicies == null || !topicPolicies.isSubscriptionDispatchRateSet())) {
dispatcher.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data));
}
});
replicators.forEach((name, replicator) ->
replicator.getRateLimiter().ifPresent(rateLimiter -> rateLimiter.onPoliciesUpdate(data))
);
checkMessageExpiry();
CompletableFuture<Void> replicationFuture = checkReplicationAndRetryOnFailure();
CompletableFuture<Void> dedupFuture = checkDeduplicationStatus();
CompletableFuture<Void> persistentPoliciesFuture = checkPersistencePolicies();
// update rate-limiter if policies updated
if (this.dispatchRateLimiter.isPresent()) {
if (topicPolicies == null || !topicPolicies.isDispatchRateSet()) {
dispatchRateLimiter.get().onPoliciesUpdate(data);
}
}
if (this.subscribeRateLimiter.isPresent()) {
subscribeRateLimiter.get().onPoliciesUpdate(data);
}
return CompletableFuture.allOf(replicationFuture, dedupFuture, persistentPoliciesFuture);
}
/**
*
* @return Backlog quota for topic
*/
@Override
public BacklogQuota getBacklogQuota() {
TopicName topicName = TopicName.get(this.getName());
return brokerService.getBacklogQuotaManager().getBacklogQuota(topicName);
}
/**
*
* @return quota exceeded status for blocking producer creation
*/
@Override
public boolean isBacklogQuotaExceeded(String producerName) {
BacklogQuota backlogQuota = getBacklogQuota();
if (backlogQuota != null) {
BacklogQuota.RetentionPolicy retentionPolicy = backlogQuota.getPolicy();
if ((retentionPolicy == BacklogQuota.RetentionPolicy.producer_request_hold
|| retentionPolicy == BacklogQuota.RetentionPolicy.producer_exception)
&& isBacklogExceeded()) {
log.info("[{}] Backlog quota exceeded. Cannot create producer [{}]", this.getName(), producerName);
return true;
} else {
return false;
}
}
return false;
}
/**
* @return determine if quota enforcement needs to be done for topic
*/
public boolean isBacklogExceeded() {
TopicName topicName = TopicName.get(getName());
long backlogQuotaLimitInBytes = brokerService.getBacklogQuotaManager().getBacklogQuotaLimit(topicName);
if (backlogQuotaLimitInBytes < 0) {
return false;
}
if (log.isDebugEnabled()) {
log.debug("[{}] - backlog quota limit = [{}]", getName(), backlogQuotaLimitInBytes);
}
// check if backlog exceeded quota
long storageSize = getBacklogSize();
if (log.isDebugEnabled()) {
log.debug("[{}] Storage size = [{}], limit [{}]", getName(), storageSize, backlogQuotaLimitInBytes);
}
return (storageSize >= backlogQuotaLimitInBytes);
}
@Override
public boolean isReplicated() {
return !replicators.isEmpty();
}
public CompletableFuture<MessageId> terminate() {
CompletableFuture<MessageId> future = new CompletableFuture<>();
ledger.asyncTerminate(new TerminateCallback() {
@Override
public void terminateComplete(Position lastCommittedPosition, Object ctx) {
producers.values().forEach(Producer::disconnect);
subscriptions.forEach((name, sub) -> sub.topicTerminated());
PositionImpl lastPosition = (PositionImpl) lastCommittedPosition;
MessageId messageId = new MessageIdImpl(lastPosition.getLedgerId(), lastPosition.getEntryId(), -1);
log.info("[{}] Topic terminated at {}", getName(), messageId);
future.complete(messageId);
}
@Override
public void terminateFailed(ManagedLedgerException exception, Object ctx) {
future.completeExceptionally(exception);
}
}, null);
return future;
}
public boolean isOldestMessageExpired(ManagedCursor cursor, long messageTTLInSeconds) {
MessageImpl msg = null;
Entry entry = null;
boolean isOldestMessageExpired = false;
try {
entry = cursor.getNthEntry(1, IndividualDeletedEntries.Include);
if (entry != null) {
msg = MessageImpl.deserialize(entry.getDataBuffer());
isOldestMessageExpired = messageTTLInSeconds != 0 && System.currentTimeMillis() > (msg.getPublishTime()
+ TimeUnit.SECONDS.toMillis((long) (messageTTLInSeconds * MESSAGE_EXPIRY_THRESHOLD)));
}
} catch (Exception e) {
log.warn("[{}] Error while getting the oldest message", topic, e);
} finally {
if (entry != null) {
entry.release();
}
if (msg != null) {
msg.recycle();
}
}
return isOldestMessageExpired;
}
/**
* Clears backlog for all cursors in the topic
*
* @return
*/
public CompletableFuture<Void> clearBacklog() {
log.info("[{}] Clearing backlog on all cursors in the topic.", topic);
List<CompletableFuture<Void>> futures = Lists.newArrayList();
List<String> cursors = getSubscriptions().keys();
cursors.addAll(getReplicators().keys());
for (String cursor : cursors) {
futures.add(clearBacklog(cursor));
}
return FutureUtil.waitForAll(futures);
}
/**
* Clears backlog for a given cursor in the topic.
* <p>
* Note: For a replication cursor, just provide the remote cluster name
* </p>
*
* @param cursorName
* @return
*/
public CompletableFuture<Void> clearBacklog(String cursorName) {
log.info("[{}] Clearing backlog for cursor {} in the topic.", topic, cursorName);
PersistentSubscription sub = getSubscription(cursorName);
if (sub != null) {
return sub.clearBacklog();
}
PersistentReplicator repl = (PersistentReplicator) getPersistentReplicator(cursorName);
if (repl != null) {
return repl.clearBacklog();
}
return FutureUtil.failedFuture(new BrokerServiceException("Cursor not found"));
}
@Override
public Optional<DispatchRateLimiter> getDispatchRateLimiter() {
return this.dispatchRateLimiter;
}
public Optional<SubscribeRateLimiter> getSubscribeRateLimiter() {
return this.subscribeRateLimiter;
}
public long getLastPublishedSequenceId(String producerName) {
return messageDeduplication.getLastPublishedSequenceId(producerName);
}
@Override
public Position getLastPosition() {
return ledger.getLastConfirmedEntry();
}
@Override
public CompletableFuture<MessageId> getLastMessageId() {
CompletableFuture<MessageId> completableFuture = new CompletableFuture<>();
PositionImpl position = (PositionImpl) ledger.getLastConfirmedEntry();
String name = getName();
int partitionIndex = TopicName.getPartitionIndex(name);
if (log.isDebugEnabled()) {
log.debug("getLastMessageId {}, partitionIndex{}, position {}", name, partitionIndex, position);
}
if (position.getEntryId() == -1) {
completableFuture
.complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex));
return completableFuture;
}
ManagedLedgerImpl ledgerImpl = (ManagedLedgerImpl) ledger;
if (!ledgerImpl.ledgerExists(position.getLedgerId())) {
completableFuture
.complete(MessageId.earliest);
return completableFuture;
}
ledgerImpl.asyncReadEntry(position, new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
PulsarApi.MessageMetadata metadata = Commands.parseMessageMetadata(entry.getDataBuffer());
if (metadata.hasNumMessagesInBatch()) {
completableFuture.complete(new BatchMessageIdImpl(position.getLedgerId(), position.getEntryId(),
partitionIndex, metadata.getNumMessagesInBatch() - 1));
} else {
completableFuture
.complete(new MessageIdImpl(position.getLedgerId(), position.getEntryId(), partitionIndex));
}
}
@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
completableFuture.completeExceptionally(exception);
}
}, null);
return completableFuture;
}
public synchronized void triggerCompaction()
throws PulsarServerException, AlreadyRunningException {
if (currentCompaction.isDone()) {
currentCompaction = brokerService.pulsar().getCompactor().compact(topic);
} else {
throw new AlreadyRunningException("Compaction already in progress");
}
}
public synchronized LongRunningProcessStatus compactionStatus() {
final CompletableFuture<Long> current;
synchronized (this) {
current = currentCompaction;
}
if (!current.isDone()) {
return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
} else {
try {
if (current.join() == COMPACTION_NEVER_RUN) {
return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
} else {
return LongRunningProcessStatus.forStatus(LongRunningProcessStatus.Status.SUCCESS);
}
} catch (CancellationException | CompletionException e) {
return LongRunningProcessStatus.forError(e.getMessage());
}
}
}
public synchronized void triggerOffload(MessageIdImpl messageId) throws AlreadyRunningException {
if (currentOffload.isDone()) {
CompletableFuture<MessageIdImpl> promise = currentOffload = new CompletableFuture<>();
log.info("[{}] Starting offload operation at messageId {}", topic, messageId);
getManagedLedger().asyncOffloadPrefix(
PositionImpl.get(messageId.getLedgerId(), messageId.getEntryId()),
new OffloadCallback() {
@Override
public void offloadComplete(Position pos, Object ctx) {
PositionImpl impl = (PositionImpl)pos;
log.info("[{}] Completed successfully offload operation at messageId {}", topic, messageId);
promise.complete(new MessageIdImpl(impl.getLedgerId(), impl.getEntryId(), -1));
}
@Override
public void offloadFailed(ManagedLedgerException exception, Object ctx) {
log.warn("[{}] Failed offload operation at messageId {}", topic, messageId, exception);
promise.completeExceptionally(exception);
}
}, null);
} else {
throw new AlreadyRunningException("Offload already in progress");
}
}
public synchronized OffloadProcessStatus offloadStatus() {
if (!currentOffload.isDone()) {
return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.RUNNING);
} else {
try {
if (currentOffload.join() == MessageId.earliest) {
return OffloadProcessStatus.forStatus(LongRunningProcessStatus.Status.NOT_RUN);
} else {
return OffloadProcessStatus.forSuccess(currentOffload.join());
}
} catch (CancellationException | CompletionException e) {
log.warn("Failed to offload", e.getCause());
return OffloadProcessStatus.forError(e.getMessage());
}
}
}
/**
* Get message TTL for this topic.
* @return Message TTL in second.
*/
private int getMessageTTL() throws Exception {
//Return Topic level message TTL if exist. If topic level policy or message ttl is not set,
//fall back to namespace level message ttl then message ttl set for current broker.
TopicName name = TopicName.get(topic);
TopicPolicies topicPolicies = getTopicPolicies(name);
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(KeeperException.NoNodeException::new);
if (topicPolicies != null && topicPolicies.isMessageTTLSet()) {
return topicPolicies.getMessageTTLInSeconds();
}
if (policies.message_ttl_in_seconds != null) {
return policies.message_ttl_in_seconds;
}
return brokerService.getPulsar().getConfiguration().getTtlDurationDefaultInSeconds();
}
private static final Logger log = LoggerFactory.getLogger(PersistentTopic.class);
@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
int numActiveConsumers = subscriptions.values().stream()
.mapToInt(subscription -> subscription.getConsumers().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
|| (numActiveConsumers != 0)
|| (ledger.getTotalSize() != 0)) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion ->
CompletableFuture.completedFuture(null));
}
});
}
private synchronized void checkReplicatedSubscriptionControllerState() {
AtomicBoolean shouldBeEnabled = new AtomicBoolean(false);
subscriptions.forEach((name, subscription) -> {
if (subscription.isReplicated()) {
shouldBeEnabled.set(true);
}
});
if (shouldBeEnabled.get() == false) {
log.info("[{}] There are no replicated subscriptions on the topic", topic);
}
checkReplicatedSubscriptionControllerState(shouldBeEnabled.get());
}
private synchronized void checkReplicatedSubscriptionControllerState(boolean shouldBeEnabled) {
boolean isCurrentlyEnabled = replicatedSubscriptionsController.isPresent();
boolean isEnableReplicatedSubscriptions = brokerService.pulsar().getConfiguration().isEnableReplicatedSubscriptions();
if (shouldBeEnabled && !isCurrentlyEnabled && isEnableReplicatedSubscriptions) {
log.info("[{}] Enabling replicated subscriptions controller", topic);
replicatedSubscriptionsController = Optional.of(new ReplicatedSubscriptionsController(this,
brokerService.pulsar().getConfiguration().getClusterName()));
} else if (isCurrentlyEnabled && !shouldBeEnabled || !isEnableReplicatedSubscriptions) {
log.info("[{}] Disabled replicated subscriptions controller", topic);
replicatedSubscriptionsController.ifPresent(ReplicatedSubscriptionsController::close);
replicatedSubscriptionsController = Optional.empty();
}
}
void receivedReplicatedSubscriptionMarker(Position position, int markerType, ByteBuf payload) {
ReplicatedSubscriptionsController ctrl = replicatedSubscriptionsController.orElse(null);
if (ctrl == null) {
// Force to start the replication controller
checkReplicatedSubscriptionControllerState(true /* shouldBeEnabled */);
ctrl = replicatedSubscriptionsController.get();
}
ctrl.receivedReplicatedSubscriptionMarker(position, markerType, payload);
}
Optional<ReplicatedSubscriptionsController> getReplicatedSubscriptionController() {
return replicatedSubscriptionsController;
}
public CompactedTopic getCompactedTopic() {
return compactedTopic;
}
@Override
public boolean isSystemTopic() {
return false;
}
private synchronized void fence() {
isFenced = true;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
if (monitoringTask == null || monitoringTask.isDone()) {
final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
if (timeout > 0) {
this.fencedTopicMonitoringTask = brokerService.executor().schedule(this::closeFencedTopicForcefully,
timeout, TimeUnit.SECONDS);
}
}
}
private synchronized void unfence() {
isFenced = false;
ScheduledFuture<?> monitoringTask = this.fencedTopicMonitoringTask;
if (monitoringTask != null && !monitoringTask.isDone()) {
monitoringTask.cancel(false);
}
}
private void closeFencedTopicForcefully() {
if (isFenced) {
final int timeout = brokerService.pulsar().getConfiguration().getTopicFencingTimeoutSeconds();
if (isClosingOrDeleting) {
log.warn("[{}] Topic remained fenced for {} seconds and is already closed (pendingWriteOps: {})", topic,
timeout, pendingWriteOps.get());
} else {
log.error("[{}] Topic remained fenced for {} seconds, so close it (pendingWriteOps: {})", topic,
timeout, pendingWriteOps.get());
close();
}
}
}
private void fenceTopicToCloseOrDelete() {
isClosingOrDeleting = true;
isFenced = true;
}
private void unfenceTopicToResume() {
isFenced = false;
isClosingOrDeleting = false;
}
@Override
public CompletableFuture<TransactionBuffer> getTransactionBuffer(boolean createIfMissing) {
if (transactionBuffer == null && createIfMissing) {
transactionBufferLock.lock();
try {
if (transactionBuffer == null) {
transactionBuffer = brokerService.getPulsar().getTransactionBufferProvider()
.newTransactionBuffer(this);
}
} finally {
transactionBufferLock.unlock();
}
}
return transactionBuffer;
}
@Override
public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishContext publishContext) {
pendingWriteOps.incrementAndGet();
if (isFenced) {
publishContext.completed(new TopicFencedException("fenced"), -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
if (isExceedMaximumMessageSize(headersAndPayload.readableBytes())) {
publishContext.completed(new NotAllowedException("Exceed maximum message size")
, -1, -1);
decrementPendingWriteOpsAndCheck();
return;
}
MessageDeduplication.MessageDupStatus status = messageDeduplication.isDuplicate(publishContext, headersAndPayload);
switch (status) {
case NotDup:
getTransactionBuffer(true)
.thenCompose(txnBuffer -> txnBuffer.appendBufferToTxn(
txnID, publishContext.getSequenceId(), headersAndPayload))
.thenAccept(position -> {
decrementPendingWriteOpsAndCheck();
publishContext.completed(null,
((PositionImpl)position).getLedgerId(), ((PositionImpl)position).getEntryId());
})
.exceptionally(throwable -> {
decrementPendingWriteOpsAndCheck();
publishContext.completed(new Exception(throwable), -1, -1);
return null;
});
break;
case Dup:
// Immediately acknowledge duplicated message
publishContext.completed(null, -1, -1);
decrementPendingWriteOpsAndCheck();
break;
default:
publishContext.completed(new MessageDeduplication.MessageDupUnknownException(), -1, -1);
decrementPendingWriteOpsAndCheck();
}
}
@Override
public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, List<MessageIdData> sendMessageList) {
CompletableFuture<Void> completableFuture = new CompletableFuture<>();
getTransactionBuffer(false).thenAccept(tb -> {
CompletableFuture<Void> future = new CompletableFuture<>();
if (PulsarApi.TxnAction.COMMIT_VALUE == txnAction) {
future = tb.commitTxn(txnID, sendMessageList);
} else if (PulsarApi.TxnAction.ABORT_VALUE == txnAction) {
future = tb.abortTxn(txnID, sendMessageList);
} else {
future.completeExceptionally(new Exception("Unsupported txnAction " + txnAction));
}
future.whenComplete((ignored, throwable) -> {
if (throwable != null) {
completableFuture.completeExceptionally(throwable);
return;
}
completableFuture.complete(null);
});
}).exceptionally(tbThrowable -> {
completableFuture.completeExceptionally(tbThrowable);
return null;
});
return completableFuture;
}
public long getDelayedDeliveryTickTimeMillis() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
if (topicPolicies != null && topicPolicies.isDelayedDeliveryTickTimeMillisSet()) {
return topicPolicies.getDelayedDeliveryTickTimeMillis();
}
return delayedDeliveryTickTimeMillis;
}
public int getMaxUnackedMessagesOnConsumer() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isMaxUnackedMessagesOnConsumerSet()) {
return topicPolicies.getMaxUnackedMessagesOnConsumer();
}
return maxUnackedMessagesOnConsumer;
}
public boolean isDelayedDeliveryEnabled() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
if (topicPolicies != null && topicPolicies.isDelayedDeliveryEnabledSet()) {
return topicPolicies.getDelayedDeliveryEnabled();
}
return delayedDeliveryEnabled;
}
public int getMaxUnackedMessagesOnSubscription() {
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
//Topic level setting has higher priority than namespace level
if (topicPolicies != null && topicPolicies.isMaxUnackedMessagesOnSubscriptionSet()) {
return topicPolicies.getMaxUnackedMessagesOnSubscription();
}
return maxUnackedMessagesOnSubscription;
}
@Override
public void onUpdate(TopicPolicies policies) {
if (log.isDebugEnabled()) {
log.debug("[{}] update topic policy: {}", topic, policies);
}
if (policies == null) {
return;
}
Optional<Policies> namespacePolicies = getNamespacePolicies();
initializeTopicDispatchRateLimiterIfNeeded(policies);
dispatchRateLimiter.ifPresent(limiter -> {
if (policies.isDispatchRateSet()) {
dispatchRateLimiter.get().updateDispatchRate(policies.getDispatchRate());
} else {
dispatchRateLimiter.get().updateDispatchRate();
}
});
subscriptions.forEach((subName, sub) -> {
sub.getConsumers().forEach(Consumer::checkPermissions);
Dispatcher dispatcher = sub.getDispatcher();
dispatcher.updateRateLimiter(policies.getSubscriptionDispatchRate());
});
if (policies.getPublishRate() != null) {
updatePublishDispatcher(policies.getPublishRate());
} else {
updateMaxPublishRate(namespacePolicies.orElse(null));
}
if (policies.isInactiveTopicPoliciesSet()) {
inactiveTopicPolicies = policies.getInactiveTopicPolicies();
} else if (namespacePolicies.isPresent() && namespacePolicies.get().inactive_topic_policies != null) {
//topic-level policies is null , so use namespace-level
inactiveTopicPolicies = namespacePolicies.get().inactive_topic_policies;
} else {
//namespace-level policies is null , so use broker level
ServiceConfiguration cfg = brokerService.getPulsar().getConfiguration();
resetInactiveTopicPolicies(cfg.getBrokerDeleteInactiveTopicsMode()
, cfg.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), cfg.isBrokerDeleteInactiveTopicsEnabled());
}
initializeTopicSubscribeRateLimiterIfNeeded(Optional.ofNullable(policies));
if (this.subscribeRateLimiter.isPresent() && policies != null) {
subscribeRateLimiter.ifPresent(subscribeRateLimiter ->
subscribeRateLimiter.onSubscribeRateUpdate(policies.getSubscribeRate()));
}
}
private Optional<Policies> getNamespacePolicies() {
return DispatchRateLimiter.getPolicies(brokerService, topic);
}
private void initializeTopicDispatchRateLimiterIfNeeded(TopicPolicies policies) {
synchronized (dispatchRateLimiter) {
if (!dispatchRateLimiter.isPresent() && policies.getDispatchRate() != null) {
this.dispatchRateLimiter = Optional.of(new DispatchRateLimiter(this, Type.TOPIC));
}
}
}
private void initializeTopicSubscribeRateLimiterIfNeeded(Optional<TopicPolicies> policies) {
synchronized (subscribeRateLimiter) {
if (!subscribeRateLimiter.isPresent() && policies.isPresent() &&
policies.get().getSubscribeRate() != null) {
this.subscribeRateLimiter = Optional.of(new SubscribeRateLimiter(this));
}
}
}
private PersistentTopic getPersistentTopic() {
return this;
}
private void registerTopicPolicyListener() {
if (brokerService.pulsar().getConfig().isSystemTopicEnabled() &&
brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) {
TopicName topicName = TopicName.get(topic);
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
brokerService.getPulsar().getTopicPoliciesService().registerListener(cloneTopicName, this);
}
}
@VisibleForTesting
public MessageDeduplication getMessageDeduplication() {
return messageDeduplication;
}
private boolean checkMaxSubscriptionsPerTopicExceed(String subscriptionName) {
//Existing subscriptions are not affected
if (StringUtils.isNotEmpty(subscriptionName) && getSubscription(subscriptionName) != null) {
return false;
}
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
Integer maxSubsPerTopic = null;
if (topicPolicies != null && topicPolicies.isMaxSubscriptionsPerTopicSet()) {
maxSubsPerTopic = topicPolicies.getMaxSubscriptionsPerTopic();
}
if (maxSubsPerTopic == null) {
maxSubsPerTopic = maxSubscriptionsPerTopic;
}
if (maxSubsPerTopic == null) {
maxSubsPerTopic = brokerService.pulsar().getConfig().getMaxSubscriptionsPerTopic();
}
if (maxSubsPerTopic > 0) {
if (subscriptions != null && subscriptions.size() >= maxSubsPerTopic) {
return true;
}
}
return false;
}
}