---
title: Kinesis
weight: 5
type: docs
aliases:
---
{{< label "Scan Source: Unbounded" >}} {{< label "Sink: Batch" >}} {{< label "Sink: Streaming Append Mode" >}}
The Kinesis connector allows for reading data from and writing data into Amazon Kinesis Data Streams (KDS).
{{< sql_connector_download_table kinesis >}}
The Kinesis connector is not part of the binary distribution. See how to link with it for cluster execution [here]({{< ref "docs/dev/configuration/overview" >}}).
Follow the instructions from the Amazon KDS Developer Guide to set up a Kinesis stream. The following example shows how to create a table backed by a Kinesis data stream:
```sql
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3)
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```
The following metadata fields can be exposed as read-only (`VIRTUAL`) columns in a table definition: `timestamp` (the approximate time the record was inserted into the stream), `shard-id` (the unique identifier of the shard the record was read from), and `sequence-number` (the unique identifier of the record within its shard). The extended `CREATE TABLE` example demonstrates the syntax for exposing these metadata fields:
```sql
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `category_id` BIGINT,
  `behavior` STRING,
  `ts` TIMESTAMP(3),
  `arrival_time` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,
  `shard_id` VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,
  `sequence_number` VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  'format' = 'csv'
);
```
Make sure to create an appropriate IAM policy to allow reading from / writing to the Kinesis data streams.
Depending on your deployment, you would choose a different Credentials Provider to allow access to Kinesis. By default, the `AUTO` Credentials Provider is used. If the access key ID and secret key are set in the deployment configuration, this results in using the `BASIC` provider.
A specific `AWSCredentialsProvider` can optionally be set using the `aws.credentials.provider` setting. Supported values are:
- `AUTO` - Use the default AWS Credentials Provider chain that searches for credentials in the following order: `ENV_VARS`, `SYS_PROPS`, `WEB_IDENTITY_TOKEN`, `PROFILE`, and EC2/ECS credentials provider.
- `BASIC` - Use access key ID and secret key supplied as configuration.
- `ENV_VAR` - Use `AWS_ACCESS_KEY_ID` & `AWS_SECRET_ACCESS_KEY` environment variables.
- `SYS_PROP` - Use Java system properties `aws.accessKeyId` and `aws.secretKey`.
- `PROFILE` - Use an AWS credentials profile to create the AWS credentials.
- `ASSUME_ROLE` - Create AWS credentials by assuming a role. The credentials for assuming the role must be supplied.
- `WEB_IDENTITY_TOKEN` - Create AWS credentials by assuming a role using Web Identity Token.
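For example, a table can be pinned to the `BASIC` provider with explicitly supplied credentials. This is a minimal sketch: the `aws.credentials.basic.accesskeyid` and `aws.credentials.basic.secretkey` option keys are assumed from the connector's AWS configuration keys, and the credential values are placeholders.

```sql
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  -- select the BASIC provider and supply credentials directly (placeholders below)
  'aws.credentials.provider' = 'BASIC',
  'aws.credentials.basic.accesskeyid' = '<access-key-id>',
  'aws.credentials.basic.secretkey' = '<secret-access-key>',
  'format' = 'csv'
);
```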
You can configure table sources to start reading a table-backing Kinesis data stream from a specific position through the `scan.stream.initpos` option. Available values are:
- `LATEST`: read shards starting from the latest record.
- `TRIM_HORIZON`: read shards starting from the earliest record possible (data may be trimmed by Kinesis depending on the current retention settings of the backing stream).
- `AT_TIMESTAMP`: read shards starting from a specified timestamp. The timestamp value should be specified through the `scan.stream.initpos-timestamp` option in one of the following formats:
  - A non-negative numeric value representing the number of seconds elapsed since the Unix epoch (e.g. `1459799926.480`).
  - A value conforming to a user-defined `SimpleDateFormat` specified at `scan.stream.initpos-timestamp-format`. If a user does not define a format, the default pattern will be `yyyy-MM-dd'T'HH:mm:ss.SSSXXX`. For example, a timestamp value of `2016-04-04` with a user-defined format of `yyyy-MM-dd`, or a timestamp value of `2016-04-04T19:58:46.480-00:00` with no user-defined format provided.
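For instance, to start reading at a fixed point in time using the default timestamp pattern (a sketch reusing the table definition from the earlier examples):

```sql
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  -- begin reading each shard at the given point in time
  'scan.stream.initpos' = 'AT_TIMESTAMP',
  'scan.stream.initpos-timestamp' = '2016-04-04T19:58:46.480-00:00',
  'format' = 'csv'
);
```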
Kinesis data streams consist of one or more shards, and the `sink.partitioner` option allows you to control how records written into a multi-shard Kinesis-backed table will be partitioned between its shards. Valid values are:
- `fixed`: Kinesis `PartitionKey` values are derived from the Flink subtask index, so each Flink partition ends up in at most one Kinesis partition (assuming that no re-sharding takes place at runtime).
- `random`: Kinesis `PartitionKey` values are assigned randomly. This is the default value for tables not defined with a `PARTITION BY` clause.
- Custom `FixedKinesisPartitioner` subclass: e.g. `'org.mycompany.MyPartitioner'`.
{{< hint info >}}
Records written into tables defining a `PARTITION BY` clause will always be partitioned based on a concatenated projection of the `PARTITION BY` fields. In this case, the `sink.partitioner` field cannot be used to modify this behavior (attempting to do so results in a configuration error). You can, however, use the `sink.partitioner-field-delimiter` option to set the delimiter of field values in the concatenated `PartitionKey` string (an empty string is also a valid delimiter).
{{< /hint >}}
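For illustration, the following sink table (a sketch based on the earlier examples) partitions records by the `PARTITION BY` fields and joins their values with a custom delimiter in the `PartitionKey`:

```sql
CREATE TABLE KinesisSinkTable (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
)
PARTITIONED BY (user_id, item_id)
WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  -- each record's PartitionKey becomes '<user_id>|<item_id>'
  'sink.partitioner-field-delimiter' = '|',
  'format' = 'csv'
);
```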
Enhanced Fan-Out (EFO) increases the maximum number of concurrent consumers per Kinesis data stream. Without EFO, all concurrent Kinesis consumers share a single read quota per shard. Using EFO, each consumer gets a distinct dedicated read quota per shard, allowing read throughput to scale with the number of consumers.
**Note:** Using EFO will incur additional cost.
You can enable and configure EFO with the following properties:
- `scan.stream.recordpublisher`: Determines whether to use `EFO` or `POLLING`.
- `scan.stream.efo.consumername`: A name to identify the consumer when the above value is `EFO`.
- `scan.stream.efo.registration`: Strategy for (de-)registration of `EFO` consumers with the name given by the `scan.stream.efo.consumername` value. Valid strategies are:
  - `LAZY` (default): Stream consumers are registered when the Flink job starts running. If the stream consumer already exists, it will be reused. This is the preferred strategy for the majority of applications. However, jobs with parallelism greater than 1 will result in tasks competing to register and acquire the stream consumer ARN. For jobs with very large parallelism this can result in an increased start-up time. The describe operation has a limit of 20 transactions per second, so application startup time will increase by roughly `parallelism/20` seconds.
  - `EAGER`: Stream consumers are registered in the `FlinkKinesisConsumer` constructor. If the stream consumer already exists, it will be reused. This will result in registration occurring when the job is constructed, either on the Flink Job Manager or in the client environment submitting the job. Using this strategy results in a single thread registering and retrieving the stream consumer ARN, reducing startup time over `LAZY` (with large parallelism). However, consider that the client environment will require access to the AWS services.
  - `NONE`: Stream consumer registration is not performed by `FlinkKinesisConsumer`. Registration must be performed externally using the AWS CLI or SDK to invoke `RegisterStreamConsumer`. Stream consumer ARNs should be provided to the job via the consumer configuration.
- `scan.stream.efo.consumerarn.<stream-name>`: ARNs identifying externally registered ARN-consumers (substitute `<stream-name>` with the name of your stream in the parameter name). Use this if you choose `NONE` as the `scan.stream.efo.registration` strategy.

**Note:** For a given Kinesis data stream, each EFO consumer must have a unique name. However, consumer names do not have to be unique across data streams. Reusing a consumer name will result in existing subscriptions being terminated.
**Note:** With the `LAZY` strategy, stream consumers are de-registered when the job is shut down gracefully. In the event that a job terminates without executing the shutdown hooks, stream consumers will remain active. In this situation the stream consumers will be gracefully reused when the application restarts. With the `NONE` and `EAGER` strategies, stream consumer de-registration is not performed by `FlinkKinesisConsumer`.
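For example, enabling EFO on a source table could look as follows (a sketch; the consumer name is a placeholder you should choose per application):

```sql
CREATE TABLE KinesisTable (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  'scan.stream.initpos' = 'LATEST',
  -- read via Enhanced Fan-Out with a lazily registered consumer
  'scan.stream.recordpublisher' = 'EFO',
  'scan.stream.efo.consumername' = 'my-flink-efo-consumer',
  'scan.stream.efo.registration' = 'LAZY',
  'format' = 'csv'
);
```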
Kinesis stores records as Base64-encoded binary data objects, so it doesn't have a notion of internal record structure. Instead, Kinesis records are deserialized and serialized by formats, e.g. 'avro', 'csv', or 'json'. To determine the data type of the messages in your Kinesis-backed tables, pick a suitable Flink format with the `format` keyword. Please refer to the [Formats]({{< ref "docs/connectors/table/formats/overview" >}}) pages for more details.
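For instance, a table whose record bodies are JSON-encoded could be declared as follows. This is a sketch; `json.ignore-parse-errors` is an option of the JSON format (assumed here to illustrate format-specific tuning) that skips malformed records instead of failing.

```sql
CREATE TABLE KinesisJsonTable (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  -- deserialize record bodies as JSON; skip rather than fail on malformed records
  'format' = 'json',
  'json.ignore-parse-errors' = 'true'
);
```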
Until Flink 1.14, the Kinesis Table API sink was backed by `FlinkKinesisProducer`. With the introduction of `KinesisStreamsSink` in Flink 1.15, the Kinesis Table API sink connector has been migrated to the new `KinesisStreamsSink`. Authentication options have been migrated identically, while sink configuration options are now compatible with `KinesisStreamsSink`.

Options configuring `FlinkKinesisProducer` are now deprecated, with fallback support for common configuration options with `KinesisStreamsSink`.

`KinesisStreamsSink` uses `KinesisAsyncClient` to send records to Kinesis, which doesn't support aggregation. In consequence, table options configuring aggregation in the deprecated `FlinkKinesisProducer` are now deprecated and will be ignored; this includes `sink.producer.aggregation-enabled` and `sink.producer.aggregation-count`.

**Note:** Migrating applications with deprecated options will result in the incompatible deprecated options being ignored, with warnings issued to users.

The Kinesis Table API source connector still depends on `FlinkKinesisConsumer`, with no change in configuration options.
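As a sketch of new-style sink configuration (the option keys below, `sink.fail-on-error` and `sink.batch.max-size`, are assumptions based on the `KinesisStreamsSink`-backed connector; verify them against the options table for your Flink version):

```sql
CREATE TABLE KinesisSinkTable (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kinesis',
  'stream' = 'user_behavior',
  'aws.region' = 'us-east-2',
  -- KinesisStreamsSink-style options (assumed keys; check your version's docs)
  'sink.fail-on-error' = 'false',
  'sink.batch.max-size' = '500',
  'format' = 'csv'
);
```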
{{< top >}}