[hotfix] [e2e] Pull smoke E2E test utilities and Commands into a common module
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 2e29272..9a3dae9 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -36,6 +36,7 @@
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
         <module>statefun-exactly-once-remote-e2e</module>
+        <module>statefun-smoke-e2e-common</module>
         <module>statefun-smoke-e2e-driver</module>
         <module>statefun-smoke-e2e-embedded</module>
         <module>statefun-smoke-e2e-multilang-base</module>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
new file mode 100644
index 0000000..fe77231
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>3.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-smoke-e2e-common</artifactId>
+
+    <properties>
+        <commons-math3.version>3.5</commons-math3.version>
+        <additional-sources.dir>target/additional-sources</additional-sources.dir>
+    </properties>
+
+    <dependencies>
+        <!-- StatefulFunctionsAppContainers -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-e2e-tests-common</artifactId>
+            <version>${project.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.fasterxml.jackson.core</groupId>
+                    <artifactId>jackson-annotations</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- Protobuf Commands messages -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-jackson</artifactId>
+            <version>2.12.1-13.0</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.15</version>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <!-- Generate the Command messages -->
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>generate-protobuf-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <includeStdTypes>true</includeStdTypes>
+                            <protocVersion>${protobuf.version}</protocVersion>
+                            <cleanOutputFolder>true</cleanOutputFolder>
+                            <inputDirectories>
+                                <inputDirectory>src/main/protobuf</inputDirectory>
+                            </inputDirectories>
+                            <outputDirectory>${basedir}/target/generated-sources/protoc-jar</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SimpleVerificationServer.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SimpleVerificationServer.java
similarity index 97%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SimpleVerificationServer.java
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SimpleVerificationServer.java
index 0dd83f7..cc02ebc 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SimpleVerificationServer.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SimpleVerificationServer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.statefun.e2e.smoke.driver.testutils;
+package org.apache.flink.statefun.e2e.smoke;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -46,7 +46,7 @@
     this.executor = MoreExecutors.newCachedDaemonThreadPool();
   }
 
-  StartedServer start() {
+  public StartedServer start() {
     if (!started.compareAndSet(false, true)) {
       throw new IllegalArgumentException("Already started.");
     }
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunner.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
similarity index 70%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunner.java
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
index db89b41..b982a38 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunner.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
@@ -16,12 +16,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.statefun.e2e.smoke.driver.testutils;
+package org.apache.flink.statefun.e2e.smoke;
 
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.awaitVerificationSuccess;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.startVerificationServer;
-
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Supplier;
 import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
 import org.apache.flink.util.function.ThrowingRunnable;
 import org.junit.runner.Description;
 import org.junit.runners.model.Statement;
@@ -36,7 +37,7 @@
       SmokeRunnerParameters parameters, StatefulFunctionsAppContainers.Builder builder)
       throws Throwable {
     // start verification server
-    SimpleVerificationServer.StartedServer server = startVerificationServer();
+    SimpleVerificationServer.StartedServer server = new SimpleVerificationServer().start();
     parameters.setVerificationServerHost("host.testcontainers.internal");
     parameters.setVerificationServerPort(server.port());
     Testcontainers.exposeHostPorts(server.port());
@@ -68,4 +69,23 @@
 
     statement.evaluate();
   }
+
+  public static void awaitVerificationSuccess(
+      Supplier<VerificationResult> results, final int numberOfFunctionInstances) {
+    Set<Integer> successfullyVerified = new HashSet<>();
+    while (successfullyVerified.size() != numberOfFunctionInstances) {
+      VerificationResult result = results.get();
+      if (result.getActual() == result.getExpected()) {
+        successfullyVerified.add(result.getId());
+      } else if (result.getActual() > result.getExpected()) {
+        throw new AssertionError(
+            "Over counted. Expected: "
+                + result.getExpected()
+                + ", actual: "
+                + result.getActual()
+                + ", function: "
+                + result.getId());
+      }
+    }
+  }
 }
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParameters.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
similarity index 94%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParameters.java
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
index 0a99fea..fb19032 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParameters.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke;
 
 import java.io.Serializable;
 import java.util.Map;
@@ -24,7 +24,7 @@
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
 
 @SuppressWarnings("unused")
