/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.columnreaders;
import org.apache.drill.exec.store.CommonParquetRecordReader;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.exception.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.impl.OutputMutator;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.drill.exec.store.parquet.columnreaders.batchsizing.RecordBatchSizerManager;
import org.apache.drill.exec.util.record.RecordBatchStats.RecordBatchStatsContext;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ParquetRecordReader extends CommonParquetRecordReader {
private static final Logger logger = LoggerFactory.getLogger(ParquetRecordReader.class);
// When the downstream operator requires no columns, ask the scan to return a default column. If that
// column does not exist in the file, it is returned as a nullable-int column; if it happens to exist,
// the actual column is returned.
private static final List<SchemaPath> DEFAULT_COLS_TO_READ = ImmutableList.of(SchemaPath.getSimplePath("_DEFAULT_COL_TO_READ_"));
private final FileSystem fileSystem;
private final long numRecordsToRead; // maximum number of records to read from this row group
private final Path hadoopPath;
private final CompressionCodecFactory codecFactory;
private final int rowGroupIndex;
private final ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus;
/** Container object holding the state of the Parquet columnar readers */
private ReadState readState;
/** Responsible for managing record batch size constraints */
private RecordBatchSizerManager batchSizerMgr;
private BatchReader batchReader;
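// Reader tuning options, populated from the Drill session/system options in the constructor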
final boolean useAsyncColReader;
final boolean useAsyncPageReader;
final boolean useBufferedReader;
final int bufferedReadSize;
final boolean useFadvise;
final boolean enforceTotalSize;
final long readQueueSize;
private final boolean useBulkReader;
public ParquetRecordReader(FragmentContext fragmentContext,
Path path,
int rowGroupIndex,
long numRecordsToRead,
FileSystem fs,
CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
this(fragmentContext, numRecordsToRead, path, rowGroupIndex, fs, codecFactory, footer, columns, dateCorruptionStatus);
}
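/**
* Convenience constructor that reads every record in the row group; the record
* count is taken from the row group metadata in the footer.
*/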
public ParquetRecordReader(FragmentContext fragmentContext,
Path path,
int rowGroupIndex,
FileSystem fs,
CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
this(fragmentContext, footer.getBlocks().get(rowGroupIndex).getRowCount(), path, rowGroupIndex, fs, codecFactory,
footer, columns, dateCorruptionStatus);
}
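/**
* Creates a reader for a single row group of a Parquet file.
*
* @param fragmentContext fragment context, used to resolve the reader options
* @param numRecordsToRead maximum number of records to read from the row group
* @param path path of the Parquet file
* @param rowGroupIndex index of the row group to read within the file
* @param fs file system used to access the file
* @param codecFactory factory for the page decompression codecs
* @param footer Parquet metadata (footer) of the file
* @param columns columns selected for this scan
* @param dateCorruptionStatus whether dates in the file are corrupted and need correction (DRILL-4203)
*/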
public ParquetRecordReader(
FragmentContext fragmentContext,
long numRecordsToRead,
Path path,
int rowGroupIndex,
FileSystem fs,
CompressionCodecFactory codecFactory,
ParquetMetadata footer,
List<SchemaPath> columns,
ParquetReaderUtility.DateCorruptionStatus dateCorruptionStatus) {
super(footer, fragmentContext);
this.hadoopPath = path;
this.fileSystem = fs;
this.codecFactory = codecFactory;
this.rowGroupIndex = rowGroupIndex;
this.dateCorruptionStatus = dateCorruptionStatus;
this.numRecordsToRead = initNumRecordsToRead(numRecordsToRead, rowGroupIndex, footer);
this.useAsyncColReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_COLUMNREADER_ASYNC).bool_val;
this.useAsyncPageReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ASYNC).bool_val;
this.useBufferedReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_USE_BUFFERED_READ).bool_val;
this.bufferedReadSize = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_BUFFER_SIZE).num_val.intValue();
this.useFadvise = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_USE_FADVISE).bool_val;
this.readQueueSize = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_QUEUE_SIZE).num_val;
this.enforceTotalSize = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_PAGEREADER_ENFORCETOTALSIZE).bool_val;
this.useBulkReader = fragmentContext.getOptions().getOption(ExecConstants.PARQUET_FLAT_READER_BULK).bool_val;
setColumns(columns);
}
/**
* Reports whether the old, non-standard date format appears in this file;
* see DRILL-4203.
*
* @return the date corruption status: whether the dates are corrupted and need to be corrected
*/
public ParquetReaderUtility.DateCorruptionStatus getDateCorruptionStatus() {
return dateCorruptionStatus;
}
public CompressionCodecFactory getCodecFactory() {
return codecFactory;
}
public Path getHadoopPath() {
return hadoopPath;
}
public FileSystem getFileSystem() {
return fileSystem;
}
public int getRowGroupIndex() {
return rowGroupIndex;
}
public RecordBatchSizerManager getBatchSizesMgr() {
return batchSizerMgr;
}
public OperatorContext getOperatorContext() {
return operatorContext;
}
public FragmentContext getFragmentContext() {
return fragmentContext;
}
/**
* @return true if Parquet reader bulk processing is enabled; false otherwise
*/
public boolean useBulkReader() {
return useBulkReader;
}
public ReadState getReadState() {
return readState;
}
/**
* Prepare the Parquet reader. First determine the set of columns to read (the schema
* for this read). Then, create a state object to track the read across calls to
* the reader's <tt>next()</tt> method. Finally, create one of three readers to
* read batches, depending on whether this scan is for only fixed-width fields,
* contains at least one variable-width field, or is a "mock" scan consisting
* only of null fields (fields in the SELECT clause but not in the Parquet file).
*/
@Override
public void setup(OperatorContext operatorContext, OutputMutator output) throws ExecutionSetupException {
this.operatorContext = operatorContext;
ParquetSchema schema = new ParquetSchema(fragmentContext.getOptions(), rowGroupIndex, footer, isStarQuery() ? null : getColumns());
batchSizerMgr = new RecordBatchSizerManager(fragmentContext.getOptions(), schema, numRecordsToRead, new RecordBatchStatsContext(fragmentContext, operatorContext));
logger.debug("Reading {} records from row group({}) in file {}.", numRecordsToRead, rowGroupIndex,
hadoopPath.toUri().getPath());
try {
schema.buildSchema();
batchSizerMgr.setup();
readState = new ReadState(schema, batchSizerMgr, parquetReaderStats, numRecordsToRead, useAsyncColReader);
readState.buildReader(this, output);
} catch (Exception e) {
throw handleAndRaise("Failure in setting up reader", e);
}
ColumnReader<?> firstColumnStatus = readState.getFirstColumnReader();
if (firstColumnStatus == null) {
batchReader = new BatchReader.MockBatchReader(readState);
} else if (schema.allFieldsFixedLength()) {
batchReader = new BatchReader.FixedWidthReader(readState);
} else {
batchReader = new BatchReader.VariableWidthReader(readState);
}
}
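/**
* Allocates memory for the value vectors of the next batch; the sizing
* decisions are delegated to the {@link RecordBatchSizerManager}.
*/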
@Override
public void allocate(Map<String, ValueVector> vectorMap) throws OutOfMemoryException {
batchSizerMgr.allocate(vectorMap);
}
/**
* Read the next record batch from the file using the reader and read state
* created previously.
*/
@Override
public int next() {
readState.resetBatch();
Stopwatch timer = Stopwatch.createStarted();
try {
return batchReader.readBatch();
} catch (Exception e) {
throw handleAndRaise("\nHadoop path: " + hadoopPath.toUri().getPath() +
"\nTotal records read: " + readState.recordsRead() +
"\nRow group index: " + rowGroupIndex +
"\nRecords to read: " + numRecordsToRead, e);
} finally {
if (parquetReaderStats != null) {
parquetReaderStats.timeProcess.addAndGet(timer.elapsed(TimeUnit.NANOSECONDS));
} else {
logger.warn(
"Cannot log batch read timing because no Parquet reader stats tracker " +
"is available (probably due to an earlier error during query execution)."
);
}
}
}
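/**
* Releases the resources held by this reader: the read state, the batch sizer
* manager, the decompression codecs and the reader statistics. Safe to call
* even if {@link #setup} did not complete.
*/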
@Override
public void close() {
long recordsRead = (readState == null) ? 0 : readState.recordsRead();
logger.debug("Read {} records out of row group({}) in file '{}'",
recordsRead, rowGroupIndex,
hadoopPath.toUri().getPath());
// Enable this assert for debugging when it is known that the whole file will be read;
// a LIMIT kills upstream operators once it has enough records, so the assert would fail in that case.
// assert totalRecordsRead == footer.getBlocks().get(rowGroupIndex).getRowCount();
// NOTE: check that the reader data structures are not null before calling close() on them,
// because close() can be invoked before setup() has executed (probably because of failures
// earlier in query execution).
if (readState != null) {
readState.close();
readState = null;
}
if (batchSizerMgr != null) {
batchSizerMgr.close();
batchSizerMgr = null;
}
codecFactory.release();
closeStats(logger, hadoopPath);
}
@Override
protected List<SchemaPath> getDefaultColumnsToRead() {
return DEFAULT_COLS_TO_READ;
}
@Override
public String toString() {
return "ParquetRecordReader[File=" + hadoopPath.toUri()
+ ", Row group index=" + rowGroupIndex
+ ", Records to read=" + numRecordsToRead
+ ", Total records read=" + (readState != null ? readState.recordsRead() : -1)
+ ", Metadata" + footer
+ "]";
}
}