[FLINK-29907][Connectors/Kinesis][Connectors/Firehose] Externalize Kinesis/Firehose documentation
diff --git a/docs/content.zh/docs/connectors/datastream/dynamodb.md b/docs/content.zh/docs/connectors/datastream/dynamodb.md
index e0e22d9..106fd6c 100644
--- a/docs/content.zh/docs/connectors/datastream/dynamodb.md
+++ b/docs/content.zh/docs/connectors/datastream/dynamodb.md
@@ -31,7 +31,7 @@
 
 To use the connector, add the following Maven dependency to your project:
 
-{{< connector_artifact flink-connector-dynamodb 3.0.0 >}}
+{{< connector_artifact flink-connector-dynamodb 4.0.0 >}}
 
 {{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
 {{< tab "Java" >}}
diff --git a/docs/content.zh/docs/connectors/datastream/firehose.md b/docs/content.zh/docs/connectors/datastream/firehose.md
new file mode 100644
index 0000000..cbd3d2e
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/firehose.md
@@ -0,0 +1,201 @@
+---
+title: Firehose
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/firehose.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Firehose Sink
+
+The Firehose sink writes to [Amazon Kinesis Data Firehose](https://aws.amazon.com/kinesis/data-firehose/).
+
+Follow the instructions from the [Amazon Kinesis Data Firehose Developer Guide](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)
+to set up a Kinesis Data Firehose delivery stream.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-aws-kinesis-firehose 4.0.0 >}}
+
+{{< py_download_link "aws-kinesis-firehose" >}}
+
+The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Firehose delivery stream.
+
+{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+KinesisFirehoseSink<String> kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
+        .build();
+
+flinkStream.sinkTo(kdfSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val kdfSink =
+    KinesisFirehoseSink.builder[String]()
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
+        .build()
+
+flinkStream.sinkTo(kdfSink)
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+sink_properties = {
+    # Required
+    'aws.region': 'eu-west-1',
+    # Optional, provide via alternative routes e.g. environment variables
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
+}
+
+# set_firehose_client_properties, set_serialization_schema and set_delivery_stream_name
+# are required; the remaining setters are optional.
+kdf_sink = KinesisFirehoseSink.builder() \
+    .set_firehose_client_properties(sink_properties) \
+    .set_serialization_schema(SimpleStringSchema()) \
+    .set_delivery_stream_name('your-stream-name') \
+    .set_fail_on_error(False) \
+    .set_max_batch_size(500) \
+    .set_max_in_flight_requests(50) \
+    .set_max_buffered_requests(10000) \
+    .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
+    .set_max_time_in_buffer_ms(5000) \
+    .set_max_record_size_in_bytes(1 * 1024 * 1024) \
+    .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.
+
+1. __setFirehoseClientProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the Firehose client.
+2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__
+    * Required.
+    * Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose.
+3. __setDeliveryStreamName(String deliveryStreamName)__
+    * Required.
+    * Name of the delivery stream to sink to.
+4. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records to Firehose are treated as fatal exceptions in the sink.
+5. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `500`.
+    * Maximum size of a batch to write to Firehose.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in-flight requests allowed before the sink applies backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before backpressure is applied.
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * Optional. Default: `4 * 1024 * 1024`.
+    * The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * Optional. Default: `1000 * 1024`.
+    * The maximum record size that the sink will accept; records larger than this will be automatically rejected.
+11. _build()_
+    * Constructs and returns the Firehose sink.
+
+
+## Using Custom Firehose Endpoints
+
+It is sometimes desirable to have Flink operate as a producer against a Firehose VPC endpoint or a non-AWS
+Firehose endpoint such as [Localstack](https://localstack.cloud/); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+producer_config = {
+    'aws.region': 'us-east-1',
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
+    'aws.endpoint': 'http://localhost:4566'
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/datastream/kinesis.md b/docs/content.zh/docs/connectors/datastream/kinesis.md
new file mode 100644
index 0000000..0856a1e
--- /dev/null
+++ b/docs/content.zh/docs/connectors/datastream/kinesis.md
@@ -0,0 +1,825 @@
+---
+title: Kinesis
+weight: 5
+type: docs
+aliases:
+- /zh/dev/connectors/kinesis.html
+- /zh/apis/streaming/connectors/kinesis.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Streams Connector
+
+The Kinesis connector provides access to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
+
+To use this connector, add one or more of the following dependencies to your project, depending on whether you are reading from and/or writing to Kinesis Data Streams:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">KDS Connectivity</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>Source</td>
+        <td>{{< connector_artifact flink-connector-kinesis 4.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>Sink</td>
+        <td>{{< connector_artifact flink-connector-aws-kinesis-streams 4.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+Due to the licensing issue, the `flink-connector-kinesis` artifact is not deployed to Maven central for earlier versions. Please see the version-specific documentation for further information.
+
+{{< py_download_link "kinesis" >}}
+
+## Using the Amazon Kinesis Streams Service
+Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up Kinesis streams.
+
+## Configuring Access to Kinesis with IAM
+Make sure to create an appropriate IAM policy to allow reading from and writing to the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+
+Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.
+
+A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting (see the sketch after the list below for an `ASSUME_ROLE` example).
+
+Supported Credential Providers are:
+
+* `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
+* `BASIC` - Use access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
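+
+As a minimal sketch (assuming the `AWS_ROLE_ARN`, `AWS_ROLE_SESSION_NAME` and `AWS_ROLE_CREDENTIALS_PROVIDER` keys of `AWSConfigConstants`; check the constants available in your connector version), an `ASSUME_ROLE` provider could be configured as follows:
+
+```java
+Properties config = new Properties();
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+// Assume an IAM role instead of using static credentials.
+config.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
+config.put(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/my-kinesis-role"); // hypothetical role
+config.put(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "my-flink-session");
+// Credentials used to assume the role above (any of the providers listed above).
+config.put(AWSConfigConstants.AWS_ROLE_CREDENTIALS_PROVIDER, "AUTO");
+```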
+
+## Kinesis Consumer
+
+The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis streams within the same AWS service region, and can transparently handle resharding of streams while the job is running.
+Each subtask of the consumer is responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask changes as shards are closed and created by Kinesis.
+
+Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the Amazon Kinesis Data Streams console.
+
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tab "Java" >}}
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer_config = {
+    'aws.region': 'us-east-1',
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
+    'flink.stream.initpos': 'LATEST'
+}
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+kinesis = env.add_source(FlinkKinesisConsumer("stream-1", SimpleStringSchema(), consumer_config))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
+instance, whose configuration keys can be found in `AWSConfigConstants` (AWS-specific parameters) and
+`ConsumerConfigConstants` (Kinesis consumer parameters). The example demonstrates consuming a single Kinesis stream in the
+AWS region "us-east-1". The AWS credentials are supplied using the basic method in which the AWS access key
+ID and secret access key are directly supplied in the configuration. Also, data is consumed from the newest position
+in the Kinesis stream (the other option is to set `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
+to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
+
+Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
+
+Note that the configured parallelism of the Flink Kinesis Consumer source can be completely independent of the total number of shards in the Kinesis streams.
+When the number of shards is larger than the parallelism of the consumer, then each consumer subtask can subscribe to multiple shards; otherwise
+if the number of shards is smaller than the parallelism of the consumer, then some consumer subtasks will simply be idle and wait until they are assigned
+new shards (i.e., when the streams are resharded to increase the number of shards for higher provisioned Kinesis service throughput).
+
+Also note that, by default, shards are assigned to subtasks based on a hash of the shard and stream name,
+which will more or less balance the shards across the subtasks.
+However, assuming the default Kinesis shard management is used on the stream (UpdateShardCount with `UNIFORM_SCALING`),
+setting `UniformShardAssigner` as the shard assigner on the consumer will distribute the shards over the subtasks much more evenly.
+Assuming the incoming Kinesis records are assigned random Kinesis `PartitionKey` or `ExplicitHashKey` values, the result is consistent subtask loading.
+If neither the default assigner nor the `UniformShardAssigner` suffice, a custom implementation of `KinesisShardAssigner` can be set, as shown in the sketch below.
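+
+The following sketch shows how an assigner could be set on the consumer (assuming `UniformShardAssigner` is available on the classpath of your connector version):
+
+```java
+FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig);
+// Spread shards evenly over the subtasks instead of hashing on shard and stream name.
+consumer.setShardAssigner(new UniformShardAssigner());
+DataStream<String> kinesis = env.addSource(consumer);
+```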
+
+### `DeserializationSchema`
+
+The Flink Kinesis Consumer also needs a schema to know how to turn the binary data in a Kinesis Data Stream into Java objects.
+The `KinesisDeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)`
+method gets called for each Kinesis record.
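+
+A minimal custom schema might look like the following sketch (assuming the record payload is UTF-8 text; the metadata arguments are simply ignored here):
+
+```java
+public class UTF8StringDeserializationSchema implements KinesisDeserializationSchema<String> {
+
+    @Override
+    public String deserialize(byte[] recordValue, String partitionKey, String seqNum,
+            long approxArrivalTimestamp, String stream, String shardId) {
+        // Only the payload is used here; partition key, sequence number, arrival
+        // timestamp, stream name and shard id are available if needed.
+        return new String(recordValue, StandardCharsets.UTF_8);
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return Types.STRING;
+    }
+}
+```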
+
+For convenience, Flink provides the following schemas out of the box:
+
+1. `TypeInformationSerializationSchema`, which creates a schema based on a Flink `TypeInformation`.
+    This is useful if the data is both written and read by Flink.
+    This schema is a performant Flink-specific alternative to other generic serialization approaches.
+
+2. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to look up the writer's schema (the schema which was used to write the record)
+   in the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Using this, the record will be read with the schema
+   retrieved from the AWS Glue Schema Registry and transformed to either `com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`,
+   which represents a generic record with a manually provided schema, or a JAVA POJO generated by [mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
+
+   <br>To use this deserialization schema one has to add the following additional dependency:
+   
+{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
+{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
+{{< artifact flink-jsonschema-glue-schema-registry >}}
+{{< /tab >}}
+{{< /tabs >}}
+    
+3. `AvroDeserializationSchema`, which reads data serialized with Avro format using a statically provided schema. It can
+    infer the schema from Avro generated classes (`AvroDeserializationSchema.forSpecific(...)`) or it can work with `GenericRecords`
+    with a manually provided schema (with `AvroDeserializationSchema.forGeneric(...)`). This deserialization schema expects that
+    the serialized records DO NOT contain an embedded schema.
+
+    - You can use the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
+      to retrieve the writer's schema. Similarly, the record will be read with the schema from the AWS Glue Schema Registry and transformed
+      (via `GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or `GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`).
+      For more information on integrating the AWS Glue Schema Registry with Apache Flink see
+      [Use Case: Amazon Kinesis Data Analytics for Apache Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink).
+
+    <br>To use this deserialization schema one has to add the following additional dependency:
+    
+{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
+{{< tab "AvroDeserializationSchema" >}}
+{{< artifact flink-avro >}}
+{{< /tab >}}
+{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}}
+{{< artifact flink-avro-glue-schema-registry >}}
+{{< /tab >}}
+{{< /tabs >}}
+
+### Configuring Starting Position
+
+The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
+to one of the following values in the provided configuration properties (the naming of the options follows [the namings used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
+
+- `LATEST`: read all shards starting from the latest record.
+- `TRIM_HORIZON`: read all shards starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
+- `AT_TIMESTAMP`: read all shards starting from a specific timestamp. The timestamp must also be specified in the configuration
+  properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date patterns (see the sketch after this list for an example):
+   - a non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`).
+   - a user-defined pattern, which is a valid pattern for `SimpleDateFormat` provided by `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
+     If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined, then the default pattern is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
+     (for example, a timestamp value of `2016-04-04` with the user-defined pattern `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` without a user-defined pattern).
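+
+A minimal sketch of starting from a timestamp, using the constants described above:
+
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+// Start reading at a specific point in time instead of LATEST / TRIM_HORIZON.
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");
+// Optional: supply a custom SimpleDateFormat pattern for the timestamp above.
+// consumerConfig.put(ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT, "yyyy-MM-dd");
+```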
+
+### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
+
+With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
+periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the
+state of the latest completed checkpoint and re-consume the records from Kinesis shards, starting from the progress that was stored in the checkpoint.
+
+The interval of drawing checkpoints therefore defines how much the program may have to go back at most, in case of a failure.
+
+To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled in the execution environment:
+
+{{< tabs "b1399ed7-5855-446d-9684-7a49de9b4c97" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.enable_checkpointing(5000) # checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Also note that Flink can only restart the topology if enough processing slots are available to restart the topology.
+Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
+Flink on YARN supports automatic restart of lost YARN containers.
+
+### Using Enhanced Fan-Out
+
+[Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum number of concurrent consumers per Kinesis data stream.
+Without EFO, all concurrent Kinesis consumers share a single read quota per shard.
+Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers.
+Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
+
+In order to enable EFO, two additional configuration parameters are required:
+
+- `RECORD_PUBLISHER_TYPE`: Determines whether to use `EFO` or `POLLING`. The default `RecordPublisher` is `POLLING`.
+- `EFO_CONSUMER_NAME`: A name to identify the consumer.
+For a given Kinesis data stream, each consumer must have a unique name.
+However, consumer names do not have to be unique across data streams.
+Reusing a consumer name will terminate existing subscriptions.
+
+The code snippet below shows a simple example configuring an EFO consumer.
+
+{{< tabs "42345893-70c3-4678-a348-4c419b337eb1" >}}
+{{< tab "Java" >}}
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer_config = {
+    'aws.region': 'us-east-1',
+    'flink.stream.initpos': 'LATEST',
+    'flink.stream.recordpublisher':  'EFO',
+    'flink.stream.efo.consumername': 'my-flink-efo-consumer'
+}
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+kinesis = env.add_source(FlinkKinesisConsumer(
+    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### EFO Stream Consumer Registration/Deregistration
+
+In order to use EFO, a stream consumer must be registered against each stream you wish to consume.
+By default, the `FlinkKinesisConsumer` will register the stream consumer automatically when the Flink job starts up.
+The stream consumer will be registered using the name provided by the `EFO_CONSUMER_NAME` configuration.
+The `FlinkKinesisConsumer` provides three registration strategies:
+
+- Registration
+  - `LAZY` (default): Stream consumers are registered when the Flink job starts running.
+    If the stream consumer already exists, it will be reused.
+    This is the preferred strategy for the majority of applications.
+    However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN.
+    For jobs with very large parallelism this can result in an increased start-up time.
+    The DescribeStreamConsumer operation has a limit of 20 [transactions per second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
+    which means application startup time will increase by roughly `parallelism/20 seconds`.
+  - `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` constructor.
+    If the stream consumer already exists, it will be reused.
+    This will result in registration occurring when the job is constructed,
+    either on the Flink JobManager or the client environment submitting the job.
+    Using this strategy results in a single thread registering and retrieving the stream consumer ARN,
+    reducing startup time over `LAZY` (with large parallelism).
+    However, consider that the client environment will require access to the AWS services.
+  - `NONE`: Stream consumer registration is not performed by the `FlinkKinesisConsumer`.
+    Registration must be performed externally using the [AWS CLI or SDK](https://aws.amazon.com/tools/)
+    to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).
+    Stream consumer ARNs should be provided to the job via the consumer configuration.
+- Deregistration
+  - `LAZY|EAGER` (default): Stream consumers are deregistered when the job is shut down gracefully.
+    In the event that a job terminates without executing the shutdown hooks, stream consumers will remain active.
+    In this situation the stream consumers will be gracefully reused when the application restarts.
+  - `NONE`: Stream consumer deregistration is not performed by the `FlinkKinesisConsumer`.
+
+Below is an example configuration to use the `EAGER` registration strategy:
+
+{{< tabs "a85d716b-6c1c-46d8-9ee4-12d8380a0c06" >}}
+{{< tab "Java" >}}
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
+    ConsumerConfigConstants.EFORegistrationType.EAGER.name());
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
+    ConsumerConfigConstants.EFORegistrationType.EAGER.name());
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer_config = {
+    'aws.region': 'us-east-1',
+    'flink.stream.initpos': 'LATEST',
+    'flink.stream.recordpublisher':  'EFO',
+    'flink.stream.efo.consumername': 'my-flink-efo-consumer',
+    'flink.stream.efo.registration': 'EAGER'
+}
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+kinesis = env.add_source(FlinkKinesisConsumer(
+    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Below is an example configuration to use the `NONE` registration strategy:
+
+{{< tabs "00b46c87-7740-4263-8040-2aa7e2960513" >}}
+{{< tab "Java" >}}
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
+    ConsumerConfigConstants.EFORegistrationType.NONE.name());
+consumerConfig.put(ConsumerConfigConstants.efoConsumerArn("stream-name"), 
+    "arn:aws:kinesis:<region>:<account>>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>");
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
+    ConsumerConfigConstants.EFORegistrationType.NONE.name());
+consumerConfig.put(ConsumerConfigConstants.efoConsumerArn("stream-name"), 
+    "arn:aws:kinesis:<region>:<account>>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>");
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer_config = {
+    'aws.region': 'us-east-1',
+    'flink.stream.initpos': 'LATEST',
+    'flink.stream.recordpublisher':  'EFO',
+    'flink.stream.efo.consumername': 'my-flink-efo-consumer',
+    'flink.stream.efo.consumerarn.stream-name':
+        'arn:aws:kinesis:<region>:<account>>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>'
+}
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+kinesis = env.add_source(FlinkKinesisConsumer(
+    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Event Time for Consumed Records
+
+If streaming topologies choose to use the [event time notion]({{< ref "docs/concepts/time" >}}) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they were successfully received and stored by the stream.
+Note that this timestamp is typically referred to as a Kinesis server-side timestamp, and there are no guarantees
+about its accuracy or order correctness (i.e., the timestamps may not always be ascending).
+
+Users can choose to override this default with a custom timestamp, as described [here]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}),
+or use one of the [predefined ones]({{< ref "docs/dev/datastream/event-time/built_in" >}}). After doing so, it can be passed to the consumer in the following way:
+
+{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6600" >}}
+{{< tab "Java" >}}
+```java
+FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
+    "kinesis_stream_name",
+    new SimpleStringSchema(),
+    kinesisConsumerConfig);
+consumer.setPeriodicWatermarkAssigner(new CustomAssignerWithPeriodicWatermarks());
+DataStream<String> stream = env
+	.addSource(consumer)
+	.print();
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumer = new FlinkKinesisConsumer[String](
+    "kinesis_stream_name",
+    new SimpleStringSchema(),
+    kinesisConsumerConfig);
+consumer.setPeriodicWatermarkAssigner(new CustomAssignerWithPeriodicWatermarks());
+val stream = env
+	.addSource(consumer)
+	.print();
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer = FlinkKinesisConsumer(
+    "kinesis_stream_name",
+    SimpleStringSchema(),
+    consumer_config)
+stream = env.add_source(consumer).print()
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Internally, an instance of the assigner is executed per shard / consumer thread (see threading model below).
+When an assigner is specified, for each record read from Kinesis, extractTimestamp(T element, long previousElementTimestamp)
+is called to assign a timestamp to the record and getCurrentWatermark() to determine the new watermark for the shard.
+The watermark of the consumer subtask is then determined as the minimum watermark of all its shards and emitted periodically.
+The per-shard watermark is essential to deal with varying consumption speed between shards, which otherwise could lead
+to issues with downstream logic that relies on the watermark, such as incorrect late data dropping.
+
+By default, the watermark is going to stall if shards do not deliver new records.
+The property `ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS` allows the watermark to progress despite idle shards, after the configured interval of inactivity, as sketched below.
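+
+A minimal sketch, added to the consumer properties shown earlier (the interval value is illustrative):
+
+```java
+// Allow watermarks to advance when a shard has been idle for 60 seconds.
+consumerConfig.put(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "60000");
+```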
+
+### Event Time Alignment for Shard Consumers
+
+The Flink Kinesis Consumer optionally supports synchronization between parallel consumer subtasks (and their threads)
+to avoid the event time skew related problems described in [Event time synchronization across sources](https://issues.apache.org/jira/browse/FLINK-10886).
+
+To enable synchronization, set the watermark tracker on the consumer:
+
+{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6601" >}}
+{{< tab "Java" >}}
+```java
+JobManagerWatermarkTracker watermarkTracker =
+    new JobManagerWatermarkTracker("myKinesisSource");
+consumer.setWatermarkTracker(watermarkTracker);
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+watermark_tracker = WatermarkTracker.job_manager_watermark_tracker("myKinesisSource")
+consumer.set_watermark_tracker(watermark_tracker)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The `JobManagerWatermarkTracker` will use a global aggregate to synchronize the per-subtask watermarks. Each subtask
+uses a per-shard queue to control the rate at which records are emitted downstream, based on how far ahead of the global watermark the next record in the queue is.
+
+The "emit ahead" limit is configured via `ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS`. Smaller values reduce
+the skew but also the throughput. Larger values will allow the subtask to proceed further before waiting for the global watermark to advance.
+
+Another variable in the throughput equation is how frequently the watermark is propagated by the tracker.
+The interval can be configured via `ConsumerConfigConstants.WATERMARK_SYNC_MILLIS`.
+Smaller values reduce emitter waits and come at the cost of increased communication with the job manager.
+
+Since records accumulate in the queues when skew occurs, increased memory consumption needs to be expected. How much depends on the average record size. With larger sizes,
+it may be necessary to adjust the emitter queue capacity via `ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY`. A sketch of these settings follows.
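+
+A minimal sketch, added to the consumer properties shown earlier (the values are illustrative, not recommendations):
+
+```java
+// How far ahead of the global watermark a subtask may emit records.
+consumerConfig.put(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "30000");
+// How often the watermark is synchronized with the JobManager.
+consumerConfig.put(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "10000");
+// Capacity of the per-shard emitter queue.
+consumerConfig.put(ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY, "100");
+```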
+
+### Threading Model
+
+The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
+
+#### Shard Discovery
+
+For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard
+information, even if the subtask initially did not have shards to read from when the consumer was started. In other words, if the consumer is run with a parallelism of 10,
+there will be a total of 10 threads constantly querying Kinesis, regardless of the total number of shards in the subscribed streams.
+
+#### Polling (default) Record Publisher
+
+For `POLLING` data consumption, a single thread will be created to consume each discovered shard. A thread will die when the shard it is
+responsible for consuming is closed as a result of stream resharding. In other words, there will always be one thread per open shard.
+
+#### Enhanced Fan-Out Record Publisher
+
+For `EFO` data consumption the threading model is the same as `POLLING`, with additional thread pools to handle
+asynchronous communication with Kinesis. The AWS SDK v2.x `KinesisAsyncClient` uses additional threads for
+Netty to handle IO and asynchronous responses. Each parallel consumer subtask will have its own instance of the `KinesisAsyncClient`.
+In other words, if the consumer is run with a parallelism of 10, there will be a total of 10 `KinesisAsyncClient` instances.
+A separate client will be created and subsequently destroyed when registering and deregistering stream consumers.
+
+### Internally Used Kinesis APIs
+
+The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
+for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
+on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running.
+Below is a list of APIs called by the consumer, with a description of how the consumer uses the API, as well as information on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits.
+
+#### Shard Discovery
+
+- *[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)*: this is constantly called
+by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default, the consumer
+performs the shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result from Kinesis. If this interferes with other non-Flink consuming applications,
+users can slow down the consumer's calls to this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied configuration properties,
+which sets the discovery interval to a different value. Note that this setting directly impacts the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval. A sketch follows.
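+
+A minimal sketch, added to the consumer properties shown earlier (the interval value is illustrative):
+
+```java
+// Poll for new shards every 30 seconds instead of the default 10 seconds.
+consumerConfig.put(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "30000");
+```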
+
+#### Polling (default) Record Publisher
+
+- *[GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)*: this is called
+only once when per-shard consuming threads are started, and will retry if Kinesis complains that the transaction limit for the API has been exceeded,
+up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream), the consumer itself should not exceed the limit.
+Usually, if this happens, users can either try to slow down any other non-Flink consuming applications that are also calling
+this API, or modify the retry behaviour of this API call in the consumer by setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*`
+in the supplied configuration properties.
+
+- *[GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*: this is constantly called
+by per-shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (for example, when any other non-Flink consuming applications are
+running), the per-shard rate limit may be exceeded. By default, on each call of this API, the consumer will retry if Kinesis complains that the data size / transaction
+limit for the API has been exceeded, up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput of the consumer by setting the
+`ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and `ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied
+configuration properties. Setting the former adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 10000), while
+the latter modifies the sleep interval between each fetch (default is 200). The retry behaviour of the consumer when calling this API can also be modified by using
+the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`. A sketch follows.
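+
+A minimal sketch, added to the consumer properties shown earlier (the values are illustrative):
+
+```java
+// Fetch fewer records per GetRecords call and wait longer between calls.
+consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
+consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");
+```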
+
+#### Enhanced Fan-Out Record Publisher
+
+- *[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)*: this is called
+by per-shard consuming threads to obtain shard subscriptions. A shard subscription is typically active for 5 minutes,
+but subscriptions will be reacquired if any recoverable errors are thrown. Once a subscription is acquired, the consumer will receive a stream of [SubscribeToShardEvents](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html).
+Retry and backoff parameters can be configured using the `ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_*` keys.
+
+- *[DescribeStreamSummary](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html)*:
+this is called once per stream, during stream consumer registration. By default, the `LAZY` registration strategy will scale the
+number of calls with the job parallelism, while `EAGER` will invoke it once per stream and `NONE` will not invoke this API.
+Retry and backoff parameters can be configured using the `ConsumerConfigConstants.STREAM_DESCRIBE_*` keys.
+
+- *[DescribeStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html)*:
+this is called during stream consumer registration and deregistration. For each stream this service will be invoked
+periodically until the stream consumer is reported `ACTIVE`/`not found` for registration/deregistration. By default,
+the `LAZY` registration strategy will scale the number of calls with the job parallelism, while `EAGER` will call the service once per stream for registration only.
+`NONE` will not invoke this service. Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_*` keys.
+
+- *[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*:
+this is called once per stream during stream consumer registration, unless the `NONE` registration strategy is configured.
+Retry and backoff parameters can be configured using the `ConsumerConfigConstants.REGISTER_STREAM_*` keys.
+
+- *[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*:
+this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured.
+Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.
+
+## Kinesis Streams Sink
+
+The Kinesis Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html)
+to write data from a Flink stream into a Kinesis stream.
+
+To write data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the Amazon Kinesis Data Streams console.
+
+For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
+
+{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+KinesisStreamsSink<String> kdsSink =
+    KinesisStreamsSink.<String>builder()
+        .setKinesisClientProperties(sinkProperties)                               // Required
+        .setSerializationSchema(new SimpleStringSchema())                         // Required
+        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))  // Required
+        .setStreamName("your-stream-name")                                        // Required
+        .setFailOnError(false)                                                    // Optional
+        .setMaxBatchSize(500)                                                     // Optional
+        .setMaxInFlightRequests(50)                                               // Optional
+        .setMaxBufferedRequests(10_000)                                           // Optional
+        .setMaxBatchSizeInBytes(5 * 1024 * 1024)                                  // Optional
+        .setMaxTimeInBufferMS(5000)                                               // Optional
+        .setMaxRecordSizeInBytes(1 * 1024 * 1024)                                 // Optional
+        .build();
+
+DataStream<String> simpleStringStream = ...;
+simpleStringStream.sinkTo(kdsSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val kdsSink = KinesisStreamsSink.builder[String]()
+    .setKinesisClientProperties(sinkProperties)                               // Required
+    .setSerializationSchema(new SimpleStringSchema())                         // Required
+    .setPartitionKeyGenerator(element => String.valueOf(element.hashCode()))  // Required
+    .setStreamName("your-stream-name")                                        // Required
+    .setFailOnError(false)                                                    // Optional
+    .setMaxBatchSize(500)                                                     // Optional
+    .setMaxInFlightRequests(50)                                               // Optional
+    .setMaxBufferedRequests(10000)                                            // Optional
+    .setMaxBatchSizeInBytes(5 * 1024 * 1024)                                  // Optional
+    .setMaxTimeInBufferMS(5000)                                               // Optional
+    .setMaxRecordSizeInBytes(1 * 1024 * 1024)                                 // Optional
+    .build()
+
+val simpleStringStream = ...
+simpleStringStream.sinkTo(kdsSink)
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+sink_properties = {
+    # Required
+    'aws.region': 'us-east-1',
+    # Optional, provide via alternative routes e.g. environment variables
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
+    'aws.endpoint': 'http://localhost:4567'
+}
+
+# set_kinesis_client_properties, set_serialization_schema, set_partition_key_generator
+# and set_stream_name are required; the remaining setters are optional.
+kds_sink = KinesisStreamsSink.builder() \
+    .set_kinesis_client_properties(sink_properties) \
+    .set_serialization_schema(SimpleStringSchema()) \
+    .set_partition_key_generator(PartitionKeyGenerator.fixed()) \
+    .set_stream_name("your-stream-name") \
+    .set_fail_on_error(False) \
+    .set_max_batch_size(500) \
+    .set_max_in_flight_requests(50) \
+    .set_max_buffered_requests(10000) \
+    .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
+    .set_max_time_in_buffer_ms(5000) \
+    .set_max_record_size_in_bytes(1 * 1024 * 1024) \
+    .build()
+
+simple_string_stream = ...
+simple_string_stream.sink_to(kds_sink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The above is a simple example of using the Kinesis sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured.
+You can then construct the sink with the builder. The default values for the optional configurations are shown above. Some of these values have been set as a result of [configuration on KDS](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
+
+You will always need to specify your serialization schema and logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record, as sketched below.
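+
+For instance, a partition key could be derived from a field of the element. The following sketch assumes a hypothetical `Order` POJO with a `getCustomerId()` accessor and a hypothetical `OrderSerializationSchema` implementing `SerializationSchema<Order>`:
+
+```java
+KinesisStreamsSink<Order> orderSink =
+    KinesisStreamsSink.<Order>builder()
+        .setKinesisClientProperties(sinkProperties)
+        .setSerializationSchema(new OrderSerializationSchema())
+        // Records with the same customer id are routed to the same shard, preserving their relative order.
+        .setPartitionKeyGenerator(order -> String.valueOf(order.getCustomerId()))
+        .setStreamName("your-stream-name")
+        .build();
+```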
+
+Records may fail to be persisted into Kinesis Data Streams for a number of reasons. If `failOnError` is on, then a runtime exception will be raised. Otherwise those records will be requeued in the buffer for retry.
+
+The Kinesis Sink provides some metrics through Flink's [metrics system]({{< ref "docs/ops/metrics" >}}) to analyze the behavior of the connector. A list of all exposed metrics may be found [here]({{<ref "docs/ops/metrics#kinesis-sink">}}).
+
+The sink default maximum record size is 1MB and the maximum batch size is 5MB, in line with the Kinesis Data Streams maximums. The AWS documentation detailing these maximums may be found [here](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
+
+### Kinesis Sinks and Fault Tolerance
+
+The sink is designed to participate in Flink's checkpointing to provide at-least-once processing guarantees. It does this by completing any in-flight requests while taking a checkpoint. This effectively assures that all requests that were triggered before the checkpoint have been successfully delivered to Kinesis Data Streams, before proceeding to process more records.
+
+If Flink needs to restore from a checkpoint (or savepoint), data that has been written since that checkpoint will be written to Kinesis again, leading to duplicates in the stream. Moreover, the sink uses the `PutRecords` API call internally, which does not guarantee to maintain the order of events.
+
+### Backpressure
+
+Backpressure in the sink arises as the sink buffer fills up and writes to the sink begin to exhibit blocking behaviour. More information on the rate restrictions of Kinesis Data Streams may be found at [Quotas and Limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
+
+You generally reduce backpressure by increasing the size of the internal queue:
+
+{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61d" >}}
+{{< tab "Java" >}}
+```java
+KinesisStreamsSink<String> kdsSink =
+    KinesisStreamsSink.<String>builder()
+        ...
+        .setMaxBufferedRequests(10_000)
+        ...
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+kds_sink = KinesisStreamsSink.builder() \
+    .set_max_buffered_requests(10000) \
+    .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Kinesis Producer
+{{< hint warning >}}
+The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink. Please use the [Kinesis Sink]({{<ref "docs/connectors/datastream/kinesis#kinesis-streams-sink">}}) instead.
+{{< /hint >}}
+
+The new sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) whereas the old sink uses the Kinesis Producer Library. Because of this, the new Kinesis sink does not support [aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation).
+
+## Using Custom Kinesis Endpoints
+
+It is sometimes desirable to have Flink operate as a source or sink against a Kinesis VPC endpoint or a non-AWS
+Kinesis endpoint such as [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred from the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties config = new Properties();
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+config.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+config.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val config = new Properties()
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+config.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+config.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+config = {
+    'aws.region': 'us-east-1',
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
+    'aws.endpoint': 'http://localhost:4567'
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/dynamodb.md b/docs/content.zh/docs/connectors/table/dynamodb.md
index 4441e02..847f62f 100644
--- a/docs/content.zh/docs/connectors/table/dynamodb.md
+++ b/docs/content.zh/docs/connectors/table/dynamodb.md
@@ -35,7 +35,7 @@
 Dependencies
 ------------
 
-{{< sql_connector_download_table "dynamodb" 3.0.0 >}}
+{{< sql_connector_download_table dynamodb 4.0.0 >}}
 
 How to create a DynamoDB table
 -----------------------------------------
diff --git a/docs/content.zh/docs/connectors/table/firehose.md b/docs/content.zh/docs/connectors/table/firehose.md
new file mode 100644
index 0000000..b7cf694
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/firehose.md
@@ -0,0 +1,313 @@
+---
+title: Firehose
+weight: 5
+type: docs
+aliases:
+- /zh/dev/table/connectors/firehose.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Firehose SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append Mode" >}}
+
+The Kinesis Data Firehose connector allows for writing data into [Amazon Kinesis Data Firehose (KDF)](https://aws.amazon.com/kinesis/data-firehose/).
+
+Dependencies
+------------
+
+{{< sql_connector_download_table firehose 4.0.0 >}}
+
+How to create a Kinesis Data Firehose table
+-----------------------------------------
+
+Follow the instructions from the [Amazon Kinesis Data Firehose Developer Guide](https://docs.aws.amazon.com/ses/latest/dg/event-publishing-kinesis-analytics-firehose-stream.html) to set up a Kinesis Data Firehose delivery stream.
+The following example shows how to create a table backed by a Kinesis Data Firehose delivery stream with minimum required options:
+
+```sql
+CREATE TABLE FirehoseTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'firehose',
+  'delivery-stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'format' = 'csv'
+);
+```
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Common Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For Kinesis Data Firehose use <code>'firehose'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>delivery-stream</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Kinesis Data Firehose delivery stream backing this table.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize Kinesis Data Firehose records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the delivery stream is defined. This option is required for <code>KinesisFirehoseSink</code> creation.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for Amazon Kinesis Data Firehose.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true accepts all SSL certificates.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Authentication Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.basic.secretkey</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.profile.path</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.profile.name</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.role.arn</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.role.sessionName</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.role.externalId</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.role.provider</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Sink Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>
+      Maximum number of allowed concurrent requests by <code>FirehoseAsyncClient</code> to be delivered to delivery stream.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>
+        Maximum amount of time in ms for requests to be sent by <code>FirehoseAsyncClient</code> to delivery stream before failure.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.protocol.version</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">HTTP2</td>
+      <td>String</td>
+      <td>Http version used by <code>FirehoseAsyncClient</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be passed to <code>FirehoseAsyncClient</code> to be written downstream to delivery stream.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">16</td>
+      <td>Integer</td>
      <td>Request threshold for uncompleted requests by <code>FirehoseAsyncClient</code> before blocking new write requests.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
      <td>Request buffer threshold by <code>FirehoseAsyncClient</code> before blocking new write requests.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5242880</td>
+      <td>Long</td>
+      <td>Threshold value in bytes for writer buffer in <code>FirehoseAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in ms for an element to be in a buffer of <code>FirehoseAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
      <td>Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job.</td>
+    </tr>
+    </tbody>
+</table>
+
+## Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html) to allow writing to the Kinesis Data Firehose delivery stream.
+
+## Authentication
+
+Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis Data Firehose.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `aws.credentials.provider` setting.
+Supported values are:
+
+- `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
+- `BASIC` - Use access key ID and secret key supplied as configuration.
+- `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+- `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`.
+- `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+- `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+- `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
+
+## Data Type Mapping
+
+Kinesis Data Firehose stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure.
+Instead, Kinesis Data Firehose records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'.
+To determine the data type of the messages in your Kinesis Data Firehose backed tables, pick a suitable Flink format with the `format` keyword.
+Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details.
+
+## Notice
+
+The current implementation for the Kinesis Data Firehose SQL connector only supports Kinesis Data Firehose backed sinks and doesn't provide an implementation for source queries.
+Queries similar to:
+```sql
+SELECT * FROM FirehoseTable;
+```
+should result in an error similar to
+```
+Connector firehose can only be used as a sink. It cannot be used as a source.
+```
+{{< top >}}
diff --git a/docs/content.zh/docs/connectors/table/kinesis.md b/docs/content.zh/docs/connectors/table/kinesis.md
new file mode 100644
index 0000000..4f4232e
--- /dev/null
+++ b/docs/content.zh/docs/connectors/table/kinesis.md
@@ -0,0 +1,915 @@
+---
+title: Kinesis
+weight: 5
+type: docs
+aliases:
+- /zh/dev/table/connectors/kinesis.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Streams SQL Connector
+
+{{< label "Scan Source: Unbounded" >}}
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append Mode" >}}
+
+The Kinesis connector allows for reading data from and writing data into [Amazon Kinesis Data Streams (KDS)](https://aws.amazon.com/kinesis/data-streams/).
+
+Dependencies
+------------
+
+{{< sql_connector_download_table kinesis 4.0.0 >}}
+
+The Kinesis connector is currently not part of Flink's binary distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) to learn how to include the Kinesis connector when running a cluster.
+
+How to create a Kinesis data stream table
+-----------------------------------------
+
+Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream.
+The following example shows how to create a table backed by a Kinesis data stream:
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3)
+)
+PARTITIONED BY (user_id, item_id)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'scan.stream.initpos' = 'LATEST',
+  'format' = 'csv'
+);
+```
+
+Available Metadata
+------------------
+
+The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 45%">Data Type</th>
+      <th class="text-center" style="width: 35%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">timestamp</a></code></td>
+      <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+      <td>The approximate time when the record was inserted into the stream.</td>
+    </tr>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">shard-id</a></code></td>
+      <td><code>VARCHAR(128) NOT NULL</code></td>
+      <td>The unique identifier of the shard within the stream from which the record was read.</td>
+    </tr>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">sequence-number</a></code></td>
+      <td><code>VARCHAR(128) NOT NULL</code></td>
+      <td>The unique identifier of the record within its shard.</td>
+    </tr>
+    </tbody>
+</table>
+
+The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata fields:
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3),
+  `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
+  `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
+  `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
+)
+PARTITIONED BY (user_id, item_id)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'scan.stream.initpos' = 'LATEST',
+  'format' = 'csv'
+);
+```
+
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+    </tr>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>stream</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Kinesis data stream backing this table.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the stream is defined. Either this or <code>aws.endpoint</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or <code>aws.region</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true, accepts all SSL certificates.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Authentication Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.basic.secretkey</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.profile.path</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.profile.name</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.role.arn</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.role.sessionName</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.role.externalId</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.role.provider</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>scan.stream.initpos</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">LATEST</td>
+      <td>String</td>
+      <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.initpos-timestamp</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.initpos-timestamp-format</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+      <td>String</td>
+      <td>The date format of initial timestamp to start reading Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.recordpublisher</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">POLLING</td>
+      <td>String</td>
+      <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.efo.consumername</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the EFO consumer to register with KDS. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.efo.registration</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">LAZY</td>
+      <td>String</td>
+      <td>Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.efo.consumerarn</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The prefix of consumer ARN for a given stream. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.efo.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>Maximum number of allowed concurrent requests for the EFO client. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describe.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">50</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>describeStream</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describe.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describe.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describe.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
+    </tr>
+    <tr>
+      <td><h5>scan.list.shards.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>listShards</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.list.shards.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>listShards</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.list.shards.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>listShards</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.list.shards.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>listShards</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describestreamconsumer.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">50</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>describeStreamConsumer</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describestreamconsumer.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describestreamconsumer.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describestreamconsumer.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>describeStreamConsumer</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>registerStream</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">60</td>
+      <td>Integer</td>
+      <td>The maximum time in seconds to wait for a stream consumer to become active before giving up.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>registerStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>deregisterStream</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">60</td>
+      <td>Integer</td>
+      <td>The maximum time in seconds to wait for a stream consumer to deregister before giving up.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>deregisterStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.subscribetoshard.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>subscribeToShard</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.subscribetoshard.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.subscribetoshard.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.subscribetoshard.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>subscribeToShard</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.maxrecordcount</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>getRecords</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">300</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>getRecords</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.intervalmillis</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">200</td>
+      <td>Long</td>
+      <td>The interval (in milliseconds) between each <code>getRecords</code> request to an AWS Kinesis shard.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getiterator.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>getShardIterator</code> attempts if we get ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getiterator.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">300</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getiterator.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getiterator.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>getShardIterator</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.discovery.intervalmillis</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>The interval (in milliseconds) between each attempt to discover new shards.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.adaptivereads</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>The config to turn on adaptive reads from a shard. See the <code>AdaptivePollingRecordPublisher</code> documentation for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.idle.interval</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">-1</td>
+      <td>Long</td>
+      <td>The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.watermark.sync.interval</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">30000</td>
+      <td>Long</td>
+      <td>The interval (in milliseconds) for periodically synchronizing the shared watermark state.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.watermark.lookahead.millis</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">0</td>
+      <td>Long</td>
+      <td>The maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.watermark.sync.queue.capacity</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">100</td>
+      <td>Integer</td>
+      <td>The maximum number of records that will be buffered before suspending consumption of a shard.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Sink Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.partitioner</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">random or row-based</td>
+      <td>String</td>
+      <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.partitioner-field-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">|</td>
+      <td>String</td>
+      <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.producer.*</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td></td>
+      <td>
+        Deprecated options previously used by the legacy connector.
+        Options with equivalent alternatives in <code>KinesisStreamsSink</code> are matched
+        to their respective properties. Unsupported options are logged as warnings to the user.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>
+      Maximum number of allowed concurrent requests by <code>KinesisAsyncClient</code>.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>
+        Maximum amount of time in ms for requests to be sent by <code>KinesisAsyncClient</code>.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.protocol.version</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">HTTP2</td>
+      <td>String</td>
+      <td>HTTP version used by the Kinesis client.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be passed to <code>KinesisAsyncClient</code> to be written downstream.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">16</td>
+      <td>Integer</td>
+      <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
+      <td>Request buffer threshold for buffered requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">5242880</td>
+      <td>Long</td>
+      <td>Threshold value in bytes for writer buffer in <code>KinesisAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in milliseconds for an element to be in a buffer of <code>KinesisAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job.</td>
+    </tr>
+    </tbody>
+</table>
+
+Features
+--------
+
+### Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) to allow reading from / writing to the Kinesis data streams.
+
+### Authentication
+
+Depending on your deployment, you would choose a different Credentials Provider to allow access to Kinesis.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `aws.credentials.provider` setting.
+Supported values are:
+
+* `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
+* `BASIC` - Use access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
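+
+For example, a table can be configured to assume an IAM role using the `ASSUME_ROLE` provider and the authentication options listed above; the role ARN and session name below are placeholders:
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'aws.credentials.provider' = 'ASSUME_ROLE',
+  'aws.credentials.role.arn' = 'arn:aws:iam::123456789012:role/MyKinesisReadRole', -- placeholder role ARN
+  'aws.credentials.role.sessionName' = 'my-flink-session',                         -- placeholder session name
+  'scan.stream.initpos' = 'LATEST',
+  'format' = 'csv'
+);
+```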
+
+### Start Reading Position
+
+You can configure table sources to start reading a table-backing Kinesis data stream from a specific position through the `scan.stream.initpos` option.
+Available values are:
+
+* `LATEST`: read shards starting from the latest record.
+* `TRIM_HORIZON`: read shards starting from the earliest record possible (data may be trimmed by Kinesis depending on the current retention settings of the backing stream).
+* `AT_TIMESTAMP`: read shards starting from a specified timestamp. The timestamp value should be specified through the `scan.stream.initpos-timestamp` in one of the following formats:
+   * A non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`).
+   * A value conforming to a user-defined `SimpleDateFormat` specified at `scan.stream.initpos-timestamp-format`.
+     If a user does not define a format, the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`.
+     For example, a timestamp value of `2016-04-04` with a user-defined format of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no user-defined format (see the example after this list).
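+
+For example, the following illustrative table definition (stream name and schema are placeholders) starts reading at a fixed timestamp using the default timestamp format:
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'scan.stream.initpos' = 'AT_TIMESTAMP',
+  'scan.stream.initpos-timestamp' = '2016-04-04T19:58:46.480-00:00', -- matches the default format
+  'format' = 'csv'
+);
+```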
+
+### Sink Partitioning
+
+Kinesis data streams consist of one or more shards, and the `sink.partitioner` option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards.
+Valid values are:
+
+* `fixed`: Kinesis `PartitionKey` values are derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime).
+* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the default value for tables not defined with a `PARTITION BY` clause.
+* Custom `FixedKinesisPartitioner` subclass: e.g. `'org.mycompany.MyPartitioner'`.
+
+{{< hint info >}}
+Records written into tables defining a `PARTITION BY` clause will always be partitioned based on a concatenated projection of the `PARTITION BY` fields.
+In this case, the `sink.partitioner` field cannot be used to modify this behavior (attempting to do this results in a configuration error).
+You can, however, use the `sink.partitioner-field-delimiter` option to set the delimiter of field values in the concatenated [PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey) string (an empty string is also a valid delimiter).
+{{< /hint >}}
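+
+For example, the following illustrative table definition (stream name and schema are placeholders) partitions records by `user_id` and `item_id` and joins the two field values with `-` in the resulting `PartitionKey`:
+
+```sql
+CREATE TABLE KinesisSinkTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING
+)
+PARTITIONED BY (user_id, item_id)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'sink.partitioner-field-delimiter' = '-', -- PartitionKey becomes "<user_id>-<item_id>"
+  'format' = 'csv'
+);
+```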
+
+### Enhanced Fan-Out
+
+[Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum number of concurrent consumers per Kinesis data stream.
+Without EFO, all concurrent Kinesis consumers share a single read quota per shard.
+Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers.
+
+<span class="label label-info">Note</span> Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
+
+You can enable and configure EFO with the following properties:
+
+* `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`.
+* `scan.stream.efo.consumername`: A name to identify the consumer when the above value is `EFO`.
+* `scan.stream.efo.registration`: Strategy for (de-)registration of `EFO` consumers with the name given by the `scan.stream.efo.consumername` value. Valid strategies are:
+  * `LAZY` (default): Stream consumers are registered when the Flink job starts running.
+    If the stream consumer already exists, it will be reused.
+    This is the preferred strategy for the majority of applications.
+    However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN.
+    For jobs with very large parallelism this can result in an increased start-up time.
+    The describe operation has a limit of 20 [transactions per second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
+    so application startup time will increase by roughly `parallelism/20` seconds.
+  * `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` constructor.
+    If the stream consumer already exists, it will be reused.
+    This will result in registration occurring when the job is constructed,
+    either on the Flink Job Manager or client environment submitting the job.
+    Using this strategy results in a single thread registering and retrieving the stream consumer ARN,
+    reducing startup time over `LAZY` (with large parallelism).
+    However, consider that the client environment will require access to the AWS services.
+  * `NONE`: Stream consumer registration is not performed by `FlinkKinesisConsumer`.
+    Registration must be performed externally using the [AWS CLI or SDK](https://aws.amazon.com/tools/)
+    to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).
+    Stream consumer ARNs should be provided to the job via the consumer configuration.
+* `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally registered ARN-consumers (substitute `<stream-name>` with the name of your stream in the parameter name).
+   Use this if you choose to use `NONE` as a `scan.stream.efo.registration` strategy.
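+
+For example, the following illustrative table definition (stream name, schema, and consumer name are placeholders) switches the source to EFO with a lazily registered consumer:
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'scan.stream.recordpublisher' = 'EFO',
+  'scan.stream.efo.consumername' = 'my-flink-efo-consumer', -- must be unique per stream
+  'format' = 'csv'
+);
+```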
+
+<span class="label label-info">Note</span> For a given Kinesis data stream, each EFO consumer must have a unique name.
+However, consumer names do not have to be unique across data streams.
+Reusing a consumer name will result in existing subscriptions being terminated.
+
+<span class="label label-info">Note</span> With the `LAZY` strategy, stream consumers are de-registered when the job is shut down gracefully.
+In the event that a job terminates without executing the shutdown hooks, stream consumers will remain active.
+In this situation the stream consumers will be gracefully reused when the application restarts.
+With the `NONE` and `EAGER` strategies, stream consumer de-registration is not performed by `FlinkKinesisConsumer`.
+
+Data Type Mapping
+-----------------
+
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure.
+Instead, Kinesis records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'.
+To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the `format` keyword.
+Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details.
+
+Updates in 1.15
+---------------
+
+The Kinesis table API sink connector depended on <code>FlinkKinesisProducer</code> until Flink 1.14. With the introduction of <code>KinesisStreamsSink</code> in Flink 1.15, the sink connector has been migrated to the new <code>KinesisStreamsSink</code>. Authentication options have been migrated identically, while sink configuration options are now compatible with <code>KinesisStreamsSink</code>.
+
+Options that configured <code>FlinkKinesisProducer</code> are now deprecated, with fallback support for the common configuration options that have a <code>KinesisStreamsSink</code> equivalent.
+
+<code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to Kinesis,
+which doesn't support aggregation. As a consequence, table options configuring aggregation in the deprecated <code>FlinkKinesisProducer</code>
+are now deprecated and will be ignored; this includes <code>sink.producer.aggregation-enabled</code> and
+<code>sink.producer.aggregation-count</code>.
+
+<span class="label label-info">Note</span> When migrating applications that use deprecated options, the incompatible deprecated options are ignored and a warning is logged for each of them.
+
+The Kinesis table API source connector still depends on <code>FlinkKinesisConsumer</code>, with no change in configuration options.
+
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/dynamodb.md b/docs/content/docs/connectors/datastream/dynamodb.md
index e86fca2..ab6b48d 100644
--- a/docs/content/docs/connectors/datastream/dynamodb.md
+++ b/docs/content/docs/connectors/datastream/dynamodb.md
@@ -31,7 +31,7 @@
 
 To use the connector, add the following Maven dependency to your project:
 
-{{< connector_artifact flink-connector-dynamodb 3.0.0 >}}
+{{< connector_artifact flink-connector-dynamodb 4.0.0 >}}
 
 {{< tabs "ec24a4ae-6a47-11ed-a1eb-0242ac120002" >}}
 {{< tab "Java" >}}
diff --git a/docs/content/docs/connectors/datastream/firehose.md b/docs/content/docs/connectors/datastream/firehose.md
new file mode 100644
index 0000000..7ca0edd
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/firehose.md
@@ -0,0 +1,199 @@
+---
+title: Firehose
+weight: 5
+type: docs
+aliases:
+- /dev/connectors/firehose.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Firehose Sink
+
+The Firehose sink writes to [Amazon Kinesis Data Firehose](https://aws.amazon.com/kinesis/data-firehose/).
+
+Follow the instructions from the [Amazon Kinesis Data Firehose Developer Guide](https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html)
+to set up a Kinesis Data Firehose delivery stream.
+
+To use the connector, add the following Maven dependency to your project:
+
+{{< connector_artifact flink-connector-aws-kinesis-firehose 4.0.0 >}}
+
+{{< py_download_link "aws-kinesis-firehose" >}}
+
+The `KinesisFirehoseSink` uses [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Firehose delivery stream.
+
+{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+KinesisFirehoseSink<String> kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
+        .build();
+
+flinkStream.sinkTo(kdfSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val kdfSink =
+    KinesisFirehoseSink.<String>builder()
+        .setFirehoseClientProperties(sinkProperties)      // Required
+        .setSerializationSchema(new SimpleStringSchema()) // Required
+        .setDeliveryStreamName("your-stream-name")        // Required
+        .setFailOnError(false)                            // Optional
+        .setMaxBatchSize(500)                             // Optional
+        .setMaxInFlightRequests(50)                       // Optional
+        .setMaxBufferedRequests(10_000)                   // Optional
+        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
+        .setMaxTimeInBufferMS(5000)                       // Optional
+        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
+        .build()
+
+flinkStream.sinkTo(kdfSink)
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+sink_properties = {
+    # Required
+    'aws.region': 'eu-west-1',
+    # Optional, provide via alternative routes e.g. environment variables
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
+}
+
+kdf_sink = (KinesisFirehoseSink.builder()
+    .set_firehose_client_properties(sink_properties)    # Required
+    .set_serialization_schema(SimpleStringSchema())     # Required
+    .set_delivery_stream_name('your-stream-name')       # Required
+    .set_fail_on_error(False)                           # Optional
+    .set_max_batch_size(500)                            # Optional
+    .set_max_in_flight_requests(50)                     # Optional
+    .set_max_buffered_requests(10000)                   # Optional
+    .set_max_batch_size_in_bytes(5 * 1024 * 1024)       # Optional
+    .set_max_time_in_buffer_ms(5000)                    # Optional
+    .set_max_record_size_in_bytes(1 * 1024 * 1024)      # Optional
+    .build())
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Configurations
+
+Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.
+
+1. __setFirehoseClientProperties(Properties sinkProperties)__
+    * Required.
+    * Supplies credentials, region and other parameters to the Firehose client.
+2. __setSerializationSchema(SerializationSchema<InputType> serializationSchema)__
+    * Required.
+    * Supplies a serialization schema to the Sink. This schema is used to serialize elements before sending to Firehose.
+3. __setDeliveryStreamName(String deliveryStreamName)__
+    * Required.
+    * Name of the delivery stream to sink to.
+4. _setFailOnError(boolean failOnError)_
+    * Optional. Default: `false`.
+    * Whether failed requests to write records to Firehose are treated as fatal exceptions in the sink.
+5. _setMaxBatchSize(int maxBatchSize)_
+    * Optional. Default: `500`.
+    * Maximum size of a batch to write to Firehose.
+6. _setMaxInFlightRequests(int maxInFlightRequests)_
+    * Optional. Default: `50`.
+    * The maximum number of in flight requests allowed before the sink applies backpressure.
+7. _setMaxBufferedRequests(int maxBufferedRequests)_
+    * Optional. Default: `10_000`.
+    * The maximum number of records that may be buffered in the sink before backpressure is applied. 
+8. _setMaxBatchSizeInBytes(int maxBatchSizeInBytes)_
+    * Optional. Default: `4 * 1024 * 1024`.
+    * The maximum size (in bytes) a batch may become. All batches sent will be smaller than or equal to this size.
+9. _setMaxTimeInBufferMS(int maxTimeInBufferMS)_
+    * Optional. Default: `5000`.
+    * The maximum time a record may stay in the sink before being flushed.
+10. _setMaxRecordSizeInBytes(int maxRecordSizeInBytes)_
+    * Optional. Default: `1000 * 1024`.
+    * The maximum record size that the sink will accept; records larger than this will be automatically rejected.
+11. _build()_
+    * Constructs and returns the Firehose sink.
+
+
+## Using Custom Firehose Endpoints
+
+It is sometimes desirable to have Flink operate as a consumer or producer against a Firehose VPC endpoint or a non-AWS
+Firehose endpoint such as [Localstack](https://localstack.cloud/); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties producerConfig = new Properties();
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val producerConfig = new Properties()
+producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+producer_config = {
+    'aws.region': 'us-east-1',
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
+    'aws.endpoint': 'http://localhost:4566'
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/datastream/kinesis.md b/docs/content/docs/connectors/datastream/kinesis.md
new file mode 100644
index 0000000..6036ac9
--- /dev/null
+++ b/docs/content/docs/connectors/datastream/kinesis.md
@@ -0,0 +1,859 @@
+---
+title: Kinesis
+weight: 5
+type: docs
+aliases:
+  - /dev/connectors/kinesis.html
+  - /apis/streaming/connectors/kinesis.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Streams Connector
+
+The Kinesis connector provides access to [Amazon Kinesis Data Streams](http://aws.amazon.com/kinesis/streams/).
+
+To use this connector, add one or more of the following dependencies to your project, depending on whether you are reading from and/or writing to Kinesis Data Streams:
+
+<table class="table table-bordered">
+  <thead>
+    <tr>
+      <th class="text-left">KDS Connectivity</th>
+      <th class="text-left">Maven Dependency</th>
+    </tr>
+  </thead>
+  <tbody>
+    <tr>
+        <td>Source</td>
+        <td>{{< connector_artifact flink-connector-kinesis 4.0.0 >}}</td>
+    </tr>
+    <tr>
+        <td>Sink</td>
+        <td>{{< connector_artifact flink-connector-aws-kinesis-streams 4.0.0 >}}</td>
+    </tr>
+  </tbody>
+</table>
+
+Due to licensing issues, the `flink-connector-kinesis` artifact was not deployed to Maven Central for prior versions. Please see the version-specific documentation for further information.
+
+{{< py_download_link "kinesis" >}}
+
+## Using the Amazon Kinesis Streams Service
+Follow the instructions from the [Amazon Kinesis Streams Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html)
+to set up Kinesis streams.
+
+## Configuring Access to Kinesis with IAM
+Make sure to create the appropriate IAM policy to allow reading / writing to / from the Kinesis streams. See examples [here](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html).
+
+Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the configuration, the `BASIC` provider is used.  
+
+A specific Credentials Provider can **optionally** be set by using the `AWSConfigConstants.AWS_CREDENTIALS_PROVIDER` setting.
+ 
+Supported Credential Providers are:
+* `AUTO` - Using the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE` and EC2/ECS credentials provider.
+* `BASIC` - Using access key ID and secret key supplied as configuration. 
+* `ENV_VAR` - Using `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Using Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Using an AWS credentials profile file to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token. 
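+
+As a sketch of the `ASSUME_ROLE` provider (the role ARN and session name are placeholders, and the `AWS_ROLE_ARN` / `AWS_ROLE_SESSION_NAME` keys in `AWSConfigConstants` are assumed to be available in your connector version), the configuration could look like this:
+
+```java
+Properties config = new Properties();
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+// Assume an IAM role instead of supplying long-lived keys
+config.put(AWSConfigConstants.AWS_CREDENTIALS_PROVIDER, "ASSUME_ROLE");
+config.put(AWSConfigConstants.AWS_ROLE_ARN, "arn:aws:iam::123456789012:role/MyFlinkRole"); // placeholder ARN
+config.put(AWSConfigConstants.AWS_ROLE_SESSION_NAME, "flink-kinesis-session");             // placeholder session name
+```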
+
+## Kinesis Consumer
+
+The `FlinkKinesisConsumer` is an exactly-once parallel streaming data source that subscribes to multiple AWS Kinesis
+streams within the same AWS service region, and can transparently handle resharding of streams while the job is running. Each subtask of the consumer is
+responsible for fetching data records from multiple Kinesis shards. The number of shards fetched by each subtask will
+change as shards are closed and created by Kinesis.
+
+Before consuming data from Kinesis streams, make sure that all streams are created with the status "ACTIVE" in the Amazon Kinesis Data Streams console.
+
+{{< tabs "58b6c235-48ee-4cf7-aabc-41e0679a3370" >}}
+{{< tab "Java" >}}
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer_config = {
+    'aws.region': 'us-east-1',
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
+    'flink.stream.initpos': 'LATEST'
+}
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+kinesis = env.add_source(FlinkKinesisConsumer("stream-1", SimpleStringSchema(), consumer_config))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The above is a simple example of using the consumer. Configuration for the consumer is supplied with a `java.util.Properties`
+instance, the configuration keys for which can be found in `AWSConfigConstants` (AWS-specific parameters) and
+`ConsumerConfigConstants` (Kinesis consumer parameters). The example
+demonstrates consuming a single Kinesis stream in the AWS region "us-east-1". The AWS credentials are supplied using the basic method, in which
+the AWS access key ID and secret access key are directly supplied in the configuration. Also, data is consumed
+from the newest position in the Kinesis stream (the other option is to set `ConsumerConfigConstants.STREAM_INITIAL_POSITION`
+to `TRIM_HORIZON`, which lets the consumer start reading the Kinesis stream from the earliest record possible).
+
+Other optional configuration keys for the consumer can be found in `ConsumerConfigConstants`.
+
+Note that the configured parallelism of the Flink Kinesis Consumer source
+can be completely independent of the total number of shards in the Kinesis streams.
+When the number of shards is larger than the parallelism of the consumer,
+each consumer subtask can subscribe to multiple shards; otherwise,
+if the number of shards is smaller than the parallelism of the consumer,
+some consumer subtasks will simply be idle and wait until they are assigned
+new shards (i.e., when the streams are resharded to increase the
+number of shards for higher provisioned Kinesis service throughput).
+
+Also note that the default assignment of shards to subtasks is based on the hashes of the shard and stream names,
+which will more or less balance the shards across the subtasks.
+However, assuming the default Kinesis shard management is used on the stream (UpdateShardCount with `UNIFORM_SCALING`),
+setting `UniformShardAssigner` as the shard assigner on the consumer will distribute the shards much more evenly across subtasks.
+Assuming the incoming Kinesis records are assigned random Kinesis `PartitionKey` or `ExplicitHashKey` values,
+the result is consistent subtask loading.
+If neither the default assigner nor the `UniformShardAssigner` suffices, a custom implementation of `KinesisShardAssigner` can be set.
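+
+As a sketch (assuming the `UniformShardAssigner` implementation shipped with the connector and the consumer's `setShardAssigner` method), the assigner can be configured like this:
+
+```java
+FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig);
+// Spread shards uniformly across subtasks instead of hashing shard and stream names
+consumer.setShardAssigner(new UniformShardAssigner());
+
+DataStream<String> kinesis = env.addSource(consumer);
+```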
+
+### The `DeserializationSchema`
+
+The Flink Kinesis Consumer also needs a schema to know how to turn the binary data in a Kinesis data stream into Java objects.
+The `KinesisDeserializationSchema` allows users to specify such a schema. The `T deserialize(byte[] recordValue, String partitionKey, String seqNum, long approxArrivalTimestamp, String stream, String shardId)`
+method gets called for each Kinesis record.
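+
+For illustration, a minimal custom `KinesisDeserializationSchema` (the class name here is hypothetical, and imports are omitted as in the other snippets on this page) could decode each record as a UTF-8 string and tag it with the shard it was read from:
+
+```java
+public class ShardTaggingStringSchema implements KinesisDeserializationSchema<String> {
+
+    @Override
+    public String deserialize(byte[] recordValue, String partitionKey, String seqNum,
+                              long approxArrivalTimestamp, String stream, String shardId) {
+        // Decode the raw record payload and append the shard id for debugging purposes
+        return new String(recordValue, StandardCharsets.UTF_8) + " @ " + shardId;
+    }
+
+    @Override
+    public TypeInformation<String> getProducedType() {
+        return Types.STRING;
+    }
+}
+```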
+
+For convenience, Flink provides the following schemas out of the box:
+  
+1. `TypeInformationSerializationSchema` which creates a schema based on Flink's `TypeInformation`.
+    This is useful if the data is both written and read by Flink.
+    This schema is a performant Flink-specific alternative to other generic serialization approaches.
+    
+2. `GlueSchemaRegistryJsonDeserializationSchema` offers the ability to look up the writer's schema (the schema that was used to write the record)
+   in the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html). Using this deserialization schema, each record will be
+   read with the schema retrieved from AWS Glue Schema Registry and transformed into either a `com.amazonaws.services.schemaregistry.serializers.json.JsonDataWithSchema`,
+   which represents a generic record with a manually provided schema, or a Java POJO generated by [mbknor-jackson-jsonSchema](https://github.com/mbknor/mbknor-jackson-jsonSchema).
+   
+   <br>To use this deserialization schema one has to add the following additional dependency:
+       
+{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
+{{< tab "GlueSchemaRegistryJsonDeserializationSchema" >}}
+{{< artifact flink-jsonschema-glue-schema-registry >}}
+{{< /tab >}}
+{{< /tabs >}}
+    
+3. `AvroDeserializationSchema` which reads data serialized with Avro format using a statically provided schema. It can
+    infer the schema from Avro generated classes (`AvroDeserializationSchema.forSpecific(...)`) or it can work with `GenericRecords`
+    with a manually provided schema (with `AvroDeserializationSchema.forGeneric(...)`). This deserialization schema expects that
+    the serialized records DO NOT contain the embedded schema.
+
+    - You can use the [AWS Glue Schema Registry](https://docs.aws.amazon.com/glue/latest/dg/schema-registry.html)
+      to retrieve the writer’s schema. Similarly, records will be read with the schema retrieved from AWS Glue Schema Registry and transformed
+      (either through `GlueSchemaRegistryAvroDeserializationSchema.forGeneric(...)` or `GlueSchemaRegistryAvroDeserializationSchema.forSpecific(...)`).
+      For more information on integrating the AWS Glue Schema Registry with Apache Flink see
+      [Use Case: Amazon Kinesis Data Analytics for Apache Flink](https://docs.aws.amazon.com/glue/latest/dg/schema-registry-integrations.html#schema-registry-integrations-kinesis-data-analytics-apache-flink).
+
+    <br>To use this deserialization schema one has to add the following additional dependency:
+    
+{{< tabs "8c6721c7-4a48-496e-b0fe-6522cf6a5e13" >}}
+{{< tab "AvroDeserializationSchema" >}}
+{{< artifact flink-avro >}}
+{{< /tab >}}
+{{< tab "GlueSchemaRegistryAvroDeserializationSchema" >}}
+{{< artifact flink-avro-glue-schema-registry >}}
+{{< /tab >}}
+{{< /tabs >}}
+
+### Configuring Starting Position
+
+The Flink Kinesis Consumer currently provides the following options to configure where to start reading Kinesis streams, simply by setting `ConsumerConfigConstants.STREAM_INITIAL_POSITION` to
+one of the following values in the provided configuration properties (the option names follow [the names used by the AWS Kinesis Streams service](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax)):
+
+- `LATEST`: read all shards of all streams starting from the latest record.
+- `TRIM_HORIZON`: read all shards of all streams starting from the earliest record possible (data may be trimmed by Kinesis depending on the retention settings).
+- `AT_TIMESTAMP`: read all shards of all streams starting from a specified timestamp (see the example after this list). The timestamp must also be specified in the configuration
+properties by providing a value for `ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP`, in one of the following date patterns:
+    - a non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`).
+    - a user-defined pattern, which is a valid pattern for `SimpleDateFormat`, provided via `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT`.
+    If `ConsumerConfigConstants.STREAM_TIMESTAMP_DATE_FORMAT` is not defined, the default pattern is `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`
+    (for example, a timestamp value of `2016-04-04` with a user-provided pattern of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no pattern given).
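+
+For example, a configuration that starts reading from a fixed point in time (the timestamp value below is a placeholder) might look like this:
+
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+// Start from a specific point in time instead of LATEST or TRIM_HORIZON
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "AT_TIMESTAMP");
+// Interpreted with the default pattern yyyy-MM-dd'T'HH:mm:ss.SSSXXX
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_TIMESTAMP, "2016-04-04T19:58:46.480-00:00");
+```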
+
+### Fault Tolerance for Exactly-Once User-Defined State Update Semantics
+
+With Flink's checkpointing enabled, the Flink Kinesis Consumer will consume records from shards in Kinesis streams and
+periodically checkpoint each shard's progress. In case of a job failure, Flink will restore the streaming program to the
+state of the latest complete checkpoint and re-consume the records from Kinesis shards, starting from the progress that
+was stored in the checkpoint.
+
+The checkpoint interval therefore defines how far back the program may have to go, at most, in case of a failure.
+
+To use fault tolerant Kinesis Consumers, checkpointing of the topology needs to be enabled at the execution environment:
+
+{{< tabs "b1399ed7-5855-446d-9684-7a49de9b4c97" >}}
+{{< tab "Java" >}}
+```java
+final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+env.enableCheckpointing(5000); // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+env.enableCheckpointing(5000) // checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+env = StreamExecutionEnvironment.get_execution_environment()
+env.enable_checkpointing(5000) # checkpoint every 5000 msecs
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Also note that Flink can only restart the topology if enough processing slots are available.
+Therefore, if the topology fails due to loss of a TaskManager, there must still be enough slots available afterwards.
+Flink on YARN supports automatic restart of lost YARN containers.
+
+### Using Enhanced Fan-Out
+
+[Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum 
+number of concurrent consumers per Kinesis stream.
+Without EFO, all concurrent consumers share a single read quota per shard. 
+Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers. 
+Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
+ 
+In order to enable EFO, two additional configuration parameters are required:
+
+- `RECORD_PUBLISHER_TYPE`: Determines whether to use `EFO` or `POLLING`. The default `RecordPublisher` is `POLLING`.
+- `EFO_CONSUMER_NAME`: A name to identify the consumer. 
+For a given Kinesis data stream, each consumer must have a unique name. 
+However, consumer names do not have to be unique across data streams. 
+Reusing a consumer name will result in existing subscriptions being terminated.
+
+The code snippet below shows a simple example of configuring an EFO consumer.
+
+{{< tabs "42345893-70c3-4678-a348-4c419b337eb1" >}}
+{{< tab "Java" >}}
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
+    ConsumerConfigConstants.RecordPublisherType.EFO.name())
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer")
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer_config = {
+    'aws.region': 'us-east-1',
+    'flink.stream.initpos': 'LATEST',
+    'flink.stream.recordpublisher':  'EFO',
+    'flink.stream.efo.consumername': 'my-flink-efo-consumer'
+}
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+kinesis = env.add_source(FlinkKinesisConsumer(
+    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+#### EFO Stream Consumer Registration/Deregistration
+
+In order to use EFO, a stream consumer must be registered against each stream you wish to consume.
+By default, the `FlinkKinesisConsumer` will register the stream consumer automatically when the Flink job starts.
+The stream consumer will be registered using the name provided by the `EFO_CONSUMER_NAME` configuration.
+`FlinkKinesisConsumer` provides three registration strategies:
+
+- Registration
+  - `LAZY` (default): Stream consumers are registered when the Flink job starts running.
+    If the stream consumer already exists, it will be reused.
+    This is the preferred strategy for the majority of applications.
+    However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN.
+    For jobs with very large parallelism this can result in an increased start-up time.
+    The describe operation has a limit of 20 [transactions per second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
+    which means application startup time will increase by roughly `parallelism/20` seconds.
+  - `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` constructor.
+    If the stream consumer already exists, it will be reused. 
+    This will result in registration occurring when the job is constructed,
+    either on the Flink Job Manager or on the client environment submitting the job.
+    Using this strategy results in a single thread registering and retrieving the stream consumer ARN, 
+    reducing startup time over `LAZY` (with large parallelism).
+    However, consider that the client environment will require access to the AWS services.
+  - `NONE`: Stream consumer registration is not performed by `FlinkKinesisConsumer`.
+    Registration must be performed externally using the [AWS CLI or SDK](https://aws.amazon.com/tools/)
+    to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).
+    Stream consumer ARNs should be provided to the job via the consumer configuration.
+- Deregistration
+  - `LAZY|EAGER` (default): Stream consumers are deregistered when the job is shut down gracefully.
+    In the event that a job terminates without executing the shutdown hooks, stream consumers will remain active.
+    In this situation the stream consumers will be gracefully reused when the application restarts.
+  - `NONE`: Stream consumer deregistration is not performed by `FlinkKinesisConsumer`.
+
+Below is an example configuration to use the `EAGER` registration strategy:
+
+{{< tabs "a85d716b-6c1c-46d8-9ee4-12d8380a0c06" >}}
+{{< tab "Java" >}}
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
+    ConsumerConfigConstants.EFORegistrationType.EAGER.name());
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
+    ConsumerConfigConstants.RecordPublisherType.EFO.name())
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer")
+
+consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
+    ConsumerConfigConstants.EFORegistrationType.EAGER.name())
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer_config = {
+    'aws.region': 'us-east-1',
+    'flink.stream.initpos': 'LATEST',
+    'flink.stream.recordpublisher':  'EFO',
+    'flink.stream.efo.consumername': 'my-flink-efo-consumer',
+    'flink.stream.efo.registration': 'EAGER'
+}
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+kinesis = env.add_source(FlinkKinesisConsumer(
+    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Below is an example configuration to use the `NONE` registration strategy:
+
+{{< tabs "00b46c87-7740-4263-8040-2aa7e2960513" >}}
+{{< tab "Java" >}}
+```java
+Properties consumerConfig = new Properties();
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE, 
+    ConsumerConfigConstants.RecordPublisherType.EFO.name());
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
+
+consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE, 
+    ConsumerConfigConstants.EFORegistrationType.NONE.name());
+consumerConfig.put(ConsumerConfigConstants.efoConsumerArn("stream-name"),
+    "arn:aws:kinesis:<region>:<account>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>");
+
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+DataStream<String> kinesis = env.addSource(new FlinkKinesisConsumer<>(
+    "kinesis_stream_name", new SimpleStringSchema(), consumerConfig));
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumerConfig = new Properties()
+consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
+
+consumerConfig.put(ConsumerConfigConstants.RECORD_PUBLISHER_TYPE,
+    ConsumerConfigConstants.RecordPublisherType.EFO.name())
+consumerConfig.put(ConsumerConfigConstants.EFO_CONSUMER_NAME, "my-flink-efo-consumer")
+
+consumerConfig.put(ConsumerConfigConstants.EFO_REGISTRATION_TYPE,
+    ConsumerConfigConstants.EFORegistrationType.NONE.name())
+consumerConfig.put(ConsumerConfigConstants.efoConsumerArn("stream-name"),
+    "arn:aws:kinesis:<region>:<account>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>")
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment()
+
+val kinesis = env.addSource(new FlinkKinesisConsumer[String](
+    "kinesis_stream_name", new SimpleStringSchema, consumerConfig))
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer_config = {
+    'aws.region': 'us-east-1',
+    'flink.stream.initpos': 'LATEST',
+    'flink.stream.recordpublisher':  'EFO',
+    'flink.stream.efo.consumername': 'my-flink-efo-consumer',
+    'flink.stream.efo.consumerarn.stream-name':
+        'arn:aws:kinesis:<region>:<account>:stream/<stream-name>/consumer/<consumer-name>:<create-timestamp>'
+}
+
+env = StreamExecutionEnvironment.get_execution_environment()
+
+kinesis = env.add_source(FlinkKinesisConsumer(
+    "kinesis_stream_name", SimpleStringSchema(), consumer_config))
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+### Event Time for Consumed Records
+
+If streaming topologies choose to use the [event time notion]({{< ref "docs/concepts/time" >}}) for record
+timestamps, an *approximate arrival timestamp* will be used by default. This timestamp is attached to records by Kinesis once they
+are successfully received and stored by the stream. Note that this timestamp is typically referred to as a Kinesis server-side
+timestamp, and there are no guarantees about its accuracy or ordering (i.e., the timestamps may not always be
+ascending).
+
+Users can choose to override this default with a custom timestamp assigner, as described [here]({{< ref "docs/dev/datastream/event-time/generating_watermarks" >}}),
+or use one of the [predefined ones]({{< ref "docs/dev/datastream/event-time/built_in" >}}). After doing so,
+the assigner can be passed to the consumer in the following way:
+
+{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6600" >}}
+{{< tab "Java" >}}
+```java
+FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(
+    "kinesis_stream_name",
+    new SimpleStringSchema(),
+    kinesisConsumerConfig);
+consumer.setPeriodicWatermarkAssigner(new CustomAssignerWithPeriodicWatermarks());
+DataStream<String> stream = env.addSource(consumer);
+stream.print();
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val consumer = new FlinkKinesisConsumer[String](
+    "kinesis_stream_name",
+    new SimpleStringSchema(),
+    kinesisConsumerConfig)
+consumer.setPeriodicWatermarkAssigner(new CustomAssignerWithPeriodicWatermarks())
+val stream = env.addSource(consumer)
+stream.print()
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+consumer = FlinkKinesisConsumer(
+    "kinesis_stream_name",
+    SimpleStringSchema(),
+    consumer_config)
+stream = env.add_source(consumer)
+stream.print()
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+Internally, an instance of the assigner is executed per shard / consumer thread (see the threading model below).
+When an assigner is specified, for each record read from Kinesis, `extractTimestamp(T element, long previousElementTimestamp)`
+is called to assign a timestamp to the record and `getCurrentWatermark()` is called to determine the new watermark for the shard.
+The watermark of the consumer subtask is then determined as the minimum watermark of all its shards and emitted periodically.
+The per-shard watermark is essential to deal with varying consumption speeds between shards, which otherwise could lead
+to issues with downstream logic that relies on the watermark, such as incorrect late data dropping.
+
+By default, the watermark will stall if shards do not deliver new records.
+The property `ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS` can be used to avoid this potential issue through a
+timeout that will allow the watermark to progress despite idle shards.
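+
+For example, the following (the 30 second value is only illustrative) allows the per-shard watermark to advance after a shard has been idle for 30 seconds:
+
+```java
+// Let the watermark progress even if a shard stops delivering records for 30 seconds
+consumerConfig.put(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS, "30000");
+```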
+
+### Event Time Alignment for Shard Consumers
+
+The Flink Kinesis Consumer optionally supports synchronization between parallel consumer subtasks (and their threads)
+to avoid the event time skew related problems described in [Event time synchronization across sources](https://issues.apache.org/jira/browse/FLINK-10886).
+
+To enable synchronization, set the watermark tracker on the consumer:
+
+{{< tabs "8fbaf5cb-3b76-4c62-a74e-db51b60f6601" >}}
+{{< tab "Java" >}}
+```java
+JobManagerWatermarkTracker watermarkTracker =
+    new JobManagerWatermarkTracker("myKinesisSource");
+consumer.setWatermarkTracker(watermarkTracker);
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+watermark_tracker = WatermarkTracker.job_manager_watermark_tracker("myKinesisSource")
+consumer.set_watermark_tracker(watermark_tracker)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The `JobManagerWatermarkTracker` will use a global aggregate to synchronize the per subtask watermarks. Each subtask
+uses a per shard queue to control the rate at which records are emitted downstream based on how far ahead of the global
+watermark the next record in the queue is.
+
+The "emit ahead" limit is configured via `ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS`. Smaller values reduce
+the skew but also the throughput. Larger values will allow the subtask to proceed further before waiting for the global
+watermark to advance.
+
+Another variable in the throughput equation is how frequently the watermark is propagated by the tracker.
+The interval can be configured via `ConsumerConfigConstants.WATERMARK_SYNC_MILLIS`.
+Smaller values reduce emitter waits and come at the cost of increased communication with the job manager.
+
+Since records accumulate in the queues when skew occurs, increased memory consumption is to be expected.
+How much depends on the average record size. With larger sizes, it may be necessary to adjust the emitter queue capacity via
+`ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY`.
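+
+A sketch of how these three settings might be tuned together (the values below are illustrative, not recommendations):
+
+```java
+// How far ahead of the global watermark a subtask may emit records (ms)
+consumerConfig.put(ConsumerConfigConstants.WATERMARK_LOOKAHEAD_MILLIS, "30000");
+// How often the per-subtask watermarks are aggregated via the job manager (ms)
+consumerConfig.put(ConsumerConfigConstants.WATERMARK_SYNC_MILLIS, "30000");
+// Capacity of the per-shard emitter queue used while waiting for the global watermark
+consumerConfig.put(ConsumerConfigConstants.WATERMARK_SYNC_QUEUE_CAPACITY, "100");
+```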
+
+### Threading Model
+
+The Flink Kinesis Consumer uses multiple threads for shard discovery and data consumption.
+
+#### Shard Discovery
+
+For shard discovery, each parallel consumer subtask will have a single thread that constantly queries Kinesis for shard
+information, even if the subtask initially did not have shards to read from when the consumer was started. In other words, if
+the consumer is run with a parallelism of 10, there will be a total of 10 threads constantly querying Kinesis regardless
+of the total number of shards in the subscribed streams.
+
+#### Polling (default) Record Publisher
+
+For `POLLING` data consumption, a single thread will be created to consume each discovered shard. A thread will terminate when the
+shard it is responsible for consuming is closed as a result of stream resharding. In other words, there will always be
+one thread per open shard.
+
+#### Enhanced Fan-Out Record Publisher
+
+For `EFO` data consumption the threading model is the same as for `POLLING`, with additional thread pools to handle
+asynchronous communication with Kinesis. The AWS SDK v2.x `KinesisAsyncClient` uses additional threads for
+Netty to handle IO and asynchronous responses. Each parallel consumer subtask will have its own instance of the `KinesisAsyncClient`.
+In other words, if the consumer is run with a parallelism of 10, there will be a total of 10 `KinesisAsyncClient` instances.
+A separate client will be created and subsequently destroyed when registering and deregistering stream consumers.
+
+### Internally Used Kinesis APIs
+
+The Flink Kinesis Consumer uses the [AWS Java SDK](http://aws.amazon.com/sdk-for-java/) internally to call Kinesis APIs
+for shard discovery and data consumption. Due to Amazon's [service limits for Kinesis Streams](http://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html)
+on the APIs, the consumer will be competing with other non-Flink consuming applications that the user may be running.
+Below is a list of APIs called by the consumer, with a description of how the consumer uses each API, as well as information
+on how to deal with any errors or warnings that the Flink Kinesis Consumer may have due to these service limits.
+
+#### Shard Discovery
+
+- *[ListShards](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ListShards.html)*: this is constantly called
+by a single thread in each parallel consumer subtask to discover any new shards as a result of stream resharding. By default,
+the consumer performs shard discovery at an interval of 10 seconds, and will retry indefinitely until it gets a result
+from Kinesis. If this interferes with other non-Flink consuming applications, users can slow down the rate at which the consumer
+calls this API by setting a value for `ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS` in the supplied
+configuration properties. Note that this setting directly impacts
+the maximum delay of discovering a new shard and starting to consume it, as shards will not be discovered during the interval.
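+
+For example, to make the consumer call ListShards once per minute instead of every 10 seconds (the value is only illustrative):
+
+```java
+// Discover new shards once per minute to reduce ListShards traffic
+consumerConfig.put(ConsumerConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS, "60000");
+```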
+
+#### Polling (default) Record Publisher
+
+- *[GetShardIterator](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html)*: this is called
+only once when per-shard consuming threads are started, and will retry if Kinesis complains that the transaction limit for the
+API has been exceeded, up to a default of 3 attempts. Note that since the rate limit for this API is per shard (not per stream),
+the consumer itself should not exceed the limit. Usually, if this happens, users can either try to slow down any other
+non-Flink consuming applications that call this API, or modify the retry behaviour of this API call in the consumer by
+setting keys prefixed by `ConsumerConfigConstants.SHARD_GETITERATOR_*` in the supplied configuration properties.
+
+- *[GetRecords](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetRecords.html)*: this is constantly called
+by per-shard consuming threads to fetch records from Kinesis. When a shard has multiple concurrent consumers (when there
+are any other non-Flink consuming applications running), the per-shard rate limit may be exceeded. By default, on each call
+of this API, the consumer will retry if Kinesis complains that the data size / transaction limit for the API has been exceeded,
+up to a default of 3 attempts. Users can either try to slow down other non-Flink consuming applications, or adjust the throughput
+of the consumer by setting the `ConsumerConfigConstants.SHARD_GETRECORDS_MAX` and
+`ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS` keys in the supplied configuration properties. Setting the former
+adjusts the maximum number of records each consuming thread tries to fetch from shards on each call (default is 10,000), while
+the latter modifies the sleep interval between fetches (default is 200 ms). The retry behaviour of the
+consumer when calling this API can also be modified by using the other keys prefixed by `ConsumerConfigConstants.SHARD_GETRECORDS_*`.
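+
+A sketch of throttling the polling consumer (the values are illustrative only):
+
+```java
+// Fetch at most 5,000 records per GetRecords call ...
+consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_MAX, "5000");
+// ... and sleep 500 ms between consecutive fetches on each shard
+consumerConfig.put(ConsumerConfigConstants.SHARD_GETRECORDS_INTERVAL_MILLIS, "500");
+```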
+
+#### Enhanced Fan-Out Record Publisher
+
+- *[SubscribeToShard](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShard.html)*: this is called
+by per-shard consuming threads to obtain shard subscriptions. A shard subscription is typically active for 5 minutes,
+but subscriptions will be reacquired if any recoverable errors are thrown. Once a subscription is acquired, the consumer
+will receive a stream of [SubscribeToShardEvents](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_SubscribeToShardEvent.html).
+Retry and backoff parameters can be configured using the `ConsumerConfigConstants.SUBSCRIBE_TO_SHARD_*` keys.
+
+- *[DescribeStreamSummary](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamSummary.html)*: this is called 
+once per stream, during stream consumer registration. By default, the `LAZY` registration strategy will scale the
+number of calls by the job parallelism. `EAGER` will invoke this once per stream and `NONE` will not invoke this API. 
+Retry and backoff parameters can be configured using the 
+`ConsumerConfigConstants.STREAM_DESCRIBE_*` keys.
+
+- *[DescribeStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html)*:
+this is called during stream consumer registration and deregistration. For each stream this service will be invoked 
+periodically until the stream consumer is reported `ACTIVE`/`not found` for registration/deregistration. By default,
+the `LAZY` registration strategy will scale the number of calls by the job parallelism. `EAGER` will call the service 
+once per stream for registration only. `NONE` will not invoke this service. Retry and backoff parameters can be configured using the 
+`ConsumerConfigConstants.DESCRIBE_STREAM_CONSUMER_*` keys.  
+
+- *[RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html)*: 
+this is called once per stream during stream consumer registration, unless the `NONE` registration strategy is configured.
+Retry and backoff parameters can be configured using the `ConsumerConfigConstants.REGISTER_STREAM_*` keys.
+
+- *[DeregisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DeregisterStreamConsumer.html)*: 
+this is called once per stream during stream consumer deregistration, unless the `NONE` or `EAGER` registration strategy is configured.
+Retry and backoff parameters can be configured using the `ConsumerConfigConstants.DEREGISTER_STREAM_*` keys.  
+
+## Kinesis Streams Sink
+
+The Kinesis Streams sink (hereafter "Kinesis sink") uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) to write data from a Flink stream into a Kinesis stream.
+
+To write data into a Kinesis stream, make sure the stream is marked as "ACTIVE" in the Amazon Kinesis Data Streams console.
+
+For the monitoring to work, the user accessing the stream needs access to the CloudWatch service.
+
+{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61c" >}}
+{{< tab "Java" >}}
+```java
+Properties sinkProperties = new Properties();
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+
+KinesisStreamsSink<String> kdsSink =
+    KinesisStreamsSink.<String>builder()
+        .setKinesisClientProperties(sinkProperties)                               // Required
+        .setSerializationSchema(new SimpleStringSchema())                         // Required
+        .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode()))  // Required
+        .setStreamName("your-stream-name")                                        // Required
+        .setFailOnError(false)                                                    // Optional
+        .setMaxBatchSize(500)                                                     // Optional
+        .setMaxInFlightRequests(50)                                               // Optional
+        .setMaxBufferedRequests(10_000)                                           // Optional
+        .setMaxBatchSizeInBytes(5 * 1024 * 1024)                                  // Optional
+        .setMaxTimeInBufferMS(5000)                                               // Optional
+        .setMaxRecordSizeInBytes(1 * 1024 * 1024)                                 // Optional
+        .build();
+
+DataStream<String> simpleStringStream = ...;
+simpleStringStream.sinkTo(kdsSink);
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val sinkProperties = new Properties()
+// Required
+sinkProperties.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+
+// Optional, provide via alternative routes e.g. environment variables
+sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+
+val kdsSink = KinesisStreamsSink.builder[String]()
+    .setKinesisClientProperties(sinkProperties)                               // Required
+    .setSerializationSchema(new SimpleStringSchema())                         // Required
+    .setPartitionKeyGenerator(element => String.valueOf(element.hashCode()))  // Required
+    .setStreamName("your-stream-name")                                        // Required
+    .setFailOnError(false)                                                    // Optional
+    .setMaxBatchSize(500)                                                     // Optional
+    .setMaxInFlightRequests(50)                                               // Optional
+    .setMaxBufferedRequests(10000)                                            // Optional
+    .setMaxBatchSizeInBytes(5 * 1024 * 1024)                                  // Optional
+    .setMaxTimeInBufferMS(5000)                                               // Optional
+    .setMaxRecordSizeInBytes(1 * 1024 * 1024)                                 // Optional
+    .build()
+
+val simpleStringStream = ...
+simpleStringStream.sinkTo(kdsSink)
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+sink_properties = {
+    # Required
+    'aws.region': 'us-east-1',
+    # Optional, provide via alternative routes e.g. environment variables
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
+    'aws.endpoint': 'http://localhost:4567'
+}
+
+kds_sink = (KinesisStreamsSink.builder()
+    .set_kinesis_client_properties(sink_properties)              # Required
+    .set_serialization_schema(SimpleStringSchema())              # Required
+    .set_partition_key_generator(PartitionKeyGenerator.fixed())  # Required
+    .set_stream_name("your-stream-name")                         # Required
+    .set_fail_on_error(False)                                    # Optional
+    .set_max_batch_size(500)                                     # Optional
+    .set_max_in_flight_requests(50)                              # Optional
+    .set_max_buffered_requests(10000)                            # Optional
+    .set_max_batch_size_in_bytes(5 * 1024 * 1024)                # Optional
+    .set_max_time_in_buffer_ms(5000)                             # Optional
+    .set_max_record_size_in_bytes(1 * 1024 * 1024)               # Optional
+    .build())
+
+simple_string_stream = ...
+simple_string_stream.sink_to(kds_sink)
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+The above is a simple example of using the Kinesis sink. Begin by creating a `java.util.Properties` instance with the `AWS_REGION`, `AWS_ACCESS_KEY_ID`, and `AWS_SECRET_ACCESS_KEY` configured. You can then construct the sink with the builder. The default values for the optional configurations are shown above. Some of these values have been chosen based on the [limits of Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
+
+You will always need to specify your serialization schema and the logic for generating a [partition key](https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#partition-key) from a record.
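+
+For instance, a partition key generator could key records by a business attribute so that related records land on the same shard. The sketch below assumes a hypothetical `UserEvent` POJO with a `getUserId()` accessor returning a `String`, and a `serializationSchema` matching that type:
+
+```java
+// Route all events of the same user to the same shard (UserEvent is a hypothetical type)
+PartitionKeyGenerator<UserEvent> byUser = event -> event.getUserId();
+
+KinesisStreamsSink<UserEvent> sink =
+    KinesisStreamsSink.<UserEvent>builder()
+        .setKinesisClientProperties(sinkProperties)
+        .setSerializationSchema(serializationSchema)  // a SerializationSchema<UserEvent>
+        .setPartitionKeyGenerator(byUser)
+        .setStreamName("your-stream-name")
+        .build();
+```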
+
+Some or all of the records in a request may fail to be persisted by Kinesis Data Streams for a number of reasons. If `failOnError` is on, a runtime exception will be raised. Otherwise, those records will be requeued in the buffer for retry.
+
+The Kinesis Sink provides some metrics through Flink's [metrics system]({{< ref "docs/ops/metrics" >}}) to analyze the behavior of the connector. A list of all exposed metrics may be found [here]({{<ref "docs/ops/metrics#kinesis-sink">}}).
+
+The sink's default maximum record size is 1 MB and its maximum batch size is 5 MB, in line with the Kinesis Data Streams limits. The AWS documentation detailing these limits may be found [here](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
+
+### Kinesis Sinks and Fault Tolerance
+
+The sink is designed to participate in Flink's checkpointing to provide at-least-once processing guarantees. It does this by completing any in-flight requests while taking a checkpoint. This effectively assures all requests that were triggered before the checkpoint have been successfully delivered to Kinesis Data Streams, before proceeding to process more records.
+
+If Flink needs to restore from a checkpoint (or savepoint), data that has been written since that checkpoint will be written to Kinesis again, leading to duplicates in the stream. Moreover, the sink uses the `PutRecords` API call internally, which does not guarantee to maintain the order of events.
+
+### Backpressure
+
+Backpressure in the sink arises as the sink buffer fills up and writes to the sink
+begin to exhibit blocking behaviour. More information on the rate restrictions of Kinesis Data Streams may be
+found at [Quotas and Limits](https://docs.aws.amazon.com/streams/latest/dev/service-sizes-and-limits.html).
+
+You generally reduce backpressure by increasing the size of the internal queue:
+
+{{< tabs "6df3b696-c2ca-4f44-bea0-96cf8275d61d" >}}
+{{< tab "Java" >}}
+```java
+KinesisStreamsSink<String> kdsSink =
+    KinesisStreamsSink.<String>builder()
+        ...
+        .setMaxBufferedRequests(10_000)
+        ...
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+kds_sink = KinesisStreamsSink.builder() \
+    .set_max_buffered_requests(10000) \
+    .build()
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+## Kinesis Producer
+
+{{< hint warning >}}
+The old Kinesis sink `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer` is deprecated and may be removed with a future release of Flink. Please use the [Kinesis Sink]({{<ref "docs/connectors/datastream/kinesis#kinesis-streams-sink">}}) instead.
+{{< /hint >}}
+
+The new sink uses the [AWS v2 SDK for Java](https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/home.html) whereas the old sink uses the Kinesis Producer Library. Because of this, the new Kinesis sink does not support [aggregation](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation).
+
+## Using Custom Kinesis Endpoints
+
+It is sometimes desirable to have Flink operate as a source or sink against a Kinesis VPC endpoint or a non-AWS
+Kinesis endpoint such as [Kinesalite](https://github.com/mhart/kinesalite); this is especially useful when performing
+functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the
+Flink configuration must be overridden via a configuration property.
+
+To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
+
+{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
+{{< tab "Java" >}}
+```java
+Properties config = new Properties();
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1");
+config.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
+config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
+config.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
+```
+{{< /tab >}}
+{{< tab "Scala" >}}
+```scala
+val config = new Properties()
+config.put(AWSConfigConstants.AWS_REGION, "us-east-1")
+config.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
+config.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
+config.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567")
+```
+{{< /tab >}}
+{{< tab "Python" >}}
+```python
+config = {
+    'aws.region': 'us-east-1',
+    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
+    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
+    'aws.endpoint': 'http://localhost:4567'
+}
+```
+{{< /tab >}}
+{{< /tabs >}}
+
+{{< top >}}
diff --git a/docs/content/docs/connectors/table/dynamodb.md b/docs/content/docs/connectors/table/dynamodb.md
index 6301047..b02a4d1 100644
--- a/docs/content/docs/connectors/table/dynamodb.md
+++ b/docs/content/docs/connectors/table/dynamodb.md
@@ -35,7 +35,7 @@
 Dependencies
 ------------
 
-{{< sql_connector_download_table "dynamodb" 3.0.0 >}}
+{{< sql_connector_download_table dynamodb >}}
 
 How to create a DynamoDB table
 -----------------------------------------
diff --git a/docs/content/docs/connectors/table/firehose.md b/docs/content/docs/connectors/table/firehose.md
new file mode 100644
index 0000000..5a135ba
--- /dev/null
+++ b/docs/content/docs/connectors/table/firehose.md
@@ -0,0 +1,313 @@
+---
+title: Firehose
+weight: 5
+type: docs
+aliases:
+- /dev/table/connectors/firehose.html
+---
+
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Firehose SQL Connector
+
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append Mode" >}}
+
+The Kinesis Data Firehose connector allows for writing data into [Amazon Kinesis Data Firehose (KDF)](https://aws.amazon.com/kinesis/data-firehose/).
+
+Dependencies
+------------
+
+{{< sql_connector_download_table firehose >}}
+
+How to create a Kinesis Data Firehose table
+-----------------------------------------
+
+Follow the instructions from the [Amazon Kinesis Data Firehose Developer Guide](https://docs.aws.amazon.com/ses/latest/dg/event-publishing-kinesis-analytics-firehose-stream.html) to set up a Kinesis Data Firehose delivery stream.
+The following example shows how to create a table backed by a Kinesis Data Firehose delivery stream with minimum required options:
+
+```sql
+CREATE TABLE FirehoseTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'firehose',
+  'delivery-stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'format' = 'csv'
+);
+```
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Option</th>
+      <th class="text-center" style="width: 8%">Required</th>
+      <th class="text-center" style="width: 7%">Default</th>
+      <th class="text-center" style="width: 10%">Type</th>
+      <th class="text-center" style="width: 50%">Description</th>
+    </tr>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Common Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For Kinesis Data Firehose use <code>'firehose'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>delivery-stream</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Kinesis Data Firehose delivery stream backing this table.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize Kinesis Data Firehose records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>required</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the delivery stream is defined. This option is required for <code>KinesisFirehoseSink</code> creation.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for Amazon Kinesis Data Firehose.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true, accepts all SSL certificates.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Authentication Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.basic.secretkey</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.profile.path</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.profile.name</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.role.arn</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.role.sessionName</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.role.externalId</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.role.provider</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE</td>
+    </tr>
+    <tr>
+   <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+   <td>optional</td>
+   <td style="word-wrap: break-word;">(none)</td>
+   <td>String</td>
+   <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="5" class="text-left" style="width: 100%">Sink Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>
+      Maximum number of allowed concurrent requests by <code>FirehoseAsyncClient</code> to be delivered to the delivery stream.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>
+        Maximum amount of time in ms for requests to be sent by <code>FirehoseAsyncClient</code> to the delivery stream before failure.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.protocol.version</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">HTTP2</td>
+      <td>String</td>
+      <td>HTTP version used by <code>FirehoseAsyncClient</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be passed to <code>FirehoseAsyncClient</code> to be written downstream to the delivery stream.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">16</td>
+      <td>Integer</td>
+      <td>Request threshold for uncompleted requests by <code>FirehoseAsyncClient</code> before blocking new write requests.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
+      <td>Request buffer threshold by <code>FirehoseAsyncClient</code> before blocking new write requests.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.size</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5242880</td>
+      <td>Long</td>
+      <td>Threshold value in bytes for writer buffer in <code>FirehoseAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in ms for an element to be in a buffer of <code>FirehoseAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job.</td>
+    </tr>
+    </tbody>
+</table>
+
+## Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/firehose/latest/dev/controlling-access.html) to allow writing to the Kinesis Data Firehose delivery stream.
+
+## Authentication
+
+Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis Data Firehose.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `aws.credentials.provider` setting.
+Supported values are:
+
+- `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
+- `BASIC` - Use access key ID and secret key supplied as configuration.
+- `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+- `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`.
+- `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+- `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+- `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
+
+## Data Type Mapping
+
+Kinesis Data Firehose stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure.
+Instead, Kinesis Data Firehose records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'.
+To determine the data type of the messages in your Kinesis Data Firehose backed tables, pick a suitable Flink format with the `format` keyword.
+Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details.
+
+## Notice
+
+The current implementation for the Kinesis Data Firehose SQL connector only supports Kinesis Data Firehose backed sinks and doesn't provide an implementation for source queries.
+Queries similar to:
+```sql
+SELECT * FROM FirehoseTable;
+```
+should result in an error similar to
+```
+Connector firehose can only be used as a sink. It cannot be used as a source.
+```
+{{< top >}}
diff --git a/docs/content/docs/connectors/table/kinesis.md b/docs/content/docs/connectors/table/kinesis.md
new file mode 100644
index 0000000..44d42c7
--- /dev/null
+++ b/docs/content/docs/connectors/table/kinesis.md
@@ -0,0 +1,916 @@
+---
+title: Kinesis
+weight: 5
+type: docs
+aliases:
+- /dev/table/connectors/kinesis.html
+---
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+
+# Amazon Kinesis Data Streams SQL Connector
+
+{{< label "Scan Source: Unbounded" >}}
+{{< label "Sink: Batch" >}}
+{{< label "Sink: Streaming Append Mode" >}}
+
+The Kinesis connector allows for reading data from and writing data into [Amazon Kinesis Data Streams (KDS)](https://aws.amazon.com/kinesis/data-streams/).
+
+Dependencies
+------------
+
+{{< sql_connector_download_table kinesis >}}
+
+The Kinesis connector is not part of the binary distribution.
+See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
+
+How to create a Kinesis data stream table
+-----------------------------------------
+
+Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream.
+The following example shows how to create a table backed by a Kinesis data stream:
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3)
+)
+PARTITIONED BY (user_id, item_id)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'scan.stream.initpos' = 'LATEST',
+  'format' = 'csv'
+);
+```
+
+Available Metadata
+------------------
+
+The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition.
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+      <th class="text-left" style="width: 25%">Key</th>
+      <th class="text-center" style="width: 45%">Data Type</th>
+      <th class="text-center" style="width: 35%">Description</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">timestamp</a></code></td>
+      <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td>
+      <td>The approximate time when the record was inserted into the stream.</td>
+    </tr>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">shard-id</a></code></td>
+      <td><code>VARCHAR(128) NOT NULL</code></td>
+      <td>The unique identifier of the shard within the stream from which the record was read.</td>
+    </tr>
+    <tr>
+      <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">sequence-number</a></code></td>
+      <td><code>VARCHAR(128) NOT NULL</code></td>
+      <td>The unique identifier of the record within its shard.</td>
+    </tr>
+    </tbody>
+</table>
+
+The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata fields:
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `category_id` BIGINT,
+  `behavior` STRING,
+  `ts` TIMESTAMP(3),
+  `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
+  `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
+  `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
+)
+PARTITIONED BY (user_id, item_id)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'scan.stream.initpos' = 'LATEST',
+  'format' = 'csv'
+);
+```
+
+
+Connector Options
+-----------------
+
+<table class="table table-bordered">
+    <thead>
+    <tr>
+        <th class="text-left" style="width: 25%">Option</th>
+        <th class="text-center" style="width: 8%">Required</th>
+        <th class="text-center" style="width: 8%">Forwarded</th>
+        <th class="text-center" style="width: 7%">Default</th>
+        <th class="text-center" style="width: 10%">Type</th>
+        <th class="text-center" style="width: 42%">Description</th>
+    </tr>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Common Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>connector</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td>
+    </tr>
+    <tr>
+      <td><h5>stream</h5></td>
+      <td>required</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>Name of the Kinesis data stream backing this table.</td>
+    </tr>
+    <tr>
+      <td><h5>format</h5></td>
+      <td>required</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.region</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS region where the stream is defined. Either this or <code>aws.endpoint</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.endpoint</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or <code>aws.region</code> is required.</td>
+    </tr>
+    <tr>
+      <td><h5>aws.trust.all.certificates</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>If true, accepts all SSL certificates.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Authentication Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>aws.credentials.provider</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">AUTO</td>
+      <td>String</td>
+      <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.basic.accesskeyid</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.basic.secretkey</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The AWS secret key to use when setting credentials provider type to BASIC.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.profile.path</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>Optional configuration for profile path if credential provider type is set to be PROFILE.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.profile.name</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>Optional configuration for profile name if credential provider type is set to be PROFILE.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.role.arn</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.role.sessionName</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.role.externalId</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.role.provider</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The credentials provider that provides credentials for assuming the role when credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td>
+    </tr>
+    <tr>
+	  <td><h5>aws.credentials.webIdentityToken.file</h5></td>
+	  <td>optional</td>
+      <td>no</td>
+	  <td style="word-wrap: break-word;">(none)</td>
+	  <td>String</td>
+	  <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Source Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>scan.stream.initpos</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">LATEST</td>
+      <td>String</td>
+      <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.initpos-timestamp</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The initial timestamp to start reading the Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.initpos-timestamp-format</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td>
+      <td>String</td>
+      <td>The date format of the initial timestamp to start reading the Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.recordpublisher</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">POLLING</td>
+      <td>String</td>
+      <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.efo.consumername</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The name of the EFO consumer to register with KDS. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.efo.registration</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">LAZY</td>
+      <td>String</td>
+      <td>Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.efo.consumerarn</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td>String</td>
+      <td>The prefix of consumer ARN for a given stream. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.efo.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>Maximum number of allowed concurrent requests for the EFO client. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describe.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">50</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>describeStream</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describe.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describe.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describe.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td>
+    </tr>
+    <tr>
+      <td><h5>scan.list.shards.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>listShards</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.list.shards.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>listShards</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.list.shards.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>listShards</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.list.shards.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>listShards</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describestreamconsumer.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">50</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>describeStreamConsumer</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describestreamconsumer.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describestreamconsumer.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.describestreamconsumer.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>describeStreamConsumer</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>registerStream</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">60</td>
+      <td>Integer</td>
+      <td>The maximum time in seconds to wait for a stream consumer to become active before giving up.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.registerstreamconsumer.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>registerStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>deregisterStream</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">60</td>
+      <td>Integer</td>
+      <td>The maximum time in seconds to wait for a stream consumer to deregister before giving up.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.stream.deregisterstreamconsumer.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>deregisterStream</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.subscribetoshard.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>subscribeToShard</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.subscribetoshard.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.subscribetoshard.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">2000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.subscribetoshard.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>subscribeToShard</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.maxrecordcount</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>getRecords</code> attempts if we get a recoverable exception.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">300</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>getRecords</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getrecords.intervalmillis</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">200</td>
+      <td>Long</td>
+      <td>The interval (in milliseconds) between each <code>getRecords</code> request to an AWS Kinesis shard.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getiterator.maxretries</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">3</td>
+      <td>Integer</td>
+      <td>The maximum number of <code>getShardIterator</code> attempts if we get ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getiterator.backoff.base</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">300</td>
+      <td>Long</td>
+      <td>The base backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getiterator.backoff.max</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1000</td>
+      <td>Long</td>
+      <td>The maximum backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.getiterator.backoff.expconst</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">1.5</td>
+      <td>Double</td>
+      <td>The power constant for exponential backoff between each <code>getShardIterator</code> attempt.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.discovery.intervalmillis</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>The interval (in milliseconds) between each attempt to discover new shards.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.adaptivereads</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>The config to turn on adaptive reads from a shard. See the <code>AdaptivePollingRecordPublisher</code> documentation for details.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.shard.idle.interval</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">-1</td>
+      <td>Long</td>
+      <td>The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.watermark.sync.interval</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">30000</td>
+      <td>Long</td>
+      <td>The interval (in milliseconds) for periodically synchronizing the shared watermark state.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.watermark.lookahead.millis</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">0</td>
+      <td>Long</td>
+      <td>The maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark.</td>
+    </tr>
+    <tr>
+      <td><h5>scan.watermark.sync.queue.capacity</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">100</td>
+      <td>Integer</td>
+      <td>The maximum number of records that will be buffered before suspending consumption of a shard.</td>
+    </tr>
+    </tbody>
+    <thead>
+    <tr>
+      <th colspan="6" class="text-left" style="width: 100%">Sink Options</th>
+    </tr>
+    </thead>
+    <tbody>
+    <tr>
+      <td><h5>sink.partitioner</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">random or row-based</td>
+      <td>String</td>
+      <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.partitioner-field-delimiter</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">|</td>
+      <td>String</td>
+      <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.producer.*</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">(none)</td>
+      <td></td>
+      <td>
+        Deprecated options previously used by the legacy connector.
+        Options with equivalent alternatives in <code>KinesisStreamsSink</code> are matched
+        to their respective properties. Unsupported options are logged as warnings to the user.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.max-concurrency</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>Integer</td>
+      <td>
+      Maximum number of allowed concurrent requests by <code>KinesisAsyncClient</code>.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.read-timeout</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">360000</td>
+      <td>Integer</td>
+      <td>
+        Maximum amount of time in ms for requests to be sent by <code>KinesisAsyncClient</code>.
+      </td>
+    </tr>
+    <tr>
+      <td><h5>sink.http-client.protocol.version</h5></td>
+      <td>optional</td>
+      <td>no</td>
+      <td style="word-wrap: break-word;">HTTP2</td>
+      <td>String</td>
+      <td>HTTP version used by the Kinesis client.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.batch.max-size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">500</td>
+      <td>Integer</td>
+      <td>Maximum batch size of elements to be passed to <code>KinesisAsyncClient</code> to be written downstream.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-inflight</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">16</td>
+      <td>Integer</td>
+      <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.requests.max-buffered</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">10000</td>
+      <td>String</td>
+      <td>Request buffer threshold for buffered requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.size</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">5242880</td>
+      <td>Long</td>
+      <td>Threshold value in bytes for writer buffer in <code>KinesisAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.flush-buffer.timeout</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">5000</td>
+      <td>Long</td>
+      <td>Threshold time in milliseconds for an element to be in a buffer of <code>KinesisAsyncClient</code> before flushing.</td>
+    </tr>
+    <tr>
+      <td><h5>sink.fail-on-error</h5></td>
+      <td>optional</td>
+      <td>yes</td>
+      <td style="word-wrap: break-word;">false</td>
+      <td>Boolean</td>
+      <td>Flag used for retrying failed requests. If true, any request failure will not be retried and will instead fail the job.</td>
+    </tr>
+    </tbody>
+</table>
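+
+To make the sink options above concrete, the following is a minimal sketch of a sink table that tunes batching and error handling. The stream name and region are placeholders, and the option values are illustrative rather than recommendations.
+
+```sql
+CREATE TABLE KinesisSinkTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'format' = 'csv',
+  -- illustrative sink tuning, see the Sink Options above
+  'sink.batch.max-size' = '100',
+  'sink.flush-buffer.timeout' = '2000',
+  'sink.fail-on-error' = 'false'
+);
+```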
+
+Features
+--------
+
+### Authorization
+
+Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) to allow reading from / writing to the Kinesis data streams.
+
+### Authentication
+
+Depending on your deployment you would choose a different Credentials Provider to allow access to Kinesis.
+By default, the `AUTO` Credentials Provider is used.
+If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
+
+A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `aws.credentials.provider` setting.
+Supported values are:
+
+* `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
+* `BASIC` - Use access key ID and secret key supplied as configuration.
+* `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
+* `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`.
+* `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
+* `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
+* `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
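+
+For example, a sketch of a table that authenticates by assuming a role could look as follows; the role ARN, session name, and stream details are placeholders.
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'format' = 'csv',
+  'aws.credentials.provider' = 'ASSUME_ROLE',
+  -- placeholder role ARN and session name, see the Authentication Options above;
+  -- 'aws.credentials.role.provider' can additionally control how the credentials
+  -- used to assume the role are obtained
+  'aws.credentials.role.arn' = 'arn:aws:iam::123456789012:role/flink-kinesis-role',
+  'aws.credentials.role.sessionName' = 'flink-session'
+);
+```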
+
+### Start Reading Position
+
+You can configure table sources to start reading a table-backing Kinesis data stream from a specific position through the `scan.stream.initpos` option.
+Available values are:
+
+* `LATEST`: read shards starting from the latest record.
+* `TRIM_HORIZON`: read shards starting from the earliest record possible (data may be trimmed by Kinesis depending on the current retention settings of the backing stream).
+* `AT_TIMESTAMP`: read shards starting from a specified timestamp. The timestamp value should be specified through the `scan.stream.initpos-timestamp` in one of the following formats:
+   * A non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`).
+   * A value conforming to a user-defined `SimpleDateFormat` specified at `scan.stream.initpos-timestamp-format`.
+     If a user does not define a format, the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`.
+     For example, a timestamp value of `2016-04-04` with a user-defined format of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no user-defined format.
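+
+As an illustration, the following sketch starts reading from a specific point in time using a user-defined timestamp format; the stream details and timestamp are placeholders.
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'format' = 'csv',
+  -- start reading at the given timestamp, parsed with the user-defined format
+  'scan.stream.initpos' = 'AT_TIMESTAMP',
+  'scan.stream.initpos-timestamp-format' = 'yyyy-MM-dd',
+  'scan.stream.initpos-timestamp' = '2016-04-04'
+);
+```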
+
+### Sink Partitioning
+
+Kinesis data streams consist of one or more shards, and the `sink.partitioner` option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards.
+Valid values are:
+
+* `fixed`: Kinesis `PartitionKey` values are derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime).
+* `random`: Kinesis `PartitionKey` values are assigned randomly. This is the default value for tables not defined with a `PARTITION BY` clause.
+* Custom `FixedKinesisPartitioner` subclass: e.g. `'org.mycompany.MyPartitioner'`.
+
+{{< hint info >}}
+Records written into tables defining a `PARTITION BY` clause will always be partitioned based on a concatenated projection of the `PARTITION BY` fields.
+In this case, the `sink.partitioner` field cannot be used to modify this behavior (attempting to do this results in a configuration error).
+You can, however, use the `sink.partitioner-field-delimiter` option to set the delimiter of field values in the concatenated [PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey) string (an empty string is also a valid delimiter).
+{{< /hint >}}
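+
+For instance, a sketch of a partitioned sink table that overrides the delimiter used in the concatenated partition key could look like this; the columns mirror the examples above.
+
+```sql
+CREATE TABLE KinesisSinkTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING
+)
+PARTITIONED BY (user_id, item_id)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'format' = 'csv',
+  -- the PartitionKey becomes e.g. '42-7' instead of the default '42|7'
+  'sink.partitioner-field-delimiter' = '-'
+);
+```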
+
+### Enhanced Fan-Out
+
+[Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum number of concurrent consumers per Kinesis data stream.
+Without EFO, all concurrent Kinesis consumers share a single read quota per shard.
+Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers.
+
+<span class="label label-info">Note</span> Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/).
+
+You can enable and configure EFO with the following properties:
+
+* `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`.
+* `scan.stream.efo.consumername`: A name to identify the consumer when the above value is `EFO`.
+* `scan.stream.efo.registration`: Strategy for (de-)registration of `EFO` consumers with the name given by the `scan.stream.efo.consumername` value. Valid strategies are:
+  * `LAZY` (default): Stream consumers are registered when the Flink job starts running.
+    If the stream consumer already exists, it will be reused.
+    This is the preferred strategy for the majority of applications.
+    However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN.
+    For jobs with very large parallelism this can result in an increased start-up time.
+    The describe operation has a limit of 20 [transactions per second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html),
+    so application startup time will increase by roughly `parallelism/20` seconds.
+  * `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` constructor.
+    If the stream consumer already exists, it will be reused.
+    This will result in registration occurring when the job is constructed,
+    either on the Flink Job Manager or client environment submitting the job.
+    Using this strategy results in a single thread registering and retrieving the stream consumer ARN,
+    reducing startup time over `LAZY` (with large parallelism).
+    However, consider that the client environment will require access to the AWS services.
+  * `NONE`: Stream consumer registration is not performed by `FlinkKinesisConsumer`.
+    Registration must be performed externally using the [AWS CLI or SDK](https://aws.amazon.com/tools/)
+    to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).
+    Stream consumer ARNs should be provided to the job via the consumer configuration.
+* `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally registered consumers (substitute `<stream-name>` with the name of your stream in the parameter name).
+   Use this if you choose to use `NONE` as a `scan.stream.efo.registration` strategy.
+
+<span class="label label-info">Note</span> For a given Kinesis data stream, each EFO consumer must have a unique name.
+However, consumer names do not have to be unique across data streams.
+Reusing a consumer name will result in existing subscriptions being terminated.
+
+<span class="label label-info">Note</span> With the `LAZY` strategy, stream consumers are de-registered when the job is shutdown gracefully.
+In the event that a job terminates within executing the shutdown hooks, stream consumers will remain active.
+In this situation the stream consumers will be gracefully reused when the application restarts.
+With the `NONE` and `EAGER` strategies, stream consumer de-registration is not performed by `FlinkKinesisConsumer`.
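+
+Putting the EFO properties above together, a sketch of an EFO-enabled table could look like the following; the consumer name and stream details are placeholders.
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'format' = 'csv',
+  'scan.stream.recordpublisher' = 'EFO',
+  'scan.stream.efo.consumername' = 'my-flink-efo-consumer',
+  -- LAZY is the default registration strategy, shown here for illustration
+  'scan.stream.efo.registration' = 'LAZY'
+);
+```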
+
+Data Type Mapping
+-----------------
+
+Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure.
+Instead, Kinesis records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'.
+To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the `format` keyword.
+Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details.
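+
+For example, format-specific options sit next to the `format` option and are prefixed with the format name. The sketch below assumes CSV-encoded records and uses the `csv.field-delimiter` option of the built-in `csv` format.
+
+```sql
+CREATE TABLE KinesisTable (
+  `user_id` BIGINT,
+  `item_id` BIGINT,
+  `behavior` STRING
+)
+WITH (
+  'connector' = 'kinesis',
+  'stream' = 'user_behavior',
+  'aws.region' = 'us-east-2',
+  'format' = 'csv',
+  -- options of the chosen format are prefixed with the format name
+  'csv.field-delimiter' = ';'
+);
+```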
+
+Updates in 1.15
+---------------
+
+Until Flink 1.14, the Kinesis table API sink connector was backed by <code>FlinkKinesisProducer</code>. With the introduction of <code>KinesisStreamsSink</code> in 1.15, the Kinesis table API sink connector has been migrated to the new <code>KinesisStreamsSink</code>. Authentication options have been migrated identically, while sink configuration options are now compatible with <code>KinesisStreamsSink</code>.
+
+Options configuring <code>FlinkKinesisProducer</code> are now deprecated, with fallback support for common configuration options compatible with <code>KinesisStreamsSink</code>.
+
+<code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to Kinesis,
+which doesn't support aggregation. As a consequence, table options configuring aggregation in the deprecated <code>FlinkKinesisProducer</code>
+are now deprecated and will be ignored; this includes <code>sink.producer.aggregation-enabled</code> and
+<code>sink.producer.aggregation-count</code>.
+
+<span class="label label-info">Note</span> Migrating applications with deprecated options will result in the incompatible deprecated options being ignored and warned to users.
+
+The Kinesis table API source connector still depends on <code>FlinkKinesisConsumer</code>, with no change in configuration options.
+
+
+{{< top >}}