project.description: Consume messages from Apache Kafka in Akka Streams sources and commit their offsets to Kafka.

Consumer

A consumer subscribes to Kafka topics and passes the messages into an Akka Stream.

The underlying implementation uses the KafkaConsumer; see @javadocKafka API for a description of consumer groups, offsets, and other details.

Choosing a consumer

Alpakka Kafka offers a large variety of consumers that connect to Kafka and stream data. The tables below may help you find the consumer best suited for your use case.

Consumers

These factory methods are part of the @apidoc[Consumer$] API.

| Offsets handling | Partition aware | Subscription | Shared consumer | Factory method | Stream element type |
|---|---|---|---|---|---|
| No (auto commit can be enabled) | No | Topic or Partition | No | plainSource | ConsumerRecord |
| No (auto commit can be enabled) | No | Partition | Yes | plainExternalSource | ConsumerRecord |
| Explicit committing | No | Topic or Partition | No | committableSource | CommittableMessage |
| Explicit committing | No | Partition | Yes | committableExternalSource | CommittableMessage |
| Explicit committing with metadata | No | Topic or Partition | No | commitWithMetadataSource | CommittableMessage |
| Explicit committing (with metadata) | No | Topic or Partition | No | sourceWithOffsetContext | ConsumerRecord |
| Offset committed per element | No | Topic or Partition | No | atMostOnceSource | ConsumerRecord |
| No (auto commit can be enabled) | Yes | Topic or Partition | No | plainPartitionedSource | (TopicPartition, Source[ConsumerRecord, ..]) |
| External to Kafka | Yes | Topic or Partition | No | plainPartitionedManualOffsetSource | (TopicPartition, Source[ConsumerRecord, ..]) |
| Explicit committing | Yes | Topic or Partition | No | committablePartitionedSource | (TopicPartition, Source[CommittableMessage, ..]) |
| External to Kafka & Explicit Committing | Yes | Topic or Partition | No | committablePartitionedManualOffsetSource | (TopicPartition, Source[CommittableMessage, ..]) |
| Explicit committing with metadata | Yes | Topic or Partition | No | commitWithMetadataPartitionedSource | (TopicPartition, Source[CommittableMessage, ..]) |

Transactional consumers

These factory methods are part of the @apidoc[Transactional$] API. For details, see @refTransactions.

| Offsets handling | Partition aware | Shared consumer | Factory method | Stream element type |
|---|---|---|---|---|
| Transactional | No | No | Transactional.source | TransactionalMessage |
| Transactional | No | No | Transactional.sourceWithOffsetContext | ConsumerRecord |

Settings

When creating a consumer source you need to pass in @apidoc[ConsumerSettings] that define things like:

  • de-serializers for the keys and values
  • bootstrap servers of the Kafka cluster (see @ref:Service discovery to defer the server configuration)
  • group id for the consumer (note that offsets are always committed for a given consumer group)
  • Kafka consumer tuning parameters

Alpakka Kafka's defaults for all settings are defined in reference.conf which is included in the library JAR.

Important consumer settings:

| Setting | Description |
|-------------|----------------------------------------------|
| stop-timeout | The stage will delay stopping the internal actor to allow processing of messages already in the stream (required for successful committing). This can be set to 0 for streams using @apidoc[Consumer.DrainingControl]. |
| kafka-clients | Section for properties passed unchanged to the Kafka client (see @extref:Kafka's Consumer Configs) |
| connection-checker | Configuration to let the stream fail if the connection to the Kafka broker fails. |

reference.conf (HOCON) : @@ snip snip { #consumer-settings }

The Kafka documentation @extref:Consumer Configs lists the settings, their defaults and importance. More detailed explanations are given in the @javadocKafkaConsumer API and constants are defined in @javadocConsumerConfig API.

Programmatic construction

Stream-specific settings like the de-serializers and consumer group ID should be set programmatically. Settings that apply to many consumers may be set in application.conf or shared via @ref:config inheritance.

Scala : @@ snip snip { #settings }

Java : @@ snip snip { #settings }

Config inheritance

@apidoc[ConsumerSettings$] are created from configuration in application.conf (with defaults in reference.conf). The format of these settings files is described in the HOCON Config Documentation. A recommended setup is to rely on config inheritance as below:

application.conf (HOCON) : @@ snip app.conf { #consumer-config-inheritance }

Read the settings, which inherit their defaults from the “akka.kafka.consumer” section:

Scala : @@ snip read { #config-inheritance }

Java : @@ snip read { #config-inheritance }

Offset Storage external to Kafka

The Kafka read offset can either be stored in Kafka (see below) or in a data store of your choice.

@apidocConsumer.plainSource { java=“#plainSourceK,V:akka.stream.javadsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[K,V],akka.kafka.javadsl.Consumer.Control]” scala=“#plainSourceK,V:akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[K,V],akka.kafka.scaladsl.Consumer.Control]” } and @apidocConsumer.plainPartitionedManualOffsetSource { java=“#plainPartitionedManualOffsetSourceK,V:akka.stream.javadsl.Source[akka.japi.Pair[org.apache.kafka.common.TopicPartition,akka.stream.javadsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[K,V],akka.NotUsed]],akka.kafka.javadsl.Consumer.Control]” scala=“#breaks-paradox” } can be used to emit @javadocConsumerRecord elements as received from the underlying @javadocKafkaConsumer. They do not have support for committing offsets to Kafka. When using these Sources, either store an offset externally, or use auto-commit (note that auto-commit is disabled by default).

Scala : @@ snip snip { #settings-autocommit }

Java : @@ snip snip { #settings-autocommit }

The consumer application doesn't need to use Kafka's built-in offset storage; it can store offsets in a store of its own choosing. The primary use case for this is to let the application store both the offset and the results of the consumption in the same system, so that the results and the offsets are stored atomically. This is not always possible, but when it is, it makes the consumption fully atomic and gives “exactly once” semantics, which are stronger than the “at-least-once” semantics you get with Kafka's offset commit functionality.

Scala : @@ snip snip { #plainSource }

Java : @@ snip snip { #plainSource }
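To make the atomicity argument above concrete, here is a minimal Java sketch that uses no Kafka or Alpakka APIs. It assumes a hypothetical in-memory AtomicResultStore standing in for an external database, where writing the result and the offset together models a single transaction; run and crashAt are likewise illustrative names. Because the result and its offset are written as one unit, a crash before the write only means the record is processed again after a restart, and it is never counted twice.

```java
import java.util.List;

final class ExternalOffsetStorageSketch {

    /** Hypothetical external store keeping a result and its offset together. */
    static final class AtomicResultStore {
        private long total = 0;       // consumption result: sum of record values
        private long lastOffset = -1; // last offset whose value is included in total

        /** Updates result and offset in one step, modelling a single transaction. */
        void commit(long value, long offset) {
            total += value;
            lastOffset = offset;
        }

        long total() { return total; }

        /** Offset a restarted consumer should seek to. */
        long resumeFrom() { return lastOffset + 1; }
    }

    /** Consumes from the store's resume position; crashAt >= 0 simulates a crash before that offset is stored. */
    static void run(List<Long> partitionLog, AtomicResultStore store, int crashAt) {
        for (long offset = store.resumeFrom(); offset < partitionLog.size(); offset++) {
            if (offset == crashAt) {
                return; // crash: neither the result nor the offset for this record is stored
            }
            store.commit(partitionLog.get((int) offset), offset);
        }
    }

    public static void main(String[] args) {
        List<Long> log = List.of(10L, 20L, 30L, 40L);
        AtomicResultStore store = new AtomicResultStore();
        run(log, store, 2);  // stores offsets 0 and 1, then crashes
        run(log, store, -1); // restart resumes at offset 2
        System.out.println(store.total() + " " + store.resumeFrom()); // prints "100 4"
    }
}
```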

For @apidocConsumer.plainSource { java=“#plainSourceK,V:akka.stream.javadsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[K,V],akka.kafka.javadsl.Consumer.Control]” scala=“#plainSourceK,V:akka.stream.scaladsl.Source[org.apache.kafka.clients.consumer.ConsumerRecord[K,V],akka.kafka.scaladsl.Consumer.Control]” } the @apidocSubscriptions.assignmentWithOffset specifies the starting point (offset) for a given consumer group id, topic and partition. The group id is defined in the @apidoc[ConsumerSettings$].

Alternatively, with @apidocConsumer.plainPartitionedManualOffsetSource, only the consumer group id and the topic are required on creation. The starting point is fetched by calling the getOffsetsOnAssign function passed in by the user. This function should return a Map of @javadocTopicPartition to Long, with the Long representing the starting offset. If a consumer is assigned a partition that is not included in the Map that results from getOffsetsOnAssign, the default starting position will be used, according to the consumer configuration value auto.offset.reset. Also note that @apidocConsumer.plainPartitionedManualOffsetSource emits tuples of assigned topic-partition and a corresponding source, as described in Source per partition.
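The fallback rule for getOffsetsOnAssign can be illustrated with a small Java sketch (Java 17, no Kafka dependency). Tp is a hypothetical stand-in for Kafka's TopicPartition, and startingPositions is an illustrative helper, not part of the Alpakka Kafka API; for brevity, the looked-up offsets are passed in as a plain Map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class StartingOffsetsSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.TopicPartition. */
    record Tp(String topic, int partition) {}

    /**
     * Partitions found in the looked-up offsets start at that offset; all other
     * assigned partitions fall back to the auto.offset.reset policy.
     */
    static Map<Tp, String> startingPositions(Set<Tp> assigned, Map<Tp, Long> lookedUpOffsets, String autoOffsetReset) {
        Map<Tp, String> positions = new HashMap<>();
        for (Tp tp : assigned) {
            Long offset = lookedUpOffsets.get(tp);
            positions.put(tp, offset != null ? "seek to " + offset : "auto.offset.reset=" + autoOffsetReset);
        }
        return positions;
    }

    public static void main(String[] args) {
        Tp p0 = new Tp("orders", 0);
        Tp p1 = new Tp("orders", 1);
        // Only partition 0 has a stored offset, e.g. in an external database.
        Map<Tp, String> positions = startingPositions(Set.of(p0, p1), Map.of(p0, 42L), "earliest");
        System.out.println(positions.get(p0)); // seek to 42
        System.out.println(positions.get(p1)); // auto.offset.reset=earliest
    }
}
```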

Offset Storage in Kafka - committing

The @apidocConsumer.committableSource { java=“#committableSourceK,V:akka.stream.javadsl.Source[akka.kafka.ConsumerMessage.CommittableMessage[K,V],akka.kafka.javadsl.Consumer.Control]” scala=“#committableSourceK,V:akka.stream.scaladsl.Source[akka.kafka.ConsumerMessage.CommittableMessage[K,V],akka.kafka.scaladsl.Consumer.Control]” } makes it possible to commit offset positions to Kafka. Compared to auto-commit this gives exact control of when a message is considered consumed.

This is useful when “at-least-once” delivery is desired, as each message will likely be delivered one time, but in failure cases could be received more than once.

Scala : @@ snip snip { #atLeastOnce }

Java : @@ snip snip { #atLeastOnce }

Committing the offset for each message (withMaxBatch(1)), as illustrated above, is rather slow. It is recommended to batch the commits for better throughput. If upstream fails, the Committer will try to commit the offsets collected before the error.

Committer sink

You can use a pre-defined @apidocCommitter.sink to perform commits in batches:

Scala : @@ snip snip { #committerSink }

Java : @@ snip snip { #committerSink }

When creating a @apidocCommitter.sink you need to pass in @apidoc[CommitterSettings$]. These may be created by passing the actor system to read the defaults from the config section akka.kafka.committer, or by passing a @scaladocConfig instance with the same structure.

| Setting | Description | Default Value |
|-------------|----------------------------------------------|---------------|
| maxBatch | maximum number of messages to commit at once | 1000 |
| maxInterval | maximum interval between commits | 10 seconds |
| parallelism | maximum number of commit batches in flight | 100 |

reference.conf : @@snip snip { #committer-settings }

All commit batches are aggregated internally and passed on to Kafka very often (in every poll cycle). The Committer settings configure how the stream sends the offsets to the internal actor, which communicates with the Kafka broker. Increasing these values means that, in case of a failure, you may have to re-process more messages.
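The trade-off between maxBatch, maxInterval and re-processing can be sketched in plain Java. OffsetBatcher is a hypothetical class, not the Committer's implementation: it keeps only the highest processed offset per partition, so one commit covers many messages, and it sends a commit when either limit is reached. Unlike a timer-driven batcher, it checks maxInterval only when a message arrives. Kafka itself stores the next offset to read (processed offset + 1); the sketch records processed offsets for readability.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class CommitBatchingSketch {

    /** Hypothetical batcher: aggregates processed offsets and decides when to commit. */
    static final class OffsetBatcher {
        private final int maxBatch;
        private final long maxIntervalMillis;
        private final Map<Integer, Long> highestPerPartition = new HashMap<>();
        private int pending = 0;
        private long batchStartMillis = 0;
        /** Every commit "sent to Kafka": partition -> highest processed offset. */
        final List<Map<Integer, Long>> commits = new ArrayList<>();

        OffsetBatcher(int maxBatch, long maxIntervalMillis) {
            this.maxBatch = maxBatch;
            this.maxIntervalMillis = maxIntervalMillis;
        }

        /** Registers a processed message; commits when maxBatch or maxInterval is reached. */
        void add(int partition, long offset, long nowMillis) {
            if (pending == 0) {
                batchStartMillis = nowMillis;
            }
            highestPerPartition.merge(partition, offset, Long::max);
            pending++;
            if (pending >= maxBatch || nowMillis - batchStartMillis >= maxIntervalMillis) {
                flush();
            }
        }

        /** Commits whatever is pending, e.g. when the stream completes or fails. */
        void flush() {
            if (pending == 0) {
                return;
            }
            commits.add(Map.copyOf(highestPerPartition));
            highestPerPartition.clear();
            pending = 0;
        }
    }

    public static void main(String[] args) {
        OffsetBatcher batcher = new OffsetBatcher(1000, 10_000);
        for (long offset = 0; offset < 2500; offset++) {
            batcher.add(0, offset, 0);
        }
        batcher.flush();
        System.out.println(batcher.commits); // [{0=999}, {0=1999}, {0=2499}]
    }
}
```

With maxBatch = 1000, 2500 messages on one partition produce two full commits plus a final flush. In this sketch, a failure just before a commit can leave up to maxBatch - 1 processed messages uncommitted, and those would be re-processed after a restart.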

If you use Kafka older than version 2.1.0 and consume from a topic with low activity, where possibly no messages arrive for more than 24 hours, consider enabling periodic commit refresh (the akka.kafka.consumer.commit-refresh-interval configuration parameter); otherwise, offsets might expire in the Kafka storage. This has been fixed in Kafka 2.1.0 (see KAFKA-4682).

Committer variants

These factory methods are part of the @apidoc[Committer$] API.

| Factory method | Stream element type | Emits |
|---|---|---|
| sink | Committable | N/A |
| sinkWithOffsetContext | Any (CommittableOffset in context) | N/A |
| flow | Committable | Done |
| batchFlow | Committable | CommittableOffsetBatch |
| flowWithOffsetContext | Any (CommittableOffset in context) | NotUsed (CommittableOffsetBatch in context) |

Commit with meta-data

The @apidocConsumer.commitWithMetadataSource allows you to add metadata to the committed offset based on the last consumed record.

Note that the first offset provided to the consumer during a partition assignment will not contain metadata. This offset can get committed due to a periodic commit refresh (the akka.kafka.consumer.commit-refresh-interval configuration parameter), and that commit will not contain metadata.

Scala : @@ snip snip { #commitWithMetadata }

Java : @@ snip snip { #commitWithMetadata }

Offset Storage in Kafka & external

In some cases you may wish to use external offset storage as your primary means to manage offsets, but also commit offsets to Kafka. This gives you all the benefits of controlling offsets described in @ref:Offset Storage external to Kafka and allows you to use tooling in the Kafka ecosystem to track consumer group lag. You can use the @apidocConsumer.committablePartitionedManualOffsetSource source, which emits a @apidoc[ConsumerMessage.CommittableMessage], to seek to appropriate offsets on startup, do your processing, commit to external storage, and then commit offsets back to Kafka. This only provides at-least-once guarantees for your consumer group lag monitoring, because a failure can occur between storing your offsets externally and committing them to Kafka. It does, however, give you a more accurate representation of consumer group lag than turning on auto commits with the enable.auto.commit consumer property.
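The failure window described above can be sketched in plain Java. DualOffsetStorageSketch is hypothetical: externalOffset models the authoritative external store, and kafkaCommitted models the consumer group's committed position in Kafka (the next offset to read). A crash between the two writes leaves Kafka behind the external store, so the lag reported from Kafka is overstated, while a restart that seeks from the external store still resumes at the right place.

```java
final class DualOffsetStorageSketch {
    long externalOffset = -1; // last processed offset in the external store (authoritative)
    long kafkaCommitted = 0;  // consumer group position committed to Kafka (next offset to read)

    /** Stores the offset externally, then commits to Kafka unless a crash happens in between. */
    void process(long offset, boolean crashBeforeKafkaCommit) {
        externalOffset = offset;     // 1. results and offset stored externally
        if (crashBeforeKafkaCommit) {
            return;                  // failure window: the Kafka commit never happens
        }
        kafkaCommitted = offset + 1; // 2. offset committed back to Kafka
    }

    /** Lag as seen by Kafka tooling, based on the committed position. */
    long kafkaLag(long logEndOffset) { return logEndOffset - kafkaCommitted; }

    /** Where a restarted consumer seeks to, using the external store. */
    long restartPosition() { return externalOffset + 1; }

    public static void main(String[] args) {
        DualOffsetStorageSketch consumer = new DualOffsetStorageSketch();
        for (long offset = 0; offset < 7; offset++) {
            consumer.process(offset, false);
        }
        consumer.process(7, true); // crash after the external write
        System.out.println(consumer.kafkaLag(10));      // 3, although only offsets 8 and 9 remain
        System.out.println(consumer.restartPosition()); // 8
    }
}
```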

Consume “at-most-once”

If you commit the offset before processing the message, you get “at-most-once” delivery semantics; this is provided by @apidocConsumer.atMostOnceSource. However, atMostOnceSource commits the offset for each message, which is rather slow, so batching of commits is recommended. If your “at-most-once” requirements are more relaxed, consider a @apidocConsumer.plainSource and enable Kafka's auto committing with enable.auto.commit = true.

Scala : @@ snip snip { #atMostOnce }

Java : @@ snip snip { #atMostOnce }
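The difference between committing before and after processing can be shown with a small Java simulation that does not use Kafka. run is a hypothetical helper: it places the crash in the window between the commit step and the processing step, which is exactly where the two orderings differ, and then restarts from the committed position.

```java
import java.util.ArrayList;
import java.util.List;

final class DeliverySemanticsSketch {

    /**
     * Consumes offsets 0..messages-1, crashing at crashAt (use -1 for no crash),
     * then restarts from the committed position. Returns every processed offset.
     */
    static List<Integer> run(int messages, boolean commitBeforeProcessing, int crashAt) {
        List<Integer> processed = new ArrayList<>();
        int committed = 0; // next offset to read after a restart
        for (int offset = 0; offset < messages; offset++) {
            if (commitBeforeProcessing) {
                committed = offset + 1;       // commit first ...
                if (offset == crashAt) break; // ... crash before processing: message lost
                processed.add(offset);
            } else {
                processed.add(offset);        // process first ...
                if (offset == crashAt) break; // ... crash before committing: message redelivered
                committed = offset + 1;
            }
        }
        for (int offset = committed; offset < messages; offset++) {
            processed.add(offset); // restarted consumer resumes at the committed position
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(5, true, 2));  // [0, 1, 3, 4]        at-most-once
        System.out.println(run(5, false, 2)); // [0, 1, 2, 2, 3, 4]  at-least-once
    }
}
```

For five messages with a crash at offset 2, committing first loses offset 2, while committing after processing handles offset 2 twice.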

Consume “at-least-once”

How to achieve at-least-once delivery semantics is covered in @ref:At-Least-Once Delivery.

Connecting Producer and Consumer

For cases when you need to read messages from one topic, transform or enrich them, and then write to another topic, you can use @apidocConsumer.committableSource and connect it to a @apidocProducer.committableSink. The committableSink will commit the offset back to the consumer regularly.

The committableSink accepts implementations of @apidoc[ProducerMessage.Envelope] that contain the offset to commit the consumption of the originating message (of type @apidoc[akka.kafka.ConsumerMessage.Committable]). See @refProducing messages for the different implementations of @apidoc[ProducerMessage.Envelope].

Scala : @@ snip snip { #consumerToProducerSink }

Java : @@ snip snip { #consumerToProducerSink }

@@@note

There is a risk that something fails after publishing, but before committing, so committableSink has “at-least-once” delivery semantics.

For stronger (exactly-once) delivery guarantees, please read about @reftransactions.

@@@

Source per partition

@apidocConsumer.plainPartitionedSource, @apidocConsumer.committablePartitionedSource, and @apidocConsumer.commitWithMetadataPartitionedSource support tracking the automatic partition assignment from Kafka. When a topic-partition is assigned to a consumer, the source emits a tuple with the assigned topic-partition and a corresponding source. When a topic-partition is revoked, the corresponding source completes.

Scala : @@ snip snip { #committablePartitionedSource }

Java : @@ snip snip { #committablePartitionedSource }

Separate streams per partition:

Scala : @@ snip snip { #committablePartitionedSource-stream-per-partition }

Java : @@ snip snip { #committablePartitionedSource-stream-per-partition }
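The assign/revoke lifecycle of these per-partition sources can be modelled in a few lines of plain Java. PartitionedSourceSketch and its callbacks are hypothetical and only mirror the behaviour described above (an assignment emits a new per-partition stream, and a revocation completes it); they do not reflect how Alpakka Kafka implements it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class PartitionedSourceSketch {

    /** Hypothetical per-partition stream: collects records until completed. */
    static final class PartitionStream {
        final List<String> received = new ArrayList<>();
        boolean completed = false;
    }

    final Map<Integer, PartitionStream> active = new HashMap<>();
    /** Partitions for which a (partition, stream) pair was emitted, in order. */
    final List<Integer> emitted = new ArrayList<>();

    void onAssigned(int partition) {
        active.put(partition, new PartitionStream());
        emitted.add(partition); // the outer source emits (partition, stream)
    }

    void onRevoked(int partition) {
        PartitionStream stream = active.remove(partition);
        if (stream != null) {
            stream.completed = true; // the per-partition source completes
        }
    }

    void onRecord(int partition, String value) {
        PartitionStream stream = active.get(partition);
        if (stream != null) {
            stream.received.add(value); // records are routed to their partition's stream
        }
    }

    public static void main(String[] args) {
        PartitionedSourceSketch source = new PartitionedSourceSketch();
        source.onAssigned(0);
        source.onAssigned(1);
        PartitionStream p0 = source.active.get(0);
        source.onRecord(0, "a");
        source.onRecord(1, "b");
        source.onRevoked(0);
        System.out.println(p0.received + " completed=" + p0.completed); // [a] completed=true
    }
}
```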

Sharing the KafkaConsumer instance

If you have many streams it can be more efficient to share the underlying @javadocKafkaConsumer instance. It is shared by creating a @apidoc[akka.kafka.KafkaConsumerActor$]. You need to create the actor and stop it by sending KafkaConsumerActor.Stop when it is not needed any longer. You pass the @apidoc[akka.actor.ActorRef] as a parameter to the @apidocConsumer factory methods.

Scala : @@ snip snip { #consumerActor }

Java : @@ snip snip { #consumerActor }

Accessing KafkaConsumer metrics

You can access the underlying consumer metrics via the materialized Control instance:

Scala : @@ snip snip { #consumerMetrics }

Java : @@ snip snip { #consumerMetrics }

Accessing KafkaConsumer metadata

Kafka consumer metadata can be accessed as described in @refConsumer Metadata.

Controlled shutdown

The @apidoc[Source] created with @apidocConsumer.plainSource and similar methods materializes to a @apidoc[akka.kafka.(javadsl|scaladsl).Consumer.Control] instance. This can be used to stop the stream in a controlled manner.

When using external offset storage, a call to Consumer.Control.shutdown() suffices to complete the Source, which starts the completion of the stream.

Scala : @@ snip snip { #shutdownPlainSource }

Java : @@ snip snip { #shutdownPlainSource }

When you are using offset storage in Kafka, the shutdown process involves several steps:

  1. Consumer.Control.stop() to stop producing messages from the Source. This does not stop the underlying Kafka Consumer.
  2. Wait for the stream to complete, so that a commit request has been made for all offsets of all processed messages (via Committer.sink/flow, commitScaladsl() or commitJavadsl()).
  3. Consumer.Control.shutdown() to wait for all outstanding commit requests to finish and stop the Kafka Consumer.
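The ordering of the three steps above can be sketched with CompletableFuture. Control and drainThenShutdown are hypothetical names modelled on the description above, not the Alpakka Kafka API. In this sketch, shutdown runs even if the stream fails, and the stream's own result or error is returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class DrainAndShutdownSketch {

    /** Hypothetical control with the two operations described in steps 1 and 3. */
    interface Control {
        CompletionStage<Void> stop();     // stop emitting messages; the consumer keeps running
        CompletionStage<Void> shutdown(); // wait for outstanding commits, then stop the consumer
    }

    /** Step 1, then step 2 (await the stream), then step 3; returns the stream's result. */
    static <T> CompletableFuture<T> drainThenShutdown(Control control, CompletionStage<T> streamCompletion) {
        CompletableFuture<T> outcome = new CompletableFuture<>();
        control.stop()
            .thenCompose(ignored -> streamCompletion)
            .whenComplete((result, streamError) ->
                // shutdown runs whether the stream completed successfully or failed
                control.shutdown().whenComplete((ignored, shutdownError) -> {
                    if (streamError != null) {
                        outcome.completeExceptionally(streamError);
                    } else if (shutdownError != null) {
                        outcome.completeExceptionally(shutdownError);
                    } else {
                        outcome.complete(result);
                    }
                }));
        return outcome;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        CompletableFuture<String> streamDone = new CompletableFuture<>();
        Control control = new Control() {
            public CompletionStage<Void> stop() { log.add("stop"); return CompletableFuture.completedFuture(null); }
            public CompletionStage<Void> shutdown() { log.add("shutdown"); return CompletableFuture.completedFuture(null); }
        };
        CompletableFuture<String> result = drainThenShutdown(control, streamDone);
        log.add("stream completed"); // e.g. the last commit request has been made
        streamDone.complete("Done");
        System.out.println(result.join() + " " + log); // Done [stop, stream completed, shutdown]
    }
}
```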

Draining control

To manage this shutdown process, use the @apidoc[Consumer.DrainingControl] by combining the Consumer.Control with the sink's materialized completion future in mapMaterializedValue. That control offers the method drainAndShutdown, which implements the process described above.

Note: The @apidoc[ConsumerSettings] stop-timeout delays stopping the Kafka Consumer and the stream, but when using drainAndShutdown that delay is not required and can be set to zero (as below).

Scala : @@ snip snip { #shutdownCommittableSource }

Java : @@ snip snip { #shutdownCommittableSource }

@@@ index

@@@