-public final class ModuleParameters implements Serializable {
+public final class SmokeRunnerParameters implements Serializable {
 
   private static final long serialVersionUID = 1;
 
@@ -44,10 +44,10 @@
   private boolean isAsyncOpSupported = false;
 
   /** Creates an instance of ModuleParameters from a key-value map. */
-  public static ModuleParameters from(Map<String, String> globalConfiguration) {
+  public static SmokeRunnerParameters from(Map<String, String> globalConfiguration) {
     ObjectMapper mapper = new ObjectMapper();
     mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
-    return mapper.convertValue(globalConfiguration, ModuleParameters.class);
+    return mapper.convertValue(globalConfiguration, SmokeRunnerParameters.class);
   }
 
   public Map<String, String> asMap() {
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/protobuf/commands.proto b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
similarity index 100%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/protobuf/commands.proto
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParametersTest.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParametersTest.java
similarity index 77%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParametersTest.java
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParametersTest.java
index 927ad9d..027c8ab 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParametersTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParametersTest.java
@@ -16,31 +16,31 @@
  * limitations under the License.
  */
 
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke;
 
 import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThat;
 
 import java.util.Collections;
 import java.util.Map;
 import org.junit.Test;
 
-public class ModuleParametersTest {
+public class SmokeRunnerParametersTest {
 
   @Test
   public void exampleUsage() {
     Map<String, String> keys = Collections.singletonMap("messageCount", "1");
-    ModuleParameters parameters = ModuleParameters.from(keys);
+    SmokeRunnerParameters parameters = SmokeRunnerParameters.from(keys);
 
     assertThat(parameters.getMessageCount(), is(1));
   }
 
   @Test
   public void roundTrip() {
-    ModuleParameters original = new ModuleParameters();
+    SmokeRunnerParameters original = new SmokeRunnerParameters();
     original.setCommandDepth(1234);
 
-    ModuleParameters deserialized = ModuleParameters.from(original.asMap());
+    SmokeRunnerParameters deserialized = SmokeRunnerParameters.from(original.asMap());
 
     assertThat(deserialized.getCommandDepth(), is(1234));
   }
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml
index 78388ad..8ee004c 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml
@@ -33,7 +33,7 @@
     </properties>
 
     <dependencies>
-        <!-- Stateful Functions -->
+        <!-- Stateful Functions SDK for composing and binding the driver components -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>statefun-sdk-embedded</artifactId>
@@ -56,6 +56,11 @@
             <artifactId>commons-math3</artifactId>
             <version>${commons-math3.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-smoke-e2e-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
 
         <!-- Protobuf -->
         <dependency>
@@ -95,33 +100,6 @@
                 </exclusion>
             </exclusions>
         </dependency>
-
-        <!-- logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.15</version>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-        </dependency>
-
-        <!-- End-to-end test common -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-e2e-tests-common</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <!-- conflicts with flink-core -->
-                <exclusion>
-                    <groupId>com.kohlschutter.junixsocket</groupId>
-                    <artifactId>junixsocket-native-common</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
     </dependencies>
 
     <build>
@@ -186,16 +164,21 @@
                 </executions>
             </plugin>
 
-            <!-- package the test jar for other modules to include -->
+            <!-- driver module should be built into a fat-jar -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-jar-plugin</artifactId>
-                <version>2.4</version>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.0.0</version>
                 <executions>
+                    <!-- Run shade goal on package phase -->
                     <execution>
+                        <phase>package</phase>
                         <goals>
-                            <goal>test-jar</goal>
+                            <goal>shade</goal>
                         </goals>
+                        <configuration>
+                            <createDependencyReducedPom>false</createDependencyReducedPom>
+                        </configuration>
                     </execution>
                 </executions>
             </plugin>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
index d3580f8..f878b3f 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
@@ -17,7 +17,7 @@
  */
 package org.apache.flink.statefun.e2e.smoke.driver;
 
-import static org.apache.flink.statefun.e2e.smoke.common.Types.packSourceCommand;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.packSourceCommand;
 import static org.apache.flink.statefun.e2e.smoke.generated.Command.Verify;
 import static org.apache.flink.statefun.e2e.smoke.generated.Command.newBuilder;
 
@@ -34,7 +34,7 @@
 import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
 import org.apache.flink.statefun.e2e.smoke.generated.Command;
 import org.apache.flink.statefun.e2e.smoke.generated.Commands;
 import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
@@ -48,13 +48,13 @@
 /**
  * A Flink Source that Emits {@link SourceCommand}s.
  *
- * <p>This source is configured by {@link ModuleParameters} and would generate random commands,
+ * <p>This source is configured by {@link SmokeRunnerParameters} and would generate random commands,
  * addressed to various functions. This source might also throw exceptions (kaboom) to simulate
  * failures.
  *
- * <p>After generating {@link ModuleParameters#getMessageCount()} messages, this source will switch
- * to {@code verification} step. At this step, it would keep sending (every 2 seconds) a {@link
- * Verify} command to every function indefinitely.
+ * <p>After generating {@link SmokeRunnerParameters#getMessageCount()} messages, this source will
+ * switch to {@code verification} step. At this step, it would keep sending (every 2 seconds) a
+ * {@link Verify} command to every function indefinitely.
  */
 final class CommandFlinkSource extends RichSourceFunction<TypedValue>
     implements CheckpointedFunction, CheckpointListener {
@@ -65,7 +65,7 @@
   // Configuration
   // ------------------------------------------------------------------------------------------------------------
 
-  private final ModuleParameters moduleParameters;
+  private final SmokeRunnerParameters parameters;
 
   // ------------------------------------------------------------------------------------------------------------
   // Runtime
@@ -78,8 +78,8 @@
   private transient boolean done;
   private transient boolean atLeastOneCheckpointCompleted;
 
-  public CommandFlinkSource(ModuleParameters moduleParameters) {
-    this.moduleParameters = Objects.requireNonNull(moduleParameters);
+  public CommandFlinkSource(SmokeRunnerParameters parameters) {
+    this.parameters = Objects.requireNonNull(parameters);
   }
 
   @Override
@@ -95,7 +95,7 @@
     SourceSnapshot sourceSnapshot =
         getOnlyElement(sourceSnapshotHandle.get(), SourceSnapshot.getDefaultInstance());
     functionStateTracker =
-        new FunctionStateTracker(moduleParameters.getNumberOfFunctionInstances())
+        new FunctionStateTracker(this.parameters.getNumberOfFunctionInstances())
             .apply(sourceSnapshot.getTracker());
     commandsSentSoFar = sourceSnapshot.getCommandsSentSoFarHandle();
     failuresSoFar = sourceSnapshot.getFailuresGeneratedSoFar();
@@ -111,11 +111,10 @@
             .setFailuresGeneratedSoFar(failuresSoFar)
             .build());
 
-    if (commandsSentSoFar < moduleParameters.getMessageCount()) {
-      double perCent = 100.0d * (commandsSentSoFar) / moduleParameters.getMessageCount();
+    if (commandsSentSoFar < parameters.getMessageCount()) {
+      double perCent = 100.0d * (commandsSentSoFar) / parameters.getMessageCount();
       LOG.info(
-          "Commands sent {} / {} ({} %)",
-          commandsSentSoFar, moduleParameters.getMessageCount(), perCent);
+          "Commands sent {} / {} ({} %)", commandsSentSoFar, parameters.getMessageCount(), perCent);
     }
   }
 
@@ -150,7 +149,7 @@
   private void generate(SourceContext<TypedValue> ctx) {
     final int startPosition = this.commandsSentSoFar;
     final OptionalInt kaboomIndex =
-        computeFailureIndex(startPosition, failuresSoFar, moduleParameters.getMaxFailures());
+        computeFailureIndex(startPosition, failuresSoFar, parameters.getMaxFailures());
     if (kaboomIndex.isPresent()) {
       failuresSoFar++;
     }
@@ -158,11 +157,10 @@
         "starting at {}, kaboom at {}, total messages {}",
         startPosition,
         kaboomIndex,
-        moduleParameters.getMessageCount());
-    Supplier<SourceCommand> generator =
-        new CommandGenerator(new JDKRandomGenerator(), moduleParameters);
+        parameters.getMessageCount());
+    Supplier<SourceCommand> generator = new CommandGenerator(new JDKRandomGenerator(), parameters);
     FunctionStateTracker functionStateTracker = this.functionStateTracker;
-    for (int i = startPosition; i < moduleParameters.getMessageCount(); i++) {
+    for (int i = startPosition; i < parameters.getMessageCount(); i++) {
       if (atLeastOneCheckpointCompleted && kaboomIndex.isPresent() && i >= kaboomIndex.getAsInt()) {
         throw new RuntimeException("KABOOM!!!");
       }
@@ -181,7 +179,7 @@
   private void verify(SourceContext<TypedValue> ctx) {
     FunctionStateTracker functionStateTracker = this.functionStateTracker;
 
-    for (int i = 0; i < moduleParameters.getNumberOfFunctionInstances(); i++) {
+    for (int i = 0; i < parameters.getNumberOfFunctionInstances(); i++) {
       final long expected = functionStateTracker.stateOf(i);
 
       Command.Builder verify = newBuilder().setVerify(Verify.newBuilder().setExpected(expected));
@@ -205,11 +203,10 @@
     if (failureSoFar >= maxFailures) {
       return OptionalInt.empty();
     }
-    if (startPosition >= moduleParameters.getMessageCount()) {
+    if (startPosition >= parameters.getMessageCount()) {
       return OptionalInt.empty();
     }
-    int index =
-        ThreadLocalRandom.current().nextInt(startPosition, moduleParameters.getMessageCount());
+    int index = ThreadLocalRandom.current().nextInt(startPosition, parameters.getMessageCount());
     return OptionalInt.of(index);
   }
 
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
index ae44b4f..fa399ee 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
@@ -27,13 +27,13 @@
 import org.apache.commons.math3.distribution.EnumeratedDistribution;
 import org.apache.commons.math3.random.RandomGenerator;
 import org.apache.commons.math3.util.Pair;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
 import org.apache.flink.statefun.e2e.smoke.generated.Command;
 import org.apache.flink.statefun.e2e.smoke.generated.Commands;
 import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
 
 /**
- * Generates random commands to be interpreted by {@linkplain CommandInterpreter}.
+ * Generates random commands to be interpreted by functions of type {@link Constants#FN_TYPE}.
  *
  * <p>see {src/main/protobuf/commands.proto}
  */
@@ -41,17 +41,17 @@
 
   private final RandomGenerator random;
   private final EnumeratedDistribution<Gen> distribution;
-  private final ModuleParameters moduleParameters;
+  private final SmokeRunnerParameters parameters;
 
-  public CommandGenerator(RandomGenerator random, ModuleParameters parameters) {
+  public CommandGenerator(RandomGenerator random, SmokeRunnerParameters parameters) {
     this.random = Objects.requireNonNull(random);
-    this.moduleParameters = Objects.requireNonNull(parameters);
+    this.parameters = Objects.requireNonNull(parameters);
     this.distribution = new EnumeratedDistribution<>(random, randomCommandGenerators());
   }
 
   @Override
   public SourceCommand get() {
-    final int depth = random.nextInt(moduleParameters.getCommandDepth());
+    final int depth = random.nextInt(parameters.getCommandDepth());
     return SourceCommand.newBuilder().setTarget(address()).setCommands(commands(depth)).build();
   }
 
@@ -61,7 +61,7 @@
       StateModifyGen.instance().generate(builder, depth);
       return builder;
     }
-    final int n = random.nextInt(moduleParameters.getMaxCommandsPerDepth());
+    final int n = random.nextInt(parameters.getMaxCommandsPerDepth());
     for (int i = 0; i < n; i++) {
       Gen gen = distribution.sample();
       gen.generate(builder, depth);
@@ -73,20 +73,20 @@
   }
 
   private int address() {
-    return random.nextInt(moduleParameters.getNumberOfFunctionInstances());
+    return random.nextInt(parameters.getNumberOfFunctionInstances());
   }
 
   private List<Pair<Gen, Double>> randomCommandGenerators() {
     List<Pair<Gen, Double>> list =
         new ArrayList<>(
             asList(
-                create(new StateModifyGen(), moduleParameters.getStateModificationsPr()),
-                create(new SendGen(), moduleParameters.getSendPr()),
-                create(new SendAfterGen(), moduleParameters.getSendAfterPr()),
-                create(new Noop(), moduleParameters.getNoopPr()),
-                create(new SendEgress(), moduleParameters.getSendEgressPr())));
-    if (moduleParameters.isAsyncOpSupported()) {
-      list.add(create(new SendAsyncOp(), moduleParameters.getAsyncSendPr()));
+                create(new StateModifyGen(), parameters.getStateModificationsPr()),
+                create(new SendGen(), parameters.getSendPr()),
+                create(new SendAfterGen(), parameters.getSendAfterPr()),
+                create(new Noop(), parameters.getNoopPr()),
+                create(new SendEgress(), parameters.getSendEgressPr())));
+    if (parameters.isAsyncOpSupported()) {
+      list.add(create(new SendAsyncOp(), parameters.getAsyncSendPr()));
     }
     return list;
   }
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandRouter.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandRouter.java
index 1717f73..3503043 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandRouter.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandRouter.java
@@ -17,11 +17,9 @@
  */
 package org.apache.flink.statefun.e2e.smoke.driver;
 
-import static org.apache.flink.statefun.e2e.smoke.common.Types.unpackSourceCommand;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.unpackSourceCommand;
 
 import java.util.Objects;
-import org.apache.flink.statefun.e2e.smoke.common.Constants;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
 import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.io.Router;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Constants.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Constants.java
similarity index 97%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Constants.java
rename to statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Constants.java
index 62131e5..8fee0d7 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Constants.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Constants.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke.driver;
 
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.io.EgressIdentifier;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java
index a93b4e6..26c9115 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java
@@ -17,17 +17,15 @@
  */
 package org.apache.flink.statefun.e2e.smoke.driver;
 
-import static org.apache.flink.statefun.e2e.smoke.common.Constants.IN;
-import static org.apache.flink.statefun.e2e.smoke.common.Types.unpackVerificationResult;
+import static org.apache.flink.statefun.e2e.smoke.driver.Constants.IN;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.unpackVerificationResult;
 
 import com.google.auto.service.AutoService;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Map;
 import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.statefun.e2e.smoke.common.Constants;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
 import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
 import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
 import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
@@ -44,20 +42,20 @@
 
   @Override
   public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    ModuleParameters moduleParameters = ModuleParameters.from(globalConfiguration);
-    LOG.info(moduleParameters.toString());
+    SmokeRunnerParameters parameters = SmokeRunnerParameters.from(globalConfiguration);
+    LOG.info(parameters.toString());
 
-    Ids ids = new Ids(moduleParameters.getNumberOfFunctionInstances());
+    Ids ids = new Ids(parameters.getNumberOfFunctionInstances());
 
-    binder.bindIngress(new SourceFunctionSpec<>(IN, new CommandFlinkSource(moduleParameters)));
+    binder.bindIngress(new SourceFunctionSpec<>(IN, new CommandFlinkSource(parameters)));
     binder.bindIngressRouter(IN, new CommandRouter(ids));
 
     binder.bindEgress(new SinkFunctionSpec<>(Constants.OUT, new DiscardingSink<>()));
 
     SocketClientSink<TypedValue> client =
         new SocketClientSink<>(
-            moduleParameters.getVerificationServerHost(),
-            moduleParameters.getVerificationServerPort(),
+            parameters.getVerificationServerHost(),
+            parameters.getVerificationServerPort(),
             new VerificationResultSerializer(),
             3,
             true);
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Ids.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Ids.java
similarity index 95%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Ids.java
rename to statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Ids.java
index 3f795ba..afd1932 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Ids.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Ids.java
@@ -15,7 +15,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke.driver;
 
 public final class Ids {
   private final String[] cache;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Types.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Types.java
similarity index 98%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Types.java
rename to statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Types.java
index aaf3c18..94bde28 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Types.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Types.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke.driver;
 
 import org.apache.flink.statefun.e2e.smoke.generated.Commands;
 import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGeneratorTest.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGeneratorTest.java
index e74662f..92fa5cd 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGeneratorTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGeneratorTest.java
@@ -22,7 +22,7 @@
 import static org.hamcrest.MatcherAssert.assertThat;
 
 import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
 import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
 import org.junit.Test;
 
@@ -30,7 +30,7 @@
 
   @Test
   public void usageExample() {
-    ModuleParameters parameters = new ModuleParameters();
+    SmokeRunnerParameters parameters = new SmokeRunnerParameters();
     parameters.setAsyncOpSupported(true);
     CommandGenerator generator = new CommandGenerator(new JDKRandomGenerator(), parameters);
 
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/FunctionStateTrackerTest.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/FunctionStateTrackerTest.java
index 6030600..806290d 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/FunctionStateTrackerTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/FunctionStateTrackerTest.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.statefun.e2e.smoke.driver;
 
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.aRelayedStateModificationCommand;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.aStateModificationCommand;
+import static org.apache.flink.statefun.e2e.smoke.testutils.Utils.aRelayedStateModificationCommand;
+import static org.apache.flink.statefun.e2e.smoke.testutils.Utils.aStateModificationCommand;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunnerParameters.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunnerParameters.java
deleted file mode 100644
index 3ac2247..0000000
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunnerParameters.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.e2e.smoke.driver.testutils;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-@SuppressWarnings("unused")
-public final class SmokeRunnerParameters implements Serializable {
-
-  private static final long serialVersionUID = 1;
-
-  private int numberOfFunctionInstances = 1_000;
-  private int commandDepth = 10;
-  private int messageCount = 100_000;
-  private int maxCommandsPerDepth = 3;
-  private double stateModificationsPr = 0.4;
-  private double sendPr = 0.9;
-  private double sendAfterPr = 0.1;
-  private double asyncSendPr = 0.1;
-  private double noopPr = 0.2;
-  private double sendEgressPr = 0.03;
-  private int maxFailures = 1;
-  private String verificationServerHost = "localhost";
-  private int verificationServerPort = 5050;
-  private boolean isAsyncOpSupported = false;
-
-  public Map<String, String> asMap() {
-    ObjectMapper mapper = new ObjectMapper();
-    return mapper.convertValue(this, new TypeReference<Map<String, String>>() {});
-  }
-
-  public int getNumberOfFunctionInstances() {
-    return numberOfFunctionInstances;
-  }
-
-  public void setNumberOfFunctionInstances(int numberOfFunctionInstances) {
-    this.numberOfFunctionInstances = numberOfFunctionInstances;
-  }
-
-  public int getCommandDepth() {
-    return commandDepth;
-  }
-
-  public void setCommandDepth(int commandDepth) {
-    this.commandDepth = commandDepth;
-  }
-
-  public int getMessageCount() {
-    return messageCount;
-  }
-
-  public void setMessageCount(int messageCount) {
-    this.messageCount = messageCount;
-  }
-
-  public int getMaxCommandsPerDepth() {
-    return maxCommandsPerDepth;
-  }
-
-  public void setMaxCommandsPerDepth(int maxCommandsPerDepth) {
-    this.maxCommandsPerDepth = maxCommandsPerDepth;
-  }
-
-  public double getStateModificationsPr() {
-    return stateModificationsPr;
-  }
-
-  public void setStateModificationsPr(double stateModificationsPr) {
-    this.stateModificationsPr = stateModificationsPr;
-  }
-
-  public double getSendPr() {
-    return sendPr;
-  }
-
-  public void setSendPr(double sendPr) {
-    this.sendPr = sendPr;
-  }
-
-  public double getSendAfterPr() {
-    return sendAfterPr;
-  }
-
-  public void setSendAfterPr(double sendAfterPr) {
-    this.sendAfterPr = sendAfterPr;
-  }
-
-  public double getAsyncSendPr() {
-    return asyncSendPr;
-  }
-
-  public void setAsyncSendPr(double asyncSendPr) {
-    this.asyncSendPr = asyncSendPr;
-  }
-
-  public double getNoopPr() {
-    return noopPr;
-  }
-
-  public void setNoopPr(double noopPr) {
-    this.noopPr = noopPr;
-  }
-
-  public double getSendEgressPr() {
-    return sendEgressPr;
-  }
-
-  public void setSendEgressPr(double sendEgressPr) {
-    this.sendEgressPr = sendEgressPr;
-  }
-
-  public void setMaxFailures(int maxFailures) {
-    this.maxFailures = maxFailures;
-  }
-
-  public int getMaxFailures() {
-    return maxFailures;
-  }
-
-  public String getVerificationServerHost() {
-    return verificationServerHost;
-  }
-
-  public void setVerificationServerHost(String verificationServerHost) {
-    this.verificationServerHost = verificationServerHost;
-  }
-
-  public int getVerificationServerPort() {
-    return verificationServerPort;
-  }
-
-  public void setVerificationServerPort(int verificationServerPort) {
-    this.verificationServerPort = verificationServerPort;
-  }
-
-  public boolean isAsyncOpSupported() {
-    return isAsyncOpSupported;
-  }
-
-  public void setAsyncOpSupported(boolean asyncOpSupported) {
-    isAsyncOpSupported = asyncOpSupported;
-  }
-
-  @Override
-  public String toString() {
-    return "ModuleParameters{"
-        + "numberOfFunctionInstances="
-        + numberOfFunctionInstances
-        + ", commandDepth="
-        + commandDepth
-        + ", messageCount="
-        + messageCount
-        + ", maxCommandsPerDepth="
-        + maxCommandsPerDepth
-        + ", stateModificationsPr="
-        + stateModificationsPr
-        + ", sendPr="
-        + sendPr
-        + ", sendAfterPr="
-        + sendAfterPr
-        + ", asyncSendPr="
-        + asyncSendPr
-        + ", noopPr="
-        + noopPr
-        + ", sendEgressPr="
-        + sendEgressPr
-        + ", maxFailures="
-        + maxFailures
-        + ", verificationServerHost='"
-        + verificationServerHost
-        + '\''
-        + ", verificationServerPort="
-        + verificationServerPort
-        + ", isAsyncOpSupported="
-        + isAsyncOpSupported
-        + '}';
-  }
-}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/Utils.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/testutils/Utils.java
similarity index 61%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/Utils.java
rename to statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/testutils/Utils.java
index 424f021..d978cfc 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/Utils.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/testutils/Utils.java
@@ -15,15 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.flink.statefun.e2e.smoke.driver.testutils;
+package org.apache.flink.statefun.e2e.smoke.testutils;
 
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.Supplier;
 import org.apache.flink.statefun.e2e.smoke.generated.Command;
 import org.apache.flink.statefun.e2e.smoke.generated.Commands;
 import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
-import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
 
 public class Utils {
 
@@ -57,30 +53,4 @@
   private static Command.Builder modify() {
     return Command.newBuilder().setIncrement(Command.IncrementState.getDefaultInstance());
   }
-
-  /** Blocks the currently executing thread until enough successful verification results supply. */
-  public static void awaitVerificationSuccess(
-      Supplier<VerificationResult> results, final int numberOfFunctionInstances) {
-    Set<Integer> successfullyVerified = new HashSet<>();
-    while (successfullyVerified.size() != numberOfFunctionInstances) {
-      VerificationResult result = results.get();
-      if (result.getActual() == result.getExpected()) {
-        successfullyVerified.add(result.getId());
-      } else if (result.getActual() > result.getExpected()) {
-        throw new AssertionError(
-            "Over counted. Expected: "
-                + result.getExpected()
-                + ", actual: "
-                + result.getActual()
-                + ", function: "
-                + result.getId());
-      }
-    }
-  }
-
-  /** starts a simple verification TCP server that accepts {@link com.google.protobuf.Any}. */
-  public static SimpleVerificationServer.StartedServer startVerificationServer() {
-    SimpleVerificationServer server = new SimpleVerificationServer();
-    return server.start();
-  }
 }
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-embedded/pom.xml
index cfbd741..41194ca 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/pom.xml
@@ -28,20 +28,6 @@
     <artifactId>statefun-smoke-e2e-embedded</artifactId>
 
     <dependencies>
-        <!-- End-to-end test common -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-e2e-tests-common</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-            <exclusions>
-                <!-- conflicts with flink-core -->
-                <exclusion>
-                    <groupId>com.kohlschutter.junixsocket</groupId>
-                    <artifactId>junixsocket-native-common</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>statefun-flink-harness</artifactId>
@@ -59,15 +45,15 @@
         <!-- smoke end-to-end common -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-smoke-e2e-driver</artifactId>
-            <version>3.1-SNAPSHOT</version>
+            <artifactId>statefun-smoke-e2e-common</artifactId>
+            <version>${project.version}</version>
         </dependency>
+
+        <!-- Reusing some classes from the driver module, e.g. Ids and Types -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>statefun-smoke-e2e-driver</artifactId>
             <version>3.1-SNAPSHOT</version>
-            <type>test-jar</type>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
index 5cc5bd5..a4da76d 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
@@ -17,13 +17,13 @@
  */
 package org.apache.flink.statefun.e2e.smoke.embedded;
 
-import static org.apache.flink.statefun.e2e.smoke.common.Types.*;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.*;
 
 import java.time.Duration;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.e2e.smoke.common.Constants;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
+import org.apache.flink.statefun.e2e.smoke.driver.Constants;
+import org.apache.flink.statefun.e2e.smoke.driver.Ids;
 import org.apache.flink.statefun.e2e.smoke.generated.Command;
 import org.apache.flink.statefun.e2e.smoke.generated.Commands;
 import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedFnModule.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedFnModule.java
index e17e1bc..4d24d45 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedFnModule.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedFnModule.java
@@ -19,9 +19,9 @@
 
 import com.google.auto.service.AutoService;
 import java.util.Map;
-import org.apache.flink.statefun.e2e.smoke.common.Constants;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
+import org.apache.flink.statefun.e2e.smoke.driver.Constants;
+import org.apache.flink.statefun.e2e.smoke.driver.Ids;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -32,10 +32,10 @@
 
   @Override
   public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    ModuleParameters moduleParameters = ModuleParameters.from(globalConfiguration);
-    LOG.info(moduleParameters.toString());
+    SmokeRunnerParameters parameters = SmokeRunnerParameters.from(globalConfiguration);
+    LOG.info(parameters.toString());
 
-    Ids ids = new Ids(moduleParameters.getNumberOfFunctionInstances());
+    Ids ids = new Ids(parameters.getNumberOfFunctionInstances());
 
     FunctionProvider provider = new FunctionProvider(ids);
     binder.bindFunctionProvider(Constants.FN_TYPE, provider);
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/FunctionProvider.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/FunctionProvider.java
index 78e163f..98aa8b4 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/FunctionProvider.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/FunctionProvider.java
@@ -18,7 +18,7 @@
 package org.apache.flink.statefun.e2e.smoke.embedded;
 
 import java.util.Objects;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
+import org.apache.flink.statefun.e2e.smoke.driver.Ids;
 import org.apache.flink.statefun.sdk.FunctionType;
 import org.apache.flink.statefun.sdk.StatefulFunction;
 import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
index f8b8c07..925ebd1 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
@@ -18,11 +18,10 @@
 
 package org.apache.flink.statefun.e2e.smoke.embedded;
 
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.awaitVerificationSuccess;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.startVerificationServer;
+import static org.apache.flink.statefun.e2e.smoke.SmokeRunner.awaitVerificationSuccess;
 
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SimpleVerificationServer;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunnerParameters;
+import org.apache.flink.statefun.e2e.smoke.SimpleVerificationServer;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
 import org.apache.flink.statefun.flink.harness.Harness;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -52,7 +51,7 @@
     harness.withConfiguration("state.checkpoints.dir", "file:///tmp/checkpoints");
 
     // start the verification server
-    SimpleVerificationServer.StartedServer started = startVerificationServer();
+    SimpleVerificationServer.StartedServer started = new SimpleVerificationServer().start();
 
     // configure test parameters.
     SmokeRunnerParameters parameters = new SmokeRunnerParameters();
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
index b3d96fd..cc7a0b4 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
@@ -19,8 +19,8 @@
 package org.apache.flink.statefun.e2e.smoke.embedded;
 
 import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunner;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunnerParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunner;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
 import org.junit.Test;
 
 public class SmokeVerificationEmbeddedE2E {
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
index b3240d9..034a4c7 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
@@ -49,6 +49,31 @@
 
     <build>
         <plugins>
+            <!-- Generate the Command messages -->
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+                <executions>
+                    <execution>
+                        <id>generate-protobuf-sources</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>run</goal>
+                        </goals>
+                        <configuration>
+                            <includeStdTypes>true</includeStdTypes>
+                            <protocVersion>${protobuf.version}</protocVersion>
+                            <cleanOutputFolder>true</cleanOutputFolder>
+                            <inputDirectories>
+                                <inputDirectory>src/main/protobuf</inputDirectory>
+                            </inputDirectories>
+                            <outputDirectory>${basedir}/target/generated-sources/protoc-jar</outputDirectory>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+
             <!-- build uber jar for launching the remote function -->
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
@@ -74,31 +99,6 @@
                     </execution>
                 </executions>
             </plugin>
-
-            <!-- Generate the Command messages -->
-            <plugin>
-                <groupId>com.github.os72</groupId>
-                <artifactId>protoc-jar-maven-plugin</artifactId>
-                <version>${protoc-jar-maven-plugin.version}</version>
-                <executions>
-                    <execution>
-                        <id>generate-protobuf-sources</id>
-                        <phase>generate-sources</phase>
-                        <goals>
-                            <goal>run</goal>
-                        </goals>
-                        <configuration>
-                            <includeStdTypes>true</includeStdTypes>
-                            <protocVersion>${protobuf.version}</protocVersion>
-                            <cleanOutputFolder>true</cleanOutputFolder>
-                            <inputDirectories>
-                                <inputDirectory>src/main/protobuf</inputDirectory>
-                            </inputDirectories>
-                            <outputDirectory>${basedir}/target/generated-sources/protoc-jar</outputDirectory>
-                        </configuration>
-                    </execution>
-                </executions>
-            </plugin>
         </plugins>
     </build>
 
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
index 7d7c332..3e2fb1d 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
@@ -21,8 +21,8 @@
 import java.nio.file.Path;
 import java.nio.file.Paths;
 import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunner;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunnerParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunner;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-multilang-base/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-multilang-base/pom.xml
index 3f947c9..f73cbe3 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-multilang-base/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-multilang-base/pom.xml
@@ -29,7 +29,7 @@
     <packaging>pom</packaging>
 
     <dependencies>
-        <!-- Testcontainer test utilities -->
+        <!-- Testcontainer test utilities, e.g. StatefulFunctionsAppContainers -->
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>statefun-e2e-tests-common</artifactId>
@@ -37,12 +37,11 @@
             <scope>test</scope>
         </dependency>
 
-        <!-- Smoke E2E Driver test utilities -->
+        <!-- Smoke E2E test utilities, e.g. SmokeRunner -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-smoke-e2e-driver</artifactId>
-            <version>3.1-SNAPSHOT</version>
-            <type>test-jar</type>
+            <artifactId>statefun-smoke-e2e-common</artifactId>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
     </dependencies>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
index 1cb0b5f..3e65a8b 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
@@ -46,9 +46,8 @@
         <!-- Test scope dependencies -->
         <dependency>
             <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-smoke-e2e-driver</artifactId>
-            <version>3.1-SNAPSHOT</version>
-            <type>test-jar</type>
+            <artifactId>statefun-smoke-e2e-common</artifactId>
+            <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
         <dependency>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/java/org/apache/flink/statefun/e2e/smoke/multilang/harness/MultiLangSmokeHarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/java/org/apache/flink/statefun/e2e/smoke/multilang/harness/MultiLangSmokeHarnessTest.java
index 784e76a..f194129 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/java/org/apache/flink/statefun/e2e/smoke/multilang/harness/MultiLangSmokeHarnessTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/java/org/apache/flink/statefun/e2e/smoke/multilang/harness/MultiLangSmokeHarnessTest.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.statefun.e2e.smoke.multilang.harness;
 
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.awaitVerificationSuccess;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.startVerificationServer;
+import static org.apache.flink.statefun.e2e.smoke.SmokeRunner.awaitVerificationSuccess;
 
+import org.apache.flink.statefun.e2e.smoke.SimpleVerificationServer;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
 import org.apache.flink.statefun.e2e.smoke.driver.DriverModule;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SimpleVerificationServer;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunnerParameters;
 import org.apache.flink.statefun.flink.harness.Harness;
 import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
 import org.junit.Ignore;
@@ -79,7 +78,7 @@
     harness.withConfiguration("state.checkpoints.dir", "file:///tmp/checkpoints");
 
     // start the verification server
-    SimpleVerificationServer.StartedServer started = startVerificationServer();
+    SimpleVerificationServer.StartedServer started = new SimpleVerificationServer().start();
 
     // configure test parameters.
     SmokeRunnerParameters parameters = new SmokeRunnerParameters();