/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitMode;
import org.apache.flink.streaming.connectors.kafka.config.OffsetCommitModes;
import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher;
import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionAssigner;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor;
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema;
import org.apache.flink.util.SerializedValue;
import org.apache.commons.collections.map.LinkedMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_FAILED_METRICS_COUNTER;
import static org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaConsumerMetricConstants.COMMITS_SUCCEEDED_METRICS_COUNTER;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Base class of all Flink Kafka Consumer data sources.
* This implements the common behavior across all Kafka versions.
*
* <p>The Kafka version specific behavior is defined mainly in the specific subclasses of the
* {@link AbstractFetcher}.
*
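* <p>A minimal usage sketch with a version-specific subclass (here {@code FlinkKafkaConsumer011};
* substitute whichever concrete consumer and deserialization schema the application actually uses):
* <pre>{@code
* StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
*
* Properties props = new Properties();
* props.setProperty("bootstrap.servers", "localhost:9092");
* props.setProperty("group.id", "my-consumer-group");
*
* FlinkKafkaConsumer011<String> consumer =
*     new FlinkKafkaConsumer011<>("myTopic", new SimpleStringSchema(), props);
*
* DataStream<String> stream = env.addSource(consumer);
* }</pre>
*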
* @param <T> The type of records produced by this data source
*/
@Internal
public abstract class FlinkKafkaConsumerBase<T> extends RichParallelSourceFunction<T> implements
CheckpointListener,
ResultTypeQueryable<T>,
CheckpointedFunction {
private static final long serialVersionUID = -6272159445203409112L;
protected static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaConsumerBase.class);
/** The maximum number of pending non-committed checkpoints to track, to avoid memory leaks. */
public static final int MAX_NUM_PENDING_CHECKPOINTS = 100;
/**
* Sentinel value for the partition discovery interval, in milliseconds, indicating that
* partition discovery is disabled ({@code Long.MIN_VALUE}; discovery is disabled by default).
*/
public static final long PARTITION_DISCOVERY_DISABLED = Long.MIN_VALUE;
/** Boolean configuration key to disable metrics tracking. **/
public static final String KEY_DISABLE_METRICS = "flink.disable-metrics";
/** Configuration key to define the consumer's partition discovery interval, in milliseconds. */
public static final String KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = "flink.partition-discovery.interval-millis";
/** State name of the consumer's partition offset states. */
private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";
// ------------------------------------------------------------------------
// configuration state, set on the client relevant for all subtasks
// ------------------------------------------------------------------------
/** Describes whether we are discovering partitions for fixed topics or a topic pattern. */
private final KafkaTopicsDescriptor topicsDescriptor;
/** The schema to convert between Kafka's byte messages, and Flink's objects. */
protected final KeyedDeserializationSchema<T> deserializer;
/** The set of topic partitions that the source will read, with their initial offsets to start reading from. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets;
/** The set of topic partitions that the source will read, with the end offsets at which reading stops. */
private Map<KafkaTopicPartition, Long> subscribedPartitionsToEndOffsets;
/** Flag indicating whether each partition should be finished once its latest message has been processed. */
private boolean stopAtLatest = false;
/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
* to exploit per-partition timestamp characteristics.
* The assigner is kept in serialized form, to deserialize it into multiple copies. */
private SerializedValue<AssignerWithPeriodicWatermarks<T>> periodicWatermarkAssigner;
/** Optional timestamp extractor / watermark generator that will be run per Kafka partition,
* to exploit per-partition timestamp characteristics.
* The assigner is kept in serialized form, to deserialize it into multiple copies. */
private SerializedValue<AssignerWithPunctuatedWatermarks<T>> punctuatedWatermarkAssigner;
/**
* User-set flag determining whether or not to commit on checkpoints.
* Note: this flag does not represent the final offset commit mode.
*/
private boolean enableCommitOnCheckpoints = true;
/**
* The offset commit mode for the consumer.
* The value of this can only be determined in {@link FlinkKafkaConsumerBase#open(Configuration)} since it depends
* on whether or not checkpointing is enabled for the job.
*/
private OffsetCommitMode offsetCommitMode;
/** User configured value for discovery interval, in milliseconds. */
private final long discoveryIntervalMillis;
/** The startup mode for the consumer (default is {@link StartupMode#GROUP_OFFSETS}). */
private StartupMode startupMode = StartupMode.GROUP_OFFSETS;
/** Specific startup offsets; only relevant when startup mode is {@link StartupMode#SPECIFIC_OFFSETS}. */
private Map<KafkaTopicPartition, Long> specificStartupOffsets;
/** Timestamp to determine startup offsets; only relevant when startup mode is {@link StartupMode#TIMESTAMP}. */
private Long startupOffsetsTimestamp;
// ------------------------------------------------------------------------
// runtime state (used individually by each parallel subtask)
// ------------------------------------------------------------------------
/** Data for pending but uncommitted offsets. */
private final LinkedMap pendingOffsetsToCommit = new LinkedMap();
/** The fetcher implements the connections to the Kafka brokers. */
private transient volatile AbstractFetcher<T, ?> kafkaFetcher;
/** The partition discoverer, used to find new partitions. */
private transient volatile AbstractPartitionDiscoverer partitionDiscoverer;
/**
* The offsets to restore to, if the consumer restores state from a checkpoint.
*
* <p>This map will be populated by the {@link #initializeState(FunctionInitializationContext)} method.
*
* <p>Using a sorted map as the ordering is important when using restored state
* to seed the partition discoverer.
*/
private transient volatile TreeMap<KafkaTopicPartition, Long> restoredState;
/** Accessor for state in the operator state backend. */
private transient ListState<Tuple2<KafkaTopicPartition, Long>> unionOffsetStates;
/**
* Flag indicating whether the consumer is restored from older state written with Flink 1.1 or 1.2.
* When the current run is restored from older state, partition discovery is disabled.
*/
private boolean restoredFromOldState;
/** Discovery loop, executed in a separate thread. */
private transient volatile Thread discoveryLoopThread;
/** Flag indicating whether the consumer is still running. */
private volatile boolean running = true;
// ------------------------------------------------------------------------
// internal metrics
// ------------------------------------------------------------------------
/**
* Flag indicating whether or not metrics should be exposed.
* If {@code true}, offset metrics (e.g. current offset, committed offset) and
* Kafka-shipped metrics will be registered.
*/
private final boolean useMetrics;
/** Counter for successful Kafka offset commits. */
private transient Counter successfulCommits;
/** Counter for failed Kafka offset commits. */
private transient Counter failedCommits;
/** Callback interface that will be invoked upon async Kafka commit completion.
* Please be aware that the default callback implementation in this base class does not
* provide any guarantees on thread-safety. This is sufficient for now because the currently
* supported Kafka connectors guarantee no more than one concurrent async pending offset
* commit.
*/
private transient KafkaCommitCallback offsetCommitCallback;
// ------------------------------------------------------------------------
/**
* Base constructor.
*
* @param topics fixed list of topics to subscribe to (null, if using topic pattern)
* @param topicPattern the topic pattern to subscribe to (null, if using fixed topics)
* @param deserializer The deserializer to turn raw byte messages into Java/Scala objects.
* @param discoveryIntervalMillis the topic / partition discovery interval, in
* milliseconds ({@link #PARTITION_DISCOVERY_DISABLED} if discovery is disabled).
*/
public FlinkKafkaConsumerBase(
List<String> topics,
Pattern topicPattern,
KeyedDeserializationSchema<T> deserializer,
long discoveryIntervalMillis,
boolean useMetrics) {
this.topicsDescriptor = new KafkaTopicsDescriptor(topics, topicPattern);
this.deserializer = checkNotNull(deserializer, "valueDeserializer");
checkArgument(
discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED || discoveryIntervalMillis >= 0,
"Cannot define a negative value for the topic / partition discovery interval.");
this.discoveryIntervalMillis = discoveryIntervalMillis;
this.useMetrics = useMetrics;
}
// ------------------------------------------------------------------------
// Configuration
// ------------------------------------------------------------------------
/**
* Specifies an {@link AssignerWithPunctuatedWatermarks} to emit watermarks in a punctuated manner.
* The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
* in the same way as in the Flink runtime, when streams are merged.
*
* <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
* the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
* characteristics are usually lost that way. For example, if the timestamps are strictly ascending
* per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
* parallel source subtask reads more than one partition.
*
* <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
* partition, allows users to let them exploit the per-partition characteristics.
*
* <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
* {@link AssignerWithPeriodicWatermarks}, not both at the same time.
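*
* <p>A minimal sketch for the punctuated variant; {@code consumer} is an already-constructed
* version-specific Kafka consumer, and {@code MyEvent} with its {@code getTimestamp()} /
* {@code isEndOfBatch()} accessors is a placeholder for an application-specific type:
* <pre>{@code
* consumer.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<MyEvent>() {
*     public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
*         return element.getTimestamp();
*     }
*
*     public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
*         // emit a watermark only on designated marker records
*         return lastElement.isEndOfBatch() ? new Watermark(extractedTimestamp) : null;
*     }
* });
* }</pre>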
*
* @param assigner The timestamp assigner / watermark generator to use.
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.periodicWatermarkAssigner != null) {
throw new IllegalStateException("A periodic watermark emitter has already been set.");
}
try {
ClosureCleaner.clean(assigner, true);
this.punctuatedWatermarkAssigner = new SerializedValue<>(assigner);
return this;
} catch (Exception e) {
throw new IllegalArgumentException("The given assigner is not serializable", e);
}
}
/**
* Specifies an {@link AssignerWithPeriodicWatermarks} to emit watermarks in a periodic manner.
* The watermark extractor will run per Kafka partition, watermarks will be merged across partitions
* in the same way as in the Flink runtime, when streams are merged.
*
* <p>When a subtask of a FlinkKafkaConsumer source reads multiple Kafka partitions,
* the streams from the partitions are unioned in a "first come first serve" fashion. Per-partition
* characteristics are usually lost that way. For example, if the timestamps are strictly ascending
* per Kafka partition, they will not be strictly ascending in the resulting Flink DataStream, if the
* parallel source subtask reads more than one partition.
*
* <p>Running timestamp extractors / watermark generators directly inside the Kafka source, per Kafka
* partition, allows users to let them exploit the per-partition characteristics.
*
* <p>Note: One can use either an {@link AssignerWithPunctuatedWatermarks} or an
* {@link AssignerWithPeriodicWatermarks}, not both at the same time.
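*
* <p>A minimal sketch for the periodic variant ({@code consumer} is an already-constructed
* version-specific Kafka consumer and {@code MyEvent} a placeholder type):
* <pre>{@code
* consumer.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<MyEvent>() {
*     // allow 1 second of out-of-orderness per partition
*     private long currentMaxTimestamp = Long.MIN_VALUE + 1000;
*
*     public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
*         currentMaxTimestamp = Math.max(currentMaxTimestamp, element.getTimestamp());
*         return element.getTimestamp();
*     }
*
*     public Watermark getCurrentWatermark() {
*         return new Watermark(currentMaxTimestamp - 1000);
*     }
* });
* }</pre>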
*
* @param assigner The timestamp assigner / watermark generator to use.
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> assigner) {
checkNotNull(assigner);
if (this.punctuatedWatermarkAssigner != null) {
throw new IllegalStateException("A punctuated watermark emitter has already been set.");
}
try {
ClosureCleaner.clean(assigner, true);
this.periodicWatermarkAssigner = new SerializedValue<>(assigner);
return this;
} catch (Exception e) {
throw new IllegalArgumentException("The given assigner is not serializable", e);
}
}
/**
* Specifies whether or not the consumer should commit offsets back to Kafka on checkpoints.
*
* <p>This setting will only have effect if checkpointing is enabled for the job.
* If checkpointing isn't enabled, only the "auto.commit.enable" (for 0.8) / "enable.auto.commit" (for 0.9+)
* property settings will be used to commit offsets.
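*
* <p>A minimal sketch; the checkpoint interval of 5000 ms is an arbitrary example value:
* <pre>{@code
* StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
* env.enableCheckpointing(5000); // without checkpointing, offsets are not committed on checkpoints
* consumer.setCommitOffsetsOnCheckpoints(true); // true is the default; shown here for clarity
* }</pre>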
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setCommitOffsetsOnCheckpoints(boolean commitOnCheckpoints) {
this.enableCommitOnCheckpoints = commitOnCheckpoints;
return this;
}
/**
* Specifies the consumer to start reading from the earliest offset for all partitions.
* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
*
* <p>This method does not affect where partitions are read from when the consumer is restored
* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
* savepoint, only the offsets in the restored state will be used.
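*
* <p>A minimal sketch of selecting a startup mode; only one mode is active at a time, and the
* last call wins:
* <pre>{@code
* consumer.setStartFromEarliest();        // read each partition from its earliest offset
* // consumer.setStartFromLatest();       // or: read only records that arrive after the job starts
* // consumer.setStartFromGroupOffsets(); // or: resume from committed group offsets (the default)
* }</pre>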
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromEarliest() {
this.startupMode = StartupMode.EARLIEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
* Specifies the consumer to start reading from the latest offset for all partitions.
* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
*
* <p>This method does not affect where partitions are read from when the consumer is restored
* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
* savepoint, only the offsets in the restored state will be used.
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromLatest() {
this.startupMode = StartupMode.LATEST;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
* Specifies the consumer to start reading partitions from a specified timestamp.
* The specified timestamp must be before the current timestamp.
* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
*
* <p>The consumer will look up the earliest offset whose timestamp is greater than or equal
* to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
* latest offset to read data from Kafka.
*
* <p>This method does not affect where partitions are read from when the consumer is restored
* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
* savepoint, only the offsets in the restored state will be used.
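*
* <p>A minimal sketch for a version-specific subclass that exposes this method publicly; the
* timestamp 1546300800000L is an arbitrary example corresponding to 2019-01-01T00:00:00Z:
* <pre>{@code
* consumer.setStartFromTimestamp(1546300800000L);
* }</pre>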
*
* @param startupOffsetsTimestamp timestamp for the startup offsets, as milliseconds from epoch.
*
* @return The consumer object, to allow function chaining.
*/
// NOTE -
// This method is implemented in the base class because this is where the startup logging and verifications live.
// However, it is not publicly exposed since only newer Kafka versions support the functionality.
// Version-specific subclasses which can expose the functionality should override and allow public access.
protected FlinkKafkaConsumerBase<T> setStartFromTimestamp(long startupOffsetsTimestamp) {
checkArgument(startupOffsetsTimestamp >= 0, "The provided value for the startup offsets timestamp is invalid.");
long currentTimestamp = System.currentTimeMillis();
checkArgument(startupOffsetsTimestamp <= currentTimestamp,
"Startup time[%s] must be before current time[%s].", startupOffsetsTimestamp, currentTimestamp);
this.startupMode = StartupMode.TIMESTAMP;
this.startupOffsetsTimestamp = startupOffsetsTimestamp;
this.specificStartupOffsets = null;
return this;
}
/**
* Specifies the consumer to start reading from any committed group offsets found
* in Zookeeper / Kafka brokers. The "group.id" property must be set in the configuration
* properties. If no offset can be found for a partition, the behaviour in "auto.offset.reset"
* set in the configuration properties will be used for the partition.
*
* <p>This method does not affect where partitions are read from when the consumer is restored
* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
* savepoint, only the offsets in the restored state will be used.
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromGroupOffsets() {
this.startupMode = StartupMode.GROUP_OFFSETS;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = null;
return this;
}
/**
* Specifies the consumer to start reading partitions from specific offsets, set independently for each partition.
* The specified offset should be the offset of the next record that will be read from partitions.
* This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
*
* <p>If the provided map of offsets contains entries whose {@link KafkaTopicPartition} is not subscribed by the
* consumer, the entry will be ignored. If the consumer subscribes to a partition that does not exist in the provided
* map of offsets, the consumer will fallback to the default group offset behaviour (see
* {@link FlinkKafkaConsumerBase#setStartFromGroupOffsets()}) for that particular partition.
*
* <p>If the specified offset for a partition is invalid, or the behaviour for that partition is defaulted to group
* offsets but still no group offset could be found for it, then the "auto.offset.reset" behaviour set in the
* configuration properties will be used for the partition.
*
* <p>This method does not affect where partitions are read from when the consumer is restored
* from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
* savepoint, only the offsets in the restored state will be used.
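*
* <p>A minimal sketch; the topic name, partition numbers, and offsets are placeholders:
* <pre>{@code
* Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
* specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
* specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
* specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);
* consumer.setStartFromSpecificOffsets(specificStartOffsets);
* }</pre>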
*
* @return The consumer object, to allow function chaining.
*/
public FlinkKafkaConsumerBase<T> setStartFromSpecificOffsets(Map<KafkaTopicPartition, Long> specificStartupOffsets) {
this.startupMode = StartupMode.SPECIFIC_OFFSETS;
this.startupOffsetsTimestamp = null;
this.specificStartupOffsets = checkNotNull(specificStartupOffsets);
return this;
}
/**
* Specifies whether the source should finish once all messages up to the latest offsets
* have been processed, turning this consumer into a bounded source.
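*
* <p>A minimal sketch of a bounded read; note that this cannot be combined with dynamic partition
* discovery, as enforced in {@link #open(Configuration)}:
* <pre>{@code
* consumer.setStopAtLatest(true); // consume up to the latest offsets, then finish the source
* }</pre>
*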
* @param stopAtLatest whether to stop reading each partition once its latest offset has been reached.
*/
public void setStopAtLatest(boolean stopAtLatest) {
this.stopAtLatest = stopAtLatest;
}
// ------------------------------------------------------------------------
// Work methods
// ------------------------------------------------------------------------
@Override
public void open(Configuration configuration) throws Exception {
// determine the offset commit mode
this.offsetCommitMode = OffsetCommitModes.fromConfiguration(
getIsAutoCommitEnabled(),
enableCommitOnCheckpoints);
if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED && stopAtLatest) {
throw new RuntimeException("stopAtLatest is not supported when dynamic partition discovery is enabled.");
}
// create the partition discoverer
this.partitionDiscoverer = createPartitionDiscoverer(
topicsDescriptor,
getRuntimeContext().getIndexOfThisSubtask(),
getRuntimeContext().getNumberOfParallelSubtasks());
this.partitionDiscoverer.open();
subscribedPartitionsToStartOffsets = new HashMap<>();
List<KafkaTopicPartition> allPartitions = partitionDiscoverer.discoverPartitions();
if (restoredState != null) {
for (KafkaTopicPartition partition : allPartitions) {
if (!restoredState.containsKey(partition)) {
restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);
}
}
for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {
if (!restoredFromOldState) {
// seed the partition discoverer with the union state while filtering out
// restored partitions that should not be subscribed by this subtask
if (KafkaTopicPartitionAssigner.assign(
restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())
== getRuntimeContext().getIndexOfThisSubtask()){
subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
} else {
// when restoring from older 1.1 / 1.2 state, the restored state would not be the union state;
// in this case, just use the restored state as the subscribed partitions
subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());
}
}
LOG.info("Consumer subtask {} will start reading {} partitions with offsets in restored state: {}",
getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
} else {
// use the partition discoverer to fetch the initial seed partitions,
// and set their initial offsets depending on the startup mode.
// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
// when the partition is actually read.
switch (startupMode) {
case SPECIFIC_OFFSETS:
if (specificStartupOffsets == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
", but no specific offsets were specified.");
}
for (KafkaTopicPartition seedPartition : allPartitions) {
Long specificOffset = specificStartupOffsets.get(seedPartition);
if (specificOffset != null) {
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
} else {
// default to group offset behaviour if the user-provided specific offsets
// do not contain a value for this partition
subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
}
}
break;
case TIMESTAMP:
if (startupOffsetsTimestamp == null) {
throw new IllegalStateException(
"Startup mode for the consumer set to " + StartupMode.TIMESTAMP +
", but no startup timestamp was specified.");
}
for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset
: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {
subscribedPartitionsToStartOffsets.put(
partitionToOffset.getKey(),
(partitionToOffset.getValue() == null)
// if an offset cannot be retrieved for a partition with the given timestamp,
// we default to using the latest offset for the partition
? KafkaTopicPartitionStateSentinel.LATEST_OFFSET
// since the specified offsets represent the next record to read, we subtract
// it by one so that the initial state of the consumer will be correct
: partitionToOffset.getValue() - 1);
}
break;
default:
for (KafkaTopicPartition seedPartition : allPartitions) {
subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
}
}
if (!subscribedPartitionsToStartOffsets.isEmpty()) {
switch (startupMode) {
case EARLIEST:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the earliest offsets: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
case LATEST:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the latest offsets: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
break;
case TIMESTAMP:
LOG.info("Consumer subtask {} will start reading the following {} partitions from timestamp {}: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
startupOffsetsTimestamp,
subscribedPartitionsToStartOffsets.keySet());
break;
case SPECIFIC_OFFSETS:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the specified startup offsets {}: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
specificStartupOffsets,
subscribedPartitionsToStartOffsets.keySet());
List<KafkaTopicPartition> partitionsDefaultedToGroupOffsets = new ArrayList<>(subscribedPartitionsToStartOffsets.size());
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
if (subscribedPartition.getValue() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
partitionsDefaultedToGroupOffsets.add(subscribedPartition.getKey());
}
}
if (partitionsDefaultedToGroupOffsets.size() > 0) {
LOG.warn("Consumer subtask {} cannot find offsets for the following {} partitions in the specified startup offsets: {}" +
"; their startup offsets will be defaulted to their committed group offsets in Kafka.",
getRuntimeContext().getIndexOfThisSubtask(),
partitionsDefaultedToGroupOffsets.size(),
partitionsDefaultedToGroupOffsets);
}
break;
default:
case GROUP_OFFSETS:
LOG.info("Consumer subtask {} will start reading the following {} partitions from the committed group offsets in Kafka: {}",
getRuntimeContext().getIndexOfThisSubtask(),
subscribedPartitionsToStartOffsets.size(),
subscribedPartitionsToStartOffsets.keySet());
}
if (stopAtLatest) {
subscribedPartitionsToEndOffsets = new HashMap<>();
for (KafkaTopicPartition partition : subscribedPartitionsToStartOffsets.keySet()) {
subscribedPartitionsToEndOffsets.put(
partition, KafkaTopicPartitionStateSentinel.LATEST_OFFSET);
}
} else if (subscribedPartitionsToEndOffsets == null) {
subscribedPartitionsToEndOffsets = new HashMap<>();
for (KafkaTopicPartition partition : subscribedPartitionsToStartOffsets.keySet()) {
subscribedPartitionsToEndOffsets.put(
partition, Long.MAX_VALUE);
}
}
} else {
LOG.info("Consumer subtask {} initially has no partitions to read from.",
getRuntimeContext().getIndexOfThisSubtask());
}
}
}
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
if (subscribedPartitionsToStartOffsets == null) {
throw new Exception("The partitions were not set for the consumer");
}
// initialize commit metrics and default offset callback method
this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);
this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);
this.offsetCommitCallback = new KafkaCommitCallback() {
@Override
public void onSuccess() {
successfulCommits.inc();
}
@Override
public void onException(Throwable cause) {
LOG.warn("Async Kafka commit failed.", cause);
failedCommits.inc();
}
};
// mark the subtask as temporarily idle if there are no initial seed partitions;
// once this subtask discovers some partitions and starts collecting records, the subtask's
// status will automatically be triggered back to be active.
if (subscribedPartitionsToStartOffsets.isEmpty()) {
sourceContext.markAsTemporarilyIdle();
}
// from this point forward:
// - 'snapshotState' will draw offsets from the fetcher,
// instead of being built from `subscribedPartitionsToStartOffsets`
// - 'notifyCheckpointComplete' will start to do work (i.e. commit offsets to
// Kafka through the fetcher, if configured to do so)
this.kafkaFetcher = createFetcher(
sourceContext,
subscribedPartitionsToStartOffsets,
subscribedPartitionsToEndOffsets,
periodicWatermarkAssigner,
punctuatedWatermarkAssigner,
(StreamingRuntimeContext) getRuntimeContext(),
offsetCommitMode,
getRuntimeContext().getMetricGroup(),
useMetrics);
if (!running) {
return;
}
// depending on whether we were restored with the current state version (1.3),
// remaining logic branches off into 2 paths:
// 1) New state - partition discovery loop executed as separate thread, with this
// thread running the main fetcher loop
// 2) Old state - partition discovery is disabled and only the main fetcher loop is executed
if (discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();
this.discoveryLoopThread = new Thread(new Runnable() {
@Override
public void run() {
try {
// --------------------- partition discovery loop ---------------------
List<KafkaTopicPartition> discoveredPartitions;
// throughout the loop, we always eagerly check if we are still running before
// performing the next operation, so that we can escape the loop as soon as possible
while (running) {
if (LOG.isDebugEnabled()) {
LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());
}
try {
discoveredPartitions = partitionDiscoverer.discoverPartitions();
} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {
// the partition discoverer may have been closed or woken up before or during the discovery;
// this would only happen if the consumer was canceled; simply escape the loop
break;
}
// no need to add the discovered partitions if we were closed during the meantime
if (running && !discoveredPartitions.isEmpty()) {
kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);
}
// do not waste any time sleeping if we're not running anymore
if (running && discoveryIntervalMillis != 0) {
try {
Thread.sleep(discoveryIntervalMillis);
} catch (InterruptedException iex) {
// may be interrupted if the consumer was canceled midway; simply escape the loop
break;
}
}
}
} catch (Exception e) {
discoveryLoopErrorRef.set(e);
} finally {
// calling cancel will also let the fetcher loop escape
// (if not running, cancel() was already called)
if (running) {
cancel();
}
}
}
});
discoveryLoopThread.start();
kafkaFetcher.runFetchLoop(true);
// --------------------------------------------------------------------
// make sure that the partition discoverer is properly closed
partitionDiscoverer.close();
discoveryLoopThread.join();
// rethrow any fetcher errors
final Exception discoveryLoopError = discoveryLoopErrorRef.get();
if (discoveryLoopError != null) {
throw new RuntimeException(discoveryLoopError);
}
} else {
// won't be using the discoverer
partitionDiscoverer.close();
kafkaFetcher.runFetchLoop(false);
}
}
@Override
public void cancel() {
// set ourselves as not running;
// this would let the main discovery loop escape as soon as possible
running = false;
if (discoveryLoopThread != null) {
if (partitionDiscoverer != null) {
// we cannot close the discoverer here, as it is error-prone to concurrent access;
// only wakeup the discoverer, the discovery loop will clean itself up after it escapes
partitionDiscoverer.wakeup();
}
// the discovery loop may currently be sleeping in-between
// consecutive discoveries; interrupt to shutdown faster
discoveryLoopThread.interrupt();
}
// abort the fetcher, if there is one
if (kafkaFetcher != null) {
kafkaFetcher.cancel();
}
}
@Override
public void close() throws Exception {
// pretty much the same logic as cancelling
try {
cancel();
} finally {
super.close();
}
}
// ------------------------------------------------------------------------
// Checkpoint and restore
// ------------------------------------------------------------------------
@Override
public final void initializeState(FunctionInitializationContext context) throws Exception {
OperatorStateStore stateStore = context.getOperatorStateStore();
ListState<Tuple2<KafkaTopicPartition, Long>> oldRoundRobinListState =
stateStore.getSerializableListState(DefaultOperatorStateBackend.DEFAULT_OPERATOR_STATE_NAME);
this.unionOffsetStates = stateStore.getUnionListState(new ListStateDescriptor<>(
OFFSETS_STATE_NAME,
TypeInformation.of(new TypeHint<Tuple2<KafkaTopicPartition, Long>>() {})));
if (context.isRestored() && !restoredFromOldState) {
restoredState = new TreeMap<>(new KafkaTopicPartition.Comparator());
// migrate from 1.2 state, if there is any
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : oldRoundRobinListState.get()) {
restoredFromOldState = true;
unionOffsetStates.add(kafkaOffset);
}
oldRoundRobinListState.clear();
if (restoredFromOldState && discoveryIntervalMillis != PARTITION_DISCOVERY_DISABLED) {
throw new IllegalArgumentException(
"Topic / partition discovery cannot be enabled if the job is restored from a savepoint from Flink 1.2.x.");
}
// populate actual holder for restored state
for (Tuple2<KafkaTopicPartition, Long> kafkaOffset : unionOffsetStates.get()) {
restoredState.put(kafkaOffset.f0, kafkaOffset.f1);
}
LOG.info("Setting restore state in the FlinkKafkaConsumer: {}", restoredState);
} else {
LOG.info("No restore state for FlinkKafkaConsumer.");
}
}
@Override
public final void snapshotState(FunctionSnapshotContext context) throws Exception {
if (!running) {
LOG.debug("snapshotState() called on closed source");
} else {
unionOffsetStates.clear();
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
// the fetcher has not yet been initialized, which means we need to return the
// originally restored offsets or the assigned partitions
for (Map.Entry<KafkaTopicPartition, Long> subscribedPartition : subscribedPartitionsToStartOffsets.entrySet()) {
unionOffsetStates.add(Tuple2.of(subscribedPartition.getKey(), subscribedPartition.getValue()));
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), restoredState);
}
} else {
HashMap<KafkaTopicPartition, Long> currentOffsets = fetcher.snapshotCurrentState();
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// the map cannot be asynchronously updated, because only one checkpoint call can happen
// on this function at a time: either snapshotState() or notifyCheckpointComplete()
pendingOffsetsToCommit.put(context.getCheckpointId(), currentOffsets);
}
for (Map.Entry<KafkaTopicPartition, Long> kafkaTopicPartitionLongEntry : currentOffsets.entrySet()) {
unionOffsetStates.add(
Tuple2.of(kafkaTopicPartitionLongEntry.getKey(), kafkaTopicPartitionLongEntry.getValue()));
}
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// truncate the map of pending offsets to commit, to prevent infinite growth
while (pendingOffsetsToCommit.size() > MAX_NUM_PENDING_CHECKPOINTS) {
pendingOffsetsToCommit.remove(0);
}
}
}
}
@Override
public final void notifyCheckpointComplete(long checkpointId) throws Exception {
if (!running) {
LOG.debug("notifyCheckpointComplete() called on closed source");
return;
}
final AbstractFetcher<?, ?> fetcher = this.kafkaFetcher;
if (fetcher == null) {
LOG.debug("notifyCheckpointComplete() called on uninitialized source");
return;
}
if (offsetCommitMode == OffsetCommitMode.ON_CHECKPOINTS) {
// only one commit operation must be in progress
if (LOG.isDebugEnabled()) {
LOG.debug("Committing offsets to Kafka/ZooKeeper for checkpoint " + checkpointId);
}
try {
final int posInMap = pendingOffsetsToCommit.indexOf(checkpointId);
if (posInMap == -1) {
LOG.warn("Received confirmation for unknown checkpoint id {}", checkpointId);
return;
}
@SuppressWarnings("unchecked")
Map<KafkaTopicPartition, Long> offsets =
(Map<KafkaTopicPartition, Long>) pendingOffsetsToCommit.remove(posInMap);
// remove older checkpoints in map
for (int i = 0; i < posInMap; i++) {
pendingOffsetsToCommit.remove(0);
}
if (offsets == null || offsets.size() == 0) {
LOG.debug("Checkpoint state was empty.");
return;
}
fetcher.commitInternalOffsetsToKafka(offsets, offsetCommitCallback);
} catch (Exception e) {
if (running) {
throw e;
}
// else ignore exception if we are no longer running
}
}
}
// ------------------------------------------------------------------------
// Kafka Consumer specific methods
// ------------------------------------------------------------------------
/**
* Creates the fetcher that connects to the Kafka brokers, pulls data, deserializes the
* data, and emits it into the data streams.
*
* @param sourceContext The source context to emit data to.
* @param subscribedPartitionsToStartOffsets The set of partitions that this subtask should handle, with their start offsets.
* @param subscribedPartitionsToEndOffsets The set of partitions that this subtask should handle, with their end offsets.
* @param watermarksPeriodic Optional, a serialized timestamp extractor / periodic watermark generator.
* @param watermarksPunctuated Optional, a serialized timestamp extractor / punctuated watermark generator.
* @param runtimeContext The task's runtime context.
*
* @return The instantiated fetcher
*
* @throws Exception The method should forward exceptions
*/
protected abstract AbstractFetcher<T, ?> createFetcher(
SourceContext<T> sourceContext,
Map<KafkaTopicPartition, Long> subscribedPartitionsToStartOffsets,
Map<KafkaTopicPartition, Long> subscribedPartitionsToEndOffsets,
SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
StreamingRuntimeContext runtimeContext,
OffsetCommitMode offsetCommitMode,
MetricGroup kafkaMetricGroup,
boolean useMetrics) throws Exception;
/**
* Creates the partition discoverer that is used to find new partitions for this subtask.
*
* @param topicsDescriptor Descriptor that describes whether we are discovering partitions for fixed topics or a topic pattern.
* @param indexOfThisSubtask The index of this consumer subtask.
* @param numParallelSubtasks The total number of parallel consumer subtasks.
*
* @return The instantiated partition discoverer
*/
protected abstract AbstractPartitionDiscoverer createPartitionDiscoverer(
KafkaTopicsDescriptor topicsDescriptor,
int indexOfThisSubtask,
int numParallelSubtasks);
protected abstract boolean getIsAutoCommitEnabled();
protected abstract Map<KafkaTopicPartition, Long> fetchOffsetsWithTimestamp(
Collection<KafkaTopicPartition> partitions,
long timestamp);
// ------------------------------------------------------------------------
// ResultTypeQueryable methods
// ------------------------------------------------------------------------
@Override
public TypeInformation<T> getProducedType() {
return deserializer.getProducedType();
}
// ------------------------------------------------------------------------
// Test utilities
// ------------------------------------------------------------------------
@VisibleForTesting
Map<KafkaTopicPartition, Long> getSubscribedPartitionsToStartOffsets() {
return subscribedPartitionsToStartOffsets;
}
@VisibleForTesting
TreeMap<KafkaTopicPartition, Long> getRestoredState() {
return restoredState;
}
@VisibleForTesting
OffsetCommitMode getOffsetCommitMode() {
return offsetCommitMode;
}
@VisibleForTesting
LinkedMap getPendingOffsetsToCommit() {
return pendingOffsetsToCommit;
}
}