---
id: io-kinesis-sink
title: Kinesis sink connector
sidebar_label: "Kinesis sink connector"
---

:::note

You can download all the Pulsar connectors on the download page.

:::

The Kinesis sink connector pulls data from Pulsar and persists data into Amazon Kinesis.

Configuration

The configuration of the Kinesis sink connector has the following properties.

Property

| Name | Type | Required | Default | Description |
|------|------|----------|---------|-------------|
| `messageFormat` | MessageFormat | true | ONLY_RAW_PAYLOAD | Message format in which the Kinesis sink converts Pulsar messages and publishes them to Kinesis streams.<br /><br />Below are the available options:<br /><br /><ul><li>`ONLY_RAW_PAYLOAD`: the Kinesis sink directly publishes the Pulsar message payload as a message into the configured Kinesis stream.</li><li>`FULL_MESSAGE_IN_JSON`: the Kinesis sink creates a JSON payload with the Pulsar message payload, properties, and encryptionCtx, and publishes the JSON payload into the configured Kinesis stream.</li><li>`FULL_MESSAGE_IN_FB`: the Kinesis sink creates a FlatBuffers-serialized payload with the Pulsar message payload, properties, and encryptionCtx, and publishes the FlatBuffers payload into the configured Kinesis stream.</li><li>`FULL_MESSAGE_IN_JSON_EXPAND_VALUE`: the Kinesis sink sends a JSON structure containing the record topic name, key, payload, properties, and event time. The record schema is used to convert the value to JSON.</li></ul> |
| `jsonIncludeNonNulls` | boolean | false | true | When the message format is `FULL_MESSAGE_IN_JSON_EXPAND_VALUE`, only the properties with non-null values are included. |
| `jsonFlatten` | boolean | false | false | When set to `true` and the message format is `FULL_MESSAGE_IN_JSON_EXPAND_VALUE`, the output JSON is flattened. |
| `retainOrdering` | boolean | false | false | Whether the Pulsar connector retains ordering when moving messages from Pulsar to Kinesis. |
| `awsEndpoint` | String | false | " " (empty string) | The Kinesis endpoint URL. For the list of Kinesis endpoints, see the AWS documentation. |
| `awsRegion` | String | false | " " (empty string) | The AWS region.<br /><br />**Example**<br />us-west-1, us-west-2 |
| `awsKinesisStreamName` | String | true | " " (empty string) | The Kinesis stream name. |
| `awsCredentialPluginName` | String | false | " " (empty string) | The fully-qualified class name of an implementation of {@inject: github:AwsCredentialProviderPlugin:/pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java}.<br /><br />It is a factory class that creates an AWSCredentialsProvider used by the Kinesis sink.<br /><br />If it is empty, the Kinesis sink creates a default AWSCredentialsProvider that accepts a JSON map of credentials in `awsCredentialPluginParam`. |
| `awsCredentialPluginParam` | String | false | " " (empty string) | The JSON parameter used to initialize the credential provider plugin (`awsCredentialsProviderPlugin`). |
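
To illustrate how `jsonIncludeNonNulls` and `jsonFlatten` might shape a `FULL_MESSAGE_IN_JSON_EXPAND_VALUE` payload, here is a minimal Python sketch. It is not the connector's code. The JSON field names (`topicName`, `key`, `payload`, `properties`, `eventTime`), the removal of nulls at every nesting level, and the dot-separated flattening are assumptions chosen for illustration. The record value is passed in as an already-decoded dict instead of being converted through a Pulsar schema.

```python
import json
from typing import Any, Dict, Optional


def expand_value_json(
    topic: str,
    key: Optional[str],
    value: Dict[str, Any],
    properties: Dict[str, str],
    event_time: Optional[int],
    include_non_nulls: bool = True,
    flatten: bool = False,
) -> str:
    """Hypothetical sketch of a FULL_MESSAGE_IN_JSON_EXPAND_VALUE payload.

    Field names are assumptions, not the connector's guaranteed output.
    """
    doc: Dict[str, Any] = {
        "topicName": topic,
        "key": key,
        "payload": value,  # assumed already converted to JSON via the record schema
        "properties": properties,
        "eventTime": event_time,
    }
    if include_non_nulls:
        doc = _drop_nulls(doc)
    if flatten:
        doc = _flatten(doc)
    return json.dumps(doc, sort_keys=True)


def _drop_nulls(obj: Any) -> Any:
    # Assumption: keys whose value is None are removed at every nesting level.
    if isinstance(obj, dict):
        return {k: _drop_nulls(v) for k, v in obj.items() if v is not None}
    return obj


def _flatten(obj: Dict[str, Any], prefix: str = "") -> Dict[str, Any]:
    # Assumption: nested objects collapse into dot-separated keys,
    # e.g. {"payload": {"a": 1}} becomes {"payload.a": 1}. Empty objects are kept.
    flat: Dict[str, Any] = {}
    for k, v in obj.items():
        name = f"{prefix}.{k}" if prefix else k
        if isinstance(v, dict) and v:
            flat.update(_flatten(v, name))
        else:
            flat[name] = v
    return flat
```

For example, with the defaults (`jsonIncludeNonNulls=true`, `jsonFlatten=false`), a record without a key or event time produces an object that simply omits `key` and `eventTime`, while enabling flattening turns a nested payload field such as `b.c` into a top-level `payload.b.c` key.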

Built-in plugins

The following are built-in AwsCredentialProviderPlugin plugins:

  • org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin

    This plugin takes no configuration; it uses the default AWS provider chain.

    For more information, see AWS documentation.

  • org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin

    This plugin takes a configuration (passed through `awsCredentialPluginParam`) that describes a role to assume when the Kinesis sink accesses AWS.

    This configuration takes the form of a small JSON document like:

    {"roleArn": "arn...", "roleSessionName": "name"}
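
Because `awsCredentialPluginParam` is a JSON document carried inside a string, it has to be serialized separately from the surrounding configuration. The following Python sketch builds a sink configuration that uses the built-in `STSAssumeRoleProviderPlugin`. The helper name `sts_sink_config`, the role ARN, and the session name are hypothetical placeholders.

```python
import json


def sts_sink_config(stream: str, region: str, role_arn: str, session_name: str) -> dict:
    """Hypothetical helper: build a Kinesis sink config that assumes an IAM role."""
    # awsCredentialPluginParam is itself a JSON string, so the inner document
    # is serialized on its own before being placed in the outer config.
    plugin_param = json.dumps({"roleArn": role_arn, "roleSessionName": session_name})
    return {
        "configs": {
            "awsRegion": region,
            "awsKinesisStreamName": stream,
            "awsCredentialPluginName": "org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin",
            "awsCredentialPluginParam": plugin_param,
            "messageFormat": "ONLY_RAW_PAYLOAD",
        }
    }


if __name__ == "__main__":
    # Placeholder values; substitute your own role ARN and session name.
    cfg = sts_sink_config(
        "my-stream",
        "us-east-1",
        "arn:aws:iam::123456789012:role/example",
        "pulsar-kinesis-sink",
    )
    print(json.dumps(cfg, indent=2))
```

Writing the returned dict out with `json.dump` yields a file in the same shape as the JSON configuration in the Example section, with the inner quotes of `awsCredentialPluginParam` escaped automatically.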
    

Example

Before using the Kinesis sink connector, you need to create a configuration file in one of the following formats.

  • JSON

    {
       "configs": {
          "awsEndpoint": "some.endpoint.aws",
          "awsRegion": "us-east-1",
          "awsKinesisStreamName": "my-stream",
          "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
          "messageFormat": "ONLY_RAW_PAYLOAD",
          "retainOrdering": "true"
       }
    }
    
  • YAML

    configs:
        awsEndpoint: "some.endpoint.aws"
        awsRegion: "us-east-1"
        awsKinesisStreamName: "my-stream"
        awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
        messageFormat: "ONLY_RAW_PAYLOAD"
        retainOrdering: "true"
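
Before deploying, it can help to sanity-check a JSON configuration file against the property table. The Python sketch below is a hypothetical pre-flight helper, not part of Pulsar. `check_kinesis_sink_config` checks that `awsKinesisStreamName` is set, that `messageFormat` is one of the four documented values (falling back to the documented default `ONLY_RAW_PAYLOAD` when omitted), that `awsCredentialPluginParam` parses as JSON, and that the boolean flags are `true` or `false`. It handles only the JSON form (the YAML form would need a YAML parser) and does not check AWS connectivity or credentials.

```python
import json
from typing import Any, Dict, List

# The four messageFormat values listed in the property table.
MESSAGE_FORMATS = {
    "ONLY_RAW_PAYLOAD",
    "FULL_MESSAGE_IN_JSON",
    "FULL_MESSAGE_IN_FB",
    "FULL_MESSAGE_IN_JSON_EXPAND_VALUE",
}

BOOLEAN_FLAGS = ("retainOrdering", "jsonFlatten", "jsonIncludeNonNulls")


def check_kinesis_sink_config(text: str) -> List[str]:
    """Hypothetical pre-flight check; returns a list of problems (empty if none)."""
    problems: List[str] = []
    configs: Dict[str, Any] = json.loads(text).get("configs", {})

    # awsKinesisStreamName is required and defaults to an empty string.
    if not configs.get("awsKinesisStreamName"):
        problems.append("awsKinesisStreamName is required")

    # Fall back to the documented default when messageFormat is omitted.
    fmt = configs.get("messageFormat", "ONLY_RAW_PAYLOAD")
    if fmt not in MESSAGE_FORMATS:
        problems.append(f"unknown messageFormat: {fmt}")

    # awsCredentialPluginParam is a JSON document stored as a string.
    param = configs.get("awsCredentialPluginParam", "")
    if param:
        if not isinstance(param, str):
            problems.append("awsCredentialPluginParam must be a JSON string")
        else:
            try:
                json.loads(param)
            except json.JSONDecodeError:
                problems.append("awsCredentialPluginParam is not valid JSON")

    # This check accepts JSON booleans or the strings "true"/"false".
    for flag in BOOLEAN_FLAGS:
        if flag in configs and str(configs[flag]).lower() not in ("true", "false"):
            problems.append(f"{flag} must be true or false")

    return problems
```

Run against the JSON example above, the helper returns an empty list; a file with a misspelled format such as `"RAW"` or a malformed credential string would be reported before the sink is submitted.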