| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.tajo.storage.thirdparty.orc; |
| |
| import org.apache.hadoop.fs.FSDataInputStream; |
| import org.apache.hadoop.fs.FileSystem; |
| import org.apache.hadoop.fs.Path; |
| import org.apache.hadoop.hive.common.io.DiskRange; |
| import org.apache.hadoop.hive.common.io.DiskRangeList; |
| import org.apache.orc.CompressionCodec; |
| import org.apache.orc.DataReader; |
| import org.apache.orc.OrcProto; |
| import org.apache.orc.impl.BufferChunk; |
| import org.apache.orc.impl.DirectDecompressionCodec; |
| import org.apache.orc.impl.OutStream; |
| |
| import java.io.IOException; |
| import java.nio.ByteBuffer; |
| import java.util.ArrayList; |
| import java.util.List; |
| |
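/**
 * Stateless helper methods used by the ORC record reader to plan disk reads
 * and slice the results into per-stream buffers.
 */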
| public class RecordReaderUtils { |
| |
| public static class DefaultDataReader implements DataReader { |
| private FSDataInputStream file; |
| private ByteBufferAllocatorPool pool; |
| private ZeroCopyAdapter zcr; |
| private FileSystem fs; |
| private Path path; |
| private boolean useZeroCopy; |
| private CompressionCodec codec; |
| private long readBytes = 0; |
| |
| public DefaultDataReader( |
| FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) { |
| this.fs = fs; |
| this.path = path; |
| this.useZeroCopy = useZeroCopy; |
| this.codec = codec; |
| } |
| |
| @Override |
| public void open() throws IOException { |
| this.file = fs.open(path); |
| if (useZeroCopy) { |
| pool = new ByteBufferAllocatorPool(); |
| zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); |
| } else { |
| pool = null; |
| zcr = null; |
| } |
| } |
| |
| @Override |
| public DiskRangeList readFileData( |
| DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException { |
| return readDiskRanges(file, zcr, baseOffset, range, doForceDirect); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| if (file != null) { |
| file.close(); |
| } |
| if (pool != null) { |
| pool.clear(); |
| } |
| } |
| |
| @Override |
| public boolean isTrackingDiskRanges() { |
| return zcr != null; |
| } |
| |
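    // Only meaningful when isTrackingDiskRanges() returns true; zcr is null
    // when zero-copy reads are disabled.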
| @Override |
| public void releaseBuffer(ByteBuffer buffer) { |
| zcr.releaseBuffer(buffer); |
| } |
| |
| public long getReadBytes() { |
| return readBytes; |
| } |
| |
| /** |
| * Read the list of ranges from the file. |
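     * <p>Each range that does not already hold data is replaced in place with
     * one or more {@link BufferChunk}s containing the bytes read, so callers
     * should use the returned head rather than the original argument.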
     * @param file the file to read
     * @param zcr the zero-copy adapter, or null to use normal reads
     * @param base the base offset of the stripe within the file
     * @param range the disk ranges within the stripe to read
     * @param doForceDirect whether to copy the data into direct byte buffers
     * @return the head of the range list, holding the bytes read for each
     *         disk range
     * @throws IOException if reading from the file fails
| */ |
| private DiskRangeList readDiskRanges(FSDataInputStream file, |
| ZeroCopyAdapter zcr, |
| long base, |
| DiskRangeList range, |
| boolean doForceDirect) throws IOException { |
| if (range == null) return null; |
| DiskRangeList prev = range.prev; |
| if (prev == null) { |
| prev = new DiskRangeList.MutateHelper(range); |
| } |
| while (range != null) { |
| if (range.hasData()) { |
| range = range.next; |
| continue; |
| } |
| int len = (int) (range.getEnd() - range.getOffset()); |
| long off = range.getOffset(); |
| if (zcr != null) { |
| file.seek(base + off); |
| boolean hasReplaced = false; |
| while (len > 0) { |
| ByteBuffer partial = zcr.readBuffer(len, false); |
| readBytes += partial.remaining(); |
| BufferChunk bc = new BufferChunk(partial, off); |
| if (!hasReplaced) { |
| range.replaceSelfWith(bc); |
| hasReplaced = true; |
| } else { |
| range.insertAfter(bc); |
| } |
| range = bc; |
| int read = partial.remaining(); |
| len -= read; |
| off += read; |
| } |
| } else { |
| // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless. |
| byte[] buffer = new byte[len]; |
| file.readFully((base + off), buffer, 0, buffer.length); |
| readBytes += buffer.length; |
| ByteBuffer bb = null; |
| if (doForceDirect) { |
| bb = ByteBuffer.allocateDirect(len); |
| bb.put(buffer); |
| bb.position(0); |
| bb.limit(len); |
| } else { |
| bb = ByteBuffer.wrap(buffer); |
| } |
| range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset())); |
| } |
| range = range.next; |
| } |
| return prev.next; |
| } |
| } |
| |
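  /**
   * Create the default {@link DataReader} for a file.
   *
   * <p>A minimal usage sketch (assumes the caller already has a
   * {@code FileSystem}, a {@code Path}, and an optional codec; the 4096-byte
   * range is a hypothetical example):
   * <pre>{@code
   * DataReader reader =
   *     RecordReaderUtils.createDefaultDataReader(fs, path, false, codec);
   * reader.open();
   * try {
   *   DiskRangeList ranges = new DiskRangeList(0, 4096);
   *   ranges = reader.readFileData(ranges, 0, false);
   * } finally {
   *   reader.close();
   * }
   * }</pre>
   */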
| public static DataReader createDefaultDataReader( |
| FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) { |
| return new DefaultDataReader(fs, path, useZeroCopy, codec); |
| } |
| |
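  /**
   * Find the columns that have a PRESENT stream, i.e. that may contain nulls.
   * @param streamList the streams of the stripe
   * @param types the types in the file
   * @return a flag per column id, true if the column has a PRESENT stream
   */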
| public static boolean[] findPresentStreamsByColumn( |
| List<OrcProto.Stream> streamList, List<OrcProto.Type> types) { |
| boolean[] hasNull = new boolean[types.size()]; |
| for(OrcProto.Stream stream: streamList) { |
| if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) { |
| hasNull[stream.getColumn()] = true; |
| } |
| } |
| return hasNull; |
| } |
| |
| /** |
| * Does region A overlap region B? The end points are inclusive on both sides. |
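   * <p>For example, {@code overlap(0, 10, 10, 20)} is {@code true} because
   * the shared end point 10 counts as overlap.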
| * @param leftA A's left point |
| * @param rightA A's right point |
| * @param leftB B's left point |
| * @param rightB B's right point |
| * @return Does region A overlap region B? |
| */ |
| static boolean overlap(long leftA, long rightA, long leftB, long rightB) { |
| if (leftA <= leftB) { |
| return rightA >= leftB; |
| } |
| return rightB >= leftA; |
| } |
| |
| public static void addEntireStreamToRanges( |
| long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) { |
| list.addOrMerge(offset, offset + length, doMergeBuffers, false); |
| } |
| |
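  /**
   * Add the byte ranges of a stream that cover the selected row groups to the
   * list. Each group's start is taken from the row index, and its end is
   * bounded using {@link #estimateRgEndOffset}.
   */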
| public static void addRgFilteredStreamToRanges(OrcProto.Stream stream, |
| boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index, |
| OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull, |
| long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) { |
| for (int group = 0; group < includedRowGroups.length; ++group) { |
| if (!includedRowGroups[group]) continue; |
| int posn = getIndexPosition( |
| encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull); |
| long start = index.getEntry(group).getPositions(posn); |
| final long nextGroupOffset; |
| boolean isLast = group == (includedRowGroups.length - 1); |
| nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn); |
| |
| start += offset; |
| long end = offset + estimateRgEndOffset( |
| isCompressed, isLast, nextGroupOffset, length, compressionSize); |
| list.addOrMerge(start, end, doMergeBuffers, true); |
| } |
| } |
| |
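  /**
   * Estimate the worst-case end offset of a row group's data within a stream.
   *
   * <p>Worked example (illustrative numbers; in ORC,
   * {@code OutStream.HEADER_SIZE} is 3): for a compressed stream with a
   * 256 KiB buffer, the slop is {@code 2 * (3 + 262144)} = 524294 bytes past
   * the next group's start offset, capped at the stream length.
   */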
| public static long estimateRgEndOffset(boolean isCompressed, boolean isLast, |
| long nextGroupOffset, long streamLength, int bufferSize) { |
    // Figure out the worst-case end location. If adjacent groups share the
    // same compressed block offset, stretch the slop by a factor of 2 to
    // safely cover both the current compression block and the next one.
| long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP; |
| return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop); |
| } |
| |
| private static final int BYTE_STREAM_POSITIONS = 1; |
| private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1; |
| private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1; |
| private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1; |
| |
| /** |
   * Get the offset within the column's index positions at which the given
   * stream's positions start.
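   * <p>For example, in an uncompressed file, a LONG column with a PRESENT
   * stream has its PRESENT positions starting at offset 0 and its DATA
   * positions starting at offset {@code BITFIELD_POSITIONS} (3).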
| * @param columnEncoding the encoding of the column |
| * @param columnType the type of the column |
| * @param streamType the kind of the stream |
| * @param isCompressed is the file compressed |
| * @param hasNulls does the column have a PRESENT stream? |
   * @return the offset of the first index position belonging to the stream
| */ |
| public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding, |
| OrcProto.Type.Kind columnType, |
| OrcProto.Stream.Kind streamType, |
| boolean isCompressed, |
| boolean hasNulls) { |
| if (streamType == OrcProto.Stream.Kind.PRESENT) { |
| return 0; |
| } |
| int compressionValue = isCompressed ? 1 : 0; |
| int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0; |
| switch (columnType) { |
| case BOOLEAN: |
| case BYTE: |
| case SHORT: |
| case INT: |
| case LONG: |
| case FLOAT: |
| case DOUBLE: |
| case DATE: |
| case STRUCT: |
| case MAP: |
| case LIST: |
| case UNION: |
| return base; |
| case CHAR: |
| case VARCHAR: |
| case STRING: |
| if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY || |
| columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { |
| return base; |
| } else { |
| if (streamType == OrcProto.Stream.Kind.DATA) { |
| return base; |
| } else { |
| return base + BYTE_STREAM_POSITIONS + compressionValue; |
| } |
| } |
| case BINARY: |
| if (streamType == OrcProto.Stream.Kind.DATA) { |
| return base; |
| } |
| return base + BYTE_STREAM_POSITIONS + compressionValue; |
| case DECIMAL: |
| if (streamType == OrcProto.Stream.Kind.DATA) { |
| return base; |
| } |
| return base + BYTE_STREAM_POSITIONS + compressionValue; |
| case TIMESTAMP: |
| if (streamType == OrcProto.Stream.Kind.DATA) { |
| return base; |
| } |
| return base + RUN_LENGTH_INT_POSITIONS + compressionValue; |
| default: |
| throw new IllegalArgumentException("Unknown type " + columnType); |
| } |
| } |
| |
  // For uncompressed streams, the most a row group's data can overlap with
  // the following set of rows (a long vint literal group).
| static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512; |
| |
| /** |
| * Is this stream part of a dictionary? |
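   * <p>For example, a LENGTH stream is part of the dictionary when the column
   * uses DICTIONARY or DICTIONARY_V2 encoding, but not when it uses a DIRECT
   * encoding.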
   * @param kind the kind of the stream
   * @param encoding the encoding of the column
   * @return true if the stream is part of a dictionary
| */ |
| public static boolean isDictionary(OrcProto.Stream.Kind kind, |
| OrcProto.ColumnEncoding encoding) { |
| assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT; |
| OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind(); |
| return kind == OrcProto.Stream.Kind.DICTIONARY_DATA || |
| (kind == OrcProto.Stream.Kind.LENGTH && |
| (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY || |
| encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)); |
| } |
| |
| /** |
| * Build a string representation of a list of disk ranges. |
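   * <p>The result has one brace-wrapped entry per range, in list order, e.g.
   * {@code [{first range}, {second range}]}.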
| * @param range ranges to stringify |
| * @return the resulting string |
| */ |
| public static String stringifyDiskRanges(DiskRangeList range) { |
| StringBuilder buffer = new StringBuilder(); |
| buffer.append("["); |
| boolean isFirst = true; |
| while (range != null) { |
| if (!isFirst) { |
| buffer.append(", {"); |
| } else { |
| buffer.append("{"); |
| } |
| isFirst = false; |
| buffer.append(range.toString()); |
| buffer.append("}"); |
| range = range.next; |
| } |
| buffer.append("]"); |
| return buffer.toString(); |
| } |
| |
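  /**
   * Slice the buffers that cover a single stream out of a list of read
   * ranges, shifting offsets so that the stream starts at 0.
   *
   * <p>For example, if one range covers bytes [0, 100) and the stream sits at
   * offset 40 with length 30, the result is a single slice of that range
   * holding the stream's bytes, shifted to offsets [0, 30).
   */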
| public static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) { |
    // This assumes sorted ranges (as do many other parts of the ORC code).
| ArrayList<DiskRange> buffers = new ArrayList<DiskRange>(); |
| if (length == 0) return buffers; |
| long streamEnd = offset + length; |
| boolean inRange = false; |
| while (range != null) { |
| if (!inRange) { |
| if (range.getEnd() <= offset) { |
| range = range.next; |
| continue; // Skip until we are in range. |
| } |
| inRange = true; |
| if (range.getOffset() < offset) { |
| // Partial first buffer, add a slice of it. |
| buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset)); |
| if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer. |
| range = range.next; |
| continue; |
| } |
| } else if (range.getOffset() >= streamEnd) { |
| break; |
| } |
| if (range.getEnd() > streamEnd) { |
| // Partial last buffer (may also be the first buffer), add a slice of it. |
| buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset)); |
| break; |
| } |
| // Buffer that belongs entirely to one stream. |
| // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot |
| // because bufferChunks is also used by clearStreams for zcr. Create a useless dup. |
| buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset)); |
| if (range.getEnd() == streamEnd) break; |
| range = range.next; |
| } |
| return buffers; |
| } |
| |
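  /**
   * Create a zero-copy read adapter if zero-copy reads are possible: either
   * there is no codec, or the codec supports direct decompression. Returns
   * null otherwise, in which case the caller falls back to normal reads.
   */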
| static ZeroCopyAdapter createZeroCopyShim(FSDataInputStream file, |
| CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException { |
    if (codec == null || ((codec instanceof DirectDecompressionCodec)
        && ((DirectDecompressionCodec) codec).isAvailable())) {
      // codec is null or supports direct decompression
      return new ZeroCopyAdapter(file, pool);
| } |
| return null; |
| } |
| } |