/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.Date;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.orc.BooleanColumnStatistics;
import org.apache.orc.Reader;
import org.apache.orc.RecordReader;
import org.apache.orc.TypeDescription;
import org.apache.orc.ColumnStatistics;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
import org.apache.orc.DateColumnStatistics;
import org.apache.orc.DecimalColumnStatistics;
import org.apache.orc.DoubleColumnStatistics;
import org.apache.orc.IntegerColumnStatistics;
import org.apache.orc.OrcConf;
import org.apache.orc.StringColumnStatistics;
import org.apache.orc.StripeInformation;
import org.apache.orc.TimestampColumnStatistics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.orc.BloomFilterIO;
import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
import org.apache.hadoop.hive.serde2.io.DateWritable;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.hadoop.hive.ql.util.TimestampUtils;
import org.apache.hadoop.io.Text;
import org.apache.orc.OrcProto;
public class RecordReaderImpl implements RecordReader {
static final Logger LOG = LoggerFactory.getLogger(RecordReaderImpl.class);
private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
private static final Object UNKNOWN_VALUE = new Object();
protected final Path path;
private final long firstRow;
private final List<StripeInformation> stripes =
new ArrayList<StripeInformation>();
private OrcProto.StripeFooter stripeFooter;
private final long totalRowCount;
private final CompressionCodec codec;
protected final TypeDescription schema;
private final List<OrcProto.Type> types;
private final int bufferSize;
private final SchemaEvolution evolution;
private final boolean[] included;
private final long rowIndexStride;
private long rowInStripe = 0;
private int currentStripe = -1;
private long rowBaseInStripe = 0;
private long rowCountInStripe = 0;
private final Map<StreamName, InStream> streams =
new HashMap<StreamName, InStream>();
DiskRangeList bufferChunks = null;
private final TreeReaderFactory.TreeReader reader;
private final OrcProto.RowIndex[] indexes;
private final OrcProto.BloomFilterIndex[] bloomFilterIndices;
private final SargApplier sargApp;
// an array indicating which row groups aren't skipped
private boolean[] includedRowGroups = null;
private final DataReader dataReader;
/**
* Given a list of column names, find the given column and return the index.
*
* @param columnNames the list of potential column names
* @param columnName the column name to look for
* @param rootColumn the offset added to the returned column index
* @return the column number or -1 if the column wasn't found
*/
static int findColumns(String[] columnNames,
String columnName,
int rootColumn) {
for(int i=0; i < columnNames.length; ++i) {
if (columnName.equals(columnNames[i])) {
return i + rootColumn;
}
}
return -1;
}
/**
* Find the mapping from predicate leaves to columns.
* @param sargLeaves the search argument leaves that we need to map
* @param columnNames the names of the columns
* @param rootColumn the offset of the top level row, which offsets the
* result
* @return an array mapping the sarg leaves to concrete column numbers
*/
public static int[] mapSargColumnsToOrcInternalColIdx(List<PredicateLeaf> sargLeaves,
String[] columnNames,
int rootColumn) {
int[] result = new int[sargLeaves.size()];
Arrays.fill(result, -1);
for(int i=0; i < result.length; ++i) {
String colName = sargLeaves.get(i).getColumnName();
result[i] = findColumns(columnNames, colName, rootColumn);
}
return result;
}
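// Illustrative sketch (hypothetical values, not part of the original file):
// with columnNames = {"a", "b", "c"} and SARG leaves on "b" and on a column
// "part" that is absent from the file (e.g. a partition column), the mapping
// with rootColumn = 0 would be {1, -1}; a -1 entry makes the applier treat
// that leaf as YES_NO_NULL.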
protected RecordReaderImpl(ReaderImpl fileReader,
Reader.Options options) throws IOException {
this.included = options.getInclude();
included[0] = true;
if (options.getSchema() == null) {
if (LOG.isInfoEnabled()) {
LOG.info("Reader schema not provided -- using file schema " +
fileReader.getSchema());
}
evolution = new SchemaEvolution(fileReader.getSchema(), included);
} else {
// Now that we are creating a record reader for a file, validate that
// the schema to read is compatible with the file schema.
evolution = new SchemaEvolution(fileReader.getSchema(),
options.getSchema(), included);
if (LOG.isDebugEnabled() && evolution.hasConversion()) {
LOG.debug("ORC file " + fileReader.path.toString() +
" has data type conversion --\n" +
"reader schema: " + options.getSchema().toString() + "\n" +
"file schema: " + fileReader.getSchema());
}
}
this.schema = evolution.getReaderSchema();
this.path = fileReader.path;
this.codec = fileReader.codec;
this.types = fileReader.types;
this.bufferSize = fileReader.bufferSize;
this.rowIndexStride = fileReader.rowIndexStride;
SearchArgument sarg = options.getSearchArgument();
if (sarg != null && rowIndexStride != 0) {
sargApp = new SargApplier(sarg, options.getColumnNames(), rowIndexStride,
included.length, evolution);
} else {
sargApp = null;
}
long rows = 0;
long skippedRows = 0;
long offset = options.getOffset();
long maxOffset = options.getMaxOffset();
for(StripeInformation stripe: fileReader.getStripes()) {
long stripeStart = stripe.getOffset();
if (offset > stripeStart) {
skippedRows += stripe.getNumberOfRows();
} else if (stripeStart < maxOffset) {
this.stripes.add(stripe);
rows += stripe.getNumberOfRows();
}
}
Boolean zeroCopy = options.getUseZeroCopy();
if (zeroCopy == null) {
zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(fileReader.conf);
}
if (options.getDataReader() != null) {
this.dataReader = options.getDataReader();
} else {
this.dataReader = RecordReaderUtils.createDefaultDataReader(
DataReaderProperties.builder()
.withBufferSize(bufferSize)
.withCompression(fileReader.compressionKind)
.withFileSystem(fileReader.fileSystem)
.withPath(fileReader.path)
.withTypeCount(types.size())
.withZeroCopy(zeroCopy)
.build());
}
this.dataReader.open();
firstRow = skippedRows;
totalRowCount = rows;
Boolean skipCorrupt = options.getSkipCorruptRecords();
if (skipCorrupt == null) {
skipCorrupt = OrcConf.SKIP_CORRUPT_DATA.getBoolean(fileReader.conf);
}
reader = TreeReaderFactory.createTreeReader(evolution.getReaderSchema(),
evolution, included, skipCorrupt);
indexes = new OrcProto.RowIndex[types.size()];
bloomFilterIndices = new OrcProto.BloomFilterIndex[types.size()];
advanceToNextRow(reader, 0L, true);
}
public static final class PositionProviderImpl implements PositionProvider {
private final OrcProto.RowIndexEntry entry;
private int index;
public PositionProviderImpl(OrcProto.RowIndexEntry entry) {
this(entry, 0);
}
public PositionProviderImpl(OrcProto.RowIndexEntry entry, int startPos) {
this.entry = entry;
this.index = startPos;
}
@Override
public long getNext() {
return entry.getPositions(index++);
}
}
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
) throws IOException {
return dataReader.readStripeFooter(stripe);
}
enum Location {
BEFORE, MIN, MIDDLE, MAX, AFTER
}
/**
* Given a point and min and max, determine if the point is before, at the
* min, in the middle, at the max, or after the range.
* @param point the point to test
* @param min the minimum point
* @param max the maximum point
* @param <T> the type of the comparison
* @return the location of the point
*/
static <T> Location compareToRange(Comparable<T> point, T min, T max) {
int minCompare = point.compareTo(min);
if (minCompare < 0) {
return Location.BEFORE;
} else if (minCompare == 0) {
return Location.MIN;
}
int maxCompare = point.compareTo(max);
if (maxCompare > 0) {
return Location.AFTER;
} else if (maxCompare == 0) {
return Location.MAX;
}
return Location.MIDDLE;
}
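// Minimal usage sketch (illustrative only, not part of the original file):
// how the five Location values partition the number line around a row
// group's [min, max] statistics, using autoboxed Integers.
private static void compareToRangeExample() {
assert compareToRange(5, 10, 20) == Location.BEFORE;
assert compareToRange(10, 10, 20) == Location.MIN;
assert compareToRange(15, 10, 20) == Location.MIDDLE;
assert compareToRange(20, 10, 20) == Location.MAX;
assert compareToRange(25, 10, 20) == Location.AFTER;
}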
/**
* Get the maximum value out of an index entry.
* @param index
* the index entry
* @return the object for the maximum value or null if there isn't one
*/
static Object getMax(ColumnStatistics index) {
if (index instanceof IntegerColumnStatistics) {
return ((IntegerColumnStatistics) index).getMaximum();
} else if (index instanceof DoubleColumnStatistics) {
return ((DoubleColumnStatistics) index).getMaximum();
} else if (index instanceof StringColumnStatistics) {
return ((StringColumnStatistics) index).getMaximum();
} else if (index instanceof DateColumnStatistics) {
return ((DateColumnStatistics) index).getMaximum();
} else if (index instanceof DecimalColumnStatistics) {
return ((DecimalColumnStatistics) index).getMaximum();
} else if (index instanceof TimestampColumnStatistics) {
return ((TimestampColumnStatistics) index).getMaximum();
} else if (index instanceof BooleanColumnStatistics) {
if (((BooleanColumnStatistics)index).getTrueCount()!=0) {
return Boolean.TRUE;
} else {
return Boolean.FALSE;
}
} else {
return null;
}
}
/**
* Get the minimum value out of an index entry.
* @param index
* the index entry
* @return the object for the minimum value or null if there isn't one
*/
static Object getMin(ColumnStatistics index) {
if (index instanceof IntegerColumnStatistics) {
return ((IntegerColumnStatistics) index).getMinimum();
} else if (index instanceof DoubleColumnStatistics) {
return ((DoubleColumnStatistics) index).getMinimum();
} else if (index instanceof StringColumnStatistics) {
return ((StringColumnStatistics) index).getMinimum();
} else if (index instanceof DateColumnStatistics) {
return ((DateColumnStatistics) index).getMinimum();
} else if (index instanceof DecimalColumnStatistics) {
return ((DecimalColumnStatistics) index).getMinimum();
} else if (index instanceof TimestampColumnStatistics) {
return ((TimestampColumnStatistics) index).getMinimum();
} else if (index instanceof BooleanColumnStatistics) {
if (((BooleanColumnStatistics)index).getFalseCount()!=0) {
return Boolean.FALSE;
} else {
return Boolean.TRUE;
}
} else {
return UNKNOWN_VALUE; // null is not safe here
}
}
/**
* Evaluate a predicate with respect to the statistics from the column
* that is referenced in the predicate.
* @param statsProto the statistics for the column mentioned in the predicate
* @param predicate the leaf predicate we need to evaluate
* @param bloomFilter the bloom filter for the column, if one is available
* @return the set of truth values that may be returned for the given
* predicate.
*/
static TruthValue evaluatePredicateProto(OrcProto.ColumnStatistics statsProto,
PredicateLeaf predicate, OrcProto.BloomFilter bloomFilter) {
ColumnStatistics cs = ColumnStatisticsImpl.deserialize(statsProto);
Object minValue = getMin(cs);
Object maxValue = getMax(cs);
BloomFilterIO bf = null;
if (bloomFilter != null) {
bf = new BloomFilterIO(bloomFilter);
}
return evaluatePredicateRange(predicate, minValue, maxValue, cs.hasNull(), bf);
}
/**
* Evaluate a predicate with respect to the statistics from the column
* that is referenced in the predicate.
* @param stats the statistics for the column mentioned in the predicate
* @param predicate the leaf predicate we need to evaluate
* @return the set of truth values that may be returned for the given
* predicate.
*/
public static TruthValue evaluatePredicate(ColumnStatistics stats,
PredicateLeaf predicate,
BloomFilterIO bloomFilter) {
Object minValue = getMin(stats);
Object maxValue = getMax(stats);
return evaluatePredicateRange(predicate, minValue, maxValue, stats.hasNull(), bloomFilter);
}
static TruthValue evaluatePredicateRange(PredicateLeaf predicate, Object min,
Object max, boolean hasNull, BloomFilterIO bloomFilter) {
// if we didn't have any values, everything must have been null
if (min == null) {
if (predicate.getOperator() == PredicateLeaf.Operator.IS_NULL) {
return TruthValue.YES;
} else {
return TruthValue.NULL;
}
} else if (min == UNKNOWN_VALUE) {
return TruthValue.YES_NO_NULL;
}
TruthValue result;
Object baseObj = predicate.getLiteral();
try {
// Predicate object and stats objects are converted to the type of the predicate object.
Object minValue = getBaseObjectForComparison(predicate.getType(), min);
Object maxValue = getBaseObjectForComparison(predicate.getType(), max);
Object predObj = getBaseObjectForComparison(predicate.getType(), baseObj);
result = evaluatePredicateMinMax(predicate, predObj, minValue, maxValue, hasNull);
if (shouldEvaluateBloomFilter(predicate, result, bloomFilter)) {
result = evaluatePredicateBloomFilter(predicate, predObj, bloomFilter, hasNull);
}
// in case of a failed conversion, fall back to a conservative truth value
} catch (Exception e) {
if (LOG.isWarnEnabled()) {
final String statsType = min == null ?
(max == null ? "null" : max.getClass().getSimpleName()) :
min.getClass().getSimpleName();
final String predicateType = baseObj == null ? "null" : baseObj.getClass().getSimpleName();
final String reason = e.getClass().getSimpleName() + " when evaluating predicate." +
" Skipping ORC PPD." +
" Exception: " + e.getMessage() +
" StatsType: " + statsType +
" PredicateType: " + predicateType;
LOG.warn(reason);
LOG.debug(reason, e);
}
if (predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS) || !hasNull) {
result = TruthValue.YES_NO;
} else {
result = TruthValue.YES_NO_NULL;
}
}
return result;
}
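// Worked sketch (hypothetical LONG predicate "x = 50", not from the original
// file): with column stats min=0, max=10, hasNull=false, the min/max check
// alone proves no row in the group can match:
//   evaluatePredicateRange(pred, 0L, 10L, false, null) -> TruthValue.NO
// with min=0, max=100 the literal falls inside the range, so the group may
// contain matches and must be read:
//   evaluatePredicateRange(pred, 0L, 100L, false, null) -> TruthValue.YES_NO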
private static boolean shouldEvaluateBloomFilter(PredicateLeaf predicate,
TruthValue result, BloomFilterIO bloomFilter) {
// evaluate the bloom filter only when
// 1) a bloom filter is available
// 2) the min/max evaluation did not rule the group out (YES or MAYBE)
// 3) the predicate is EQUALS, NULL_SAFE_EQUALS, or IN
if (bloomFilter != null
&& result != TruthValue.NO_NULL && result != TruthValue.NO
&& (predicate.getOperator().equals(PredicateLeaf.Operator.EQUALS)
|| predicate.getOperator().equals(PredicateLeaf.Operator.NULL_SAFE_EQUALS)
|| predicate.getOperator().equals(PredicateLeaf.Operator.IN))) {
return true;
}
return false;
}
private static TruthValue evaluatePredicateMinMax(PredicateLeaf predicate, Object predObj,
Object minValue,
Object maxValue,
boolean hasNull) {
Location loc;
switch (predicate.getOperator()) {
case NULL_SAFE_EQUALS:
loc = compareToRange((Comparable) predObj, minValue, maxValue);
if (loc == Location.BEFORE || loc == Location.AFTER) {
return TruthValue.NO;
} else {
return TruthValue.YES_NO;
}
case EQUALS:
loc = compareToRange((Comparable) predObj, minValue, maxValue);
if (minValue.equals(maxValue) && loc == Location.MIN) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
} else if (loc == Location.BEFORE || loc == Location.AFTER) {
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
} else {
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
case LESS_THAN:
loc = compareToRange((Comparable) predObj, minValue, maxValue);
if (loc == Location.AFTER) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
} else if (loc == Location.BEFORE || loc == Location.MIN) {
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
} else {
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
case LESS_THAN_EQUALS:
loc = compareToRange((Comparable) predObj, minValue, maxValue);
if (loc == Location.AFTER || loc == Location.MAX) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
} else if (loc == Location.BEFORE) {
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
} else {
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
case IN:
if (minValue.equals(maxValue)) {
// for a single value, look through to see if that value is in the
// set
for (Object arg : predicate.getLiteralList()) {
predObj = getBaseObjectForComparison(predicate.getType(), arg);
loc = compareToRange((Comparable) predObj, minValue, maxValue);
if (loc == Location.MIN) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
}
}
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
} else {
// are all of the values outside of the range?
for (Object arg : predicate.getLiteralList()) {
predObj = getBaseObjectForComparison(predicate.getType(), arg);
loc = compareToRange((Comparable) predObj, minValue, maxValue);
if (loc == Location.MIN || loc == Location.MIDDLE ||
loc == Location.MAX) {
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
}
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
}
case BETWEEN:
List<Object> args = predicate.getLiteralList();
Object predObj1 = getBaseObjectForComparison(predicate.getType(), args.get(0));
loc = compareToRange((Comparable) predObj1, minValue, maxValue);
if (loc == Location.BEFORE || loc == Location.MIN) {
Object predObj2 = getBaseObjectForComparison(predicate.getType(), args.get(1));
Location loc2 = compareToRange((Comparable) predObj2, minValue, maxValue);
if (loc2 == Location.AFTER || loc2 == Location.MAX) {
return hasNull ? TruthValue.YES_NULL : TruthValue.YES;
} else if (loc2 == Location.BEFORE) {
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
} else {
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
} else if (loc == Location.AFTER) {
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
} else {
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
case IS_NULL:
// min = null condition above handles the all-nulls YES case
return hasNull ? TruthValue.YES_NO : TruthValue.NO;
default:
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
}
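// Worked sketch (hypothetical values): for an IN predicate with literals
// {1, 2} against a row group whose stats are [min=10, max=20], every literal
// compares BEFORE the range, so the group is skipped (NO / NO_NULL); a
// literal of 15 would land in the MIDDLE and keep the group (YES_NO*).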
private static TruthValue evaluatePredicateBloomFilter(PredicateLeaf predicate,
final Object predObj, BloomFilterIO bloomFilter, boolean hasNull) {
switch (predicate.getOperator()) {
case NULL_SAFE_EQUALS:
// null-safe equals never returns a *_NULL variant, so pass hasNull as false
return checkInBloomFilter(bloomFilter, predObj, false);
case EQUALS:
return checkInBloomFilter(bloomFilter, predObj, hasNull);
case IN:
for (Object arg : predicate.getLiteralList()) {
// if at least one value in the IN list exists in the bloom filter, qualify the row group/stripe
Object predObjItem = getBaseObjectForComparison(predicate.getType(), arg);
TruthValue result = checkInBloomFilter(bloomFilter, predObjItem, hasNull);
if (result == TruthValue.YES_NO_NULL || result == TruthValue.YES_NO) {
return result;
}
}
return hasNull ? TruthValue.NO_NULL : TruthValue.NO;
default:
return hasNull ? TruthValue.YES_NO_NULL : TruthValue.YES_NO;
}
}
private static TruthValue checkInBloomFilter(BloomFilterIO bf, Object predObj, boolean hasNull) {
TruthValue result = hasNull ? TruthValue.NO_NULL : TruthValue.NO;
if (predObj instanceof Long) {
if (bf.testLong(((Long) predObj).longValue())) {
result = TruthValue.YES_NO_NULL;
}
} else if (predObj instanceof Double) {
if (bf.testDouble(((Double) predObj).doubleValue())) {
result = TruthValue.YES_NO_NULL;
}
} else if (predObj instanceof String || predObj instanceof Text ||
predObj instanceof HiveDecimalWritable ||
predObj instanceof BigDecimal) {
if (bf.testString(predObj.toString())) {
result = TruthValue.YES_NO_NULL;
}
} else if (predObj instanceof Timestamp) {
if (bf.testLong(((Timestamp) predObj).getTime())) {
result = TruthValue.YES_NO_NULL;
}
} else if (predObj instanceof Date) {
if (bf.testLong(DateWritable.dateToDays((Date) predObj))) {
result = TruthValue.YES_NO_NULL;
}
} else {
// if the predicate object is null and if hasNull says there are no nulls then return NO
if (predObj == null && !hasNull) {
result = TruthValue.NO;
} else {
result = TruthValue.YES_NO_NULL;
}
}
if (result == TruthValue.YES_NO_NULL && !hasNull) {
result = TruthValue.YES_NO;
}
if (LOG.isDebugEnabled()) {
LOG.debug("Bloom filter evaluation: " + result.toString());
}
return result;
}
private static Object getBaseObjectForComparison(PredicateLeaf.Type type, Object obj) {
if (obj == null) {
return null;
}
switch (type) {
case BOOLEAN:
if (obj instanceof Boolean) {
return obj;
} else {
// will only be true if the string conversion yields "true"; all other
// values are considered false
return Boolean.valueOf(obj.toString());
}
case DATE:
if (obj instanceof Date) {
return obj;
} else if (obj instanceof String) {
return Date.valueOf((String) obj);
} else if (obj instanceof Timestamp) {
return DateWritable.timeToDate(((Timestamp) obj).getTime() / 1000L);
}
// the value is always a string here; prevent comparison against numbers,
// whose unit (days/seconds/milliseconds) would be ambiguous
break;
case DECIMAL:
if (obj instanceof Boolean) {
return new HiveDecimalWritable(((Boolean) obj).booleanValue() ?
HiveDecimal.ONE : HiveDecimal.ZERO);
} else if (obj instanceof Integer) {
return new HiveDecimalWritable(((Integer) obj).intValue());
} else if (obj instanceof Long) {
return new HiveDecimalWritable(((Long) obj));
} else if (obj instanceof Float || obj instanceof Double ||
obj instanceof String) {
return new HiveDecimalWritable(obj.toString());
} else if (obj instanceof BigDecimal) {
return new HiveDecimalWritable(HiveDecimal.create((BigDecimal) obj));
} else if (obj instanceof HiveDecimal) {
return new HiveDecimalWritable((HiveDecimal) obj);
} else if (obj instanceof HiveDecimalWritable) {
return obj;
} else if (obj instanceof Timestamp) {
return new HiveDecimalWritable(Double.toString(
TimestampUtils.getDouble((Timestamp) obj)));
}
break;
case FLOAT:
if (obj instanceof Number) {
// widening conversion
return ((Number) obj).doubleValue();
} else if (obj instanceof HiveDecimal) {
return ((HiveDecimal) obj).doubleValue();
} else if (obj instanceof String) {
return Double.valueOf(obj.toString());
} else if (obj instanceof Timestamp) {
return TimestampUtils.getDouble((Timestamp) obj);
} else if (obj instanceof BigDecimal) {
return ((BigDecimal) obj).doubleValue();
}
break;
case LONG:
if (obj instanceof Number) {
// widening conversion
return ((Number) obj).longValue();
} else if (obj instanceof HiveDecimal) {
return ((HiveDecimal) obj).longValue();
} else if (obj instanceof String) {
return Long.valueOf(obj.toString());
}
break;
case STRING:
// obj is known to be non-null here (checked at method entry)
return obj.toString();
case TIMESTAMP:
if (obj instanceof Timestamp) {
return obj;
} else if (obj instanceof Integer) {
return new Timestamp(((Number) obj).longValue());
} else if (obj instanceof Float) {
return TimestampUtils.doubleToTimestamp(((Float) obj).doubleValue());
} else if (obj instanceof Double) {
return TimestampUtils.doubleToTimestamp(((Double) obj).doubleValue());
} else if (obj instanceof HiveDecimal) {
return TimestampUtils.decimalToTimestamp((HiveDecimal) obj);
} else if (obj instanceof HiveDecimalWritable) {
return TimestampUtils.decimalToTimestamp(((HiveDecimalWritable) obj).getHiveDecimal());
} else if (obj instanceof Date) {
return new Timestamp(((Date) obj).getTime());
}
// float/double conversion to timestamp is interpreted as seconds whereas integer conversion
// to timestamp is interpreted as milliseconds by default. The integer to timestamp casting
// is also config driven. The filter operator changes its promotion based on config:
// "int.timestamp.conversion.in.seconds". Disable PPD for integer cases.
break;
default:
break;
}
throw new IllegalArgumentException(String.format(
"ORC SARGS could not convert from %s to %s", obj == null ? "(null)" : obj.getClass()
.getSimpleName(), type));
}
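// Minimal sketch (illustrative only, not part of the original file): both the
// stats values and the predicate literal are normalized to the predicate's
// type before comparison, so e.g. a String literal can be tested against
// LONG column statistics.
private static void conversionExample() {
assert getBaseObjectForComparison(PredicateLeaf.Type.LONG, "42").equals(42L);
assert getBaseObjectForComparison(PredicateLeaf.Type.STRING, 123).equals("123");
}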
public static class SargApplier {
public final static boolean[] READ_ALL_RGS = null;
public final static boolean[] READ_NO_RGS = new boolean[0];
private final SearchArgument sarg;
private final List<PredicateLeaf> sargLeaves;
private final int[] filterColumns;
private final long rowIndexStride;
// the same columns as filterColumns, expressed as a boolean per column id
private final boolean[] sargColumns;
private SchemaEvolution evolution;
public SargApplier(SearchArgument sarg, String[] columnNames, long rowIndexStride,
int includedCount, final SchemaEvolution evolution) {
this.sarg = sarg;
sargLeaves = sarg.getLeaves();
filterColumns = mapSargColumnsToOrcInternalColIdx(sargLeaves, columnNames, 0);
this.rowIndexStride = rowIndexStride;
// included will not be null; Reader.Options fills the array with trues if it is
sargColumns = new boolean[includedCount];
for (int i : filterColumns) {
// filterColumns may contain -1 when a SARG column (e.g. a partition column) is not present in the file
if (i > 0) {
sargColumns[i] = true;
}
}
this.evolution = evolution;
}
/**
* Pick the row groups that we need to load from the current stripe.
*
* @return an array with a boolean for each row group or null if all of the
* row groups must be read.
* @throws IOException
*/
public boolean[] pickRowGroups(StripeInformation stripe, OrcProto.RowIndex[] indexes,
OrcProto.BloomFilterIndex[] bloomFilterIndices, boolean returnNone) throws IOException {
long rowsInStripe = stripe.getNumberOfRows();
int groupsInStripe = (int) ((rowsInStripe + rowIndexStride - 1) / rowIndexStride);
boolean[] result = new boolean[groupsInStripe]; // TODO: avoid alloc?
TruthValue[] leafValues = new TruthValue[sargLeaves.size()];
boolean hasSelected = false, hasSkipped = false;
for (int rowGroup = 0; rowGroup < result.length; ++rowGroup) {
for (int pred = 0; pred < leafValues.length; ++pred) {
int columnIx = filterColumns[pred];
if (columnIx != -1) {
if (indexes[columnIx] == null) {
throw new AssertionError("Index is not populated for " + columnIx);
}
OrcProto.RowIndexEntry entry = indexes[columnIx].getEntry(rowGroup);
if (entry == null) {
throw new AssertionError("RG is not populated for " + columnIx + " rg " + rowGroup);
}
OrcProto.ColumnStatistics stats = entry.getStatistics();
OrcProto.BloomFilter bf = null;
if (bloomFilterIndices != null && bloomFilterIndices[columnIx] != null) {
bf = bloomFilterIndices[columnIx].getBloomFilter(rowGroup);
}
if (evolution != null && evolution.isPPDSafeConversion(columnIx)) {
leafValues[pred] = evaluatePredicateProto(stats, sargLeaves.get(pred), bf);
} else {
leafValues[pred] = TruthValue.YES_NO_NULL;
}
if (LOG.isTraceEnabled()) {
LOG.trace("Stats = " + stats);
LOG.trace("Setting " + sargLeaves.get(pred) + " to " + leafValues[pred]);
}
} else {
// the column is a virtual column
leafValues[pred] = TruthValue.YES_NO_NULL;
}
}
result[rowGroup] = sarg.evaluate(leafValues).isNeeded();
hasSelected = hasSelected || result[rowGroup];
hasSkipped = hasSkipped || (!result[rowGroup]);
if (LOG.isDebugEnabled()) {
LOG.debug("Row group " + (rowIndexStride * rowGroup) + " to " +
(rowIndexStride * (rowGroup + 1) - 1) + " is " +
(result[rowGroup] ? "" : "not ") + "included.");
}
}
return hasSkipped ? ((hasSelected || !returnNone) ? result : READ_NO_RGS) : READ_ALL_RGS;
}
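// Return contract (sketch): READ_ALL_RGS (null) means no group was skipped,
// so callers bypass per-group filtering entirely; READ_NO_RGS (length 0)
// means every group was eliminated; otherwise result[g] says whether row
// group g must be read.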
}
/**
* Pick the row groups that we need to load from the current stripe.
*
* @return an array with a boolean for each row group or null if all of the
* row groups must be read.
* @throws IOException
*/
protected boolean[] pickRowGroups() throws IOException {
// if we don't have a sarg or indexes, we read everything
if (sargApp == null) {
return null;
}
readRowIndex(currentStripe, included, sargApp.sargColumns);
return sargApp.pickRowGroups(stripes.get(currentStripe), indexes, bloomFilterIndices, false);
}
private void clearStreams() {
// explicit close of all streams to de-ref ByteBuffers
for (InStream is : streams.values()) {
is.close();
}
if (bufferChunks != null) {
if (dataReader.isTrackingDiskRanges()) {
for (DiskRangeList range = bufferChunks; range != null; range = range.next) {
if (!(range instanceof BufferChunk)) {
continue;
}
dataReader.releaseBuffer(((BufferChunk) range).getChunk());
}
}
}
bufferChunks = null;
streams.clear();
}
/**
* Read the current stripe into memory.
*
* @throws IOException
*/
private void readStripe() throws IOException {
StripeInformation stripe = beginReadStripe();
includedRowGroups = pickRowGroups();
// move forward to the first unskipped row
if (includedRowGroups != null) {
while (rowInStripe < rowCountInStripe &&
!includedRowGroups[(int) (rowInStripe / rowIndexStride)]) {
rowInStripe = Math.min(rowCountInStripe, rowInStripe + rowIndexStride);
}
}
// if we haven't skipped the whole stripe, read the data
if (rowInStripe < rowCountInStripe) {
// if we aren't projecting columns or filtering rows, just read it all
if (included == null && includedRowGroups == null) {
readAllDataStreams(stripe);
} else {
readPartialDataStreams(stripe);
}
reader.startStripe(streams, stripeFooter);
// if we skipped the first row group, move the pointers forward
if (rowInStripe != 0) {
seekToRowEntry(reader, (int) (rowInStripe / rowIndexStride));
}
}
}
private StripeInformation beginReadStripe() throws IOException {
StripeInformation stripe = stripes.get(currentStripe);
stripeFooter = readStripeFooter(stripe);
clearStreams();
// setup the position in the stripe
rowCountInStripe = stripe.getNumberOfRows();
rowInStripe = 0;
rowBaseInStripe = 0;
for (int i = 0; i < currentStripe; ++i) {
rowBaseInStripe += stripes.get(i).getNumberOfRows();
}
// reset all of the indexes
for (int i = 0; i < indexes.length; ++i) {
indexes[i] = null;
}
return stripe;
}
private void readAllDataStreams(StripeInformation stripe) throws IOException {
long start = stripe.getIndexLength();
long end = start + stripe.getDataLength();
// explicitly trigger 1 big read
DiskRangeList toRead = new DiskRangeList(start, end);
bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
List<OrcProto.Stream> streamDescriptions = stripeFooter.getStreamsList();
createStreams(streamDescriptions, bufferChunks, null, codec, bufferSize, streams);
}
/**
* Plan the ranges of the file that we need to read given the list of
* columns and row groups.
*
* @param streamList the list of streams available
* @param indexes the indexes that have been loaded
* @param includedColumns which columns are needed
* @param includedRowGroups which row groups are needed
* @param isCompressed does the file have generic compression
* @param encodings the encodings for each column
* @param types the types of the columns
* @param compressionSize the compression block size
* @param doMergeBuffers whether adjacent disk ranges should be merged
* @return the list of disk ranges that will be loaded
*/
static DiskRangeList planReadPartialDataStreams(List<OrcProto.Stream> streamList,
OrcProto.RowIndex[] indexes,
boolean[] includedColumns,
boolean[] includedRowGroups,
boolean isCompressed,
List<OrcProto.ColumnEncoding> encodings,
List<OrcProto.Type> types,
int compressionSize,
boolean doMergeBuffers) {
long offset = 0;
// figure out which columns have a present stream
boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
CreateHelper list = new CreateHelper();
for (OrcProto.Stream stream : streamList) {
long length = stream.getLength();
int column = stream.getColumn();
OrcProto.Stream.Kind streamKind = stream.getKind();
// since stream kind is optional, first check if it exists
if (stream.hasKind() &&
(StreamName.getArea(streamKind) == StreamName.Area.DATA) &&
(column < includedColumns.length && includedColumns[column])) {
// if we aren't filtering or it is a dictionary, load it.
if (includedRowGroups == null
|| RecordReaderUtils.isDictionary(streamKind, encodings.get(column))) {
RecordReaderUtils.addEntireStreamToRanges(offset, length, list, doMergeBuffers);
} else {
RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRowGroups,
isCompressed, indexes[column], encodings.get(column), types.get(column),
compressionSize, hasNull[column], offset, length, list, doMergeBuffers);
}
}
offset += length;
}
return list.extract();
}
void createStreams(List<OrcProto.Stream> streamDescriptions,
DiskRangeList ranges,
boolean[] includeColumn,
CompressionCodec codec,
int bufferSize,
Map<StreamName, InStream> streams) throws IOException {
long streamOffset = 0;
for (OrcProto.Stream streamDesc : streamDescriptions) {
int column = streamDesc.getColumn();
if ((includeColumn != null &&
(column < included.length && !includeColumn[column])) ||
streamDesc.hasKind() &&
(StreamName.getArea(streamDesc.getKind()) != StreamName.Area.DATA)) {
streamOffset += streamDesc.getLength();
continue;
}
List<DiskRange> buffers = RecordReaderUtils.getStreamBuffers(
ranges, streamOffset, streamDesc.getLength());
StreamName name = new StreamName(column, streamDesc.getKind());
streams.put(name, InStream.create(name.toString(), buffers,
streamDesc.getLength(), codec, bufferSize));
streamOffset += streamDesc.getLength();
}
}
private void readPartialDataStreams(StripeInformation stripe) throws IOException {
List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
DiskRangeList toRead = planReadPartialDataStreams(streamList,
indexes, included, includedRowGroups, codec != null,
stripeFooter.getColumnsList(), types, bufferSize, true);
if (LOG.isDebugEnabled()) {
LOG.debug("chunks = " + RecordReaderUtils.stringifyDiskRanges(toRead));
}
bufferChunks = dataReader.readFileData(toRead, stripe.getOffset(), false);
if (LOG.isDebugEnabled()) {
LOG.debug("merge = " + RecordReaderUtils.stringifyDiskRanges(bufferChunks));
}
createStreams(streamList, bufferChunks, included, codec, bufferSize, streams);
}
/**
* Read stripes until we find one with a row that we don't skip.
*
* @throws IOException
*/
private void advanceStripe() throws IOException {
rowInStripe = rowCountInStripe;
while (rowInStripe >= rowCountInStripe &&
currentStripe < stripes.size() - 1) {
currentStripe += 1;
readStripe();
}
}
/**
* Skip over rows that we aren't selecting, so that the next row is
* one that we will read.
*
* @param nextRow the row we want to go to
* @throws IOException
*/
private boolean advanceToNextRow(
TreeReaderFactory.TreeReader reader, long nextRow, boolean canAdvanceStripe)
throws IOException {
long nextRowInStripe = nextRow - rowBaseInStripe;
// check for row skipping
if (rowIndexStride != 0 &&
includedRowGroups != null &&
nextRowInStripe < rowCountInStripe) {
int rowGroup = (int) (nextRowInStripe / rowIndexStride);
if (!includedRowGroups[rowGroup]) {
while (rowGroup < includedRowGroups.length && !includedRowGroups[rowGroup]) {
rowGroup += 1;
}
if (rowGroup >= includedRowGroups.length) {
if (canAdvanceStripe) {
advanceStripe();
}
return canAdvanceStripe;
}
nextRowInStripe = Math.min(rowCountInStripe, rowGroup * rowIndexStride);
}
}
if (nextRowInStripe >= rowCountInStripe) {
if (canAdvanceStripe) {
advanceStripe();
}
return canAdvanceStripe;
}
if (nextRowInStripe != rowInStripe) {
if (rowIndexStride != 0) {
int rowGroup = (int) (nextRowInStripe / rowIndexStride);
seekToRowEntry(reader, rowGroup);
reader.skipRows(nextRowInStripe - rowGroup * rowIndexStride);
} else {
reader.skipRows(nextRowInStripe - rowInStripe);
}
rowInStripe = nextRowInStripe;
}
return true;
}
@Override
public boolean nextBatch(VectorizedRowBatch batch) throws IOException {
try {
if (rowInStripe >= rowCountInStripe) {
currentStripe += 1;
if (currentStripe >= stripes.size()) {
batch.size = 0;
return false;
}
readStripe();
}
int batchSize = computeBatchSize(batch.getMaxSize());
rowInStripe += batchSize;
reader.setVectorColumnCount(batch.getDataColumnCount());
reader.nextBatch(batch, batchSize);
batch.selectedInUse = false;
batch.size = batchSize;
advanceToNextRow(reader, rowInStripe + rowBaseInStripe, true);
return batch.size != 0;
} catch (IOException e) {
// Rethrow exception with file name in log message
throw new IOException("Error reading file: " + path, e);
}
}
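// Typical consumption loop (sketch; assumes a Reader obtained via
// OrcFile.createReader and the standard batch-at-a-time API):
//   VectorizedRowBatch batch = reader.getSchema().createRowBatch();
//   RecordReader rows = reader.rows();
//   while (rows.nextBatch(batch)) {
//     // process batch.size rows
//   }
//   rows.close();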
private int computeBatchSize(long targetBatchSize) {
final int batchSize;
// In case of PPD, the batch size must respect row group boundaries. If only a
// subset of row groups is selected, the marker position is set to the end of
// the contiguous selected range within the stripe. Computing the batch size
// from the marker position guarantees the batch never crosses a row group
// boundary and cannot overflow while reading rows.
// An illustration of this case: https://issues.apache.org/jira/browse/HIVE-6287
if (rowIndexStride != 0 && includedRowGroups != null && rowInStripe < rowCountInStripe) {
int startRowGroup = (int) (rowInStripe / rowIndexStride);
if (!includedRowGroups[startRowGroup]) {
while (startRowGroup < includedRowGroups.length && !includedRowGroups[startRowGroup]) {
startRowGroup += 1;
}
}
int endRowGroup = startRowGroup;
while (endRowGroup < includedRowGroups.length && includedRowGroups[endRowGroup]) {
endRowGroup += 1;
}
final long markerPosition =
(endRowGroup * rowIndexStride) < rowCountInStripe ? (endRowGroup * rowIndexStride)
: rowCountInStripe;
batchSize = (int) Math.min(targetBatchSize, (markerPosition - rowInStripe));
if (isLogDebugEnabled && batchSize < targetBatchSize) {
LOG.debug("markerPosition: " + markerPosition + " batchSize: " + batchSize);
}
} else {
batchSize = (int) Math.min(targetBatchSize, (rowCountInStripe - rowInStripe));
}
return batchSize;
}
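// Worked example (hypothetical numbers): with rowIndexStride = 10000, a
// selected run of row groups ending at row 10000, rowInStripe = 9500, and a
// target batch size of 1024, the marker position is 10000 and the batch is
// clamped to min(1024, 10000 - 9500) = 500 rows, stopping exactly at the
// row group boundary.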
@Override
public void close() throws IOException {
clearStreams();
dataReader.close();
}
@Override
public long getRowNumber() {
return rowInStripe + rowBaseInStripe + firstRow;
}
/**
* Return the fraction of rows that have been read from the selected
* section of the file.
*
* @return fraction between 0.0 and 1.0 of rows consumed
*/
@Override
public float getProgress() {
return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
}
private int findStripe(long rowNumber) {
for (int i = 0; i < stripes.size(); i++) {
StripeInformation stripe = stripes.get(i);
if (stripe.getNumberOfRows() > rowNumber) {
return i;
}
rowNumber -= stripe.getNumberOfRows();
}
throw new IllegalArgumentException("Seek after the end of reader range");
}
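// Worked example (hypothetical): with three stripes of 1000 rows each, a
// seek to row 2500 subtracts 1000 twice and returns stripe index 2, leaving
// the caller to position within that stripe via the row index.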
public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
boolean[] sargColumns) throws IOException {
return readRowIndex(stripeIndex, included, null, null, sargColumns);
}
public OrcIndex readRowIndex(int stripeIndex, boolean[] included,
OrcProto.RowIndex[] indexes,
OrcProto.BloomFilterIndex[] bloomFilterIndex,
boolean[] sargColumns) throws IOException {
StripeInformation stripe = stripes.get(stripeIndex);
OrcProto.StripeFooter stripeFooter = null;
// if this is the current stripe, use the cached objects.
if (stripeIndex == currentStripe) {
stripeFooter = this.stripeFooter;
indexes = indexes == null ? this.indexes : indexes;
bloomFilterIndex = bloomFilterIndex == null ? this.bloomFilterIndices : bloomFilterIndex;
sargColumns = sargColumns == null ?
(sargApp == null ? null : sargApp.sargColumns) : sargColumns;
}
return dataReader.readRowIndex(stripe, stripeFooter, included, indexes, sargColumns,
bloomFilterIndex);
}
private void seekToRowEntry(TreeReaderFactory.TreeReader reader, int rowEntry)
throws IOException {
PositionProvider[] index = new PositionProvider[indexes.length];
for (int i = 0; i < indexes.length; ++i) {
if (indexes[i] != null) {
index[i] = new PositionProviderImpl(indexes[i].getEntry(rowEntry));
}
}
reader.seek(index);
}
@Override
public void seekToRow(long rowNumber) throws IOException {
if (rowNumber < 0) {
throw new IllegalArgumentException("Seek to a negative row number " +
rowNumber);
} else if (rowNumber < firstRow) {
throw new IllegalArgumentException("Seek before reader range " +
rowNumber);
}
// convert to our internal form (rows from the beginning of slice)
rowNumber -= firstRow;
// move to the right stripe
int rightStripe = findStripe(rowNumber);
if (rightStripe != currentStripe) {
currentStripe = rightStripe;
readStripe();
}
readRowIndex(currentStripe, included, sargApp == null ? null : sargApp.sargColumns);
// if we aren't to the right row yet, advance in the stripe.
advanceToNextRow(reader, rowNumber, true);
}
private static final String TRANSLATED_SARG_SEPARATOR = "_";
public static String encodeTranslatedSargColumn(int rootColumn, Integer indexInSourceTable) {
return rootColumn + TRANSLATED_SARG_SEPARATOR
+ ((indexInSourceTable == null) ? -1 : indexInSourceTable);
}
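// Example (values hypothetical): encodeTranslatedSargColumn(0, 2) yields
// "0_2", and a null indexInSourceTable yields "0_-1", which
// mapTranslatedSargColumns below decodes back to "no mapping" (-1).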
public static int[] mapTranslatedSargColumns(
List<OrcProto.Type> types, List<PredicateLeaf> sargLeaves) {
int[] result = new int[sargLeaves.size()];
OrcProto.Type lastRoot = null; // Root will be the same for everyone as of now.
String lastRootStr = null;
for (int i = 0; i < result.length; ++i) {
String[] rootAndIndex = sargLeaves.get(i).getColumnName().split(TRANSLATED_SARG_SEPARATOR);
assert rootAndIndex.length == 2;
String rootStr = rootAndIndex[0], indexStr = rootAndIndex[1];
int index = Integer.parseInt(indexStr);
// First, check if the column even maps to anything.
if (index == -1) {
result[i] = -1;
continue;
}
assert index >= 0;
// Then, find the root type if needed.
if (!rootStr.equals(lastRootStr)) {
lastRoot = types.get(Integer.parseInt(rootStr));
lastRootStr = rootStr;
}
// Subtypes of the root types correspond, in order, to the columns in the table schema
// (disregarding schema evolution that doesn't presently work). Get the index for the
// corresponding subtype.
result[i] = lastRoot.getSubtypes(index);
}
return result;
}
}