blob: 31e845089f43f011cc41c3609c6b5dd39d4f2468 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.javadsl;
import org.apache.pekko.Done;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.kafka.ConsumerSettings;
import org.apache.pekko.kafka.ProducerSettings;
import org.apache.pekko.kafka.Subscriptions;
import org.apache.pekko.kafka.javadsl.Consumer;
import org.apache.pekko.kafka.javadsl.Producer;
import org.apache.pekko.kafka.testkit.KafkaTestkitTestcontainersSettings;
import org.apache.pekko.kafka.testkit.TestcontainersKafkaJunit4Test;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.testkit.javadsl.TestKit;
import docs.scaladsl.SampleAvroClass;
// #imports
import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;
// #imports
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
// #imports
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
// #imports
import org.junit.AfterClass;
import org.junit.Test;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
// #schema-registry-settings
public class SchemaRegistrySerializationTest extends TestcontainersKafkaJunit4Test {
private static final ActorSystem sys = ActorSystem.create("SchemaRegistrySerializationTest");
private static final Executor ec = Executors.newSingleThreadExecutor();
public SchemaRegistrySerializationTest() {
// #schema-registry-settings
// NOTE: Overriding KafkaTestkitTestcontainersSettings doesn't necessarily do anything here
// because the JUnit testcontainer abstract classes run the testcontainers as a singleton.
// Whatever JUnit test spawns first is the only one that can override settings. To workaround
// this I've enabled the schema registry container for all tests in an application.conf.
// #schema-registry-settings
super(
sys,
KafkaTestkitTestcontainersSettings.create(sys)
.withInternalTopicsReplicationFactor(1)
.withSchemaRegistry(true));
}
// #schema-registry-settings
@Test
public void avroDeSerMustWorkWithSchemaRegistry() throws Exception {
final String topic = createTopic();
final String group = createGroupId();
// #serializer #de-serializer
Map<String, Object> kafkaAvroSerDeConfig = new HashMap<>();
kafkaAvroSerDeConfig.put(
AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl());
// #serializer #de-serializer
// #de-serializer
KafkaAvroDeserializer kafkaAvroDeserializer = new KafkaAvroDeserializer();
kafkaAvroDeserializer.configure(kafkaAvroSerDeConfig, false);
Deserializer<Object> deserializer = kafkaAvroDeserializer;
ConsumerSettings<String, Object> consumerSettings =
ConsumerSettings.create(sys, new StringDeserializer(), deserializer)
.withBootstrapServers(bootstrapServers())
.withGroupId(group)
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// #de-serializer
// #serializer
KafkaAvroSerializer kafkaAvroSerializer = new KafkaAvroSerializer();
kafkaAvroSerializer.configure(kafkaAvroSerDeConfig, false);
Serializer<Object> serializer = kafkaAvroSerializer;
ProducerSettings<String, Object> producerSettings =
ProducerSettings.create(sys, new StringSerializer(), serializer)
.withBootstrapServers(bootstrapServers());
SampleAvroClass sample = new SampleAvroClass("key", "name");
List<SampleAvroClass> samples = Arrays.asList(sample, sample, sample);
CompletionStage<Done> producerCompletion =
Source.from(samples)
.map(n -> new ProducerRecord<String, Object>(topic, n.key(), n))
.runWith(Producer.plainSink(producerSettings), sys);
// #serializer
// #de-serializer
Consumer.DrainingControl<List<ConsumerRecord<String, Object>>> controlCompletionStagePair =
Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.take(samples.size())
.toMat(Sink.seq(), Consumer::createDrainingControl)
.run(sys);
// #de-serializer
assertThat(
controlCompletionStagePair.isShutdown().toCompletableFuture().get(20, TimeUnit.SECONDS),
is(Done.getInstance()));
List<ConsumerRecord<String, Object>> result =
controlCompletionStagePair
.drainAndShutdown(ec)
.toCompletableFuture()
.get(1, TimeUnit.SECONDS);
assertThat(result.size(), is(samples.size()));
}
@AfterClass
public static void afterClass() {
TestKit.shutdownActorSystem(sys);
}
// #schema-registry-settings
}
// #schema-registry-settings