blob: 6f937f6effff6aa77d0d4fb7a7d52a4fe8fcfa93 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.scaladsl
import java.util.concurrent.atomic.AtomicReference
import org.apache.pekko
import pekko.Done
import pekko.kafka.scaladsl.Consumer.{ Control, DrainingControl }
import pekko.kafka.scaladsl.{ Consumer, Transactional }
import pekko.kafka.testkit.scaladsl.TestcontainersKafkaLike
import pekko.kafka.{ ConsumerSettings, ProducerMessage, ProducerSettings, Repeated, Subscriptions, TransactionsOps }
import pekko.stream.RestartSettings
import pekko.stream.scaladsl.{ Keep, RestartSource, Sink }
import pekko.stream.testkit.scaladsl.StreamTestKit.assertAllStagesStopped
import org.apache.kafka.clients.producer.ProducerRecord
import scala.concurrent.Await
import scala.concurrent.duration._
class TransactionsExample extends DocsSpecBase with TestcontainersKafkaLike with TransactionsOps with Repeated {
override def sleepAfterProduce: FiniteDuration = 10.seconds
"Transactional sink" should "work" in assertAllStagesStopped {
val consumerSettings = consumerDefaults.withGroupId(createGroupId())
val producerSettings = txProducerDefaults
val sourceTopic = createTopic(1)
val sinkTopic = createTopic(2)
val transactionalId = createTransactionalId()
// #transactionalSink
val control =
Transactional
.source(consumerSettings, Subscriptions.topics(sourceTopic))
.via(businessFlow)
.map { msg =>
ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset)
}
.toMat(Transactional.sink(producerSettings, transactionalId))(DrainingControl.apply)
.run()
// ...
// #transactionalSink
val testConsumerGroup = createGroupId(2)
val (control2, result) = Consumer
.plainSource(withProbeConsumerSettings(consumerSettings, testConsumerGroup), Subscriptions.topics(sinkTopic))
.toMat(Sink.seq)(Keep.both)
.run()
awaitProduce(produce(sourceTopic, 1 to 10))
control.drainAndShutdown().futureValue should be(Done)
control2.shutdown().futureValue should be(Done)
// #transactionalSink
control.drainAndShutdown()
// #transactionalSink
result.futureValue should have size 10
}
it should "support `withOffsetContext`" in assertAllStagesStopped {
val consumerSettings = consumerDefaults.withGroupId(createGroupId())
val producerSettings = txProducerDefaults
val sourceTopic = createTopic(1)
val sinkTopic = createTopic(2)
val control =
Transactional
.sourceWithOffsetContext(consumerSettings, Subscriptions.topics(sourceTopic))
.via(businessFlow)
.map { record =>
ProducerMessage.single(new ProducerRecord(sinkTopic, record.key, record.value))
}
.toMat(Transactional.sinkWithOffsetContext(producerSettings, createTransactionalId()))(DrainingControl.apply)
.run()
val testConsumerGroup = createGroupId(2)
val (control2, result) = Consumer
.plainSource(probeConsumerSettings(testConsumerGroup), Subscriptions.topics(sinkTopic))
.toMat(Sink.seq)(Keep.both)
.run()
awaitProduce(produce(sourceTopic, 1 to 10))
control.drainAndShutdown().futureValue shouldBe Done
control2.shutdown().futureValue shouldBe Done
result.futureValue should have size 10
}
"TransactionsFailureRetryExample" should "work" in assertAllStagesStopped {
val consumerSettings = consumerDefaults.withGroupId(createGroupId())
val producerSettings = txProducerDefaults
val sourceTopic = createTopic(1)
val sinkTopic = createTopic(2)
val transactionalId = createTransactionalId()
// #transactionalFailureRetry
val innerControl = new AtomicReference[Control](Consumer.NoopControl)
val stream = RestartSource.onFailuresWithBackoff(
RestartSettings(
minBackoff = 1.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2)) { () =>
Transactional
.source(consumerSettings, Subscriptions.topics(sourceTopic))
.via(businessFlow)
.map { msg =>
ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value), msg.partitionOffset)
}
// side effect out the `Control` materialized value because it can't be propagated through the `RestartSource`
.mapMaterializedValue(c => innerControl.set(c))
.via(Transactional.flow(producerSettings, transactionalId))
}
stream.runWith(Sink.ignore)
// Add shutdown hook to respond to SIGTERM and gracefully shutdown stream
sys.ShutdownHookThread {
Await.result(innerControl.get.shutdown(), 10.seconds)
}
// #transactionalFailureRetry
val testConsumerGroup = createGroupId(2)
val (control2, result) = Consumer
.plainSource(probeConsumerSettings(testConsumerGroup), Subscriptions.topics(sinkTopic))
.toMat(Sink.seq)(Keep.both)
.run()
awaitProduce(produce(sourceTopic, 1 to 10))
innerControl.get.shutdown().futureValue should be(Done)
control2.shutdown().futureValue should be(Done)
result.futureValue should have size 10
}
// "Partitioned transactional sink" should "work" in {
// val consumerSettings = consumerDefaults.withGroupId(createGroupId())
// val producerSettings = txProducerDefaults
// val maxPartitions = 2
// val sourceTopic = createTopic(1, maxPartitions, 1)
// val sinkTopic = createTopicName(2)
// val transactionalId = createTransactionalId()
// // #partitionedTransactionalSink
// val control =
// Transactional
// .partitionedSource(consumerSettings, Subscriptions.topics(sourceTopic))
// .mapAsyncUnordered(maxPartitions) {
// case (tp, source) =>
// source
// .via(businessFlow)
// .map { msg =>
// ProducerMessage.single(new ProducerRecord(sinkTopic, msg.record.key, msg.record.value),
// msg.partitionOffset)
// }
// .runWith(Transactional.sink(producerSettings, transactionalId))
// }
// .toMat(Sink.ignore)(DrainingControl.form)
// .run()
// // ...
//
// // #partitionedTransactionalSink
// val testConsumerGroup = createGroupId(2)
// val (control2, result) = Consumer
// .plainSource(probeConsumerSettings(testConsumerGroup), Subscriptions.topics(sinkTopic))
// .toMat(Sink.seq)(Keep.both)
// .run()
//
// awaitProduce(produce(sourceTopic, 1 to 10))
// control.drainAndShutdown().futureValue should be(Done)
// control2.shutdown().futureValue should be(Done)
// // #partitionedTransactionalSink
// control.drainAndShutdown()
// // #partitionedTransactionalSink
// result.futureValue should have size (10)
// }
def probeConsumerSettings(groupId: String): ConsumerSettings[String, String] =
withProbeConsumerSettings(consumerDefaults, groupId)
override def producerDefaults: ProducerSettings[String, String] =
withTestProducerSettings(super.producerDefaults)
def txProducerDefaults: ProducerSettings[String, String] =
withTransactionalProducerSettings(super.producerDefaults)
}