/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.drill.exec.store.parquet.columnreaders;

import io.netty.buffer.DrillBuf;
import java.io.IOException;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.vector.BaseDataValueVector;
import org.apache.drill.exec.vector.ValueVector;
import com.google.common.collect.ImmutableSet;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.Encoding;
import org.apache.parquet.format.SchemaElement;
import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;

public abstract class ColumnReader<V extends ValueVector> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnReader.class);
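  // Parquet encodings that indicate dictionary-encoded pages.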
public static final Set<Encoding> DICTIONARY_ENCODINGS = ImmutableSet.of(
Encoding.PLAIN_DICTIONARY,
Encoding.RLE_DICTIONARY
);
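  // Encodings whose values must be decoded rather than copied directly from the page buffer.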
public static final Set<Encoding> VALUE_ENCODINGS = ImmutableSet.<Encoding>builder()
.addAll(DICTIONARY_ENCODINGS)
.add(Encoding.DELTA_BINARY_PACKED)
.add(Encoding.DELTA_BYTE_ARRAY)
.add(Encoding.DELTA_LENGTH_BYTE_ARRAY)
.build();
final ParquetRecordReader parentReader;
// Value Vector for this column
final V valueVec;
ColumnDescriptor getColumnDescriptor() {
return columnDescriptor;
}
// column description from the parquet library
final ColumnDescriptor columnDescriptor;
// metadata of the column, from the parquet library
final ColumnChunkMetaData columnChunkMetaData;
// status information on the current page
PageReader pageReader;
final SchemaElement schemaElement;
boolean usingDictionary;
// quick reference to see if the field is fixed length (as this requires an instanceof)
final boolean isFixedLength;
  // counter for the total number of values read from one or more pages
  // when a batch is filled, this counter should be the same for all of the columns
int totalValuesRead;
// counter for the values that have been read in this pass (a single call to the next() method)
int valuesReadInCurrentPass;
// length of single data value in bits, if the length is fixed
int dataTypeLengthInBits;
int bytesReadInCurrentPass;
protected DrillBuf vectorData;
  // Definition levels for nullable columns are read as a one-way stream of integers. When reading
  // variable-length data we don't know whether all of the records will fit until we've read them,
  // so the last definition level must be stored and reused at the start of the next batch.
int currDefLevel;
// variables for a single read pass
long readStartInBytes = 0;
long readLength = 0;
long readLengthInBits = 0;
long recordsReadInThisIteration = 0;
private ExecutorService threadPool;
  volatile boolean isShuttingDown; // indicates that no new AsyncPageReader tasks should be submitted during clear()
protected ColumnReader(ParquetRecordReader parentReader, ColumnDescriptor descriptor,
ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, V v, SchemaElement schemaElement) throws ExecutionSetupException {
this.parentReader = parentReader;
this.columnDescriptor = descriptor;
this.columnChunkMetaData = columnChunkMetaData;
this.isFixedLength = fixedLength;
this.schemaElement = schemaElement;
this.valueVec = v;
this.pageReader = parentReader.useAsyncPageReader
? new AsyncPageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath())
: new PageReader(this, parentReader.getFileSystem(), parentReader.getHadoopPath());
try {
pageReader.init();
} catch (IOException e) {
      throw UserException.dataReadError(e)
          .message("Error initializing page reader for Parquet file")
          .pushContext("Row Group Start: ", this.columnChunkMetaData.getStartingPos())
          .pushContext("Column: ", this.schemaElement.getName())
          .pushContext("File: ", this.parentReader.getHadoopPath().toString())
          .build(logger);
}
if (columnDescriptor.getType() != PrimitiveType.PrimitiveTypeName.BINARY) {
if (columnDescriptor.getType() == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
dataTypeLengthInBits = columnDescriptor.getTypeLength() * 8;
} else {
dataTypeLengthInBits = ParquetColumnMetadata.getTypeLengthInBits(columnDescriptor.getType());
}
}
threadPool = parentReader.getOperatorContext().getScanDecodeExecutor();
}
public int getRecordsReadInCurrentPass() {
return valuesReadInCurrentPass;
}
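  /**
   * Schedules {@link #processPages(long)} on the scan decode thread pool.
   * Returns null if the reader is shutting down and no new tasks may be submitted.
   */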
  public Future<Long> processPagesAsync(long recordsToReadInThisPass) {
    return isShuttingDown ? null
        : threadPool.submit(new ColumnReaderProcessPagesTask(recordsToReadInThisPass));
  }
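  /**
   * Reads up to the requested number of records into the value vector, advancing through
   * pages as needed, and sets the vector's value count when done.
   */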
public void processPages(long recordsToReadInThisPass) throws IOException {
reset();
    if (recordsToReadInThisPass > 0) {
do {
determineSize(recordsToReadInThisPass);
} while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.hasPage());
}
    logger.trace("Column Reader: {} - Values read in this pass: {}",
        this.getColumnDescriptor(), valuesReadInCurrentPass);
valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
}
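  /**
   * Marks the reader as shutting down so no further async tasks are submitted, then releases
   * the value vector and page reader resources.
   */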
public void clear() {
//State to indicate no more tasks to be scheduled
isShuttingDown = true;
valueVec.clear();
pageReader.clear();
}
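  /**
   * Reads the requested number of values from the current page via {@link #readField(long)}
   * and advances the pass and page read positions.
   */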
public void readValues(long recordsToRead) {
try {
readField(recordsToRead);
valuesReadInCurrentPass += (int)recordsReadInThisIteration;
pageReader.valuesRead += (int)recordsReadInThisIteration;
pageReader.readPosInBytes = readStartInBytes + readLength;
} catch (Exception e) {
      throw UserException.dataReadError(e)
          .message("Error reading from Parquet file")
          .pushContext("Row Group Start: ", this.columnChunkMetaData.getStartingPos())
          .pushContext("Column: ", this.schemaElement.getName())
          .pushContext("File: ", this.parentReader.getHadoopPath().toString())
          .build(logger);
}
}
protected abstract void readField(long recordsToRead);
  /**
   * Determines the size of a single value in a variable column.
   *
   * The return value indicates whether we have finished the row group and should stop reading.
   *
   * @param recordsReadInCurrentPass number of records to read in the current pass
   * @return true if we should stop reading
   * @throws IOException if a page cannot be read
   */
public boolean determineSize(long recordsReadInCurrentPass) throws IOException {
if (readPage()) {
return true;
}
if (processPageData((int) recordsReadInCurrentPass)) {
return true;
}
return checkVectorCapacityReached();
}
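  /**
   * Schedules {@link #readRecords(int)} on the scan decode thread pool.
   * Returns null if the reader is shutting down.
   */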
  protected Future<Integer> readRecordsAsync(int recordsToRead) {
    return isShuttingDown ? null
        : threadPool.submit(new ColumnReaderReadRecordsTask(recordsToRead));
  }
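  // Reads the given number of records by repeated calls to readField and updates the page's read count.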
protected void readRecords(int recordsToRead) {
for (int i = 0; i < recordsToRead; i++) {
readField(i);
}
pageReader.valuesRead += recordsToRead;
}
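  // Bulk reading is only supported by subclasses that override this method.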
protected int readRecordsInBulk(int recordsToReadInThisPass) throws IOException {
throw new UnsupportedOperationException();
}
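  // Indicates whether the column chunk's values require decoding (dictionary or delta encodings)
  // rather than a direct copy out of the page buffer.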
protected boolean recordsRequireDecoding() {
return usingDictionary || !Collections.disjoint(VALUE_ENCODINGS, columnChunkMetaData.getEncodings());
}
protected boolean processPageData(int recordsToReadInThisPass) throws IOException {
readValues(recordsToReadInThisPass);
return true;
}
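  // Position-tracking hooks: no-ops here, overridden by subclasses that need them.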
public void updatePosition() {}
public void updateReadyToReadPosition() {}
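  /**
   * Resets the per-pass read counters and re-acquires the value vector's backing buffer.
   */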
public void reset() {
readStartInBytes = 0;
readLength = 0;
readLengthInBits = 0;
recordsReadInThisIteration = 0;
bytesReadInCurrentPass = 0;
vectorData = ((BaseDataValueVector) valueVec).getBuffer();
}
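  // Capacity of the value vector's data buffer, in bytes.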
public int capacity() {
return (int) (valueVec.getValueCapacity() * dataTypeLengthInBits / 8.0);
}
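  // Schedules readPage() on the scan decode thread pool.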
  public Future<Boolean> readPageAsync() {
    return threadPool.submit(new Callable<Boolean>() {
      @Override public Boolean call() throws Exception {
        return readPage();
      }
    });
  }
  /**
   * Reads a page if the current one is exhausted. If we have reached the end of the row group,
   * the read loop should exit.
   *
   * @return true if the end of the row group was reached and no further page could be read
   * @throws IOException if the next page cannot be read
   */
public boolean readPage() throws IOException {
if (!pageReader.hasPage()
|| totalValuesReadAndReadyToReadInPage() == pageReader.pageValueCount) {
readRecords(pageReader.valuesReadyToRead);
if (pageReader.hasPage()) {
totalValuesRead += pageReader.pageValueCount;
}
if (!pageReader.next()) {
hitRowGroupEnd();
return true;
}
postPageRead();
}
return false;
}
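  // Values already read from the current page plus those staged as ready to read.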
protected int totalValuesReadAndReadyToReadInPage() {
return pageReader.valuesRead + pageReader.valuesReadyToRead;
}
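  // Called after advancing to a new page; clears the count of values staged as ready to read.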
protected void postPageRead() {
pageReader.valuesReadyToRead = 0;
}
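  // Hook invoked when the end of the row group is reached; no-op by default.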
protected void hitRowGroupEnd() {}
  protected boolean checkVectorCapacityReached() {
    // Here "bits" is treated as a byte count, but inside capacity() "bits" sometimes really means bits.
    // Note that bytesReadInCurrentPass is never updated, so the following check is effectively a no-op.
    if (bytesReadInCurrentPass + dataTypeLengthInBits > capacity()) {
      logger.debug("Reached the capacity of the data vector in a variable length value vector.");
      return true;
    }
    // No-op: this condition was already checked earlier, and we would not be here if it were true.
    return valuesReadInCurrentPass > valueVec.getValueCapacity();
  }
  /**
   * Copied from the Parquet library to avoid the unnecessary throws clause declared there.
   *
   * @param in incoming data
   * @param offset offset to read from
   * @return the integer stored in little-endian order at the given offset
   */
public static int readIntLittleEndian(DrillBuf in, int offset) {
int ch4 = in.getByte(offset) & 0xff;
int ch3 = in.getByte(offset + 1) & 0xff;
int ch2 = in.getByte(offset + 2) & 0xff;
int ch1 = in.getByte(offset + 3) & 0xff;
return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
}
private class ColumnReaderProcessPagesTask implements Callable<Long> {
private final ColumnReader<V> parent = ColumnReader.this;
private final long recordsToReadInThisPass;
    public ColumnReaderProcessPagesTask(long recordsToReadInThisPass) {
this.recordsToReadInThisPass = recordsToReadInThisPass;
}
    /**
     * Runs {@link ColumnReader#processPages(long)} for the parent reader on the decode thread pool.
     *
     * @return the number of records requested for this pass
     * @throws IOException if the read fails
     */
    @Override public Long call() throws IOException {
      String oldname = Thread.currentThread().getName();
      try {
        Thread.currentThread().setName(oldname + "Decode-" + this.parent.columnChunkMetaData.toString());
        this.parent.processPages(recordsToReadInThisPass);
        return recordsToReadInThisPass;
      } finally {
        Thread.currentThread().setName(oldname);
      }
    }
}
private class ColumnReaderReadRecordsTask implements Callable<Integer> {
private final ColumnReader<V> parent = ColumnReader.this;
private final int recordsToRead;
    public ColumnReaderReadRecordsTask(int recordsToRead) {
this.recordsToRead = recordsToRead;
}
    /**
     * Runs {@link ColumnReader#readRecords(int)} for the parent reader on the decode thread pool.
     *
     * @return the number of records requested
     * @throws IOException if the read fails
     */
    @Override public Integer call() throws IOException {
      String oldname = Thread.currentThread().getName();
      try {
        Thread.currentThread().setName("Decode-" + this.parent.columnChunkMetaData.toString());
        this.parent.readRecords(recordsToRead);
        return recordsToRead;
      } finally {
        Thread.currentThread().setName(oldname);
      }
    }
}
}