blob: 863bcb678f96e52832455575dea1b00571e5e889 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.statefun.e2e.common;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.util.FileUtils;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.images.builder.ImageFromDockerfile;
/**
* A JUnit {@link org.junit.rules.TestRule} that setups a containerized Stateful Functions
* application using <a href="https://www.testcontainers.org/">Testcontainers</a>. This allows
* composing end-to-end tests for Stateful Functions applications easier, by managing the
* containerized application as an external test resource whose lifecycle is integrated with the
* JUnit test framework.
*
* <h2>Example usage</h2>
*
* <pre>{@code
* public class MyE2E {
*
* {@code @Rule}
* public StatefulFunctionsAppContainers myApp =
* StatefulFunctionsAppContainers.builder("app-name", 3).build();
*
* {@code @Test}
* public void runTest() {
* // the containers for the app, including master and workers, will already be running
* // before the test is run; implement your test logic against the app
* }
* }
* }</pre>
*
* <p>In most cases you'd also need to start an additional system for the test, for example starting
* a container that runs Kafka from which the application depends on as an ingress or egress. The
* following demonstrates adding a Kafka container to the setup:
*
* <pre>{@code
* public class MyKafkaE2E {
*
* {@code @Rule}
* public KafkaContainer kafka = new KafkaContainer();
*
* {@code @Rule}
* public StatefulFunctionsAppContainers myApp =
* StatefulFunctionsAppContainers.builder("app-name", 3)
* .dependsOn(kafka)
* .build();
*
* ...
* }
* }</pre>
*
* <p>Application master and worker containers will always be started after containers that are
* added using {@link Builder#dependsOn(GenericContainer)} have started. Moreover, containers being
* depended on will also be setup such that they share the same network with the master and workers,
* so that they can freely communicate with each other.
*
* <h2>Prerequisites</h2>
*
* <p>Since Testcontainers uses Docker, it is required that you have Docker installed for this test
* rule to work.
*
* <p>When building the Docker image for the Stateful Functions application under test, the
* following files are added to the build context:
*
* <uL>
* <li>The {@code Dockerfile} found at path {@code /Dockerfile} in the classpath. This is required
* to be present. A simple way is to add the Dockerfile to the test resources directory. This
* will be added to the root of the Docker image build context.
* <li>The {@code flink-conf.yaml} found at path {@code /flink-conf.yaml} in the classpath, if
* any. You can also add this to the test resources directory. This will be added to the root
* of the Docker image build context.
* <li>All built artifacts under the generated {@code target} folder for the project module that
* the test resides in. This is required to be present, so this entails that the tests can
* only be ran after artifacts are built. The built artifacts are added to the root of the
* Docker image build context.
* </uL>
*/
public final class StatefulFunctionsAppContainers extends ExternalResource {
private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
private GenericContainer<?> master;
private List<GenericContainer<?>> workers;
private File checkpointDir;
private StatefulFunctionsAppContainers(
GenericContainer<?> masterContainer, List<GenericContainer<?>> workerContainers) {
this.master = Objects.requireNonNull(masterContainer);
this.workers = Objects.requireNonNull(workerContainers);
}
/**
* Creates a builder for creating a {@link StatefulFunctionsAppContainers}.
*
* @param appName the name of the application.
* @param numWorkers the number of workers to run the application.
* @return a builder for creating a {@link StatefulFunctionsAppContainers}.
*/
public static Builder builder(String appName, int numWorkers) {
return new Builder(appName, numWorkers);
}
@Override
protected void before() throws Throwable {
checkpointDir = temporaryCheckpointDir();
master.withFileSystemBind(
checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE);
workers.forEach(
worker ->
worker.withFileSystemBind(
checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE));
master.start();
workers.forEach(GenericContainer::start);
}
@Override
protected void after() {
master.stop();
workers.forEach(GenericContainer::stop);
FileUtils.deleteDirectoryQuietly(checkpointDir);
}
/** @return the exposed port on master for calling REST APIs. */
public int getMasterRestPort() {
return master.getMappedPort(8081);
}
/**
* Restarts a single worker of this Stateful Functions application.
*
* @param workerIndex the index of the worker to restart.
*/
public void restartWorker(int workerIndex) {
if (workerIndex >= workers.size()) {
throw new IndexOutOfBoundsException(
"Invalid worker index; valid values are 0 to " + (workers.size() - 1));
}
final GenericContainer<?> worker = workers.get(workerIndex);
worker.stop();
worker.start();
}
private static File temporaryCheckpointDir() throws IOException {
final Path currentWorkingDir = Paths.get(System.getProperty("user.dir"));
return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile();
}
public static final class Builder {
private static final String MASTER_HOST = "statefun-app-master";
private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
private final String appName;
private final int numWorkers;
private final Network network;
private final Configuration dynamicProperties = new Configuration();
private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>();
private Logger logger;
private Builder(String appName, int numWorkers) {
if (appName == null || appName.isEmpty()) {
throw new IllegalArgumentException(
"App name must be non-empty. This is used as the application image name.");
}
if (numWorkers < 1) {
throw new IllegalArgumentException("Must have at least 1 worker.");
}
this.network = Network.newNetwork();
this.appName = appName;
this.numWorkers = numWorkers;
}
public StatefulFunctionsAppContainers.Builder dependsOn(GenericContainer<?> container) {
container.withNetwork(network);
this.dependentContainers.add(container);
return this;
}
public StatefulFunctionsAppContainers.Builder exposeLogs(Logger logger) {
this.logger = logger;
return this;
}
public StatefulFunctionsAppContainers.Builder withModuleGlobalConfiguration(
String key, String value) {
this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
return this;
}
public <T> StatefulFunctionsAppContainers.Builder withConfiguration(
ConfigOption<T> config, T value) {
this.dynamicProperties.set(config, value);
return this;
}
public StatefulFunctionsAppContainers.Builder withConfiguration(String key, String value) {
this.dynamicProperties.setString(key, value);
return this;
}
public StatefulFunctionsAppContainers.Builder withBuildContextFileFromClasspath(
String buildContextPath, String resourcePath) {
this.classpathBuildContextFiles.add(
new ClasspathBuildContextFile(buildContextPath, resourcePath));
return this;
}
public StatefulFunctionsAppContainers build() {
final ImageFromDockerfile appImage =
appImage(appName, dynamicProperties, classpathBuildContextFiles);
return new StatefulFunctionsAppContainers(
masterContainer(appImage, network, dependentContainers, numWorkers, logger),
workerContainers(appImage, numWorkers, network, logger));
}
private static ImageFromDockerfile appImage(
String appName,
Configuration dynamicProperties,
List<ClasspathBuildContextFile> classpathBuildContextFiles) {
final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");
LOG.info("Building app image with built artifacts located at: {}", targetDirPath);
final ImageFromDockerfile appImage =
new ImageFromDockerfile(appName)
.withFileFromClasspath("Dockerfile", "Dockerfile")
.withFileFromPath(".", targetDirPath);
Configuration flinkConf = resolveFlinkConf(dynamicProperties);
String flinkConfString = flinkConfigAsString(flinkConf);
LOG.info(
"Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
flinkConf);
appImage.withFileFromString("flink-conf.yaml", flinkConfString);
for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
appImage.withFileFromClasspath(
classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
}
return appImage;
}
/**
* Merges set dynamic properties with configuration in the base flink-conf.yaml located in
* resources.
*/
private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
final InputStream baseFlinkConfResourceInputStream =
StatefulFunctionsAppContainers.class.getResourceAsStream("/flink-conf.yaml");
if (baseFlinkConfResourceInputStream == null) {
throw new RuntimeException("Base flink-conf.yaml cannot be found.");
}
final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
return GlobalConfiguration.loadConfiguration(
tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
}
private static String flinkConfigAsString(Configuration configuration) {
StringBuilder yaml = new StringBuilder();
for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
}
return yaml.toString();
}
private static File copyToTempFlinkConfFile(InputStream inputStream) {
try {
final File tempFile =
new File(
Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
return tempFile;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
private static GenericContainer<?> masterContainer(
ImageFromDockerfile appImage,
Network network,
List<GenericContainer<?>> dependents,
int numWorkers,
@Nullable Logger logger) {
final GenericContainer<?> master =
new GenericContainer(appImage)
.withNetwork(network)
.withNetworkAliases(MASTER_HOST)
.withEnv("ROLE", "master")
.withEnv("MASTER_HOST", MASTER_HOST)
.withCommand("-p " + numWorkers)
.withExposedPorts(8081);
for (GenericContainer<?> dependent : dependents) {
master.dependsOn(dependent);
}
if (logger != null) {
master.withLogConsumer(new Slf4jLogConsumer(logger, true));
}
return master;
}
private static List<GenericContainer<?>> workerContainers(
ImageFromDockerfile appImage, int numWorkers, Network network, @Nullable Logger logger) {
final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
for (int i = 0; i < numWorkers; i++) {
final GenericContainer<?> worker =
new GenericContainer<>(appImage)
.withNetwork(network)
.withNetworkAliases(workerHostOf(i))
.withEnv("ROLE", "worker")
.withEnv("MASTER_HOST", MASTER_HOST);
if (logger != null) {
worker.withLogConsumer(new Slf4jLogConsumer(logger, true));
}
workers.add(worker);
}
return workers;
}
private static String workerHostOf(int workerIndex) {
return WORKER_HOST_PREFIX + "-" + workerIndex;
}
private static class ClasspathBuildContextFile {
private final String buildContextPath;
private final String fromResourcePath;
ClasspathBuildContextFile(String buildContextPath, String fromResourcePath) {
this.buildContextPath = Objects.requireNonNull(buildContextPath);
this.fromResourcePath = Objects.requireNonNull(fromResourcePath);
}
}
}
}