[FLINK-23714] Expose both master and worker logs of StatefulFunctionsAppContainers
diff --git a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
index f03a5de..80bf7f1 100644
--- a/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
+++ b/statefun-e2e-tests/statefun-e2e-tests-common/src/main/java/org/apache/flink/statefun/e2e/common/StatefulFunctionsAppContainers.java
@@ -200,7 +200,7 @@
     private final Configuration dynamicProperties = new Configuration();
     private final List<GenericContainer<?>> dependentContainers = new ArrayList<>();
     private final List<ClasspathBuildContextFile> classpathBuildContextFiles = new ArrayList<>();
-    private Logger masterLogger;
+    private Logger logger;
 
     private Builder(String appName, int numWorkers) {
       if (appName == null || appName.isEmpty()) {
@@ -222,8 +222,8 @@
       return this;
     }
 
-    public StatefulFunctionsAppContainers.Builder exposeMasterLogs(Logger logger) {
-      this.masterLogger = logger;
+    public StatefulFunctionsAppContainers.Builder exposeLogs(Logger logger) {
+      this.logger = logger;
       return this;
     }
 
@@ -251,8 +251,8 @@
           appImage(appName, dynamicProperties, classpathBuildContextFiles);
 
       return new StatefulFunctionsAppContainers(
-          masterContainer(appImage, network, dependentContainers, numWorkers, masterLogger),
-          workerContainers(appImage, numWorkers, network));
+          masterContainer(appImage, network, dependentContainers, numWorkers, logger),
+          workerContainers(appImage, numWorkers, network, logger));
     }
 
     private static ImageFromDockerfile appImage(
@@ -324,7 +324,7 @@
         Network network,
         List<GenericContainer<?>> dependents,
         int numWorkers,
-        @Nullable Logger masterLogger) {
+        @Nullable Logger logger) {
       final GenericContainer<?> master =
           new GenericContainer(appImage)
               .withNetwork(network)
@@ -338,24 +338,30 @@
         master.dependsOn(dependent);
       }
 
-      if (masterLogger != null) {
-        master.withLogConsumer(new Slf4jLogConsumer(masterLogger, true));
+      if (logger != null) {
+        master.withLogConsumer(new Slf4jLogConsumer(logger, true));
       }
 
       return master;
     }
 
     private static List<GenericContainer<?>> workerContainers(
-        ImageFromDockerfile appImage, int numWorkers, Network network) {
+        ImageFromDockerfile appImage, int numWorkers, Network network, @Nullable Logger logger) {
       final List<GenericContainer<?>> workers = new ArrayList<>(numWorkers);
 
       for (int i = 0; i < numWorkers; i++) {
-        workers.add(
-            new GenericContainer(appImage)
+        final GenericContainer<?> worker =
+            new GenericContainer<>(appImage)
                 .withNetwork(network)
                 .withNetworkAliases(workerHostOf(i))
                 .withEnv("ROLE", "worker")
-                .withEnv("MASTER_HOST", MASTER_HOST));
+                .withEnv("MASTER_HOST", MASTER_HOST);
+
+        if (logger != null) {
+          worker.withLogConsumer(new Slf4jLogConsumer(logger, true));
+        }
+
+        workers.add(worker);
       }
 
       return workers;
diff --git a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
index c16cf9a..091c1fa 100644
--- a/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
+++ b/statefun-e2e-tests/statefun-exactly-once-remote-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/ExactlyOnceWithRemoteFnE2E.java
@@ -98,7 +98,7 @@
       StatefulFunctionsAppContainers.builder("remote-module-verification", NUM_WORKERS)
           .dependsOn(kafka)
           .dependsOn(remoteFunction)
-          .exposeMasterLogs(LOG)
+          .exposeLogs(LOG)
           .withBuildContextFileFromClasspath("remote-module", "/remote-module/")
           .build();
 
diff --git a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
index 473dc9b..d9ae8be 100644
--- a/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
+++ b/statefun-e2e-tests/statefun-sanity-e2e/src/test/java/org/apache/flink/statefun/e2e/sanity/SanityVerificationE2E.java
@@ -65,7 +65,7 @@
   public StatefulFunctionsAppContainers verificationApp =
       StatefulFunctionsAppContainers.builder("sanity-verification", 2)
           .dependsOn(kafka)
-          .exposeMasterLogs(LOG)
+          .exposeLogs(LOG)
           .withModuleGlobalConfiguration(
               Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
           .build();
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
index b982a38..d42e6c6 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
@@ -45,7 +45,7 @@
     // set the test module parameters as global configurations, so that
     // it can be deserialized at Module#configure()
     parameters.asMap().forEach(builder::withModuleGlobalConfiguration);
-    builder.exposeMasterLogs(LOG);
+    builder.exposeLogs(LOG);
     StatefulFunctionsAppContainers app = builder.build();
 
     // run the test