[hotfix] Add configuration for reproducible smoke tests
This closes #253.
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
index fb19032..7a6d6b2 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
@@ -42,6 +42,7 @@
private String verificationServerHost = "localhost";
private int verificationServerPort = 5050;
private boolean isAsyncOpSupported = false;
+ private long randomGeneratorSeed = System.nanoTime();
/** Creates an instance of ModuleParameters from a key-value map. */
public static SmokeRunnerParameters from(Map<String, String> globalConfiguration) {
@@ -167,6 +168,14 @@
isAsyncOpSupported = asyncOpSupported;
}
+ public long getRandomGeneratorSeed() {
+ return randomGeneratorSeed;
+ }
+
+ public void setRandomGeneratorSeed(long randomGeneratorSeed) {
+ this.randomGeneratorSeed = randomGeneratorSeed;
+ }
+
@Override
public String toString() {
return "ModuleParameters{"
@@ -199,6 +208,8 @@
+ verificationServerPort
+ ", isAsyncOpSupported="
+ isAsyncOpSupported
+ + ", randomGeneratorSeed="
+ + randomGeneratorSeed
+ '}';
}
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
index f878b3f..80dc713 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
@@ -27,11 +27,12 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;
import org.apache.commons.math3.random.JDKRandomGenerator;
+import org.apache.commons.math3.random.RandomGenerator;
+import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
@@ -154,11 +155,16 @@
failuresSoFar++;
}
LOG.info(
- "starting at {}, kaboom at {}, total messages {}",
+ "starting at {}, kaboom at {}, total messages {}, random command generator seed {}",
startPosition,
kaboomIndex,
- parameters.getMessageCount());
- Supplier<SourceCommand> generator = new CommandGenerator(new JDKRandomGenerator(), parameters);
+ parameters.getMessageCount(),
+ parameters.getRandomGeneratorSeed());
+
+ RandomGenerator random = new JDKRandomGenerator();
+ random.setSeed(parameters.getRandomGeneratorSeed());
+ Supplier<SourceCommand> generator = new CommandGenerator(random, parameters);
+
FunctionStateTracker functionStateTracker = this.functionStateTracker;
for (int i = startPosition; i < parameters.getMessageCount(); i++) {
if (atLeastOneCheckpointCompleted && kaboomIndex.isPresent() && i >= kaboomIndex.getAsInt()) {