| --- |
| title: Kinesis |
| weight: 5 |
| type: docs |
| aliases: |
| - /zh/dev/table/connectors/kinesis.html |
| --- |
| <!-- |
| Licensed to the Apache Software Foundation (ASF) under one |
| or more contributor license agreements. See the NOTICE file |
| distributed with this work for additional information |
| regarding copyright ownership. The ASF licenses this file |
| to you under the Apache License, Version 2.0 (the |
| "License"); you may not use this file except in compliance |
| with the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, |
| software distributed under the License is distributed on an |
| "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| KIND, either express or implied. See the License for the |
| specific language governing permissions and limitations |
| under the License. |
| --> |
| |
| # Amazon Kinesis Data Streams SQL Connector |
| |
| {{< label "Scan Source: Unbounded" >}} |
| {{< label "Sink: Batch" >}} |
| {{< label "Sink: Streaming Append Mode" >}} |
| |
| The Kinesis connector allows for reading data from and writing data into [Amazon Kinesis Data Streams (KDS)](https://aws.amazon.com/kinesis/data-streams/). |
| |
| Dependencies |
| ------------ |
| |
| {{< sql_connector_download_table kinesis 4.0.0 >}} |
| |
| The Kinesis connector is not part of Flink's binary distribution. See [here]({{< ref "docs/dev/configuration/overview" >}}) for how to link it for cluster execution. |
| |
| How to create a Kinesis data stream table |
| ----------------------------------------- |
| |
| Follow the instructions from the [Amazon KDS Developer Guide](https://docs.aws.amazon.com/streams/latest/dev/learning-kinesis-module-one-create-stream.html) to set up a Kinesis stream. |
| The following example shows how to create a table backed by a Kinesis data stream: |
| |
| ```sql |
| CREATE TABLE KinesisTable ( |
| `user_id` BIGINT, |
| `item_id` BIGINT, |
| `category_id` BIGINT, |
| `behavior` STRING, |
| `ts` TIMESTAMP(3) |
| ) |
| PARTITIONED BY (user_id, item_id) |
| WITH ( |
| 'connector' = 'kinesis', |
| 'stream' = 'user_behavior', |
| 'aws.region' = 'us-east-2', |
| 'scan.stream.initpos' = 'LATEST', |
| 'format' = 'csv' |
| ); |
| ``` |
| |
| Available Metadata |
| ------------------ |
| |
| The following metadata can be exposed as read-only (`VIRTUAL`) columns in a table definition. |
| |
| <table class="table table-bordered"> |
| <thead> |
| <tr> |
| <th class="text-left" style="width: 25%">Key</th> |
| <th class="text-center" style="width: 45%">Data Type</th> |
| <th class="text-center" style="width: 35%">Description</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-ApproximateArrivalTimestamp">timestamp</a></code></td> |
| <td><code>TIMESTAMP_LTZ(3) NOT NULL</code></td> |
| <td>The approximate time when the record was inserted into the stream.</td> |
| </tr> |
| <tr> |
| <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Shard.html#Streams-Type-Shard-ShardId">shard-id</a></code></td> |
| <td><code>VARCHAR(128) NOT NULL</code></td> |
| <td>The unique identifier of the shard within the stream from which the record was read.</td> |
| </tr> |
| <tr> |
| <td><code><a href="https://docs.aws.amazon.com/kinesis/latest/APIReference/API_Record.html#Streams-Type-Record-SequenceNumber">sequence-number</a></code></td> |
| <td><code>VARCHAR(128) NOT NULL</code></td> |
| <td>The unique identifier of the record within its shard.</td> |
| </tr> |
| </tbody> |
| </table> |
| |
| The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata fields: |
| |
| ```sql |
| CREATE TABLE KinesisTable ( |
| `user_id` BIGINT, |
| `item_id` BIGINT, |
| `category_id` BIGINT, |
| `behavior` STRING, |
| `ts` TIMESTAMP(3), |
| `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL, |
| `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL, |
| `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL |
| ) |
| PARTITIONED BY (user_id, item_id) |
| WITH ( |
| 'connector' = 'kinesis', |
| 'stream' = 'user_behavior', |
| 'aws.region' = 'us-east-2', |
| 'scan.stream.initpos' = 'LATEST', |
| 'format' = 'csv' |
| ); |
| ``` |
| |
| |
| Connector Options |
| ----------------- |
| |
| <table class="table table-bordered"> |
| <thead> |
| <tr> |
| <th class="text-left" style="width: 25%">Option</th> |
| <th class="text-center" style="width: 8%">Required</th> |
| <th class="text-center" style="width: 8%">Forwarded</th> |
| <th class="text-center" style="width: 7%">Default</th> |
| <th class="text-center" style="width: 10%">Type</th> |
| <th class="text-center" style="width: 42%">Description</th> |
| </tr> |
| <tr> |
| <th colspan="6" class="text-left" style="width: 100%">Common Options</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><h5>connector</h5></td> |
| <td>required</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>Specify what connector to use. For Kinesis use <code>'kinesis'</code>.</td> |
| </tr> |
| <tr> |
| <td><h5>stream</h5></td> |
| <td>required</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>Name of the Kinesis data stream backing this table.</td> |
| </tr> |
| <tr> |
| <td><h5>format</h5></td> |
| <td>required</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The format used to deserialize and serialize Kinesis data stream records. See <a href="#data-type-mapping">Data Type Mapping</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.region</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The AWS region where the stream is defined. Either this or <code>aws.endpoint</code> is required.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.endpoint</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The AWS endpoint for Kinesis (derived from the AWS region setting if not set). Either this or <code>aws.region</code> is required.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.trust.all.certificates</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">false</td> |
| <td>Boolean</td> |
| <td>If true, accepts all SSL certificates.</td> |
| </tr> |
| </tbody> |
| <thead> |
| <tr> |
| <th colspan="6" class="text-left" style="width: 100%">Authentication Options</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><h5>aws.credentials.provider</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">AUTO</td> |
| <td>String</td> |
| <td>A credentials provider to use when authenticating against the Kinesis endpoint. See <a href="#authentication">Authentication</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.basic.accesskeyid</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The AWS access key ID to use when setting credentials provider type to BASIC.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.basic.secretkey</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The AWS secret key to use when setting credentials provider type to BASIC.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.profile.path</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>Optional configuration for the profile path if the credential provider type is set to PROFILE.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.profile.name</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>Optional configuration for the profile name if the credential provider type is set to PROFILE.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.role.arn</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The role ARN to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.role.sessionName</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The role session name to use when credential provider type is set to ASSUME_ROLE or WEB_IDENTITY_TOKEN.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.role.externalId</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The external ID to use when credential provider type is set to ASSUME_ROLE.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.role.provider</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The credentials provider that provides credentials for assuming the role when the credential provider type is set to ASSUME_ROLE. Roles can be nested, so this value can again be set to ASSUME_ROLE.</td> |
| </tr> |
| <tr> |
| <td><h5>aws.credentials.webIdentityToken.file</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The absolute path to the web identity token file that should be used if provider type is set to WEB_IDENTITY_TOKEN.</td> |
| </tr> |
| </tbody> |
| <thead> |
| <tr> |
| <th colspan="6" class="text-left" style="width: 100%">Source Options</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><h5>scan.stream.initpos</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">LATEST</td> |
| <td>String</td> |
| <td>Initial position to be used when reading from the table. See <a href="#start-reading-position">Start Reading Position</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.initpos-timestamp</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The initial timestamp to start reading the Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.initpos-timestamp-format</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">yyyy-MM-dd'T'HH:mm:ss.SSSXXX</td> |
| <td>String</td> |
| <td>The date format of the initial timestamp to start reading the Kinesis stream from (when <code>scan.stream.initpos</code> is AT_TIMESTAMP). See <a href="#start-reading-position">Start Reading Position</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.recordpublisher</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">POLLING</td> |
| <td>String</td> |
| <td>The <code>RecordPublisher</code> type to use for sources. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.efo.consumername</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The name of the EFO consumer to register with KDS. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.efo.registration</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">LAZY</td> |
| <td>String</td> |
| <td>Determine how and when consumer de-/registration is performed (LAZY|EAGER|NONE). See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.efo.consumerarn</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td>String</td> |
| <td>The prefix of consumer ARN for a given stream. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.efo.http-client.max-concurrency</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">10000</td> |
| <td>Integer</td> |
| <td>Maximum number of allowed concurrent requests for the EFO client. See <a href="#enhanced-fan-out">Enhanced Fan-Out</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.describe.maxretries</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">50</td> |
| <td>Integer</td> |
| <td>The maximum number of <code>describeStream</code> attempts if we get a recoverable exception.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.describe.backoff.base</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">2000</td> |
| <td>Long</td> |
| <td>The base backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.describe.backoff.max</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">5000</td> |
| <td>Long</td> |
| <td>The maximum backoff time (in milliseconds) between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.describe.backoff.expconst</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1.5</td> |
| <td>Double</td> |
| <td>The power constant for exponential backoff between each <code>describeStream</code> attempt (for consuming from DynamoDB streams).</td> |
| </tr> |
| <tr> |
| <td><h5>scan.list.shards.maxretries</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">10</td> |
| <td>Integer</td> |
| <td>The maximum number of <code>listShards</code> attempts if we get a recoverable exception.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.list.shards.backoff.base</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1000</td> |
| <td>Long</td> |
| <td>The base backoff time (in milliseconds) between each <code>listShards</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.list.shards.backoff.max</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">5000</td> |
| <td>Long</td> |
| <td>The maximum backoff time (in milliseconds) between each <code>listShards</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.list.shards.backoff.expconst</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1.5</td> |
| <td>Double</td> |
| <td>The power constant for exponential backoff between each <code>listShards</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.describestreamconsumer.maxretries</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">50</td> |
| <td>Integer</td> |
| <td>The maximum number of <code>describeStreamConsumer</code> attempts if we get a recoverable exception.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.describestreamconsumer.backoff.base</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">2000</td> |
| <td>Long</td> |
| <td>The base backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.describestreamconsumer.backoff.max</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">5000</td> |
| <td>Long</td> |
| <td>The maximum backoff time (in milliseconds) between each <code>describeStreamConsumer</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.describestreamconsumer.backoff.expconst</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1.5</td> |
| <td>Double</td> |
| <td>The power constant for exponential backoff between each <code>describeStreamConsumer</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.registerstreamconsumer.maxretries</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">10</td> |
| <td>Integer</td> |
| <td>The maximum number of <code>registerStream</code> attempts if we get a recoverable exception.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.registerstreamconsumer.timeout</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">60</td> |
| <td>Integer</td> |
| <td>The maximum time in seconds to wait for a stream consumer to become active before giving up.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.registerstreamconsumer.backoff.base</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">500</td> |
| <td>Long</td> |
| <td>The base backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.registerstreamconsumer.backoff.max</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">2000</td> |
| <td>Long</td> |
| <td>The maximum backoff time (in milliseconds) between each <code>registerStream</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.registerstreamconsumer.backoff.expconst</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1.5</td> |
| <td>Double</td> |
| <td>The power constant for exponential backoff between each <code>registerStream</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.deregisterstreamconsumer.maxretries</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">10</td> |
| <td>Integer</td> |
| <td>The maximum number of <code>deregisterStream</code> attempts if we get a recoverable exception.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.deregisterstreamconsumer.timeout</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">60</td> |
| <td>Integer</td> |
| <td>The maximum time in seconds to wait for a stream consumer to deregister before giving up.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.deregisterstreamconsumer.backoff.base</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">500</td> |
| <td>Long</td> |
| <td>The base backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.deregisterstreamconsumer.backoff.max</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">2000</td> |
| <td>Long</td> |
| <td>The maximum backoff time (in milliseconds) between each <code>deregisterStream</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.stream.deregisterstreamconsumer.backoff.expconst</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1.5</td> |
| <td>Double</td> |
| <td>The power constant for exponential backoff between each <code>deregisterStream</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.subscribetoshard.maxretries</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">10</td> |
| <td>Integer</td> |
| <td>The maximum number of <code>subscribeToShard</code> attempts if we get a recoverable exception.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.subscribetoshard.backoff.base</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1000</td> |
| <td>Long</td> |
| <td>The base backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.subscribetoshard.backoff.max</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">2000</td> |
| <td>Long</td> |
| <td>The maximum backoff time (in milliseconds) between each <code>subscribeToShard</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.subscribetoshard.backoff.expconst</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1.5</td> |
| <td>Double</td> |
| <td>The power constant for exponential backoff between each <code>subscribeToShard</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getrecords.maxrecordcount</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">10000</td> |
| <td>Integer</td> |
| <td>The maximum number of records to try to get each time we fetch records from an AWS Kinesis shard.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getrecords.maxretries</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">3</td> |
| <td>Integer</td> |
| <td>The maximum number of <code>getRecords</code> attempts if we get a recoverable exception.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getrecords.backoff.base</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">300</td> |
| <td>Long</td> |
| <td>The base backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getrecords.backoff.max</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1000</td> |
| <td>Long</td> |
| <td>The maximum backoff time (in milliseconds) between <code>getRecords</code> attempts if we get a ProvisionedThroughputExceededException.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getrecords.backoff.expconst</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1.5</td> |
| <td>Double</td> |
| <td>The power constant for exponential backoff between each <code>getRecords</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getrecords.intervalmillis</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">200</td> |
| <td>Long</td> |
| <td>The interval (in milliseconds) between each <code>getRecords</code> request to an AWS Kinesis shard.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getiterator.maxretries</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">3</td> |
| <td>Integer</td> |
| <td>The maximum number of <code>getShardIterator</code> attempts if we get ProvisionedThroughputExceededException.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getiterator.backoff.base</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">300</td> |
| <td>Long</td> |
| <td>The base backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getiterator.backoff.max</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1000</td> |
| <td>Long</td> |
| <td>The maximum backoff time (in milliseconds) between <code>getShardIterator</code> attempts if we get a ProvisionedThroughputExceededException.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.getiterator.backoff.expconst</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">1.5</td> |
| <td>Double</td> |
| <td>The power constant for exponential backoff between each <code>getShardIterator</code> attempt.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.discovery.intervalmillis</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">10000</td> |
| <td>Integer</td> |
| <td>The interval (in milliseconds) between each attempt to discover new shards.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.adaptivereads</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">false</td> |
| <td>Boolean</td> |
| <td>The config to turn on adaptive reads from a shard. See the <code>AdaptivePollingRecordPublisher</code> documentation for details.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.shard.idle.interval</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">-1</td> |
| <td>Long</td> |
| <td>The interval (in milliseconds) after which to consider a shard idle for purposes of watermark generation. A positive value will allow the watermark to progress even when some shards don't receive new records.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.watermark.sync.interval</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">30000</td> |
| <td>Long</td> |
| <td>The interval (in milliseconds) for periodically synchronizing the shared watermark state.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.watermark.lookahead.millis</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">0</td> |
| <td>Long</td> |
| <td>The maximum delta (in milliseconds) allowed for the reader to advance ahead of the shared global watermark.</td> |
| </tr> |
| <tr> |
| <td><h5>scan.watermark.sync.queue.capacity</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">100</td> |
| <td>Integer</td> |
| <td>The maximum number of records that will be buffered before suspending consumption of a shard.</td> |
| </tr> |
| </tbody> |
| <thead> |
| <tr> |
| <th colspan="6" class="text-left" style="width: 100%">Sink Options</th> |
| </tr> |
| </thead> |
| <tbody> |
| <tr> |
| <td><h5>sink.partitioner</h5></td> |
| <td>optional</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">random or row-based</td> |
| <td>String</td> |
| <td>Optional output partitioning from Flink's partitions into Kinesis shards. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>sink.partitioner-field-delimiter</h5></td> |
| <td>optional</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">|</td> |
| <td>String</td> |
| <td>Optional field delimiter for a fields-based partitioner derived from a PARTITION BY clause. See <a href="#sink-partitioning">Sink Partitioning</a> for details.</td> |
| </tr> |
| <tr> |
| <td><h5>sink.producer.*</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">(none)</td> |
| <td></td> |
| <td> |
| Deprecated options previously used by the legacy connector. |
| Options with equivalent alternatives in <code>KinesisStreamsSink</code> are matched |
| to their respective properties. Unsupported options are logged as warnings to the user. |
| </td> |
| </tr> |
| <tr> |
| <td><h5>sink.http-client.max-concurrency</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">10000</td> |
| <td>Integer</td> |
| <td> |
| Maximum number of allowed concurrent requests by <code>KinesisAsyncClient</code>. |
| </td> |
| </tr> |
| <tr> |
| <td><h5>sink.http-client.read-timeout</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">360000</td> |
| <td>Integer</td> |
| <td> |
| Maximum amount of time in ms for requests to be sent by <code>KinesisAsyncClient</code>. |
| </td> |
| </tr> |
| <tr> |
| <td><h5>sink.http-client.protocol.version</h5></td> |
| <td>optional</td> |
| <td>no</td> |
| <td style="word-wrap: break-word;">HTTP2</td> |
| <td>String</td> |
| <td>HTTP version used by the Kinesis client.</td> |
| </tr> |
| <tr> |
| <td><h5>sink.batch.max-size</h5></td> |
| <td>optional</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">500</td> |
| <td>Integer</td> |
| <td>Maximum batch size of elements to be passed to <code>KinesisAsyncClient</code> to be written downstream.</td> |
| </tr> |
| <tr> |
| <td><h5>sink.requests.max-inflight</h5></td> |
| <td>optional</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">16</td> |
| <td>Integer</td> |
| <td>Request threshold for uncompleted requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td> |
| </tr> |
| <tr> |
| <td><h5>sink.requests.max-buffered</h5></td> |
| <td>optional</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">10000</td> |
| <td>String</td> |
| <td>Request buffer threshold for buffered requests by <code>KinesisAsyncClient</code> before blocking new write requests and applying backpressure.</td> |
| </tr> |
| <tr> |
| <td><h5>sink.flush-buffer.size</h5></td> |
| <td>optional</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">5242880</td> |
| <td>Long</td> |
| <td>Threshold value in bytes for writer buffer in <code>KinesisAsyncClient</code> before flushing.</td> |
| </tr> |
| <tr> |
| <td><h5>sink.flush-buffer.timeout</h5></td> |
| <td>optional</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">5000</td> |
| <td>Long</td> |
| <td>Threshold time in milliseconds for an element to be in a buffer of <code>KinesisAsyncClient</code> before flushing.</td> |
| </tr> |
| <tr> |
| <td><h5>sink.fail-on-error</h5></td> |
| <td>optional</td> |
| <td>yes</td> |
| <td style="word-wrap: break-word;">false</td> |
| <td>Boolean</td> |
| <td>Flag used for retrying failed requests. If set, any request failure will not be retried and will fail the job.</td> |
| </tr> |
| </tbody> |
| </table> |
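| |
| As an illustration, the sink tuning options above can be set directly in the `WITH` clause. The sketch below uses hypothetical values; tune them against your own throughput and latency requirements: |
| |
| ```sql |
| CREATE TABLE KinesisSinkTable ( |
| `user_id` BIGINT, |
| `behavior` STRING |
| ) |
| WITH ( |
| 'connector' = 'kinesis', |
| 'stream' = 'user_behavior', |
| 'aws.region' = 'us-east-2', |
| 'sink.batch.max-size' = '100', |
| 'sink.flush-buffer.timeout' = '2000', |
| 'sink.fail-on-error' = 'true', |
| 'format' = 'json' |
| ); |
| ``` |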
| |
| Features |
| -------- |
| |
| ### Authorization |
| |
| Make sure to [create an appropriate IAM policy](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html) to allow reading from / writing to the Kinesis data streams. |
| |
| ### Authentication |
| |
| Depending on your deployment, choose a suitable credentials provider to allow access to Kinesis. |
| By default, the `AUTO` Credentials Provider is used. |
| If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider. |
| |
| A specific [AWSCredentialsProvider](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/auth/AWSCredentialsProvider.html) can be **optionally** set using the `aws.credentials.provider` setting. |
| Supported values are: |
| |
| * `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider. |
| * `BASIC` - Use access key ID and secret key supplied as configuration. |
| * `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables. |
| * `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`. |
| * `PROFILE` - Use an AWS credentials profile to create the AWS credentials. |
| * `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied. |
| * `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token. |
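| |
| For example, the following sketch declares a table that authenticates with the `BASIC` provider; the key values are placeholders to be replaced with your own credentials: |
| |
| ```sql |
| CREATE TABLE KinesisTable ( |
| `user_id` BIGINT, |
| `behavior` STRING |
| ) |
| WITH ( |
| 'connector' = 'kinesis', |
| 'stream' = 'user_behavior', |
| 'aws.region' = 'us-east-2', |
| 'aws.credentials.provider' = 'BASIC', |
| 'aws.credentials.basic.accesskeyid' = '<your-access-key-id>', |
| 'aws.credentials.basic.secretkey' = '<your-secret-key>', |
| 'format' = 'csv' |
| ); |
| ``` |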
| |
| ### Start Reading Position |
| |
| You can configure table sources to start reading a table-backing Kinesis data stream from a specific position through the `scan.stream.initpos` option. |
| Available values are: |
| |
| * `LATEST`: read shards starting from the latest record. |
| * `TRIM_HORIZON`: read shards starting from the earliest record possible (data may be trimmed by Kinesis depending on the current retention settings of the backing stream). |
| * `AT_TIMESTAMP`: read shards starting from a specified timestamp. The timestamp value should be specified through the `scan.stream.initpos-timestamp` in one of the following formats: |
| * A non-negative double value representing the number of seconds that have elapsed since the Unix epoch (for example, `1459799926.480`). |
| * A value conforming to a user-defined `SimpleDateFormat` specified at `scan.stream.initpos-timestamp-format`. |
| If a user does not define a format, the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. |
| For example, a timestamp value of `2016-04-04` with a user-defined format of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no user-defined format. |
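| |
| For instance, the following sketch starts reading from a fixed date using a user-defined timestamp format (the stream name and date are illustrative): |
| |
| ```sql |
| CREATE TABLE KinesisTable ( |
| `user_id` BIGINT, |
| `behavior` STRING |
| ) |
| WITH ( |
| 'connector' = 'kinesis', |
| 'stream' = 'user_behavior', |
| 'aws.region' = 'us-east-2', |
| 'scan.stream.initpos' = 'AT_TIMESTAMP', |
| 'scan.stream.initpos-timestamp' = '2016-04-04', |
| 'scan.stream.initpos-timestamp-format' = 'yyyy-MM-dd', |
| 'format' = 'csv' |
| ); |
| ``` |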
| |
| ### Sink Partitioning |
| |
| Kinesis data streams consist of one or more shards, and the `sink.partitioner` option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards. |
| Valid values are: |
| |
| * `fixed`: Kinesis `PartitionKey` values are derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime). |
| * `random`: Kinesis `PartitionKey` values are assigned randomly. This is the default value for tables not defined with a `PARTITION BY` clause. |
| * Custom `FixedKinesisPartitioner` subclass: e.g. `'org.mycompany.MyPartitioner'`. |
| |
| {{< hint info >}} |
| Records written into tables defining a `PARTITION BY` clause will always be partitioned based on a concatenated projection of the `PARTITION BY` fields. |
| In this case, the `sink.partitioner` field cannot be used to modify this behavior (attempting to do this results in a configuration error). |
| You can, however, use the `sink.partitioner-field-delimiter` option to set the delimiter of field values in the concatenated [PartitionKey](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html#Streams-PutRecord-request-PartitionKey) string (an empty string is also a valid delimiter). |
| {{< /hint >}} |
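| |
| As a sketch, a table defined with a `PARTITION BY` clause could override the default `|` delimiter in the concatenated `PartitionKey` like this: |
| |
| ```sql |
| CREATE TABLE KinesisSinkTable ( |
| `user_id` BIGINT, |
| `item_id` BIGINT |
| ) |
| PARTITIONED BY (user_id, item_id) |
| WITH ( |
| 'connector' = 'kinesis', |
| 'stream' = 'user_behavior', |
| 'aws.region' = 'us-east-2', |
| 'sink.partitioner-field-delimiter' = '-', |
| 'format' = 'csv' |
| ); |
| ``` |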
| |
| ### Enhanced Fan-Out |
| |
| [Enhanced Fan-Out (EFO)](https://aws.amazon.com/blogs/aws/kds-enhanced-fanout/) increases the maximum number of concurrent consumers per Kinesis data stream. |
| Without EFO, all concurrent Kinesis consumers share a single read quota per shard. |
| Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers. |
| |
| <span class="label label-info">Note</span> Using EFO will [incur additional cost](https://aws.amazon.com/kinesis/data-streams/pricing/). |
| |
| You can enable and configure EFO with the following properties: |
| |
| * `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`. |
| * `scan.stream.efo.consumername`: A name to identify the consumer when the above value is `EFO`. |
| * `scan.stream.efo.registration`: Strategy for (de-)registration of `EFO` consumers with the name given by the `scan.stream.efo.consumername` value. Valid strategies are: |
| * `LAZY` (default): Stream consumers are registered when the Flink job starts running. |
| If the stream consumer already exists, it will be reused. |
| This is the preferred strategy for the majority of applications. |
| However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN. |
| For jobs with very large parallelism this can result in an increased start-up time. |
| The describe operation has a limit of 20 [transactions per second](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_DescribeStreamConsumer.html), |
| which means application startup time will increase by roughly `parallelism/20` seconds. |
| * `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` constructor. |
| If the stream consumer already exists, it will be reused. |
| This will result in registration occurring when the job is constructed, |
| either on the Flink Job Manager or client environment submitting the job. |
| Using this strategy results in a single thread registering and retrieving the stream consumer ARN, |
| reducing startup time over `LAZY` (with large parallelism). |
| However, consider that the client environment will require access to the AWS services. |
| * `NONE`: Stream consumer registration is not performed by `FlinkKinesisConsumer`. |
| Registration must be performed externally using the [AWS CLI or SDK](https://aws.amazon.com/tools/) |
| to invoke [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html). |
| Stream consumer ARNs should be provided to the job via the consumer configuration. |
| * `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally registered ARN-consumers (substitute `<stream-name>` with the name of your stream in the parameter name). |
| Use this if you choose to use `NONE` as a `scan.stream.efo.registration` strategy. |
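| |
| For example, a source table could enable EFO with a lazily registered consumer as in the sketch below (the consumer name is illustrative): |
| |
| ```sql |
| CREATE TABLE KinesisTable ( |
| `user_id` BIGINT, |
| `behavior` STRING |
| ) |
| WITH ( |
| 'connector' = 'kinesis', |
| 'stream' = 'user_behavior', |
| 'aws.region' = 'us-east-2', |
| 'scan.stream.recordpublisher' = 'EFO', |
| 'scan.stream.efo.consumername' = 'my-flink-efo-consumer', |
| 'format' = 'csv' |
| ); |
| ``` |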
| |
| <span class="label label-info">Note</span> For a given Kinesis data stream, each EFO consumer must have a unique name. |
| However, consumer names do not have to be unique across data streams. |
| Reusing a consumer name will result in existing subscriptions being terminated. |
| |
| <span class="label label-info">Note</span> With the `LAZY` strategy, stream consumers are de-registered when the job is shutdown gracefully. |
| In the event that a job terminates within executing the shutdown hooks, stream consumers will remain active. |
| In this situation the stream consumers will be gracefully reused when the application restarts. |
| With the `NONE` and `EAGER` strategies, stream consumer de-registration is not performed by `FlinkKinesisConsumer`. |
| |
| Data Type Mapping |
| ----------------- |
| |
| Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure. |
| Instead, Kinesis records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'. |
| To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the `format` keyword. |
| Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details. |
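| |
| For instance, a table reading JSON-encoded records might be declared as follows; `json.fail-on-missing-field` is a standard option of the JSON format, shown only as an illustration: |
| |
| ```sql |
| CREATE TABLE KinesisTable ( |
| `user_id` BIGINT, |
| `behavior` STRING |
| ) |
| WITH ( |
| 'connector' = 'kinesis', |
| 'stream' = 'user_behavior', |
| 'aws.region' = 'us-east-2', |
| 'format' = 'json', |
| 'json.fail-on-missing-field' = 'false' |
| ); |
| ``` |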
| |
| Updates in 1.15 |
| --------------- |
| |
| The Kinesis table API sink connector depended on <code>FlinkKinesisProducer</code> until Flink 1.14. With the introduction of <code>KinesisStreamsSink</code> in Flink 1.15, the sink has been migrated to the new <code>KinesisStreamsSink</code>. Authentication options have been migrated identically, while sink configuration options are now compatible with <code>KinesisStreamsSink</code>. |
| |
| Options configuring <code>FlinkKinesisProducer</code> are now deprecated with fallback support for common configuration options with <code>KinesisStreamsSink</code>. |
| |
| <code>KinesisStreamsSink</code> uses <code>KinesisAsyncClient</code> to send records to Kinesis, |
| which doesn't support aggregation. As a consequence, table options configuring aggregation in the deprecated <code>FlinkKinesisProducer</code> |
| are now deprecated and will be ignored; this includes <code>sink.producer.aggregation-enabled</code> and |
| <code>sink.producer.aggregation-count</code>. |
| |
| <span class="label label-info">Note</span> Migrating applications with deprecated options will result in the incompatible deprecated options being ignored and warned to users. |
| |
| The Kinesis table API source connector still depends on <code>FlinkKinesisConsumer</code>, with no change in configuration options. |
| |
| |
| {{< top >}} |