blob: b69c150fe614a8822d781499da78c60ac9fa006a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.scaladsl
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.scaladsl.Behaviors
import pekko.actor.typed.scaladsl.adapter._
import pekko.actor.typed.{ ActorSystem, Behavior }
import pekko.cluster.sharding.external.ExternalShardAllocationStrategy
import pekko.cluster.sharding.typed.ClusterShardingSettings
import pekko.cluster.sharding.typed.scaladsl.{ ClusterSharding, Entity, EntityTypeKey }
import pekko.kafka.cluster.sharding.KafkaClusterSharding
import pekko.kafka.scaladsl.Consumer
import pekko.kafka.{ ConsumerRebalanceEvent, ConsumerSettings, Subscriptions }
import pekko.stream.scaladsl.{ Flow, Sink }
import org.apache.kafka.common.serialization.{ ByteArrayDeserializer, StringDeserializer }
import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
/**
* This is compile-only code meant for documentation purposes.
* A full sample application exists in the akka-samples repository:
*
* https://github.com/akka/akka-samples/tree/2.6/akka-sample-kafka-to-sharding-scala
*/
object ClusterShardingExample {
implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "ClusterShardingExample")
val kafkaBootstrapServers = "localhost:9092"
implicit val ec: ExecutionContext = system.executionContext
def userBehaviour(): Behavior[User] = Behaviors.empty[User]
def userBusiness[T](): Flow[T, T, NotUsed] = Flow[T]
// #user-entity
final case class User(id: String, name: String)
// #user-entity
// #message-extractor
// automatically retrieving the number of partitions requires a round trip to a Kafka broker
val messageExtractor: Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[User]] =
KafkaClusterSharding(system.toClassic).messageExtractorNoEnvelope(
timeout = 10.seconds,
topic = "user-topic",
entityIdExtractor = (msg: User) => msg.id,
settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(kafkaBootstrapServers))
// #message-extractor
// #setup-cluster-sharding
// create an Apache Pekko Cluster Sharding `EntityTypeKey` for `User` for this Kafka Consumer Group
val groupId = "user-topic-group-id"
val typeKey = EntityTypeKey[User](groupId)
messageExtractor.onComplete {
case Success(extractor) =>
ClusterSharding(system).init(
Entity(typeKey)(createBehavior = _ => userBehaviour())
.withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name))
.withMessageExtractor(extractor)
.withSettings(ClusterShardingSettings(system)))
case Failure(ex) => system.log.error("An error occurred while obtaining the message extractor", ex)
}
// #setup-cluster-sharding
// #rebalance-listener
// obtain an Apache Pekko classic ActorRef that will handle consumer group rebalance events
val rebalanceListener: pekko.actor.typed.ActorRef[ConsumerRebalanceEvent] =
KafkaClusterSharding(system.toClassic).rebalanceListener(typeKey)
// convert the rebalance listener to a classic ActorRef until Apache Pekko Connector Kafka supports Apache Pekko Typed
import pekko.actor.typed.scaladsl.adapter._
val rebalanceListenerClassic: pekko.actor.ActorRef = rebalanceListener.toClassic
val consumerSettings =
ConsumerSettings(system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
.withBootstrapServers(kafkaBootstrapServers)
.withGroupId(typeKey.name) // use the same group id as we used in the `EntityTypeKey` for `User`
// pass the rebalance listener to the topic subscription
val subscription = Subscriptions
.topics("user-topic")
.withRebalanceListener(rebalanceListenerClassic)
// run & materialize the stream
val consumer = Consumer
.plainSource(consumerSettings, subscription)
.via(userBusiness())
.runWith(Sink.ignore)
// #rebalance-listener
}