| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker.service; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.bookkeeper.mledger.util.SafeRun.safeRun; |
| import static org.apache.commons.collections.CollectionUtils.isEmpty; |
| import static org.apache.commons.lang3.StringUtils.isNotBlank; |
| import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; |
| import static org.apache.pulsar.broker.cache.LocalZooKeeperCacheService.LOCAL_POLICIES_ROOT; |
| import static org.apache.pulsar.broker.web.PulsarWebResource.joinPath; |
| |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import com.google.common.collect.Queues; |
| import io.netty.bootstrap.ServerBootstrap; |
| import io.netty.buffer.ByteBuf; |
| import io.netty.channel.AdaptiveRecvByteBufAllocator; |
| import io.netty.channel.Channel; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.ChannelOption; |
| import io.netty.channel.EventLoopGroup; |
| import io.netty.channel.socket.SocketChannel; |
| import io.netty.handler.ssl.SslContext; |
| import io.netty.util.concurrent.DefaultThreadFactory; |
| import java.io.Closeable; |
| import java.io.IOException; |
| import java.lang.reflect.Field; |
| import java.net.InetSocketAddress; |
| import java.net.SocketAddress; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Optional; |
| import java.util.Set; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ConcurrentLinkedQueue; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.Semaphore; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.atomic.LongAdder; |
| import java.util.concurrent.locks.ReadWriteLock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.function.Consumer; |
| import java.util.function.Predicate; |
| import lombok.AccessLevel; |
| import lombok.Getter; |
| import lombok.Setter; |
| import org.apache.bookkeeper.common.util.OrderedExecutor; |
| import org.apache.bookkeeper.common.util.OrderedScheduler; |
| import org.apache.bookkeeper.mledger.AsyncCallbacks.OpenLedgerCallback; |
| import org.apache.bookkeeper.mledger.ManagedLedger; |
| import org.apache.bookkeeper.mledger.ManagedLedgerConfig; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerException.ManagedLedgerNotFoundException; |
| import org.apache.bookkeeper.mledger.ManagedLedgerFactory; |
| import org.apache.bookkeeper.util.ZkUtils; |
| import org.apache.commons.lang3.tuple.ImmutablePair; |
| import org.apache.commons.lang3.tuple.Pair; |
| import org.apache.pulsar.broker.PulsarServerException; |
| import org.apache.pulsar.broker.PulsarService; |
| import org.apache.pulsar.broker.ServiceConfiguration; |
| import org.apache.pulsar.broker.admin.AdminResource; |
| import org.apache.pulsar.broker.authentication.AuthenticationService; |
| import org.apache.pulsar.broker.authorization.AuthorizationService; |
| import org.apache.pulsar.broker.cache.ConfigurationCacheService; |
| import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerFactory; |
| import org.apache.pulsar.broker.delayed.DelayedDeliveryTrackerLoader; |
| import org.apache.pulsar.broker.loadbalance.LoadManager; |
| import org.apache.pulsar.broker.service.BrokerServiceException.NamingException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.NotAllowedException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ServerMetadataException; |
| import org.apache.pulsar.broker.service.BrokerServiceException.ServiceUnitNotReadyException; |
| import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic; |
| import org.apache.pulsar.broker.service.persistent.DispatchRateLimiter; |
| import org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers; |
| import org.apache.pulsar.broker.service.persistent.PersistentTopic; |
| import org.apache.pulsar.broker.stats.ClusterReplicationMetrics; |
| import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; |
| import org.apache.pulsar.broker.web.PulsarWebResource; |
| import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect; |
| import org.apache.pulsar.broker.zookeeper.aspectj.ClientCnxnAspect.EventListner; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminBuilder; |
| import org.apache.pulsar.client.api.ClientBuilder; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.PulsarClientException; |
| import org.apache.pulsar.client.impl.ClientBuilderImpl; |
| import org.apache.pulsar.client.impl.PulsarClientImpl; |
| import org.apache.pulsar.client.impl.conf.ClientConfigurationData; |
| import org.apache.pulsar.common.allocator.PulsarByteBufAllocator; |
| import org.apache.pulsar.common.configuration.FieldContext; |
| import org.apache.pulsar.common.naming.NamespaceBundle; |
| import org.apache.pulsar.common.naming.NamespaceBundleFactory; |
| import org.apache.pulsar.common.naming.NamespaceBundles; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicDomain; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.partition.PartitionedTopicMetadata; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.LocalPolicies; |
| import org.apache.pulsar.common.policies.data.PersistencePolicies; |
| import org.apache.pulsar.common.policies.data.PersistentOfflineTopicStats; |
| import org.apache.pulsar.common.policies.data.Policies; |
| import org.apache.pulsar.common.policies.data.PublishRate; |
| import org.apache.pulsar.common.policies.data.RetentionPolicies; |
| import org.apache.pulsar.common.policies.data.TopicStats; |
| import org.apache.pulsar.common.stats.Metrics; |
| import org.apache.pulsar.common.util.FieldParser; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.common.util.ObjectMapperFactory; |
| import org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap; |
| import org.apache.pulsar.common.util.collections.ConcurrentOpenHashSet; |
| import org.apache.pulsar.common.util.netty.EventLoopUtil; |
| import org.apache.pulsar.policies.data.loadbalancer.NamespaceBundleStats; |
| import org.apache.pulsar.zookeeper.ZkIsolatedBookieEnsemblePlacementPolicy; |
| import org.apache.pulsar.zookeeper.ZooKeeperCacheListener; |
| import org.apache.pulsar.zookeeper.ZooKeeperDataCache; |
| import org.apache.zookeeper.CreateMode; |
| import org.apache.zookeeper.KeeperException; |
| import org.apache.zookeeper.ZooDefs; |
| import org.apache.zookeeper.ZooDefs.Ids; |
| import org.apache.zookeeper.data.Stat; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| @Getter(AccessLevel.PUBLIC) |
| @Setter(AccessLevel.PROTECTED) |
| public class BrokerService implements Closeable, ZooKeeperCacheListener<Policies> { |
| private static final Logger log = LoggerFactory.getLogger(BrokerService.class); |
| |
| private final PulsarService pulsar; |
| private final ManagedLedgerFactory managedLedgerFactory; |
| |
| private final ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> topics; |
| |
| private final ConcurrentOpenHashMap<String, PulsarClient> replicationClients; |
| private final ConcurrentOpenHashMap<String, PulsarAdmin> clusterAdmins; |
| |
| // Multi-layer topics map: |
| // Namespace --> Bundle --> topicName --> topic |
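    //   e.g. (hypothetical entry):
    //   "tenant/ns" -> { "tenant/ns/0x00000000_0xffffffff" -> { "persistent://tenant/ns/my-topic" -> <topic> } }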
| private final ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> multiLayerTopicsMap; |
| private int numberOfNamespaceBundles = 0; |
| |
| private final EventLoopGroup acceptorGroup; |
| private final EventLoopGroup workerGroup; |
| private final OrderedExecutor topicOrderedExecutor; |
| // offline topic backlog cache |
| private final ConcurrentOpenHashMap<TopicName, PersistentOfflineTopicStats> offlineTopicStatCache; |
| private static final ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = prepareDynamicConfigurationMap(); |
| private final ConcurrentOpenHashMap<String, Consumer<?>> configRegisteredListeners; |
| |
| private final ConcurrentLinkedQueue<Pair<String, CompletableFuture<Optional<Topic>>>> pendingTopicLoadingQueue; |
| |
| private AuthorizationService authorizationService = null; |
| private final ScheduledExecutorService statsUpdater; |
| private final ScheduledExecutorService backlogQuotaChecker; |
| |
| protected final AtomicReference<Semaphore> lookupRequestSemaphore; |
| protected final AtomicReference<Semaphore> topicLoadRequestSemaphore; |
| |
| private final ScheduledExecutorService inactivityMonitor; |
| private final ScheduledExecutorService messageExpiryMonitor; |
| private final ScheduledExecutorService compactionMonitor; |
| private ScheduledExecutorService topicPublishRateLimiterMonitor; |
| private ScheduledExecutorService brokerPublishRateLimiterMonitor; |
| protected volatile PublishRateLimiter brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; |
| |
| private DistributedIdGenerator producerNameGenerator; |
| |
| public final static String producerNameGeneratorPath = "/counters/producer-name"; |
| |
| private final BacklogQuotaManager backlogQuotaManager; |
| |
| private final int keepAliveIntervalSeconds; |
| private final PulsarStats pulsarStats; |
| private final EventListner zkStatsListener; |
| private final AuthenticationService authenticationService; |
| |
| public static final String BROKER_SERVICE_CONFIGURATION_PATH = "/admin/configuration"; |
| private final ZooKeeperDataCache<Map<String, String>> dynamicConfigurationCache; |
| |
| private static final LongAdder totalUnackedMessages = new LongAdder(); |
| private final int maxUnackedMessages; |
| public final int maxUnackedMsgsPerDispatcher; |
| private static final AtomicBoolean blockedDispatcherOnHighUnackedMsgs = new AtomicBoolean(false); |
| private final ConcurrentOpenHashSet<PersistentDispatcherMultipleConsumers> blockedDispatchers; |
| private final ReadWriteLock lock = new ReentrantReadWriteLock(); |
| |
| private final DelayedDeliveryTrackerFactory delayedDeliveryTrackerFactory; |
| private final ServerBootstrap defaultServerBootstrap; |
| |
| private Channel listenChannel; |
| private Channel listenChannelTls; |
| |
| public BrokerService(PulsarService pulsar) throws Exception { |
| this.pulsar = pulsar; |
| this.managedLedgerFactory = pulsar.getManagedLedgerFactory(); |
| this.topics = new ConcurrentOpenHashMap<>(); |
| this.replicationClients = new ConcurrentOpenHashMap<>(); |
| this.clusterAdmins = new ConcurrentOpenHashMap<>(); |
| this.keepAliveIntervalSeconds = pulsar.getConfiguration().getKeepAliveIntervalSeconds(); |
| this.configRegisteredListeners = new ConcurrentOpenHashMap<>(); |
| this.pendingTopicLoadingQueue = Queues.newConcurrentLinkedQueue(); |
| |
| this.multiLayerTopicsMap = new ConcurrentOpenHashMap<>(); |
| this.pulsarStats = new PulsarStats(pulsar); |
| this.offlineTopicStatCache = new ConcurrentOpenHashMap<>(); |
| |
| this.topicOrderedExecutor = OrderedScheduler.newSchedulerBuilder() |
| .numThreads(pulsar.getConfiguration().getNumWorkerThreadsForNonPersistentTopic()) |
| .name("broker-topic-workers").build(); |
| final DefaultThreadFactory acceptorThreadFactory = new DefaultThreadFactory("pulsar-acceptor"); |
| final DefaultThreadFactory workersThreadFactory = new DefaultThreadFactory("pulsar-io"); |
| final int numThreads = pulsar.getConfiguration().getNumIOThreads(); |
| log.info("Using {} threads for broker service IO", numThreads); |
| |
| this.acceptorGroup = EventLoopUtil.newEventLoopGroup(1, acceptorThreadFactory); |
| this.workerGroup = EventLoopUtil.newEventLoopGroup(numThreads, workersThreadFactory); |
| this.statsUpdater = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-stats-updater")); |
| if (pulsar.getConfiguration().isAuthorizationEnabled()) { |
| this.authorizationService = new AuthorizationService(pulsar.getConfiguration(), |
| pulsar.getConfigurationCache()); |
| } |
| |
| if (pulsar.getConfigurationCache() != null) { |
| pulsar.getConfigurationCache().policiesCache().registerListener(this); |
| } |
| |
| this.inactivityMonitor = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-inactivity-monitor")); |
| this.messageExpiryMonitor = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-msg-expiry-monitor")); |
| this.compactionMonitor = |
| Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-compaction-monitor")); |
| |
| this.backlogQuotaManager = new BacklogQuotaManager(pulsar); |
| this.backlogQuotaChecker = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker")); |
| this.authenticationService = new AuthenticationService(pulsar.getConfiguration()); |
| this.dynamicConfigurationCache = new ZooKeeperDataCache<Map<String, String>>(pulsar().getLocalZkCache()) { |
| @Override |
| public Map<String, String> deserialize(String key, byte[] content) throws Exception { |
| return ObjectMapperFactory.getThreadLocal().readValue(content, HashMap.class); |
| } |
| }; |
| this.blockedDispatchers = new ConcurrentOpenHashSet<>(); |
| // update dynamic configuration and register-listener |
| updateConfigurationAndRegisterListeners(); |
| this.lookupRequestSemaphore = new AtomicReference<Semaphore>( |
| new Semaphore(pulsar.getConfiguration().getMaxConcurrentLookupRequest(), false)); |
| this.topicLoadRequestSemaphore = new AtomicReference<Semaphore>( |
| new Semaphore(pulsar.getConfiguration().getMaxConcurrentTopicLoadRequest(), false)); |
| if (pulsar.getConfiguration().getMaxUnackedMessagesPerBroker() > 0 |
| && pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked() > 0.0) { |
| this.maxUnackedMessages = pulsar.getConfiguration().getMaxUnackedMessagesPerBroker(); |
| this.maxUnackedMsgsPerDispatcher = (int) ((maxUnackedMessages |
| * pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked()) / 100); |
| log.info("Enabling per-broker unack-message limit {} and dispatcher-limit {} on blocked-broker", |
| maxUnackedMessages, maxUnackedMsgsPerDispatcher); |
| // block misbehaving dispatcher by checking periodically |
| pulsar.getExecutor().scheduleAtFixedRate(() -> checkUnAckMessageDispatching(), 600, 30, TimeUnit.SECONDS); |
| } else { |
| this.maxUnackedMessages = 0; |
| this.maxUnackedMsgsPerDispatcher = 0; |
            log.info(
                    "Disabling per-broker unack-msg blocking due to invalid unAckMsgSubscriptionPercentageLimitOnBrokerBlocked {} ",
                    pulsar.getConfiguration().getMaxUnackedMessagesPerSubscriptionOnBrokerBlocked());
| } |
| |
| // register listener to capture zk-latency |
| zkStatsListener = (eventType, latencyMs) -> pulsarStats.recordZkLatencyTimeValue(eventType, latencyMs); |
| |
| this.delayedDeliveryTrackerFactory = DelayedDeliveryTrackerLoader |
| .loadDelayedDeliveryTrackerFactory(pulsar.getConfiguration()); |
| |
| this.defaultServerBootstrap = defaultServerBootstrap(); |
| } |
| |
| // This call is used for starting additional protocol handlers |
| public void startProtocolHandlers( |
| Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlers) { |
| |
| protocolHandlers.forEach((protocol, initializers) -> { |
| initializers.forEach((address, initializer) -> { |
| try { |
| startProtocolHandler(protocol, address, initializer); |
| } catch (IOException e) { |
| log.error("{}", e.getMessage(), e.getCause()); |
| throw new RuntimeException(e.getMessage(), e.getCause()); |
| } |
| }); |
| }); |
| } |
| |
| private void startProtocolHandler(String protocol, |
| SocketAddress address, |
| ChannelInitializer<SocketChannel> initializer) throws IOException { |
| ServerBootstrap bootstrap = defaultServerBootstrap.clone(); |
| bootstrap.childHandler(initializer); |
| try { |
| bootstrap.bind(address).sync(); |
| } catch (Exception e) { |
| throw new IOException("Failed to bind protocol `" + protocol + "` on " + address, e); |
| } |
| log.info("Successfully bind protocol `{}` on {}", protocol, address); |
| } |
| |
| private ServerBootstrap defaultServerBootstrap() { |
| ServerBootstrap bootstrap = new ServerBootstrap(); |
| bootstrap.childOption(ChannelOption.ALLOCATOR, PulsarByteBufAllocator.DEFAULT); |
| bootstrap.group(acceptorGroup, workerGroup); |
| bootstrap.childOption(ChannelOption.TCP_NODELAY, true); |
| bootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, |
| new AdaptiveRecvByteBufAllocator(1024, 16 * 1024, 1 * 1024 * 1024)); |
| bootstrap.channel(EventLoopUtil.getServerSocketChannelClass(workerGroup)); |
| EventLoopUtil.enableTriggeredMode(bootstrap); |
| return bootstrap; |
| } |
| |
| public void start() throws Exception { |
| this.producerNameGenerator = new DistributedIdGenerator(pulsar.getZkClient(), producerNameGeneratorPath, |
| pulsar.getConfiguration().getClusterName()); |
| |
| ServerBootstrap bootstrap = defaultServerBootstrap.clone(); |
| |
| ServiceConfiguration serviceConfig = pulsar.getConfiguration(); |
| |
| bootstrap.childHandler(new PulsarChannelInitializer(pulsar, false)); |
| |
| Optional<Integer> port = serviceConfig.getBrokerServicePort(); |
| if (port.isPresent()) { |
| // Bind and start to accept incoming connections. |
| InetSocketAddress addr = new InetSocketAddress(pulsar.getBindAddress(), port.get()); |
| try { |
| listenChannel = bootstrap.bind(addr).sync().channel(); |
| log.info("Started Pulsar Broker service on {}", listenChannel.localAddress()); |
| } catch (Exception e) { |
| throw new IOException("Failed to bind Pulsar broker on " + addr, e); |
| } |
| } |
| |
| Optional<Integer> tlsPort = serviceConfig.getBrokerServicePortTls(); |
| if (tlsPort.isPresent()) { |
| ServerBootstrap tlsBootstrap = bootstrap.clone(); |
| tlsBootstrap.childHandler(new PulsarChannelInitializer(pulsar, true)); |
| try { |
| listenChannelTls = tlsBootstrap.bind(new InetSocketAddress(pulsar.getBindAddress(), tlsPort.get())).sync() |
| .channel(); |
| log.info("Started Pulsar Broker TLS service on {} - TLS provider: {}", listenChannelTls.localAddress(), |
| SslContext.defaultServerProvider()); |
| } catch (Exception e) { |
| throw new IOException(String.format("Failed to start Pulsar Broker TLS service on %s:%d", |
| pulsar.getBindAddress(), tlsPort.get()), e); |
| } |
| } |
| |
| // start other housekeeping functions |
| this.startStatsUpdater( |
| serviceConfig.getStatsUpdateInitialDelayInSecs(), |
| serviceConfig.getStatsUpdateFrequencyInSecs()); |
| this.startInactivityMonitor(); |
| this.startMessageExpiryMonitor(); |
| this.startCompactionMonitor(); |
| this.startBacklogQuotaChecker(); |
| this.updateBrokerPublisherThrottlingMaxRate(); |
| // register listener to capture zk-latency |
| ClientCnxnAspect.addListener(zkStatsListener); |
| ClientCnxnAspect.registerExecutor(pulsar.getExecutor()); |
| } |
| |
    protected void startStatsUpdater(int statsUpdateInitialDelayInSecs, int statsUpdateFrequencyInSecs) {
        statsUpdater.scheduleAtFixedRate(safeRun(this::updateRates),
                statsUpdateInitialDelayInSecs, statsUpdateFrequencyInSecs, TimeUnit.SECONDS);
| |
| // Ensure the broker starts up with initial stats |
| updateRates(); |
| } |
| |
| protected void startInactivityMonitor() { |
| if (pulsar().getConfiguration().isBrokerDeleteInactiveTopicsEnabled()) { |
| int interval = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsFrequencySeconds(); |
| int maxInactiveDurationInSec = pulsar().getConfiguration().getBrokerDeleteInactiveTopicsMaxInactiveDurationSeconds(); |
| inactivityMonitor.scheduleAtFixedRate(safeRun(() -> checkGC(maxInactiveDurationInSec)), interval, interval, |
| TimeUnit.SECONDS); |
| } |
| |
| // Deduplication info checker |
| long duplicationCheckerIntervalInSeconds = TimeUnit.MINUTES |
| .toSeconds(pulsar().getConfiguration().getBrokerDeduplicationProducerInactivityTimeoutMinutes()) / 3; |
| inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkMessageDeduplicationInfo), duplicationCheckerIntervalInSeconds, |
| duplicationCheckerIntervalInSeconds, TimeUnit.SECONDS); |
| |
| // Inactive subscriber checker |
| if (pulsar().getConfiguration().getSubscriptionExpirationTimeMinutes() > 0) { |
| long subscriptionExpiryCheckIntervalInSeconds = |
| TimeUnit.MINUTES.toSeconds(pulsar().getConfiguration().getSubscriptionExpiryCheckIntervalInMinutes()); |
| inactivityMonitor.scheduleAtFixedRate(safeRun(this::checkInactiveSubscriptions), |
| subscriptionExpiryCheckIntervalInSeconds, subscriptionExpiryCheckIntervalInSeconds, TimeUnit.SECONDS); |
| } |
| } |
| |
| protected void startMessageExpiryMonitor() { |
| int interval = pulsar().getConfiguration().getMessageExpiryCheckIntervalInMinutes(); |
| messageExpiryMonitor.scheduleAtFixedRate(safeRun(this::checkMessageExpiry), interval, interval, |
| TimeUnit.MINUTES); |
| } |
| |
| protected void startCompactionMonitor() { |
| int interval = pulsar().getConfiguration().getBrokerServiceCompactionMonitorIntervalInSeconds(); |
| if (interval > 0) { |
| compactionMonitor.scheduleAtFixedRate(safeRun(() -> checkCompaction()), |
| interval, interval, TimeUnit.SECONDS); |
| } |
| } |
| |
| protected void startBacklogQuotaChecker() { |
| if (pulsar().getConfiguration().isBacklogQuotaCheckEnabled()) { |
| final int interval = pulsar().getConfiguration().getBacklogQuotaCheckIntervalInSeconds(); |
| log.info("Scheduling a thread to check backlog quota after [{}] seconds in background", interval); |
| backlogQuotaChecker.scheduleAtFixedRate(safeRun(this::monitorBacklogQuota), interval, interval, |
| TimeUnit.SECONDS); |
| } else { |
| log.info("Backlog quota check monitoring is disabled"); |
| } |
| |
| } |
| |
| /** |
| * Schedules and monitors publish-throttling for all owned topics that has publish-throttling configured. It also |
| * disables and shutdowns publish-rate-limiter monitor task if broker disables it. |
| */ |
| public synchronized void setupTopicPublishRateLimiterMonitor() { |
| // set topic PublishRateLimiterMonitor |
| long topicTickTimeMs = pulsar().getConfiguration().getTopicPublisherThrottlingTickTimeMillis(); |
| if (topicTickTimeMs > 0) { |
            if (this.topicPublishRateLimiterMonitor == null) {
                this.topicPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(
                        new DefaultThreadFactory("pulsar-topic-publish-rate-limiter-monitor"));
                // schedule task that sums up publish-rate across all cnx on a topic
                topicPublishRateLimiterMonitor.scheduleAtFixedRate(safeRun(this::checkTopicPublishThrottlingRate),
                        topicTickTimeMs, topicTickTimeMs, TimeUnit.MILLISECONDS);
                // schedule task that refreshes rate-limiting bucket
                topicPublishRateLimiterMonitor.scheduleAtFixedRate(safeRun(this::refreshTopicPublishRate), 1, 1,
                        TimeUnit.SECONDS);
            }
| } else { |
| // disable publish-throttling for all topics |
            if (this.topicPublishRateLimiterMonitor != null) {
                try {
                    this.topicPublishRateLimiterMonitor.shutdown();
                    this.topicPublishRateLimiterMonitor.awaitTermination(30, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    log.warn("Failed to shut down topicPublishRateLimiterMonitor", e);
                }
                // make sure topics are not being throttled
                refreshTopicPublishRate();
                this.topicPublishRateLimiterMonitor = null;
            }
| } |
| } |
| |
| /** |
| * Schedules and monitors publish-throttling for broker that has publish-throttling configured. It also |
| * disables and shutdowns publish-rate-limiter monitor for broker task if broker disables it. |
| */ |
| public synchronized void setupBrokerPublishRateLimiterMonitor() { |
| // set broker PublishRateLimiterMonitor |
| long brokerTickTimeMs = pulsar().getConfiguration().getBrokerPublisherThrottlingTickTimeMillis(); |
| if (brokerTickTimeMs > 0) { |
            if (this.brokerPublishRateLimiterMonitor == null) {
                this.brokerPublishRateLimiterMonitor = Executors.newSingleThreadScheduledExecutor(
                        new DefaultThreadFactory("pulsar-broker-publish-rate-limiter-monitor"));
                // schedule task that sums up publish-rate across all cnx on a topic,
                // and checks whether the rate limit has been exceeded.
                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
                        safeRun(this::checkBrokerPublishThrottlingRate),
                        brokerTickTimeMs,
                        brokerTickTimeMs,
                        TimeUnit.MILLISECONDS);
                // schedule task that refreshes rate-limiting bucket
                brokerPublishRateLimiterMonitor.scheduleAtFixedRate(
                        safeRun(this::refreshBrokerPublishRate),
                        1,
                        1,
                        TimeUnit.SECONDS);
            }
| } else { |
| // disable publish-throttling for broker. |
            if (this.brokerPublishRateLimiterMonitor != null) {
                try {
                    this.brokerPublishRateLimiterMonitor.shutdown();
                    this.brokerPublishRateLimiterMonitor.awaitTermination(30, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    log.warn("Failed to shut down brokerPublishRateLimiterMonitor", e);
                }
                // make sure topics are not being throttled
                refreshBrokerPublishRate();
                this.brokerPublishRateLimiterMonitor = null;
            }
| } |
| } |
| } |
| |
| @Override |
| public void close() throws IOException { |
| log.info("Shutting down Pulsar Broker service"); |
| |
| if (pulsar.getConfigurationCache() != null) { |
| pulsar.getConfigurationCache().policiesCache().unregisterListener(this); |
| } |
| |
        // gracefully unload all namespace bundles without disrupting other bundles that share client connections
| unloadNamespaceBundlesGracefully(); |
| |
| // close replication clients |
| replicationClients.forEach((cluster, client) -> { |
| try { |
| client.shutdown(); |
| } catch (PulsarClientException e) { |
| log.warn("Error shutting down repl client for cluster {}", cluster, e); |
| } |
| }); |
| |
| // close replication admins |
| clusterAdmins.forEach((cluster, admin) -> { |
| try { |
| admin.close(); |
| } catch (Exception e) { |
| log.warn("Error shutting down repl admin for cluster {}", cluster, e); |
| } |
| }); |
| |
| if (listenChannel != null) { |
| listenChannel.close(); |
| } |
| |
| if (listenChannelTls != null) { |
| listenChannelTls.close(); |
| } |
| |
| acceptorGroup.shutdownGracefully(); |
| workerGroup.shutdownGracefully(); |
| statsUpdater.shutdown(); |
| inactivityMonitor.shutdown(); |
| messageExpiryMonitor.shutdown(); |
| compactionMonitor.shutdown(); |
| backlogQuotaChecker.shutdown(); |
| authenticationService.close(); |
| pulsarStats.close(); |
| ClientCnxnAspect.removeListener(zkStatsListener); |
| ClientCnxnAspect.registerExecutor(null); |
| topicOrderedExecutor.shutdown(); |
| delayedDeliveryTrackerFactory.close(); |
| if (topicPublishRateLimiterMonitor != null) { |
| topicPublishRateLimiterMonitor.shutdown(); |
| } |
| if (brokerPublishRateLimiterMonitor != null) { |
| brokerPublishRateLimiterMonitor.shutdown(); |
| } |
| |
| log.info("Broker service completely shut down"); |
| } |
| |
| /** |
| * It unloads all owned namespacebundles gracefully. |
| * <ul> |
| * <li>First it makes current broker unavailable and isolates from the clusters so, it will not serve any new |
| * requests.</li> |
| * <li>Second it starts unloading namespace bundle one by one without closing the connection in order to avoid |
| * disruption for other namespacebundles which are sharing the same connection from the same client.</li> |
| * <ul> |
| * |
| */ |
| public void unloadNamespaceBundlesGracefully() { |
| try { |
| // make broker-node unavailable from the cluster |
| if (pulsar.getLoadManager() != null && pulsar.getLoadManager().get() != null) { |
| try { |
| pulsar.getLoadManager().get().disableBroker(); |
| } catch (PulsarServerException.NotFoundException ne) { |
| log.warn("Broker load-manager znode doesn't exist ", ne); |
| // still continue and release bundle ownership as broker's registration node doesn't exist. |
| } |
| } |
| |
| // unload all namespace-bundles gracefully |
| long closeTopicsStartTime = System.nanoTime(); |
| Set<NamespaceBundle> serviceUnits = pulsar.getNamespaceService().getOwnedServiceUnits(); |
| serviceUnits.forEach(su -> { |
| if (su instanceof NamespaceBundle) { |
| try { |
| pulsar.getNamespaceService().unloadNamespaceBundle((NamespaceBundle) su, 1, TimeUnit.MINUTES); |
| } catch (Exception e) { |
| log.warn("Failed to unload namespace bundle {}", su, e); |
| } |
| } |
| }); |
| |
| double closeTopicsTimeSeconds = TimeUnit.NANOSECONDS.toMillis((System.nanoTime() - closeTopicsStartTime)) |
| / 1000.0; |
| log.info("Unloading {} namespace-bundles completed in {} seconds", serviceUnits.size(), |
| closeTopicsTimeSeconds); |
| } catch (Exception e) { |
| log.error("Failed to disable broker from loadbalancer list {}", e.getMessage(), e); |
| } |
| } |
| |
| public CompletableFuture<Optional<Topic>> getTopicIfExists(final String topic) { |
| return getTopic(topic, false /* createIfMissing */); |
| } |
| |
| public CompletableFuture<Topic> getOrCreateTopic(final String topic) { |
| return getTopic(topic, pulsar.getConfiguration().isAllowAutoTopicCreation()).thenApply(Optional::get); |
| } |
| |
| public CompletableFuture<Optional<Topic>> getTopic(final String topic, boolean createIfMissing) { |
| try { |
| CompletableFuture<Optional<Topic>> topicFuture = topics.get(topic); |
| if (topicFuture != null) { |
| if (topicFuture.isCompletedExceptionally() |
| || (topicFuture.isDone() && !topicFuture.getNow(Optional.empty()).isPresent())) { |
| // Exceptional topics should be recreated. |
| topics.remove(topic, topicFuture); |
| } else { |
| return topicFuture; |
| } |
| } |
| final boolean isPersistentTopic = TopicName.get(topic).getDomain().equals(TopicDomain.persistent); |
| return topics.computeIfAbsent(topic, (topicName) -> { |
| return isPersistentTopic ? this.loadOrCreatePersistentTopic(topicName, createIfMissing) |
| : createNonPersistentTopic(topicName); |
| }); |
| } catch (IllegalArgumentException e) { |
| log.warn("[{}] Illegalargument exception when loading topic", topic, e); |
| return failedFuture(e); |
| } catch (RuntimeException e) { |
| Throwable cause = e.getCause(); |
| if (cause instanceof ServiceUnitNotReadyException) { |
| log.warn("[{}] Service unit is not ready when loading the topic", topic); |
| } else { |
| log.warn("[{}] Unexpected exception when loading topic: {}", topic, e.getMessage(), e); |
| } |
| |
| return failedFuture(cause); |
| } |
| } |
| |
| private CompletableFuture<Optional<Topic>> createNonPersistentTopic(String topic) { |
| CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>(); |
| |
| if (!pulsar.getConfiguration().isEnableNonPersistentTopics()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Broker is unable to load non-persistent topic {}", topic); |
| } |
            topicFuture.completeExceptionally(
                    new NotAllowedException("Broker is unable to load non-persistent topic"));
| return topicFuture; |
| } |
| final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); |
| NonPersistentTopic nonPersistentTopic = new NonPersistentTopic(topic, this); |
| CompletableFuture<Void> replicationFuture = nonPersistentTopic.checkReplication(); |
| replicationFuture.thenRun(() -> { |
| log.info("Created topic {}", nonPersistentTopic); |
| long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) - topicCreateTimeMs; |
| pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); |
| addTopicToStatsMaps(TopicName.get(topic), nonPersistentTopic); |
| topicFuture.complete(Optional.of(nonPersistentTopic)); |
| }); |
| replicationFuture.exceptionally((ex) -> { |
| log.warn("Replication check failed. Removing topic from topics list {}, {}", topic, ex); |
| nonPersistentTopic.stopReplProducers().whenComplete((v, exception) -> { |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(ex); |
| }); |
| |
| return null; |
| }); |
| |
| return topicFuture; |
| } |
| |
| private static <T> CompletableFuture<T> failedFuture(Throwable t) { |
| CompletableFuture<T> future = new CompletableFuture<>(); |
| future.completeExceptionally(t); |
| return future; |
| } |
| |
| public PulsarClient getReplicationClient(String cluster) { |
| PulsarClient client = replicationClients.get(cluster); |
| if (client != null) { |
| return client; |
| } |
| |
| return replicationClients.computeIfAbsent(cluster, key -> { |
| try { |
| String path = PulsarWebResource.path("clusters", cluster); |
| ClusterData data = this.pulsar.getConfigurationCache().clustersCache().get(path) |
| .orElseThrow(() -> new KeeperException.NoNodeException(path)); |
| ClientBuilder clientBuilder = PulsarClient.builder() |
| .enableTcpNoDelay(false) |
| .connectionsPerBroker(pulsar.getConfiguration().getReplicationConnectionsPerBroker()) |
| .statsInterval(0, TimeUnit.SECONDS); |
| if (pulsar.getConfiguration().isAuthenticationEnabled()) { |
| clientBuilder.authentication(pulsar.getConfiguration().getBrokerClientAuthenticationPlugin(), |
| pulsar.getConfiguration().getBrokerClientAuthenticationParameters()); |
| } |
| if (pulsar.getConfiguration().isBrokerClientTlsEnabled()) { |
| clientBuilder |
| .serviceUrl(isNotBlank(data.getBrokerServiceUrlTls()) ? data.getBrokerServiceUrlTls() |
| : data.getServiceUrlTls()) |
| .enableTls(true) |
| .tlsTrustCertsFilePath(pulsar.getConfiguration().getBrokerClientTrustCertsFilePath()) |
| .allowTlsInsecureConnection(pulsar.getConfiguration().isTlsAllowInsecureConnection()); |
| } else { |
| clientBuilder.serviceUrl( |
| isNotBlank(data.getBrokerServiceUrl()) ? data.getBrokerServiceUrl() : data.getServiceUrl()); |
| } |
| |
| // Share all the IO threads across broker and client connections |
| ClientConfigurationData conf = ((ClientBuilderImpl) clientBuilder).getClientConfigurationData(); |
| return new PulsarClientImpl(conf, workerGroup); |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| } |
| |
| public PulsarAdmin getClusterPulsarAdmin(String cluster) { |
| PulsarAdmin admin = clusterAdmins.get(cluster); |
| if (admin != null) { |
| return admin; |
| } |
| return clusterAdmins.computeIfAbsent(cluster, key -> { |
| try { |
| String path = PulsarWebResource.path("clusters", cluster); |
| ClusterData data = this.pulsar.getConfigurationCache().clustersCache().get(path) |
| .orElseThrow(() -> new KeeperException.NoNodeException(path)); |
| |
| ServiceConfiguration conf = pulsar.getConfig(); |
| |
| boolean isTlsUrl = conf.isBrokerClientTlsEnabled() && isNotBlank(data.getServiceUrlTls()); |
| String adminApiUrl = isTlsUrl ? data.getServiceUrlTls() : data.getServiceUrl(); |
| PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) // |
| .authentication( // |
| conf.getBrokerClientAuthenticationPlugin(), // |
| conf.getBrokerClientAuthenticationParameters()); |
| |
| if (isTlsUrl) { |
| builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath()); |
| builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection()); |
| } |
| |
                // Most admin requests require ZooKeeper calls, so base the max read-timeout on the
                // ZooKeeper operation timeout
| builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS); |
| |
| PulsarAdmin adminClient = builder.build(); |
| log.info("created admin with url {} ", adminApiUrl); |
| return adminClient; |
| } catch (Exception e) { |
| throw new RuntimeException(e); |
| } |
| }); |
| } |
| |
| /** |
| * It creates a topic async and returns CompletableFuture. It also throttles down configured max-concurrent topic |
| * loading and puts them into queue once in-process topics are created. |
| * |
| * @param topic persistent-topic name |
| * @return CompletableFuture<Topic> |
| * @throws RuntimeException |
| */ |
| protected CompletableFuture<Optional<Topic>> loadOrCreatePersistentTopic(final String topic, |
| boolean createIfMissing) throws RuntimeException { |
| checkTopicNsOwnership(topic); |
| |
| final CompletableFuture<Optional<Topic>> topicFuture = new CompletableFuture<>(); |
| if (!pulsar.getConfiguration().isEnablePersistentTopics()) { |
| if (log.isDebugEnabled()) { |
| log.debug("Broker is unable to load persistent topic {}", topic); |
| } |
            topicFuture.completeExceptionally(new NotAllowedException("Broker is unable to load persistent topic"));
| return topicFuture; |
| } |
| |
| final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); |
| |
| if (topicLoadSemaphore.tryAcquire()) { |
| createPersistentTopic(topic, createIfMissing, topicFuture); |
| topicFuture.handle((persistentTopic, ex) -> { |
| // release permit and process pending topic |
| topicLoadSemaphore.release(); |
| createPendingLoadTopic(); |
| return null; |
| }); |
| } else { |
| pendingTopicLoadingQueue.add(new ImmutablePair<String, CompletableFuture<Optional<Topic>>>(topic, topicFuture)); |
| if (log.isDebugEnabled()) { |
| log.debug("topic-loading for {} added into pending queue", topic); |
| } |
| } |
| return topicFuture; |
| } |
| |
| private void createPersistentTopic(final String topic, boolean createIfMissing, |
| CompletableFuture<Optional<Topic>> topicFuture) { |
| |
| final long topicCreateTimeMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); |
| TopicName topicName = TopicName.get(topic); |
| if (!pulsar.getNamespaceService().isServiceUnitActive(topicName)) { |
| // namespace is being unloaded |
| String msg = String.format("Namespace is being unloaded, cannot add topic %s", topic); |
| log.warn(msg); |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(new ServiceUnitNotReadyException(msg)); |
| return; |
| } |
| |
| getManagedLedgerConfig(topicName).thenAccept(managedLedgerConfig -> { |
| managedLedgerConfig.setCreateIfMissing(createIfMissing); |
| |
| // Once we have the configuration, we can proceed with the async open operation |
| managedLedgerFactory.asyncOpen(topicName.getPersistenceNamingEncoding(), managedLedgerConfig, |
| new OpenLedgerCallback() { |
| @Override |
| public void openLedgerComplete(ManagedLedger ledger, Object ctx) { |
| try { |
| PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, |
| BrokerService.this); |
| CompletableFuture<Void> replicationFuture = persistentTopic.checkReplication(); |
| replicationFuture.thenCompose(v -> { |
| // Also check dedup status |
| return persistentTopic.checkDeduplicationStatus(); |
| }).thenRun(() -> { |
| log.info("Created topic {} - dedup is {}", topic, |
| persistentTopic.isDeduplicationEnabled() ? "enabled" : "disabled"); |
| long topicLoadLatencyMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()) |
| - topicCreateTimeMs; |
| pulsarStats.recordTopicLoadTimeValue(topic, topicLoadLatencyMs); |
| addTopicToStatsMaps(topicName, persistentTopic); |
| topicFuture.complete(Optional.of(persistentTopic)); |
| }).exceptionally((ex) -> { |
                                log.warn("Replication or dedup check failed. Removing topic {} from topics list",
                                        topic, ex);
| persistentTopic.stopReplProducers().whenComplete((v, exception) -> { |
| topics.remove(topic, topicFuture); |
| topicFuture.completeExceptionally(ex); |
| }); |
| |
| return null; |
| }); |
| } catch (NamingException e) { |
| log.warn("Failed to create topic {}-{}", topic, e.getMessage()); |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(e); |
| } |
| } |
| |
| @Override |
| public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { |
| if (!createIfMissing && exception instanceof ManagedLedgerNotFoundException) { |
| // We were just trying to load a topic and the topic doesn't exist |
| topicFuture.complete(Optional.empty()); |
| } else { |
| log.warn("Failed to create topic {}", topic, exception); |
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(new PersistenceException(exception)); |
| } |
| } |
| }, null); |
| |
| }).exceptionally((exception) -> { |
| log.warn("[{}] Failed to get topic configuration: {}", topic, exception.getMessage(), exception); |
            // Remove the topic from the topics map on a different thread, to avoid a possible deadlock when the
            // createPersistentTopic thread is the only one handling this future's result
| pulsar.getExecutor().execute(() -> topics.remove(topic, topicFuture)); |
| topicFuture.completeExceptionally(exception); |
| return null; |
| }); |
| } |
| |
| public CompletableFuture<ManagedLedgerConfig> getManagedLedgerConfig(TopicName topicName) { |
| CompletableFuture<ManagedLedgerConfig> future = new CompletableFuture<>(); |
| // Execute in background thread, since getting the policies might block if the z-node wasn't already cached |
| pulsar.getOrderedExecutor().executeOrdered(topicName, safeRun(() -> { |
| NamespaceName namespace = topicName.getNamespaceObject(); |
| ServiceConfiguration serviceConfig = pulsar.getConfiguration(); |
| |
| // Get persistence policy for this topic |
| Optional<Policies> policies = Optional.empty(); |
| Optional<LocalPolicies> localPolicies = Optional.empty(); |
| try { |
| policies = pulsar |
| .getConfigurationCache().policiesCache().get(AdminResource.path(POLICIES, |
| namespace.toString())); |
| String path = joinPath(LOCAL_POLICIES_ROOT, topicName.getNamespaceObject().toString()); |
| localPolicies = pulsar().getLocalZkCacheService().policiesCache().get(path); |
| } catch (Throwable t) { |
                // Policy lookup failed: fail the future so the caller can retry, rather than silently falling back
| log.warn("Got exception when reading persistence policy for {}: {}", topicName, t.getMessage(), t); |
| future.completeExceptionally(t); |
| return; |
| } |
| |
| PersistencePolicies persistencePolicies = policies.map(p -> p.persistence).orElseGet( |
| () -> new PersistencePolicies(serviceConfig.getManagedLedgerDefaultEnsembleSize(), |
| serviceConfig.getManagedLedgerDefaultWriteQuorum(), |
| serviceConfig.getManagedLedgerDefaultAckQuorum(), |
| serviceConfig.getManagedLedgerDefaultMarkDeleteRateLimit())); |
| |
| RetentionPolicies retentionPolicies = policies.map(p -> p.retention_policies).orElseGet( |
| () -> new RetentionPolicies(serviceConfig.getDefaultRetentionTimeInMinutes(), |
| serviceConfig.getDefaultRetentionSizeInMB()) |
| ); |
| |
| ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig(); |
| managedLedgerConfig.setEnsembleSize(persistencePolicies.getBookkeeperEnsemble()); |
| managedLedgerConfig.setWriteQuorumSize(persistencePolicies.getBookkeeperWriteQuorum()); |
| managedLedgerConfig.setAckQuorumSize(persistencePolicies.getBookkeeperAckQuorum()); |
| if (localPolicies.isPresent() && localPolicies.get().bookieAffinityGroup != null) { |
| managedLedgerConfig |
| .setBookKeeperEnsemblePlacementPolicyClassName(ZkIsolatedBookieEnsemblePlacementPolicy.class); |
| Map<String, Object> properties = Maps.newHashMap(); |
| properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.ISOLATION_BOOKIE_GROUPS, |
| localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupPrimary); |
| properties.put(ZkIsolatedBookieEnsemblePlacementPolicy.SECONDARY_ISOLATION_BOOKIE_GROUPS, |
| localPolicies.get().bookieAffinityGroup.bookkeeperAffinityGroupSecondary); |
| managedLedgerConfig.setBookKeeperEnsemblePlacementPolicyProperties(properties); |
| } |
| managedLedgerConfig.setThrottleMarkDelete(persistencePolicies.getManagedLedgerMaxMarkDeleteRate()); |
| managedLedgerConfig.setDigestType(serviceConfig.getManagedLedgerDigestType()); |
| |
| managedLedgerConfig.setMaxUnackedRangesToPersist(serviceConfig.getManagedLedgerMaxUnackedRangesToPersist()); |
| managedLedgerConfig.setMaxUnackedRangesToPersistInZk(serviceConfig.getManagedLedgerMaxUnackedRangesToPersistInZooKeeper()); |
| managedLedgerConfig.setMaxEntriesPerLedger(serviceConfig.getManagedLedgerMaxEntriesPerLedger()); |
| managedLedgerConfig.setMinimumRolloverTime(serviceConfig.getManagedLedgerMinLedgerRolloverTimeMinutes(), |
| TimeUnit.MINUTES); |
| managedLedgerConfig.setMaximumRolloverTime(serviceConfig.getManagedLedgerMaxLedgerRolloverTimeMinutes(), |
| TimeUnit.MINUTES); |
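            // Note: the per-ledger max size is hard-coded to 2048 MB here rather than read from configuration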
| managedLedgerConfig.setMaxSizePerLedgerMb(2048); |
| |
| managedLedgerConfig.setMetadataOperationsTimeoutSeconds( |
| serviceConfig.getManagedLedgerMetadataOperationsTimeoutSeconds()); |
| managedLedgerConfig.setReadEntryTimeoutSeconds(serviceConfig.getManagedLedgerReadEntryTimeoutSeconds()); |
| managedLedgerConfig.setAddEntryTimeoutSeconds(serviceConfig.getManagedLedgerAddEntryTimeoutSeconds()); |
| managedLedgerConfig.setMetadataEnsembleSize(serviceConfig.getManagedLedgerDefaultEnsembleSize()); |
| managedLedgerConfig.setUnackedRangesOpenCacheSetEnabled( |
| serviceConfig.isManagedLedgerUnackedRangesOpenCacheSetEnabled()); |
| managedLedgerConfig.setMetadataWriteQuorumSize(serviceConfig.getManagedLedgerDefaultWriteQuorum()); |
| managedLedgerConfig.setMetadataAckQuorumSize(serviceConfig.getManagedLedgerDefaultAckQuorum()); |
| managedLedgerConfig |
| .setMetadataMaxEntriesPerLedger(serviceConfig.getManagedLedgerCursorMaxEntriesPerLedger()); |
| |
| managedLedgerConfig.setLedgerRolloverTimeout(serviceConfig.getManagedLedgerCursorRolloverTimeInSeconds()); |
| managedLedgerConfig.setRetentionTime(retentionPolicies.getRetentionTimeInMinutes(), TimeUnit.MINUTES); |
| managedLedgerConfig.setRetentionSizeInMB(retentionPolicies.getRetentionSizeInMB()); |
| |
| managedLedgerConfig.setLedgerOffloader(pulsar.getManagedLedgerOffloader()); |
| policies.ifPresent(p -> { |
| long lag = serviceConfig.getManagedLedgerOffloadDeletionLagMs(); |
| if (p.offload_deletion_lag_ms != null) { |
| lag = p.offload_deletion_lag_ms; |
| } |
| long bytes = serviceConfig.getManagedLedgerOffloadAutoTriggerSizeThresholdBytes(); |
| if (p.offload_threshold != -1L) { |
| bytes = p.offload_threshold; |
| } |
| managedLedgerConfig.setOffloadLedgerDeletionLag(lag, TimeUnit.MILLISECONDS); |
| managedLedgerConfig.setOffloadAutoTriggerSizeThresholdBytes(bytes); |
| }); |
| |
| future.complete(managedLedgerConfig); |
| }, (exception) -> future.completeExceptionally(exception))); |
| |
| return future; |
| } |
| |
| private void addTopicToStatsMaps(TopicName topicName, Topic topic) { |
| try { |
| NamespaceBundle namespaceBundle = pulsar.getNamespaceService().getBundle(topicName); |
| |
| if (namespaceBundle != null) { |
| synchronized (multiLayerTopicsMap) { |
| String serviceUnit = namespaceBundle.toString(); |
| multiLayerTopicsMap // |
| .computeIfAbsent(topicName.getNamespace(), k -> new ConcurrentOpenHashMap<>()) // |
| .computeIfAbsent(serviceUnit, k -> new ConcurrentOpenHashMap<>()) // |
| .put(topicName.toString(), topic); |
| } |
| } |
| invalidateOfflineTopicStatCache(topicName); |
| } catch (Exception e) { |
| log.warn("Got exception when retrieving bundle name during create persistent topic", e); |
| } |
| } |
| |
| public void refreshTopicToStatsMaps(NamespaceBundle oldBundle) { |
| checkNotNull(oldBundle); |
| try { |
| // retrieve all topics under existing old bundle |
| List<Topic> topics = getAllTopicsFromNamespaceBundle(oldBundle.getNamespaceObject().toString(), |
| oldBundle.toString()); |
| if (!isEmpty(topics)) { |
| // add topic under new split bundles which already updated into NamespaceBundleFactory.bundleCache |
| topics.stream().forEach(t -> { |
| addTopicToStatsMaps(TopicName.get(t.getName()), t); |
| }); |
| // remove old bundle from the map |
| synchronized (multiLayerTopicsMap) { |
| multiLayerTopicsMap.get(oldBundle.getNamespaceObject().toString()).remove(oldBundle.toString()); |
| pulsarStats.invalidBundleStats(oldBundle.toString()); |
| } |
| } |
| } catch (Exception e) { |
| log.warn("Got exception while refreshing topicStats map", e); |
| } |
| } |
| |
| public PersistentOfflineTopicStats getOfflineTopicStat(TopicName topicName) { |
| return offlineTopicStatCache.get(topicName); |
| } |
| |
| public void cacheOfflineTopicStats(TopicName topicName, PersistentOfflineTopicStats offlineTopicStats) { |
| offlineTopicStatCache.put(topicName, offlineTopicStats); |
| } |
| |
| public void invalidateOfflineTopicStatCache(TopicName topicName) { |
| PersistentOfflineTopicStats removed = offlineTopicStatCache.remove(topicName); |
| if (removed != null) { |
| log.info("Removed cached offline topic stat for {} ", topicName.getPersistenceNamingEncoding()); |
| } |
| } |
| |
| /** |
| * Get a reference to a topic that is currently loaded in the broker. |
| * |
     * This method will not make the broker attempt to load the topic if it is not already loaded.
| */ |
| public Optional<Topic> getTopicReference(String topic) { |
| CompletableFuture<Optional<Topic>> future = topics.get(topic); |
| if (future != null && future.isDone() && !future.isCompletedExceptionally()) { |
| return future.join(); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| public void updateRates() { |
| synchronized (pulsarStats) { |
| pulsarStats.updateStats(multiLayerTopicsMap); |
| |
| Summary.rotateLatencyCollection(); |
| } |
| } |
| |
| public void getDimensionMetrics(Consumer<ByteBuf> consumer) { |
| pulsarStats.getDimensionMetrics(consumer); |
| } |
| |
| public List<Metrics> getTopicMetrics() { |
| return pulsarStats.getTopicMetrics(); |
| } |
| |
| public Map<String, NamespaceBundleStats> getBundleStats() { |
| return pulsarStats.getBundleStats(); |
| } |
| |
| public Semaphore getLookupRequestSemaphore() { |
| return lookupRequestSemaphore.get(); |
| } |
| |
| public void checkGC(int maxInactiveDurationInSec) { |
| forEachTopic(topic -> topic.checkGC(maxInactiveDurationInSec, |
| pulsar.getConfiguration().getBrokerDeleteInactiveTopicsMode())); |
| } |
| |
| public void checkMessageExpiry() { |
| forEachTopic(Topic::checkMessageExpiry); |
| } |
| |
| public void checkCompaction() { |
| forEachTopic((t) -> { |
| if (t instanceof PersistentTopic) { |
| ((PersistentTopic) t).checkCompaction(); |
| } |
| }); |
| } |
| |
| public void checkMessageDeduplicationInfo() { |
| forEachTopic(Topic::checkMessageDeduplicationInfo); |
| } |
| |
| public void checkInactiveSubscriptions() { |
| forEachTopic(Topic::checkInactiveSubscriptions); |
| } |
| |
| public void checkTopicPublishThrottlingRate() { |
| forEachTopic(Topic::checkTopicPublishThrottlingRate); |
| } |
| |
| private void refreshTopicPublishRate() { |
| forEachTopic(Topic::resetTopicPublishCountAndEnableReadIfRequired); |
| } |
| |
| public void checkBrokerPublishThrottlingRate() { |
| brokerPublishRateLimiter.checkPublishRate(); |
| } |
| |
| private void refreshBrokerPublishRate() { |
| boolean doneReset = brokerPublishRateLimiter.resetPublishCount(); |
| forEachTopic(topic -> topic.resetBrokerPublishCountAndEnableReadIfRequired(doneReset)); |
| } |
| |
| /** |
| * Iterates over all loaded topics in the broker |
| */ |
| public void forEachTopic(Consumer<Topic> consumer) { |
| topics.forEach((n, t) -> { |
| Optional<Topic> topic = extractTopic(t); |
| if (topic.isPresent()) { |
| consumer.accept(topic.get()); |
| } |
| }); |
| } |
| |
| public BacklogQuotaManager getBacklogQuotaManager() { |
| return this.backlogQuotaManager; |
| } |
| |
| /** |
| * |
| * @param topic |
| * needing quota enforcement check |
| * @return determine if quota enforcement needs to be done for topic |
| */ |
| public boolean isBacklogExceeded(PersistentTopic topic) { |
| TopicName topicName = TopicName.get(topic.getName()); |
| long backlogQuotaLimitInBytes = getBacklogQuotaManager().getBacklogQuotaLimit(topicName.getNamespace()); |
| if (backlogQuotaLimitInBytes < 0) { |
| return false; |
| } |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] - backlog quota limit = [{}]", topic.getName(), backlogQuotaLimitInBytes); |
| } |
| |
| // check if backlog exceeded quota |
| long storageSize = topic.getBacklogSize(); |
| if (log.isDebugEnabled()) { |
| log.debug("[{}] Storage size = [{}], limit [{}]", topic.getName(), storageSize, backlogQuotaLimitInBytes); |
| } |
| |
| return (storageSize >= backlogQuotaLimitInBytes); |
| } |
| |
| public void monitorBacklogQuota() { |
| forEachTopic(topic -> { |
| if (topic instanceof PersistentTopic) { |
| PersistentTopic persistentTopic = (PersistentTopic) topic; |
| if (isBacklogExceeded(persistentTopic)) { |
| getBacklogQuotaManager().handleExceededBacklogQuota(persistentTopic); |
| } else { |
| if (log.isDebugEnabled()) { |
| log.debug("quota not exceeded for [{}]", topic.getName()); |
| } |
| } |
| } |
| }); |
| } |
| |
| public void checkTopicNsOwnership(final String topic) throws RuntimeException { |
| TopicName topicName = TopicName.get(topic); |
| boolean ownedByThisInstance; |
| try { |
| ownedByThisInstance = pulsar.getNamespaceService().isServiceUnitOwned(topicName); |
| } catch (Exception e) { |
| log.debug("Failed to check the ownership of the topic: {}", topicName, e); |
| throw new RuntimeException(new ServerMetadataException(e)); |
| } |
| |
| if (!ownedByThisInstance) { |
| String msg = String.format("Namespace bundle for topic (%s) not served by this instance. Please redo the lookup. " |
| + "Request is denied: namespace=%s", topic, topicName.getNamespace()); |
| log.warn(msg); |
| throw new RuntimeException(new ServiceUnitNotReadyException(msg)); |
| } |
| } |
| |
| /** |
| * Unload all the topic served by the broker service under the given service unit |
| * |
| * @param serviceUnit |
| * @param closeWithoutWaitingClientDisconnect don't wait for clients to disconnect and forcefully close managed-ledger |
| * @return |
| */ |
| public CompletableFuture<Integer> unloadServiceUnit(NamespaceBundle serviceUnit, boolean closeWithoutWaitingClientDisconnect) { |
| CompletableFuture<Integer> result = new CompletableFuture<Integer>(); |
| List<CompletableFuture<Void>> closeFutures = Lists.newArrayList(); |
| topics.forEach((name, topicFuture) -> { |
| TopicName topicName = TopicName.get(name); |
| if (serviceUnit.includes(topicName)) { |
| // Topic needs to be unloaded |
| log.info("[{}] Unloading topic", topicName); |
| closeFutures.add(topicFuture |
| .thenCompose(t -> t.isPresent() ? t.get().close(closeWithoutWaitingClientDisconnect) : CompletableFuture.completedFuture(null))); |
| } |
| }); |
| CompletableFuture<Void> aggregator = FutureUtil.waitForAll(closeFutures); |
| aggregator.thenAccept(res -> result.complete(closeFutures.size())).exceptionally(ex -> { |
| result.completeExceptionally(ex); |
| return null; |
| }); |
| return result; |
| } |
| |
| public AuthorizationService getAuthorizationService() { |
| return authorizationService; |
| } |
| |
| public void removeTopicFromCache(String topic) { |
| TopicName topicName = null; |
| NamespaceBundle namespaceBundle = null; |
| try { |
| topicName = TopicName.get(topic); |
| namespaceBundle = pulsar.getNamespaceService().getBundle(topicName); |
| checkArgument(namespaceBundle instanceof NamespaceBundle); |
| |
| String bundleName = namespaceBundle.toString(); |
| String namespaceName = topicName.getNamespaceObject().toString(); |
| |
| synchronized (multiLayerTopicsMap) { |
| ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> namespaceMap = multiLayerTopicsMap |
| .get(namespaceName); |
| ConcurrentOpenHashMap<String, Topic> bundleMap = namespaceMap.get(bundleName); |
| bundleMap.remove(topic); |
| if (bundleMap.isEmpty()) { |
| namespaceMap.remove(bundleName); |
| } |
| |
| if (namespaceMap.isEmpty()) { |
| multiLayerTopicsMap.remove(namespaceName); |
| final ClusterReplicationMetrics clusterReplicationMetrics = pulsarStats |
| .getClusterReplicationMetrics(); |
| replicationClients.forEach((cluster, client) -> { |
| clusterReplicationMetrics.remove(clusterReplicationMetrics.getKeyName(namespaceName, cluster)); |
| }); |
| } |
| } |
| } catch (Exception e) { |
| log.warn("Got exception when retrieving bundle name {} for topic {} during removeTopicFromCache", |
| namespaceBundle, topicName, e); |
| } |
| |
| topics.remove(topic); |
| } |
| |
| public int getNumberOfNamespaceBundles() { |
| this.numberOfNamespaceBundles = 0; |
| this.multiLayerTopicsMap.forEach((namespaceName, bundles) -> { |
| this.numberOfNamespaceBundles += bundles.size(); |
| }); |
| return this.numberOfNamespaceBundles; |
| } |
| |
| public ConcurrentOpenHashMap<String, CompletableFuture<Optional<Topic>>> getTopics() { |
| return topics; |
| } |
| |
| @Override |
| public void onUpdate(String path, Policies data, Stat stat) { |
| final NamespaceName namespace = NamespaceName.get(NamespaceBundleFactory.getNamespaceFromPoliciesPath(path)); |
| |
| log.info("{} updating with {}", path, data); |
| |
| topics.forEach((name, topicFuture) -> { |
| if (namespace.includes(TopicName.get(name))) { |
| // If the topic is already created, immediately apply the updated policies, otherwise once the topic is |
| // created it'll apply the policies update |
| topicFuture.thenAccept(topic -> { |
| if (log.isDebugEnabled()) { |
| log.debug("Notifying topic that policies have changed: {}", name); |
| } |
| |
| topic.ifPresent(t -> t.onPoliciesUpdate(data)); |
| }); |
| } |
| }); |
| |
| // Sometimes a broker doesn't receive the policies-update watch, misses removing the replication cluster, |
| // and still owns the bundle. That can cause data loss for TODO: git-issue |
| unloadDeletedReplNamespace(data, namespace); |
| } |
| |
| /** |
| * Unloads the namespace bundles if the local cluster is no longer part of the namespace's |
| * replication-cluster list. This way, a broker that owns the bundle but didn't receive the zk-watch still |
| * unloads the namespace. |
| * @param data updated namespace policies |
| * @param namespace namespace whose bundles may need to be unloaded |
| */ |
| private void unloadDeletedReplNamespace(Policies data, NamespaceName namespace) { |
| if (!namespace.isGlobal()) { |
| return; |
| } |
| final String localCluster = this.pulsar.getConfiguration().getClusterName(); |
| if (!data.replication_clusters.contains(localCluster)) { |
| try { |
| NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() |
| .getBundles(namespace); |
| bundles.getBundles().forEach(bundle -> { |
| pulsar.getNamespaceService().isNamespaceBundleOwned(bundle).thenAccept(isExist -> { |
| if (isExist) { |
| this.pulsar().getExecutor().submit(() -> { |
| try { |
| pulsar().getAdminClient().namespaces().unloadNamespaceBundle(namespace.toString(), |
| bundle.getBundleRange()); |
| } catch (Exception e) { |
| log.error("Failed to unload namespace-bundle {}-{} that is not owned by {}, {}", |
| namespace.toString(), bundle.toString(), localCluster, e.getMessage()); |
| } |
| }); |
| } |
| }); |
| }); |
| } catch (Exception e) { |
| log.error("Failed to unload bundles that are not owned locally: {}", e.getMessage(), e); |
| } |
| } |
| } |
| |
| public PulsarService pulsar() { |
| return pulsar; |
| } |
| |
| public ScheduledExecutorService executor() { |
| return workerGroup; |
| } |
| |
| public ConcurrentOpenHashMap<String, PulsarClient> getReplicationClients() { |
| return replicationClients; |
| } |
| |
| public boolean isAuthenticationEnabled() { |
| return pulsar.getConfiguration().isAuthenticationEnabled(); |
| } |
| |
| public boolean isAuthorizationEnabled() { |
| return authorizationService != null; |
| } |
| |
| public int getKeepAliveIntervalSeconds() { |
| return keepAliveIntervalSeconds; |
| } |
| |
| public String generateUniqueProducerName() { |
| return producerNameGenerator.getNextId(); |
| } |
| |
| public Map<String, TopicStats> getTopicStats() { |
| HashMap<String, TopicStats> stats = new HashMap<>(); |
| |
| forEachTopic(topic -> stats.put(topic.getName(), topic.getStats())); |
| |
| return stats; |
| } |
| |
| public AuthenticationService getAuthenticationService() { |
| return authenticationService; |
| } |
| |
| public List<Topic> getAllTopicsFromNamespaceBundle(String namespace, String bundle) { |
| ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>> map1 = multiLayerTopicsMap.get(namespace); |
| if (map1 == null) { |
| return Collections.emptyList(); |
| } |
| |
| ConcurrentOpenHashMap<String, Topic> map2 = map1.get(bundle); |
| if (map2 == null) { |
| return Collections.emptyList(); |
| } |
| |
| return map2.values(); |
| } |
| |
| public ZooKeeperDataCache<Map<String, String>> getDynamicConfigurationCache() { |
| return dynamicConfigurationCache; |
| } |
| |
| /** |
| * Updates the dynamic fields of the {@link ServiceConfiguration} with the values present in the |
| * zk-configuration-map, and registers listeners on the dynamic fields so the appropriate action is taken |
| * whenever the zk-configuration-map changes. |
| */ |
| private void updateConfigurationAndRegisterListeners() { |
| // (1) Dynamic-config value validation: add a validator if an updated value requires a strict check before |
| // being applied, e.g. validate that the configured load-manager classname is present on the classpath |
| addDynamicConfigValidator("loadManagerClassName", (className) -> { |
| try { |
| Class.forName(className); |
| } catch (ClassNotFoundException | NoClassDefFoundError e) { |
| log.warn("Configured load-manager class {} not found {}", className, e.getMessage()); |
| return false; |
| } |
| return true; |
| }); |
| |
| // (2) update ServiceConfiguration value by reading zk-configuration-map |
| updateDynamicServiceConfiguration(); |
| |
| // (3) Listener Registration |
| // add listener on "maxConcurrentLookupRequest" value change |
| registerConfigurationListener("maxConcurrentLookupRequest", |
| (maxConcurrentLookupRequest) -> lookupRequestSemaphore.set(new Semaphore((int) maxConcurrentLookupRequest, false))); |
| // add listener on "maxConcurrentTopicLoadRequest" value change |
| registerConfigurationListener("maxConcurrentTopicLoadRequest", |
| (maxConcurrentTopicLoadRequest) -> topicLoadRequestSemaphore.set(new Semaphore((int) maxConcurrentTopicLoadRequest, false))); |
| registerConfigurationListener("loadManagerClassName", className -> { |
| try { |
| final LoadManager newLoadManager = LoadManager.create(pulsar); |
| log.info("Created load manager: {}", className); |
| pulsar.getLoadManager().get().stop(); |
| newLoadManager.start(); |
| pulsar.getLoadManager().set(newLoadManager); |
| } catch (Exception ex) { |
| log.warn("Failed to change load manager", ex); |
| } |
| }); |
| // add listener to update message-dispatch-rate in msg for topic |
| registerConfigurationListener("dispatchThrottlingRatePerTopicInMsg", (dispatchRatePerTopicInMsg) -> { |
| updateTopicMessageDispatchRate(); |
| }); |
| // add listener to update message-dispatch-rate in byte for topic |
| registerConfigurationListener("dispatchThrottlingRatePerTopicInByte", (dispatchRatePerTopicInByte) -> { |
| updateTopicMessageDispatchRate(); |
| }); |
| // add listener to update managed-ledger config to skipNonRecoverableLedgers |
| registerConfigurationListener("autoSkipNonRecoverableData", (skipNonRecoverableLedger) -> { |
| updateManagedLedgerConfig(); |
| }); |
| // add listener to update message-dispatch-rate in msg for subscription |
| registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInMsg", (dispatchRatePerTopicInMsg) -> { |
| updateSubscriptionMessageDispatchRate(); |
| }); |
| // add listener to update message-dispatch-rate in byte for subscription |
| registerConfigurationListener("dispatchThrottlingRatePerSubscriptionInByte", (dispatchRatePerTopicInByte) -> { |
| updateSubscriptionMessageDispatchRate(); |
| }); |
| |
| // add listener to update message-dispatch-rate in msg for replicator |
| registerConfigurationListener("dispatchThrottlingRatePerReplicatorInMsg", (dispatchRatePerTopicInMsg) -> { |
| updateReplicatorMessageDispatchRate(); |
| }); |
| // add listener to update message-dispatch-rate in byte for replicator |
| registerConfigurationListener("dispatchThrottlingRatePerReplicatorInByte", (dispatchRatePerTopicInByte) -> { |
| updateReplicatorMessageDispatchRate(); |
| }); |
| |
| // add listener to notify broker publish-rate monitoring |
| registerConfigurationListener("brokerPublisherThrottlingTickTimeMillis", (publisherThrottlingTickTimeMillis) -> { |
| setupBrokerPublishRateLimiterMonitor(); |
| }); |
| // add listener to notify broker publish-rate dynamic config |
| registerConfigurationListener("brokerPublisherThrottlingMaxMessageRate", |
| (brokerPublisherThrottlingMaxMessageRate) -> |
| updateBrokerPublisherThrottlingMaxRate()); |
| registerConfigurationListener("brokerPublisherThrottlingMaxByteRate", |
| (brokerPublisherThrottlingMaxByteRate) -> |
| updateBrokerPublisherThrottlingMaxRate()); |
| |
| // add listener to notify topic publish-rate monitoring |
| registerConfigurationListener("topicPublisherThrottlingTickTimeMillis", (publisherThrottlingTickTimeMillis) -> { |
| setupTopicPublishRateLimiterMonitor(); |
| }); |
| |
| // add more listeners here |
| } |
| |
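| /** |
| * Re-reads the broker-level publish-throttling settings and either disables the broker publish rate limiter |
| * (when the tick time or both rates are non-positive) or creates/updates it with the new {@link PublishRate}. |
| */ |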
| private void updateBrokerPublisherThrottlingMaxRate() { |
| int currentMaxMessageRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxMessageRate(); |
| long currentMaxByteRate = pulsar.getConfiguration().getBrokerPublisherThrottlingMaxByteRate(); |
| int brokerTickMs = pulsar.getConfiguration().getBrokerPublisherThrottlingTickTimeMillis(); |
| |
| // throttling not enabled |
| if (brokerTickMs <= 0 || (currentMaxByteRate <= 0 && currentMaxMessageRate <= 0)) { |
| if (brokerPublishRateLimiter != PublishRateLimiter.DISABLED_RATE_LIMITER) { |
| refreshBrokerPublishRate(); |
| brokerPublishRateLimiter = PublishRateLimiter.DISABLED_RATE_LIMITER; |
| } |
| return; |
| } |
| |
| final PublishRate publishRate = new PublishRate(currentMaxMessageRate, currentMaxByteRate); |
| |
| log.info("Update broker publish rate limiting {}", publishRate); |
| // lazily init the broker publish-rate limiter monitor if not initialized yet |
| this.setupBrokerPublishRateLimiterMonitor(); |
| if (brokerPublishRateLimiter == null |
| || brokerPublishRateLimiter == PublishRateLimiter.DISABLED_RATE_LIMITER) { |
| // create new rateLimiter if rate-limiter is disabled |
| brokerPublishRateLimiter = new PublishRateLimiterImpl(publishRate); |
| } else { |
| brokerPublishRateLimiter.update(publishRate); |
| } |
| } |
| |
| private void updateTopicMessageDispatchRate() { |
| this.pulsar().getExecutor().execute(() -> { |
| // update message-rate for each topic |
| forEachTopic(topic -> { |
| if (topic.getDispatchRateLimiter().isPresent()) { |
| topic.getDispatchRateLimiter().get().updateDispatchRate(); |
| } |
| }); |
| }); |
| } |
| |
| private void updateSubscriptionMessageDispatchRate() { |
| this.pulsar().getExecutor().submit(() -> { |
| // update message-rate for each topic subscription |
| forEachTopic(topic -> { |
| topic.getSubscriptions().forEach((subName, persistentSubscription) -> { |
| Dispatcher dispatcher = persistentSubscription.getDispatcher(); |
| if (dispatcher != null) { |
| dispatcher.getRateLimiter().ifPresent(DispatchRateLimiter::updateDispatchRate); |
| } |
| }); |
| }); |
| }); |
| } |
| |
| private void updateReplicatorMessageDispatchRate() { |
| this.pulsar().getExecutor().submit(() -> { |
| // update message-rate for each topic Replicator in Geo-replication |
| forEachTopic(topic -> |
| topic.getReplicators().forEach((name, persistentReplicator) -> { |
| if (persistentReplicator.getRateLimiter().isPresent()) { |
| persistentReplicator.getRateLimiter().get().updateDispatchRate(); |
| } |
| })); |
| }); |
| } |
| |
| private void updateManagedLedgerConfig() { |
| this.pulsar().getExecutor().execute(() -> { |
| // update managed-ledger config of each topic |
| |
| forEachTopic(topic -> { |
| try { |
| if (topic instanceof PersistentTopic) { |
| PersistentTopic persistentTopic = (PersistentTopic) topic; |
| // update skipNonRecoverableLedger configuration |
| persistentTopic.getManagedLedger().getConfig().setAutoSkipNonRecoverableData( |
| pulsar.getConfiguration().isAutoSkipNonRecoverableData()); |
| } |
| } catch (Exception e) { |
| log.warn("[{}] failed to update managed-ledger config", topic.getName(), e); |
| } |
| }); |
| }); |
| } |
| |
| /** |
| * Allows a listener to listen for updates of the {@link ServiceConfiguration}, so the listener can take |
| * appropriate action when a specific config-field value changes. |
| * <p> |
| * On notification, the listener should first check whether the config value has actually changed and, after |
| * taking the appropriate action, update the value it holds with the new one (so it can compare values on the |
| * next configMap change). |
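| * <p>A minimal usage sketch, mirroring the listeners registered in updateConfigurationAndRegisterListeners |
| * (the log statement is illustrative): |
| * <pre>{@code |
| * brokerService.registerConfigurationListener("maxConcurrentLookupRequest", |
| * (newLimit) -> log.info("maxConcurrentLookupRequest changed to {}", (int) newLimit)); |
| * }</pre> |
| * |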
| * @param <T> type of the config value handled by the listener |
| * @param configKey configuration field name |
| * @param listener listener which takes appropriate action on config-value change |
| */ |
| public <T> void registerConfigurationListener(String configKey, Consumer<T> listener) { |
| validateConfigKey(configKey); |
| configRegisteredListeners.put(configKey, listener); |
| } |
| |
| private void addDynamicConfigValidator(String key, Predicate<String> validator) { |
| validateConfigKey(key); |
| if (dynamicConfigurationMap.containsKey(key)) { |
| dynamicConfigurationMap.get(key).validator = validator; |
| } |
| } |
| |
| private void validateConfigKey(String key) { |
| try { |
| ServiceConfiguration.class.getDeclaredField(key); |
| } catch (Exception e) { |
| log.error("ServiceConfiguration key {} not found {}", key, e.getMessage()); |
| throw new IllegalArgumentException("Invalid service config " + key, e); |
| } |
| } |
| |
| /** |
| * Updates the dynamic fields of pulsar.ServiceConfiguration with the values persisted under the zk |
| * dynamic-config path. It also validates each dynamic value before applying it and throws an |
| * {@code IllegalArgumentException} if validation fails. |
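| * <p>The znode stores a JSON-serialized {@code Map<String, String>}; a minimal sketch of its content, with |
| * illustrative values for two of the dynamic keys used above: |
| * <pre>{@code |
| * {"maxConcurrentLookupRequest": "20000", "dispatchThrottlingRatePerTopicInMsg": "1000"} |
| * }</pre> |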
| */ |
| private void updateDynamicServiceConfiguration() { |
| |
| Optional<Map<String, String>> configCache = Optional.empty(); |
| try { |
| // create dynamic-config znode if not present |
| if (pulsar.getZkClient().exists(BROKER_SERVICE_CONFIGURATION_PATH, false) == null) { |
| try { |
| byte[] data = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(Maps.newHashMap()); |
| ZkUtils.createFullPathOptimistic(pulsar.getZkClient(), BROKER_SERVICE_CONFIGURATION_PATH, data, |
| Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); |
| } catch (KeeperException.NodeExistsException e) { |
| // Ok |
| } |
| } |
| configCache = dynamicConfigurationCache.get(BROKER_SERVICE_CONFIGURATION_PATH); |
| } catch (Exception e) { |
| log.warn("Failed to read zookeeper path [{}]:", BROKER_SERVICE_CONFIGURATION_PATH, e); |
| } |
| if (configCache.isPresent()) { |
| configCache.get().forEach((key, value) -> { |
| // validate field |
| if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) { |
| if (!dynamicConfigurationMap.get(key).validator.test(value)) { |
| log.error("Failed to validate dynamic config {} with value {}", key, value); |
| throw new IllegalArgumentException( |
| String.format("Failed to validate dynamic-config %s/%s", key, value)); |
| } |
| } |
| // update field value |
| try { |
| Field field = ServiceConfiguration.class.getDeclaredField(key); |
| if (field != null && field.isAnnotationPresent(FieldContext.class)) { |
| field.setAccessible(true); |
| field.set(pulsar().getConfiguration(), FieldParser.value(value, field)); |
| log.info("Successfully updated {}/{}", key, value); |
| } |
| } catch (Exception e) { |
| log.warn("Failed to update service configuration {}/{}, {}", key, value, e.getMessage()); |
| } |
| }); |
| } |
| // register a listener: it updates the field value and triggers the registered field-listener only if the |
| // field's value has changed, so the registered listener doesn't have to update the field value in ServiceConfiguration |
| dynamicConfigurationCache.registerListener(new ZooKeeperCacheListener<Map<String, String>>() { |
| @SuppressWarnings("unchecked") |
| @Override |
| public void onUpdate(String path, Map<String, String> data, Stat stat) { |
| if (BROKER_SERVICE_CONFIGURATION_PATH.equalsIgnoreCase(path) && data != null) { |
| data.forEach((configKey, value) -> { |
| // look up the registered dynamic field first to avoid an NPE on unknown keys |
| ConfigField registeredField = dynamicConfigurationMap.get(configKey); |
| Field configField = registeredField != null ? registeredField.field : null; |
| if (configField != null) { |
| Object newValue = FieldParser.value(value, configField); |
| Consumer listener = configRegisteredListeners.get(configKey); |
| try { |
| Object existingValue = configField.get(pulsar.getConfiguration()); |
| configField.set(pulsar.getConfiguration(), newValue); |
| log.info("Successfully updated configuration {}/{}", configKey, value); |
| if (listener != null && !existingValue.equals(newValue)) { |
| listener.accept(newValue); |
| } |
| } catch (Exception e) { |
| log.error("Failed to update config {}/{}", configKey, value, e); |
| } |
| } else { |
| log.error("Found non-dynamic field in dynamicConfigMap {}/{}", configKey, value); |
| } |
| }); |
| } |
| } |
| }); |
| |
| } |
| |
| public DelayedDeliveryTrackerFactory getDelayedDeliveryTrackerFactory() { |
| return delayedDeliveryTrackerFactory; |
| } |
| |
| public static List<String> getDynamicConfiguration() { |
| return dynamicConfigurationMap.keys(); |
| } |
| |
| public Map<String, String> getRuntimeConfiguration() { |
| Map<String, String> configMap = Maps.newHashMap(); |
| ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = getRuntimeConfigurationMap(); |
| runtimeConfigurationMap.forEach((key, value) -> { |
| configMap.put(key, String.valueOf(value)); |
| }); |
| return configMap; |
| } |
| |
| public static boolean isDynamicConfiguration(String key) { |
| return dynamicConfigurationMap.containsKey(key); |
| } |
| |
| public static boolean validateDynamicConfiguration(String key, String value) { |
| if (dynamicConfigurationMap.containsKey(key) && dynamicConfigurationMap.get(key).validator != null) { |
| return dynamicConfigurationMap.get(key).validator.test(value); |
| } |
| return true; |
| } |
| |
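| /** |
| * Builds the dynamic-config map by scanning {@link ServiceConfiguration} for fields annotated with |
| * {@code FieldContext(dynamic = true)}. A qualifying field is declared roughly as in this sketch (the |
| * default value shown is illustrative): |
| * <pre> |
| * &#64;FieldContext(dynamic = true) |
| * private int maxConcurrentLookupRequest = 50000; |
| * </pre> |
| */ |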
| private static ConcurrentOpenHashMap<String, ConfigField> prepareDynamicConfigurationMap() { |
| ConcurrentOpenHashMap<String, ConfigField> dynamicConfigurationMap = new ConcurrentOpenHashMap<>(); |
| for (Field field : ServiceConfiguration.class.getDeclaredFields()) { |
| if (field != null && field.isAnnotationPresent(FieldContext.class)) { |
| field.setAccessible(true); |
| if (((FieldContext) field.getAnnotation(FieldContext.class)).dynamic()) { |
| dynamicConfigurationMap.put(field.getName(), new ConfigField(field)); |
| } |
| } |
| } |
| return dynamicConfigurationMap; |
| } |
| |
| private ConcurrentOpenHashMap<String, Object> getRuntimeConfigurationMap() { |
| ConcurrentOpenHashMap<String, Object> runtimeConfigurationMap = new ConcurrentOpenHashMap<>(); |
| for (Field field : ServiceConfiguration.class.getDeclaredFields()) { |
| if (field != null && field.isAnnotationPresent(FieldContext.class)) { |
| field.setAccessible(true); |
| try { |
| Object configValue = field.get(pulsar.getConfiguration()); |
| runtimeConfigurationMap.put(field.getName(), configValue == null ? "" : configValue); |
| } catch (Exception e) { |
| log.error("Failed to get value of field {}, {}", field.getName(), e.getMessage()); |
| } |
| } |
| } |
| return runtimeConfigurationMap; |
| } |
| |
| /** |
| * Creates a pending topic and, on completion, picks the next one until all topics in |
| * {@link #pendingTopicLoadingQueue} have been processed.<br/> |
| * It also tries to acquire the {@link #topicLoadRequestSemaphore} to throttle newly incoming topics, and |
| * releases the permit once the topic load completes if it managed to acquire one. |
| */ |
| private void createPendingLoadTopic() { |
| Pair<String, CompletableFuture<Optional<Topic>>> pendingTopic = pendingTopicLoadingQueue.poll(); |
| if (pendingTopic == null) { |
| return; |
| } |
| |
| final String topic = pendingTopic.getLeft(); |
| try { |
| checkTopicNsOwnership(topic); |
| CompletableFuture<Optional<Topic>> pendingFuture = pendingTopic.getRight(); |
| final Semaphore topicLoadSemaphore = topicLoadRequestSemaphore.get(); |
| final boolean acquiredPermit = topicLoadSemaphore.tryAcquire(); |
| createPersistentTopic(topic, true, pendingFuture); |
| pendingFuture.handle((persistentTopic, ex) -> { |
| // release permit and process next pending topic |
| if (acquiredPermit) { |
| topicLoadSemaphore.release(); |
| } |
| createPendingLoadTopic(); |
| return null; |
| }); |
| } catch (RuntimeException re) { |
| log.error("Failed to create pending topic {}", topic, re); |
| pendingTopic.getRight().completeExceptionally(re.getCause() != null ? re.getCause() : re); |
| // schedule to process next pending topic |
| inactivityMonitor.schedule(() -> createPendingLoadTopic(), 100, TimeUnit.MILLISECONDS); |
| } |
| |
| } |
| |
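| /** |
| * Fetches the partitioned-topic metadata and, when the topic does not exist yet, has no partitions, and the |
| * broker is configured to auto-create partitioned topics by default, first creates the default partitioned |
| * topic. |
| */ |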
| public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataCheckAllowAutoCreationAsync(TopicName topicName) { |
| return pulsar.getNamespaceService().checkTopicExists(topicName) |
| .thenCompose(topicExists -> { |
| return fetchPartitionedTopicMetadataAsync(topicName) |
| .thenCompose(metadata -> { |
| // If the topic already exists, creating a partitioned topic is not allowed. |
| if (metadata.partitions == 0 |
| && !topicExists |
| && pulsar.getConfiguration().isAllowAutoTopicCreation() |
| && pulsar.getConfiguration().isDefaultTopicTypePartitioned()) { |
| return pulsar.getBrokerService().createDefaultPartitionedTopicAsync(topicName); |
| } else { |
| return CompletableFuture.completedFuture(metadata); |
| } |
| }); |
| }); |
| } |
| |
| @SuppressWarnings("deprecation") |
| private CompletableFuture<PartitionedTopicMetadata> createDefaultPartitionedTopicAsync(TopicName topicName) { |
| int defaultNumPartitions = pulsar.getConfiguration().getDefaultNumPartitions(); |
| checkArgument(defaultNumPartitions > 0, "Default number of partitions should be more than 0"); |
| |
| PartitionedTopicMetadata configMetadata = new PartitionedTopicMetadata(defaultNumPartitions); |
| CompletableFuture<PartitionedTopicMetadata> partitionedTopicFuture = new CompletableFuture<>(); |
| |
| try { |
| byte[] content = ObjectMapperFactory.getThreadLocal().writeValueAsBytes(configMetadata); |
| |
| ZkUtils.asyncCreateFullPathOptimistic(pulsar.getGlobalZkCache().getZooKeeper(), |
| partitionedTopicPath(topicName), content, |
| ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path1, ctx, name) -> { |
| if (rc == KeeperException.Code.OK.intValue()) { |
| // Sync data to all quorums and the observers |
| pulsar.getGlobalZkCache().getZooKeeper().sync(partitionedTopicPath(topicName), |
| (rc2, path2, ctx2) -> { |
| if (rc2 == KeeperException.Code.OK.intValue()) { |
| partitionedTopicFuture.complete(configMetadata); |
| } else { |
| partitionedTopicFuture.completeExceptionally(KeeperException.create(rc2)); |
| } |
| }, null); |
| } else { |
| partitionedTopicFuture.completeExceptionally(KeeperException.create(rc)); |
| } |
| }, null); |
| |
| } catch (Exception e) { |
| log.error("Failed to create default partitioned topic.", e); |
| return FutureUtil.failedFuture(e); |
| } |
| |
| return partitionedTopicFuture; |
| } |
| |
| public CompletableFuture<PartitionedTopicMetadata> fetchPartitionedTopicMetadataAsync(TopicName topicName) { |
| // gets the number of partitions from the zk cache |
| return pulsar.getGlobalZkCache().getDataAsync(partitionedTopicPath(topicName), (key, content) -> { |
| return ObjectMapperFactory.getThreadLocal().readValue(content, PartitionedTopicMetadata.class); |
| }).thenApply(metadata -> { |
| // if the partitioned topic is not found in zk, then the topic is not partitioned |
| return metadata.orElseGet(() -> new PartitionedTopicMetadata()); |
| }); |
| } |
| |
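| /** |
| * Builds the global ZooKeeper path that holds the partitioned-topic metadata; e.g. for |
| * {@code persistent://tenant/ns/my-topic} the path is |
| * {@code <PARTITIONED_TOPICS_ROOT>/tenant/ns/persistent/my-topic}, with the root constant defined in |
| * {@link ConfigurationCacheService}. |
| */ |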
| private static String partitionedTopicPath(TopicName topicName) { |
| return String.format("%s/%s/%s/%s", |
| ConfigurationCacheService.PARTITIONED_TOPICS_ROOT, |
| topicName.getNamespace(), |
| topicName.getDomain(), |
| topicName.getEncodedLocalName()); |
| } |
| |
| public OrderedExecutor getTopicOrderedExecutor() { |
| return topicOrderedExecutor; |
| } |
| |
| public ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, ConcurrentOpenHashMap<String, Topic>>> getMultiLayerTopicMap() { |
| return multiLayerTopicsMap; |
| } |
| |
| /** |
| * Adds the given number of unacked messages to the broker-wide count and, if the broker is already blocked, |
| * also blocks the given dispatcher once its own unacked-message count exceeds |
| * {@link #maxUnackedMsgsPerDispatcher}. |
| * |
| * @param dispatcher dispatcher whose unacked-message count increased |
| * @param numberOfMessages number of additional unacked messages |
| */ |
| public void addUnAckedMessages(PersistentDispatcherMultipleConsumers dispatcher, int numberOfMessages) { |
| // don't block dispatchers if maxUnackedMessages = 0 |
| if (maxUnackedMessages > 0) { |
| totalUnackedMessages.add(numberOfMessages); |
| |
| // block the dispatcher if the broker is already blocked and the dispatcher reaches its max unacked-msg |
| // limit while the broker is blocked |
| if (blockedDispatcherOnHighUnackedMsgs.get() && !dispatcher.isBlockedDispatcherOnUnackedMsgs() |
| && dispatcher.getTotalUnackedMessages() > maxUnackedMsgsPerDispatcher) { |
| lock.readLock().lock(); |
| try { |
| log.info("[{}] dispatcher reached the max unacked msg limit on a blocked broker: {}", |
| dispatcher.getName(), dispatcher.getTotalUnackedMessages()); |
| dispatcher.blockDispatcherOnUnackedMsgs(); |
| blockedDispatchers.add(dispatcher); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| } |
| } |
| |
| /** |
| * Checks the broker-wide unacked-message count: once it reaches {@link #maxUnackedMessages}, all dispatchers |
| * with more unacked messages than {@link #maxUnackedMsgsPerDispatcher} are blocked. Dispatchers are unblocked |
| * once the broker-wide count drops below ({@link #maxUnackedMessages}/2); e.g. with a limit of 200000, |
| * blocking starts at 200000 total unacked messages and unblocking happens once the total falls under 100000. |
| */ |
| public void checkUnAckMessageDispatching() { |
| |
| // don't block dispatchers if maxUnackedMessages = 0 |
| if (maxUnackedMessages <= 0) { |
| return; |
| } |
| long unAckedMessages = totalUnackedMessages.sum(); |
| if (unAckedMessages >= maxUnackedMessages && blockedDispatcherOnHighUnackedMsgs.compareAndSet(false, true)) { |
| // block dispatchers with higher unacked-msg counts once the broker reaches its unacked-msg limit |
| log.info("Starting to block dispatchers: total unacked msgs {} reached the max broker limit {} (per-dispatcher limit {})", |
| unAckedMessages, maxUnackedMessages, maxUnackedMsgsPerDispatcher); |
| executor().execute(() -> blockDispatchersWithLargeUnAckMessages()); |
| } else if (blockedDispatcherOnHighUnackedMsgs.get() && unAckedMessages < maxUnackedMessages / 2) { |
| // unblock broker-dispatching if received enough acked messages back |
| if (blockedDispatcherOnHighUnackedMsgs.compareAndSet(true, false)) { |
| unblockDispatchersOnUnAckMessages(blockedDispatchers.values()); |
| } |
| } |
| |
| } |
| |
| public boolean isBrokerDispatchingBlocked() { |
| return blockedDispatcherOnHighUnackedMsgs.get(); |
| } |
| |
| private void blockDispatchersWithLargeUnAckMessages() { |
| lock.readLock().lock(); |
| try { |
| forEachTopic(topic -> { |
| topic.getSubscriptions().forEach((subName, persistentSubscription) -> { |
| if (persistentSubscription.getDispatcher() instanceof PersistentDispatcherMultipleConsumers) { |
| PersistentDispatcherMultipleConsumers dispatcher = (PersistentDispatcherMultipleConsumers) persistentSubscription |
| .getDispatcher(); |
| int dispatcherUnAckMsgs = dispatcher.getTotalUnackedMessages(); |
| if (dispatcherUnAckMsgs > maxUnackedMsgsPerDispatcher) { |
| log.info("[{}] Blocking dispatcher: unacked msgs {} exceeded the per-dispatcher limit", |
| dispatcher.getName(), dispatcher.getTotalUnackedMessages()); |
| dispatcher.blockDispatcherOnUnackedMsgs(); |
| blockedDispatchers.add(dispatcher); |
| } |
| } |
| }); |
| }); |
| } finally { |
| lock.readLock().unlock(); |
| } |
| } |
| |
| /** |
| * Unblocks the given dispatchers and removes them from the {@link #blockedDispatchers} list. |
| * |
| * @param dispatcherList dispatchers to unblock |
| */ |
| public void unblockDispatchersOnUnAckMessages(List<PersistentDispatcherMultipleConsumers> dispatcherList) { |
| lock.writeLock().lock(); |
| try { |
| dispatcherList.forEach(dispatcher -> { |
| dispatcher.unBlockDispatcherOnUnackedMsgs(); |
| executor().execute(() -> dispatcher.readMoreEntries()); |
| log.info("[{}] Dispatcher is unblocked", dispatcher.getName()); |
| blockedDispatchers.remove(dispatcher); |
| }); |
| } finally { |
| lock.writeLock().unlock(); |
| } |
| } |
| |
| private static class ConfigField { |
| final Field field; |
| Predicate<String> validator; |
| |
| public ConfigField(Field field) { |
| this.field = field; |
| } |
| } |
| |
| /** |
| * Safely extracts the optional topic instance from a future, in a way that avoids unchecked exceptions and race conditions. |
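| * <p>Typical call site (sketch): {@code BrokerService.extractTopic(topicFuture).ifPresent(t -> t.getName())}, |
| * where {@code topicFuture} comes from {@link #getTopics()}. |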
| */ |
| public static Optional<Topic> extractTopic(CompletableFuture<Optional<Topic>> topicFuture) { |
| if (topicFuture.isDone() && !topicFuture.isCompletedExceptionally()) { |
| return topicFuture.join(); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| public Optional<Integer> getListenPort() { |
| if (listenChannel != null) { |
| return Optional.of(((InetSocketAddress) listenChannel.localAddress()).getPort()); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| |
| public Optional<Integer> getListenPortTls() { |
| if (listenChannelTls != null) { |
| return Optional.of(((InetSocketAddress) listenChannelTls.localAddress()).getPort()); |
| } else { |
| return Optional.empty(); |
| } |
| } |
| } |