blob: 3fbd5bf0f0e1b3a905ce662927b63facf35cffa1 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.io.orc;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.type.HiveDecimal;
import org.apache.hadoop.hive.serde2.io.ByteWritable;
import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.hive.serde2.io.ShortWritable;
import org.apache.hadoop.io.BooleanWritable;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
/**
 * Row-by-row reader over the stripes of a single ORC file that were selected
 * for this reader. The state below tracks the currently loaded stripe and the
 * position of the next row inside it.
 */
class RecordReaderImpl implements RecordReader {
// handle on the file being read; opened in the constructor, closed by close()
private final FSDataInputStream file;
// number of rows in the stripes that start before the requested byte range
private final long firstRow;
// the stripes selected by the constructor, in the order they were supplied
private final List<StripeInformation> stripes =
new ArrayList<StripeInformation>();
// footer of the stripe at index currentStripe; replaced by readStripe()
private OrcProto.StripeFooter stripeFooter;
// sum of the row counts of all selected stripes
private final long totalRowCount;
// compression codec and buffer size handed to every InStream.create call
private final CompressionCodec codec;
private final int bufferSize;
// per-column inclusion flags indexed by column id; null means "read every column"
private final boolean[] included;
// value of the constructor's strideRate argument; seekToRow divides row
// offsets by it to pick an index entry and treats 0 as "no index to seek with"
private final long rowIndexStride;
// number of rows already returned (or skipped over) in the current stripe
private long rowInStripe = 0;
// index into stripes of the stripe that is currently loaded
private int currentStripe = 0;
// sum of the row counts of the selected stripes before the current one
private long rowBaseInStripe = 0;
// number of rows in the current stripe
private long rowCountInStripe = 0;
// streams of the current stripe keyed by (column, kind); refilled by readStripe()
private final Map<StreamName, InStream> streams =
new HashMap<StreamName, InStream>();
// reader for column 0, the root of the type tree
private final TreeReader reader;
// row indexes of the current stripe by column id; filled by readRowIndex()
// and reset to null whenever a stripe is loaded
private final OrcProto.RowIndex[] indexes;
/**
 * Creates a reader over the stripes that start inside the byte range
 * {@code [offset, offset + length)} and loads the first of them.
 *
 * Stripes that start before {@code offset} are not read, but their rows are
 * counted so that row numbers stay relative to the whole file. Stripes that
 * start at or after {@code offset + length} are ignored.
 *
 * @param stripes the stripes of the file
 * @param fileSystem the file system used to open {@code path}
 * @param path the file to read
 * @param offset first byte of the range this reader is responsible for
 * @param length number of bytes in that range
 * @param types the flattened type tree of the file; column 0 is the root
 * @param codec compression codec passed to the stream factory
 * @param bufferSize buffer size passed to the stream factory
 * @param included per-column inclusion flags, or null to read every column
 * @param strideRate number of rows per row index entry, 0 if seeks should
 *                   not use the row index
 * @throws IOException if the file cannot be opened or the first stripe
 *                     cannot be read
 */
RecordReaderImpl(Iterable<StripeInformation> stripes,
                 FileSystem fileSystem,
                 Path path,
                 long offset, long length,
                 List<OrcProto.Type> types,
                 CompressionCodec codec,
                 int bufferSize,
                 boolean[] included,
                 long strideRate
                ) throws IOException {
  this.codec = codec;
  this.bufferSize = bufferSize;
  this.included = included;
  this.rowIndexStride = strideRate;
  long rows = 0;
  long skippedRows = 0;
  for(StripeInformation stripe: stripes) {
    long stripeStart = stripe.getOffset();
    if (offset > stripeStart) {
      skippedRows += stripe.getNumberOfRows();
    } else if (stripeStart < offset + length) {
      this.stripes.add(stripe);
      rows += stripe.getNumberOfRows();
    }
  }
  this.firstRow = skippedRows;
  this.totalRowCount = rows;
  this.reader = createTreeReader(path, 0, types, included);
  this.indexes = new OrcProto.RowIndex[types.size()];
  // Open the file only after everything that can fail without it has run,
  // so that a bad type tree does not leave an open stream behind.
  this.file = fileSystem.open(path);
  if (!this.stripes.isEmpty()) {
    boolean loaded = false;
    try {
      readStripe();
      loaded = true;
    } finally {
      if (!loaded) {
        // The caller never gets a reference to this object when the
        // constructor fails, so nobody else could close the stream.
        try {
          file.close();
        } catch (IOException ignored) {
          // keep the exception from readStripe() as the reported failure
        }
      }
    }
  }
}
/**
 * Hands out the positions stored in one row index entry, one per call, in
 * the order they are stored.
 */
private static final class PositionProviderImpl implements PositionProvider {
  private final OrcProto.RowIndexEntry entry;
  // slot of the position that the next getNext() call returns
  private int cursor = 0;

  PositionProviderImpl(OrcProto.RowIndexEntry entry) {
    this.entry = entry;
  }

  /**
   * @return the next position of the entry; the cursor advances even if the
   *         lookup fails
   */
  @Override
  public long getNext() {
    final int slot = cursor;
    cursor = slot + 1;
    return entry.getPositions(slot);
  }
}
/**
 * Common base of the per-type column readers. It owns the optional PRESENT
 * stream of the column and exposes, through {@code valuePresent}, whether the
 * value of the row that was just advanced to is non-null.
 */
private abstract static class TreeReader {
  protected final Path path;
  protected final int columnId;
  // reader over the PRESENT stream, or null if the stripe has none for this column
  private BitFieldReader present = null;
  protected boolean valuePresent = false;

  TreeReader(Path path, int columnId) {
    this.path = path;
    this.columnId = columnId;
  }

