[hotfix] [e2e] Pull smoke E2E test utilities and Commands into a common module
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 2e29272..9a3dae9 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -36,6 +36,7 @@
<module>statefun-e2e-tests-common</module>
<module>statefun-sanity-e2e</module>
<module>statefun-exactly-once-remote-e2e</module>
+ <module>statefun-smoke-e2e-common</module>
<module>statefun-smoke-e2e-driver</module>
<module>statefun-smoke-e2e-embedded</module>
<module>statefun-smoke-e2e-multilang-base</module>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
new file mode 100644
index 0000000..fe77231
--- /dev/null
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/pom.xml
@@ -0,0 +1,104 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>statefun-e2e-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>3.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>statefun-smoke-e2e-common</artifactId>
+
+ <properties>
+ <commons-math3.version>3.5</commons-math3.version>
+ <additional-sources.dir>target/additional-sources</additional-sources.dir>
+ </properties>
+
+ <dependencies>
+ <!-- StatefulFunctionsAppContainers -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-e2e-tests-common</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- Protobuf Commands messages -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-jackson</artifactId>
+ <version>2.12.1-13.0</version>
+ </dependency>
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.15</version>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <!-- Generate the Command messages -->
+ <plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ <version>${protoc-jar-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>generate-protobuf-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <includeStdTypes>true</includeStdTypes>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <cleanOutputFolder>true</cleanOutputFolder>
+ <inputDirectories>
+ <inputDirectory>src/main/protobuf</inputDirectory>
+ </inputDirectories>
+ <outputDirectory>${basedir}/target/generated-sources/protoc-jar</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SimpleVerificationServer.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SimpleVerificationServer.java
similarity index 97%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SimpleVerificationServer.java
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SimpleVerificationServer.java
index 0dd83f7..cc02ebc 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SimpleVerificationServer.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SimpleVerificationServer.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.statefun.e2e.smoke.driver.testutils;
+package org.apache.flink.statefun.e2e.smoke;
import java.io.IOException;
import java.io.InputStream;
@@ -46,7 +46,7 @@
this.executor = MoreExecutors.newCachedDaemonThreadPool();
}
- StartedServer start() {
+ public StartedServer start() {
if (!started.compareAndSet(false, true)) {
throw new IllegalArgumentException("Already started.");
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunner.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
similarity index 70%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunner.java
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
index db89b41..b982a38 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunner.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunner.java
@@ -16,12 +16,13 @@
* limitations under the License.
*/
-package org.apache.flink.statefun.e2e.smoke.driver.testutils;
+package org.apache.flink.statefun.e2e.smoke;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.awaitVerificationSuccess;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.startVerificationServer;
-
+import java.util.HashSet;
+import java.util.Set;
+import java.util.function.Supplier;
import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
+import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.runner.Description;
import org.junit.runners.model.Statement;
@@ -36,7 +37,7 @@
SmokeRunnerParameters parameters, StatefulFunctionsAppContainers.Builder builder)
throws Throwable {
// start verification server
- SimpleVerificationServer.StartedServer server = startVerificationServer();
+ SimpleVerificationServer.StartedServer server = new SimpleVerificationServer().start();
parameters.setVerificationServerHost("host.testcontainers.internal");
parameters.setVerificationServerPort(server.port());
Testcontainers.exposeHostPorts(server.port());
@@ -68,4 +69,23 @@
statement.evaluate();
}
+
+ public static void awaitVerificationSuccess(
+ Supplier<VerificationResult> results, final int numberOfFunctionInstances) {
+ Set<Integer> successfullyVerified = new HashSet<>();
+ while (successfullyVerified.size() != numberOfFunctionInstances) {
+ VerificationResult result = results.get();
+ if (result.getActual() == result.getExpected()) {
+ successfullyVerified.add(result.getId());
+ } else if (result.getActual() > result.getExpected()) {
+ throw new AssertionError(
+ "Over counted. Expected: "
+ + result.getExpected()
+ + ", actual: "
+ + result.getActual()
+ + ", function: "
+ + result.getId());
+ }
+ }
+ }
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParameters.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
similarity index 94%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParameters.java
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
index 0a99fea..fb19032 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParameters.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParameters.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke;
import java.io.Serializable;
import java.util.Map;
@@ -24,7 +24,7 @@
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
@SuppressWarnings("unused")
-public final class ModuleParameters implements Serializable {
+public final class SmokeRunnerParameters implements Serializable {
private static final long serialVersionUID = 1;
@@ -44,10 +44,10 @@
private boolean isAsyncOpSupported = false;
/** Creates an instance of ModuleParameters from a key-value map. */
- public static ModuleParameters from(Map<String, String> globalConfiguration) {
+ public static SmokeRunnerParameters from(Map<String, String> globalConfiguration) {
ObjectMapper mapper = new ObjectMapper();
mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
- return mapper.convertValue(globalConfiguration, ModuleParameters.class);
+ return mapper.convertValue(globalConfiguration, SmokeRunnerParameters.class);
}
public Map<String, String> asMap() {
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/protobuf/commands.proto b/statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
similarity index 100%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/protobuf/commands.proto
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/main/protobuf/commands.proto
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParametersTest.java b/statefun-e2e-tests/statefun-smoke-e2e-common/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParametersTest.java
similarity index 77%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParametersTest.java
rename to statefun-e2e-tests/statefun-smoke-e2e-common/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParametersTest.java
index 927ad9d..027c8ab 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/common/ModuleParametersTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-common/src/test/java/org/apache/flink/statefun/e2e/smoke/SmokeRunnerParametersTest.java
@@ -16,31 +16,31 @@
* limitations under the License.
*/
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke;
import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.Assert.assertThat;
import java.util.Collections;
import java.util.Map;
import org.junit.Test;
-public class ModuleParametersTest {
+public class SmokeRunnerParametersTest {
@Test
public void exampleUsage() {
Map<String, String> keys = Collections.singletonMap("messageCount", "1");
- ModuleParameters parameters = ModuleParameters.from(keys);
+ SmokeRunnerParameters parameters = SmokeRunnerParameters.from(keys);
assertThat(parameters.getMessageCount(), is(1));
}
@Test
public void roundTrip() {
- ModuleParameters original = new ModuleParameters();
+ SmokeRunnerParameters original = new SmokeRunnerParameters();
original.setCommandDepth(1234);
- ModuleParameters deserialized = ModuleParameters.from(original.asMap());
+ SmokeRunnerParameters deserialized = SmokeRunnerParameters.from(original.asMap());
assertThat(deserialized.getCommandDepth(), is(1234));
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml
index 78388ad..8ee004c 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/pom.xml
@@ -33,7 +33,7 @@
</properties>
<dependencies>
- <!-- Stateful Functions -->
+ <!-- Stateful Functions SDK for composing and binding the driver components -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-sdk-embedded</artifactId>
@@ -56,6 +56,11 @@
<artifactId>commons-math3</artifactId>
<version>${commons-math3.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-smoke-e2e-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- Protobuf -->
<dependency>
@@ -95,33 +100,6 @@
</exclusion>
</exclusions>
</dependency>
-
- <!-- logging -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.15</version>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- </dependency>
-
- <!-- End-to-end test common -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-e2e-tests-common</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <exclusions>
- <!-- conflicts with flink-core -->
- <exclusion>
- <groupId>com.kohlschutter.junixsocket</groupId>
- <artifactId>junixsocket-native-common</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
<build>
@@ -186,16 +164,21 @@
</executions>
</plugin>
- <!-- package the test jar for other modules to include -->
+ <!-- driver module should be built into a fat-jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-jar-plugin</artifactId>
- <version>2.4</version>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.0.0</version>
<executions>
+ <!-- Run shade goal on package phase -->
<execution>
+ <phase>package</phase>
<goals>
- <goal>test-jar</goal>
+ <goal>shade</goal>
</goals>
+ <configuration>
+ <createDependencyReducedPom>false</createDependencyReducedPom>
+ </configuration>
</execution>
</executions>
</plugin>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
index d3580f8..f878b3f 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandFlinkSource.java
@@ -17,7 +17,7 @@
*/
package org.apache.flink.statefun.e2e.smoke.driver;
-import static org.apache.flink.statefun.e2e.smoke.common.Types.packSourceCommand;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.packSourceCommand;
import static org.apache.flink.statefun.e2e.smoke.generated.Command.Verify;
import static org.apache.flink.statefun.e2e.smoke.generated.Command.newBuilder;
@@ -34,7 +34,7 @@
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
import org.apache.flink.statefun.e2e.smoke.generated.Command;
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
@@ -48,13 +48,13 @@
/**
* A Flink Source that Emits {@link SourceCommand}s.
*
- * <p>This source is configured by {@link ModuleParameters} and would generate random commands,
+ * <p>This source is configured by {@link SmokeRunnerParameters} and would generate random commands,
* addressed to various functions. This source might also throw exceptions (kaboom) to simulate
* failures.
*
- * <p>After generating {@link ModuleParameters#getMessageCount()} messages, this source will switch
- * to {@code verification} step. At this step, it would keep sending (every 2 seconds) a {@link
- * Verify} command to every function indefinitely.
+ * <p>After generating {@link SmokeRunnerParameters#getMessageCount()} messages, this source will
+ * switch to {@code verification} step. At this step, it would keep sending (every 2 seconds) a
+ * {@link Verify} command to every function indefinitely.
*/
final class CommandFlinkSource extends RichSourceFunction<TypedValue>
implements CheckpointedFunction, CheckpointListener {
@@ -65,7 +65,7 @@
// Configuration
// ------------------------------------------------------------------------------------------------------------
- private final ModuleParameters moduleParameters;
+ private final SmokeRunnerParameters parameters;
// ------------------------------------------------------------------------------------------------------------
// Runtime
@@ -78,8 +78,8 @@
private transient boolean done;
private transient boolean atLeastOneCheckpointCompleted;
- public CommandFlinkSource(ModuleParameters moduleParameters) {
- this.moduleParameters = Objects.requireNonNull(moduleParameters);
+ public CommandFlinkSource(SmokeRunnerParameters parameters) {
+ this.parameters = Objects.requireNonNull(parameters);
}
@Override
@@ -95,7 +95,7 @@
SourceSnapshot sourceSnapshot =
getOnlyElement(sourceSnapshotHandle.get(), SourceSnapshot.getDefaultInstance());
functionStateTracker =
- new FunctionStateTracker(moduleParameters.getNumberOfFunctionInstances())
+ new FunctionStateTracker(this.parameters.getNumberOfFunctionInstances())
.apply(sourceSnapshot.getTracker());
commandsSentSoFar = sourceSnapshot.getCommandsSentSoFarHandle();
failuresSoFar = sourceSnapshot.getFailuresGeneratedSoFar();
@@ -111,11 +111,10 @@
.setFailuresGeneratedSoFar(failuresSoFar)
.build());
- if (commandsSentSoFar < moduleParameters.getMessageCount()) {
- double perCent = 100.0d * (commandsSentSoFar) / moduleParameters.getMessageCount();
+ if (commandsSentSoFar < parameters.getMessageCount()) {
+ double perCent = 100.0d * (commandsSentSoFar) / parameters.getMessageCount();
LOG.info(
- "Commands sent {} / {} ({} %)",
- commandsSentSoFar, moduleParameters.getMessageCount(), perCent);
+ "Commands sent {} / {} ({} %)", commandsSentSoFar, parameters.getMessageCount(), perCent);
}
}
@@ -150,7 +149,7 @@
private void generate(SourceContext<TypedValue> ctx) {
final int startPosition = this.commandsSentSoFar;
final OptionalInt kaboomIndex =
- computeFailureIndex(startPosition, failuresSoFar, moduleParameters.getMaxFailures());
+ computeFailureIndex(startPosition, failuresSoFar, parameters.getMaxFailures());
if (kaboomIndex.isPresent()) {
failuresSoFar++;
}
@@ -158,11 +157,10 @@
"starting at {}, kaboom at {}, total messages {}",
startPosition,
kaboomIndex,
- moduleParameters.getMessageCount());
- Supplier<SourceCommand> generator =
- new CommandGenerator(new JDKRandomGenerator(), moduleParameters);
+ parameters.getMessageCount());
+ Supplier<SourceCommand> generator = new CommandGenerator(new JDKRandomGenerator(), parameters);
FunctionStateTracker functionStateTracker = this.functionStateTracker;
- for (int i = startPosition; i < moduleParameters.getMessageCount(); i++) {
+ for (int i = startPosition; i < parameters.getMessageCount(); i++) {
if (atLeastOneCheckpointCompleted && kaboomIndex.isPresent() && i >= kaboomIndex.getAsInt()) {
throw new RuntimeException("KABOOM!!!");
}
@@ -181,7 +179,7 @@
private void verify(SourceContext<TypedValue> ctx) {
FunctionStateTracker functionStateTracker = this.functionStateTracker;
- for (int i = 0; i < moduleParameters.getNumberOfFunctionInstances(); i++) {
+ for (int i = 0; i < parameters.getNumberOfFunctionInstances(); i++) {
final long expected = functionStateTracker.stateOf(i);
Command.Builder verify = newBuilder().setVerify(Verify.newBuilder().setExpected(expected));
@@ -205,11 +203,10 @@
if (failureSoFar >= maxFailures) {
return OptionalInt.empty();
}
- if (startPosition >= moduleParameters.getMessageCount()) {
+ if (startPosition >= parameters.getMessageCount()) {
return OptionalInt.empty();
}
- int index =
- ThreadLocalRandom.current().nextInt(startPosition, moduleParameters.getMessageCount());
+ int index = ThreadLocalRandom.current().nextInt(startPosition, parameters.getMessageCount());
return OptionalInt.of(index);
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
index ae44b4f..fa399ee 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGenerator.java
@@ -27,13 +27,13 @@
import org.apache.commons.math3.distribution.EnumeratedDistribution;
import org.apache.commons.math3.random.RandomGenerator;
import org.apache.commons.math3.util.Pair;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
import org.apache.flink.statefun.e2e.smoke.generated.Command;
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
/**
- * Generates random commands to be interpreted by {@linkplain CommandInterpreter}.
+ * Generates random commands to be interpreted by functions of type {@link Constants#FN_TYPE}.
*
* <p>see {src/main/protobuf/commands.proto}
*/
@@ -41,17 +41,17 @@
private final RandomGenerator random;
private final EnumeratedDistribution<Gen> distribution;
- private final ModuleParameters moduleParameters;
+ private final SmokeRunnerParameters parameters;
- public CommandGenerator(RandomGenerator random, ModuleParameters parameters) {
+ public CommandGenerator(RandomGenerator random, SmokeRunnerParameters parameters) {
this.random = Objects.requireNonNull(random);
- this.moduleParameters = Objects.requireNonNull(parameters);
+ this.parameters = Objects.requireNonNull(parameters);
this.distribution = new EnumeratedDistribution<>(random, randomCommandGenerators());
}
@Override
public SourceCommand get() {
- final int depth = random.nextInt(moduleParameters.getCommandDepth());
+ final int depth = random.nextInt(parameters.getCommandDepth());
return SourceCommand.newBuilder().setTarget(address()).setCommands(commands(depth)).build();
}
@@ -61,7 +61,7 @@
StateModifyGen.instance().generate(builder, depth);
return builder;
}
- final int n = random.nextInt(moduleParameters.getMaxCommandsPerDepth());
+ final int n = random.nextInt(parameters.getMaxCommandsPerDepth());
for (int i = 0; i < n; i++) {
Gen gen = distribution.sample();
gen.generate(builder, depth);
@@ -73,20 +73,20 @@
}
private int address() {
- return random.nextInt(moduleParameters.getNumberOfFunctionInstances());
+ return random.nextInt(parameters.getNumberOfFunctionInstances());
}
private List<Pair<Gen, Double>> randomCommandGenerators() {
List<Pair<Gen, Double>> list =
new ArrayList<>(
asList(
- create(new StateModifyGen(), moduleParameters.getStateModificationsPr()),
- create(new SendGen(), moduleParameters.getSendPr()),
- create(new SendAfterGen(), moduleParameters.getSendAfterPr()),
- create(new Noop(), moduleParameters.getNoopPr()),
- create(new SendEgress(), moduleParameters.getSendEgressPr())));
- if (moduleParameters.isAsyncOpSupported()) {
- list.add(create(new SendAsyncOp(), moduleParameters.getAsyncSendPr()));
+ create(new StateModifyGen(), parameters.getStateModificationsPr()),
+ create(new SendGen(), parameters.getSendPr()),
+ create(new SendAfterGen(), parameters.getSendAfterPr()),
+ create(new Noop(), parameters.getNoopPr()),
+ create(new SendEgress(), parameters.getSendEgressPr())));
+ if (parameters.isAsyncOpSupported()) {
+ list.add(create(new SendAsyncOp(), parameters.getAsyncSendPr()));
}
return list;
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandRouter.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandRouter.java
index 1717f73..3503043 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandRouter.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/CommandRouter.java
@@ -17,11 +17,9 @@
*/
package org.apache.flink.statefun.e2e.smoke.driver;
-import static org.apache.flink.statefun.e2e.smoke.common.Types.unpackSourceCommand;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.unpackSourceCommand;
import java.util.Objects;
-import org.apache.flink.statefun.e2e.smoke.common.Constants;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.Router;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Constants.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Constants.java
similarity index 97%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Constants.java
rename to statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Constants.java
index 62131e5..8fee0d7 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Constants.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Constants.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke.driver;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java
index a93b4e6..26c9115 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/DriverModule.java
@@ -17,17 +17,15 @@
*/
package org.apache.flink.statefun.e2e.smoke.driver;
-import static org.apache.flink.statefun.e2e.smoke.common.Constants.IN;
-import static org.apache.flink.statefun.e2e.smoke.common.Types.unpackVerificationResult;
+import static org.apache.flink.statefun.e2e.smoke.driver.Constants.IN;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.unpackVerificationResult;
import com.google.auto.service.AutoService;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;
import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.statefun.e2e.smoke.common.Constants;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
@@ -44,20 +42,20 @@
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
- ModuleParameters moduleParameters = ModuleParameters.from(globalConfiguration);
- LOG.info(moduleParameters.toString());
+ SmokeRunnerParameters parameters = SmokeRunnerParameters.from(globalConfiguration);
+ LOG.info(parameters.toString());
- Ids ids = new Ids(moduleParameters.getNumberOfFunctionInstances());
+ Ids ids = new Ids(parameters.getNumberOfFunctionInstances());
- binder.bindIngress(new SourceFunctionSpec<>(IN, new CommandFlinkSource(moduleParameters)));
+ binder.bindIngress(new SourceFunctionSpec<>(IN, new CommandFlinkSource(parameters)));
binder.bindIngressRouter(IN, new CommandRouter(ids));
binder.bindEgress(new SinkFunctionSpec<>(Constants.OUT, new DiscardingSink<>()));
SocketClientSink<TypedValue> client =
new SocketClientSink<>(
- moduleParameters.getVerificationServerHost(),
- moduleParameters.getVerificationServerPort(),
+ parameters.getVerificationServerHost(),
+ parameters.getVerificationServerPort(),
new VerificationResultSerializer(),
3,
true);
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Ids.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Ids.java
similarity index 95%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Ids.java
rename to statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Ids.java
index 3f795ba..afd1932 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Ids.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Ids.java
@@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke.driver;
public final class Ids {
private final String[] cache;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Types.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Types.java
similarity index 98%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Types.java
rename to statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Types.java
index aaf3c18..94bde28 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/common/Types.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/main/java/org/apache/flink/statefun/e2e/smoke/driver/Types.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.statefun.e2e.smoke.common;
+package org.apache.flink.statefun.e2e.smoke.driver;
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGeneratorTest.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGeneratorTest.java
index e74662f..92fa5cd 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGeneratorTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/CommandGeneratorTest.java
@@ -22,7 +22,7 @@
import static org.hamcrest.MatcherAssert.assertThat;
import org.apache.commons.math3.random.JDKRandomGenerator;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
import org.junit.Test;
@@ -30,7 +30,7 @@
@Test
public void usageExample() {
- ModuleParameters parameters = new ModuleParameters();
+ SmokeRunnerParameters parameters = new SmokeRunnerParameters();
parameters.setAsyncOpSupported(true);
CommandGenerator generator = new CommandGenerator(new JDKRandomGenerator(), parameters);
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/FunctionStateTrackerTest.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/FunctionStateTrackerTest.java
index 6030600..806290d 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/FunctionStateTrackerTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/FunctionStateTrackerTest.java
@@ -18,8 +18,8 @@
package org.apache.flink.statefun.e2e.smoke.driver;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.aRelayedStateModificationCommand;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.aStateModificationCommand;
+import static org.apache.flink.statefun.e2e.smoke.testutils.Utils.aRelayedStateModificationCommand;
+import static org.apache.flink.statefun.e2e.smoke.testutils.Utils.aStateModificationCommand;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunnerParameters.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunnerParameters.java
deleted file mode 100644
index 3ac2247..0000000
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/SmokeRunnerParameters.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flink.statefun.e2e.smoke.driver.testutils;
-
-import java.io.Serializable;
-import java.util.Map;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.type.TypeReference;
-import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
-
-@SuppressWarnings("unused")
-public final class SmokeRunnerParameters implements Serializable {
-
- private static final long serialVersionUID = 1;
-
- private int numberOfFunctionInstances = 1_000;
- private int commandDepth = 10;
- private int messageCount = 100_000;
- private int maxCommandsPerDepth = 3;
- private double stateModificationsPr = 0.4;
- private double sendPr = 0.9;
- private double sendAfterPr = 0.1;
- private double asyncSendPr = 0.1;
- private double noopPr = 0.2;
- private double sendEgressPr = 0.03;
- private int maxFailures = 1;
- private String verificationServerHost = "localhost";
- private int verificationServerPort = 5050;
- private boolean isAsyncOpSupported = false;
-
- public Map<String, String> asMap() {
- ObjectMapper mapper = new ObjectMapper();
- return mapper.convertValue(this, new TypeReference<Map<String, String>>() {});
- }
-
- public int getNumberOfFunctionInstances() {
- return numberOfFunctionInstances;
- }
-
- public void setNumberOfFunctionInstances(int numberOfFunctionInstances) {
- this.numberOfFunctionInstances = numberOfFunctionInstances;
- }
-
- public int getCommandDepth() {
- return commandDepth;
- }
-
- public void setCommandDepth(int commandDepth) {
- this.commandDepth = commandDepth;
- }
-
- public int getMessageCount() {
- return messageCount;
- }
-
- public void setMessageCount(int messageCount) {
- this.messageCount = messageCount;
- }
-
- public int getMaxCommandsPerDepth() {
- return maxCommandsPerDepth;
- }
-
- public void setMaxCommandsPerDepth(int maxCommandsPerDepth) {
- this.maxCommandsPerDepth = maxCommandsPerDepth;
- }
-
- public double getStateModificationsPr() {
- return stateModificationsPr;
- }
-
- public void setStateModificationsPr(double stateModificationsPr) {
- this.stateModificationsPr = stateModificationsPr;
- }
-
- public double getSendPr() {
- return sendPr;
- }
-
- public void setSendPr(double sendPr) {
- this.sendPr = sendPr;
- }
-
- public double getSendAfterPr() {
- return sendAfterPr;
- }
-
- public void setSendAfterPr(double sendAfterPr) {
- this.sendAfterPr = sendAfterPr;
- }
-
- public double getAsyncSendPr() {
- return asyncSendPr;
- }
-
- public void setAsyncSendPr(double asyncSendPr) {
- this.asyncSendPr = asyncSendPr;
- }
-
- public double getNoopPr() {
- return noopPr;
- }
-
- public void setNoopPr(double noopPr) {
- this.noopPr = noopPr;
- }
-
- public double getSendEgressPr() {
- return sendEgressPr;
- }
-
- public void setSendEgressPr(double sendEgressPr) {
- this.sendEgressPr = sendEgressPr;
- }
-
- public void setMaxFailures(int maxFailures) {
- this.maxFailures = maxFailures;
- }
-
- public int getMaxFailures() {
- return maxFailures;
- }
-
- public String getVerificationServerHost() {
- return verificationServerHost;
- }
-
- public void setVerificationServerHost(String verificationServerHost) {
- this.verificationServerHost = verificationServerHost;
- }
-
- public int getVerificationServerPort() {
- return verificationServerPort;
- }
-
- public void setVerificationServerPort(int verificationServerPort) {
- this.verificationServerPort = verificationServerPort;
- }
-
- public boolean isAsyncOpSupported() {
- return isAsyncOpSupported;
- }
-
- public void setAsyncOpSupported(boolean asyncOpSupported) {
- isAsyncOpSupported = asyncOpSupported;
- }
-
- @Override
- public String toString() {
- return "ModuleParameters{"
- + "numberOfFunctionInstances="
- + numberOfFunctionInstances
- + ", commandDepth="
- + commandDepth
- + ", messageCount="
- + messageCount
- + ", maxCommandsPerDepth="
- + maxCommandsPerDepth
- + ", stateModificationsPr="
- + stateModificationsPr
- + ", sendPr="
- + sendPr
- + ", sendAfterPr="
- + sendAfterPr
- + ", asyncSendPr="
- + asyncSendPr
- + ", noopPr="
- + noopPr
- + ", sendEgressPr="
- + sendEgressPr
- + ", maxFailures="
- + maxFailures
- + ", verificationServerHost='"
- + verificationServerHost
- + '\''
- + ", verificationServerPort="
- + verificationServerPort
- + ", isAsyncOpSupported="
- + isAsyncOpSupported
- + '}';
- }
-}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/Utils.java b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/testutils/Utils.java
similarity index 61%
rename from statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/Utils.java
rename to statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/testutils/Utils.java
index 424f021..d978cfc 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/driver/testutils/Utils.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-driver/src/test/java/org/apache/flink/statefun/e2e/smoke/testutils/Utils.java
@@ -15,15 +15,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.flink.statefun.e2e.smoke.driver.testutils;
+package org.apache.flink.statefun.e2e.smoke.testutils;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.function.Supplier;
import org.apache.flink.statefun.e2e.smoke.generated.Command;
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
-import org.apache.flink.statefun.e2e.smoke.generated.VerificationResult;
public class Utils {
@@ -57,30 +53,4 @@
private static Command.Builder modify() {
return Command.newBuilder().setIncrement(Command.IncrementState.getDefaultInstance());
}
-
- /** Blocks the currently executing thread until enough successful verification results supply. */
- public static void awaitVerificationSuccess(
- Supplier<VerificationResult> results, final int numberOfFunctionInstances) {
- Set<Integer> successfullyVerified = new HashSet<>();
- while (successfullyVerified.size() != numberOfFunctionInstances) {
- VerificationResult result = results.get();
- if (result.getActual() == result.getExpected()) {
- successfullyVerified.add(result.getId());
- } else if (result.getActual() > result.getExpected()) {
- throw new AssertionError(
- "Over counted. Expected: "
- + result.getExpected()
- + ", actual: "
- + result.getActual()
- + ", function: "
- + result.getId());
- }
- }
- }
-
- /** starts a simple verification TCP server that accepts {@link com.google.protobuf.Any}. */
- public static SimpleVerificationServer.StartedServer startVerificationServer() {
- SimpleVerificationServer server = new SimpleVerificationServer();
- return server.start();
- }
}
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-embedded/pom.xml
index cfbd741..41194ca 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/pom.xml
@@ -28,20 +28,6 @@
<artifactId>statefun-smoke-e2e-embedded</artifactId>
<dependencies>
- <!-- End-to-end test common -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-e2e-tests-common</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <exclusions>
- <!-- conflicts with flink-core -->
- <exclusion>
- <groupId>com.kohlschutter.junixsocket</groupId>
- <artifactId>junixsocket-native-common</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-flink-harness</artifactId>
@@ -59,15 +45,15 @@
<!-- smoke end-to-end common -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>statefun-smoke-e2e-driver</artifactId>
- <version>3.1-SNAPSHOT</version>
+ <artifactId>statefun-smoke-e2e-common</artifactId>
+ <version>${project.version}</version>
</dependency>
+
+ <!-- Reusing some classes from the driver module, e.g. Ids and Types -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-smoke-e2e-driver</artifactId>
<version>3.1-SNAPSHOT</version>
- <type>test-jar</type>
- <scope>test</scope>
</dependency>
</dependencies>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
index 5cc5bd5..a4da76d 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/CommandInterpreter.java
@@ -17,13 +17,13 @@
*/
package org.apache.flink.statefun.e2e.smoke.embedded;
-import static org.apache.flink.statefun.e2e.smoke.common.Types.*;
+import static org.apache.flink.statefun.e2e.smoke.driver.Types.*;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
-import org.apache.flink.statefun.e2e.smoke.common.Constants;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
+import org.apache.flink.statefun.e2e.smoke.driver.Constants;
+import org.apache.flink.statefun.e2e.smoke.driver.Ids;
import org.apache.flink.statefun.e2e.smoke.generated.Command;
import org.apache.flink.statefun.e2e.smoke.generated.Commands;
import org.apache.flink.statefun.e2e.smoke.generated.SourceCommand;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedFnModule.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedFnModule.java
index e17e1bc..4d24d45 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedFnModule.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedFnModule.java
@@ -19,9 +19,9 @@
import com.google.auto.service.AutoService;
import java.util.Map;
-import org.apache.flink.statefun.e2e.smoke.common.Constants;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
-import org.apache.flink.statefun.e2e.smoke.common.ModuleParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
+import org.apache.flink.statefun.e2e.smoke.driver.Constants;
+import org.apache.flink.statefun.e2e.smoke.driver.Ids;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,10 +32,10 @@
@Override
public void configure(Map<String, String> globalConfiguration, Binder binder) {
- ModuleParameters moduleParameters = ModuleParameters.from(globalConfiguration);
- LOG.info(moduleParameters.toString());
+ SmokeRunnerParameters parameters = SmokeRunnerParameters.from(globalConfiguration);
+ LOG.info(parameters.toString());
- Ids ids = new Ids(moduleParameters.getNumberOfFunctionInstances());
+ Ids ids = new Ids(parameters.getNumberOfFunctionInstances());
FunctionProvider provider = new FunctionProvider(ids);
binder.bindFunctionProvider(Constants.FN_TYPE, provider);
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/FunctionProvider.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/FunctionProvider.java
index 78e163f..98aa8b4 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/FunctionProvider.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/main/java/org/apache/flink/statefun/e2e/smoke/embedded/FunctionProvider.java
@@ -18,7 +18,7 @@
package org.apache.flink.statefun.e2e.smoke.embedded;
import java.util.Objects;
-import org.apache.flink.statefun.e2e.smoke.common.Ids;
+import org.apache.flink.statefun.e2e.smoke.driver.Ids;
import org.apache.flink.statefun.sdk.FunctionType;
import org.apache.flink.statefun.sdk.StatefulFunction;
import org.apache.flink.statefun.sdk.StatefulFunctionProvider;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
index f8b8c07..925ebd1 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/EmbeddedSmokeHarnessTest.java
@@ -18,11 +18,10 @@
package org.apache.flink.statefun.e2e.smoke.embedded;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.awaitVerificationSuccess;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.startVerificationServer;
+import static org.apache.flink.statefun.e2e.smoke.SmokeRunner.awaitVerificationSuccess;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SimpleVerificationServer;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunnerParameters;
+import org.apache.flink.statefun.e2e.smoke.SimpleVerificationServer;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
import org.apache.flink.statefun.flink.harness.Harness;
import org.junit.Ignore;
import org.junit.Test;
@@ -52,7 +51,7 @@
harness.withConfiguration("state.checkpoints.dir", "file:///tmp/checkpoints");
// start the verification server
- SimpleVerificationServer.StartedServer started = startVerificationServer();
+ SimpleVerificationServer.StartedServer started = new SimpleVerificationServer().start();
// configure test parameters.
SmokeRunnerParameters parameters = new SmokeRunnerParameters();
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
index b3d96fd..cc7a0b4 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-embedded/src/test/java/org/apache/flink/statefun/e2e/smoke/embedded/SmokeVerificationEmbeddedE2E.java
@@ -19,8 +19,8 @@
package org.apache.flink.statefun.e2e.smoke.embedded;
import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunner;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunnerParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunner;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
import org.junit.Test;
public class SmokeVerificationEmbeddedE2E {
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
index b3240d9..034a4c7 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/pom.xml
@@ -49,6 +49,31 @@
<build>
<plugins>
+ <!-- Generate the Command messages -->
+ <plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ <version>${protoc-jar-maven-plugin.version}</version>
+ <executions>
+ <execution>
+ <id>generate-protobuf-sources</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>run</goal>
+ </goals>
+ <configuration>
+ <includeStdTypes>true</includeStdTypes>
+ <protocVersion>${protobuf.version}</protocVersion>
+ <cleanOutputFolder>true</cleanOutputFolder>
+ <inputDirectories>
+ <inputDirectory>src/main/protobuf</inputDirectory>
+ </inputDirectories>
+ <outputDirectory>${basedir}/target/generated-sources/protoc-jar</outputDirectory>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+
<!-- build uber jar for launching the remote function -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
@@ -74,31 +99,6 @@
</execution>
</executions>
</plugin>
-
- <!-- Generate the Command messages -->
- <plugin>
- <groupId>com.github.os72</groupId>
- <artifactId>protoc-jar-maven-plugin</artifactId>
- <version>${protoc-jar-maven-plugin.version}</version>
- <executions>
- <execution>
- <id>generate-protobuf-sources</id>
- <phase>generate-sources</phase>
- <goals>
- <goal>run</goal>
- </goals>
- <configuration>
- <includeStdTypes>true</includeStdTypes>
- <protocVersion>${protobuf.version}</protocVersion>
- <cleanOutputFolder>true</cleanOutputFolder>
- <inputDirectories>
- <inputDirectory>src/main/protobuf</inputDirectory>
- </inputDirectories>
- <outputDirectory>${basedir}/target/generated-sources/protoc-jar</outputDirectory>
- </configuration>
- </execution>
- </executions>
- </plugin>
</plugins>
</build>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
index 7d7c332..3e2fb1d 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-java/src/test/java/org/apache/flink/statefun/e2e/smoke/java/SmokeVerificationJavaE2E.java
@@ -21,8 +21,8 @@
import java.nio.file.Path;
import java.nio.file.Paths;
import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunner;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunnerParameters;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunner;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-multilang-base/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-multilang-base/pom.xml
index 3f947c9..f73cbe3 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-multilang-base/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-multilang-base/pom.xml
@@ -29,7 +29,7 @@
<packaging>pom</packaging>
<dependencies>
- <!-- Testcontainer test utilities -->
+ <!-- Testcontainer test utilities, e.g. StatefulFunctionsAppContainers -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>statefun-e2e-tests-common</artifactId>
@@ -37,12 +37,11 @@
<scope>test</scope>
</dependency>
- <!-- Smoke E2E Driver test utilities -->
+ <!-- Smoke E2E test utilities, e.g. SmokeRunner -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>statefun-smoke-e2e-driver</artifactId>
- <version>3.1-SNAPSHOT</version>
- <type>test-jar</type>
+ <artifactId>statefun-smoke-e2e-common</artifactId>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
index 1cb0b5f..3e65a8b 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
+++ b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/pom.xml
@@ -46,9 +46,8 @@
<!-- Test scope dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
- <artifactId>statefun-smoke-e2e-driver</artifactId>
- <version>3.1-SNAPSHOT</version>
- <type>test-jar</type>
+ <artifactId>statefun-smoke-e2e-common</artifactId>
+ <version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
diff --git a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/java/org/apache/flink/statefun/e2e/smoke/multilang/harness/MultiLangSmokeHarnessTest.java b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/java/org/apache/flink/statefun/e2e/smoke/multilang/harness/MultiLangSmokeHarnessTest.java
index 784e76a..f194129 100644
--- a/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/java/org/apache/flink/statefun/e2e/smoke/multilang/harness/MultiLangSmokeHarnessTest.java
+++ b/statefun-e2e-tests/statefun-smoke-e2e-multilang-harness/src/test/java/org/apache/flink/statefun/e2e/smoke/multilang/harness/MultiLangSmokeHarnessTest.java
@@ -18,12 +18,11 @@
package org.apache.flink.statefun.e2e.smoke.multilang.harness;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.awaitVerificationSuccess;
-import static org.apache.flink.statefun.e2e.smoke.driver.testutils.Utils.startVerificationServer;
+import static org.apache.flink.statefun.e2e.smoke.SmokeRunner.awaitVerificationSuccess;
+import org.apache.flink.statefun.e2e.smoke.SimpleVerificationServer;
+import org.apache.flink.statefun.e2e.smoke.SmokeRunnerParameters;
import org.apache.flink.statefun.e2e.smoke.driver.DriverModule;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SimpleVerificationServer;
-import org.apache.flink.statefun.e2e.smoke.driver.testutils.SmokeRunnerParameters;
import org.apache.flink.statefun.flink.harness.Harness;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.junit.Ignore;
@@ -79,7 +78,7 @@
harness.withConfiguration("state.checkpoints.dir", "file:///tmp/checkpoints");
// start the verification server
- SimpleVerificationServer.StartedServer started = startVerificationServer();
+ SimpleVerificationServer.StartedServer started = new SimpleVerificationServer().start();
// configure test parameters.
SmokeRunnerParameters parameters = new SmokeRunnerParameters();