The Samza Kinesis connector allows you to interact with Amazon Kinesis Data Streams, Amazon’s data streaming service. The hello-samza project includes an example of processing Kinesis streams using Samza. The example comes with its complete source code and configs, and you can build and run it by following the accompanying tutorial.
### Data Format

Like a Kafka topic, a Kinesis stream can have multiple shards with producers and consumers. Each message consumed from the stream is an instance of a Kinesis `Record`. Samza’s `KinesisSystemConsumer` wraps the `Record` into a `KinesisIncomingMessageEnvelope`.
Here is the required configuration for consuming messages from Kinesis through `KinesisSystemDescriptor` and `KinesisInputDescriptor`:
{% highlight java %}
// Describes the Kinesis system; "kinesis" is the system name.
KinesisSystemDescriptor ksd = new KinesisSystemDescriptor("kinesis");

// Describes one Kinesis stream; NoOpSerde leaves the record data as raw bytes.
KinesisInputDescriptor<KV<String, byte[]>> kid =
    ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>())
        .withRegion("STREAM-REGION")
        .withAccessKey("YOUR-ACCESS-KEY")
        .withSecretKey("YOUR-SECRET-KEY");
{% endhighlight %}
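Because the input descriptor above uses `NoOpSerde<byte[]>`, each message value reaches your application as the raw bytes of the Kinesis record. If your producers write UTF-8 text, a small decoding step like the one below may be all you need. This is a sketch under that assumption; `PayloadDecoder` and `decodeUtf8` are hypothetical helpers, not part of Samza.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper (not part of Samza): turns the raw byte[] value of a
// KV<String, byte[]> message into text, assuming producers wrote UTF-8.
public final class PayloadDecoder {
  private PayloadDecoder() {}

  public static String decodeUtf8(byte[] data) {
    if (data == null) {
      throw new IllegalArgumentException("record data must not be null");
    }
    // new String(bytes, charset) substitutes malformed sequences instead of throwing.
    return new String(data, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    byte[] value = "hello kinesis".getBytes(StandardCharsets.UTF_8);
    System.out.println(decodeUtf8(value));
  }
}
```

If your producers use a different encoding or a binary format such as JSON bytes or Avro, replace the decoding step accordingly.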
#### Coordination

The Kinesis system consumer does not rely on Samza's coordination mechanism. Instead, it uses the Kinesis Client Library (KCL) for coordination and for distributing the available shards among the available instances. Hence, you should set your grouper configuration to `AllSspToSingleTaskGrouperFactory`:
{% highlight jproperties %}
job.systemstreampartition.grouper.factory=org.apache.samza.container.grouper.stream.AllSspToSingleTaskGrouperFactory
{% endhighlight %}
#### Security

Each Kinesis stream in a given AWS region can be accessed by providing an access key. An access key consists of two parts: an access key ID (for example, `AKIAIOSFODNN7EXAMPLE`) and a secret access key (for example, `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY`), which you use to send programmatic requests to AWS.
{% highlight java %}
// The access key ID and secret access key are set per input stream.
KinesisInputDescriptor<KV<String, byte[]>> kid =
    ksd.getInputDescriptor("STREAM-NAME", new NoOpSerde<byte[]>())
        .withRegion("STREAM-REGION")
        .withAccessKey("YOUR-ACCESS-KEY")
        .withSecretKey("YOUR-SECRET-KEY");
{% endhighlight %}
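Rather than hard-coding the key pair in source, you may prefer to read it at startup and pass the values to `withAccessKey` and `withSecretKey`. The sketch below assumes the conventional AWS environment variable names `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY`; `AwsKeys` is a hypothetical helper (not part of Samza), and it takes the environment as a `Map` so it can be exercised without real credentials.

```java
import java.util.Map;

// Hypothetical helper (not part of Samza): loads an AWS access key pair from
// an environment-like map instead of hard-coding it in the job.
public final class AwsKeys {
  // Conventional AWS variable names; adjust if your deployment uses others.
  public static final String ACCESS_KEY_VAR = "AWS_ACCESS_KEY_ID";
  public static final String SECRET_KEY_VAR = "AWS_SECRET_ACCESS_KEY";

  public final String accessKeyId;
  public final String secretAccessKey;

  private AwsKeys(String accessKeyId, String secretAccessKey) {
    this.accessKeyId = accessKeyId;
    this.secretAccessKey = secretAccessKey;
  }

  // Typically called as AwsKeys.fromEnv(System.getenv()).
  public static AwsKeys fromEnv(Map<String, String> env) {
    return new AwsKeys(require(env, ACCESS_KEY_VAR), require(env, SECRET_KEY_VAR));
  }

  private static String require(Map<String, String> env, String name) {
    String value = env.get(name);
    if (value == null || value.isEmpty()) {
      // Fail fast at startup instead of later, when the consumer connects to AWS.
      throw new IllegalStateException("missing environment variable " + name);
    }
    return value;
  }
}
```

With the keys loaded, the descriptor chain stays the same: pass `keys.accessKeyId` to `withAccessKey` and `keys.secretAccessKey` to `withSecretKey`.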
The Samza Kinesis connector uses the Kinesis Client Library (KCL) to access Kinesis data streams. You can set any KCL configuration for a stream through `KinesisInputDescriptor`:
{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid = ...

// Each entry is a KCL configuration parameter name and its value.
Map<String, String> kclConfig = new HashMap<>();
kclConfig.put("CONFIG-PARAM", "CONFIG-VALUE");

kid.withKCLConfig(kclConfig);
{% endhighlight %}
As an example, the configuration below is equivalent to invoking `kclClient#withTableName("myTable")` on the KCL instance:

{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid = ...

Map<String, String> kclConfig = new HashMap<>();
// Name of the DynamoDB table in which KCL stores its checkpoints.
kclConfig.put("TableName", "myTable");

kid.withKCLConfig(kclConfig);
{% endhighlight %}
Samza allows you to specify any AWS client configs to connect to your Kinesis instance. You can configure any AWS client configuration through `KinesisSystemDescriptor`:
{% highlight java %}
// Each entry is an AWS client configuration parameter name and its value.
Map<String, String> awsConfig = new HashMap<>();
awsConfig.put("CONFIG-PARAM", "CONFIG-VALUE");

KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName)
    .withAWSConfig(awsConfig);
{% endhighlight %}
Through `KinesisSystemDescriptor` you can also set the proxy host and proxy port to be used by the Kinesis client:

{% highlight java %}
KinesisSystemDescriptor sd = new KinesisSystemDescriptor(systemName)
    .withProxyHost("YOUR-PROXY-HOST")
    .withProxyPort(YOUR_PROXY_PORT); // YOUR_PROXY_PORT: your proxy's port, as an int
{% endhighlight %}
Unlike other connectors, for which Samza stores and manages checkpointed offsets, the Kinesis connector stores its checkpoints in a DynamoDB table, where KCL manages them internally. You can reset the checkpoints by configuring a different name for the DynamoDB table:
{% highlight jproperties %}
# Change the TableName to a unique name to reset checkpoints.
systems.kinesis-system.streams.STREAM-NAME.aws.kcl.TableName=my-app-table-name
{% endhighlight %}
Or through `KinesisInputDescriptor`:

{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid = ...

Map<String, String> kclConfig = new HashMap<>();
// A table name KCL has not used before means no stored checkpoints.
kclConfig.put("TableName", "my-new-app-table-name");

kid.withKCLConfig(kclConfig);
{% endhighlight %}
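The two forms above set the same KCL parameter: an entry passed to `withKCLConfig` corresponds to a `systems.<system>.streams.<stream>.aws.kcl.<param>` property. The sketch below assumes that pattern, shown here for `TableName` and `InitialPositionInStream`, holds for other parameters too. It builds the property line and derives a fresh table name by appending the epoch second, which is one simple way to get the unique name a reset needs. `KclCheckpointReset` and its methods are hypothetical, as is the suffix scheme; the name check uses DynamoDB's table-naming rules.

```java
import java.time.Instant;
import java.util.regex.Pattern;

// Hypothetical helpers (not part of Samza) for resetting Kinesis checkpoints
// by pointing KCL at a new DynamoDB table.
public final class KclCheckpointReset {
  // DynamoDB table names: 3-255 characters of letters, digits, '_', '-' and '.'.
  private static final Pattern DYNAMODB_TABLE_NAME =
      Pattern.compile("[A-Za-z0-9_.-]{3,255}");

  private KclCheckpointReset() {}

  // Appends the epoch second, so resets more than a second apart get distinct names.
  public static String uniqueTableName(String baseName, Instant now) {
    String name = baseName + "-" + now.getEpochSecond();
    if (!DYNAMODB_TABLE_NAME.matcher(name).matches()) {
      throw new IllegalArgumentException("not a valid DynamoDB table name: " + name);
    }
    return name;
  }

  // Builds the flat property equivalent of kclConfig.put(param, value).
  public static String kclProperty(String system, String stream, String param, String value) {
    return "systems." + system + ".streams." + stream + ".aws.kcl." + param + "=" + value;
  }

  public static void main(String[] args) {
    String table = uniqueTableName("my-app-table-name", Instant.now());
    System.out.println(kclProperty("kinesis-system", "STREAM-NAME", "TableName", table));
  }
}
```

Generating the name once and recording it in your job config keeps restarts on the same table; regenerating it on every start would reset the checkpoints each time.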
When you reset checkpoints, you can configure your job to start consuming from either the earliest or latest offset in the stream.
{% highlight jproperties %}
# Set the starting position to either TRIM_HORIZON (oldest) or LATEST (latest).
systems.kinesis-system.streams.STREAM-NAME.aws.kcl.InitialPositionInStream=LATEST
{% endhighlight %}
Or through `KinesisInputDescriptor`:

{% highlight java %}
KinesisInputDescriptor<KV<String, byte[]>> kid = ...

Map<String, String> kclConfig = new HashMap<>();
// TRIM_HORIZON starts from the oldest record, LATEST from the newest.
kclConfig.put("InitialPositionInStream", "LATEST");

kid.withKCLConfig(kclConfig);
{% endhighlight %}
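`InitialPositionInStream` is passed as a plain string, so a typo can slip through unnoticed. The sketch below is a hypothetical validation helper (not part of Samza or KCL) that accepts only the two values described above, `TRIM_HORIZON` and `LATEST`, before building the map you would pass to `withKCLConfig`. KCL may support other positions; this sketch deliberately restricts itself to the two the text names.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper (not part of Samza or KCL): validates the starting
// position before it is placed into the KCL config map.
public final class InitialPosition {
  public enum Position { TRIM_HORIZON, LATEST }

  private InitialPosition() {}

  // Accepts "trim_horizon" or "latest" in any case; rejects anything else.
  public static Position parse(String raw) {
    if (raw == null) {
      throw new IllegalArgumentException("InitialPositionInStream must not be null");
    }
    try {
      return Position.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(
          "InitialPositionInStream must be TRIM_HORIZON or LATEST, got: " + raw, e);
    }
  }

  // Returns a mutable map suitable for kid.withKCLConfig(...).
  public static Map<String, String> kclConfig(String raw) {
    Map<String, String> kclConfig = new HashMap<>();
    kclConfig.put("InitialPositionInStream", parse(raw).name());
    return kclConfig;
  }
}
```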
Alternatively, if you want to start from a particular offset in the Kinesis stream, you can log in to the AWS console and edit the offsets in your DynamoDB table. By default, the table name has the following format: `<job name>-<job id>-<kinesis stream>`.
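To locate the table in the console, you can reconstruct its default name from the format above. A minimal sketch; `KclTableNames.defaultTableName` is a hypothetical helper (not part of Samza), and the result only applies if you have not overridden `TableName` in the KCL config.

```java
// Hypothetical helper (not part of Samza): reproduces the default KCL
// DynamoDB table name format "<job name>-<job id>-<kinesis stream>".
public final class KclTableNames {
  private KclTableNames() {}

  public static String defaultTableName(String jobName, String jobId, String streamName) {
    return String.join("-", jobName, jobId, streamName);
  }

  public static void main(String[] args) {
    // Only meaningful when TableName has not been overridden.
    System.out.println(defaultTableName("my-job", "1", "my-stream"));
  }
}
```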
The following limitations apply to Samza jobs consuming from Kinesis streams:
The KinesisSystemProducer for Samza is not yet implemented.