  /**
   * Verifies that the column uses an encoding this reader understands. The
   * base implementation accepts only DIRECT.
   * @param encoding the encoding recorded for this column
   * @throws IOException if the encoding is anything else
   */
  void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
    if (encoding.getKind() == OrcProto.ColumnEncoding.Kind.DIRECT) {
      return;
    }
    throw new IOException("Unknown encoding " + encoding + " in column " +
        columnId + " of " + path);
  }

  /**
   * Binds the reader to the streams of a newly loaded stripe.
   * @param streams the streams of the stripe by name
   * @param encoding the column encodings of the stripe, indexed by column id
   * @throws IOException if the column's encoding is rejected
   */
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encoding
                  ) throws IOException {
    checkEncoding(encoding.get(columnId));
    StreamName presentName =
        new StreamName(columnId, OrcProto.Stream.Kind.PRESENT);
    InStream in = streams.get(presentName);
    if (in != null) {
      present = new BitFieldReader(in, 1);
    } else {
      // without a PRESENT stream every value is treated as present
      present = null;
      valuePresent = true;
    }
  }

  /**
   * Seek to the given position.
   * @param index the position providers built from the row index, by column id
   * @throws IOException
   */
  void seek(PositionProvider[] index) throws IOException {
    if (present == null) {
      return;
    }
    present.seek(index[columnId]);
  }

  /**
   * Consumes the presence bits of the next {@code rows} rows.
   * @param rows the number of rows to advance over
   * @return how many of those rows hold a value; {@code rows} itself when
   *         the column has no PRESENT stream
   * @throws IOException
   */
  protected long countNonNulls(long rows) throws IOException {
    if (present == null) {
      return rows;
    }
    long nonNulls = 0;
    for(long remaining = rows; remaining > 0; --remaining) {
      if (present.next() == 1) {
        nonNulls++;
      }
    }
    return nonNulls;
  }

  /**
   * Advances the reader past the given number of rows without materializing
   * them.
   */
  abstract void skipRows(long rows) throws IOException;

  /**
   * Advances to the next row and records in {@code valuePresent} whether it
   * holds a value. Subclasses call this before decoding.
   * @param previous an object the caller offers for reuse
   * @return {@code previous}, unchanged
   * @throws IOException
   */
  Object next(Object previous) throws IOException {
    if (present != null) {
      valuePresent = (present.next() == 1);
    }
    return previous;
  }
}
/**
 * Reads a boolean column from a BitFieldReader over the column's DATA stream.
 */
private static class BooleanTreeReader extends TreeReader{
  private BitFieldReader reader = null;

  BooleanTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    StreamName dataName = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
    reader = new BitFieldReader(streams.get(dataName), 1);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    reader.seek(index[columnId]);
  }

  @Override
  void skipRows(long items) throws IOException {
    // only rows that hold a value occupy a slot in the data stream
    long nonNulls = countNonNulls(items);
    reader.skip(nonNulls);
  }

  /**
   * @param previous a BooleanWritable to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (!valuePresent) {
      return null;
    }
    BooleanWritable result = previous == null
        ? new BooleanWritable() : (BooleanWritable) previous;
    result.set(reader.next() == 1);
    return result;
  }
}
/**
 * Reads a byte column from a RunLengthByteReader over the column's DATA
 * stream.
 */
private static class ByteTreeReader extends TreeReader{
  private RunLengthByteReader reader = null;

  ByteTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    InStream data =
        streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA));
    reader = new RunLengthByteReader(data);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    reader.seek(index[columnId]);
  }

  /**
   * @param previous a ByteWritable to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (!valuePresent) {
      return null;
    }
    ByteWritable result;
    if (previous != null) {
      result = (ByteWritable) previous;
    } else {
      result = new ByteWritable();
    }
    result.set(reader.next());
    return result;
  }

  @Override
  void skipRows(long items) throws IOException {
    // only rows that hold a value occupy a slot in the data stream
    long nonNulls = countNonNulls(items);
    reader.skip(nonNulls);
  }
}
/**
 * Reads a short column. Values come from a RunLengthIntegerReader over the
 * DATA stream and are narrowed to {@code short}.
 */
private static class ShortTreeReader extends TreeReader{
  private RunLengthIntegerReader reader = null;

  ShortTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    InStream data =
        streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA));
    reader = new RunLengthIntegerReader(data, true);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    reader.seek(index[columnId]);
  }

  /**
   * @param previous a ShortWritable to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (!valuePresent) {
      return null;
    }
    ShortWritable result = previous == null
        ? new ShortWritable() : (ShortWritable) previous;
    short value = (short) reader.next();
    result.set(value);
    return result;
  }

  @Override
  void skipRows(long items) throws IOException {
    // only rows that hold a value occupy a slot in the data stream
    long nonNulls = countNonNulls(items);
    reader.skip(nonNulls);
  }
}
/**
 * Reads an int column. Values come from a RunLengthIntegerReader over the
 * DATA stream and are narrowed to {@code int}.
 */
private static class IntTreeReader extends TreeReader{
  private RunLengthIntegerReader reader = null;

  IntTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    InStream data =
        streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA));
    reader = new RunLengthIntegerReader(data, true);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    reader.seek(index[columnId]);
  }

  /**
   * @param previous an IntWritable to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (!valuePresent) {
      return null;
    }
    IntWritable result;
    if (previous != null) {
      result = (IntWritable) previous;
    } else {
      result = new IntWritable();
    }
    int value = (int) reader.next();
    result.set(value);
    return result;
  }

  @Override
  void skipRows(long items) throws IOException {
    // only rows that hold a value occupy a slot in the data stream
    long nonNulls = countNonNulls(items);
    reader.skip(nonNulls);
  }
}
/**
 * Reads a long column from a RunLengthIntegerReader over the column's DATA
 * stream.
 */
private static class LongTreeReader extends TreeReader{
  private RunLengthIntegerReader reader = null;

  LongTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    InStream data =
        streams.get(new StreamName(columnId, OrcProto.Stream.Kind.DATA));
    reader = new RunLengthIntegerReader(data, true);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    reader.seek(index[columnId]);
  }

  /**
   * @param previous a LongWritable to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (!valuePresent) {
      return null;
    }
    LongWritable result = previous == null
        ? new LongWritable() : (LongWritable) previous;
    result.set(reader.next());
    return result;
  }

  @Override
  void skipRows(long items) throws IOException {
    // only rows that hold a value occupy a slot in the data stream
    long nonNulls = countNonNulls(items);
    reader.skip(nonNulls);
  }
}
/**
 * Reads a float column. Values are decoded straight from the column's DATA
 * stream with {@code SerializationUtils.readFloat}.
 */
private static class FloatTreeReader extends TreeReader{
  private InStream stream;

  FloatTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    StreamName name = new StreamName(columnId,
        OrcProto.Stream.Kind.DATA);
    stream = streams.get(name);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    stream.seek(index[columnId]);
  }

  /**
   * @param previous a FloatWritable to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    FloatWritable result = null;
    if (valuePresent) {
      if (previous == null) {
        result = new FloatWritable();
      } else {
        result = (FloatWritable) previous;
      }
      result.set(SerializationUtils.readFloat(stream));
    }
    return result;
  }

  /**
   * Skips the given number of rows by decoding and discarding the values of
   * the non-null ones.
   */
  @Override
  void skipRows(long items) throws IOException {
    items = countNonNulls(items);
    // The counter has to be a long: an int counter wraps around before it
    // can reach a bound above Integer.MAX_VALUE and the loop never ends.
    for(long i=0; i < items; ++i) {
      SerializationUtils.readFloat(stream);
    }
  }
}
/**
 * Reads a double column. Values are decoded straight from the column's DATA
 * stream with {@code SerializationUtils.readDouble}.
 */
private static class DoubleTreeReader extends TreeReader{
  private InStream stream;

  DoubleTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    StreamName name =
        new StreamName(columnId,
            OrcProto.Stream.Kind.DATA);
    stream = streams.get(name);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    stream.seek(index[columnId]);
  }

  /**
   * @param previous a DoubleWritable to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    DoubleWritable result = null;
    if (valuePresent) {
      if (previous == null) {
        result = new DoubleWritable();
      } else {
        result = (DoubleWritable) previous;
      }
      result.set(SerializationUtils.readDouble(stream));
    }
    return result;
  }

  /**
   * Skips the given number of rows; each non-null row accounts for 8 bytes
   * of the data stream.
   * @throws EOFException if the stream ends before all bytes are skipped
   */
  @Override
  void skipRows(long items) throws IOException {
    items = countNonNulls(items);
    skipFully(stream, items * 8);
  }

  /**
   * Skips exactly {@code bytes} bytes. A single skip() call is allowed to
   * skip fewer bytes than requested, so the call is repeated; when it makes
   * no progress one byte is read to tell a slow stream from end of stream.
   */
  private static void skipFully(InStream in, long bytes) throws IOException {
    byte[] scratch = null;
    long remaining = bytes;
    while (remaining > 0) {
      long skipped = in.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
        continue;
      }
      if (scratch == null) {
        scratch = new byte[1];
      }
      int read = in.read(scratch, 0, 1);
      if (read < 0) {
        throw new EOFException("Can't finish skipping " + bytes +
            " bytes in " + in);
      }
      remaining -= read;
    }
  }
}
/**
 * Reads a binary column. The byte length of each value comes from the LENGTH
 * stream and the bytes themselves from the DATA stream.
 */
private static class BinaryTreeReader extends TreeReader{
  private InStream stream;
  private RunLengthIntegerReader lengths;

  BinaryTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    StreamName name = new StreamName(columnId,
        OrcProto.Stream.Kind.DATA);
    stream = streams.get(name);
    lengths = new RunLengthIntegerReader(streams.get(new
        StreamName(columnId, OrcProto.Stream.Kind.LENGTH)),
        false);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    stream.seek(index[columnId]);
    lengths.seek(index[columnId]);
  }

  /**
   * @param previous a BytesWritable to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   * @throws EOFException if the data stream ends inside the value
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    BytesWritable result = null;
    if (valuePresent) {
      if (previous == null) {
        result = new BytesWritable();
      } else {
        result = (BytesWritable) previous;
      }
      int len = (int) lengths.next();
      result.setSize(len);
      int offset = 0;
      // a single read may return fewer bytes than asked for
      while (len > 0) {
        int written = stream.read(result.getBytes(), offset, len);
        if (written < 0) {
          throw new EOFException("Can't finish byte read from " + stream);
        }
        len -= written;
        offset += written;
      }
    }
    return result;
  }

  /**
   * Skips the given number of rows by adding up the lengths of the non-null
   * values and skipping that many bytes of the data stream.
   * @throws EOFException if the stream ends before all bytes are skipped
   */
  @Override
  void skipRows(long items) throws IOException {
    items = countNonNulls(items);
    long lengthToSkip = 0;
    // long counter: an int one would wrap before reaching a large bound
    for(long i=0; i < items; ++i) {
      lengthToSkip += lengths.next();
    }
    skipFully(stream, lengthToSkip);
  }

  /**
   * Skips exactly {@code bytes} bytes. A single skip() call is allowed to
   * skip fewer bytes than requested, so the call is repeated; when it makes
   * no progress one byte is read to tell a slow stream from end of stream.
   */
  private static void skipFully(InStream in, long bytes) throws IOException {
    byte[] scratch = null;
    long remaining = bytes;
    while (remaining > 0) {
      long skipped = in.skip(remaining);
      if (skipped > 0) {
        remaining -= skipped;
        continue;
      }
      if (scratch == null) {
        scratch = new byte[1];
      }
      int read = in.read(scratch, 0, 1);
      if (read < 0) {
        throw new EOFException("Can't finish skipping " + bytes +
            " bytes in " + in);
      }
      remaining -= read;
    }
  }
}
/**
 * Reads a timestamp column from two integer streams: DATA, whose values are
 * offset by {@code WriterImpl.BASE_TIMESTAMP} and scaled by
 * {@code WriterImpl.MILLIS_PER_SECOND}, and SECONDARY, which holds the
 * nanoseconds in the packed form decoded by {@link #parseNanos(long)}.
 */
