/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hive.kafka;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.Utilities;
import org.apache.hadoop.hive.ql.exec.vector.VectorAssignRow;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.MapWork;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Iterator;
import java.util.Properties;

/**
 * Vectorized Kafka record reader.
 */
class VectorizedKafkaRecordReader implements RecordReader<NullWritable, VectorizedRowBatch> {

  private static final Logger LOG = LoggerFactory.getLogger(VectorizedKafkaRecordReader.class);

  private final KafkaConsumer<byte[], byte[]> consumer;
  private final Iterator<ConsumerRecord<byte[], byte[]>> recordsCursor;
  private long totalNumberRecords = 0L;
  private long consumedRecords = 0L;
  private long readBytes = 0L;
  private final VectorizedRowBatchCtx rbCtx;
  /**
   * Actual projected columns needed by the query; this can be empty for queries such as: select count(*) from src.
   */
  private final int[] projectedColumns;
  /**
   * Underlying row deserializer.
   */
  private final KafkaSerDe serDe;
  private final VectorAssignRow vectorAssignRow = new VectorAssignRow();
  private final KafkaWritable kafkaWritable = new KafkaWritable();
  final Object[] row;
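
  /**
   * Builds a vectorized reader over a single Kafka split: sets up the vectorized row batch context,
   * the Kafka SerDe used to deserialize records, and a bounded record iterator covering the split's
   * [startOffset, endOffset) offset range.
   */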
  VectorizedKafkaRecordReader(KafkaInputSplit inputSplit, Configuration jobConf) {
    // VectorizedRowBatch context initialization
    this.rbCtx = Utilities.getVectorizedRowBatchCtx(jobConf);
    if (rbCtx.getDataColumnNums() != null) {
      projectedColumns = rbCtx.getDataColumnNums();
    } else {
      // case all the columns are selected
      projectedColumns = new int[rbCtx.getRowColumnTypeInfos().length];
      for (int i = 0; i < projectedColumns.length; i++) {
        projectedColumns[i] = i;
      }
    }
    // row parser and row assigner initialization
    serDe = createAndInitializeSerde(jobConf);
    try {
      vectorAssignRow.init((StructObjectInspector) serDe.getObjectInspector());
    } catch (HiveException e) {
      throw new RuntimeException(e);
    }
    // Kafka iterator initialization
    long startOffset = inputSplit.getStartOffset();
    long endOffset = inputSplit.getEndOffset();
    TopicPartition topicPartition = new TopicPartition(inputSplit.getTopic(), inputSplit.getPartition());
    Preconditions.checkState(startOffset >= 0 && startOffset <= endOffset,
        "Start offset [%s] has to be non-negative and less than or equal to end offset [%s]",
        startOffset,
        endOffset);
    totalNumberRecords += endOffset - startOffset;
    final Properties properties = KafkaUtils.consumerProperties(jobConf);
    consumer = new KafkaConsumer<>(properties);
    long pollTimeout = jobConf.getLong(KafkaTableProperties.KAFKA_POLL_TIMEOUT.getName(), -1);
    LOG.debug("Consumer poll timeout [{}] ms", pollTimeout);
    this.recordsCursor = startOffset == endOffset ?
        new KafkaRecordReader.EmptyIterator() :
        new KafkaRecordIterator(consumer, topicPartition, startOffset, endOffset, pollTimeout);
    // row has to be as wide as the entire Kafka row plus the metadata columns
    row = new Object[((StructObjectInspector) serDe.getObjectInspector()).getAllStructFieldRefs().size()];
  }
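
  /**
   * Fills the given batch with the next chunk of records from the Kafka cursor; returns false once the
   * split has been fully consumed.
   */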
  @Override public boolean next(NullWritable nullWritable, VectorizedRowBatch vectorizedRowBatch) throws IOException {
    vectorizedRowBatch.reset();
    try {
      return readNextBatch(vectorizedRowBatch, recordsCursor) > 0;
    } catch (SerDeException e) {
      throw new IOException("Serde exception", e);
    }
  }
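
  /**
   * Clears the reusable row buffer so no stale column values leak into the next batch.
   */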
  private void cleanRowBoat() {
    for (int i = 0; i < row.length; i++) {
      row[i] = null;
    }
  }
  @Override public NullWritable createKey() {
    return NullWritable.get();
  }

  @Override public VectorizedRowBatch createValue() {
    return rbCtx.createVectorizedRowBatch();
  }
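
  /**
   * Byte positions are not meaningful for a Kafka split, therefore this always returns -1.
   */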
  @Override public long getPos() throws IOException {
    return -1;
  }
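
  /**
   * Progress is reported as the fraction of the split's records consumed so far.
   */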
  @Override public float getProgress() {
    if (consumedRecords >= totalNumberRecords) {
      return 1f;
    } else if (consumedRecords == 0) {
      return 0f;
    }
    return consumedRecords * 1.0f / totalNumberRecords;
  }
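
  /**
   * Wakes up the consumer to abort any in-flight poll, then closes it.
   */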
  @Override public void close() {
    LOG.trace("total read bytes [{}]", readBytes);
    if (consumer != null) {
      consumer.wakeup();
      consumer.close();
    }
  }
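
  /**
   * Deserializes records from the iterator into the batch until the batch is full or the iterator is
   * exhausted. Only the projected columns are assigned; when no columns are projected (e.g. select count(*)),
   * records are only counted. Returns the number of rows added to the batch.
   */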
  private int readNextBatch(VectorizedRowBatch vectorizedRowBatch,
      Iterator<ConsumerRecord<byte[], byte[]>> recordIterator) throws SerDeException {
    int rowsCount = 0;
    while (recordIterator.hasNext() && rowsCount < vectorizedRowBatch.getMaxSize()) {
      ConsumerRecord<byte[], byte[]> kRecord = recordIterator.next();
      kafkaWritable.set(kRecord);
      readBytes += kRecord.serializedKeySize() + kRecord.serializedValueSize();
      if (projectedColumns.length > 0) {
        serDe.deserializeKWritable(kafkaWritable, row);
        for (int i : projectedColumns) {
          vectorAssignRow.assignRowColumn(vectorizedRowBatch, rowsCount, i, row[i]);
        }
      }
      rowsCount++;
    }
    vectorizedRowBatch.size = rowsCount;
    consumedRecords += rowsCount;
    cleanRowBoat();
    return rowsCount;
  }
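
  /**
   * Creates a {@link KafkaSerDe} and initializes it with the table properties carried by the map work.
   */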
  @SuppressWarnings("Duplicates") private static KafkaSerDe createAndInitializeSerde(Configuration jobConf) {
    KafkaSerDe serDe = new KafkaSerDe();
    MapWork mapWork = Preconditions.checkNotNull(Utilities.getMapWork(jobConf), "Map work is null");
    Properties properties = mapWork.getPartitionDescs()
        .stream()
        .map(partitionDesc -> partitionDesc.getTableDesc().getProperties())
        .findAny()
        .orElseThrow(() -> new RuntimeException("Cannot find table properties in the map work"));
    try {
      serDe.initialize(jobConf, properties, null);
    } catch (SerDeException e) {
      throw new RuntimeException("Cannot initialize the serde", e);
    }
    return serDe;
  }
}