[hotfix] Add configuration for reproducible smoke tests

This closes #253.
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
index fb19032..7a6d6b2 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
@@ -42,6 +42,7 @@
   private String verificationServerHost = "localhost";
   private int verificationServerPort = 5050;
   private boolean isAsyncOpSupported = false;
+  private long randomGeneratorSeed = System.nanoTime();
 
   /** Creates an instance of ModuleParameters from a key-value map. */
   public static SmokeRunnerParameters from(Map<String, String> globalConfiguration) {
@@ -167,6 +168,14 @@
     isAsyncOpSupported = asyncOpSupported;
   }
 
+  public long getRandomGeneratorSeed() {
+    return randomGeneratorSeed;
+  }
+
+  public void setRandomGeneratorSeed(long randomGeneratorSeed) {
+    this.randomGeneratorSeed = randomGeneratorSeed;
+  }
+
   @Override
   public String toString() {
     return "ModuleParameters{"
@@ -199,6 +208,8 @@
         + verificationServerPort
         + ", isAsyncOpSupported="
         + isAsyncOpSupported
+        + ", randomGeneratorSeed="
+        + randomGeneratorSeed
         + '}';
   }
 }
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
index f878b3f..80dc713 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
@@ -27,11 +27,12 @@
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
 import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.flink.api.common.state.CheckpointListener;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.OperatorStateStore;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
 import org.apache.flink.runtime.state.FunctionInitializationContext;
 import org.apache.flink.runtime.state.FunctionSnapshotContext;
 import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
@@ -154,11 +155,16 @@
       failuresSoFar++;
     }
     LOG.info(
-        "starting at {}, kaboom at {}, total messages {}",
+        "starting at {}, kaboom at {}, total messages {}, random command generator seed {}",
         startPosition,
         kaboomIndex,
-        parameters.getMessageCount());
-    Supplier<SourceCommand> generator = new CommandGenerator(new JDKRandomGenerator(), parameters);
+        parameters.getMessageCount(),
+        parameters.getRandomGeneratorSeed());
+
+    RandomGenerator random = new JDKRandomGenerator();
+    random.setSeed(parameters.getRandomGeneratorSeed());
+    Supplier<SourceCommand> generator = new CommandGenerator(random, parameters);
+
     FunctionStateTracker functionStateTracker = this.functionStateTracker;
     for (int i = startPosition; i < parameters.getMessageCount(); i++) {
       if (atLeastOneCheckpointCompleted && kaboomIndex.isPresent() && i >= kaboomIndex.getAsInt()) {