title: DynamoDB weight: 5 type: docs aliases:
The DynamoDB sink writes to Amazon DynamoDB using the AWS v2 SDK for Java. Follow the instructions from the Amazon DynamoDB Developer Guide to setup a table.
To use the connector, add the following Maven dependency to your project:
{{< connector_artifact flink-connector-dynamodb 4.1.0 >}}
{{< tabs “ec24a4ae-6a47-11ed-a1eb-0242ac120002” >}} {{< tab “Java” >}}
Properties sinkProperties = new Properties(); // Required sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); // Optional, provide via alternative routes e.g. environment variables sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); ElementConverter<InputType, DynamoDbWriteRequest> elementConverter = new CustomElementConverter(); DynamoDbSink<String> dynamoDbSink = DynamoDbSink.<InputType>builder() .setDynamoDbProperties(sinkProperties) // Required .setTableName("my-dynamodb-table") // Required .setElementConverter(elementConverter) // Required .setOverwriteByPartitionKeys(singletonList("key")) // Optional .setFailOnError(false) // Optional .setMaxBatchSize(25) // Optional .setMaxInFlightRequests(50) // Optional .setMaxBufferedRequests(10_000) // Optional .setMaxTimeInBufferMS(5000) // Optional .build(); flinkStream.sinkTo(dynamoDbSink);
{{< /tab >}} {{< tab “Scala” >}}
val sinkProperties = new Properties() // Required sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1") // Optional, provide via alternative routes e.g. environment variables sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") val elementConverter = new CustomElementConverter(); val dynamoDbSink = DynamoDbSink.<InputType>builder() .setDynamoDbProperties(sinkProperties) // Required .setTableName("my-dynamodb-table") // Required .setElementConverter(elementConverter) // Required .setOverwriteByPartitionKeys(singletonList("key")) // Optional .setFailOnError(false) // Optional .setMaxBatchSize(25) // Optional .setMaxInFlightRequests(50) // Optional .setMaxBufferedRequests(10_000) // Optional .setMaxTimeInBufferMS(5000) // Optional .build() flinkStream.sinkTo(dynamoDbSink)
{{< /tab >}} {{< /tabs >}}
Flink's DynamoDB sink is created by using the static builder DynamoDBSink.<InputType>builder()
.
InputType
to DynamoDbWriteRequest
.false
.25
.50
.10_000
.5000
.An element converter is used to convert from a record in the DataStream to a DynamoDbWriteRequest which the sink will write to the destination DynamoDB table. The DynamoDB sink allows the user to supply a custom element converter, or use the provided DynamoDbBeanElementConverter
when you are working with @DynamoDbBean
objects. For more information on supported annotations see here.
A sample application using a custom ElementConverter
can be found here. A sample application using the DynamoDbBeanElementConverter
can be found here.
It is sometimes desirable to have Flink operate as a consumer or producer against a DynamoDB VPC endpoint or a non-AWS DynamoDB endpoint such as Localstack; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
To override the AWS endpoint, set the AWSConfigConstants.AWS_ENDPOINT
and AWSConfigConstants.AWS_REGION
properties. The region will be used to sign the endpoint URL.
{{< tabs “bcadd466-8416-4d3c-a6a7-c46eee0cbd4a” >}} {{< tab “Java” >}}
Properties producerConfig = new Properties(); producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id"); producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key"); producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
{{< /tab >}} {{< tab “Scala” >}}
val producerConfig = new Properties() producerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1") producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id") producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key") producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
{{< /tab >}} {{< /tabs >}}
{{< top >}}