blob: ba136e7c910586b517d9fde5bde2c2c3bf166b7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar;
import static org.apache.pulsar.common.naming.NamespaceName.SYSTEM_NAMESPACE;
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_ASSIGN;
import com.beust.jcommander.Parameter;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import io.netty.util.internal.PlatformDependent;
import java.io.File;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.commons.lang3.StringUtils;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.resources.NamespaceResources;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.common.naming.NamespaceName;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.ClusterData;
import org.apache.pulsar.common.policies.data.TenantInfo;
import org.apache.pulsar.common.util.ShutdownUtil;
import org.apache.pulsar.functions.instance.state.PulsarMetadataStateStoreProviderImpl;
import org.apache.pulsar.functions.worker.WorkerConfig;
import org.apache.pulsar.functions.worker.WorkerService;
import org.apache.pulsar.functions.worker.service.WorkerServiceLoader;
import org.apache.pulsar.metadata.api.extended.MetadataStoreExtended;
import org.apache.pulsar.metadata.bookkeeper.BKCluster;
import org.apache.pulsar.metadata.impl.ZKMetadataStore;
import org.apache.pulsar.packages.management.storage.filesystem.FileSystemPackagesStorageProvider;
import org.apache.pulsar.zookeeper.LocalBookkeeperEnsemble;
@Slf4j
public class PulsarStandalone implements AutoCloseable {
private static final String PULSAR_STANDALONE_USE_ZOOKEEPER = "PULSAR_STANDALONE_USE_ZOOKEEPER";
PulsarService broker;
// This is used in compatibility mode
LocalBookkeeperEnsemble bkEnsemble;
// This is used from Pulsar 2.11 on, with new default settings
BKCluster bkCluster;
MetadataStoreExtended metadataStore;
ServiceConfiguration config;
WorkerService fnWorkerService;
WorkerConfig workerConfig;
public void setBroker(PulsarService broker) {
this.broker = broker;
}
public void setBkEnsemble(LocalBookkeeperEnsemble bkEnsemble) {
this.bkEnsemble = bkEnsemble;
}
public void setBkPort(int bkPort) {
this.bkPort = bkPort;
}
public void setBkDir(String bkDir) {
this.bkDir = bkDir;
}
public void setAdvertisedAddress(String advertisedAddress) {
this.advertisedAddress = advertisedAddress;
}
public void setConfig(ServiceConfiguration config) {
this.config = config;
}
public void setFnWorkerService(WorkerService fnWorkerService) {
this.fnWorkerService = fnWorkerService;
}
public void setConfigFile(String configFile) {
this.configFile = configFile;
}
public void setWipeData(boolean wipeData) {
this.wipeData = wipeData;
}
public void setNumOfBk(int numOfBk) {
this.numOfBk = numOfBk;
}
public void setZkPort(int zkPort) {
this.zkPort = zkPort;
}
public void setZkDir(String zkDir) {
this.zkDir = zkDir;
}
public void setNoBroker(boolean noBroker) {
this.noBroker = noBroker;
}
public void setOnlyBroker(boolean onlyBroker) {
this.onlyBroker = onlyBroker;
}
public void setNoFunctionsWorker(boolean noFunctionsWorker) {
this.noFunctionsWorker = noFunctionsWorker;
}
public void setFnWorkerConfigFile(String fnWorkerConfigFile) {
this.fnWorkerConfigFile = fnWorkerConfigFile;
}
public void setNoStreamStorage(boolean noStreamStorage) {
this.noStreamStorage = noStreamStorage;
}
public void setStreamStoragePort(int streamStoragePort) {
this.streamStoragePort = streamStoragePort;
}
public void setHelp(boolean help) {
this.help = help;
}
public ServiceConfiguration getConfig() {
return config;
}
public String getConfigFile() {
return configFile;
}
public boolean isWipeData() {
return wipeData;
}
public int getNumOfBk() {
return numOfBk;
}
public int getZkPort() {
return zkPort;
}
public int getBkPort() {
return bkPort;
}
public String getZkDir() {
return zkDir;
}
public String getBkDir() {
return bkDir;
}
public boolean isNoBroker() {
return noBroker;
}
public boolean isOnlyBroker() {
return onlyBroker;
}
public boolean isNoFunctionsWorker() {
return noFunctionsWorker;
}
public String getFnWorkerConfigFile() {
return fnWorkerConfigFile;
}
public boolean isNoStreamStorage() {
return noStreamStorage;
}
public int getStreamStoragePort() {
return streamStoragePort;
}
public String getAdvertisedAddress() {
return advertisedAddress;
}
public boolean isHelp() {
return help;
}
@Parameter(names = { "-c", "--config" }, description = "Configuration file path")
private String configFile;
@Parameter(names = { "--wipe-data" }, description = "Clean up previous ZK/BK data")
private boolean wipeData = false;
@Parameter(names = { "--num-bookies" }, description = "Number of local Bookies")
private int numOfBk = 1;
@Parameter(names = { "--metadata-dir" },
description = "Directory for storing metadata")
private String metadataDir = "data/metadata";
@Parameter(names = { "--metadata-url" },
description = "Metadata store url")
private String metadataStoreUrl = "";
@Parameter(names = {"--zookeeper-port"}, description = "Local zookeeper's port",
hidden = true)
private int zkPort = 2181;
@Parameter(names = { "--bookkeeper-port" }, description = "Local bookies base port")
private int bkPort = 3181;
@Parameter(names = { "--zookeeper-dir" },
description = "Local zooKeeper's data directory",
hidden = true)
private String zkDir = "data/standalone/zookeeper";
@Parameter(names = { "--bookkeeper-dir" }, description = "Local bookies base data directory")
private String bkDir = "data/standalone/bookkeeper";
@Parameter(names = { "--no-broker" }, description = "Only start ZK and BK services, no broker")
private boolean noBroker = false;
@Parameter(names = { "--only-broker" }, description = "Only start Pulsar broker service (no ZK, BK)")
private boolean onlyBroker = false;
@Parameter(names = {"-nfw", "--no-functions-worker"}, description = "Run functions worker with Broker")
private boolean noFunctionsWorker = false;
@Parameter(names = {"-fwc", "--functions-worker-conf"}, description = "Configuration file for Functions Worker")
private String fnWorkerConfigFile = "conf/functions_worker.yml";
@Parameter(names = {"-nss", "--no-stream-storage"}, description = "Disable stream storage")
private boolean noStreamStorage = false;
@Parameter(names = { "--stream-storage-port" }, description = "Local bookies stream storage port")
private int streamStoragePort = 4181;
@Parameter(names = { "-a", "--advertised-address" }, description = "Standalone broker advertised address")
private String advertisedAddress = null;
@Parameter(names = { "-h", "--help" }, description = "Show this help message")
private boolean help = false;
private boolean usingNewDefaultsPIP117;
public void start() throws Exception {
if (config == null) {
log.error("Failed to load configuration");
System.exit(1);
}
String forceUseZookeeperEnv = System.getenv(PULSAR_STANDALONE_USE_ZOOKEEPER);
// Allow forcing to use ZK mode via an env variable. eg:
// PULSAR_STANDALONE_USE_ZOOKEEPER=1
if (StringUtils.equalsAnyIgnoreCase(forceUseZookeeperEnv, "1", "true")) {
usingNewDefaultsPIP117 = false;
log.info("Forcing to chose ZooKeeper metadata through environment variable");
} else if (Paths.get(zkDir).toFile().exists()) {
log.info("Found existing ZooKeeper metadata. Continuing with ZooKeeper");
usingNewDefaultsPIP117 = false;
} else {
// There's no existing ZK data directory, or we're already using RocksDB for metadata
usingNewDefaultsPIP117 = true;
}
log.debug("--- setup PulsarStandaloneStarter ---");
if (!this.isOnlyBroker()) {
if (usingNewDefaultsPIP117) {
startBookieWithMetadataStore();
} else {
startBookieWithZookeeper();
}
}
if (this.isNoBroker()) {
return;
}
// initialize the functions worker
if (!this.isNoFunctionsWorker()) {
final String filepath = Path.of(getFnWorkerConfigFile()).toAbsolutePath().normalize().toString();
workerConfig = PulsarService.initializeWorkerConfigFromBrokerConfig(config, filepath);
if (usingNewDefaultsPIP117) {
workerConfig.setStateStorageProviderImplementation(
PulsarMetadataStateStoreProviderImpl.class.getName());
config.setEnablePackagesManagement(true);
config.setFunctionsWorkerEnablePackageManagement(true);
workerConfig.setFunctionsWorkerEnablePackageManagement(true);
config.setPackagesManagementStorageProvider(FileSystemPackagesStorageProvider.class.getName());
} else {
// worker talks to local broker
if (this.isNoStreamStorage()) {
// only set the state storage service url when state is enabled.
workerConfig.setStateStorageServiceUrl(null);
} else if (workerConfig.getStateStorageServiceUrl() == null) {
workerConfig.setStateStorageServiceUrl("bk://127.0.0.1:" + this.getStreamStoragePort());
}
}
fnWorkerService = WorkerServiceLoader.load(workerConfig);
} else {
workerConfig = new WorkerConfig();
}
config.setRunningStandalone(true);
if (!usingNewDefaultsPIP117) {
final String metadataStoreUrl =
ZKMetadataStore.ZK_SCHEME_IDENTIFIER + "localhost:" + this.getZkPort();
config.setMetadataStoreUrl(metadataStoreUrl);
config.setConfigurationMetadataStoreUrl(metadataStoreUrl);
config.getProperties().setProperty("metadataStoreUrl", metadataStoreUrl);
config.getProperties().setProperty("configurationMetadataStoreUrl", metadataStoreUrl);
}
// Start Broker
broker = new PulsarService(config,
workerConfig,
Optional.ofNullable(fnWorkerService),
PulsarStandalone::processTerminator);
broker.start();
final String cluster = config.getClusterName();
//create default namespace
createNameSpace(cluster, TopicName.PUBLIC_TENANT,
NamespaceName.get(TopicName.PUBLIC_TENANT, TopicName.DEFAULT_NAMESPACE));
//create pulsar system namespace
createNameSpace(cluster, SYSTEM_NAMESPACE.getTenant(), SYSTEM_NAMESPACE);
if (config.isTransactionCoordinatorEnabled()) {
NamespaceResources.PartitionedTopicResources partitionedTopicResources =
broker.getPulsarResources().getNamespaceResources().getPartitionedTopicResources();
Optional<PartitionedTopicMetadata> getResult =
partitionedTopicResources.getPartitionedTopicMetadataAsync(TRANSACTION_COORDINATOR_ASSIGN).get();
if (!getResult.isPresent()) {
partitionedTopicResources.createPartitionedTopic(TRANSACTION_COORDINATOR_ASSIGN,
new PartitionedTopicMetadata(1));
}
}
log.debug("--- setup completed ---");
}
private void createNameSpace(String cluster, String publicTenant, NamespaceName ns) throws Exception {
PulsarAdmin admin = broker.getAdminClient();
try {
final List<String> clusters = admin.clusters().getClusters();
if (!clusters.contains(cluster)) {
admin.clusters().createCluster(cluster, ClusterData.builder()
.serviceUrl(broker.getWebServiceAddress())
.serviceUrlTls(broker.getWebServiceAddressTls())
.brokerServiceUrl(broker.getBrokerServiceUrl())
.brokerServiceUrlTls(broker.getBrokerServiceUrlTls())
.build());
}
final List<String> tenants = admin.tenants().getTenants();
if (!tenants.contains(publicTenant)) {
admin.tenants().createTenant(publicTenant, TenantInfo.builder()
.adminRoles(Sets.newHashSet(config.getSuperUserRoles()))
.allowedClusters(Sets.newHashSet(cluster))
.build());
}
final List<String> namespaces = admin.namespaces().getNamespaces(publicTenant);
if (!namespaces.contains(ns.toString())) {
admin.namespaces().createNamespace(ns.toString(), config.getDefaultNumberOfNamespaceBundles());
}
} catch (PulsarAdminException e) {
log.error("Failed to create namespace {} on cluster {} and tenant {}", ns, cluster, publicTenant, e);
}
}
/** This method gets a builder to build an embedded pulsar instance
* i.e.
* <pre>
* <code>
* PulsarStandalone pulsarStandalone = PulsarStandalone.builder().build();
* pulsarStandalone.start();
* pulsarStandalone.stop();
* </code>
* </pre>
* @return PulsarStandaloneBuilder instance
*/
public static PulsarStandaloneBuilder builder(){
return PulsarStandaloneBuilder.instance();
}
@Override
public void close() {
try {
if (fnWorkerService != null) {
fnWorkerService.stop();
}
if (broker != null) {
broker.close();
}
if (bkCluster != null) {
bkCluster.close();
}
if (bkEnsemble != null) {
bkEnsemble.stop();
}
} catch (Exception e) {
log.error("Shutdown failed: {}", e.getMessage(), e);
}
}
@VisibleForTesting
void startBookieWithMetadataStore() throws Exception {
if (StringUtils.isBlank(metadataStoreUrl)){
log.info("Starting BK with RocksDb metadata store");
metadataStoreUrl = "rocksdb://" + Paths.get(metadataDir).toAbsolutePath();
} else {
log.info("Starting BK with metadata store: {}", metadataStoreUrl);
}
ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());
calculateCacheSize(bkServerConf);
bkCluster = BKCluster.builder()
.baseServerConfiguration(bkServerConf)
.metadataServiceUri(metadataStoreUrl)
.bkPort(bkPort)
.numBookies(numOfBk)
.dataDir(bkDir)
.clearOldData(wipeData)
.build();
config.setBookkeeperNumberOfChannelsPerBookie(1);
config.setMetadataStoreUrl(metadataStoreUrl);
}
private void startBookieWithZookeeper() throws Exception {
log.info("Starting BK & ZK cluster");
ServerConfiguration bkServerConf = new ServerConfiguration();
bkServerConf.loadConf(new File(configFile).toURI().toURL());
calculateCacheSize(bkServerConf);
// Start LocalBookKeeper
bkEnsemble = new LocalBookkeeperEnsemble(
this.getNumOfBk(), this.getZkPort(), this.getBkPort(), this.getStreamStoragePort(), this.getZkDir(),
this.getBkDir(), this.isWipeData(), "127.0.0.1");
bkEnsemble.startStandalone(bkServerConf, !this.isNoStreamStorage());
config.setMetadataStoreUrl("zk:127.0.0.1:" + zkPort);
}
private void calculateCacheSize(ServerConfiguration bkServerConf) {
String writeCacheMaxSizeMb = "dbStorage_writeCacheMaxSizeMb";
String readAheadCacheMaxSizeMb = "dbStorage_readAheadCacheMaxSizeMb";
Object writeCache = bkServerConf.getProperty(writeCacheMaxSizeMb);
Object readCache = bkServerConf.getProperty(readAheadCacheMaxSizeMb);
// we need to add one broker and one zk (if needed) to calculate the default cache
int instanceCount = usingNewDefaultsPIP117 ? (1 + numOfBk) : (2 + numOfBk);
long defaultCacheMB = PlatformDependent.maxDirectMemory() / (1024 * 1024) / instanceCount / 4;
if (writeCache == null || writeCache.equals("")) {
bkServerConf.setProperty(writeCacheMaxSizeMb, defaultCacheMB);
}
if (readCache == null || readCache.equals("")) {
bkServerConf.setProperty(readAheadCacheMaxSizeMb, defaultCacheMB);
}
}
private static void processTerminator(int exitCode) {
log.info("Halting standalone process with code {}", exitCode);
ShutdownUtil.triggerImmediateForcefulShutdown(exitCode);
}
}