blob: 56f7d226301498a8f1de19de7ce82a8c7bb7e156 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.javadsl;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.japi.Pair;
import org.apache.pekko.kafka.CommitterSettings;
import org.apache.pekko.kafka.ConsumerMessage;
import org.apache.pekko.kafka.ProducerMessage;
import org.apache.pekko.kafka.javadsl.Committer;
import org.apache.pekko.kafka.javadsl.Consumer;
import org.apache.pekko.kafka.tests.javadsl.LogCapturingJunit4;
import org.apache.pekko.stream.javadsl.Flow;
import org.apache.pekko.stream.javadsl.Keep;
import org.apache.pekko.stream.javadsl.Source;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.*;
// #factories
import org.apache.pekko.kafka.testkit.ConsumerResultFactory;
import org.apache.pekko.kafka.testkit.ProducerResultFactory;
import org.apache.pekko.kafka.testkit.javadsl.ConsumerControlFactory;
// #factories
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
public class TestkitSamplesTest {
@Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4();
private static final ActorSystem sys = ActorSystem.create("TestkitSamplesTest");
@AfterClass
public static void afterClass() {
TestKit.shutdownActorSystem(sys);
}
@Test
public void withoutBrokerTesting() throws Exception {
String topic = "topic";
String targetTopic = "target-topic";
String groupId = "group1";
long startOffset = 100L;
int partition = 0;
CommitterSettings committerSettings = CommitterSettings.create(sys);
// #factories
// create elements emitted by the mocked Consumer
List<ConsumerMessage.CommittableMessage<String, String>> elements =
Arrays.asList(
ConsumerResultFactory.committableMessage(
new ConsumerRecord<>(topic, partition, startOffset, "key", "value 1"),
ConsumerResultFactory.committableOffset(
groupId, topic, partition, startOffset, "metadata")),
ConsumerResultFactory.committableMessage(
new ConsumerRecord<>(topic, partition, startOffset + 1, "key", "value 2"),
ConsumerResultFactory.committableOffset(
groupId, topic, partition, startOffset + 1, "metadata 2")));
// create a source imitating the Consumer.committableSource
Source<ConsumerMessage.CommittableMessage<String, String>, Consumer.Control>
mockedKafkaConsumerSource =
Source.cycle(elements::iterator)
.viaMat(ConsumerControlFactory.controlFlow(), Keep.right());
// create a source imitating the Producer.flexiFlow
Flow<
ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>,
ProducerMessage.Results<String, String, ConsumerMessage.CommittableOffset>,
NotUsed>
mockedKafkaProducerFlow =
Flow
.<ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>>
create()
.map(
msg -> {
if (msg instanceof ProducerMessage.Message<String, String, ConsumerMessage.CommittableOffset> pmsg) {
return ProducerResultFactory.result(pmsg);
} else throw new RuntimeException("unexpected element: " + msg);
});
// run the flow as if it was connected to a Kafka broker
Pair<Consumer.Control, CompletionStage<Done>> stream =
mockedKafkaConsumerSource
.map(
msg -> {
ProducerMessage.Envelope<String, String, ConsumerMessage.CommittableOffset>
message =
new ProducerMessage.Message<>(
new ProducerRecord<>(
targetTopic, msg.record().key(), msg.record().value()),
msg.committableOffset());
return message;
})
.via(mockedKafkaProducerFlow)
.map(ProducerMessage.Results::passThrough)
.toMat(Committer.sink(committerSettings), Keep.both())
.run(sys);
// #factories
Thread.sleep(1 * 1000L);
assertThat(
stream.first().shutdown().toCompletableFuture().get(2, TimeUnit.SECONDS),
is(Done.getInstance()));
assertThat(
stream.second().toCompletableFuture().get(2, TimeUnit.SECONDS), is(Done.getInstance()));
}
}