blob: 20a97f571f9e49475da7f3fc4cb44812599caff7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.service;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC;
import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES;
import com.google.common.base.MoreObjects;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.atomic.LongAdder;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.bookkeeper.mledger.util.StatsBuckets;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException;
import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException;
import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException;
import org.apache.pulsar.broker.service.schema.SchemaRegistryService;
import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException;
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.InactiveTopicDeleteMode;
import org.apache.pulsar.common.policies.data.InactiveTopicPolicies;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.policies.data.PublishRate;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.apache.pulsar.common.policies.data.TopicPolicies;
import org.apache.pulsar.common.protocol.schema.SchemaData;
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class AbstractTopic implements Topic {
// How long to wait before retrying after a policy-update failure
protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60;
// Fully-qualified name of the topic served by this instance
protected final String topic;
// Producers currently connected to this topic
protected final ConcurrentHashMap<String, Producer> producers;
protected final BrokerService brokerService;
// Prefix for replication cursors
protected final String replicatorPrefix;
// Guards producer add/remove and the exclusive-producer state below
protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// When fenced, new producers are rejected (see checkTopicFenced())
protected volatile boolean isFenced;
// Inactive topic policies
protected InactiveTopicPolicies inactiveTopicPolicies = new InactiveTopicPolicies();
// Timestamp (System.nanoTime()) of when this topic was last seen active
protected volatile long lastActive;
// Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which
// doesn't support batch-message
protected volatile boolean hasBatchMessagePublished = false;
// Latency stats for add-entry operations, bucketed in microseconds
protected StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC);
// Whether messages published must be encrypted or not in this topic
protected volatile boolean isEncryptionRequired = false;
// Effective schema compatibility strategy (see setSchemaCompatibilityStrategy)
protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy =
SchemaCompatibilityStrategy.FULL;
protected volatile boolean isAllowAutoUpdateSchema = true;
// schema validation enforced flag
protected volatile boolean schemaValidationEnforced = false;
// NOTE(review): field name carries a typo ("Appilied"); kept as-is for source compatibility
protected volatile int maxUnackedMessagesOnConsumerAppilied = 0;
protected volatile Integer maxSubscriptionsPerTopic = null;
protected volatile PublishRateLimiter topicPublishRateLimiter;
// True when precise (per-message) topic publish rate limiting is enabled in broker config
protected boolean preciseTopicPublishRateLimitingEnable;
// Cumulative counters of bytes / messages published to this topic
private LongAdder bytesInCounter = new LongAdder();
private LongAdder msgInCounter = new LongAdder();
// Last known topic epoch; updated whenever an exclusive producer takes over
protected volatile Optional<Long> topicEpoch = Optional.empty();
private volatile boolean hasExclusiveProducer;
// pointer to the exclusive producer
private volatile String exclusiveProducerName;
// Producers in WaitForExclusive mode, parked until the current exclusive producer goes away
private final Queue<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducers =
new ConcurrentLinkedQueue<>();
// Tracks attached producers + consumers; incremented/decremented as they connect and disconnect
private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER =
AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount");
private volatile long usageCount = 0;
/**
 * Initializes the base topic state: the producer map, the inactive-topic policies (taken from
 * broker configuration) and the topic publish rate limiter (derived from namespace policies
 * when available, otherwise broker-level defaults).
 *
 * @param topic fully-qualified topic name
 * @param brokerService owning broker service
 */
public AbstractTopic(String topic, BrokerService brokerService) {
this.topic = topic;
this.brokerService = brokerService;
this.producers = new ConcurrentHashMap<>();
this.isFenced = false;
this.replicatorPrefix = brokerService.pulsar().getConfiguration().getReplicatorPrefix();
this.inactiveTopicPolicies.setDeleteWhileInactive(brokerService.pulsar().getConfiguration()
.isBrokerDeleteInactiveTopicsEnabled());
this.inactiveTopicPolicies.setMaxInactiveDurationSeconds(brokerService.pulsar().getConfiguration()
.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds());
this.inactiveTopicPolicies.setInactiveTopicDeleteMode(brokerService.pulsar().getConfiguration()
.getBrokerDeleteInactiveTopicsMode());
this.lastActive = System.nanoTime();
Policies policies = null;
try {
policies = brokerService.pulsar().getConfigurationCache().policiesCache()
.get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
.orElseGet(() -> new Policies());
} catch (Exception e) {
// Continue with null policies; updatePublishDispatcher(Policies) then falls back
// to the broker-level publish rate configuration.
log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage());
}
this.preciseTopicPublishRateLimitingEnable =
brokerService.pulsar().getConfiguration().isPreciseTopicPublishRateLimiterEnable();
updatePublishDispatcher(policies);
}
/**
 * Checks whether this topic has already reached its maximum number of producers.
 *
 * <p>The limit is resolved in order: topic-level policy, namespace-level policy,
 * broker-level default. A limit of {@code 0} (or negative) means "no limit".
 *
 * @return true if the producer limit is configured and already reached
 */
