/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.llap.io.encoded;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.hive.llap.counters.LlapIOCounters;
import org.apache.orc.OrcUtils;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.DataReaderProperties;
import org.apache.orc.impl.OrcIndex;
import org.apache.orc.impl.SchemaEvolution;
import org.apache.tez.common.counters.TezCounters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.Allocator;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
import org.apache.hadoop.hive.llap.ConsumerFeedback;
import org.apache.hadoop.hive.llap.DebugUtils;
import org.apache.hadoop.hive.llap.cache.BufferUsageManager;
import org.apache.hadoop.hive.llap.cache.LowLevelCache;
import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
import org.apache.hadoop.hive.ql.exec.DDLTask;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.HdfsUtils;
import org.apache.orc.CompressionKind;
import org.apache.orc.DataReader;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
import org.apache.orc.OrcConf;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader;
import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedOrcFile;
import org.apache.hadoop.hive.ql.io.orc.encoded.EncodedReader;
import org.apache.hadoop.hive.ql.io.orc.encoded.OrcBatchKey;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.orc.impl.RecordReaderUtils;
import org.apache.orc.StripeInformation;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hive.common.util.FixedSizedObjectPool;
import org.apache.orc.OrcProto;
import org.apache.tez.common.CallableWithNdc;
/**
* This produces EncodedColumnBatch-es via the ORC encoded reader (EncodedDataImpl).
* It also serves as a Consumer of EncodedColumnBatch for the high-level cache scenario, where it
* inserts itself into the pipeline to put the data in the cache before passing it to the real
* consumer. Finally, it serves as the ConsumerFeedback that receives processed EncodedColumnBatch-es.
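*
* <p>A rough usage sketch; the executor and the concrete cache/consumer instances below are
* illustrative assumptions, not the actual LLAP wiring:</p>
* <pre>{@code
* OrcEncodedDataReader reader = new OrcEncodedDataReader(
*     lowLevelCache, bufferManager, metadataCache, conf, split,
*     columnIds, sarg, columnNames, consumer, counters);
* Future<Void> done = executor.submit(reader); // CallableWithNdc implements Callable<Void>
* // ... later, to cancel further reads for this fragment:
* reader.stop();
* }</pre>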
*/
public class OrcEncodedDataReader extends CallableWithNdc<Void>
implements ConsumerFeedback<OrcEncodedColumnBatch> {
private static final Logger LOG = LoggerFactory.getLogger(OrcEncodedDataReader.class);
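/** Shared pool of ColumnStreamData objects, reset and reused across batches to reduce allocation churn. */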
public static final FixedSizedObjectPool<ColumnStreamData> CSD_POOL =
new FixedSizedObjectPool<>(8192, new PoolObjectHelper<ColumnStreamData>() {
@Override
public ColumnStreamData create() {
return new ColumnStreamData();
}
@Override
public void resetBeforeOffer(ColumnStreamData t) {
t.reset();
}
});
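/** Shared pool of OrcEncodedColumnBatch objects, reset and reused between reads. */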
public static final FixedSizedObjectPool<OrcEncodedColumnBatch> ECB_POOL =
new FixedSizedObjectPool<>(1024, new PoolObjectHelper<OrcEncodedColumnBatch>() {
@Override
public OrcEncodedColumnBatch create() {
return new OrcEncodedColumnBatch();
}
@Override
public void resetBeforeOffer(OrcEncodedColumnBatch t) {
t.reset();
}
});
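/** Pool factory handed to the ORC encoded reader so that it allocates from the shared pools above. */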
private static final PoolFactory POOL_FACTORY = new PoolFactory() {
@Override
public <T> Pool<T> createPool(int size, PoolObjectHelper<T> helper) {
return new FixedSizedObjectPool<>(size, helper);
}
@Override
public Pool<ColumnStreamData> createColumnStreamDataPool() {
return CSD_POOL;
}
@Override
public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() {
return ECB_POOL;
}
};
private final OrcMetadataCache metadataCache;
private final LowLevelCache lowLevelCache;
private final BufferUsageManager bufferManager;
private final Configuration conf;
private final FileSplit split;
private List<Integer> columnIds;
private final SearchArgument sarg;
private final String[] columnNames;
private final OrcEncodedDataConsumer consumer;
private final QueryFragmentCounters counters;
private final UserGroupInformation ugi;
// Read state.
private int stripeIxFrom;
private OrcFileMetadata fileMetadata;
private Path path;
private Reader orcReader;
private DataReader metadataReader;
private EncodedReader stripeReader;
private Object fileKey;
private FileSystem fs;
/**
* readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of RGs that need to be
* read. Contains only the stripes being read and only the included columns; null => read all RGs.
*/
private boolean[][][] readState;
private volatile boolean isStopped = false;
@SuppressWarnings("unused")
private volatile boolean isPaused = false;
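// Column includes over the full file schema, derived from columnIds in the constructor.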
boolean[] globalIncludes = null;
public OrcEncodedDataReader(LowLevelCache lowLevelCache, BufferUsageManager bufferManager,
OrcMetadataCache metadataCache, Configuration conf, FileSplit split, List<Integer> columnIds,
SearchArgument sarg, String[] columnNames, OrcEncodedDataConsumer consumer,
QueryFragmentCounters counters) throws IOException {
this.lowLevelCache = lowLevelCache;
this.metadataCache = metadataCache;
this.bufferManager = bufferManager;
this.conf = conf;
this.split = split;
this.columnIds = columnIds;
if (this.columnIds != null) {
Collections.sort(this.columnIds);
}
this.sarg = sarg;
this.columnNames = columnNames;
this.consumer = consumer;
this.counters = counters;
try {
this.ugi = UserGroupInformation.getCurrentUser();
} catch (IOException e) {
throw new RuntimeException(e);
}
// This code was moved here from performDataRead because LlapInputFormat needs to know the file
// schema to decide whether schema evolution is supported.
orcReader = null;
// 1. Get file metadata from cache, or create the reader and read it.
// Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
fs = split.getPath().getFileSystem(conf);
fileKey = determineFileId(fs, split,
HiveConf.getBoolVar(conf, ConfVars.LLAP_CACHE_ALLOW_SYNTHETIC_FILEID));
fileMetadata = getOrReadFileMetadata();
globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true);
consumer.setFileMetadata(fileMetadata);
consumer.setIncludedColumns(globalIncludes);
}
@Override
public void stop() {
LOG.debug("Encoded reader is being stopped");
isStopped = true;
}
@Override
public void pause() {
isPaused = true;
// TODO: pause fetching
}
@Override
public void unpause() {
isPaused = false;
// TODO: unpause fetching
}
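// Runs the data read as the user captured in the constructor (via UGI doAs).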
@Override
protected Void callInternal() throws IOException, InterruptedException {
return ugi.doAs(new PrivilegedExceptionAction<Void>() {
@Override
public Void run() throws Exception {
return performDataRead();
}
});
}
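/**
* Performs the read for the split: validates file metadata, determines the stripes and row groups
* to read (applying the SARG if present), then reads each stripe via the encoded reader and feeds
* the results to the consumer. Errors are reported via consumer.setError rather than rethrown.
*/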
protected Void performDataRead() throws IOException {
long startTime = counters.startTimeCounter();
LlapIoImpl.LOG.info("Processing data for {}", split.getPath());
if (processStop()) {
recordReaderTime(startTime);
return null;
}
counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath()));
counters.setDesc(QueryFragmentCounters.Desc.FILE, split.getPath()
+ (fileKey == null ? "" : " (" + fileKey + ")"));
try {
validateFileMetadata();
if (columnIds == null) {
columnIds = createColumnIds(fileMetadata);
}
// 2. Determine which stripes to read based on the split.
determineStripesToRead();
} catch (Throwable t) {
recordReaderTime(startTime);
consumer.setError(t);
return null;
}
if (readState.length == 0) {
consumer.setDone();
recordReaderTime(startTime);
return null; // No data to read.
}
counters.setDesc(QueryFragmentCounters.Desc.STRIPES, stripeIxFrom + "," + readState.length);
// 3. Apply SARG if needed, and otherwise determine what RGs to read.
int stride = fileMetadata.getRowIndexStride();
ArrayList<OrcStripeMetadata> stripeMetadatas = null;
boolean[] sargColumns = null;
try {
if (sarg != null && stride != 0) {
// TODO: move this to a common method
int[] filterColumns = RecordReaderImpl.mapSargColumnsToOrcInternalColIdx(
sarg.getLeaves(), columnNames, 0);
// 'included' will not be null; the row reader options fill the array with trues when it is null.
sargColumns = new boolean[globalIncludes.length];
for (int i : filterColumns) {
// Filter columns may have -1 as the index, e.g. for a partition column referenced in the SARG.
if (i > 0) {
sargColumns[i] = true;
}
}
// If SARG is present, get relevant stripe metadata from cache or readers.
stripeMetadatas = readStripesMetadata(globalIncludes, sargColumns);
}
// Now, apply SARG if any; w/o sarg, this will just initialize readState.
boolean hasData = determineRgsToRead(globalIncludes, stride, stripeMetadatas);
if (!hasData) {
consumer.setDone();
recordReaderTime(startTime);
return null; // No data to read.
}
} catch (Throwable t) {
cleanupReaders();
consumer.setError(t);
recordReaderTime(startTime);
return null;
}
if (processStop()) {
cleanupReaders();
recordReaderTime(startTime);
return null;
}
// 4. Get data from high-level cache.
// If some cols are fully in cache, this will also give us the modified list of columns to
// read for every stripe (null means read all of them - the usual path). In any case,
// readState will be modified for column x rgs that were fetched from high-level cache.
List<Integer>[] stripeColsToRead = null;
// 5. Create encoded data reader.
try {
ensureOrcReader();
// Reader creation updates HDFS counters; don't do it again here.
DataWrapperForOrc dw = new DataWrapperForOrc();
stripeReader = orcReader.encodedReader(fileKey, dw, dw, POOL_FACTORY);
stripeReader.setTracing(LlapIoImpl.ORC_LOGGER.isTraceEnabled());
} catch (Throwable t) {
consumer.setError(t);
recordReaderTime(startTime);
cleanupReaders();
return null;
}
// 6. Read data.
// TODO: I/O threadpool could be here - one thread per stripe; for now, linear.
boolean hasFileId = this.fileKey != null;
OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, -1, 0) : null;
for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
if (processStop()) {
cleanupReaders();
recordReaderTime(startTime);
return null;
}
int stripeIx = stripeIxFrom + stripeIxMod;
boolean[][] colRgs = null;
boolean[] stripeIncludes = null;
OrcStripeMetadata stripeMetadata = null;
StripeInformation stripe;
try {
List<Integer> cols = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
if (cols != null && cols.isEmpty()) continue; // No need to read this stripe.
stripe = fileMetadata.getStripes().get(stripeIx);
LlapIoImpl.ORC_LOGGER.trace("Reading stripe {}: {}, {}", stripeIx, stripe.getOffset(),
stripe.getLength());
colRgs = readState[stripeIxMod];
// We assume that NO_RGS value is only set from SARG filter and for all columns;
// intermediate changes for individual columns will unset values in the array.
// Skip this case for 0-column read. We could probably special-case it just like we do
// in EncodedReaderImpl, but for now it's not that important.
if (colRgs.length > 0 && colRgs[0] ==
RecordReaderImpl.SargApplier.READ_NO_RGS) continue;
// 6.1. Determine the columns to read (usually the same as requested).
if (cols == null || cols.size() == colRgs.length) {
cols = columnIds;
stripeIncludes = globalIncludes;
} else {
// We are reading subset of the original columns, remove unnecessary bitmasks/etc.
// This will never happen w/o high-level cache.
stripeIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), cols, true);
colRgs = genStripeColRgs(cols, colRgs);
}
// 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering.
boolean isFoundInCache = false;
if (stripeMetadatas != null) {
stripeMetadata = stripeMetadatas.get(stripeIxMod);
} else {
if (hasFileId && metadataCache != null) {
stripeKey.stripeIx = stripeIx;
stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
}
isFoundInCache = (stripeMetadata != null);
if (!isFoundInCache) {
counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
ensureMetadataReader();
long startTimeHdfs = counters.startTimeCounter();
stripeMetadata = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0),
metadataReader, stripe, stripeIncludes, sargColumns);
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTimeHdfs);
if (hasFileId && metadataCache != null) {
stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}",
stripeKey.stripeIx, DebugUtils.toString(stripeIncludes));
}
}
}
consumer.setStripeMetadata(stripeMetadata);
}
if (!stripeMetadata.hasAllIndexes(stripeIncludes)) {
if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
LlapIoImpl.ORC_LOGGER.trace("Updating indexes in stripe {} metadata for includes: {}",
stripeKey.stripeIx, DebugUtils.toString(stripeIncludes));
}
assert isFoundInCache;
counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
ensureMetadataReader();
updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns);
} else if (isFoundInCache) {
counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
}
} catch (Throwable t) {
consumer.setError(t);
cleanupReaders();
recordReaderTime(startTime);
return null;
}
if (processStop()) {
cleanupReaders();
recordReaderTime(startTime);
return null;
}
// 6.3. Finally, hand off to the stripe reader to produce the data.
// This is a sync call that will feed data to the consumer.
try {
// TODO: readEncodedColumns is not supposed to throw; errors should be propagated thru
// consumer. It is potentially holding locked buffers, and must perform its own cleanup.
// Also, currently readEncodedColumns is not stoppable. The consumer will discard the
// data it receives for one stripe. We could probably interrupt it, if it checked that.
stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(),
stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes,
colRgs, consumer);
} catch (Throwable t) {
consumer.setError(t);
cleanupReaders();
recordReaderTime(startTime);
return null;
}
}
// Done with all the things.
recordReaderTime(startTime);
consumer.setDone();
LlapIoImpl.LOG.trace("done processing {}", split);
// Close the stripe reader, we are done reading.
cleanupReaders();
return null;
}
private void recordReaderTime(long startTime) {
counters.incrTimeCounter(LlapIOCounters.TOTAL_IO_TIME_NS, startTime);
}
private static String getDbAndTableName(Path path) {
// Ideally, we'd get this from split; however, split doesn't contain any such thing and it's
// actually pretty hard to get because even the split generator only uses paths. We only need this
// for metrics; therefore, brace for BLACK MAGIC!
String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
int dbIx = -1;
// Try to find the default db postfix; don't check the last two components - at least there
// should be a table and file (we could also try to throw away partition/bucket/acid stuff).
for (int i = 0; i < parts.length - 2; ++i) {
if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue;
if (dbIx >= 0) {
dbIx = -1; // Let's not guess.
break;
}
dbIx = i;
}
if (dbIx >= 0) {
return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1];
}
// Just go from the back and throw away everything we think is wrong; skip the last item (the file).
boolean isInPartFields = false;
for (int i = parts.length - 2; i >= 0; --i) {
String p = parts[i];
boolean isPartField = p.contains("=");
if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX)
&& !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) {
dbIx = i - 1;
break;
}
isInPartFields = isPartField;
}
// If we found something before we ran out of components, use it.
if (dbIx >= 0) {
String dbName = parts[dbIx];
if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) {
dbName = dbName.substring(0, dbName.length() - 3);
}
return dbName + "." + parts[dbIx + 1];
}
return "unknown";
}
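/**
* Warns if the ORC compression buffer size is smaller than the minimum LLAP allocator allocation
* size, which wastes cache memory. No-op for uncompressed files.
*/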
private void validateFileMetadata() throws IOException {
if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
int bufferSize = fileMetadata.getCompressionBufferSize();
long minAllocSize = HiveConf.getSizeVar(conf, ConfVars.LLAP_ALLOCATOR_MIN_ALLOC);
if (bufferSize < minAllocSize) {
LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
+ "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
+ HiveConf.ConfVars.LLAP_ALLOCATOR_MIN_ALLOC.toString() + " to avoid wasting memory");
}
}
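/** Returns true (after cleaning up the readers) if the fragment has been stopped, false otherwise. */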
private boolean processStop() {
if (!isStopped) return false;
LOG.info("Encoded data reader is stopping");
cleanupReaders();
return true;
}
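/**
* Determines the file key (ID) used for caching: taken from the OrcSplit when available,
* otherwise obtained from the file system (optionally allowing a synthetic ID).
*/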
private static Object determineFileId(FileSystem fs, FileSplit split,
boolean allowSynthetic) throws IOException {
if (split instanceof OrcSplit) {
Object fileKey = ((OrcSplit)split).getFileKey();
if (fileKey != null) {
return fileKey;
}
}
LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
return HdfsUtils.getFileId(fs, split.getPath(), allowSynthetic);
}
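/**
* Compacts the per-column RG bitmasks from the global column order down to just the subset of
* columns being read for this stripe.
*/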
private boolean[][] genStripeColRgs(List<Integer> stripeCols, boolean[][] globalColRgs) {
boolean[][] stripeColRgs = new boolean[stripeCols.size()][];
for (int i = 0, i2 = 0; i < globalColRgs.length; ++i) {
if (globalColRgs[i] == null) continue;
stripeColRgs[i2++] = globalColRgs[i];
}
return stripeColRgs;
}
/**
* Builds a column list containing all column indexes from the file metadata, so that all columns are read.
*/
private static List<Integer> createColumnIds(OrcFileMetadata metadata) {
List<Integer> columnIds = new ArrayList<Integer>(metadata.getTypes().size());
for (int i = 1; i < metadata.getTypes().size(); ++i) {
columnIds.add(i);
}
return columnIds;
}
/**
* If the stripe metadata in the cache does not have all the indexes needed for the current query,
* load the missing ones. This is a temporary kludge until a real metadata cache becomes available.
*/
private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata,
StripeInformation stripe, boolean[] stripeIncludes, boolean[] sargColumns) throws IOException {
// We only synchronize on writes for now - the metadata cache design is very temporary;
// we pre-allocate the array and never remove entries, so readers should be safe.
synchronized (stripeMetadata) {
if (stripeMetadata.hasAllIndexes(stripeIncludes)) return;
long startTime = counters.startTimeCounter();
stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns);
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
}
}
/**
* Closes the stripe readers (on error).
*/
private void cleanupReaders() {
if (stripeReader != null) {
try {
stripeReader.close();
} catch (IOException ex) {
// Ignore.
}
}
if (metadataReader != null) {
try {
metadataReader.close();
} catch (IOException ex) {
// Ignore.
}
}
}
/**
* Ensures orcReader is initialized for the split.
*/
private void ensureOrcReader() throws IOException {
if (orcReader != null) return;
path = split.getPath();
if (fileKey instanceof Long && HiveConf.getBoolVar(conf, ConfVars.LLAP_IO_USE_FILEID_PATH)) {
path = HdfsUtils.getFileIdPath(fs, path, (long)fileKey);
}
LlapIoImpl.ORC_LOGGER.trace("Creating reader for {} ({})", path, split.getPath());
long startTime = counters.startTimeCounter();
ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
orcReader = EncodedOrcFile.createReader(path, opts);
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
}
/**
* Gets file metadata for the split from cache, or reads it from the file.
*/
private OrcFileMetadata getOrReadFileMetadata() throws IOException {
OrcFileMetadata metadata = null;
if (fileKey != null && metadataCache != null) {
metadata = metadataCache.getFileMetadata(fileKey);
if (metadata != null) {
counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
return metadata;
} else {
counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
}
}
ensureOrcReader();
// We assume this call doesn't touch HDFS because everything is already read; don't add time.
metadata = new OrcFileMetadata(fileKey, orcReader);
if (fileKey == null || metadataCache == null) return metadata;
return metadataCache.putFileMetadata(metadata);
}
/**
* Reads the metadata for all stripes in the file.
*/
private ArrayList<OrcStripeMetadata> readStripesMetadata(
boolean[] globalInc, boolean[] sargColumns) throws IOException {
ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length);
boolean hasFileId = this.fileKey != null;
OrcBatchKey stripeKey = hasFileId ? new OrcBatchKey(fileKey, 0, 0) : null;
for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
OrcStripeMetadata value = null;
int stripeIx = stripeIxMod + stripeIxFrom;
if (hasFileId && metadataCache != null) {
stripeKey.stripeIx = stripeIx;
value = metadataCache.getStripeMetadata(stripeKey);
}
if (value == null || !value.hasAllIndexes(globalInc)) {
counters.incrCounter(LlapIOCounters.METADATA_CACHE_MISS);
ensureMetadataReader();
StripeInformation si = fileMetadata.getStripes().get(stripeIx);
if (value == null) {
long startTime = counters.startTimeCounter();
value = new OrcStripeMetadata(new OrcBatchKey(fileKey, stripeIx, 0),
metadataReader, si, globalInc, sargColumns);
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
if (hasFileId && metadataCache != null) {
value = metadataCache.putStripeMetadata(value);
if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
LlapIoImpl.ORC_LOGGER.trace("Caching stripe {} metadata with includes: {}",
stripeKey.stripeIx, DebugUtils.toString(globalInc));
}
}
}
// We might have gotten an old value from the cache; recheck that it has the indexes we need.
if (!value.hasAllIndexes(globalInc)) {
if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
LlapIoImpl.ORC_LOGGER.trace("Updating indexes in stripe {} metadata for includes: {}",
stripeKey.stripeIx, DebugUtils.toString(globalInc));
}
updateLoadedIndexes(value, si, globalInc, sargColumns);
}
} else {
counters.incrCounter(LlapIOCounters.METADATA_CACHE_HIT);
}
result.add(value);
consumer.setStripeMetadata(value);
}
return result;
}
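/** Lazily creates the DataReader used to read stripe metadata; it is also cloned by DataWrapperForOrc for data reads. */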
private void ensureMetadataReader() throws IOException {
ensureOrcReader();
if (metadataReader != null) return;
long startTime = counters.startTimeCounter();
boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
metadataReader = RecordReaderUtils.createDefaultDataReader(
DataReaderProperties.builder()
.withBufferSize(orcReader.getCompressionSize())
.withCompression(orcReader.getCompressionKind())
.withFileSystem(fs)
.withPath(path)
.withTypeCount(orcReader.getSchema().getMaximumId() + 1)
.withZeroCopy(useZeroCopy)
.build());
counters.incrTimeCounter(LlapIOCounters.HDFS_TIME_NS, startTime);
}
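/**
* Returns a processed batch from the consumer: releases the cache buffers backing each stream
* and offers the stream and batch objects back to their pools for reuse.
*/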
@Override
public void returnData(OrcEncodedColumnBatch ecb) {
for (ColumnStreamData[] datas : ecb.getColumnData()) {
if (datas == null) continue;
for (ColumnStreamData data : datas) {
if (data == null || data.decRef() != 0) continue;
if (LlapIoImpl.LOCKING_LOGGER.isTraceEnabled()) {
for (MemoryBuffer buf : data.getCacheBuffers()) {
LlapIoImpl.LOCKING_LOGGER.trace("Unlocking {} at the end of processing", buf);
}
}
bufferManager.decRefBuffers(data.getCacheBuffers());
CSD_POOL.offer(data);
}
}
// We can offer ECB even with some streams not discarded; reset() will clear the arrays.
ECB_POOL.offer(ecb);
}
/**
* Determines which RGs need to be read, after stripes have been determined.
* SARG is applied, and readState is populated for each stripe accordingly.
*/
private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride,
ArrayList<OrcStripeMetadata> metadata) throws IOException {
RecordReaderImpl.SargApplier sargApp = null;
if (sarg != null && rowIndexStride != 0) {
List<OrcProto.Type> types = fileMetadata.getTypes();
String[] colNamesForSarg = OrcInputFormat.getSargColumnNames(
columnNames, types, globalIncludes, fileMetadata.isOriginalFormat());
TypeDescription schema = OrcUtils.convertTypeFromProtobuf(types, 0);
SchemaEvolution schemaEvolution = new SchemaEvolution(schema, globalIncludes);
sargApp = new RecordReaderImpl.SargApplier(sarg, colNamesForSarg,
rowIndexStride, globalIncludes.length, schemaEvolution);
}
boolean hasAnyData = false;
// readState should have been initialized by this time with an empty array.
for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
int stripeIx = stripeIxMod + stripeIxFrom;
StripeInformation stripe = fileMetadata.getStripes().get(stripeIx);
int rgCount = getRgCount(stripe, rowIndexStride);
boolean[] rgsToRead = null;
if (sargApp != null) {
OrcStripeMetadata stripeMetadata = metadata.get(stripeIxMod);
rgsToRead = sargApp.pickRowGroups(stripe, stripeMetadata.getRowIndexes(),
stripeMetadata.getBloomFilterIndexes(), true);
}
boolean isNone = rgsToRead == RecordReaderImpl.SargApplier.READ_NO_RGS,
isAll = rgsToRead == RecordReaderImpl.SargApplier.READ_ALL_RGS;
hasAnyData = hasAnyData || !isNone;
if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
if (isNone) {
LlapIoImpl.ORC_LOGGER.trace("SARG eliminated all RGs for stripe {}", stripeIx);
} else if (!isAll) {
LlapIoImpl.ORC_LOGGER.trace("SARG picked RGs for stripe {}: {}",
stripeIx, DebugUtils.toString(rgsToRead));
} else {
LlapIoImpl.ORC_LOGGER.trace("Will read all {} RGs for stripe {}", rgCount, stripeIx);
}
}
assert isAll || isNone || rgsToRead.length == rgCount;
readState[stripeIxMod] = new boolean[columnIds.size()][];
for (int j = 0; j < columnIds.size(); ++j) {
readState[stripeIxMod][j] = (isAll || isNone) ? rgsToRead :
Arrays.copyOf(rgsToRead, rgsToRead.length);
}
adjustRgMetric(rgCount, rgsToRead, isNone, isAll);
}
return hasAnyData;
}
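// Updates the SELECTED_ROWGROUPS counter with the number of row groups that will actually be read.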
private void adjustRgMetric(int rgCount, boolean[] rgsToRead, boolean isNone,
boolean isAll) {
int count = 0;
if (!isAll) {
for (boolean b : rgsToRead) {
if (b)
count++;
}
} else if (!isNone) {
count = rgCount;
}
counters.incrCounter(LlapIOCounters.SELECTED_ROWGROUPS, count);
}
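/** Returns the number of row groups in a stripe, counting the final partial row group. */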
private int getRgCount(StripeInformation stripe, int rowIndexStride) {
return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
}
/**
* Determine which stripes to read for a split. Populates stripeIxFrom and readState.
*/
public void determineStripesToRead() {
// The unit of caching for ORC is (rg x column) (see OrcBatchKey).
List<StripeInformation> stripes = fileMetadata.getStripes();
long offset = split.getStart(), maxOffset = offset + split.getLength();
stripeIxFrom = -1;
int stripeIxTo = -1;
if (LlapIoImpl.ORC_LOGGER.isDebugEnabled()) {
String tmp = "FileSplit {" + split.getStart() + ", " + split.getLength() + "}; stripes ";
for (StripeInformation stripe : stripes) {
tmp += "{" + stripe.getOffset() + ", " + stripe.getLength() + "}, ";
}
LlapIoImpl.ORC_LOGGER.debug(tmp);
}
int stripeIx = 0;
for (StripeInformation stripe : stripes) {
long stripeStart = stripe.getOffset();
if (offset > stripeStart) {
// We assume splits will never start in the middle of the stripe.
++stripeIx;
continue;
}
if (stripeIxFrom == -1) {
LlapIoImpl.ORC_LOGGER.trace("Including stripes from {} ({} >= {})",
stripeIx, stripeStart, offset);
stripeIxFrom = stripeIx;
}
if (stripeStart >= maxOffset) {
stripeIxTo = stripeIx;
LlapIoImpl.ORC_LOGGER.trace("Including stripes until {} ({} >= {}); {} stripes",
stripeIxTo, stripeStart, maxOffset, (stripeIxTo - stripeIxFrom));
break;
}
++stripeIx;
}
if (stripeIxFrom == -1) {
LlapIoImpl.LOG.info("Not including any stripes - empty split");
}
if (stripeIxTo == -1 && stripeIxFrom != -1) {
stripeIxTo = stripeIx;
LlapIoImpl.ORC_LOGGER.trace("Including stripes until {} (end of file); {} stripes",
stripeIx, (stripeIxTo - stripeIxFrom));
}
readState = new boolean[stripeIxTo - stripeIxFrom][][];
}
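/**
* Adapts the LLAP caches and the underlying ORC DataReader into a single DataReader/DataCache:
* cached ranges are served from the low-level cache, misses are read from the file via the
* wrapped reader, and newly read buffers can be offered back to the cache.
*/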
private class DataWrapperForOrc implements DataReader, DataCache {
private final DataReader orcDataReader;
private DataWrapperForOrc(DataWrapperForOrc other) {
orcDataReader = other.orcDataReader.clone();
}
public DataWrapperForOrc() throws IOException {
ensureMetadataReader();
this.orcDataReader = metadataReader.clone();
}
@Override
public DiskRangeList getFileData(Object fileKey, DiskRangeList range,
long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
DiskRangeList result = (lowLevelCache == null) ? range
: lowLevelCache.getFileData(fileKey, range, baseOffset, factory, counters, gotAllData);
if (gotAllData.value) return result;
return (metadataCache == null) ? range
: metadataCache.getIncompleteCbs(fileKey, range, baseOffset, factory, gotAllData);
}
@Override
public long[] putFileData(Object fileKey, DiskRange[] ranges,
MemoryBuffer[] data, long baseOffset) {
if (data != null) {
return (lowLevelCache == null) ? null : lowLevelCache.putFileData(
fileKey, ranges, data, baseOffset, Priority.NORMAL, counters);
} else if (metadataCache != null) {
metadataCache.putIncompleteCbs(fileKey, ranges, baseOffset);
}
return null;
}
@Override
public void releaseBuffer(MemoryBuffer buffer) {
bufferManager.decRefBuffer(buffer);
}
@Override
public void reuseBuffer(MemoryBuffer buffer) {
boolean isReused = bufferManager.incRefBuffer(buffer);
assert isReused;
}
@Override
public Allocator getAllocator() {
return bufferManager.getAllocator();
}
@Override
public void close() throws IOException {
orcDataReader.close();
if (metadataReader != null) {
metadataReader.close();
}
}
@Override
public DiskRangeList readFileData(DiskRangeList range, long baseOffset,
boolean doForceDirect) throws IOException {
long startTime = counters.startTimeCounter();
DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect);
counters.recordHdfsTime(startTime);
if (LlapIoImpl.ORC_LOGGER.isTraceEnabled()) {
LlapIoImpl.ORC_LOGGER.trace("Disk ranges after disk read (file {}, base offset {}): {}",
fileKey, baseOffset, RecordReaderUtils.stringifyDiskRanges(result));
}
return result;
}
@Override
public boolean isTrackingDiskRanges() {
return orcDataReader.isTrackingDiskRanges();
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
orcDataReader.releaseBuffer(buffer);
}
@Override
public DataWrapperForOrc clone() {
return new DataWrapperForOrc(this);
}
@Override
public void open() throws IOException {
long startTime = counters.startTimeCounter();
orcDataReader.open();
counters.recordHdfsTime(startTime);
}
@Override
public OrcIndex readRowIndex(StripeInformation stripe,
OrcProto.StripeFooter footer,
boolean[] included,
OrcProto.RowIndex[] indexes,
boolean[] sargColumns,
OrcProto.BloomFilterIndex[] bloomFilterIndices
) throws IOException {
return orcDataReader.readRowIndex(stripe, footer, included, indexes,
sargColumns, bloomFilterIndices);
}
@Override
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
return orcDataReader.readStripeFooter(stripe);
}
}
public TezCounters getTezCounters() {
return counters.getTezCounters();
}
}