private static class TimestampTreeReader extends TreeReader{
  private RunLengthIntegerReader data;
  private RunLengthIntegerReader nanos;

  TimestampTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    StreamName dataName = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
    data = new RunLengthIntegerReader(streams.get(dataName), true);
    StreamName nanosName =
        new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY);
    nanos = new RunLengthIntegerReader(streams.get(nanosName), false);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    data.seek(index[columnId]);
    nanos.seek(index[columnId]);
  }

  /**
   * @param previous a Timestamp to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (!valuePresent) {
      return null;
    }
    Timestamp result = previous == null
        ? new Timestamp(0) : (Timestamp) previous;
    long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) *
        WriterImpl.MILLIS_PER_SECOND;
    int newNanos = parseNanos(nanos.next());
    // fix the rounding when we divided by 1000.
    long wholeMillis = newNanos / 1000000;
    millis = millis >= 0 ? millis + wholeMillis : millis - wholeMillis;
    result.setTime(millis);
    result.setNanos(newNanos);
    return result;
  }

  /**
   * Unpacks a serialized nanosecond value. The low three bits hold a count
   * and the remaining bits a base value; a non-zero count {@code z} means
   * the base is multiplied by ten {@code z + 1} times.
   */
  private static int parseNanos(long serialized) {
    final int packed = (int) serialized;
    final int zeros = packed & 7;
    int result = packed >>> 3;
    if (zeros != 0) {
      for(int remaining = zeros + 1; remaining > 0; --remaining) {
        result *= 10;
      }
    }
    return result;
  }

  @Override
  void skipRows(long items) throws IOException {
    // both streams hold one entry per non-null row
    long nonNulls = countNonNulls(items);
    data.skip(nonNulls);
    nanos.skip(nonNulls);
  }
}
/**
 * Reads a decimal column. The unscaled value is read from the DATA stream
 * with {@code SerializationUtils.readBigInteger} and the scale from the
 * SECONDARY stream.
 */
private static class DecimalTreeReader extends TreeReader{
  private InStream valueStream;
  private RunLengthIntegerReader scaleStream;

  DecimalTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    valueStream = streams.get(new StreamName(columnId,
        OrcProto.Stream.Kind.DATA));
    scaleStream = new RunLengthIntegerReader(streams.get(
        new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    valueStream.seek(index[columnId]);
    scaleStream.seek(index[columnId]);
  }

  /**
   * @param previous ignored; a new HiveDecimal is created for every value
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (valuePresent) {
      return new HiveDecimal(SerializationUtils.readBigInteger(valueStream),
          (int) scaleStream.next());
    }
    return null;
  }

  /**
   * Skips the given number of rows by decoding and discarding the unscaled
   * values of the non-null ones and skipping as many scales.
   */
  @Override
  void skipRows(long items) throws IOException {
    items = countNonNulls(items);
    // long counter: an int one would wrap before reaching a large bound
    for(long i=0; i < items; i++) {
      SerializationUtils.readBigInteger(valueStream);
    }
    scaleStream.skip(items);
  }
}
/**
 * Reads a dictionary-encoded string column. At the start of each stripe the
 * dictionary bytes (DICTIONARY_DATA) and the length of each dictionary entry
 * (LENGTH) are loaded; each row of the DATA stream is then an index into
 * that dictionary.
 */
private static class StringTreeReader extends TreeReader {
  // concatenated bytes of all dictionary entries, null if there are none
  private DynamicByteArray dictionaryBuffer = null;
  private int dictionarySize;
  // start offset of each entry in dictionaryBuffer; slot dictionarySize
  // holds the end of the last entry. The array is reused across stripes and
  // may be longer than dictionarySize + 1.
  private int[] dictionaryOffsets;
  private RunLengthIntegerReader reader;

  StringTreeReader(Path path, int columnId) {
    super(path, columnId);
  }

  /**
   * Accepts only DICTIONARY encoding, unlike the base class.
   * @throws IOException if the column uses any other encoding
   */
  @Override
  void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException {
    if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY) {
      throw new IOException("Unknown encoding " + encoding + " in column " +
          columnId + " of " + path);
    }
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);

    // read the dictionary blob
    dictionarySize = encodings.get(columnId).getDictionarySize();
    StreamName name = new StreamName(columnId,
        OrcProto.Stream.Kind.DICTIONARY_DATA);
    InStream in = streams.get(name);
    if (in.available() > 0) {
      dictionaryBuffer = new DynamicByteArray(64, in.available());
      dictionaryBuffer.readAll(in);
    } else {
      dictionaryBuffer = null;
    }
    in.close();

    // read the lengths
    name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
    in = streams.get(name);
    RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false);
    int offset = 0;
    if (dictionaryOffsets == null ||
        dictionaryOffsets.length < dictionarySize + 1) {
      dictionaryOffsets = new int[dictionarySize + 1];
    }
    for(int i=0; i < dictionarySize; ++i) {
      dictionaryOffsets[i] = offset;
      offset += (int) lenReader.next();
    }
    dictionaryOffsets[dictionarySize] = offset;
    in.close();

    // set up the row reader
    name = new StreamName(columnId, OrcProto.Stream.Kind.DATA);
    reader = new RunLengthIntegerReader(streams.get(name), false);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    reader.seek(index[columnId]);
  }

  /**
   * @param previous a Text to reuse, or null to allocate one
   * @return the value of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    Text result = null;
    if (valuePresent) {
      int entry = (int) reader.next();
      if (previous == null) {
        result = new Text();
      } else {
        result = (Text) previous;
      }
      // If the column is just empty strings, the size will be zero,
      // so the buffer will be null, in that case just return result
      // as it will default to empty
      if (dictionaryBuffer != null) {
        int offset = dictionaryOffsets[entry];
        int length;
        // if it isn't the last entry, subtract the offsets otherwise use
        // the buffer length.
        if (entry < dictionaryOffsets.length - 1) {
          length = dictionaryOffsets[entry + 1] - offset;
        } else {
          length = dictionaryBuffer.size() - offset;
        }
        dictionaryBuffer.setText(result, offset, length);
      } else {
        // touch the offset table anyway so that an out-of-range entry is
        // still reported instead of silently becoming an empty string
        int ignoredOffset = dictionaryOffsets[entry];
        result.clear();
      }
    }
    return result;
  }

  @Override
  void skipRows(long items) throws IOException {
    // only rows that hold a value occupy a slot in the data stream
    reader.skip(countNonNulls(items));
  }
}
/**
 * Reads a struct column by delegating to one child reader per field. Fields
 * whose column is not included have no reader and are left untouched in the
 * resulting OrcStruct.
 */
