blob: 7b4e28c28eed88b0d508baec52ebf1f44bc1cf11 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.javadsl;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.typed.ActorSystem;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.Adapter;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
import org.apache.pekko.cluster.sharding.external.ExternalShardAllocationStrategy;
import org.apache.pekko.cluster.sharding.typed.javadsl.ClusterSharding;
import org.apache.pekko.cluster.sharding.typed.javadsl.Entity;
import org.apache.pekko.cluster.sharding.typed.javadsl.EntityTypeKey;
import org.apache.pekko.kafka.AutoSubscription;
import org.apache.pekko.kafka.ConsumerRebalanceEvent;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.Subscriptions;
import org.apache.pekko.kafka.cluster.sharding.KafkaClusterSharding;
import org.apache.pekko.kafka.javadsl.Consumer;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.util.Timeout;
/**
 * Documentation example that connects a Kafka consumer to Pekko Cluster Sharding by way of {@link
 * KafkaClusterSharding}.
 *
 * <p>The paired {@code // #name} comments look like documentation snippet markers (TODO confirm
 * against the docs build); they are kept verbatim and in their original positions.
 */
public class ClusterShardingExample {

  // #user-entity
  /**
   * Message type handled by the sharded entities. The message extractor built in {@code example()}
   * reads {@code id} to obtain the entity id.
   *
   * <p>NOTE(review): {@code mame} looks like a misspelling of {@code name}. It is left unchanged
   * because the field is public and may be read by other classes in this package.
   */
  static final class User {
    public final String id;
    public final String mame;

    User(String id, String mame) {
      this.id = id;
      this.mame = mame;
    }
  }
  // #user-entity

  /**
   * Supplies the behavior used for every {@code User} entity.
   *
   * @return {@code Behaviors.empty()}, a placeholder standing in for real entity logic
   */
  public static Behavior<User> userBehaviour() {
    return Behaviors.empty();
  }

  /**
   * Supplies the processing stage inserted between the Kafka source and the sink.
   *
   * @param <T> element type flowing through the stage
   * @return {@code Flow.create()}, a placeholder standing in for real business logic
   */
  public static <T> Flow<T, T, NotUsed> userBusiness() {
    return Flow.create();
  }

  /**
   * Runs the example end to end: builds a Kafka-aware message extractor, initialises cluster
   * sharding for {@code User} with an external shard allocation strategy, registers a rebalance
   * listener on the topic subscription and starts a consumer stream.
   */
  public static void example() {
    ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "ClusterShardingExample");
    String kafkaBootstrapServers = "localhost:9092";

    // #message-extractor
    // automatically retrieving the number of partitions requires a round trip to a Kafka broker
    CompletionStage<KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor<User>> messageExtractor =
        KafkaClusterSharding.get(system)
            .messageExtractorNoEnvelope(
                "user-topic",
                Duration.ofSeconds(10),
                (User msg) -> msg.id,
                ConsumerSettings.create(
                        Adapter.toClassic(system),
                        new StringDeserializer(),
                        new StringDeserializer())
                    // the lookup must reach the same brokers the consumer below talks to
                    .withBootstrapServers(kafkaBootstrapServers));
    // #message-extractor

    // #setup-cluster-sharding
    String groupId = "user-topic-group-id";
    EntityTypeKey<User> typeKey = EntityTypeKey.create(User.class, groupId);

    messageExtractor
        .thenAccept(
            extractor ->
                ClusterSharding.get(system)
                    .init(
                        Entity.of(typeKey, ctx -> userBehaviour())
                            .withAllocationStrategy(
                                new ExternalShardAllocationStrategy(
                                    system, typeKey.name(), Timeout.create(Duration.ofSeconds(5))))
                            .withMessageExtractor(extractor)))
        .exceptionally(
            failure -> {
              // otherwise a failed lookup or init would leave sharding uninitialised unnoticed
              system
                  .log()
                  .error("Could not initialise cluster sharding for {}", typeKey.name(), failure);
              return null;
            });
    // #setup-cluster-sharding

    // #rebalance-listener
    org.apache.pekko.actor.typed.ActorRef<ConsumerRebalanceEvent> rebalanceListener =
        KafkaClusterSharding.get(system).rebalanceListener(typeKey);

    ConsumerSettings<String, byte[]> consumerSettings =
        ConsumerSettings.create(
                Adapter.toClassic(system), new StringDeserializer(), new ByteArrayDeserializer())
            .withBootstrapServers(kafkaBootstrapServers)
            .withGroupId(
                typeKey
                    .name()); // use the same group id as we used in the `EntityTypeKey` for `User`

    // pass the rebalance listener to the topic subscription
    AutoSubscription subscription =
        Subscriptions.topics("user-topic")
            .withRebalanceListener(Adapter.toClassic(rebalanceListener));

    // run & materialize the stream
    Consumer.plainSource(consumerSettings, subscription)
        .via(userBusiness())
        .runWith(Sink.ignore(), system);
    // #rebalance-listener
  }
}