protected boolean isProducersExceeded() {
    Integer maxProducers = null;
    TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
    if (topicPolicies != null) {
        maxProducers = topicPolicies.getMaxProducerPerTopic();
    }
    if (maxProducers == null) {
        // Fall back to the namespace-level policy.
        // NOTE(review): this uses the blocking cache get(), while isConsumersExceededOnTopic()
        // deliberately uses getDataIfPresent() to avoid deadlocks — consider aligning them.
        Policies policies;
        try {
            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .get(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()))
                    .orElseGet(() -> new Policies());
        } catch (Exception e) {
            // On lookup failure, behave as if no namespace policy were set
            policies = new Policies();
        }
        maxProducers = policies.max_producers_per_topic;
    }
    // Final fallback: broker-level default
    maxProducers = maxProducers != null ? maxProducers : brokerService.pulsar()
            .getConfiguration().getMaxProducersPerTopic();
    return maxProducers > 0 && maxProducers <= producers.size();
}
/**
 * Checks whether this topic has already reached its maximum number of consumers.
 *
 * <p>The limit is resolved in order: topic-level policy, namespace-level policy,
 * broker-level default. A limit of {@code 0} (or negative) means "no limit".
 *
 * @return true if the consumer limit is configured and already reached
 */
protected boolean isConsumersExceededOnTopic() {
    Integer maxConsumers = null;
    TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
    if (topicPolicies != null) {
        maxConsumers = topicPolicies.getMaxConsumerPerTopic();
    }
    if (maxConsumers == null) {
        Policies policies;
        try {
            // Use getDataIfPresent from zk cache to make the call non-blocking and prevent deadlocks
            policies = brokerService.pulsar().getConfigurationCache().policiesCache()
                    .getDataIfPresent(AdminResource.path(POLICIES, TopicName.get(topic).getNamespace()));
            if (policies == null) {
                policies = new Policies();
            }
        } catch (Exception e) {
            log.warn("[{}] Failed to get namespace policies that include max number of consumers: {}", topic,
                    e.getMessage());
            policies = new Policies();
        }
        maxConsumers = policies.max_consumers_per_topic;
    }
    // Final fallback: broker-level default
    final int maxConsumersPerTopic = maxConsumers != null ? maxConsumers
            : brokerService.pulsar().getConfiguration().getMaxConsumersPerTopic();
    return maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers();
}
/**
 * @return the number of consumers currently connected to this topic
 */
public abstract int getNumberOfConsumers();
/**
 * Attaches the consumer to the given subscription, unless the topic-wide consumer limit
 * has been reached.
 *
 * @throws ConsumerBusyException if the topic already holds the maximum allowed consumers
 * @throws BrokerServiceException if the subscription itself rejects the consumer
 */
protected void addConsumerToSubscription(Subscription subscription, Consumer consumer)
throws BrokerServiceException {
if (isConsumersExceededOnTopic()) {
log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", topic);
throw new ConsumerBusyException("Topic reached max consumers limit");
}
subscription.addConsumer(consumer);
}
/**
 * Stops auto-reading from every connected producer's connection (used for throttling).
 */
@Override
public void disableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
}
/**
 * Re-enables auto-reading on every connected producer's connection.
 */
@Override
public void enableCnxAutoRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead());
}
}
/**
 * @return true if at least one connected producer is local (i.e. not a remote/replicator producer)
 */
