[FLINK-19620] Add exactly-once verification to RemoteModuleE2E
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
index 618150b..6976cb5 100644
--- a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/java/org/apache/flink/statefun/e2e/remote/RemoteModuleE2E.java
@@ -25,6 +25,7 @@
 import java.nio.file.Paths;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.Random;
 import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
 import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
 import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
@@ -47,16 +48,23 @@
 import org.testcontainers.images.builder.ImageFromDockerfile;
 
 /**
- * End-to-end test for a completely YAML-based remote module setup.consisting of
+ * Exactly-once end-to-end test with a completely YAML-based remote module setup.
  *
  * <p>The setup consists of a auto-routable YAML Kafka ingress, the generic YAML Kafka egress, and
- * two Python. remote functions: 1) a simple invocation counter function, which gets routed invoke
+ * two Python remote functions: 1) a simple invocation counter function, which gets routed invoke
  * messages from the auto-routable Kafka ingress, and 2) a simple stateless forwarding. function,
  * which gets the invocation counts from the counter function and simply forwards them to the Kafka
  * egress.
  *
  * <p>We perform the extra stateless forwarding so that the E2E test scenario covers messaging
  * between remote functions.
+ *
+ * <p>After the first series of output is seen in the Kafka egress (which implies some checkpoints
+ * have been completed since the verification application is using exactly-once delivery), we
+ * restart a StateFun worker to simulate failure. The application should automatically attempt to
+ * recover and eventually restart. Meanwhile, more records are written to Kafka again. We verify
+ * that on the consumer side, the invocation counts increase sequentially for each key as if the
+ * failure did not occur.
  */
 public class RemoteModuleE2E {
 
@@ -70,9 +78,14 @@
 
   private static final String REMOTE_FUNCTION_HOST = "remote-function";
 
+  private static final int NUM_WORKERS = 2;
+
   @Rule
   public KafkaContainer kafka =
-      new KafkaContainer(CONFLUENT_PLATFORM_VERSION).withNetworkAliases(KAFKA_HOST);
+      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
+          .withNetworkAliases(KAFKA_HOST)
+          .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
+          .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
 
   @Rule
   public GenericContainer<?> remoteFunction =
@@ -82,7 +95,7 @@
 
   @Rule
   public StatefulFunctionsAppContainers verificationApp =
-      StatefulFunctionsAppContainers.builder("remote-module-verification", 2)
+      StatefulFunctionsAppContainers.builder("remote-module-verification", NUM_WORKERS)
           .dependsOn(kafka)
           .dependsOn(remoteFunction)
           .exposeMasterLogs(LOG)
@@ -109,6 +122,15 @@
         verifier.sending(invoke("foo"), invoke("foo"), invoke("bar")),
         verifier.resultsInAnyOrder(
             is(invokeResult("foo", 1)), is(invokeResult("foo", 2)), is(invokeResult("bar", 1))));
+
+    LOG.info(
+        "Restarting random worker to simulate failure. The application should automatically recover.");
+    verificationApp.restartWorker(randomWorkerIndex());
+
+    assertThat(
+        verifier.sending(invoke("foo"), invoke("foo"), invoke("bar")),
+        verifier.resultsInAnyOrder(
+            is(invokeResult("foo", 3)), is(invokeResult("foo", 4)), is(invokeResult("bar", 2))));
   }
 
   private static ImageFromDockerfile remoteFunctionImage() {
@@ -146,6 +168,7 @@
     consumerProps.setProperty("bootstrap.servers", bootstrapServers);
     consumerProps.setProperty("group.id", "remote-module-e2e");
     consumerProps.setProperty("auto.offset.reset", "earliest");
+    consumerProps.setProperty("isolation.level", "read_committed");
 
     KafkaConsumer<String, InvokeResult> consumer =
         new KafkaConsumer<>(
@@ -164,4 +187,8 @@
   private static InvokeResult invokeResult(String id, int invokeCount) {
     return InvokeResult.newBuilder().setId(id).setInvokeCount(invokeCount).build();
   }
+
+  private static int randomWorkerIndex() {
+    return new Random().nextInt(NUM_WORKERS);
+  }
 }
diff --git a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml
index fc1e57c..f9d3c34 100644
--- a/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml
+++ b/statefun-e2e-tests/statefun-remote-module-e2e/src/test/resources/remote-module/module.yaml
@@ -58,3 +58,6 @@
             id: org.apache.flink.statefun.e2e.remote/invoke-results
           spec:
             address: kafka-broker:9092
+            deliverySemantic:
+              type: exactly-once
+              transactionTimeoutMillis: 900000