/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.ql.util.TimestampUtils;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.io.Text;
import org.apache.orc.BooleanColumnStatistics;
import org.apache.orc.CollectionColumnStatistics;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
import org.apache.orc.DateColumnStatistics;
import org.apache.orc.DecimalColumnStatistics;
import org.apache.orc.DoubleColumnStatistics;
import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.OrcConf;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.StringColumnStatistics;
import org.apache.orc.StripeInformation;
import org.apache.orc.TimestampColumnStatistics;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.reader.ReaderEncryption;
import org.apache.orc.impl.reader.StripePlanner;
import org.apache.orc.impl.reader.tree.BatchReader;
import org.apache.orc.util.BloomFilter;
import org.apache.orc.util.BloomFilterIO;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
public class RecordReaderImpl implements RecordReader {
static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
protected final Path path;
private final long firstRow;
private final List<StripeInformation> stripes = new ArrayList<>();
private OrcProto.StripeFooter stripeFooter;
private final long totalRowCount;
protected final TypeDescription schema;
// the columns included from the file, indexed by the file's column ids.
private final boolean[] fileIncluded;
private final long rowIndexStride;
private long rowInStripe = 0;
private int currentStripe = -1;
private long rowBaseInStripe = 0;
private long rowCountInStripe = 0;
private final BatchReader reader;
private final OrcIndex indexes;
private final SargApplier sargApp;
// an array marking which row groups are selected; null means read them all
private boolean[] includedRowGroups = null;
private final DataReader dataReader;
private final int maxDiskRangeChunkLimit;
private final StripePlanner planner;
  /**
   * Given a fully qualified column name, find the column in the file schema
   * and return its column id.
   *
   * @param evolution the mapping from reader to file schema
   * @param columnName the fully qualified column name to look for
   * @return the file column id or -1 if the column wasn't found
   */
static int findColumns(SchemaEvolution evolution,
String columnName) {
try {
TypeDescription readerColumn =
evolution.getReaderBaseSchema().findSubtype(columnName);
TypeDescription fileColumn = evolution.getFileType(readerColumn);
return fileColumn == null ? -1 : fileColumn.getId();
} catch (IllegalArgumentException e) {
      if (LOG.isDebugEnabled()) {
LOG.debug("{}", e.getMessage());
}
return -1;
}
}
  /**
   * Find the mapping from predicate leaves to columns.
   * @param sargLeaves the predicate leaves of the search argument that we
   *   need to map
   * @param evolution the mapping from reader to file schema
   * @return an array mapping the sarg leaves to concrete column numbers in the
   * file
   */
public static int[] mapSargColumnsToOrcInternalColIdx(
List<PredicateLeaf> sargLeaves,
SchemaEvolution evolution) {
int[] result = new int[sargLeaves.size()];
Arrays.fill(result, -1);
for(int i=0; i < result.length; ++i) {
String colName = sargLeaves.get(i).getColumnName();
result[i] = findColumns(evolution, colName);
}
return result;
}
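  // Illustrative example (hypothetical column names): for a file schema of
  // struct<name:string,age:int> and sarg leaves on "age" and on a column
  // "salary" that is absent from the file, the result maps each leaf to a
  // file column id, with -1 for the unmatched column:
  //
  //   int[] cols = mapSargColumnsToOrcInternalColIdx(sarg.getLeaves(), evolution);
  //   // cols == {2, -1}: "age" resolves to file column 2, "salary" is unmatched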
protected RecordReaderImpl(ReaderImpl fileReader,
Reader.Options options) throws IOException {
OrcFile.WriterVersion writerVersion = fileReader.getWriterVersion();
SchemaEvolution evolution;
if (options.getSchema() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("Reader schema not provided -- using file schema " +
fileReader.getSchema());
}
evolution = new SchemaEvolution(fileReader.getSchema(), null, options);
} else {
// Now that we are creating a record reader for a file, validate that
// the schema to read is compatible with the file schema.
//
evolution = new SchemaEvolution(fileReader.getSchema(),
options.getSchema(),
options);
if (LOG.isDebugEnabled() && evolution.hasConversion()) {
LOG.debug("ORC file " + fileReader.path.toString() +
" has data type conversion --\n" +
"reader schema: " + options.getSchema().toString() + "\n" +
"file schema: " + fileReader.getSchema());
}
}
this.schema = evolution.getReaderSchema();
this.path = fileReader.path;
this.rowIndexStride = fileReader.rowIndexStride;
boolean ignoreNonUtf8BloomFilter = OrcConf.IGNORE_NON_UTF8_BLOOM_FILTERS.getBoolean(fileReader.conf);
ReaderEncryption encryption = fileReader.getEncryption();
this.fileIncluded = evolution.getFileIncluded();
SearchArgument sarg = options.getSearchArgument();
if (sarg != null && rowIndexStride != 0) {
sargApp = new SargApplier(sarg,
rowIndexStride,
evolution,
writerVersion,
fileReader.useUTCTimestamp,
fileReader.writerUsedProlepticGregorian(),
fileReader.options.getConvertToProlepticGregorian());
} else {
sargApp = null;
}
long rows = 0;
long skippedRows = 0;
long offset = options.getOffset();
long maxOffset = options.getMaxOffset();
for(StripeInformation stripe: fileReader.getStripes()) {
long stripeStart = stripe.getOffset();
if (offset > stripeStart) {
skippedRows += stripe.getNumberOfRows();
} else if (stripeStart < maxOffset) {
this.stripes.add(stripe);
rows += stripe.getNumberOfRows();
}
}
this.maxDiskRangeChunkLimit = OrcConf.ORC_MAX_DISK_RANGE_CHUNK_LIMIT.getInt(fileReader.conf);
Boolean zeroCopy = options.getUseZeroCopy();
if (zeroCopy == null) {
zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
}
if (options.getDataReader() != null) {
this.dataReader = options.getDataReader().clone();
} else {
InStream.StreamOptions unencryptedOptions =
InStream.options()
.withCodec(OrcCodecPool.getCodec(fileReader.getCompressionKind()))
.withBufferSize(fileReader.getCompressionSize());
DataReaderProperties.Builder builder =
DataReaderProperties.builder()
.withCompression(unencryptedOptions)
.withFileSystemSupplier(fileReader.getFileSystemSupplier())
.withPath(fileReader.path)
.withMaxDiskRangeChunkLimit(maxDiskRangeChunkLimit)
.withZeroCopy(zeroCopy);
FSDataInputStream file = fileReader.takeFile();
if (file != null) {
builder.withFile(file);
}
this.dataReader = RecordReaderUtils.createDefaultDataReader(
builder.build());
}
firstRow = skippedRows;
totalRowCount = rows;
Boolean skipCorrupt = options.getSkipCorruptRecords();
if (skipCorrupt == null) {
skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
}
TreeReaderFactory.ReaderContext readerContext =
new TreeReaderFactory.ReaderContext()
.setSchemaEvolution(evolution)
.skipCorrupt(skipCorrupt)
.fileFormat(fileReader.getFileVersion())
.useUTCTimestamp(fileReader.useUTCTimestamp)
.setProlepticGregorian(fileReader.writerUsedProlepticGregorian(),
fileReader.options.getConvertToProlepticGregorian())
.setEncryption(encryption);
reader = TreeReaderFactory.createRootReader(evolution.getReaderSchema(), readerContext);
int columns = evolution.getFileSchema().getMaximumId() + 1;
indexes = new OrcIndex(new OrcProto.RowIndex[columns],
new OrcProto.Stream.Kind[columns],
new OrcProto.BloomFilterIndex[columns]);
planner = new StripePlanner(evolution.getFileSchema(), encryption,
dataReader, writerVersion, ignoreNonUtf8BloomFilter,
maxDiskRangeChunkLimit);
try {
advanceToNextRow(reader, 0L, true);
} catch (IOException e) {
// Try to close since this happens in constructor.
close();
throw e;
}
}
public static final class PositionProviderImpl implements PositionProvider {
private final OrcProto.RowIndexEntry entry;
private int index;
public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
this(entry, 0);
}
public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
this.entry = entry;
this.index = startPos;
}
@Override
public long getNext() {
return entry.getPositions(index++);
}
}
public static final class ZeroPositionProvider implements PositionProvider {
@Override
public long getNext() {
return 0;
}
}
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
) throws IOException {
return dataReader.readStripeFooter(stripe);
}
enum Location {
BEFORE, MIN, MIDDLE, MAX, AFTER
}
static class ValueRange<T extends Comparable> {
final Comparable lower;
final Comparable upper;
final boolean onlyLowerBound;
final boolean onlyUpperBound;
final boolean hasNulls;
ValueRange(PredicateLeaf predicate,
T lower, T upper,
boolean hasNulls,
boolean onlyLowerBound,
boolean onlyUpperBound) {
PredicateLeaf.Type type = predicate.getType();
this.lower = getBaseObjectForComparison(type, lower);
this.upper = getBaseObjectForComparison(type, upper);
this.hasNulls = hasNulls;
this.onlyLowerBound = onlyLowerBound;
this.onlyUpperBound = onlyUpperBound;
}
ValueRange(PredicateLeaf predicate, T lower, T upper,
boolean hasNulls) {
this(predicate, lower, upper, hasNulls, false, false);
}
boolean hasValues() {
return lower != null;
}
/**
* Given a point and min and max, determine if the point is before, at the
* min, in the middle, at the max, or after the range.
* @param point the point to test
* @return the location of the point
*/
Location compare(Comparable point) {
int minCompare = point.compareTo(lower);
if (minCompare < 0) {
return Location.BEFORE;
} else if (minCompare == 0) {
return onlyLowerBound ? Location.BEFORE : Location.MIN;
}
int maxCompare = point.compareTo(upper);
if (maxCompare > 0) {
return Location.AFTER;
} else if (maxCompare == 0) {
return onlyUpperBound ? Location.AFTER : Location.MAX;
}
return Location.MIDDLE;
}
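    // Worked example (illustrative values): for a string range
    // ["hello", "world"] with exact bounds,
    //   compare("apple")  -> BEFORE
    //   compare("hello")  -> MIN
    //   compare("speech") -> MIDDLE
    //   compare("world")  -> MAX
    //   compare("zebra")  -> AFTER
    // With onlyLowerBound set (truncated statistics), compare("hello")
    // returns BEFORE instead of MIN, because the true minimum is greater
    // than the stored lower bound.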
/**
* Is this range a single point?
* @return true if min == max
*/
boolean isSingleton() {
return lower != null && !onlyLowerBound && !onlyUpperBound &&
lower.equals(upper);
}
/**
* Add the null option to the truth value, if the range includes nulls.
* @param value the original truth value
* @return the truth value extended with null if appropriate
*/
TruthValue addNull(TruthValue value) {
if (hasNulls) {
switch (value) {
case YES:
return TruthValue.YES_NULL;
case NO:
return TruthValue.NO_NULL;
case YES_NO:
return TruthValue.YES_NO_NULL;
default:
return value;
}
} else {
return value;
}
}
}
  /**
   * Get the value range (minimum and maximum) out of a column statistics
   * entry.
   * Includes option to specify if timestamp column stats values
   * should be in UTC.
   * @param index the column statistics
   * @param predicate the kind of predicate
   * @param useUTCTimestamp use UTC for timestamps
   * @return the value range; its bounds are null if the statistics have no
   *   values
   */
static ValueRange getValueRange(ColumnStatistics index,
PredicateLeaf predicate,
boolean useUTCTimestamp) {
if (index instanceof IntegerColumnStatistics) {
IntegerColumnStatistics stats = (IntegerColumnStatistics) index;
Long min = stats.getMinimum();
Long max = stats.getMaximum();
return new ValueRange<>(predicate, min, max, stats.hasNull());
} else if (index instanceof CollectionColumnStatistics) {
CollectionColumnStatistics stats = (CollectionColumnStatistics) index;
Long min = stats.getMinimumChildren();
Long max = stats.getMaximumChildren();
return new ValueRange<>(predicate, min, max, stats.hasNull());
    } else if (index instanceof DoubleColumnStatistics) {
DoubleColumnStatistics stats = (DoubleColumnStatistics) index;
Double min = stats.getMinimum();
Double max = stats.getMaximum();
return new ValueRange<>(predicate, min, max, stats.hasNull());
} else if (index instanceof StringColumnStatistics) {
StringColumnStatistics stats = (StringColumnStatistics) index;
return new ValueRange<>(predicate, stats.getLowerBound(),
stats.getUpperBound(), stats.hasNull(), stats.getMinimum() == null,
stats.getMaximum() == null);
} else if (index instanceof DateColumnStatistics) {
DateColumnStatistics stats = (DateColumnStatistics) index;
java.util.Date min = stats.getMinimum();
java.util.Date max = stats.getMaximum();
return new ValueRange<>(predicate, min, max, stats.hasNull());
} else if (index instanceof DecimalColumnStatistics) {
DecimalColumnStatistics stats = (DecimalColumnStatistics) index;
HiveDecimal min = stats.getMinimum();
HiveDecimal max = stats.getMaximum();
return new ValueRange<>(predicate, min, max, stats.hasNull());
} else if (index instanceof TimestampColumnStatistics) {
TimestampColumnStatistics stats = (TimestampColumnStatistics) index;
Timestamp min = useUTCTimestamp ? stats.getMinimumUTC() : stats.getMinimum();
Timestamp max = useUTCTimestamp ? stats.getMaximumUTC() : stats.getMaximum();
return new ValueRange<>(predicate, min, max, stats.hasNull());
} else if (index instanceof BooleanColumnStatistics) {
BooleanColumnStatistics stats = (BooleanColumnStatistics) index;
Boolean min = stats.getFalseCount() == 0;
Boolean max = stats.getTrueCount() != 0;
return new ValueRange<>(predicate, min, max, stats.hasNull());
} else {
return new ValueRange(predicate, null, null, true);
}
}
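  // For example, boolean statistics become a range of Booleans: a column
  // whose falseCount is 0 and trueCount is non-zero yields the singleton
  // range [true, true], so a predicate like `b = false` is eliminated by
  // the min/max check alone.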
  /**
   * Evaluate a predicate with respect to the statistics from the column
   * that is referenced in the predicate.
   * @param statsProto the statistics for the column mentioned in the predicate
   * @param predicate the leaf predicate we need to evaluate
   * @param kind the kind of the stream that holds the bloom filter, if any
   * @param encoding the encoding of the column
   * @param bloomFilter the bloom filter
   * @param writerVersion the version of software that wrote the file
   * @param type what is the kind of this column
   * @return the set of truth values that may be returned for the given
   * predicate.
   */
static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
PredicateLeaf predicate,
OrcProto.Stream.Kind kind,
OrcProto.ColumnEncoding encoding,
OrcProto.BloomFilter bloomFilter,
OrcFile.WriterVersion writerVersion,
TypeDescription type) {
return evaluatePredicateProto(statsProto, predicate, kind, encoding, bloomFilter,
writerVersion, type, false);
}
  /**
   * Evaluate a predicate with respect to the statistics from the column
   * that is referenced in the predicate.
   * Includes option to specify if timestamp column stats values
   * should be in UTC.
   * @param statsProto the statistics for the column mentioned in the predicate
   * @param predicate the leaf predicate we need to evaluate
   * @param kind the kind of the stream that holds the bloom filter, if any
   * @param encoding the encoding of the column
   * @param bloomFilter the bloom filter
   * @param writerVersion the version of software that wrote the file
   * @param type what is the kind of this column
   * @param useUTCTimestamp whether timestamp column stats should be read as UTC
   * @return the set of truth values that may be returned for the given
   * predicate.
   */
static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
PredicateLeaf predicate,
OrcProto.Stream.Kind kind,
OrcProto.ColumnEncoding encoding,
OrcProto.BloomFilter bloomFilter,
OrcFile.WriterVersion writerVersion,
TypeDescription type,
boolean useUTCTimestamp) {
ColumnStatistics cs = ColumnStatisticsImpl.deserialize(null, statsProto);
ValueRange range = getValueRange(cs, predicate, useUTCTimestamp);
    // Files written before ORC-135 store timestamps relative to the local
    // time zone, which breaks predicate pushdown, so disable timestamp PPD
    // for all such old files.
TypeDescription.Category category = type.getCategory();
if (category == TypeDescription.Category.TIMESTAMP) {
if (!writerVersion.includes(OrcFile.WriterVersion.ORC_135)) {
LOG.debug("Not using predication pushdown on {} because it doesn't " +
"include ORC-135. Writer version: {}",
predicate.getColumnName(), writerVersion);
return range.addNull(TruthValue.YES_NO);
}
if (predicate.getType() != PredicateLeaf.Type.TIMESTAMP &&
predicate.getType() != PredicateLeaf.Type.DATE &&
predicate.getType() != PredicateLeaf.Type.STRING) {
return range.addNull(TruthValue.YES_NO);
}
} else if (writerVersion == OrcFile.WriterVersion.ORC_135 &&
category == TypeDescription.Category.DECIMAL &&
type.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION) {
// ORC 1.5.0 to 1.5.5, which use WriterVersion.ORC_135, have broken
// min and max values for decimal64. See ORC-517.
LOG.debug("Not using predicate push down on {}, because the file doesn't"+
" include ORC-517. Writer version: {}",
predicate.getColumnName(), writerVersion);
return TruthValue.YES_NO_NULL;
}
return evaluatePredicateRange(predicate, range,
BloomFilterIO.deserialize(kind, encoding, writerVersion, type.getCategory(),
bloomFilter), useUTCTimestamp);
}
/**
* Evaluate a predicate with respect to the statistics from the column
* that is referenced in the predicate.
* @param stats the statistics for the column mentioned in the predicate
   * @param predicate the leaf predicate we need to evaluate
   * @param bloomFilter the bloom filter for the column, or null
* @return the set of truth values that may be returned for the given
* predicate.
*/
public static TruthValue evaluatePredicate(ColumnStatistics stats,
PredicateLeaf predicate,
BloomFilter bloomFilter) {
return evaluatePredicate(stats, predicate, bloomFilter, false);
}
/**
* Evaluate a predicate with respect to the statistics from the column
* that is referenced in the predicate.
* Includes option to specify if timestamp column stats values
* should be in UTC.
* @param stats the statistics for the column mentioned in the predicate
   * @param predicate the leaf predicate we need to evaluate
   * @param bloomFilter the bloom filter for the column, or null
   * @param useUTCTimestamp whether timestamp column stats should be read as UTC
* @return the set of truth values that may be returned for the given
* predicate.
*/
public static TruthValue evaluatePredicate(ColumnStatistics stats,
PredicateLeaf predicate,
BloomFilter bloomFilter,
boolean useUTCTimestamp) {
ValueRange range = getValueRange(stats, predicate, useUTCTimestamp);
return evaluatePredicateRange(predicate, range, bloomFilter, useUTCTimestamp);
}
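  // A minimal usage sketch (assumes an open Reader `reader` and a bigint
  // column "x"; `colId` is the column's id in the file schema -- all names
  // here are illustrative). SearchArgumentFactory comes from
  // org.apache.hadoop.hive.ql.io.sarg:
  //
  //   SearchArgument sarg = SearchArgumentFactory.newBuilder()
  //       .startAnd().lessThan("x", PredicateLeaf.Type.LONG, 100L).end()
  //       .build();
  //   PredicateLeaf leaf = sarg.getLeaves().get(0);
  //   ColumnStatistics[] stats = reader.getStatistics();
  //   TruthValue tv = RecordReaderImpl.evaluatePredicate(stats[colId], leaf, null);
  //   // tv == NO or NO_NULL means no row in the file can satisfy x < 100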
static TruthValue evaluatePredicateRange(PredicateLeaf predicate,
ValueRange range,
BloomFilter bloomFilter,
boolean useUTCTimestamp) {
// if we didn't have any values, everything must have been null
if (!range.hasValues()) {
if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
return TruthValue.YES;
} else {
return TruthValue.NULL;
}
}
TruthValue result;
Comparable baseObj = (Comparable) predicate.getLiteral();
// Predicate object and stats objects are converted to the type of the predicate object.
Comparable predObj = getBaseObjectForComparison(predicate.getType(), baseObj);
result = evaluatePredicateMinMax(predicate, predObj, range);
if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
return evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, range.hasNulls, useUTCTimestamp);
} else {
return result;
}
}
private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
TruthValue result, BloomFilter bloomFilter) {
// evaluate bloom filter only when
// 1) Bloom filter is available
// 2) Min/Max evaluation yield YES or MAYBE
// 3) Predicate is EQUALS or IN list
    return bloomFilter != null
        && result != TruthValue.NO_NULL && result != TruthValue.NO
        && (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
        || predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
        || predicate.getOperator().equals(PredicateLeaf.Operator.IN));
}
private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate,
Comparable predObj,
ValueRange range) {
Location loc;
switch (predicate.getOperator()) {
case NULL_SAFE_EQUALS:
loc = range.compare(predObj);
if (loc == Location.BEFORE || loc == Location.AFTER) {
return TruthValue.NO;
} else {
return TruthValue.YES_NO;
}
case EQUALS:
loc = range.compare(predObj);
if (range.isSingleton() && loc == Location.MIN) {
return range.addNull(TruthValue.YES);
} else if (loc == Location.BEFORE || loc == Location.AFTER) {
return range.addNull(TruthValue.NO);
} else {
return range.addNull(TruthValue.YES_NO);
}
case LESS_THAN:
loc = range.compare(predObj);
if (loc == Location.AFTER) {
return range.addNull(TruthValue.YES);
} else if (loc == Location.BEFORE || loc == Location.MIN) {
return range.addNull(TruthValue.NO);
} else {
return range.addNull(TruthValue.YES_NO);
}
case LESS_THAN_EQUALS:
loc = range.compare(predObj);
if (loc == Location.AFTER || loc == Location.MAX) {
return range.addNull(TruthValue.YES);
} else if (loc == Location.BEFORE) {
return range.addNull(TruthValue.NO);
} else {
return range.addNull(TruthValue.YES_NO);
}
case IN:
if (range.isSingleton()) {
// for a single value, look through to see if that value is in the
// set
          for (Object arg : predicate.getLiteralList()) {
            // convert the literal to the comparison type, as in the
            // non-singleton branch below
            predObj = getBaseObjectForComparison(predicate.getType(), (Comparable) arg);
            if (range.compare(predObj) == Location.MIN) {
              return range.addNull(TruthValue.YES);
            }
          }
return range.addNull(TruthValue.NO);
} else {
// are all of the values outside of the range?
for (Object arg : predicate.getLiteralList()) {
predObj = getBaseObjectForComparison(predicate.getType(), (Comparable) arg);
loc = range.compare(predObj);
if (loc == Location.MIN || loc == Location.MIDDLE ||
loc == Location.MAX) {
return range.addNull(TruthValue.YES_NO);
}
}
return range.addNull(TruthValue.NO);
}
case BETWEEN:
List<Object> args = predicate.getLiteralList();
if (args == null || args.isEmpty()) {
return range.addNull(TruthValue.YES_NO);
}
Comparable predObj1 = getBaseObjectForComparison(predicate.getType(),
(Comparable) args.get(0));
loc = range.compare(predObj1);
if (loc == Location.BEFORE || loc == Location.MIN) {
Comparable predObj2 = getBaseObjectForComparison(predicate.getType(),
(Comparable) args.get(1));
Location loc2 = range.compare(predObj2);
if (loc2 == Location.AFTER || loc2 == Location.MAX) {
return range.addNull(TruthValue.YES);
} else if (loc2 == Location.BEFORE) {
return range.addNull(TruthValue.NO);
} else {
return range.addNull(TruthValue.YES_NO);
}
} else if (loc == Location.AFTER) {
return range.addNull(TruthValue.NO);
} else {
return range.addNull(TruthValue.YES_NO);
}
case IS_NULL:
// min = null condition above handles the all-nulls YES case
return range.hasNulls ? TruthValue.YES_NO : TruthValue.NO;
default:
return range.addNull(TruthValue.YES_NO);
}
}
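  // Worked example (illustrative): evaluating `x = 15` against integer
  // statistics min=10, max=20 with nulls present: compare(15) is MIDDLE, so
  // the EQUALS case yields YES_NO, which addNull() widens to YES_NO_NULL.
  // Against the singleton range min=10, max=10, `x = 10` yields YES_NULL
  // and `x = 15` yields NO_NULL.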
private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
final Object predObj, BloomFilter bloomFilter, boolean hasNull, boolean useUTCTimestamp) {
switch (predicate.getOperator()) {
case NULL_SAFE_EQUALS:
        // null-safe equals does not return a *_NULL variant, so pass hasNull as false
return checkInBloomFilter(bloomFilter, predObj, false, useUTCTimestamp);
case EQUALS:
return checkInBloomFilter(bloomFilter, predObj, hasNull, useUTCTimestamp);
case IN:
for (Object arg : predicate.getLiteralList()) {
          // if at least one value in the IN list exists in the bloom filter, qualify the row group/stripe
Object predObjItem = getBaseObjectForComparison(predicate.getType(), (Comparable) arg);
TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull, useUTCTimestamp);
if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
return result;
}
}
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
default:
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
}
private static TruthValue checkInBloomFilter(BloomFilter bf,
Object predObj,
boolean hasNull,
boolean useUTCTimestamp) {
TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
if (predObj instanceof Long) {
if (bf.testLong(((Long) predObj).longValue())) {
result = TruthValue.YES_NO_NULL;
}
} else if (predObj instanceof Double) {
if (bf.testDouble(((Double) predObj).doubleValue())) {
result = TruthValue.YES_NO_NULL;
}
} else if (predObj instanceof String || predObj instanceof Text ||
predObj instanceof HiveDecimalWritable ||
predObj instanceof BigDecimal) {
if (bf.testString(predObj.toString())) {
result = TruthValue.YES_NO_NULL;
}
} else if (predObj instanceof Timestamp) {
if (useUTCTimestamp) {
if (bf.testLong(((Timestamp) predObj).getTime())) {
result = TruthValue.YES_NO_NULL;
}
} else {
if (bf.testLong(SerializationUtils.convertToUtc(TimeZone.getDefault(), ((Timestamp) predObj).getTime()))) {
result = TruthValue.YES_NO_NULL;
}
}
} else if (predObj instanceof Date) {
if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
result = TruthValue.YES_NO_NULL;
}
} else {
// if the predicate object is null and if hasNull says there are no nulls then return NO
if (predObj == null && !hasNull) {
result = TruthValue.NO;
} else {
result = TruthValue.YES_NO_NULL;
}
}
if (result == TruthValue.YES_NO_NULL && !hasNull) {
result = TruthValue.YES_NO;
}
    LOG.debug("Bloom filter evaluation: {}", result);
return result;
}
/**
* An exception for when we can't cast things appropriately
*/
static class SargCastException extends IllegalArgumentException {
public SargCastException(String string) {
super(string);
}
}
private static Comparable getBaseObjectForComparison(PredicateLeaf.Type type,
Comparable obj) {
if (obj == null) {
return null;
}
switch (type) {
case BOOLEAN:
if (obj instanceof Boolean) {
return obj;
} else {
// will only be true if the string conversion yields "true", all other values are
// considered false
return Boolean.valueOf(obj.toString());
}
case DATE:
if (obj instanceof Date) {
return obj;
} else if (obj instanceof String) {
return Date.valueOf((String) obj);
} else if (obj instanceof Timestamp) {
return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L);
}
// always string, but prevent the comparison to numbers (are they days/seconds/milliseconds?)
break;
case DECIMAL:
if (obj instanceof Boolean) {
return new HiveDecimalWritable(((Boolean) obj).booleanValue() ?
HiveDecimal.ONE : HiveDecimal.ZERO);
} else if (obj instanceof Integer) {
return new HiveDecimalWritable(((Integer) obj).intValue());
} else if (obj instanceof Long) {
return new HiveDecimalWritable(((Long) obj));
} else if (obj instanceof Float || obj instanceof Double ||
obj instanceof String) {
return new HiveDecimalWritable(obj.toString());
} else if (obj instanceof BigDecimal) {
return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
} else if (obj instanceof HiveDecimal) {
return new HiveDecimalWritable((HiveDecimal) obj);
} else if (obj instanceof HiveDecimalWritable) {
return obj;
} else if (obj instanceof Timestamp) {
return new HiveDecimalWritable(Double.toString(
TimestampUtils.getDouble((Timestamp) obj)));
}
break;
case FLOAT:
if (obj instanceof Number) {
// widening conversion
return ((Number) obj).doubleValue();
} else if (obj instanceof HiveDecimal) {
return ((HiveDecimal) obj).doubleValue();
} else if (obj instanceof String) {
return Double.valueOf(obj.toString());
} else if (obj instanceof Timestamp) {
return TimestampUtils.getDouble((Timestamp) obj);
} else if (obj instanceof BigDecimal) {
return ((BigDecimal) obj).doubleValue();
}
break;
case LONG:
if (obj instanceof Number) {
// widening conversion
return ((Number) obj).longValue();
} else if (obj instanceof HiveDecimal) {
return ((HiveDecimal) obj).longValue();
} else if (obj instanceof String) {
return Long.valueOf(obj.toString());
}
break;
case STRING:
        return obj.toString();
case TIMESTAMP:
if (obj instanceof Timestamp) {
return obj;
} else if (obj instanceof Integer) {
return new Timestamp(((Number) obj).longValue());
} else if (obj instanceof Float) {
return TimestampUtils.doubleToTimestamp(((Float) obj).doubleValue());
} else if (obj instanceof Double) {
return TimestampUtils.doubleToTimestamp(((Double) obj).doubleValue());
} else if (obj instanceof HiveDecimal) {
return TimestampUtils.decimalToTimestamp((HiveDecimal) obj);
} else if (obj instanceof HiveDecimalWritable) {
return TimestampUtils.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal());
} else if (obj instanceof Date) {
return new Timestamp(((Date) obj).getTime());
}
// float/double conversion to timestamp is interpreted as seconds whereas integer conversion
// to timestamp is interpreted as milliseconds by default. The integer to timestamp casting
// is also config driven. The filter operator changes its promotion based on config:
// "int.timestamp.conversion.in.seconds". Disable PPD for integer cases.
break;
default:
break;
}
throw new SargCastException(String.format(
"ORC SARGS could not convert from %s to %s", obj.getClass()
.getSimpleName(), type));
}
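  // Examples of the normalization above (illustrative):
  //   getBaseObjectForComparison(LONG, "42")    -> 42L
  //   getBaseObjectForComparison(DECIMAL, 3.5d) -> HiveDecimalWritable of 3.5
  //   getBaseObjectForComparison(STRING, 42L)   -> "42"
  //   getBaseObjectForComparison(DATE, 12345L)  -> SargCastException, because
  //       a bare number is ambiguous (days? seconds? milliseconds?)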
public static class SargApplier {
public final static boolean[] READ_ALL_RGS = null;
public final static boolean[] READ_NO_RGS = new boolean[0];
private final OrcFile.WriterVersion writerVersion;
private final SearchArgument sarg;
private final List<PredicateLeaf> sargLeaves;
private final int[] filterColumns;
private final long rowIndexStride;
    // derived from filterColumns: true at each file column id that a
    // predicate leaf references
private final boolean[] sargColumns;
private SchemaEvolution evolution;
private final long[] exceptionCount;
private final boolean useUTCTimestamp;
private final boolean writerUsedProlepticGregorian;
private final boolean convertToProlepticGregorian;
public SargApplier(SearchArgument sarg,
long rowIndexStride,
SchemaEvolution evolution,
OrcFile.WriterVersion writerVersion,
boolean useUTCTimestamp,
boolean writerUsedProlepticGregorian,
boolean convertToProlepticGregorian) {
this.writerVersion = writerVersion;
this.sarg = sarg;
sargLeaves = sarg.getLeaves();
this.writerUsedProlepticGregorian = writerUsedProlepticGregorian;
this.convertToProlepticGregorian = convertToProlepticGregorian;
filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves,
evolution);
this.rowIndexStride = rowIndexStride;
      // getFileIncluded() never returns null; the reader options fill the
      // array with true when no includes are given
sargColumns = new boolean[evolution.getFileIncluded().length];
for (int i : filterColumns) {
        // filterColumns may contain -1 for columns that are missing from the
        // file, e.g. a partition column referenced in the SARG; index 0
        // (the root struct) is also skipped
if (i > 0) {
sargColumns[i] = true;
}
}
this.evolution = evolution;
exceptionCount = new long[sargLeaves.size()];
this.useUTCTimestamp = useUTCTimestamp;
}
/**
* Pick the row groups that we need to load from the current stripe.
*
* @return an array with a boolean for each row group or null if all of the
* row groups must be read.
* @throws IOException
*/
public boolean[] pickRowGroups(StripeInformation stripe,
OrcProto.RowIndex[] indexes,
OrcProto.Stream.Kind[] bloomFilterKinds,
List<OrcProto.ColumnEncoding> encodings,
OrcProto.BloomFilterIndex[] bloomFilterIndices,
boolean returnNone) throws IOException {
long rowsInStripe = stripe.getNumberOfRows();
int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
boolean hasSelected = false;
boolean hasSkipped = false;
TruthValue[] exceptionAnswer = new TruthValue[leafValues.length];
for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
for (int pred = 0; pred < leafValues.length; ++pred) {
int columnIx = filterColumns[pred];
if (columnIx == -1) {
// the column is a virtual column
leafValues[pred] = TruthValue.YES_NO_NULL;
} else if (exceptionAnswer[pred] != null) {
leafValues[pred] = exceptionAnswer[pred];
} else {
if (indexes[columnIx] == null) {
throw new AssertionError("Index is not populated for " + columnIx);
}
OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup);
if (entry == null) {
throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup);
}
OrcProto.ColumnStatistics stats = entry.getStatistics();
OrcProto.BloomFilter bf = null;
OrcProto.Stream.Kind bfk = null;
if (bloomFilterIndices != null && bloomFilterIndices[columnIx] != null) {
bfk = bloomFilterKinds[columnIx];
bf = bloomFilterIndices[columnIx].getBloomFilter(rowGroup);
}
if (evolution != null && evolution.isPPDSafeConversion(columnIx)) {
PredicateLeaf predicate = sargLeaves.get(pred);
try {
leafValues[pred] = evaluatePredicateProto(stats,
predicate, bfk, encodings.get(columnIx), bf,
writerVersion, evolution.getFileSchema().
findSubtype(columnIx),
useUTCTimestamp);
} catch (Exception e) {
exceptionCount[pred] += 1;
if (e instanceof SargCastException) {
LOG.info("Skipping ORC PPD - " + e.getMessage() + " on "
+ predicate);
} else {
if (LOG.isWarnEnabled()) {
final String reason = e.getClass().getSimpleName() + " when evaluating predicate." +
" Skipping ORC PPD." +
" Stats: " + stats +
" Predicate: " + predicate;
LOG.warn(reason, e);
}
}
boolean hasNoNull = stats.hasHasNull() && !stats.getHasNull();
if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
|| hasNoNull) {
exceptionAnswer[pred] = TruthValue.YES_NO;
} else {
exceptionAnswer[pred] = TruthValue.YES_NO_NULL;
}
leafValues[pred] = exceptionAnswer[pred];
}
} else {
leafValues[pred] = TruthValue.YES_NO_NULL;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Stats = " + stats);
LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);
}
}
}
result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
hasSelected = hasSelected || result[rowGroup];
hasSkipped = hasSkipped || (!result[rowGroup]);
if (LOG.isDebugEnabled()) {
LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
(rowIndexStride * (rowGroup + 1) - 1) + " is " +
(result[rowGroup] ? "" : "not ") + "included.");
}
}
return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
}
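    // Worked example (illustrative): a stripe of 25,000 rows with a
    // 10,000-row index stride has three row groups. If only the middle
    // group can match the sarg, the result is {false, true, false}; if no
    // group was skipped, READ_ALL_RGS (null) is returned; if every group
    // was skipped and returnNone is true, READ_NO_RGS is returned.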
    /**
     * Get the count of exceptions for testing.
     * @return the number of exceptions seen for each predicate leaf
     */
long[] getExceptionCount() {
return exceptionCount;
}
}
/**
* Pick the row groups that we need to load from the current stripe.
*
* @return an array with a boolean for each row group or null if all of the
* row groups must be read.
* @throws IOException
*/
protected boolean[] pickRowGroups() throws IOException {
// if we don't have a sarg or indexes, we read everything
if (sargApp == null) {
return null;
}
readRowIndex(currentStripe, fileIncluded, sargApp.sargColumns);
return sargApp.pickRowGroups(stripes.get(currentStripe),
indexes.getRowGroupIndex(),
indexes.getBloomFilterKinds(), stripeFooter.getColumnsList(),
indexes.getBloomFilterIndex(), false);
}
private void clearStreams() {
planner.clearStreams();
}
/**
* Read the current stripe into memory.
*
* @throws IOException
*/
private void readStripe() throws IOException {
StripeInformation stripe = beginReadStripe();
planner.parseStripe(stripe, fileIncluded);
includedRowGroups = pickRowGroups();
// move forward to the first unskipped row
if (includedRowGroups != null) {
while (rowInStripe < rowCountInStripe &&
!includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
}
}
// if we haven't skipped the whole stripe, read the data
if (rowInStripe < rowCountInStripe) {
planner.readData(indexes, includedRowGroups, false);
reader.startStripe(planner);
// if we skipped the first row group, move the pointers forward
if (rowInStripe != 0) {
seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
}
}
}
private StripeInformation beginReadStripe() throws IOException {
StripeInformation stripe = stripes.get(currentStripe);
stripeFooter = readStripeFooter(stripe);
clearStreams();
// setup the position in the stripe
rowCountInStripe = stripe.getNumberOfRows();
rowInStripe = 0;
rowBaseInStripe = 0;
for (int i = 0; i < currentStripe; ++i) {
rowBaseInStripe += stripes.get(i).getNumberOfRows();
}
// reset all of the indexes
OrcProto.RowIndex[] rowIndex = indexes.getRowGroupIndex();
for (int i = 0; i < rowIndex.length; ++i) {
rowIndex[i] = null;
}
return stripe;
}
/**
* Read the next stripe until we find a row that we don't skip.
*
* @throws IOException
*/
private void advanceStripe() throws IOException {
rowInStripe = rowCountInStripe;
while (rowInStripe >= rowCountInStripe &&
currentStripe < stripes.size() - 1) {
currentStripe += 1;
readStripe();
}
}
  /**
   * Skip over rows that we aren't selecting, so that the next row is
   * one that we will read.
   *
   * @param reader the root batch reader to position
   * @param nextRow the row we want to go to
   * @param canAdvanceStripe whether we may move on to the next stripe
   * @return true if the reader is positioned on a row that will be read,
   *   false if the remaining rows were skipped and we could not advance
   *   to another stripe
   * @throws IOException
   */
private boolean advanceToNextRow(
BatchReader reader, long nextRow, boolean canAdvanceStripe)
throws IOException {
long nextRowInStripe = nextRow - rowBaseInStripe;
// check for row skipping
if (rowIndexStride != 0 &&
includedRowGroups != null &&
nextRowInStripe < rowCountInStripe) {
int rowGroup = (int) (nextRowInStripe / rowIndexStride);
if (!includedRowGroups[rowGroup]) {
while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
rowGroup += 1;
}
if (rowGroup >= includedRowGroups.length) {
if (canAdvanceStripe) {
advanceStripe();
}
return canAdvanceStripe;
}
nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
}
}
if (nextRowInStripe >= rowCountInStripe) {
if (canAdvanceStripe) {
advanceStripe();
}
return canAdvanceStripe;
}
if (nextRowInStripe != rowInStripe) {
if (rowIndexStride != 0) {
int rowGroup = (int) (nextRowInStripe / rowIndexStride);
seekToRowEntry(reader, rowGroup);
reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
} else {
reader.skipRows(nextRowInStripe - rowInStripe);
}
rowInStripe = nextRowInStripe;
}
return true;
}
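  // Worked example (illustrative): with rowIndexStride = 10,000 and
  // includedRowGroups = {false, true}, a call targeting the first row of the
  // stripe lands in a skipped row group, so the reader seeks forward to row
  // 10,000, the first row of the next selected group.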
@Override
public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
try {
if (rowInStripe >= rowCountInStripe) {
currentStripe += 1;
if (currentStripe >= stripes.size()) {
batch.size = 0;
return false;
}
readStripe();
}
int batchSize = computeBatchSize(batch.getMaxSize());
rowInStripe += batchSize;
reader.setVectorColumnCount(batch.getDataColumnCount());
reader.nextBatch(batch, batchSize);
advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
return batch.size != 0;
} catch (IOException e) {
// Rethrow exception with file name in log message
throw new IOException("Error reading file: " + path, e);
}
}
private int computeBatchSize(long targetBatchSize) {
final int batchSize;
    // In case of PPD, the batch size must respect row group boundaries. If
    // only a subset of row groups is selected, the marker position is set to
    // the end of the current run of selected row groups within the stripe.
    // Computing the batch size from the marker position keeps a batch from
    // crossing a row group boundary and reading rows that were not selected.
    // An illustration of this case is in https://issues.apache.org/jira/browse/HIVE-6287
if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
int startRowGroup = (int) (rowInStripe / rowIndexStride);
if (!includedRowGroups[startRowGroup]) {
while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
startRowGroup += 1;
}
}
int endRowGroup = startRowGroup;
while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
endRowGroup += 1;
}
final long markerPosition =
(endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
: rowCountInStripe;
batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
if (isLogDebugEnabled && batchSize < targetBatchSize) {
LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
}
} else {
batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
}
return batchSize;
}
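  // Worked example (illustrative): with rowIndexStride = 10,000,
  // includedRowGroups = {true, true, false} and rowInStripe = 9,000, the
  // marker lands at row 20,000 (the end of the selected run), so a request
  // for 1,024 rows gets min(1024, 20000 - 9000) = 1,024 rows, while a later
  // call at rowInStripe = 19,500 gets only 500 rows, keeping the batch from
  // crossing into the skipped row group.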
@Override
public void close() throws IOException {
clearStreams();
dataReader.close();
}
@Override
public long getRowNumber() {
return rowInStripe + rowBaseInStripe + firstRow;
}
  /**
   * Return the fraction of rows that have been read from the selected
   * section of the file.
   *
   * @return fraction between 0.0 and 1.0 of rows consumed
   */
@Override
public float getProgress() {
return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
}
private int findStripe(long rowNumber) {
for (int i = 0; i < stripes.size(); i++) {
StripeInformation stripe = stripes.get(i);
if (stripe.getNumberOfRows() > rowNumber) {
return i;
}
rowNumber -= stripe.getNumberOfRows();
}
throw new IllegalArgumentException("Seek after the end of reader range");
}
public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
boolean[] sargColumns) throws IOException {
// if this is the current stripe, use the cached objects.
if (stripeIndex == currentStripe) {
return planner.readRowIndex(
sargColumns != null || sargApp == null
? sargColumns : sargApp.sargColumns, indexes);
} else {
StripePlanner copy = new StripePlanner(planner);
if (included == null) {
included = new boolean[schema.getMaximumId() + 1];
Arrays.fill(included, true);
}
copy.parseStripe(stripes.get(stripeIndex), included);
return copy.readRowIndex(sargColumns, null);
}
}
private void seekToRowEntry(BatchReader reader, int rowEntry)
throws IOException {
OrcProto.RowIndex[] rowIndices = indexes.getRowGroupIndex();
PositionProvider[] index = new PositionProvider[rowIndices.length];
for (int i = 0; i < index.length; ++i) {
if (rowIndices[i] != null) {
OrcProto.RowIndexEntry entry = rowIndices[i].getEntry(rowEntry);
// This is effectively a test for pre-ORC-569 files.
if (rowEntry == 0 && entry.getPositionsCount() == 0) {
index[i] = new ZeroPositionProvider();
} else {
index[i] = new PositionProviderImpl(entry);
}
}
}
reader.seek(index);
}
@Override
public void seekToRow(long rowNumber) throws IOException {
if (rowNumber < 0) {
throw new IllegalArgumentException("Seek to a negative row number " +
rowNumber);
} else if (rowNumber < firstRow) {
throw new IllegalArgumentException("Seek before reader range " +
rowNumber);
}
    // convert to our internal form (rows from the beginning of the slice)
rowNumber -= firstRow;
// move to the right stripe
int rightStripe = findStripe(rowNumber);
if (rightStripe != currentStripe) {
currentStripe = rightStripe;
readStripe();
}
readRowIndex(currentStripe, fileIncluded, sargApp == null ? null : sargApp.sargColumns);
    // if we aren't at the right row yet, advance within the stripe.
advanceToNextRow(reader, rowNumber, true);
}
private static final String TRANSLATED_SARG_SEPARATOR = "_";
public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
return rootColumn + TRANSLATED_SARG_SEPARATOR
+ ((indexInSourceTable == null) ? -1 : indexInSourceTable);
}
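  // For example, encodeTranslatedSargColumn(0, 3) yields "0_3", and a null
  // indexInSourceTable is encoded as "0_-1", which mapTranslatedSargColumns
  // below treats as an unmapped column.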
public static int[] mapTranslatedSargColumns(
List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
int[] result = new int[sargLeaves.size()];
    OrcProto.Type lastRoot = null; // the root is currently the same for all leaves
String lastRootStr = null;
for (int i = 0; i < result.length; ++i) {
String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
assert rootAndIndex.length == 2;
String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
int index = Integer.parseInt(indexStr);
// First, check if the column even maps to anything.
if (index == -1) {
result[i] = -1;
continue;
}
assert index >= 0;
// Then, find the root type if needed.
if (!rootStr.equals(lastRootStr)) {
lastRoot = types.get(Integer.parseInt(rootStr));
lastRootStr = rootStr;
}
      // Subtypes of the root type correspond, in order, to the columns in the
      // table schema (disregarding schema evolution, which doesn't presently
      // work here). Get the id of the corresponding subtype.
result[i] = lastRoot.getSubtypes(index);
}
return result;
}
public CompressionCodec getCompressionCodec() {
return dataReader.getCompressionOptions().getCodec();
}
public int getMaxDiskRangeChunkLimit() {
return maxDiskRangeChunkLimit;
}
}
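// A minimal end-to-end sketch (hypothetical path; `conf` is assumed to be an
// org.apache.hadoop.conf.Configuration). Reader.rows() constructs a
// RecordReaderImpl under the covers:
//
//   Reader reader = OrcFile.createReader(new Path("/tmp/data.orc"),
//       OrcFile.readerOptions(conf));
//   try (RecordReader rows = reader.rows()) {
//     VectorizedRowBatch batch = reader.getSchema().createRowBatch();
//     while (rows.nextBatch(batch)) {
//       // process batch.size rows here
//     }
//   }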