protected boolean hasLocalProducers() {
    // Short-circuits on the first local producer instead of scanning all of them
    return producers.values().stream().anyMatch(producer -> !producer.isRemote());
}
@Override
public String toString() {
return MoreObjects.toStringHelper(this).add("topic", topic).toString();
}
// NOTE(review): exposes the live internal map (not a copy); callers can observe concurrent updates
@Override
public Map<String, Producer> getProducers() {
return producers;
}
public BrokerService getBrokerService() {
return brokerService;
}
/** @return the fully-qualified topic name. */
@Override
public String getName() {
return topic;
}
/** @return whether published messages must be encrypted on this topic. */
@Override
public boolean isEncryptionRequired() {
return isEncryptionRequired;
}
/** @return whether schema validation is enforced for producers without a schema. */
@Override
public boolean getSchemaValidationEnforced() {
return schemaValidationEnforced;
}
/**
 * Records that a batch message was published, so consumers lacking batch support
 * can be rejected from then on.
 */
public void markBatchMessagePublished() {
this.hasBatchMessagePublished = true;
}
/** @return the prefix used for replication cursors. */
public String getReplicatorPrefix() {
return replicatorPrefix;
}
/**
 * Checks whether a schema is registered for this topic.
 *
 * <p>Partitions of a partitioned topic share a single schema keyed by the base topic name.
 *
 * @return a future completing with true if a schema exists, false otherwise
 */
@Override
public CompletableFuture<Boolean> hasSchema() {
    String partitionedName = TopicName.get(getName()).getPartitionedTopicName();
    String schemaId = TopicName.get(partitionedName).getSchemaName();
    SchemaRegistryService registry = brokerService.pulsar().getSchemaRegistryService();
    return registry.getSchema(schemaId).thenApply(Objects::nonNull);
}
/**
 * Registers the given schema for this topic.
 *
 * <p>When auto-update is allowed, the schema is put if absent (subject to the configured
 * compatibility strategy). Otherwise the schema must already exist: its version is looked up
 * among the non-deleted schemas and the future fails with {@link IncompatibleSchemaException}
 * if not found.
 *
 * @param schema schema to register; {@code null} completes immediately with an empty version
 * @return a future with the (possibly pre-existing) schema version
 */
@Override
public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) {
if (schema == null) {
return CompletableFuture.completedFuture(SchemaVersion.Empty);
}
// Schemas are stored under the base (partitioned) topic name
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return isAllowAutoUpdateSchema ? schemaRegistryService
.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy)
: schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList ->
schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema)
.thenCompose(schemaVersion -> {
if (schemaVersion == null) {
return FutureUtil
.failedFuture(
new IncompatibleSchemaException(
"Schema not found and schema auto updating is disabled."));
} else {
return CompletableFuture.completedFuture(schemaVersion);
}
}));
}
/**
 * Deletes the schema storage attached to this topic, if a schema exists.
 *
 * @return a future with the deleted schema version, or {@code null} when no schema was present
 */
@Override
public CompletableFuture<SchemaVersion> deleteSchema() {
// Schemas are stored under the base (partitioned) topic name
String base = TopicName.get(getName()).getPartitionedTopicName();
String id = TopicName.get(base).getSchemaName();
SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService();
return schemaRegistryService.getSchema(id)
.thenCompose(schema -> {
if (schema != null) {
// It's different from `SchemasResource.deleteSchema`
// because when we delete a topic, the schema
// history is meaningless. But when we delete a schema of a topic, a new schema could be
// registered in the future.
log.info("Delete schema storage of id: {}", id);
return schemaRegistryService.deleteSchemaStorage(id);
} else {
return CompletableFuture.completedFuture(null);
}
});
}
/**
 * Verifies that the consumer-provided schema is compatible with this topic's schema,
 * under the currently-effective compatibility strategy.
 *
 * @param schema schema the consumer wants to use
 * @return a future that fails when the schema is incompatible
 */
@Override
public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schema) {
    // Schemas are stored under the base (partitioned) topic name
    String partitionedName = TopicName.get(getName()).getPartitionedTopicName();
    String schemaId = TopicName.get(partitionedName).getSchemaName();
    return brokerService.pulsar().getSchemaRegistryService()
            .checkConsumerCompatibility(schemaId, schema, schemaCompatibilityStrategy);
}
/**
 * Attaches a producer to this topic.
 *
 * <p>First resolves the access mode / topic epoch (which may queue the producer, signalled
 * through {@code producerQueuedFuture}), then — under the write lock — verifies ownership,
 * fencing and termination state before registering the producer.
 *
 * @param producer producer to attach; must belong to this topic
 * @param producerQueuedFuture completed when a WaitForExclusive producer gets queued
 * @return a future with the producer's topic epoch (empty if none)
 */
