Revert "[FLINK-24041][connectors] First draft of API, Add DynamoDB sink to verify API, Add Firehose sink, Refactor to use FLIP-143: Unified Sink API, Increase robustness for flush, Add license header, Update JavaDoc comments"

This reverts commit 2e33a40cde17aaa8bfc171f3ea28d2961733fb9d.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
deleted file mode 100644
index d7df82f..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.sink;
-
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Optional;
-
-/**
- * A generic sink for destinations that provide an async client to persist data.
- *
- * <p>The design of the sink focuses on extensibility and broad support of destinations. The core
- * of the sink is kept generic and free of any connector-specific dependencies. The sink is
- * designed to participate in checkpointing to provide at-least-once semantics, but it is limited
- * to destinations that provide a client that supports async requests.
- *
- * <p>Limitations:
- *
- * <ul>
- *   <li>The sink is designed for destinations that provide an async client. Destinations that
- *       cannot ingest events in an async fashion cannot be supported by the sink.
- *   <li>The sink usually persists InputTs in the order they are added to the sink, but
- *       reorderings may occur, e.g., when RequestEntryTs need to be retried.
- *   <li>We are not considering support for exactly-once semantics at this point.
- * </ul>
- */
-public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
-        implements Sink<InputT, Void, Collection<RequestEntryT>, Void> {
-
-    @Override
-    public Optional<Committer<Void>> createCommitter(CommitterInitContext context)
-            throws IOException {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<GlobalCommitter<Void, Void>> createGlobalCommitter(CommitterInitContext context)
-            throws IOException {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<Void>> getCommittableSerializer() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
-        return Optional.empty();
-    }
-
-    @Override
-    public Optional<SimpleVersionedSerializer<Collection<RequestEntryT>>>
-            getWriterStateSerializer() {
-        // FIXME: implement
-        return Optional.empty();
-    }
-}
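For orientation: a concrete sink built on top of this base only has to supply a writer. A minimal, hypothetical sketch (MyDestinationSink, MyRequestEntry and MyDestinationWriter are made-up names; the AmazonKinesisDataStreamSink further down in this patch is the actual example):

    // sketch only -- MyRequestEntry is assumed to implement Serializable
    public class MyDestinationSink<InputT> extends AsyncSinkBase<InputT, MyRequestEntry> {
        @Override
        public SinkWriter<InputT, Void, Collection<MyRequestEntry>> createWriter(
                InitContext context, List<Collection<MyRequestEntry>> states) {
            // MyDestinationWriter extends AsyncSinkWriter<InputT, MyRequestEntry> and
            // implements submitRequestEntries against the destination's async client
            return new MyDestinationWriter(context);
        }
    }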
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
deleted file mode 100644
index 9f80f9c..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.sink.writer;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
-        implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AsyncSinkWriter.class);
-
-    private final MailboxExecutor mailboxExecutor;
-    private final Sink.ProcessingTimeService timeService;
-
-    private static final int MAX_BATCH_SIZE = 150; // just for testing purposes
-    private static final int MAX_IN_FLIGHT_REQUESTS = 1; // just for testing purposes
-    private static final int MAX_BUFFERED_REQUESTS_ENTRIES = 1000; // just for testing purposes
-
-    public AsyncSinkWriter(
-            ElementConverter<InputT, RequestEntryT> elementConverter, Sink.InitContext context) {
-        this.elementConverter = elementConverter;
-        this.mailboxExecutor = context.getMailboxExecutor();
-        this.timeService = context.getProcessingTimeService();
-    }
-
-    /**
-     * The ElementConverter provides a mapping from the elements of a stream to request entries
-     * that can be sent to the destination.
-     *
-     * <p>The resulting request entry is buffered by the AsyncSinkWriter and sent to the destination
-     * when the {@code submitRequestEntries} method is invoked.
-     */
-    private final ElementConverter<InputT, RequestEntryT> elementConverter;
-
-    /**
-     * This method specifies how to persist buffered request entries into the destination. It is
-     * implemented when support for a new destination is added.
-     *
-     * <p>The method is invoked with a set of request entries according to the buffering hints (and
-     * the valid limits of the destination). The logic then needs to create and execute the request
-     * against the destination (ideally by batching together multiple request entries to increase
-     * efficiency). The logic also needs to identify individual request entries that were not
-     * persisted successfully and resubmit them by completing the {@code requestResult} with those
-     * failed entries.
-     *
-     * <p>During checkpointing, the sink needs to ensure that there are no outstanding in-flight
-     * requests.
-     *
-     * @param requestEntries a set of request entries that should be sent to the destination
-     * @param requestResult a ResultFuture that needs to be completed once all request entries that
-     *     have been passed to the method on invocation have either been successfully persisted in
-     *     the destination or have been re-queued through {@code requestResult}
-     */
-    protected abstract void submitRequestEntries(
-            List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);
-
-    /**
-     * Buffer to hold request entries that should be persisted into the destination.
-     *
-     * <p>A request entry contains all relevant details to make a call to the destination. E.g., for
-     * Kinesis Data Streams a request entry contains the payload and partition key.
-     *
-     * <p>It seems more natural to buffer InputT, i.e., the events that should be persisted, rather
-     * than RequestEntryT. However, in practice, the response of a failed request call can make it
-     * very hard, if not impossible, to reconstruct the original event. It is much easier to just
-     * construct a new (retry) request entry from the response and add that back to the queue for
-     * later retry.
-     */
-    private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();
-
-    /**
-     * Tracks all pending async calls that have been executed since the last checkpoint. Calls that
-     * complete (successfully or unsuccessfully) automatically decrement the counter. Any request
-     * entry that was not successfully persisted needs to be handled and retried by the logic in
-     * {@code submitRequestEntries}.
-     *
-     * <p>There is a limit on the number of concurrent (async) requests that can be in flight at the
-     * same time. This limit is enforced by checking this counter before submitting a new batch and
-     * yielding until it drops below the limit.
-     *
-     * <p>To complete a checkpoint, we need to make sure that no requests are in flight, as they may
-     * fail, which could then lead to data loss.
-     */
-    private int inFlightRequestsCount;
-
-    @Override
-    public void write(InputT element, Context context) throws IOException, InterruptedException {
-        // blocks if too many elements have been buffered
-        while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
-            mailboxExecutor.yield();
-        }
-
-        bufferedRequestEntries.add(elementConverter.apply(element, context));
-
-        // blocks if too many async requests are in flight
-        flush();
-    }
-
-    /**
-     * Persists buffered RequestEntries into the destination by invoking {@code
-     * submitRequestEntries} with batches according to the user-specified buffering hints.
-     *
-     * <p>The method blocks if too many async requests are in flight.
-     */
-    private void flush() throws InterruptedException {
-        while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {
-
-            // create a batch of request entries that should be persisted in the destination
-            ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);
-
-            while (batch.size() < MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
-                try {
-                    batch.add(bufferedRequestEntries.remove());
-                } catch (NoSuchElementException e) {
-                    // if there are not enough elements, just create a smaller batch
-                    break;
-                }
-            }
-
-            ResultFuture<RequestEntryT> requestResult =
-                    failedRequestEntries -> mailboxExecutor.execute(
-                            () -> completeRequest(failedRequestEntries),
-                            "Mark in-flight request as completed and requeue %d request entries",
-                            failedRequestEntries.size());
-
-            while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
-                mailboxExecutor.yield();
-            }
-
-            inFlightRequestsCount++;
-            submitRequestEntries(batch, requestResult);
-        }
-    }
-
-    /**
-     * Marks an in-flight request as completed and prepends failed requestEntries back to the
-     * internal requestEntry buffer for later retry.
-     *
-     * @param failedRequestEntries requestEntries that need to be retried
-     */
-    private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
-        inFlightRequestsCount--;
-
-        // Prepending the entries with addFirst while iterating reverses the order of the
-        // failedRequestEntries. This doesn't make a difference for kinesis:putRecords, as the API
-        // does not make any order guarantees, but it may cause avoidable reorderings for other
-        // destinations.
-        failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
-    }
-
-    /**
-     * In-flight requests will be retried if the sink is still healthy. But if in-flight requests
-     * fail after a checkpoint has been triggered and Flink needs to recover from the checkpoint,
-     * the (failed) in-flight requests are gone and cannot be retried. Hence, there cannot be any
-     * outstanding in-flight requests when a commit is initialized.
-     *
-     * <p>To this end, all in-flight requests need to be completed before proceeding with the
-     * commit.
-     */
-    @Override
-    public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
-        if (flush) {
-            flush();
-        }
-
-        // wait until all in-flight requests completed
-        while (inFlightRequestsCount > 0) {
-            mailboxExecutor.yield();
-        }
-
-        return Collections.emptyList();
-    }
-
-    /**
-     * All in-flight requests that are relevant for the snapshot have been completed, but there may
-     * still be request entries in the internal buffers that are yet to be sent to the endpoint.
-     * These request entries are stored in the snapshot state so that they don't get lost in case of
-     * a failure/restart of the application.
-     */
-    @Override
-    public List<Collection<RequestEntryT>> snapshotState() throws IOException {
-        return Arrays.asList(bufferedRequestEntries);
-    }
-
-    @Override
-    public void close() throws Exception {}
-}
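To make the submitRequestEntries / ResultFuture contract above concrete, a rough sketch of an implementation against a hypothetical async client (myClient, MyBatchResponse and MyRequestEntry are assumptions; the Kinesis writer later in this patch follows the same pattern):

    @Override
    protected void submitRequestEntries(
            List<MyRequestEntry> requestEntries, ResultFuture<MyRequestEntry> requestResult) {
        // hypothetical async client call returning a CompletableFuture
        CompletableFuture<MyBatchResponse> future = myClient.putBatch(requestEntries);

        future.whenComplete((response, err) -> {
            if (err != null) {
                // transient failure of the whole request: re-queue everything for a later retry
                requestResult.complete(requestEntries);
            } else {
                // only the entries the service rejected are handed back for retry; entries that
                // fail reproducibly must be handled here instead of being re-queued
                requestResult.complete(response.rejectedEntries());
            }
        });
    }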
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
deleted file mode 100644
index ee6d7da..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.sink.writer;
-
-import org.apache.flink.api.connector.sink.SinkWriter;
-
-import java.io.Serializable;
-
-/**
- * This interface specifies the mapping from elements of a stream to request entries that can be
- * sent to the destination. The mapping is provided by the end user of a sink, not the sink creator.
- *
- * <p>The request entries contain all relevant information required to create and send the actual
- * request. E.g., for Kinesis Data Streams, the request entry includes the payload and the partition
- * key.
- */
-public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
-    RequestEntryT apply(InputT element, SinkWriter.Context context);
-}
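For illustration, an ElementConverter is typically just a lambda. E.g., the simple String converter used by the AmazonKinesisDataStreamSink in this patch (SIMPLE_STRING_ELEMENT_CONVERTER) maps every element to a Kinesis PutRecordsRequestEntry along these lines:

    // adapted sketch for String elements; payload and partition key as in the sink below
    ElementConverter<String, PutRecordsRequestEntry> converter =
            (element, context) -> PutRecordsRequestEntry.builder()
                    .data(SdkBytes.fromUtf8String(element))
                    .partitionKey(String.valueOf(element.hashCode()))
                    .build();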
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ResultFuture.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ResultFuture.java
deleted file mode 100644
index 05ecc27..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ResultFuture.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.flink.connector.base.sink.writer;
-
-import java.util.Collection;
-
-/**
- * The entire request may fail or single request entries that are part of the request may not be
- * persisted successfully, e.g., because of network issues or service-side throttling. All request
- * entries that failed with transient failures need to be re-queued through this future so that they
- * aren't lost and can be retried later.
- *
- * <p>Request entries that cause the same error in a reproducible manner, e.g., ill-formed request
- * entries, must not be re-queued but the error needs to be handled in the logic of {@code
- * submitRequestEntries}. Otherwise these request entries will be retried indefinitely, always
- * causing the same error.
- *
- * @param <RequestEntryT> the type of the request entries that can be re-queued for a later retry
- */
-public interface ResultFuture<RequestEntryT> {
-    /**
-     * Completes the result future.
-     *
-     * <p>The result future must only be completed when the request sent to the endpoint completed
-     * (successfully or unsuccessfully). Request entries that were not persisted successfully must be
-     * included in the {@code failedRequestEntries} parameter, so that they can be retried later.
-     *
-     * @param failedRequestEntries Request entries that need to be retried at a later point
-     */
-    void complete(Collection<RequestEntryT> failedRequestEntries);
-}
diff --git a/flink-connectors/flink-connector-base/src/main/resources/log4j2.properties b/flink-connectors/flink-connector-base/src/main/resources/log4j2.properties
deleted file mode 100644
index 32c696e..0000000
--- a/flink-connectors/flink-connector-base/src/main/resources/log4j2.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-rootLogger.level = INFO
-rootLogger.appenderRef.console.ref = ConsoleAppender
-
-appender.console.name = ConsoleAppender
-appender.console.type = CONSOLE
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connectors/flink-connector-kinesis-171/pom.xml b/flink-connectors/flink-connector-kinesis-171/pom.xml
deleted file mode 100644
index 1ef46c0..0000000
--- a/flink-connectors/flink-connector-kinesis-171/pom.xml
+++ /dev/null
@@ -1,149 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
-		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
-	<modelVersion>4.0.0</modelVersion>
-
-	<parent>
-		<groupId>org.apache.flink</groupId>
-		<artifactId>flink-connectors</artifactId>
-		<version>1.14-SNAPSHOT</version>
-		<relativePath>..</relativePath>
-	</parent>
-
-	<artifactId>flink-connector-kinesis-171_${scala.binary.version}</artifactId>
-	<name>Flink : Connectors : Kinesis 171</name>
-
-	<properties>
-		<aws.sdkv2.version>2.16.95</aws.sdkv2.version>
-		<log4j.version>2.14.1</log4j.version>
-	</properties>
-
-	<packaging>jar</packaging>
-
-	<dependencies>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-base</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>software.amazon.awssdk</groupId>
-			<artifactId>kinesis</artifactId>
-			<version>${aws.sdkv2.version}</version>
-		</dependency>
-
-		<!-- just for testing purposes -->
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-s3-fs-hadoop</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-slf4j-impl</artifactId>
-			<version>${log4j.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-api</artifactId>
-			<version>${log4j.version}</version>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.logging.log4j</groupId>
-			<artifactId>log4j-core</artifactId>
-			<version>${log4j.version}</version>
-		</dependency>
-
-		<!-- Flink ecosystem -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-clients_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>provided</scope>
-		</dependency>
-
-		<!-- Test dependencies -->
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-core</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-			<type>test-jar</type>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-tests</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-test-utils_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<scope>test</scope>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-runtime_${scala.binary.version}</artifactId>
-			<version>${project.version}</version>
-			<type>test-jar</type>
-			<scope>test</scope>
-		</dependency>
-
-		<dependency>
-			<groupId>org.testcontainers</groupId>
-			<artifactId>testcontainers</artifactId>
-			<version>1.15.3</version>
-			<scope>test</scope>
-		</dependency>
-
-	</dependencies>
-
-</project>
diff --git a/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java b/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
deleted file mode 100644
index 63564c9..0000000
--- a/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package software.amazon.flink.connectors;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
-import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
-import org.apache.flink.connector.base.sink.writer.ResultFuture;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-public class AmazonKinesisDataStreamSink<InputT> extends AsyncSinkBase<InputT, PutRecordsRequestEntry> {
-
-    private static final Logger LOG = LoggerFactory.getLogger(AmazonKinesisDataStreamSink.class);
-
-    private final String streamName;
-    private static final KinesisAsyncClient client = KinesisAsyncClient.create();
-    private final ElementConverter<InputT, PutRecordsRequestEntry> elementConverter;
-
-    /**
-     * Basic service properties and limits. Supported requests per sec, max batch size, max items per batch, etc.
-     */
-    private Object serviceProperties;
-
-    private final ElementConverter<InputT, PutRecordsRequestEntry> SIMPLE_STRING_ELEMENT_CONVERTER =
-            (element, context) -> PutRecordsRequestEntry
-                .builder()
-                .data(SdkBytes.fromUtf8String(element.toString()))
-                .partitionKey(String.valueOf(element.hashCode()))
-                .build();
-
-    @Override
-    public SinkWriter<InputT, Void, Collection<PutRecordsRequestEntry>> createWriter(InitContext context, List<Collection<PutRecordsRequestEntry>> states) throws IOException {
-        return new AmazonKinesisDataStreamWriter(context);
-    }
-
-    public AmazonKinesisDataStreamSink(String streamName, ElementConverter<InputT, PutRecordsRequestEntry> elementConverter, KinesisAsyncClient client) {
-        this.streamName = streamName;
-        this.elementConverter = elementConverter;
-//        this.client = client;
-
-        // verify that the user-supplied buffering strategy respects service-specific limits
-    }
-
-    public AmazonKinesisDataStreamSink(String streamName) {
-        this.streamName = streamName;
-        this.elementConverter = SIMPLE_STRING_ELEMENT_CONVERTER;
-//        this.client = KinesisAsyncClient.create();
-    }
-
-    private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
-
-        public AmazonKinesisDataStreamWriter(Sink.InitContext context) {
-            super(elementConverter, context);
-        }
-
-        @Override
-        protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {
-            // create a batch request
-            PutRecordsRequest batchRequest = PutRecordsRequest
-                    .builder()
-                    .records(requestEntries)
-                    .streamName(streamName)
-                    .build();
-
-            LOG.info("submitRequestEntries: putRecords");
-
-            // call api with batch request
-            CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);
-
-            // re-queue elements of failed requests
-            future.whenComplete((response, err) -> {
-                    if (err != null) {
-                        LOG.warn("kinesis:PutRecords request failed: ", err);
-
-                        requestResult.complete(requestEntries);
-
-                        return;
-                    }
-
-                    if (response.failedRecordCount() > 0) {
-                        LOG.info("Re-queueing {} messages", response.failedRecordCount());
-
-                        ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
-                        List<PutRecordsResultEntry> records = response.records();
-
-                        for (int i = 0; i < records.size(); i++) {
-                            if (records.get(i).errorCode() != null) {
-                                failedRequestEntries.add(requestEntries.get(i));
-                            }
-                        }
-
-                        requestResult.complete(failedRequestEntries);
-                    } else {
-                        requestResult.complete(Collections.emptyList());
-                    }
-                });
-        }
-    }
-}
diff --git a/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java b/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java
deleted file mode 100644
index 9f81567..0000000
--- a/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package software.amazon.flink.connectors;
-
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-
-import java.util.Properties;
-
-public class Test {
-
-    public static void main(String[] args) throws Exception {
-        // set up the streaming execution environment
-        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.enableCheckpointing(10_000);
-
-//        DataStream<String> stream = env.readTextFile("s3://shausma-nyc-tlc/yellow-trip-data/taxi-trips.json/dropoff_year=2010/part-00000-cdac5fe4-b823-4576-aeb7-7327b077476e.c000.json");
-
-        Properties consumerConfig = new Properties();
-        consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "eu-west-1");
-        consumerConfig.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
-        consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
-
-        DataStream<String> stream = env.addSource(new FlinkKinesisConsumer<>("test-in", new SimpleStringSchema(), consumerConfig));
-
-        stream.sinkTo(new AmazonKinesisDataStreamSink<>("test-out"));
-
-        /*
-         * Here, you can start creating your execution plan for Flink.
-         *
-         * Start with getting some data from the environment, like
-         * 	env.readTextFile(textPath);
-         *
-         * then, transform the resulting DataStream<String> using operations
-         * like
-         * 	.filter()
-         * 	.flatMap()
-         * 	.join()
-         * 	.coGroup()
-         *
-         * and many more.
-         * Have a look at the programming guide for the Java API:
-         *
-         * https://flink.apache.org/docs/latest/apis/streaming/index.html
-         *
-         */
-
-        // execute program
-        env.execute("Flink Streaming Java API Skeleton");
-    }
-
-}
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 3e9a500..afcd587 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -54,7 +54,6 @@
 		<module>flink-connector-kafka</module>
 		<module>flink-connector-gcp-pubsub</module>
 		<module>flink-connector-kinesis</module>
-		<module>flink-connector-kinesis-171</module>
 		<module>flink-connector-base</module>
 		<module>flink-file-sink-common</module>
 		<module>flink-connector-files</module>