/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
import static org.apache.pulsar.common.policies.data.BacklogQuota.BacklogQuotaType;
import static org.apache.pulsar.common.protocol.Commands.DEFAULT_CONSUMER_EPOCH;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.broker.service.AbstractReplicator;
import org.apache.pulsar.broker.service.AbstractTopic;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.SubscriptionOption;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.TopicPolicyListener;
import org.apache.pulsar.broker.service.TransportCnx;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType;
import org.apache.pulsar.common.api.proto.KeySharedMeta;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ManagedLedgerInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.policies.data.stats.ConsumerStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentPublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentReplicatorStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentSubscriptionStatsImpl;
import org.apache.pulsar.common.policies.data.stats.NonPersistentTopicStatsImpl;
import org.apache.pulsar.common.policies.data.stats.PublisherStatsImpl;
import org.apache.pulsar.common.policies.data.stats.SubscriptionStatsImpl;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.metadata.api.MetadataStoreException;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
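/**
 * Non-persistent topic implementation: nothing is written to a managed ledger, so messages are
 * acknowledged to producers immediately and fanned out only to the currently connected consumers
 * and replicators. Anything published while no consumer is attached is dropped.
 */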
public class NonPersistentTopic extends AbstractTopic implements Topic, TopicPolicyListener<TopicPolicies> {
// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
// Ever-increasing counter of entries added
private static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER =
AtomicLongFieldUpdater.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;
private final LongAdder bytesOutFromRemovedSubscriptions = new LongAdder();
private final LongAdder msgOutFromRemovedSubscriptions = new LongAdder();
private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() {
return new TopicStats();
}
};
private static class TopicStats {
public double averageMsgSize;
public double aggMsgRateIn;
public double aggMsgThroughputIn;
public double aggMsgRateOut;
public double aggMsgThroughputOut;
public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats;
public TopicStats() {
remotePublishersStats = new ObjectObjectHashMap<>();
reset();
}
public void reset() {
averageMsgSize = 0;
aggMsgRateIn = 0;
aggMsgThroughputIn = 0;
aggMsgRateOut = 0;
aggMsgThroughputOut = 0;
remotePublishersStats.clear();
}
}
public NonPersistentTopic(String topic, BrokerService brokerService) {
super(topic, brokerService);
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.isFenced = false;
registerTopicPolicyListener();
}
public CompletableFuture<Void> initialize() {
return brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPoliciesAsync(TopicName.get(topic).getNamespaceObject())
.thenAccept(optPolicies -> {
if (!optPolicies.isPresent()) {
log.warn("[{}] Policies not present and isEncryptionRequired will be set to false", topic);
isEncryptionRequired = false;
} else {
Policies policies = optPolicies.get();
updateTopicPolicyByNamespacePolicy(policies);
isEncryptionRequired = policies.encryption_required;
isAllowAutoUpdateSchema = policies.is_allow_auto_update_schema;
schemaValidationEnforced = policies.schema_validation_enforced;
}
updatePublishDispatcher();
updateResourceGroupLimiter(optPolicies);
});
}
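/**
 * Acknowledges the producer right away (there is no persistence step), then hands a retained
 * duplicate of the payload to every subscription dispatcher and every replicator. Entries carry
 * (ledgerId=0, entryId=0) since there is no backing ledger to assign real positions.
 */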
@Override
public void publishMessage(ByteBuf data, PublishContext callback) {
if (isExceedMaximumMessageSize(data.readableBytes(), callback)) {
callback.completed(new NotAllowedException("Exceed maximum message size"), -1, -1);
return;
}
callback.completed(null, 0L, 0L);
ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);
subscriptions.forEach((name, subscription) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
// entry internally retains data, so duplicateBuffer should be released here
duplicateBuffer.release();
if (subscription.getDispatcher() != null) {
subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
} else {
// This happens when the subscription exists but no consumer has been added yet,
// so no dispatcher was created
entry.release();
}
});
if (!replicators.isEmpty()) {
replicators.forEach((name, replicator) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
// entry internally retains data, so duplicateBuffer should be released here
duplicateBuffer.release();
replicator.sendMessage(entry);
});
}
}
protected CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch) {
// Non-persistent topic does not have any durable metadata, so we're just
// keeping the epoch in memory
return CompletableFuture.completedFuture(currentEpoch.orElse(-1L) + 1);
}
protected CompletableFuture<Long> setTopicEpoch(long newEpoch) {
// Non-persistent topic does not have any durable metadata, so we're just
// keeping the epoch in memory
return CompletableFuture.completedFuture(newEpoch);
}
@Override
public void checkMessageDeduplicationInfo() {
// No-op
}
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
if (producers.remove(producer.getProducerName(), producer)) {
handleProducerRemoved(producer);
}
}
@Override
public CompletableFuture<Void> checkIfTransactionBufferRecoverCompletely(boolean isTxnEnabled) {
return CompletableFuture.completedFuture(null);
}
@Override
public CompletableFuture<Consumer> subscribe(SubscriptionOption option) {
return internalSubscribe(option.getCnx(), option.getSubscriptionName(), option.getConsumerId(),
option.getSubType(), option.getPriorityLevel(), option.getConsumerName(),
option.isDurable(), option.getStartMessageId(), option.getMetadata(),
option.isReadCompacted(), option.getInitialPosition(),
option.getStartMessageRollbackDurationSec(), option.isReplicatedSubscriptionStateArg(),
option.getKeySharedMeta());
}
@Override
public CompletableFuture<Consumer> subscribe(final TransportCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName,
boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted,
InitialPosition initialPosition,
long resetStartMessageBackInSec, boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta) {
return internalSubscribe(cnx, subscriptionName, consumerId, subType, priorityLevel, consumerName,
isDurable, startMessageId, metadata, readCompacted, initialPosition, resetStartMessageBackInSec,
replicateSubscriptionState, keySharedMeta);
}
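/**
 * Common subscribe path: checks namespace ownership, then rejects old clients that cannot handle
 * batch messages, reserved (replicator-prefixed) subscription names, readCompacted (valid only on
 * persistent topics) and fenced topics, before creating the subscription on demand and attaching
 * the consumer to it.
 */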
private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, String subscriptionName,
long consumerId, SubType subType, int priorityLevel,
String consumerName, boolean isDurable,
MessageId startMessageId, Map<String, String> metadata,
boolean readCompacted, InitialPosition initialPosition,
long resetStartMessageBackInSec,
boolean replicateSubscriptionState,
KeySharedMeta keySharedMeta) {
return brokerService.checkTopicNsOwnership(getName()).thenCompose(__ -> {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
}
future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message"));
return future;
}
if (subscriptionName.startsWith(replicatorPrefix)) {
log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
future.completeExceptionally(
new NamingException("Subscription with reserved subscription name attempted"));
return future;
}
if (readCompacted) {
future.completeExceptionally(new NotAllowedException("readCompacted only valid on persistent topics"));
return future;
}
lock.readLock().lock();
try {
if (isFenced) {
log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
future.completeExceptionally(new TopicFencedException("Topic is temporarily unavailable"));
return future;
}
handleConsumerAdded(subscriptionName, consumerName);
} finally {
lock.readLock().unlock();
}
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName, isDurable));
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName,
false, cnx, cnx.getAuthRole(), metadata, readCompacted, initialPosition, keySharedMeta,
MessageId.latest, DEFAULT_CONSUMER_EPOCH);
addConsumerToSubscription(subscription, consumer).thenRun(() -> {
if (!cnx.isActive()) {
try {
consumer.close();
} catch (BrokerServiceException e) {
if (e instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
consumerName);
} else if (e instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
}
decrementUsageCount();
future.completeExceptionally(e);
return;
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
consumer.consumerName(), currentUsageCount());
}
future.completeExceptionally(
new BrokerServiceException("Connection was closed while opening the cursor"));
} else {
log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
future.complete(consumer);
}
}).exceptionally(e -> {
Throwable throwable = e.getCause();
if (throwable instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
consumerName);
} else if (throwable instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
}
decrementUsageCount();
future.completeExceptionally(throwable);
return null;
});
return future;
});
}
@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition,
boolean replicateSubscriptionState) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName, true));
}
@Override
public CompletableFuture<Void> delete() {
return delete(false, false, false);
}
/**
* Forcefully closes all producers, consumers and replicators, and deletes the topic.
*
* @return a future that completes once the topic has been deleted
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, true, false);
}
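/**
 * Deletes the topic.
 *
 * @param failIfHasSubscriptions fail instead of deleting if the topic still has subscriptions
 * @param closeIfClientsConnected disconnect all producers, consumers and replicators before deleting
 * @param deleteSchema also delete the schema associated with the topic
 */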
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected,
boolean deleteSchema) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
lock.writeLock().lock();
try {
if (isFenced) {
log.warn("[{}] Topic is already being closed or deleted", topic);
deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
return deleteFuture;
}
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
isFenced = false;
closeClientFuture.completeExceptionally(ex);
return null;
});
} else {
closeClientFuture.complete(null);
}
closeClientFuture.thenAccept(delete -> {
if (currentUsageCount() == 0) {
isFenced = true;
List<CompletableFuture<Void>> futures = Lists.newArrayList();
if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
return;
}
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
}
if (deleteSchema) {
futures.add(deleteSchema().thenApply(schemaVersion -> null));
}
futures.add(deleteTopicPolicies());
FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
isFenced = false;
deleteFuture.completeExceptionally(ex);
} else {
// Topic GC iterates over the topics map; removing an entry from the map on the same
// thread would deadlock, so execute the removal on a different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
unregisterTopicPolicyListener();
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
});
}
});
} else {
deleteFuture.completeExceptionally(new TopicBusyException(
"Topic has " + currentUsageCount() + " connected producers/consumers"));
}
}).exceptionally(ex -> {
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic."));
return null;
});
} finally {
lock.writeLock().unlock();
}
return deleteFuture;
}
/**
* Close this topic - disconnect all producers, replicators and subscriptions associated with it.
*
* @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect before completing the close
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close(boolean closeWithoutWaitingClientDisconnect) {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
lock.writeLock().lock();
try {
if (!isFenced || closeWithoutWaitingClientDisconnect) {
isFenced = true;
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
return closeFuture;
}
} finally {
lock.writeLock().unlock();
}
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.values().forEach(producer -> futures.add(producer.disconnect()));
if (topicPublishRateLimiter != null) {
try {
topicPublishRateLimiter.close();
} catch (Exception e) {
log.warn("Error closing topicPublishRateLimiter for topic {}", topic, e);
}
}
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
if (this.resourceGroupPublishLimiter != null) {
this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName());
}
CompletableFuture<Void> clientCloseFuture =
closeWithoutWaitingClientDisconnect ? CompletableFuture.completedFuture(null)
: FutureUtil.waitForAll(futures);
clientCloseFuture.thenRun(() -> {
log.info("[{}] Topic closed", topic);
// Unloading the topic iterates over the topics map; removing an entry from the map on the same
// thread would deadlock, so execute the removal on a different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
unregisterTopicPolicyListener();
closeFuture.complete(null);
});
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
isFenced = false;
closeFuture.completeExceptionally(exception);
return null;
});
return closeFuture;
}
public CompletableFuture<Void> stopReplProducers() {
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
return FutureUtil.waitForAll(closeFutures);
}
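/**
 * Reconciles the running replicators with the replication clusters configured for the topic:
 * starts a replicator for every configured remote cluster that has none, and removes replicators
 * for clusters that are no longer configured. Non-global and heartbeat-namespace topics are skipped.
 */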
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal()) {
return CompletableFuture.completedFuture(null);
}
NamespaceName heartbeatNamespace = brokerService.pulsar().getHeartbeatNamespaceV2();
if (name.getNamespaceObject().equals(heartbeatNamespace)) {
return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
}
Set<String> configuredClusters = new HashSet<>(topicPolicies.getReplicationClusters().get());
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
List<CompletableFuture<Void>> futures = Lists.newArrayList();
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
futures.add(startReplicator(cluster));
}
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
return FutureUtil.waitForAll(futures);
}
CompletableFuture<Void> startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster);
}
protected CompletableFuture<Void> addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic,
String localCluster) {
return AbstractReplicator.validatePartitionedTopicAsync(nonPersistentTopic.getName(), brokerService)
.thenCompose(__ -> brokerService.pulsar().getPulsarResources().getClusterResources()
.getClusterAsync(remoteCluster)
.thenApply(clusterData ->
brokerService.getReplicationClient(remoteCluster, clusterData)))
.thenAccept(replicationClient -> {
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster,
remoteCluster, brokerService, (PulsarClientImpl) replicationClient);
} catch (PulsarServerException e) {
log.error("[{}] Replicator startup failed {}", topic, remoteCluster, e);
}
return null;
});
// Clean up the replicator entry if startup failed
if (replicators.containsKey(remoteCluster) && replicators.get(remoteCluster) == null) {
replicators.remove(remoteCluster);
}
});
}
CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
String name = NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
replicators.get(remoteCluster).disconnect().thenRun(() -> {
log.info("[{}] Successfully removed replicator {}", name, remoteCluster);
// Remove the entry and complete the future; otherwise callers waiting on it are never notified
replicators.remove(remoteCluster);
future.complete(null);
}).exceptionally(e -> {
log.error("[{}] Failed to close replication producer {} {}", topic, name, e.getMessage(), e);
future.completeExceptionally(e);
return null;
});
return future;
}
private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th);
brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure,
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
result.completeExceptionally(th);
return null;
});
return result;
}
@Override
public void checkMessageExpiry() {
// No-op
}
@Override
public int getNumberOfConsumers() {
int count = 0;
for (NonPersistentSubscription subscription : subscriptions.values()) {
count += subscription.getConsumers().size();
}
return count;
}
@Override
public int getNumberOfSameAddressConsumers(final String clientAddress) {
return getNumberOfSameAddressConsumers(clientAddress, subscriptions.values());
}
@Override
public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
return subscriptions;
}
@Override
public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
return replicators;
}
@Override
public Subscription getSubscription(String subscription) {
return subscriptions.get(subscription);
}
public Replicator getPersistentReplicator(String remoteCluster) {
return replicators.get(remoteCluster);
}
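/**
 * Aggregates per-producer and per-consumer rates into the namespace and bundle stats and writes
 * the per-topic stats as a JSON object to the given {@link StatsOutputStream}.
 */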
@Override
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats,
StatsOutputStream topicStatsStream, ClusterReplicationMetrics replStats, String namespace,
boolean hydratePublishers) {
TopicStats topicStats = threadLocalTopicStats.get();
topicStats.reset();
replicators.forEach((region, replicator) -> replicator.updateRates());
nsStats.producerCount += producers.size();
bundleStats.producerCount += producers.size();
topicStatsStream.startObject(topic);
topicStatsStream.startList("publishers");
producers.values().forEach(producer -> {
producer.updateRates();
PublisherStatsImpl publisherStats = producer.getStats();
topicStats.aggMsgRateIn += publisherStats.msgRateIn;
topicStats.aggMsgThroughputIn += publisherStats.msgThroughputIn;
if (producer.isRemote()) {
topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
});
topicStatsStream.endList();
// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
// Close replication
topicStatsStream.endObject();
// Start subscription stats
topicStatsStream.startObject("subscriptions");
nsStats.subsCount += subscriptions.size();
subscriptions.forEach((subscriptionName, subscription) -> {
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
// Start subscription name & consumers
try {
topicStatsStream.startObject(subscriptionName);
topicStatsStream.startList("consumers");
for (Consumer consumer : subscription.getConsumers()) {
++nsStats.consumerCount;
++bundleStats.consumerCount;
consumer.updateRates();
ConsumerStatsImpl consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
// Populate consumer specific stats here
StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
}
// Close Consumer stats
topicStatsStream.endList();
// Populate subscription specific stats here
topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog(false));
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
topicStatsStream.writePair("type", subscription.getTypeString());
if (subscription.getDispatcher() != null) {
subscription.getDispatcher().getMessageDropRate().calculateRate();
topicStatsStream.writePair("msgDropRate",
subscription.getDispatcher().getMessageDropRate().getValueRate());
}
// Close consumers
topicStatsStream.endObject();
topicStats.aggMsgRateOut += subMsgRateOut;
topicStats.aggMsgThroughputOut += subMsgThroughputOut;
nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog(false);
} catch (Exception e) {
log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
e.getMessage(), e);
}
});
// Close subscription
topicStatsStream.endObject();
// Remaining dest stats.
topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0
: (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producers.size());
topicStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
topicStatsStream.writePair("msgThroughputIn", topicStats.aggMsgThroughputIn);
topicStatsStream.writePair("msgThroughputOut", topicStats.aggMsgThroughputOut);
topicStatsStream.writePair("msgInCount", getMsgInCounter());
topicStatsStream.writePair("bytesInCount", getBytesInCounter());
topicStatsStream.writePair("msgOutCount", getMsgOutCounter());
topicStatsStream.writePair("bytesOutCount", getBytesOutCounter());
nsStats.msgRateIn += topicStats.aggMsgRateIn;
nsStats.msgRateOut += topicStats.aggMsgRateOut;
nsStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
nsStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
bundleStats.msgRateIn += topicStats.aggMsgRateIn;
bundleStats.msgRateOut += topicStats.aggMsgRateOut;
bundleStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
bundleStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
// add publish-latency metrics
this.addEntryLatencyStatsUsec.refresh();
NamespaceStats.add(this.addEntryLatencyStatsUsec.getBuckets(), nsStats.addLatencyBucket);
this.addEntryLatencyStatsUsec.reset();
// Close topic object
topicStatsStream.endObject();
}
@Override
public NonPersistentTopicStatsImpl getStats(boolean getPreciseBacklog, boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
try {
return asyncGetStats(getPreciseBacklog, subscriptionBacklogSize, getEarliestTimeInBacklog).get();
} catch (InterruptedException | ExecutionException e) {
log.error("[{}] Fail to get stats", topic, e);
return null;
}
}
@Override
public CompletableFuture<NonPersistentTopicStatsImpl> asyncGetStats(boolean getPreciseBacklog,
boolean subscriptionBacklogSize,
boolean getEarliestTimeInBacklog) {
CompletableFuture<NonPersistentTopicStatsImpl> future = new CompletableFuture<>();
NonPersistentTopicStatsImpl stats = new NonPersistentTopicStatsImpl();
ObjectObjectHashMap<String, PublisherStatsImpl> remotePublishersStats = new ObjectObjectHashMap<>();
producers.values().forEach(producer -> {
NonPersistentPublisherStatsImpl publisherStats = (NonPersistentPublisherStatsImpl) producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
stats.msgThroughputIn += publisherStats.msgThroughputIn;
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.addPublisher(publisherStats);
}
});
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
stats.msgInCounter = getMsgInCounter();
stats.bytesInCounter = getBytesInCounter();
stats.waitingPublishers = getWaitingProducersCount();
stats.bytesOutCounter = bytesOutFromRemovedSubscriptions.longValue();
stats.msgOutCounter = msgOutFromRemovedSubscriptions.longValue();
subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStatsImpl subStats = subscription.getStats();
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.bytesOutCounter += subStats.bytesOutCounter;
stats.msgOutCounter += subStats.msgOutCounter;
stats.getSubscriptions().put(name, subStats);
});
replicators.forEach((cluster, replicator) -> {
NonPersistentReplicatorStatsImpl replicatorStats = replicator.getStats();
// Add incoming msg rates
PublisherStatsImpl pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
if (pubStats != null) {
replicatorStats.msgRateIn = pubStats.msgRateIn;
replicatorStats.msgThroughputIn = pubStats.msgThroughputIn;
replicatorStats.inboundConnection = pubStats.getAddress();
replicatorStats.inboundConnectedSince = pubStats.getConnectedSince();
}
stats.msgRateOut += replicatorStats.msgRateOut;
stats.msgThroughputOut += replicatorStats.msgThroughputOut;
stats.getReplication().put(replicator.getRemoteCluster(), replicatorStats);
});
stats.topicEpoch = topicEpoch.orElse(null);
future.complete(stats);
return future;
}
@Override
public CompletableFuture<PersistentTopicInternalStats> getInternalStats(boolean includeLedgerMetadata) {
PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
stats.entriesAddedCounter = ENTRIES_ADDED_COUNTER_UPDATER.get(this);
stats.cursors = Maps.newTreeMap();
subscriptions.forEach((name, subs) -> stats.cursors.put(name, new CursorStats()));
replicators.forEach((name, subs) -> stats.cursors.put(name, new CursorStats()));
return CompletableFuture.completedFuture(stats);
}
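/**
 * A global topic is considered active while it has local subscriptions or local producers;
 * any other topic is active while it has connected clients or subscriptions.
 */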
public boolean isActive() {
if (TopicName.get(topic).isGlobal()) {
// A global topic stays active while it still has local subscriptions or local producers
return !subscriptions.isEmpty() || hasLocalProducers();
}
return currentUsageCount() != 0 || !subscriptions.isEmpty();
}
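/**
 * Garbage-collection check: for a global topic that has been inactive longer than the configured
 * maxInactiveDurationSeconds, stop the replication producers, delete the topic and clean up the
 * partitioned-topic metadata.
 */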
@Override
public void checkGC() {
if (!isDeleteWhileInactive()) {
// This topic is not included in GC
return;
}
int maxInactiveDurationInSec = topicPolicies.getInactiveTopicPolicies().get().getMaxInactiveDurationSeconds();
if (isActive()) {
lastActive = System.nanoTime();
} else {
if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(maxInactiveDurationInSec)) {
if (TopicName.get(topic).isGlobal()) {
// For global namespace, close repl producers first.
// Once all repl producers are closed, we can delete the topic,
// provided no remote producers connected to the broker.
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
maxInactiveDurationInSec);
}
stopReplProducers().thenCompose(v -> delete(true, false, true))
.thenCompose(__ -> tryToDeletePartitionedMetadata())
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
Throwable throwable = e.getCause();
if (throwable instanceof TopicBusyException) {
// topic became active again
if (log.isDebugEnabled()) {
log.debug("[{}] Did not delete busy topic: {}", topic,
throwable.getMessage());
}
replicators.forEach((region, replicator) -> replicator.startProducer());
} else {
log.warn("[{}] Inactive topic deletion failed", topic, e);
}
return null;
});
}
}
}
}
private CompletableFuture<Void> tryToDeletePartitionedMetadata() {
if (TopicName.get(topic).isPartitioned() && !deletePartitionedTopicMetadataWhileInactive()) {
return CompletableFuture.completedFuture(null);
}
TopicName topicName = TopicName.get(TopicName.get(topic).getPartitionedTopicName());
NamespaceResources.PartitionedTopicResources partitionedTopicResources = brokerService.pulsar()
.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
return partitionedTopicResources.partitionedTopicExistsAsync(topicName)
.thenCompose(partitionedTopicExist -> {
if (!partitionedTopicExist) {
return CompletableFuture.completedFuture(null);
} else {
return partitionedTopicResources.deletePartitionedTopicAsync(topicName);
}
});
}
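/**
 * Deletes subscriptions that have been idle longer than the namespace-level subscription
 * expiration time (falling back to the broker default). Subscriptions with connected consumers
 * and replicated subscriptions are never expired.
 */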
@Override
public void checkInactiveSubscriptions() {
TopicName name = TopicName.get(topic);
try {
Policies policies = brokerService.pulsar().getPulsarResources().getNamespaceResources()
.getPolicies(name.getNamespaceObject())
.orElseThrow(MetadataStoreException.NotFoundException::new);
final int defaultExpirationTime = brokerService.pulsar().getConfiguration()
.getSubscriptionExpirationTimeMinutes();
final Integer nsExpirationTime = policies.subscription_expiration_time_minutes;
final long expirationTimeMillis = TimeUnit.MINUTES
.toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
if ((sub.getDispatcher() != null && sub.getDispatcher().isConsumerConnected()) || sub.isReplicated()) {
return;
}
if (System.currentTimeMillis() - sub.getLastActive() > expirationTimeMillis) {
sub.delete().thenAccept(v -> log.info("[{}][{}] The subscription was deleted due to expiration",
topic, subName));
}
});
}
} catch (Exception e) {
if (log.isDebugEnabled()) {
log.debug("[{}] Error getting policies", topic);
}
}
}
@Override
public void checkBackloggedCursors() {
// no-op
}
@Override
public void checkDeduplicationSnapshot() {
// no-op
}
@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired,
data.encryption_required);
}
updateTopicPolicyByNamespacePolicy(data);
isEncryptionRequired = data.encryption_required;
isAllowAutoUpdateSchema = data.is_allow_auto_update_schema;
schemaValidationEnforced = data.schema_validation_enforced;
List<CompletableFuture<Void>> producerCheckFutures = new ArrayList<>(producers.size());
producers.values().forEach(producer -> producerCheckFutures.add(
producer.checkPermissionsAsync().thenRun(producer::checkEncryption)));
return FutureUtil.waitForAll(producerCheckFutures).thenCompose((__) -> {
List<CompletableFuture<Void>> consumerCheckFutures = new ArrayList<>();
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(consumer -> {
consumerCheckFutures.add(consumer.checkPermissionsAsync());
}));
return FutureUtil.waitForAll(consumerCheckFutures)
.thenCompose((___) -> checkReplicationAndRetryOnFailure());
});
}
@Override
public void onUpdate(TopicPolicies data) {
if (data == null) {
return;
}
updateTopicPolicy(data);
}
/**
* Backlog quotas are not supported on non-persistent topics.
*
* @throws UnsupportedOperationException always
*/
@Override
public BacklogQuota getBacklogQuota(BacklogQuotaType backlogQuotaType) {
throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic");
}
/**
* Backlog quotas never apply to non-persistent topics, so producer creation is never blocked.
*
* @return a future that always completes successfully
*/
@Override
public CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, BacklogQuotaType backlogQuotaType) {
// No-op
return CompletableFuture.completedFuture(null);
}
@Override
public boolean isReplicated() {
return replicators.size() > 1;
}
@Override
public CompletableFuture<Void> unsubscribe(String subscriptionName) {
// checkInactiveSubscriptions iterates over the subscriptions map; removing an entry on the same
// thread would deadlock, so execute the removal on a different thread.
return CompletableFuture.runAsync(() -> {
NonPersistentSubscription sub = subscriptions.remove(subscriptionName);
if (sub != null) {
// Preserve accumulated stats from the removed subscription
SubscriptionStatsImpl stats = sub.getStats();
bytesOutFromRemovedSubscriptions.add(stats.bytesOutCounter);
msgOutFromRemovedSubscriptions.add(stats.msgOutCounter);
}
}, brokerService.executor());
}
@Override
public Position getLastPosition() {
throw new UnsupportedOperationException("getLastPosition is not supported on non-persistent topic");
}
@Override
public CompletableFuture<MessageId> getLastMessageId() {
throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
}
private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
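/**
 * If the topic has never been used (no schema, no producers, no consumers, nothing published),
 * the schema is stored directly; otherwise it is only checked for compatibility with the
 * existing consumers.
 */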
@Override
public CompletableFuture<Void> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema().thenCompose((hasSchema) -> {
int numActiveConsumers = subscriptions.values().stream()
.mapToInt(subscription -> subscription.getConsumers().size())
.sum();
if (hasSchema
|| (!producers.isEmpty())
|| (numActiveConsumers != 0)
|| ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return checkSchemaCompatibleForConsumer(schema);
} else {
return addSchema(schema).thenCompose(schemaVersion -> CompletableFuture.completedFuture(null));
}
});
}
@Override
public void publishTxnMessage(TxnID txnID, ByteBuf headersAndPayload, PublishContext publishContext) {
throw new UnsupportedOperationException("PublishTxnMessage is not supported by non-persistent topic");
}
@Override
public CompletableFuture<Void> endTxn(TxnID txnID, int txnAction, long lowWaterMark) {
return FutureUtil.failedFuture(
new Exception("Unsupported operation endTxn in non-persistent topic."));
}
@Override
public CompletableFuture<Void> truncate() {
return FutureUtil.failedFuture(new NotAllowedException("Unsupported truncate"));
}
protected boolean isTerminated() {
return false;
}
@Override
public boolean isPersistent() {
return false;
}
}