/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.connector.kafka.source.reader;
import org.apache.flink.api.connector.source.SourceReaderContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsAddition;
import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
import org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.Collector;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.StringJoiner;
import java.util.stream.Collectors;
/**
* A {@link SplitReader} implementation that reads records from Kafka partitions.
*
* <p>The returned records are in the format of {@code Tuple3(record, offset, timestamp)}.
*
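*
* <p>A minimal usage sketch is shown below; the {@code deserializer}, {@code readerContext},
* {@code readerMetrics} and {@code split} values are assumed to be supplied by the caller:
*
* <pre>{@code
* Properties props = new Properties();
* props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
* props.setProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key(), "my-kafka-source");
*
* KafkaPartitionSplitReader<String> reader =
*         new KafkaPartitionSplitReader<>(props, deserializer, readerContext, readerMetrics);
* reader.handleSplitsChanges(new SplitsAddition<>(Collections.singletonList(split)));
* RecordsWithSplitIds<Tuple3<String, Long, Long>> records = reader.fetch();
* }</pre>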
* @param <T> the type of the record to be emitted from the Source.
*/
public class KafkaPartitionSplitReader<T>
implements SplitReader<Tuple3<T, Long, Long>, KafkaPartitionSplit> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaPartitionSplitReader.class);
private static final long POLL_TIMEOUT = 10000L;
private final KafkaConsumer<byte[], byte[]> consumer;
private final KafkaRecordDeserializationSchema<T> deserializationSchema;
private final Map<TopicPartition, Long> stoppingOffsets;
private final SimpleCollector<T> collector;
private final String groupId;
private final int subtaskId;
private final KafkaSourceReaderMetrics kafkaSourceReaderMetrics;
// Tracking empty splits that have not been added to finished splits in fetch()
private final Set<String> emptySplits = new HashSet<>();
public KafkaPartitionSplitReader(
Properties props,
KafkaRecordDeserializationSchema<T> deserializationSchema,
SourceReaderContext context,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics) {
this.subtaskId = context.getIndexOfSubtask();
this.kafkaSourceReaderMetrics = kafkaSourceReaderMetrics;
Properties consumerProps = new Properties();
consumerProps.putAll(props);
consumerProps.setProperty(ConsumerConfig.CLIENT_ID_CONFIG, createConsumerClientId(props));
this.consumer = new KafkaConsumer<>(consumerProps);
this.stoppingOffsets = new HashMap<>();
this.deserializationSchema = deserializationSchema;
this.collector = new SimpleCollector<>();
this.groupId = consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG);
// Metric registration
maybeRegisterKafkaConsumerMetrics(props, kafkaSourceReaderMetrics, consumer);
this.kafkaSourceReaderMetrics.registerNumBytesIn(consumer);
}
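/**
 * Polls the Kafka consumer once (up to {@code POLL_TIMEOUT} milliseconds) and groups the
 * returned records by split, wrapping each record as {@code Tuple3(record, offset, timestamp)}.
 * Splits whose stopping offset has been reached, as well as splits discovered as empty when
 * they were added, are reported as finished and their partitions are unassigned. A
 * {@link WakeupException} results in an empty batch being returned.
 */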
@Override
public RecordsWithSplitIds<Tuple3<T, Long, Long>> fetch() throws IOException {
KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits =
new KafkaPartitionSplitRecords<>();
ConsumerRecords<byte[], byte[]> consumerRecords;
try {
consumerRecords = consumer.poll(Duration.ofMillis(POLL_TIMEOUT));
} catch (WakeupException we) {
recordsBySplits.prepareForRead();
return recordsBySplits;
}
List<TopicPartition> finishedPartitions = new ArrayList<>();
for (TopicPartition tp : consumerRecords.partitions()) {
long stoppingOffset = getStoppingOffset(tp);
String splitId = tp.toString();
Collection<Tuple3<T, Long, Long>> recordsForSplit =
recordsBySplits.recordsForSplit(splitId);
final List<ConsumerRecord<byte[], byte[]>> recordsFromPartition =
consumerRecords.records(tp);
for (ConsumerRecord<byte[], byte[]> consumerRecord : recordsFromPartition) {
// Stop consuming from this partition if the offset has reached the stopping offset.
// Note that there are two cases, and either case finishes the split:
// 1. After processing the record with offset "stoppingOffset - 1". The split reader
// should not continue fetching, because the record at stoppingOffset may not exist.
// 2. Before processing a record whose offset is greater than or equal to the stopping
// offset. This should only happen when case 1 was not met due to log compaction or
// log retention.
// Case 2 is handled here. Case 1 is handled after the record is processed.
if (consumerRecord.offset() >= stoppingOffset) {
finishSplitAtRecord(
tp,
stoppingOffset,
consumerRecord.offset(),
finishedPartitions,
recordsBySplits);
break;
}
// Add the record to the partition collector.
try {
deserializationSchema.deserialize(consumerRecord, collector);
collector
.getRecords()
.forEach(
r ->
recordsForSplit.add(
new Tuple3<>(
r,
consumerRecord.offset(),
consumerRecord.timestamp())));
// Finish the split because there might not be any message after this point.
// Continuing to poll would just block forever.
if (consumerRecord.offset() == stoppingOffset - 1) {
finishSplitAtRecord(
tp,
stoppingOffset,
consumerRecord.offset(),
finishedPartitions,
recordsBySplits);
}
} catch (Exception e) {
throw new IOException("Failed to deserialize consumer record due to", e);
} finally {
collector.reset();
}
}
// Use the last record for updating offset metrics
if (recordsFromPartition.size() > 0) {
kafkaSourceReaderMetrics.recordCurrentOffset(
tp, recordsFromPartition.get(recordsFromPartition.size() - 1).offset());
}
// Track this partition's record lag if it has not appeared before
kafkaSourceReaderMetrics.maybeAddRecordsLagMetric(consumer, tp);
}
// Some splits are discovered as empty when handling split additions. These splits should be
// added to the finished splits to clean up the state in the split fetcher and source reader.
if (!emptySplits.isEmpty()) {
recordsBySplits.finishedSplits.addAll(emptySplits);
emptySplits.clear();
}
// Unassign the partitions that have finished.
if (!finishedPartitions.isEmpty()) {
finishedPartitions.forEach(kafkaSourceReaderMetrics::removeRecordsLagMetric);
unassignPartitions(finishedPartitions);
}
recordsBySplits.prepareForRead();
// Update numBytesIn
kafkaSourceReaderMetrics.updateNumBytesInCounter();
return recordsBySplits;
}
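/**
 * Handles new split assignments. Only {@link SplitsAddition} is supported; any other change
 * type results in an {@link UnsupportedOperationException}. The new partitions are added to
 * the consumer assignment, the consumer seeks to each split's starting offset, stopping
 * offsets are resolved and recorded, and splits that are already empty are removed.
 */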
@Override
public void handleSplitsChanges(SplitsChange<KafkaPartitionSplit> splitsChange) {
// Get all the partition assignments and stopping offsets.
if (!(splitsChange instanceof SplitsAddition)) {
throw new UnsupportedOperationException(
String.format(
"The SplitChange type of %s is not supported.",
splitsChange.getClass()));
}
// Assignment.
List<TopicPartition> newPartitionAssignments = new ArrayList<>();
// Starting offsets.
Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets = new HashMap<>();
List<TopicPartition> partitionsStartingFromEarliest = new ArrayList<>();
List<TopicPartition> partitionsStartingFromLatest = new ArrayList<>();
// Stopping offsets.
List<TopicPartition> partitionsStoppingAtLatest = new ArrayList<>();
Set<TopicPartition> partitionsStoppingAtCommitted = new HashSet<>();
// Parse the starting and stopping offsets.
splitsChange
.splits()
.forEach(
s -> {
newPartitionAssignments.add(s.getTopicPartition());
parseStartingOffsets(
s,
partitionsStartingFromEarliest,
partitionsStartingFromLatest,
partitionsStartingFromSpecifiedOffsets);
parseStoppingOffsets(
s, partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
// Track the new topic partition in metrics
kafkaSourceReaderMetrics.registerTopicPartition(s.getTopicPartition());
});
// Assign new partitions.
newPartitionAssignments.addAll(consumer.assignment());
consumer.assign(newPartitionAssignments);
// Seek the newly assigned partitions to their starting offsets.
seekToStartingOffsets(
partitionsStartingFromEarliest,
partitionsStartingFromLatest,
partitionsStartingFromSpecifiedOffsets);
// Setup the stopping offsets.
acquireAndSetStoppingOffsets(partitionsStoppingAtLatest, partitionsStoppingAtCommitted);
// After acquiring the starting and stopping offsets, remove the empty splits if necessary.
removeEmptySplits();
maybeLogSplitChangesHandlingResult(splitsChange);
}
@Override
public void wakeUp() {
consumer.wakeup();
}
@Override
public void close() throws Exception {
consumer.close();
}
// ---------------
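/**
 * Commits the given offsets to Kafka asynchronously. The callback is invoked once the commit
 * completes or fails.
 */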
public void notifyCheckpointComplete(
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
OffsetCommitCallback offsetCommitCallback) {
consumer.commitAsync(offsetsToCommit, offsetCommitCallback);
}
// --------------- private helper method ----------------------
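/**
 * Buckets the starting offset of the given split: {@code EARLIEST_OFFSET} and
 * {@code LATEST_OFFSET} are collected into the corresponding lists, {@code COMMITTED_OFFSET}
 * is left to the consumer's default behavior, and any other value is recorded as a specified
 * offset.
 */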
private void parseStartingOffsets(
KafkaPartitionSplit split,
List<TopicPartition> partitionsStartingFromEarliest,
List<TopicPartition> partitionsStartingFromLatest,
Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
TopicPartition tp = split.getTopicPartition();
// Parse starting offsets.
if (split.getStartingOffset() == KafkaPartitionSplit.EARLIEST_OFFSET) {
partitionsStartingFromEarliest.add(tp);
} else if (split.getStartingOffset() == KafkaPartitionSplit.LATEST_OFFSET) {
partitionsStartingFromLatest.add(tp);
} else if (split.getStartingOffset() == KafkaPartitionSplit.COMMITTED_OFFSET) {
// Do nothing here, the consumer will first try to get the committed offsets of
// these partitions by default.
} else {
partitionsStartingFromSpecifiedOffsets.put(tp, split.getStartingOffset());
}
}
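/**
 * Records the stopping offset of the given split, if any: a non-negative offset is stored
 * directly, {@code LATEST_OFFSET} and {@code COMMITTED_OFFSET} are collected for later
 * resolution, and any other value is rejected with a {@link FlinkRuntimeException}.
 */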
private void parseStoppingOffsets(
KafkaPartitionSplit split,
List<TopicPartition> partitionsStoppingAtLatest,
Set<TopicPartition> partitionsStoppingAtCommitted) {
TopicPartition tp = split.getTopicPartition();
split.getStoppingOffset()
.ifPresent(
stoppingOffset -> {
if (stoppingOffset >= 0) {
stoppingOffsets.put(tp, stoppingOffset);
} else if (stoppingOffset == KafkaPartitionSplit.LATEST_OFFSET) {
partitionsStoppingAtLatest.add(tp);
} else if (stoppingOffset == KafkaPartitionSplit.COMMITTED_OFFSET) {
partitionsStoppingAtCommitted.add(tp);
} else {
// This should not happen.
throw new FlinkRuntimeException(
String.format(
"Invalid stopping offset %d for partition %s",
stoppingOffset, tp));
}
});
}
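/**
 * Seeks the consumer to the starting position of each newly assigned partition: the beginning
 * of the partition, its end, or a specific offset.
 */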
private void seekToStartingOffsets(
List<TopicPartition> partitionsStartingFromEarliest,
List<TopicPartition> partitionsStartingFromLatest,
Map<TopicPartition, Long> partitionsStartingFromSpecifiedOffsets) {
if (!partitionsStartingFromEarliest.isEmpty()) {
LOG.trace("Seeking starting offsets to beginning: {}", partitionsStartingFromEarliest);
consumer.seekToBeginning(partitionsStartingFromEarliest);
}
if (!partitionsStartingFromLatest.isEmpty()) {
LOG.trace("Seeking starting offsets to end: {}", partitionsStartingFromLatest);
consumer.seekToEnd(partitionsStartingFromLatest);
}
if (!partitionsStartingFromSpecifiedOffsets.isEmpty()) {
LOG.trace(
"Seeking starting offsets to specified offsets: {}",
partitionsStartingFromSpecifiedOffsets);
partitionsStartingFromSpecifiedOffsets.forEach(consumer::seek);
}
}
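/**
 * Resolves symbolic stopping offsets into concrete ones: partitions stopping at the latest
 * offset use the consumer's end offsets, and partitions stopping at the committed offset use
 * the committed offsets of the consumer group, failing if no committed offset exists.
 */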
private void acquireAndSetStoppingOffsets(
List<TopicPartition> partitionsStoppingAtLatest,
Set<TopicPartition> partitionsStoppingAtCommitted) {
Map<TopicPartition, Long> endOffset = consumer.endOffsets(partitionsStoppingAtLatest);
stoppingOffsets.putAll(endOffset);
if (!partitionsStoppingAtCommitted.isEmpty()) {
consumer.committed(partitionsStoppingAtCommitted)
.forEach(
(tp, offsetAndMetadata) -> {
Preconditions.checkNotNull(
offsetAndMetadata,
String.format(
"Partition %s should stop at committed offset. "
+ "But there is no committed offset of this partition for group %s",
tp, groupId));
stoppingOffsets.put(tp, offsetAndMetadata.offset());
});
}
}
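/**
 * Finds partitions whose current position has already reached their stopping offset, remembers
 * their split ids so they can be reported as finished in the next {@link #fetch()}, and
 * unassigns them from the consumer.
 */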
private void removeEmptySplits() {
List<TopicPartition> emptyPartitions = new ArrayList<>();
// Collect partitions whose current position has already reached their stopping offset.
for (TopicPartition tp : consumer.assignment()) {
if (consumer.position(tp) >= getStoppingOffset(tp)) {
emptyPartitions.add(tp);
}
}
if (!emptyPartitions.isEmpty()) {
LOG.debug(
"These assigning splits are empty and will be marked as finished in later fetch: {}",
emptyPartitions);
// Add empty partitions to empty split set for later cleanup in fetch()
emptySplits.addAll(
emptyPartitions.stream()
.map(KafkaPartitionSplit::toSplitId)
.collect(Collectors.toSet()));
// Un-assign partitions from Kafka consumer
unassignPartitions(emptyPartitions);
}
}
private void maybeLogSplitChangesHandlingResult(
SplitsChange<KafkaPartitionSplit> splitsChange) {
if (LOG.isDebugEnabled()) {
StringJoiner splitsInfo = new StringJoiner(",");
for (KafkaPartitionSplit split : splitsChange.splits()) {
long startingOffset = consumer.position(split.getTopicPartition());
long stoppingOffset = getStoppingOffset(split.getTopicPartition());
splitsInfo.add(
String.format(
"[%s, start:%d, stop: %d]",
split.getTopicPartition(), startingOffset, stoppingOffset));
}
LOG.debug("SplitsChange handling result: {}", splitsInfo);
}
}
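/** Removes the given partitions from the consumer's current assignment. */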
private void unassignPartitions(Collection<TopicPartition> partitionsToUnassign) {
Collection<TopicPartition> newAssignment = new HashSet<>(consumer.assignment());
newAssignment.removeAll(partitionsToUnassign);
consumer.assign(newAssignment);
}
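/** Builds the consumer client id from the configured client id prefix and the subtask index. */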
private String createConsumerClientId(Properties props) {
String prefix = props.getProperty(KafkaSourceOptions.CLIENT_ID_PREFIX.key());
return prefix + "-" + subtaskId;
}
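/** Marks the split of the given partition as finished once its stopping offset is reached. */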
private void finishSplitAtRecord(
TopicPartition tp,
long stoppingOffset,
long currentOffset,
List<TopicPartition> finishedPartitions,
KafkaPartitionSplitRecords<Tuple3<T, Long, Long>> recordsBySplits) {
LOG.debug(
"{} has reached stopping offset {}, current offset is {}",
tp,
stoppingOffset,
currentOffset);
finishedPartitions.add(tp);
recordsBySplits.addFinishedSplit(KafkaPartitionSplit.toSplitId(tp));
}
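/** Returns the stopping offset of the partition, or {@code Long.MAX_VALUE} if none is set. */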
private long getStoppingOffset(TopicPartition tp) {
return stoppingOffsets.getOrDefault(tp, Long.MAX_VALUE);
}
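/**
 * Registers the metrics of the internal Kafka consumer in the source reader metric group if
 * {@link KafkaSourceOptions#REGISTER_KAFKA_CONSUMER_METRICS} is enabled.
 */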
private void maybeRegisterKafkaConsumerMetrics(
Properties props,
KafkaSourceReaderMetrics kafkaSourceReaderMetrics,
KafkaConsumer<?, ?> consumer) {
final Boolean needToRegister =
KafkaSourceOptions.getOption(
props,
KafkaSourceOptions.REGISTER_KAFKA_CONSUMER_METRICS,
Boolean::parseBoolean);
if (needToRegister) {
kafkaSourceReaderMetrics.registerKafkaConsumerMetrics(consumer);
}
}
// ---------------- private helper class ------------------------
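/**
 * A {@link RecordsWithSplitIds} implementation that stores the fetched records grouped by
 * split id and tracks the splits that have been finished in this fetch.
 */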
private static class KafkaPartitionSplitRecords<T> implements RecordsWithSplitIds<T> {
private final Map<String, Collection<T>> recordsBySplits;
private final Set<String> finishedSplits;
private Iterator<Map.Entry<String, Collection<T>>> splitIterator;
private String currentSplitId;
private Iterator<T> recordIterator;
private KafkaPartitionSplitRecords() {
this.recordsBySplits = new HashMap<>();
this.finishedSplits = new HashSet<>();
}
private Collection<T> recordsForSplit(String splitId) {
return recordsBySplits.computeIfAbsent(splitId, id -> new ArrayList<>());
}
private void addFinishedSplit(String splitId) {
finishedSplits.add(splitId);
}
private void prepareForRead() {
splitIterator = recordsBySplits.entrySet().iterator();
}
@Override
@Nullable
public String nextSplit() {
if (splitIterator.hasNext()) {
Map.Entry<String, Collection<T>> entry = splitIterator.next();
currentSplitId = entry.getKey();
recordIterator = entry.getValue().iterator();
return currentSplitId;
} else {
currentSplitId = null;
recordIterator = null;
return null;
}
}
@Override
@Nullable
public T nextRecordFromSplit() {
Preconditions.checkNotNull(
currentSplitId,
"Make sure nextSplit() did not return null before "
+ "iterate over the records split.");
if (recordIterator.hasNext()) {
return recordIterator.next();
} else {
return null;
}
}
@Override
public Set<String> finishedSplits() {
return finishedSplits;
}
}
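/**
 * A {@link Collector} that buffers the records produced by the deserialization schema so they
 * can be drained after each deserialized {@link ConsumerRecord}.
 */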
private static class SimpleCollector<T> implements Collector<T> {
private final List<T> records = new ArrayList<>();
@Override
public void collect(T record) {
records.add(record);
}
@Override
public void close() {}
private List<T> getRecords() {
return records;
}
private void reset() {
records.clear();
}
}
}