/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.avro;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.TimeUnit;
import org.apache.avro.LogicalType;
import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;
import org.apache.avro.Schema.Type;
import org.apache.avro.file.DataFileReader;
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericContainer;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.util.Utf8;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.AbstractRecordReader;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.util.ImpersonationUtil;
import org.apache.drill.exec.vector.complex.DictVector;
import org.apache.drill.exec.vector.complex.fn.FieldSelection;
import org.apache.drill.exec.vector.complex.impl.MapOrListWriterImpl;
import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
import org.apache.drill.exec.vector.complex.writer.BaseWriter;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.drill.shaded.guava.com.google.common.base.Charsets;
import org.apache.drill.shaded.guava.com.google.common.base.Stopwatch;
import io.netty.buffer.DrillBuf;
import org.joda.time.DateTimeConstants;

/**
* A RecordReader implementation for Avro data files.
*
* @see org.apache.drill.exec.store.RecordReader
*/
public class AvroRecordReader extends AbstractRecordReader {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AvroRecordReader.class);
private final Path hadoop;
private final long start;
private final long end;
private final FieldSelection fieldSelection;
private final OptionManager optionManager;
private DrillBuf buffer;
private VectorContainerWriter writer;
private DataFileReader<GenericContainer> reader = null;
private FileSystem fs;
private final String opUserName;
private final String queryUserName;
private static final int DEFAULT_BATCH_SIZE = 4096;

public AvroRecordReader(final FragmentContext fragmentContext,
final Path inputPath,
final long start,
final long length,
final FileSystem fileSystem,
final List<SchemaPath> projectedColumns,
final String userName) {
hadoop = inputPath;
this.start = start;
this.end = start + length;
buffer = fragmentContext.getManagedBuffer();
this.fs = fileSystem;
this.opUserName = userName;
this.queryUserName = fragmentContext.getQueryUserName();
setColumns(projectedColumns);
this.fieldSelection = FieldSelection.getFieldSelection(projectedColumns);
optionManager = fragmentContext.getOptions();
}
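
/**
 * Opens the Avro file as a proxy user so that the read happens with the
 * operator user's identity on behalf of the query user (impersonation).
 */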
private DataFileReader<GenericContainer> getReader(final Path hadoop, final FileSystem fs) throws ExecutionSetupException {
try {
final UserGroupInformation ugi = ImpersonationUtil.createProxyUgi(this.opUserName, this.queryUserName);
return ugi.doAs((PrivilegedExceptionAction<DataFileReader<GenericContainer>>) () ->
new DataFileReader<>(new FsInput(hadoop, fs.getConf()), new GenericDatumReader<GenericContainer>()));
} catch (IOException | InterruptedException e) {
throw new ExecutionSetupException(
String.format("Error in creating avro reader for file: %s", hadoop), e);
}
}

@Override
public void setup(final OperatorContext context, final OutputMutator output) throws ExecutionSetupException {
writer = new VectorContainerWriter(output);
try {
reader = getReader(hadoop, fs);
logger.debug("Processing file : {}, start position : {}, end position : {} ", hadoop, start, end);
reader.sync(this.start);
} catch (IOException e) {
throw new ExecutionSetupException(e);
}
}
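
/**
 * Reads up to {@link #DEFAULT_BATCH_SIZE} records into the output vectors and
 * returns the number of records read; returns 0 once this split is exhausted.
 */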
@Override
public int next() {
final Stopwatch watch = Stopwatch.createStarted();
if (reader == null) {
throw new IllegalStateException("Avro reader is not open.");
}
if (!reader.hasNext()) {
return 0;
}
int recordCount = 0;
writer.allocate();
writer.reset();
try {
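// Fill the batch until it is full, the file is exhausted, or the reader moves past the end of this split.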
for (GenericContainer container = null;
recordCount < DEFAULT_BATCH_SIZE && reader.hasNext() && !reader.pastSync(end);
recordCount++) {
writer.setPosition(recordCount);
container = reader.next(container);
processRecord(container, container.getSchema());
}
writer.setValueCount(recordCount);
} catch (IOException e) {
throw new DrillRuntimeException(e);
}
logger.debug("Read {} records in {} ms", recordCount, watch.elapsed(TimeUnit.MILLISECONDS));
return recordCount;
}
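
/**
 * Writes a single top-level Avro value into the row writer. Only RECORD is
 * accepted as the root type of an Avro file read by this reader.
 */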
private void processRecord(final GenericContainer container, final Schema schema) {
final Schema.Type type = schema.getType();
switch (type) {
case RECORD:
process(container, schema, null, new MapOrListWriterImpl(writer.rootAsMap()), fieldSelection);
break;
default:
throw new DrillRuntimeException("Root object must be record type. Found: " + type);
}
}
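
/**
 * Recursively writes an Avro value into the supplied writer according to its
 * schema type: records become Drill maps, Avro maps become dicts, arrays become
 * lists, nullable unions are unwrapped, and everything else is written as a primitive.
 */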
private void process(final Object value, final Schema schema, final String fieldName, MapOrListWriterImpl writer, FieldSelection fieldSelection) {
if (value == null) {
return;
}
final Schema.Type type = schema.getType();
switch (type) {
case RECORD:
// The list field of MapOrListWriter is non-null when we are storing an array of maps/records.
MapOrListWriterImpl _writer = writer;
for (final Schema.Field field : schema.getFields()) {
if (field.schema().getType() == Schema.Type.RECORD ||
(field.schema().getType() == Schema.Type.UNION &&
field.schema().getTypes().get(0).getType() == Schema.Type.NULL &&
field.schema().getTypes().get(1).getType() == Schema.Type.RECORD)) {
_writer = (MapOrListWriterImpl) writer.map(field.name());
}
process(((GenericRecord) value).get(field.name()), field.schema(), field.name(), _writer, fieldSelection.getChild(field.name()));
}
break;
case ARRAY:
assert fieldName != null;
final GenericArray<?> array = (GenericArray<?>) value;
Schema elementSchema = array.getSchema().getElementType();
Type elementType = elementSchema.getType();
if (elementType == Schema.Type.RECORD) {
writer = (MapOrListWriterImpl) writer.list(fieldName).listoftmap(fieldName);
} else if (elementType == Schema.Type.MAP) {
writer = (MapOrListWriterImpl) writer.list(fieldName);
writer.listOfDict();
} else {
writer = (MapOrListWriterImpl) writer.list(fieldName);
}
for (final Object o : array) {
writer.start();
process(o, elementSchema, fieldName, writer, fieldSelection.getChild(fieldName));
writer.end();
}
break;
case UNION:
// Currently only nullable unions (optional fields) of the form ["null", "some-type"] are supported.
if (schema.getTypes().get(0).getType() != Schema.Type.NULL) {
throw new UnsupportedOperationException("Avro union type must be of the format : [\"null\", \"some-type\"]");
}
process(value, schema.getTypes().get(1), fieldName, writer, fieldSelection);
break;
case MAP:
@SuppressWarnings("unchecked")
Map<Object, Object> map = (Map<Object, Object>) value;
// Key type of an Avro MAP is always string.
Schema keySchema = Schema.create(Type.STRING);
Schema valueSchema = schema.getValueType();
writer = (MapOrListWriterImpl) writer.dict(fieldName);
BaseWriter.DictWriter dictWriter = (BaseWriter.DictWriter) writer.map;
dictWriter.start();
for (Entry<Object, Object> entry : map.entrySet()) {
dictWriter.startKeyValuePair();
processPrimitive(entry.getKey(), keySchema, DictVector.FIELD_KEY_NAME, writer);
process(entry.getValue(), valueSchema, DictVector.FIELD_VALUE_NAME, writer, FieldSelection.ALL_VALID);
dictWriter.endKeyValuePair();
}
dictWriter.end();
break;
case FIXED:
case ENUM: // Enum symbols are strings
case NULL: // Treat null type as a primitive
default:
assert fieldName != null;
if (writer.isMapWriter()) {
if (fieldSelection.isNeverValid()) {
break;
}
}
processPrimitive(value, schema, fieldName, writer);
break;
}
}
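
/**
 * Writes a primitive Avro value into the writer, honoring logical types such as
 * date, time, timestamp and decimal where they are present on the schema.
 */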
private void processPrimitive(final Object value, final Schema schema, final String fieldName,
final MapOrListWriterImpl writer) {
LogicalType logicalType = schema.getLogicalType();
String logicalTypeName = logicalType != null ? logicalType.getName() : "";
if (value == null) {
return;
}
switch (schema.getType()) {
case STRING:
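// Utf8 values expose their backing bytes directly; other objects are converted via toString().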
byte[] binary;
final int length;
if (value instanceof Utf8) {
binary = ((Utf8) value).getBytes();
length = ((Utf8) value).getByteLength();
} else {
binary = value.toString().getBytes(Charsets.UTF_8);
length = binary.length;
}
ensure(length);
buffer.setBytes(0, binary);
writer.varChar(fieldName).writeVarChar(0, length, buffer);
break;
case INT:
switch (logicalTypeName) {
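// Avro 'date' stores days since the Unix epoch; Drill's DATE vector expects milliseconds.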
case "date":
writer.date(fieldName).writeDate((int) value * (long) DateTimeConstants.MILLIS_PER_DAY);
break;
case "time-millis":
writer.time(fieldName).writeTime((Integer) value);
break;
default:
writer.integer(fieldName).writeInt((Integer) value);
}
break;
case LONG:
switch (logicalTypeName) {
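// time-micros and timestamp-micros carry microsecond precision; truncate to milliseconds for Drill's TIME/TIMESTAMP vectors.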
case "date":
writer.date(fieldName).writeDate((Long) value);
break;
case "time-micros":
writer.time(fieldName).writeTime((int) ((long) value / 1000));
break;
case "timestamp-millis":
writer.timeStamp(fieldName).writeTimeStamp((Long) value);
break;
case "timestamp-micros":
writer.timeStamp(fieldName).writeTimeStamp((long) value / 1000);
break;
default:
writer.bigInt(fieldName).writeBigInt((Long) value);
}
break;
case FLOAT:
writer.float4(fieldName).writeFloat4((Float) value);
break;
case DOUBLE:
writer.float8(fieldName).writeFloat8((Double) value);
break;
case BOOLEAN:
writer.bit(fieldName).writeBit((Boolean) value ? 1 : 0);
break;
case BYTES:
final ByteBuffer buf = (ByteBuffer) value;
length = buf.remaining();
ensure(length);
buffer.setBytes(0, buf);
switch (logicalTypeName) {
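// An Avro decimal over BYTES holds the unscaled value; precision and scale come from the logical type.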
case "decimal":
ParquetReaderUtility.checkDecimalTypeEnabled(optionManager);
LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
writer.varDecimal(fieldName, decimalType.getPrecision(), decimalType.getScale())
.writeVarDecimal(0, length, buffer, decimalType.getPrecision(), decimalType.getScale());
break;
default:
writer.binary(fieldName).writeVarBinary(0, length, buffer);
}
break;
case NULL:
// Nothing to do for null type
break;
case ENUM:
final String symbol = value.toString();
final byte[] b;
try {
b = symbol.getBytes(Charsets.UTF_8.name());
} catch (UnsupportedEncodingException e) {
throw new DrillRuntimeException("Unable to read enum value for field: " + fieldName, e);
}
ensure(b.length);
buffer.setBytes(0, b);
writer.varChar(fieldName).writeVarChar(0, b.length, buffer);
break;
case FIXED:
GenericFixed genericFixed = (GenericFixed) value;
switch (logicalTypeName) {
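// Only the decimal logical type is supported for FIXED; the fixed bytes hold the unscaled value.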
case "decimal":
ParquetReaderUtility.checkDecimalTypeEnabled(optionManager);
LogicalTypes.Decimal decimalType = (LogicalTypes.Decimal) logicalType;
writer.varDecimal(fieldName, decimalType.getPrecision(), decimalType.getScale())
.writeVarDecimal(new BigDecimal(new BigInteger(genericFixed.bytes()), decimalType.getScale()));
break;
default:
throw new UnsupportedOperationException("Unimplemented type: " + schema.getType().toString());
}
break;
default:
throw new DrillRuntimeException("Unhandled Avro type: " + schema.getType().toString());
}
}
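
/**
 * Returns true if the given field is part of the projected columns, or if this
 * is a star query.
 */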
private boolean selected(SchemaPath field) {
if (isStarQuery()) {
return true;
}
for (final SchemaPath sp : getColumns()) {
if (sp.contains(field)) {
return true;
}
}
return false;
}
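
/**
 * Ensures the scratch buffer can hold at least {@code length} bytes,
 * reallocating it if necessary.
 */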
private void ensure(final int length) {
buffer = buffer.reallocIfNeeded(length);
}

@Override
public void close() {
if (reader != null) {
try {
reader.close();
} catch (IOException e) {
logger.warn("Error closing Avro reader", e);
} finally {
reader = null;
}
}
}

@Override
public String toString() {
long currentPosition = -1L;
try {
if (reader != null) {
currentPosition = reader.tell();
}
} catch (IOException e) {
logger.trace("Unable to obtain reader position.", e);
}
return "AvroRecordReader[File=" + hadoop
+ ", Position=" + currentPosition
+ "]";
}
}