blob: 80bf7f1383a96a39d45fb5d2943469be0cc58a88 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.flink.statefun.e2e.common;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import javax.annotation.Nullable;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.statefun.flink.core.StatefulFunctionsConfig;
import org.apache.flink.util.FileUtils;
import org.junit.rules.ExternalResource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;
import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.images.builder.ImageFromDockerfile;
* A JUnit {@link org.junit.rules.TestRule} that sets up a containerized Stateful Functions
* application using <a href="https://www.testcontainers.org/">Testcontainers</a>. This makes
* composing end-to-end tests for Stateful Functions applications easier, by managing the
* containerized application as an external test resource whose lifecycle is integrated with the
* JUnit test framework.
* <h2>Example usage</h2>
* <pre>{@code
* public class MyE2E {
* {@code @Rule}
* public StatefulFunctionsAppContainers myApp =
* StatefulFunctionsAppContainers.builder("app-name", 3).build();
* {@code @Test}
* public void runTest() {
* // the containers for the app, including master and workers, will already be running
* // before the test is run; implement your test logic against the app
* }
* }
* }</pre>
* <p>In most cases you'd also need to start an additional system for the test, for example starting
* a container that runs Kafka, on which the application depends as an ingress or egress. The
* following demonstrates adding a Kafka container to the setup:
* <pre>{@code
* public class MyKafkaE2E {
* {@code @Rule}
* public KafkaContainer kafka = new KafkaContainer();
* {@code @Rule}
* public StatefulFunctionsAppContainers myApp =
* StatefulFunctionsAppContainers.builder("app-name", 3)
* .dependsOn(kafka)
* .build();
* ...
* }
* }</pre>
* <p>Application master and worker containers will always be started after containers that are
* added using {@link Builder#dependsOn(GenericContainer)} have started. Moreover, containers being
* depended on will also be set up such that they share the same network with the master and
* workers, so that they can freely communicate with each other.
* <h2>Prerequisites</h2>
* <p>Since Testcontainers uses Docker, it is required that you have Docker installed for this test
* rule to work.
* <p>When building the Docker image for the Stateful Functions application under test, the
* following files are added to the build context:
* <ul>
* <li>The {@code Dockerfile} found at path {@code /Dockerfile} in the classpath. This is required
* to be present. A simple way is to add the Dockerfile to the test resources directory. This
* will be added to the root of the Docker image build context.
* <li>The {@code flink-conf.yaml} found at path {@code /flink-conf.yaml} in the classpath, if
* any. You can also add this to the test resources directory. This will be added to the root
* of the Docker image build context.
* <li>All built artifacts under the generated {@code target} folder for the project module that
* the test resides in. This is required to be present, which means that the tests can
* only be run after artifacts are built. The built artifacts are added to the root of the
* Docker image build context.
* </ul>
// JUnit external resource holding the master and worker containers of one application.
public final class StatefulFunctionsAppContainers extends ExternalResource {
// Class-level SLF4J logger.
private static final Logger LOG = LoggerFactory.getLogger(StatefulFunctionsAppContainers.class);
// Master container; assigned once in the constructor and never null afterwards.
private GenericContainer<?> master;
// Worker containers; assigned once in the constructor and addressed by index in restartWorker.
private List<GenericContainer<?>> workers;
// Host directory for checkpoints; assigned in before() from temporaryCheckpointDir().
private File checkpointDir;
private StatefulFunctionsAppContainers(
GenericContainer<?> masterContainer, List<GenericContainer<?>> workerContainers) {
this.master = Objects.requireNonNull(masterContainer);
this.workers = Objects.requireNonNull(workerContainers);
* Creates a builder for creating a {@link StatefulFunctionsAppContainers}.
* @param appName the name of the application.
* @param numWorkers the number of workers to run the application.
* @return a builder for creating a {@link StatefulFunctionsAppContainers}.
public static Builder builder(String appName, int numWorkers) {
return new Builder(appName, numWorkers);
// Setup hook of the JUnit ExternalResource lifecycle.
// NOTE(review): this body looks truncated. The two argument lists below have no visible call
// receiver, and no container start call is visible. Confirm against the complete file.
protected void before() throws Throwable {
// A fresh checkpoint directory is created on the host for every run.
checkpointDir = temporaryCheckpointDir();
// Arguments pairing the host checkpoint directory with "/checkpoint-dir", read-write.
checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE);
// The same host/container path pair, applied per worker through a lambda.
worker ->
checkpointDir.getAbsolutePath(), "/checkpoint-dir", BindMode.READ_WRITE));
// Teardown hook of the JUnit ExternalResource lifecycle.
// NOTE(review): only the method header is visible here; the teardown statements are missing
// from this view. Confirm against the complete file.
protected void after() {
/** @return the exposed port on master for calling REST APIs. */
public int getMasterRestPort() {
return master.getMappedPort(8081);
* Restarts a single worker of this Stateful Functions application.
* @param workerIndex the index of the worker to restart.
public void restartWorker(int workerIndex) {
if (workerIndex >= workers.size()) {
throw new IndexOutOfBoundsException(
"Invalid worker index; valid values are 0 to " + (workers.size() - 1));
final GenericContainer<?> worker = workers.get(workerIndex);
private static File temporaryCheckpointDir() throws IOException {
final Path currentWorkingDir = Paths.get(System.getProperty("user.dir"));
return Files.createTempDirectory(currentWorkingDir, "statefun-app-checkpoints-").toFile();
// Fluent builder that collects settings and creates a StatefulFunctionsAppContainers in build().
public static final class Builder {
// NOTE(review): presumably the network host name of the master; its use is not visible here.
private static final String MASTER_HOST = "statefun-app-master";
// Prefix of worker host names; workerHostOf appends "-<index>" to it.
private static final String WORKER_HOST_PREFIX = "statefun-app-worker";
// Application name; also used as the name of the built Docker image.
private final String appName;
// Number of worker containers; validated to be at least 1 in the constructor.
private final int numWorkers;
// Network handed to both the master and the worker containers in build().
private final Network network;
// Settings collected by withConfiguration / withModuleGlobalConfiguration; merged with the base
// flink-conf.yaml when the image is built.
private final Configuration dynamicProperties = new Configuration();
// Containers handed to masterContainer(...) as its dependents.
private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
// Extra classpath resources to add to the Docker image build context.
private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>();
// Optional logger set through exposeLogs; when null no log consumer is attached.
private Logger logger;
// Validates the arguments and initializes the builder.
// Throws IllegalArgumentException when appName is null or empty, or when numWorkers < 1.
private Builder(String appName, int numWorkers) {
if (appName == null || appName.isEmpty()) {
throw new IllegalArgumentException(
"App name must be non-empty. This is used as the application image name.");
if (numWorkers < 1) {
throw new IllegalArgumentException("Must have at least 1 worker.");
// NOTE(review): the next line is garbled; presumably it closes the if-block and then assigns
// a new Network to this.network. Confirm against the complete file.
} = Network.newNetwork();
this.appName = appName;
this.numWorkers = numWorkers;
// Registers a container that the application depends on and returns this builder for chaining.
// NOTE(review): the visible body only returns this; the statements that record `container`
// are missing from this view. Confirm against the complete file.
public StatefulFunctionsAppContainers.Builder dependsOn(GenericContainer<?> container) {
return this;
public StatefulFunctionsAppContainers.Builder exposeLogs(Logger logger) {
this.logger = logger;
return this;
public StatefulFunctionsAppContainers.Builder withModuleGlobalConfiguration(
String key, String value) {
this.dynamicProperties.setString(StatefulFunctionsConfig.MODULE_CONFIG_PREFIX + key, value);
return this;
public <T> StatefulFunctionsAppContainers.Builder withConfiguration(
ConfigOption<T> config, T value) {
this.dynamicProperties.set(config, value);
return this;
// Describes an extra classpath resource for the Docker build context and returns this builder.
// NOTE(review): the receiver of the argument below is missing from this view; presumably the
// new ClasspathBuildContextFile is added to classpathBuildContextFiles. Confirm.
public StatefulFunctionsAppContainers.Builder withBuildContextFileFromClasspath(
String buildContextPath, String resourcePath) {
new ClasspathBuildContextFile(buildContextPath, resourcePath));
return this;
public StatefulFunctionsAppContainers build() {
final ImageFromDockerfile appImage =
appImage(appName, dynamicProperties, classpathBuildContextFiles);
return new StatefulFunctionsAppContainers(
masterContainer(appImage, network, dependentContainers, numWorkers, logger),
workerContainers(appImage, numWorkers, network, logger));
// Assembles the Docker image description for the application: the Dockerfile from the
// classpath, the contents of <user.dir>/target/, a generated flink-conf.yaml, and any extra
// classpath resources.
// NOTE(review): several statements here are truncated (a logging call fused onto the first
// line, a dangling message literal, a loop body without receiver). Confirm against the
// complete file.
private static ImageFromDockerfile appImage(
String appName,
Configuration dynamicProperties,
List<ClasspathBuildContextFile> classpathBuildContextFiles) {
// Built artifacts are taken from the "target" folder under the current working directory.
final Path targetDirPath = Paths.get(System.getProperty("user.dir") + "/target/");"Building app image with built artifacts located at: {}", targetDirPath);
final ImageFromDockerfile appImage =
new ImageFromDockerfile(appName)
.withFileFromClasspath("Dockerfile", "Dockerfile")
.withFileFromPath(".", targetDirPath);
// Merge the dynamic properties into the base configuration and render it as text.
Configuration flinkConf = resolveFlinkConf(dynamicProperties);
String flinkConfString = flinkConfigAsString(flinkConf);
"Resolved Flink configuration after merging dynamic properties with base flink-conf.yaml:\n\n{}",
// The rendered configuration is placed in the build context as flink-conf.yaml.
appImage.withFileFromString("flink-conf.yaml", flinkConfString);
for (ClasspathBuildContextFile classpathBuildContextFile : classpathBuildContextFiles) {
classpathBuildContextFile.buildContextPath, classpathBuildContextFile.fromResourcePath);
return appImage;
/**
 * Merges the dynamic properties that have been set with the configuration in the base
 * {@code flink-conf.yaml} located in resources.
 */
private static Configuration resolveFlinkConf(Configuration dynamicProperties) {
// NOTE(review): the initializer of this stream is missing from this view; presumably it opens
// the base flink-conf.yaml classpath resource. Confirm against the complete file.
final InputStream baseFlinkConfResourceInputStream =
if (baseFlinkConfResourceInputStream == null) {
throw new RuntimeException("Base flink-conf.yaml cannot be found.");
// The base configuration is copied to a temporary file named flink-conf.yaml.
final File tempBaseFlinkConfFile = copyToTempFlinkConfFile(baseFlinkConfResourceInputStream);
// The loader is given the directory containing that file, together with the dynamic properties.
return GlobalConfiguration.loadConfiguration(
tempBaseFlinkConfFile.getParentFile().getAbsolutePath(), dynamicProperties);
private static String flinkConfigAsString(Configuration configuration) {
StringBuilder yaml = new StringBuilder();
for (Map.Entry<String, String> entry : configuration.toMap().entrySet()) {
yaml.append(entry.getKey()).append(": ").append(entry.getValue()).append("\n");
return yaml.toString();
private static File copyToTempFlinkConfFile(InputStream inputStream) {
try {
final File tempFile =
new File(
Files.createTempDirectory("statefun-app-containers").toString(), "flink-conf.yaml");
Files.copy(inputStream, tempFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
return tempFile;
} catch (Exception e) {
throw new RuntimeException(e);
// Creates the master container from the application image.
// NOTE(review): the builder chain and the body of the dependents loop are truncated in this
// view, so how `network` and `dependents` are applied is not visible. Confirm.
private static GenericContainer<?> masterContainer(
ImageFromDockerfile appImage,
Network network,
List<GenericContainer<?>> dependents,
int numWorkers,
@Nullable Logger logger) {
final GenericContainer<?> master =
// NOTE(review): raw type here, while workerContainers uses the diamond form
// `new GenericContainer<>(appImage)`.
new GenericContainer(appImage)
.withEnv("ROLE", "master")
// The number of workers is passed to the container command as "-p <numWorkers>".
.withCommand("-p " + numWorkers)
for (GenericContainer<?> dependent : dependents) {
// Container output is forwarded only when a logger was supplied.
if (logger != null) {
master.withLogConsumer(new Slf4jLogConsumer(logger, true));
return master;
// Creates `numWorkers` worker containers from the application image.
// NOTE(review): the builder chain is truncated in this view, and the statement that adds each
// worker to the list is not visible. Confirm against the complete file.
private static List<GenericContainer<?>> workerContainers(
ImageFromDockerfile appImage, int numWorkers, Network network, @Nullable Logger logger) {
final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
for (int i = 0; i < numWorkers; i++) {
final GenericContainer<?> worker =
new GenericContainer<>(appImage)
.withEnv("ROLE", "worker")
// Container output is forwarded only when a logger was supplied.
if (logger != null) {
worker.withLogConsumer(new Slf4jLogConsumer(logger, true));
return workers;
private static String workerHostOf(int workerIndex) {
return WORKER_HOST_PREFIX + "-" + workerIndex;
private static class ClasspathBuildContextFile {
private final String buildContextPath;
private final String fromResourcePath;
ClasspathBuildContextFile(String buildContextPath, String fromResourcePath) {
this.buildContextPath = Objects.requireNonNull(buildContextPath);
this.fromResourcePath = Objects.requireNonNull(fromResourcePath);