/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service.nonpersistent;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.EntryCacheManager.create;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.carrotsearch.hppc.ObjectObjectHashMap;
import com.google.common.base.MoreObjects;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import io.netty.buffer.ByteBuf;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.common.util.OrderedExecutor;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.Position;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerService;
import org.apache.pulsar.broker.service.BrokerServiceException;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.NamingException;
import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException;
import org.apache.pulsar.broker.service.BrokerServiceException.SubscriptionBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.UnsupportedVersionException;
import org.apache.pulsar.broker.service.Consumer;
import org.apache.pulsar.broker.service.Producer;
import org.apache.pulsar.broker.service.Replicator;
import org.apache.pulsar.broker.service.ServerCnx;
import org.apache.pulsar.broker.service.StreamingStats;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.schema.SchemaCompatibilityStrategy;
import org.apache.pulsar.broker.stats.ClusterReplicationMetrics;
import org.apache.pulsar.broker.stats.NamespaceStats;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.api.proto.PulsarApi.CommandSubscribe.SubType;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.BacklogQuota;
import org.apache.pulsar.common.policies.data.ConsumerStats;
import org.apache.pulsar.common.policies.data.NonPersistentPublisherStats;
import org.apache.pulsar.common.policies.data.NonPersistentReplicatorStats;
import org.apache.pulsar.common.policies.data.NonPersistentSubscriptionStats;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats.CursorStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublisherStats;
import org.apache.pulsar.common.schema.SchemaData;
import org.apache.pulsar.common.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap;
import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet;
import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats;
import org.apache.pulsar.utils.StatsOutputStream;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class NonPersistentTopic implements Topic {
private final String topic;
// Producers currently connected to this topic
private final ConcurrentOpenHashSet<Producer> producers;
// Subscriptions to this topic
private final ConcurrentOpenHashMap<String, NonPersistentSubscription> subscriptions;
private final ConcurrentOpenHashMap<String, NonPersistentReplicator> replicators;
private final BrokerService brokerService;
private volatile boolean isFenced;
// Prefix for replication cursors
public final String replicatorPrefix;
protected static final AtomicLongFieldUpdater<NonPersistentTopic> USAGE_COUNT_UPDATER = AtomicLongFieldUpdater
.newUpdater(NonPersistentTopic.class, "usageCount");
private volatile long usageCount = 0;
private final OrderedExecutor executor;
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Timestamp of when this topic was last seen active
private volatile long lastActive;
// Flag to signal that a producer on this topic has published a batch message; from then on the broker
// must reject consumers that don't support batch messages
private volatile boolean hasBatchMessagePublished = false;
// Ever increasing counter of entries added
static final AtomicLongFieldUpdater<NonPersistentTopic> ENTRIES_ADDED_COUNTER_UPDATER = AtomicLongFieldUpdater
.newUpdater(NonPersistentTopic.class, "entriesAddedCounter");
private volatile long entriesAddedCounter = 0;
private static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
private static final FastThreadLocal<TopicStats> threadLocalTopicStats = new FastThreadLocal<TopicStats>() {
@Override
protected TopicStats initialValue() {
return new TopicStats();
}
};
// Whether messages published to this topic must be encrypted
private volatile boolean isEncryptionRequired = false;
private volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
private static class TopicStats {
public double averageMsgSize;
public double aggMsgRateIn;
public double aggMsgThroughputIn;
public double aggMsgRateOut;
public double aggMsgThroughputOut;
public final ObjectObjectHashMap<String, PublisherStats> remotePublishersStats;
public TopicStats() {
remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();
reset();
}
public void reset() {
averageMsgSize = 0;
aggMsgRateIn = 0;
aggMsgThroughputIn = 0;
aggMsgRateOut = 0;
aggMsgThroughputOut = 0;
remotePublishersStats.clear();
}
}
public NonPersistentTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
this.producers = new ConcurrentOpenHashSet<Producer>(16, 1);
this.subscriptions = new ConcurrentOpenHashMap<>(16, 1);
this.replicators = new ConcurrentOpenHashMap<>(16, 1);
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.executor = brokerService.getTopicOrderedExecutor();
USAGE_COUNT_UPDATER.set(this, 0);
this.lastActive = System.nanoTime();
try {
Policies policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
isEncryptionRequired = policies.encryption_required;
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
policies.schema_auto_update_compatibility_strategy);
} catch (Exception e) {
log.warn("[{}] Error getting policies {} and isEncryptionRequired will be set to false", topic, e.getMessage());
isEncryptionRequired = false;
}
}
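/**
* Publish on a non-persistent topic is fire-and-forget: the producer callback is completed immediately
* (there is no ledger write, hence the 0/0 coordinates), the payload is then fanned out to each
* subscription's dispatcher and each replicator, and it is dropped for any subscription that has no
* dispatcher (no consumer connected) yet.
*/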
@Override
public void publishMessage(ByteBuf data, PublishContext callback) {
callback.completed(null, 0L, 0L);
ENTRIES_ADDED_COUNTER_UPDATER.incrementAndGet(this);
subscriptions.forEach((name, subscription) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
// The entry internally retains the data, so duplicateBuffer can be released here
duplicateBuffer.release();
if (subscription.getDispatcher() != null) {
subscription.getDispatcher().sendMessages(Collections.singletonList(entry));
} else {
// This happens when the subscription exists but no consumer has been added yet, so no dispatcher
// has been created; drop the entry
entry.release();
}
});
if (!replicators.isEmpty()) {
replicators.forEach((name, replicator) -> {
ByteBuf duplicateBuffer = data.retainedDuplicate();
Entry entry = create(0L, 0L, duplicateBuffer);
// The entry internally retains the data, so duplicateBuffer can be released here
duplicateBuffer.release();
((NonPersistentReplicator) replicator).sendMessage(entry);
});
}
}
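/**
* Registers a new producer after verifying namespace ownership, that the topic is not fenced and that
* the max-producers limit is not reached. The topic-level read lock only guards against a concurrent
* close/delete (which takes the write lock); concurrent producer additions are safe.
*/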
@Override
public void addProducer(Producer producer) throws BrokerServiceException {
checkArgument(producer.getTopic() == this);
lock.readLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new TopicFencedException("Topic is temporarily unavailable");
}
if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new ProducerBusyException("Topic reached max producers limit");
}
if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}
if (!producers.add(producer)) {
throw new NamingException(
"Producer with name '" + producer.getProducerName() + "' is already connected to topic");
}
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
} finally {
lock.readLock().unlock();
}
}
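// Resolves the effective max-producers limit: a positive namespace-policy value overrides the
// broker-level default, and a non-positive effective value means "unlimited".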
private boolean isProducersExceeded() {
Policies policies;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
} catch (Exception e) {
policies = new Policies();
}
final int maxProducers = policies.max_producers_per_topic > 0
? policies.max_producers_per_topic
: brokerService.pulsar().getConfiguration().getMaxProducersPerTopic();
return maxProducers > 0 && maxProducers <= producers.size();
}
@Override
public void checkMessageDeduplicationInfo() {
// No-op
}
private boolean hasLocalProducers() {
AtomicBoolean foundLocal = new AtomicBoolean(false);
producers.forEach(producer -> {
if (!producer.isRemote()) {
foundLocal.set(true);
}
});
return foundLocal.get();
}
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
if (producers.remove(producer)) {
// decrement usage only if this was a valid producer close
USAGE_COUNT_UPDATER.decrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
lastActive = System.nanoTime();
}
}
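/**
* Creates (or reuses) the named subscription and attaches a new consumer to it. The request is rejected
* when the namespace is not owned by this broker, the topic is fenced, the subscription name clashes
* with the replicator prefix, readCompacted is requested (a persistent-topic-only feature), or the
* client is too old to consume batch messages that were already published here.
*/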
@Override
public CompletableFuture<Consumer> subscribe(final ServerCnx cnx, String subscriptionName, long consumerId,
SubType subType, int priorityLevel, String consumerName, boolean isDurable, MessageId startMessageId,
Map<String, String> metadata, boolean readCompacted, InitialPosition initialPosition) {
final CompletableFuture<Consumer> future = new CompletableFuture<>();
try {
brokerService.checkTopicNsOwnership(getName());
} catch (Exception e) {
future.completeExceptionally(e);
return future;
}
if (hasBatchMessagePublished && !cnx.isBatchMessageCompatibleVersion()) {
if (log.isDebugEnabled()) {
log.debug("[{}] Consumer doesn't support batch-message {}", topic, subscriptionName);
}
future.completeExceptionally(new UnsupportedVersionException("Consumer doesn't support batch-message"));
return future;
}
if (subscriptionName.startsWith(replicatorPrefix)) {
log.warn("[{}] Failed to create subscription for {}", topic, subscriptionName);
future.completeExceptionally(new NamingException("Subscription with reserved subscription name attempted"));
return future;
}
if (readCompacted) {
future.completeExceptionally(new NotAllowedException("readCompacted only valid on persistent topics"));
return future;
}
lock.readLock().lock();
try {
if (isFenced) {
log.warn("[{}] Attempting to subscribe to a fenced topic", topic);
future.completeExceptionally(new TopicFencedException("Topic is temporarily unavailable"));
return future;
}
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName, consumerName,
USAGE_COUNT_UPDATER.get(this));
}
} finally {
lock.readLock().unlock();
}
NonPersistentSubscription subscription = subscriptions.computeIfAbsent(subscriptionName,
name -> new NonPersistentSubscription(this, subscriptionName));
try {
Consumer consumer = new Consumer(subscription, subType, topic, consumerId, priorityLevel, consumerName, 0, cnx,
cnx.getRole(), metadata, readCompacted, initialPosition);
subscription.addConsumer(consumer);
if (!cnx.isActive()) {
consumer.close();
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Subscribe failed -- count: {}", topic, subscriptionName,
consumer.consumerName(), USAGE_COUNT_UPDATER.get(NonPersistentTopic.this));
}
future.completeExceptionally(
new BrokerServiceException("Connection was closed while the opening the cursor "));
} else {
log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
future.complete(consumer);
}
} catch (BrokerServiceException e) {
if (e instanceof ConsumerBusyException) {
log.warn("[{}][{}] Consumer {} {} already connected", topic, subscriptionName, consumerId,
consumerName);
} else if (e instanceof SubscriptionBusyException) {
log.warn("[{}][{}] {}", topic, subscriptionName, e.getMessage());
}
USAGE_COUNT_UPDATER.decrementAndGet(NonPersistentTopic.this);
future.completeExceptionally(e);
}
return future;
}
@Override
public CompletableFuture<Subscription> createSubscription(String subscriptionName, InitialPosition initialPosition) {
return CompletableFuture.completedFuture(new NonPersistentSubscription(this, subscriptionName));
}
void removeSubscription(String subscriptionName) {
subscriptions.remove(subscriptionName);
}
@Override
public CompletableFuture<Void> delete() {
return delete(false, false);
}
/**
* Forcefully closes all producers, consumers and replicators, then deletes the topic.
*
* @return Completable future indicating completion of the delete operation
*/
@Override
public CompletableFuture<Void> deleteForcefully() {
return delete(false, true);
}
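/**
* Deletes the topic.
*
* @param failIfHasSubscriptions fail the delete if the topic still has subscriptions, instead of deleting them
* @param closeIfClientsConnected disconnect all producers/consumers/replicators first, forcing the delete
*/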
private CompletableFuture<Void> delete(boolean failIfHasSubscriptions, boolean closeIfClientsConnected) {
CompletableFuture<Void> deleteFuture = new CompletableFuture<>();
lock.writeLock().lock();
try {
if (isFenced) {
log.warn("[{}] Topic is already being closed or deleted", topic);
deleteFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
return deleteFuture;
}
CompletableFuture<Void> closeClientFuture = new CompletableFuture<>();
if (closeIfClientsConnected) {
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
closeClientFuture.complete(null);
}).exceptionally(ex -> {
log.error("[{}] Error closing clients", topic, ex);
isFenced = false;
closeClientFuture.completeExceptionally(ex);
return null;
});
} else {
closeClientFuture.complete(null);
}
closeClientFuture.thenAccept(delete -> {
if (USAGE_COUNT_UPDATER.get(this) == 0) {
isFenced = true;
List<CompletableFuture<Void>> futures = Lists.newArrayList();
if (failIfHasSubscriptions) {
if (!subscriptions.isEmpty()) {
isFenced = false;
deleteFuture.completeExceptionally(new TopicBusyException("Topic has subscriptions"));
return;
}
} else {
subscriptions.forEach((s, sub) -> futures.add(sub.delete()));
}
FutureUtil.waitForAll(futures).whenComplete((v, ex) -> {
if (ex != null) {
log.error("[{}] Error deleting topic", topic, ex);
isFenced = false;
deleteFuture.completeExceptionally(ex);
} else {
// Topic GC iterates over the topics map; removing the entry from the same thread would
// deadlock, so execute the removal on a different thread
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
log.info("[{}] Topic deleted", topic);
deleteFuture.complete(null);
});
}
});
} else {
deleteFuture.completeExceptionally(new TopicBusyException(
"Topic has " + USAGE_COUNT_UPDATER.get(this) + " connected producers/consumers"));
}
}).exceptionally(ex -> {
deleteFuture.completeExceptionally(
new TopicBusyException("Failed to close clients before deleting topic."));
return null;
});
} finally {
lock.writeLock().unlock();
}
return deleteFuture;
}
/**
* Close this topic - disconnect all producers, subscriptions and replicators associated with this topic.
*
* @return Completable future indicating completion of close operation
*/
@Override
public CompletableFuture<Void> close() {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();
lock.writeLock().lock();
try {
if (!isFenced) {
isFenced = true;
} else {
log.warn("[{}] Topic is already being closed or deleted", topic);
closeFuture.completeExceptionally(new TopicFencedException("Topic is already fenced"));
return closeFuture;
}
} finally {
lock.writeLock().unlock();
}
List<CompletableFuture<Void>> futures = Lists.newArrayList();
replicators.forEach((cluster, replicator) -> futures.add(replicator.disconnect()));
producers.forEach(producer -> futures.add(producer.disconnect()));
subscriptions.forEach((s, sub) -> futures.add(sub.disconnect()));
FutureUtil.waitForAll(futures).thenRun(() -> {
log.info("[{}] Topic closed", topic);
// Topic unloading iterates over the topics map; removing the entry from the same thread would
// deadlock, so execute the removal on a different thread.
brokerService.executor().execute(() -> {
brokerService.removeTopicFromCache(topic);
closeFuture.complete(null);
});
}).exceptionally(exception -> {
log.error("[{}] Error closing topic", topic, exception);
isFenced = false;
closeFuture.completeExceptionally(exception);
return null;
});
return closeFuture;
}
public CompletableFuture<Void> stopReplProducers() {
List<CompletableFuture<Void>> closeFutures = Lists.newArrayList();
replicators.forEach((region, replicator) -> closeFutures.add(replicator.disconnect()));
return FutureUtil.waitForAll(closeFutures);
}
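/**
* Reconciles this global topic's replicator set against the namespace's configured replication clusters:
* starts a replicator for every configured remote cluster that lacks one, and disconnects replicators
* whose cluster is no longer configured. Local-only topics return immediately.
*/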
@Override
public CompletableFuture<Void> checkReplication() {
TopicName name = TopicName.get(topic);
if (!name.isGlobal()) {
return CompletableFuture.completedFuture(null);
}
if (log.isDebugEnabled()) {
log.debug("[{}] Checking replication status", name);
}
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, name.getNamespace()))
.orElseThrow(() -> new KeeperException.NoNodeException());
} catch (Exception e) {
CompletableFuture<Void> future = new CompletableFuture<>();
future.completeExceptionally(new ServerMetadataException(e));
return future;
}
Set<String> configuredClusters;
if (policies.replication_clusters != null) {
configuredClusters = policies.replication_clusters;
} else {
configuredClusters = Collections.emptySet();
}
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
List<CompletableFuture<Void>> futures = Lists.newArrayList();
// Check for missing replicators
for (String cluster : configuredClusters) {
if (cluster.equals(localCluster)) {
continue;
}
if (!replicators.containsKey(cluster)) {
if (!startReplicator(cluster)) {
// This happens when the global topic is partitioned and the replicator can't start on the
// original non-partitioned topic (the topic name without the partition index)
return FutureUtil
.failedFuture(new NamingException(topic + " failed to start replicator for " + cluster));
}
}
}
// Check for replicators to be stopped
replicators.forEach((cluster, replicator) -> {
if (!cluster.equals(localCluster)) {
if (!configuredClusters.contains(cluster)) {
futures.add(removeReplicator(cluster));
}
}
});
return FutureUtil.waitForAll(futures);
}
boolean startReplicator(String remoteCluster) {
log.info("[{}] Starting replicator to remote: {}", topic, remoteCluster);
String localCluster = brokerService.pulsar().getConfiguration().getClusterName();
return addReplicationCluster(remoteCluster, NonPersistentTopic.this, localCluster);
}
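// computeIfAbsent guarantees a single replicator per remote cluster; the AtomicBoolean carries the
// failure signal out of the lambda so that the failed entry can be cleaned up afterwards.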
protected boolean addReplicationCluster(String remoteCluster, NonPersistentTopic nonPersistentTopic, String localCluster) {
AtomicBoolean isReplicatorStarted = new AtomicBoolean(true);
replicators.computeIfAbsent(remoteCluster, r -> {
try {
return new NonPersistentReplicator(NonPersistentTopic.this, localCluster, remoteCluster, brokerService);
} catch (NamingException e) {
isReplicatorStarted.set(false);
log.error("[{}] Replicator startup failed due to partitioned-topic {}", topic, remoteCluster);
}
return null;
});
// Clean up the replicator entry if startup failed
if (!isReplicatorStarted.get()) {
replicators.remove(remoteCluster);
}
return isReplicatorStarted.get();
}
CompletableFuture<Void> removeReplicator(String remoteCluster) {
log.info("[{}] Removing replicator to {}", topic, remoteCluster);
final CompletableFuture<Void> future = new CompletableFuture<>();
String name = NonPersistentReplicator.getReplicatorName(replicatorPrefix, remoteCluster);
replicators.get(remoteCluster).disconnect().thenRun(() -> {
log.info("[{}] Successfully removed replicator {}", name, remoteCluster);
// Remove the entry and complete the returned future so callers waiting on the removal don't hang
replicators.remove(remoteCluster);
future.complete(null);
}).exceptionally(e -> {
log.error("[{}] Failed to close replication producer {} {}", topic, name, e.getMessage(), e);
future.completeExceptionally(e);
return null;
});
return future;
}
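// Runs checkReplication and, on failure, schedules itself to retry after
// POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS while still propagating the failure to the returned future.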
private CompletableFuture<Void> checkReplicationAndRetryOnFailure() {
CompletableFuture<Void> result = new CompletableFuture<Void>();
checkReplication().thenAccept(res -> {
log.info("[{}] Policies updated successfully", topic);
result.complete(null);
}).exceptionally(th -> {
log.error("[{}] Policies update failed {}, scheduled retry in {} seconds", topic, th.getMessage(),
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, th);
brokerService.executor().schedule(this::checkReplicationAndRetryOnFailure,
POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS, TimeUnit.SECONDS);
result.completeExceptionally(th);
return null;
});
return result;
}
@Override
public void checkMessageExpiry() {
// No-op
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("topic", topic).toString();
}
@Override
public ConcurrentOpenHashSet<Producer> getProducers() {
return producers;
}
public int getNumberOfConsumers() {
int count = 0;
for (NonPersistentSubscription subscription : subscriptions.values()) {
count += subscription.getConsumers().size();
}
return count;
}
@Override
public ConcurrentOpenHashMap<String, NonPersistentSubscription> getSubscriptions() {
return subscriptions;
}
@Override
public ConcurrentOpenHashMap<String, NonPersistentReplicator> getReplicators() {
return replicators;
}
@Override
public Subscription getSubscription(String subscription) {
return subscriptions.get(subscription);
}
public Replicator getPersistentReplicator(String remoteCluster) {
return replicators.get(remoteCluster);
}
public BrokerService getBrokerService() {
return brokerService;
}
@Override
public String getName() {
return topic;
}
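/**
* Aggregates per-producer, per-subscription and per-replicator rates into the namespace/bundle counters
* and streams them as JSON. The emitted shape is roughly the following (illustrative only, field set
* abbreviated):
*
* <pre>
* "topic-name": {
*   "publishers": [ ... ],
*   "replication": { },
*   "subscriptions": { "sub-name": { "consumers": [ ... ], "msgRateOut": ..., "type": ... } },
*   "producerCount": ..., "averageMsgSize": ..., "msgRateIn": ..., "msgThroughputOut": ...
* }
* </pre>
*/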
public void updateRates(NamespaceStats nsStats, NamespaceBundleStats bundleStats, StatsOutputStream topicStatsStream,
ClusterReplicationMetrics replStats, String namespace, boolean hydratePublishers) {
TopicStats topicStats = threadLocalTopicStats.get();
topicStats.reset();
replicators.forEach((region, replicator) -> replicator.updateRates());
nsStats.producerCount += producers.size();
bundleStats.producerCount += producers.size();
topicStatsStream.startObject(topic);
topicStatsStream.startList("publishers");
producers.forEach(producer -> {
producer.updateRates();
PublisherStats publisherStats = producer.getStats();
topicStats.aggMsgRateIn += publisherStats.msgRateIn;
topicStats.aggMsgThroughputIn += publisherStats.msgThroughputIn;
if (producer.isRemote()) {
topicStats.remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
}
if (hydratePublishers) {
StreamingStats.writePublisherStats(topicStatsStream, publisherStats);
}
});
topicStatsStream.endList();
// Start replicator stats
topicStatsStream.startObject("replication");
nsStats.replicatorCount += topicStats.remotePublishersStats.size();
// Close replication
topicStatsStream.endObject();
// Start subscription stats
topicStatsStream.startObject("subscriptions");
nsStats.subsCount += subscriptions.size();
subscriptions.forEach((subscriptionName, subscription) -> {
double subMsgRateOut = 0;
double subMsgThroughputOut = 0;
double subMsgRateRedeliver = 0;
// Start subscription name & consumers
try {
topicStatsStream.startObject(subscriptionName);
Object[] consumers = subscription.getConsumers().array();
nsStats.consumerCount += consumers.length;
bundleStats.consumerCount += consumers.length;
topicStatsStream.startList("consumers");
// The dispatcher may be null when no consumer has connected to this subscription yet
if (subscription.getDispatcher() != null) {
subscription.getDispatcher().getMesssageDropRate().calculateRate();
}
for (Object consumerObj : consumers) {
Consumer consumer = (Consumer) consumerObj;
consumer.updateRates();
ConsumerStats consumerStats = consumer.getStats();
subMsgRateOut += consumerStats.msgRateOut;
subMsgThroughputOut += consumerStats.msgThroughputOut;
subMsgRateRedeliver += consumerStats.msgRateRedeliver;
// Populate consumer specific stats here
StreamingStats.writeConsumerStats(topicStatsStream, subscription.getType(), consumerStats);
}
// Close Consumer stats
topicStatsStream.endList();
// Populate subscription specific stats here
topicStatsStream.writePair("msgBacklog", subscription.getNumberOfEntriesInBacklog());
topicStatsStream.writePair("msgRateExpired", subscription.getExpiredMessageRate());
topicStatsStream.writePair("msgRateOut", subMsgRateOut);
topicStatsStream.writePair("msgThroughputOut", subMsgThroughputOut);
topicStatsStream.writePair("msgRateRedeliver", subMsgRateRedeliver);
topicStatsStream.writePair("type", subscription.getTypeString());
if (subscription.getDispatcher() != null) {
topicStatsStream.writePair("msgDropRate",
subscription.getDispatcher().getMesssageDropRate().getRate());
}
// Close this subscription's object
topicStatsStream.endObject();
topicStats.aggMsgRateOut += subMsgRateOut;
topicStats.aggMsgThroughputOut += subMsgThroughputOut;
nsStats.msgBacklog += subscription.getNumberOfEntriesInBacklog();
} catch (Exception e) {
log.error("Got exception when creating consumer stats for subscription {}: {}", subscriptionName,
e.getMessage(), e);
}
});
// Close the subscriptions object
topicStatsStream.endObject();
// Remaining topic-level stats.
topicStats.averageMsgSize = topicStats.aggMsgRateIn == 0.0 ? 0.0
: (topicStats.aggMsgThroughputIn / topicStats.aggMsgRateIn);
topicStatsStream.writePair("producerCount", producers.size());
topicStatsStream.writePair("averageMsgSize", topicStats.averageMsgSize);
topicStatsStream.writePair("msgRateIn", topicStats.aggMsgRateIn);
topicStatsStream.writePair("msgRateOut", topicStats.aggMsgRateOut);
topicStatsStream.writePair("msgThroughputIn", topicStats.aggMsgThroughputIn);
topicStatsStream.writePair("msgThroughputOut", topicStats.aggMsgThroughputOut);
nsStats.msgRateIn += topicStats.aggMsgRateIn;
nsStats.msgRateOut += topicStats.aggMsgRateOut;
nsStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
nsStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
bundleStats.msgRateIn += topicStats.aggMsgRateIn;
bundleStats.msgRateOut += topicStats.aggMsgRateOut;
bundleStats.msgThroughputIn += topicStats.aggMsgThroughputIn;
bundleStats.msgThroughputOut += topicStats.aggMsgThroughputOut;
// Close topic object
topicStatsStream.endObject();
}
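/**
* Builds a {@link NonPersistentTopicStats} snapshot: local publisher stats are listed directly, remote
* publishers are folded into the matching replicator's inbound figures, and subscription/replicator
* outbound rates are summed into the topic totals.
*/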
public NonPersistentTopicStats getStats() {
NonPersistentTopicStats stats = new NonPersistentTopicStats();
ObjectObjectHashMap<String, PublisherStats> remotePublishersStats = new ObjectObjectHashMap<String, PublisherStats>();
producers.forEach(producer -> {
NonPersistentPublisherStats publisherStats = (NonPersistentPublisherStats) producer.getStats();
stats.msgRateIn += publisherStats.msgRateIn;
stats.msgThroughputIn += publisherStats.msgThroughputIn;
if (producer.isRemote()) {
remotePublishersStats.put(producer.getRemoteCluster(), publisherStats);
} else {
stats.getPublishers().add(publisherStats);
}
});
stats.averageMsgSize = stats.msgRateIn == 0.0 ? 0.0 : (stats.msgThroughputIn / stats.msgRateIn);
subscriptions.forEach((name, subscription) -> {
NonPersistentSubscriptionStats subStats = subscription.getStats();
stats.msgRateOut += subStats.msgRateOut;
stats.msgThroughputOut += subStats.msgThroughputOut;
stats.getSubscriptions().put(name, subStats);
});
replicators.forEach((cluster, replicator) -> {
NonPersistentReplicatorStats replicatorStats = replicator.getStats();
// Add incoming msg rates from the matching remote publisher
PublisherStats pubStats = remotePublishersStats.get(replicator.getRemoteCluster());
if (pubStats != null) {
replicatorStats.msgRateIn = pubStats.msgRateIn;
replicatorStats.msgThroughputIn = pubStats.msgThroughputIn;
replicatorStats.inboundConnection = pubStats.getAddress();
replicatorStats.inboundConnectedSince = pubStats.getConnectedSince();
}
stats.msgRateOut += replicatorStats.msgRateOut;
stats.msgThroughputOut += replicatorStats.msgThroughputOut;
stats.getReplication().put(replicator.getRemoteCluster(), replicatorStats);
});
return stats;
}
public PersistentTopicInternalStats getInternalStats() {
PersistentTopicInternalStats stats = new PersistentTopicInternalStats();
stats.entriesAddedCounter = ENTRIES_ADDED_COUNTER_UPDATER.get(this);
stats.cursors = Maps.newTreeMap();
subscriptions.forEach((name, subs) -> stats.cursors.put(name, new CursorStats()));
replicators.forEach((name, subs) -> stats.cursors.put(name, new CursorStats()));
return stats;
}
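/**
* A topic is active while clients hold it open (usageCount) or subscriptions exist. Global topics ignore
* usageCount and look only at subscriptions and local producers, so a topic kept open purely by remote
* replication can still be garbage-collected.
*/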
public boolean isActive() {
if (TopicName.get(topic).isGlobal()) {
// Considered active only while there are local consumers (subscriptions) or local producers
return !subscriptions.isEmpty() || hasLocalProducers();
}
return USAGE_COUNT_UPDATER.get(this) != 0 || !subscriptions.isEmpty();
}
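/**
* Garbage-collects an inactive global topic: after gcIntervalInSeconds without activity, the repl
* producers are closed and the topic is deleted, unless it became busy again in the meantime (in which
* case the replicators are restarted). Non-global non-persistent topics are not collected here.
*/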
@Override
public void checkGC(int gcIntervalInSeconds) {
if (isActive()) {
lastActive = System.nanoTime();
} else {
if (System.nanoTime() - lastActive > TimeUnit.SECONDS.toNanos(gcIntervalInSeconds)) {
if (TopicName.get(topic).isGlobal()) {
// For a global namespace, close the repl producers first.
// Once all repl producers are closed, the topic can be deleted,
// provided no remote producers are connected to the broker.
if (log.isDebugEnabled()) {
log.debug("[{}] Global topic inactive for {} seconds, closing repl producers.", topic,
gcIntervalInSeconds);
}
stopReplProducers().thenCompose(v -> delete(true, false))
.thenRun(() -> log.info("[{}] Topic deleted successfully due to inactivity", topic))
.exceptionally(e -> {
if (e.getCause() instanceof TopicBusyException) {
// topic became active again
if (log.isDebugEnabled()) {
log.debug("[{}] Did not delete busy topic: {}", topic,
e.getCause().getMessage());
}
replicators.forEach((region, replicator) -> ((NonPersistentReplicator) replicator)
.startProducer());
} else {
log.warn("[{}] Inactive topic deletion failed", topic, e);
}
return null;
});
}
}
}
}
@Override
public void checkInactiveSubscriptions() {
// no-op
}
@Override
public CompletableFuture<Void> onPoliciesUpdate(Policies data) {
if (log.isDebugEnabled()) {
log.debug("[{}] isEncryptionRequired changes: {} -> {}", topic, isEncryptionRequired, data.encryption_required);
}
isEncryptionRequired = data.encryption_required;
schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
data.schema_auto_update_compatibility_strategy);
producers.forEach(producer -> {
producer.checkPermissions();
producer.checkEncryption();
});
subscriptions.forEach((subName, sub) -> sub.getConsumers().forEach(Consumer::checkPermissions));
return checkReplicationAndRetryOnFailure();
}
/**
* @return Backlog quota for the topic; unsupported here, since non-persistent topics keep no backlog
*/
@Override
public BacklogQuota getBacklogQuota() {
throw new UnsupportedOperationException("getBacklogQuota method is not supported on non-persistent topic");
}
/**
* @return quota exceeded status for blocking producer creation; always false, since backlog quotas
* do not apply to non-persistent topics
*/
@Override
public boolean isBacklogQuotaExceeded(String producerName) {
return false;
}
@Override
public boolean isEncryptionRequired() {
return isEncryptionRequired;
}
@Override
public boolean isReplicated() {
return replicators.size() > 1;
}
@Override
public CompletableFuture<Void> unsubscribe(String subName) {
// No-op
return CompletableFuture.completedFuture(null);
}
@Override
public Position getLastMessageId() {
throw new UnsupportedOperationException("getLastMessageId is not supported on non-persistent topic");
}
public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true;
}
private static final Logger log = LoggerFactory.getLogger(NonPersistentTopic.class);
@Override
public CompletableFuture<Boolean> hasSchema() {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.getSchema(id).thenApply((schema) -> schema != null);
}
@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
if (schema == null) {
return CompletableFuture.completedFuture(SchemaVersion.Empty);
}
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy);
}
@Override
public CompletableFuture<Boolean> isSchemaCompatible(SchemaData schema) {
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
return brokerService.pulsar()
.getSchemaRegistryService()
.isCompatibleWithLatestVersion(id, schema, schemaCompatibilityStrategy);
}
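// If the topic already has a schema, is active, or has ever seen a message, the new schema must be
// compatible with the latest version; only a completely idle, schema-less topic accepts it unconditionally.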
@Override
public CompletableFuture<Boolean> addSchemaIfIdleOrCheckCompatible(SchemaData schema) {
return hasSchema()
.thenCompose((hasSchema) -> {
if (hasSchema || isActive() || ENTRIES_ADDED_COUNTER_UPDATER.get(this) != 0) {
return isSchemaCompatible(schema);
} else {
return addSchema(schema).thenApply((ignore) -> true);
}
});
}
}