@Override
public CompletableFuture<Optional<Long>> addProducer(Producer producer,
CompletableFuture<Void> producerQueuedFuture) {
checkArgument(producer.getTopic() == this);
CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
incrementTopicEpochIfNeeded(producer, producerQueuedFuture)
.thenAccept(producerEpoch -> {
lock.writeLock().lock();
try {
brokerService.checkTopicNsOwnership(getName());
checkTopicFenced();
if (isTerminated()) {
log.warn("[{}] Attempting to add producer to a terminated topic", topic);
throw new TopicTerminatedException("Topic was already terminated");
}
internalAddProducer(producer);
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
future.complete(producerEpoch);
} catch (Throwable e) {
future.completeExceptionally(e);
} finally {
lock.writeLock().unlock();
}
}).exceptionally(ex -> {
future.completeExceptionally(ex);
return null;
});
return future;
}
/**
 * Validates the producer's access mode against the current topic state and, for exclusive
 * modes, claims exclusivity and bumps (or restores) the topic epoch.
 *
 * <ul>
 *   <li>{@code Shared}: rejected only if an exclusive producer exists or is waiting.</li>
 *   <li>{@code Exclusive}: rejected if any producer exists; otherwise claims exclusivity.</li>
 *   <li>{@code WaitForExclusive}: queued (completing {@code producerQueuedFuture}) if any
 *       producer exists; otherwise claims exclusivity.</li>
 * </ul>
 *
 * @param producer producer requesting access
 * @param producerQueuedFuture completed when the producer is parked in the waiting queue
 * @return a future with the new/current topic epoch; fails with
 *         {@code ProducerBusyException}/{@code ProducerFencedException} on conflicts
 */
protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer,
        CompletableFuture<Void> producerQueuedFuture) {
    lock.writeLock().lock();
    try {
        switch (producer.getAccessMode()) {
        case Shared:
            if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
                return FutureUtil.failedFuture(
                        new ProducerBusyException(
                                "Topic has an existing exclusive producer: " + exclusiveProducerName));
            } else {
                // Normal producer getting added, we don't need a new epoch
                return CompletableFuture.completedFuture(topicEpoch);
            }
        case Exclusive:
            if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) {
                return FutureUtil.failedFuture(
                        new ProducerFencedException(
                                "Topic has an existing exclusive producer: " + exclusiveProducerName));
            } else if (!producers.isEmpty()) {
                return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers"));
            } else if (hasStaleTopicEpoch(producer)) {
                return failOnStaleTopicEpoch(producer);
            } else {
                // There are currently no existing producers
                return acquireExclusiveProducer(producer);
            }
        case WaitForExclusive: {
            if (hasExclusiveProducer || !producers.isEmpty()) {
                // Park this producer until the current one disconnects (see handleProducerRemoved)
                CompletableFuture<Optional<Long>> future = new CompletableFuture<>();
                log.info("[{}] Queuing producer {} since there's already a producer", topic, producer);
                waitingExclusiveProducers.add(Pair.of(producer, future));
                producerQueuedFuture.complete(null);
                return future;
            } else if (hasStaleTopicEpoch(producer)) {
                return failOnStaleTopicEpoch(producer);
            } else {
                // There are currently no existing producers
                return acquireExclusiveProducer(producer);
            }
        }
        default:
            return FutureUtil.failedFuture(
                    new BrokerServiceException("Invalid producer access mode: " + producer.getAccessMode()));
        }
    } catch (Exception e) {
        log.error("Encountered unexpected error during exclusive producer creation", e);
        return FutureUtil.failedFuture(new BrokerServiceException(e));
    } finally {
        lock.writeLock().unlock();
    }
}

// True when the reconnecting producer carries an epoch older than the topic's current one,
// meaning another producer took over in between and this one must be fenced.
private boolean hasStaleTopicEpoch(Producer producer) {
    return producer.getTopicEpoch().isPresent()
            && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L);
}

// Fails the attach with a ProducerFencedException describing the epoch mismatch.
private CompletableFuture<Optional<Long>> failOnStaleTopicEpoch(Producer producer) {
    // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs
    // to be fenced, because a new producer had been present in between.
    return FutureUtil.failedFuture(new ProducerFencedException(
            String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d",
                    topicEpoch.get(), producer.getTopicEpoch().get())));
}

