---
title: Apache Kafka
weight: 2
type: docs
aliases:
---
Stateful Functions offers an Apache Kafka I/O Module for reading from and writing to Kafka topics. It is based on Apache Flink's universal Kafka connector and provides exactly-once processing semantics. Kafka is configured in the [module specification]({{< ref "docs/deployment/module" >}}) of your application.
A Kafka ingress defines an input point that reads records from one or more topics.
```yaml
version: "3.0"

module:
  meta:
    type: remote
  spec:
    ingresses:
      - ingress:
          meta:
            type: io.statefun.kafka/ingress
            id: com.example/users
          spec:
            address: kafka-broker:9092
            consumerGroupId: my-consumer-group
            startupPosition:
              type: earliest
            topics:
              - topic: messages-1
                valueType: com.example/User
                targets:
                  - com.example.fns/greeter
```
The ingress also accepts properties to directly configure the Kafka client, using `ingress.spec.properties`. Please refer to the Kafka consumer configuration documentation for the full list of available properties. Note that configuration passed using named paths, such as `ingress.spec.address`, will have higher precedence and overwrite their respective settings in the provided properties.
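As a minimal sketch of that precedence rule, the fragment below sets the broker address both through the named path and through a raw client property. It assumes the ingress `properties` field takes the same list-of-entries form shown for the egress later on this page, and that `ingress.spec.address` corresponds to the Kafka client's `bootstrap.servers` setting. The broker name `other-broker` and the chosen property values are hypothetical.

```yaml
# Illustrative ingress spec fragment; not a complete module file.
spec:
  address: kafka-broker:9092            # named path: takes precedence
  consumerGroupId: my-consumer-group    # named path: takes precedence
  properties:
    # Assumed list-of-entries form, mirroring the egress `properties` example.
    - bootstrap.servers: other-broker:9092   # expected to be overridden by spec.address
    - max.poll.records: 500                  # passed through to the Kafka consumer
```

Under these assumptions the consumer would connect to `kafka-broker:9092`, while `max.poll.records` is applied as given because no named path covers it.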
The ingress allows configuring the startup position to be one of the following:
Starts from offsets that were committed to Kafka for the specified consumer group.
```yaml
startupPosition:
  type: group-offsets
```
Starts from the earliest offset.
```yaml
startupPosition:
  type: earliest
```
Starts from the latest offset.
```yaml
startupPosition:
  type: latest
```
Starts from specific offsets, defined as a map of partitions to their target starting offset.
```yaml
startupPosition:
  type: specific-offsets
  offsets:
    - user-topic/0: 91
    - user-topic/1: 11
    - user-topic/2: 8
```
Starts from offsets that have an ingestion time larger than or equal to a specified date.
```yaml
startupPosition:
  type: date
  date: 2020-02-01 04:15:00.00 Z
```
On startup, if the specified startup offset for a partition is out of range or does not exist (which may be the case if the ingress is configured to start from group offsets, specific offsets, or from a date), then the ingress will fall back to the position configured using `ingress.spec.autoOffsetResetPosition`, which may be set to either `latest` or `earliest`. By default, this is set to the `latest` position.
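For illustration, the fragment below combines a group-offsets startup position with an explicit fallback. It is a sketch of only the relevant part of an ingress spec and uses only the field names and values described above.

```yaml
# Illustrative ingress spec fragment; not a complete module file.
spec:
  startupPosition:
    type: group-offsets             # start from offsets committed for the consumer group
  autoOffsetResetPosition: earliest # used for partitions whose startup offset is missing or out of range
```

With this setting, a partition that has no committed offset for the consumer group is read from its earliest offset instead of the default `latest`.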
A Kafka egress defines an output point where functions can write records to one or more topics.
```yaml
version: "3.0"

module:
  meta:
    type: remote
  spec:
    egresses:
      - egress:
          meta:
            type: io.statefun.kafka/egress
            id: example/output-messages
          spec:
            address: kafka-broker:9092
            deliverySemantic:
              type: exactly-once
              transactionTimeoutMillis: 100000
            properties:
              - foo.config: bar
```
Please refer to the Kafka producer configuration documentation for the full list of available properties.
With fault tolerance enabled, the Kafka egress can provide exactly-once delivery guarantees. You can choose among three delivery semantics.
Nothing is guaranteed; produced records can be lost or duplicated.
```yaml
deliverySemantic:
  type: none
```
Stateful Functions guarantees that no records are lost, but they can be duplicated.
```yaml
deliverySemantic:
  type: at-least-once
```
Stateful Functions uses Kafka transactions to provide exactly-once semantics.
```yaml
deliverySemantic:
  type: exactly-once
  transactionTimeoutMillis: 900000 # 15 min
```
Functions write directly to Kafka from their SDK context. See the SDK-specific documentation for more details.