[FLINK-19620] Remove original ExactlyOnceE2E
Since exactly-once is now verified by the RemoteModuleE2E, we remove the
duplicate test coverage of the old ExactlyOnceE2E
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 8af2e92..c27d0ee 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,7 +32,6 @@
<module>statefun-e2e-tests-common</module>
<module>statefun-sanity-e2e</module>
<module>statefun-remote-module-e2e</module>
- <module>statefun-exactly-once-e2e</module>
</modules>
<build>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
deleted file mode 100644
index a47895b..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
+++ /dev/null
@@ -1,109 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xmlns="http://maven.apache.org/POM/4.0.0"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>statefun-e2e-tests</artifactId>
- <groupId>org.apache.flink</groupId>
- <version>2.3-SNAPSHOT</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>statefun-exactly-once-e2e</artifactId>
-
- <properties>
- <testcontainers.version>1.12.5</testcontainers.version>
- </properties>
-
- <dependencies>
- <!-- Stateful Functions -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-sdk</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-kafka-io</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <!-- Protobuf -->
- <dependency>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- <version>${protobuf.version}</version>
- </dependency>
-
- <!-- logging -->
- <dependency>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- <version>1.7.15</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>log4j</groupId>
- <artifactId>log4j</artifactId>
- <version>1.2.17</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>com.squareup.okhttp3</groupId>
- <artifactId>okhttp</artifactId>
- <version>3.14.6</version>
- <scope>test</scope>
- </dependency>
-
- <!-- End-to-end test common -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>statefun-e2e-tests-common</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <!-- Testcontainers KafkaContainer -->
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>kafka</artifactId>
- <version>${testcontainers.version}</version>
- <scope>test</scope>
- </dependency>
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>com.github.os72</groupId>
- <artifactId>protoc-jar-maven-plugin</artifactId>
- <version>${protoc-jar-maven-plugin.version}</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-javadoc-plugin</artifactId>
- <configuration>
- <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
- </configuration>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
deleted file mode 100644
index ab66809..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-import org.apache.flink.statefun.sdk.io.IngressIdentifier;
-
-final class Constants {
-
- private Constants() {}
-
- static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
-
- static final IngressIdentifier<WrappedMessage> INGRESS_ID =
- new IngressIdentifier<>(
- WrappedMessage.class, "org.apache.flink.e2e.exactlyonce", "wrapped-messages");
-
- static final EgressIdentifier<InvokeCount> EGRESS_ID =
- new EgressIdentifier<>(
- "org.apache.flink.e2e.exactlyonce", "invoke-counts", InvokeCount.class);
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
deleted file mode 100644
index 9dbb465..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-/**
- * This is a a simple application used for testing end-to-end exactly-once semantics.
- *
- * <p>The application reads {@link WrappedMessage}s from a Kafka ingress which gets routed to {@link
- * FnUnwrapper} functions, which in turn simply forwards the messages to {@link FnCounter} functions
- * with specified target keys defined in the wrapped message. The counter function keeps count of
- * the number of times each key as been invoked, and sinks that count to an exactly-once delivery
- * Kafka egress for verification.
- */
-@AutoService(StatefulFunctionModule.class)
-public class ExactlyOnceVerificationModule implements StatefulFunctionModule {
-
- @Override
- public void configure(Map<String, String> globalConfiguration, Binder binder) {
- String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
- if (kafkaBootstrapServers == null) {
- throw new IllegalStateException(
- "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
- }
-
- configureKafkaIO(kafkaBootstrapServers, binder);
- configureAddressTaggerFunctions(binder);
- }
-
- private static void configureKafkaIO(String kafkaAddress, Binder binder) {
- final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
-
- binder.bindIngress(kafkaIO.getIngressSpec());
- binder.bindIngressRouter(
- Constants.INGRESS_ID,
- ((message, downstream) -> downstream.forward(FnUnwrapper.TYPE, message.getKey(), message)));
-
- binder.bindEgress(kafkaIO.getEgressSpec());
- }
-
- private static void configureAddressTaggerFunctions(Binder binder) {
- binder.bindFunctionProvider(FnUnwrapper.TYPE, ignored -> new FnUnwrapper());
- binder.bindFunctionProvider(FnCounter.TYPE, ignored -> new FnCounter());
- }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
deleted file mode 100644
index 5243ebd..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-final class FnCounter implements StatefulFunction {
-
- static final FunctionType TYPE = new FunctionType("org.apache.flink.e2e.exactlyonce", "counter");
-
- @Persisted
- private final PersistedValue<Integer> invokeCountState =
- PersistedValue.of("invoke-count", Integer.class);
-
- @Override
- public void invoke(Context context, Object input) {
- final int previousCount = invokeCountState.getOrDefault(0);
- final int currentCount = previousCount + 1;
-
- final InvokeCount invokeCount =
- InvokeCount.newBuilder().setId(context.self().id()).setInvokeCount(currentCount).build();
- invokeCountState.set(currentCount);
-
- context.send(Constants.EGRESS_ID, invokeCount);
- }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
deleted file mode 100644
index 990e545..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-final class FnUnwrapper implements StatefulFunction {
-
- static final FunctionType TYPE =
- new FunctionType("org.apache.flink.e2e.exactlyonce", "unwrapper");
-
- @Override
- public void invoke(Context context, Object input) {
- final WrappedMessage message = requireWrappedMessage(input);
- context.send(FnCounter.TYPE, message.getInvokeTargetId(), message);
- }
-
- private static WrappedMessage requireWrappedMessage(Object input) {
- return (WrappedMessage) input;
- }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
deleted file mode 100644
index 4df60ca..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import java.time.Duration;
-import java.util.Objects;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.flink.statefun.sdk.io.EgressSpec;
-import org.apache.flink.statefun.sdk.io.IngressSpec;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-final class KafkaIO {
-
- static final String WRAPPED_MESSAGES_TOPIC_NAME = "wrapped-messages";
- static final String INVOKE_COUNTS_TOPIC_NAME = "invoke-counts";
-
- private final String kafkaAddress;
-
- KafkaIO(String kafkaAddress) {
- this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
- }
-
- IngressSpec<WrappedMessage> getIngressSpec() {
- return KafkaIngressBuilder.forIdentifier(Constants.INGRESS_ID)
- .withTopic(KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME)
- .withKafkaAddress(kafkaAddress)
- .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
- .withConsumerGroupId("exactly-once-e2e")
- .withDeserializer(WrappedMessageKafkaDeserializer.class)
- .build();
- }
-
- EgressSpec<InvokeCount> getEgressSpec() {
- return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
- .withKafkaAddress(kafkaAddress)
- .withExactlyOnceProducerSemantics(Duration.ofMinutes(1))
- .withSerializer(InvokeCountKafkaSerializer.class)
- .build();
- }
-
- private static final class WrappedMessageKafkaDeserializer
- implements KafkaIngressDeserializer<WrappedMessage> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public WrappedMessage deserialize(ConsumerRecord<byte[], byte[]> input) {
- try {
- return WrappedMessage.parseFrom(input.value());
- } catch (Exception e) {
- throw new RuntimeException("Error deserializing messages", e);
- }
- }
- }
-
- private static final class InvokeCountKafkaSerializer
- implements KafkaEgressSerializer<InvokeCount> {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public ProducerRecord<byte[], byte[]> serialize(InvokeCount invokeCount) {
- final byte[] key = invokeCount.getIdBytes().toByteArray();
- final byte[] value = invokeCount.toByteArray();
-
- return new ProducerRecord<>(INVOKE_COUNTS_TOPIC_NAME, key, value);
- }
- }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
deleted file mode 100644
index 5e8b41a..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-option java_package = "org.apache.flink.statefun.e2e.exactlyonce.generated";
-option java_multiple_files = false;
-
-message WrappedMessage {
- string invokeTargetId = 1;
- string key = 2;
-}
-
-message FnAddress {
- string namespace = 1;
- string type = 2;
- string id = 3;
-}
-
-message InvokeCount {
- string id = 1;
- int32 invokeCount = 2;
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
deleted file mode 100644
index b0c61d0..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-/**
- * End-to-end test based on the {@link ExactlyOnceVerificationModule} application.
- *
- * <p>This test writes some {@link WrappedMessage} records to Kafka, which eventually gets routed to
- * the counter function in the application. Then, after the corresponding {@link InvokeCount} are
- * seen in the Kafka egress (which implies some checkpoints have been completed since the
- * verification application is using exactly-once delivery), we restart a worker to simulate
- * failure. The application should automatically attempt to recover and eventually restart.
- * Meanwhile, more records are written to Kafka again. We verify that on the consumer side, the
- * invocation counts increase sequentially for each key as if the failure did not occur.
- */
-public class ExactlyOnceE2E {
-
- private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceE2E.class);
-
- private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
- private static final String KAFKA_HOST = "kafka-broker";
-
- private static final int NUM_WORKERS = 2;
-
- /**
- * Kafka broker. We need to explicitly set the transaction state log replication factor and min
- * ISR since by default, those values are larger than 1 while we are only using 1 Kafka broker.
- */
- @Rule
- public KafkaContainer kafka =
- new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
- .withNetworkAliases(KAFKA_HOST)
- .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
- .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
-
- @Rule
- public StatefulFunctionsAppContainers verificationApp =
- StatefulFunctionsAppContainers.builder("exactly-once-verification", NUM_WORKERS)
- .dependsOn(kafka)
- .exposeMasterLogs(LOG)
- .withModuleGlobalConfiguration(
- Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
- .build();
-
- @Test(timeout = 300_000L)
- public void run() throws Exception {
- final String kafkaAddress = kafka.getBootstrapServers();
-
- final Producer<String, WrappedMessage> messageProducer =
- kafkaWrappedMessagesProducer(kafkaAddress);
- final Consumer<String, InvokeCount> invokeCountsConsumer =
- kafkaInvokeCountsConsumer(kafkaAddress);
-
- final KafkaIOVerifier<String, WrappedMessage, String, InvokeCount> verifier =
- new KafkaIOVerifier<>(messageProducer, invokeCountsConsumer);
-
- assertThat(
- verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
- verifier.resultsInOrder(
- is(invokeCount("foo", 1)), is(invokeCount("foo", 2)), is(invokeCount("foo", 3))));
-
- LOG.info(
- "Restarting random worker to simulate failure. The application should automatically recover.");
- verificationApp.restartWorker(randomWorkerIndex());
-
- assertThat(
- verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
- verifier.resultsInOrder(
- is(invokeCount("foo", 4)), is(invokeCount("foo", 5)), is(invokeCount("foo", 6))));
- }
-
- private static Producer<String, WrappedMessage> kafkaWrappedMessagesProducer(
- String bootstrapServers) {
- Properties props = new Properties();
- props.put("bootstrap.servers", bootstrapServers);
-
- return new KafkaProducer<>(
- props, new StringSerializer(), new KafkaProtobufSerializer<>(WrappedMessage.parser()));
- }
-
- private Consumer<String, InvokeCount> kafkaInvokeCountsConsumer(String bootstrapServers) {
- Properties consumerProps = new Properties();
- consumerProps.setProperty("bootstrap.servers", bootstrapServers);
- consumerProps.setProperty("group.id", "exactly-once-e2e");
- consumerProps.setProperty("auto.offset.reset", "earliest");
- consumerProps.setProperty("isolation.level", "read_committed");
-
- KafkaConsumer<String, InvokeCount> consumer =
- new KafkaConsumer<>(
- consumerProps,
- new StringDeserializer(),
- new KafkaProtobufSerializer<>(InvokeCount.parser()));
- consumer.subscribe(Collections.singletonList(KafkaIO.INVOKE_COUNTS_TOPIC_NAME));
-
- return consumer;
- }
-
- private static ProducerRecord<String, WrappedMessage> wrappedMessage(String targetInvokeId) {
- final String key = UUID.randomUUID().toString();
-
- return new ProducerRecord<>(
- KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME,
- key,
- WrappedMessage.newBuilder().setInvokeTargetId(targetInvokeId).setKey(key).build());
- }
-
- private static InvokeCount invokeCount(String id, int count) {
- return InvokeCount.newBuilder().setId(id).setInvokeCount(count).build();
- }
-
- private static int randomWorkerIndex() {
- return new Random().nextInt(NUM_WORKERS);
- }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index d1463f3..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:2.3-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-exactly-once-e2e
-COPY statefun-exactly-once-e2e*.jar /opt/statefun/modules/statefun-exactly-once-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n