private static class StructTreeReader extends TreeReader {
  // child readers by field position; null for fields that are not included
  private final TreeReader[] fields;
  private final String[] fieldNames;

  StructTreeReader(Path path, int columnId,
                   List<OrcProto.Type> types,
                   boolean[] included) throws IOException {
    super(path, columnId);
    OrcProto.Type structType = types.get(columnId);
    int fieldCount = structType.getFieldNamesCount();
    this.fields = new TreeReader[fieldCount];
    this.fieldNames = new String[fieldCount];
    for(int position = 0; position < fieldCount; ++position) {
      int subtype = structType.getSubtypes(position);
      boolean wanted = included == null || included[subtype];
      if (wanted) {
        this.fields[position] =
            createTreeReader(path, subtype, types, included);
      }
      this.fieldNames[position] = structType.getFieldNames(position);
    }
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    for(int i = 0; i < fields.length; ++i) {
      if (fields[i] == null) {
        continue;
      }
      fields[i].seek(index);
    }
  }

  /**
   * @param previous an OrcStruct to reuse, or null to allocate one
   * @return the struct of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (!valuePresent) {
      return null;
    }
    OrcStruct result;
    if (previous != null) {
      result = (OrcStruct) previous;
      // If the input format was initialized with a file with a
      // different number of fields, the number of fields needs to
      // be updated to the correct number
      if (result.getNumFields() != fields.length) {
        result.setNumFields(fields.length);
      }
    } else {
      result = new OrcStruct(fields.length);
    }
    for(int i = 0; i < fields.length; ++i) {
      TreeReader field = fields[i];
      if (field == null) {
        continue;
      }
      result.setFieldValue(i, field.next(result.getFieldValue(i)));
    }
    return result;
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    for(int i = 0; i < fields.length; ++i) {
      if (fields[i] == null) {
        continue;
      }
      fields[i].startStripe(streams, encodings);
    }
  }

  @Override
  void skipRows(long items) throws IOException {
    // children only hold entries for the rows where the struct is non-null
    long nonNulls = countNonNulls(items);
    for(int i = 0; i < fields.length; ++i) {
      if (fields[i] == null) {
        continue;
      }
      fields[i].skipRows(nonNulls);
    }
  }
}
/**
 * Reads a union column. The DATA stream holds one tag per non-null row; the
 * tag selects the child reader that decodes the value of that row.
 */
private static class UnionTreeReader extends TreeReader {
  // child readers by tag; null for variants whose column is not included
  private final TreeReader[] fields;
  private RunLengthByteReader tags;

  UnionTreeReader(Path path, int columnId,
                  List<OrcProto.Type> types,
                  boolean[] included) throws IOException {
    super(path, columnId);
    OrcProto.Type type = types.get(columnId);
    int fieldCount = type.getSubtypesCount();
    this.fields = new TreeReader[fieldCount];
    for(int i=0; i < fieldCount; ++i) {
      int subtype = type.getSubtypes(i);
      if (included == null || included[subtype]) {
        this.fields[i] = createTreeReader(path, subtype, types, included);
      }
    }
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    tags.seek(index[columnId]);
    for(TreeReader kid: fields) {
      // variants that are not included have no reader, see the constructor
      if (kid != null) {
        kid.seek(index);
      }
    }
  }

  /**
   * Reading a row whose tag selects a variant that is not included is not
   * supported and fails, because there is no reader to decode it.
   * @param previous an OrcUnion to reuse, or null to allocate one
   * @return the union of the next row, or null if the row is null
   */
  @Override
  Object next(Object previous) throws IOException {
    super.next(previous);
    OrcUnion result = null;
    if (valuePresent) {
      if (previous == null) {
        result = new OrcUnion();
      } else {
        result = (OrcUnion) previous;
      }
      byte tag = tags.next();
      Object previousVal = result.getObject();
      // the old value can only be reused if it belongs to the same variant
      result.set(tag, fields[tag].next(tag == result.getTag() ?
          previousVal : null));
    }
    return result;
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    tags = new RunLengthByteReader(streams.get(new StreamName(columnId,
        OrcProto.Stream.Kind.DATA)));
    for(TreeReader field: fields) {
      if (field != null) {
        field.startStripe(streams, encodings);
      }
    }
  }

  /**
   * Skips the given number of rows. The tags of the non-null rows are read
   * to find out how many values each variant has to skip.
   */
  @Override
  void skipRows(long items) throws IOException {
    items = countNonNulls(items);
    long[] counts = new long[fields.length];
    // long counter: an int one would wrap before reaching a large bound
    for(long i=0; i < items; ++i) {
      counts[tags.next()] += 1;
    }
    for(int i=0; i < counts.length; ++i) {
      if (fields[i] != null) {
        fields[i].skipRows(counts[i]);
      }
    }
  }
}
/**
 * Reads a list column. The LENGTH stream holds the number of elements of
 * each non-null row; the elements themselves come from a single child reader.
 */
private static class ListTreeReader extends TreeReader {
  private final TreeReader elementReader;
  private RunLengthIntegerReader lengths;