// Marks this producer as the exclusive one and persists the topic epoch — either the
// producer-supplied epoch or an incremented one. Must be invoked under lock.writeLock().
private CompletableFuture<Optional<Long>> acquireExclusiveProducer(Producer producer) {
    hasExclusiveProducer = true;
    exclusiveProducerName = producer.getProducerName();
    CompletableFuture<Long> future;
    if (producer.getTopicEpoch().isPresent()) {
        future = setTopicEpoch(producer.getTopicEpoch().get());
    } else {
        future = incrementTopicEpoch(topicEpoch);
    }
    // Roll back the exclusivity claim if persisting the epoch fails
    future.exceptionally(ex -> {
        hasExclusiveProducer = false;
        exclusiveProducerName = null;
        return null;
    });
    return future.thenApply(epoch -> {
        topicEpoch = Optional.of(epoch);
        return topicEpoch;
    });
}
/** Persists the given value as the topic epoch. */
protected abstract CompletableFuture<Long> setTopicEpoch(long newEpoch);
/** Persists and returns an epoch greater than {@code currentEpoch}. */
protected abstract CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch);
/**
 * Records one add-entry latency sample in both the per-topic histogram and the
 * broker-wide publish-latency summary.
 */
@Override
public void recordAddLatency(long latency, TimeUnit unit) {
addEntryLatencyStatsUsec.addValue(unit.toMicros(latency));
PUBLISH_LATENCY.observe(latency, unit);
}
/**
 * Derives the effective schema compatibility strategy from namespace policies.
 * When the policy is UNDEFINED, falls back to the legacy auto-update policy mapping.
 */
protected void setSchemaCompatibilityStrategy(Policies policies) {
    SchemaCompatibilityStrategy strategy = policies.schema_compatibility_strategy;
    if (strategy == SchemaCompatibilityStrategy.UNDEFINED) {
        strategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy(
                policies.schema_auto_update_compatibility_strategy);
    }
    schemaCompatibilityStrategy = strategy;
}
// Broker-wide publish-latency summary exported to Prometheus, with a fixed set of quantiles
private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-")
.quantile(0.0)
.quantile(0.50)
.quantile(0.95)
.quantile(0.99)
.quantile(0.999)
.quantile(0.9999)
.quantile(1.0)
.register();
/** Evaluates the per-topic publish rate against its configured limit. */
@Override
public void checkTopicPublishThrottlingRate() {
this.topicPublishRateLimiter.checkPublishRate();
}
/**
 * Accounts a publish of {@code numOfMessages} messages / {@code msgSizeInBytes} bytes
 * against both the topic-level and broker-level rate limiters and the topic counters.
 */
@Override
public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) {
// increase topic publish rate limiter
this.topicPublishRateLimiter.incrementPublishCount(numOfMessages, msgSizeInBytes);
// increase broker publish rate limiter
getBrokerPublishRateLimiter().incrementPublishCount(numOfMessages, msgSizeInBytes);
// increase counters
bytesInCounter.add(msgSizeInBytes);
msgInCounter.add(numOfMessages);
}
/** Resets the topic limiter window and re-enables producer reads if neither limit is exceeded. */
@Override
public void resetTopicPublishCountAndEnableReadIfRequired() {
// broker rate not exceeded. and completed topic limiter reset.
if (!getBrokerPublishRateLimiter().isPublishRateExceeded() && topicPublishRateLimiter.resetPublishCount()) {
enableProducerReadForPublishRateLimiting();
}
}
/** Re-enables producer reads after a broker limiter reset, if the topic limit also allows it. */
@Override
public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) {
// topic rate not exceeded, and completed broker limiter reset.
if (!topicPublishRateLimiter.isPublishRateExceeded() && doneBrokerReset) {
enableProducerReadForPublishRateLimiting();
}
}
/**
 * Sets each producer's connection auto-readable again if it was disabled due to
 * publish-rate throttling.
 */
protected void enableProducerReadForPublishRateLimiting() {
if (producers != null) {
producers.values().forEach(producer -> {
producer.getCnx().cancelPublishRateLimiting();
producer.getCnx().enableCnxAutoRead();
});
}
}
/**
 * Sets each producer's connection auto-readable again if it was disabled due to
 * publish-buffer throttling.
 */
