---
title: Firehose
weight: 5
type: docs
aliases:
---
The Firehose sink writes to Amazon Kinesis Data Firehose.
Follow the instructions from the Amazon Kinesis Data Firehose Developer Guide to set up a Kinesis Data Firehose delivery stream.
To use the connector, add the following Maven dependency to your project:
{{< connector_artifact flink-connector-aws-kinesis-firehose firehose >}}
{{< py_connector_download_link "firehose" >}}
The `KinesisFirehoseSink` uses the AWS v2 SDK for Java to write data from a Flink stream into a Firehose delivery stream.
{{< tabs "42vs28vdth5-nm76-6dz1-5m7s-5y345bu56se5u66je" >}}
{{< tab "Java" >}}
```java
Properties sinkProperties = new Properties();
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1");
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");

KinesisFirehoseSink<String> kdfSink =
    KinesisFirehoseSink.<String>builder()
        .setFirehoseClientProperties(sinkProperties)      // Required
        .setSerializationSchema(new SimpleStringSchema()) // Required
        .setDeliveryStreamName("your-stream-name")        // Required
        .setFailOnError(false)                            // Optional
        .setMaxBatchSize(500)                             // Optional
        .setMaxInFlightRequests(50)                       // Optional
        .setMaxBufferedRequests(10_000)                   // Optional
        .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
        .setMaxTimeInBufferMS(5000)                       // Optional
        .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
        .build();

flinkStream.sinkTo(kdfSink);
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val sinkProperties = new Properties()
// Required
sinkProperties.put(AWSConfigConstants.AWS_REGION, "eu-west-1")
// Optional, provide via alternative routes e.g. environment variables
sinkProperties.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
sinkProperties.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")

val kdfSink = KinesisFirehoseSink.builder[String]()
  .setFirehoseClientProperties(sinkProperties)      // Required
  .setSerializationSchema(new SimpleStringSchema()) // Required
  .setDeliveryStreamName("your-stream-name")        // Required
  .setFailOnError(false)                            // Optional
  .setMaxBatchSize(500)                             // Optional
  .setMaxInFlightRequests(50)                       // Optional
  .setMaxBufferedRequests(10000)                    // Optional
  .setMaxBatchSizeInBytes(4 * 1024 * 1024)          // Optional
  .setMaxTimeInBufferMS(5000)                       // Optional
  .setMaxRecordSizeInBytes(1000 * 1024)             // Optional
  .build()

flinkStream.sinkTo(kdfSink)
```
{{< /tab >}}
{{< tab "Python" >}}
```python
sink_properties = {
    # Required
    'aws.region': 'eu-west-1',
    # Optional, provide via alternative routes e.g. environment variables
    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key'
}

# The first three setters are required; the remaining setters are optional.
kdf_sink = KinesisFirehoseSink.builder() \
    .set_firehose_client_properties(sink_properties) \
    .set_serialization_schema(SimpleStringSchema()) \
    .set_delivery_stream_name('your-stream-name') \
    .set_fail_on_error(False) \
    .set_max_batch_size(500) \
    .set_max_in_flight_requests(50) \
    .set_max_buffered_requests(10000) \
    .set_max_batch_size_in_bytes(5 * 1024 * 1024) \
    .set_max_time_in_buffer_ms(5000) \
    .set_max_record_size_in_bytes(1 * 1024 * 1024) \
    .build()
```
{{< /tab >}} {{< /tabs >}}
Flink's Firehose sink is created by using the static builder `KinesisFirehoseSink.<InputType>builder()`.

The optional builder settings have the following defaults:

* `setFailOnError`: `false`
* `setMaxBatchSize`: `500`
* `setMaxInFlightRequests`: `50`
* `setMaxBufferedRequests`: `10_000`
* `setMaxBatchSizeInBytes`: `4 * 1024 * 1024`
* `setMaxTimeInBufferMS`: `5000`
* `setMaxRecordSizeInBytes`: `1000 * 1024`
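The batching settings interact: a buffered batch is sent as soon as any one of the thresholds is crossed. The following standalone Python sketch illustrates that flush logic under assumed semantics — it is not the sink's actual implementation, and the class and method names are hypothetical:

```python
import time

# Illustrative sketch only, not the connector's real code. A buffering sink
# flushes a batch when ANY threshold is crossed: record count (maxBatchSize),
# accumulated bytes (maxBatchSizeInBytes), or buffer age (maxTimeInBufferMS).
class BufferingSinkSketch:
    def __init__(self, max_batch_size=500,
                 max_batch_size_in_bytes=4 * 1024 * 1024,
                 max_time_in_buffer_ms=5000):
        self.max_batch_size = max_batch_size
        self.max_batch_size_in_bytes = max_batch_size_in_bytes
        self.max_time_in_buffer_ms = max_time_in_buffer_ms
        self.buffer = []
        self.buffered_bytes = 0
        self.oldest_ts = None
        self.flushed_batches = []  # stands in for PutRecordBatch calls

    def write(self, record: bytes):
        if self.oldest_ts is None:
            self.oldest_ts = time.monotonic()
        self.buffer.append(record)
        self.buffered_bytes += len(record)
        if self._should_flush():
            self.flush()

    def _should_flush(self) -> bool:
        age_ms = (time.monotonic() - self.oldest_ts) * 1000
        return (len(self.buffer) >= self.max_batch_size
                or self.buffered_bytes >= self.max_batch_size_in_bytes
                or age_ms >= self.max_time_in_buffer_ms)

    def flush(self):
        if self.buffer:
            self.flushed_batches.append(self.buffer)
            self.buffer = []
            self.buffered_bytes = 0
            self.oldest_ts = None
```

For example, with `max_batch_size=3`, writing seven records flushes two batches of three and leaves one record buffered until a later threshold (such as the time limit) triggers the next flush.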
It is sometimes desirable to have Flink operate as a consumer or producer against a Firehose VPC endpoint or a non-AWS Firehose endpoint such as Localstack; this is especially useful when performing functional testing of a Flink application. The AWS endpoint that would normally be inferred by the AWS region set in the Flink configuration must be overridden via a configuration property.
To override the AWS endpoint, set the `AWSConfigConstants.AWS_ENDPOINT` and `AWSConfigConstants.AWS_REGION` properties. The region will be used to sign the endpoint URL.
{{< tabs "bcadd466-8416-4d3c-a6a7-c46eee0cbd4a" >}}
{{< tab "Java" >}}
```java
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566");
```
{{< /tab >}}
{{< tab "Scala" >}}
```scala
val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id")
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key")
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4566")
```
{{< /tab >}}
{{< tab "Python" >}}
```python
producer_config = {
    'aws.region': 'us-east-1',
    'aws.credentials.provider.basic.accesskeyid': 'aws_access_key_id',
    'aws.credentials.provider.basic.secretkey': 'aws_secret_access_key',
    'aws.endpoint': 'http://localhost:4566'
}
```
{{< /tab >}}
{{< /tabs >}}
{{< top >}}