/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * license agreements; and to You under the Apache License, version 2.0:
 *
 *   https://www.apache.org/licenses/LICENSE-2.0
 *
 * This file is part of the Apache Pekko project, which was derived from Akka.
 */

/*
 * Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
 * Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
 */

package docs.javadsl;

import static org.junit.Assert.assertEquals;

import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.kafka.*;
import org.apache.pekko.kafka.javadsl.Consumer;
import org.apache.pekko.kafka.javadsl.Transactional;
import org.apache.pekko.kafka.testkit.TestcontainersKafkaJunit4Test;
import org.apache.pekko.kafka.tests.javadsl.LogCapturingJunit4;
import org.apache.pekko.stream.RestartSettings;
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.testkit.javadsl.TestKit;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Rule;
import org.junit.Test;

| public class TransactionsExampleTest extends TestcontainersKafkaJunit4Test { |
| |
| @Rule public final LogCapturingJunit4 logCapturing = new LogCapturingJunit4(); |
| |
| private static final ActorSystem system = ActorSystem.create("TransactionsExampleTest"); |
| private final ExecutorService ec = Executors.newSingleThreadExecutor(); |
| private final ProducerSettings<String, String> producerSettings = txProducerDefaults(); |
| |
| public TransactionsExampleTest() { |
| super(system); |
| } |
| |
| @AfterClass |
| public static void afterClass() { |
| TestKit.shutdownActorSystem(system); |
| } |
| |
| protected void assertDone(CompletionStage<Done> stage) throws Exception { |
| assertEquals(Done.done(), resultOf(stage)); |
| } |
| |
| protected <T> Flow<T, T, NotUsed> business() { |
| return Flow.create(); |
| } |
| |
| /** Overridden to set a different default timeout for [[#resultOf]]. Default is 5 seconds. */ |
| protected Duration resultOfTimeout() { |
| return Duration.ofSeconds(15); |
| } |
| |
| @Test |
| public void sourceSink() throws Exception { |
| ConsumerSettings<String, String> consumerSettings = |
| consumerDefaults().withGroupId(createGroupId()); |
| String sourceTopic = createTopic(1); |
| String targetTopic = createTopic(2); |
| String transactionalId = createTransactionalId(); |
| // #transactionalSink |
| Consumer.DrainingControl<Done> control = |
| Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic)) |
| .via(business()) |
| .map( |
| msg -> |
| ProducerMessage.single( |
| new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()), |
| msg.partitionOffset())) |
| .toMat( |
| Transactional.sink(producerSettings, transactionalId), |
| Consumer::createDrainingControl) |
| .run(system); |
| |
| // ... |
| |
| // #transactionalSink |
| String testConsumerGroup = createGroupId(2); |
| Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer = |
| consumeString(probeConsumerSettings(testConsumerGroup), targetTopic, 10); |
| produceString(sourceTopic, 10, partition0); |
| assertDone(consumer.isShutdown()); |
| // #transactionalSink |
| control.drainAndShutdown(ec); |
| // #transactionalSink |
| assertDone(control.isShutdown()); |
| assertEquals(10, resultOf(consumer.drainAndShutdown(ec)).size()); |
| } |
| |
| @Test |
| public void withOffsetContext() throws Exception { |
| ConsumerSettings<String, String> consumerSettings = |
| consumerDefaults().withGroupId(createGroupId()); |
| String sourceTopic = createTopic(1); |
| String targetTopic = createTopic(2); |
| String transactionalId = createTransactionalId(); |
| Consumer.DrainingControl<Done> control = |
| Transactional.sourceWithOffsetContext(consumerSettings, Subscriptions.topics(sourceTopic)) |
| .via(business()) |
| .map( |
| record -> |
| ProducerMessage.single( |
| new ProducerRecord<>(targetTopic, record.key(), record.value()))) |
| .toMat( |
| Transactional.sinkWithOffsetContext(producerSettings, transactionalId), |
| Consumer::createDrainingControl) |
| .run(system); |
| |
| String testConsumerGroup = createGroupId(2); |
| Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer = |
| consumeString(probeConsumerSettings(testConsumerGroup), targetTopic, 10); |
| produceString(sourceTopic, 10, partition0); |
| assertDone(consumer.isShutdown()); |
| control.drainAndShutdown(ec); |
| assertDone(control.isShutdown()); |
| assertEquals(10, resultOf(consumer.drainAndShutdown(ec)).size()); |
| } |
| |
| @Test |
| public void usingRestartSource() throws Exception { |
| ConsumerSettings<String, String> consumerSettings = |
| consumerDefaults().withGroupId(createGroupId()); |
| String sourceTopic = createTopic(1); |
| String targetTopic = createTopic(2); |
| String transactionalId = createTransactionalId(); |
| // #transactionalFailureRetry |
| AtomicReference<Consumer.Control> innerControl = |
| new AtomicReference<>(Consumer.createNoopControl()); |
| |
| Source<ProducerMessage.Results<String, String, ConsumerMessage.PartitionOffset>, NotUsed> |
| stream = |
| RestartSource.onFailuresWithBackoff( |
| RestartSettings.create( |
| java.time.Duration.ofSeconds(3), // min backoff |
| java.time.Duration.ofSeconds(30), // max backoff |
| 0.2), // adds 20% "noise" to vary the intervals slightly |
| () -> |
| Transactional.source(consumerSettings, Subscriptions.topics(sourceTopic)) |
| .via(business()) |
| .map( |
| msg -> |
| ProducerMessage.single( |
| new ProducerRecord<>( |
| targetTopic, msg.record().key(), msg.record().value()), |
| msg.partitionOffset())) |
| // side effect out the `Control` materialized value because it can't be |
| // propagated through the `RestartSource` |
| .mapMaterializedValue( |
| control -> { |
| innerControl.set(control); |
| return control; |
| }) |
| .via(Transactional.flow(producerSettings, transactionalId))); |
| |
| CompletionStage<Done> streamCompletion = stream.runWith(Sink.ignore(), system); |
| |
| // Add shutdown hook to respond to SIGTERM and gracefully shutdown stream |
| Runtime.getRuntime().addShutdownHook(new Thread(() -> innerControl.get().shutdown())); |
| // #transactionalFailureRetry |
| String testConsumerGroup = createGroupId(2); |
| int messages = 10; |
| Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer = |
| consumeString(probeConsumerSettings(testConsumerGroup), targetTopic, messages); |
| assertDone(produceString(sourceTopic, messages, partition0)); |
| assertDone(consumer.isShutdown()); |
| assertDone(innerControl.get().shutdown()); |
| assertEquals(messages, resultOf(consumer.drainAndShutdown(ec)).size()); |
| assertDone(streamCompletion); |
| } |
| |
| // @Test |
| // public void partitionedSourceSink() throws Exception { |
| // ConsumerSettings<String, String> consumerSettings = |
| // consumerDefaults().withGroupId(createGroupId(1)); |
| // String sourceTopic = createTopic(1, 2, 1); |
| // String targetTopic = createTopic(2, 1, 1); |
| // String transactionalId = createTransactionalId(1); |
| // // #partitionedTransactionalSink |
| // Consumer.DrainingControl<Done> control = |
| // Transactional.partitionedSource(consumerSettings, Subscriptions.topics(sourceTopic)) |
| // .mapAsync( |
| // 8, |
| // pair -> { |
| // Source<ConsumerMessage.TransactionalMessage<String, String>, NotUsed> source = |
| // pair.second(); |
| // return source |
| // .via(business()) |
| // .map( |
| // msg -> |
| // ProducerMessage.single( |
| // new ProducerRecord<>( |
| // targetTopic, msg.record().key(), msg.record().value()), |
| // msg.partitionOffset())) |
| // .runWith(Transactional.sink(producerSettings, transactionalId), |
| // materializer); |
| // }) |
| // .toMat(Sink.ignore(), Keep.both()) |
| // .mapMaterializedValue(Consumer::createDrainingControl) |
| // .run(materializer); |
| // |
| // // ... |
| // |
| // // #partitionedTransactionalSink |
| // String testConsumerGroup = createGroupId(2); |
| // Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer = |
| // consumeString(probeConsumerSettings(testConsumerGroup), targetTopic, 10); |
| // produceString(sourceTopic, 10, partition0); |
| // assertDone(consumer.isShutdown()); |
| // // #partitionedTransactionalSink |
| // control.drainAndShutdown(ec); |
| // // #partitionedTransactionalSink |
| // assertDone(control.isShutdown()); |
| // assertEquals(10, resultOf(consumer.drainAndShutdown(ec)).size()); |
| // } |
| |
| protected Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumeString( |
| ConsumerSettings<String, String> settings, String topic, long take) { |
| return Consumer.plainSource(settings, Subscriptions.topics(topic)) |
| .take(take) |
| .toMat(Sink.seq(), Consumer::createDrainingControl) |
| .run(system); |
| } |
| |
| public ConsumerSettings<String, String> probeConsumerSettings(String groupId) { |
| return TransactionsOps$.MODULE$.withProbeConsumerSettings(this.consumerDefaults(), groupId); |
| } |
| |
| @Override |
| public ProducerSettings<String, String> producerDefaults() { |
| return TransactionsOps$.MODULE$.withTestProducerSettings(super.producerDefaults()); |
| } |
| |
| public ProducerSettings<String, String> txProducerDefaults() { |
| return TransactionsOps$.MODULE$.withTransactionalProducerSettings(super.producerDefaults()); |
| } |
| } |