/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.connect.runtime;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;
import org.apache.kafka.connect.storage.Converter;
import org.apache.kafka.connect.storage.OffsetStorageReader;
import org.apache.kafka.connect.storage.OffsetStorageWriter;
import org.apache.kafka.connect.util.ConnectorTaskId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
* WorkerTask that uses a SourceTask to ingest data into Kafka.
*/
class WorkerSourceTask extends WorkerTask {
private static final Logger log = LoggerFactory.getLogger(WorkerSourceTask.class);
private static final long SEND_FAILED_BACKOFF_MS = 100;
private final WorkerConfig workerConfig;
private final SourceTask task;
private final Converter keyConverter;
private final Converter valueConverter;
private KafkaProducer<byte[], byte[]> producer;
private final OffsetStorageReader offsetReader;
private final OffsetStorageWriter offsetWriter;
private final Time time;
private List<SourceRecord> toSend;
private boolean lastSendFailed; // Whether the last send failed *synchronously*, i.e. never made it into the producer's RecordAccumulator
// Use IdentityHashMap to ensure correctness with duplicate records. This is a map rather than a set
// because the JDK provides no IdentityHashSet.
private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
// A second buffer is used while an offset flush is running
private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
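// True while an offset flush is in progress; newly sent records are tracked in the backlog during this window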
private boolean flushing;
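// Counted down when stop() is requested so that send-failure backoff waits can return early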
private CountDownLatch stopRequestedLatch;
private Map<String, String> taskConfig;
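// Guard the race between task start-up completing and a stop request arriving; see stop() and execute()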
private boolean finishedStart = false;
private boolean startedShutdownBeforeStartCompleted = false;
public WorkerSourceTask(ConnectorTaskId id,
SourceTask task,
TaskStatus.Listener lifecycleListener,
Converter keyConverter,
Converter valueConverter,
KafkaProducer<byte[], byte[]> producer,
OffsetStorageReader offsetReader,
OffsetStorageWriter offsetWriter,
WorkerConfig workerConfig,
Time time) {
super(id, lifecycleListener);
this.workerConfig = workerConfig;
this.task = task;
this.keyConverter = keyConverter;
this.valueConverter = valueConverter;
this.producer = producer;
this.offsetReader = offsetReader;
this.offsetWriter = offsetWriter;
this.time = time;
this.toSend = null;
this.lastSendFailed = false;
this.outstandingMessages = new IdentityHashMap<>();
this.outstandingMessagesBacklog = new IdentityHashMap<>();
this.flushing = false;
this.stopRequestedLatch = new CountDownLatch(1);
}
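
/**
 * Store the task configuration; the task itself is not started until {@link #execute()} runs.
 */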
@Override
public void initialize(Map<String, String> config) {
this.taskConfig = config;
}
@Override
protected void close() {
// nothing to do
}
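
/**
 * Request that this task stop. If the task has not yet finished starting, shutdown is deferred until
 * {@link #execute()} observes the request; otherwise the underlying SourceTask is stopped immediately.
 */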
@Override
public void stop() {
super.stop();
stopRequestedLatch.countDown();
synchronized (this) {
if (finishedStart)
task.stop();
else
startedShutdownBeforeStartCompleted = true;
}
}
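
/**
 * Main work loop: start the SourceTask, then repeatedly poll it for records and send them to Kafka,
 * backing off briefly when a send fails synchronously. Offsets are committed once more on exit.
 */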
@Override
public void execute() {
try {
task.initialize(new WorkerSourceTaskContext(offsetReader));
task.start(taskConfig);
log.info("Source task {} finished initialization and start", this);
synchronized (this) {
if (startedShutdownBeforeStartCompleted) {
task.stop();
return;
}
finishedStart = true;
}
while (!isStopping()) {
if (toSend == null) {
log.debug("Nothing to send to Kafka. Polling source for additional records");
toSend = task.poll();
}
if (toSend == null)
continue;
log.debug("About to send " + toSend.size() + " records to Kafka");
if (!sendRecords())
stopRequestedLatch.await(SEND_FAILED_BACKOFF_MS, TimeUnit.MILLISECONDS);
}
} catch (InterruptedException e) {
// Ignore and allow to exit.
} finally {
// It should still be safe to commit offsets since any exception would have
// simply resulted in not getting more records but all the existing records should be ok to flush
// and commit offsets. Worst case, task.flush() will also throw an exception causing the offset commit
// to fail.
commitOffsets();
}
}
/**
* Try to send a batch of records. If a send fails and is retriable, this saves the remainder of the batch so it can
* be retried after backing off. If a send fails and is not retriable, this will throw a ConnectException.
* @return true if all messages were sent, false if some need to be retried
*/
private boolean sendRecords() {
int processed = 0;
for (final SourceRecord record : toSend) {
byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), key, value);
log.trace("Appending record with key {}, value {}", record.key(), record.value());
// We need this queued first since the callback could happen immediately (even synchronously in some cases).
// Because of this we need to be careful about handling retries -- we always save the previously attempted
// record as part of toSend and need to use a flag to track whether we should actually add it to the outstanding
// messages and update the offsets.
synchronized (this) {
if (!lastSendFailed) {
if (!flushing) {
outstandingMessages.put(producerRecord, producerRecord);
} else {
outstandingMessagesBacklog.put(producerRecord, producerRecord);
}
// Offsets are converted & serialized in the OffsetWriter
offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
}
}
try {
producer.send(
producerRecord,
new Callback() {
@Override
public void onCompletion(RecordMetadata recordMetadata, Exception e) {
if (e != null) {
// Given the default settings for zero data loss, this should basically never happen --
// between "infinite" retries, indefinite blocking on full buffers, and "infinite" request
// timeouts, callbacks with exceptions should never be invoked in practice. If the
// user overrode these settings, the best we can do is notify them of the failure via
// logging.
log.error("{} failed to send record to {}: {}", id, record.topic(), e);
log.debug("Failed record: topic {}, Kafka partition {}, key {}, value {}, source offset {}, source partition {}",
record.topic(), record.kafkaPartition(), record.key(), record.value(),
record.sourceOffset(), record.sourcePartition());
} else {
log.trace("Wrote record successfully: topic {} partition {} offset {}",
recordMetadata.topic(), recordMetadata.partition(),
recordMetadata.offset());
commitTaskRecord(record);
}
recordSent(producerRecord);
}
});
lastSendFailed = false;
} catch (RetriableException e) {
log.warn("Failed to send {}, backing off before retrying:", producerRecord, e);
toSend = toSend.subList(processed, toSend.size());
lastSendFailed = true;
return false;
} catch (KafkaException e) {
throw new ConnectException("Unrecoverable exception trying to send", e);
}
processed++;
}
toSend = null;
return true;
}
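
/**
 * Notify the SourceTask that an individual record has been successfully written by the producer.
 */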
private void commitTaskRecord(SourceRecord record) {
try {
task.commitRecord(record);
} catch (InterruptedException e) {
log.error("Exception thrown", e);
}
}
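
/**
 * Remove the given record from the set of outstanding messages and, if a flush is waiting on the
 * outstanding set to drain, wake it up once the set becomes empty.
 */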
private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record) {
ProducerRecord<byte[], byte[]> removed = outstandingMessages.remove(record);
// While flushing, we may also see callbacks for items in the backlog
if (removed == null && flushing)
removed = outstandingMessagesBacklog.remove(record);
// But if neither one had it, something is very wrong
if (removed == null) {
log.error("CRITICAL Saw callback for record that was not present in the outstanding message set: "
+ "{}", record);
} else if (flushing && outstandingMessages.isEmpty()) {
// flush thread may be waiting on the outstanding messages to clear
this.notifyAll();
}
}
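
/**
 * Commit offsets for this task: wait for all outstanding messages to be acknowledged, then flush the
 * snapshotted offsets to offset storage. Returns true if the commit completed, false if it failed or
 * timed out (bounded by {@link WorkerConfig#OFFSET_COMMIT_TIMEOUT_MS_CONFIG}).
 */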
public boolean commitOffsets() {
long commitTimeoutMs = workerConfig.getLong(WorkerConfig.OFFSET_COMMIT_TIMEOUT_MS_CONFIG);
log.debug("{} Committing offsets", this);
long started = time.milliseconds();
long timeout = started + commitTimeoutMs;
synchronized (this) {
// First we need to make sure we snapshot everything in exactly the current state. This
// means both the current set of messages we're still waiting to finish, stored in this
// class, which setting flushing = true will handle by storing any new values into a new
// buffer; and the current set of user-specified offsets, stored in the
// OffsetStorageWriter, for which we can use beginFlush() to initiate the snapshot.
flushing = true;
boolean flushStarted = offsetWriter.beginFlush();
// Still wait for any producer records to flush, even if there aren't any offsets to write
// to persistent storage
// Next we need to wait for all outstanding messages to finish sending
log.debug("{} flushing {} outstanding messages for offset commit", this, outstandingMessages.size());
while (!outstandingMessages.isEmpty()) {
try {
long timeoutMs = timeout - time.milliseconds();
if (timeoutMs <= 0) {
log.error(
"Failed to flush {}, timed out while waiting for producer to flush outstanding "
+ "messages, {} left ({})", this, outstandingMessages.size(), outstandingMessages);
finishFailedFlush();
return false;
}
this.wait(timeoutMs);
} catch (InterruptedException e) {
// We can get interrupted if we take too long committing when the work thread shutdown is requested,
// requiring a forcible shutdown. Give up since we can't safely commit any offsets, but also need
// to stop immediately
log.error("{} Interrupted while flushing messages, offsets will not be committed", this);
finishFailedFlush();
return false;
}
}
if (!flushStarted) {
// There was nothing in the offsets to process, but we still waited for the data in the
// buffer to flush. This is useful since it feeds into metrics, e.g. flush time, which remain
// meaningful even if the connector doesn't record any offsets.
finishSuccessfulFlush();
log.debug("Finished {} offset commitOffsets successfully in {} ms",
this, time.milliseconds() - started);
commitSourceTask();
return true;
}
}
// Now we can actually flush the offsets to user storage.
Future<Void> flushFuture = offsetWriter.doFlush(new org.apache.kafka.connect.util.Callback<Void>() {
@Override
public void onCompletion(Throwable error, Void result) {
if (error != null) {
log.error("Failed to flush {} offsets to storage: ", this, error);
} else {
log.trace("Finished flushing {} offsets to storage", this);
}
}
});
// Very rare case: offsets were unserializable and we finished immediately, unable to store
// any data
if (flushFuture == null) {
finishFailedFlush();
return false;
}
try {
flushFuture.get(Math.max(timeout - time.milliseconds(), 0), TimeUnit.MILLISECONDS);
// There's a small race here where we can get the callback just as this times out (and log
// success), but then catch the exception below and cancel everything. This won't cause any
// errors, is only wasteful in this minor edge case, and the worst result is that the log
// could look a little confusing.
} catch (InterruptedException e) {
log.warn("Flush of {} offsets interrupted, cancelling", this);
finishFailedFlush();
return false;
} catch (ExecutionException e) {
log.error("Flush of {} offsets threw an unexpected exception: ", this, e);
finishFailedFlush();
return false;
} catch (TimeoutException e) {
log.error("Timed out waiting to flush {} offsets to storage", this);
finishFailedFlush();
return false;
}
finishSuccessfulFlush();
log.info("Finished {} commitOffsets successfully in {} ms",
this, time.milliseconds() - started);
commitSourceTask();
return true;
}
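
/**
 * Notify the SourceTask that offsets have been committed, logging rather than propagating any
 * exception it throws.
 */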
private void commitSourceTask() {
try {
this.task.commit();
} catch (InterruptedException ex) {
log.warn("Commit interrupted", ex);
} catch (Throwable ex) {
log.error("Exception thrown while calling task.commit()", ex);
}
}
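
/**
 * Abort a failed offset flush: cancel the in-progress flush in the offset writer and merge the backlog
 * back into the outstanding message set.
 */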
private synchronized void finishFailedFlush() {
offsetWriter.cancelFlush();
outstandingMessages.putAll(outstandingMessagesBacklog);
outstandingMessagesBacklog.clear();
flushing = false;
}
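
/**
 * Complete a successful offset flush by swapping the (now empty) outstanding set with the backlog.
 */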
private synchronized void finishSuccessfulFlush() {
// If we were successful, we can just swap instead of replacing items back into the original map
IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> temp = outstandingMessages;
outstandingMessages = outstandingMessagesBacklog;
outstandingMessagesBacklog = temp;
flushing = false;
}
@Override
public String toString() {
return "WorkerSourceTask{" +
"id=" + id +
'}';
}
}