| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.hadoop.hive.ql.io.orc; |
| |
| import java.io.EOFException; |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.sql.Timestamp; |
| import java.util.ArrayList; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.common.type.HiveDecimal; |
| import org.apache.hadoop.hive.serde2.io.ByteWritable; |
| import org.apache.hadoop.hive.serde2.io.DoubleWritable; |
| import org.apache.hadoop.hive.serde2.io.ShortWritable; |
| import org.apache.hadoop.io.BooleanWritable; |
| import org.apache.hadoop.io.BytesWritable; |
| import org.apache.hadoop.io.FloatWritable; |
| import org.apache.hadoop.io.IntWritable; |
| import org.apache.hadoop.io.LongWritable; |
| import org.apache.hadoop.io.Text; |
| |
| class RecordReaderImpl implements RecordReader { |
  // Stream opened on the ORC file by the constructor; stripe footers and
  // stripe data are read through it.
  private final FSDataInputStream file;
  // Number of rows in the stripes that start before the split's offset; those
  // stripes are not read by this reader.
  private final long firstRow;
  // The stripes whose start offset falls inside this reader's split.
  private final List<StripeInformation> stripes =
      new ArrayList<StripeInformation>();
  // Footer of the stripe currently being read (set by readStripe()).
  private OrcProto.StripeFooter stripeFooter;
  // Total number of rows in the selected stripes.
  private final long totalRowCount;
  // Compression codec and buffer size handed to InStream.create().
  private final CompressionCodec codec;
  private final int bufferSize;
  // Column projection mask indexed by column id; null means no projection,
  // in which case readStripe() reads the whole data section at once.
  private final boolean[] included;
  // Row-index stride, taken from the constructor's strideRate argument.
  private final long rowIndexStride;
  // NOTE(review): row/stripe position bookkeeping. Only currentStripe's use
  // (index into stripes in readStripe()) is visible in this section; confirm
  // the meaning of the others against the rest of the class.
  private long rowInStripe = 0;
  private int currentStripe = 0;
  private long rowBaseInStripe = 0;
  private long rowCountInStripe = 0;
  // Streams of the current stripe keyed by StreamName; cleared and refilled
  // by readStripe().
  private final Map<StreamName, InStream> streams =
      new HashMap<StreamName, InStream>();
  // Reader for the root column (column id 0).
  private final TreeReader reader;
  // One RowIndex slot per type in the file schema; presumably filled in
  // elsewhere in this class -- TODO confirm.
  private final OrcProto.RowIndex[] indexes;
| |
| RecordReaderImpl(Iterable<StripeInformation> stripes, |
| FileSystem fileSystem, |
| Path path, |
| long offset, long length, |
| List<OrcProto.Type> types, |
| CompressionCodec codec, |
| int bufferSize, |
| boolean[] included, |
| long strideRate |
| ) throws IOException { |
| this.file = fileSystem.open(path); |
| this.codec = codec; |
| this.bufferSize = bufferSize; |
| this.included = included; |
| long rows = 0; |
| long skippedRows = 0; |
| for(StripeInformation stripe: stripes) { |
| long stripeStart = stripe.getOffset(); |
| if (offset > stripeStart) { |
| skippedRows += stripe.getNumberOfRows(); |
| } else if (stripeStart < offset + length) { |
| this.stripes.add(stripe); |
| rows += stripe.getNumberOfRows(); |
| } |
| } |
| firstRow = skippedRows; |
| totalRowCount = rows; |
| reader = createTreeReader(path, 0, types, included); |
| indexes = new OrcProto.RowIndex[types.size()]; |
| rowIndexStride = strideRate; |
| if (this.stripes.size() > 0) { |
| readStripe(); |
| } |
| } |
| |
| private static final class PositionProviderImpl implements PositionProvider { |
| private final OrcProto.RowIndexEntry entry; |
| private int index = 0; |
| |
| PositionProviderImpl(OrcProto.RowIndexEntry entry) { |
| this.entry = entry; |
| } |
| |
| @Override |
| public long getNext() { |
| return entry.getPositions(index++); |
| } |
| } |
| |
| private abstract static class TreeReader { |
| protected final Path path; |
| protected final int columnId; |
| private BitFieldReader present = null; |
| protected boolean valuePresent = false; |
| |
| TreeReader(Path path, int columnId) { |
| this.path = path; |
| this.columnId = columnId; |
| } |
| |
| void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { |
| if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DIRECT) { |
| throw new IOException("Unknown encoding " + encoding + " in column " + |
| columnId + " of " + path); |
| } |
| } |
| |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encoding |
| ) throws IOException { |
| checkEncoding(encoding.get(columnId)); |
| InStream in = streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.PRESENT)); |
| if (in == null) { |
| present = null; |
| valuePresent = true; |
| } else { |
| present = new BitFieldReader(in, 1); |
| } |
| } |
| |
| /** |
| * Seek to the given position. |
| * @param index the indexes loaded from the file |
| * @throws IOException |
| */ |
| void seek(PositionProvider[] index) throws IOException { |
| if (present != null) { |
| present.seek(index[columnId]); |
| } |
| } |
| |
| protected long countNonNulls(long rows) throws IOException { |
| if (present != null) { |
| long result = 0; |
| for(long c=0; c < rows; ++c) { |
| if (present.next() == 1) { |
| result += 1; |
| } |
| } |
| return result; |
| } else { |
| return rows; |
| } |
| } |
| |
| abstract void skipRows(long rows) throws IOException; |
| |
| Object next(Object previous) throws IOException { |
| if (present != null) { |
| valuePresent = present.next() == 1; |
| } |
| return previous; |
| } |
| } |
| |
| private static class BooleanTreeReader extends TreeReader{ |
| private BitFieldReader reader = null; |
| |
| BooleanTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| reader = new BitFieldReader(streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA)), 1); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| reader.seek(index[columnId]); |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| reader.skip(countNonNulls(items)); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| BooleanWritable result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new BooleanWritable(); |
| } else { |
| result = (BooleanWritable) previous; |
| } |
| result.set(reader.next() == 1); |
| } |
| return result; |
| } |
| } |
| |
| private static class ByteTreeReader extends TreeReader{ |
| private RunLengthByteReader reader = null; |
| |
| ByteTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| reader = new RunLengthByteReader(streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA))); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| reader.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| ByteWritable result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new ByteWritable(); |
| } else { |
| result = (ByteWritable) previous; |
| } |
| result.set(reader.next()); |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| reader.skip(countNonNulls(items)); |
| } |
| } |
| |
| private static class ShortTreeReader extends TreeReader{ |
| private RunLengthIntegerReader reader = null; |
| |
| ShortTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| StreamName name = new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA); |
| reader = new RunLengthIntegerReader(streams.get(name), true); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| reader.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| ShortWritable result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new ShortWritable(); |
| } else { |
| result = (ShortWritable) previous; |
| } |
| result.set((short) reader.next()); |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| reader.skip(countNonNulls(items)); |
| } |
| } |
| |
| private static class IntTreeReader extends TreeReader{ |
| private RunLengthIntegerReader reader = null; |
| |
| IntTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| StreamName name = new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA); |
| reader = new RunLengthIntegerReader(streams.get(name), true); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| reader.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| IntWritable result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new IntWritable(); |
| } else { |
| result = (IntWritable) previous; |
| } |
| result.set((int) reader.next()); |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| reader.skip(countNonNulls(items)); |
| } |
| } |
| |
| private static class LongTreeReader extends TreeReader{ |
| private RunLengthIntegerReader reader = null; |
| |
| LongTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| StreamName name = new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA); |
| reader = new RunLengthIntegerReader(streams.get(name), true); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| reader.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| LongWritable result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new LongWritable(); |
| } else { |
| result = (LongWritable) previous; |
| } |
| result.set(reader.next()); |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| reader.skip(countNonNulls(items)); |
| } |
| } |
| |
| private static class FloatTreeReader extends TreeReader{ |
| private InStream stream; |
| |
| FloatTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| StreamName name = new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA); |
| stream = streams.get(name); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| stream.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| FloatWritable result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new FloatWritable(); |
| } else { |
| result = (FloatWritable) previous; |
| } |
| result.set(SerializationUtils.readFloat(stream)); |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| for(int i=0; i < items; ++i) { |
| SerializationUtils.readFloat(stream); |
| } |
| } |
| } |
| |
| private static class DoubleTreeReader extends TreeReader{ |
| private InStream stream; |
| |
| DoubleTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| StreamName name = |
| new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA); |
| stream = streams.get(name); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| stream.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| DoubleWritable result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new DoubleWritable(); |
| } else { |
| result = (DoubleWritable) previous; |
| } |
| result.set(SerializationUtils.readDouble(stream)); |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| stream.skip(items * 8); |
| } |
| } |
| |
| private static class BinaryTreeReader extends TreeReader{ |
| private InStream stream; |
| private RunLengthIntegerReader lengths; |
| |
| BinaryTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| StreamName name = new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA); |
| stream = streams.get(name); |
| lengths = new RunLengthIntegerReader(streams.get(new |
| StreamName(columnId, OrcProto.Stream.Kind.LENGTH)), |
| false); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| stream.seek(index[columnId]); |
| lengths.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| BytesWritable result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new BytesWritable(); |
| } else { |
| result = (BytesWritable) previous; |
| } |
| int len = (int) lengths.next(); |
| result.setSize(len); |
| int offset = 0; |
| while (len > 0) { |
| int written = stream.read(result.getBytes(), offset, len); |
| if (written < 0) { |
| throw new EOFException("Can't finish byte read from " + stream); |
| } |
| len -= written; |
| offset += written; |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| long lengthToSkip = 0; |
| for(int i=0; i < items; ++i) { |
| lengthToSkip += lengths.next(); |
| } |
| stream.skip(lengthToSkip); |
| } |
| } |
| |
| private static class TimestampTreeReader extends TreeReader{ |
| private RunLengthIntegerReader data; |
| private RunLengthIntegerReader nanos; |
| |
| TimestampTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| data = new RunLengthIntegerReader(streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA)), true); |
| nanos = new RunLengthIntegerReader(streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.SECONDARY)), false); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| data.seek(index[columnId]); |
| nanos.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| Timestamp result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new Timestamp(0); |
| } else { |
| result = (Timestamp) previous; |
| } |
| long millis = (data.next() + WriterImpl.BASE_TIMESTAMP) * |
| WriterImpl.MILLIS_PER_SECOND; |
| int newNanos = parseNanos(nanos.next()); |
| // fix the rounding when we divided by 1000. |
| if (millis >= 0) { |
| millis += newNanos / 1000000; |
| } else { |
| millis -= newNanos / 1000000; |
| } |
| result.setTime(millis); |
| result.setNanos(newNanos); |
| } |
| return result; |
| } |
| |
| private static int parseNanos(long serialized) { |
| int zeros = 7 & (int) serialized; |
| int result = (int) serialized >>> 3; |
| if (zeros != 0) { |
| for(int i =0; i <= zeros; ++i) { |
| result *= 10; |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| data.skip(items); |
| nanos.skip(items); |
| } |
| } |
| |
| private static class DecimalTreeReader extends TreeReader{ |
| private InStream valueStream; |
| private RunLengthIntegerReader scaleStream; |
| |
| DecimalTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| valueStream = streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA)); |
| scaleStream = new RunLengthIntegerReader(streams.get( |
| new StreamName(columnId, OrcProto.Stream.Kind.SECONDARY)), true); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| valueStream.seek(index[columnId]); |
| scaleStream.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| if (valuePresent) { |
| return new HiveDecimal(SerializationUtils.readBigInteger(valueStream), |
| (int) scaleStream.next()); |
| } |
| return null; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| for(int i=0; i < items; i++) { |
| SerializationUtils.readBigInteger(valueStream); |
| } |
| scaleStream.skip(items); |
| } |
| } |
| |
| private static class StringTreeReader extends TreeReader { |
| private DynamicByteArray dictionaryBuffer = null; |
| private int dictionarySize; |
| private int[] dictionaryOffsets; |
| private RunLengthIntegerReader reader; |
| |
| StringTreeReader(Path path, int columnId) { |
| super(path, columnId); |
| } |
| |
| void checkEncoding(OrcProto.ColumnEncoding encoding) throws IOException { |
| if (encoding.getKind() != OrcProto.ColumnEncoding.Kind.DICTIONARY) { |
| throw new IOException("Unknown encoding " + encoding + " in column " + |
| columnId + " of " + path); |
| } |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| |
| // read the dictionary blob |
| dictionarySize = encodings.get(columnId).getDictionarySize(); |
| StreamName name = new StreamName(columnId, |
| OrcProto.Stream.Kind.DICTIONARY_DATA); |
| InStream in = streams.get(name); |
| if (in.available() > 0) { |
| dictionaryBuffer = new DynamicByteArray(64, in.available()); |
| dictionaryBuffer.readAll(in); |
| } else { |
| dictionaryBuffer = null; |
| } |
| in.close(); |
| |
| // read the lengths |
| name = new StreamName(columnId, OrcProto.Stream.Kind.LENGTH); |
| in = streams.get(name); |
| RunLengthIntegerReader lenReader = new RunLengthIntegerReader(in, false); |
| int offset = 0; |
| if (dictionaryOffsets == null || |
| dictionaryOffsets.length < dictionarySize + 1) { |
| dictionaryOffsets = new int[dictionarySize + 1]; |
| } |
| for(int i=0; i < dictionarySize; ++i) { |
| dictionaryOffsets[i] = offset; |
| offset += (int) lenReader.next(); |
| } |
| dictionaryOffsets[dictionarySize] = offset; |
| in.close(); |
| |
| // set up the row reader |
| name = new StreamName(columnId, OrcProto.Stream.Kind.DATA); |
| reader = new RunLengthIntegerReader(streams.get(name), false); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| reader.seek(index[columnId]); |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| Text result = null; |
| if (valuePresent) { |
| int entry = (int) reader.next(); |
| if (previous == null) { |
| result = new Text(); |
| } else { |
| result = (Text) previous; |
| } |
| int offset = dictionaryOffsets[entry]; |
| int length; |
| // if it isn't the last entry, subtract the offsets otherwise use |
| // the buffer length. |
| if (entry < dictionaryOffsets.length - 1) { |
| length = dictionaryOffsets[entry + 1] - offset; |
| } else { |
| length = dictionaryBuffer.size() - offset; |
| } |
| // If the column is just empty strings, the size will be zero, |
| // so the buffer will be null, in that case just return result |
| // as it will default to empty |
| if (dictionaryBuffer != null) { |
| dictionaryBuffer.setText(result, offset, length); |
| } else { |
| result.clear(); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| reader.skip(countNonNulls(items)); |
| } |
| } |
| |
| private static class StructTreeReader extends TreeReader { |
| private final TreeReader[] fields; |
| private final String[] fieldNames; |
| |
| StructTreeReader(Path path, int columnId, |
| List<OrcProto.Type> types, |
| boolean[] included) throws IOException { |
| super(path, columnId); |
| OrcProto.Type type = types.get(columnId); |
| int fieldCount = type.getFieldNamesCount(); |
| this.fields = new TreeReader[fieldCount]; |
| this.fieldNames = new String[fieldCount]; |
| for(int i=0; i < fieldCount; ++i) { |
| int subtype = type.getSubtypes(i); |
| if (included == null || included[subtype]) { |
| this.fields[i] = createTreeReader(path, subtype, types, included); |
| } |
| this.fieldNames[i] = type.getFieldNames(i); |
| } |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| for(TreeReader kid: fields) { |
| if (kid != null) { |
| kid.seek(index); |
| } |
| } |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| OrcStruct result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new OrcStruct(fields.length); |
| } else { |
| result = (OrcStruct) previous; |
| |
| // If the input format was initialized with a file with a |
| // different number of fields, the number of fields needs to |
| // be updated to the correct number |
| if (result.getNumFields() != fields.length) { |
| result.setNumFields(fields.length); |
| } |
| } |
| for(int i=0; i < fields.length; ++i) { |
| if (fields[i] != null) { |
| result.setFieldValue(i, fields[i].next(result.getFieldValue(i))); |
| } |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| for(TreeReader field: fields) { |
| if (field != null) { |
| field.startStripe(streams, encodings); |
| } |
| } |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| for(TreeReader field: fields) { |
| if (field != null) { |
| field.skipRows(items); |
| } |
| } |
| } |
| } |
| |
| private static class UnionTreeReader extends TreeReader { |
| private final TreeReader[] fields; |
| private RunLengthByteReader tags; |
| |
| UnionTreeReader(Path path, int columnId, |
| List<OrcProto.Type> types, |
| boolean[] included) throws IOException { |
| super(path, columnId); |
| OrcProto.Type type = types.get(columnId); |
| int fieldCount = type.getSubtypesCount(); |
| this.fields = new TreeReader[fieldCount]; |
| for(int i=0; i < fieldCount; ++i) { |
| int subtype = type.getSubtypes(i); |
| if (included == null || included[subtype]) { |
| this.fields[i] = createTreeReader(path, subtype, types, included); |
| } |
| } |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| tags.seek(index[columnId]); |
| for(TreeReader kid: fields) { |
| kid.seek(index); |
| } |
| } |
| |
| @Override |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| OrcUnion result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new OrcUnion(); |
| } else { |
| result = (OrcUnion) previous; |
| } |
| byte tag = tags.next(); |
| Object previousVal = result.getObject(); |
| result.set(tag, fields[tag].next(tag == result.getTag() ? |
| previousVal : null)); |
| } |
| return result; |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| tags = new RunLengthByteReader(streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.DATA))); |
| for(TreeReader field: fields) { |
| if (field != null) { |
| field.startStripe(streams, encodings); |
| } |
| } |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| long[] counts = new long[fields.length]; |
| for(int i=0; i < items; ++i) { |
| counts[tags.next()] += 1; |
| } |
| for(int i=0; i < counts.length; ++i) { |
| fields[i].skipRows(counts[i]); |
| } |
| } |
| } |
| |
| private static class ListTreeReader extends TreeReader { |
| private final TreeReader elementReader; |
| private RunLengthIntegerReader lengths; |
| |
| ListTreeReader(Path path, int columnId, |
| List<OrcProto.Type> types, |
| boolean[] included) throws IOException { |
| super(path, columnId); |
| OrcProto.Type type = types.get(columnId); |
| elementReader = createTreeReader(path, type.getSubtypes(0), types, |
| included); |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| lengths.seek(index[columnId]); |
| elementReader.seek(index); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| List<Object> result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new ArrayList<Object>(); |
| } else { |
| result = (ArrayList<Object>) previous; |
| } |
| int prevLength = result.size(); |
| int length = (int) lengths.next(); |
| // extend the list to the new length |
| for(int i=prevLength; i < length; ++i) { |
| result.add(null); |
| } |
| // read the new elements into the array |
| for(int i=0; i< length; i++) { |
| result.set(i, elementReader.next(i < prevLength ? |
| result.get(i) : null)); |
| } |
| // remove any extra elements |
| for(int i=prevLength - 1; i >= length; --i) { |
| result.remove(i); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.LENGTH)), false); |
| if (elementReader != null) { |
| elementReader.startStripe(streams, encodings); |
| } |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| long childSkip = 0; |
| for(long i=0; i < items; ++i) { |
| childSkip += lengths.next(); |
| } |
| elementReader.skipRows(childSkip); |
| } |
| } |
| |
| private static class MapTreeReader extends TreeReader { |
| private final TreeReader keyReader; |
| private final TreeReader valueReader; |
| private RunLengthIntegerReader lengths; |
| |
| MapTreeReader(Path path, |
| int columnId, |
| List<OrcProto.Type> types, |
| boolean[] included) throws IOException { |
| super(path, columnId); |
| OrcProto.Type type = types.get(columnId); |
| int keyColumn = type.getSubtypes(0); |
| int valueColumn = type.getSubtypes(1); |
| if (included == null || included[keyColumn]) { |
| keyReader = createTreeReader(path, keyColumn, types, included); |
| } else { |
| keyReader = null; |
| } |
| if (included == null || included[valueColumn]) { |
| valueReader = createTreeReader(path, valueColumn, types, included); |
| } else { |
| valueReader = null; |
| } |
| } |
| |
| @Override |
| void seek(PositionProvider[] index) throws IOException { |
| super.seek(index); |
| lengths.seek(index[columnId]); |
| keyReader.seek(index); |
| valueReader.seek(index); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| Object next(Object previous) throws IOException { |
| super.next(previous); |
| Map<Object, Object> result = null; |
| if (valuePresent) { |
| if (previous == null) { |
| result = new HashMap<Object, Object>(); |
| } else { |
| result = (HashMap<Object, Object>) previous; |
| } |
| // for now just clear and create new objects |
| result.clear(); |
| int length = (int) lengths.next(); |
| // read the new elements into the array |
| for(int i=0; i< length; i++) { |
| result.put(keyReader.next(null), valueReader.next(null)); |
| } |
| } |
| return result; |
| } |
| |
| @Override |
| void startStripe(Map<StreamName, InStream> streams, |
| List<OrcProto.ColumnEncoding> encodings |
| ) throws IOException { |
| super.startStripe(streams, encodings); |
| lengths = new RunLengthIntegerReader(streams.get(new StreamName(columnId, |
| OrcProto.Stream.Kind.LENGTH)), false); |
| if (keyReader != null) { |
| keyReader.startStripe(streams, encodings); |
| } |
| if (valueReader != null) { |
| valueReader.startStripe(streams, encodings); |
| } |
| } |
| |
| @Override |
| void skipRows(long items) throws IOException { |
| items = countNonNulls(items); |
| long childSkip = 0; |
| for(long i=0; i < items; ++i) { |
| childSkip += lengths.next(); |
| } |
| keyReader.skipRows(childSkip); |
| valueReader.skipRows(childSkip); |
| } |
| } |
| |
| private static TreeReader createTreeReader(Path path, |
| int columnId, |
| List<OrcProto.Type> types, |
| boolean[] included |
| ) throws IOException { |
| OrcProto.Type type = types.get(columnId); |
| switch (type.getKind()) { |
| case BOOLEAN: |
| return new BooleanTreeReader(path, columnId); |
| case BYTE: |
| return new ByteTreeReader(path, columnId); |
| case DOUBLE: |
| return new DoubleTreeReader(path, columnId); |
| case FLOAT: |
| return new FloatTreeReader(path, columnId); |
| case SHORT: |
| return new ShortTreeReader(path, columnId); |
| case INT: |
| return new IntTreeReader(path, columnId); |
| case LONG: |
| return new LongTreeReader(path, columnId); |
| case STRING: |
| return new StringTreeReader(path, columnId); |
| case BINARY: |
| return new BinaryTreeReader(path, columnId); |
| case TIMESTAMP: |
| return new TimestampTreeReader(path, columnId); |
| case DECIMAL: |
| return new DecimalTreeReader(path, columnId); |
| case STRUCT: |
| return new StructTreeReader(path, columnId, types, included); |
| case LIST: |
| return new ListTreeReader(path, columnId, types, included); |
| case MAP: |
| return new MapTreeReader(path, columnId, types, included); |
| case UNION: |
| return new UnionTreeReader(path, columnId, types, included); |
| default: |
| throw new IllegalArgumentException("Unsupported type " + |
| type.getKind()); |
| } |
| } |
| |
| OrcProto.StripeFooter readStripeFooter(StripeInformation stripe |
| ) throws IOException { |
| long offset = stripe.getOffset() + stripe.getIndexLength() + |
| stripe.getDataLength(); |
| int tailLength = (int) stripe.getFooterLength(); |
| |
| // read the footer |
| ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); |
| file.seek(offset); |
| file.readFully(tailBuf.array(), tailBuf.arrayOffset(), tailLength); |
| return OrcProto.StripeFooter.parseFrom(InStream.create("footer", tailBuf, |
| codec, bufferSize)); |
| } |
| |
| private void readStripe() throws IOException { |
| StripeInformation stripe = stripes.get(currentStripe); |
| stripeFooter = readStripeFooter(stripe); |
| long offset = stripe.getOffset(); |
| streams.clear(); |
| |
| // if we aren't projecting columns, just read the whole stripe |
| if (included == null) { |
| byte[] buffer = |
| new byte[(int) (stripe.getDataLength())]; |
| file.seek(offset + stripe.getIndexLength()); |
| file.readFully(buffer, 0, buffer.length); |
| int sectionOffset = 0; |
| for(OrcProto.Stream section: stripeFooter.getStreamsList()) { |
| if (StreamName.getArea(section.getKind()) == StreamName.Area.DATA) { |
| int sectionLength = (int) section.getLength(); |
| ByteBuffer sectionBuffer = ByteBuffer.wrap(buffer, sectionOffset, |
| sectionLength); |
| StreamName name = new StreamName(section.getColumn(), |
| section.getKind()); |
| streams.put(name, |
| InStream.create(name.toString(), sectionBuffer, codec, |
| bufferSize)); |
| sectionOffset += sectionLength; |
| } |
| } |
| } else { |
| List<OrcProto.Stream> streamList = stripeFooter.getStreamsList(); |
| // the index of the current section |
| int currentSection = 0; |
| while (currentSection < streamList.size() && |
| StreamName.getArea(streamList.get(currentSection).getKind()) != |
| StreamName.Area.DATA) { |
| currentSection += 1; |
| } |
| // byte position of the current section relative to the stripe start |
| long sectionOffset = stripe.getIndexLength(); |
| while (currentSection < streamList.size()) { |
| int bytes = 0; |
| |
| // find the first section that shouldn't be read |
| int excluded=currentSection; |
| while (excluded < streamList.size() && |
| included[streamList.get(excluded).getColumn()]) { |
| bytes += streamList.get(excluded).getLength(); |
| excluded += 1; |
| } |
| |
| // actually read the bytes as a big chunk |
| if (bytes != 0) { |
| byte[] buffer = new byte[bytes]; |
| file.seek(offset + sectionOffset); |
| file.readFully(buffer, 0, bytes); |
| sectionOffset += bytes; |
| |
| // create the streams for the sections we just read |
| bytes = 0; |
| while (currentSection < excluded) { |
| OrcProto.Stream section = streamList.get(currentSection); |
| StreamName name = |
| new StreamName(section.getColumn(), section.getKind()); |
| this.streams.put(name, |
| InStream.create(name.toString(), |
| ByteBuffer.wrap(buffer, bytes, |
| (int) section.getLength()), codec, bufferSize)); |
| currentSection += 1; |
| bytes += section.getLength(); |
| } |
| } |
| |
| // skip forward until we get back to a section that we need |
| while (currentSection < streamList.size() && |
| !included[streamList.get(currentSection).getColumn()]) { |
| sectionOffset += streamList.get(currentSection).getLength(); |
| currentSection += 1; |
| } |
| } |
| } |
| reader.startStripe(streams, stripeFooter.getColumnsList()); |
| rowInStripe = 0; |
| rowCountInStripe = stripe.getNumberOfRows(); |
| rowBaseInStripe = 0; |
| for(int i=0; i < currentStripe; ++i) { |
| rowBaseInStripe += stripes.get(i).getNumberOfRows(); |
| } |
| for(int i=0; i < indexes.length; ++i) { |
| indexes[i] = null; |
| } |
| } |
| |
| @Override |
| public boolean hasNext() throws IOException { |
| return rowInStripe < rowCountInStripe || currentStripe < stripes.size() - 1; |
| } |
| |
| @Override |
| public Object next(Object previous) throws IOException { |
| if (rowInStripe >= rowCountInStripe) { |
| currentStripe += 1; |
| readStripe(); |
| } |
| rowInStripe += 1; |
| return reader.next(previous); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| file.close(); |
| } |
| |
| @Override |
| public long getRowNumber() { |
| return rowInStripe + rowBaseInStripe + firstRow; |
| } |
| |
| /** |
| * Return the fraction of rows that have been read from the selected. |
| * section of the file |
| * @return fraction between 0.0 and 1.0 of rows consumed |
| */ |
| @Override |
| public float getProgress() { |
| return ((float) rowBaseInStripe + rowInStripe) / totalRowCount; |
| } |
| |
| private int findStripe(long rowNumber) { |
| if (rowNumber < 0) { |
| throw new IllegalArgumentException("Seek to a negative row number " + |
| rowNumber); |
| } else if (rowNumber < firstRow) { |
| throw new IllegalArgumentException("Seek before reader range " + |
| rowNumber); |
| } |
| rowNumber -= firstRow; |
| for(int i=0; i < stripes.size(); i++) { |
| StripeInformation stripe = stripes.get(i); |
| if (stripe.getNumberOfRows() > rowNumber) { |
| return i; |
| } |
| rowNumber -= stripe.getNumberOfRows(); |
| } |
| throw new IllegalArgumentException("Seek after the end of reader range"); |
| } |
| |
| private void readRowIndex() throws IOException { |
| long offset = stripes.get(currentStripe).getOffset(); |
| for(OrcProto.Stream stream: stripeFooter.getStreamsList()) { |
| if (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX) { |
| int col = stream.getColumn(); |
| if ((included == null || included[col]) && indexes[col] == null) { |
| byte[] buffer = new byte[(int) stream.getLength()]; |
| file.seek(offset); |
| file.readFully(buffer); |
| indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index", |
| ByteBuffer.wrap(buffer), codec, bufferSize)); |
| } |
| } |
| offset += stream.getLength(); |
| } |
| } |
| |
| private void seekToRowEntry(int rowEntry) throws IOException { |
| PositionProvider[] index = new PositionProvider[indexes.length]; |
| for(int i=0; i < indexes.length; ++i) { |
| if (indexes[i] != null) { |
| index[i]= |
| new PositionProviderImpl(indexes[i].getEntry(rowEntry)); |
| } |
| } |
| reader.seek(index); |
| } |
| |
| @Override |
| public void seekToRow(long rowNumber) throws IOException { |
| int rightStripe = findStripe(rowNumber); |
| if (rightStripe != currentStripe) { |
| currentStripe = rightStripe; |
| readStripe(); |
| } |
| readRowIndex(); |
| rowInStripe = rowNumber - rowBaseInStripe - firstRow; |
| if (rowIndexStride != 0) { |
| long entry = rowInStripe / rowIndexStride; |
| seekToRowEntry((int) entry); |
| reader.skipRows(rowInStripe - entry * rowIndexStride); |
| } else { |
| reader.skipRows(rowInStripe); |
| } |
| } |
| } |