protected void enableProducerReadForPublishBufferLimiting() {
if (producers != null) {
producers.values().forEach(producer -> {
producer.getCnx().cancelPublishBufferLimiting();
producer.getCnx().enableCnxAutoRead();
});
}
}
/** Disables auto-read on every producer connection (throttling kicked in). */
protected void disableProducerRead() {
if (producers != null) {
producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead());
}
}
/**
 * Rejects producer attachment while the topic is fenced.
 *
 * @throws BrokerServiceException.TopicFencedException if the topic is currently fenced
 */
protected void checkTopicFenced() throws BrokerServiceException {
if (isFenced) {
log.warn("[{}] Attempting to add producer to a fenced topic", topic);
throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable");
}
}
/**
 * Registers the producer in the producer map, enforcing the max-producers limit and
 * resolving name collisions via {@link #tryOverwriteOldProducer}.
 *
 * @throws BrokerServiceException.ProducerBusyException if the producer limit is reached
 * @throws BrokerServiceException if a same-named producer exists and cannot be overwritten
 */
protected void internalAddProducer(Producer producer) throws BrokerServiceException {
if (isProducersExceeded()) {
log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic);
throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit");
}
if (log.isDebugEnabled()) {
log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName());
}
Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer);
if (existProducer != null) {
tryOverwriteOldProducer(existProducer, producer);
}
}
/**
 * Replaces an existing same-named producer with a reconnecting one.
 *
 * <p>Only allowed when the two producers are equivalent, both have broker-generated names,
 * and the new producer carries a strictly newer epoch; any other collision is rejected.
 *
 * @throws BrokerServiceException.NamingException if the overwrite is not allowed or a
 *         concurrent update raced the replacement
 */
private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer)
        throws BrokerServiceException {
    boolean replaceableReconnect = oldProducer.equals(newProducer)
            && !isUserProvidedProducerName(oldProducer)
            && !isUserProvidedProducerName(newProducer)
            && newProducer.getEpoch() > oldProducer.getEpoch();
    if (!replaceableReconnect) {
        throw new BrokerServiceException.NamingException(
                "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic");
    }
    oldProducer.close(false);
    if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) {
        // Met concurrent update, throw exception here so that client can try reconnect later.
        throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName()
                + "' replace concurrency error");
    }
    handleProducerRemoved(oldProducer);
}
/**
 * @return true if the producer name was supplied by the user (replicator producers are
 *         treated as broker-generated even when flagged user-provided)
 */
private boolean isUserProvidedProducerName(Producer producer){
//considered replicator producer as generated name producer
return producer.isUserProvidedProducerName() && !producer.getProducerName().startsWith(replicatorPrefix);
}
/**
 * Detaches the producer from this topic; a no-op if this exact producer instance is no
 * longer the one registered under its name.
 */
@Override
public void removeProducer(Producer producer) {
checkArgument(producer.getTopic() == this);
if (producers.remove(producer.getProducerName(), producer)) {
handleProducerRemoved(producer);
}
}
/**
 * Bookkeeping after a producer is removed: decrements the usage count, releases the
 * exclusive-producer slot and promotes the next waiting WaitForExclusive producer (if any),
 * and refreshes the last-active timestamp.
 */
