/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.streaming.connectors.kafka.internal;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.streaming.connectors.kafka.internals.ClosableBlockingQueue;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaCommitCallback;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionState;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartitionStateSentinel;
import org.apache.flink.streaming.connectors.kafka.internals.metrics.KafkaMetricWrapper;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.slf4j.Logger;
import javax.annotation.Nonnull;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* The thread that runs the {@link KafkaConsumer}, connecting to the brokers and polling records.
* The thread pushes the data into a {@link Handover} to be picked up by the fetcher that will
* deserialize and emit the records.
*
* <p><b>IMPORTANT:</b> This thread must not be interrupted when attempting to shut it down.
* The Kafka consumer code was found to not always handle interrupts well, and to even
* deadlock in certain situations.
*
* <p>Implementation Note: This code is written to be reusable in later versions of the KafkaConsumer.
* Because Kafka does not maintain binary compatibility, we use a "call bridge" as an indirection
* to the KafkaConsumer calls that change signature.
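*
* <p>A construction sketch (hypothetical wiring; in practice the Kafka fetcher creates
* and manages this thread):
* <pre>{@code
* KafkaConsumerThread thread = new KafkaConsumerThread(
*     LoggerFactory.getLogger(KafkaConsumerThread.class),
*     handover,                   // shared with the fetcher, which drains it
*     kafkaProperties,            // regular KafkaConsumer configuration
*     unassignedPartitionsQueue,  // fed by partition discovery
*     callBridge,
*     "Kafka Fetcher for MySource",
*     100L,                       // poll timeout in milliseconds
*     true,                       // forward Kafka's metrics to Flink
*     consumerMetricGroup,
*     subtaskMetricGroup);
* thread.start();
* // ... later, on cancel/close:
* thread.shutdown();
* }</pre>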
*/
@Internal
public class KafkaConsumerThread extends Thread {
/** Logger for this consumer. */
private final Logger log;
/** The handover of data and exceptions between the consumer thread and the task thread. */
private final Handover handover;
/** The next offsets that the main thread should commit and the commit callback. */
private final AtomicReference<Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback>> nextOffsetsToCommit;
/** The configuration for the Kafka consumer. */
private final Properties kafkaProperties;
/** The queue of unassigned partitions that we need to assign to the Kafka consumer. */
private final ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue;
/** The indirections on KafkaConsumer methods, for cases where KafkaConsumer compatibility is broken. */
private final KafkaConsumerCallBridge consumerCallBridge;
/** The maximum number of milliseconds to wait for a fetch batch. */
private final long pollTimeout;
/** Flag whether to add Kafka's metrics to the Flink metrics. */
private final boolean useMetrics;
/**
* @deprecated We should only be publishing to the {@link #consumerMetricGroup}.
* This is kept to retain compatibility for metrics.
*/
@Deprecated
private final MetricGroup subtaskMetricGroup;
/** We get this from the outside to publish metrics. */
private final MetricGroup consumerMetricGroup;
/** Reference to the Kafka consumer, once it is created. Set to null while the consumer is isolated for partition reassignment. */
private volatile KafkaConsumer<byte[], byte[]> consumer;
/** This lock is used to isolate the consumer for partition reassignment. */
private final Object consumerReassignmentLock;
/** Indication if this consumer has any assigned partition. */
private boolean hasAssignedPartitions;
/**
* Flag to indicate whether an external operation ({@link #setOffsetsToCommit(Map, KafkaCommitCallback)}
* or {@link #shutdown()}) had attempted to wakeup the consumer while it was isolated for partition reassignment.
*/
private volatile boolean hasBufferedWakeup;
/** Flag to mark the main work loop as alive. */
private volatile boolean running;
/** Flag tracking whether the latest commit request has completed. */
private volatile boolean commitInProgress;
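/** Flag whether dynamic partition discovery is enabled; when disabled, this thread exits once all current partitions are finished. */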
private volatile boolean dynamicDiscoverEnabled = true;
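/** The partitions currently tracked by this thread, keyed by their Kafka partition handle. */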
private Map<TopicPartition, KafkaTopicPartitionState<TopicPartition>> currentPartitions = new HashMap<>();
public KafkaConsumerThread(
Logger log,
Handover handover,
Properties kafkaProperties,
ClosableBlockingQueue<KafkaTopicPartitionState<TopicPartition>> unassignedPartitionsQueue,
KafkaConsumerCallBridge consumerCallBridge,
String threadName,
long pollTimeout,
boolean useMetrics,
MetricGroup consumerMetricGroup,
MetricGroup subtaskMetricGroup) {
super(threadName);
setDaemon(true);
this.log = checkNotNull(log);
this.handover = checkNotNull(handover);
this.kafkaProperties = checkNotNull(kafkaProperties);
this.consumerMetricGroup = checkNotNull(consumerMetricGroup);
this.subtaskMetricGroup = checkNotNull(subtaskMetricGroup);
this.consumerCallBridge = checkNotNull(consumerCallBridge);
this.unassignedPartitionsQueue = checkNotNull(unassignedPartitionsQueue);
this.pollTimeout = pollTimeout;
this.useMetrics = useMetrics;
this.consumerReassignmentLock = new Object();
this.nextOffsetsToCommit = new AtomicReference<>();
this.running = true;
}
// ------------------------------------------------------------------------
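/** Enables or disables dynamic partition discovery for this thread. */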
public void setDynamicDiscoverEnabled(boolean dynamicDiscoverEnabled) {
this.dynamicDiscoverEnabled = dynamicDiscoverEnabled;
}
@Override
public void run() {
// early exit check
if (!running) {
return;
}
// this is the means to talk to FlinkKafkaConsumer's main thread
final Handover handover = this.handover;
// This method initializes the KafkaConsumer and guarantees it is torn down properly.
// This is important, because the consumer has multi-threading issues,
// including concurrent 'close()' calls.
try {
this.consumer = getConsumer();
}
catch (Throwable t) {
handover.reportError(t);
return;
}
// from here on, the consumer is guaranteed to be closed properly
try {
// register Kafka's very own metrics in Flink's metric reporters
if (useMetrics) {
// register Kafka metrics to Flink
Map<MetricName, ? extends Metric> metrics = consumer.metrics();
if (metrics == null) {
// MapR's Kafka implementation returns null here.
log.info("Consumer implementation does not support metrics");
} else {
// we have Kafka metrics, register them
for (Map.Entry<MetricName, ? extends Metric> metric: metrics.entrySet()) {
consumerMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
// TODO this metric is kept for compatibility purposes; should remove in the future
subtaskMetricGroup.gauge(metric.getKey().name(), new KafkaMetricWrapper(metric.getValue()));
}
}
}
// early exit check
if (!running) {
return;
}
// the latest bulk of records. May carry across the loop if the thread is woken up
// from blocking on the handover
ConsumerRecords<byte[], byte[]> records = null;
// reused variable to hold found unassigned new partitions.
// found partitions are not carried across loops using this variable;
// they are carried across via re-adding them to the unassigned partitions queue
List<KafkaTopicPartitionState<TopicPartition>> newPartitions;
boolean reassignmentFailed = false;
// main fetch loop
while (running) {
// check if there is something to commit
if (!commitInProgress) {
// get and reset the work-to-be committed, so we don't repeatedly commit the same
final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =
nextOffsetsToCommit.getAndSet(null);
if (commitOffsetsAndCallback != null) {
log.debug("Sending async offset commit request to Kafka broker");
// also record that a commit is already in progress
// the order here matters! first set the flag, then send the commit command.
commitInProgress = true;
consumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));
}
}
if (currentPartitions.isEmpty() && !unassignedPartitionsQueue.isOpen()) {
// no partitions are tracked and no more will arrive: exit the fetch loop
break;
}
try {
if (hasAssignedPartitions) {
newPartitions = unassignedPartitionsQueue.pollBatch();
}
else {
// if there are no assigned partitions, block until we get at least one
// instead of hot-spinning this loop. We rely on the fact that
// the unassignedPartitionsQueue will be closed on shutdown, so
// we don't block indefinitely
newPartitions = unassignedPartitionsQueue.getBatchBlocking();
}
if (newPartitions != null) {
for (KafkaTopicPartitionState<TopicPartition> partition: newPartitions) {
currentPartitions.put(partition.getKafkaPartitionHandle(), partition);
}
reassignPartitions();
}
for (KafkaTopicPartitionState<TopicPartition> partitionState: currentPartitions.values()) {
if (partitionState.isFinished()) {
reassignPartitions();
break;
}
}
if (reassignmentFailed) {
reassignPartitions();
reassignmentFailed = false;
}
if (currentPartitions.isEmpty() && !dynamicDiscoverEnabled) {
break;
}
} catch (AbortedReassignmentException e) {
reassignmentFailed = true;
continue;
}
if (!hasAssignedPartitions) {
// Without assigned partitions KafkaConsumer.poll will throw an exception
continue;
}
// get the next batch of records, unless the previous batch still has to be handed over
if (records == null) {
try {
records = consumer.poll(pollTimeout);
}
catch (WakeupException we) {
continue;
}
}
try {
Map<TopicPartition, Long> positions = new HashMap<>(records.partitions().size());
// when records are returned, report positions only for the partitions that
// have records; otherwise, report the positions of all assigned partitions.
Collection<TopicPartition> partitionsToReportOffsets =
records.isEmpty() ? consumer.assignment() : records.partitions();
partitionsToReportOffsets.forEach(tp -> positions.put(tp, consumer.position(tp)));
handover.produce(records, positions);
records = null;
}
catch (Handover.WakeupException e) {
// fall through the loop
}
}
// end main fetch loop
}
catch (Throwable t) {
// let the main thread know and exit
// it may be that this exception comes because the main thread closed the handover, in
// which case the below reporting is irrelevant, but does not hurt either
handover.reportError(t);
}
finally {
// make sure the handover is closed if it is not already closed or has an error
handover.close();
// make sure the KafkaConsumer is closed
try {
if (consumer != null) {
consumer.close();
}
}
catch (Throwable t) {
log.warn("Error while closing Kafka consumer", t);
}
}
}
/**
* Shuts this thread down, waking up the thread gracefully if blocked (without Thread.interrupt() calls).
*/
public void shutdown() {
running = false;
// wake up all blocking calls on the queue
unassignedPartitionsQueue.close();
// We cannot call close() on the KafkaConsumer, because it will actually throw
// an exception if a concurrent call is in progress
// this wakes up the consumer if it is blocked handing over records
handover.wakeupProducer();
// this wakes up the consumer if it is blocked in a kafka poll
synchronized (consumerReassignmentLock) {
if (consumer != null) {
consumer.wakeup();
} else {
// the consumer is currently isolated for partition reassignment;
// set this flag so that the wakeup state is restored once the reassignment is complete
hasBufferedWakeup = true;
}
}
}
/**
* Tells this thread to commit a set of offsets. This method does not block; the commit
* operation happens asynchronously.
*
* <p>Only one commit operation may be pending at any time. If committing takes longer
* than the interval between calls to this method, some commits may be skipped because
* they are superseded by newer ones.
*
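* <p>A minimal usage sketch (hypothetical names such as {@code consumerThread} and
* {@code lastProcessedOffset}; Kafka expects the offset of the next record to read,
* hence the {@code + 1}):
* <pre>{@code
* Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
* offsets.put(new TopicPartition("my-topic", 0), new OffsetAndMetadata(lastProcessedOffset + 1));
* consumerThread.setOffsetsToCommit(offsets, new KafkaCommitCallback() {
*     public void onSuccess() { }
*     public void onException(Throwable cause) { }
* });
* }</pre>
*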
* @param offsetsToCommit The offsets to commit
* @param commitCallback callback when Kafka commit completes
*/
void setOffsetsToCommit(
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit,
@Nonnull KafkaCommitCallback commitCallback) {
// record the work to be committed by the main consumer thread and make sure the consumer notices that
if (nextOffsetsToCommit.getAndSet(Tuple2.of(offsetsToCommit, commitCallback)) != null) {
log.warn("Committing offsets to Kafka takes longer than the checkpoint interval. " +
"Skipping commit of previous offsets because newer complete checkpoint offsets are available. " +
"This does not compromise Flink's checkpoint integrity.");
}
// if the consumer is blocked in a poll() or handover operation, wake it up to commit soon
handover.wakeupProducer();
synchronized (consumerReassignmentLock) {
if (consumer != null) {
consumer.wakeup();
} else {
// the consumer is currently isolated for partition reassignment;
// set this flag so that the wakeup state is restored once the reassignment is complete
hasBufferedWakeup = true;
}
}
}
// ------------------------------------------------------------------------
/**
* Reestablishes the assigned partitions for the consumer.
* The reassigned partitions consist of the provided new partitions and whatever partitions
* were already assigned to the consumer.
*
* <p>The reassignment process is protected against wakeup calls, so that after
* this method returns, the consumer is either untouched or completely reassigned
* with the correct offset positions.
*
* <p>If the consumer was woken up before the reassignment started, so that the
* reassignment is interrupted at any point, the consumer is guaranteed to be rolled
* back as if it had been left untouched. On the other hand, if there was an attempt
* to wakeup the consumer during the reassignment, the wakeup call is "buffered"
* until the reassignment completes.
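*
* <p>In outline, the isolation protocol implemented by this method (a sketch,
* not the exact code):
* <pre>{@code
* synchronized (lock) { tmp = consumer; consumer = null; }  // external wakeup() calls now buffer
* try {
*     // blocking calls on tmp: assignment(), position(), assign(), seek()
* } catch (WakeupException e) {
*     // roll tmp back to its old assignment, clear the buffered wakeup,
*     // and signal the fetch loop via AbortedReassignmentException
* }
* synchronized (lock) { consumer = tmp; if (hasBufferedWakeup) tmp.wakeup(); }
* }</pre>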
*
* <p>This method is exposed for testing purposes.
*/
@VisibleForTesting
void reassignPartitions() throws Exception {
hasAssignedPartitions = true;
boolean reassignmentStarted = false;
// since the reassignment may introduce several Kafka blocking calls that cannot be interrupted,
// the consumer needs to be isolated from external wakeup calls in setOffsetsToCommit() and shutdown()
// until the reassignment is complete.
final KafkaConsumer<byte[], byte[]> consumerTmp;
synchronized (consumerReassignmentLock) {
consumerTmp = this.consumer;
this.consumer = null;
}
final Map<TopicPartition, Long> oldPartitionAssignmentsToPosition = new HashMap<>();
try {
for (TopicPartition oldPartition : consumerTmp.assignment()) {
oldPartitionAssignmentsToPosition.put(oldPartition, consumerTmp.position(oldPartition));
}
final List<TopicPartition> newPartitionAssignments =
new ArrayList<>(currentPartitions.size());
final List<KafkaTopicPartitionState<TopicPartition>> newPartitions = new ArrayList<>();
List<TopicPartition> finishedPartitions = new ArrayList<>();
for (TopicPartition topicPartition : currentPartitions.keySet()) {
if (!oldPartitionAssignmentsToPosition.containsKey(topicPartition)) {
newPartitions.add(currentPartitions.get(topicPartition));
newPartitionAssignments.add(topicPartition);
} else if (currentPartitions.get(topicPartition).isFinished()) {
oldPartitionAssignmentsToPosition.remove(topicPartition);
finishedPartitions.add(topicPartition);
} else {
newPartitionAssignments.add(topicPartition);
}
}
for (TopicPartition topicPartition : finishedPartitions) {
currentPartitions.remove(topicPartition);
}
if (currentPartitions.isEmpty()) {
// all partitions are finished; re-expose the consumer instead of reassigning,
// so that later commit/poll/close calls do not hit a null consumer
synchronized (consumerReassignmentLock) {
this.consumer = consumerTmp;
if (hasBufferedWakeup) {
this.consumer.wakeup();
hasBufferedWakeup = false;
}
}
return;
}
// reassign with the new partitions
consumerCallBridge.assignPartitions(consumerTmp, newPartitionAssignments);
reassignmentStarted = true;
// seek previously assigned partitions back to their saved positions
for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
consumerTmp.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
}
// offsets in the state of new partitions may still be placeholder sentinel values if we are:
// (1) starting fresh,
// (2) checkpoint / savepoint state we were restored with had not completely
// been replaced with actual offset values yet, or
// (3) the partition was newly discovered after startup;
// replace those with actual offsets, according to what the sentinel value represents.
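// note: position() returns the offset of the next record to fetch, while the partition
// state tracks the offset of the last consumed record; hence "- 1" below and "+ 1" when seeking.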
for (KafkaTopicPartitionState<TopicPartition> newPartitionState : newPartitions) {
if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET) {
consumerCallBridge.seekPartitionToBeginning(consumerTmp, newPartitionState.getKafkaPartitionHandle());
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.LATEST_OFFSET) {
consumerCallBridge.seekPartitionToEnd(consumerTmp, newPartitionState.getKafkaPartitionHandle());
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else if (newPartitionState.getOffset() == KafkaTopicPartitionStateSentinel.GROUP_OFFSET) {
// the KafkaConsumer by default will automatically seek the consumer position
// to the committed group offset, so we do not need to do it.
newPartitionState.setOffset(consumerTmp.position(newPartitionState.getKafkaPartitionHandle()) - 1);
} else {
consumerTmp.seek(newPartitionState.getKafkaPartitionHandle(), newPartitionState.getOffset() + 1);
}
}
} catch (WakeupException e) {
// a WakeupException may be thrown if wakeup() was called on the consumer
// before it was isolated for the reassignment. In this case, we abort the
// reassignment and just re-expose the original consumer.
synchronized (consumerReassignmentLock) {
this.consumer = consumerTmp;
// if reassignment had already started and affected the consumer,
// we do a full roll back so that it is as if it was left untouched
if (reassignmentStarted) {
consumerCallBridge.assignPartitions(
this.consumer, new ArrayList<>(oldPartitionAssignmentsToPosition.keySet()));
for (Map.Entry<TopicPartition, Long> oldPartitionToPosition : oldPartitionAssignmentsToPosition.entrySet()) {
this.consumer.seek(oldPartitionToPosition.getKey(), oldPartitionToPosition.getValue());
}
}
// no need to restore the wakeup state in this case,
// since only the last wakeup call is effective anyways
hasBufferedWakeup = false;
// this signals the main fetch loop to continue through the loop
throw new AbortedReassignmentException();
}
}
// reassignment complete; expose the reassigned consumer
synchronized (consumerReassignmentLock) {
this.consumer = consumerTmp;
// restore wakeup state for the consumer if necessary
if (hasBufferedWakeup) {
this.consumer.wakeup();
hasBufferedWakeup = false;
}
}
}
public KafkaConsumer<byte[], byte[]> getConsumer() {
return createKafkaConsumer(kafkaProperties);
}
private KafkaConsumer<byte[], byte[]> createKafkaConsumer(Properties kafkaProperties) {
try {
return new KafkaConsumer<>(kafkaProperties);
} catch (KafkaException e) {
// creating the consumer can fail when the thread's context classloader cannot
// load the configured (de)serializers; retry with the context classloader unset,
// so that the KafkaConsumer falls back to its own classloader
ClassLoader original = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(null);
return new KafkaConsumer<>(kafkaProperties);
} finally {
Thread.currentThread().setContextClassLoader(original);
}
}
}
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
private static List<TopicPartition> convertKafkaPartitions(Collection<KafkaTopicPartitionState<TopicPartition>> partitions) {
ArrayList<TopicPartition> result = new ArrayList<>(partitions.size());
for (KafkaTopicPartitionState<TopicPartition> p : partitions) {
result.add(p.getKafkaPartitionHandle());
}
return result;
}
private class CommitCallback implements OffsetCommitCallback {
private final KafkaCommitCallback internalCommitCallback;
CommitCallback(KafkaCommitCallback internalCommitCallback) {
this.internalCommitCallback = checkNotNull(internalCommitCallback);
}
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception ex) {
commitInProgress = false;
if (ex != null) {
log.warn("Committing offsets to Kafka failed. This does not compromise Flink's checkpoints.", ex);
internalCommitCallback.onException(ex);
} else {
internalCommitCallback.onSuccess();
}
}
}
/**
* Utility exception that serves as a signal for the main loop to continue through the loop
* if a reassignment attempt was aborted due to a pre-reassignment wakeup call on the consumer.
*/
private static class AbortedReassignmentException extends Exception {
private static final long serialVersionUID = 1L;
}
@VisibleForTesting
public Map<TopicPartition, KafkaTopicPartitionState<TopicPartition>> getCurrentPartitions() {
return currentPartitions;
}
}