Apache Pekko Cluster lets you plug in an external shard allocation strategy, giving you more control over how many shards are created and which cluster nodes they are assigned to. If you consume Kafka messages into your Apache Pekko Cluster application, you can run an Apache Pekko Connectors Kafka consumer on each cluster node and co-locate Kafka partitions with Apache Pekko Cluster shards. When partitions and shards are co-located, there is less chance that a message must be transmitted over the network by the Apache Pekko Cluster Shard Coordinator to reach its destination sharded entity.
@@project-info{ projectId="cluster-sharding" }
@@dependency [Maven,sbt,Gradle] {
  group=org.apache.pekko
  artifact=pekko-connectors-kafka-cluster-sharding_$scala.binary.version$
  version=$project.version$
  symbol2=PekkoVersion
  value2="$pekko.version$"
  group2=org.apache.pekko
  artifact2=pekko-cluster-sharding-typed_$scala.binary.version$
  version2=PekkoVersion
}
This module contains an Apache Pekko extension called `KafkaClusterSharding` and depends on `pekko-cluster-sharding-typed`.
There are two steps required to set up the cluster sharding module.
@@@ note
A complete example of using this module exists in the apache/pekko-samples project, called pekko-sample-kafka-to-sharding-scala. It's a self-contained example that can run on a developer's laptop.
@@@
To set up the `ShardingMessageExtractor`, pick the factory method in the `KafkaClusterSharding` Apache Pekko extension that best fits your use case. This module provides two kinds of extractors: one for entities that are wrapped in a `ShardingEnvelope` and one for entities that are not. They're called `messageExtractor` and `messageExtractorNoEnvelope`, respectively.
To route Kafka messages to the correct sharded entity, we must use the same algorithm that determined the Kafka partition of the consumed message. This module implements the Murmur2-based hashing algorithm used in the Kafka `DefaultPartitioner`, which the Kafka producer uses by default. The inputs to this algorithm are the entity key and the number of partitions of the topic the message was consumed from. Therefore it's critical to use the same Kafka message key (the sharded entity id) and the same number of Kafka topic partitions (shards). The message extractors can optionally look up the number of shards for a given topic name, or the user can provide the number of shards explicitly.
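To make the partition-to-shard relationship concrete, the following sketch ports Kafka's Murmur2-based partition selection (hash the key bytes with Murmur2, mask to a positive value, take it modulo the partition count). The class and method names here (`Murmur2Partitioning`, `partitionFor`) are illustrative only — in practice this hashing is performed by the Kafka producer and by this module's message extractors, not by your code.

```java
import java.nio.charset.StandardCharsets;

public class Murmur2Partitioning {
    // 32-bit Murmur2 hash with the fixed seed used by Kafka's DefaultPartitioner.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        // Mix the input four bytes at a time.
        for (int i = 0; i < length4; i++) {
            final int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Handle the remaining 1-3 bytes (intentional switch fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for an entity id used as the message key. When the number of
    // shards equals the number of partitions, this is also the shard id.
    static int partitionFor(String entityId, int numPartitions) {
        byte[] keyBytes = entityId.getBytes(StandardCharsets.UTF_8);
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 16; // must equal the number of shards
        System.out.println(partitionFor("user-1", numPartitions));
    }
}
```

Because the hash is a pure function of the key bytes, a given entity id always maps to the same partition, and a message extractor applying the same function with the same count maps it to the matching shard.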
To get the `ShardingMessageExtractor`, call the `messageExtractor` overload that's suitable for your use case.
In the following example we asynchronously request an extractor that does not use a sharding envelope and uses as many shards as the given topic has partitions.
Given a user entity.
Scala : @@snip snip { #user-entity }
Java : @@snip snip { #user-entity }
Create a `MessageExtractor`.
Scala : @@snip snip { #message-extractor }
Java : @@snip snip { #message-extractor }
Set up Apache Pekko Typed Cluster Sharding.
Scala : @@snip snip { #setup-cluster-sharding }
Java : @@snip snip { #setup-cluster-sharding }
The rebalance listener is a pre-defined actor that handles `ConsumerRebalanceEvent`s by updating the Apache Pekko Cluster external sharding strategy when subscribed partitions are re-assigned to consumers running on different cluster nodes. This ensures that shards remain local to Kafka consumers after a consumer group rebalance. The rebalance listener is returned as a typed `ActorRef[ConsumerRebalanceEvent]` and must be converted to a classic `ActorRef` before being passed to `ConsumerSettings`.
@@@ note
It's recommended to use the same value for both the Kafka consumer group id and the `EntityTypeKey`. This allows you to create multiple Kafka consumer groups that consume the same type of messages from the same topic, but route them to different `Behavior`s to be processed in different ways.
For example, a `user-events` topic is consumed by two consumer groups. One consumer group is used to maintain an up-to-date view of the user's profile, and the other to represent an aggregate history of the types of user events. The same message type is consumed by separate Apache Pekko Connectors Kafka consumers, but the messages are routed to different Apache Pekko Cluster Sharding coordinators that are set up to use separate `Behavior`s.
@@@
Create the rebalance listener using the extension and pass it into an Apache Pekko Connectors Kafka `Subscription`.
Scala : @@snip snip { #rebalance-listener }
Java : @@snip snip { #rebalance-listener }