/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.tajo.storage.thirdparty.orc;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
import org.apache.orc.OrcProto;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.impl.DirectDecompressionCodec;
import org.apache.orc.impl.OutStream;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class RecordReaderUtils {
public static class DefaultDataReader implements DataReader {
private FSDataInputStream file;
private ByteBufferAllocatorPool pool;
private ZeroCopyAdapter zcr;
private FileSystem fs;
private Path path;
private boolean useZeroCopy;
private CompressionCodec codec;
private long readBytes = 0;
public DefaultDataReader(
FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
this.fs = fs;
this.path = path;
this.useZeroCopy = useZeroCopy;
this.codec = codec;
}
@Override
public void open() throws IOException {
this.file = fs.open(path);
if (useZeroCopy) {
pool = new ByteBufferAllocatorPool();
zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool);
} else {
pool = null;
zcr = null;
}
}
@Override
public DiskRangeList readFileData(
DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException {
return readDiskRanges(file, zcr, baseOffset, range, doForceDirect);
}
@Override
public void close() throws IOException {
if (file != null) {
file.close();
}
if (pool != null) {
pool.clear();
}
}
@Override
public boolean isTrackingDiskRanges() {
return zcr != null;
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
zcr.releaseBuffer(buffer);
}
public long getReadBytes() {
return readBytes;
}
/**
* Read the list of ranges from the file.
* @param file the file to read
* @param zcr the zero-copy adapter, or null to use regular reads
* @param base the base offset of the stripe within the file
* @param range the disk ranges within the stripe to read
* @param doForceDirect whether non-zero-copy reads should be copied into direct buffers
* @return the head of the range list with the data filled in
* @throws IOException if reading from the file fails
*/
private DiskRangeList readDiskRanges(FSDataInputStream file,
ZeroCopyAdapter zcr,
long base,
DiskRangeList range,
boolean doForceDirect) throws IOException {
if (range == null) return null;
DiskRangeList prev = range.prev;
if (prev == null) {
prev = new DiskRangeList.MutateHelper(range);
}
while (range != null) {
if (range.hasData()) {
range = range.next;
continue;
}
int len = (int) (range.getEnd() - range.getOffset());
long off = range.getOffset();
if (zcr != null) {
file.seek(base + off);
boolean hasReplaced = false;
while (len > 0) {
ByteBuffer partial = zcr.readBuffer(len, false);
readBytes += partial.remaining();
BufferChunk bc = new BufferChunk(partial, off);
if (!hasReplaced) {
range.replaceSelfWith(bc);
hasReplaced = true;
} else {
range.insertAfter(bc);
}
range = bc;
int read = partial.remaining();
len -= read;
off += read;
}
} else {
// Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless.
byte[] buffer = new byte[len];
file.readFully((base + off), buffer, 0, buffer.length);
readBytes += buffer.length;
ByteBuffer bb = null;
if (doForceDirect) {
bb = ByteBuffer.allocateDirect(len);
bb.put(buffer);
bb.position(0);
bb.limit(len);
} else {
bb = ByteBuffer.wrap(buffer);
}
range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset()));
}
range = range.next;
}
return prev.next;
}
}
public static DataReader createDefaultDataReader(
FileSystem fs, Path path, boolean useZeroCopy, CompressionCodec codec) {
return new DefaultDataReader(fs, path, useZeroCopy, codec);
}
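// A minimal usage sketch (illustrative only): fs, path, codec, streamOffset, streamLength,
// and stripeDataStart are hypothetical names; real callers such as the ORC RecordReader
// derive the offsets from stripe metadata.
//
//   DataReader reader = RecordReaderUtils.createDefaultDataReader(fs, path, false, codec);
//   reader.open();
//   DiskRangeList.CreateHelper ranges = new DiskRangeList.CreateHelper();
//   ranges.addOrMerge(streamOffset, streamOffset + streamLength, true, false);
//   DiskRangeList data = reader.readFileData(ranges.get(), stripeDataStart, false);
//   // ... decode streams from the returned buffers ...
//   reader.close();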
public static boolean[] findPresentStreamsByColumn(
List<OrcProto.Stream> streamList, List<OrcProto.Type> types) {
boolean[] hasNull = new boolean[types.size()];
for(OrcProto.Stream stream: streamList) {
if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) {
hasNull[stream.getColumn()] = true;
}
}
return hasNull;
}
/**
* Does region A overlap region B? The end points are inclusive on both sides.
* @param leftA A's left point
* @param rightA A's right point
* @param leftB B's left point
* @param rightB B's right point
* @return Does region A overlap region B?
*/
static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
if (leftA <= leftB) {
return rightA >= leftB;
}
return rightB >= leftA;
}
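// Example: because the end points are inclusive, regions that merely touch are considered
// overlapping, e.g. overlap(0, 10, 10, 20) and overlap(10, 20, 0, 10) both return true,
// while overlap(0, 9, 10, 20) returns false.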
public static void addEntireStreamToRanges(
long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
list.addOrMerge(offset, offset + length, doMergeBuffers, false);
}
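/**
* Add only the parts of a stream that cover the selected row groups to the range list,
* using the row index to locate each group's start position within the stream.
*/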
public static void addRgFilteredStreamToRanges(OrcProto.Stream stream,
boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index,
OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull,
long offset, long length, DiskRangeList.CreateHelper list, boolean doMergeBuffers) {
for (int group = 0; group < includedRowGroups.length; ++group) {
if (!includedRowGroups[group]) continue;
int posn = getIndexPosition(
encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull);
long start = index.getEntry(group).getPositions(posn);
final long nextGroupOffset;
boolean isLast = group == (includedRowGroups.length - 1);
nextGroupOffset = isLast ? length : index.getEntry(group + 1).getPositions(posn);
start += offset;
long end = offset + estimateRgEndOffset(
isCompressed, isLast, nextGroupOffset, length, compressionSize);
list.addOrMerge(start, end, doMergeBuffers, true);
}
}
public static long estimateRgEndOffset(boolean isCompressed, boolean isLast,
long nextGroupOffset, long streamLength, int bufferSize) {
// Figure out the worst-case last location: if adjacent groups have the same compressed
// block offset, stretch the slop by a factor of 2 to safely accommodate the next
// compression block (one for the current compression block and another for the next).
long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP;
return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
}
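// Worked example (assuming ORC's 3-byte compression chunk header, OutStream.HEADER_SIZE):
// with a 256 KiB compression buffer, slop = 2 * (3 + 262144) = 524294 bytes, so a
// non-final row group is read up to min(streamLength, nextGroupOffset + 524294).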
private static final int BYTE_STREAM_POSITIONS = 1;
private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
/**
* Get the offset within the row index entry's positions at which the given
* stream's positions start for the column.
* @param columnEncoding the encoding of the column
* @param columnType the type of the column
* @param streamType the kind of the stream
* @param isCompressed is the file compressed
* @param hasNulls does the column have a PRESENT stream?
* @return the index into the positions list at which this stream's positions begin
*/
public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
OrcProto.Type.Kind columnType,
OrcProto.Stream.Kind streamType,
boolean isCompressed,
boolean hasNulls) {
if (streamType == OrcProto.Stream.Kind.PRESENT) {
return 0;
}
int compressionValue = isCompressed ? 1 : 0;
int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
switch (columnType) {
case BOOLEAN:
case BYTE:
case SHORT:
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case DATE:
case STRUCT:
case MAP:
case LIST:
case UNION:
return base;
case CHAR:
case VARCHAR:
case STRING:
if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
return base;
} else {
if (streamType == OrcProto.Stream.Kind.DATA) {
return base;
} else {
return base + BYTE_STREAM_POSITIONS + compressionValue;
}
}
case BINARY:
if (streamType == OrcProto.Stream.Kind.DATA) {
return base;
}
return base + BYTE_STREAM_POSITIONS + compressionValue;
case DECIMAL:
if (streamType == OrcProto.Stream.Kind.DATA) {
return base;
}
return base + BYTE_STREAM_POSITIONS + compressionValue;
case TIMESTAMP:
if (streamType == OrcProto.Stream.Kind.DATA) {
return base;
}
return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
default:
throw new IllegalArgumentException("Unknown type " + columnType);
}
}
// For uncompressed streams, the worst-case overlap with the following set
// of rows (a long vint literal group).
static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
/**
* Is this stream part of a dictionary?
* @param kind the kind of the stream
* @param encoding the encoding of the column
* @return is this part of a dictionary?
*/
public static boolean isDictionary(OrcProto.Stream.Kind kind,
OrcProto.ColumnEncoding encoding) {
assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
(kind == OrcProto.Stream.Kind.LENGTH &&
(encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
}
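// Example: a LENGTH stream of a column encoded as DICTIONARY or DICTIONARY_V2 belongs to
// the dictionary (it holds the lengths of dictionary entries), whereas the same LENGTH
// stream under DIRECT encoding holds per-row lengths and is not part of a dictionary.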
/**
* Build a string representation of a list of disk ranges.
* @param range ranges to stringify
* @return the resulting string
*/
public static String stringifyDiskRanges(DiskRangeList range) {
StringBuilder buffer = new StringBuilder();
buffer.append("[");
boolean isFirst = true;
while (range != null) {
if (!isFirst) {
buffer.append(", {");
} else {
buffer.append("{");
}
isFirst = false;
buffer.append(range.toString());
buffer.append("}");
range = range.next;
}
buffer.append("]");
return buffer.toString();
}
public static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) {
// This assumes sorted ranges (as do many other parts of the ORC code).
ArrayList<DiskRange> buffers = new ArrayList<DiskRange>();
if (length == 0) return buffers;
long streamEnd = offset + length;
boolean inRange = false;
while (range != null) {
if (!inRange) {
if (range.getEnd() <= offset) {
range = range.next;
continue; // Skip until we are in range.
}
inRange = true;
if (range.getOffset() < offset) {
// Partial first buffer, add a slice of it.
buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset));
if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer.
range = range.next;
continue;
}
} else if (range.getOffset() >= streamEnd) {
break;
}
if (range.getEnd() > streamEnd) {
// Partial last buffer (may also be the first buffer), add a slice of it.
buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset));
break;
}
// Buffer that belongs entirely to one stream.
// TODO: ideally we would want to reuse the object and remove it from the list, but we cannot
// because bufferChunks is also used by clearStreams for zcr. Create a useless dup.
buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset));
if (range.getEnd() == streamEnd) break;
range = range.next;
}
return buffers;
}
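// Illustrative example (offsets are hypothetical): for a stream at offset 100 with length 50,
// a single cached range covering [80, 200) yields one slice covering the stream's bytes
// [100, 150), shifted by -100 so the returned buffer's offsets start at 0 relative to the stream.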
static ZeroCopyAdapter createZeroCopyShim(FSDataInputStream file,
CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
if ((codec == null || ((codec instanceof DirectDecompressionCodec)
&& ((DirectDecompressionCodec) codec).isAvailable()))) {
/* codec is null or is available */
return new ZeroCopyAdapter(file, pool);
}
return null;
}
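// Note: when the codec exists but is not a DirectDecompressionCodec (or its direct
// decompressor is unavailable), this returns null; DefaultDataReader.open() then leaves
// zcr null and readDiskRanges falls back to regular readFully into heap or direct buffers.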
}