/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/
package org.apache.kafka.connect.runtime;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.sink.SinkTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static java.util.Collections.singleton;
/**
* WorkerTask that uses a SinkTask to export data from Kafka.
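 * <p>
 * A dedicated worker thread drives the task: {@code initialize} creates the consumer,
 * {@code execute} runs the poll/convert/deliver loop, and offsets are committed either
 * periodically or when partitions are revoked or the task is stopped.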
*/
class WorkerSinkTask extends WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSinkTask.class);
private final WorkerConfig workerConfig;
private final SinkTask task;
private Map<String, String> taskConfig;
private final Time time;
private final Converter keyConverter;
private final Converter valueConverter;
private KafkaConsumer<byte[], byte[]> consumer;
private WorkerSinkTaskContext context;
private final List<SinkRecord> messageBatch;
private Map<TopicPartition, OffsetAndMetadata> lastCommittedOffsets;
private Map<TopicPartition, OffsetAndMetadata> currentOffsets;
private RuntimeException rebalanceException;
private long nextCommit;
private int commitSeqno;
private long commitStarted;
private int commitFailures;
private boolean pausedForRedelivery;
private boolean committing;
public WorkerSinkTask(ConnectorTaskId id,
SinkTask task,
TaskStatus.Listener statusListener,
TargetState initialState,
WorkerConfig workerConfig,
Converter keyConverter,
Converter valueConverter,
Time time) {
super(id, statusListener, initialState);
this.workerConfig = workerConfig;
this.task = task;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.time = time;
this.messageBatch = new ArrayList<>();
this.currentOffsets = new HashMap<>();
this.pausedForRedelivery = false;
this.rebalanceException = null;
this.nextCommit = time.milliseconds() +
workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
this.committing = false;
this.commitSeqno = 0;
this.commitStarted = -1;
this.commitFailures = 0;
}
@Override
public void initialize(TaskConfig taskConfig) {
try {
this.taskConfig = taskConfig.originalsStrings();
this.consumer = createConsumer();
this.context = new WorkerSinkTaskContext(consumer);
} catch (Throwable t) {
log.error("Task {} failed initialization and will not be started.", t);
onFailure(t);
}
}
@Override
public void stop() {
        // Offset commit is handled upon exit in the work thread
super.stop();
consumer.wakeup();
}
@Override
protected void close() {
// FIXME Kafka needs to add a timeout parameter here for us to properly obey the timeout
// passed in
task.stop();
if (consumer != null)
consumer.close();
}
@Override
public void transitionTo(TargetState state) {
super.transitionTo(state);
consumer.wakeup();
}
@Override
public void execute() {
initializeAndStart();
try {
while (!isStopping())
iteration();
} finally {
// Make sure any uncommitted data has been committed and the task has
// a chance to clean up its state
closePartitions();
}
}
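    /**
     * Runs one pass of the main loop: starts an offset commit if the commit interval has
     * elapsed, flags any in-flight commit that has exceeded the commit timeout, and then
     * polls the consumer for new records.
     */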
protected void iteration() {
long now = time.milliseconds();
// Maybe commit
if (!committing && now >= nextCommit) {
commitOffsets(now, false);
nextCommit += workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG);
}
// Check for timed out commits
long commitTimeout = commitStarted + workerConfig.getLong(
WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
if (committing && now >= commitTimeout) {
log.warn("Commit of {} offsets timed out", this);
commitFailures++;
committing = false;
}
// And process messages
long timeoutMs = Math.max(nextCommit - now, 0);
poll(timeoutMs);
}
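    /**
     * Callback for a finished offset commit. Results for commits that have already been
     * marked as timed out (stale sequence numbers) are logged and otherwise ignored.
     */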
private void onCommitCompleted(Throwable error, long seqno) {
if (commitSeqno != seqno) {
log.debug("Got callback for timed out commit {}: {}, but most recent commit is {}",
this,
seqno, commitSeqno);
} else {
if (error != null) {
log.error("Commit of {} offsets threw an unexpected exception: ", this, error);
commitFailures++;
} else {
log.debug("Finished {} offset commit successfully in {} ms",
this, time.milliseconds() - commitStarted);
commitFailures = 0;
}
committing = false;
}
}
public int commitFailures() {
return commitFailures;
}
/**
* Initializes and starts the SinkTask.
*/
protected void initializeAndStart() {
log.debug("Initializing task {} with config {}", id, taskConfig);
String topicsStr = taskConfig.get(SinkTask.TOPICS_CONFIG);
if (topicsStr == null || topicsStr.isEmpty())
throw new ConnectException("Sink tasks require a list of topics.");
String[] topics = topicsStr.split(",");
log.debug("Task {} subscribing to topics {}", id, topics);
consumer.subscribe(Arrays.asList(topics), new HandleRebalance());
task.initialize(context);
task.start(taskConfig);
log.info("Sink task {} finished initialization and start", this);
}
/** Poll for new messages with the given timeout. Should only be invoked by the worker thread. */
protected void poll(long timeoutMs) {
try {
rewind();
long retryTimeout = context.timeout();
if (retryTimeout > 0) {
timeoutMs = Math.min(timeoutMs, retryTimeout);
context.timeout(-1L);
}
log.trace("{} polling consumer with timeout {} ms", id, timeoutMs);
ConsumerRecords<byte[], byte[]> msgs = pollConsumer(timeoutMs);
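            // The consumer is paused while a batch is being redelivered, so a non-empty
            // messageBatch implies this poll cannot have returned new records.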
assert messageBatch.isEmpty() || msgs.isEmpty();
log.trace("{} polling returned {} messages", id, msgs.count());
convertMessages(msgs);
deliverMessages();
} catch (WakeupException we) {
log.trace("{} consumer woken up", id);
if (shouldPause()) {
pauseAll();
} else if (!pausedForRedelivery) {
resumeAll();
}
}
}
    /**
     * Performs the actual offset commit: synchronously when the task is closing, otherwise
     * asynchronously with a callback that records the committed offsets.
     **/
private void doCommit(Map<TopicPartition, OffsetAndMetadata> offsets, boolean closing, final int seqno) {
log.info("{} Committing offsets", this);
if (closing) {
try {
consumer.commitSync(offsets);
lastCommittedOffsets = offsets;
onCommitCompleted(null, seqno);
} catch (KafkaException e) {
onCommitCompleted(e, seqno);
}
} else {
OffsetCommitCallback cb = new OffsetCommitCallback() {
@Override
public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception error) {
lastCommittedOffsets = offsets;
onCommitCompleted(error, seqno);
}
};
consumer.commitAsync(offsets, cb);
}
}
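    /**
     * Flushes the task and commits a snapshot of the current offsets. If the flush throws,
     * the consumer is rewound to the last committed offsets and the commit is recorded as
     * failed. When closing, the task's partitions are closed before the synchronous commit.
     */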
private void commitOffsets(long now, boolean closing) {
if (currentOffsets.isEmpty())
return;
committing = true;
commitSeqno += 1;
commitStarted = now;
Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>(currentOffsets);
try {
task.flush(offsets);
} catch (Throwable t) {
log.error("Commit of {} offsets failed due to exception while flushing:", this, t);
log.error("Rewinding offsets to last committed offsets");
for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : lastCommittedOffsets.entrySet()) {
log.debug("{} Rewinding topic partition {} to offset {}", id, entry.getKey(), entry.getValue().offset());
consumer.seek(entry.getKey(), entry.getValue().offset());
}
currentOffsets = new HashMap<>(lastCommittedOffsets);
onCommitCompleted(t, commitSeqno);
return;
} finally {
// Close the task if needed before committing the offsets. This is basically the last chance for
// the connector to actually flush data that has been written to it.
if (closing)
task.close(currentOffsets.keySet());
}
doCommit(offsets, closing, commitSeqno);
}
@Override
public String toString() {
return "WorkerSinkTask{" +
"id=" + id +
'}';
}
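    /**
     * Polls the consumer, rethrowing any exception that was captured while the rebalance
     * listener was running (the consumer swallows exceptions thrown from the listener).
     */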
private ConsumerRecords<byte[], byte[]> pollConsumer(long timeoutMs) {
ConsumerRecords<byte[], byte[]> msgs = consumer.poll(timeoutMs);
// Exceptions raised from the task during a rebalance should be rethrown to stop the worker
if (rebalanceException != null) {
RuntimeException e = rebalanceException;
rebalanceException = null;
throw e;
}
return msgs;
}
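    /**
     * Creates the consumer for this sink task. Worker-level overrides prefixed with
     * "consumer." are applied on top of the defaults, and auto-commit is disabled because
     * offsets are committed explicitly by this class.
     */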
private KafkaConsumer<byte[], byte[]> createConsumer() {
        // Include any unknown worker configs so consumer configs can be set globally on the
        // worker and passed through to the task
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "connect-" + id.connector());
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
Utils.join(workerConfig.getList(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG), ","));
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
props.putAll(workerConfig.originalsWithPrefix("consumer."));
KafkaConsumer<byte[], byte[]> newConsumer;
try {
newConsumer = new KafkaConsumer<>(props);
} catch (Throwable t) {
throw new ConnectException("Failed to create consumer", t);
}
return newConsumer;
}
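    /**
     * Converts raw consumer records into SinkRecords using the configured key and value
     * converters and appends them to the current message batch.
     */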
private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
for (ConsumerRecord<byte[], byte[]> msg : msgs) {
log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
messageBatch.add(
new SinkRecord(msg.topic(), msg.partition(),
keyAndSchema.schema(), keyAndSchema.value(),
valueAndSchema.schema(), valueAndSchema.value(),
msg.offset())
);
}
}
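    /**
     * Resumes every assigned partition except those the task has explicitly paused through
     * the context.
     */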
private void resumeAll() {
for (TopicPartition tp : consumer.assignment())
if (!context.pausedPartitions().contains(tp))
consumer.resume(singleton(tp));
}
private void pauseAll() {
consumer.pause(consumer.assignment());
}
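    /**
     * Delivers the current batch to the task and records the resulting offsets. On a
     * RetriableException the batch is kept and all partitions are paused so it can be
     * redelivered on the next loop; any other exception kills the task.
     */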
private void deliverMessages() {
// Finally, deliver this batch to the sink
try {
// Since we reuse the messageBatch buffer, ensure we give the task its own copy
task.put(new ArrayList<>(messageBatch));
for (SinkRecord record : messageBatch)
currentOffsets.put(new TopicPartition(record.topic(), record.kafkaPartition()),
new OffsetAndMetadata(record.kafkaOffset() + 1));
messageBatch.clear();
// If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
// the task had not explicitly paused
if (pausedForRedelivery) {
if (!shouldPause())
resumeAll();
pausedForRedelivery = false;
}
} catch (RetriableException e) {
log.error("RetriableException from SinkTask {}:", id, e);
// If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
// but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
pausedForRedelivery = true;
pauseAll();
// Let this exit normally, the batch will be reprocessed on the next loop.
} catch (Throwable t) {
log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
log.error("Task is being killed and will not recover until manually restarted");
throw new ConnectException("Exiting WorkerSinkTask due to unrecoverable exception.");
}
}
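    /**
     * Applies any offset resets the task requested through the context by seeking the
     * consumer and updating the tracked offsets, then clears the requested resets.
     */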
private void rewind() {
Map<TopicPartition, Long> offsets = context.offsets();
if (offsets.isEmpty()) {
return;
}
for (TopicPartition tp: offsets.keySet()) {
Long offset = offsets.get(tp);
if (offset != null) {
log.trace("Rewind {} to offset {}.", tp, offset);
consumer.seek(tp, offset);
lastCommittedOffsets.put(tp, new OffsetAndMetadata(offset));
currentOffsets.put(tp, new OffsetAndMetadata(offset));
}
}
context.clearOffsets();
}
private void openPartitions(Collection<TopicPartition> partitions) {
if (partitions.isEmpty())
return;
task.open(partitions);
}
private void closePartitions() {
commitOffsets(time.milliseconds(), true);
}
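    /**
     * Rebalance listener that resets offset tracking and reopens the task's partitions on
     * assignment, and commits outstanding offsets and closes partitions on revocation.
     */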
private class HandleRebalance implements ConsumerRebalanceListener {
@Override
public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
lastCommittedOffsets = new HashMap<>();
currentOffsets = new HashMap<>();
for (TopicPartition tp : partitions) {
long pos = consumer.position(tp);
lastCommittedOffsets.put(tp, new OffsetAndMetadata(pos));
currentOffsets.put(tp, new OffsetAndMetadata(pos));
log.debug("{} assigned topic partition {} with offset {}", id, tp, pos);
}
// If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
// sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
// Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own.
pausedForRedelivery = false;
// Ensure that the paused partitions contains only assigned partitions and repause as necessary
context.pausedPartitions().retainAll(partitions);
if (shouldPause())
pauseAll();
else if (!context.pausedPartitions().isEmpty())
consumer.pause(context.pausedPartitions());
// Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
// task start. Since this callback gets invoked during that initial setup before we've started the task, we
// need to guard against invoking the user's callback method during that period.
if (rebalanceException == null) {
try {
openPartitions(partitions);
} catch (RuntimeException e) {
// The consumer swallows exceptions raised in the rebalance listener, so we need to store
// exceptions and rethrow when poll() returns.
rebalanceException = e;
}
}
}
@Override
public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
try {
closePartitions();
} catch (RuntimeException e) {
// The consumer swallows exceptions raised in the rebalance listener, so we need to store
// exceptions and rethrow when poll() returns.
rebalanceException = e;
}
// Make sure we don't have any leftover data since offsets will be reset to committed positions
messageBatch.clear();
}
}
}