| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| package org.apache.pulsar.broker; |
| |
| import static com.google.common.base.Preconditions.checkNotNull; |
| import static org.apache.commons.lang3.StringUtils.isBlank; |
| import static org.apache.commons.lang3.StringUtils.isNotBlank; |
| import static org.apache.pulsar.broker.cache.ConfigurationCacheService.POLICIES; |
| import com.google.common.collect.ImmutableMap; |
| import com.google.common.collect.Lists; |
| import com.google.common.collect.Maps; |
| import io.netty.channel.ChannelInitializer; |
| import io.netty.channel.socket.SocketChannel; |
| import io.netty.util.HashedWheelTimer; |
| import io.netty.util.concurrent.DefaultThreadFactory; |
| import java.io.IOException; |
| import java.lang.reflect.Method; |
| import java.net.InetSocketAddress; |
| import java.util.Collections; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Objects; |
| import java.util.Optional; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.CompletionException; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.ScheduledExecutorService; |
| import java.util.concurrent.ScheduledFuture; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.concurrent.locks.Condition; |
| import java.util.concurrent.locks.ReentrantLock; |
| import java.util.function.Consumer; |
| import java.util.function.Supplier; |
| import lombok.AccessLevel; |
| import lombok.Getter; |
| import lombok.Setter; |
| import org.apache.bookkeeper.client.BookKeeper; |
| import org.apache.bookkeeper.common.util.OrderedExecutor; |
| import org.apache.bookkeeper.common.util.OrderedScheduler; |
| import org.apache.bookkeeper.conf.ClientConfiguration; |
| import org.apache.bookkeeper.mledger.LedgerOffloader; |
| import org.apache.bookkeeper.mledger.LedgerOffloaderFactory; |
| import org.apache.bookkeeper.mledger.ManagedLedgerFactory; |
| import org.apache.bookkeeper.mledger.impl.NullLedgerOffloader; |
| import org.apache.bookkeeper.mledger.offload.OffloaderUtils; |
| import org.apache.bookkeeper.mledger.offload.Offloaders; |
| import org.apache.commons.configuration.ConfigurationException; |
| import org.apache.commons.lang3.StringUtils; |
| import org.apache.commons.lang3.builder.ReflectionToStringBuilder; |
| import org.apache.pulsar.PulsarVersion; |
| import org.apache.pulsar.ZookeeperSessionExpiredHandlers; |
| import org.apache.pulsar.broker.admin.AdminResource; |
| import org.apache.pulsar.broker.authentication.AuthenticationService; |
| import org.apache.pulsar.broker.authorization.AuthorizationService; |
| import org.apache.pulsar.broker.cache.ConfigurationCacheService; |
| import org.apache.pulsar.broker.cache.LocalZooKeeperCacheService; |
| import org.apache.pulsar.broker.intercept.BrokerInterceptor; |
| import org.apache.pulsar.broker.intercept.BrokerInterceptors; |
| import org.apache.pulsar.broker.loadbalance.LeaderElectionService; |
| import org.apache.pulsar.broker.loadbalance.LoadManager; |
| import org.apache.pulsar.broker.loadbalance.LoadReportUpdaterTask; |
| import org.apache.pulsar.broker.loadbalance.LoadResourceQuotaUpdaterTask; |
| import org.apache.pulsar.broker.loadbalance.LoadSheddingTask; |
| import org.apache.pulsar.broker.loadbalance.impl.LoadManagerShared; |
| import org.apache.pulsar.broker.namespace.NamespaceService; |
| import org.apache.pulsar.broker.protocol.ProtocolHandlers; |
| import org.apache.pulsar.broker.resources.PulsarResources; |
| import org.apache.pulsar.broker.service.BrokerService; |
| import org.apache.pulsar.broker.service.SystemTopicBasedTopicPoliciesService; |
| import org.apache.pulsar.broker.service.Topic; |
| import org.apache.pulsar.broker.service.TopicPoliciesService; |
| import org.apache.pulsar.broker.service.schema.SchemaRegistryService; |
| import org.apache.pulsar.broker.stats.MetricsGenerator; |
| import org.apache.pulsar.broker.stats.prometheus.PrometheusMetricsServlet; |
| import org.apache.pulsar.broker.stats.prometheus.PrometheusRawMetricsProvider; |
| import org.apache.pulsar.broker.storage.ManagedLedgerStorage; |
| import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; |
| import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; |
| import org.apache.pulsar.broker.validator.MultipleListenerValidator; |
| import org.apache.pulsar.broker.web.WebService; |
| import org.apache.pulsar.client.admin.PulsarAdmin; |
| import org.apache.pulsar.client.admin.PulsarAdminBuilder; |
| import org.apache.pulsar.client.api.ClientBuilder; |
| import org.apache.pulsar.client.api.PulsarClient; |
| import org.apache.pulsar.client.api.transaction.TransactionBufferClient; |
| import org.apache.pulsar.client.impl.PulsarClientImpl; |
| import org.apache.pulsar.common.conf.InternalConfigurationData; |
| import org.apache.pulsar.common.configuration.PulsarConfigurationLoader; |
| import org.apache.pulsar.common.configuration.VipStatus; |
| import org.apache.pulsar.common.naming.NamespaceBundle; |
| import org.apache.pulsar.common.naming.NamespaceName; |
| import org.apache.pulsar.common.naming.TopicName; |
| import org.apache.pulsar.common.policies.data.ClusterData; |
| import org.apache.pulsar.common.policies.data.OffloadPolicies; |
| import org.apache.pulsar.common.protocol.schema.SchemaStorage; |
| import org.apache.pulsar.common.util.FutureUtil; |
| import org.apache.pulsar.compaction.Compactor; |
| import org.apache.pulsar.compaction.TwoPhaseCompactor; |
| import org.apache.pulsar.functions.worker.ErrorNotifier; |
| import org.apache.pulsar.functions.worker.WorkerConfig; |
| import org.apache.pulsar.functions.worker.WorkerService; |
| import org.apache.pulsar.metadata.api.MetadataStoreConfig; |
| import org.apache.pulsar.metadata.api.MetadataStoreException; |
| import org.apache.pulsar.metadata.api.coordination.CoordinationService; |
| import org.apache.pulsar.metadata.api.coordination.LeaderElectionState; |
| import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended; |
| import org.apache.pulsar.metadata.coordination.impl.CoordinationServiceImpl; |
| import org.apache.pulsar.packages.management.core.PackagesManagement; |
| import org.apache.pulsar.packages.management.core.PackagesStorage; |
| import org.apache.pulsar.packages.management.core.PackagesStorageProvider; |
| import org.apache.pulsar.packages.management.core.impl.DefaultPackagesStorageConfiguration; |
| import org.apache.pulsar.packages.management.core.impl.PackagesManagementImpl; |
| import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; |
| import org.apache.pulsar.transaction.coordinator.TransactionMetadataStoreProvider; |
| import org.apache.pulsar.websocket.WebSocketConsumerServlet; |
| import org.apache.pulsar.websocket.WebSocketProducerServlet; |
| import org.apache.pulsar.websocket.WebSocketReaderServlet; |
| import org.apache.pulsar.websocket.WebSocketService; |
| import org.apache.pulsar.zookeeper.GlobalZooKeeperCache; |
| import org.apache.pulsar.zookeeper.LocalZooKeeperCache; |
| import org.apache.pulsar.zookeeper.LocalZooKeeperConnectionService; |
| import org.apache.pulsar.zookeeper.ZooKeeperCache; |
| import org.apache.pulsar.zookeeper.ZooKeeperClientFactory; |
| import org.apache.pulsar.zookeeper.ZooKeeperSessionWatcher.ShutdownService; |
| import org.apache.pulsar.zookeeper.ZookeeperBkClientFactoryImpl; |
| import org.apache.pulsar.zookeeper.ZookeeperSessionExpiredHandler; |
| import org.apache.zookeeper.ZooKeeper; |
| import org.eclipse.jetty.servlet.ServletHolder; |
| import org.eclipse.jetty.websocket.servlet.WebSocketServlet; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| /** |
| * Main class for Pulsar broker service. |
| */ |
| |
| @Getter(AccessLevel.PUBLIC) |
| @Setter(AccessLevel.PROTECTED) |
| public class PulsarService implements AutoCloseable { |
| private static final Logger LOG = LoggerFactory.getLogger(PulsarService.class); |
| private ServiceConfiguration config = null; |
| private NamespaceService nsService = null; |
| private ManagedLedgerStorage managedLedgerClientFactory = null; |
| private LeaderElectionService leaderElectionService = null; |
| private BrokerService brokerService = null; |
| private WebService webService = null; |
| private WebSocketService webSocketService = null; |
| private ConfigurationCacheService configurationCacheService = null; |
| private LocalZooKeeperCacheService localZkCacheService = null; |
| private TopicPoliciesService topicPoliciesService = TopicPoliciesService.DISABLED; |
| private BookKeeperClientFactory bkClientFactory; |
| private ZooKeeperCache localZkCache; |
| private GlobalZooKeeperCache globalZkCache; |
| private LocalZooKeeperConnectionService localZooKeeperConnectionProvider; |
| private Compactor compactor; |
| |
| private final ScheduledExecutorService executor; |
| private final ScheduledExecutorService cacheExecutor; |
| |
| private OrderedExecutor orderedExecutor; |
| private final ScheduledExecutorService loadManagerExecutor; |
| private ScheduledExecutorService compactorExecutor; |
| private OrderedScheduler offloaderScheduler; |
| private Offloaders offloaderManager = new Offloaders(); |
| private LedgerOffloader defaultOffloader; |
| private Map<NamespaceName, LedgerOffloader> ledgerOffloaderMap = new ConcurrentHashMap<>(); |
| private ScheduledFuture<?> loadReportTask = null; |
| private ScheduledFuture<?> loadSheddingTask = null; |
| private ScheduledFuture<?> loadResourceQuotaTask = null; |
| private final AtomicReference<LoadManager> loadManager = new AtomicReference<>(); |
| private PulsarAdmin adminClient = null; |
| private PulsarClient client = null; |
| private ZooKeeperClientFactory zkClientFactory = null; |
| private final String bindAddress; |
| private final String advertisedAddress; |
| private String webServiceAddress; |
| private String webServiceAddressTls; |
| private String brokerServiceUrl; |
| private String brokerServiceUrlTls; |
| private final String brokerVersion; |
| private SchemaStorage schemaStorage = null; |
| private SchemaRegistryService schemaRegistryService = null; |
| private final WorkerConfig workerConfig; |
| private final Optional<WorkerService> functionWorkerService; |
| private ProtocolHandlers protocolHandlers = null; |
| |
| private final ShutdownService shutdownService; |
| |
| private MetricsGenerator metricsGenerator; |
| |
| private TransactionMetadataStoreService transactionMetadataStoreService; |
| private TransactionBufferProvider transactionBufferProvider; |
| private TransactionBufferClient transactionBufferClient; |
| private HashedWheelTimer transactionTimer; |
| |
| private BrokerInterceptor brokerInterceptor; |
| |
| // packages management service |
| private PackagesManagement packagesManagement; |
| private PrometheusMetricsServlet metricsServlet; |
| private List<PrometheusRawMetricsProvider> pendingMetricsProviders; |
| |
| private MetadataStoreExtended localMetadataStore; |
| private CoordinationService coordinationService; |
| |
| private MetadataStoreExtended configurationMetadataStore; |
| private PulsarResources pulsarResources; |
| |
| public enum State { |
| Init, Started, Closed |
| } |
| |
| private volatile State state; |
| |
| private final ReentrantLock mutex = new ReentrantLock(); |
| private final Condition isClosedCondition = mutex.newCondition(); |
| // key is listener name , value is pulsar address and pulsar ssl address |
| private Map<String, AdvertisedListener> advertisedListeners; |
| |
| public PulsarService(ServiceConfiguration config) { |
| this(config, Optional.empty(), (exitCode) -> { |
| LOG.info("Process termination requested with code {}. " |
| + "Ignoring, as this constructor is intended for tests. ", exitCode); |
| }); |
| } |
| |
| public PulsarService(ServiceConfiguration config, Optional<WorkerService> functionWorkerService, |
| Consumer<Integer> processTerminator) { |
| this(config, new WorkerConfig(), functionWorkerService, processTerminator); |
| } |
| |
| public PulsarService(ServiceConfiguration config, |
| WorkerConfig workerConfig, |
| Optional<WorkerService> functionWorkerService, |
| Consumer<Integer> processTerminator) { |
| // Validate correctness of configuration |
| PulsarConfigurationLoader.isComplete(config); |
| // validate `advertisedAddress`, `advertisedListeners`, `internalListenerName` |
| Map<String, AdvertisedListener> result = |
| MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config); |
| if (result != null) { |
| this.advertisedListeners = Collections.unmodifiableMap(result); |
| } else { |
| this.advertisedListeners = Collections.unmodifiableMap(Collections.emptyMap()); |
| } |
| state = State.Init; |
| // use `internalListenerName` listener as `advertisedAddress` |
| this.bindAddress = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getBindAddress()); |
| if (!this.advertisedListeners.isEmpty()) { |
| this.advertisedAddress = this.advertisedListeners.get( |
| config.getInternalListenerName()).getBrokerServiceUrl().getHost(); |
| } else { |
| this.advertisedAddress = advertisedAddress(config); |
| } |
| this.brokerVersion = PulsarVersion.getVersion(); |
| this.config = config; |
| this.shutdownService = new MessagingServiceShutdownHook(this, processTerminator); |
| this.loadManagerExecutor = Executors |
| .newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-load-manager")); |
| this.workerConfig = workerConfig; |
| this.functionWorkerService = functionWorkerService; |
| this.executor = Executors.newScheduledThreadPool(config.getNumExecutorThreadPoolSize(), |
| new DefaultThreadFactory("pulsar")); |
| this.cacheExecutor = Executors.newScheduledThreadPool(config.getNumCacheExecutorThreadPoolSize(), |
| new DefaultThreadFactory("zk-cache-callback")); |
| } |
| |
| public MetadataStoreExtended createConfigurationMetadataStore() throws MetadataStoreException { |
| return MetadataStoreExtended.create(config.getConfigurationStoreServers(), |
| MetadataStoreConfig.builder() |
| .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis()) |
| .allowReadOnlyOperations(false) |
| .build()); |
| } |
| |
| /** |
| * Close the current pulsar service. All resources are released. |
| */ |
| @Override |
| public void close() throws PulsarServerException { |
| mutex.lock(); |
| |
| try { |
| if (state == State.Closed) { |
| return; |
| } |
| |
| // close the service in reverse order v.s. in which they are started |
| if (this.webService != null) { |
| this.webService.close(); |
| this.webService = null; |
| } |
| |
| if (this.webSocketService != null) { |
| this.webSocketService.close(); |
| } |
| |
| if (this.brokerService != null) { |
| this.brokerService.close(); |
| this.brokerService = null; |
| } |
| |
| if (this.managedLedgerClientFactory != null) { |
| this.managedLedgerClientFactory.close(); |
| this.managedLedgerClientFactory = null; |
| } |
| |
| if (bkClientFactory != null) { |
| this.bkClientFactory.close(); |
| this.bkClientFactory = null; |
| } |
| |
| if (this.leaderElectionService != null) { |
| this.leaderElectionService.close(); |
| this.leaderElectionService = null; |
| } |
| |
| loadManagerExecutor.shutdown(); |
| |
| if (globalZkCache != null) { |
| globalZkCache.close(); |
| globalZkCache = null; |
| localZooKeeperConnectionProvider.close(); |
| localZooKeeperConnectionProvider = null; |
| } |
| |
| configurationCacheService = null; |
| localZkCacheService = null; |
| if (localZkCache != null) { |
| localZkCache.stop(); |
| localZkCache = null; |
| } |
| |
| if (adminClient != null) { |
| adminClient.close(); |
| adminClient = null; |
| } |
| |
| if (client != null) { |
| client.close(); |
| client = null; |
| } |
| |
| nsService = null; |
| |
| if (compactorExecutor != null) { |
| compactorExecutor.shutdown(); |
| } |
| |
| if (offloaderScheduler != null) { |
| offloaderScheduler.shutdown(); |
| } |
| |
| // executor is not initialized in mocks even when real close method is called |
| // guard against null executors |
| if (executor != null) { |
| executor.shutdown(); |
| } |
| |
| if (orderedExecutor != null) { |
| orderedExecutor.shutdown(); |
| } |
| cacheExecutor.shutdown(); |
| |
| LoadManager loadManager = this.loadManager.get(); |
| if (loadManager != null) { |
| loadManager.stop(); |
| } |
| |
| if (schemaRegistryService != null) { |
| schemaRegistryService.close(); |
| } |
| |
| offloaderManager.close(); |
| |
| if (protocolHandlers != null) { |
| protocolHandlers.close(); |
| protocolHandlers = null; |
| } |
| |
| if (transactionBufferClient != null) { |
| transactionBufferClient.close(); |
| } |
| |
| if (coordinationService != null) { |
| coordinationService.close(); |
| } |
| |
| if (localMetadataStore != null) { |
| localMetadataStore.close(); |
| } |
| if (configurationMetadataStore != null) { |
| configurationMetadataStore.close(); |
| } |
| |
| state = State.Closed; |
| isClosedCondition.signalAll(); |
| } catch (Exception e) { |
| if (e instanceof CompletionException && e.getCause() instanceof MetadataStoreException) { |
| throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e)); |
| } else if (e.getCause() instanceof CompletionException |
| && e.getCause().getCause() instanceof MetadataStoreException) { |
| throw new PulsarServerException(MetadataStoreException.unwrap((CompletionException) e.getCause())); |
| } else { |
| throw new PulsarServerException(e); |
| } |
| } finally { |
| mutex.unlock(); |
| } |
| } |
| |
| /** |
| * Get the current service configuration. |
| * |
| * @return the current service configuration |
| */ |
| public ServiceConfiguration getConfiguration() { |
| return this.config; |
| } |
| |
| /** |
| * Get the current function worker service configuration. |
| * |
| * @return the current function worker service configuration. |
| */ |
| public Optional<WorkerConfig> getWorkerConfig() { |
| return functionWorkerService.map(service -> workerConfig); |
| } |
| |
| public Map<String, String> getProtocolDataToAdvertise() { |
| if (null == protocolHandlers) { |
| return Collections.emptyMap(); |
| } else { |
| return protocolHandlers.getProtocolDataToAdvertise(); |
| } |
| } |
| |
| /** |
| * Start the pulsar service instance. |
| */ |
| public void start() throws PulsarServerException { |
| mutex.lock(); |
| |
| LOG.info("Starting Pulsar Broker service; version: '{}'", |
| (brokerVersion != null ? brokerVersion : "unknown")); |
| LOG.info("Git Revision {}", PulsarVersion.getGitSha()); |
| LOG.info("Built by {} on {} at {}", |
| PulsarVersion.getBuildUser(), |
| PulsarVersion.getBuildHost(), |
| PulsarVersion.getBuildTime()); |
| |
| try { |
| if (state != State.Init) { |
| throw new PulsarServerException("Cannot start the service once it was stopped"); |
| } |
| |
| if (!config.getWebServicePort().isPresent() && !config.getWebServicePortTls().isPresent()) { |
| throw new IllegalArgumentException("webServicePort/webServicePortTls must be present"); |
| } |
| |
| if (!config.getBrokerServicePort().isPresent() && !config.getBrokerServicePortTls().isPresent()) { |
| throw new IllegalArgumentException("brokerServicePort/brokerServicePortTls must be present"); |
| } |
| |
| localMetadataStore = createLocalMetadataStore(); |
| coordinationService = new CoordinationServiceImpl(localMetadataStore); |
| |
| configurationMetadataStore = createConfigurationMetadataStore(); |
| pulsarResources = new PulsarResources(localMetadataStore, configurationMetadataStore, |
| config.getZooKeeperOperationTimeoutSeconds()); |
| |
| orderedExecutor = OrderedExecutor.newBuilder() |
| .numThreads(config.getNumOrderedExecutorThreads()) |
| .name("pulsar-ordered") |
| .build(); |
| |
| // Initialize the message protocol handlers |
| protocolHandlers = ProtocolHandlers.load(config); |
| protocolHandlers.initialize(config); |
| |
| // Now we are ready to start services |
| localZooKeeperConnectionProvider = new LocalZooKeeperConnectionService(getZooKeeperClientFactory(), |
| config.getZookeeperServers(), config.getZooKeeperSessionTimeoutMillis()); |
| ZookeeperSessionExpiredHandler sessionExpiredHandler = null; |
| if (ZookeeperSessionExpiredHandlers.RECONNECT_POLICY.equals(config.getZookeeperSessionExpiredPolicy())) { |
| sessionExpiredHandler = ZookeeperSessionExpiredHandlers.reconnectWhenZookeeperSessionExpired( |
| this, shutdownService); |
| } else if (ZookeeperSessionExpiredHandlers.SHUTDOWN_POLICY.equals( |
| config.getZookeeperSessionExpiredPolicy())) { |
| sessionExpiredHandler = ZookeeperSessionExpiredHandlers.shutdownWhenZookeeperSessionExpired( |
| shutdownService); |
| } else { |
| throw new IllegalArgumentException("Invalid zookeeper session expired policy " |
| + config.getZookeeperSessionExpiredPolicy()); |
| } |
| localZooKeeperConnectionProvider.start(sessionExpiredHandler); |
| |
| // Initialize and start service to access configuration repository. |
| this.startZkCacheService(); |
| |
| this.bkClientFactory = newBookKeeperClientFactory(); |
| managedLedgerClientFactory = ManagedLedgerStorage.create( |
| config, getZkClient(), bkClientFactory |
| ); |
| |
| this.brokerService = new BrokerService(this); |
| |
| // Start load management service (even if load balancing is disabled) |
| this.loadManager.set(LoadManager.create(this)); |
| |
| // needs load management service and before start broker service, |
| this.startNamespaceService(); |
| |
| schemaStorage = createAndStartSchemaStorage(); |
| schemaRegistryService = SchemaRegistryService.create( |
| schemaStorage, config.getSchemaRegistryCompatibilityCheckers()); |
| |
| this.offloaderManager = OffloaderUtils.searchForOffloaders( |
| config.getOffloadersDirectory(), config.getNarExtractionDirectory()); |
| this.defaultOffloader = createManagedLedgerOffloader( |
| OffloadPolicies.create(this.getConfiguration().getProperties())); |
| this.brokerInterceptor = BrokerInterceptors.load(config); |
| brokerService.setInterceptor(getBrokerInterceptor()); |
| this.brokerInterceptor.initialize(this); |
| brokerService.start(); |
| |
| this.webService = new WebService(this); |
| Map<String, Object> attributeMap = Maps.newHashMap(); |
| attributeMap.put(WebService.ATTRIBUTE_PULSAR_NAME, this); |
| Map<String, Object> vipAttributeMap = Maps.newHashMap(); |
| vipAttributeMap.put(VipStatus.ATTRIBUTE_STATUS_FILE_PATH, this.config.getStatusFilePath()); |
| vipAttributeMap.put(VipStatus.ATTRIBUTE_IS_READY_PROBE, new Supplier<Boolean>() { |
| @Override |
| public Boolean get() { |
| // Ensure the VIP status is only visible when the broker is fully initialized |
| return state == State.Started; |
| } |
| }); |
| this.webService.addRestResources("/", |
| VipStatus.class.getPackage().getName(), false, vipAttributeMap); |
| this.webService.addRestResources("/", |
| "org.apache.pulsar.broker.web", false, attributeMap); |
| this.webService.addRestResources("/admin", |
| "org.apache.pulsar.broker.admin.v1", true, attributeMap); |
| this.webService.addRestResources("/admin/v2", |
| "org.apache.pulsar.broker.admin.v2", true, attributeMap); |
| this.webService.addRestResources("/admin/v3", |
| "org.apache.pulsar.broker.admin.v3", true, attributeMap); |
| this.webService.addRestResources("/lookup", |
| "org.apache.pulsar.broker.lookup", true, attributeMap); |
| this.metricsServlet = new PrometheusMetricsServlet( |
| this, config.isExposeTopicLevelMetricsInPrometheus(), |
| config.isExposeConsumerLevelMetricsInPrometheus(), |
| config.isExposeProducerLevelMetricsInPrometheus()); |
| if (pendingMetricsProviders != null) { |
| pendingMetricsProviders.forEach(provider -> metricsServlet.addRawMetricsProvider(provider)); |
| this.pendingMetricsProviders = null; |
| } |
| |
| this.webService.addServlet("/metrics", |
| new ServletHolder(metricsServlet), |
| false, attributeMap); |
| |
| if (config.isWebSocketServiceEnabled()) { |
| // Use local broker address to avoid different IP address when using a VIP for service discovery |
| this.webSocketService = new WebSocketService(null, config); |
| this.webSocketService.start(); |
| |
| final WebSocketServlet producerWebSocketServlet = new WebSocketProducerServlet(webSocketService); |
| this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH, |
| new ServletHolder(producerWebSocketServlet), true, attributeMap); |
| this.webService.addServlet(WebSocketProducerServlet.SERVLET_PATH_V2, |
| new ServletHolder(producerWebSocketServlet), true, attributeMap); |
| |
| final WebSocketServlet consumerWebSocketServlet = new WebSocketConsumerServlet(webSocketService); |
| this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH, |
| new ServletHolder(consumerWebSocketServlet), true, attributeMap); |
| this.webService.addServlet(WebSocketConsumerServlet.SERVLET_PATH_V2, |
| new ServletHolder(consumerWebSocketServlet), true, attributeMap); |
| |
| final WebSocketServlet readerWebSocketServlet = new WebSocketReaderServlet(webSocketService); |
| this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH, |
| new ServletHolder(readerWebSocketServlet), true, attributeMap); |
| this.webService.addServlet(WebSocketReaderServlet.SERVLET_PATH_V2, |
| new ServletHolder(readerWebSocketServlet), true, attributeMap); |
| } |
| |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Attempting to add static directory"); |
| } |
| this.webService.addStaticResources("/static", "/static"); |
| |
| webService.start(); |
| |
| // Refresh addresses, since the port might have been dynamically assigned |
| this.webServiceAddress = webAddress(config); |
| this.webServiceAddressTls = webAddressTls(config); |
| this.brokerServiceUrl = brokerUrl(config); |
| this.brokerServiceUrlTls = brokerUrlTls(config); |
| |
| if (null != this.webSocketService) { |
| ClusterData clusterData = |
| new ClusterData(webServiceAddress, webServiceAddressTls, brokerServiceUrl, brokerServiceUrlTls); |
| this.webSocketService.setLocalCluster(clusterData); |
| } |
| |
| // Initialize namespace service, after service url assigned. Should init zk and refresh self owner info. |
| this.nsService.initialize(); |
| |
| // Start topic level policies service |
| if (config.isTopicLevelPoliciesEnabled() && config.isSystemTopicEnabled()) { |
| this.topicPoliciesService = new SystemTopicBasedTopicPoliciesService(this); |
| } |
| |
| this.topicPoliciesService.start(); |
| |
| // Start the leader election service |
| startLeaderElectionService(); |
| |
| // Register heartbeat and bootstrap namespaces. |
| this.nsService.registerBootstrapNamespaces(); |
| |
| // Register pulsar system namespaces and start transaction meta store service |
| if (config.isTransactionCoordinatorEnabled()) { |
| this.transactionTimer = |
| new HashedWheelTimer(new DefaultThreadFactory("pulsar-transaction-timer")); |
| transactionBufferClient = TransactionBufferClientImpl.create( |
| getNamespaceService(), ((PulsarClientImpl) getClient()).getCnxPool(), transactionTimer); |
| |
| transactionMetadataStoreService = new TransactionMetadataStoreService(TransactionMetadataStoreProvider |
| .newProvider(config.getTransactionMetadataStoreProviderClassName()), this, |
| transactionBufferClient, transactionTimer); |
| transactionMetadataStoreService.start(); |
| |
| transactionBufferProvider = TransactionBufferProvider |
| .newProvider(config.getTransactionBufferProviderClassName()); |
| } |
| |
| this.metricsGenerator = new MetricsGenerator(this); |
| |
| // By starting the Load manager service, the broker will also become visible |
| // to the rest of the broker by creating the registration z-node. This needs |
| // to be done only when the broker is fully operative. |
| this.startLoadManagementService(); |
| |
| // Initialize the message protocol handlers. |
| // start the protocol handlers only after the broker is ready, |
| // so that the protocol handlers can access broker service properly. |
| this.protocolHandlers.start(brokerService); |
| Map<String, Map<InetSocketAddress, ChannelInitializer<SocketChannel>>> protocolHandlerChannelInitializers = |
| this.protocolHandlers.newChannelInitializers(); |
| this.brokerService.startProtocolHandlers(protocolHandlerChannelInitializers); |
| |
| acquireSLANamespace(); |
| |
| // start function worker service if necessary |
| this.startWorkerService(brokerService.getAuthenticationService(), brokerService.getAuthorizationService()); |
| |
| // start packages management service if necessary |
| if (config.isEnablePackagesManagement()) { |
| this.startPackagesManagementService(); |
| } |
| |
| final String bootstrapMessage = "bootstrap service " |
| + (config.getWebServicePort().isPresent() ? "port = " + config.getWebServicePort().get() : "") |
| + (config.getWebServicePortTls().isPresent() ? ", tls-port = " + config.getWebServicePortTls() : "") |
| + (config.getBrokerServicePort().isPresent() ? ", broker url= " + brokerServiceUrl : "") |
| + (config.getBrokerServicePortTls().isPresent() ? ", broker tls url= " + brokerServiceUrlTls : ""); |
| LOG.info("messaging service is ready"); |
| |
| LOG.info("messaging service is ready, {}, cluster={}, configs={}", bootstrapMessage, |
| config.getClusterName(), ReflectionToStringBuilder.toString(config)); |
| |
| state = State.Started; |
| } catch (Exception e) { |
| LOG.error(e.getMessage(), e); |
| throw new PulsarServerException(e); |
| } finally { |
| mutex.unlock(); |
| } |
| } |
| |
| public MetadataStoreExtended createLocalMetadataStore() throws MetadataStoreException { |
| return MetadataStoreExtended.create(config.getZookeeperServers(), |
| MetadataStoreConfig.builder() |
| .sessionTimeoutMillis((int) config.getZooKeeperSessionTimeoutMillis()) |
| .allowReadOnlyOperations(false) |
| .build()); |
| } |
| |
| protected void startLeaderElectionService() { |
| this.leaderElectionService = new LeaderElectionService(coordinationService, getSafeWebServiceAddress(), |
| state -> { |
| if (state == LeaderElectionState.Leading) { |
| LOG.info("This broker was elected leader"); |
| if (getConfiguration().isLoadBalancerEnabled()) { |
| long loadSheddingInterval = TimeUnit.MINUTES |
| .toMillis(getConfiguration().getLoadBalancerSheddingIntervalMinutes()); |
| long resourceQuotaUpdateInterval = TimeUnit.MINUTES |
| .toMillis(getConfiguration().getLoadBalancerResourceQuotaUpdateIntervalMinutes()); |
| |
| if (loadSheddingTask != null) { |
| loadSheddingTask.cancel(false); |
| } |
| if (loadResourceQuotaTask != null) { |
| loadResourceQuotaTask.cancel(false); |
| } |
| loadSheddingTask = loadManagerExecutor.scheduleAtFixedRate( |
| new LoadSheddingTask(loadManager), |
| loadSheddingInterval, loadSheddingInterval, TimeUnit.MILLISECONDS); |
| loadResourceQuotaTask = loadManagerExecutor.scheduleAtFixedRate( |
| new LoadResourceQuotaUpdaterTask(loadManager), resourceQuotaUpdateInterval, |
| resourceQuotaUpdateInterval, TimeUnit.MILLISECONDS); |
| } |
| } else { |
| if (leaderElectionService != null) { |
| LOG.info("This broker is a follower. Current leader is {}", |
| leaderElectionService.getCurrentLeader()); |
| } |
| if (loadSheddingTask != null) { |
| loadSheddingTask.cancel(false); |
| loadSheddingTask = null; |
| } |
| if (loadResourceQuotaTask != null) { |
| loadResourceQuotaTask.cancel(false); |
| loadResourceQuotaTask = null; |
| } |
| } |
| }); |
| |
| leaderElectionService.start(); |
| } |
| |
| protected void acquireSLANamespace() { |
| try { |
| // Namespace not created hence no need to unload it |
| String nsName = NamespaceService.getSLAMonitorNamespace(getAdvertisedAddress(), config); |
| if (!this.pulsarResources.getNamespaceResources().exists( |
| AdminResource.path(POLICIES) + "/" + nsName)) { |
| LOG.info("SLA Namespace = {} doesn't exist.", nsName); |
| return; |
| } |
| |
| boolean acquiredSLANamespace; |
| try { |
| acquiredSLANamespace = nsService.registerSLANamespace(); |
| LOG.info("Register SLA Namespace = {}, returned - {}.", nsName, acquiredSLANamespace); |
| } catch (PulsarServerException e) { |
| acquiredSLANamespace = false; |
| } |
| |
| if (!acquiredSLANamespace) { |
| this.nsService.unloadSLANamespace(); |
| } |
| } catch (Exception ex) { |
| LOG.warn( |
| "Exception while trying to unload the SLA namespace," |
| + " will try to unload the namespace again after 1 minute. Exception:", |
| ex); |
| executor.schedule(this::acquireSLANamespace, 1, TimeUnit.MINUTES); |
| } catch (Throwable ex) { |
| // To make sure SLA monitor doesn't interfere with the normal broker flow |
| LOG.warn( |
| "Exception while trying to unload the SLA namespace," |
| + " will not try to unload the namespace again. Exception:", |
| ex); |
| } |
| } |
| |
| /** |
| * Block until the service is finally closed. |
| */ |
| public void waitUntilClosed() throws InterruptedException { |
| mutex.lock(); |
| |
| try { |
| while (state != State.Closed) { |
| isClosedCondition.await(); |
| } |
| } finally { |
| mutex.unlock(); |
| } |
| } |
| |
| protected void startZkCacheService() throws PulsarServerException { |
| |
| LOG.info("starting configuration cache service"); |
| |
| this.localZkCache = new LocalZooKeeperCache(getZkClient(), config.getZooKeeperOperationTimeoutSeconds(), |
| getOrderedExecutor()); |
| this.globalZkCache = new GlobalZooKeeperCache(getZooKeeperClientFactory(), |
| (int) config.getZooKeeperSessionTimeoutMillis(), |
| config.getZooKeeperOperationTimeoutSeconds(), config.getConfigurationStoreServers(), |
| getOrderedExecutor(), this.cacheExecutor, config.getZooKeeperCacheExpirySeconds()); |
| try { |
| this.globalZkCache.start(); |
| } catch (IOException e) { |
| throw new PulsarServerException(e); |
| } |
| |
| this.configurationCacheService = new ConfigurationCacheService(globalZkCache, this.config.getClusterName(), |
| pulsarResources); |
| this.localZkCacheService = new LocalZooKeeperCacheService(getLocalZkCache(), this.configurationCacheService); |
| } |
| |
| protected void startNamespaceService() throws PulsarServerException { |
| |
| LOG.info("Starting name space service, bootstrap namespaces=" + config.getBootstrapNamespaces()); |
| |
| this.nsService = getNamespaceServiceProvider().get(); |
| } |
| |
| public Supplier<NamespaceService> getNamespaceServiceProvider() throws PulsarServerException { |
| return () -> new NamespaceService(PulsarService.this); |
| } |
| |
| protected void startLoadManagementService() throws PulsarServerException { |
| LOG.info("Starting load management service ..."); |
| this.loadManager.get().start(); |
| |
| if (config.isLoadBalancerEnabled()) { |
| LOG.info("Starting load balancer"); |
| if (this.loadReportTask == null) { |
| long loadReportMinInterval = LoadManagerShared.LOAD_REPORT_UPDATE_MIMIMUM_INTERVAL; |
| this.loadReportTask = this.loadManagerExecutor.scheduleAtFixedRate( |
| new LoadReportUpdaterTask(loadManager), loadReportMinInterval, loadReportMinInterval, |
| TimeUnit.MILLISECONDS); |
| } |
| } |
| } |
| |
| /** |
| * Load all the topics contained in a namespace. |
| * |
| * @param bundle <code>NamespaceBundle</code> to identify the service unit |
| * @throws Exception |
| */ |
| public void loadNamespaceTopics(NamespaceBundle bundle) { |
| executor.submit(() -> { |
| LOG.info("Loading all topics on bundle: {}", bundle); |
| |
| NamespaceName nsName = bundle.getNamespaceObject(); |
| List<CompletableFuture<Topic>> persistentTopics = Lists.newArrayList(); |
| long topicLoadStart = System.nanoTime(); |
| |
| for (String topic : getNamespaceService().getListOfPersistentTopics(nsName).join()) { |
| try { |
| TopicName topicName = TopicName.get(topic); |
| if (bundle.includes(topicName)) { |
| CompletableFuture<Topic> future = brokerService.getOrCreateTopic(topic); |
| if (future != null) { |
| persistentTopics.add(future); |
| } |
| } |
| } catch (Throwable t) { |
| LOG.warn("Failed to preload topic {}", topic, t); |
| } |
| } |
| |
| if (!persistentTopics.isEmpty()) { |
| FutureUtil.waitForAll(persistentTopics).thenRun(() -> { |
| double topicLoadTimeSeconds = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - topicLoadStart) |
| / 1000.0; |
| LOG.info("Loaded {} topics on {} -- time taken: {} seconds", persistentTopics.size(), bundle, |
| topicLoadTimeSeconds); |
| }); |
| } |
| return null; |
| }); |
| } |
| |
| // No need to synchronize since config is only init once |
| // We only read this from memory later |
| public String getStatusFilePath() { |
| if (config == null) { |
| return null; |
| } |
| return config.getStatusFilePath(); |
| } |
| |
| public ZooKeeper getZkClient() { |
| return this.localZooKeeperConnectionProvider.getLocalZooKeeper(); |
| } |
| |
| /** |
| * Get default bookkeeper metadata service uri. |
| */ |
| public String getMetadataServiceUri() { |
| return bookieMetadataServiceUri(this.getConfiguration()); |
| } |
| |
| public InternalConfigurationData getInternalConfigurationData() { |
| |
| String metadataServiceUri = getMetadataServiceUri(); |
| |
| if (StringUtils.isNotBlank(config.getBookkeeperMetadataServiceUri())) { |
| metadataServiceUri = this.getConfiguration().getBookkeeperMetadataServiceUri(); |
| } |
| |
| return new InternalConfigurationData( |
| this.getConfiguration().getZookeeperServers(), |
| this.getConfiguration().getConfigurationStoreServers(), |
| new ClientConfiguration().getZkLedgersRootPath(), |
| metadataServiceUri, |
| this.getWorkerConfig().map(wc -> wc.getStateStorageServiceUrl()).orElse(null)); |
| } |
| |
| public ConfigurationCacheService getConfigurationCache() { |
| return configurationCacheService; |
| } |
| |
| /** |
| * Get the current pulsar state. |
| */ |
| public State getState() { |
| return this.state; |
| } |
| |
| /** |
| * Get a reference of the current <code>LeaderElectionService</code> instance associated with the current |
| * <code>PulsarService</code> instance. |
| * |
| * @return a reference of the current <code>LeaderElectionService</code> instance. |
| */ |
| public LeaderElectionService getLeaderElectionService() { |
| return this.leaderElectionService; |
| } |
| |
| /** |
| * Get a reference of the current namespace service instance. |
| * |
| * @return a reference of the current namespace service instance. |
| */ |
| public NamespaceService getNamespaceService() { |
| return this.nsService; |
| } |
| |
| |
| public Optional<WorkerService> getWorkerServiceOpt() { |
| return functionWorkerService; |
| } |
| |
| public WorkerService getWorkerService() throws UnsupportedOperationException { |
| return functionWorkerService.orElseThrow(() -> new UnsupportedOperationException("Pulsar Function Worker " |
| + "is not enabled, probably functionsWorkerEnabled is set to false")); |
| } |
| |
| /** |
| * Get a reference of the current <code>BrokerService</code> instance associated with the current |
| * <code>PulsarService</code> instance. |
| * |
| * @return a reference of the current <code>BrokerService</code> instance. |
| */ |
| public BrokerService getBrokerService() { |
| return this.brokerService; |
| } |
| |
| public BookKeeper getBookKeeperClient() { |
| return managedLedgerClientFactory.getBookKeeperClient(); |
| } |
| |
| public ManagedLedgerFactory getManagedLedgerFactory() { |
| return managedLedgerClientFactory.getManagedLedgerFactory(); |
| } |
| |
| public ManagedLedgerStorage getManagedLedgerClientFactory() { |
| return managedLedgerClientFactory; |
| } |
| |
| /** |
| * First, get <code>LedgerOffloader</code> from local map cache, |
| * create new <code>LedgerOffloader</code> if not in cache or |
| * the <code>OffloadPolicies</code> changed, return the <code>LedgerOffloader</code> directly if exist in cache |
| * and the <code>OffloadPolicies</code> not changed. |
| * |
| * @param namespaceName NamespaceName |
| * @param offloadPolicies the OffloadPolicies |
| * @return LedgerOffloader |
| */ |
| public LedgerOffloader getManagedLedgerOffloader(NamespaceName namespaceName, OffloadPolicies offloadPolicies) { |
| if (offloadPolicies == null) { |
| return getDefaultOffloader(); |
| } |
| return ledgerOffloaderMap.compute(namespaceName, (ns, offloader) -> { |
| try { |
| if (offloader != null && Objects.equals(offloader.getOffloadPolicies(), offloadPolicies)) { |
| return offloader; |
| } else { |
| if (offloader != null) { |
| offloader.close(); |
| } |
| return createManagedLedgerOffloader(offloadPolicies); |
| } |
| } catch (PulsarServerException e) { |
| LOG.error("create ledgerOffloader failed for namespace {}", namespaceName.toString(), e); |
| return new NullLedgerOffloader(); |
| } |
| }); |
| } |
| |
| public synchronized LedgerOffloader createManagedLedgerOffloader(OffloadPolicies offloadPolicies) |
| throws PulsarServerException { |
| try { |
| if (StringUtils.isNotBlank(offloadPolicies.getManagedLedgerOffloadDriver())) { |
| checkNotNull(offloadPolicies.getOffloadersDirectory(), |
| "Offloader driver is configured to be '%s' but no offloaders directory is configured.", |
| offloadPolicies.getManagedLedgerOffloadDriver()); |
| LedgerOffloaderFactory offloaderFactory = this.offloaderManager.getOffloaderFactory( |
| offloadPolicies.getManagedLedgerOffloadDriver()); |
| try { |
| return offloaderFactory.create( |
| offloadPolicies, |
| ImmutableMap.of( |
| LedgerOffloader.METADATA_SOFTWARE_VERSION_KEY.toLowerCase(), PulsarVersion.getVersion(), |
| LedgerOffloader.METADATA_SOFTWARE_GITSHA_KEY.toLowerCase(), PulsarVersion.getGitSha() |
| ), |
| schemaStorage, |
| getOffloaderScheduler(offloadPolicies)); |
| } catch (IOException ioe) { |
| throw new PulsarServerException(ioe.getMessage(), ioe.getCause()); |
| } |
| } else { |
| LOG.info("No ledger offloader configured, using NULL instance"); |
| return NullLedgerOffloader.INSTANCE; |
| } |
| } catch (Throwable t) { |
| throw new PulsarServerException(t); |
| } |
| } |
| |
| private SchemaStorage createAndStartSchemaStorage() throws Exception { |
| final Class<?> storageClass = Class.forName(config.getSchemaRegistryStorageClassName()); |
| Object factoryInstance = storageClass.newInstance(); |
| Method createMethod = storageClass.getMethod("create", PulsarService.class); |
| SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, this); |
| schemaStorage.start(); |
| return schemaStorage; |
| } |
| |
| public ZooKeeperCache getLocalZkCache() { |
| return localZkCache; |
| } |
| |
| public ScheduledExecutorService getExecutor() { |
| return executor; |
| } |
| |
| public ScheduledExecutorService getCacheExecutor() { |
| return cacheExecutor; |
| } |
| |
| public ScheduledExecutorService getLoadManagerExecutor() { |
| return loadManagerExecutor; |
| } |
| |
| public OrderedExecutor getOrderedExecutor() { |
| return orderedExecutor; |
| } |
| |
| public LocalZooKeeperCacheService getLocalZkCacheService() { |
| return this.localZkCacheService; |
| } |
| |
| public ZooKeeperClientFactory getZooKeeperClientFactory() { |
| if (zkClientFactory == null) { |
| zkClientFactory = new ZookeeperBkClientFactoryImpl(orderedExecutor); |
| } |
| // Return default factory |
| return zkClientFactory; |
| } |
| |
| public BookKeeperClientFactory newBookKeeperClientFactory() { |
| return new BookKeeperClientFactoryImpl(); |
| } |
| |
| public BookKeeperClientFactory getBookKeeperClientFactory() { |
| return bkClientFactory; |
| } |
| |
| protected synchronized ScheduledExecutorService getCompactorExecutor() { |
| if (this.compactorExecutor == null) { |
| compactorExecutor = Executors.newSingleThreadScheduledExecutor(new DefaultThreadFactory("compaction")); |
| } |
| return this.compactorExecutor; |
| } |
| |
| // only public so mockito can mock it |
| public Compactor newCompactor() throws PulsarServerException { |
| return new TwoPhaseCompactor(this.getConfiguration(), |
| getClient(), getBookKeeperClient(), |
| getCompactorExecutor()); |
| } |
| |
| public synchronized Compactor getCompactor() throws PulsarServerException { |
| if (this.compactor == null) { |
| this.compactor = newCompactor(); |
| } |
| return this.compactor; |
| } |
| |
| protected synchronized OrderedScheduler getOffloaderScheduler(OffloadPolicies offloadPolicies) { |
| if (this.offloaderScheduler == null) { |
| this.offloaderScheduler = OrderedScheduler.newSchedulerBuilder() |
| .numThreads(offloadPolicies.getManagedLedgerOffloadMaxThreads()) |
| .name("offloader").build(); |
| } |
| return this.offloaderScheduler; |
| } |
| |
| public synchronized PulsarClient getClient() throws PulsarServerException { |
| if (this.client == null) { |
| try { |
| ClientBuilder builder = PulsarClient.builder() |
| .serviceUrl(this.getConfiguration().isTlsEnabled() |
| ? this.brokerServiceUrlTls : this.brokerServiceUrl) |
| .enableTls(this.getConfiguration().isTlsEnabled()) |
| .allowTlsInsecureConnection(this.getConfiguration().isTlsAllowInsecureConnection()) |
| .tlsTrustCertsFilePath(this.getConfiguration().getTlsCertificateFilePath()); |
| |
| if (this.getConfiguration().isBrokerClientTlsEnabled()) { |
| if (this.getConfiguration().isBrokerClientTlsEnabledWithKeyStore()) { |
| builder.useKeyStoreTls(true) |
| .tlsTrustStoreType(this.getConfiguration().getBrokerClientTlsTrustStoreType()) |
| .tlsTrustStorePath(this.getConfiguration().getBrokerClientTlsTrustStore()) |
| .tlsTrustStorePassword(this.getConfiguration().getBrokerClientTlsTrustStorePassword()); |
| } else { |
| builder.tlsTrustCertsFilePath( |
| isNotBlank(this.getConfiguration().getBrokerClientTrustCertsFilePath()) |
| ? this.getConfiguration().getBrokerClientTrustCertsFilePath() |
| : this.getConfiguration().getTlsCertificateFilePath()); |
| } |
| } |
| |
| if (isNotBlank(this.getConfiguration().getBrokerClientAuthenticationPlugin())) { |
| builder.authentication(this.getConfiguration().getBrokerClientAuthenticationPlugin(), |
| this.getConfiguration().getBrokerClientAuthenticationParameters()); |
| } |
| this.client = builder.build(); |
| } catch (Exception e) { |
| throw new PulsarServerException(e); |
| } |
| } |
| return this.client; |
| } |
| |
| public synchronized PulsarAdmin getAdminClient() throws PulsarServerException { |
| if (this.adminClient == null) { |
| try { |
| ServiceConfiguration conf = this.getConfiguration(); |
| final String adminApiUrl = conf.isBrokerClientTlsEnabled() ? webServiceAddressTls : webServiceAddress; |
| if (adminApiUrl == null) { |
| throw new IllegalArgumentException("Web service address was not set properly " |
| + ", isBrokerClientTlsEnabled: " + conf.isBrokerClientTlsEnabled() |
| + ", webServiceAddressTls: " + webServiceAddressTls |
| + ", webServiceAddress: " + webServiceAddress); |
| } |
| PulsarAdminBuilder builder = PulsarAdmin.builder().serviceHttpUrl(adminApiUrl) // |
| .authentication(// |
| conf.getBrokerClientAuthenticationPlugin(), // |
| conf.getBrokerClientAuthenticationParameters()); |
| |
| if (conf.isBrokerClientTlsEnabled()) { |
| if (conf.isBrokerClientTlsEnabledWithKeyStore()) { |
| builder.useKeyStoreTls(true) |
| .tlsTrustStoreType(conf.getBrokerClientTlsTrustStoreType()) |
| .tlsTrustStorePath(conf.getBrokerClientTlsTrustStore()) |
| .tlsTrustStorePassword(conf.getBrokerClientTlsTrustStorePassword()); |
| } else { |
| builder.tlsTrustCertsFilePath(conf.getBrokerClientTrustCertsFilePath()); |
| } |
| builder.allowTlsInsecureConnection(conf.isTlsAllowInsecureConnection()); |
| } |
| |
| // most of the admin request requires to make zk-call so, keep the max read-timeout based on |
| // zk-operation timeout |
| builder.readTimeout(conf.getZooKeeperOperationTimeoutSeconds(), TimeUnit.SECONDS); |
| |
| this.adminClient = builder.build(); |
| LOG.info("created admin with url {} ", adminApiUrl); |
| } catch (Exception e) { |
| throw new PulsarServerException(e); |
| } |
| } |
| |
| return this.adminClient; |
| } |
| |
| public MetricsGenerator getMetricsGenerator() { |
| return metricsGenerator; |
| } |
| |
| public TransactionMetadataStoreService getTransactionMetadataStoreService() { |
| return transactionMetadataStoreService; |
| } |
| |
| public TransactionBufferProvider getTransactionBufferProvider() { |
| return transactionBufferProvider; |
| } |
| |
| public TransactionBufferClient getTransactionBufferClient() { |
| return transactionBufferClient; |
| } |
| |
| public ShutdownService getShutdownService() { |
| return shutdownService; |
| } |
| |
| /** |
| * Advertised service address. |
| * |
| * @return Hostname or IP address the service advertises to the outside world. |
| */ |
| public static String advertisedAddress(ServiceConfiguration config) { |
| return ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress()); |
| } |
| |
| private String brokerUrl(ServiceConfiguration config) { |
| if (config.getBrokerServicePort().isPresent()) { |
| return brokerUrl(advertisedAddress(config), getBrokerListenPort().get()); |
| } else { |
| return null; |
| } |
| } |
| |
| public static String brokerUrl(String host, int port) { |
| return String.format("pulsar://%s:%d", host, port); |
| } |
| |
| public String brokerUrlTls(ServiceConfiguration config) { |
| if (config.getBrokerServicePortTls().isPresent()) { |
| return brokerUrlTls(advertisedAddress(config), getBrokerListenPortTls().get()); |
| } else { |
| return null; |
| } |
| } |
| |
| public static String brokerUrlTls(String host, int port) { |
| return String.format("pulsar+ssl://%s:%d", host, port); |
| } |
| |
| public String webAddress(ServiceConfiguration config) { |
| if (config.getWebServicePort().isPresent()) { |
| return webAddress(advertisedAddress(config), getListenPortHTTP().get()); |
| } else { |
| return null; |
| } |
| } |
| |
| public static String webAddress(String host, int port) { |
| return String.format("http://%s:%d", host, port); |
| } |
| |
| public String webAddressTls(ServiceConfiguration config) { |
| if (config.getWebServicePortTls().isPresent()) { |
| return webAddressTls(advertisedAddress(config), getListenPortHTTPS().get()); |
| } else { |
| return null; |
| } |
| } |
| |
| public static String webAddressTls(String host, int port) { |
| return String.format("https://%s:%d", host, port); |
| } |
| |
| public String getSafeWebServiceAddress() { |
| return webServiceAddress != null ? webServiceAddress : webServiceAddressTls; |
| } |
| |
| public String getSafeBrokerServiceUrl() { |
| return brokerServiceUrl != null ? brokerServiceUrl : brokerServiceUrlTls; |
| } |
| |
| /** |
| * Get bookkeeper metadata service uri. |
| * |
| * @param config broker configuration |
| * @return the metadata service uri that bookkeeper is used |
| */ |
| public static String bookieMetadataServiceUri(ServiceConfiguration config) { |
| ClientConfiguration bkConf = new ClientConfiguration(); |
| // init bookkeeper metadata service uri |
| String metadataServiceUri = null; |
| try { |
| String zkServers = config.getZookeeperServers(); |
| bkConf.setZkServers(zkServers); |
| metadataServiceUri = bkConf.getMetadataServiceUri(); |
| } catch (ConfigurationException e) { |
| LOG.error("Failed to get bookkeeper metadata service uri", e); |
| } |
| return metadataServiceUri; |
| } |
| |
| public TopicPoliciesService getTopicPoliciesService() { |
| return topicPoliciesService; |
| } |
| |
| public void addPrometheusRawMetricsProvider(PrometheusRawMetricsProvider metricsProvider) { |
| if (metricsServlet == null) { |
| if (pendingMetricsProviders == null) { |
| pendingMetricsProviders = new LinkedList<>(); |
| } |
| pendingMetricsProviders.add(metricsProvider); |
| } else { |
| this.metricsServlet.addRawMetricsProvider(metricsProvider); |
| } |
| } |
| |
| private void startWorkerService(AuthenticationService authenticationService, |
| AuthorizationService authorizationService) |
| throws Exception { |
| if (functionWorkerService.isPresent()) { |
| if (workerConfig.isUseTls()) { |
| workerConfig.setPulsarServiceUrl(brokerServiceUrlTls); |
| workerConfig.setPulsarWebServiceUrl(webServiceAddressTls); |
| workerConfig.setFunctionWebServiceUrl(webServiceAddressTls); |
| } else { |
| workerConfig.setPulsarServiceUrl(brokerServiceUrl); |
| workerConfig.setPulsarWebServiceUrl(webServiceAddress); |
| workerConfig.setFunctionWebServiceUrl(webServiceAddress); |
| } |
| |
| LOG.info("Starting function worker service: serviceUrl = {}," |
| + " webServiceUrl = {}, functionWebServiceUrl = {}", |
| workerConfig.getPulsarServiceUrl(), |
| workerConfig.getPulsarWebServiceUrl(), |
| workerConfig.getFunctionWebServiceUrl()); |
| |
| functionWorkerService.get().initInBroker( |
| config, |
| workerConfig, |
| pulsarResources, |
| getConfigurationCacheService(), |
| getInternalConfigurationData() |
| ); |
| |
| // TODO figure out how to handle errors from function worker service |
| functionWorkerService.get().start( |
| authenticationService, |
| authorizationService, |
| ErrorNotifier.getShutdownServiceImpl(shutdownService)); |
| LOG.info("Function worker service started"); |
| } |
| } |
| |
| private void startPackagesManagementService() throws IOException { |
| // TODO: using provider to initialize the packages management service. |
| this.packagesManagement = new PackagesManagementImpl(); |
| PackagesStorageProvider storageProvider = PackagesStorageProvider |
| .newProvider(config.getPackagesManagementStorageProvider()); |
| DefaultPackagesStorageConfiguration storageConfiguration = new DefaultPackagesStorageConfiguration(); |
| storageConfiguration.setProperty(config.getProperties()); |
| PackagesStorage storage = storageProvider.getStorage(storageConfiguration); |
| storage.initialize(); |
| packagesManagement.initialize(storage); |
| } |
| |
| public Optional<Integer> getListenPortHTTP() { |
| return webService.getListenPortHTTP(); |
| } |
| |
| public Optional<Integer> getListenPortHTTPS() { |
| return webService.getListenPortHTTPS(); |
| } |
| |
| public Optional<Integer> getBrokerListenPort() { |
| return brokerService.getListenPort(); |
| } |
| |
| public Optional<Integer> getBrokerListenPortTls() { |
| return brokerService.getListenPortTls(); |
| } |
| |
| public MetadataStoreExtended getLocalMetadataStore() { |
| return localMetadataStore; |
| } |
| |
| public CoordinationService getCoordinationService() { |
| return coordinationService; |
| } |
| |
| public static WorkerConfig initializeWorkerConfigFromBrokerConfig(ServiceConfiguration brokerConfig, |
| String workerConfigFile) throws IOException { |
| WorkerConfig workerConfig = WorkerConfig.load(workerConfigFile); |
| |
| brokerConfig.getWebServicePort() |
| .map(port -> workerConfig.setWorkerPort(port)); |
| brokerConfig.getWebServicePortTls() |
| .map(port -> workerConfig.setWorkerPortTls(port)); |
| |
| // worker talks to local broker |
| String hostname = ServiceConfigurationUtils.getDefaultOrConfiguredAddress( |
| brokerConfig.getAdvertisedAddress()); |
| workerConfig.setWorkerHostname(hostname); |
| // inherit broker authorization setting |
| workerConfig.setAuthenticationEnabled(brokerConfig.isAuthenticationEnabled()); |
| workerConfig.setAuthenticationProviders(brokerConfig.getAuthenticationProviders()); |
| workerConfig.setAuthorizationEnabled(brokerConfig.isAuthorizationEnabled()); |
| workerConfig.setAuthorizationProvider(brokerConfig.getAuthorizationProvider()); |
| workerConfig.setConfigurationStoreServers(brokerConfig.getConfigurationStoreServers()); |
| workerConfig.setZooKeeperSessionTimeoutMillis(brokerConfig.getZooKeeperSessionTimeoutMillis()); |
| workerConfig.setZooKeeperOperationTimeoutSeconds(brokerConfig.getZooKeeperOperationTimeoutSeconds()); |
| |
| workerConfig.setTlsAllowInsecureConnection(brokerConfig.isTlsAllowInsecureConnection()); |
| workerConfig.setTlsEnabled(brokerConfig.isTlsEnabled()); |
| workerConfig.setTlsEnableHostnameVerification(false); |
| workerConfig.setBrokerClientTrustCertsFilePath(brokerConfig.getTlsTrustCertsFilePath()); |
| |
| // client in worker will use this config to authenticate with broker |
| workerConfig.setBrokerClientAuthenticationPlugin(brokerConfig.getBrokerClientAuthenticationPlugin()); |
| workerConfig.setBrokerClientAuthenticationParameters(brokerConfig.getBrokerClientAuthenticationParameters()); |
| |
| // inherit super users |
| workerConfig.setSuperUserRoles(brokerConfig.getSuperUserRoles()); |
| |
| // inherit the nar package locations |
| if (isBlank(workerConfig.getFunctionsWorkerServiceNarPackage())) { |
| workerConfig.setFunctionsWorkerServiceNarPackage( |
| brokerConfig.getFunctionsWorkerServiceNarPackage()); |
| } |
| |
| workerConfig.setWorkerId( |
| "c-" + brokerConfig.getClusterName() |
| + "-fw-" + hostname |
| + "-" + (workerConfig.getTlsEnabled() |
| ? workerConfig.getWorkerPortTls() : workerConfig.getWorkerPort())); |
| return workerConfig; |
| } |
| } |