  ListTreeReader(Path path, int columnId,
                 List<OrcProto.Type> types,
                 boolean[] included) throws IOException {
    super(path, columnId);
    OrcProto.Type listType = types.get(columnId);
    int elementColumn = listType.getSubtypes(0);
    elementReader = createTreeReader(path, elementColumn, types, included);
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    lengths.seek(index[columnId]);
    elementReader.seek(index);
  }

  /**
   * @param previous an ArrayList to reuse, or null to allocate one; its
   *                 elements are offered to the child reader for reuse
   * @return the list of the next row, or null if the row is null
   */
  @Override
  @SuppressWarnings("unchecked")
  Object next(Object previous) throws IOException {
    super.next(previous);
    if (!valuePresent) {
      return null;
    }
    List<Object> result = previous == null
        ? new ArrayList<Object>() : (ArrayList<Object>) previous;
    int prevLength = result.size();
    int length = (int) lengths.next();
    // extend the list to the new length
    while (result.size() < length) {
      result.add(null);
    }
    // read the new elements into the array
    for(int i = 0; i < length; i++) {
      Object reusable = i < prevLength ? result.get(i) : null;
      result.set(i, elementReader.next(reusable));
    }
    // remove any extra elements
    while (result.size() > length) {
      result.remove(result.size() - 1);
    }
    return result;
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    StreamName lengthName =
        new StreamName(columnId, OrcProto.Stream.Kind.LENGTH);
    lengths = new RunLengthIntegerReader(streams.get(lengthName), false);
    if (elementReader != null) {
      elementReader.startStripe(streams, encodings);
    }
  }

  @Override
  void skipRows(long items) throws IOException {
    // the child has to skip the total element count of the non-null rows
    long nonNulls = countNonNulls(items);
    long childSkip = 0;
    for(long remaining = nonNulls; remaining > 0; --remaining) {
      childSkip += lengths.next();
    }
    elementReader.skipRows(childSkip);
  }
}
/**
 * Reads a map column. The LENGTH stream holds the number of entries of each
 * non-null row; keys and values come from two child readers.
 */
private static class MapTreeReader extends TreeReader {
  // null when the key (respectively value) column is not included
  private final TreeReader keyReader;
  private final TreeReader valueReader;
  private RunLengthIntegerReader lengths;

  MapTreeReader(Path path,
                int columnId,
                List<OrcProto.Type> types,
                boolean[] included) throws IOException {
    super(path, columnId);
    OrcProto.Type type = types.get(columnId);
    int keyColumn = type.getSubtypes(0);
    int valueColumn = type.getSubtypes(1);
    if (included == null || included[keyColumn]) {
      keyReader = createTreeReader(path, keyColumn, types, included);
    } else {
      keyReader = null;
    }
    if (included == null || included[valueColumn]) {
      valueReader = createTreeReader(path, valueColumn, types, included);
    } else {
      valueReader = null;
    }
  }

  @Override
  void seek(PositionProvider[] index) throws IOException {
    super.seek(index);
    lengths.seek(index[columnId]);
    // either child may be absent, see the constructor
    if (keyReader != null) {
      keyReader.seek(index);
    }
    if (valueReader != null) {
      valueReader.seek(index);
    }
  }

  /**
   * Reading rows requires both the key and the value column to be included;
   * without their readers the entries cannot be decoded and the call fails.
   * @param previous a HashMap to reuse, or null to allocate one; it is
   *                 cleared and refilled with newly created objects
   * @return the map of the next row, or null if the row is null
   */
  @Override
  @SuppressWarnings("unchecked")
  Object next(Object previous) throws IOException {
    super.next(previous);
    Map<Object, Object> result = null;
    if (valuePresent) {
      if (previous == null) {
        result = new HashMap<Object, Object>();
      } else {
        result = (HashMap<Object, Object>) previous;
      }
      // for now just clear and create new objects
      result.clear();
      int length = (int) lengths.next();
      // read the new elements into the array
      for(int i=0; i< length; i++) {
        result.put(keyReader.next(null), valueReader.next(null));
      }
    }
    return result;
  }

  @Override
  void startStripe(Map<StreamName, InStream> streams,
                   List<OrcProto.ColumnEncoding> encodings
                  ) throws IOException {
    super.startStripe(streams, encodings);
    lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId,
        OrcProto.Stream.Kind.LENGTH)), false);
    if (keyReader != null) {
      keyReader.startStripe(streams, encodings);
    }
    if (valueReader != null) {
      valueReader.startStripe(streams, encodings);
    }
  }

  /**
   * Skips the given number of rows. The children have to skip the total
   * entry count of the non-null rows.
   */
  @Override
  void skipRows(long items) throws IOException {
    items = countNonNulls(items);
    long childSkip = 0;
    for(long i=0; i < items; ++i) {
      childSkip += lengths.next();
    }
    if (keyReader != null) {
      keyReader.skipRows(childSkip);
    }
    if (valueReader != null) {
      valueReader.skipRows(childSkip);
    }
  }
}
/**
 * Builds the reader for one column of the type tree. Compound types build
 * the readers of their children in turn.
 *
 * @param path the file being read, kept by the readers for error messages
 * @param columnId the id of the column to build a reader for
 * @param types the flattened type tree, indexed by column id
 * @param included per-column inclusion flags, or null for every column
 * @return the reader matching the kind of the column's type
 * @throws IOException if a child reader cannot be created
 * @throws IllegalArgumentException if the kind of the type is not handled
 */
