blob: 55e9f8b74b0de672d00b2354bc485cd9315953cf [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.scaladsl
import org.apache.pekko
import org.apache.pekko.kafka.ConsumerSettings
import pekko.kafka.scaladsl.MetadataClient
import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import org.scalatest.TryValues
import org.scalatest.time.{ Seconds, Span }
// #metadata
// #metadataClient
import org.apache.pekko
import pekko.actor.ActorRef
import pekko.kafka.{ KafkaConsumerActor, Metadata }
import pekko.pattern.ask
import pekko.util.Timeout
import org.apache.kafka.common.TopicPartition
import scala.concurrent.Future
import scala.concurrent.duration._
// #metadata
// #metadataClient
class FetchMetadata extends DocsSpecBase with TestcontainersKafkaLike with TryValues {
override implicit val patienceConfig: PatienceConfig =
PatienceConfig(timeout = scaled(Span(20, Seconds)), interval = scaled(Span(1, Seconds)))
"Consumer metadata" should "be available" in {
val consumerSettings = consumerDefaults.withGroupId(createGroupId())
val topic = createTopic()
// #metadata
val timeout: FiniteDuration = 5.seconds
val settings = consumerSettings.withMetadataRequestTimeout(timeout)
implicit val askTimeout: Timeout = Timeout(timeout)
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(settings))
val topicsFuture: Future[Metadata.Topics] = (consumer ? Metadata.ListTopics).mapTo[Metadata.Topics]
topicsFuture.map(_.response.foreach { map =>
println("Found topics:")
map.foreach {
case (topic, partitionInfo) =>
partitionInfo.foreach { info =>
println(s" $topic: $info")
}
}
})
// #metadata
(topicsFuture.futureValue.response should be).a(Symbol("success"))
topicsFuture.futureValue.response.get(topic) should not be Symbol("empty")
}
"Get topic list" should "return result" in {
val topic1 = createTopic(1)
val group1 = createGroupId(1)
val partition0 = new TopicPartition(topic1, 0)
val consumerSettings = consumerDefaults.withGroupId(group1)
awaitProduce(produce(topic1, 1 to 10))
// #metadataClient
val metadataClient = MetadataClient.create(consumerSettings, 1.second)
val beginningOffsets = metadataClient
.getBeginningOffsets(Set(partition0))
.futureValue
// #metadataClient
beginningOffsets(partition0) shouldBe 0
// #metadataClient
metadataClient.close()
// #metadataClient
}
"Get offsets" should "timeout fast" in {
val consumerSettings: ConsumerSettings[String, String] = consumerDefaults
.withGroupId(createGroupId())
.withMetadataRequestTimeout(100.millis)
val topic = createTopic()
implicit val timeout: Timeout = Timeout(consumerSettings.metadataRequestTimeout * 2)
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))
val nonExistentPartition = 42
val topicsFuture: Future[Metadata.EndOffsets] =
(consumer ? Metadata.GetEndOffsets(Set(new TopicPartition(topic, nonExistentPartition))))
.mapTo[Metadata.EndOffsets]
val response = topicsFuture.futureValue.response
(response should be).a(Symbol("failure"))
response.failed.get shouldBe a[org.apache.kafka.common.errors.TimeoutException]
}
it should "return" in {
val consumerSettings: ConsumerSettings[String, String] = consumerDefaults
.withGroupId(createGroupId())
.withMetadataRequestTimeout(5.seconds)
val topic = createTopic()
implicit val timeout: Timeout = Timeout(consumerSettings.metadataRequestTimeout * 2)
val consumer: ActorRef = system.actorOf(KafkaConsumerActor.props(consumerSettings))
val partition = 0
val tp = new TopicPartition(topic, partition)
val topicsFuture: Future[Metadata.EndOffsets] =
(consumer ? Metadata.GetEndOffsets(Set(tp))).mapTo[Metadata.EndOffsets]
val response = topicsFuture.futureValue.response
(response should be).a(Symbol("success"))
response.get(tp) should be(0L)
}
}