[FLINK-17516] [e2e] Add verification app for exactly-once E2E
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index e77429a..13f2178 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,6 +32,7 @@
<module>statefun-e2e-tests-common</module>
<module>statefun-sanity-e2e</module>
<module>statefun-routable-kafka-e2e</module>
+ <module>statefun-exactly-once-e2e</module>
</modules>
<build>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
new file mode 100644
index 0000000..0ea4d6a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xmlns="http://maven.apache.org/POM/4.0.0"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>statefun-e2e-tests</artifactId>
+ <groupId>org.apache.flink</groupId>
+ <version>2.1-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>statefun-exactly-once-e2e</artifactId>
+
+ <properties>
+ <testcontainers.version>1.12.5</testcontainers.version>
+ </properties>
+
+ <dependencies>
+ <!-- Stateful Functions -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-sdk</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-kafka-io</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Protobuf -->
+ <dependency>
+ <groupId>com.google.protobuf</groupId>
+ <artifactId>protobuf-java</artifactId>
+ <version>${protobuf.version}</version>
+ </dependency>
+
+ <!-- logging -->
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>1.7.15</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ <version>1.2.17</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>3.14.6</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- End-to-end test common -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>statefun-e2e-tests-common</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- Testcontainers KafkaContainer -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>kafka</artifactId>
+ <version>${testcontainers.version}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>com.github.os72</groupId>
+ <artifactId>protoc-jar-maven-plugin</artifactId>
+ <version>${protoc-jar-maven-plugin.version}</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
new file mode 100644
index 0000000..ab66809
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.io.EgressIdentifier;
+import org.apache.flink.statefun.sdk.io.IngressIdentifier;
+
+final class Constants {
+
+ private Constants() {}
+
+ static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
+
+ static final IngressIdentifier<WrappedMessage> INGRESS_ID =
+ new IngressIdentifier<>(
+ WrappedMessage.class, "org.apache.flink.e2e.exactlyonce", "wrapped-messages");
+
+ static final EgressIdentifier<InvokeCount> EGRESS_ID =
+ new EgressIdentifier<>(
+ "org.apache.flink.e2e.exactlyonce", "invoke-counts", InvokeCount.class);
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
new file mode 100644
index 0000000..9dbb465
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import com.google.auto.service.AutoService;
+import java.util.Map;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
+
+/**
+ * This is a a simple application used for testing end-to-end exactly-once semantics.
+ *
+ * <p>The application reads {@link WrappedMessage}s from a Kafka ingress which gets routed to {@link
+ * FnUnwrapper} functions, which in turn simply forwards the messages to {@link FnCounter} functions
+ * with specified target keys defined in the wrapped message. The counter function keeps count of
+ * the number of times each key as been invoked, and sinks that count to an exactly-once delivery
+ * Kafka egress for verification.
+ */
+@AutoService(StatefulFunctionModule.class)
+public class ExactlyOnceVerificationModule implements StatefulFunctionModule {
+
+ @Override
+ public void configure(Map<String, String> globalConfiguration, Binder binder) {
+ String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+ if (kafkaBootstrapServers == null) {
+ throw new IllegalStateException(
+ "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
+ }
+
+ configureKafkaIO(kafkaBootstrapServers, binder);
+ configureAddressTaggerFunctions(binder);
+ }
+
+ private static void configureKafkaIO(String kafkaAddress, Binder binder) {
+ final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
+
+ binder.bindIngress(kafkaIO.getIngressSpec());
+ binder.bindIngressRouter(
+ Constants.INGRESS_ID,
+ ((message, downstream) -> downstream.forward(FnUnwrapper.TYPE, message.getKey(), message)));
+
+ binder.bindEgress(kafkaIO.getEgressSpec());
+ }
+
+ private static void configureAddressTaggerFunctions(Binder binder) {
+ binder.bindFunctionProvider(FnUnwrapper.TYPE, ignored -> new FnUnwrapper());
+ binder.bindFunctionProvider(FnCounter.TYPE, ignored -> new FnCounter());
+ }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
new file mode 100644
index 0000000..5243ebd
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+import org.apache.flink.statefun.sdk.annotations.Persisted;
+import org.apache.flink.statefun.sdk.state.PersistedValue;
+
+final class FnCounter implements StatefulFunction {
+
+ static final FunctionType TYPE = new FunctionType("org.apache.flink.e2e.exactlyonce", "counter");
+
+ @Persisted
+ private final PersistedValue<Integer> invokeCountState =
+ PersistedValue.of("invoke-count", Integer.class);
+
+ @Override
+ public void invoke(Context context, Object input) {
+ final int previousCount = invokeCountState.getOrDefault(0);
+ final int currentCount = previousCount + 1;
+
+ final InvokeCount invokeCount =
+ InvokeCount.newBuilder().setId(context.self().id()).setInvokeCount(currentCount).build();
+ invokeCountState.set(currentCount);
+
+ context.send(Constants.EGRESS_ID, invokeCount);
+ }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
new file mode 100644
index 0000000..990e545
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.Context;
+import org.apache.flink.statefun.sdk.FunctionType;
+import org.apache.flink.statefun.sdk.StatefulFunction;
+
+final class FnUnwrapper implements StatefulFunction {
+
+ static final FunctionType TYPE =
+ new FunctionType("org.apache.flink.e2e.exactlyonce", "unwrapper");
+
+ @Override
+ public void invoke(Context context, Object input) {
+ final WrappedMessage message = requireWrappedMessage(input);
+ context.send(FnCounter.TYPE, message.getInvokeTargetId(), message);
+ }
+
+ private static WrappedMessage requireWrappedMessage(Object input) {
+ return (WrappedMessage) input;
+ }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
new file mode 100644
index 0000000..4df60ca
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+
+import java.time.Duration;
+import java.util.Objects;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
+import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
+import org.apache.flink.statefun.sdk.io.EgressSpec;
+import org.apache.flink.statefun.sdk.io.IngressSpec;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
+import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerRecord;
+
+final class KafkaIO {
+
+ static final String WRAPPED_MESSAGES_TOPIC_NAME = "wrapped-messages";
+ static final String INVOKE_COUNTS_TOPIC_NAME = "invoke-counts";
+
+ private final String kafkaAddress;
+
+ KafkaIO(String kafkaAddress) {
+ this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
+ }
+
+ IngressSpec<WrappedMessage> getIngressSpec() {
+ return KafkaIngressBuilder.forIdentifier(Constants.INGRESS_ID)
+ .withTopic(KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME)
+ .withKafkaAddress(kafkaAddress)
+ .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
+ .withConsumerGroupId("exactly-once-e2e")
+ .withDeserializer(WrappedMessageKafkaDeserializer.class)
+ .build();
+ }
+
+ EgressSpec<InvokeCount> getEgressSpec() {
+ return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
+ .withKafkaAddress(kafkaAddress)
+ .withExactlyOnceProducerSemantics(Duration.ofMinutes(1))
+ .withSerializer(InvokeCountKafkaSerializer.class)
+ .build();
+ }
+
+ private static final class WrappedMessageKafkaDeserializer
+ implements KafkaIngressDeserializer<WrappedMessage> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public WrappedMessage deserialize(ConsumerRecord<byte[], byte[]> input) {
+ try {
+ return WrappedMessage.parseFrom(input.value());
+ } catch (Exception e) {
+ throw new RuntimeException("Error deserializing messages", e);
+ }
+ }
+ }
+
+ private static final class InvokeCountKafkaSerializer
+ implements KafkaEgressSerializer<InvokeCount> {
+
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public ProducerRecord<byte[], byte[]> serialize(InvokeCount invokeCount) {
+ final byte[] key = invokeCount.getIdBytes().toByteArray();
+ final byte[] value = invokeCount.toByteArray();
+
+ return new ProducerRecord<>(INVOKE_COUNTS_TOPIC_NAME, key, value);
+ }
+ }
+}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
new file mode 100644
index 0000000..5e8b41a
--- /dev/null
+++ b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+syntax = "proto3";
+
+package org.apache.flink.statefun.e2e.exactlyonce;
+option java_package = "org.apache.flink.statefun.e2e.exactlyonce.generated";
+option java_multiple_files = false;
+
+message WrappedMessage {
+ string invokeTargetId = 1;
+ string key = 2;
+}
+
+message FnAddress {
+ string namespace = 1;
+ string type = 2;
+ string id = 3;
+}
+
+message InvokeCount {
+ string id = 1;
+ int32 invokeCount = 2;
+}