protected void handleProducerRemoved(Producer producer) {
// decrement usage only if this was a valid producer close
USAGE_COUNT_UPDATER.decrementAndGet(this);
// this conditional check is an optimization so we don't have acquire the write lock
// and execute following routine if there are no exclusive producers
if (hasExclusiveProducer) {
lock.writeLock().lock();
try {
hasExclusiveProducer = false;
exclusiveProducerName = null;
Pair<Producer, CompletableFuture<Optional<Long>>> nextWaitingProducer =
waitingExclusiveProducers.poll();
if (nextWaitingProducer != null) {
// Promote the next queued producer to exclusive and persist its epoch
Producer nextProducer = nextWaitingProducer.getKey();
CompletableFuture<Optional<Long>> producerFuture = nextWaitingProducer.getValue();
hasExclusiveProducer = true;
exclusiveProducerName = nextProducer.getProducerName();
CompletableFuture<Long> future;
if (nextProducer.getTopicEpoch().isPresent()) {
future = setTopicEpoch(nextProducer.getTopicEpoch().get());
} else {
future = incrementTopicEpoch(topicEpoch);
}
future.thenAccept(epoch -> {
topicEpoch = Optional.of(epoch);
producerFuture.complete(topicEpoch);
}).exceptionally(ex -> {
// Epoch persistence failed: release the slot and fail the waiting producer
hasExclusiveProducer = false;
exclusiveProducerName = null;
producerFuture.completeExceptionally(ex);
return null;
});
}
} finally {
lock.writeLock().unlock();
}
}
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(),
USAGE_COUNT_UPDATER.get(this));
}
lastActive = System.nanoTime();
}
/** Increments the usage count when a consumer attaches to any subscription of this topic. */
public void handleConsumerAdded(String subscriptionName, String consumerName) {
USAGE_COUNT_UPDATER.incrementAndGet(this);
if (log.isDebugEnabled()) {
log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName,
consumerName, USAGE_COUNT_UPDATER.get(this));
}
}
/** Decrements the usage count (e.g. when a consumer detaches). */
public void decrementUsageCount() {
USAGE_COUNT_UPDATER.decrementAndGet(this);
}
/** @return the current number of attached producers + consumers. */
public long currentUsageCount() {
return usageCount;
}
/** @return true when either the topic-level or the broker-level publish rate is exceeded. */
@Override
public boolean isPublishRateExceeded() {
// either topic or broker publish rate exceeded.
return this.topicPublishRateLimiter.isPublishRateExceeded()
|| getBrokerPublishRateLimiter().isPublishRateExceeded();
}
/**
 * Precise check: tries to acquire the given message/byte budget from the topic limiter.
 * Always false when precise rate limiting is disabled.
 */
@Override
public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) {
// whether topic publish rate exceed if precise rate limit is enable
return preciseTopicPublishRateLimitingEnable && !this.topicPublishRateLimiter.tryAcquire(numberMessages, bytes);
}
/** @return true when the broker-level publish rate is exceeded. */
@Override
public boolean isBrokerPublishRateExceeded() {
// whether broker publish rate exceed
return getBrokerPublishRateLimiter().isPublishRateExceeded();
}
public PublishRateLimiter getTopicPublishRateLimiter() {
return topicPublishRateLimiter;
}
public PublishRateLimiter getBrokerPublishRateLimiter() {
return brokerService.getBrokerPublishRateLimiter();
}
/** Re-derives the topic publish rate limiter from the given namespace policies. */
public void updateMaxPublishRate(Policies policies) {
updatePublishDispatcher(policies);
}
/**
 * Resolves the effective publish rate with precedence
 * topic-level policy > namespace-level policy > broker-level configuration,
 * then applies it via {@link #updatePublishDispatcher(PublishRate)}.
 */
private void updatePublishDispatcher(Policies policies) {
//if topic-level policy exists, try to use topic-level publish rate policy
TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
if (topicPolicies != null && topicPolicies.isPublishRateSet()) {
log.info("Using topic policy publish rate instead of namespace level topic publish rate on topic {}",
this.topic);
updatePublishDispatcher(topicPolicies.getPublishRate());
return;
}
//topic-level policy is not set, try to use namespace-level rate policy
final String clusterName = brokerService.pulsar().getConfiguration().getClusterName();
final PublishRate publishRate = policies != null && policies.publishMaxMessageRate != null
? policies.publishMaxMessageRate.get(clusterName)
: null;
//both namespace-level and topic-level policy are not set, try to use broker-level policy
ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration();
if (publishRate == null) {
PublishRate brokerPublishRate = new PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages()
, serviceConfiguration.getMaxPublishRatePerTopicInBytes());
updatePublishDispatcher(brokerPublishRate);
return;
}
//publishRate is not null , use namespace-level policy
updatePublishDispatcher(publishRate);
}
/** @return cumulative number of messages published to this topic. */
public long getMsgInCounter() {
return this.msgInCounter.longValue();
}
/** @return cumulative number of bytes published to this topic. */
public long getBytesInCounter() {
return this.bytesInCounter.longValue();
}
// NOTE(review): out-counters are fetched by computing full topic stats on every call
public long getMsgOutCounter() {
return getStats(false, false).msgOutCounter;
}
public long getBytesOutCounter() {
return getStats(false, false).bytesOutCounter;
}
/** @return whether this topic may be deleted when inactive. */
public boolean isDeleteWhileInactive() {
return this.inactiveTopicPolicies.isDeleteWhileInactive();
}
/** @return whether partitioned-topic metadata may be deleted while inactive (broker config). */
public boolean deletePartitionedTopicMetadataWhileInactive() {
return brokerService.pulsar().getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled();
}
public void setDeleteWhileInactive(boolean deleteWhileInactive) {
this.inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}
/** @return true once the topic has been terminated (no further publishes accepted). */
protected abstract boolean isTerminated();
private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class);
/** @return the currently-effective inactive-topic policies. */
public InactiveTopicPolicies getInactiveTopicPolicies() {
return inactiveTopicPolicies;
}
/** Overwrites all three inactive-topic policy fields at once. */
public void resetInactiveTopicPolicies(InactiveTopicDeleteMode inactiveTopicDeleteMode
, int maxInactiveDurationSeconds, boolean deleteWhileInactive) {
inactiveTopicPolicies.setInactiveTopicDeleteMode(inactiveTopicDeleteMode);
inactiveTopicPolicies.setMaxInactiveDurationSeconds(maxInactiveDurationSeconds);
inactiveTopicPolicies.setDeleteWhileInactive(deleteWhileInactive);
}
/**
 * Get {@link TopicPolicies} for this topic.
 *
 * <p>For a partition, policies are looked up under the base partitioned-topic name.
 *
 * @param topicName topic (or partition) to look up
 * @return the {@link TopicPolicies} if available, otherwise {@code null} (cache not yet
 *         initialized, or topic-level policies disabled on this broker)
 */
