[FLINK-19620] Remove original ExactlyOnceE2E

Since exactly-once is now verified by the RemoteModuleE2E, we remove the
duplicate test coverage of the old ExactlyOnceE2E
diff --git a/statefun-e2e-tests/pom.xml b/statefun-e2e-tests/pom.xml
index 8af2e92..c27d0ee 100644
--- a/statefun-e2e-tests/pom.xml
+++ b/statefun-e2e-tests/pom.xml
@@ -32,7 +32,6 @@
         <module>statefun-e2e-tests-common</module>
         <module>statefun-sanity-e2e</module>
         <module>statefun-remote-module-e2e</module>
-        <module>statefun-exactly-once-e2e</module>
     </modules>
 
     <build>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml b/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
deleted file mode 100644
index a47895b..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/pom.xml
+++ /dev/null
@@ -1,109 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-  http://www.apache.org/licenses/LICENSE-2.0
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-         xmlns="http://maven.apache.org/POM/4.0.0"
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
-    <parent>
-        <artifactId>statefun-e2e-tests</artifactId>
-        <groupId>org.apache.flink</groupId>
-        <version>2.3-SNAPSHOT</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
-
-    <artifactId>statefun-exactly-once-e2e</artifactId>
-
-    <properties>
-        <testcontainers.version>1.12.5</testcontainers.version>
-    </properties>
-
-    <dependencies>
-        <!-- Stateful Functions -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-sdk</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-kafka-io</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <!-- Protobuf -->
-        <dependency>
-            <groupId>com.google.protobuf</groupId>
-            <artifactId>protobuf-java</artifactId>
-            <version>${protobuf.version}</version>
-        </dependency>
-
-        <!-- logging -->
-        <dependency>
-            <groupId>org.slf4j</groupId>
-            <artifactId>slf4j-log4j12</artifactId>
-            <version>1.7.15</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>log4j</groupId>
-            <artifactId>log4j</artifactId>
-            <version>1.2.17</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.squareup.okhttp3</groupId>
-            <artifactId>okhttp</artifactId>
-            <version>3.14.6</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- End-to-end test common -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>statefun-e2e-tests-common</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <!-- Testcontainers KafkaContainer -->
-        <dependency>
-            <groupId>org.testcontainers</groupId>
-            <artifactId>kafka</artifactId>
-            <version>${testcontainers.version}</version>
-            <scope>test</scope>
-        </dependency>
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>com.github.os72</groupId>
-                <artifactId>protoc-jar-maven-plugin</artifactId>
-                <version>${protoc-jar-maven-plugin.version}</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-javadoc-plugin</artifactId>
-                <configuration>
-                    <excludePackageNames>org.apache.flink.statefun.examples.greeter.generated</excludePackageNames>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
deleted file mode 100644
index ab66809..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/Constants.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.flink.statefun.sdk.io.EgressIdentifier;
-import org.apache.flink.statefun.sdk.io.IngressIdentifier;
-
-final class Constants {
-
-  private Constants() {}
-
-  static final String KAFKA_BOOTSTRAP_SERVERS_CONF = "kafka-bootstrap-servers";
-
-  static final IngressIdentifier<WrappedMessage> INGRESS_ID =
-      new IngressIdentifier<>(
-          WrappedMessage.class, "org.apache.flink.e2e.exactlyonce", "wrapped-messages");
-
-  static final EgressIdentifier<InvokeCount> EGRESS_ID =
-      new EgressIdentifier<>(
-          "org.apache.flink.e2e.exactlyonce", "invoke-counts", InvokeCount.class);
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
deleted file mode 100644
index 9dbb465..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceVerificationModule.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import com.google.auto.service.AutoService;
-import java.util.Map;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
-
-/**
- * This is a a simple application used for testing end-to-end exactly-once semantics.
- *
- * <p>The application reads {@link WrappedMessage}s from a Kafka ingress which gets routed to {@link
- * FnUnwrapper} functions, which in turn simply forwards the messages to {@link FnCounter} functions
- * with specified target keys defined in the wrapped message. The counter function keeps count of
- * the number of times each key as been invoked, and sinks that count to an exactly-once delivery
- * Kafka egress for verification.
- */
-@AutoService(StatefulFunctionModule.class)
-public class ExactlyOnceVerificationModule implements StatefulFunctionModule {
-
-  @Override
-  public void configure(Map<String, String> globalConfiguration, Binder binder) {
-    String kafkaBootstrapServers = globalConfiguration.get(Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    if (kafkaBootstrapServers == null) {
-      throw new IllegalStateException(
-          "Missing required global configuration " + Constants.KAFKA_BOOTSTRAP_SERVERS_CONF);
-    }
-
-    configureKafkaIO(kafkaBootstrapServers, binder);
-    configureAddressTaggerFunctions(binder);
-  }
-
-  private static void configureKafkaIO(String kafkaAddress, Binder binder) {
-    final KafkaIO kafkaIO = new KafkaIO(kafkaAddress);
-
-    binder.bindIngress(kafkaIO.getIngressSpec());
-    binder.bindIngressRouter(
-        Constants.INGRESS_ID,
-        ((message, downstream) -> downstream.forward(FnUnwrapper.TYPE, message.getKey(), message)));
-
-    binder.bindEgress(kafkaIO.getEgressSpec());
-  }
-
-  private static void configureAddressTaggerFunctions(Binder binder) {
-    binder.bindFunctionProvider(FnUnwrapper.TYPE, ignored -> new FnUnwrapper());
-    binder.bindFunctionProvider(FnCounter.TYPE, ignored -> new FnCounter());
-  }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
deleted file mode 100644
index 5243ebd..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnCounter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-import org.apache.flink.statefun.sdk.annotations.Persisted;
-import org.apache.flink.statefun.sdk.state.PersistedValue;
-
-final class FnCounter implements StatefulFunction {
-
-  static final FunctionType TYPE = new FunctionType("org.apache.flink.e2e.exactlyonce", "counter");
-
-  @Persisted
-  private final PersistedValue<Integer> invokeCountState =
-      PersistedValue.of("invoke-count", Integer.class);
-
-  @Override
-  public void invoke(Context context, Object input) {
-    final int previousCount = invokeCountState.getOrDefault(0);
-    final int currentCount = previousCount + 1;
-
-    final InvokeCount invokeCount =
-        InvokeCount.newBuilder().setId(context.self().id()).setInvokeCount(currentCount).build();
-    invokeCountState.set(currentCount);
-
-    context.send(Constants.EGRESS_ID, invokeCount);
-  }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
deleted file mode 100644
index 990e545..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/FnUnwrapper.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.flink.statefun.sdk.Context;
-import org.apache.flink.statefun.sdk.FunctionType;
-import org.apache.flink.statefun.sdk.StatefulFunction;
-
-final class FnUnwrapper implements StatefulFunction {
-
-  static final FunctionType TYPE =
-      new FunctionType("org.apache.flink.e2e.exactlyonce", "unwrapper");
-
-  @Override
-  public void invoke(Context context, Object input) {
-    final WrappedMessage message = requireWrappedMessage(input);
-    context.send(FnCounter.TYPE, message.getInvokeTargetId(), message);
-  }
-
-  private static WrappedMessage requireWrappedMessage(Object input) {
-    return (WrappedMessage) input;
-  }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
deleted file mode 100644
index 4df60ca..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/java/org/apache/flink/statefun/e2e/exactlyonce/KafkaIO.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import java.time.Duration;
-import java.util.Objects;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.flink.statefun.sdk.io.EgressSpec;
-import org.apache.flink.statefun.sdk.io.IngressSpec;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaEgressSerializer;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
-import org.apache.flink.statefun.sdk.kafka.KafkaIngressStartupPosition;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-final class KafkaIO {
-
-  static final String WRAPPED_MESSAGES_TOPIC_NAME = "wrapped-messages";
-  static final String INVOKE_COUNTS_TOPIC_NAME = "invoke-counts";
-
-  private final String kafkaAddress;
-
-  KafkaIO(String kafkaAddress) {
-    this.kafkaAddress = Objects.requireNonNull(kafkaAddress);
-  }
-
-  IngressSpec<WrappedMessage> getIngressSpec() {
-    return KafkaIngressBuilder.forIdentifier(Constants.INGRESS_ID)
-        .withTopic(KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME)
-        .withKafkaAddress(kafkaAddress)
-        .withStartupPosition(KafkaIngressStartupPosition.fromEarliest())
-        .withConsumerGroupId("exactly-once-e2e")
-        .withDeserializer(WrappedMessageKafkaDeserializer.class)
-        .build();
-  }
-
-  EgressSpec<InvokeCount> getEgressSpec() {
-    return KafkaEgressBuilder.forIdentifier(Constants.EGRESS_ID)
-        .withKafkaAddress(kafkaAddress)
-        .withExactlyOnceProducerSemantics(Duration.ofMinutes(1))
-        .withSerializer(InvokeCountKafkaSerializer.class)
-        .build();
-  }
-
-  private static final class WrappedMessageKafkaDeserializer
-      implements KafkaIngressDeserializer<WrappedMessage> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public WrappedMessage deserialize(ConsumerRecord<byte[], byte[]> input) {
-      try {
-        return WrappedMessage.parseFrom(input.value());
-      } catch (Exception e) {
-        throw new RuntimeException("Error deserializing messages", e);
-      }
-    }
-  }
-
-  private static final class InvokeCountKafkaSerializer
-      implements KafkaEgressSerializer<InvokeCount> {
-
-    private static final long serialVersionUID = 1L;
-
-    @Override
-    public ProducerRecord<byte[], byte[]> serialize(InvokeCount invokeCount) {
-      final byte[] key = invokeCount.getIdBytes().toByteArray();
-      final byte[] value = invokeCount.toByteArray();
-
-      return new ProducerRecord<>(INVOKE_COUNTS_TOPIC_NAME, key, value);
-    }
-  }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto b/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
deleted file mode 100644
index 5e8b41a..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/main/protobuf/exactly-once-verification.proto
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-syntax = "proto3";
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-option java_package = "org.apache.flink.statefun.e2e.exactlyonce.generated";
-option java_multiple_files = false;
-
-message WrappedMessage {
-    string invokeTargetId = 1;
-    string key = 2;
-}
-
-message FnAddress {
-    string namespace = 1;
-    string type = 2;
-    string id = 3;
-}
-
-message InvokeCount {
-    string id = 1;
-    int32 invokeCount = 2;
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
deleted file mode 100644
index b0c61d0..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/java/org/apache/flink/statefun/e2e/exactlyonce/ExactlyOnceE2E.java
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.statefun.e2e.exactlyonce;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.hamcrest.MatcherAssert.assertThat;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.Random;
-import java.util.UUID;
-import org.apache.flink.statefun.e2e.common.StatefulFunctionsAppContainers;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaIOVerifier;
-import org.apache.flink.statefun.e2e.common.kafka.KafkaProtobufSerializer;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.InvokeCount;
-import org.apache.flink.statefun.e2e.exactlyonce.generated.ExactlyOnceVerification.WrappedMessage;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.junit.Rule;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.testcontainers.containers.KafkaContainer;
-
-/**
- * End-to-end test based on the {@link ExactlyOnceVerificationModule} application.
- *
- * <p>This test writes some {@link WrappedMessage} records to Kafka, which eventually gets routed to
- * the counter function in the application. Then, after the corresponding {@link InvokeCount} are
- * seen in the Kafka egress (which implies some checkpoints have been completed since the
- * verification application is using exactly-once delivery), we restart a worker to simulate
- * failure. The application should automatically attempt to recover and eventually restart.
- * Meanwhile, more records are written to Kafka again. We verify that on the consumer side, the
- * invocation counts increase sequentially for each key as if the failure did not occur.
- */
-public class ExactlyOnceE2E {
-
-  private static final Logger LOG = LoggerFactory.getLogger(ExactlyOnceE2E.class);
-
-  private static final String CONFLUENT_PLATFORM_VERSION = "5.0.3";
-  private static final String KAFKA_HOST = "kafka-broker";
-
-  private static final int NUM_WORKERS = 2;
-
-  /**
-   * Kafka broker. We need to explicitly set the transaction state log replication factor and min
-   * ISR since by default, those values are larger than 1 while we are only using 1 Kafka broker.
-   */
-  @Rule
-  public KafkaContainer kafka =
-      new KafkaContainer(CONFLUENT_PLATFORM_VERSION)
-          .withNetworkAliases(KAFKA_HOST)
-          .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
-          .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1");
-
-  @Rule
-  public StatefulFunctionsAppContainers verificationApp =
-      StatefulFunctionsAppContainers.builder("exactly-once-verification", NUM_WORKERS)
-          .dependsOn(kafka)
-          .exposeMasterLogs(LOG)
-          .withModuleGlobalConfiguration(
-              Constants.KAFKA_BOOTSTRAP_SERVERS_CONF, KAFKA_HOST + ":9092")
-          .build();
-
-  @Test(timeout = 300_000L)
-  public void run() throws Exception {
-    final String kafkaAddress = kafka.getBootstrapServers();
-
-    final Producer<String, WrappedMessage> messageProducer =
-        kafkaWrappedMessagesProducer(kafkaAddress);
-    final Consumer<String, InvokeCount> invokeCountsConsumer =
-        kafkaInvokeCountsConsumer(kafkaAddress);
-
-    final KafkaIOVerifier<String, WrappedMessage, String, InvokeCount> verifier =
-        new KafkaIOVerifier<>(messageProducer, invokeCountsConsumer);
-
-    assertThat(
-        verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
-        verifier.resultsInOrder(
-            is(invokeCount("foo", 1)), is(invokeCount("foo", 2)), is(invokeCount("foo", 3))));
-
-    LOG.info(
-        "Restarting random worker to simulate failure. The application should automatically recover.");
-    verificationApp.restartWorker(randomWorkerIndex());
-
-    assertThat(
-        verifier.sending(wrappedMessage("foo"), wrappedMessage("foo"), wrappedMessage("foo")),
-        verifier.resultsInOrder(
-            is(invokeCount("foo", 4)), is(invokeCount("foo", 5)), is(invokeCount("foo", 6))));
-  }
-
-  private static Producer<String, WrappedMessage> kafkaWrappedMessagesProducer(
-      String bootstrapServers) {
-    Properties props = new Properties();
-    props.put("bootstrap.servers", bootstrapServers);
-
-    return new KafkaProducer<>(
-        props, new StringSerializer(), new KafkaProtobufSerializer<>(WrappedMessage.parser()));
-  }
-
-  private Consumer<String, InvokeCount> kafkaInvokeCountsConsumer(String bootstrapServers) {
-    Properties consumerProps = new Properties();
-    consumerProps.setProperty("bootstrap.servers", bootstrapServers);
-    consumerProps.setProperty("group.id", "exactly-once-e2e");
-    consumerProps.setProperty("auto.offset.reset", "earliest");
-    consumerProps.setProperty("isolation.level", "read_committed");
-
-    KafkaConsumer<String, InvokeCount> consumer =
-        new KafkaConsumer<>(
-            consumerProps,
-            new StringDeserializer(),
-            new KafkaProtobufSerializer<>(InvokeCount.parser()));
-    consumer.subscribe(Collections.singletonList(KafkaIO.INVOKE_COUNTS_TOPIC_NAME));
-
-    return consumer;
-  }
-
-  private static ProducerRecord<String, WrappedMessage> wrappedMessage(String targetInvokeId) {
-    final String key = UUID.randomUUID().toString();
-
-    return new ProducerRecord<>(
-        KafkaIO.WRAPPED_MESSAGES_TOPIC_NAME,
-        key,
-        WrappedMessage.newBuilder().setInvokeTargetId(targetInvokeId).setKey(key).build());
-  }
-
-  private static InvokeCount invokeCount(String id, int count) {
-    return InvokeCount.newBuilder().setId(id).setInvokeCount(count).build();
-  }
-
-  private static int randomWorkerIndex() {
-    return new Random().nextInt(NUM_WORKERS);
-  }
-}
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile
deleted file mode 100644
index d1463f3..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/Dockerfile
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-FROM flink-statefun:2.3-SNAPSHOT
-
-RUN mkdir -p /opt/statefun/modules/statefun-exactly-once-e2e
-COPY statefun-exactly-once-e2e*.jar /opt/statefun/modules/statefun-exactly-once-e2e/
-COPY flink-conf.yaml $FLINK_HOME/conf/flink-conf.yaml
diff --git a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties b/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index fb965d3..0000000
--- a/statefun-e2e-tests/statefun-exactly-once-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,24 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-log4j.rootLogger=INFO, console
-
-# Log all infos in the given file
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n