[FLINK-23714] Expose both master and worker logs of StatefulFunctionsAppContainers
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index f03a5de..80bf7f1 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -200,7 +200,7 @@
private final Configuration dynamicProperties = new Configuration();
private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>();
- private Logger masterLogger;
+ private Logger logger;
private Builder(String appName, int numWorkers) {
if (appName == null || appName.isEmpty()) {
@@ -222,8 +222,8 @@
return this;
}
- public StatefulFunctionsAppContainers.Builder exposeMasterLogs(Logger logger) {
- this.masterLogger = logger;
+ public StatefulFunctionsAppContainers.Builder exposeLogs(Logger logger) {
+ this.logger = logger;
return this;
}
@@ -251,8 +251,8 @@
appImage(appName, dynamicProperties, classpathBuildContextFiles);
return new StatefulFunctionsAppContainers(
- masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger),
- workerContainers(appImage, numWorkers, network));
+ masterContainer(appImage, network, dependentContainers, numWorkers, logger),
+ workerContainers(appImage, numWorkers, network, logger));
}
private static ImageFromDockerfile appImage(
@@ -324,7 +324,7 @@
Network network,
List<GenericContainer<?>> dependents,
int numWorkers,
- @Nullable Logger masterLogger) {
+ @Nullable Logger logger) {
final GenericContainer<?> master =
new GenericContainer(appImage)
.withNetwork(network)
@@ -338,24 +338,30 @@
master.dependsOn(dependent);
}
- if (masterLogger != null) {
- master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+ if (logger != null) {
+ master.withLogConsumer(new Slf4jLogConsumer(logger, true));
}
return master;
}
private static List<GenericContainer<?>> workerContainers(
- ImageFromDockerfile appImage, int numWorkers, Network network) {
+ ImageFromDockerfile appImage, int numWorkers, Network network, @Nullable Logger logger) {
final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
for (int i = 0; i < numWorkers; i++) {
- workers.add(
- new GenericContainer(appImage)
+ final GenericContainer<?> worker =
+ new GenericContainer<>(appImage)
.withNetwork(network)
.withNetworkAliases(workerHostOf(i))
.withEnv("ROLE", "worker")
- .withEnv("MASTER_HOST", MASTER_HOST));
+ .withEnv("MASTER_HOST", MASTER_HOST);
+
+ if (logger != null) {
+ worker.withLogConsumer(new Slf4jLogConsumer(logger, true));
+ }
+
+ workers.add(worker);
}
return workers;
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
index c16cf9a..091c1fa 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
@@ -98,7 +98,7 @@
StatefulFunctionsAppContainers.builder("remote-module-verification", NUM_WORKERS)
.dependsOn(kafka)
.dependsOn(remoteFunction)
- .exposeMasterLogs(LOG)
+ .exposeLogs(LOG)
.withBuildContextFileFromClasspath("remote-module", "/remote-module/")
.build();
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 473dc9b..d9ae8be 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -65,7 +65,7 @@
public StatefulFunctionsAppContainers verificationApp =
StatefulFunctionsAppContainers.builder("sanity-verification", 2)
.dependsOn(kafka)
- .exposeMasterLogs(LOG)
+ .exposeLogs(LOG)
.withModuleGlobalConfiguration(
Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
.build();
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
index b982a38..d42e6c6 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
@@ -45,7 +45,7 @@
// set the test module parameters as global configurations, so that
// it can be deserialized at Module#configure()
parameters.asMap().forEach(builder::withModuleGlobalConfiguration);
- builder.exposeMasterLogs(LOG);
+ builder.exposeLogs(LOG);
StatefulFunctionsAppContainers app = builder.build();
// run the test