KAFKA-19775: Don't fail if nextOffsetsAndMetadataToBeConsumed is not available. (#20665)
Before we added caching for the consumer's next offsets, we called
`mainConsumer.position` and always expected a result. When we added the
caching, we kept the check that a next offset is always available, but
since the logic changed to fetching the offsets from poll, we may not
have anything cached for topics that have no messages. This PR accounts
for that.
Reviewers: Lucas Brutschy <lbrutschy@confluent.io>, Matthias J. Sax
<matthias@confluent.io>
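
To illustrate the control flow the patch introduces, here is a minimal,
self-contained sketch (not the Streams code itself): the class name,
the `cachedNextOffsets` map, and the `headRecordOffset` helper are
simplified stand-ins for `nextOffsetsAndMetadataToBeConsumed`,
`partitionGroup.headRecordOffset`, and the surrounding fields in
`StreamTask`. The point is that a partition with no cached offset is
skipped rather than treated as a fatal `IllegalStateException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommittableOffsetsSketch {

    // Stand-in for nextOffsetsAndMetadataToBeConsumed: offsets cached from poll().
    // A partition that matched the subscription pattern but never returned any
    // records has no entry here.
    private final Map<TopicPartition, OffsetAndMetadata> cachedNextOffsets = new HashMap<>();

    // Stand-in for partitionGroup.headRecordOffset(partition): offset of the first
    // buffered-but-unprocessed record, or null if the buffer is empty.
    private Long headRecordOffset(final TopicPartition partition) {
        return null;
    }

    // Old behavior: a missing cache entry was treated as a bug and rethrown as a
    // fatal IllegalStateException. New behavior (mirroring the patch): return
    // Optional.empty() so the partition is simply skipped when building the
    // commit map.
    private Optional<OffsetAndMetadata> findOffsetAndMetadata(final TopicPartition partition) {
        final Long bufferedOffset = headRecordOffset(partition);
        if (bufferedOffset != null) {
            return Optional.of(new OffsetAndMetadata(bufferedOffset));
        }
        final OffsetAndMetadata cached = cachedNextOffsets.get(partition);
        if (cached == null) {
            // nothing consumed from this partition yet, hence nothing to commit
            return Optional.empty();
        }
        return Optional.of(cached);
    }

    // Only partitions with a known offset end up in the map handed to commit.
    public Map<TopicPartition, OffsetAndMetadata> committableOffsets(final Iterable<TopicPartition> partitions) {
        final Map<TopicPartition, OffsetAndMetadata> committable = new HashMap<>();
        for (final TopicPartition partition : partitions) {
            findOffsetAndMetadata(partition).ifPresent(offset -> committable.put(partition, offset));
        }
        return committable;
    }
}
```

This mirrors the shape of the change below: `findOffsetAndMetadata` now
returns an `Optional`, and `committableOffsetsAndMetadata` filters out
empty results instead of failing.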
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
index e65399e..8808187 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
@@ -87,6 +87,8 @@
private static final int NUM_BROKERS = 1;
public static final EmbeddedKafkaCluster CLUSTER = new EmbeddedKafkaCluster(NUM_BROKERS);
+ private static final List<TopicPartition> ASSIGNED_PARTITIONS = new ArrayList<>();
+
@BeforeAll
public static void startCluster() throws IOException, InterruptedException {
CLUSTER.start();
@@ -254,6 +256,64 @@
}
}
+ @Test
+ public void shouldNotCrashIfPatternMatchesTopicHasNoData() throws Exception {
+ final String topic1 = "TEST-TOPIC-1";
+ final String topic2 = "TEST-TOPIC-2";
+
+ try {
+ CLUSTER.createTopic(topic1);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KStream<String, String> pattern1Stream = builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+ builder.stream(Pattern.compile("not-a-match"));
+ final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+
+ pattern1Stream
+ .selectKey((k, v) -> k)
+ .groupByKey()
+ .aggregate(() -> "", (k, v, a) -> v)
+ .toStream().to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
+
+ final Topology topology = builder.build();
+ assertThat(topology.describe().subtopologies().size(), greaterThan(1));
+ streams = new KafkaStreams(builder.build(), streamsConfiguration, new DefaultKafkaClientSupplier() {
+ @Override
+ public Consumer<byte[], byte[]> getConsumer(final Map<String, Object> config) {
+ return new KafkaConsumer<>(config, new ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+ @Override
+ public void subscribe(final Pattern topics, final ConsumerRebalanceListener listener) {
+ super.subscribe(topics, new TheConsumerRebalanceListener(assignedTopics, listener));
+ }
+ };
+ }
+ });
+
+ startApplicationAndWaitUntilRunning(streams);
+ TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) && !assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);
+
+ CLUSTER.createTopic(topic2);
+ TestUtils.waitForCondition(() -> assignedTopics.contains(topic1) && assignedTopics.contains(topic2), STREAM_TASKS_NOT_UPDATED);
+
+ final KeyValue<String, String> record1 = new KeyValue<>("1", "1");
+ IntegrationTestUtils.produceKeyValuesSynchronously(
+ topic1,
+ Collections.singletonList(record1),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(), StringSerializer.class, StringSerializer.class),
+ CLUSTER.time
+ );
+ IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
+ TestUtils.consumerConfig(CLUSTER.bootstrapServers(), StringDeserializer.class, StringDeserializer.class),
+ outputTopic,
+ List.of(record1)
+ );
+
+ streams.close();
+ } finally {
+ CLUSTER.deleteTopics(topic1, topic2);
+ }
+ }
+
private String createTopic(final int suffix) throws InterruptedException {
final String outputTopic = "outputTopic_" + suffix;
CLUSTER.createTopic(outputTopic);
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 42b57e4..895eac6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -465,38 +465,29 @@
}
}
- private OffsetAndMetadata findOffsetAndMetadata(final TopicPartition partition) {
+ private Optional<OffsetAndMetadata> findOffsetAndMetadata(final TopicPartition partition) {
Long offset = partitionGroup.headRecordOffset(partition);
Optional<Integer> leaderEpoch = partitionGroup.headRecordLeaderEpoch(partition);
final long partitionTime = partitionGroup.partitionTimestamp(partition);
if (offset == null) {
- try {
- if (nextOffsetsAndMetadataToBeConsumed.containsKey(partition)) {
- final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
- offset = offsetAndMetadata.offset();
- leaderEpoch = offsetAndMetadata.leaderEpoch();
- } else {
- // This indicates a bug and thus we rethrow it as fatal `IllegalStateException`
- throw new IllegalStateException("Stream task " + id + " does not know the partition: " + partition);
- }
- } catch (final KafkaException fatal) {
- throw new StreamsException(fatal);
+ final OffsetAndMetadata offsetAndMetadata = nextOffsetsAndMetadataToBeConsumed.get(partition);
+ if (offsetAndMetadata == null) {
+ // it may be that we have not yet consumed any record from this partition, hence nothing to commit
+ return Optional.empty();
}
+ offset = offsetAndMetadata.offset();
+ leaderEpoch = offsetAndMetadata.leaderEpoch();
}
- return new OffsetAndMetadata(offset,
+ return Optional.of(new OffsetAndMetadata(offset,
leaderEpoch,
- new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode());
+ new TopicPartitionMetadata(partitionTime, processorContext.processorMetadata()).encode()));
}
private Map<TopicPartition, OffsetAndMetadata> committableOffsetsAndMetadata() {
- final Map<TopicPartition, OffsetAndMetadata> committableOffsets;
-
switch (state()) {
case CREATED:
case RESTORING:
- committableOffsets = Collections.emptyMap();
-
- break;
+ return Collections.emptyMap();
case RUNNING:
case SUSPENDED:
@@ -505,12 +496,13 @@
// input partitions
final Set<TopicPartition> partitionsNeedCommit = processorContext.processorMetadata().needsCommit() ?
inputPartitions() : consumedOffsets.keySet();
- committableOffsets = new HashMap<>(partitionsNeedCommit.size());
- for (final TopicPartition partition : partitionsNeedCommit) {
- committableOffsets.put(partition, findOffsetAndMetadata(partition));
- }
- break;
+ return partitionsNeedCommit.stream()
+ .map(partition -> findOffsetAndMetadata(partition)
+ .map(offsetAndMetadata -> Map.entry(partition, offsetAndMetadata)))
+ .filter(Optional::isPresent)
+ .map(Optional::get)
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
case CLOSED:
throw new IllegalStateException("Illegal state " + state() + " while getting committable offsets for active task " + id);
@@ -519,7 +511,6 @@
throw new IllegalStateException("Unknown state " + state() + " while post committing active task " + id);
}
- return committableOffsets;
}
@Override