/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl;
import java.io.EOFException;
import java.io.IOException;
import java.text.DateFormat;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DateColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.Decimal64ColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.StructColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.TimestampColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.UnionColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr;
import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
import org.apache.orc.OrcFile;
import org.apache.orc.OrcProto;
import org.apache.orc.TypeDescription;
import org.apache.orc.impl.reader.ReaderEncryption;
import org.apache.orc.impl.reader.StripePlanner;
import org.apache.orc.impl.reader.tree.BatchReader;
import org.apache.orc.impl.reader.tree.PrimitiveBatchReader;
import org.apache.orc.impl.reader.tree.StructBatchReader;
import org.apache.orc.impl.reader.tree.TypeReader;
import org.apache.orc.impl.writer.TimestampTreeWriter;
/**
* Factory for creating ORC tree readers.
*/
public class TreeReaderFactory {
public interface Context {
SchemaEvolution getSchemaEvolution();
boolean isSkipCorrupt();
boolean getUseUTCTimestamp();
String getWriterTimezone();
OrcFile.Version getFileFormat();
ReaderEncryption getEncryption();
boolean useProlepticGregorian();
boolean fileUsedProlepticGregorian();
}
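/**
 * The standard builder-style implementation of {@link Context}. A minimal
 * usage sketch, assuming the SchemaEvolution and ReaderEncryption values
 * come from the surrounding reader:
 * <pre>{@code
 * TreeReaderFactory.Context context = new TreeReaderFactory.ReaderContext()
 *     .setSchemaEvolution(evolution)
 *     .setEncryption(encryption)
 *     .skipCorrupt(false)
 *     .useUTCTimestamp(true)
 *     .writerTimeZone("America/Los_Angeles")
 *     .fileFormat(OrcFile.Version.CURRENT)
 *     .setProlepticGregorian(false, true);
 * }</pre>
 */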
public static class ReaderContext implements Context {
private SchemaEvolution evolution;
private boolean skipCorrupt = false;
private boolean useUTCTimestamp = false;
private String writerTimezone;
private OrcFile.Version fileFormat;
private ReaderEncryption encryption;
private boolean useProlepticGregorian;
private boolean fileUsedProlepticGregorian;
public ReaderContext setSchemaEvolution(SchemaEvolution evolution) {
this.evolution = evolution;
return this;
}
public ReaderContext setEncryption(ReaderEncryption value) {
encryption = value;
return this;
}
public ReaderContext skipCorrupt(boolean skipCorrupt) {
this.skipCorrupt = skipCorrupt;
return this;
}
public ReaderContext useUTCTimestamp(boolean useUTCTimestamp) {
this.useUTCTimestamp = useUTCTimestamp;
return this;
}
public ReaderContext writerTimeZone(String writerTimezone) {
this.writerTimezone = writerTimezone;
return this;
}
public ReaderContext fileFormat(OrcFile.Version version) {
this.fileFormat = version;
return this;
}
public ReaderContext setProlepticGregorian(boolean file,
boolean reader) {
this.useProlepticGregorian = reader;
this.fileUsedProlepticGregorian = file;
return this;
}
@Override
public SchemaEvolution getSchemaEvolution() {
return evolution;
}
@Override
public boolean isSkipCorrupt() {
return skipCorrupt;
}
@Override
public boolean getUseUTCTimestamp() {
return useUTCTimestamp;
}
@Override
public String getWriterTimezone() {
return writerTimezone;
}
@Override
public OrcFile.Version getFileFormat() {
return fileFormat;
}
@Override
public ReaderEncryption getEncryption() {
return encryption;
}
@Override
public boolean useProlepticGregorian() {
return useProlepticGregorian;
}
@Override
public boolean fileUsedProlepticGregorian() {
return fileUsedProlepticGregorian;
}
}
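/**
 * The base class for all column readers. It owns the optional PRESENT
 * stream of its column: one bit per value, where 1 means the value is
 * present and 0 means it is null. Subclasses consume their DATA (and
 * other) streams only for the non-null positions.
 */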
public abstract static class TreeReader implements TypeReader {
protected final int columnId;
protected BitFieldReader present = null;
protected final Context context;
static final long[] powerOfTenTable = {
1L, // 0
10L,
100L,
1_000L,
10_000L,
100_000L,
1_000_000L,
10_000_000L,
100_000_000L, // 8
1_000_000_000L,
10_000_000_000L,
100_000_000_000L,
1_000_000_000_000L,
10_000_000_000_000L,
100_000_000_000_000L,
1_000_000_000_000_000L,
10_000_000_000_000_000L, // 16
100_000_000_000_000_000L,
1_000_000_000_000_000_000L, // 18
};
TreeReader(int columnId, Context context) throws IOException {
this(columnId, null, context);
}
protected TreeReader(int columnId, InStream in, Context context) throws IOException {
this.columnId = columnId;
this.context = context;
if (in == null) {
present = null;
} else {
present = new BitFieldReader(in);
}
}
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
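/**
 * Creates the integer reader that matches the column encoding:
 * DIRECT_V2 and DICTIONARY_V2 use the RLE version 2 codec, while
 * DIRECT and DICTIONARY use the original RLE version 1 codec.
 */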
protected static IntegerReader createIntegerReader(OrcProto.ColumnEncoding.Kind kind,
InStream in,
boolean signed,
Context context) throws IOException {
switch (kind) {
case DIRECT_V2:
case DICTIONARY_V2:
return new RunLengthIntegerReaderV2(in, signed, context != null && context.isSkipCorrupt());
case DIRECT:
case DICTIONARY:
return new RunLengthIntegerReader(in, signed);
default:
throw new IllegalArgumentException("Unknown encoding " + kind);
}
}
public void startStripe(StripePlanner planner) throws IOException {
checkEncoding(planner.getEncoding(columnId));
InStream in = planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.PRESENT));
if (in == null) {
present = null;
} else {
present = new BitFieldReader(in);
}
}
/**
* Seek to the given position.
*
* @param index the indexes loaded from the file
* @throws IOException
*/
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
public void seek(PositionProvider index) throws IOException {
if (present != null) {
present.seek(index);
}
}
protected long countNonNulls(long rows) throws IOException {
if (present != null) {
long result = 0;
for (long c = 0; c < rows; ++c) {
if (present.next() == 1) {
result += 1;
}
}
return result;
} else {
return rows;
}
}
/**
* Populates the isNull vector array in the previousVector object based on
* the present stream values. This method is called from all the child
* readers, and they all set their values based on the isNull entries.
*
* @param previous The ColumnVector whose isNull array is populated
* @param isNull Whether each value was null at a higher level. If
* isNull is null, all values are non-null.
* @param batchSize Size of the column vector
* @throws IOException if the present stream cannot be read
*/
public void nextVector(ColumnVector previous,
boolean[] isNull,
final int batchSize) throws IOException {
if (present != null || isNull != null) {
// Set noNulls and isNull vector of the ColumnVector based on
// present stream
previous.noNulls = true;
boolean allNull = true;
for (int i = 0; i < batchSize; i++) {
if (isNull == null || !isNull[i]) {
if (present != null && present.next() != 1) {
previous.noNulls = false;
previous.isNull[i] = true;
} else {
previous.isNull[i] = false;
allNull = false;
}
} else {
previous.noNulls = false;
previous.isNull[i] = true;
}
}
previous.isRepeating = !previous.noNulls && allNull;
} else {
// There is no present stream, this means that all the values are
// present.
previous.noNulls = true;
for (int i = 0; i < batchSize; i++) {
previous.isNull[i] = false;
}
}
}
public BitFieldReader getPresent() {
return present;
}
@Override
public int getColumnId() {
return columnId;
}
}
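/**
 * A reader that returns only nulls, used for columns that have no data
 * in the file (for example, columns added by schema evolution). The
 * whole batch is marked as a single repeating null.
 */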
public static class NullTreeReader extends TreeReader {
public NullTreeReader(int columnId) throws IOException {
super(columnId, null);
}
@Override
public void startStripe(StripePlanner planner) {
// PASS
}
@Override
public void skipRows(long rows) {
// PASS
}
@Override
public void seek(PositionProvider position) {
// PASS
}
@Override
public void seek(PositionProvider[] position) {
// PASS
}
@Override
public void nextVector(ColumnVector vector, boolean[] isNull, int size) {
vector.noNulls = false;
vector.isNull[0] = true;
vector.isRepeating = true;
}
}
public static class BooleanTreeReader extends TreeReader {
protected BitFieldReader reader = null;
BooleanTreeReader(int columnId) throws IOException {
this(columnId, null, null);
}
protected BooleanTreeReader(int columnId, InStream present, InStream data) throws IOException {
super(columnId, present, null);
if (data != null) {
reader = new BitFieldReader(data);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
reader = new BitFieldReader(planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.DATA)));
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
reader.seek(index);
}
@Override
public void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
reader.nextVector(result, batchSize);
}
}
public static class ByteTreeReader extends TreeReader {
protected RunLengthByteReader reader = null;
ByteTreeReader(int columnId) throws IOException {
this(columnId, null, null);
}
protected ByteTreeReader(int columnId, InStream present, InStream data) throws IOException {
super(columnId, present, null);
this.reader = new RunLengthByteReader(data);
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
reader = new RunLengthByteReader(planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.DATA)));
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
reader.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
reader.nextVector(result, result.vector, batchSize);
}
@Override
public void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
}
public static class ShortTreeReader extends TreeReader {
protected IntegerReader reader = null;
ShortTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, context);
}
protected ShortTreeReader(int columnId, InStream present, InStream data,
OrcProto.ColumnEncoding encoding, Context context)
throws IOException {
super(columnId, present, context);
if (data != null && encoding != null) {
checkEncoding(encoding);
this.reader = createIntegerReader(encoding.getKind(), data, true, context);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
reader = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(name), true, context);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
reader.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
reader.nextVector(result, result.vector, batchSize);
}
@Override
public void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
}
public static class IntTreeReader extends TreeReader {
protected IntegerReader reader = null;
IntTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, context);
}
protected IntTreeReader(int columnId, InStream present, InStream data,
OrcProto.ColumnEncoding encoding, Context context)
throws IOException {
super(columnId, present, context);
if (data != null && encoding != null) {
checkEncoding(encoding);
this.reader = createIntegerReader(encoding.getKind(), data, true, context);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
reader = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(name), true, context);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
reader.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
reader.nextVector(result, result.vector, batchSize);
}
@Override
public void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
}
public static class LongTreeReader extends TreeReader {
protected IntegerReader reader = null;
LongTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, context);
}
protected LongTreeReader(int columnId, InStream present, InStream data,
OrcProto.ColumnEncoding encoding,
Context context)
throws IOException {
super(columnId, present, context);
if (data != null && encoding != null) {
checkEncoding(encoding);
this.reader = createIntegerReader(encoding.getKind(), data, true, context);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
reader = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(name), true, context);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
reader.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final LongColumnVector result = (LongColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
reader.nextVector(result, result.vector, batchSize);
}
@Override
public void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
}
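/**
 * Reads float columns directly from the raw DATA stream. Null positions
 * are filled with NaN, and a batch of identical values is collapsed into
 * a repeating vector. DoubleTreeReader below follows the same pattern.
 */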
public static class FloatTreeReader extends TreeReader {
protected InStream stream;
private final SerializationUtils utils;
FloatTreeReader(int columnId) throws IOException {
this(columnId, null, null);
}
protected FloatTreeReader(int columnId, InStream present, InStream data) throws IOException {
super(columnId, present, null);
this.utils = new SerializationUtils();
this.stream = data;
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = planner.getStream(name);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
stream.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final DoubleColumnVector result = (DoubleColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
final boolean hasNulls = !result.noNulls;
boolean allNulls = hasNulls;
if (batchSize > 0) {
if (hasNulls) {
// loop conditions ordered so the JIT can elide bounds checks
for (int i = 0; batchSize <= result.isNull.length && i < batchSize; i++) {
allNulls = allNulls & result.isNull[i];
}
if (allNulls) {
result.vector[0] = Double.NaN;
result.isRepeating = true;
} else {
// some nulls
result.isRepeating = false;
// loop conditions ordered so the JIT can elide bounds checks
for (int i = 0; batchSize <= result.isNull.length
&& batchSize <= result.vector.length && i < batchSize; i++) {
if (!result.isNull[i]) {
result.vector[i] = utils.readFloat(stream);
} else {
// If the value is not present then set NaN
result.vector[i] = Double.NaN;
}
}
}
} else {
// no nulls & > 1 row (check repeating)
boolean repeating = (batchSize > 1);
final float f1 = utils.readFloat(stream);
result.vector[0] = f1;
// loop conditions ordered so the JIT can elide bounds checks
for (int i = 1; i < batchSize && batchSize <= result.vector.length; i++) {
final float f2 = utils.readFloat(stream);
repeating = repeating && (f1 == f2);
result.vector[i] = f2;
}
result.isRepeating = repeating;
}
}
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
for (int i = 0; i < items; ++i) {
utils.readFloat(stream);
}
}
}
public static class DoubleTreeReader extends TreeReader {
protected InStream stream;
private final SerializationUtils utils;
DoubleTreeReader(int columnId) throws IOException {
this(columnId, null, null);
}
protected DoubleTreeReader(int columnId, InStream present, InStream data) throws IOException {
super(columnId, present, null);
this.utils = new SerializationUtils();
this.stream = data;
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
StreamName name =
new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = planner.getStream(name);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
stream.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final DoubleColumnVector result = (DoubleColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
final boolean hasNulls = !result.noNulls;
boolean allNulls = hasNulls;
if (batchSize != 0) {
if (hasNulls) {
// loop conditions ordered so the JIT can elide bounds checks
for (int i = 0; i < batchSize && batchSize <= result.isNull.length; i++) {
allNulls = allNulls & result.isNull[i];
}
if (allNulls) {
result.vector[0] = Double.NaN;
result.isRepeating = true;
} else {
// some nulls
result.isRepeating = false;
// loop conditions ordered so the JIT can elide bounds checks
for (int i = 0; batchSize <= result.isNull.length
&& batchSize <= result.vector.length && i < batchSize; i++) {
if (!result.isNull[i]) {
result.vector[i] = utils.readDouble(stream);
} else {
// If the value is not present then set NaN
result.vector[i] = Double.NaN;
}
}
}
} else {
// no nulls
boolean repeating = (batchSize > 1);
final double d1 = utils.readDouble(stream);
result.vector[0] = d1;
// loop conditions ordered so the JIT can elide bounds checks
for (int i = 1; i < batchSize && batchSize <= result.vector.length; i++) {
final double d2 = utils.readDouble(stream);
repeating = repeating && (d1 == d2);
result.vector[i] = d2;
}
result.isRepeating = repeating;
}
}
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
long len = items * 8;
while (len > 0) {
len -= stream.skip(len);
}
}
}
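/**
 * Reads binary columns: the LENGTH stream holds the byte length of each
 * value and the DATA stream holds the concatenated bytes, so skipping
 * rows means summing the lengths and skipping that many DATA bytes.
 */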
public static class BinaryTreeReader extends TreeReader {
protected InStream stream;
protected IntegerReader lengths = null;
protected final LongColumnVector scratchlcv;
BinaryTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, null, context);
}
protected BinaryTreeReader(int columnId, InStream present, InStream data, InStream length,
OrcProto.ColumnEncoding encoding, Context context) throws IOException {
super(columnId, present, context);
scratchlcv = new LongColumnVector();
this.stream = data;
if (length != null && encoding != null) {
checkEncoding(encoding);
this.lengths = createIntegerReader(encoding.getKind(), length, false, context);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = planner.getStream(name);
lengths = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), false, context);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
stream.seek(index);
lengths.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final BytesColumnVector result = (BytesColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
scratchlcv.ensureSize(batchSize, false);
BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv, result, batchSize);
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
for (int i = 0; i < items; ++i) {
lengthToSkip += lengths.next();
}
while (lengthToSkip > 0) {
lengthToSkip -= stream.skip(lengthToSkip);
}
}
}
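/**
 * Reads timestamp columns. The DATA stream holds seconds relative to the
 * writer's base timestamp and the SECONDARY stream holds the encoded
 * nanoseconds (see {@link #parseNanos}). When the writer and reader time
 * zones have different rules the values are shifted between them, and
 * instant (local time zone) columns are always read as UTC.
 */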
public static class TimestampTreeReader extends TreeReader {
protected IntegerReader data = null;
protected IntegerReader nanos = null;
private Map<String, Long> baseTimestampMap;
protected long base_timestamp;
private final TimeZone readerTimeZone;
private final boolean instantType;
private TimeZone writerTimeZone;
private boolean hasSameTZRules;
private ThreadLocal<DateFormat> threadLocalDateFormat;
private final boolean useProleptic;
private final boolean fileUsesProleptic;
TimestampTreeReader(int columnId, Context context,
boolean instantType) throws IOException {
this(columnId, null, null, null, null, context, instantType);
}
protected TimestampTreeReader(int columnId, InStream presentStream, InStream dataStream,
InStream nanosStream,
OrcProto.ColumnEncoding encoding,
Context context,
boolean instantType) throws IOException {
super(columnId, presentStream, context);
this.instantType = instantType;
// withInitial ensures every thread gets its own SimpleDateFormat, which is not thread-safe
this.threadLocalDateFormat = ThreadLocal.withInitial(
() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));
this.baseTimestampMap = new HashMap<>();
if (instantType || context.getUseUTCTimestamp()) {
this.readerTimeZone = TimeZone.getTimeZone("UTC");
} else {
this.readerTimeZone = TimeZone.getDefault();
}
if (context.getWriterTimezone() == null || context.getWriterTimezone().isEmpty()) {
if (instantType) {
this.base_timestamp = getBaseTimestamp(readerTimeZone.getID()); // UTC
} else {
this.base_timestamp = getBaseTimestamp(TimeZone.getDefault().getID());
}
} else {
this.base_timestamp = getBaseTimestamp(context.getWriterTimezone());
}
if (encoding != null) {
checkEncoding(encoding);
if (dataStream != null) {
this.data = createIntegerReader(encoding.getKind(), dataStream, true, context);
}
if (nanosStream != null) {
this.nanos = createIntegerReader(encoding.getKind(), nanosStream, false, context);
}
}
fileUsesProleptic = context.fileUsedProlepticGregorian();
useProleptic = context.useProlepticGregorian();
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
OrcProto.ColumnEncoding.Kind kind = planner.getEncoding(columnId).getKind();
data = createIntegerReader(kind,
planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.DATA)), true, context);
nanos = createIntegerReader(kind,
planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.SECONDARY)), false, context);
if (!instantType) {
base_timestamp = getBaseTimestamp(planner.getWriterTimezone());
}
}
protected long getBaseTimestamp(String timeZoneId) throws IOException {
// to make sure new readers read old files in the same way
if (timeZoneId == null || timeZoneId.isEmpty()) {
timeZoneId = writerTimeZone.getID();
}
if (writerTimeZone == null || !timeZoneId.equals(writerTimeZone.getID())) {
writerTimeZone = TimeZone.getTimeZone(timeZoneId);
hasSameTZRules = writerTimeZone.hasSameRules(readerTimeZone);
if (!baseTimestampMap.containsKey(timeZoneId)) {
threadLocalDateFormat.get().setTimeZone(writerTimeZone);
try {
long epoch = threadLocalDateFormat.get()
.parse(TimestampTreeWriter.BASE_TIMESTAMP_STRING).getTime() /
TimestampTreeWriter.MILLIS_PER_SECOND;
baseTimestampMap.put(timeZoneId, epoch);
return epoch;
} catch (ParseException e) {
throw new IOException("Unable to create base timestamp", e);
} finally {
threadLocalDateFormat.get().setTimeZone(readerTimeZone);
}
} else {
return baseTimestampMap.get(timeZoneId);
}
}
return base_timestamp;
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
data.seek(index);
nanos.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
TimestampColumnVector result = (TimestampColumnVector) previousVector;
result.changeCalendar(fileUsesProleptic, false);
super.nextVector(previousVector, isNull, batchSize);
result.setIsUTC(context.getUseUTCTimestamp());
for (int i = 0; i < batchSize; i++) {
if (result.noNulls || !result.isNull[i]) {
final int newNanos = parseNanos(nanos.next());
long millis = (data.next() + base_timestamp)
* TimestampTreeWriter.MILLIS_PER_SECOND + newNanos / 1_000_000;
if (millis < 0 && newNanos > 999_999) {
millis -= TimestampTreeWriter.MILLIS_PER_SECOND;
}
long offset = 0;
// If reader and writer time zones have different rules, adjust for the
// time zone difference between them, taking daylight saving time into account.
if (!hasSameTZRules) {
offset = SerializationUtils.convertBetweenTimezones(writerTimeZone,
readerTimeZone, millis);
}
result.time[i] = millis + offset;
result.nanos[i] = newNanos;
if (result.isRepeating && i != 0 &&
(result.time[0] != result.time[i] ||
result.nanos[0] != result.nanos[i])) {
result.isRepeating = false;
}
}
}
result.changeCalendar(useProleptic, true);
}
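/**
 * Decodes a SECONDARY stream value into nanoseconds. The writer strips
 * trailing decimal zeros (when there are at least two) and records their
 * count minus one in the low three bits, so for example 1,000 nanoseconds
 * is stored as (1 << 3) | 2 and decoded here as 1 * 10^(2 + 1).
 */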
private static int parseNanos(long serialized) {
int zeros = 7 & (int) serialized;
int result = (int) (serialized >>> 3);
if (zeros != 0) {
result *= (int) powerOfTenTable[zeros + 1];
}
return result;
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
data.skip(items);
nanos.skip(items);
}
}
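/**
 * Reads date columns as days since the epoch. When either the file or
 * the reader uses the proleptic Gregorian calendar, the values must land
 * in a DateColumnVector so they can be rebased between the calendars.
 */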
public static class DateTreeReader extends TreeReader {
protected IntegerReader reader = null;
private final boolean needsDateColumnVector;
private final boolean useProleptic;
private final boolean fileUsesProleptic;
DateTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, context);
}
protected DateTreeReader(int columnId, InStream present, InStream data,
OrcProto.ColumnEncoding encoding, Context context) throws IOException {
super(columnId, present, context);
useProleptic = context.useProlepticGregorian();
fileUsesProleptic = context.fileUsedProlepticGregorian();
// if either side is proleptic, we need a DateColumnVector
needsDateColumnVector = useProleptic || fileUsesProleptic;
if (data != null && encoding != null) {
checkEncoding(encoding);
reader = createIntegerReader(encoding.getKind(), data, true, context);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
reader = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(name), true, context);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
reader.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final LongColumnVector result = (LongColumnVector) previousVector;
if (needsDateColumnVector) {
if (result instanceof DateColumnVector) {
((DateColumnVector) result).changeCalendar(fileUsesProleptic, false);
} else {
throw new IllegalArgumentException("Can't use LongColumnVector to " +
"read proleptic Gregorian dates.");
}
}
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
// Read value entries based on isNull entries
reader.nextVector(result, result.vector, batchSize);
if (needsDateColumnVector) {
((DateColumnVector) result).changeCalendar(useProleptic, true);
}
}
@Override
public void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
}
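/**
 * Reads decimal columns in the generic encoding: the DATA stream holds
 * the serialized unbounded values (read through HiveDecimalWritable's
 * SerializationUtils-compatible fast path) and the SECONDARY stream
 * holds the scale of each value.
 */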
public static class DecimalTreeReader extends TreeReader {
protected final int precision;
protected final int scale;
protected InStream valueStream;
protected IntegerReader scaleReader = null;
private int[] scratchScaleVector;
private byte[] scratchBytes;
DecimalTreeReader(int columnId,
int precision,
int scale,
Context context) throws IOException {
this(columnId, null, null, null, null, precision, scale, context);
}
protected DecimalTreeReader(int columnId,
InStream present,
InStream valueStream,
InStream scaleStream,
OrcProto.ColumnEncoding encoding,
int precision,
int scale,
Context context) throws IOException {
super(columnId, present, context);
this.precision = precision;
this.scale = scale;
this.scratchScaleVector = new int[VectorizedRowBatch.DEFAULT_SIZE];
this.valueStream = valueStream;
this.scratchBytes = new byte[HiveDecimal.SCRATCH_BUFFER_LEN_SERIALIZATION_UTILS_READ];
if (scaleStream != null && encoding != null) {
checkEncoding(encoding);
this.scaleReader = createIntegerReader(encoding.getKind(), scaleStream, true, context);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
valueStream = planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.DATA));
scaleReader = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true, context);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
valueStream.seek(index);
scaleReader.seek(index);
}
private void nextVector(DecimalColumnVector result,
boolean[] isNull,
final int batchSize) throws IOException {
if (batchSize > scratchScaleVector.length) {
scratchScaleVector = new int[batchSize];
}
// read the scales
scaleReader.nextVector(result, scratchScaleVector, batchSize);
// Read value entries based on isNull entries
// Use the fast ORC deserialization method that emulates SerializationUtils.readBigInteger
// provided by HiveDecimalWritable.
HiveDecimalWritable[] vector = result.vector;
HiveDecimalWritable decWritable;
if (result.noNulls) {
for (int r=0; r < batchSize; ++r) {
decWritable = vector[r];
if (!decWritable.serializationUtilsRead(
valueStream, scratchScaleVector[r],
scratchBytes)) {
result.isNull[r] = true;
result.noNulls = false;
}
}
} else if (!result.isRepeating || !result.isNull[0]) {
for (int r=0; r < batchSize; ++r) {
if (!result.isNull[r]) {
decWritable = vector[r];
if (!decWritable.serializationUtilsRead(
valueStream, scratchScaleVector[r],
scratchBytes)) {
result.isNull[r] = true;
result.noNulls = false;
}
}
}
}
}
private void nextVector(Decimal64ColumnVector result,
boolean[] isNull,
final int batchSize) throws IOException {
if (precision > TypeDescription.MAX_DECIMAL64_PRECISION) {
throw new IllegalArgumentException("Reading large precision type into" +
" Decimal64ColumnVector.");
}
if (batchSize > scratchScaleVector.length) {
scratchScaleVector = new int[batchSize];
}
// read the scales
scaleReader.nextVector(result, scratchScaleVector, batchSize);
if (result.noNulls) {
for (int r=0; r < batchSize; ++r) {
final long scaleFactor = powerOfTenTable[scale - scratchScaleVector[r]];
result.vector[r] = SerializationUtils.readVslong(valueStream) * scaleFactor;
}
} else if (!result.isRepeating || !result.isNull[0]) {
for (int r=0; r < batchSize; ++r) {
if (!result.isNull[r]) {
final long scaleFactor = powerOfTenTable[scale - scratchScaleVector[r]];
result.vector[r] = SerializationUtils.readVslong(valueStream) * scaleFactor;
}
}
}
result.precision = (short) precision;
result.scale = (short) scale;
}
@Override
public void nextVector(ColumnVector result,
boolean[] isNull,
final int batchSize) throws IOException {
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
if (result instanceof Decimal64ColumnVector) {
nextVector((Decimal64ColumnVector) result, isNull, batchSize);
} else {
nextVector((DecimalColumnVector) result, isNull, batchSize);
}
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
HiveDecimalWritable scratchDecWritable = new HiveDecimalWritable();
for (int i = 0; i < items; i++) {
scratchDecWritable.serializationUtilsRead(valueStream, 0, scratchBytes);
}
scaleReader.skip(items);
}
}
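/**
 * Reads decimal columns whose precision fits in a long (at most
 * TypeDescription.MAX_DECIMAL64_PRECISION digits) and that were written
 * as a single RLE version 2 DATA stream of scaled long values.
 */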
public static class Decimal64TreeReader extends TreeReader {
protected final int precision;
protected final int scale;
protected final boolean skipCorrupt;
protected RunLengthIntegerReaderV2 valueReader;
Decimal64TreeReader(int columnId,
int precision,
int scale,
Context context) throws IOException {
this(columnId, null, null, null, precision, scale, context);
}
protected Decimal64TreeReader(int columnId,
InStream present,
InStream valueStream,
OrcProto.ColumnEncoding encoding,
int precision,
int scale,
Context context) throws IOException {
super(columnId, present, context);
this.precision = precision;
this.scale = scale;
valueReader = new RunLengthIntegerReaderV2(valueStream, true,
context.isSkipCorrupt());
skipCorrupt = context.isSkipCorrupt();
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
InStream stream = planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.DATA));
valueReader = new RunLengthIntegerReaderV2(stream, true, skipCorrupt);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
valueReader.seek(index);
}
private void nextVector(DecimalColumnVector result,
final int batchSize) throws IOException {
if (result.noNulls) {
for (int r=0; r < batchSize; ++r) {
result.vector[r].setFromLongAndScale(valueReader.next(), scale);
}
} else if (!result.isRepeating || !result.isNull[0]) {
for (int r=0; r < batchSize; ++r) {
if (result.noNulls || !result.isNull[r]) {
result.vector[r].setFromLongAndScale(valueReader.next(), scale);
}
}
}
result.precision = (short) precision;
result.scale = (short) scale;
}
private void nextVector(Decimal64ColumnVector result,
final int batchSize) throws IOException {
valueReader.nextVector(result, result.vector, batchSize);
result.precision = (short) precision;
result.scale = (short) scale;
}
@Override
public void nextVector(ColumnVector result,
boolean[] isNull,
final int batchSize) throws IOException {
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
if (result instanceof Decimal64ColumnVector) {
nextVector((Decimal64ColumnVector) result, batchSize);
} else {
nextVector((DecimalColumnVector) result, batchSize);
}
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
valueReader.skip(items);
}
}
/**
* A tree reader that will read string columns. At the start of the
* stripe, it creates an internal reader based on whether a direct or
* dictionary encoding was used.
*/
public static class StringTreeReader extends TreeReader {
protected TypeReader reader;
StringTreeReader(int columnId, Context context) throws IOException {
super(columnId, context);
}
protected StringTreeReader(int columnId, InStream present, InStream data, InStream length,
InStream dictionary, OrcProto.ColumnEncoding encoding, Context context) throws IOException {
super(columnId, present, context);
if (encoding != null) {
switch (encoding.getKind()) {
case DIRECT:
case DIRECT_V2:
reader = new StringDirectTreeReader(columnId, present, data, length,
encoding.getKind());
break;
case DICTIONARY:
case DICTIONARY_V2:
reader = new StringDictionaryTreeReader(columnId, present, data, length, dictionary,
encoding, context);
break;
default:
throw new IllegalArgumentException("Unsupported encoding " +
encoding.getKind());
}
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
reader.checkEncoding(encoding);
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
// For each stripe, check the encoding and initialize the appropriate
// reader.
switch (planner.getEncoding(columnId).getKind()) {
case DIRECT:
case DIRECT_V2:
reader = new StringDirectTreeReader(columnId);
break;
case DICTIONARY:
case DICTIONARY_V2:
reader = new StringDictionaryTreeReader(columnId, context);
break;
default:
throw new IllegalArgumentException("Unsupported encoding " +
planner.getEncoding(columnId).getKind());
}
reader.startStripe(planner);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
reader.seek(index);
}
@Override
public void seek(PositionProvider index) throws IOException {
reader.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
reader.nextVector(previousVector, isNull, batchSize);
}
@Override
public void skipRows(long items) throws IOException {
reader.skipRows(items);
}
}
/**
 * Collects together very similar methods for reading an ORC vector of
 * byte arrays and creating the BytesColumnVector.
 */
public static class BytesColumnVectorUtil {
private static byte[] commonReadByteArrays(InStream stream, IntegerReader lengths,
LongColumnVector scratchlcv,
BytesColumnVector result, final int batchSize) throws IOException {
// Read lengths
scratchlcv.isRepeating = result.isRepeating;
scratchlcv.noNulls = result.noNulls;
scratchlcv.isNull = result.isNull; // Notice we are replacing the isNull vector here...
lengths.nextVector(scratchlcv, scratchlcv.vector, batchSize);
int totalLength = 0;
if (!scratchlcv.isRepeating) {
for (int i = 0; i < batchSize; i++) {
if (!scratchlcv.isNull[i]) {
totalLength += (int) scratchlcv.vector[i];
}
}
} else {
if (!scratchlcv.isNull[0]) {
totalLength = (int) (batchSize * scratchlcv.vector[0]);
}
}
// Read all the strings for this batch
byte[] allBytes = new byte[totalLength];
int offset = 0;
int len = totalLength;
while (len > 0) {
int bytesRead = stream.read(allBytes, offset, len);
if (bytesRead < 0) {
throw new EOFException("Can't finish byte read from " + stream);
}
len -= bytesRead;
offset += bytesRead;
}
return allBytes;
}
// Common code for reading the byte arrays of a batch into a BytesColumnVector.
public static void readOrcByteArrays(InStream stream,
IntegerReader lengths,
LongColumnVector scratchlcv,
BytesColumnVector result,
final int batchSize) throws IOException {
if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
byte[] allBytes = commonReadByteArrays(stream, lengths, scratchlcv,
result, batchSize);
// Too expensive to figure out 'repeating' by comparisons.
result.isRepeating = false;
int offset = 0;
if (!scratchlcv.isRepeating) {
for (int i = 0; i < batchSize; i++) {
if (!scratchlcv.isNull[i]) {
result.setRef(i, allBytes, offset, (int) scratchlcv.vector[i]);
offset += scratchlcv.vector[i];
} else {
result.setRef(i, allBytes, 0, 0);
}
}
} else {
for (int i = 0; i < batchSize; i++) {
if (!scratchlcv.isNull[i]) {
result.setRef(i, allBytes, offset, (int) scratchlcv.vector[0]);
offset += scratchlcv.vector[0];
} else {
result.setRef(i, allBytes, 0, 0);
}
}
}
}
}
}
/**
* A reader for string columns that are direct encoded in the current
* stripe.
*/
public static class StringDirectTreeReader extends TreeReader {
private static final HadoopShims SHIMS = HadoopShimsFactory.get();
protected InStream stream;
protected IntegerReader lengths;
private final LongColumnVector scratchlcv;
StringDirectTreeReader(int columnId) throws IOException {
this(columnId, null, null, null, null);
}
protected StringDirectTreeReader(int columnId, InStream present, InStream data,
InStream length, OrcProto.ColumnEncoding.Kind encoding) throws IOException {
super(columnId, present, null);
this.scratchlcv = new LongColumnVector();
this.stream = data;
if (length != null && encoding != null) {
this.lengths = createIntegerReader(encoding, length, false, context);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT &&
encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DATA);
stream = planner.getStream(name);
lengths = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(new StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
false, context);
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
stream.seek(index);
lengths.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final BytesColumnVector result = (BytesColumnVector) previousVector;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
scratchlcv.ensureSize(batchSize, false);
BytesColumnVectorUtil.readOrcByteArrays(stream, lengths, scratchlcv,
result, batchSize);
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
long lengthToSkip = 0;
for (int i = 0; i < items; ++i) {
lengthToSkip += lengths.next();
}
while (lengthToSkip > 0) {
lengthToSkip -= stream.skip(lengthToSkip);
}
}
public IntegerReader getLengths() {
return lengths;
}
public InStream getStream() {
return stream;
}
}
/**
* A reader for string columns that are dictionary encoded in the current
* stripe.
*/
public static class StringDictionaryTreeReader extends TreeReader {
private static final byte[] EMPTY_BYTE_ARRAY = new byte[0];
private DynamicByteArray dictionaryBuffer;
private int[] dictionaryOffsets;
protected IntegerReader reader;
private byte[] dictionaryBufferInBytesCache = null;
private final LongColumnVector scratchlcv;
StringDictionaryTreeReader(int columnId, Context context) throws IOException {
this(columnId, null, null, null, null, null, context);
}
protected StringDictionaryTreeReader(int columnId, InStream present, InStream data,
InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding,
Context context) throws IOException {
super(columnId, present, context);
scratchlcv = new LongColumnVector();
if (data != null && encoding != null) {
this.reader = createIntegerReader(encoding.getKind(), data, false, context);
}
if (dictionary != null && encoding != null) {
readDictionaryStream(dictionary);
}
if (length != null && encoding != null) {
readDictionaryLengthStream(length, encoding);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY &&
encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
// read the dictionary blob
StreamName name = new StreamName(columnId,
OrcProto.Stream.Kind.DICTIONARY_DATA);
InStream in = planner.getStream(name);
readDictionaryStream(in);
// read the lengths
name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
in = planner.getStream(name);
OrcProto.ColumnEncoding encoding = planner.getEncoding(columnId);
readDictionaryLengthStream(in, encoding);
// set up the row reader
name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
reader = createIntegerReader(encoding.getKind(),
planner.getStream(name), false, context);
}
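/**
 * Converts the LENGTH stream into dictionary offsets: dictionaryOffsets[i]
 * is the start of entry i within the dictionary blob, the final slot holds
 * the total length, and entry i therefore spans
 * [dictionaryOffsets[i], dictionaryOffsets[i + 1]).
 */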
private void readDictionaryLengthStream(InStream in, OrcProto.ColumnEncoding encoding)
throws IOException {
int dictionarySize = encoding.getDictionarySize();
if (in != null) { // Guard against empty LENGTH stream.
IntegerReader lenReader = createIntegerReader(encoding.getKind(), in, false, context);
int offset = 0;
if (dictionaryOffsets == null ||
dictionaryOffsets.length < dictionarySize + 1) {
dictionaryOffsets = new int[dictionarySize + 1];
}
for (int i = 0; i < dictionarySize; ++i) {
dictionaryOffsets[i] = offset;
offset += (int) lenReader.next();
}
dictionaryOffsets[dictionarySize] = offset;
in.close();
}
}
private void readDictionaryStream(InStream in) throws IOException {
if (in != null) { // Guard against empty dictionary stream.
if (in.available() > 0) {
dictionaryBuffer = new DynamicByteArray(64, in.available());
dictionaryBuffer.readAll(in);
// Since it's the start of a stripe, invalidate the cache.
dictionaryBufferInBytesCache = null;
}
in.close();
} else {
dictionaryBuffer = null;
}
}
@Override
public void seek(PositionProvider[] index) throws IOException {
seek(index[columnId]);
}
@Override
public void seek(PositionProvider index) throws IOException {
super.seek(index);
reader.seek(index);
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
final BytesColumnVector result = (BytesColumnVector) previousVector;
int offset;
int length;
// Read present/isNull stream
super.nextVector(result, isNull, batchSize);
if (dictionaryBuffer != null) {
// Load dictionaryBuffer into cache.
if (dictionaryBufferInBytesCache == null) {
dictionaryBufferInBytesCache = dictionaryBuffer.get();
}
// Read string offsets
scratchlcv.isRepeating = result.isRepeating;
scratchlcv.noNulls = result.noNulls;
scratchlcv.isNull = result.isNull;
scratchlcv.ensureSize(batchSize, false);
reader.nextVector(scratchlcv, scratchlcv.vector, batchSize);
if (!scratchlcv.isRepeating) {
// The vector has non-repeating strings. Iterate through the batch
// and set the strings one by one.
for (int i = 0; i < batchSize; i++) {
if (!scratchlcv.isNull[i]) {
offset = dictionaryOffsets[(int) scratchlcv.vector[i]];
length = getDictionaryEntryLength((int) scratchlcv.vector[i], offset);
result.setRef(i, dictionaryBufferInBytesCache, offset, length);
} else {
// If the value is null then set offset and length to zero (null string)
result.setRef(i, dictionaryBufferInBytesCache, 0, 0);
}
}
} else {
// If the value is repeating then just set the first value in the
// vector and set the isRepeating flag to true. No need to iterate
// through and set all the elements to the same value.
offset = dictionaryOffsets[(int) scratchlcv.vector[0]];
length = getDictionaryEntryLength((int) scratchlcv.vector[0], offset);
result.setRef(0, dictionaryBufferInBytesCache, offset, length);
}
result.isRepeating = scratchlcv.isRepeating;
} else {
if (dictionaryOffsets == null) {
// Entire stripe contains null strings.
result.isRepeating = true;
result.noNulls = false;
result.isNull[0] = true;
result.setRef(0, EMPTY_BYTE_ARRAY, 0, 0);
} else {
// stripe contains nulls and empty strings
for (int i = 0; i < batchSize; i++) {
if (!result.isNull[i]) {
result.setRef(i, EMPTY_BYTE_ARRAY, 0, 0);
}
}
}
}
}
int getDictionaryEntryLength(int entry, int offset) {
final int length;
// If it isn't the last entry, subtract the offsets; otherwise use
// the buffer length.
if (entry < dictionaryOffsets.length - 1) {
length = dictionaryOffsets[entry + 1] - offset;
} else {
length = dictionaryBuffer.size() - offset;
}
return length;
}
@Override
public void skipRows(long items) throws IOException {
reader.skip(countNonNulls(items));
}
public IntegerReader getReader() {
return reader;
}
}
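/**
 * Reads char columns by delegating to StringTreeReader and then right
 * trimming and truncating each value to the declared maximum length.
 */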
public static class CharTreeReader extends StringTreeReader {
int maxLength;
CharTreeReader(int columnId, int maxLength) throws IOException {
this(columnId, maxLength, null, null, null, null, null);
}
protected CharTreeReader(int columnId, int maxLength, InStream present, InStream data,
InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
super(columnId, present, data, length, dictionary, encoding, null);
this.maxLength = maxLength;
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
// Get the vector of strings from StringTreeReader, then make a 2nd pass to
// adjust down the length (right trim and truncate) if necessary.
super.nextVector(previousVector, isNull, batchSize);
BytesColumnVector result = (BytesColumnVector) previousVector;
int adjustedDownLen;
if (result.isRepeating) {
if (result.noNulls || !result.isNull[0]) {
adjustedDownLen = StringExpr
.rightTrimAndTruncate(result.vector[0], result.start[0], result.length[0], maxLength);
if (adjustedDownLen < result.length[0]) {
result.setRef(0, result.vector[0], result.start[0], adjustedDownLen);
}
}
} else {
if (result.noNulls) {
for (int i = 0; i < batchSize; i++) {
adjustedDownLen = StringExpr
.rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i],
maxLength);
if (adjustedDownLen < result.length[i]) {
result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
}
}
} else {
for (int i = 0; i < batchSize; i++) {
if (!result.isNull[i]) {
adjustedDownLen = StringExpr
.rightTrimAndTruncate(result.vector[i], result.start[i], result.length[i],
maxLength);
if (adjustedDownLen < result.length[i]) {
result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
}
}
}
}
}
}
}
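/**
 * Reads varchar columns by delegating to StringTreeReader and then
 * truncating (without trimming) each value to the declared maximum length.
 */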
public static class VarcharTreeReader extends StringTreeReader {
int maxLength;
VarcharTreeReader(int columnId, int maxLength) throws IOException {
this(columnId, maxLength, null, null, null, null, null);
}
protected VarcharTreeReader(int columnId, int maxLength, InStream present, InStream data,
InStream length, InStream dictionary, OrcProto.ColumnEncoding encoding) throws IOException {
super(columnId, present, data, length, dictionary, encoding, null);
this.maxLength = maxLength;
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
// Get the vector of strings from StringTreeReader, then make a 2nd pass to
// adjust down the length (truncate) if necessary.
super.nextVector(previousVector, isNull, batchSize);
BytesColumnVector result = (BytesColumnVector) previousVector;
int adjustedDownLen;
if (result.isRepeating) {
if (result.noNulls || !result.isNull[0]) {
adjustedDownLen = StringExpr
.truncate(result.vector[0], result.start[0], result.length[0], maxLength);
if (adjustedDownLen < result.length[0]) {
result.setRef(0, result.vector[0], result.start[0], adjustedDownLen);
}
}
} else {
if (result.noNulls) {
for (int i = 0; i < batchSize; i++) {
adjustedDownLen = StringExpr
.truncate(result.vector[i], result.start[i], result.length[i], maxLength);
if (adjustedDownLen < result.length[i]) {
result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
}
}
} else {
for (int i = 0; i < batchSize; i++) {
if (!result.isNull[i]) {
adjustedDownLen = StringExpr
.truncate(result.vector[i], result.start[i], result.length[i], maxLength);
if (adjustedDownLen < result.length[i]) {
result.setRef(i, result.vector[i], result.start[i], adjustedDownLen);
}
}
}
}
}
}
}
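/**
 * Reads struct columns by fanning each batch out to one child reader per
 * field; null structs are propagated through the isNull mask so the
 * children skip those rows.
 */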
public static class StructTreeReader extends TreeReader {
public final TypeReader[] fields;
protected StructTreeReader(int columnId,
TypeDescription readerSchema,
Context context) throws IOException {
super(columnId, context);
List<TypeDescription> childrenTypes = readerSchema.getChildren();
this.fields = new TreeReader[childrenTypes.size()];
for (int i = 0; i < fields.length; ++i) {
TypeDescription subtype = childrenTypes.get(i);
this.fields[i] = createTreeReader(subtype, context);
}
}
public TypeReader[] getChildReaders() {
return fields;
}
protected StructTreeReader(int columnId, InStream present,
Context context,
OrcProto.ColumnEncoding encoding,
TypeReader[] childReaders) throws IOException {
super(columnId, present, context);
if (encoding != null) {
checkEncoding(encoding);
}
this.fields = childReaders;
}
@Override
public void seek(PositionProvider[] index) throws IOException {
super.seek(index);
for (TypeReader kid : fields) {
if (kid != null) {
kid.seek(index);
}
}
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
super.nextVector(previousVector, isNull, batchSize);
StructColumnVector result = (StructColumnVector) previousVector;
if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
result.isRepeating = false;
// Read all the members of the struct as column vectors
boolean[] mask = result.noNulls ? null : result.isNull;
for (int f = 0; f < fields.length; f++) {
if (fields[f] != null) {
fields[f].nextVector(result.fields[f], mask, batchSize);
}
}
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
for (TypeReader field : fields) {
if (field != null) {
field.startStripe(planner);
}
}
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
for (TypeReader field : fields) {
if (field != null) {
field.skipRows(items);
}
}
}
}
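/**
 * Reads union columns. A DATA stream of tags selects which child holds the
 * value for each row; each child is then read once per batch with a mask
 * that ignores the rows belonging to other tags.
 */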
public static class UnionTreeReader extends TreeReader {
protected final TypeReader[] fields;
protected RunLengthByteReader tags;
protected UnionTreeReader(int fileColumn,
TypeDescription readerSchema,
Context context) throws IOException {
super(fileColumn, context);
List<TypeDescription> childrenTypes = readerSchema.getChildren();
int fieldCount = childrenTypes.size();
this.fields = new TypeReader[fieldCount];
for (int i = 0; i < fieldCount; ++i) {
TypeDescription subtype = childrenTypes.get(i);
this.fields[i] = createTreeReader(subtype, context);
}
}
protected UnionTreeReader(int columnId, InStream present,
Context context,
OrcProto.ColumnEncoding encoding,
TypeReader[] childReaders) throws IOException {
super(columnId, present, context);
if (encoding != null) {
checkEncoding(encoding);
}
this.fields = childReaders;
}
@Override
public void seek(PositionProvider[] index) throws IOException {
super.seek(index);
tags.seek(index[columnId]);
for (TypeReader kid : fields) {
kid.seek(index);
}
}
@Override
public void nextVector(ColumnVector previousVector,
boolean[] isNull,
final int batchSize) throws IOException {
UnionColumnVector result = (UnionColumnVector) previousVector;
super.nextVector(result, isNull, batchSize);
if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
result.isRepeating = false;
tags.nextVector(result.noNulls ? null : result.isNull, result.tags,
batchSize);
boolean[] ignore = new boolean[batchSize];
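// Read each child exactly once per batch; rows that are null or that
// belong to a different tag are masked out so the child reader skips them.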
for (int f = 0; f < result.fields.length; ++f) {
// build the ignore list for this tag
for (int r = 0; r < batchSize; ++r) {
ignore[r] = (!result.noNulls && result.isNull[r]) ||
result.tags[r] != f;
}
fields[f].nextVector(result.fields[f], ignore, batchSize);
}
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
tags = new RunLengthByteReader(planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.DATA)));
for (TypeReader field : fields) {
if (field != null) {
field.startStripe(planner);
}
}
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
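// Count how many of the skipped rows belong to each tag so that each
// child reader skips exactly its own values.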
long[] counts = new long[fields.length];
for (int i = 0; i < items; ++i) {
counts[tags.next()] += 1;
}
for (int i = 0; i < counts.length; ++i) {
fields[i].skipRows(counts[i]);
}
}
}
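/**
 * Reads list columns. A LENGTH stream gives the number of elements in each
 * list; the element values for the whole batch are read with a single call
 * to the element reader.
 */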
public static class ListTreeReader extends TreeReader {
protected final TypeReader elementReader;
protected IntegerReader lengths = null;
protected ListTreeReader(int fileColumn,
TypeDescription readerSchema,
Context context) throws IOException {
super(fileColumn, context);
TypeDescription elementType = readerSchema.getChildren().get(0);
elementReader = createTreeReader(elementType, context);
}
protected ListTreeReader(int columnId,
InStream present,
Context context,
InStream data,
OrcProto.ColumnEncoding encoding,
TypeReader elementReader) throws IOException {
super(columnId, present, context);
if (data != null && encoding != null) {
checkEncoding(encoding);
this.lengths = createIntegerReader(encoding.getKind(), data, false, context);
}
this.elementReader = elementReader;
}
@Override
public void seek(PositionProvider[] index) throws IOException {
super.seek(index);
lengths.seek(index[columnId]);
elementReader.seek(index);
}
@Override
public void nextVector(ColumnVector previous,
boolean[] isNull,
final int batchSize) throws IOException {
ListColumnVector result = (ListColumnVector) previous;
super.nextVector(result, isNull, batchSize);
// if we have some non-null values, then read them
if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
lengths.nextVector(result, result.lengths, batchSize);
// even with repeating lengths, the list doesn't repeat
result.isRepeating = false;
// build the offsets vector and figure out how many children to read
result.childCount = 0;
for (int r = 0; r < batchSize; ++r) {
if (result.noNulls || !result.isNull[r]) {
result.offsets[r] = result.childCount;
result.childCount += result.lengths[r];
}
}
result.child.ensureSize(result.childCount, false);
elementReader.nextVector(result.child, null, result.childCount);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
lengths = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.LENGTH)), false, context);
if (elementReader != null) {
elementReader.startStripe(planner);
}
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
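// Sum the lengths of the skipped lists so the element reader can skip
// the corresponding number of child values.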
long childSkip = 0;
for (long i = 0; i < items; ++i) {
childSkip += lengths.next();
}
elementReader.skipRows(childSkip);
}
}
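/**
 * Reads map columns. Like lists, a LENGTH stream gives the entry count per
 * map, and the keys and values for the whole batch are each read with a
 * single call to the corresponding child reader.
 */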
public static class MapTreeReader extends TreeReader {
protected final TypeReader keyReader;
protected final TypeReader valueReader;
protected IntegerReader lengths = null;
protected MapTreeReader(int fileColumn,
TypeDescription readerSchema,
Context context) throws IOException {
super(fileColumn, context);
TypeDescription keyType = readerSchema.getChildren().get(0);
TypeDescription valueType = readerSchema.getChildren().get(1);
keyReader = createTreeReader(keyType, context);
valueReader = createTreeReader(valueType, context);
}
protected MapTreeReader(int columnId,
InStream present,
Context context,
InStream data,
OrcProto.ColumnEncoding encoding,
TypeReader keyReader,
TypeReader valueReader) throws IOException {
super(columnId, present, context);
if (data != null && encoding != null) {
checkEncoding(encoding);
this.lengths = createIntegerReader(encoding.getKind(), data, false, context);
}
this.keyReader = keyReader;
this.valueReader = valueReader;
}
@Override
public void seek(PositionProvider[] index) throws IOException {
super.seek(index);
lengths.seek(index[columnId]);
keyReader.seek(index);
valueReader.seek(index);
}
@Override
public void nextVector(ColumnVector previous,
boolean[] isNull,
final int batchSize) throws IOException {
MapColumnVector result = (MapColumnVector) previous;
super.nextVector(result, isNull, batchSize);
if (result.noNulls || !(result.isRepeating && result.isNull[0])) {
lengths.nextVector(result, result.lengths, batchSize);
// even with repeating lengths, the map doesn't repeat
result.isRepeating = false;
// build the offsets vector and figure out how many children to read
result.childCount = 0;
for (int r = 0; r < batchSize; ++r) {
if (result.noNulls || !result.isNull[r]) {
result.offsets[r] = result.childCount;
result.childCount += result.lengths[r];
}
}
result.keys.ensureSize(result.childCount, false);
result.values.ensureSize(result.childCount, false);
keyReader.nextVector(result.keys, null, result.childCount);
valueReader.nextVector(result.values, null, result.childCount);
}
}
@Override
public void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
if ((encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) &&
(encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT_V2)) {
throw new IOException("Unknown encoding " + encoding + " in column " +
columnId);
}
}
@Override
public void startStripe(StripePlanner planner) throws IOException {
super.startStripe(planner);
lengths = createIntegerReader(planner.getEncoding(columnId).getKind(),
planner.getStream(new StreamName(columnId,
OrcProto.Stream.Kind.LENGTH)), false, context);
if (keyReader != null) {
keyReader.startStripe(planner);
}
if (valueReader != null) {
valueReader.startStripe(planner);
}
}
@Override
public void skipRows(long items) throws IOException {
items = countNonNulls(items);
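// Keys and values are stored pairwise, so both readers skip the same
// total number of entries.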
long childSkip = 0;
for (long i = 0; i < items; ++i) {
childSkip += lengths.next();
}
keyReader.skipRows(childSkip);
valueReader.skipRows(childSkip);
}
}
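/**
 * Builds the TypeReader for a given reader-side type. Columns that are
 * missing from the file or excluded from the read get a NullTreeReader;
 * primitive columns whose file type differs from the reader type get a
 * ConvertTreeReader; everything else gets the matching reader below.
 */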
public static TypeReader createTreeReader(TypeDescription readerType,
Context context
) throws IOException {
OrcFile.Version version = context.getFileFormat();
final SchemaEvolution evolution = context.getSchemaEvolution();
TypeDescription fileType = evolution.getFileType(readerType);
if (fileType == null || !evolution.includeReaderColumn(readerType.getId())) {
return new NullTreeReader(0);
}
TypeDescription.Category readerTypeCategory = readerType.getCategory();
// We skip attribute checks when comparing types since they are not used to
// create the ConvertTreeReaders
if (!fileType.equals(readerType, false) &&
(readerTypeCategory != TypeDescription.Category.STRUCT &&
readerTypeCategory != TypeDescription.Category.MAP &&
readerTypeCategory != TypeDescription.Category.LIST &&
readerTypeCategory != TypeDescription.Category.UNION)) {
// We only convert complex children.
return ConvertTreeReaderFactory.createConvertTreeReader(readerType, context);
}
switch (readerTypeCategory) {
case BOOLEAN:
return new BooleanTreeReader(fileType.getId());
case BYTE:
return new ByteTreeReader(fileType.getId());
case DOUBLE:
return new DoubleTreeReader(fileType.getId());
case FLOAT:
return new FloatTreeReader(fileType.getId());
case SHORT:
return new ShortTreeReader(fileType.getId(), context);
case INT:
return new IntTreeReader(fileType.getId(), context);
case LONG:
return new LongTreeReader(fileType.getId(), context);
case STRING:
return new StringTreeReader(fileType.getId(), context);
case CHAR:
return new CharTreeReader(fileType.getId(), readerType.getMaxLength());
case VARCHAR:
return new VarcharTreeReader(fileType.getId(), readerType.getMaxLength());
case BINARY:
return new BinaryTreeReader(fileType.getId(), context);
case TIMESTAMP:
return new TimestampTreeReader(fileType.getId(), context, false);
case TIMESTAMP_INSTANT:
return new TimestampTreeReader(fileType.getId(), context, true);
case DATE:
return new DateTreeReader(fileType.getId(), context);
case DECIMAL:
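// For the UNSTABLE_PRE_2_0 format, decimals whose precision is small
// enough for the scaled value to fit in a 64-bit long are read with the
// faster Decimal64 reader.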
if (version == OrcFile.Version.UNSTABLE_PRE_2_0 &&
fileType.getPrecision() <= TypeDescription.MAX_DECIMAL64_PRECISION) {
return new Decimal64TreeReader(fileType.getId(), fileType.getPrecision(),
fileType.getScale(), context);
}
return new DecimalTreeReader(fileType.getId(), fileType.getPrecision(),
fileType.getScale(), context);
case STRUCT:
return new StructTreeReader(fileType.getId(), readerType, context);
case LIST:
return new ListTreeReader(fileType.getId(), readerType, context);
case MAP:
return new MapTreeReader(fileType.getId(), readerType, context);
case UNION:
return new UnionTreeReader(fileType.getId(), readerType, context);
default:
throw new IllegalArgumentException("Unsupported type " +
readerTypeCategory);
}
}
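/**
 * Builds the root BatchReader for a read. Struct roots (the common case,
 * since a top-level ORC schema is normally a struct) map each field to a
 * column of the VectorizedRowBatch; any other root type is read as a
 * single column.
 */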
public static BatchReader createRootReader(TypeDescription readerType, Context context)
throws IOException {
TypeReader reader = createTreeReader(readerType, context);
if (reader instanceof StructTreeReader) {
return new StructBatchReader((StructTreeReader) reader);
} else {
return new PrimitiveBatchReader(reader);
}
}
}