Revert "[FLINK-24041][connectors] First draft of API, Add DynamoDB sink to verify API, Add Firehose sink, Refactor to use FLIP-143: Unified Sink API, Increase robustness for flush, Add license header, Update JavaDoc comments"
This reverts commit 2e33a40cde17aaa8bfc171f3ea28d2961733fb9d.
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
deleted file mode 100644
index d7df82f..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBase.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.sink;
-
-import org.apache.flink.api.connector.sink.Committer;
-import org.apache.flink.api.connector.sink.GlobalCommitter;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.core.io.SimpleVersionedSerializer;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Optional;
-
-/**
- * A generic sink for destinations that provide an async client to persist data.
- *
- * <p>The design of the sink focuses on extensibility and a broad support of destinations. The core
- * of the sink is kept generic and free of any connector specific dependencies. The sink is designed
- * to participate in checkpointing to provide at-least once semantics, but it is limited to
- * destinations that provide a client that supports async requests.
- *
- * <p>Limitations:
- *
- * <ul>
- * <li>The sink is designed for destinations that provide an async client. Destinations that
- * cannot ingest events in an async fashion cannot be supported by the sink.
- * <li>The sink usually persist InputTs in the order they are added to the sink, but reorderings
- * may occur, eg, when RequestEntryTs need to be retried.
- * <li>We are not considering support for exactly-once semantics at this point.
- * </ul>
- */
-public abstract class AsyncSinkBase<InputT, RequestEntryT extends Serializable>
- implements Sink<InputT, Void, Collection<RequestEntryT>, Void> {
-
- @Override
- public Optional<Committer<Void>> createCommitter(CommitterInitContext context)
- throws IOException {
- return Optional.empty();
- }
-
- @Override
- public Optional<GlobalCommitter<Void, Void>> createGlobalCommitter(CommitterInitContext context)
- throws IOException {
- return Optional.empty();
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<Void>> getCommittableSerializer() {
- return Optional.empty();
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
- return Optional.empty();
- }
-
- @Override
- public Optional<SimpleVersionedSerializer<Collection<RequestEntryT>>>
- getWriterStateSerializer() {
- // FIXME: implement
- return Optional.empty();
- }
-}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
deleted file mode 100644
index 9f80f9c..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.sink.writer;
-
-import org.apache.flink.api.common.operators.MailboxExecutor;
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.io.Serializable;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.List;
-import java.util.NoSuchElementException;
-
-public abstract class AsyncSinkWriter<InputT, RequestEntryT extends Serializable>
- implements SinkWriter<InputT, Void, Collection<RequestEntryT>> {
-
- private static final Logger LOG = LoggerFactory.getLogger(AsyncSinkWriter.class);
-
- private final MailboxExecutor mailboxExecutor;
- private final Sink.ProcessingTimeService timeService;
-
- private static final int MAX_BATCH_SIZE = 150; // just for testing purposes
- private static final int MAX_IN_FLIGHT_REQUESTS = 1; // just for testing purposes
- private static final int MAX_BUFFERED_REQUESTS_ENTRIES = 1000; // just for testing purposes
-
- public AsyncSinkWriter(
- ElementConverter<InputT, RequestEntryT> elementConverter, Sink.InitContext context) {
- this.elementConverter = elementConverter;
- this.mailboxExecutor = context.getMailboxExecutor();
- this.timeService = context.getProcessingTimeService();
- }
-
- /**
- * The ElementConverter provides a mapping between for the elements of a stream to request
- * entries that can be sent to the destination.
- *
- * <p>The resulting request entry is buffered by the AsyncSinkWriter and sent to the destination
- * when the {@code submitRequestEntries} method is invoked.
- */
- private final ElementConverter<InputT, RequestEntryT> elementConverter;
-
- /**
- * This method specifies how to persist buffered request entries into the destination. It is
- * implemented when support for a new destination is added.
- *
- * <p>The method is invoked with a set of request entries according to the buffering hints (and
- * the valid limits of the destination). The logic then needs to create and execute the request
- * against the destination (ideally by batching together multiple request entries to increase
- * efficiency). The logic also needs to identify individual request entries that were not
- * persisted successfully and resubmit them using the {@code requeueFailedRequestEntry} method.
- *
- * <p>During checkpointing, the sink needs to ensure that there are no outstanding in-flight
- * requests.
- *
- * @param requestEntries a set of request entries that should be sent to the destination
- * @param requestResult a ResultFuture that needs to be completed once all request entries that
- * have been passed to the method on invocation have either been successfully persisted in
- * the destination or have been re-queued through {@code requestResult}
- */
- protected abstract void submitRequestEntries(
- List<RequestEntryT> requestEntries, ResultFuture<RequestEntryT> requestResult);
-
- /**
- * Buffer to hold request entries that should be persisted into the destination.
- *
- * <p>A request entry contain all relevant details to make a call to the destination. Eg, for
- * Kinesis Data Streams a request entry contains the payload and partition key.
- *
- * <p>It seems more natural to buffer InputT, ie, the events that should be persisted, rather
- * than RequestEntryT. However, in practice, the response of a failed request call can make it
- * very hard, if not impossible, to reconstruct the original event. It is much easier, to just
- * construct a new (retry) request entry from the response and add that back to the queue for
- * later retry.
- */
- private final Deque<RequestEntryT> bufferedRequestEntries = new ArrayDeque<>();
-
- /**
- * Tracks all pending async calls that have been executed since the last checkpoint. Calls that
- * completed (successfully or unsuccessfully) are automatically decrementing the counter. Any
- * request entry that was not successfully persisted needs to be handled and retried by the
- * logic in {@code submitRequestsToApi}.
- *
- * <p>There is a limit on the number of concurrent (async) requests that can be handled by the
- * client library. This limit is enforced by checking the queue size before accepting a new
- * element into the queue.
- *
- * <p>To complete a checkpoint, we need to make sure that no requests are in flight, as they may
- * fail, which could then lead to data loss.
- */
- private int inFlightRequestsCount;
-
- @Override
- public void write(InputT element, Context context) throws IOException, InterruptedException {
- // blocks if too many elements have been buffered
- while (bufferedRequestEntries.size() >= MAX_BUFFERED_REQUESTS_ENTRIES) {
- mailboxExecutor.yield();
- }
-
- bufferedRequestEntries.add(elementConverter.apply(element, context));
-
- // blocks if too many async requests are in flight
- flush();
- }
-
- /**
- * Persists buffered RequestsEntries into the destination by invoking {@code
- * submitRequestEntries} with batches according to the user specified buffering hints.
- *
- * <p>The method blocks if too many async requests are in flight.
- */
- private void flush() throws InterruptedException {
- while (bufferedRequestEntries.size() >= MAX_BATCH_SIZE) {
-
- // create a batch of request entries that should be persisted in the destination
- ArrayList<RequestEntryT> batch = new ArrayList<>(MAX_BATCH_SIZE);
-
- while (batch.size() <= MAX_BATCH_SIZE && !bufferedRequestEntries.isEmpty()) {
- try {
- batch.add(bufferedRequestEntries.remove());
- } catch (NoSuchElementException e) {
- // if there are not enough elements, just create a smaller batch
- break;
- }
- }
-
- ResultFuture<RequestEntryT> requestResult =
- failedRequestEntries -> mailboxExecutor.execute(
- () -> completeRequest(failedRequestEntries),
- "Mark in-flight request as completed and requeue %d request entries",
- failedRequestEntries.size());
-
- while (inFlightRequestsCount >= MAX_IN_FLIGHT_REQUESTS) {
- mailboxExecutor.yield();
- }
-
- inFlightRequestsCount++;
- submitRequestEntries(batch, requestResult);
- }
- }
-
- /**
- * Marks an in-flight request as completed and prepends failed requestEntries back to the
- * internal requestEntry buffer for later retry.
- *
- * @param failedRequestEntries requestEntries that need to be retried
- */
- private void completeRequest(Collection<RequestEntryT> failedRequestEntries) {
- inFlightRequestsCount--;
-
- // By just iterating through failedRequestEntries, it reverses the order of the
- // failedRequestEntries. It doesn't make a difference for kinesis:putRecords, as the api
- // does not make any order guarantees, but may cause avoidable reorderings for other
- // destinations.
- failedRequestEntries.forEach(bufferedRequestEntries::addFirst);
- }
-
- /**
- * In flight requests will be retried if the sink is still healthy. But if in-flight requests
- * fail after a checkpoint has been triggered and Flink needs to recover from the checkpoint,
- * the (failed) in-flight requests are gone and cannot be retried. Hence, there cannot be any
- * outstanding in-flight requests when a commit is initialized.
- *
- * <p>To this end, all in-flight requests need to completed before proceeding with the commit.
- */
- @Override
- public List<Void> prepareCommit(boolean flush) throws IOException, InterruptedException {
- if (flush) {
- flush();
- }
-
- // wait until all in-flight requests completed
- while (inFlightRequestsCount > 0) {
- mailboxExecutor.yield();
- }
-
- return Collections.emptyList();
- }
-
- /**
- * All in-flight requests that are relevant for the snapshot have been completed, but there may
- * still be request entries in the internal buffers that are yet to be sent to the endpoint.
- * These request entries are stored in the snapshot state so that they don't get lost in case of
- * a failure/restart of the application.
- */
- @Override
- public List<Collection<RequestEntryT>> snapshotState() throws IOException {
- return Arrays.asList(bufferedRequestEntries);
- }
-
- @Override
- public void close() throws Exception {}
-}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
deleted file mode 100644
index ee6d7da..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ElementConverter.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.connector.base.sink.writer;
-
-import org.apache.flink.api.connector.sink.SinkWriter;
-
-import java.io.Serializable;
-
-/**
- * This interface specifies the mapping between elements of a stream to request entries that can be
- * sent to the destination. The mapping is provided by the end-user of a sink, not the sink creator.
- *
- * <p>The request entries contain all relevant information required to create and sent the actual
- * request. Eg, for Kinesis Data Streams, the request entry includes the payload and the partition
- * key.
- */
-public interface ElementConverter<InputT, RequestEntryT> extends Serializable {
- RequestEntryT apply(InputT element, SinkWriter.Context context);
-}
diff --git a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ResultFuture.java b/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ResultFuture.java
deleted file mode 100644
index 05ecc27..0000000
--- a/flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/ResultFuture.java
+++ /dev/null
@@ -1,29 +0,0 @@
-package org.apache.flink.connector.base.sink.writer;
-
-import java.util.Collection;
-
-/**
- * The entire request may fail or single request entries that are part of the request may not be
- * persisted successfully, eg, because of network issues or service side throttling. All request
- * entries that failed with transient failures need to be re-queued with this method so that aren't
- * lost and can be retried later.
- *
- * <p>Request entries that are causing the same error in a reproducible manner, eg, ill-formed
- * request entries, must not be re-queued but the error needs to be handled in the logic of {@code
- * submitRequestEntries}. Otherwise these request entries will be retried indefinitely, always
- * causing the same error.
- *
- * @param <RequestEntryT>
- */
-public interface ResultFuture<RequestEntryT> {
- /**
- * Completes the result future.
- *
- * <p>The result future must only be completed when the request sent to the endpoint completed
- * (sucessfully or unsuccessfully). Request entries that were not persisted successfully must be
- * included in the {@code failedRequestEntries} parameter, so that they can be retried later.
- *
- * @param failedRequestEntries Request entries that need to be retried at a later point
- */
- void complete(Collection<RequestEntryT> failedRequestEntries);
-}
diff --git a/flink-connectors/flink-connector-base/src/main/resources/log4j2.properties b/flink-connectors/flink-connector-base/src/main/resources/log4j2.properties
deleted file mode 100644
index 32c696e..0000000
--- a/flink-connectors/flink-connector-base/src/main/resources/log4j2.properties
+++ /dev/null
@@ -1,25 +0,0 @@
-################################################################################
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-
-rootLogger.level = INFO
-rootLogger.appenderRef.console.ref = ConsoleAppender
-
-appender.console.name = ConsoleAppender
-appender.console.type = CONSOLE
-appender.console.layout.type = PatternLayout
-appender.console.layout.pattern = %d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
diff --git a/flink-connectors/flink-connector-kinesis-171/pom.xml b/flink-connectors/flink-connector-kinesis-171/pom.xml
deleted file mode 100644
index 1ef46c0..0000000
--- a/flink-connectors/flink-connector-kinesis-171/pom.xml
+++ /dev/null
@@ -1,149 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
-
- <parent>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connectors</artifactId>
- <version>1.14-SNAPSHOT</version>
- <relativePath>..</relativePath>
- </parent>
-
- <artifactId>flink-connector-kinesis-171_${scala.binary.version}</artifactId>
- <name>Flink : Connectors : Kinesis 171</name>
-
- <properties>
- <aws.sdkv2.version>2.16.95</aws.sdkv2.version>
- <log4j.version>2.14.1</log4j.version>
- </properties>
-
- <packaging>jar</packaging>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-base</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>software.amazon.awssdk</groupId>
- <artifactId>kinesis</artifactId>
- <version>${aws.sdkv2.version}</version>
- </dependency>
-
- <!-- just for testing purposes -->
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-s3-fs-hadoop</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-slf4j-impl</artifactId>
- <version>${log4j.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-api</artifactId>
- <version>${log4j.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.logging.log4j</groupId>
- <artifactId>log4j-core</artifactId>
- <version>${log4j.version}</version>
- </dependency>
-
- <!-- Flink ecosystem -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- Test dependencies -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- <type>test-jar</type>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-tests</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-test-utils_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-runtime_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.testcontainers</groupId>
- <artifactId>testcontainers</artifactId>
- <version>1.15.3</version>
- <scope>test</scope>
- </dependency>
-
- </dependencies>
-
-</project>
diff --git a/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java b/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
deleted file mode 100644
index 63564c9..0000000
--- a/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/AmazonKinesisDataStreamSink.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package software.amazon.flink.connectors;
-
-import org.apache.flink.api.connector.sink.Sink;
-import org.apache.flink.api.connector.sink.SinkWriter;
-import org.apache.flink.connector.base.sink.AsyncSinkBase;
-import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
-import org.apache.flink.connector.base.sink.writer.ElementConverter;
-import org.apache.flink.connector.base.sink.writer.ResultFuture;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.core.SdkBytes;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
-import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.CompletableFuture;
-
-public class AmazonKinesisDataStreamSink<InputT> extends AsyncSinkBase<InputT, PutRecordsRequestEntry> {
-
- private static final Logger LOG = LoggerFactory.getLogger(AmazonKinesisDataStreamSink.class);
-
- private final String streamName;
- private static final KinesisAsyncClient client = KinesisAsyncClient.create();
- private final ElementConverter<InputT, PutRecordsRequestEntry> elementConverter;
-
- /**
- * Basic service properties and limits. Supported requests per sec, max batch size, max items per batch, etc.
- */
- private Object serviceProperties;
-
- private final ElementConverter<InputT, PutRecordsRequestEntry> SIMPLE_STRING_ELEMENT_CONVERTER =
- (element, context) -> PutRecordsRequestEntry
- .builder()
- .data(SdkBytes.fromUtf8String(element.toString()))
- .partitionKey(String.valueOf(element.hashCode()))
- .build();
-
- @Override
- public SinkWriter<InputT, Void, Collection<PutRecordsRequestEntry>> createWriter(InitContext context, List<Collection<PutRecordsRequestEntry>> states) throws IOException {
- return new AmazonKinesisDataStreamWriter(context);
- }
-
- public AmazonKinesisDataStreamSink(String streamName, ElementConverter<InputT, PutRecordsRequestEntry> elementConverter, KinesisAsyncClient client) {
- this.streamName = streamName;
- this.elementConverter = elementConverter;
-// this.client = client;
-
- // verify that user supplied buffering strategy respects service specific limits
- }
-
- public AmazonKinesisDataStreamSink(String streamName) {
- this.streamName = streamName;
- this.elementConverter = SIMPLE_STRING_ELEMENT_CONVERTER;
-// this.client = KinesisAsyncClient.create();
- }
-
- private class AmazonKinesisDataStreamWriter extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
-
- public AmazonKinesisDataStreamWriter(Sink.InitContext context) {
- super(elementConverter, context);
- }
-
- @Override
- protected void submitRequestEntries(List<PutRecordsRequestEntry> requestEntries, ResultFuture<PutRecordsRequestEntry> requestResult) {
- // create a batch request
- PutRecordsRequest batchRequest = PutRecordsRequest
- .builder()
- .records(requestEntries)
- .streamName(streamName)
- .build();
-
- LOG.info("submitRequestEntries: putRecords");
-
- // call api with batch request
- CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);
-
- // re-queue elements of failed requests
- future.whenComplete((response, err) -> {
- if (err != null) {
- LOG.warn("kinesis:PutRecords request failed: ", err);
-
- requestResult.complete(requestEntries);
-
- return;
- }
-
- if (response.failedRecordCount() > 0) {
- LOG.info("Re-queueing {} messages", response.failedRecordCount());
-
- ArrayList<PutRecordsRequestEntry> failedRequestEntries = new ArrayList<>(response.failedRecordCount());
- List<PutRecordsResultEntry> records = response.records();
-
- for (int i = 0; i < records.size(); i++) {
- if (records.get(i).errorCode() != null) {
- failedRequestEntries.add(requestEntries.get(i));
- }
- }
-
- requestResult.complete(failedRequestEntries);
- } else {
- requestResult.complete(Collections.emptyList());
- }
- });
- }
- }
-}
diff --git a/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java b/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java
deleted file mode 100644
index 9f81567..0000000
--- a/flink-connectors/flink-connector-kinesis-171/src/main/java/software/amazon/flink/connectors/Test.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package software.amazon.flink.connectors;
-
-import org.apache.flink.api.common.serialization.SimpleStringSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
-import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
-
-import java.util.Properties;
-
-public class Test {
-
- public static void main(String[] args) throws Exception {
- // set up the streaming execution environment
- final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
-
- env.enableCheckpointing(10_000);
-
-// DataStream<String> stream = env.readTextFile("s3://shausma-nyc-tlc/yellow-trip-data/taxi-trips.json/dropoff_year=2010/part-00000-cdac5fe4-b823-4576-aeb7-7327b077476e.c000.json");
-
- Properties consumerConfig = new Properties();
- consumerConfig.put(ConsumerConfigConstants.AWS_REGION, "eu-west-1");
- consumerConfig.put(ConsumerConfigConstants.AWS_CREDENTIALS_PROVIDER, "AUTO");
- consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON");
-
- DataStream<String> stream = env.addSource(new FlinkKinesisConsumer<>("test-in", new SimpleStringSchema(), consumerConfig));
-
- stream.sinkTo(new AmazonKinesisDataStreamSink<>("test-out"));
-
- /*
- * Here, you can start creating your execution plan for Flink.
- *
- * Start with getting some data from the environment, like
- * env.readTextFile(textPath);
- *
- * then, transform the resulting DataStream<String> using operations
- * like
- * .filter()
- * .flatMap()
- * .join()
- * .coGroup()
- *
- * and many more.
- * Have a look at the programming guide for the Java API:
- *
- * https://flink.apache.org/docs/latest/apis/streaming/index.html
- *
- */
-
- // execute program
- env.execute("Flink Streaming Java API Skeleton");
- }
-
-}
diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml
index 3e9a500..afcd587 100644
--- a/flink-connectors/pom.xml
+++ b/flink-connectors/pom.xml
@@ -54,7 +54,6 @@
<module>flink-connector-kafka</module>
<module>flink-connector-gcp-pubsub</module>
<module>flink-connector-kinesis</module>
- <module>flink-connector-kinesis-171</module>
<module>flink-connector-base</module>
<module>flink-file-sink-common</module>
<module>flink-connector-files</module>