/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.store.parquet.columnreaders;
import static org.apache.parquet.column.Encoding.valueOf;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import io.netty.buffer.ByteBufUtil;
import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.exceptions.ExecutionSetupException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ops.OperatorStats;
import org.apache.drill.exec.util.concurrent.ExecutorServiceUtil;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.column.page.DictionaryPage;
import org.apache.parquet.compression.CompressionCodecFactory;
import org.apache.parquet.format.PageHeader;
import org.apache.parquet.format.PageType;
import org.apache.parquet.format.Util;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import com.google.common.base.Stopwatch;
import io.netty.buffer.DrillBuf;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The AsyncPageReader reads one page of data at a time asynchronously from the provided InputStream. The
* first request to the page reader creates a Future Task (AsyncPageReaderTask) and submits it to the
* scan thread pool. The result of the Future task (a page) is put into a (blocking) queue and the scan
* thread starts processing the data as soon as the Future task is complete.
* This is a simple producer-consumer queue: the AsyncPageReaderTask is the producer and the ParquetScan is
* the consumer.
* As soon as it finishes, the AsyncPageReaderTask submits another Future task to read the next page,
* provided the results queue is not full. Until the queue fills up, therefore, the scan thread pool keeps
* the disk as busy as possible.
* If the disk is slower than the processing, the queue never fills up once processing of the pages begins:
* each disk read starts immediately after the previous one completes, the disk is never idle, and the
* query is effectively disk bound.
* If, however, the processing is slower than the disk (which can happen with SSDs, with data cached by the
* FileSystem, or when the rows require complex processing that is necessarily slow), the queue fills up.
* Once the queue is full, the AsyncPageReaderTask does not submit any new Future tasks. The next Future
* task is submitted by the *processing* thread as soon as it pulls a page out of the queue. (Note that the
* invariant here is that there is space for at least one more page in the queue before the Future read task
* is submitted to the pool). This ordering is important: violating it can lead to deadlocks, with producer
* threads blocked trying to put data into a queue that is full while consumer threads are blocked trying
* to take from a queue that has no data.
* The first request to the page reader can be either to load a dictionary page or a data page; this leads
* to the rather odd-looking code in the constructor, since the parent PageReader calls
* loadDictionaryIfExists in its constructor.
* The Future tasks created are kept in a non-blocking queue and each Future object is checked for any
* exceptions that might have occurred during execution. The queue of Futures is also used to cancel any
* pending Futures at close time (which may happen as the result of a query cancellation).
*
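* A simplified sketch of the hand-off described above, shown here for illustration only: the class name,
* the pool size, the queue capacity, and the method readOnePageFromDisk() are hypothetical and do not
* correspond one-to-one to this class. It shows the bounded page queue and the rule that the producer
* reschedules itself only while the queue has room, while the consumer reschedules it after freeing a
* slot in a full queue.
* <pre>{@code
* abstract class BoundedPagePipeline {
*   final ExecutorService pool = Executors.newFixedThreadPool(2);
*   final LinkedBlockingQueue<ReadStatus> pageQueue = new LinkedBlockingQueue<>(2);
*   final ConcurrentLinkedQueue<Future<Void>> futures = new ConcurrentLinkedQueue<>();
*   final Object queueLock = new Object();
*
*   // Hypothetical blocking disk read of one page.
*   abstract ReadStatus readOnePageFromDisk() throws IOException;
*
*   // Schedule the first read (mirrors init()).
*   void start() {
*     futures.offer(pool.submit(readTask()));
*   }
*
*   // Producer: read one page, enqueue it, and reschedule itself only while the
*   // bounded queue still has room.
*   Callable<Void> readTask() {
*     return () -> {
*       ReadStatus page = readOnePageFromDisk();
*       synchronized (queueLock) {
*         pageQueue.put(page);
*         if (pageQueue.remainingCapacity() > 0) {
*           futures.offer(pool.submit(readTask()));
*         }
*       }
*       return null;
*     };
*   }
*
*   // Single consumer (the scan thread): wait for the pending read, take its page and, if
*   // the queue had been full, restart the producer chain (it stopped rescheduling itself).
*   ReadStatus nextPage() throws Exception {
*     futures.peek().get();
*     futures.poll();
*     synchronized (queueLock) {
*       boolean wasFull = pageQueue.remainingCapacity() == 0;
*       ReadStatus page = pageQueue.take();  // does not block: the awaited read has completed
*       if (wasFull) {
*         futures.offer(pool.submit(readTask()));
*       }
*       return page;
*     }
*   }
* }
* }</pre>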
*/
class AsyncPageReader extends PageReader {
static final Logger logger = LoggerFactory.getLogger(AsyncPageReader.class);
private ExecutorService threadPool;
private long queueSize;
private LinkedBlockingQueue<ReadStatus> pageQueue;
private ConcurrentLinkedQueue<Future<Void>> asyncPageRead;
private long totalPageValuesRead = 0;
// Object to use to synchronize access to the page Queue.
// FindBugs complains if we synchronize on a Concurrent Queue
private final Object pageQueueSyncronize = new Object();
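/**
* Creates an asynchronous page reader for the given column, backed by the parent reader's scan
* executor and a bounded page queue.
* @param parentStatus the column reader that will consume the pages produced by this page reader
* @param fs the file system used to open the Parquet file
* @param path the path of the Parquet file being read
* @throws ExecutionSetupException if the parent PageReader cannot be set up
*/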
AsyncPageReader(ColumnReader<?> parentStatus, FileSystem fs, Path path) throws ExecutionSetupException {
super(parentStatus, fs, path);
threadPool = parentColumnReader.parentReader.getOperatorContext().getScanExecutor();
queueSize = parentColumnReader.parentReader.readQueueSize;
pageQueue = new LinkedBlockingQueue<>((int) queueSize);
asyncPageRead = new ConcurrentLinkedQueue<>();
}
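/**
* Runs the parent initialization and then schedules the first asynchronous page read, unless a
* shutdown is already in progress.
*/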
@Override
protected void init() throws IOException {
super.init();
// Do not schedule the first read task if a shutdown is already in progress.
if (!parentColumnReader.isShuttingDown) {
asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
}
}
/**
* Decompresses (if necessary) and decodes this column chunk's dictionary page and stores the result.
* @param readStatus the page taken from the queue, which must be a dictionary page
* @throws IOException if the dictionary page cannot be decompressed or decoded
*/
protected void loadDictionary(ReadStatus readStatus) throws IOException {
assert readStatus.isDictionaryPage();
assert this.dictionary == null;
// dictData is not a local because we need to release it later.
this.dictData = codecName == CompressionCodecName.UNCOMPRESSED
? readStatus.getPageData()
: decompressPageV1(readStatus);
DictionaryPage page = new DictionaryPage(
asBytesInput(dictData, 0, pageHeader.uncompressed_page_size),
pageHeader.uncompressed_page_size,
pageHeader.dictionary_page_header.num_values,
valueOf(pageHeader.dictionary_page_header.encoding.name())
);
this.dictionary = page.getEncoding().initDictionary(columnDescriptor, page);
}
/**
* Decompresses a v1 data page or a dictionary page, both of which are compressed
* in their entirety.
* @param readStatus the page taken from the queue
* @return decompressed Parquet page data
* @throws IOException if decompression fails
*/
protected DrillBuf decompressPageV1(ReadStatus readStatus) throws IOException {
Stopwatch timer = Stopwatch.createUnstarted();
PageHeader pageHeader = readStatus.getPageHeader();
int inputSize = pageHeader.getCompressed_page_size();
int outputSize = pageHeader.getUncompressed_page_size();
// TODO: does reporting this number have the same meaning in an async context?
long start = dataReader.getPos();
long timeToRead;
DrillBuf inputPageData = readStatus.getPageData();
DrillBuf outputPageData = this.allocator.buffer(outputSize);
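// outputPageData is returned to the caller, which takes ownership of it; the compressed input
// buffer is released in the finally block below.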
try {
timer.start();
CompressionCodecName codecName = columnChunkMetaData.getCodec();
CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
ByteBuffer input = inputPageData.nioBuffer(0, inputSize);
ByteBuffer output = outputPageData.nioBuffer(0, outputSize);
decomp.decompress(input, inputSize, output, outputSize);
outputPageData.writerIndex(outputSize);
timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
if (logger.isTraceEnabled()) {
logger.trace(
"Col: {} readPos: {} Uncompressed_size: {} pageData: {}",
columnChunkMetaData.toString(),
dataReader.getPos(), // TODO: see comment on earlier call to getPos()
outputSize,
ByteBufUtil.hexDump(outputPageData)
);
}
this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
} finally {
readStatus.setPageData(null);
if (inputPageData != null) {
inputPageData.release();
}
}
return outputPageData;
}
/**
* Decompresses a v2 data page, which excludes the repetition and definition level
* sections from compression.
* @param readStatus the page taken from the queue
* @return decompressed Parquet page data
* @throws IOException if decompression fails
*/
protected DrillBuf decompressPageV2(ReadStatus readStatus) throws IOException {
Stopwatch timer = Stopwatch.createUnstarted();
PageHeader pageHeader = readStatus.getPageHeader();
int inputSize = pageHeader.getCompressed_page_size();
int repLevelSize = pageHeader.data_page_header_v2.getRepetition_levels_byte_length();
int defLevelSize = pageHeader.data_page_header_v2.getDefinition_levels_byte_length();
int compDataOffset = repLevelSize + defLevelSize;
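// In a v2 data page the first (repLevelSize + defLevelSize) bytes hold the uncompressed repetition
// and definition levels; only the bytes after compDataOffset are compressed.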
int outputSize = pageHeader.uncompressed_page_size;
// TODO: does reporting this number have the same meaning in an async context?
long start = dataReader.getPos();
long timeToRead;
DrillBuf inputPageData = readStatus.getPageData();
DrillBuf outputPageData = this.allocator.buffer(outputSize);
try {
timer.start();
// Write out the uncompressed section
// Note that the following setBytes call to read the repetition and definition level sections
// advances readerIndex in inputPageData but not writerIndex in outputPageData.
outputPageData.setBytes(0, inputPageData, compDataOffset);
// decompress from the start of compressed data to the end of the input buffer
CompressionCodecName codecName = columnChunkMetaData.getCodec();
CompressionCodecFactory.BytesInputDecompressor decomp = codecFactory.getDecompressor(codecName);
ByteBuffer input = inputPageData.nioBuffer(compDataOffset, inputSize - compDataOffset);
ByteBuffer output = outputPageData.nioBuffer(compDataOffset, outputSize - compDataOffset);
decomp.decompress(
input,
inputSize - compDataOffset,
output,
outputSize - compDataOffset
);
outputPageData.writerIndex(outputSize);
timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
if (logger.isTraceEnabled()) {
logger.trace(
"Col: {} readPos: {} Uncompressed_size: {} pageData: {}",
columnChunkMetaData.toString(),
dataReader.getPos(), // TODO: see comment on earlier call to getPos()
outputSize,
ByteBufUtil.hexDump(outputPageData)
);
}
this.updateStats(pageHeader, "Decompress", start, timeToRead, inputSize, outputSize);
} finally {
readStatus.setPageData(null);
if (inputPageData != null) {
inputPageData.release();
}
}
return outputPageData;
}
/**
* Blocks until a page becomes available in the queue, takes it, and schedules a new page
* read task if the queue was full.
* @return the ReadStatus taken from the queue
* @throws InterruptedException if the wait for a page is interrupted
* @throws ExecutionException if the pending read task failed
*/
private ReadStatus nextPageFromQueue() throws InterruptedException, ExecutionException {
ReadStatus readStatus;
Stopwatch timer = Stopwatch.createStarted();
OperatorStats opStats = parentColumnReader.parentReader.getOperatorContext().getStats();
opStats.startWait();
try {
waitForExecutionResult(); // get the result of execution
synchronized (pageQueueSyncronize) {
boolean pageQueueFull = pageQueue.remainingCapacity() == 0;
readStatus = pageQueue.take(); // get the data if no exception has been thrown
if (readStatus == ReadStatus.EMPTY) {
throw new DrillRuntimeException("Unexpected end of data");
}
//if the queue was full before we took a page out, then there would
// have been no new read tasks scheduled. In that case, schedule a new read.
if (!parentColumnReader.isShuttingDown && pageQueueFull) {
asyncPageRead.offer(ExecutorServiceUtil.submit(threadPool, new AsyncPageReaderTask(debugName, pageQueue)));
}
}
} finally {
opStats.stopWait();
}
long timeBlocked = timer.elapsed(TimeUnit.NANOSECONDS);
stats.timeDiskScanWait.addAndGet(timeBlocked);
stats.timeDiskScan.addAndGet(readStatus.getDiskScanTime());
if (readStatus.isDictionaryPage) {
stats.numDictPageLoads.incrementAndGet();
stats.timeDictPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
} else {
stats.numDataPageLoads.incrementAndGet();
stats.timeDataPageLoads.addAndGet(timeBlocked + readStatus.getDiskScanTime());
}
return readStatus;
}
/**
* Inspects the type of the next page and dispatches it for dictionary loading
* or data decompression accordingly.
*/
@Override
protected void nextInternal() throws IOException {
try {
ReadStatus readStatus = nextPageFromQueue();
pageHeader = readStatus.getPageHeader();
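// A zero-byte page carries no values: skip its compressed bytes and release any buffer the
// read task attached to the ReadStatus.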
if (pageHeader.uncompressed_page_size == 0) {
logger.info(
"skipping a {} of size {} because its uncompressed size is 0 bytes.",
pageHeader.getType(),
pageHeader.compressed_page_size
);
skip(pageHeader.compressed_page_size);
Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release);
return;
}
switch (pageHeader.getType()) {
case DICTIONARY_PAGE:
loadDictionary(readStatus);
break;
case DATA_PAGE:
pageData = codecName == CompressionCodecName.UNCOMPRESSED
? readStatus.getPageData()
: decompressPageV1(readStatus);
break;
case DATA_PAGE_V2:
pageData = codecName == CompressionCodecName.UNCOMPRESSED
? readStatus.getPageData()
: decompressPageV2(readStatus);
break;
default:
logger.warn("skipping page of type {} of size {}", pageHeader.getType(), pageHeader.compressed_page_size);
skip(pageHeader.compressed_page_size);
Optional.ofNullable(readStatus.getPageData()).map(DrillBuf::release);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} catch (RuntimeException e) { // Catch this explicitly to satisfy findbugs
throwUserException(e, "Error reading page data");
} catch (Exception e) {
throwUserException(e, "Error reading page data");
}
}
/**
* Blocks until the read task at the head of the Future queue completes, then removes that Future.
* The Future is removed only after {@code get()} returns so that, if an exception is thrown here,
* cleanup in {@link #clear()} can still see and cancel it.
*/
private void waitForExecutionResult() throws InterruptedException, ExecutionException {
// Get the execution result but don't remove the Future object from the "asyncPageRead" queue yet;
// this will ensure that cleanup will happen properly in case of an exception being thrown
asyncPageRead.peek().get(); // get the result of execution
// Alright now remove the Future object
asyncPageRead.poll();
}
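/**
* Cancels any outstanding read tasks, drains the page queue and releases the buffers still in it,
* then delegates to the parent class for the remaining cleanup.
*/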
@Override
public void clear() {
//Cancelling all existing AsyncPageReaderTasks
while (asyncPageRead != null && !asyncPageRead.isEmpty()) {
try {
Future<Void> f = asyncPageRead.poll();
if (!f.isDone() && !f.isCancelled()) {
f.cancel(true);
} else {
f.get(1, TimeUnit.MILLISECONDS);
}
} catch (RuntimeException e) {
// Do Nothing
} catch (Exception e) {
// Do nothing.
}
}
//Empty the page queue
ReadStatus r;
while (!pageQueue.isEmpty()) {
r = null;
try {
r = pageQueue.poll();
if (r == ReadStatus.EMPTY) {
break;
}
} catch (Exception e) {
// Report this; we should never get here.
logger.error(e.getMessage());
} finally {
if (r != null && r.pageData != null) {
r.pageData.release();
}
}
}
super.clear();
}
/**
* Wraps a buffer of page data together with its page header and some read metadata. Accessors are
* synchronized because a ReadStatus is produced on a read-task thread and consumed on the scan thread.
*/
public static class ReadStatus {
private PageHeader pageHeader;
private DrillBuf pageData;
private boolean isDictionaryPage = false;
private long bytesRead = 0;
private long valuesRead = 0;
private long diskScanTime = 0;
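// Sentinel placed in the page queue by the read task when the column chunk is exhausted.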
public static final ReadStatus EMPTY = new ReadStatus();
public synchronized PageHeader getPageHeader() {
return pageHeader;
}
public synchronized void setPageHeader(PageHeader pageHeader) {
this.pageHeader = pageHeader;
}
public synchronized DrillBuf getPageData() {
return pageData;
}
public synchronized void setPageData(DrillBuf pageData) {
this.pageData = pageData;
}
public synchronized boolean isDictionaryPage() {
return isDictionaryPage;
}
public synchronized void setIsDictionaryPage(boolean isDictionaryPage) {
this.isDictionaryPage = isDictionaryPage;
}
public synchronized long getBytesRead() {
return bytesRead;
}
public synchronized void setBytesRead(long bytesRead) {
this.bytesRead = bytesRead;
}
public synchronized long getValuesRead() {
return valuesRead;
}
public synchronized void setValuesRead(long valuesRead) {
this.valuesRead = valuesRead;
}
public synchronized long getDiskScanTime() {
return diskScanTime;
}
public synchronized void setDiskScanTime(long diskScanTime) {
this.diskScanTime = diskScanTime;
}
}
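/**
* The Future task that reads a single page (header plus compressed page bytes) from the input
* stream, puts it into the page queue, and reschedules itself while the queue still has room.
* Once all of the column chunk's values have been read it enqueues the {@link ReadStatus#EMPTY}
* marker and closes the input stream.
*/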
private class AsyncPageReaderTask implements Callable<Void> {
private final AsyncPageReader parent = AsyncPageReader.this;
private final LinkedBlockingQueue<ReadStatus> queue;
private final String name;
public AsyncPageReaderTask(String name, LinkedBlockingQueue<ReadStatus> queue) {
this.name = name;
this.queue = queue;
}
@Override
public Void call() throws IOException {
ReadStatus readStatus = new ReadStatus();
long bytesRead = 0;
long valuesRead = 0;
final long totalValuesRead = parent.totalPageValuesRead;
Stopwatch timer = Stopwatch.createStarted();
final long totalValuesCount = parent.columnChunkMetaData.getValueCount();
// If all of the column chunk's values have been read, put the EMPTY marker in the queue and finish.
logger.trace("[{}]: Total Values COUNT {} Total Values READ {} ", name, totalValuesCount, totalValuesRead);
if (totalValuesRead >= totalValuesCount) {
try {
queue.put(ReadStatus.EMPTY);
// Some InputStreams (like S3ObjectInputStream) should be closed
// as soon as possible to make the connection reusable.
try {
parent.inputStream.close();
} catch (IOException e) {
logger.trace("[{}]: Failure while closing InputStream", name, e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
// Do nothing.
}
return null;
}
DrillBuf pageData = null;
timer.reset();
try {
PageHeader pageHeader = Util.readPageHeader(parent.dataReader);
int compressedSize = pageHeader.getCompressed_page_size();
if (parent.parentColumnReader.isShuttingDown) { return null; } // Opportunity to skip expensive Parquet processing
pageData = parent.dataReader.getNext(compressedSize);
bytesRead = compressedSize;
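// Synchronize on the parent reader so that the update to totalPageValuesRead below is made
// safely visible to the next read task, which reads it at the top of call().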
synchronized (parent) {
PageType type = pageHeader.getType() == null ? PageType.DATA_PAGE : pageHeader.getType();
switch (type) {
case DICTIONARY_PAGE:
readStatus.setIsDictionaryPage(true);
valuesRead += pageHeader.getDictionary_page_header().getNum_values();
break;
case DATA_PAGE_V2:
valuesRead += pageHeader.getData_page_header_v2().getNum_values();
parent.totalPageValuesRead += valuesRead;
break;
case DATA_PAGE:
valuesRead += pageHeader.getData_page_header().getNum_values();
parent.totalPageValuesRead += valuesRead;
break;
default:
throw UserException.unsupportedError()
.message("Page type is not supported yet: " + type)
.build(logger);
}
long timeToRead = timer.elapsed(TimeUnit.NANOSECONDS);
readStatus.setPageHeader(pageHeader);
readStatus.setPageData(pageData);
readStatus.setBytesRead(bytesRead);
readStatus.setValuesRead(valuesRead);
readStatus.setDiskScanTime(timeToRead);
assert (totalValuesRead <= totalValuesCount);
}
// The synchronized block is needed so that the put and the remaining-capacity check below are
// atomic with respect to the consumer's check-and-take in nextPageFromQueue().
synchronized (parent.pageQueueSyncronize) {
queue.put(readStatus);
// If the queue is not full, schedule another read task immediately. If it is full, the consumer
// will schedule a new read task as soon as it removes a page from the queue.
if (!parentColumnReader.isShuttingDown && queue.remainingCapacity() > 0) {
asyncPageRead.offer(ExecutorServiceUtil.submit(parent.threadPool, new AsyncPageReaderTask(debugName, queue)));
}
}
} catch (InterruptedException e) {
if (pageData != null) {
pageData.release();
}
Thread.currentThread().interrupt();
} catch (Exception e) {
if (pageData != null) {
pageData.release();
}
parent.throwUserException(e, "Exception occurred while reading from disk.");
} finally {
//Nothing to do if isShuttingDown.
}
return null;
}
}
}