private static TreeReader createTreeReader(Path path,
                                           int columnId,
                                           List<OrcProto.Type> types,
                                           boolean[] included
                                          ) throws IOException {
  final OrcProto.Type type = types.get(columnId);
  final TreeReader created;
  switch (type.getKind()) {
    case BOOLEAN:
      created = new BooleanTreeReader(path, columnId);
      break;
    case BYTE:
      created = new ByteTreeReader(path, columnId);
      break;
    case DOUBLE:
      created = new DoubleTreeReader(path, columnId);
      break;
    case FLOAT:
      created = new FloatTreeReader(path, columnId);
      break;
    case SHORT:
      created = new ShortTreeReader(path, columnId);
      break;
    case INT:
      created = new IntTreeReader(path, columnId);
      break;
    case LONG:
      created = new LongTreeReader(path, columnId);
      break;
    case STRING:
      created = new StringTreeReader(path, columnId);
      break;
    case BINARY:
      created = new BinaryTreeReader(path, columnId);
      break;
    case TIMESTAMP:
      created = new TimestampTreeReader(path, columnId);
      break;
    case DECIMAL:
      created = new DecimalTreeReader(path, columnId);
      break;
    case STRUCT:
      created = new StructTreeReader(path, columnId, types, included);
      break;
    case LIST:
      created = new ListTreeReader(path, columnId, types, included);
      break;
    case MAP:
      created = new MapTreeReader(path, columnId, types, included);
      break;
    case UNION:
      created = new UnionTreeReader(path, columnId, types, included);
      break;
    default:
      throw new IllegalArgumentException("Unsupported type " +
          type.getKind());
  }
  return created;
}
/**
 * Reads and parses the footer of a stripe. The footer is located right
 * behind the index and data sections of the stripe.
 *
 * @param stripe the stripe whose footer is wanted
 * @return the parsed footer
 * @throws IOException if the footer cannot be read or parsed
 */
OrcProto.StripeFooter readStripeFooter(StripeInformation stripe
                                       ) throws IOException {
  final long footerStart = stripe.getOffset() + stripe.getIndexLength() +
      stripe.getDataLength();
  final int footerLength = (int) stripe.getFooterLength();

  // read the footer
  final ByteBuffer footerBytes = ByteBuffer.allocate(footerLength);
  file.seek(footerStart);
  file.readFully(footerBytes.array(), footerBytes.arrayOffset(),
      footerLength);
  InStream footerStream =
      InStream.create("footer", footerBytes, codec, bufferSize);
  return OrcProto.StripeFooter.parseFrom(footerStream);
}
/**
 * Loads the stripe at index currentStripe: reads its footer, reads the bytes
 * of the data-area streams (all of them, or only those of included columns)
 * into memory, registers them in the streams map, hands them to the root
 * reader and resets the per-stripe row counters and cached row indexes.
 *
 * @throws IOException if the footer or the stream bytes cannot be read
 */
private void readStripe() throws IOException {
StripeInformation stripe = stripes.get(currentStripe);
stripeFooter = readStripeFooter(stripe);
long offset = stripe.getOffset();
streams.clear();
// if we aren't projecting columns, just read the whole stripe
if (included == null) {
// one buffer for the whole data section, which starts right after the
// index section of the stripe
byte[] buffer =
new byte[(int) (stripe.getDataLength())];
file.seek(offset + stripe.getIndexLength());
file.readFully(buffer, 0, buffer.length);
// position of the current section inside buffer
int sectionOffset = 0;
for(OrcProto.Stream section: stripeFooter.getStreamsList()) {
// only streams of the DATA area live in buffer; the others are not
// counted in sectionOffset
if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) {
int sectionLength = (int) section.getLength();
ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset,
sectionLength);
StreamName name = new StreamName(section.getColumn(),
section.getKind());
streams.put(name,
InStream.create(name.toString(), sectionBuffer, codec,
bufferSize));
sectionOffset += sectionLength;
}
}
} else {
List<OrcProto.Stream> streamList = stripeFooter.getStreamsList();
// the index of the current section
int currentSection = 0;
// move past the leading streams that are not in the DATA area
while (currentSection < streamList.size() &&
StreamName.getArea(streamList.get(currentSection).getKind()) !=
StreamName.Area.DATA) {
currentSection += 1;
}
// byte position of the current section relative to the stripe start
long sectionOffset = stripe.getIndexLength();
// alternate between reading a run of consecutive included streams as one
// chunk and stepping over a run of excluded streams
while (currentSection < streamList.size()) {
// NOTE(review): bytes is an int while getLength() is cast to int
// elsewhere, so the += below presumably narrows silently - confirm that
// a run of included streams can never exceed Integer.MAX_VALUE bytes
int bytes = 0;
// find the first section that shouldn't be read
int excluded=currentSection;
while (excluded < streamList.size() &&
included[streamList.get(excluded).getColumn()]) {
bytes += streamList.get(excluded).getLength();
excluded += 1;
}
// actually read the bytes as a big chunk
if (bytes != 0) {
byte[] buffer = new byte[bytes];
file.seek(offset + sectionOffset);
file.readFully(buffer, 0, bytes);
sectionOffset += bytes;
// create the streams for the sections we just read
// from here on bytes is the position of the section inside buffer
bytes = 0;
while (currentSection < excluded) {
OrcProto.Stream section = streamList.get(currentSection);
StreamName name =
new StreamName(section.getColumn(), section.getKind());
this.streams.put(name,
InStream.create(name.toString(),
ByteBuffer.wrap(buffer, bytes,
(int) section.getLength()), codec, bufferSize));
currentSection += 1;
bytes += section.getLength();
}
}
// skip forward until we get back to a section that we need
while (currentSection < streamList.size() &&
!included[streamList.get(currentSection).getColumn()]) {
sectionOffset += streamList.get(currentSection).getLength();
currentSection += 1;
}
}
}
reader.startStripe(streams, stripeFooter.getColumnsList());
// the next row to return is the first row of the stripe
rowInStripe = 0;
rowCountInStripe = stripe.getNumberOfRows();
// rows of all selected stripes that come before this one
rowBaseInStripe = 0;
for(int i=0; i < currentStripe; ++i) {
rowBaseInStripe += stripes.get(i).getNumberOfRows();
}
// the cached row indexes belong to the previous stripe; drop them so that
// readRowIndex() loads the ones of this stripe on demand
for(int i=0; i < indexes.length; ++i) {
indexes[i] = null;
}
}
/**
 * @return true if the current stripe has rows left or if it is not the last
 *         selected stripe
 */
