blob: 77f21df1a30c11387feabe4d1618c7e103b6b997 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.tests.integration.topologies;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTPS_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_HTTP_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.BROKER_PORT_TLS;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.CS_PORT;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.PULSAR_CONTAINERS_LEAVE_RUNNING;
import static org.apache.pulsar.tests.integration.containers.PulsarContainer.ZK_PORT;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.IOUtils;
import org.apache.pulsar.client.impl.auth.AuthenticationTls;
import org.apache.pulsar.tests.integration.containers.BKContainer;
import org.apache.pulsar.tests.integration.containers.BrokerContainer;
import org.apache.pulsar.tests.integration.containers.CSContainer;
import org.apache.pulsar.tests.integration.containers.ProxyContainer;
import org.apache.pulsar.tests.integration.containers.PulsarContainer;
import org.apache.pulsar.tests.integration.containers.WorkerContainer;
import org.apache.pulsar.tests.integration.containers.ZKContainer;
import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
/**
* Pulsar Cluster in containers.
*/
@Slf4j
public class PulsarCluster {
public static final String ADMIN_SCRIPT = "/pulsar/bin/pulsar-admin";
public static final String CLIENT_SCRIPT = "/pulsar/bin/pulsar-client";
public static final String PULSAR_COMMAND_SCRIPT = "/pulsar/bin/pulsar";
public static final String CURL = "/usr/bin/curl";
/**
* Pulsar Cluster Spec
*
* @param spec pulsar cluster spec.
* @return the built pulsar cluster
*/
public static PulsarCluster forSpec(PulsarClusterSpec spec) {
CSContainer csContainer = new CSContainer(spec.clusterName)
.withNetwork(Network.newNetwork())
.withNetworkAliases(CSContainer.NAME);
return new PulsarCluster(spec, csContainer, false);
}
public static PulsarCluster forSpec(PulsarClusterSpec spec, CSContainer csContainer) {
return new PulsarCluster(spec, csContainer, true);
}
@Getter
private final PulsarClusterSpec spec;
@Getter
private final String clusterName;
private final Network network;
private final ZKContainer zkContainer;
private final CSContainer csContainer;
private final boolean sharedCsContainer;
private final Map<String, BKContainer> bookieContainers;
private final Map<String, BrokerContainer> brokerContainers;
private final Map<String, WorkerContainer> workerContainers;
private final ProxyContainer proxyContainer;
private Map<String, GenericContainer<?>> externalServices = Collections.emptyMap();
private Map<String, Map<String, String>> externalServiceEnvs;
private PulsarCluster(PulsarClusterSpec spec, CSContainer csContainer, boolean sharedCsContainer) {
this.spec = spec;
this.sharedCsContainer = sharedCsContainer;
this.clusterName = spec.clusterName();
this.network = csContainer.getNetwork();
this.zkContainer = new ZKContainer(clusterName);
this.zkContainer
.withNetwork(network)
.withNetworkAliases(appendClusterName(ZKContainer.NAME))
.withEnv("clusterName", clusterName)
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
.withEnv("configurationStore", CSContainer.NAME + ":" + CS_PORT)
.withEnv("forceSync", "no")
.withEnv("pulsarNode", appendClusterName("pulsar-broker-0"));
this.csContainer = csContainer;
this.bookieContainers = Maps.newTreeMap();
this.brokerContainers = Maps.newTreeMap();
this.workerContainers = Maps.newTreeMap();
this.proxyContainer = new ProxyContainer(appendClusterName("pulsar-proxy"), ProxyContainer.NAME, spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName("pulsar-proxy"))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
.withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName);
// enable mTLS
if (spec.enableTls) {
proxyContainer
.withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
.withEnv("servicePortTls", String.valueOf(BROKER_PORT_TLS))
.withEnv("forwardAuthorizationCredentials", "true")
.withEnv("tlsRequireTrustedClientCertOnConnect", "true")
.withEnv("tlsAllowInsecureConnection", "false")
.withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem")
.withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem")
.withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
.withEnv("brokerClientAuthenticationPlugin", AuthenticationTls.class.getName())
.withEnv("brokerClientAuthenticationParameters", String.format("tlsCertFile:%s,tlsKeyFile:%s", "/pulsar/certificate-authority/client-keys/admin.cert.pem", "/pulsar/certificate-authority/client-keys/admin.key-pk8.pem"))
.withEnv("tlsEnabledWithBroker", "true")
.withEnv("brokerClientTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
.withEnv("brokerClientCertificateFilePath", "/pulsar/certificate-authority/server-keys/proxy.cert.pem")
.withEnv("brokerClientKeyFilePath", "/pulsar/certificate-authority/server-keys/proxy.key-pk8.pem");
}
if (spec.proxyEnvs != null) {
spec.proxyEnvs.forEach(this.proxyContainer::withEnv);
}
if (spec.proxyMountFiles != null) {
spec.proxyMountFiles.forEach(this.proxyContainer::withFileSystemBind);
}
// create bookies
bookieContainers.putAll(
runNumContainers("bookie", spec.numBookies(), (name) -> {
BKContainer bookieContainer = new BKContainer(clusterName, name)
.withNetwork(network)
.withNetworkAliases(appendClusterName(name))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
.withEnv("useHostNameAsBookieID", "true")
// Disable fsyncs for tests since they're slow within the containers
.withEnv("journalSyncData", "false")
.withEnv("journalMaxGroupWaitMSec", "0")
.withEnv("clusterName", clusterName)
.withEnv("PULSAR_PREFIX_diskUsageWarnThreshold", "0.95")
.withEnv("diskUsageThreshold", "0.99")
.withEnv("PULSAR_PREFIX_diskUsageLwmThreshold", "0.97")
.withEnv("nettyMaxFrameSizeBytes", String.valueOf(spec.maxMessageSize));
if (spec.bookkeeperEnvs != null) {
bookieContainer.withEnv(spec.bookkeeperEnvs);
}
if (spec.bookieAdditionalPorts != null) {
spec.bookieAdditionalPorts.forEach(bookieContainer::addExposedPort);
}
return bookieContainer;
})
);
// create brokers
brokerContainers.putAll(
runNumContainers("broker", spec.numBrokers(), (name) -> {
BrokerContainer brokerContainer = new BrokerContainer(clusterName, appendClusterName(name), spec.enableTls)
.withNetwork(network)
.withNetworkAliases(appendClusterName(name))
.withEnv("zkServers", appendClusterName(ZKContainer.NAME))
.withEnv("zookeeperServers", appendClusterName(ZKContainer.NAME))
.withEnv("configurationStoreServers", CSContainer.NAME + ":" + CS_PORT)
.withEnv("clusterName", clusterName)
.withEnv("brokerServiceCompactionMonitorIntervalInSeconds", "1")
.withEnv("loadBalancerOverrideBrokerNicSpeedGbps", "1")
// used in s3 tests
.withEnv("AWS_ACCESS_KEY_ID", "accesskey").withEnv("AWS_SECRET_KEY", "secretkey")
.withEnv("maxMessageSize", "" + spec.maxMessageSize);
if (spec.enableTls) {
// enable mTLS
brokerContainer
.withEnv("webServicePortTls", String.valueOf(BROKER_HTTPS_PORT))
.withEnv("brokerServicePortTls", String.valueOf(BROKER_PORT_TLS))
.withEnv("authenticateOriginalAuthData", "true")
.withEnv("tlsAllowInsecureConnection", "false")
.withEnv("tlsRequireTrustedClientCertOnConnect", "true")
.withEnv("tlsTrustCertsFilePath", "/pulsar/certificate-authority/certs/ca.cert.pem")
.withEnv("tlsCertificateFilePath", "/pulsar/certificate-authority/server-keys/broker.cert.pem")
.withEnv("tlsKeyFilePath", "/pulsar/certificate-authority/server-keys/broker.key-pk8.pem");
}
if (spec.queryLastMessage) {
brokerContainer.withEnv("bookkeeperExplicitLacIntervalInMills", "10");
brokerContainer.withEnv("bookkeeperUseV2WireProtocol", "false");
}
if (spec.brokerEnvs != null) {
brokerContainer.withEnv(spec.brokerEnvs);
}
if (spec.brokerMountFiles != null) {
spec.brokerMountFiles.forEach(brokerContainer::withFileSystemBind);
}
if (spec.brokerAdditionalPorts() != null) {
spec.brokerAdditionalPorts().forEach(brokerContainer::addExposedPort);
}
return brokerContainer;
}
));
spec.classPathVolumeMounts.forEach((key, value) -> {
zkContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
proxyContainer.withClasspathResourceMapping(key, value, BindMode.READ_WRITE);
bookieContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
brokerContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
workerContainers.values().forEach(c -> c.withClasspathResourceMapping(key, value, BindMode.READ_WRITE));
});
}
public String getPlainTextServiceUrl() {
return proxyContainer.getPlainTextServiceUrl();
}
public String getHttpServiceUrl() {
return proxyContainer.getHttpServiceUrl();
}
public String getAnyBrokersHttpsServiceUrl() {
return getAnyBroker().getHttpsServiceUrl();
}
public String getAnyBrokersServiceUrlTls() {
return getAnyBroker().getServiceUrlTls();
}
public String getAllBrokersHttpServiceUrl() {
String multiUrl = "http://";
Iterator<BrokerContainer> brokers = getBrokers().iterator();
while (brokers.hasNext()) {
BrokerContainer broker = brokers.next();
multiUrl += broker.getHost() + ":" + broker.getMappedPort(BROKER_HTTP_PORT);
if (brokers.hasNext()) {
multiUrl += ",";
}
}
return multiUrl;
}
public String getZKConnString() {
return zkContainer.getHost() + ":" + zkContainer.getMappedPort(ZK_PORT);
}
public String getCSConnString() {
return csContainer.getHost() + ":" + csContainer.getMappedPort(CS_PORT);
}
public Network getNetwork() {
return network;
}
public Map<String, GenericContainer<?>> getExternalServices() {
return externalServices;
}
public void start() throws Exception {
// start the local zookeeper
zkContainer.start();
log.info("Successfully started local zookeeper container.");
// start the configuration store
if (!sharedCsContainer) {
csContainer.start();
log.info("Successfully started configuration store container.");
}
// init the cluster
zkContainer.execCmd(
"bin/init-cluster.sh");
log.info("Successfully initialized the cluster.");
// start bookies
bookieContainers.values().forEach(BKContainer::start);
log.info("Successfully started {} bookie containers.", bookieContainers.size());
// start brokers
this.startAllBrokers();
log.info("Successfully started {} broker containers.", brokerContainers.size());
// create proxy
proxyContainer.start();
log.info("Successfully started pulsar proxy.");
log.info("Pulsar cluster {} is up running:", clusterName);
log.info("\tBinary Service Url : {}", getPlainTextServiceUrl());
log.info("\tHttp Service Url : {}", getHttpServiceUrl());
// start external services
this.externalServices = spec.externalServices;
this.externalServiceEnvs = spec.externalServiceEnvs;
if (null != externalServices) {
externalServices.entrySet().parallelStream().forEach(service -> {
GenericContainer<?> serviceContainer = service.getValue();
serviceContainer.withNetwork(network);
serviceContainer.withNetworkAliases(service.getKey());
if (null != externalServiceEnvs && null != externalServiceEnvs.get(service.getKey())) {
Map<String, String> env = externalServiceEnvs.getOrDefault(service.getKey(), Collections.emptyMap());
serviceContainer.withEnv(env);
}
PulsarContainer.configureLeaveContainerRunning(serviceContainer);
serviceContainer.start();
log.info("Successfully started external service {}.", service.getKey());
});
}
}
public void startService(String networkAlias,
GenericContainer<?> serviceContainer) {
log.info("Starting external service {} ...", networkAlias);
serviceContainer.withNetwork(network);
serviceContainer.withNetworkAliases(networkAlias);
PulsarContainer.configureLeaveContainerRunning(serviceContainer);
serviceContainer.start();
log.info("Successfully start external service {}", networkAlias);
}
public static void stopService(String networkAlias,
GenericContainer<?> serviceContainer) {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
log.info("Stopping external service {} ...", networkAlias);
serviceContainer.stop();
log.info("Successfully stop external service {}", networkAlias);
}
private static <T extends PulsarContainer> Map<String, T> runNumContainers(String serviceName,
int numContainers,
Function<String, T> containerCreator) {
Map<String, T> containers = Maps.newTreeMap();
for (int i = 0; i < numContainers; i++) {
String name = "pulsar-" + serviceName + "-" + i;
T container = containerCreator.apply(name);
containers.put(name, container);
}
return containers;
}
public synchronized void stop() {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
stopInParallel(workerContainers.values());
if (externalServices != null) {
stopInParallel(externalServices.values());
}
if (null != proxyContainer) {
proxyContainer.stop();
}
stopInParallel(brokerContainers.values());
stopInParallel(bookieContainers.values());
if (!sharedCsContainer && null != csContainer) {
csContainer.stop();
}
if (null != zkContainer) {
zkContainer.stop();
}
try {
network.close();
} catch (Exception e) {
log.info("Failed to shutdown network for pulsar cluster {}", clusterName, e);
}
}
private static void stopInParallel(Collection<? extends GenericContainer<?>> containers) {
containers.parallelStream()
.filter(Objects::nonNull)
.forEach(GenericContainer::stop);
}
public synchronized void setupFunctionWorkers(String suffix, FunctionRuntimeType runtimeType, int numFunctionWorkers) {
switch (runtimeType) {
case THREAD:
startFunctionWorkersWithThreadContainerFactory(suffix, numFunctionWorkers);
break;
case PROCESS:
startFunctionWorkersWithProcessContainerFactory(suffix, numFunctionWorkers);
break;
}
}
private void startFunctionWorkersWithProcessContainerFactory(String suffix, int numFunctionWorkers) {
workerContainers.putAll(runNumContainers(
"functions-worker-process-" + suffix,
numFunctionWorkers,
(name) -> createWorkerContainer(name)
));
this.startWorkers();
}
private WorkerContainer createWorkerContainer(String name) {
String serviceUrl = "pulsar://pulsar-broker-0:" + PulsarContainer.BROKER_PORT;
String httpServiceUrl = "http://pulsar-broker-0:" + PulsarContainer.BROKER_HTTP_PORT;
return new WorkerContainer(clusterName, name)
.withNetwork(network)
.withNetworkAliases(name)
// worker settings
.withEnv("PF_workerId", name)
.withEnv("PF_workerHostname", name)
.withEnv("PF_workerPort", "" + PulsarContainer.BROKER_HTTP_PORT)
.withEnv("PF_pulsarFunctionsCluster", clusterName)
.withEnv("PF_pulsarServiceUrl", serviceUrl)
.withEnv("PF_pulsarWebServiceUrl", httpServiceUrl)
// script
.withEnv("clusterName", clusterName)
.withEnv("zookeeperServers", ZKContainer.NAME)
// bookkeeper tools
.withEnv("zkServers", ZKContainer.NAME);
}
private void startFunctionWorkersWithThreadContainerFactory(String suffix, int numFunctionWorkers) {
workerContainers.putAll(runNumContainers(
"functions-worker-thread-" + suffix,
numFunctionWorkers,
(name) -> createWorkerContainer(name)
.withEnv("PF_functionRuntimeFactoryClassName",
"org.apache.pulsar.functions.runtime.thread.ThreadRuntimeFactory")
.withEnv("PF_functionRuntimeFactoryConfigs_threadGroupName", "pf-container-group")
));
this.startWorkers();
}
public synchronized void startWorkers() {
// Start workers that have been initialized
workerContainers.values().parallelStream().forEach(WorkerContainer::start);
log.info("Successfully started {} worker containers.", workerContainers.size());
}
public synchronized void stopWorker(String workerName) {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
// Stop the named worker.
WorkerContainer worker = workerContainers.get(workerName);
if (worker == null) {
log.warn("Failed to find the worker to stop ({})", workerName);
return;
}
worker.stop();
workerContainers.remove(workerName);
log.info("Worker {} stopped and removed from the map of worker containers", workerName);
}
public synchronized void stopWorkers() {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
// Stop workers that have been initialized
workerContainers.values().parallelStream().forEach(WorkerContainer::stop);
workerContainers.clear();
}
public void startContainers(Map<String, GenericContainer<?>> containers) {
containers.forEach((name, container) -> {
PulsarContainer.configureLeaveContainerRunning(container);
container
.withNetwork(network)
.withNetworkAliases(name)
.start();
log.info("Successfully start container {}.", name);
});
}
public static void stopContainers(Map<String, GenericContainer<?>> containers) {
if (PULSAR_CONTAINERS_LEAVE_RUNNING) {
logIgnoringStopDueToLeaveRunning();
return;
}
containers.values().parallelStream().forEach(GenericContainer::stop);
log.info("Successfully stop containers : {}", containers);
}
private static void logIgnoringStopDueToLeaveRunning() {
log.warn("Ignoring stop due to PULSAR_CONTAINERS_LEAVE_RUNNING=true.");
}
public BrokerContainer getAnyBroker() {
return getAnyContainer(brokerContainers, "pulsar-broker");
}
public synchronized WorkerContainer getAnyWorker() {
return getAnyContainer(workerContainers, "pulsar-functions-worker");
}
public synchronized List<WorkerContainer> getAlWorkers() {
return new ArrayList<WorkerContainer>(workerContainers.values());
}
public BrokerContainer getBroker(int index) {
return getAnyContainer(brokerContainers, "pulsar-broker", index);
}
public synchronized WorkerContainer getWorker(int index) {
return getAnyContainer(workerContainers, "pulsar-functions-worker", index);
}
public synchronized WorkerContainer getWorker(String workerName) {
return workerContainers.get(workerName);
}
private <T> T getAnyContainer(Map<String, T> containers, String serviceName) {
List<T> containerList = Lists.newArrayList();
containerList.addAll(containers.values());
Collections.shuffle(containerList);
checkArgument(!containerList.isEmpty(), "No " + serviceName + " is alive");
return containerList.get(0);
}
private <T> T getAnyContainer(Map<String, T> containers, String serviceName, int index) {
checkArgument(!containers.isEmpty(), "No " + serviceName + " is alive");
checkArgument((index >= 0 && index < containers.size()), "Index : " + index + " is out range");
return containers.get(serviceName.toLowerCase() + "-" + index);
}
public Collection<BrokerContainer> getBrokers() {
return brokerContainers.values();
}
public ProxyContainer getProxy() {
return proxyContainer;
}
public Collection<BKContainer> getBookies() {
return bookieContainers.values();
}
public ZKContainer getZooKeeper() {
return zkContainer;
}
public ContainerExecResult runAdminCommandOnAnyBroker(String...commands) throws Exception {
return runCommandOnAnyBrokerWithScript(ADMIN_SCRIPT, commands);
}
public ContainerExecResult runPulsarBaseCommandOnAnyBroker(String...commands) throws Exception {
return runCommandOnAnyBrokerWithScript(PULSAR_COMMAND_SCRIPT, commands);
}
private ContainerExecResult runCommandOnAnyBrokerWithScript(String scriptType, String...commands) throws Exception {
BrokerContainer container = getAnyBroker();
String[] cmds = new String[commands.length + 1];
cmds[0] = scriptType;
System.arraycopy(commands, 0, cmds, 1, commands.length);
return container.execCmd(cmds);
}
public void stopAllBrokers() {
brokerContainers.values().forEach(BrokerContainer::stop);
}
public void startAllBrokers() {
brokerContainers.values().forEach(BrokerContainer::start);
}
public void stopAllBookies() {
bookieContainers.values().forEach(BKContainer::stop);
}
public void startAllBookies() {
bookieContainers.values().forEach(BKContainer::start);
}
public void stopZooKeeper() {
zkContainer.stop();
}
public void startZooKeeper() {
zkContainer.start();
}
public ContainerExecResult createNamespace(String nsName) throws Exception {
return runAdminCommandOnAnyBroker(
"namespaces", "create", "public/" + nsName,
"--clusters", clusterName);
}
public ContainerExecResult createPartitionedTopic(String topicName, int partitions) throws Exception {
return runAdminCommandOnAnyBroker(
"topics", "create-partitioned-topic", topicName,
"-p", String.valueOf(partitions));
}
public ContainerExecResult enableDeduplication(String nsName, boolean enabled) throws Exception {
return runAdminCommandOnAnyBroker(
"namespaces", "set-deduplication", "public/" + nsName,
enabled ? "--enable" : "--disable");
}
public void dumpFunctionLogs(String name) {
for (WorkerContainer container : getAlWorkers()) {
log.info("Trying to get function {} logs from container {}", name, container.getContainerName());
try {
String logFile = "/pulsar/logs/functions/public/default/" + name + "/" + name + "-0.log";
String logs = container.<String>copyFileFromContainer(logFile, (inputStream) -> {
return IOUtils.toString(inputStream, "utf-8");
});
log.info("Function {} logs {}", name, logs);
} catch (com.github.dockerjava.api.exception.NotFoundException notFound) {
log.info("Cannot download {} logs from {} not found exception {}", name, container.getContainerName(), notFound.toString());
} catch (Throwable err) {
log.info("Cannot download {} logs from {}", name, container.getContainerName(), err);
}
}
}
private String appendClusterName(String name) {
return sharedCsContainer ? clusterName + "-" + name : name;
}
public BKContainer getAnyBookie() {
return getAnyContainer(bookieContainers, "bookie");
}
}