/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.orc.impl;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Supplier;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
import org.apache.orc.OrcProto;
import org.apache.orc.StripeInformation;
import org.apache.orc.TypeDescription;
/**
* Stateless methods shared between RecordReaderImpl and EncodedReaderImpl.
*/
public class RecordReaderUtils {
private static final HadoopShims SHIMS = HadoopShimsFactory.get();
private static final Logger LOG = LoggerFactory.getLogger(RecordReaderUtils.class);
private static class DefaultDataReader implements DataReader {
private FSDataInputStream file;
private ByteBufferAllocatorPool pool;
private HadoopShims.ZeroCopyReaderShim zcr = null;
private final Supplier<FileSystem> fileSystemSupplier;
private final Path path;
private final boolean useZeroCopy;
private InStream.StreamOptions options;
private boolean isOpen = false;
private DefaultDataReader(DataReaderProperties properties) {
this.fileSystemSupplier = properties.getFileSystemSupplier();
this.path = properties.getPath();
this.file = properties.getFile();
this.useZeroCopy = properties.getZeroCopy();
this.options = properties.getCompression();
}
@Override
public void open() throws IOException {
if (file == null) {
this.file = fileSystemSupplier.get().open(path);
}
if (useZeroCopy) {
// The zero-copy reader only consults the codec for availability checks.
pool = new ByteBufferAllocatorPool();
zcr = RecordReaderUtils.createZeroCopyShim(file, options.getCodec(), pool);
} else {
zcr = null;
}
isOpen = true;
}
@Override
public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException {
if (!isOpen) {
open();
}
long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength();
int tailLength = (int) stripe.getFooterLength();
// read the footer
ByteBuffer tailBuf = ByteBuffer.allocate(tailLength);
file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength);
return OrcProto.StripeFooter.parseFrom(
InStream.createCodedInputStream(InStream.create("footer",
new BufferChunk(tailBuf, 0), 0, tailLength, options)));
}
@Override
public BufferChunkList readFileData(BufferChunkList range,
boolean doForceDirect
) throws IOException {
RecordReaderUtils.readDiskRanges(file, zcr, range, doForceDirect);
return range;
}
@Override
public void close() throws IOException {
if (options.getCodec() != null) {
OrcCodecPool.returnCodec(options.getCodec().getKind(), options.getCodec());
options.withCodec(null);
}
if (pool != null) {
pool.clear();
}
// close both zcr and file
try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) {
if (file != null) {
file.close();
file = null;
}
}
}
@Override
public boolean isTrackingDiskRanges() {
return zcr != null;
}
@Override
public void releaseBuffer(ByteBuffer buffer) {
zcr.releaseBuffer(buffer);
}
@Override
public DataReader clone() {
if (this.file != null) {
// We should really throw here, but that would cause failures in Hive.
// As long as Hive clones open readers, just log a warning.
LOG.warn("Cloning an opened DataReader; the stream will be reused and closed twice");
}
try {
DefaultDataReader clone = (DefaultDataReader) super.clone();
if (options.getCodec() != null) {
// Make sure we don't share the same codec between two readers.
clone.options = options.clone();
}
return clone;
} catch (CloneNotSupportedException e) {
throw new UnsupportedOperationException("uncloneable", e);
}
}
@Override
public InStream.StreamOptions getCompressionOptions() {
return options;
}
}
public static DataReader createDefaultDataReader(DataReaderProperties properties) {
return new DefaultDataReader(properties);
}
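// Illustrative lifecycle sketch (not part of the original source); "properties"
// and "stripe" are hypothetical values supplied by the caller:
//   DataReader reader = RecordReaderUtils.createDefaultDataReader(properties);
//   reader.open();
//   OrcProto.StripeFooter footer = reader.readStripeFooter(stripe);
//   reader.close();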
/**
* Does region A overlap region B? The end points are inclusive on both sides.
* @param leftA A's left point
* @param rightA A's right point
* @param leftB B's left point
* @param rightB B's right point
* @return Does region A overlap region B?
*/
static boolean overlap(long leftA, long rightA, long leftB, long rightB) {
if (leftA <= leftB) {
return rightA >= leftB;
}
return rightB >= leftA;
}
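// A quick illustration of the inclusive-endpoint semantics (illustrative only):
//   overlap(0, 10, 10, 20) -> true   (the regions touch at offset 10)
//   overlap(0,  9, 10, 20) -> false  (one-byte gap between them)
//   overlap(5, 30,  0, 20) -> true   (B starts before A but still reaches it)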
public static long estimateRgEndOffset(boolean isCompressed,
int bufferSize,
boolean isLast,
long nextGroupOffset,
long streamLength) {
// Figure out the worst case last location.
// If adjacent groups share the same compressed block offset, stretch the slop
// by a factor of 2 to safely accommodate the next compression block:
// one block's worth for the current compression block and another for the next.
long slop = isCompressed
? 2 * (OutStream.HEADER_SIZE + bufferSize)
: WORST_UNCOMPRESSED_SLOP;
return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop);
}
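// Worked example (illustrative only, assuming the usual 3-byte compression
// chunk header for OutStream.HEADER_SIZE): with a compressed stream, a 256K
// buffer, a next-group offset of 1,000,000 and a stream length of 10,000,000,
//   slop = 2 * (3 + 262144) = 524294
//   estimateRgEndOffset(true, 262144, false, 1000000, 10000000)
//     == min(10000000, 1000000 + 524294) == 1524294
// For the last row group in the stream, the full streamLength is returned.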
private static final int BYTE_STREAM_POSITIONS = 1;
private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1;
private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1;
private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1;
/**
* Get the offset within the column's index positions at which the given
* stream's positions start.
* @param columnEncoding the encoding of the column
* @param columnType the type of the column
* @param streamType the kind of the stream
* @param isCompressed is the stream compressed?
* @param hasNulls does the column have a PRESENT stream?
* @return the offset of the stream's first position within the index entry
*/
public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding,
TypeDescription.Category columnType,
OrcProto.Stream.Kind streamType,
boolean isCompressed,
boolean hasNulls) {
if (streamType == OrcProto.Stream.Kind.PRESENT) {
return 0;
}
int compressionValue = isCompressed ? 1 : 0;
int base = hasNulls ? (BITFIELD_POSITIONS + compressionValue) : 0;
switch (columnType) {
case BOOLEAN:
case BYTE:
case SHORT:
case INT:
case LONG:
case FLOAT:
case DOUBLE:
case DATE:
case STRUCT:
case MAP:
case LIST:
case UNION:
return base;
case CHAR:
case VARCHAR:
case STRING:
if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) {
return base;
} else {
if (streamType == OrcProto.Stream.Kind.DATA) {
return base;
} else {
return base + BYTE_STREAM_POSITIONS + compressionValue;
}
}
case BINARY:
case DECIMAL:
if (streamType == OrcProto.Stream.Kind.DATA) {
return base;
}
return base + BYTE_STREAM_POSITIONS + compressionValue;
case TIMESTAMP:
case TIMESTAMP_INSTANT:
if (streamType == OrcProto.Stream.Kind.DATA) {
return base;
}
return base + RUN_LENGTH_INT_POSITIONS + compressionValue;
default:
throw new IllegalArgumentException("Unknown type " + columnType);
}
}
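// Worked example (illustrative only): a compressed STRING column with a
// PRESENT stream and DIRECT_V2 encoding. The PRESENT positions start at 0,
// so base = BITFIELD_POSITIONS + 1 = 4 for the other streams:
//   getIndexPosition(DIRECT_V2, STRING, PRESENT, true, true) == 0
//   getIndexPosition(DIRECT_V2, STRING, DATA,    true, true) == 4
//   getIndexPosition(DIRECT_V2, STRING, LENGTH,  true, true) == 6
// LENGTH lands at 6 because the DATA byte stream consumes
// BYTE_STREAM_POSITIONS + 1 = 2 positions in a compressed file.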
// For uncompressed streams, the worst-case slop: the largest possible overlap
// with the following set of rows (a literal group of long vints).
static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512;
/**
* Is this stream part of a dictionary?
* @param kind the kind of the stream
* @param encoding the encoding of the column
* @return true if the stream is part of a dictionary
*/
public static boolean isDictionary(OrcProto.Stream.Kind kind,
OrcProto.ColumnEncoding encoding) {
assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT;
OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind();
return kind == OrcProto.Stream.Kind.DICTIONARY_DATA ||
(kind == OrcProto.Stream.Kind.LENGTH &&
(encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY ||
encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2));
}
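// For example (illustrative only; the encoding values are hypothetical
// ColumnEncoding instances with the named kinds):
//   isDictionary(Kind.DICTIONARY_DATA, dictionaryV2Encoding) -> true
//   isDictionary(Kind.LENGTH, dictionaryV2Encoding)          -> true
//   isDictionary(Kind.LENGTH, directV2Encoding)              -> false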
/**
* Build a string representation of a list of disk ranges.
* @param range ranges to stringify
* @return the resulting string
*/
public static String stringifyDiskRanges(DiskRangeList range) {
StringBuilder buffer = new StringBuilder();
buffer.append("[");
boolean isFirst = true;
while (range != null) {
if (!isFirst) {
buffer.append(", {");
} else {
buffer.append("{");
}
isFirst = false;
buffer.append(range.toString());
buffer.append("}");
range = range.next;
}
buffer.append("]");
return buffer.toString();
}
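// Example output shape (illustrative only; the text inside each pair of braces
// comes from DiskRange.toString() and may differ between versions):
//   "[{range start: 0 end: 100}, {range start: 100 end: 250}]"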
static long computeEnd(BufferChunk first, BufferChunk last) {
long end = 0;
for (BufferChunk ptr = first; ptr != last.next; ptr = (BufferChunk) ptr.next) {
end = Math.max(ptr.getEnd(), end);
}
return end;
}
/**
* Zero-copy read the data from the file based on a list of ranges in a
* single read.
*
* As a side note, the HDFS zero-copy API is awkward to work with from a
* caller's point of view.
*
* @param file the file we're reading from
* @param zcr the zero copy shim
* @param first the first range to read
* @param last the last range to read
* @param allocateDirect if we need to allocate buffers, should we use direct
* @throws IOException if the file can't be read
*/
static void zeroCopyReadRanges(FSDataInputStream file,
HadoopShims.ZeroCopyReaderShim zcr,
BufferChunk first,
BufferChunk last,
boolean allocateDirect) throws IOException {
// read all of the bytes that we need
final long offset = first.getOffset();
int length = (int)(computeEnd(first, last) - offset);
file.seek(offset);
List<ByteBuffer> bytes = new ArrayList<>();
while (length > 0) {
ByteBuffer read = zcr.readBuffer(length, false);
bytes.add(read);
length -= read.remaining();
}
long currentOffset = offset;
// iterate and fill each range
BufferChunk current = first;
Iterator<ByteBuffer> buffers = bytes.iterator();
ByteBuffer currentBuffer = buffers.next();
while (current != last.next) {
// if we are past the start of the range, restart the iterator
if (current.getOffset() < currentOffset) {
buffers = bytes.iterator();
currentBuffer = buffers.next();
currentOffset = offset;
}
// walk through the buffers to find the start of the buffer
while (currentOffset + currentBuffer.remaining() <= current.getOffset()) {
currentOffset += currentBuffer.remaining();
// We assume that buffers.hasNext is true because we know we read
// enough data to cover the last range.
currentBuffer = buffers.next();
}
// did we get the current range in a single read?
if (currentOffset + currentBuffer.remaining() >= current.getEnd()) {
ByteBuffer copy = currentBuffer.duplicate();
copy.position((int) (current.getOffset() - currentOffset));
copy.limit(copy.position() + current.getLength());
current.setChunk(copy);
} else {
// otherwise, build a single buffer that holds the entire range
ByteBuffer result = allocateDirect
? ByteBuffer.allocateDirect(current.getLength())
: ByteBuffer.allocate(current.getLength());
// we know that the range spans buffers
ByteBuffer copy = currentBuffer.duplicate();
// skip over the front matter
copy.position((int) (current.getOffset() - currentOffset));
result.put(copy);
// advance the buffer
currentOffset += currentBuffer.remaining();
currentBuffer = buffers.next();
while (result.hasRemaining()) {
if (result.remaining() > currentBuffer.remaining()) {
result.put(currentBuffer.duplicate());
currentOffset += currentBuffer.remaining();
currentBuffer = buffers.next();
} else {
copy = currentBuffer.duplicate();
copy.limit(result.remaining());
result.put(copy);
}
}
result.flip();
current.setChunk(result);
}
current = (BufferChunk) current.next;
}
}
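// Sketch of the two cases handled above (illustrative only). Suppose the
// zero-copy read returned two buffers covering [0, 100) and [100, 220):
//   a range [10, 60)  falls inside the first buffer, so it is served by a
//     duplicate() with position/limit adjusted (no copy);
//   a range [80, 150) spans both buffers, so a fresh ByteBuffer of length 70
//     is allocated and the two pieces are copied into it before flip().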
/**
* Read the data from the file based on a list of ranges in a single read.
* @param file the file to read from
* @param first the first range to read
* @param last the last range to read
* @param allocateDirect should we use direct buffers
*/
static void readRanges(FSDataInputStream file,
BufferChunk first,
BufferChunk last,
boolean allocateDirect) throws IOException {
// assume that the chunks are sorted by offset
long offset = first.getOffset();
int readSize = (int) (computeEnd(first, last) - offset);
byte[] buffer = new byte[readSize];
file.readFully(offset, buffer, 0, buffer.length);
// get the data into a ByteBuffer
ByteBuffer bytes;
if (allocateDirect) {
bytes = ByteBuffer.allocateDirect(readSize);
bytes.put(buffer);
bytes.flip();
} else {
bytes = ByteBuffer.wrap(buffer);
}
// populate each BufferChunk with its data
BufferChunk current = first;
while (current != last.next) {
ByteBuffer currentBytes = current == last ? bytes : bytes.duplicate();
currentBytes.position((int) (current.getOffset() - offset));
currentBytes.limit((int) (current.getEnd() - offset));
current.setChunk(currentBytes);
current = (BufferChunk) current.next;
}
}
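// Sketch of the non-zero-copy path above (illustrative only): for chunks at
// [0, 100), [100, 250) and [250, 300), a single 300-byte readFully() fills one
// backing buffer, and each chunk receives a duplicate() of it with position
// and limit set to (offset - 0, end - 0), so no data is copied per chunk.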
/**
* Find the list of ranges that should be read in a single read.
* The read will stop when there is a gap, one of the ranges already has data,
* or the read would reach Integer.MAX_VALUE bytes.
* @param first the first range to read
* @return the last range to read
*/
static BufferChunk findSingleRead(BufferChunk first) {
BufferChunk last = first;
long currentEnd = first.getEnd();
while (last.next != null &&
!last.next.hasData() &&
last.next.getOffset() <= currentEnd &&
last.next.getEnd() - first.getOffset() < Integer.MAX_VALUE) {
last = (BufferChunk) last.next;
currentEnd = Math.max(currentEnd, last.getEnd());
}
return last;
}
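// Example (illustrative only): for pending chunks at [0, 100), [100, 250) and
// [400, 500), findSingleRead(first) returns the chunk ending at 250; the gap
// between 250 and 400 forces the [400, 500) chunk into a separate read. A
// chunk that already hasData() stops the merge in the same way.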
/**
* Read the list of ranges from the file by updating each range in the list
* with a buffer that has the bytes from the file.
*
* The ranges must be sorted, but may overlap or include holes.
*
* @param file the file to read
* @param zcr the zero copy shim
* @param list the disk ranges within the file to read
* @param doForceDirect allocate direct buffers
*/
static void readDiskRanges(FSDataInputStream file,
HadoopShims.ZeroCopyReaderShim zcr,
BufferChunkList list,
boolean doForceDirect) throws IOException {
BufferChunk current = list == null ? null : list.get();
while (current != null) {
while (current.hasData()) {
current = (BufferChunk) current.next;
}
BufferChunk last = findSingleRead(current);
if (zcr != null) {
zeroCopyReadRanges(file, zcr, current, last, doForceDirect);
} else {
readRanges(file, current, last, doForceDirect);
}
current = (BufferChunk) last.next;
}
}
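// End-to-end sketch (illustrative only): given a BufferChunkList where some
// chunks already carry data (e.g. from a cache), the loop above skips them,
// groups the remaining contiguous chunks with findSingleRead, and issues one
// zero-copy or regular read per group; afterwards every chunk in the list
// reports hasData() == true.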
static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file,
CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException {
if ((codec == null || ((codec instanceof DirectDecompressionCodec)
&& ((DirectDecompressionCodec) codec).isAvailable()))) {
/* codec is null or is available */
return SHIMS.getZeroCopyReader(file, pool);
}
return null;
}
// This is an implementation copied from ElasticByteBufferPool in hadoop-2,
// which lacks a clear()/clean() operation, so one is added here.
public final static class ByteBufferAllocatorPool implements HadoopShims.ByteBufferPoolShim {
private static final class Key implements Comparable<Key> {
private final int capacity;
private final long insertionGeneration;
Key(int capacity, long insertionGeneration) {
this.capacity = capacity;
this.insertionGeneration = insertionGeneration;
}
@Override
public int compareTo(Key other) {
if (capacity != other.capacity) {
return capacity - other.capacity;
} else {
return Long.compare(insertionGeneration, other.insertionGeneration);
}
}
@Override
public boolean equals(Object rhs) {
if (rhs == null) {
return false;
}
try {
Key o = (Key) rhs;
return (compareTo(o) == 0);
} catch (ClassCastException e) {
return false;
}
}
@Override
public int hashCode() {
return new HashCodeBuilder().append(capacity).append(insertionGeneration)
.toHashCode();
}
}
private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<>();
private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<>();
private long currentGeneration = 0;
private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
return direct ? directBuffers : buffers;
}
public void clear() {
buffers.clear();
directBuffers.clear();
}
@Override
public ByteBuffer getBuffer(boolean direct, int length) {
TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0));
if (entry == null) {
return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer
.allocate(length);
}
tree.remove(entry.getKey());
return entry.getValue();
}
@Override
public void putBuffer(ByteBuffer buffer) {
TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
while (true) {
Key key = new Key(buffer.capacity(), currentGeneration++);
if (!tree.containsKey(key)) {
tree.put(key, buffer);
return;
}
// Buffers are indexed by (capacity, generation); if the key is not unique
// on the first try, loop and retry with the next generation.
}
}
}
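// Pool behavior sketch (illustrative only):
//   ByteBufferAllocatorPool pool = new ByteBufferAllocatorPool();
//   ByteBuffer a = pool.getBuffer(false, 4096);  // nothing pooled: fresh allocate(4096)
//   pool.putBuffer(a);
//   ByteBuffer b = pool.getBuffer(false, 1024);  // reuses a: smallest pooled buffer with
//                                                // capacity >= 1024 via ceilingEntry
//   pool.clear();                                // drops all pooled buffers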
}