@Override
public boolean hasNext() throws IOException {
  if (rowInStripe < rowCountInStripe) {
    return true;
  }
  return currentStripe < stripes.size() - 1;
}
/**
 * Returns the next row, loading the following stripe first if the current
 * one is used up.
 *
 * @param previous an object offered to the root reader for reuse
 * @return the row produced by the root reader
 * @throws IOException if a stripe or a value cannot be read
 */
@Override
public Object next(Object previous) throws IOException {
  final boolean stripeExhausted = rowInStripe >= rowCountInStripe;
  if (stripeExhausted) {
    currentStripe++;
    readStripe();
  }
  rowInStripe++;
  return reader.next(previous);
}
/**
 * Closes the underlying file stream.
 * @throws IOException if closing the stream fails
 */
@Override
public void close() throws IOException {
  this.file.close();
}
/**
 * @return the number of the next row, counted over the rows skipped before
 *         this reader's range, the selected stripes before the current one
 *         and the rows already consumed in the current stripe
 */
@Override
public long getRowNumber() {
  final long rowsBeforeStripe = rowBaseInStripe + firstRow;
  return rowInStripe + rowsBeforeStripe;
}
/**
 * Return the fraction of rows that have been read from the selected
 * section of the file.
 * @return fraction between 0.0 and 1.0 of rows consumed; 1.0 if the selected
 *         section holds no rows at all
 */
@Override
public float getProgress() {
  if (totalRowCount == 0) {
    // nothing to read means nothing is left to do; dividing would give NaN
    return 1.0f;
  }
  return ((float) rowBaseInStripe + rowInStripe) / totalRowCount;
}
/**
 * Finds the selected stripe that contains a row.
 *
 * @param rowNumber the row number, counted from the start of the file
 * @return the index into the selected stripes
 * @throws IllegalArgumentException if the row is negative or lies before or
 *         after the rows covered by the selected stripes
 */
private int findStripe(long rowNumber) {
  if (rowNumber < 0) {
    throw new IllegalArgumentException("Seek to a negative row number " +
        rowNumber);
  }
  if (rowNumber < firstRow) {
    throw new IllegalArgumentException("Seek before reader range " +
        rowNumber);
  }
  // row position relative to the first selected stripe
  long remaining = rowNumber - firstRow;
  int stripeIndex = 0;
  for(StripeInformation stripe: stripes) {
    long stripeRows = stripe.getNumberOfRows();
    if (remaining < stripeRows) {
      return stripeIndex;
    }
    remaining -= stripeRows;
    stripeIndex++;
  }
  throw new IllegalArgumentException("Seek after the end of reader range");
}
/**
 * Loads the row indexes of the current stripe for every included column
 * that does not have one cached yet.
 *
 * @throws IOException if an index cannot be read or parsed
 */
private void readRowIndex() throws IOException {
  // file position of the stream being looked at; the streams are laid out
  // back to back from the start of the stripe
  long streamStart = stripes.get(currentStripe).getOffset();
  for(OrcProto.Stream stream: stripeFooter.getStreamsList()) {
    if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) {
      final int col = stream.getColumn();
      final boolean wanted = included == null || included[col];
      if (wanted && indexes[col] == null) {
        final byte[] raw = new byte[(int) stream.getLength()];
        file.seek(streamStart);
        file.readFully(raw);
        InStream indexStream =
            InStream.create("index", ByteBuffer.wrap(raw), codec, bufferSize);
        indexes[col] = OrcProto.RowIndex.parseFrom(indexStream);
      }
    }
    streamStart += stream.getLength();
  }
}
/**
 * Positions every column reader at the start of a row index entry.
 *
 * @param rowEntry the number of the index entry inside the current stripe
 * @throws IOException if a stream cannot be repositioned
 */
private void seekToRowEntry(int rowEntry) throws IOException {
  final int columnCount = indexes.length;
  final PositionProvider[] providers = new PositionProvider[columnCount];
  for(int col = 0; col < columnCount; ++col) {
    OrcProto.RowIndex columnIndex = indexes[col];
    if (columnIndex == null) {
      // no index loaded for this column; its slot stays null
      continue;
    }
    providers[col] = new PositionProviderImpl(columnIndex.getEntry(rowEntry));
  }
  reader.seek(providers);
}
/**
 * Positions the reader so that the next call to next() returns the given
 * row.
 *
 * With a row index (stride other than 0) the streams are repositioned to
 * the closest preceding index entry and the rest of the distance is skipped.
 * Without one the only way to move is to skip forward, so a target that lies
 * behind the current position requires reloading the stripe first.
 *
 * @param rowNumber the row number, counted from the start of the file
 * @throws IOException if the stripe, its index or its data cannot be read
 * @throws IllegalArgumentException if the row is outside the reader's range
 */
@Override
public void seekToRow(long rowNumber) throws IOException {
  int rightStripe = findStripe(rowNumber);
  if (rightStripe != currentStripe) {
    currentStripe = rightStripe;
    readStripe();
  }
  readRowIndex();
  long targetRow = rowNumber - rowBaseInStripe - firstRow;
  if (rowIndexStride != 0) {
    long entry = targetRow / rowIndexStride;
    seekToRowEntry((int) entry);
    reader.skipRows(targetRow - entry * rowIndexStride);
  } else {
    // skipRows() is relative to where the streams currently are, which is
    // rowInStripe rows into the stripe (0 right after readStripe()).
    if (targetRow < rowInStripe) {
      // cannot go backwards: start over from the beginning of the stripe
      readStripe();
    }
    reader.skipRows(targetRow - rowInStripe);
  }
  rowInStripe = targetRow;
}
}