blob: a5405703da20e44bf0eb7e2fde7365f50a910757 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* license agreements; and to You under the Apache License, version 2.0:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* This file is part of the Apache Pekko project, which was derived from Akka.
*/
/*
* Copyright (C) 2014 - 2016 Softwaremill <https://softwaremill.com>
* Copyright (C) 2016 - 2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.javadsl;
import org.apache.pekko.Done;
import org.apache.pekko.NotUsed;
import org.apache.pekko.actor.AbstractLoggingActor;
import org.apache.pekko.actor.ActorRef;
import org.apache.pekko.actor.ActorSystem;
import org.apache.pekko.actor.Props;
import org.apache.pekko.actor.typed.Behavior;
import org.apache.pekko.actor.typed.javadsl.ActorContext;
import org.apache.pekko.actor.typed.javadsl.Behaviors;
// #withTypedRebalanceListenerActor
// #consumerActorTyped
// adds support for actors to a classic actor system and context
import org.apache.pekko.actor.typed.javadsl.Adapter;
// #consumerActorTyped
// #withTypedRebalanceListenerActor
import org.apache.pekko.japi.Pair;
import org.apache.pekko.kafka.*;
import org.apache.pekko.kafka.javadsl.Committer;
import org.apache.pekko.kafka.javadsl.Consumer;
import org.apache.pekko.kafka.javadsl.Producer;
import org.apache.pekko.kafka.javadsl.PartitionAssignmentHandler;
import org.apache.pekko.kafka.testkit.TestcontainersKafkaTest;
import org.apache.pekko.kafka.tests.javadsl.LogCapturingExtension;
import org.apache.pekko.stream.RestartSettings;
import org.apache.pekko.stream.javadsl.*;
import org.apache.pekko.testkit.javadsl.TestKit;
import com.typesafe.config.Config;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInstance;
import org.junit.jupiter.api.extension.ExtendWith;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import static org.hamcrest.CoreMatchers.*;
import static org.hamcrest.MatcherAssert.*;
import static org.junit.Assert.assertEquals;
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@ExtendWith(LogCapturingExtension.class)
class ConsumerExampleTest extends TestcontainersKafkaTest {
private static final ActorSystem system = ActorSystem.create("ConsumerExampleTest");
private static final Executor executor = Executors.newSingleThreadExecutor();
ConsumerExampleTest() {
super(system);
}
@AfterAll
void afterClass() {
TestKit.shutdownActorSystem(system);
}
void assertDone(CompletionStage<Done> stage) throws Exception {
assertEquals(Done.done(), resultOf(stage));
}
private <T> Flow<T, T, NotUsed> business() {
return Flow.create();
}
// #settings
final Config config = system.settings().config().getConfig("pekko.kafka.consumer");
final ConsumerSettings<String, byte[]> consumerSettings =
ConsumerSettings.create(config, new StringDeserializer(), new ByteArrayDeserializer())
.withBootstrapServers("localhost:9092")
.withGroupId("group1")
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
// #settings
final ConsumerSettings<String, byte[]> consumerSettingsWithAutoCommit =
// #settings-autocommit
consumerSettings
.withProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
.withProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "5000");
// #settings-autocommit
@Test
void plainSourceWithExternalOffsetStorage() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
// #plainSource
final OffsetStorage db = new OffsetStorage();
CompletionStage<Consumer.Control> controlCompletionStage =
db.loadOffset()
.thenApply(
fromOffset ->
Consumer.plainSource(
consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, partition0), fromOffset))
.mapAsync(1, db::businessLogicAndStoreOffset)
.to(Sink.ignore())
.run(system));
// #plainSource
assertDone(produceString(topic, 10, partition0));
while (db.offsetStore.get() < 9L) {
sleepMillis(100, "until offsets have increased");
}
assertDone(resultOf(controlCompletionStage).shutdown());
}
// #plainSource
class OffsetStorage {
// #plainSource
private final AtomicLong offsetStore = new AtomicLong(0L);
// #plainSource
public CompletionStage<Done> businessLogicAndStoreOffset(
ConsumerRecord<String, String> record) { // ... }
// #plainSource
offsetStore.set(record.offset());
return CompletableFuture.completedFuture(Done.getInstance());
}
// #plainSource
public CompletionStage<Long> loadOffset() { // ... }
// #plainSource
return CompletableFuture.completedFuture(offsetStore.get());
}
public CompletionStage<Done> storeProcessedOffset(long offset) { // ... }
offsetStore.set(offset);
return CompletableFuture.completedFuture(Done.getInstance());
}
// #plainSource
}
// #plainSource
@Test
void configInheritance() throws Exception {
// #config-inheritance
Config config = system.settings().config().getConfig("our-kafka-consumer");
ConsumerSettings<String, String> consumerSettings =
ConsumerSettings.create(config, new StringDeserializer(), new StringDeserializer());
// #config-inheritance
assertEquals("kafka-host:9092", consumerSettings.getProperty("bootstrap.servers"));
}
@Test
void atMostOnce() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
// #atMostOnce
Consumer.Control control =
Consumer.atMostOnceSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(10, record -> business(record.key(), record.value()))
.to(Sink.foreach(it -> System.out.println("Done with " + it)))
.run(system);
// #atMostOnce
assertDone(produceString(topic, 10, partition0));
assertDone(control.shutdown());
}
// #atMostOnce #atLeastOnce
CompletionStage<String> business(String key, String value) { // .... }
// #atMostOnce #atLeastOnce
return CompletableFuture.completedFuture("");
}
@Test
void atLeastOnce() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
CommitterSettings committerSettings = committerDefaults();
String topic = createTopic();
// #atLeastOnce
Consumer.DrainingControl<Done> control =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.toMat(
Committer.sink(committerSettings.withMaxBatch(1)), Consumer::createDrainingControl)
.run(system);
// #atLeastOnce
assertDone(produceString(topic, 10, partition0));
assertDone(control.drainAndShutdown(executor));
}
@Test
void atLeastOnceWithCommitterSink() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
Config config = system.settings().config().getConfig(CommitterSettings.configPath());
// #committerSink
CommitterSettings committerSettings = CommitterSettings.create(config);
Consumer.DrainingControl<Done> control =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.<ConsumerMessage.Committable>thenApply(done -> msg.committableOffset()))
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
.run(system);
// #committerSink
assertDone(produceString(topic, 10, partition0));
assertDone(control.drainAndShutdown(executor));
}
@Test
void commitWithMetadata() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
CommitterSettings committerSettings = committerDefaults();
String topic = createTopic();
// #commitWithMetadata
Consumer.DrainingControl<Done> control =
Consumer.commitWithMetadataSource(
consumerSettings,
Subscriptions.topics(topic),
(record) -> Long.toString(record.timestamp()))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
.run(system);
// #commitWithMetadata
assertDone(produceString(topic, 10, partition0));
assertDone(control.drainAndShutdown(executor));
}
@Test
void consumerToProducer() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
ProducerSettings<String, String> producerSettings = producerDefaults();
CommitterSettings committerSettings = committerDefaults();
String topic1 = createTopic(1);
String topic2 = createTopic(2);
String targetTopic = createTopic(10);
// #consumerToProducerSink
Consumer.DrainingControl<Done> control =
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(
msg ->
ProducerMessage.<String, String, ConsumerMessage.Committable>single(
new ProducerRecord<>(targetTopic, msg.record().key(), msg.record().value()),
msg.committableOffset()))
.toMat(
Producer.committableSink(producerSettings, committerSettings),
Consumer::createDrainingControl)
.run(system);
// #consumerToProducerSink
assertDone(produceString(topic1, 10, partition0));
assertDone(produceString(topic2, 10, partition0));
Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer =
consumeString(targetTopic, 20);
assertDone(consumer.isShutdown());
assertEquals(20, resultOf(consumer.drainAndShutdown(executor)).size());
assertDone(control.drainAndShutdown(executor));
}
@Test
void consumerToProducerWithContext() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
ProducerSettings<String, String> producerSettings = producerDefaults();
CommitterSettings committerSettings = committerDefaults();
String topic1 = createTopic(1);
String topic2 = createTopic(2);
String targetTopic = createTopic(10);
// #consumerToProducerWithContext
Consumer.DrainingControl<Done> control =
Consumer.sourceWithOffsetContext(consumerSettings, Subscriptions.topics(topic1, topic2))
.map(
record ->
ProducerMessage.single(
new ProducerRecord<>(targetTopic, record.key(), record.value())))
.toMat(
Producer.committableSinkWithOffsetContext(producerSettings, committerSettings),
Consumer::createDrainingControl)
.run(system);
// #consumerToProducerWithContext
assertDone(produceString(topic1, 10, partition0));
assertDone(produceString(topic2, 10, partition0));
Consumer.DrainingControl<List<ConsumerRecord<String, String>>> consumer =
consumeString(targetTopic, 20);
assertDone(consumer.isShutdown());
assertEquals(20, resultOf(consumer.drainAndShutdown(executor)).size());
assertDone(control.drainAndShutdown(executor));
}
@Test
void committablePartitionedSource() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId()).withStopTimeout(Duration.ofMillis(10));
CommitterSettings committerSettings = committerDefaults();
int maxPartitions = 2;
String topic = createTopic(1, maxPartitions);
// #committablePartitionedSource
Consumer.DrainingControl<Done> control =
Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
.flatMapMerge(maxPartitions, Pair::second)
.via(business())
.map(msg -> msg.committableOffset())
.toMat(Committer.sink(committerSettings), Consumer::createDrainingControl)
.run(system);
// #committablePartitionedSource
assertDone(produceString(topic, 10, partition0));
assertDone(control.drainAndShutdown(executor));
}
@Test
void streamPerPartition() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId()).withStopTimeout(Duration.ofMillis(10));
CommitterSettings committerSettings = committerDefaults();
int maxPartitions = 2;
String topic = createTopic(1, maxPartitions);
// #committablePartitionedSource-stream-per-partition
Consumer.DrainingControl<Done> control =
Consumer.committablePartitionedSource(consumerSettings, Subscriptions.topics(topic))
.mapAsyncUnordered(
maxPartitions,
pair -> {
Source<ConsumerMessage.CommittableMessage<String, String>, NotUsed> source =
pair.second();
return source
.via(business())
.map(message -> message.committableOffset())
.runWith(Committer.sink(committerSettings), system);
})
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(system);
// #committablePartitionedSource-stream-per-partition
assertDone(produceString(topic, 10, partition0));
assertDone(control.drainAndShutdown(executor));
}
@Test
void consumerActor() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
ActorRef self = system.deadLetters();
String topic = createTopic(1, 2);
int partition0 = 0;
int partition1 = 1;
Behaviors.setup(
ctx -> {
// #consumerActorTyped
// Consumer is represented by actor
ActorRef consumer = Adapter.actorOf(ctx, KafkaConsumerActor.props(consumerSettings));
// #consumerActorTyped
return Behaviors.empty();
});
// #consumerActor
// Consumer is represented by actor
ActorRef consumer = system.actorOf((KafkaConsumerActor.props(consumerSettings)));
// Manually assign topic partition to it
Consumer.Control controlPartition1 =
Consumer.plainExternalSource(
consumer, Subscriptions.assignment(new TopicPartition(topic, partition0)))
.via(business())
.to(Sink.ignore())
.run(system);
// Manually assign another topic partition
Consumer.Control controlPartition2 =
Consumer.plainExternalSource(
consumer, Subscriptions.assignment(new TopicPartition(topic, partition1)))
.via(business())
.to(Sink.ignore())
.run(system);
// #consumerActor
CompletionStage<Done> producePartition0 = produceString(topic, 10, partition0);
CompletionStage<Done> producePartition1 = produceString(topic, 10, partition1);
assertDone(producePartition0);
assertDone(producePartition1);
assertDone(controlPartition1.shutdown());
assertDone(controlPartition2.shutdown());
// #consumerActor
consumer.tell(KafkaConsumerActor.stop(), self);
// #consumerActor
}
@Test
void restartSource() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic(1, 2);
// #restartSource
AtomicReference<Consumer.Control> control = new AtomicReference<>(Consumer.createNoopControl());
RestartSettings restartSettings =
RestartSettings.create(Duration.ofSeconds(3), Duration.ofSeconds(30), 0.2);
CompletionStage<Done> streamCompletion =
RestartSource.onFailuresWithBackoff(
restartSettings,
() ->
Consumer.plainSource(consumerSettings, Subscriptions.topics(topic))
.mapMaterializedValue(
c -> {
// this is a hack to get access to the Consumer.Control
// instances of the latest Kafka Consumer source
control.set(c);
return c;
})
.via(business()))
.runWith(Sink.ignore(), system);
// #restartSource
assertDone(produceString(topic, 10, partition0));
// #restartSource
control.get().drainAndShutdown(streamCompletion, system.getDispatcher());
// #restartSource
}
// #withRebalanceListenerActor
static class RebalanceListener extends AbstractLoggingActor {
@Override
public Receive createReceive() {
return receiveBuilder()
.match(
TopicPartitionsAssigned.class,
assigned -> {
log().info("Assigned: {}", assigned);
})
.match(
TopicPartitionsRevoked.class,
revoked -> {
log().info("Revoked: {}", revoked);
})
.build();
}
}
// #withRebalanceListenerActor
@Test
void withRebalanceListener() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
int messageCount = 10;
// #withRebalanceListenerActor
ActorRef rebalanceListener = system.actorOf(Props.create(RebalanceListener.class));
Subscription subscription =
Subscriptions.topics(topic)
// additionally, pass the actor reference:
.withRebalanceListener(rebalanceListener);
// use the subscription as usual:
Consumer.DrainingControl<List<ConsumerRecord<String, String>>> control =
Consumer.plainSource(consumerSettings, subscription)
// #withRebalanceListenerActor
.take(messageCount)
// #withRebalanceListenerActor
.toMat(Sink.seq(), Consumer::createDrainingControl)
.run(system);
// #withRebalanceListenerActor
assertDone(produceString(topic, messageCount, partition0));
assertDone(control.isShutdown());
assertEquals(messageCount, resultOf(control.drainAndShutdown(executor)).size());
}
@Test
void withTypedRebalanceListener() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
int messageCount = 10;
// #withTypedRebalanceListenerActor
Function<ActorContext<ConsumerRebalanceEvent>, Behavior<ConsumerRebalanceEvent>>
rebalanceListener =
(ActorContext<ConsumerRebalanceEvent> context) ->
Behaviors.receive(ConsumerRebalanceEvent.class)
.onMessage(
TopicPartitionsAssigned.class,
assigned -> {
context.getLog().info("Assigned: {}", assigned);
return Behaviors.same();
})
.onMessage(
TopicPartitionsRevoked.class,
revoked -> {
context.getLog().info("Revoked: {}", revoked);
return Behaviors.same();
})
.build();
// #withTypedRebalanceListenerActor
Behavior<Object> guardian =
Behaviors.setup(
guardianCtx -> {
// #withTypedRebalanceListenerActor
Behavior<ConsumerRebalanceEvent> listener =
Behaviors.setup(ctx -> rebalanceListener.apply(ctx));
org.apache.pekko.actor.typed.ActorRef<ConsumerRebalanceEvent> typedRef =
guardianCtx.spawn(listener, "rebalance-listener");
org.apache.pekko.actor.ActorRef classicRef = Adapter.toClassic(typedRef);
Subscription subscription =
Subscriptions.topics(topic)
// additionally, pass the actor reference:
.withRebalanceListener(classicRef);
Consumer.DrainingControl<List<ConsumerRecord<String, String>>> control =
// use the subscription as usual:
Consumer.plainSource(consumerSettings, subscription)
// #withTypedRebalanceListenerActor
.take(messageCount)
// #withTypedRebalanceListenerActor
.toMat(Sink.seq(), Consumer::createDrainingControl)
.run(system);
// #withTypedRebalanceListenerActor
assertDone(produceString(topic, messageCount, partition0));
assertDone(control.isShutdown());
assertEquals(messageCount, resultOf(control.drainAndShutdown(executor)).size());
return Behaviors.stopped();
});
org.apache.pekko.actor.typed.ActorSystem<Object> typed =
org.apache.pekko.actor.typed.ActorSystem.create(
guardian, "typed-rebalance-listener-example");
assertDone(typed.getWhenTerminated());
}
@Test
void withPartitionAssignmentHandler() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
TopicPartition tp = new TopicPartition(topic, 0);
int messageCount = 10;
AtomicReference<Set<TopicPartition>> revoked = new AtomicReference<>();
AtomicReference<Set<TopicPartition>> assigned = new AtomicReference<>();
AtomicReference<Set<TopicPartition>> stopped = new AtomicReference<>();
// #partitionAssignmentHandler
PartitionAssignmentHandler assignmentHandler =
new PartitionAssignmentHandler() {
public void onRevoke(Set<TopicPartition> revokedTps, RestrictedConsumer consumer) {
// #partitionAssignmentHandler
revoked.set(revokedTps);
// #partitionAssignmentHandler
}
public void onAssign(Set<TopicPartition> assignedTps, RestrictedConsumer consumer) {
// #partitionAssignmentHandler
assigned.set(assignedTps);
// #partitionAssignmentHandler
}
public void onLost(Set<TopicPartition> lostTps, RestrictedConsumer consumer) {}
public void onStop(Set<TopicPartition> currentTps, RestrictedConsumer consumer) {
// #partitionAssignmentHandler
stopped.set(currentTps);
// #partitionAssignmentHandler
}
};
Subscription subscription =
Subscriptions.topics(topic).withPartitionAssignmentHandler(assignmentHandler);
// #partitionAssignmentHandler
Consumer.DrainingControl<List<ConsumerRecord<String, String>>> control =
Consumer.plainSource(consumerSettings, subscription)
.take(messageCount)
.toMat(Sink.seq(), Consumer::createDrainingControl)
.run(system);
assertDone(produceString(topic, messageCount, partition0));
assertDone(control.isShutdown());
assertEquals(messageCount, resultOf(control.drainAndShutdown(executor)).size());
assertThat(assigned.get(), hasItem(tp));
assertThat(stopped.get(), hasItem(tp));
assertThat(revoked.get(), hasItem(tp)); // revoke of partitions occurs after stop
}
@Test
void consumerMetrics() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
// #consumerMetrics
// run the stream to obtain the materialized Control value
Consumer.DrainingControl<Done> control =
Consumer.plainSource(
consumerSettings, Subscriptions.assignment(new TopicPartition(topic, 0)))
.via(business())
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(system);
// #consumerMetrics
sleepMillis(
100,
"to let the control establish itself (fails with `java.lang.IllegalStateException: not yet initialized: only setHandler is allowed in GraphStageLogic constructor)` otherwise");
// #consumerMetrics
CompletionStage<Map<MetricName, Metric>> metrics = control.getMetrics();
metrics.thenAccept(map -> System.out.println("Metrics: " + map));
// #consumerMetrics
assertDone(control.drainAndShutdown(executor));
}
@Test
void shutdownPlainSource() {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
// #shutdownPlainSource
final OffsetStorage db = new OffsetStorage();
CompletionStage<Consumer.DrainingControl<Done>> control =
db.loadOffset()
.thenApply(
fromOffset ->
Consumer.plainSource(
consumerSettings,
Subscriptions.assignmentWithOffset(
new TopicPartition(topic, 0), fromOffset))
.mapAsync(
10,
record ->
business(record.key(), record.value())
.thenApply(res -> db.storeProcessedOffset(record.offset())))
.toMat(Sink.ignore(), Consumer::createDrainingControl)
.run(system));
// Shutdown the consumer when desired
control.thenAccept(c -> c.drainAndShutdown(executor));
// #shutdownPlainSource
}
@Test
void shutdownCommittable() throws Exception {
ConsumerSettings<String, String> consumerSettings =
consumerDefaults().withGroupId(createGroupId());
String topic = createTopic();
CommitterSettings committerSettings = committerDefaults();
// TODO there is a problem with combining `take` and committing.
int messageCount = 1;
// #shutdownCommittableSource
final Executor ec = Executors.newCachedThreadPool();
Consumer.DrainingControl<Done> control =
Consumer.committableSource(
consumerSettings.withStopTimeout(Duration.ZERO), Subscriptions.topics(topic))
.mapAsync(
1,
msg ->
business(msg.record().key(), msg.record().value())
.thenApply(done -> msg.committableOffset()))
// #shutdownCommittableSource
.take(messageCount)
// #shutdownCommittableSource
.toMat(
Committer.sink(committerSettings.withMaxBatch(1)), Consumer::createDrainingControl)
.run(system);
// #shutdownCommittableSource
assertDone(produceString(topic, messageCount, partition0));
assertDone(control.isShutdown());
assertEquals(Done.done(), resultOf(control.drainAndShutdown(ec), Duration.ofSeconds(20)));
// #shutdownCommittableSource
control.drainAndShutdown(ec);
// #shutdownCommittableSource
}
}