/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.scaladsl

import java.nio.charset.StandardCharsets

import org.apache.pekko
import pekko.Done
import pekko.kafka._
import pekko.kafka.scaladsl._
import pekko.kafka.testkit.KafkaTestkitTestcontainersSettings
import pekko.kafka.testkit.scaladsl.TestcontainersKafkaPerClassLike
import pekko.stream.scaladsl.{ Keep, Sink, Source }
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import pekko.stream.testkit.scaladsl.TestSink
import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig
import org.apache.avro.specific.SpecificRecordBase
import org.apache.avro.util.Utf8
import org.apache.avro.{ AvroRuntimeException, Schema }
import org.apache.kafka.common.TopicPartition
import scala.collection.immutable
import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
// #imports
import io.confluent.kafka.serializers.{ AbstractKafkaSchemaSerDeConfig, KafkaAvroDeserializer, KafkaAvroSerializer }
import org.apache.avro.specific.SpecificRecord
// #imports
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
// #imports
import org.apache.kafka.common.serialization._
// #imports
// #schema-registry-settings
class SchemaRegistrySerializationSpec extends DocsSpecBase with TestcontainersKafkaPerClassLike {
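
  // withSchemaRegistry(true) makes the testkit start a Confluent Schema Registry
  // container alongside the Kafka broker; its address is then available to the
  // tests as schemaRegistryUrl.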
  override val testcontainersSettings = KafkaTestkitTestcontainersSettings(system).withSchemaRegistry(true)

  // tests..
  // #schema-registry-settings

"With SchemaRegistry" should "Avro serialization/deserialization work" in assertAllStagesStopped {
val group = createGroupId()
val topic = createTopic()
// #serializer #de-serializer
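    // Configuration shared by the Confluent serializer and deserializer: which
    // schema registry to contact, and that values should be read back as
    // generated SpecificRecord classes rather than GenericRecord.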
    val kafkaAvroSerDeConfig = Map[String, Any](
      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl,
      KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG -> true.toString)
    // #serializer #de-serializer

    // #de-serializer
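    // Create and configure the Confluent deserializer by hand, then adapt it to
    // Kafka's Deserializer[SpecificRecord] interface; the second argument to
    // configure, isKey = false, targets record values rather than keys.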
    val consumerSettings: ConsumerSettings[String, SpecificRecord] = {
      val kafkaAvroDeserializer = new KafkaAvroDeserializer()
      kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
      val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[SpecificRecord]]
      ConsumerSettings(system, new StringDeserializer, deserializer)
        .withBootstrapServers(bootstrapServers)
        .withGroupId(group)
        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    }
    // #de-serializer
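
    // The producer side mirrors this: configure the Confluent serializer for
    // values and adapt it to Kafka's Serializer interface.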
    // #serializer
    val producerSettings: ProducerSettings[String, SpecificRecord] = {
      val kafkaAvroSerializer = new KafkaAvroSerializer()
      kafkaAvroSerializer.configure(kafkaAvroSerDeConfig.asJava, false)
      val serializer = kafkaAvroSerializer.asInstanceOf[Serializer[SpecificRecord]]
      ProducerSettings(system, new StringSerializer, serializer)
        .withBootstrapServers(bootstrapServers)
    }
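
    // On first use the serializer registers the record schema with the registry
    // (Confluent's auto-registration default) and prefixes each message with the
    // assigned schema id.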
    val samples = (1 to 3).map(i => SampleAvroClass(s"key_$i", s"name_$i"))
    val producerCompletion =
      Source(samples)
        .map(n => new ProducerRecord[String, SpecificRecord](topic, n.key, n))
        .runWith(Producer.plainSink(producerSettings))
    // #serializer
    awaitProduce(producerCompletion)
    // #de-serializer
    val (control, result) =
      Consumer
        .plainSource(consumerSettings, Subscriptions.topics(topic))
        .take(samples.size.toLong)
        .map(_.value())
        .toMat(Sink.seq)(Keep.both)
        .run()
    // #de-serializer
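    // take() completes the stream after the expected number of records, which
    // shuts the consumer down and completes isShutdown with Done.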
    control.isShutdown.futureValue should be(Done)
    result.futureValue should contain theSameElementsInOrderAs samples
  }

"Error in deserialization" should "signal undisguised" in assertAllStagesStopped {
val group = createGroupId()
val topic = createTopic()
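    // Produce raw UTF-8 bytes that are not valid Confluent Avro payloads, then
    // consume them with the Avro deserializer: the SerializationException must
    // fail the stream as-is.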
    val producerSettings: ProducerSettings[String, Array[Byte]] =
      ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
        .withBootstrapServers(bootstrapServers)
    val samples = immutable.Seq("String1")
    val producerCompletion =
      Source(samples)
        .map(n => new ProducerRecord(topic, n, n.getBytes(StandardCharsets.UTF_8)))
        .runWith(Producer.plainSink(producerSettings))
    awaitProduce(producerCompletion)

    val (control, result) =
      Consumer
        .plainSource(specificRecordConsumerSettings(group), Subscriptions.topics(topic))
        .take(samples.size.toLong)
        .toMat(Sink.seq)(Keep.both)
        .run()
    control.isShutdown.futureValue should be(Done)
    result.failed.futureValue shouldBe a[org.apache.kafka.common.errors.SerializationException]
  }

it should "signal to all streams of a shared actor in same request, and keep others alive" in assertAllStagesStopped {
val group = createGroupId()
val topic = createTopic(suffix = 0, partitions = 3)
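    // One KafkaConsumerActor is shared by three external sources, each assigned
    // to a different partition of the same topic.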
    val consumerActor =
      system.actorOf(KafkaConsumerActor.props(specificRecordConsumerSettings(group)), "sharedKafkaConsumerActor")
    val samples = immutable.Seq("String1", "String2", "String3")

    val (control1, probe1) =
      Consumer
        .plainExternalSource[String, SpecificRecord](consumerActor,
          Subscriptions.assignment(new TopicPartition(topic, 0)))
        .toMat(TestSink.probe)(Keep.both)
        .run()
    val (control2, probe2) =
      Consumer
        .plainExternalSource[String, SpecificRecord](consumerActor,
          Subscriptions.assignment(new TopicPartition(topic, 1)))
        .toMat(TestSink.probe)(Keep.both)
        .run()
    val (thisStreamStaysAlive, probe3) =
      Consumer
        .plainExternalSource[String, SpecificRecord](consumerActor,
          Subscriptions.assignment(new TopicPartition(topic, 2)))
        .toMat(TestSink.probe)(Keep.both)
        .run()
    // establish demand from the first two streams only
    probe1.request(samples.size.toLong)
    probe2.request(samples.size.toLong)
    sleep(500.millis, "to establish demand")
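
    // The bad bytes go to partition 0 only, but the shared actor fails the whole
    // poll, so every stream that had outstanding demand in that poll (partitions
    // 0 and 1) sees the error; the stream without demand (partition 2) survives.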
    val producerSettings: ProducerSettings[String, Array[Byte]] =
      ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
        .withBootstrapServers(bootstrapServers)
    Source(samples)
      .map(n => new ProducerRecord(topic, 0, n, n.getBytes(StandardCharsets.UTF_8)))
      .runWith(Producer.plainSink(producerSettings))

    probe1.expectError() shouldBe a[org.apache.kafka.common.errors.SerializationException]
    probe2.expectError() shouldBe a[org.apache.kafka.common.errors.SerializationException]
    probe3.cancel()
    control1.isShutdown.futureValue should be(Done)
    control2.isShutdown.futureValue should be(Done)
    thisStreamStaysAlive.shutdown().futureValue should be(Done)
    consumerActor ! KafkaConsumerActor.Stop
  }

it should "signal to sub-sources" in assertAllStagesStopped {
val group = createGroupId()
val topic = createTopic()
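    // With a partitioned source the deserialization error must fail the
    // sub-source of the affected partition, not just the outer source.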
    val (control1, partitionedProbe) =
      Consumer
        .plainPartitionedSource(specificRecordConsumerSettings(group), Subscriptions.topics(topic))
        .toMat(TestSink.probe)(Keep.both)
        .run()
    partitionedProbe.request(1L)
    val (_, subSource) = partitionedProbe.expectNext()
    val subStream = subSource.runWith(TestSink.probe)
    subStream.request(1L)

    val producerSettings: ProducerSettings[String, Array[Byte]] =
      ProducerSettings(system, new StringSerializer, new ByteArraySerializer)
        .withBootstrapServers(bootstrapServers)
    Source
      .single(new ProducerRecord(topic, 0, "0", "0".getBytes(StandardCharsets.UTF_8)))
      .runWith(Producer.plainSink(producerSettings))

    subStream.expectError() shouldBe a[org.apache.kafka.common.errors.SerializationException]
    control1.shutdown().futureValue should be(Done)
  }

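  // Helper shared by the failure tests: consumer settings whose value
  // deserializer expects Confluent Avro payloads, so the plain string bytes
  // produced above cannot be decoded.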
  private def specificRecordConsumerSettings(group: String): ConsumerSettings[String, SpecificRecord] = {
    val kafkaAvroSerDeConfig = Map[String, Any](
      AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> schemaRegistryUrl)
    val kafkaAvroDeserializer = new KafkaAvroDeserializer()
    kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig.asJava, false)
    val deserializer = kafkaAvroDeserializer.asInstanceOf[Deserializer[SpecificRecord]]
    ConsumerSettings(system, new StringDeserializer, deserializer)
      .withBootstrapServers(bootstrapServers)
      .withGroupId(group)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
  }
  // #schema-registry-settings
}
// #schema-registry-settings
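
// Hand-written SpecificRecord implementation used by the test; in a real
// project such a class would typically be generated from the Avro schema, e.g.
// by sbt-avro or avrohugger.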
case class SampleAvroClass(var key: String, var name: String) extends SpecificRecordBase {
  def this() = this(null, null)

  override def get(i: Int): AnyRef = i match {
    case 0     => key
    case 1     => name
    case index => throw new AvroRuntimeException(s"Unknown index: $index")
  }

  override def put(i: Int, v: scala.Any): Unit = i match {
    case 0     => key = asString(v)
    case 1     => name = asString(v)
    case index => throw new AvroRuntimeException(s"Unknown index: $index")
  }
  private def asString(v: Any) =
    v match {
      case utf8: Utf8 => utf8.toString
      case _          => v.asInstanceOf[String]
    }

  override def getSchema: Schema = SampleAvroClass.SCHEMA$
}

object SampleAvroClass {
  val SCHEMA$ : Schema =
    new org.apache.avro.Schema.Parser().parse("""
        |{"namespace": "docs.scaladsl",
        | "type": "record",
        | "name": "SampleAvroClass",
        | "fields": [
        |   {"name": "key", "type": "string"},
        |   {"name": "name", "type": "string"}
        | ]
        |}
      """.stripMargin)
}