id: io-kinesis-sink
title: Kinesis sink connector
sidebar_label: Kinesis sink connector

The Kinesis sink connector pulls data from Pulsar and persists it into Amazon Kinesis.

Configuration

The configuration of the Kinesis sink connector has the following properties.

Property

Each entry below gives the property name, followed by its type, whether it is required, and its default value.

  • messageFormat (MessageFormat, required, default: ONLY_RAW_PAYLOAD)

    The message format in which the Kinesis sink converts Pulsar messages and publishes them to Kinesis streams. The available options are:

      • ONLY_RAW_PAYLOAD: the Kinesis sink publishes the Pulsar message payload directly as a message into the configured Kinesis stream.

      • FULL_MESSAGE_IN_JSON: the Kinesis sink creates a JSON payload containing the Pulsar message payload, properties, and encryptionCtx, and publishes that JSON payload into the configured Kinesis stream.

      • FULL_MESSAGE_IN_FB: the Kinesis sink creates a FlatBuffers-serialized payload containing the Pulsar message payload, properties, and encryptionCtx, and publishes that payload into the configured Kinesis stream.

  • retainOrdering (boolean, optional, default: false)

    Whether the Pulsar connector retains ordering when moving messages from Pulsar to Kinesis.

  • awsEndpoint (String, optional, default: "" (empty string))

    The Kinesis endpoint URL, which can be found in the AWS documentation.

  • awsRegion (String, optional, default: "" (empty string))

    The AWS region, for example us-west-1 or us-west-2.

  • awsKinesisStreamName (String, required, default: "" (empty string))

    The Kinesis stream name.

  • awsCredentialPluginName (String, optional, default: "" (empty string))

    The fully qualified class name of an implementation of AwsCredentialProviderPlugin (pulsar-io/aws/src/main/java/org/apache/pulsar/io/aws/AwsCredentialProviderPlugin.java in the Pulsar GitHub repository).

    The plugin is a factory class that creates the AWSCredentialsProvider used by the Kinesis sink.

    If this property is empty, the Kinesis sink creates a default AWSCredentialsProvider, which accepts a JSON map of credentials in awsCredentialPluginParam.

  • awsCredentialPluginParam (String, optional, default: "" (empty string))

    The JSON parameter used to initialize the plugin specified by awsCredentialPluginName.
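To make the difference between the messageFormat options concrete, the following Java sketch shows how a sink could turn a Pulsar message into the bytes of a Kinesis record under ONLY_RAW_PAYLOAD and FULL_MESSAGE_IN_JSON. It is not the connector's implementation: the class MessageFormatSketch, the method toKinesisData, and the JSON field names payloadBase64 and properties are hypothetical, encryptionCtx is omitted, and FULL_MESSAGE_IN_FB is left out because it depends on the connector's FlatBuffers schema.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration only: not the Kinesis sink's actual serialization code.
public class MessageFormatSketch {

    // Two of the three documented options; FULL_MESSAGE_IN_FB is not modeled here.
    public enum MessageFormat { ONLY_RAW_PAYLOAD, FULL_MESSAGE_IN_JSON }

    // Encodes a string as a JSON string literal, escaping quotes, backslashes and control characters.
    public static String jsonString(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    // Returns the bytes that would be written as the Kinesis record data.
    public static byte[] toKinesisData(MessageFormat format, byte[] payload, Map<String, String> properties) {
        if (format == MessageFormat.ONLY_RAW_PAYLOAD) {
            // Raw payload bytes are forwarded unchanged; message properties are not included.
            return payload;
        }
        // FULL_MESSAGE_IN_JSON: Base64-encode the payload so arbitrary bytes fit in JSON,
        // and include the message properties (sorted by key for stable output).
        StringBuilder json = new StringBuilder("{\"payloadBase64\":");
        json.append(jsonString(Base64.getEncoder().encodeToString(payload)));
        json.append(",\"properties\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : new TreeMap<>(properties).entrySet()) {
            if (!first) {
                json.append(',');
            }
            json.append(jsonString(e.getKey())).append(':').append(jsonString(e.getValue()));
            first = false;
        }
        json.append("}}");
        return json.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
        Map<String, String> props = Map.of("source", "orders");
        byte[] json = toKinesisData(MessageFormat.FULL_MESSAGE_IN_JSON, payload, props);
        // prints {"payloadBase64":"aGVsbG8=","properties":{"source":"orders"}}
        System.out.println(new String(json, StandardCharsets.UTF_8));
    }
}
```

Base64-encoding the payload is one common way to carry arbitrary bytes inside JSON; treat it as an assumption here, since the connector's own envelope may be laid out differently.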

Built-in plugins

The following are built-in AwsCredentialProviderPlugin plugins:

  • org.apache.pulsar.io.aws.AwsDefaultProviderChainPlugin

    This plugin takes no configuration; it uses the default AWS provider chain.

    For more information, see the AWS documentation.

  • org.apache.pulsar.io.aws.STSAssumeRoleProviderPlugin

    This plugin takes a configuration (through awsCredentialPluginParam) that describes a role to assume when running the connector.

    This configuration takes the form of a small JSON document such as:

    {"roleArn": "arn...", "roleSessionName": "name"}
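Because awsCredentialPluginParam is a String property, the JSON document a plugin expects is passed as a string, so it is escaped a second time when it appears inside a JSON connector configuration. The following Java sketch builds the two parameter shapes mentioned on this page (accessKey/secretKey for the default provider, roleArn/roleSessionName for STSAssumeRoleProviderPlugin) and prints the escaped form. CredentialParamSketch and its methods are hypothetical helpers, not part of Pulsar.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper for building awsCredentialPluginParam values; not part of Pulsar.
public class CredentialParamSketch {

    // Quotes a value as a JSON string, escaping backslashes first and then double quotes.
    // Sufficient for keys, ARNs, and session names, which contain no control characters.
    public static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    // Serializes a flat map of string values as a JSON object, keeping insertion order.
    public static String toJsonObject(Map<String, String> fields) {
        StringBuilder sb = new StringBuilder("{");
        for (Map.Entry<String, String> e : fields.entrySet()) {
            if (sb.length() > 1) {
                sb.append(',');
            }
            sb.append(quote(e.getKey())).append(':').append(quote(e.getValue()));
        }
        return sb.append('}').toString();
    }

    // Parameter shape for STSAssumeRoleProviderPlugin: the role to assume.
    public static String stsAssumeRoleParam(String roleArn, String roleSessionName) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("roleArn", roleArn);
        m.put("roleSessionName", roleSessionName);
        return toJsonObject(m);
    }

    // Parameter shape for the default provider used when awsCredentialPluginName is empty.
    public static String staticKeysParam(String accessKey, String secretKey) {
        Map<String, String> m = new LinkedHashMap<>();
        m.put("accessKey", accessKey);
        m.put("secretKey", secretKey);
        return toJsonObject(m);
    }

    public static void main(String[] args) {
        String param = staticKeysParam("myKey", "my-Secret");
        // prints {"accessKey":"myKey","secretKey":"my-Secret"}
        System.out.println(param);
        // Embedding the parameter as a JSON string value escapes its quotes a second time.
        System.out.println("\"awsCredentialPluginParam\": " + quote(param));
    }
}
```

The second line printed by main matches the escaped awsCredentialPluginParam value used in the JSON configuration example on this page.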
    

Example

Before using the Kinesis sink connector, you need to create a configuration file in one of the following formats.

  • JSON

    {
       "configs": {
          "awsEndpoint": "some.endpoint.aws",
          "awsRegion": "us-east-1",
          "awsKinesisStreamName": "my-stream",
          "awsCredentialPluginParam": "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}",
          "messageFormat": "ONLY_RAW_PAYLOAD",
          "retainOrdering": "true"
       }
    }
    
  • YAML

    configs:
        awsEndpoint: "some.endpoint.aws"
        awsRegion: "us-east-1"
        awsKinesisStreamName: "my-stream"
        awsCredentialPluginParam: "{\"accessKey\":\"myKey\",\"secretKey\":\"my-Secret\"}"
        messageFormat: "ONLY_RAW_PAYLOAD"
        retainOrdering: "true"
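Before deploying, it can help to sanity-check a configuration against the property table. The following Java sketch is a hypothetical pre-flight check (KinesisSinkConfigCheck is not part of Pulsar, and the connector applies its own validation): it requires awsKinesisStreamName, restricts messageFormat to the three documented values, and accepts retainOrdering either as a boolean or as the string "true" or "false", as in the examples above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical pre-flight check based only on the documented property table.
public class KinesisSinkConfigCheck {

    static final Set<String> MESSAGE_FORMATS =
            Set.of("ONLY_RAW_PAYLOAD", "FULL_MESSAGE_IN_JSON", "FULL_MESSAGE_IN_FB");

    // Returns a list of problems; an empty list means the map passed these checks.
    public static List<String> check(Map<String, Object> configs) {
        List<String> problems = new ArrayList<>();

        // awsKinesisStreamName is required and defaults to an empty string.
        Object stream = configs.get("awsKinesisStreamName");
        if (stream == null || stream.toString().isBlank()) {
            problems.add("awsKinesisStreamName is required");
        }

        // messageFormat falls back to its documented default when absent.
        Object format = configs.getOrDefault("messageFormat", "ONLY_RAW_PAYLOAD");
        if (!MESSAGE_FORMATS.contains(format.toString())) {
            problems.add("unknown messageFormat: " + format);
        }

        // The examples pass retainOrdering as the string "true", so accept strings and booleans.
        Object ordering = configs.getOrDefault("retainOrdering", false);
        if (!(ordering instanceof Boolean)) {
            String s = ordering.toString();
            if (!s.equalsIgnoreCase("true") && !s.equalsIgnoreCase("false")) {
                problems.add("retainOrdering must be true or false: " + ordering);
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, Object> configs = Map.of(
                "awsEndpoint", "some.endpoint.aws",
                "awsRegion", "us-east-1",
                "awsKinesisStreamName", "my-stream",
                "messageFormat", "ONLY_RAW_PAYLOAD",
                "retainOrdering", "true");
        System.out.println(check(configs)); // prints []
    }
}
```

Collecting every problem instead of failing on the first one is a design choice for this sketch, so that a single run reports all misconfigured properties.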