| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; |
| import static org.apache.commons.collections.CollectionUtils.isEmpty; |
| import static org.apache.commons.lang3.StringUtils.isNotBlank; |
| import static org.apache.pulsar.broker.PulsarService.isTransactionSystemTopic; |
| import static org.apache.pulsar.common.events.EventsTopicNames.checkTopicIsEventsNames; |
| import com.google.common.annotations.VisibleForTesting; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Queues; |
| import io.netty.bootstrap.ServerBootstrap; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.channel.AdaptiveRecvByteBufAllocator; |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.ChannelOption; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.socket.SocketChannel; |
| import io.netty.handler.ssl.SslContext; |
| import io.netty.util.concurrent.DefaultThreadFactory; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.lang.reflect.Field; |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.time.Duration; |
| import java.util.ArrayList; |
| import java.util.Arrays; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.HashSet; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CancellationException; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.ExecutionException; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.RejectedExecutionException; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.TimeoutException; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.atomic.LongAdder; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.Consumer; |
| import java.util.function.Predicate; |
| import javax.ws.rs.core.Response; |
| import lombok.AccessLevel; |
| import lombok.Getter; |
| import lombok.Setter; |
| import org.apache.bookkeeper.common.util.OrderedExecutor; |
| import org.apache.bookkeeper.common.util.OrderedScheduler; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.DeleteLedgerCallback; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; |
| import org.apache.bookkeeper.mledger.LedgerOffloader; |
| import org.apache.bookkeeper.mledger.ManagedLedger; |
| import org.apache.bookkeeper.mledger.ManagedLedgerConfig; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerFactory; |
| import org.apache.bookkeeper.mledger.util.Futures; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.tuple.ImmutablePair; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.pulsar.bookie.rackawareness.IsolatedBookieEnsemblePlacementPolicy; |
| import org.apache.pulsar.broker.PulsarServerException; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.TransactionMetadataStoreService; |
| import org.apache.pulsar.broker.authentication.AuthenticationService; |
| import org.apache.pulsar.broker.authorization.AuthorizationService; |
| import org.apache.pulsar.broker.cache.BundlesQuotas; |
| import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; |
| import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; |
| import org.apache.pulsar.broker.intercept.BrokerInterceptor; |
| import org.apache.pulsar.broker.intercept.ManagedLedgerInterceptorImpl; |
| import org.apache.pulsar.broker.loadbalance.LoadManager; |
| import org.apache.pulsar.broker.resources.LocalPoliciesResources; |
| import org.apache.pulsar.broker.resources.NamespaceResources; |
| import org.apache.pulsar.broker.resources.NamespaceResources.PartitionedTopicResources; |
| import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; |
| import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; |
| import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; |
| import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.broker.service.persistent.SystemTopic; |
| import org.apache.pulsar.broker.service.plugin.EntryFilterProvider; |
| import org.apache.pulsar.broker.service.plugin.EntryFilterWithClassLoader; |
| import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; |
| import org.apache.pulsar.broker.stats.prometheus.metrics.ObserverGauge; |
| import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; |
| import org.apache.pulsar.broker.systopic.SystemTopicClient; |
| import org.apache.pulsar.broker.validator.BindAddressValidator; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminBuilder; |
| import org.apache.pulsar.client.api.ClientBuilder; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.impl.ClientBuilderImpl; |
| import org.apache.pulsar.client.impl.PulsarClientImpl; |
| import org.apache.pulsar.client.impl.conf.ClientConfigurationData; |
| import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; |
| import org.apache.pulsar.common.configuration.BindAddress; |
| import org.apache.pulsar.common.configuration.FieldContext; |
| import org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor; |
| import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor; |
| import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils; |
| import org.apache.pulsar.common.intercept.ManagedLedgerPayloadProcessor; |
| import org.apache.pulsar.common.naming.NamespaceBundle; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicDomain; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.partition.PartitionedTopicMetadata; |
| import org.apache.pulsar.common.policies.data.AutoSubscriptionCreationOverride; |
| import org.apache.pulsar.common.policies.data.AutoTopicCreationOverride; |
| import org.apache.pulsar.common.policies.data.BacklogQuota; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.OffloadPoliciesImpl; |
| import org.apache.pulsar.common.policies.data.PersistencePolicies; |
| import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; |
| import org.apache.pulsar.common.policies.data.Policies; |
| import org.apache.pulsar.common.policies.data.PublishRate; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.TopicPolicies; |
| import org.apache.pulsar.common.policies.data.TopicType; |
| import org.apache.pulsar.common.policies.data.stats.TopicStatsImpl; |
| import org.apache.pulsar.common.protocol.schema.SchemaVersion; |
| import org.apache.pulsar.common.stats.Metrics; |
| import org.apache.pulsar.common.util.FieldParser; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.common.util.RestException; |
| import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; |
| import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; |
| import org.apache.pulsar.common.util.netty.ChannelFutures; |
| import org.apache.pulsar.common.util.netty.EventLoopUtil; |
| import org.apache.pulsar.common.util.netty.NettyFutureUtil; |
| import org.apache.pulsar.compaction.Compactor; |
| import org.apache.pulsar.metadata.api.MetadataStoreException; |
| import org.apache.pulsar.metadata.api.Notification; |
| import org.apache.pulsar.metadata.api.NotificationType; |
| import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; |
| import org.apache.pulsar.transaction.coordinator.TransactionMetadataStore; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @Getter(AccessLevel.PUBLIC) |
| @Setter(AccessLevel.PROTECTED) |
| public class BrokerService implements Closeable { |
| private static final Logger log = LoggerFactory.getLogger(BrokerService.class); |
| private static final Duration FUTURE_DEADLINE_TIMEOUT_DURATION = Duration.ofSeconds(60); |
| private static final TimeoutException FUTURE_DEADLINE_TIMEOUT_EXCEPTION = |
| FutureUtil.createTimeoutException("Future didn't finish within deadline", BrokerService.class, |
| "futureWithDeadline(...)"); |
| private static final TimeoutException FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION = |
| FutureUtil.createTimeoutException("Failed to load topic within timeout", BrokerService.class, |
| "futureWithDeadline(...)"); |
| private static final long GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS = 5000L; |
| private static final double GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT = 0.25d; |
| private static final double GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT = 0.5d; |
| |
| private final PulsarService pulsar; |
| private final ManagedLedgerFactory managedLedgerFactory; |
| |
| private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics; |
| |
| private final ConcurrentOpenHashMap<String, PulsarClient> replicationClients; |
| private final ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins; |
| |
| // Multi-layer topics map: |
| // Namespace --> Bundle --> topicName --> topic |
| private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> |
| multiLayerTopicsMap; |
| // Keep track of topics and partitions served by this broker for fast lookup. |
| @Getter |
| private final ConcurrentOpenHashMap<String, ConcurrentOpenHashSet<Integer>> owningTopics; |
| private int numberOfNamespaceBundles = 0; |
| |
| private final EventLoopGroup acceptorGroup; |
| private final EventLoopGroup workerGroup; |
| private final OrderedExecutor topicOrderedExecutor; |
| // offline topic backlog cache |
| private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache; |
| private static final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = |
| prepareDynamicConfigurationMap(); |
| private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners; |
| |
| private final ConcurrentLinkedQueue<Pair<String, CompletableFuture<Optional<Topic>>>> pendingTopicLoadingQueue; |
| |
| private AuthorizationService authorizationService = null; |
| private final ScheduledExecutorService statsUpdater; |
| private final ScheduledExecutorService backlogQuotaChecker; |
| |
| protected final AtomicReference<Semaphore> lookupRequestSemaphore; |
| protected final AtomicReference<Semaphore> topicLoadRequestSemaphore; |
| |
| private final ObserverGauge pendingLookupRequests; |
| private final ObserverGauge pendingTopicLoadRequests; |
| |
| private final ScheduledExecutorService inactivityMonitor; |
| private final ScheduledExecutorService messageExpiryMonitor; |
| private final ScheduledExecutorService compactionMonitor; |
| private final ScheduledExecutorService consumedLedgersMonitor; |
| private ScheduledExecutorService topicPublishRateLimiterMonitor; |
| private ScheduledExecutorService brokerPublishRateLimiterMonitor; |
| private ScheduledExecutorService deduplicationSnapshotMonitor; |
| protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; |
| |
| private DistributedIdGenerator producerNameGenerator; |
| |
| public static final String PRODUCER_NAME_GENERATOR_PATH = "/counters/producer-name"; |
| |
| private final BacklogQuotaManager backlogQuotaManager; |
| |
| private final int keepAliveIntervalSeconds; |
| private final PulsarStats pulsarStats; |
| private final AuthenticationService authenticationService; |
| |
| public static final String MANAGED_LEDGER_PATH_ZNODE = "/managed-ledgers"; |
| |
| private static final LongAdder totalUnackedMessages = new LongAdder(); |
| private final int maxUnackedMessages; |
| public final int maxUnackedMsgsPerDispatcher; |
| private static final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false); |
| private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers; |
| private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
| |
| private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory; |
| private final ServerBootstrap defaultServerBootstrap; |
| private final List<EventLoopGroup> protocolHandlersWorkerGroups = new ArrayList<>(); |
| |
| @Getter |
| private final BundlesQuotas bundlesQuotas; |
| |
| private PulsarChannelInitializer.Factory pulsarChannelInitFactory = PulsarChannelInitializer.DEFAULT_FACTORY; |
| |
| private final List<Channel> listenChannels = new ArrayList<>(2); |
| private Channel listenChannel; |
| private Channel listenChannelTls; |
| |
| private boolean preciseTopicPublishRateLimitingEnable; |
| private final LongAdder pausedConnections = new LongAdder(); |
| private BrokerInterceptor interceptor; |
| private ImmutableMap<String, EntryFilterWithClassLoader> entryFilters; |
| |
| private Set<BrokerEntryMetadataInterceptor> brokerEntryMetadataInterceptors; |
| private Set<ManagedLedgerPayloadProcessor> brokerEntryPayloadProcessors; |
| |
| public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws Exception { |
| this.pulsar = pulsar; |
| this.preciseTopicPublishRateLimitingEnable = |
| pulsar.getConfiguration().isPreciseTopicPublishRateLimiterEnable(); |
| this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); |
| this.topics = new ConcurrentOpenHashMap<>(); |
| this.replicationClients = new ConcurrentOpenHashMap<>(); |
| this.clusterAdmins = new ConcurrentOpenHashMap<>(); |
| this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds(); |
| this.configRegisteredListeners = new ConcurrentOpenHashMap<>(); |
| this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue(); |
| |
| this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>(); |
| this.owningTopics = new ConcurrentOpenHashMap<>(); |
| this.pulsarStats = new PulsarStats(pulsar); |
| this.offlineTopicStatCache = new ConcurrentOpenHashMap<>(); |
| |
| this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder() |
| .numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic()) |
| .name("broker-topic-workers").build(); |
| final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor"); |
| |
| this.acceptorGroup = EventLoopUtil.newEventLoopGroup( |
| pulsar.getConfiguration().getNumAcceptorThreads(), false, acceptorThreadFactory); |
| this.workerGroup = eventLoopGroup; |
| this.statsUpdater = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater")); |
| this.authorizationService = new AuthorizationService( |
| pulsar.getConfiguration(), pulsar().getPulsarResources()); |
| if (!pulsar.getConfiguration().getEntryFilterNames().isEmpty()) { |
| this.entryFilters = EntryFilterProvider.createEntryFilters(pulsar.getConfiguration()); |
| } |
| |
| pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); |
| pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges); |
| |
| this.inactivityMonitor = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-inactivity-monitor")); |
| this.messageExpiryMonitor = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor")); |
| this.compactionMonitor = |
| Executors.newSingleThreadScheduledExecutor( |
| new DefaultThreadFactory("pulsar-compaction-monitor")); |
| this.consumedLedgersMonitor = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("consumed-Ledgers-monitor")); |
| |
| this.backlogQuotaManager = new BacklogQuotaManager(pulsar); |
| this.backlogQuotaChecker = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker")); |
| this.authenticationService = new AuthenticationService(pulsar.getConfiguration()); |
| this.blockedDispatchers = new ConcurrentOpenHashSet<>(); |
| // update dynamic configuration and register-listener |
| updateConfigurationAndRegisterListeners(); |
| this.lookupRequestSemaphore = new AtomicReference<Semaphore>( |
| new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), false)); |
| this.topicLoadRequestSemaphore = new AtomicReference<Semaphore>( |
| new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false)); |
| if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0 |
| && pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) { |
| this.maxUnackedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker(); |
| this.maxUnackedMsgsPerDispatcher = (int) ((maxUnackedMessages |
| * pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked()) / 100); |
| log.info("Enabling per-broker unack-message limit {} and dispatcher-limit {} on blocked-broker", |
| maxUnackedMessages, maxUnackedMsgsPerDispatcher); |
| // block misbehaving dispatcher by checking periodically |
| pulsar.getExecutor().scheduleAtFixedRate(safeRun(this::checkUnAckMessageDispatching), |
| 600, 30, TimeUnit.SECONDS); |
| } else { |
| this.maxUnackedMessages = 0; |
| this.maxUnackedMsgsPerDispatcher = 0; |
| log.info( |
| "Disabling per broker unack-msg blocking due invalid" |
| + " unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ", |
| pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked()); |
| } |
| |
| this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader |
| .loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration()); |
| |
| this.defaultServerBootstrap = defaultServerBootstrap(); |
| |
| this.pendingLookupRequests = ObserverGauge.build("pulsar_broker_lookup_pending_requests", "-") |
| .supplier(() -> pulsar.getConfig().getMaxConcurrentLookupRequest() |
| - lookupRequestSemaphore.get().availablePermits()) |
| .register(); |
| |
| this.pendingTopicLoadRequests = ObserverGauge.build( |
| "pulsar_broker_topic_load_pending_requests", "-") |
| .supplier(() -> pulsar.getConfig().getMaxConcurrentTopicLoadRequest() |
| - topicLoadRequestSemaphore.get().availablePermits()) |
| .register(); |
| |
| this.brokerEntryMetadataInterceptors = BrokerEntryMetadataUtils |
| .loadBrokerEntryMetadataInterceptors(pulsar.getConfiguration().getBrokerEntryMetadataInterceptors(), |
| BrokerService.class.getClassLoader()); |
| |
| this.brokerEntryPayloadProcessors = BrokerEntryMetadataUtils.loadInterceptors(pulsar.getConfiguration() |
| .getBrokerEntryPayloadProcessors(), BrokerService.class.getClassLoader()); |
| |
| this.bundlesQuotas = new BundlesQuotas(pulsar.getLocalMetadataStore()); |
| } |
| |
| // This call is used for starting additional protocol handlers |
| public void startProtocolHandlers( |
| Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers) { |
| |
| protocolHandlers.forEach((protocol, initializers) -> { |
| initializers.forEach((address, initializer) -> { |
| try { |
| startProtocolHandler(protocol, address, initializer); |
| } catch (IOException e) { |
| log.error("{}", e.getMessage(), e.getCause()); |
| throw new RuntimeException(e.getMessage(), e.getCause()); |
| } |
| }); |
| }); |
| } |
| |
| private void startProtocolHandler(String protocol, |
| SocketAddress address, |
| ChannelInitializer<SocketChannel> initializer) throws IOException { |
| |
| ServiceConfiguration configuration = pulsar.getConfiguration(); |
| boolean useSeparateThreadPool = configuration.isUseSeparateThreadPoolForProtocolHandlers(); |
| ServerBootstrap bootstrap; |
| if (useSeparateThreadPool) { |
| bootstrap = new ServerBootstrap(); |
| bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); |
| bootstrap.childOption(ChannelOption.TCP_NODELAY, true); |
| bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, |
| new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); |
| EventLoopUtil.enableTriggeredMode(bootstrap); |
| DefaultThreadFactory defaultThreadFactory = new DefaultThreadFactory("pulsar-ph-" + protocol); |
| EventLoopGroup dedicatedWorkerGroup = |
| EventLoopUtil.newEventLoopGroup(configuration.getNumIOThreads(), false, defaultThreadFactory); |
| bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(dedicatedWorkerGroup)); |
| protocolHandlersWorkerGroups.add(dedicatedWorkerGroup); |
| bootstrap.group(this.acceptorGroup, dedicatedWorkerGroup); |
| } else { |
| bootstrap = defaultServerBootstrap.clone(); |
| } |
| bootstrap.childHandler(initializer); |
| try { |
| bootstrap.bind(address).sync(); |
| } catch (Exception e) { |
| throw new IOException("Failed to bind protocol `" + protocol + "` on " + address, e); |
| } |
| log.info("Successfully bind protocol `{}` on {}", protocol, address); |
| } |
| |
| private ServerBootstrap defaultServerBootstrap() { |
| ServerBootstrap bootstrap = new ServerBootstrap(); |
| bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); |
| bootstrap.group(acceptorGroup, workerGroup); |
| bootstrap.childOption(ChannelOption.TCP_NODELAY, true); |
| bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, |
| new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); |
| bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)); |
| EventLoopUtil.enableTriggeredMode(bootstrap); |
| return bootstrap; |
| } |
| |
| public void start() throws Exception { |
| this.producerNameGenerator = new DistributedIdGenerator(pulsar.getCoordinationService(), |
| PRODUCER_NAME_GENERATOR_PATH, pulsar.getConfiguration().getClusterName()); |
| |
| ServiceConfiguration serviceConfig = pulsar.getConfiguration(); |
| List<BindAddress> bindAddresses = BindAddressValidator.validateBindAddresses(serviceConfig, |
| Arrays.asList("pulsar", "pulsar+ssl")); |
| String internalListenerName = serviceConfig.getInternalListenerName(); |
| |
| // create a channel for each bind address |
| if (bindAddresses.size() == 0) { |
| throw new IllegalArgumentException("At least one broker bind address must be configured"); |
| } |
| for (BindAddress a : bindAddresses) { |
| InetSocketAddress addr = new InetSocketAddress(a.getAddress().getHost(), a.getAddress().getPort()); |
| boolean isTls = "pulsar+ssl".equals(a.getAddress().getScheme()); |
| PulsarChannelInitializer.PulsarChannelOptions opts = PulsarChannelInitializer.PulsarChannelOptions.builder() |
| .enableTLS(isTls) |
| .listenerName(a.getListenerName()).build(); |
| |
| ServerBootstrap b = defaultServerBootstrap.clone(); |
| b.childHandler( |
| pulsarChannelInitFactory.newPulsarChannelInitializer(pulsar, opts)); |
| try { |
| Channel ch = b.bind(addr).sync().channel(); |
| listenChannels.add(ch); |
| |
| // identify the primary channel. Note that the legacy bindings appear first and have no listener. |
| if (StringUtils.isBlank(a.getListenerName()) |
| || StringUtils.equalsIgnoreCase(a.getListenerName(), internalListenerName)) { |
| if (this.listenChannel == null && !isTls) { |
| this.listenChannel = ch; |
| } |
| if (this.listenChannelTls == null && isTls) { |
| this.listenChannelTls = ch; |
| } |
| } |
| |
| log.info("Started Pulsar Broker service on {}, TLS: {}, listener: {}", |
| ch.localAddress(), |
| isTls ? SslContext.defaultServerProvider().toString() : "(none)", |
| StringUtils.defaultString(a.getListenerName(), "(none)")); |
| } catch (Exception e) { |
| throw new IOException("Failed to bind Pulsar broker on " + addr, e); |
| } |
| } |
| |
| // start other housekeeping functions |
| this.startStatsUpdater( |
| serviceConfig.getStatsUpdateInitialDelayInSecs(), |
| serviceConfig.getStatsUpdateFrequencyInSecs()); |
| this.startInactivityMonitor(); |
| this.startMessageExpiryMonitor(); |
| this.startCompactionMonitor(); |
| this.startConsumedLedgersMonitor(); |
| this.startBacklogQuotaChecker(); |
| this.updateBrokerPublisherThrottlingMaxRate(); |
| this.startCheckReplicationPolicies(); |
| this.startDeduplicationSnapshotMonitor(); |
| } |
| |
| protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) { |
| statsUpdater.scheduleAtFixedRate(safeRun(this::updateRates), |
| statsUpdateInitialDelayInSecs, statsUpdateFrequencyInSecs, TimeUnit.SECONDS); |
| |
| // Ensure the broker starts up with initial stats |
| updateRates(); |
| } |
| |
| protected void startDeduplicationSnapshotMonitor() { |
| int interval = pulsar().getConfiguration().getBrokerDeduplicationSnapshotFrequencyInSeconds(); |
| if (interval > 0 && pulsar().getConfiguration().isBrokerDeduplicationEnabled()) { |
| this.deduplicationSnapshotMonitor = |
| Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory( |
| "deduplication-snapshot-monitor")); |
| deduplicationSnapshotMonitor.scheduleAtFixedRate(safeRun(() -> forEachTopic( |
| Topic::checkDeduplicationSnapshot)) |
| , interval, interval, TimeUnit.SECONDS); |
| } |
| } |
| |
| protected void startInactivityMonitor() { |
| if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) { |
| int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds(); |
| inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC()), interval, interval, |
| TimeUnit.SECONDS); |
| } |
| |
| // Deduplication info checker |
| long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES |
| .toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()) / 3; |
| inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), |
| duplicationCheckerIntervalInSeconds, |
| duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS); |
| |
| // Inactive subscriber checker |
| if (pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes() > 0) { |
| long subscriptionExpiryCheckIntervalInSeconds = |
| TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration() |
| .getSubscriptionExpiryCheckIntervalInMinutes()); |
| inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions), |
| subscriptionExpiryCheckIntervalInSeconds, |
| subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS); |
| } |
| } |
| |
| protected void startMessageExpiryMonitor() { |
| int interval = pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes(); |
| messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkMessageExpiry), interval, interval, |
| TimeUnit.MINUTES); |
| } |
| |
| protected void startCheckReplicationPolicies() { |
| int interval = pulsar.getConfig().getReplicationPolicyCheckDurationSeconds(); |
| if (interval > 0) { |
| messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkReplicationPolicies), interval, interval, |
| TimeUnit.SECONDS); |
| } |
| } |
| |
| protected void startCompactionMonitor() { |
| int interval = pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds(); |
| if (interval > 0) { |
| compactionMonitor.scheduleAtFixedRate(safeRun(() -> checkCompaction()), |
| interval, interval, TimeUnit.SECONDS); |
| } |
| } |
| |
| protected void startConsumedLedgersMonitor() { |
| int interval = pulsar().getConfiguration().getRetentionCheckIntervalInSeconds(); |
| if (interval > 0) { |
| consumedLedgersMonitor.scheduleAtFixedRate(safeRun(this::checkConsumedLedgers), |
| interval, interval, TimeUnit.SECONDS); |
| } |
| } |
| |
| protected void startBacklogQuotaChecker() { |
| if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) { |
| final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds(); |
| log.info("Scheduling a thread to check backlog quota after [{}] seconds in background", interval); |
| backlogQuotaChecker.scheduleAtFixedRate(safeRun(this::monitorBacklogQuota), interval, interval, |
| TimeUnit.SECONDS); |
| } else { |
| log.info("Backlog quota check monitoring is disabled"); |
| } |
| |
| } |
| |
| /** |
| * Schedules and monitors publish-throttling for all owned topics that has publish-throttling configured. It also |
| * disables and shutdowns publish-rate-limiter monitor task if broker disables it. |
| */ |
| public synchronized void setupTopicPublishRateLimiterMonitor() { |
| // set topic PublishRateLimiterMonitor |
| long topicTickTimeMs = pulsar().getConfiguration().getTopicPublisherThrottlingTickTimeMillis(); |
| if (topicTickTimeMs > 0) { |
| if (this.topicPublishRateLimiterMonitor == null) { |
| this.topicPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor( |
| new DefaultThreadFactory("pulsar-topic-publish-rate-limiter-monitor")); |
| // schedule task that sums up publish-rate across all cnx on a topic |
| topicPublishRateLimiterMonitor.scheduleAtFixedRate(safeRun(() -> checkTopicPublishThrottlingRate()), |
| topicTickTimeMs, topicTickTimeMs, TimeUnit.MILLISECONDS); |
| // schedule task that refreshes rate-limiting bucket |
| topicPublishRateLimiterMonitor.scheduleAtFixedRate(safeRun(() -> refreshTopicPublishRate()), 1, 1, |
| TimeUnit.SECONDS); |
| } |
| } else { |
| // disable publish-throttling for all topics |
| if (this.topicPublishRateLimiterMonitor != null) { |
| try { |
| this.topicPublishRateLimiterMonitor.awaitTermination(30, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| log.warn("failed to shutdown topicPublishRateLimiterMonitor", e); |
| } |
| // make sure topics are not being throttled |
| refreshTopicPublishRate(); |
| this.topicPublishRateLimiterMonitor = null; |
| } |
| } |
| } |
| |
| /** |
| * Schedules and monitors publish-throttling for broker that has publish-throttling configured. It also |
| * disables and shutdowns publish-rate-limiter monitor for broker task if broker disables it. |
| */ |
| public synchronized void setupBrokerPublishRateLimiterMonitor() { |
| // set broker PublishRateLimiterMonitor |
| long brokerTickTimeMs = pulsar().getConfiguration().getBrokerPublisherThrottlingTickTimeMillis(); |
| if (brokerTickTimeMs > 0) { |
| if (this.brokerPublishRateLimiterMonitor == null) { |
| this.brokerPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor( |
| new DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor")); |
| // schedule task that sums up publish-rate across all cnx on a topic, |
| // and check the rate limit exceeded or not. |
| brokerPublishRateLimiterMonitor.scheduleAtFixedRate( |
| safeRun(() -> checkBrokerPublishThrottlingRate()), |
| brokerTickTimeMs, |
| brokerTickTimeMs, |
| TimeUnit.MILLISECONDS); |
| // schedule task that refreshes rate-limiting bucket |
| brokerPublishRateLimiterMonitor.scheduleAtFixedRate( |
| safeRun(() -> refreshBrokerPublishRate()), |
| 1, |
| 1, |
| TimeUnit.SECONDS); |
| } |
| } else { |
| // disable publish-throttling for broker. |
| if (this.brokerPublishRateLimiterMonitor != null) { |
| try { |
| this.brokerPublishRateLimiterMonitor.awaitTermination(30, TimeUnit.SECONDS); |
| } catch (InterruptedException e) { |
| log.warn("failed to shutdown brokerPublishRateLimiterMonitor", e); |
| } |
| // make sure topics are not being throttled |
| refreshBrokerPublishRate(); |
| this.brokerPublishRateLimiterMonitor = null; |
| } |
| } |
| } |
| |
| public void close() throws IOException { |
| try { |
| closeAsync().get(); |
| } catch (ExecutionException e) { |
| if (e.getCause() instanceof IOException) { |
| throw (IOException) e.getCause(); |
| } else { |
| throw new PulsarServerException(e.getCause()); |
| } |
| } catch (InterruptedException e) { |
| Thread.currentThread().interrupt(); |
| } |
| } |
| |
| public CompletableFuture<Void> closeAndRemoveReplicationClient(String clusterName) { |
| List<CompletableFuture<Void>> futures = new ArrayList<>((int) topics.size()); |
| topics.forEach((__, future) -> { |
| CompletableFuture<Void> f = new CompletableFuture<>(); |
| futures.add(f); |
| future.whenComplete((ot, ex) -> { |
| if (ot.isPresent()) { |
| Replicator r = ot.get().getReplicators().get(clusterName); |
| if (r != null && r.isConnected()) { |
| r.disconnect(false).whenComplete((v, e) -> f.complete(null)); |
| return; |
| } |
| } |
| f.complete(null); |
| }); |
| }); |
| |
| return FutureUtil.waitForAll(futures).thenCompose(__ -> { |
| PulsarClient client = replicationClients.remove(clusterName); |
| if (client == null) { |
| return CompletableFuture.completedFuture(null); |
| } |
| return client.closeAsync(); |
| }); |
| } |
| |
| public CompletableFuture<Void> closeAsync() { |
| try { |
| log.info("Shutting down Pulsar Broker service"); |
| |
| // unloads all namespaces gracefully without disrupting mutually |
| unloadNamespaceBundlesGracefully(); |
| |
| // close replication clients |
| replicationClients.forEach((cluster, client) -> { |
| try { |
| client.shutdown(); |
| } catch (Exception e) { |
| log.warn("Error shutting down repl client for cluster {}", cluster, e); |
| } |
| }); |
| |
| // close replication admins |
| clusterAdmins.forEach((cluster, admin) -> { |
| try { |
| admin.close(); |
| } catch (Exception e) { |
| log.warn("Error shutting down repl admin for cluster {}", cluster, e); |
| } |
| }); |
| |
| //close entry filters |
| if (entryFilters != null) { |
| entryFilters.forEach((name, filter) -> { |
| try { |
| filter.close(); |
| } catch (Exception e) { |
| log.warn("Error shutting down entry filter {}", name, e); |
| } |
| }); |
| } |
| |
| CompletableFuture<CompletableFuture<Void>> cancellableDownstreamFutureReference = new CompletableFuture<>(); |
| log.info("Event loops shutting down gracefully..."); |
| List<CompletableFuture<?>> shutdownEventLoops = new ArrayList<>(); |
| shutdownEventLoops.add(shutdownEventLoopGracefully(acceptorGroup)); |
| shutdownEventLoops.add(shutdownEventLoopGracefully(workerGroup)); |
| for (EventLoopGroup group : protocolHandlersWorkerGroups) { |
| shutdownEventLoops.add(shutdownEventLoopGracefully(group)); |
| } |
| CompletableFuture<Void> shutdownFuture = |
| CompletableFuture.allOf(shutdownEventLoops.toArray(new CompletableFuture[0])) |
| .handle((v, t) -> { |
| if (t != null) { |
| log.warn("Error shutting down event loops gracefully", t); |
| } else { |
| log.info("Event loops shutdown completed."); |
| } |
| return null; |
| }) |
| .thenCompose(__ -> { |
| log.info("Continuing to second phase in shutdown."); |
| |
| List<CompletableFuture<Void>> asyncCloseFutures = new ArrayList<>(); |
| listenChannels.forEach(ch -> { |
| if (ch.isOpen()) { |
| asyncCloseFutures.add(closeChannel(ch)); |
| } |
| }); |
| |
| if (interceptor != null) { |
| interceptor.close(); |
| interceptor = null; |
| } |
| |
| try { |
| authenticationService.close(); |
| } catch (IOException e) { |
| log.warn("Error in closing authenticationService", e); |
| } |
| pulsarStats.close(); |
| try { |
| delayedDeliveryTrackerFactory.close(); |
| } catch (IOException e) { |
| log.warn("Error in closing delayedDeliveryTrackerFactory", e); |
| } |
| |
| asyncCloseFutures.add(GracefulExecutorServicesShutdown |
| .initiate() |
| .timeout( |
| Duration.ofMillis( |
| (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT |
| * pulsar.getConfiguration() |
| .getBrokerShutdownTimeoutMs()))) |
| .shutdown( |
| statsUpdater, |
| inactivityMonitor, |
| messageExpiryMonitor, |
| compactionMonitor, |
| consumedLedgersMonitor, |
| backlogQuotaChecker, |
| topicOrderedExecutor, |
| topicPublishRateLimiterMonitor, |
| brokerPublishRateLimiterMonitor, |
| deduplicationSnapshotMonitor) |
| .handle()); |
| |
| CompletableFuture<Void> combined = |
| FutureUtil.waitForAllAndSupportCancel(asyncCloseFutures); |
| cancellableDownstreamFutureReference.complete(combined); |
| combined.handle((v, t) -> { |
| if (t == null) { |
| log.info("Broker service completely shut down"); |
| } else { |
| if (t instanceof CancellationException) { |
| log.warn("Broker service didn't complete gracefully. " |
| + "Terminating Broker service."); |
| } else { |
| log.warn("Broker service shut down completed with exception", t); |
| } |
| } |
| return null; |
| }); |
| return combined; |
| }); |
| FutureUtil.whenCancelledOrTimedOut(shutdownFuture, () -> cancellableDownstreamFutureReference |
| .thenAccept(future -> future.cancel(false))); |
| return shutdownFuture; |
| } catch (Exception e) { |
| return FutureUtil.failedFuture(e); |
| } |
| } |
| |
| CompletableFuture<Void> shutdownEventLoopGracefully(EventLoopGroup eventLoopGroup) { |
| long brokerShutdownTimeoutMs = pulsar.getConfiguration().getBrokerShutdownTimeoutMs(); |
| long quietPeriod = Math.min((long) ( |
| GRACEFUL_SHUTDOWN_QUIET_PERIOD_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs), |
| GRACEFUL_SHUTDOWN_QUIET_PERIOD_MAX_MS); |
| long timeout = (long) (GRACEFUL_SHUTDOWN_TIMEOUT_RATIO_OF_TOTAL_TIMEOUT * brokerShutdownTimeoutMs); |
| return NettyFutureUtil.toCompletableFutureVoid( |
| eventLoopGroup.shutdownGracefully(quietPeriod, |
| timeout, TimeUnit.MILLISECONDS)); |
| } |
| |
| private CompletableFuture<Void> closeChannel(Channel channel) { |
| return ChannelFutures.toCompletableFuture(channel.close()) |
| .handle((c, t) -> { |
| // log problem if closing of channel fails |
| // ignore RejectedExecutionException |
| if (t != null && !(t instanceof RejectedExecutionException)) { |
| log.warn("Cannot close channel {}", channel, t); |
| } |
| return null; |
| }); |
| } |
| |
| /** |
| * It unloads all owned namespacebundles gracefully. |
| * <ul> |
| * <li>First it makes current broker unavailable and isolates from the clusters so, it will not serve any new |
| * requests.</li> |
| * <li>Second it starts unloading namespace bundle one by one without closing the connection in order to avoid |
| * disruption for other namespacebundles which are sharing the same connection from the same client.</li> |
| * </ul> |
| */ |
| public void unloadNamespaceBundlesGracefully() { |
| try { |
| log.info("Unloading namespace-bundles..."); |
| // make broker-node unavailable from the cluster |
| if (pulsar.getLoadManager() != null && pulsar.getLoadManager().get() != null) { |
| try { |
| pulsar.getLoadManager().get().disableBroker(); |
| } catch (PulsarServerException.NotFoundException ne) { |
| log.warn("Broker load-manager znode doesn't exist ", ne); |
| // still continue and release bundle ownership as broker's registration node doesn't exist. |
| } |
| } |
| |
| // unload all namespace-bundles gracefully |
| long closeTopicsStartTime = System.nanoTime(); |
| Set<NamespaceBundle> serviceUnits = pulsar.getNamespaceService().getOwnedServiceUnits(); |
| if (serviceUnits != null) { |
| serviceUnits.forEach(su -> { |
| if (su instanceof NamespaceBundle) { |
| try { |
| pulsar.getNamespaceService().unloadNamespaceBundle(su, pulsar.getConfiguration() |
| .getNamespaceBundleUnloadingTimeoutMs(), TimeUnit.MILLISECONDS).get(); |
| } catch (Exception e) { |
| log.warn("Failed to unload namespace bundle {}", su, e); |
| } |
| } |
| }); |
| } |
| |
| double closeTopicsTimeSeconds = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - closeTopicsStartTime)) |
| / 1000.0; |
| log.info("Unloading {} namespace-bundles completed in {} seconds", serviceUnits.size(), |
| closeTopicsTimeSeconds); |
| } catch (Exception e) { |
| log.error("Failed to disable broker from loadbalancer list {}", e.getMessage(), e); |
| } |
| } |
| |
| public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) { |
| return getTopic(topic, false /* createIfMissing */); |
| } |
| |
| public CompletableFuture<Topic> getOrCreateTopic(final String topic) { |
| return getTopic(topic, isAllowAutoTopicCreation(topic)).thenApply(Optional::get); |
| } |
| |
| public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) { |
| try { |
| CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic); |
| if (topicFuture != null) { |
| if (topicFuture.isCompletedExceptionally() |
| || (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) { |
| // Exceptional topics should be recreated. |
| topics.remove(topic, topicFuture); |
| } else { |
| // a non-existing topic in the cache shouldn't prevent creating a topic |
| if (createIfMissing) { |
| if (topicFuture.isDone() && topicFuture.getNow(Optional.empty()).isPresent()) { |
| return topicFuture; |
| } else { |
| return topicFuture.thenCompose(value -> { |
| if (!value.isPresent()) { |
| // retry and create topic |
| return getTopic(topic, createIfMissing); |
| } else { |
| // in-progress future completed successfully |
| return CompletableFuture.completedFuture(value); |
| } |
| }); |
| } |
| } else { |
| return topicFuture; |
| } |
| } |
| } |
| final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent); |
| if (isPersistentTopic) { |
| return topics.computeIfAbsent(topic, (topicName) -> { |
| return this.loadOrCreatePersistentTopic(topicName, createIfMissing); |
| }); |
| } else { |
| return topics.computeIfAbsent(topic, (name) -> { |
| final TopicName topicName = TopicName.get(name); |
| if (topicName.isPartitioned()) { |
| final TopicName partitionedTopicName = TopicName.get(topicName.getPartitionedTopicName()); |
| return this.fetchPartitionedTopicMetadataAsync(partitionedTopicName).thenCompose((metadata) -> { |
| if (topicName.getPartitionIndex() < metadata.partitions) { |
| return createNonPersistentTopic(name); |
| } |
| return CompletableFuture.completedFuture(Optional.empty()); |
| }); |
| } else if (createIfMissing) { |
| return createNonPersistentTopic(name); |
| } else { |
| return CompletableFuture.completedFuture(Optional.empty()); |
| } |
| }); |
| } |
| } catch (IllegalArgumentException e) { |
| log.warn("[{}] Illegalargument exception when loading topic", topic, e); |
| return FutureUtil.failedFuture(e); |
| } catch (RuntimeException e) { |
| Throwable cause = e.getCause(); |
| if (cause instanceof ServiceUnitNotReadyException) { |
| log.warn("[{}] Service unit is not ready when loading the topic", topic); |
| } else { |
| log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e); |
| } |
| |
| return FutureUtil.failedFuture(cause); |
| } |
| } |
| |
| public CompletableFuture<SchemaVersion> deleteSchemaStorage(String topic) { |
| Optional<Topic> optTopic = getTopicReference(topic); |
| if (optTopic.isPresent()) { |
| return optTopic.get().deleteSchema(); |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| } |
| |
| public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete) { |
| return deleteTopic(topic, forceDelete, false); |
| } |
| |
| public CompletableFuture<Void> deleteTopic(String topic, boolean forceDelete, boolean deleteSchema) { |
| Optional<Topic> optTopic = getTopicReference(topic); |
| if (optTopic.isPresent()) { |
| Topic t = optTopic.get(); |
| if (forceDelete) { |
| if (deleteSchema) { |
| return t.deleteSchema().thenCompose(schemaVersion -> { |
| log.info("Successfully delete topic {}'s schema of version {}", t.getName(), schemaVersion); |
| return t.deleteForcefully(); |
| }); |
| } else { |
| return t.deleteForcefully(); |
| } |
| } |
| |
| // v2 topics have a global name so check if the topic is replicated. |
| if (t.isReplicated()) { |
| // Delete is disallowed on global topic |
| final List<String> clusters = t.getReplicators().keys(); |
| log.error("Delete forbidden topic {} is replicated on clusters {}", topic, clusters); |
| return FutureUtil.failedFuture( |
| new IllegalStateException("Delete forbidden topic is replicated on clusters " + clusters)); |
| } |
| |
| if (deleteSchema) { |
| return t.deleteSchema().thenCompose(schemaVersion -> { |
| log.info("Successfully delete topic {}'s schema of version {}", t.getName(), schemaVersion); |
| return t.delete(); |
| }); |
| } else { |
| return t.delete(); |
| } |
| } |
| |
| if (log.isDebugEnabled()) { |
| log.debug("Topic {} is not loaded, try to delete from metadata", topic); |
| } |
| // Topic is not loaded, though we still might be able to delete from metadata |
| TopicName tn = TopicName.get(topic); |
| if (!tn.isPersistent()) { |
| // Nothing to do if it's not persistent |
| return CompletableFuture.completedFuture(null); |
| } |
| |
| CompletableFuture<Void> future = new CompletableFuture<>(); |
| |
| CompletableFuture<Void> deleteTopicAuthenticationFuture = new CompletableFuture<>(); |
| deleteTopicAuthenticationWithRetry(topic, deleteTopicAuthenticationFuture, 5); |
| deleteTopicAuthenticationFuture.whenComplete((v, ex) -> { |
| if (ex != null) { |
| future.completeExceptionally(ex); |
| return; |
| } |
| managedLedgerFactory.asyncDelete(tn.getPersistenceNamingEncoding(), new DeleteLedgerCallback() { |
| @Override |
| public void deleteLedgerComplete(Object ctx) { |
| future.complete(null); |
| } |
| |
| @Override |
| public void deleteLedgerFailed(ManagedLedgerException exception, Object ctx) { |
| future.completeExceptionally(exception); |
| } |
| }, null); |
| }); |
| |
| |
| return future; |
| } |
| |
| public void deleteTopicAuthenticationWithRetry(String topic, CompletableFuture<Void> future, int count) { |
| if (count == 0) { |
| log.error("The number of retries has exhausted for topic {}", topic); |
| future.completeExceptionally(new MetadataStoreException("The number of retries has exhausted")); |
| return; |
| } |
| NamespaceName namespaceName = TopicName.get(topic).getNamespaceObject(); |
| // Check whether there are auth policies for the topic |
| pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespaceName).thenAccept(optPolicies -> { |
| if (!optPolicies.isPresent() || !optPolicies.get().auth_policies.getTopicAuthentication() |
| .containsKey(topic)) { |
| // if there is no auth policy for the topic, just complete and return |
| if (log.isDebugEnabled()) { |
| log.debug("Authentication policies not found for topic {}", topic); |
| } |
| future.complete(null); |
| return; |
| } |
| pulsar.getPulsarResources().getNamespaceResources() |
| .setPoliciesAsync(TopicName.get(topic).getNamespaceObject(), p -> { |
| p.auth_policies.getTopicAuthentication().remove(topic); |
| return p; |
| }).thenAccept(v -> { |
| log.info("Successfully delete authentication policies for topic {}", topic); |
| future.complete(null); |
| }).exceptionally(ex1 -> { |
| if (ex1.getCause() instanceof MetadataStoreException.BadVersionException) { |
| log.warn( |
| "Failed to delete authentication policies because of bad version. " |
| + "Retry to delete authentication policies for topic {}", |
| topic); |
| deleteTopicAuthenticationWithRetry(topic, future, count - 1); |
| } else { |
| log.error("Failed to delete authentication policies for topic {}", topic, ex1); |
| future.completeExceptionally(ex1); |
| } |
| return null; |
| }); |
| }).exceptionally(ex -> { |
| log.error("Failed to get policies for topic {}", topic, ex); |
| future.completeExceptionally(ex); |
| return null; |
| }); |
| } |
| |
| private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) { |
| CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>(); |
| if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Broker is unable to load non-persistent topic {}", topic); |
| } |
| return FutureUtil.failedFuture( |
| new NotAllowedException("Broker is not unable to load non-persistent topic")); |
| } |
| final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); |
| NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this); |
| CompletableFuture<Void> isOwner = checkTopicNsOwnership(topic); |
| isOwner.thenRun(() -> { |
| nonPersistentTopic.initialize() |
| .thenCompose(__ -> nonPersistentTopic.checkReplication()) |
| .thenRun(() -> { |
| log.info("Created topic {}", nonPersistentTopic); |
| long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; |
| pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); |
| addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic); |
| topicFuture.complete(Optional.of(nonPersistentTopic)); |
| }).exceptionally(ex -> { |
| log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex.getCause()); |
| nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(ex); |
| }); |
| return null; |
| }); |
| }).exceptionally(e -> { |
| log.warn("CheckTopicNsOwnership fail when createNonPersistentTopic! {}", topic, e.getCause()); |
| // CheckTopicNsOwnership fail dont create nonPersistentTopic, when topic do lookup will find the correct |
| // broker. When client get non-persistent-partitioned topic |
| // metadata will the non-persistent-topic will be created. |
| // so we should add checkTopicNsOwnership logic otherwise the topic will be created |
| // if it dont own by this broker,we should return success |
| // otherwise it will keep retrying getPartitionedTopicMetadata |
| topicFuture.complete(Optional.of(nonPersistentTopic)); |
| // after get metadata return success, we should delete this topic from this broker, because this topic not |
| // owner by this broker and it don't initialize and checkReplication |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| return null; |
| }); |
| |
| return topicFuture; |
| } |
| |
| private <T> CompletableFuture<T> futureWithDeadline() { |
| return FutureUtil.createFutureWithTimeout(FUTURE_DEADLINE_TIMEOUT_DURATION, executor(), |
| () -> FUTURE_DEADLINE_TIMEOUT_EXCEPTION); |
| } |
| |
| public PulsarClient getReplicationClient(String cluster, Optional<ClusterData> clusterDataOp) { |
| PulsarClient client = replicationClients.get(cluster); |
| if (client != null) { |
| return client; |
| } |
| |
| return replicationClients.computeIfAbsent(cluster, key -> { |
| try { |
| ClusterData data = clusterDataOp |
| .orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster)); |
| ClientBuilder clientBuilder = PulsarClient.builder() |
| .enableTcpNoDelay(false) |
| .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker()) |
| .statsInterval(0, TimeUnit.SECONDS); |
| if (data.getAuthenticationPlugin() != null && data.getAuthenticationParameters() != null) { |
| clientBuilder.authentication(data.getAuthenticationPlugin(), data.getAuthenticationParameters()); |
| } else if (pulsar.getConfiguration().isAuthenticationEnabled()) { |
| clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), |
| pulsar.getConfiguration().getBrokerClientAuthenticationParameters()); |
| } |
| String serviceUrlTls = isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls() |
| : data.getServiceUrlTls(); |
| if (data.isBrokerClientTlsEnabled()) { |
| configTlsSettings(clientBuilder, serviceUrlTls, |
| data.isBrokerClientTlsEnabledWithKeyStore(), data.isTlsAllowInsecureConnection(), |
| data.getBrokerClientTlsTrustStoreType(), data.getBrokerClientTlsTrustStore(), |
| data.getBrokerClientTlsTrustStorePassword(), data.getBrokerClientTrustCertsFilePath()); |
| } else if (pulsar.getConfiguration().isBrokerClientTlsEnabled()) { |
| configTlsSettings(clientBuilder, serviceUrlTls, |
| pulsar.getConfiguration().isBrokerClientTlsEnabledWithKeyStore(), |
| pulsar.getConfiguration().isTlsAllowInsecureConnection(), |
| pulsar.getConfiguration().getBrokerClientTlsTrustStoreType(), |
| pulsar.getConfiguration().getBrokerClientTlsTrustStore(), |
| pulsar.getConfiguration().getBrokerClientTlsTrustStorePassword(), |
| pulsar.getConfiguration().getBrokerClientTrustCertsFilePath()); |
| } else { |
| clientBuilder.serviceUrl( |
| isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl()); |
| } |
| if (data.getProxyProtocol() != null && StringUtils.isNotBlank(data.getProxyServiceUrl())) { |
| clientBuilder.proxyServiceUrl(data.getProxyServiceUrl(), data.getProxyProtocol()); |
| log.info("Configuring proxy-url {} with protocol {}", data.getProxyServiceUrl(), |
| data.getProxyProtocol()); |
| } |
| if (StringUtils.isNotBlank(data.getListenerName())) { |
| clientBuilder.listenerName(data.getListenerName()); |
| log.info("Configuring listenerName {}", data.getListenerName()); |
| } |
| // Share all the IO threads across broker and client connections |
| ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); |
| return new PulsarClientImpl(conf, workerGroup); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| } |
| |
| private void configTlsSettings(ClientBuilder clientBuilder, String serviceUrl, |
| boolean brokerClientTlsEnabledWithKeyStore, boolean isTlsAllowInsecureConnection, |
| String brokerClientTlsTrustStoreType, String brokerClientTlsTrustStore, |
| String brokerClientTlsTrustStorePassword, String brokerClientTrustCertsFilePath) { |
| clientBuilder |
| .serviceUrl(serviceUrl) |
| .enableTls(true) |
| .allowTlsInsecureConnection(isTlsAllowInsecureConnection); |
| if (brokerClientTlsEnabledWithKeyStore) { |
| clientBuilder.useKeyStoreTls(true) |
| .tlsTrustStoreType(brokerClientTlsTrustStoreType) |
| .tlsTrustStorePath(brokerClientTlsTrustStore) |
| .tlsTrustStorePassword(brokerClientTlsTrustStorePassword); |
| } else { |
| clientBuilder.tlsTrustCertsFilePath(brokerClientTrustCertsFilePath); |
| } |
| } |
| |
| public PulsarAdmin getClusterPulsarAdmin(String cluster, Optional<ClusterData> clusterDataOp) { |
| PulsarAdmin admin = clusterAdmins.get(cluster); |
| if (admin != null) { |
| return admin; |
| } |
| return clusterAdmins.computeIfAbsent(cluster, key -> { |
| try { |
| ClusterData data = clusterDataOp |
| .orElseThrow(() -> new MetadataStoreException.NotFoundException(cluster)); |
| |
| ServiceConfiguration conf = pulsar.getConfig(); |
| |
| boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls()); |
| String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl(); |
| PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) |
| .authentication( |
| conf.getBrokerClientAuthenticationPlugin(), |
| conf.getBrokerClientAuthenticationParameters()); |
| |
| if (isTlsUrl) { |
| builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection()); |
| if (conf.isBrokerClientTlsEnabledWithKeyStore()) { |
| builder.useKeyStoreTls(true) |
| .tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType()) |
| .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore()) |
| .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword()); |
| } else { |
| builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath()); |
| } |
| } |
| |
| // most of the admin request requires to make zk-call so, keep the max read-timeout based on |
| // zk-operation timeout |
| builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS); |
| |
| PulsarAdmin adminClient = builder.build(); |
| log.info("created admin with url {} ", adminApiUrl); |
| return adminClient; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| } |
| |
| /** |
| * It creates a topic async and returns CompletableFuture. It also throttles down configured max-concurrent topic |
| * loading and puts them into queue once in-process topics are created. |
| * |
| * @param topic persistent-topic name |
| * @return CompletableFuture<Topic> |
| * @throws RuntimeException |
| */ |
| protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic, |
| boolean createIfMissing) throws RuntimeException { |
| final CompletableFuture<Optional<Topic>> topicFuture = FutureUtil.createFutureWithTimeout( |
| Duration.ofSeconds(pulsar.getConfiguration().getTopicLoadTimeoutSeconds()), executor(), |
| () -> FAILED_TO_LOAD_TOPIC_TIMEOUT_EXCEPTION); |
| if (!pulsar.getConfiguration().isEnablePersistentTopics()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Broker is unable to load persistent topic {}", topic); |
| } |
| topicFuture.completeExceptionally(new NotAllowedException( |
| "Broker is not unable to load persistent topic")); |
| return topicFuture; |
| } |
| |
| checkTopicNsOwnership(topic) |
| .thenRun(() -> { |
| final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); |
| |
| if (topicLoadSemaphore.tryAcquire()) { |
| createPersistentTopic(topic, createIfMissing, topicFuture); |
| topicFuture.handle((persistentTopic, ex) -> { |
| // release permit and process pending topic |
| topicLoadSemaphore.release(); |
| createPendingLoadTopic(); |
| return null; |
| }); |
| } else { |
| pendingTopicLoadingQueue.add(new ImmutablePair<>(topic, topicFuture)); |
| if (log.isDebugEnabled()) { |
| log.debug("topic-loading for {} added into pending queue", topic); |
| } |
| } |
| }).exceptionally(ex -> { |
| topicFuture.completeExceptionally(ex.getCause()); |
| return null; |
| }); |
| |
| return topicFuture; |
| } |
| |
| private void createPersistentTopic(final String topic, boolean createIfMissing, |
| CompletableFuture<Optional<Topic>> topicFuture) { |
| |
| final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); |
| TopicName topicName = TopicName.get(topic); |
| if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) { |
| // namespace is being unloaded |
| String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); |
| log.warn(msg); |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); |
| return; |
| } |
| |
| if (isTransactionSystemTopic(topicName)) { |
| String msg = String.format("Can not create transaction system topic %s", topic); |
| log.warn(msg); |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(new NotAllowedException(msg)); |
| return; |
| } |
| |
| CompletableFuture<Void> maxTopicsCheck = createIfMissing |
| ? checkMaxTopicsPerNamespace(topicName, 1) |
| : CompletableFuture.completedFuture(null); |
| |
| maxTopicsCheck.thenCompose(__ -> getManagedLedgerConfig(topicName)).thenAccept(managedLedgerConfig -> { |
| if (isBrokerEntryMetadataEnabled() || isBrokerPayloadProcessorEnabled()) { |
| // init managedLedger interceptor |
| Set<BrokerEntryMetadataInterceptor> interceptors = new HashSet<>(); |
| for (BrokerEntryMetadataInterceptor interceptor : brokerEntryMetadataInterceptors) { |
| // add individual AppendOffsetMetadataInterceptor for each topic |
| if (interceptor instanceof AppendIndexMetadataInterceptor) { |
| interceptors.add(new AppendIndexMetadataInterceptor()); |
| } else { |
| interceptors.add(interceptor); |
| } |
| } |
| managedLedgerConfig.setManagedLedgerInterceptor( |
| new ManagedLedgerInterceptorImpl(interceptors, brokerEntryPayloadProcessors)); |
| } |
| managedLedgerConfig.setCreateIfMissing(createIfMissing); |
| |
| // Once we have the configuration, we can proceed with the async open operation |
| managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, |
| new OpenLedgerCallback() { |
| @Override |
| public void openLedgerComplete(ManagedLedger ledger, Object ctx) { |
| try { |
| PersistentTopic persistentTopic = isSystemTopic(topic) |
| ? new SystemTopic(topic, ledger, BrokerService.this) |
| : new PersistentTopic(topic, ledger, BrokerService.this); |
| CompletableFuture<Void> preCreateSubForCompaction = |
| persistentTopic.preCreateSubscriptionForCompactionIfNeeded(); |
| CompletableFuture<Void> replicationFuture = persistentTopic |
| .initialize() |
| .thenCompose(__ -> persistentTopic.checkReplication()); |
| |
| |
| CompletableFuture.allOf(preCreateSubForCompaction, replicationFuture) |
| .thenCompose(v -> { |
| // Also check dedup status |
| return persistentTopic.checkDeduplicationStatus(); |
| }).thenRun(() -> { |
| log.info("Created topic {} - dedup is {}", topic, |
| persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled"); |
| long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) |
| - topicCreateTimeMs; |
| pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); |
| if (topicFuture.isCompletedExceptionally()) { |
| log.warn("{} future is already completed with failure {}, closing the topic", |
| topic, FutureUtil.getException(topicFuture)); |
| persistentTopic.stopReplProducers().whenComplete((v, exception) -> { |
| topics.remove(topic, topicFuture); |
| }); |
| } else { |
| addTopicToStatsMaps(topicName, persistentTopic); |
| topicFuture.complete(Optional.of(persistentTopic)); |
| } |
| }).exceptionally((ex) -> { |
| log.warn( |
| "Replication or dedup check failed." |
| + " Removing topic from topics list {}, {}", |
| topic, ex); |
| persistentTopic.stopReplProducers().whenComplete((v, exception) -> { |
| topics.remove(topic, topicFuture); |
| topicFuture.completeExceptionally(ex); |
| }); |
| |
| return null; |
| }); |
| } catch (PulsarServerException e) { |
| log.warn("Failed to create topic {}-{}", topic, e.getMessage()); |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(e); |
| } |
| } |
| |
| @Override |
| public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { |
| if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) { |
| // We were just trying to load a topic and the topic doesn't exist |
| topicFuture.complete(Optional.empty()); |
| } else { |
| log.warn("Failed to create topic {}", topic, exception); |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(new PersistenceException(exception)); |
| } |
| } |
| }, () -> isTopicNsOwnedByBroker(topicName), null); |
| |
| }).exceptionally((exception) -> { |
| log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); |
| // remove topic from topics-map in different thread to avoid possible deadlock if |
| // createPersistentTopic-thread only tries to handle this future-result |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(exception); |
| return null; |
| }); |
| } |
| |
| public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) { |
| NamespaceName namespace = topicName.getNamespaceObject(); |
| ServiceConfiguration serviceConfig = pulsar.getConfiguration(); |
| |
| NamespaceResources nsr = pulsar.getPulsarResources().getNamespaceResources(); |
| LocalPoliciesResources lpr = pulsar.getPulsarResources().getLocalPolicies(); |
| return nsr.getPoliciesAsync(namespace) |
| .thenCombine(lpr.getLocalPoliciesAsync(namespace), (policies, localPolicies) -> { |
| PersistencePolicies persistencePolicies = null; |
| RetentionPolicies retentionPolicies = null; |
| OffloadPoliciesImpl topicLevelOffloadPolicies = null; |
| |
| if (pulsar.getConfig().isTopicLevelPoliciesEnabled()) { |
| try { |
| TopicPolicies topicPolicies = pulsar.getTopicPoliciesService().getTopicPolicies(topicName); |
| if (topicPolicies != null) { |
| persistencePolicies = topicPolicies.getPersistence(); |
| retentionPolicies = topicPolicies.getRetentionPolicies(); |
| topicLevelOffloadPolicies = topicPolicies.getOffloadPolicies(); |
| } |
| } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { |
| log.debug("Topic {} policies have not been initialized yet.", topicName); |
| } |
| } |
| |
| if (persistencePolicies == null) { |
| persistencePolicies = policies.map(p -> p.persistence).orElseGet( |
| () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), |
| serviceConfig.getManagedLedgerDefaultWriteQuorum(), |
| serviceConfig.getManagedLedgerDefaultAckQuorum(), |
| serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit())); |
| } |
| |
| if (retentionPolicies == null) { |
| retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( |
| () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), |
| serviceConfig.getDefaultRetentionSizeInMB()) |
| ); |
| } |
| |
| |
| ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); |
| managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); |
| managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); |
| managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); |
| if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { |
| managedLedgerConfig |
| .setBookKeeperEnsemblePlacementPolicyClassName( |
| IsolatedBookieEnsemblePlacementPolicy.class); |
| Map<String, Object> properties = Maps.newHashMap(); |
| properties.put(IsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, |
| localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupPrimary()); |
| properties.put(IsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, |
| localPolicies.get().bookieAffinityGroup.getBookkeeperAffinityGroupSecondary()); |
| managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); |
| } |
| managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); |
| managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); |
| managedLedgerConfig.setPassword(serviceConfig.getManagedLedgerPassword()); |
| |
| managedLedgerConfig |
| .setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); |
| managedLedgerConfig.setMaxUnackedRangesToPersistInZk( |
| serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper()); |
| managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); |
| managedLedgerConfig |
| .setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), |
| TimeUnit.MINUTES); |
| managedLedgerConfig |
| .setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), |
| TimeUnit.MINUTES); |
| managedLedgerConfig.setMaxSizePerLedgerMb(serviceConfig.getManagedLedgerMaxSizePerLedgerMbytes()); |
| |
| managedLedgerConfig.setMetadataOperationsTimeoutSeconds( |
| serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); |
| managedLedgerConfig |
| .setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); |
| managedLedgerConfig |
| .setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); |
| managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); |
| managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled( |
| serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled()); |
| managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); |
| managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); |
| managedLedgerConfig |
| .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger()); |
| |
| managedLedgerConfig |
| .setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds()); |
| managedLedgerConfig |
| .setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); |
| managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); |
| managedLedgerConfig.setAutoSkipNonRecoverableData(serviceConfig.isAutoSkipNonRecoverableData()); |
| managedLedgerConfig.setLazyCursorRecovery(serviceConfig.isLazyCursorRecovery()); |
| |
| OffloadPoliciesImpl nsLevelOffloadPolicies = |
| (OffloadPoliciesImpl) policies.map(p -> p.offload_policies).orElse(null); |
| OffloadPoliciesImpl offloadPolicies = OffloadPoliciesImpl.mergeConfiguration( |
| topicLevelOffloadPolicies, |
| OffloadPoliciesImpl.oldPoliciesCompatible(nsLevelOffloadPolicies, policies.orElse(null)), |
| getPulsar().getConfig().getProperties()); |
| if (topicLevelOffloadPolicies != null) { |
| try { |
| LedgerOffloader topicLevelLedgerOffLoader = |
| pulsar().createManagedLedgerOffloader(offloadPolicies); |
| managedLedgerConfig.setLedgerOffloader(topicLevelLedgerOffLoader); |
| } catch (PulsarServerException e) { |
| throw new RuntimeException(e); |
| } |
| } else { |
| //If the topic level policy is null, use the namespace level |
| managedLedgerConfig |
| .setLedgerOffloader(pulsar.getManagedLedgerOffloader(namespace, offloadPolicies)); |
| } |
| |
| managedLedgerConfig.setDeletionAtBatchIndexLevelEnabled( |
| serviceConfig.isAcknowledgmentAtBatchIndexLevelEnabled()); |
| managedLedgerConfig.setNewEntriesCheckDelayInMillis( |
| serviceConfig.getManagedLedgerNewEntriesCheckDelayInMillis()); |
| |
| return managedLedgerConfig; |
| }); |
| } |
| |
| private void addTopicToStatsMaps(TopicName topicName, Topic topic) { |
| pulsar.getNamespaceService().getBundleAsync(topicName) |
| .thenAccept(namespaceBundle -> { |
| if (namespaceBundle != null) { |
| synchronized (multiLayerTopicsMap) { |
| String serviceUnit = namespaceBundle.toString(); |
| multiLayerTopicsMap // |
| .computeIfAbsent(topicName.getNamespace(), k -> new ConcurrentOpenHashMap<>()) // |
| .computeIfAbsent(serviceUnit, k -> new ConcurrentOpenHashMap<>()) // |
| .put(topicName.toString(), topic); |
| } |
| } |
| invalidateOfflineTopicStatCache(topicName); |
| }) |
| .exceptionally(ex -> { |
| log.warn("Got exception when retrieving bundle name during create persistent topic", ex); |
| return null; |
| }); |
| } |
| |
| public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) { |
| checkNotNull(oldBundle); |
| try { |
| // retrieve all topics under existing old bundle |
| List<Topic> topics = getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(), |
| oldBundle.toString()); |
| if (!isEmpty(topics)) { |
| // add topic under new split bundles which already updated into NamespaceBundleFactory.bundleCache |
| topics.stream().forEach(t -> { |
| addTopicToStatsMaps(TopicName.get(t.getName()), t); |
| }); |
| // remove old bundle from the map |
| synchronized (multiLayerTopicsMap) { |
| multiLayerTopicsMap.get(oldBundle.getNamespaceObject().toString()).remove(oldBundle.toString()); |
| pulsarStats.invalidBundleStats(oldBundle.toString()); |
| } |
| } |
| } catch (Exception e) { |
| log.warn("Got exception while refreshing topicStats map", e); |
| } |
| } |
| |
| public PersistentOfflineTopicStats getOfflineTopicStat(TopicName topicName) { |
| return offlineTopicStatCache.get(topicName); |
| } |
| |
| public void cacheOfflineTopicStats(TopicName topicName, PersistentOfflineTopicStats offlineTopicStats) { |
| offlineTopicStatCache.put(topicName, offlineTopicStats); |
| } |
| |
| public void invalidateOfflineTopicStatCache(TopicName topicName) { |
| PersistentOfflineTopicStats removed = offlineTopicStatCache.remove(topicName); |
| if (removed != null) { |
| log.info("Removed cached offline topic stat for {} ", topicName.getPersistenceNamingEncoding()); |
| } |
| } |
| |
| /** |
| * Get a reference to a topic that is currently loaded in the broker. |
| * |
| * This method will not make the broker attempt to load the topic if it's not already. |
| */ |
| public Optional<Topic> getTopicReference(String topic) { |
| CompletableFuture<Optional<Topic>> future = topics.get(topic); |
| if (future != null && future.isDone() && !future.isCompletedExceptionally()) { |
| return future.join(); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| public void updateRates() { |
| synchronized (pulsarStats) { |
| pulsarStats.updateStats(multiLayerTopicsMap); |
| |
| Summary.rotateLatencyCollection(); |
| } |
| } |
| |
| public void getDimensionMetrics(Consumer<ByteBuf> consumer) { |
| pulsarStats.getDimensionMetrics(consumer); |
| } |
| |
| public List<Metrics> getTopicMetrics() { |
| return pulsarStats.getTopicMetrics(); |
| } |
| |
| public Map<String, NamespaceBundleStats> getBundleStats() { |
| return pulsarStats.getBundleStats(); |
| } |
| |
| public Semaphore getLookupRequestSemaphore() { |
| return lookupRequestSemaphore.get(); |
| } |
| |
| public void checkGC() { |
| forEachTopic(Topic::checkGC); |
| } |
| |
| public void checkMessageExpiry() { |
| forEachTopic(Topic::checkMessageExpiry); |
| } |
| |
| public void checkReplicationPolicies() { |
| forEachTopic(Topic::checkReplication); |
| } |
| |
| public void checkCompaction() { |
| forEachTopic((t) -> { |
| if (t instanceof PersistentTopic) { |
| ((PersistentTopic) t).checkCompaction(); |
| } |
| }); |
| } |
| |
| private void checkConsumedLedgers() { |
| forEachTopic((t) -> { |
| if (t instanceof PersistentTopic) { |
| Optional.ofNullable(((PersistentTopic) t).getManagedLedger()).ifPresent( |
| managedLedger -> { |
| managedLedger.trimConsumedLedgersInBackground(Futures.NULL_PROMISE); |
| } |
| ); |
| } |
| }); |
| } |
| |
| public void checkMessageDeduplicationInfo() { |
| forEachTopic(Topic::checkMessageDeduplicationInfo); |
| } |
| |
| public void checkInactiveSubscriptions() { |
| forEachTopic(Topic::checkInactiveSubscriptions); |
| } |
| |
| public void checkTopicPublishThrottlingRate() { |
| forEachTopic(Topic::checkTopicPublishThrottlingRate); |
| } |
| |
| private void refreshTopicPublishRate() { |
| forEachTopic(Topic::resetTopicPublishCountAndEnableReadIfRequired); |
| } |
| |
| public void checkBrokerPublishThrottlingRate() { |
| brokerPublishRateLimiter.checkPublishRate(); |
| if (brokerPublishRateLimiter.isPublishRateExceeded()) { |
| forEachTopic(topic -> ((AbstractTopic) topic).disableProducerRead()); |
| } |
| } |
| |
| private void refreshBrokerPublishRate() { |
| boolean doneReset = brokerPublishRateLimiter.resetPublishCount(); |
| forEachTopic(topic -> topic.resetBrokerPublishCountAndEnableReadIfRequired(doneReset)); |
| } |
| |
| /** |
| * Iterates over all loaded topics in the broker. |
| */ |
| public void forEachTopic(Consumer<Topic> consumer) { |
| topics.forEach((n, t) -> { |
| Optional<Topic> topic = extractTopic(t); |
| topic.ifPresent(consumer::accept); |
| }); |
| } |
| |
| public BacklogQuotaManager getBacklogQuotaManager() { |
| return this.backlogQuotaManager; |
| } |
| |
| public synchronized void monitorBacklogQuota() { |
| forEachTopic(topic -> { |
| if (topic instanceof PersistentTopic) { |
| PersistentTopic persistentTopic = (PersistentTopic) topic; |
| if (persistentTopic.isSizeBacklogExceeded()) { |
| getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic, |
| BacklogQuota.BacklogQuotaType.destination_storage, false); |
| } else if (persistentTopic.isTimeBacklogExceeded()) { |
| getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic, |
| BacklogQuota.BacklogQuotaType.message_age, |
| pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("quota not exceeded for [{}]", topic.getName()); |
| } |
| } |
| } |
| }); |
| } |
| |
| public boolean isTopicNsOwnedByBroker(TopicName topicName) { |
| try { |
| return pulsar.getNamespaceService().isServiceUnitOwned(topicName); |
| } catch (Exception e) { |
| log.warn("Failed to check the ownership of the topic: {}, {}", topicName, e.getMessage()); |
| } |
| return false; |
| } |
| |
| public CompletableFuture<Void> checkTopicNsOwnership(final String topic) { |
| TopicName topicName = TopicName.get(topic); |
| |
| return pulsar.getNamespaceService().checkTopicOwnership(topicName) |
| .thenCompose(ownedByThisInstance -> { |
| if (ownedByThisInstance) { |
| return CompletableFuture.completedFuture(null); |
| } else { |
| String msg = String.format("Namespace bundle for topic (%s) not served by this instance. " |
| + "Please redo the lookup. Request is denied: namespace=%s", topic, |
| topicName.getNamespace()); |
| log.warn(msg); |
| return FutureUtil.failedFuture(new ServiceUnitNotReadyException(msg)); |
| } |
| }); |
| } |
| |
| public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, |
| boolean closeWithoutWaitingClientDisconnect, long timeout, TimeUnit unit) { |
| CompletableFuture<Integer> future = unloadServiceUnit(serviceUnit, closeWithoutWaitingClientDisconnect); |
| ScheduledFuture<?> taskTimeout = executor().schedule(() -> { |
| if (!future.isDone()) { |
| log.warn("Unloading of {} has timed out", serviceUnit); |
| // Complete the future with no error |
| future.complete(0); |
| } |
| }, timeout, unit); |
| |
| future.whenComplete((r, ex) -> taskTimeout.cancel(true)); |
| return future; |
| } |
| |
| /** |
| * Unload all the topic served by the broker service under the given service unit. |
| * |
| * @param serviceUnit |
| * @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect |
| * and forcefully close managed-ledger |
| * @return |
| */ |
| private CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, |
| boolean closeWithoutWaitingClientDisconnect) { |
| List<CompletableFuture<Void>> closeFutures = Lists.newArrayList(); |
| topics.forEach((name, topicFuture) -> { |
| TopicName topicName = TopicName.get(name); |
| if (serviceUnit.includes(topicName)) { |
| // Topic needs to be unloaded |
| log.info("[{}] Unloading topic", topicName); |
| closeFutures.add(topicFuture |
| .thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) |
| : CompletableFuture.completedFuture(null))); |
| } |
| }); |
| if (getPulsar().getConfig().isTransactionCoordinatorEnabled() |
| && serviceUnit.getNamespaceObject().equals(NamespaceName.SYSTEM_NAMESPACE)) { |
| TransactionMetadataStoreService metadataStoreService = |
| this.getPulsar().getTransactionMetadataStoreService(); |
| // if the store belongs to this bundle, remove and close the store |
| this.getPulsar().getTransactionMetadataStoreService().getStores().values().stream().filter(store -> |
| serviceUnit.includes(TopicName.TRANSACTION_COORDINATOR_ASSIGN |
| .getPartition((int) (store.getTransactionCoordinatorID().getId())))) |
| .map(TransactionMetadataStore::getTransactionCoordinatorID) |
| .forEach(tcId -> closeFutures.add(metadataStoreService.removeTransactionMetadataStore(tcId))); |
| } |
| |
| return FutureUtil.waitForAll(closeFutures).thenApply(v -> closeFutures.size()); |
| } |
| |
| public void cleanUnloadedTopicFromCache(NamespaceBundle serviceUnit) { |
| for (String topic : topics.keys()) { |
| TopicName topicName = TopicName.get(topic); |
| if (serviceUnit.includes(topicName) && getTopicReference(topic).isPresent()) { |
| log.info("[{}][{}] Clean unloaded topic from cache.", serviceUnit.toString(), topic); |
| pulsar.getBrokerService().removeTopicFromCache(topicName.toString(), serviceUnit); |
| } |
| } |
| } |
| |
| public AuthorizationService getAuthorizationService() { |
| return authorizationService; |
| } |
| |
| public CompletableFuture<Void> removeTopicFromCache(String topic) { |
| TopicName topicName = TopicName.get(topic); |
| return pulsar.getNamespaceService().getBundleAsync(topicName) |
| .thenAccept(namespaceBundle -> { |
| removeTopicFromCache(topic, namespaceBundle); |
| }); |
| } |
| |
| public void removeTopicFromCache(String topic, NamespaceBundle namespaceBundle) { |
| String bundleName = namespaceBundle.toString(); |
| String namespaceName = TopicName.get(topic).getNamespaceObject().toString(); |
| |
| synchronized (multiLayerTopicsMap) { |
| ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> namespaceMap = multiLayerTopicsMap |
| .get(namespaceName); |
| if (namespaceMap != null) { |
| ConcurrentOpenHashMap<String, Topic> bundleMap = namespaceMap.get(bundleName); |
| if (bundleMap != null) { |
| bundleMap.remove(topic); |
| if (bundleMap.isEmpty()) { |
| namespaceMap.remove(bundleName); |
| } |
| } |
| |
| if (namespaceMap.isEmpty()) { |
| multiLayerTopicsMap.remove(namespaceName); |
| final ClusterReplicationMetrics clusterReplicationMetrics = pulsarStats |
| .getClusterReplicationMetrics(); |
| replicationClients.forEach((cluster, client) -> { |
| clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName, |
| cluster)); |
| }); |
| } |
| } |
| } |
| topics.remove(topic); |
| |
| try { |
| Compactor compactor = pulsar.getCompactor(false); |
| if (compactor != null) { |
| compactor.getStats().removeTopic(topic); |
| } |
| } catch (PulsarServerException ignore) { |
| } |
| } |
| |
| public int getNumberOfNamespaceBundles() { |
| this.numberOfNamespaceBundles = 0; |
| this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> { |
| this.numberOfNamespaceBundles += bundles.size(); |
| }); |
| return this.numberOfNamespaceBundles; |
| } |
| |
| public ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> getTopics() { |
| return topics; |
| } |
| |
| |
| private void handleMetadataChanges(Notification n) { |
| if (n.getType() == NotificationType.Modified && NamespaceResources.pathIsFromNamespace(n.getPath())) { |
| NamespaceName ns = NamespaceResources.namespaceFromPath(n.getPath()); |
| handlePoliciesUpdates(ns); |
| } else if (pulsar().getPulsarResources().getDynamicConfigResources().isDynamicConfigurationPath(n.getPath())) { |
| handleDynamicConfigurationUpdates(); |
| } |
| |
| // Ignore unrelated notifications |
| } |
| |
| private void handlePoliciesUpdates(NamespaceName namespace) { |
| pulsar.getPulsarResources().getNamespaceResources().getPoliciesAsync(namespace) |
| .thenAccept(optPolicies -> { |
| if (!optPolicies.isPresent()) { |
| return; |
| } |
| |
| Policies policies = optPolicies.get(); |
| log.info("[{}] updating with {}", namespace, policies); |
| |
| topics.forEach((name, topicFuture) -> { |
| if (namespace.includes(TopicName.get(name))) { |
| // If the topic is already created, immediately apply the updated policies, otherwise |
| // once the topic is created it'll apply the policies update |
| topicFuture.thenAccept(topic -> { |
| if (log.isDebugEnabled()) { |
| log.debug("Notifying topic that policies have changed: {}", name); |
| } |
| |
| topic.ifPresent(t -> t.onPoliciesUpdate(policies)); |
| }); |
| } |
| }); |
| |
| // sometimes, some brokers don't receive policies-update watch and miss to remove |
| // replication-cluster and still own the bundle. That can cause data-loss for TODO: git-issue |
| unloadDeletedReplNamespace(policies, namespace); |
| }); |
| } |
| |
| private void handleDynamicConfigurationUpdates() { |
| pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfigurationAsync() |
| .thenAccept(optMap -> { |
| if (!optMap.isPresent()) { |
| return; |
| } |
| Map<String, String> data = optMap.get(); |
| data.forEach((configKey, value) -> { |
| Field configField = dynamicConfigurationMap.get(configKey).field; |
| Object newValue = FieldParser.value(data.get(configKey), configField); |
| if (configField != null) { |
| Consumer listener = configRegisteredListeners.get(configKey); |
| try { |
| Object existingValue = configField.get(pulsar.getConfiguration()); |
| configField.set(pulsar.getConfiguration(), newValue); |
| log.info("Successfully updated configuration {}/{}", configKey, |
| data.get(configKey)); |
| if (listener != null && !existingValue.equals(newValue)) { |
| listener.accept(newValue); |
| } |
| } catch (Exception e) { |
| log.error("Failed to update config {}/{}", configKey, newValue); |
| } |
| } else { |
| log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, newValue); |
| } |
| }); |
| }); |
| } |
| |
| /** |
| * Unloads the namespace bundles if local cluster is not part of replication-cluster list into the namespace. |
| * So, broker that owns the bundle and doesn't receive the zk-watch will unload the namespace. |
| * @param data |
| * @param namespace |
| */ |
| private void unloadDeletedReplNamespace(Policies data, NamespaceName namespace) { |
| if (!namespace.isGlobal()) { |
| return; |
| } |
| final String localCluster = this.pulsar.getConfiguration().getClusterName(); |
| if (!data.replication_clusters.contains(localCluster)) { |
| pulsar().getNamespaceService().getNamespaceBundleFactory() |
| .getBundlesAsync(namespace).thenAccept(bundles -> { |
| bundles.getBundles().forEach(bundle -> { |
| pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist -> { |
| if (isExist) { |
| this.pulsar().getExecutor().submit(() -> { |
| try { |
| pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(), |
| bundle.getBundleRange()); |
| } catch (Exception e) { |
| log.error("Failed to unload namespace-bundle {}-{} that not owned by {}, {}", |
| namespace.toString(), bundle.toString(), localCluster, e.getMessage()); |
| } |
| }); |
| } |
| }); |
| }); |
| }); |
| } |
| } |
| |
| public PulsarService pulsar() { |
| return pulsar; |
| } |
| |
| public EventLoopGroup executor() { |
| return workerGroup; |
| } |
| |
| public ConcurrentOpenHashMap<String, PulsarClient> getReplicationClients() { |
| return replicationClients; |
| } |
| |
| public boolean isAuthenticationEnabled() { |
| return pulsar.getConfiguration().isAuthenticationEnabled(); |
| } |
| |
| public boolean isAuthorizationEnabled() { |
| return pulsar.getConfiguration().isAuthorizationEnabled(); |
| } |
| |
| public int getKeepAliveIntervalSeconds() { |
| return keepAliveIntervalSeconds; |
| } |
| |
| public String generateUniqueProducerName() { |
| return producerNameGenerator.getNextId(); |
| } |
| |
| public Map<String, TopicStatsImpl> getTopicStats() { |
| HashMap<String, TopicStatsImpl> stats = new HashMap<>(); |
| |
| forEachTopic(topic -> stats.put(topic.getName(), topic.getStats(false, false, false))); |
| |
| return stats; |
| } |
| |
| public AuthenticationService getAuthenticationService() { |
| return authenticationService; |
| } |
| |
| public List<Topic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) { |
| ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> map1 = multiLayerTopicsMap.get(namespace); |
| if (map1 == null) { |
| return Collections.emptyList(); |
| } |
| |
| ConcurrentOpenHashMap<String, Topic> map2 = map1.get(bundle); |
| if (map2 == null) { |
| return Collections.emptyList(); |
| } |
| |
| return map2.values(); |
| } |
| |
| /** |
| * Update dynamic-ServiceConfiguration with value present into zk-configuration-map and register listeners on |
| * dynamic-ServiceConfiguration field to take appropriate action on change of zk-configuration-map. |
| */ |
| private void updateConfigurationAndRegisterListeners() { |
| // (1) Dynamic-config value validation: add validator if updated value required strict check before considering |
| // validate configured load-manager classname present into classpath |
| addDynamicConfigValidator("loadManagerClassName", (className) -> { |
| try { |
| Class.forName(className); |
| } catch (ClassNotFoundException | NoClassDefFoundError e) { |
| log.warn("Configured load-manager class {} not found {}", className, e.getMessage()); |
| return false; |
| } |
| return true; |
| }); |
| |
| // (2) update ServiceConfiguration value by reading zk-configuration-map |
| updateDynamicServiceConfiguration(); |
| |
| // (3) Listener Registration |
| // add listener on "maxConcurrentLookupRequest" value change |
| registerConfigurationListener("maxConcurrentLookupRequest", |
| (maxConcurrentLookupRequest) -> lookupRequestSemaphore.set( |
| new Semaphore((int) maxConcurrentLookupRequest, false))); |
| // add listener on "maxConcurrentTopicLoadRequest" value change |
| registerConfigurationListener("maxConcurrentTopicLoadRequest", |
| (maxConcurrentTopicLoadRequest) -> topicLoadRequestSemaphore.set( |
| new Semaphore((int) maxConcurrentTopicLoadRequest, false))); |
| registerConfigurationListener("loadManagerClassName", className -> { |
| pulsar.getExecutor().execute(() -> { |
| try { |
| final LoadManager newLoadManager = LoadManager.create(pulsar); |
| log.info("Created load manager: {}", className); |
| pulsar.getLoadManager().get().stop(); |
| newLoadManager.start(); |
| pulsar.getLoadManager().set(newLoadManager); |
| } catch (Exception ex) { |
| log.warn("Failed to change load manager", ex); |
| } |
| }); |
| }); |
| // add listener to update message-dispatch-rate in msg for topic |
| registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> { |
| updateTopicMessageDispatchRate(); |
| }); |
| // add listener to update message-dispatch-rate in byte for topic |
| registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> { |
| updateTopicMessageDispatchRate(); |
| }); |
| // add listener to update managed-ledger config to skipNonRecoverableLedgers |
| registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> { |
| updateManagedLedgerConfig(); |
| }); |
| // add listener to update message-dispatch-rate in msg for subscription |
| registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", (dispatchRatePerTopicInMsg) -> { |
| updateSubscriptionMessageDispatchRate(); |
| }); |
| // add listener to update message-dispatch-rate in byte for subscription |
| registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", (dispatchRatePerTopicInByte) -> { |
| updateSubscriptionMessageDispatchRate(); |
| }); |
| |
| // add listener to update message-dispatch-rate in msg for replicator |
| registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", |
| (dispatchRatePerTopicInMsg) -> { |
| updateReplicatorMessageDispatchRate(); |
| }); |
| // add listener to update message-dispatch-rate in byte for replicator |
| registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", |
| (dispatchRatePerTopicInByte) -> { |
| updateReplicatorMessageDispatchRate(); |
| }); |
| |
| // add listener to notify broker publish-rate monitoring |
| registerConfigurationListener("brokerPublisherThrottlingTickTimeMillis", |
| (publisherThrottlingTickTimeMillis) -> { |
| setupBrokerPublishRateLimiterMonitor(); |
| }); |
| // add listener to notify broker publish-rate dynamic config |
| registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate", |
| (brokerPublisherThrottlingMaxMessageRate) -> |
| updateBrokerPublisherThrottlingMaxRate()); |
| registerConfigurationListener("brokerPublisherThrottlingMaxByteRate", |
| (brokerPublisherThrottlingMaxByteRate) -> |
| updateBrokerPublisherThrottlingMaxRate()); |
| |
| // add listener to notify topic publish-rate monitoring |
| if (!preciseTopicPublishRateLimitingEnable) { |
| registerConfigurationListener("topicPublisherThrottlingTickTimeMillis", |
| (publisherThrottlingTickTimeMillis) -> { |
| setupTopicPublishRateLimiterMonitor(); |
| }); |
| } |
| |
| // add listener to notify topic subscriptionTypesEnabled changed. |
| registerConfigurationListener("subscriptionTypesEnabled", this::updateBrokerSubscriptionTypesEnabled); |
| |
| // add more listeners here |
| } |
| |
| private void updateBrokerPublisherThrottlingMaxRate() { |
| int currentMaxMessageRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate(); |
| long currentMaxByteRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate(); |
| int brokerTickMs = pulsar.getConfiguration().getBrokerPublisherThrottlingTickTimeMillis(); |
| |
| // not enable |
| if (brokerTickMs <= 0 || (currentMaxByteRate <= 0 && currentMaxMessageRate <= 0)) { |
| if (brokerPublishRateLimiter != PublishRateLimiter.DISABLED_RATE_LIMITER) { |
| refreshBrokerPublishRate(); |
| brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; |
| } |
| return; |
| } |
| |
| final PublishRate publishRate = new PublishRate(currentMaxMessageRate, currentMaxByteRate); |
| |
| log.info("Update broker publish rate limiting {}", publishRate); |
| // lazy init broker Publish-rateLimiting monitoring if not initialized yet |
| this.setupBrokerPublishRateLimiterMonitor(); |
| if (brokerPublishRateLimiter == null |
| || brokerPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) { |
| // create new rateLimiter if rate-limiter is disabled |
| brokerPublishRateLimiter = new PublishRateLimiterImpl(publishRate); |
| } else { |
| brokerPublishRateLimiter.update(publishRate); |
| } |
| } |
| |
| private void updateTopicMessageDispatchRate() { |
| this.pulsar().getExecutor().execute(() -> { |
| // update message-rate for each topic |
| forEachTopic(topic -> { |
| if (topic.getDispatchRateLimiter().isPresent()) { |
| topic.getDispatchRateLimiter().get().updateDispatchRate(); |
| } |
| }); |
| }); |
| } |
| |
| private void updateBrokerSubscriptionTypesEnabled(Object subscriptionTypesEnabled) { |
| this.pulsar().getExecutor().execute(() -> { |
| // update subscriptionTypesEnabled |
| forEachTopic(topic -> { |
| if (topic instanceof AbstractTopic) { |
| ((AbstractTopic) topic).updateBrokerSubscriptionTypesEnabled(); |
| } |
| }); |
| }); |
| } |
| |
| private void updateSubscriptionMessageDispatchRate() { |
| this.pulsar().getExecutor().submit(() -> { |
| // update message-rate for each topic subscription |
| forEachTopic(topic -> { |
| topic.getSubscriptions().forEach((subName, persistentSubscription) -> { |
| Dispatcher dispatcher = persistentSubscription.getDispatcher(); |
| if (dispatcher != null) { |
| dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate); |
| } |
| }); |
| }); |
| }); |
| } |
| |
| private void updateReplicatorMessageDispatchRate() { |
| this.pulsar().getExecutor().submit(() -> { |
| // update message-rate for each topic Replicator in Geo-replication |
| forEachTopic(topic -> |
| topic.getReplicators().forEach((name, persistentReplicator) -> { |
| if (persistentReplicator.getRateLimiter().isPresent()) { |
| persistentReplicator.getRateLimiter().get().updateDispatchRate(); |
| } |
| })); |
| }); |
| } |
| |
| private void updateManagedLedgerConfig() { |
| this.pulsar().getExecutor().execute(() -> { |
| // update managed-ledger config of each topic |
| |
| forEachTopic(topic -> { |
| try { |
| if (topic instanceof PersistentTopic) { |
| PersistentTopic persistentTopic = (PersistentTopic) topic; |
| // update skipNonRecoverableLedger configuration |
| persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData( |
| pulsar.getConfiguration().isAutoSkipNonRecoverableData()); |
| } |
| } catch (Exception e) { |
| log.warn("[{}] failed to update managed-ledger config", topic.getName(), e); |
| } |
| }); |
| }); |
| } |
| |
| /** |
| * Allows a listener to listen on update of {@link ServiceConfiguration} change, so listener can take appropriate |
| * action if any specific config-field value has been changed. |
| * |
| * On notification, listener should first check if config value has been changed and after taking appropriate |
| * action, listener should update config value with new value if it has been changed (so, next time listener can |
| * compare values on configMap change). |
| * @param <T> |
| * |
| * @param configKey |
| * : configuration field name |
| * @param listener |
| * : listener which takes appropriate action on config-value change |
| */ |
| public <T> void registerConfigurationListener(String configKey, Consumer<T> listener) { |
| validateConfigKey(configKey); |
| configRegisteredListeners.put(configKey, listener); |
| } |
| |
| private void addDynamicConfigValidator(String key, Predicate<String> validator) { |
| validateConfigKey(key); |
| if (dynamicConfigurationMap.containsKey(key)) { |
| dynamicConfigurationMap.get(key).validator = validator; |
| } |
| } |
| |
| private void validateConfigKey(String key) { |
| try { |
| ServiceConfiguration.class.getDeclaredField(key); |
| } catch (Exception e) { |
| log.error("ServiceConfiguration key {} not found {}", key, e.getMessage()); |
| throw new IllegalArgumentException("Invalid service config " + key, e); |
| } |
| } |
| |
| /** |
| * Updates pulsar.ServiceConfiguration's dynamic field with value persistent into zk-dynamic path. It also validates |
| * dynamic-value before updating it and throws {@code IllegalArgumentException} if validation fails |
| */ |
| private void updateDynamicServiceConfiguration() { |
| Optional<Map<String, String>> configCache = Optional.empty(); |
| |
| try { |
| configCache = |
| pulsar().getPulsarResources().getDynamicConfigResources().getDynamicConfiguration(); |
| |
| // create dynamic-config if not exist. |
| if (!configCache.isPresent()) { |
| pulsar().getPulsarResources().getDynamicConfigResources() |
| .setDynamicConfigurationWithCreate(n -> Maps.newHashMap()); |
| } |
| } catch (Exception e) { |
| log.warn("Failed to read dynamic broker configuration", e); |
| } |
| |
| configCache.ifPresent(stringStringMap -> stringStringMap.forEach((key, value) -> { |
| // validate field |
| if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) { |
| if (!dynamicConfigurationMap.get(key).validator.test(value)) { |
| log.error("Failed to validate dynamic config {} with value {}", key, value); |
| throw new IllegalArgumentException( |
| String.format("Failed to validate dynamic-config %s/%s", key, value)); |
| } |
| } |
| // update field value |
| try { |
| Field field = ServiceConfiguration.class.getDeclaredField(key); |
| if (field != null && field.isAnnotationPresent(FieldContext.class)) { |
| field.setAccessible(true); |
| field.set(pulsar().getConfiguration(), FieldParser.value(value, field)); |
| log.info("Successfully updated {}/{}", key, value); |
| } |
| } catch (Exception e) { |
| log.warn("Failed to update service configuration {}/{}, {}", key, value, e.getMessage()); |
| } |
| })); |
| } |
| |
| public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() { |
| return delayedDeliveryTrackerFactory; |
| } |
| |
| public static List<String> getDynamicConfiguration() { |
| return dynamicConfigurationMap.keys(); |
| } |
| |
| public Map<String, String> getRuntimeConfiguration() { |
| Map<String, String> configMap = Maps.newHashMap(); |
| ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = getRuntimeConfigurationMap(); |
| runtimeConfigurationMap.forEach((key, value) -> { |
| configMap.put(key, String.valueOf(value)); |
| }); |
| return configMap; |
| } |
| |
| public static boolean isDynamicConfiguration(String key) { |
| return dynamicConfigurationMap.containsKey(key); |
| } |
| |
| public static boolean validateDynamicConfiguration(String key, String value) { |
| if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) { |
| return dynamicConfigurationMap.get(key).validator.test(value); |
| } |
| return true; |
| } |
| |
| private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() { |
| ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = new ConcurrentOpenHashMap<>(); |
| for (Field field : ServiceConfiguration.class.getDeclaredFields()) { |
| if (field != null && field.isAnnotationPresent(FieldContext.class)) { |
| field.setAccessible(true); |
| if (field.getAnnotation(FieldContext.class).dynamic()) { |
| dynamicConfigurationMap.put(field.getName(), new ConfigField(field)); |
| } |
| } |
| } |
| return dynamicConfigurationMap; |
| } |
| |
| private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() { |
| ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = new ConcurrentOpenHashMap<>(); |
| for (Field field : ServiceConfiguration.class.getDeclaredFields()) { |
| if (field != null && field.isAnnotationPresent(FieldContext.class)) { |
| field.setAccessible(true); |
| try { |
| Object configValue = field.get(pulsar.getConfiguration()); |
| runtimeConfigurationMap.put(field.getName(), configValue == null ? "" : configValue); |
| } catch (Exception e) { |
| log.error("Failed to get value of field {}, {}", field.getName(), e.getMessage()); |
| } |
| } |
| } |
| return runtimeConfigurationMap; |
| } |
| |
| /** |
| * Create pending topic and on completion it picks the next one until processes all topics in |
| * {@link #pendingTopicLoadingQueue}.<br/> |
| * It also tries to acquire {@link #topicLoadRequestSemaphore} so throttle down newly incoming topics and release |
| * permit if it was successful to acquire it. |
| */ |
| private void createPendingLoadTopic() { |
| Pair<String, CompletableFuture<Optional<Topic>>> pendingTopic = pendingTopicLoadingQueue.poll(); |
| if (pendingTopic == null) { |
| return; |
| } |
| |
| final String topic = pendingTopic.getLeft(); |
| checkTopicNsOwnership(topic).thenRun(() -> { |
| CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getRight(); |
| final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); |
| final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); |
| createPersistentTopic(topic, true, pendingFuture); |
| pendingFuture.handle((persistentTopic, ex) -> { |
| // release permit and process next pending topic |
| if (acquiredPermit) { |
| topicLoadSemaphore.release(); |
| } |
| createPendingLoadTopic(); |
| return null; |
| }); |
| }).exceptionally(e -> { |
| log.error("Failed to create pending topic {}", topic, e); |
| pendingTopic.getRight() |
| .completeExceptionally((e instanceof RuntimeException && e.getCause() != null) ? e.getCause() : e); |
| // schedule to process next pending topic |
| inactivityMonitor.schedule(this::createPendingLoadTopic, 100, TimeUnit.MILLISECONDS); |
| return null; |
| }); |
| } |
| |
| public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync( |
| TopicName topicName) { |
| if (pulsar.getNamespaceService() == null) { |
| return FutureUtil.failedFuture(new NamingException("namespace service is not ready")); |
| } |
| return pulsar.getNamespaceService().checkTopicExists(topicName) |
| .thenCompose(topicExists -> { |
| return fetchPartitionedTopicMetadataAsync(topicName) |
| .thenCompose(metadata -> { |
| CompletableFuture<PartitionedTopicMetadata> future = new CompletableFuture<>(); |
| |
| // There are a couple of potentially blocking calls, which we cannot make from the |
| // MetadataStore callback thread. |
| pulsar.getExecutor().execute(() -> { |
| // If topic is already exist, creating partitioned topic is not allowed. |
| |
| if (metadata.partitions == 0 |
| && !topicExists |
| && !topicName.isPartitioned() |
| && pulsar.getBrokerService().isAllowAutoTopicCreation(topicName) |
| && pulsar.getBrokerService().isDefaultTopicTypePartitioned(topicName)) { |
| |
| pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName) |
| .thenAccept(md -> future.complete(md)) |
| .exceptionally(ex -> { |
| future.completeExceptionally(ex); |
| return null; |
| }); |
| } else { |
| future.complete(metadata); |
| } |
| }); |
| |
| return future; |
| }); |
| }); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) { |
| final int defaultNumPartitions = pulsar.getBrokerService().getDefaultNumPartitions(topicName); |
| final int maxPartitions = pulsar().getConfig().getMaxNumPartitionsPerPartitionedTopic(); |
| checkArgument(defaultNumPartitions > 0, |
| "Default number of partitions should be more than 0"); |
| checkArgument(maxPartitions <= 0 || defaultNumPartitions <= maxPartitions, |
| "Number of partitions should be less than or equal to " + maxPartitions); |
| |
| PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); |
| |
| return checkMaxTopicsPerNamespace(topicName, defaultNumPartitions) |
| .thenCompose(__ -> { |
| PartitionedTopicResources partitionResources = pulsar.getPulsarResources().getNamespaceResources() |
| .getPartitionedTopicResources(); |
| return partitionResources.createPartitionedTopicAsync(topicName, configMetadata) |
| .thenApply(v -> { |
| log.info("partitioned metadata successfully created for {}", topicName); |
| return configMetadata; |
| }); |
| }); |
| } |
| |
| public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) { |
| // gets the number of partitions from the configuration cache |
| return pulsar.getPulsarResources().getNamespaceResources().getPartitionedTopicResources() |
| .getPartitionedTopicMetadataAsync(topicName).thenApply(metadata -> { |
| // if the partitioned topic is not found in metadata, then the topic is not partitioned |
| return metadata.orElseGet(() -> new PartitionedTopicMetadata()); |
| }); |
| } |
| |
| public OrderedExecutor getTopicOrderedExecutor() { |
| return topicOrderedExecutor; |
| } |
| |
| public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> |
| getMultiLayerTopicMap() { |
| return multiLayerTopicsMap; |
| } |
| |
| /** |
| * If per-broker unacked message reached to limit then it blocks dispatcher if its unacked message limit has been |
| * reached to {@link #maxUnackedMsgsPerDispatcher}. |
| * |
| * @param dispatcher |
| * @param numberOfMessages |
| */ |
| public void addUnAckedMessages(PersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) { |
| // don't block dispatchers if maxUnackedMessages = 0 |
| if (maxUnackedMessages > 0) { |
| totalUnackedMessages.add(numberOfMessages); |
| |
| // block dispatcher: if broker is already blocked and dispatcher reaches to max dispatcher limit when broker |
| // is blocked |
| if (blockedDispatcherOnHighUnackedMsgs.get() && !dispatcher.isBlockedDispatcherOnUnackedMsgs() |
| && dispatcher.getTotalUnackedMessages() > maxUnackedMsgsPerDispatcher) { |
| lock.readLock().lock(); |
| try { |
| log.info("[{}] dispatcher reached to max unack msg limit on blocked-broker {}", |
| dispatcher.getName(), dispatcher.getTotalUnackedMessages()); |
| dispatcher.blockDispatcherOnUnackedMsgs(); |
| blockedDispatchers.add(dispatcher); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Adds given dispatcher's unackMessage count to broker-unack message count and if it reaches to the |
| * {@link #maxUnackedMessages} then it blocks all the dispatchers which has unack-messages higher than |
| * {@link #maxUnackedMsgsPerDispatcher}. It unblocks all dispatchers once broker-unack message counts decreased to |
| * ({@link #maxUnackedMessages}/2) |
| * |
| */ |
| public void checkUnAckMessageDispatching() { |
| |
| // don't block dispatchers if maxUnackedMessages = 0 |
| if (maxUnackedMessages <= 0) { |
| return; |
| } |
| long unAckedMessages = totalUnackedMessages.sum(); |
| if (unAckedMessages >= maxUnackedMessages && blockedDispatcherOnHighUnackedMsgs.compareAndSet(false, true)) { |
| // block dispatcher with higher unack-msg when it reaches broker-unack msg limit |
| log.info("Starting blocking dispatchers with unacked msgs {} due to reached max broker limit {}", |
| maxUnackedMessages, maxUnackedMsgsPerDispatcher); |
| executor().execute(() -> blockDispatchersWithLargeUnAckMessages()); |
| } else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < maxUnackedMessages / 2) { |
| // unblock broker-dispatching if received enough acked messages back |
| if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) { |
| unblockDispatchersOnUnAckMessages(blockedDispatchers.values()); |
| } |
| } |
| |
| } |
| |
| public boolean isBrokerDispatchingBlocked() { |
| return blockedDispatcherOnHighUnackedMsgs.get(); |
| } |
| |
| private void blockDispatchersWithLargeUnAckMessages() { |
| lock.readLock().lock(); |
| try { |
| forEachTopic(topic -> { |
| topic.getSubscriptions().forEach((subName, persistentSubscription) -> { |
| if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { |
| PersistentDispatcherMultipleConsumers dispatcher = |
| (PersistentDispatcherMultipleConsumers) persistentSubscription |
| .getDispatcher(); |
| int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages(); |
| if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) { |
| log.info("[{}] Blocking dispatcher due to reached max broker limit {}", |
| dispatcher.getName(), dispatcher.getTotalUnackedMessages()); |
| dispatcher.blockDispatcherOnUnackedMsgs(); |
| blockedDispatchers.add(dispatcher); |
| } |
| } |
| }); |
| }); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Unblocks the dispatchers and removes it from the {@link #blockedDispatchers} list. |
| * |
| * @param dispatcherList |
| */ |
| public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) { |
| lock.writeLock().lock(); |
| try { |
| dispatcherList.forEach(dispatcher -> { |
| dispatcher.unBlockDispatcherOnUnackedMsgs(); |
| executor().execute(() -> dispatcher.readMoreEntries()); |
| log.info("[{}] Dispatcher is unblocked", dispatcher.getName()); |
| blockedDispatchers.remove(dispatcher); |
| }); |
| } finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| private static class ConfigField { |
| final Field field; |
| Predicate<String> validator; |
| |
| public ConfigField(Field field) { |
| super(); |
| this.field = field; |
| } |
| } |
| |
| /** |
| * Safely extract optional topic instance from a future, in a way to avoid unchecked exceptions and race conditions. |
| */ |
| public static Optional<Topic> extractTopic(CompletableFuture<Optional<Topic>> topicFuture) { |
| if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) { |
| return topicFuture.join(); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| public Optional<Integer> getListenPort() { |
| if (listenChannel != null) { |
| return Optional.of(((InetSocketAddress) listenChannel.localAddress()).getPort()); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| public Optional<Integer> getListenPortTls() { |
| if (listenChannelTls != null) { |
| return Optional.of(((InetSocketAddress) listenChannelTls.localAddress()).getPort()); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| private void foreachCnx(Consumer<TransportCnx> consumer) { |
| Set<TransportCnx> cnxSet = new HashSet<>(); |
| topics.forEach((n, t) -> { |
| Optional<Topic> topic = extractTopic(t); |
| topic.ifPresent(value -> value.getProducers().values().forEach(producer -> cnxSet.add(producer.getCnx()))); |
| }); |
| cnxSet.forEach(consumer); |
| } |
| |
| public boolean isAllowAutoTopicCreation(final String topic) { |
| TopicName topicName = TopicName.get(topic); |
| return isAllowAutoTopicCreation(topicName); |
| } |
| |
| public boolean isAllowAutoTopicCreation(final TopicName topicName) { |
| //System topic can always be created automatically |
| if (pulsar.getConfiguration().isSystemTopicEnabled() && checkTopicIsEventsNames(topicName)) { |
| return true; |
| } |
| AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName); |
| if (autoTopicCreationOverride != null) { |
| return autoTopicCreationOverride.isAllowAutoTopicCreation(); |
| } else { |
| return pulsar.getConfiguration().isAllowAutoTopicCreation(); |
| } |
| } |
| |
| public boolean isDefaultTopicTypePartitioned(final TopicName topicName) { |
| AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName); |
| if (autoTopicCreationOverride != null) { |
| return TopicType.PARTITIONED.toString().equals(autoTopicCreationOverride.getTopicType()); |
| } else { |
| return pulsar.getConfiguration().isDefaultTopicTypePartitioned(); |
| } |
| } |
| |
| public int getDefaultNumPartitions(final TopicName topicName) { |
| AutoTopicCreationOverride autoTopicCreationOverride = getAutoTopicCreationOverride(topicName); |
| if (autoTopicCreationOverride != null) { |
| return autoTopicCreationOverride.getDefaultNumPartitions(); |
| } else { |
| return pulsar.getConfiguration().getDefaultNumPartitions(); |
| } |
| } |
| |
| private AutoTopicCreationOverride getAutoTopicCreationOverride(final TopicName topicName) { |
| Optional<Policies> policies = |
| pulsar.getPulsarResources().getNamespaceResources() |
| .getPoliciesIfCached(topicName.getNamespaceObject()); |
| // If namespace policies have the field set, it will override the broker-level setting |
| if (policies.isPresent() && policies.get().autoTopicCreationOverride != null) { |
| return policies.get().autoTopicCreationOverride; |
| } |
| log.debug("No autoTopicCreateOverride policy found for {}", topicName); |
| return null; |
| } |
| |
| public boolean isAllowAutoSubscriptionCreation(final String topic) { |
| TopicName topicName = TopicName.get(topic); |
| return isAllowAutoSubscriptionCreation(topicName); |
| } |
| |
| public boolean isAllowAutoSubscriptionCreation(final TopicName topicName) { |
| AutoSubscriptionCreationOverride autoSubscriptionCreationOverride = |
| getAutoSubscriptionCreationOverride(topicName); |
| if (autoSubscriptionCreationOverride != null) { |
| return autoSubscriptionCreationOverride.isAllowAutoSubscriptionCreation(); |
| } else { |
| return pulsar.getConfiguration().isAllowAutoSubscriptionCreation(); |
| } |
| } |
| |
| private AutoSubscriptionCreationOverride getAutoSubscriptionCreationOverride(final TopicName topicName) { |
| Optional<Policies> policies = |
| pulsar.getPulsarResources().getNamespaceResources().getPoliciesIfCached(topicName.getNamespaceObject()); |
| // If namespace policies have the field set, it will override the broker-level setting |
| if (policies.isPresent() && policies.get().autoSubscriptionCreationOverride != null) { |
| return policies.get().autoSubscriptionCreationOverride; |
| } |
| log.debug("No autoSubscriptionCreateOverride policy found for {}", topicName); |
| return null; |
| } |
| private boolean isSystemTopic(String topic) { |
| return SystemTopicClient.isSystemTopic(TopicName.get(topic)); |
| } |
| |
| /** |
| * Get {@link TopicPolicies} for the parameterized topic. |
| * @param topicName |
| * @return TopicPolicies, if they exist. Otherwise, the value will not be present. |
| */ |
| public Optional<TopicPolicies> getTopicPolicies(TopicName topicName) { |
| if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) { |
| return Optional.empty(); |
| } |
| try { |
| return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName)); |
| } catch (BrokerServiceException.TopicPoliciesCacheNotInitException e) { |
| log.debug("Topic {} policies have not been initialized yet.", topicName.getPartitionedTopicName()); |
| return Optional.empty(); |
| } |
| } |
| |
| public CompletableFuture<Void> deleteTopicPolicies(TopicName topicName) { |
| if (!pulsar().getConfig().isTopicLevelPoliciesEnabled()) { |
| return CompletableFuture.completedFuture(null); |
| } |
| TopicName cloneTopicName = TopicName.get(topicName.getPartitionedTopicName()); |
| return pulsar.getTopicPoliciesService().deleteTopicPoliciesAsync(cloneTopicName); |
| } |
| |
| private CompletableFuture<Void> checkMaxTopicsPerNamespace(TopicName topicName, int numPartitions) { |
| return pulsar.getPulsarResources().getNamespaceResources() |
| .getPoliciesAsync(topicName.getNamespaceObject()) |
| .thenCompose(optPolicies -> { |
| int maxTopicsPerNamespace = optPolicies.map(p -> p.max_topics_per_namespace) |
| .orElse(pulsar.getConfig().getMaxTopicsPerNamespace()); |
| |
| if (maxTopicsPerNamespace > 0 && !SystemTopicClient.isSystemTopic(topicName)) { |
| return pulsar().getPulsarResources().getTopicResources() |
| .getExistingPartitions(topicName) |
| .thenCompose(topics -> { |
| // exclude created system topic |
| long topicsCount = topics.stream() |
| .filter(t -> !SystemTopicClient.isSystemTopic(TopicName.get(t))) |
| .count(); |
| if (topicsCount + numPartitions > maxTopicsPerNamespace) { |
| log.error("Failed to create persistent topic {}, " |
| + "exceed maximum number of topics in namespace", topicName); |
| return FutureUtil.failedFuture( |
| new RestException(Response.Status.PRECONDITION_FAILED, |
| "Exceed maximum number of topics in namespace.")); |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| }); |
| } else { |
| return CompletableFuture.completedFuture(null); |
| } |
| }); |
| } |
| |
| public void setInterceptor(BrokerInterceptor interceptor) { |
| this.interceptor = interceptor; |
| } |
| |
| public Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() { |
| return brokerEntryMetadataInterceptors; |
| } |
| |
| public boolean isBrokerEntryMetadataEnabled() { |
| return !brokerEntryMetadataInterceptors.isEmpty(); |
| } |
| |
| public boolean isBrokerPayloadProcessorEnabled() { |
| return !brokerEntryPayloadProcessors.isEmpty(); |
| } |
| |
| public void pausedConnections(int numberOfConnections) { |
| pausedConnections.add(numberOfConnections); |
| } |
| |
| public void resumedConnections(int numberOfConnections) { |
| pausedConnections.add(-numberOfConnections); |
| } |
| |
| public long getPausedConnections() { |
| return pausedConnections.longValue(); |
| } |
| |
| @VisibleForTesting |
| public void setPulsarChannelInitializerFactory(PulsarChannelInitializer.Factory factory) { |
| this.pulsarChannelInitFactory = factory; |
| } |
| } |