| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * license agreements; and to You under the Apache License, version 2.0: |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * This file is part of the Apache Pekko project, which was derived from Akka. |
| */ |
| |
| /* |
| * Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com> |
| * Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com> |
| */ |
| |
| package docs.scaladsl |
| |
| import org.apache.pekko |
| import pekko.actor.ActorRef |
| import pekko.actor.typed.scaladsl.Behaviors |
| import pekko.kafka.scaladsl.Consumer |
| import pekko.kafka.testkit.KafkaTestkitTestcontainersSettings |
| import pekko.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike |
| import pekko.kafka.{ KafkaConsumerActor, Subscriptions } |
| import pekko.stream.scaladsl.{ Keep, Sink } |
| import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped |
| import org.apache.kafka.common.{ Metric, MetricName, TopicPartition } |
| |
| import scala.annotation.nowarn |
| import scala.concurrent.duration._ |
| import scala.concurrent.{ Await, Future } |
| |
| class PartitionExamples extends DocsSpecBase with TestcontainersKafkaPerClassLike { |
| |
| override val testcontainersSettings = |
| KafkaTestkitTestcontainersSettings(system) |
| .withInternalTopicsReplicationFactor(1) |
| .withConfigureKafka { brokerContainers => |
| brokerContainers.foreach { |
| _.withEnv("KAFKA_BROKER_ID", "1") |
| .withEnv("KAFKA_NUM_PARTITIONS", "3") |
| .withEnv("KAFKA_OFFSETS_TOPIC_NUM_PARTITIONS", "3") |
| } |
| } |
| |
| "Externally controlled kafka consumer" should "work" in assertAllStagesStopped { |
| val consumerSettings = consumerDefaults.withGroupId(createGroupId()) |
| val topic = createTopic(suffix = 0, partitions = 3) |
| val partition1 = 1 |
| val partition2 = 2 |
| // #consumerActor |
| // Consumer is represented by actor |
| val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings)) |
| |
| // Manually assign topic partition to it |
| val (controlPartition1, result1) = Consumer |
| .plainExternalSource[String, Array[Byte]]( |
| consumer, |
| Subscriptions.assignment(new TopicPartition(topic, partition1))) |
| .via(businessFlow) |
| .toMat(Sink.seq)(Keep.both) |
| .run() |
| |
| // Manually assign another topic partition |
| val (controlPartition2, result2) = Consumer |
| .plainExternalSource[String, Array[Byte]]( |
| consumer, |
| Subscriptions.assignment(new TopicPartition(topic, partition2))) |
| .via(businessFlow) |
| .toMat(Sink.seq)(Keep.both) |
| .run() |
| |
| // .... |
| |
| // #consumerActor |
| awaitProduce(produce(topic, 1 to 10, partition1), produce(topic, 1 to 10, partition2)) |
| awaitMultiple(2.seconds, |
| // #consumerActor |
| controlPartition1.shutdown() |
| // #consumerActor |
| , |
| // #consumerActor |
| controlPartition2.shutdown() |
| // #consumerActor |
| ) |
| // #consumerActor |
| consumer ! KafkaConsumerActor.Stop |
| // #consumerActor |
| |
| result1.futureValue should have size 10 |
| result2.futureValue should have size 10 |
| } |
| |
| "Typed Externally controlled kafka consumer" should "work" in assertAllStagesStopped { |
| val consumerSettings = consumerDefaults.withGroupId(createGroupId()) |
| |
| val _ = Behaviors.setup[Nothing] { context => |
| // #consumerActorTyped |
| // adds support for actors to a classic actor system and context |
| import pekko.actor.typed.scaladsl.adapter._ |
| |
| // Consumer is represented by actor |
| // #consumerActorTyped |
| @nowarn("cat=unused") |
| // #consumerActorTyped |
| val consumer: ActorRef = |
| context.actorOf(KafkaConsumerActor.props(consumerSettings), "kafka-consumer-actor") |
| // #consumerActorTyped |
| |
| Behaviors.empty |
| } |
| |
| } |
| |
| "Consumer Metrics" should "work" in assertAllStagesStopped { |
| val consumerSettings = consumerDefaults.withGroupId(createGroupId()) |
| val topic = createTopic(suffix = 0, partitions = 3) |
| val partition = 1 |
| def println(s: String): Unit = {} |
| // #consumerMetrics |
| val control: Consumer.Control = Consumer |
| .plainSource(consumerSettings, Subscriptions.assignment(new TopicPartition(topic, partition))) |
| .via(businessFlow) |
| .to(Sink.ignore) |
| .run() |
| |
| // #consumerMetrics |
| // can be removed when https://github.com/akka/alpakka-kafka/issues/528 is fixed |
| sleep(500.millis) |
| // #consumerMetrics |
| |
| val metrics: Future[Map[MetricName, Metric]] = control.metrics |
| metrics.foreach(map => println(s"metrics: ${map.mkString("\n")}")) |
| // #consumerMetrics |
| Await.result(metrics, 4.seconds) should not be Symbol("empty") |
| control.shutdown() |
| } |
| |
| } |