| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * license agreements; and to You under the Apache License, version 2.0: |
| * |
| * https://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * This file is part of the Apache Pekko project, which was derived from Akka. |
| */ |
| |
| /* |
| * Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com> |
| * Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com> |
| */ |
| |
| package docs.scaladsl |
| |
| import org.apache.pekko |
| import pekko.Done |
| import pekko.kafka.ProducerMessage.MultiResultPart |
| import pekko.kafka.scaladsl.{ Consumer, Producer } |
| import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike |
| import pekko.kafka.{ ProducerMessage, ProducerSettings, Subscriptions } |
| import pekko.stream.scaladsl.{ Keep, Sink, Source } |
| import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped |
| import org.apache.kafka.clients.producer.ProducerRecord |
| import org.apache.kafka.common.serialization.StringSerializer |
| |
| import scala.concurrent.Future |
| import scala.concurrent.duration._ |
| |
| class ProducerExample extends DocsSpecBase with TestcontainersKafkaLike { |
| |
  // Give the Testcontainers round-trip some time before asserting on consumed records
  private def waitBeforeValidation(): Unit = sleep(6.seconds)
| |
  // '*' is not a legal character in Kafka topic names, so producing to this topic fails
  val invalidTopicName = "---*---"
| |
| "Creating a producer" should "work" in { |
| // #producer |
| // #settings |
| val config = system.settings.config.getConfig("pekko.kafka.producer") |
| val producerSettings = |
| ProducerSettings(config, new StringSerializer, new StringSerializer) |
| .withBootstrapServers(bootstrapServers) |
| // #settings |
| val kafkaProducer: Future[org.apache.kafka.clients.producer.Producer[String, String]] = |
| producerSettings.createKafkaProducerAsync() |
| |
    // ... use the Kafka producer here ...

    // close the producer when done with it
    kafkaProducer.foreach(p => p.close())
| // #producer |
| } |
| |
| "PlainSink" should "work" in assertAllStagesStopped { |
| val config = system.settings.config.getConfig("pekko.kafka.producer") |
| val producerSettings = |
| ProducerSettings(config, new StringSerializer, new StringSerializer) |
| .withBootstrapServers(bootstrapServers) |
| val consumerSettings = consumerDefaults.withGroupId(createGroupId()) |
| val topic = createTopic() |
| // #plainSink |
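    // produce one record per number; the materialized Future completes when the stream does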
| val done: Future[Done] = |
| Source(1 to 100) |
| .map(_.toString) |
| .map(value => new ProducerRecord[String, String](topic, value)) |
| .runWith(Producer.plainSink(producerSettings)) |
| // #plainSink |
    val (control, result) = Consumer
| .plainSource(consumerSettings, Subscriptions.topics(topic)) |
| .toMat(Sink.seq)(Keep.both) |
| .run() |
| waitBeforeValidation() |
| done.futureValue should be(Done) |
    control.shutdown().futureValue should be(Done)
| result.futureValue should have size 100 |
| } |
| |
| "PlainSink with shared producer" should "work" in assertAllStagesStopped { |
| val consumerSettings = consumerDefaults.withGroupId(createGroupId()) |
| val producerSettings = producerDefaults |
| val topic = createTopic() |
| // #plainSinkWithProducer |
| // create a producer |
| val kafkaProducer = producerSettings.createKafkaProducer() |
| val settingsWithProducer = producerSettings.withProducer(kafkaProducer) |
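    // streams created from settingsWithProducer share kafkaProducer; the stages
    // will not close a producer that was supplied this way, so close it after use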
| |
| val done = Source(1 to 100) |
| .map(_.toString) |
| .map(value => new ProducerRecord[String, String](topic, value)) |
| .runWith(Producer.plainSink(settingsWithProducer)) |
| // #plainSinkWithProducer |
    val (control, result) = Consumer
| .plainSource(consumerSettings, Subscriptions.topics(topic)) |
| .toMat(Sink.seq)(Keep.both) |
| .run() |
| done.futureValue should be(Done) |
| waitBeforeValidation() |
    control.shutdown().futureValue should be(Done)
| result.futureValue should have size 100 |
| // #plainSinkWithProducer |
| |
| // close the producer after use |
| kafkaProducer.close() |
| // #plainSinkWithProducer |
| } |
| |
| "Metrics" should "be observed" in assertAllStagesStopped { |
| val config = system.settings.config.getConfig("pekko.kafka.producer") |
| val producerSettings = |
| ProducerSettings(config, new StringSerializer, new StringSerializer) |
| .withBootstrapServers(bootstrapServers) |
| producerSettings |
| .createKafkaProducerAsync() |
| .map { kafkaProducer => |
| // #producerMetrics |
| val metrics: java.util.Map[org.apache.kafka.common.MetricName, _ <: org.apache.kafka.common.Metric] = |
| kafkaProducer.metrics() // observe metrics |
| // #producerMetrics |
| metrics.isEmpty should be(false) |
| kafkaProducer.close() |
| Done |
| } |
| .futureValue shouldBe Done |
| } |
| |
| def createMessage[KeyType, ValueType, PassThroughType]( |
| key: KeyType, |
| value: ValueType, |
| passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = { |
| // #singleMessage |
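    // the passThrough is not written to Kafka; it accompanies the record through
    // the producer flow and is emitted with the Result, e.g. a committable offset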
| val single: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = |
| ProducerMessage.single( |
| new ProducerRecord("topicName", key, value), |
| passThrough) |
| // #singleMessage |
| single |
| } |
| |
| def createMultiMessage[KeyType, ValueType, PassThroughType]( |
| key: KeyType, |
| value: ValueType, |
| passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = { |
| import scala.collection.immutable |
| // #multiMessage |
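    // all records in the envelope share one passThrough; the producer flow emits
    // a single MultiResult once every record has been produced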
| val multi: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = |
| ProducerMessage.multi( |
| immutable.Seq( |
| new ProducerRecord("topicName", key, value), |
| new ProducerRecord("anotherTopic", key, value)), |
| passThrough) |
| // #multiMessage |
| multi |
| } |
| |
| def createPassThroughMessage[KeyType, ValueType, PassThroughType]( |
| key: KeyType, |
| value: ValueType, |
| passThrough: PassThroughType): ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = { |
| // #passThroughMessage |
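    // nothing is produced to Kafka for this envelope; the passThrough alone travels
    // through the producer flow, e.g. to commit an offset without producing a record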
| val ptm: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] = |
| ProducerMessage.passThrough( |
| passThrough) |
| // #passThroughMessage |
| ptm |
| } |
| |
| "flexiFlow" should "work" in assertAllStagesStopped { |
| val producerSettings = producerDefaults |
| val topic = createTopic() |
    // shadow println so the docs snippet below stays silent during the test run
    def println(s: String): Unit = {}
    // format: off
| // #flow |
| val done = Source(1 to 100) |
| .map { number => |
| val partition = 0 |
| val value = number.toString |
| ProducerMessage.single( |
| new ProducerRecord(topic, partition, "key", value), |
| number) |
| } |
| .via(Producer.flexiFlow(producerSettings)) |
| .map { |
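        // a single record was produced: metadata carries topic, partition and offset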
| case ProducerMessage.Result(metadata, ProducerMessage.Message(record, passThrough)) => |
| s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}" |
| |
        // several records were produced from one envelope: one part per record
        case ProducerMessage.MultiResult(parts, passThrough) =>
| parts |
| .map { |
| case MultiResultPart(metadata, record) => |
| s"${metadata.topic}/${metadata.partition} ${metadata.offset}: ${record.value}" |
| } |
| .mkString(", ") |
| |
        // no record was produced for this element; the passThrough is emitted as-is
        case ProducerMessage.PassThroughResult(passThrough) =>
          "passed through"
| } |
| .runWith(Sink.foreach(println(_))) |
| // #flow |
    // format: on
| val consumerSettings = consumerDefaults.withGroupId(createGroupId()) |
    val (control, result) = Consumer
| .plainSource(consumerSettings, Subscriptions.topics(topic)) |
| .toMat(Sink.seq)(Keep.both) |
| .run() |
| done.futureValue should be(Done) |
| waitBeforeValidation() |
    control.shutdown().futureValue should be(Done)
| result.futureValue should have size 100 |
| } |
| |
  // Regression test for a race condition fixed in https://github.com/akka/alpakka-kafka/pull/1025
  it should "fail the stream with the error from producing" in assertAllStagesStopped {
| val streamCompletion = |
| Source |
| .single(ProducerMessage.single(new ProducerRecord[String, String](invalidTopicName, "1"))) |
| .via(Producer.flexiFlow(producerDefaults)) |
| .runWith(Sink.head) |
| |
| streamCompletion.failed.futureValue shouldBe an[org.apache.kafka.common.errors.InvalidTopicException] |
| } |
| |
| } |