Stateful Functions offers an AWS Kinesis I/O Module for reading from and writing to Kinesis streams. It is based on Apache Flink's Kinesis connector. The Kinesis I/O Module is configurable in YAML or Java.

To use the Kinesis I/O Module in Java, add the following dependency to your `pom.xml`:
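{% highlight xml %}
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>statefun-kinesis-io</artifactId>
    <version>{{ site.version }}</version>
    <scope>provided</scope>
</dependency>
{% endhighlight %}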
A `KinesisIngressSpec` declares an ingress spec for consuming from a Kinesis stream.

It accepts the following arguments:

1. The ingress identifier associated with this ingress
2. The stream names
3. The AWS region
4. The AWS credentials
5. A `KinesisIngressDeserializer` for deserializing data from Kinesis (Java only)
6. The position to start consuming from

{% highlight java %}
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressBuilder;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressStartupPosition;
public class IngressSpecs {

  public static final IngressIdentifier<User> ID =
      new IngressIdentifier<>(User.class, "example", "input-ingress");

  public static final IngressSpec<User> kinesisIngress =
      KinesisIngressBuilder.forIdentifier(ID)
          .withAwsRegion("us-west-1")
          .withAwsCredentials(AwsCredentials.fromDefaultProviderChain())
          .withDeserializer(UserDeserializer.class)
          .withStream("stream-name")
          .withStartupPosition(KinesisIngressStartupPosition.fromEarliest())
          .withClientConfigurationProperty("key", "value")
          .build();
}
{% endhighlight %}
{% highlight yaml %}
module:
  meta:
    type: remote
  spec:
    ingresses:
      - ingress:
          meta:
            type: statefun.kinesis.io/routable-protobuf-ingress
            id: example-namespace/messages
          spec:
            awsRegion:
              type: specific
              id: us-west-1
            awsCredentials:
              type: basic
              accessKeyId: my_access_key_id
              secretAccessKey: my_secret_access_key
            startupPosition:
              type: earliest
            streams:
              - stream: stream-1
                typeUrl: com.googleapis/org.apache.flink.statefun.docs.models.User
                targets:
                  - example-namespace/my-function-1
                  - example-namespace/my-function-2
              - stream: stream-2
                typeUrl: com.googleapis/org.apache.flink.statefun.docs.models.User
                targets:
                  - example-namespace/my-function-1
            clientConfigProperties:
              - SocketTimeout: 9999
              - MaxConnections: 15
{% endhighlight %}
The ingress also accepts properties to directly configure the Kinesis client, using `KinesisIngressBuilder#withClientConfigurationProperty()` in Java or `clientConfigProperties` in YAML. Please refer to the Kinesis client configuration documentation for the full list of available properties. Note that configuration passed using named methods takes precedence over the provided properties and overwrites the corresponding settings.
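The precedence rule can be pictured as a map merge in which the named settings are applied last. The sketch below is a minimal, self-contained illustration of that rule, not StateFun's internal code; the helper `ClientConfigPrecedence.resolve` and the `region` key used in the usage note are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of the precedence rule; not part of the StateFun SDK.
final class ClientConfigPrecedence {

  private ClientConfigPrecedence() {}

  /**
   * Merges raw client properties with settings that come from named builder methods.
   * Named settings are copied last, so they overwrite any property with the same key.
   */
  static Map<String, String> resolve(
      Map<String, String> clientProperties, Map<String, String> namedSettings) {
    Map<String, String> merged = new LinkedHashMap<>(clientProperties);
    merged.putAll(namedSettings);
    return merged;
  }
}
```

For example, resolving the properties `{SocketTimeout=9999, region=eu-west-1}` against the named settings `{region=us-west-1}` keeps `SocketTimeout` untouched and yields `us-west-1` for the hypothetical `region` key.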
The ingress allows configuring the startup position to be one of the following:

- Latest: start consuming from the latest position, i.e. the head of each stream shard.
- Earliest (`KinesisIngressStartupPosition.fromEarliest()` in Java, `type: earliest` in YAML): start consuming from the earliest position possible.
- Date: start consuming from records whose ingestion time is greater than or equal to a specified date.
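To make the three options concrete, the sketch below models a single shard as a list of ingestion timestamps, oldest first, and computes the index of the first record a consumer would read. The `StartupPositionSketch.Position` type and `firstIndexToConsume` method are hypothetical and only mirror the semantics described above; they are not the SDK's `KinesisIngressStartupPosition` API. In a real shard, "earliest" means the oldest record still within the stream's retention period.

```java
import java.time.Instant;
import java.util.List;

// Hypothetical model of the three startup positions; not part of the StateFun SDK.
final class StartupPositionSketch {

  enum Kind { LATEST, EARLIEST, DATE }

  static final class Position {
    final Kind kind;
    final Instant date; // only meaningful when kind == DATE

    private Position(Kind kind, Instant date) {
      this.kind = kind;
      this.date = date;
    }

    static Position latest() { return new Position(Kind.LATEST, null); }
    static Position earliest() { return new Position(Kind.EARLIEST, null); }
    static Position fromDate(Instant date) { return new Position(Kind.DATE, date); }
  }

  /**
   * Returns the index of the first record to consume from a shard whose retained
   * records are ordered by ingestion time, oldest first. A result equal to the list
   * size means "only records that arrive from now on".
   */
  static int firstIndexToConsume(List<Instant> ingestionTimes, Position position) {
    switch (position.kind) {
      case LATEST:
        return ingestionTimes.size(); // skip everything already in the shard
      case EARLIEST:
        return 0; // oldest retained record
      case DATE:
        for (int i = 0; i < ingestionTimes.size(); i++) {
          // ingestion time >= date
          if (!ingestionTimes.get(i).isBefore(position.date)) {
            return i;
          }
        }
        return ingestionTimes.size();
      default:
        throw new IllegalArgumentException("Unknown position kind: " + position.kind);
    }
  }
}
```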
The Kinesis ingress needs to know how to turn the binary data in Kinesis into Java objects. The `KinesisIngressDeserializer<T>` interface allows users to specify such a schema. Its `T deserialize(IngressRecord ingressRecord)` method is called for each Kinesis record, passing in the binary data and metadata from Kinesis.
{% highlight java %}
package org.apache.flink.statefun.docs.io.kinesis;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.kinesis.ingress.IngressRecord;
import org.apache.flink.statefun.sdk.kinesis.ingress.KinesisIngressDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserDeserializer implements KinesisIngressDeserializer<User> {

  private static final Logger LOG = LoggerFactory.getLogger(UserDeserializer.class);

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public User deserialize(IngressRecord ingressRecord) {
    try {
      // Parse the raw record bytes as JSON into a User.
      return mapper.readValue(ingressRecord.getData(), User.class);
    } catch (IOException e) {
      LOG.debug("Failed to deserialize record", e);
      return null;
    }
  }
}
{% endhighlight %}
A `KinesisEgressBuilder` declares an egress spec for writing data out to a Kinesis stream.

It accepts the following arguments:

1. The egress identifier associated with this egress
2. The AWS credentials
3. A `KinesisEgressSerializer` for serializing data into Kinesis (Java only)
4. The AWS region
5. The maximum number of outstanding records before backpressure is applied

{% highlight java %}
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.kinesis.auth.AwsCredentials;
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressBuilder;
public class EgressSpecs {

  public static final EgressIdentifier<User> ID =
      new EgressIdentifier<>("example", "output-egress", User.class);

  public static final EgressSpec<User> kinesisEgress =
      KinesisEgressBuilder.forIdentifier(ID)
          .withAwsCredentials(AwsCredentials.fromDefaultProviderChain())
          .withAwsRegion("us-west-1")
          .withMaxOutstandingRecords(100)
          .withClientConfigurationProperty("key", "value")
          .withSerializer(UserSerializer.class)
          .build();
}
{% endhighlight %}
{% highlight yaml %}
module:
  meta:
    type: remote
  spec:
    egresses:
      - egress:
          meta:
            type: statefun.kinesis.io/generic-egress
            id: example/output-messages
          spec:
            awsRegion:
              type: custom-endpoint
              endpoint: https://localhost:4567
              id: us-west-1
            awsCredentials:
              type: profile
              profileName: john-doe
              profilePath: /path/to/profile/config
            maxOutstandingRecords: 9999
            clientConfigProperties:
              - ThreadingModel: POOLED
              - ThreadPoolSize: 10
{% endhighlight %}
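The egress also accepts properties to directly configure the Kinesis producer, using `KinesisEgressBuilder#withClientConfigurationProperty()` in Java or `clientConfigProperties` in YAML. Please refer to the Kinesis producer default configuration properties documentation for the full list of available properties.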
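As its name suggests, `maxOutstandingRecords` (`withMaxOutstandingRecords()` in Java) bounds how many records may be handed to the producer without yet being acknowledged. A common way to enforce such a bound is a counting semaphore: acquire a permit before each send and release it when the send completes, so writers block once the limit is reached. The sketch below shows that pattern, assuming a hypothetical asynchronous `sendAsync` function; `BoundedProducerSketch` is illustrative only and is not how the StateFun egress is implemented internally.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

// Hypothetical backpressure sketch; not part of the StateFun SDK.
final class BoundedProducerSketch<T> {

  private final Semaphore permits;
  private final Function<T, CompletableFuture<Void>> sendAsync; // hypothetical async send

  BoundedProducerSketch(int maxOutstandingRecords, Function<T, CompletableFuture<Void>> sendAsync) {
    if (maxOutstandingRecords <= 0) {
      throw new IllegalArgumentException("maxOutstandingRecords must be positive");
    }
    this.permits = new Semaphore(maxOutstandingRecords);
    this.sendAsync = sendAsync;
  }

  /** Blocks the caller (backpressure) while the maximum number of sends is in flight. */
  void write(T record) {
    permits.acquireUninterruptibly();
    CompletableFuture<Void> pending;
    try {
      pending = sendAsync.apply(record);
    } catch (RuntimeException e) {
      permits.release(); // the send never started, so give the permit back
      throw e;
    }
    // Release the permit once the send succeeds or fails.
    pending.whenComplete((ignored, error) -> permits.release());
  }

  /** Number of additional records that could be written without blocking. */
  int availablePermits() {
    return permits.availablePermits();
  }
}
```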
The Kinesis egress needs to know how to turn Java objects into binary data. The `KinesisEgressSerializer<T>` interface allows users to specify such a schema. Its `EgressRecord serialize(T value)` method is called for each message, allowing users to set the record's data along with other metadata, such as the target stream and partition key.
{% highlight java %}
package org.apache.flink.statefun.docs.io.kinesis;

import com.fasterxml.jackson.databind.ObjectMapper;
import java.io.IOException;
import org.apache.flink.statefun.docs.models.User;
import org.apache.flink.statefun.sdk.kinesis.egress.EgressRecord;
import org.apache.flink.statefun.sdk.kinesis.egress.KinesisEgressSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UserSerializer implements KinesisEgressSerializer<User> {

  private static final Logger LOG = LoggerFactory.getLogger(UserSerializer.class);

  private static final String STREAM = "user-stream";

  private final ObjectMapper mapper = new ObjectMapper();

  @Override
  public EgressRecord serialize(User value) {
    try {
      // Partition by user id and write the user as JSON to the target stream.
      return EgressRecord.newBuilder()
          .withPartitionKey(value.getUserId())
          .withData(mapper.writeValueAsBytes(value))
          .withStream(STREAM)
          .build();
    } catch (IOException e) {
      LOG.info("Failed to serialize user", e);
      return null;
    }
  }
}
{% endhighlight %}
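The `UserSerializer` above uses the user id as the partition key. Kinesis maps each partition key to a shard by taking the MD5 hash of the key as an unsigned 128-bit integer and picking the shard whose hash key range contains that value, so every record for the same user lands on the same shard. The sketch below reproduces that mapping under two stated assumptions: the key is encoded as UTF-8, and the stream's shards divide the hash key space into equal, contiguous ranges (as for a newly created stream that has not been resharded). `PartitionKeySketch` is a hypothetical helper, not part of the SDK.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper illustrating partition-key-to-shard mapping; not part of the SDK.
final class PartitionKeySketch {

  // Size of the Kinesis hash key space: 2^128.
  private static final BigInteger HASH_SPACE = BigInteger.ONE.shiftLeft(128);

  private PartitionKeySketch() {}

  /** MD5 of the UTF-8 encoded partition key, read as an unsigned 128-bit integer. */
  static BigInteger hashKey(String partitionKey) {
    try {
      byte[] digest = MessageDigest.getInstance("MD5")
          .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
      return new BigInteger(1, digest); // signum 1 => treat bytes as unsigned
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException("MD5 is not available", e);
    }
  }

  /** Shard index, assuming numShards shards with equal, contiguous hash key ranges. */
  static int shardFor(String partitionKey, int numShards) {
    if (numShards <= 0) {
      throw new IllegalArgumentException("numShards must be positive");
    }
    BigInteger rangeSize = HASH_SPACE.divide(BigInteger.valueOf(numShards));
    int index = hashKey(partitionKey).divide(rangeSize).intValue();
    return Math.min(index, numShards - 1); // the last shard absorbs any rounding remainder
  }
}
```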
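Both the Kinesis ingress and egress can be configured with an AWS region in one of the following ways:

- Default provider chain: consults AWS's default provider chain to determine the AWS region.
- Specific (`type: specific` in YAML): specifies an AWS region using the region's unique id, e.g. `us-west-1`.
- Custom endpoint (`type: custom-endpoint` in YAML): connects to an AWS region through a non-standard AWS service endpoint, given together with the region id. This is typically used only for development and testing.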
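In YAML, these options are expressed as an `awsRegion` block. The sketch below parses such a block, given as a plain map, into a validated value, which makes the required keys of each option explicit: `specific` needs an `id`, and `custom-endpoint` needs both an `endpoint` URI and an `id`, as in the egress example above. The `RegionConfig` class is hypothetical, and the `default` type name for the provider-chain option is an assumption for illustration.

```java
import java.net.URI;
import java.util.Map;
import java.util.Objects;

// Hypothetical model of the YAML `awsRegion` block; not part of the StateFun SDK.
final class RegionConfig {

  enum Kind { DEFAULT_PROVIDER_CHAIN, SPECIFIC, CUSTOM_ENDPOINT }

  final Kind kind;
  final String id;    // null for DEFAULT_PROVIDER_CHAIN
  final URI endpoint; // non-null only for CUSTOM_ENDPOINT

  private RegionConfig(Kind kind, String id, URI endpoint) {
    this.kind = kind;
    this.id = id;
    this.endpoint = endpoint;
  }

  static RegionConfig fromYamlMap(Map<String, String> block) {
    String type = Objects.requireNonNull(block.get("type"), "awsRegion.type is required");
    switch (type) {
      case "default": // assumed type name for the default provider chain
        return new RegionConfig(Kind.DEFAULT_PROVIDER_CHAIN, null, null);
      case "specific":
        return new RegionConfig(Kind.SPECIFIC, required(block, "id"), null);
      case "custom-endpoint":
        // URI.create throws IllegalArgumentException for a malformed endpoint.
        return new RegionConfig(
            Kind.CUSTOM_ENDPOINT, required(block, "id"), URI.create(required(block, "endpoint")));
      default:
        throw new IllegalArgumentException("Unknown awsRegion type: " + type);
    }
  }

  private static String required(Map<String, String> block, String key) {
    String value = block.get(key);
    if (value == null || value.isEmpty()) {
      throw new IllegalArgumentException("awsRegion." + key + " is required");
    }
    return value;
  }
}
```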
Both the Kinesis ingress and egress can be configured using standard AWS credential providers:

- Default provider chain (`AwsCredentials.fromDefaultProviderChain()` in Java): consults AWS's default provider chain to determine the AWS credentials.
- Basic (`type: basic` in YAML): specifies the AWS credentials directly with the provided access key ID and secret access key strings.
- Profile (`type: profile` in YAML): specifies the AWS credentials using an AWS configuration profile, along with the path to the profile's configuration file.
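A provider chain tries a fixed sequence of credential sources and uses the first one that yields complete credentials. The sketch below shows the pattern with two sources: the `AWS_ACCESS_KEY_ID` / `AWS_SECRET_ACCESS_KEY` environment variables, followed by the `aws.accessKeyId` / `aws.secretKey` system properties, which are the first two sources consulted by the AWS SDK for Java (v1) default credentials chain. The real chain checks further sources, such as the shared profile file and instance metadata. `CredentialsChainSketch` and its `Credentials` type are hypothetical, and the environment and properties are passed in explicitly to keep the sketch testable.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.Supplier;

// Hypothetical, simplified credentials chain; not the AWS SDK implementation.
final class CredentialsChainSketch {

  static final class Credentials {
    final String accessKeyId;
    final String secretAccessKey;

    Credentials(String accessKeyId, String secretAccessKey) {
      this.accessKeyId = accessKeyId;
      this.secretAccessKey = secretAccessKey;
    }
  }

  private CredentialsChainSketch() {}

  /** Returns credentials from the first source in the chain that provides both values. */
  static Credentials resolve(List<Supplier<Optional<Credentials>>> sources) {
    for (Supplier<Optional<Credentials>> source : sources) {
      Optional<Credentials> found = source.get();
      if (found.isPresent()) {
        return found.get();
      }
    }
    throw new IllegalStateException("Unable to load AWS credentials from any source in the chain");
  }

  static Supplier<Optional<Credentials>> fromEnvironment(Map<String, String> env) {
    return () -> pair(env.get("AWS_ACCESS_KEY_ID"), env.get("AWS_SECRET_ACCESS_KEY"));
  }

  static Supplier<Optional<Credentials>> fromSystemProperties(Properties props) {
    return () -> pair(props.getProperty("aws.accessKeyId"), props.getProperty("aws.secretKey"));
  }

  // A source only counts if it supplies both the key id and the secret.
  private static Optional<Credentials> pair(String id, String secret) {
    boolean complete = id != null && !id.isEmpty() && secret != null && !secret.isEmpty();
    return complete ? Optional.of(new Credentials(id, secret)) : Optional.empty();
  }
}
```

In practice the chain would be built as `resolve(List.of(fromEnvironment(System.getenv()), fromSystemProperties(System.getProperties())))`; ordering the sources in a list keeps the precedence explicit and easy to extend.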