blob: 07e2edbf54de42f09cdc6dfab1b6a472ec73b588 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.scaladsl
import org.apache.pekko
import pekko.Done
import pekko.kafka.ProducerMessage.MultiResult
import pekko.kafka.scaladsl.{ Consumer, SendProducer }
import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import pekko.kafka.{ ConsumerSettings, ProducerMessage, Subscriptions }
import pekko.stream.scaladsl.{ Keep, Sink }
import org.apache.kafka.clients.producer.{ ProducerRecord, RecordMetadata }
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{ Await, Future }
class SendProducerSpec extends DocsSpecBase with TestcontainersKafkaLike {
"Simple producer" should "send producer records" in {
val topic1 = createTopic(1)
// #record
val producer = SendProducer(producerDefaults)
try {
val send: Future[RecordMetadata] = producer
.send(new ProducerRecord(topic1, "key", "value"))
// Blocking here for illustration only, you need to handle the future result
Await.result(send, 2.seconds)
// #record
send.futureValue.topic() shouldBe topic1
val read = consumeHead(consumerDefaults.withGroupId(createGroupId()), topic1)
read.futureValue shouldBe "value"
// #record
} finally {
Await.result(producer.close(), 1.minute)
}
// #record
}
it should "send a multi-message (with one record)" in {
val topic1 = createTopic(1)
val producer = SendProducer(producerDefaults)
try {
// #envelope
val message = ProducerMessage.multi(immutable.Seq(new ProducerRecord(topic1, "key", "value")), "context")
val send: Future[ProducerMessage.Results[String, String, String]] = producer.sendEnvelope(message)
// Blocking here for illustration only, you need to handle the future result
Await.result(send, 2.seconds)
// #envelope
val result = send.futureValue
result match {
case MultiResult(immutable.Seq(part), "context") =>
part.metadata.topic() shouldBe topic1
case other => fail(s"unexpected result $other")
}
val read = consumeHead(consumerDefaults.withGroupId(createGroupId()), topic1)
read.futureValue shouldBe "value"
} finally {
producer.close().futureValue shouldBe Done
}
}
it should "send a multi-message (with multiple records)" in {
val topic1 = createTopic(1)
// #multiMessage
val producer = SendProducer(producerDefaults)
try {
val envelope: ProducerMessage.Envelope[String, String, String] =
ProducerMessage.multi(immutable.Seq(
new ProducerRecord(topic1, "key", "value1"),
new ProducerRecord(topic1, "key", "value2"),
new ProducerRecord(topic1, "key", "value3")),
"context")
val send: Future[ProducerMessage.Results[String, String, String]] = producer.sendEnvelope(envelope)
// #multiMessage
val result = send.futureValue
result match {
case MultiResult(immutable.Seq(part1, part2, part3), "context") =>
part1.metadata.topic() shouldBe topic1
case other => fail(s"unexpected result $other")
}
val read = consume(consumerDefaults.withGroupId(createGroupId()), topic1, elements = 3)
read.futureValue should contain theSameElementsInOrderAs Seq("value1", "value2", "value3")
// #multiMessage
} finally {
Await.result(producer.close(), 1.minute)
}
// #multiMessage
}
"Mis-configured producer" should "fail the send future" in {
val topic1 = createTopic(1)
val producer = SendProducer(producerDefaults.withBootstrapServers("unkownhost"))
try {
val send = producer.send(new ProducerRecord(topic1, "key", "value"))
send.failed.futureValue shouldBe a[org.apache.kafka.common.KafkaException]
send.failed.futureValue.getCause shouldBe a[org.apache.kafka.common.config.ConfigException]
} finally {
producer.close().failed.futureValue shouldBe a[org.apache.kafka.common.KafkaException]
}
}
private def consume(consumerSettings: ConsumerSettings[String, String], topic: String, elements: Long) = {
Consumer
.plainSource(consumerSettings, Subscriptions.topics(topic))
.map(_.value)
.take(elements)
.toMat(Sink.seq)(Keep.right)
.run()
}
private def consumeHead(consumerSettings: ConsumerSettings[String, String], topic: String) =
consume(consumerSettings, topic, 1).map(_.head)
}