[FLINK-17516] [e2e] Add verification app for exactly-once E2E
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index e77429a..13f2178 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,6 +32,7 @@
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
         <module>statefun-routable-kafka-e2e</module>
+        <module>statefun-exactly-once-e2e</module>
     </modules>
 
     <build>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
new file mode 100644
index 0000000..0ea4d6a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+  http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>statefun-e2e-tests</artifactId>
+        <groupId>org.apache.flink</groupId>
+        <version>2.1-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>statefun-exactly-once-e2e</artifactId>
+
+    <properties>
+        <testcontainers.version>1.12.5</testcontainers.version>
+    </properties>
+
+    <dependencies>
+        <!-- Stateful Functions -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-sdk</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-kafka-io</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Protobuf -->
+        <dependency>
+            <groupId>com.google.protobuf</groupId>
+            <artifactId>protobuf-java</artifactId>
+            <version>${protobuf.version}</version>
+        </dependency>
+
+        <!-- logging -->
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.15</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.14.6</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- End-to-end test common -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>statefun-e2e-tests-common</artifactId>
+            <version>${project.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- Testcontainers KafkaContainer -->
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>kafka</artifactId>
+            <version>${testcontainers.version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>com.github.os72</groupId>
+                <artifactId>protoc-jar-maven-plugin</artifactId>
+                <version>${protoc-jar-maven-plugin.version}</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-javadoc-plugin</artifactId>
+                <configuration>
+                    <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
new file mode 100644
index 0000000..ab66809
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+final class Constants {
+
+  private Constants() {}
+
+  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
+
+  static final IngressIdentifier<WrappedMessage> INGRESS_ID =
+      new IngressIdentifier<>(
+          WrappedMessage.class, "org.apache.flink.e2e.exactlyonce", "wrapped-messages");
+
+  static final EgressIdentifier<InvokeCount> EGRESS_ID =
+      new EgressIdentifier<>(
+          "org.apache.flink.e2e.exactlyonce", "invoke-counts", InvokeCount.class);
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
new file mode 100644
index 0000000..9dbb465
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/**
+ * This is a a simple application used for testing end-to-end exactly-once semantics.
+ *
+ * <p>The application reads {@link WrappedMessage}s from a Kafka ingress which gets routed to {@link
+ * FnUnwrapper} functions, which in turn simply forwards the messages to {@link FnCounter} functions
+ * with specified target keys defined in the wrapped message. The counter function keeps count of
+ * the number of times each key as been invoked, and sinks that count to an exactly-once delivery
+ * Kafka egress for verification.
+ */
+@AutoService(StatefulFunctionModule.class)
+public class ExactlyOnceVerificationModule implements StatefulFunctionModule {
+
+  @Override
+  public void configure(Map<String, String> globalConfiguration, Binder binder) {
+    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+    if (kafkaBootstrapServers == null) {
+      throw new IllegalStateException(
+          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+    }
+
+    configureKafkaIO(kafkaBootstrapServers, binder);
+    configureAddressTaggerFunctions(binder);
+  }
+
+  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
+    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
+
+    binder.bindIngress(kafkaIO.getIngressSpec());
+    binder.bindIngressRouter(
+        Constants.INGRESS_ID,
+        ((message, downstream) -> downstream.forward(FnUnwrapper.TYPE, message.getKey(), message)));
+
+    binder.bindEgress(kafkaIO.getEgressSpec());
+  }
+
+  private static void configureAddressTaggerFunctions(Binder binder) {
+    binder.bindFunctionProvider(FnUnwrapper.TYPE, ignored -> new FnUnwrapper());
+    binder.bindFunctionProvider(FnCounter.TYPE, ignored -> new FnCounter());
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
new file mode 100644
index 0000000..5243ebd
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+final class FnCounter implements StatefulFunction {
+
+  static final FunctionType TYPE = new FunctionType("org.apache.flink.e2e.exactlyonce", "counter");
+
+  @Persisted
+  private final PersistedValue<Integer> invokeCountState =
+      PersistedValue.of("invoke-count", Integer.class);
+
+  @Override
+  public void invoke(Context context, Object input) {
+    final int previousCount = invokeCountState.getOrDefault(0);
+    final int currentCount = previousCount + 1;
+
+    final InvokeCount invokeCount =
+        InvokeCount.newBuilder().setId(context.self().id()).setInvokeCount(currentCount).build();
+    invokeCountState.set(currentCount);
+
+    context.send(Constants.EGRESS_ID, invokeCount);
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
new file mode 100644
index 0000000..990e545
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+
+final class FnUnwrapper implements StatefulFunction {
+
+  static final FunctionType TYPE =
+      new FunctionType("org.apache.flink.e2e.exactlyonce", "unwrapper");
+
+  @Override
+  public void invoke(Context context, Object input) {
+    final WrappedMessage message = requireWrappedMessage(input);
+    context.send(FnCounter.TYPE, message.getInvokeTargetId(), message);
+  }
+
+  private static WrappedMessage requireWrappedMessage(Object input) {
+    return (WrappedMessage) input;
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
new file mode 100644
index 0000000..4df60ca
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+final class KafkaIO {
+
+  static final String WRAPPED_MESSAGES_TOPIC_NAME = "wrapped-messages";
+  static final String INVOKE_COUNTS_TOPIC_NAME = "invoke-counts";
+
+  private final String kafkaAddress;
+
+  KafkaIO(String kafkaAddress) {
+    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
+  }
+
+  IngressSpec<WrappedMessage> getIngressSpec() {
+    return KafkaIngressBuilder.forIdentifier(Constants.INGRESS_ID)
+        .withTopic(KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME)
+        .withKafkaAddress(kafkaAddress)
+        .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
+        .withConsumerGroupId("exactly-once-e2e")
+        .withDeserializer(WrappedMessageKafkaDeserializer.class)
+        .build();
+  }
+
+  EgressSpec<InvokeCount> getEgressSpec() {
+    return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
+        .withKafkaAddress(kafkaAddress)
+        .withExactlyOnceProducerSemantics(Duration.ofMinutes(1))
+        .withSerializer(InvokeCountKafkaSerializer.class)
+        .build();
+  }
+
+  private static final class WrappedMessageKafkaDeserializer
+      implements KafkaIngressDeserializer<WrappedMessage> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public WrappedMessage deserialize(ConsumerRecord<byte[], byte[]> input) {
+      try {
+        return WrappedMessage.parseFrom(input.value());
+      } catch (Exception e) {
+        throw new RuntimeException("Error deserializing messages", e);
+      }
+    }
+  }
+
+  private static final class InvokeCountKafkaSerializer
+      implements KafkaEgressSerializer<InvokeCount> {
+
+    private static final long serialVersionUID = 1L;
+
+    @Override
+    public ProducerRecord<byte[], byte[]> serialize(InvokeCount invokeCount) {
+      final byte[] key = invokeCount.getIdBytes().toByteArray();
+      final byte[] value = invokeCount.toByteArray();
+
+      return new ProducerRecord<>(INVOKE_COUNTS_TOPIC_NAME, key, value);
+    }
+  }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
new file mode 100644
index 0000000..5e8b41a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+option java_package = "org.apache.flink.statefun.e2e.exactlyonce.generated";
+option java_multiple_files = false;
+
+message WrappedMessage {
+    string invokeTargetId = 1;
+    string key = 2;
+}
+
+message FnAddress {
+    string namespace = 1;
+    string type = 2;
+    string id = 3;
+}
+
+message InvokeCount {
+    string id = 1;
+    int32 invokeCount = 2;
+}