public TopicPolicies getTopicPolicies(TopicName topicName) {
TopicName cloneTopicName = topicName;
if (topicName.isPartitioned()) {
cloneTopicName = TopicName.get(topicName.getPartitionedTopicName());
}
try {
return brokerService.pulsar().getTopicPoliciesService().getTopicPolicies(cloneTopicName);
} catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) {
log.debug("Topic {} policies have not been initialized yet.", topicName.getPartitionedTopicName());
return null;
// NOTE(review): NPE is caught deliberately to signal "topic-level policies disabled";
// an explicit null check on the policies service would be clearer.
} catch (NullPointerException e) {
log.debug("Topic level policies are not enabled. "
+ "Please refer to systemTopicEnabled and topicLevelPoliciesEnabled on broker.conf");
return null;
}
}
/** @return how many WaitForExclusive producers are currently queued. */
protected int getWaitingProducersCount() {
return waitingExclusiveProducers.size();
}
/**
 * Checks whether a message of {@code size} bytes exceeds the topic-level max-message-size
 * policy. Only the topic-level policy is consulted; a configured value of {@code 0} means
 * "unlimited", and an absent policy never rejects.
 *
 * @param size message size in bytes
 * @return true if a topic-level limit exists, is non-zero, and {@code size} exceeds it
 */
protected boolean isExceedMaximumMessageSize(int size) {
    TopicPolicies topicPolicies = getTopicPolicies(TopicName.get(topic));
    if (topicPolicies == null || !topicPolicies.isMaxMessageSizeSet()) {
        return false;
    }
    Integer maxMessageSize = topicPolicies.getMaxMessageSize();
    if (maxMessageSize == null) {
        return false;
    }
    // 0 is treated as "no limit"
    return maxMessageSize != 0 && size > maxMessageSize;
}
/**
 * Applies the given publish rate to this topic's rate limiter.
 *
 * <p>A positive msg- or byte-rate enables throttling (creating a precise or classic limiter
 * as configured, or updating the existing one); otherwise throttling is disabled and any
 * throttled producers are made readable again.
 */
protected void updatePublishDispatcher(PublishRate publishRate) {
if (publishRate != null && (publishRate.publishThrottlingRateInByte > 0
|| publishRate.publishThrottlingRateInMsg > 0)) {
log.info("Enabling publish rate limiting {} ", publishRate);
if (!preciseTopicPublishRateLimitingEnable) {
// Classic limiter relies on the broker's periodic monitor task to reset windows
this.brokerService.setupTopicPublishRateLimiterMonitor();
}
if (this.topicPublishRateLimiter == null
|| this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) {
// create new rateLimiter if rate-limiter is disabled
// ("PrecisPublishLimiter" is the actual project class name, typo included)
if (preciseTopicPublishRateLimitingEnable) {
this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate,
() -> this.enableCnxAutoRead());
} else {
this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate);
}
} else {
this.topicPublishRateLimiter.update(publishRate);
}
} else {
log.info("Disabling publish throttling for {}", this.topic);
this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER;
enableProducerReadForPublishRateLimiting();
}
}
}