blob: 3b7ac8818eeaf131ffaa3724343ac95e498261df [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.kafka.record;
import org.apache.crunch.CrunchRuntimeException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.Map;
import java.util.Properties;
import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
import static org.apache.crunch.kafka.record.KafkaInputFormat.filterConnectionProperties;
import static org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
import static org.apache.crunch.kafka.record.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
/**
* A {@link RecordReader} for pulling data from Kafka.
*
* @param <K> the key of the records from Kafka
* @param <V> the value of the records from Kafka
*/
public class KafkaRecordReader<K, V> extends RecordReader<ConsumerRecord<K, V>, Void> {
private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
private Consumer<K, V> consumer;
private ConsumerRecord<K, V> record;
private long endingOffset;
private Iterator<ConsumerRecord<K, V>> recordIterator;
private long consumerPollTimeout;
private long maxNumberOfRecords;
private long startingOffset;
private long currentOffset;
private int maxNumberAttempts;
private Properties kafkaConnectionProperties;
private TopicPartition topicPartition;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
throws IOException, InterruptedException {
if (!(inputSplit instanceof KafkaInputSplit)) {
throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
}
kafkaConnectionProperties = filterConnectionProperties(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
consumer = buildConsumer(kafkaConnectionProperties);
KafkaInputSplit split = (KafkaInputSplit) inputSplit;
topicPartition = split.getTopicPartition();
consumer.assign(Collections.singletonList(topicPartition));
//suggested hack to gather info without gathering data
consumer.poll(0);
//now seek to the desired start location
startingOffset = split.getStartingOffset();
consumer.seek(topicPartition, startingOffset);
currentOffset = startingOffset - 1;
endingOffset = split.getEndingOffset();
maxNumberOfRecords = endingOffset - startingOffset;
if (LOG.isInfoEnabled()) {
LOG.info("Reading data from {} between {} and {}", new Object[] { topicPartition, startingOffset, endingOffset });
}
Configuration config = taskAttemptContext.getConfiguration();
consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
}
/**
* Builds a new kafka consumer
*
* @param properties
* the properties to configure the consumer
* @return a new kafka consumer
*/
// Visible for testing
protected KafkaConsumer<K, V> buildConsumer(Properties properties) {
return new KafkaConsumer<>(properties);
}
/**
* @return the current offset for the reader
*/
// Visible for testing
protected long getCurrentOffset() {
return currentOffset;
}
@Override
public boolean nextKeyValue() throws IOException, InterruptedException {
if (hasPendingData()) {
loadRecords();
record = recordIterator.hasNext() ? recordIterator.next() : null;
if (record != null) {
LOG.debug("nextKeyValue: Retrieved record with offset {}", record.offset());
long oldOffset = currentOffset;
currentOffset = record.offset();
LOG.debug("Current offset will be updated to be [{}]", currentOffset);
if (LOG.isWarnEnabled() && (currentOffset - oldOffset > 1)) {
// The most likely scenario here is our start offset was deleted and an offset reset took place by the consumer
LOG.warn("Possible data loss in partition {} as offset {} was larger than expected {}",
new Object[] { topicPartition, currentOffset, oldOffset + 1});
}
if (currentOffset >= endingOffset) {
// We had pending data but read a record beyond our end offset so don't include it and stop reading
if (LOG.isWarnEnabled())
LOG.warn("Record offset {} is beyond our end offset {}. This could indicate data loss in partition {}",
new Object[] { currentOffset, endingOffset, topicPartition});
record = null;
return false;
}
return true;
} else if (isPartitionEmpty()) {
// If the partition is empty but we are expecting to read data we would read no records and
// see no errors so handle that here
if (LOG.isWarnEnabled())
LOG.warn("The partition {} is empty though we expected to read from {} to {}. This could indicate data loss",
new Object[] {topicPartition, currentOffset, endingOffset});
} else {
// We have pending data but we are unable to fetch any records so throw an exception and stop the job
throw new IOException("Unable to read additional data from Kafka. See logs for details. Partition " +
topicPartition + " Current Offset: " + currentOffset + " End Offset: " + endingOffset);
}
}
record = null;
return false;
}
private boolean isPartitionEmpty() {
// We don't need to fetch the ending offset as well since that is looked up when running the job. If the end offset had
// changed while running we would have read that record instead of reading no records and calling this method
return getEarliestOffset() == endingOffset;
}
@Override
public ConsumerRecord<K, V> getCurrentKey() throws IOException, InterruptedException {
return record;
}
@Override
public Void getCurrentValue() throws IOException, InterruptedException {
return null;
}
@Override
public float getProgress() throws IOException, InterruptedException {
//not most accurate but gives reasonable estimate
return ((float) (currentOffset - startingOffset + 1)) / maxNumberOfRecords;
}
private boolean hasPendingData() {
//offset range is exclusive at the end which means the ending offset is one higher
// than the actual physical last offset
return currentOffset < endingOffset - 1;
}
/**
* @return the record iterator used by the reader
*/
// Visble for testing
protected Iterator<ConsumerRecord<K, V>> getRecordIterator() {
return recordIterator;
}
/**
* Loads new records into the record iterator
*/
// Visible for testing
protected void loadRecords() {
if ((recordIterator == null) || !recordIterator.hasNext()) {
ConsumerRecords<K, V> records = null;
int numTries = 0;
boolean success = false;
while(!success && (numTries < maxNumberAttempts)) {
numTries++;
try {
records = getConsumer().poll(consumerPollTimeout);
} catch (RetriableException re) {
LOG.warn("Error pulling messages from Kafka", re);
}
// There was no error but we don't have any records. This could indicate a slowness or failure in Kafka's internal
// client or that the partition has no more data
if (records != null && records.isEmpty()){
if (LOG.isWarnEnabled())
LOG.warn("No records retrieved from partition {} with poll timeout {} but pending offsets to consume. Current "
+ "Offset: {}, End Offset: {}", new Object[] { topicPartition, consumerPollTimeout, currentOffset,
endingOffset });
} else if (records != null && !records.isEmpty()){
success = true;
}
if (!success && numTries < maxNumberAttempts) {
LOG.info("Record fetch attempt {} / {} failed, retrying", numTries, maxNumberAttempts);
}
else if (!success) {
if (LOG.isWarnEnabled())
LOG.warn("Record fetch attempt {} / {} failed. No more attempts left for partition {}",
new Object[] { numTries, maxNumberAttempts, topicPartition });
}
}
if ((records == null) || records.isEmpty()){
LOG.info("No records retrieved from Kafka partition {} therefore nothing to iterate over", topicPartition);
} else{
LOG.info("Retrieved {} records from Kafka partition {} to iterate over starting from offset {}", new Object[] {
records.count(), topicPartition, records.iterator().next().offset()});
}
recordIterator = records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
}
}
/**
* @return the consumer
*/
protected Consumer<K, V> getConsumer() {
return consumer;
}
/**
* @return the earliest offset of the topic partition
*/
protected long getEarliestOffset() {
Map<TopicPartition, Long> brokerOffsets = consumer.beginningOffsets(
Collections.singletonList(topicPartition));
Long offset = brokerOffsets.get(topicPartition);
if(offset == null){
LOG.debug("Unable to determine earliest offset for {} so returning 0", topicPartition);
return 0L;
}
LOG.debug("Earliest offset for {} is {}", topicPartition, offset);
return offset;
}
@Override
public void close() throws IOException {
LOG.debug("Closing the record reader.");
if (consumer != null) {
consumer.close();
}
}
}