| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * license agreements; and to You under the Apache License, version 2.0: |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * This file is part of the Apache Pekko project, which was derived from Akka. |
| */ |
| |
| /* |
| * Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com> |
| * Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com> |
| */ |
| |
| package docs.scaladsl |
| |
| import org.apache.pekko |
| import pekko.actor.ActorSystem |
| import pekko.kafka.ConsumerMessage.CommittableOffset |
| import pekko.kafka.scaladsl.{ Committer, Consumer } |
| import pekko.kafka.{ CommitterSettings, ConsumerMessage, ProducerMessage } |
| import pekko.stream.scaladsl.{ Flow, Keep, Source } |
| import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped |
| import pekko.testkit.TestKit |
| import pekko.{ Done, NotUsed } |
| import org.apache.kafka.clients.consumer.ConsumerRecord |
| import org.apache.kafka.clients.producer.ProducerRecord |
| import org.scalatest.BeforeAndAfterAll |
| import org.scalatest.concurrent.{ IntegrationPatience, ScalaFutures } |
| import org.scalatest.flatspec.AnyFlatSpecLike |
| import org.scalatest.matchers.should.Matchers |
| |
/**
 * Demonstrates testing a Kafka consume-transform-produce stream WITHOUT a running
 * broker, using the mock factories from the pekko-connectors-kafka testkit:
 * `ConsumerResultFactory` fabricates committable messages, `ProducerResultFactory`
 * fabricates producer results, and `ConsumerControlFactory` provides a no-op
 * materialized `Consumer.Control`.
 *
 * NOTE(review): the code between the `// #factories` markers is extracted into the
 * project documentation — keep the markers and the snippet's shape intact.
 */
class TestkitSamplesSpec
    extends TestKit(ActorSystem("example"))
    with AnyFlatSpecLike
    with Matchers
    with BeforeAndAfterAll
    with ScalaFutures
    with IntegrationPatience {

  // Shut down the actor system (terminating any still-running streams) after the last test.
  override protected def afterAll(): Unit = TestKit.shutdownActorSystem(system)

  "Without broker testing" should "be possible" in assertAllStagesStopped {
    val topic = "topic"
    val targetTopic = "target-topic"
    val groupId = "group1"
    val startOffset = 100L
    val partition = 0
    val committerSettings = CommitterSettings(system)

    // #factories
    import org.apache.pekko
    import pekko.kafka.testkit.scaladsl.ConsumerControlFactory
    import pekko.kafka.testkit.{ ConsumerResultFactory, ProducerResultFactory }

    // create elements emitted by the mocked Consumer
    val elements = (0 to 10).map { i =>
      val nextOffset = startOffset + i
      ConsumerResultFactory.committableMessage(
        new ConsumerRecord(topic, partition, nextOffset, "key", s"value $i"),
        ConsumerResultFactory.committableOffset(groupId, topic, partition, nextOffset, s"metadata $i"))
    }

    // create a source imitating the Consumer.committableSource
    val mockedKafkaConsumerSource: Source[ConsumerMessage.CommittableMessage[String, String], Consumer.Control] =
      Source(elements).viaMat(ConsumerControlFactory.controlFlow())(Keep.right)

    // create a flow imitating the Producer.flexiFlow
    val mockedKafkaProducerFlow: Flow[ProducerMessage.Envelope[String, String, CommittableOffset],
      ProducerMessage.Results[String, String, CommittableOffset], NotUsed] =
      Flow[ProducerMessage.Envelope[String, String, CommittableOffset]]
        .map {
          case msg: ProducerMessage.Message[String, String, CommittableOffset] =>
            ProducerResultFactory.result(msg)
          // this sample only emits single Messages; fail loudly on anything else
          case other => throw new Exception(s"excluded: $other")
        }

    // run the flow as if it was connected to a Kafka broker
    val (control, streamCompletion) = mockedKafkaConsumerSource
      .map(msg =>
        ProducerMessage.Message(
          new ProducerRecord[String, String](targetTopic, msg.record.value),
          msg.committableOffset))
      .via(mockedKafkaProducerFlow)
      .map(_.passThrough)
      .toMat(Committer.sink(committerSettings))(Keep.both)
      .run()
    // #factories

    // the mocked source is finite, so the committer sink completes with Done
    streamCompletion.futureValue should be(Done)
  }
}