| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static org.apache.bookkeeper.mledger.impl.ManagedLedgerMBeanImpl.ENTRY_LATENCY_BUCKETS_USEC; |
| import com.google.common.base.MoreObjects; |
| import com.google.common.collect.Lists; |
| import java.util.Arrays; |
| import java.util.EnumSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.Queue; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicLongFieldUpdater; |
| import java.util.concurrent.atomic.LongAdder; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import lombok.Getter; |
| import org.apache.bookkeeper.mledger.util.StatsBuckets; |
| import org.apache.commons.collections4.CollectionUtils; |
| import org.apache.commons.collections4.MapUtils; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.resourcegroup.ResourceGroup; |
| import org.apache.pulsar.broker.resourcegroup.ResourceGroupPublishLimiter; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ConsumerBusyException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ProducerBusyException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ProducerFencedException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.TopicTerminatedException; |
| import org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorage; |
| import org.apache.pulsar.broker.service.schema.SchemaRegistryService; |
| import org.apache.pulsar.broker.service.schema.exceptions.IncompatibleSchemaException; |
| import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; |
| import org.apache.pulsar.broker.systopic.SystemTopicClient; |
| import org.apache.pulsar.common.api.proto.CommandSubscribe.SubType; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.BacklogQuota; |
| import org.apache.pulsar.common.policies.data.HierarchyTopicPolicies; |
| import org.apache.pulsar.common.policies.data.InactiveTopicPolicies; |
| import org.apache.pulsar.common.policies.data.Policies; |
| import org.apache.pulsar.common.policies.data.PublishRate; |
| import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy; |
| import org.apache.pulsar.common.policies.data.TopicPolicies; |
| import org.apache.pulsar.common.protocol.schema.SchemaData; |
| import org.apache.pulsar.common.protocol.schema.SchemaVersion; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| public abstract class AbstractTopic implements Topic, TopicPolicyListener<TopicPolicies> { |
| |
| protected static final long POLICY_UPDATE_FAILURE_RETRY_TIME_SECONDS = 60; |
| |
| protected final String topic; |
| |
| // Producers currently connected to this topic |
| protected final ConcurrentHashMap<String, Producer> producers; |
| |
| protected final BrokerService brokerService; |
| |
| // Prefix for replication cursors |
| protected final String replicatorPrefix; |
| |
| protected final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); |
| |
| protected volatile boolean isFenced; |
| |
| protected final HierarchyTopicPolicies topicPolicies; |
| |
| // Timestamp of when this topic was last seen active |
| protected volatile long lastActive; |
| |
| // Flag to signal that producer of this topic has published batch-message so, broker should not allow consumer which |
| // doesn't support batch-message |
| protected volatile boolean hasBatchMessagePublished = false; |
| |
| protected StatsBuckets addEntryLatencyStatsUsec = new StatsBuckets(ENTRY_LATENCY_BUCKETS_USEC); |
| |
| // Whether messages published must be encrypted or not in this topic |
| protected volatile boolean isEncryptionRequired = false; |
| |
| @Getter |
| protected volatile SchemaCompatibilityStrategy schemaCompatibilityStrategy = |
| SchemaCompatibilityStrategy.FULL; |
| protected volatile Boolean isAllowAutoUpdateSchema; |
| // schema validation enforced flag |
| protected volatile boolean schemaValidationEnforced = false; |
| |
| protected volatile int maxUnackedMessagesOnConsumerAppilied = 0; |
| |
| protected volatile PublishRateLimiter topicPublishRateLimiter; |
| |
| protected volatile ResourceGroupPublishLimiter resourceGroupPublishLimiter; |
| |
| protected boolean preciseTopicPublishRateLimitingEnable; |
| |
| @Getter |
| protected boolean resourceGroupRateLimitingEnabled; |
| |
| private LongAdder bytesInCounter = new LongAdder(); |
| private LongAdder msgInCounter = new LongAdder(); |
| |
| protected volatile Optional<Long> topicEpoch = Optional.empty(); |
| private volatile boolean hasExclusiveProducer; |
| // pointer to the exclusive producer |
| private volatile String exclusiveProducerName; |
| |
| private final Queue<Pair<Producer, CompletableFuture<Optional<Long>>>> waitingExclusiveProducers = |
| new ConcurrentLinkedQueue<>(); |
| |
| private static final AtomicLongFieldUpdater<AbstractTopic> USAGE_COUNT_UPDATER = |
| AtomicLongFieldUpdater.newUpdater(AbstractTopic.class, "usageCount"); |
| private volatile long usageCount = 0; |
| |
| public AbstractTopic(String topic, BrokerService brokerService) { |
| this.topic = topic; |
| this.brokerService = brokerService; |
| this.producers = new ConcurrentHashMap<>(); |
| this.isFenced = false; |
| ServiceConfiguration config = brokerService.pulsar().getConfiguration(); |
| this.replicatorPrefix = config.getReplicatorPrefix(); |
| |
| topicPolicies = new HierarchyTopicPolicies(); |
| updateTopicPolicyByBrokerConfig(); |
| |
| this.lastActive = System.nanoTime(); |
| this.preciseTopicPublishRateLimitingEnable = config.isPreciseTopicPublishRateLimiterEnable(); |
| updatePublishDispatcher(Optional.empty()); |
| } |
| |
| protected void updateTopicPolicy(TopicPolicies data) { |
| if (!isSystemTopic()) { |
| // Only use namespace level setting for system topic. |
| topicPolicies.getReplicationClusters().updateTopicValue(data.getReplicationClusters()); |
| } |
| topicPolicies.getMaxSubscriptionsPerTopic().updateTopicValue(data.getMaxSubscriptionsPerTopic()); |
| topicPolicies.getMaxProducersPerTopic().updateTopicValue(data.getMaxProducerPerTopic()); |
| topicPolicies.getMaxConsumerPerTopic().updateTopicValue(data.getMaxConsumerPerTopic()); |
| topicPolicies.getMaxConsumersPerSubscription().updateTopicValue(data.getMaxConsumersPerSubscription()); |
| topicPolicies.getInactiveTopicPolicies().updateTopicValue(data.getInactiveTopicPolicies()); |
| topicPolicies.getDeduplicationEnabled().updateTopicValue(data.getDeduplicationEnabled()); |
| topicPolicies.getSubscriptionTypesEnabled().updateTopicValue( |
| CollectionUtils.isEmpty(data.getSubscriptionTypesEnabled()) ? null : |
| EnumSet.copyOf(data.getSubscriptionTypesEnabled())); |
| Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach(type -> |
| this.topicPolicies.getBackLogQuotaMap().get(type).updateTopicValue( |
| data.getBackLogQuotaMap() == null ? null : data.getBackLogQuotaMap().get(type.toString()))); |
| topicPolicies.getTopicMaxMessageSize().updateTopicValue(data.getMaxMessageSize()); |
| topicPolicies.getMessageTTLInSeconds().updateTopicValue(data.getMessageTTLInSeconds()); |
| } |
| |
| protected void updateTopicPolicyByNamespacePolicy(Policies namespacePolicies) { |
| if (log.isDebugEnabled()) { |
| log.debug("[{}]updateTopicPolicyByNamespacePolicy,data={}", topic, namespacePolicies); |
| } |
| if (namespacePolicies.deleted) { |
| return; |
| } |
| topicPolicies.getReplicationClusters().updateNamespaceValue( |
| Lists.newArrayList(CollectionUtils.emptyIfNull(namespacePolicies.replication_clusters))); |
| topicPolicies.getMessageTTLInSeconds().updateNamespaceValue(namespacePolicies.message_ttl_in_seconds); |
| topicPolicies.getMaxSubscriptionsPerTopic().updateNamespaceValue(namespacePolicies.max_subscriptions_per_topic); |
| topicPolicies.getMaxProducersPerTopic().updateNamespaceValue(namespacePolicies.max_producers_per_topic); |
| topicPolicies.getMaxConsumerPerTopic().updateNamespaceValue(namespacePolicies.max_consumers_per_topic); |
| topicPolicies.getMaxConsumersPerSubscription() |
| .updateNamespaceValue(namespacePolicies.max_consumers_per_subscription); |
| topicPolicies.getInactiveTopicPolicies().updateNamespaceValue(namespacePolicies.inactive_topic_policies); |
| topicPolicies.getDeduplicationEnabled().updateNamespaceValue(namespacePolicies.deduplicationEnabled); |
| topicPolicies.getSubscriptionTypesEnabled().updateNamespaceValue( |
| subTypeStringsToEnumSet(namespacePolicies.subscription_types_enabled)); |
| Arrays.stream(BacklogQuota.BacklogQuotaType.values()).forEach( |
| type -> this.topicPolicies.getBackLogQuotaMap().get(type) |
| .updateNamespaceValue(MapUtils.getObject(namespacePolicies.backlog_quota_map, type))); |
| } |
| |
| private void updateTopicPolicyByBrokerConfig() { |
| ServiceConfiguration config = brokerService.pulsar().getConfiguration(); |
| topicPolicies.getInactiveTopicPolicies().updateBrokerValue(new InactiveTopicPolicies( |
| config.getBrokerDeleteInactiveTopicsMode(), |
| config.getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(), |
| config.isBrokerDeleteInactiveTopicsEnabled())); |
| |
| updateBrokerSubscriptionTypesEnabled(); |
| topicPolicies.getMaxSubscriptionsPerTopic().updateBrokerValue(config.getMaxSubscriptionsPerTopic()); |
| topicPolicies.getMaxProducersPerTopic().updateBrokerValue(config.getMaxProducersPerTopic()); |
| topicPolicies.getMaxConsumerPerTopic().updateBrokerValue(config.getMaxConsumersPerTopic()); |
| topicPolicies.getMaxConsumersPerSubscription().updateBrokerValue(config.getMaxConsumersPerSubscription()); |
| topicPolicies.getDeduplicationEnabled().updateBrokerValue(config.isBrokerDeduplicationEnabled()); |
| //init backlogQuota |
| topicPolicies.getBackLogQuotaMap() |
| .get(BacklogQuota.BacklogQuotaType.destination_storage) |
| .updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota()); |
| topicPolicies.getBackLogQuotaMap() |
| .get(BacklogQuota.BacklogQuotaType.message_age) |
| .updateBrokerValue(brokerService.getBacklogQuotaManager().getDefaultQuota()); |
| |
| topicPolicies.getTopicMaxMessageSize().updateBrokerValue(config.getMaxMessageSize()); |
| topicPolicies.getMessageTTLInSeconds().updateBrokerValue(config.getTtlDurationDefaultInSeconds()); |
| } |
| |
| private EnumSet<SubType> subTypeStringsToEnumSet(Set<String> getSubscriptionTypesEnabled) { |
| EnumSet<SubType> subTypes = EnumSet.noneOf(SubType.class); |
| for (String subTypeStr : CollectionUtils.emptyIfNull(getSubscriptionTypesEnabled)) { |
| try { |
| SubType subType = SubType.valueOf(subTypeStr); |
| subTypes.add(subType); |
| } catch (Throwable t) { |
| //ignore invalid SubType strings. |
| } |
| } |
| if (subTypes.isEmpty()) { |
| return null; |
| } else { |
| return subTypes; |
| } |
| } |
| |
| protected boolean isProducersExceeded() { |
| Integer maxProducers = topicPolicies.getMaxProducersPerTopic().get(); |
| if (maxProducers > 0 && maxProducers <= producers.size()) { |
| return true; |
| } |
| return false; |
| } |
| |
| protected void registerTopicPolicyListener() { |
| if (brokerService.pulsar().getConfig().isSystemTopicEnabled() |
| && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { |
| brokerService.getPulsar().getTopicPoliciesService() |
| .registerListener(TopicName.getPartitionedTopicName(topic), this); |
| } |
| } |
| |
| protected void unregisterTopicPolicyListener() { |
| if (brokerService.pulsar().getConfig().isSystemTopicEnabled() |
| && brokerService.pulsar().getConfig().isTopicLevelPoliciesEnabled()) { |
| brokerService.getPulsar().getTopicPoliciesService() |
| .unregisterListener(TopicName.getPartitionedTopicName(topic), this); |
| } |
| } |
| |
| protected boolean isSameAddressProducersExceeded(Producer producer) { |
| final int maxSameAddressProducers = brokerService.pulsar().getConfiguration() |
| .getMaxSameAddressProducersPerTopic(); |
| |
| if (maxSameAddressProducers > 0 |
| && getNumberOfSameAddressProducers(producer.getClientAddress()) >= maxSameAddressProducers) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| public int getNumberOfSameAddressProducers(final String clientAddress) { |
| int count = 0; |
| if (clientAddress != null) { |
| for (Producer producer : producers.values()) { |
| if (clientAddress.equals(producer.getClientAddress())) { |
| count++; |
| } |
| } |
| } |
| return count; |
| } |
| |
| protected boolean isConsumersExceededOnTopic() { |
| int maxConsumersPerTopic = topicPolicies.getMaxConsumerPerTopic().get(); |
| if (maxConsumersPerTopic > 0 && maxConsumersPerTopic <= getNumberOfConsumers()) { |
| return true; |
| } |
| return false; |
| } |
| |
| protected boolean isSameAddressConsumersExceededOnTopic(Consumer consumer) { |
| final int maxSameAddressConsumers = brokerService.pulsar().getConfiguration() |
| .getMaxSameAddressConsumersPerTopic(); |
| |
| if (maxSameAddressConsumers > 0 |
| && getNumberOfSameAddressConsumers(consumer.getClientAddress()) >= maxSameAddressConsumers) { |
| return true; |
| } |
| |
| return false; |
| } |
| |
| public abstract int getNumberOfConsumers(); |
| public abstract int getNumberOfSameAddressConsumers(String clientAddress); |
| |
| protected int getNumberOfSameAddressConsumers(final String clientAddress, |
| final List<? extends Subscription> subscriptions) { |
| int count = 0; |
| if (clientAddress != null) { |
| for (Subscription subscription : subscriptions) { |
| count += subscription.getNumberOfSameAddressConsumers(clientAddress); |
| } |
| } |
| return count; |
| } |
| |
| protected CompletableFuture<Void> addConsumerToSubscription(Subscription subscription, Consumer consumer) { |
| if (isConsumersExceededOnTopic()) { |
| log.warn("[{}] Attempting to add consumer to topic which reached max consumers limit", topic); |
| return FutureUtil.failedFuture(new ConsumerBusyException("Topic reached max consumers limit")); |
| } |
| |
| if (isSameAddressConsumersExceededOnTopic(consumer)) { |
| log.warn("[{}] Attempting to add consumer to topic which reached max same address consumers limit", topic); |
| return FutureUtil.failedFuture(new ConsumerBusyException("Topic reached max same address consumers limit")); |
| } |
| |
| return subscription.addConsumer(consumer); |
| } |
| |
| @Override |
| public void disableCnxAutoRead() { |
| producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead()); |
| } |
| |
| @Override |
| public void enableCnxAutoRead() { |
| producers.values().forEach(producer -> producer.getCnx().enableCnxAutoRead()); |
| } |
| |
| protected boolean hasLocalProducers() { |
| AtomicBoolean foundLocal = new AtomicBoolean(false); |
| producers.values().forEach(producer -> { |
| if (!producer.isRemote()) { |
| foundLocal.set(true); |
| } |
| }); |
| |
| return foundLocal.get(); |
| } |
| |
| @Override |
| public String toString() { |
| return MoreObjects.toStringHelper(this).add("topic", topic).toString(); |
| } |
| |
| @Override |
| public Map<String, Producer> getProducers() { |
| return producers; |
| } |
| |
| |
| @Override |
| public BrokerService getBrokerService() { |
| return brokerService; |
| } |
| |
| @Override |
| public String getName() { |
| return topic; |
| } |
| |
| @Override |
| public boolean isEncryptionRequired() { |
| return isEncryptionRequired; |
| } |
| |
| @Override |
| public boolean getSchemaValidationEnforced() { |
| return schemaValidationEnforced; |
| } |
| |
| public void markBatchMessagePublished() { |
| this.hasBatchMessagePublished = true; |
| } |
| |
| public String getReplicatorPrefix() { |
| return replicatorPrefix; |
| } |
| |
| @Override |
| public CompletableFuture<Boolean> hasSchema() { |
| String base = TopicName.get(getName()).getPartitionedTopicName(); |
| String id = TopicName.get(base).getSchemaName(); |
| return brokerService.pulsar() |
| .getSchemaRegistryService() |
| .getSchema(id).thenApply(Objects::nonNull); |
| } |
| |
| @Override |
| public CompletableFuture<SchemaVersion> addSchema(SchemaData schema) { |
| if (schema == null) { |
| return CompletableFuture.completedFuture(SchemaVersion.Empty); |
| } |
| |
| String base = TopicName.get(getName()).getPartitionedTopicName(); |
| String id = TopicName.get(base).getSchemaName(); |
| SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); |
| |
| if (allowAutoUpdateSchema()) { |
| return schemaRegistryService.putSchemaIfAbsent(id, schema, schemaCompatibilityStrategy); |
| } else { |
| return schemaRegistryService.trimDeletedSchemaAndGetList(id).thenCompose(schemaAndMetadataList -> |
| schemaRegistryService.getSchemaVersionBySchemaData(schemaAndMetadataList, schema) |
| .thenCompose(schemaVersion -> { |
| if (schemaVersion == null) { |
| return FutureUtil.failedFuture(new IncompatibleSchemaException( |
| "Schema not found and schema auto updating is disabled.")); |
| } else { |
| return CompletableFuture.completedFuture(schemaVersion); |
| } |
| })); |
| } |
| } |
| |
| private boolean allowAutoUpdateSchema() { |
| if (isAllowAutoUpdateSchema == null) { |
| return brokerService.pulsar().getConfig().isAllowAutoUpdateSchemaEnabled(); |
| } |
| return isAllowAutoUpdateSchema; |
| } |
| |
| @Override |
| public CompletableFuture<SchemaVersion> deleteSchema() { |
| String base = TopicName.get(getName()).getPartitionedTopicName(); |
| String id = TopicName.get(base).getSchemaName(); |
| SchemaRegistryService schemaRegistryService = brokerService.pulsar().getSchemaRegistryService(); |
| return BookkeeperSchemaStorage.ignoreUnrecoverableBKException(schemaRegistryService.getSchema(id)) |
| .thenCompose(schema -> { |
| if (schema != null) { |
| // It's different from `SchemasResource.deleteSchema` |
| // because when we delete a topic, the schema |
| // history is meaningless. But when we delete a schema of a topic, a new schema could be |
| // registered in the future. |
| log.info("Delete schema storage of id: {}", id); |
| return schemaRegistryService.deleteSchemaStorage(id); |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| }); |
| } |
| |
| @Override |
| public CompletableFuture<Void> checkSchemaCompatibleForConsumer(SchemaData schema) { |
| String base = TopicName.get(getName()).getPartitionedTopicName(); |
| String id = TopicName.get(base).getSchemaName(); |
| return brokerService.pulsar() |
| .getSchemaRegistryService() |
| .checkConsumerCompatibility(id, schema, schemaCompatibilityStrategy); |
| } |
| |
| @Override |
| public CompletableFuture<Optional<Long>> addProducer(Producer producer, |
| CompletableFuture<Void> producerQueuedFuture) { |
| checkArgument(producer.getTopic() == this); |
| |
| return brokerService.checkTopicNsOwnership(getName()) |
| .thenCompose(__ -> |
| incrementTopicEpochIfNeeded(producer, producerQueuedFuture)) |
| .thenCompose(producerEpoch -> { |
| lock.writeLock().lock(); |
| try { |
| checkTopicFenced(); |
| if (isTerminated()) { |
| log.warn("[{}] Attempting to add producer to a terminated topic", topic); |
| throw new TopicTerminatedException("Topic was already terminated"); |
| } |
| internalAddProducer(producer); |
| |
| USAGE_COUNT_UPDATER.incrementAndGet(this); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Added producer -- count: {}", topic, producer.getProducerName(), |
| USAGE_COUNT_UPDATER.get(this)); |
| } |
| |
| return CompletableFuture.completedFuture(producerEpoch); |
| } catch (BrokerServiceException e) { |
| return FutureUtil.failedFuture(e); |
| } finally { |
| lock.writeLock().unlock(); |
| } |
| }); |
| } |
| |
| protected CompletableFuture<Optional<Long>> incrementTopicEpochIfNeeded(Producer producer, |
| CompletableFuture<Void> producerQueuedFuture) { |
| lock.writeLock().lock(); |
| try { |
| switch (producer.getAccessMode()) { |
| case Shared: |
| if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) { |
| return FutureUtil.failedFuture( |
| new ProducerBusyException( |
| "Topic has an existing exclusive producer: " + exclusiveProducerName)); |
| } else { |
| // Normal producer getting added, we don't need a new epoch |
| return CompletableFuture.completedFuture(topicEpoch); |
| } |
| |
| case Exclusive: |
| if (hasExclusiveProducer || !waitingExclusiveProducers.isEmpty()) { |
| return FutureUtil.failedFuture( |
| new ProducerFencedException( |
| "Topic has an existing exclusive producer: " + exclusiveProducerName)); |
| } else if (!producers.isEmpty()) { |
| return FutureUtil.failedFuture(new ProducerFencedException("Topic has existing shared producers")); |
| } else if (producer.getTopicEpoch().isPresent() |
| && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) { |
| // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs |
| // to be fenced, because a new producer had been present in between. |
| return FutureUtil.failedFuture(new ProducerFencedException( |
| String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d", |
| topicEpoch.get(), producer.getTopicEpoch().get()))); |
| } else { |
| // There are currently no existing producers |
| hasExclusiveProducer = true; |
| exclusiveProducerName = producer.getProducerName(); |
| |
| CompletableFuture<Long> future; |
| if (producer.getTopicEpoch().isPresent()) { |
| future = setTopicEpoch(producer.getTopicEpoch().get()); |
| } else { |
| future = incrementTopicEpoch(topicEpoch); |
| } |
| future.exceptionally(ex -> { |
| hasExclusiveProducer = false; |
| exclusiveProducerName = null; |
| return null; |
| }); |
| |
| return future.thenApply(epoch -> { |
| topicEpoch = Optional.of(epoch); |
| return topicEpoch; |
| }); |
| } |
| |
| case WaitForExclusive: { |
| if (hasExclusiveProducer || !producers.isEmpty()) { |
| CompletableFuture<Optional<Long>> future = new CompletableFuture<>(); |
| log.info("[{}] Queuing producer {} since there's already a producer", topic, producer); |
| waitingExclusiveProducers.add(Pair.of(producer, future)); |
| producerQueuedFuture.complete(null); |
| return future; |
| } else if (producer.getTopicEpoch().isPresent() |
| && producer.getTopicEpoch().get() < topicEpoch.orElse(-1L)) { |
| // If a producer reconnects, but all the topic epoch has already moved forward, this producer needs |
| // to be fenced, because a new producer had been present in between. |
| return FutureUtil.failedFuture(new ProducerFencedException( |
| String.format("Topic epoch has already moved. Current epoch: %d, Producer epoch: %d", |
| topicEpoch.get(), producer.getTopicEpoch().get()))); |
| } else { |
| // There are currently no existing producers |
| hasExclusiveProducer = true; |
| exclusiveProducerName = producer.getProducerName(); |
| |
| CompletableFuture<Long> future; |
| if (producer.getTopicEpoch().isPresent()) { |
| future = setTopicEpoch(producer.getTopicEpoch().get()); |
| } else { |
| future = incrementTopicEpoch(topicEpoch); |
| } |
| future.exceptionally(ex -> { |
| hasExclusiveProducer = false; |
| exclusiveProducerName = null; |
| return null; |
| }); |
| |
| return future.thenApply(epoch -> { |
| topicEpoch = Optional.of(epoch); |
| return topicEpoch; |
| }); |
| } |
| } |
| |
| default: |
| return FutureUtil.failedFuture( |
| new BrokerServiceException("Invalid producer access mode: " + producer.getAccessMode())); |
| } |
| |
| } catch (Exception e) { |
| log.error("Encountered unexpected error during exclusive producer creation", e); |
| return FutureUtil.failedFuture(new BrokerServiceException(e)); |
| } finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| protected abstract CompletableFuture<Long> setTopicEpoch(long newEpoch); |
| |
| protected abstract CompletableFuture<Long> incrementTopicEpoch(Optional<Long> currentEpoch); |
| |
| @Override |
| public void recordAddLatency(long latency, TimeUnit unit) { |
| addEntryLatencyStatsUsec.addValue(unit.toMicros(latency)); |
| |
| PUBLISH_LATENCY.observe(latency, unit); |
| } |
| |
| protected void setSchemaCompatibilityStrategy(Policies policies) { |
| if (SystemTopicClient.isSystemTopic(TopicName.get(this.topic))) { |
| schemaCompatibilityStrategy = |
| brokerService.pulsar().getConfig().getSystemTopicSchemaCompatibilityStrategy(); |
| } else if (policies.schema_compatibility_strategy == SchemaCompatibilityStrategy.UNDEFINED) { |
| schemaCompatibilityStrategy = brokerService.pulsar() |
| .getConfig().getSchemaCompatibilityStrategy(); |
| if (schemaCompatibilityStrategy == SchemaCompatibilityStrategy.UNDEFINED) { |
| schemaCompatibilityStrategy = SchemaCompatibilityStrategy.fromAutoUpdatePolicy( |
| policies.schema_auto_update_compatibility_strategy); |
| } |
| } else { |
| schemaCompatibilityStrategy = policies.schema_compatibility_strategy; |
| } |
| } |
| private static final Summary PUBLISH_LATENCY = Summary.build("pulsar_broker_publish_latency", "-") |
| .quantile(0.0) |
| .quantile(0.50) |
| .quantile(0.95) |
| .quantile(0.99) |
| .quantile(0.999) |
| .quantile(0.9999) |
| .quantile(1.0) |
| .register(); |
| |
| @Override |
| public void checkTopicPublishThrottlingRate() { |
| this.topicPublishRateLimiter.checkPublishRate(); |
| } |
| |
| @Override |
| public void incrementPublishCount(int numOfMessages, long msgSizeInBytes) { |
| // increase topic publish rate limiter |
| this.topicPublishRateLimiter.incrementPublishCount(numOfMessages, msgSizeInBytes); |
| // increase broker publish rate limiter |
| getBrokerPublishRateLimiter().incrementPublishCount(numOfMessages, msgSizeInBytes); |
| // increase counters |
| bytesInCounter.add(msgSizeInBytes); |
| msgInCounter.add(numOfMessages); |
| } |
| |
| @Override |
| public void resetTopicPublishCountAndEnableReadIfRequired() { |
| // broker rate not exceeded. and completed topic limiter reset. |
| if (!getBrokerPublishRateLimiter().isPublishRateExceeded() && topicPublishRateLimiter.resetPublishCount()) { |
| enableProducerReadForPublishRateLimiting(); |
| } |
| } |
| |
| @Override |
| public void resetBrokerPublishCountAndEnableReadIfRequired(boolean doneBrokerReset) { |
| // topic rate not exceeded, and completed broker limiter reset. |
| if (!topicPublishRateLimiter.isPublishRateExceeded() && doneBrokerReset) { |
| enableProducerReadForPublishRateLimiting(); |
| } |
| } |
| |
| /** |
| * it sets cnx auto-readable if producer's cnx is disabled due to publish-throttling. |
| */ |
| protected void enableProducerReadForPublishRateLimiting() { |
| if (producers != null) { |
| producers.values().forEach(producer -> { |
| producer.getCnx().cancelPublishRateLimiting(); |
| producer.getCnx().enableCnxAutoRead(); |
| }); |
| } |
| } |
| |
| protected void enableProducerReadForPublishBufferLimiting() { |
| if (producers != null) { |
| producers.values().forEach(producer -> { |
| producer.getCnx().cancelPublishBufferLimiting(); |
| producer.getCnx().enableCnxAutoRead(); |
| }); |
| } |
| } |
| |
| protected void disableProducerRead() { |
| if (producers != null) { |
| producers.values().forEach(producer -> producer.getCnx().disableCnxAutoRead()); |
| } |
| } |
| |
| protected void checkTopicFenced() throws BrokerServiceException { |
| if (isFenced) { |
| log.warn("[{}] Attempting to add producer to a fenced topic", topic); |
| throw new BrokerServiceException.TopicFencedException("Topic is temporarily unavailable"); |
| } |
| } |
| |
| protected void internalAddProducer(Producer producer) throws BrokerServiceException { |
| if (isProducersExceeded()) { |
| log.warn("[{}] Attempting to add producer to topic which reached max producers limit", topic); |
| throw new BrokerServiceException.ProducerBusyException("Topic reached max producers limit"); |
| } |
| |
| if (isSameAddressProducersExceeded(producer)) { |
| log.warn("[{}] Attempting to add producer to topic which reached max same address producers limit", topic); |
| throw new BrokerServiceException.ProducerBusyException("Topic reached max same address producers limit"); |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] {} Got request to create producer ", topic, producer.getProducerName()); |
| } |
| |
| Producer existProducer = producers.putIfAbsent(producer.getProducerName(), producer); |
| if (existProducer != null) { |
| tryOverwriteOldProducer(existProducer, producer); |
| } |
| } |
| |
| private void tryOverwriteOldProducer(Producer oldProducer, Producer newProducer) |
| throws BrokerServiceException { |
| if (newProducer.isSuccessorTo(oldProducer) && !isUserProvidedProducerName(oldProducer) |
| && !isUserProvidedProducerName(newProducer)) { |
| oldProducer.close(false); |
| if (!producers.replace(newProducer.getProducerName(), oldProducer, newProducer)) { |
| // Met concurrent update, throw exception here so that client can try reconnect later. |
| throw new BrokerServiceException.NamingException("Producer with name '" + newProducer.getProducerName() |
| + "' replace concurrency error"); |
| } else { |
| handleProducerRemoved(oldProducer); |
| } |
| } else { |
| throw new BrokerServiceException.NamingException( |
| "Producer with name '" + newProducer.getProducerName() + "' is already connected to topic"); |
| } |
| } |
| |
| private boolean isUserProvidedProducerName(Producer producer){ |
| //considered replicator producer as generated name producer |
| return producer.isUserProvidedProducerName() && !producer.getProducerName().startsWith(replicatorPrefix); |
| } |
| |
| |
| @Override |
| public void removeProducer(Producer producer) { |
| checkArgument(producer.getTopic() == this); |
| |
| if (producers.remove(producer.getProducerName(), producer)) { |
| handleProducerRemoved(producer); |
| } |
| } |
| |
| protected void handleProducerRemoved(Producer producer) { |
| // decrement usage only if this was a valid producer close |
| USAGE_COUNT_UPDATER.decrementAndGet(this); |
| // this conditional check is an optimization so we don't have acquire the write lock |
| // and execute following routine if there are no exclusive producers |
| if (hasExclusiveProducer) { |
| lock.writeLock().lock(); |
| try { |
| hasExclusiveProducer = false; |
| exclusiveProducerName = null; |
| Pair<Producer, CompletableFuture<Optional<Long>>> nextWaitingProducer = |
| waitingExclusiveProducers.poll(); |
| if (nextWaitingProducer != null) { |
| Producer nextProducer = nextWaitingProducer.getKey(); |
| CompletableFuture<Optional<Long>> producerFuture = nextWaitingProducer.getValue(); |
| hasExclusiveProducer = true; |
| exclusiveProducerName = nextProducer.getProducerName(); |
| |
| CompletableFuture<Long> future; |
| if (nextProducer.getTopicEpoch().isPresent()) { |
| future = setTopicEpoch(nextProducer.getTopicEpoch().get()); |
| } else { |
| future = incrementTopicEpoch(topicEpoch); |
| } |
| |
| future.thenAccept(epoch -> { |
| topicEpoch = Optional.of(epoch); |
| producerFuture.complete(topicEpoch); |
| }).exceptionally(ex -> { |
| hasExclusiveProducer = false; |
| exclusiveProducerName = null; |
| producerFuture.completeExceptionally(ex); |
| return null; |
| }); |
| } |
| } finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] Removed producer -- count: {}", topic, producer.getProducerName(), |
| USAGE_COUNT_UPDATER.get(this)); |
| } |
| lastActive = System.nanoTime(); |
| } |
| |
| public void handleConsumerAdded(String subscriptionName, String consumerName) { |
| USAGE_COUNT_UPDATER.incrementAndGet(this); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] [{}] [{}] Added consumer -- count: {}", topic, subscriptionName, |
| consumerName, USAGE_COUNT_UPDATER.get(this)); |
| } |
| } |
| |
| public void decrementUsageCount() { |
| USAGE_COUNT_UPDATER.decrementAndGet(this); |
| } |
| |
| public long currentUsageCount() { |
| return usageCount; |
| } |
| |
| @Override |
| public boolean isPublishRateExceeded() { |
| // either topic or broker publish rate exceeded. |
| return this.topicPublishRateLimiter.isPublishRateExceeded() |
| || getBrokerPublishRateLimiter().isPublishRateExceeded(); |
| } |
| |
| @Override |
| public boolean isResourceGroupPublishRateExceeded(int numMessages, int bytes) { |
| return this.resourceGroupRateLimitingEnabled |
| && !this.resourceGroupPublishLimiter.tryAcquire(numMessages, bytes); |
| } |
| |
| @Override |
| public boolean isResourceGroupRateLimitingEnabled() { |
| return this.resourceGroupRateLimitingEnabled; |
| } |
| |
| @Override |
| public boolean isTopicPublishRateExceeded(int numberMessages, int bytes) { |
| // whether topic publish rate exceed if precise rate limit is enable |
| return preciseTopicPublishRateLimitingEnable && !this.topicPublishRateLimiter.tryAcquire(numberMessages, bytes); |
| } |
| |
| @Override |
| public boolean isBrokerPublishRateExceeded() { |
| // whether broker publish rate exceed |
| return getBrokerPublishRateLimiter().isPublishRateExceeded(); |
| } |
| |
| public PublishRateLimiter getTopicPublishRateLimiter() { |
| return topicPublishRateLimiter; |
| } |
| |
| public PublishRateLimiter getBrokerPublishRateLimiter() { |
| return brokerService.getBrokerPublishRateLimiter(); |
| } |
| |
| public void updateMaxPublishRate(Policies policies) { |
| updatePublishDispatcher(Optional.of(policies)); |
| } |
| |
| private void updatePublishDispatcher(Optional<Policies> optPolicies) { |
| //if topic-level policy exists, try to use topic-level publish rate policy |
| Optional<PublishRate> topicPublishRate = getTopicPolicies().map(TopicPolicies::getPublishRate); |
| if (topicPublishRate.isPresent()) { |
| log.info("Using topic policy publish rate instead of namespace level topic publish rate on topic {}", |
| this.topic); |
| updatePublishDispatcher(topicPublishRate.get()); |
| return; |
| } |
| |
| Policies policies; |
| try { |
| if (optPolicies.isPresent()) { |
| policies = optPolicies.get(); |
| } else { |
| policies = brokerService.pulsar().getPulsarResources().getNamespaceResources().getPoliciesIfCached( |
| TopicName.get(topic).getNamespaceObject()) |
| .orElseGet(() -> new Policies()); |
| } |
| } catch (Exception e) { |
| log.warn("[{}] Error getting policies {} and publish throttling will be disabled", topic, e.getMessage()); |
| policies = new Policies(); |
| } |
| |
| //topic-level policy is not set, try to use namespace-level rate policy |
| final String clusterName = brokerService.pulsar().getConfiguration().getClusterName(); |
| final PublishRate publishRate = policies.publishMaxMessageRate != null |
| ? policies.publishMaxMessageRate.get(clusterName) |
| : null; |
| |
| //both namespace-level and topic-level policy are not set, try to use broker-level policy |
| ServiceConfiguration serviceConfiguration = brokerService.pulsar().getConfiguration(); |
| if (publishRate != null) { |
| //publishRate is not null , use namespace-level policy |
| updatePublishDispatcher(publishRate); |
| } else { |
| PublishRate brokerPublishRate = new PublishRate(serviceConfiguration.getMaxPublishRatePerTopicInMessages() |
| , serviceConfiguration.getMaxPublishRatePerTopicInBytes()); |
| updatePublishDispatcher(brokerPublishRate); |
| } |
| |
| // attach the resource-group level rate limiters, if set |
| String rgName = policies.resource_group_name != null |
| ? policies.resource_group_name |
| : null; |
| if (rgName != null) { |
| final ResourceGroup resourceGroup = |
| brokerService.getPulsar().getResourceGroupServiceManager().resourceGroupGet(rgName); |
| if (resourceGroup != null) { |
| this.resourceGroupRateLimitingEnabled = true; |
| this.resourceGroupPublishLimiter = resourceGroup.getResourceGroupPublishLimiter(); |
| this.resourceGroupPublishLimiter.registerRateLimitFunction(this.getName(), |
| () -> this.enableCnxAutoRead()); |
| log.info("Using resource group {} rate limiter for topic {}", rgName, topic); |
| return; |
| } |
| } else { |
| if (this.resourceGroupRateLimitingEnabled) { |
| this.resourceGroupPublishLimiter.unregisterRateLimitFunction(this.getName()); |
| this.resourceGroupPublishLimiter = null; |
| this.resourceGroupRateLimitingEnabled = false; |
| } |
| /* Namespace detached from resource group. Enable the producer read */ |
| enableProducerReadForPublishRateLimiting(); |
| } |
| } |
| |
| public long getMsgInCounter() { |
| return this.msgInCounter.longValue(); |
| } |
| |
| public long getBytesInCounter() { |
| return this.bytesInCounter.longValue(); |
| } |
| |
| public long getMsgOutCounter() { |
| return getStats(false, false, false).msgOutCounter; |
| } |
| |
| public long getBytesOutCounter() { |
| return getStats(false, false, false).bytesOutCounter; |
| } |
| |
| public boolean isDeleteWhileInactive() { |
| return topicPolicies.getInactiveTopicPolicies().get().isDeleteWhileInactive(); |
| } |
| |
| public boolean deletePartitionedTopicMetadataWhileInactive() { |
| return brokerService.pulsar().getConfiguration().isBrokerDeleteInactivePartitionedTopicMetadataEnabled(); |
| } |
| |
| protected abstract boolean isTerminated(); |
| |
| private static final Logger log = LoggerFactory.getLogger(AbstractTopic.class); |
| |
| public InactiveTopicPolicies getInactiveTopicPolicies() { |
| return topicPolicies.getInactiveTopicPolicies().get(); |
| } |
| |
| /** |
| * Get {@link TopicPolicies} for this topic. |
| * @return TopicPolicies, if they exist. Otherwise, the value will not be present. |
| */ |
| public Optional<TopicPolicies> getTopicPolicies() { |
| return brokerService.getTopicPolicies(TopicName.get(topic)); |
| } |
| |
| public CompletableFuture<Void> deleteTopicPolicies() { |
| return brokerService.deleteTopicPolicies(TopicName.get(topic)); |
| } |
| |
| protected int getWaitingProducersCount() { |
| return waitingExclusiveProducers.size(); |
| } |
| |
| protected boolean isExceedMaximumMessageSize(int size) { |
| int topicMaxMessageSize = topicPolicies.getTopicMaxMessageSize().get(); |
| if (topicMaxMessageSize <= 0) { |
| //invalid setting means this check is disabled. |
| return false; |
| } |
| if (topicMaxMessageSize >= brokerService.pulsar().getConfiguration().getMaxMessageSize()) { |
| //broker setting does not contain message header and already handled in client and frameDecoder. |
| return false; |
| } |
| return size > topicMaxMessageSize; |
| } |
| |
| /** |
| * update topic publish dispatcher for this topic. |
| */ |
| protected void updatePublishDispatcher(PublishRate publishRate) { |
| if (publishRate != null && (publishRate.publishThrottlingRateInByte > 0 |
| || publishRate.publishThrottlingRateInMsg > 0)) { |
| log.info("Enabling publish rate limiting {} ", publishRate); |
| if (!preciseTopicPublishRateLimitingEnable) { |
| this.brokerService.setupTopicPublishRateLimiterMonitor(); |
| } |
| |
| if (this.topicPublishRateLimiter == null |
| || this.topicPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) { |
| // create new rateLimiter if rate-limiter is disabled |
| if (preciseTopicPublishRateLimitingEnable) { |
| this.topicPublishRateLimiter = new PrecisPublishLimiter(publishRate, |
| () -> this.enableCnxAutoRead(), brokerService.pulsar().getExecutor()); |
| } else { |
| this.topicPublishRateLimiter = new PublishRateLimiterImpl(publishRate); |
| } |
| } else { |
| this.topicPublishRateLimiter.update(publishRate); |
| } |
| } else { |
| log.info("Disabling publish throttling for {}", this.topic); |
| this.topicPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; |
| enableProducerReadForPublishRateLimiting(); |
| } |
| } |
| |
| public HierarchyTopicPolicies getHierarchyTopicPolicies() { |
| return topicPolicies; |
| } |
| |
| // subscriptionTypesEnabled is dynamic and can be updated online. |
| public void updateBrokerSubscriptionTypesEnabled() { |
| topicPolicies.getSubscriptionTypesEnabled().updateBrokerValue( |
| subTypeStringsToEnumSet(brokerService.pulsar().getConfiguration().getSubscriptionTypesEnabled())); |
| } |
| } |