/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hive.ql.io.orc.encoded;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.hadoop.hive.common.io.DataCache;
import org.apache.hadoop.hive.common.io.DiskRange;
import org.apache.hadoop.hive.common.io.DiskRangeList;
import org.apache.hadoop.hive.common.io.DataCache.BooleanRef;
import org.apache.hadoop.hive.common.io.DataCache.DiskRangeListFactory;
import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper;
import org.apache.hadoop.hive.common.io.encoded.EncodedColumnBatch.ColumnStreamData;
import org.apache.hadoop.hive.common.io.encoded.MemoryBuffer;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
import org.apache.orc.OrcConf;
import org.apache.orc.impl.OutStream;
import org.apache.orc.impl.RecordReaderUtils;
import org.apache.orc.impl.StreamName;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.BufferChunk;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
import org.apache.orc.OrcProto;
/**
* Encoded reader implementation.
*
* Note about refcounts on cache blocks.
* When we get or put blocks into cache, they are "locked" (refcount++), so they cannot be evicted.
* We send the MemoryBuffer-s to the caller as part of RG data; one MemoryBuffer can be used for
* many RGs (e.g. a dictionary, or multiple RGs per block). We also want to "unlock" MemoryBuffer-s
* in cache as soon as possible. Here is how we deal with that:
*
* For the dictionary case:
* 1) There's a separate refcount on the ColumnStreamData object we send to the caller. In the
* dictionary case, it's increased per RG, and callers don't release MBs if the containing
* ColumnStreamData is not ready to be released. This is done because a dictionary can have many
* buffers; decrefing all of them for every RG is more expensive; plus, decrefing in the cache
* may be more expensive due to cache policy, etc.
*
* For the non-dictionary case:
* 1) All the ColumnStreamData-s for normal data always have refcount 1; we return them once.
* 2) At all times, every MB in such cases has +1 refcount for each time we return it as part of a CSD.
* 3) When the caller is done, it therefore decrefs the CSD to 0, and decrefs all the MBs involved.
* 4) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we return
* the MB to the caller and the caller decrefs it, the MB can't be evicted and will still be there
* if we want to reuse it for some other RG.
* 5) As we read (we always read RGs in order, moving forward in each stream; we assume they are
* stored physically in order in the file, AND that CBs are not shared between streams), we note
* which MBs cannot possibly be reused anymore (the next RG starts in the next CB). We release the
* refcount from (4) in that case.
* 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then
* not use it; thus, at the end we go through all the MBs and release those not released by (5).
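*
* An illustrative (simplified) example of the non-dictionary scheme: suppose one MB backs RGs 5
* and 6 of a DATA stream. Reading RG 5 adds +1 for the CSD on top of the initial +1 from (4).
* The caller finishes RG 5 and decrefs; the initial refcount still keeps the MB locked. While
* reading RG 6 we see that RG 7 starts in the next CB, so per (5) we release the initial
* refcount; the caller's decref for RG 6 then fully unlocks the buffer.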
*/
class EncodedReaderImpl implements EncodedReader {
public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
private static final Object POOLS_CREATION_LOCK = new Object();
private static Pools POOLS;
private static class Pools {
Pool<CacheChunk> tccPool;
Pool<ProcCacheChunk> pccPool;
Pool<OrcEncodedColumnBatch> ecbPool;
Pool<ColumnStreamData> csdPool;
}
private static final DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
@Override
public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, offset, end);
return tcc;
}
};
private final Object fileKey;
private final DataReader dataReader;
private boolean isDataReaderOpen = false;
private final CompressionCodec codec;
private final int bufferSize;
private final List<OrcProto.Type> types;
private final long rowIndexStride;
private final DataCache cacheWrapper;
private boolean isTracingEnabled;
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec,
int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
PoolFactory pf) throws IOException {
this.fileKey = fileKey;
this.codec = codec;
this.types = types;
this.bufferSize = bufferSize;
this.rowIndexStride = strideRate;
this.cacheWrapper = cacheWrapper;
this.dataReader = dataReader;
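// One-time initialization of the shared static pools; the unsynchronized pre-check is just an
// optimization, and only the first assignment under the lock wins.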
if (POOLS != null) return;
if (pf == null) {
pf = new NoopPoolFactory();
}
Pools pools = createPools(pf);
synchronized (POOLS_CREATION_LOCK) {
if (POOLS != null) return;
POOLS = pools;
}
}
/** Helper context for each column being read */
private static final class ColumnReadContext {
public ColumnReadContext(int colIx, OrcProto.ColumnEncoding encoding,
OrcProto.RowIndex rowIndex) {
this.encoding = encoding;
this.rowIndex = rowIndex;
this.colIx = colIx;
streamCount = 0;
}
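// Note: ROW_INDEX is the first index-area stream kind in OrcProto.Stream.Kind, so its numeric
// value serves as an upper bound on the number of data streams a single column can have.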
public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
/** The number of streams that are part of this column. */
int streamCount = 0;
final StreamContext[] streams = new StreamContext[MAX_STREAMS];
/** Column encoding. */
OrcProto.ColumnEncoding encoding;
/** Column rowindex. */
OrcProto.RowIndex rowIndex;
/** Column index in the file. */
int colIx;
public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
streams[streamCount++] = new StreamContext(stream, offset, indexIx);
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" column_index: ").append(colIx);
sb.append(" encoding: ").append(encoding);
sb.append(" stream_count: ").append(streamCount);
int i = 0;
for (StreamContext sc : streams) {
if (sc != null) {
sb.append(" stream_").append(i).append(":").append(sc.toString());
}
i++;
}
return sb.toString();
}
}
private static final class StreamContext {
public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
this.kind = stream.getKind();
this.length = stream.getLength();
this.offset = streamOffset;
this.streamIndexOffset = streamIndexOffset;
}
/** Offset and length of this stream, relative to the start of the stripe. */
public long offset, length;
public int streamIndexOffset;
public OrcProto.Stream.Kind kind;
/** Iterator over the buffers; used to maintain position during per-RG reading. */
DiskRangeList bufferIter;
/** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
ColumnStreamData stripeLevelStream;
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" kind: ").append(kind);
sb.append(" offset: ").append(offset);
sb.append(" length: ").append(length);
sb.append(" index_offset: ").append(streamIndexOffset);
return sb.toString();
}
}
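/**
* Reads the encoded data for the included columns of one stripe from cache and/or disk,
* assembles it into per-RG OrcEncodedColumnBatch-es, and feeds those to the consumer.
* See the class comment for the refcounting contract on the buffers handed out here.
*/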
@Override
public void readEncodedColumns(int stripeIx, StripeInformation stripe,
OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, List<OrcProto.Stream> streamList,
boolean[] included, boolean[][] colRgs,
Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
// Note: for now we don't have to setError here; the caller will setError if we throw.
// We are also not supposed to call setDone, since we are only part of the operation.
long stripeOffset = stripe.getOffset();
// 1. Figure out what we have to read.
long offset = 0; // Stream offset in relation to the stripe.
// 1.1. Figure out which columns have a present stream
boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
if (isTracingEnabled) {
LOG.trace("The following columns have PRESENT streams: " + arrayToString(hasNull));
}
// We assume stream list is sorted by column and that non-data
// streams do not interleave data streams for the same column.
// 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream).
int colRgIx = -1, lastColIx = -1;
ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
boolean[] includedRgs = null;
boolean isCompressed = (codec != null);
CreateHelper listToRead = new CreateHelper();
boolean hasIndexOnlyCols = false;
for (OrcProto.Stream stream : streamList) {
long length = stream.getLength();
int colIx = stream.getColumn();
OrcProto.Stream.Kind streamKind = stream.getKind();
if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) {
// This is a stream for an included column, but not an included DATA stream. The flag name is
// slightly imprecise; it really means "at least one included column has an index stream".
hasIndexOnlyCols = hasIndexOnlyCols | included[colIx];
if (isTracingEnabled) {
LOG.trace("Skipping stream: " + streamKind + " at " + offset + ", " + length);
}
offset += length;
continue;
}
ColumnReadContext ctx = null;
if (lastColIx != colIx) {
++colRgIx;
assert colCtxs[colRgIx] == null;
lastColIx = colIx;
includedRgs = colRgs[colRgIx];
ctx = colCtxs[colRgIx] = new ColumnReadContext(
colIx, encodings.get(colIx), indexes[colIx]);
if (isTracingEnabled) {
LOG.trace("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
}
} else {
ctx = colCtxs[colRgIx];
assert ctx != null;
}
int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
ctx.addStream(offset, stream, indexIx);
if (isTracingEnabled) {
LOG.trace("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
+ ", " + length + ", index position " + indexIx);
}
if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true);
if (isTracingEnabled) {
LOG.trace("Will read whole stream " + streamKind + "; added to " + listToRead.getTail());
}
} else {
RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs,
codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx),
bufferSize, hasNull[colIx], offset, length, listToRead, true);
}
offset += length;
}
boolean hasFileId = this.fileKey != null;
if (listToRead.get() == null) {
// No data to read for this stripe. Check if we have some included index-only columns.
// TODO: there may be a bug here. Could there be partial RG filtering on index-only column?
if (hasIndexOnlyCols && (includedRgs == null)) {
OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, colRgs.length);
consumer.consumeData(ecb);
} else {
LOG.warn("Nothing to read for stripe [" + stripe + "]");
}
return;
}
// 2. Now, read all of the ranges from cache or disk.
DiskRangeList.MutateHelper toRead = new DiskRangeList.MutateHelper(listToRead.get());
if (isTracingEnabled) {
LOG.trace("Resulting disk ranges to read (file " + fileKey + "): "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
BooleanRef isAllInCache = new BooleanRef();
if (hasFileId) {
cacheWrapper.getFileData(fileKey, toRead.next, stripeOffset, CC_FACTORY, isAllInCache);
if (isTracingEnabled) {
LOG.trace("Disk ranges after cache (file " + fileKey + ", base offset " + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
}
if (!isAllInCache.value) {
if (!isDataReaderOpen) {
this.dataReader.open();
isDataReaderOpen = true;
}
dataReader.readFileData(toRead.next, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
}
// 3. For uncompressed case, we need some special processing before read.
DiskRangeList iter = toRead.next; // Keep "toRead" list for future use, don't extract().
if (codec == null) {
for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
ColumnReadContext ctx = colCtxs[colIxMod];
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
DiskRangeList newIter = preReadUncompressedStream(
stripeOffset, iter, sctx.offset, sctx.offset + sctx.length);
if (newIter != null) {
iter = newIter;
}
}
}
if (isTracingEnabled) {
LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset "
+ stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
iter = toRead.next; // Reset the iter to start.
}
// 4. Finally, decompress data, map per RG, and return to caller.
// We go by RG and not by column because that is how data is processed.
int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
boolean isLastRg = rgIx == rgCount - 1;
// Create the batch we will use to return data for this RG.
OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
ecb.init(fileKey, stripeIx, rgIx, colRgs.length);
boolean isRGSelected = true;
for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
// TODO: simplify this now that high-level cache has been removed.
if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) {
// RG x col filtered.
isRGSelected = false;
continue;
}
ColumnReadContext ctx = colCtxs[colIxMod];
OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
ecb.initColumn(colIxMod, ctx.colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
ColumnStreamData cb = null;
if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
// This stream is for the entire stripe and is needed for every RG; uncompress it once and reuse.
if (isTracingEnabled) {
LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
}
if (sctx.stripeLevelStream == null) {
sctx.stripeLevelStream = POOLS.csdPool.take();
// We will be using this for each RG while also sending RGs to processing.
// To avoid buffers being unlocked, run refcount one ahead; we will not increase
// it when building the last RG, so each RG processing will decref once, and the
// last one will unlock the buffers.
sctx.stripeLevelStream.incRef();
// For stripe-level streams we don't need the extra refcount on the block.
// See class comment about refcounts.
long unlockUntilCOffset = sctx.offset + sctx.length;
DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
iter = lastCached;
}
}
if (!isLastRg) {
sctx.stripeLevelStream.incRef();
}
cb = sctx.stripeLevelStream;
} else {
// This stream can be split per RG using the index. Let's do that.
// Offset at which this RG begins.
long cOffset = sctx.offset + index.getPositions(sctx.streamIndexOffset);
// Offset relative to the beginning of the stream of where this RG ends.
long nextCOffsetRel = isLastRg ? sctx.length
: nextIndex.getPositions(sctx.streamIndexOffset);
// Offset before which this RG is guaranteed to end. Can only be estimated.
// We estimate the same way for compressed and uncompressed for now.
long endCOffset = sctx.offset + RecordReaderUtils.estimateRgEndOffset(
isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize);
// As we read, we can unlock initial refcounts for the buffers that end before
// the data that we need for this RG.
long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
cb = createRgColumnStreamData(
rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
boolean isStartOfStream = sctx.bufferIter == null;
DiskRangeList lastCached = readEncodedStream(stripeOffset,
(isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
sctx.bufferIter = iter = lastCached;
}
}
ecb.setStreamData(colIxMod, sctx.kind.getNumber(), cb);
}
}
if (isRGSelected) {
consumer.consumeData(ecb);
}
}
if (isTracingEnabled) {
LOG.trace("Disk ranges after preparing all the data "
+ RecordReaderUtils.stringifyDiskRanges(toRead.next));
}
// Release the unreleased buffers. See class comment about refcounts.
releaseInitialRefcounts(toRead.next);
releaseCacheChunksIntoObjectPool(toRead.next);
}
private static String arrayToString(boolean[] a) {
StringBuilder b = new StringBuilder();
b.append('[');
for (int i = 0; i < a.length; ++i) {
b.append(a[i] ? "1" : "0");
}
b.append(']');
return b.toString();
}
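/** Takes a pooled ColumnStreamData for one RG of one stream and increfs it for the caller;
* the actual cache buffers are attached later by readEncodedStream. */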
private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg,
int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
ColumnStreamData cb = POOLS.csdPool.take();
cb.incRef();
if (isTracingEnabled) {
LOG.trace("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "")
+ "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", "
+ sctx.length + " index position " + sctx.streamIndexOffset + ": " +
(isCompressed ? "" : "un") + "compressed [" + cOffset + ", " + endCOffset + ")");
}
return cb;
}
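/** Releases the initial "fetching thread" refcounts (see the class comment) for all the cache
* chunks remaining in the list whose buffers were not already released during per-RG processing. */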
private void releaseInitialRefcounts(DiskRangeList current) {
while (current != null) {
DiskRangeList toFree = current;
current = current.next;
if (!(toFree instanceof CacheChunk)) continue;
CacheChunk cc = (CacheChunk)toFree;
if (cc.getBuffer() == null) continue;
MemoryBuffer buffer = cc.getBuffer();
cacheWrapper.releaseBuffer(buffer);
cc.setBuffer(null);
}
}
@Override
public void setTracing(boolean isEnabled) {
this.isTracingEnabled = isEnabled;
}
@Override
public void close() throws IOException {
dataReader.close();
}
/**
* Fake cache chunk used for uncompressed data. Used in preRead for uncompressed files.
* Makes assumptions about preRead code; for example, we add chunks here when they are
* already in the linked list, without unlinking. So, we record the start position in the
* original list, and then, when someone adds the next element, we merely increase the number
* of elements one has to traverse from that position to get the whole list.
*/
private static class UncompressedCacheChunk extends CacheChunk {
private BufferChunk chunk;
private int count;
public UncompressedCacheChunk(BufferChunk bc) {
super();
init(null, bc.getOffset(), bc.getEnd());
chunk = bc;
count = 1;
}
public void addChunk(BufferChunk bc) {
assert bc.getOffset() == this.getEnd();
this.end = bc.getEnd();
++count;
}
public BufferChunk getChunk() {
return chunk;
}
public int getCount() {
return count;
}
@Override
public void handleCacheCollision(DataCache cacheWrapper,
MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
assert cacheBuffers == null;
// This is done at pre-read stage where there's nothing special w/refcounts. Just release.
cacheWrapper.getAllocator().deallocate(getBuffer());
// Replace the buffer in our big range list, as well as in current results.
this.setBuffer(replacementBuffer);
}
public void clear() {
this.chunk = null;
this.count = -1;
}
}
/**
* CacheChunk that is pre-created for new cache data; initially, it contains an original disk
* buffer and an unallocated MemoryBuffer object. Before we expose it, the MB is allocated,
* the data is decompressed, and original compressed data is discarded. The chunk lives on in
* the DiskRange list created for the request, and everyone treats it like a regular CacheChunk.
*/
private static class ProcCacheChunk extends CacheChunk {
public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed,
ByteBuffer originalData, MemoryBuffer targetBuffer, int originalCbIndex) {
super.init(targetBuffer, cbStartOffset, cbEndOffset);
this.isOriginalDataCompressed = isCompressed;
this.originalData = originalData;
this.originalCbIndex = originalCbIndex;
}
@Override
public void reset() {
super.reset();
this.originalData = null;
}
@Override
public String toString() {
return super.toString() + ", original is set " + (this.originalData != null)
+ ", buffer was replaced " + (originalCbIndex == -1);
}
@Override
public void handleCacheCollision(DataCache cacheWrapper, MemoryBuffer replacementBuffer,
List<MemoryBuffer> cacheBuffers) {
assert originalCbIndex >= 0;
// Had the put succeeded for our new buffer, it would have a refcount of 2: 1 from the put,
// and 1 from the notifyReused call above. The "old" buffer now has the 1 from the put; the
// new buffer is not in the cache.
cacheWrapper.getAllocator().deallocate(getBuffer());
cacheWrapper.reuseBuffer(replacementBuffer);
// Replace the buffer in our big range list, as well as in current results.
this.buffer = replacementBuffer;
cacheBuffers.set(originalCbIndex, replacementBuffer);
originalCbIndex = -1; // This can only happen once at decompress time.
}
/** Original data that will be turned into encoded cache data in this.buffer and reset. */
private ByteBuffer originalData = null;
/** Whether originalData is compressed. */
private boolean isOriginalDataCompressed;
/** Index of the MemoryBuffer corresponding to this object inside the result list. If we
* hit a cache collision, we will replace this memory buffer with the one from cache at
* this index, without having to look for it. */
private int originalCbIndex;
}
/**
* Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
* and remove what we have returned. We will keep iterator as a "hint" point.
* @param baseOffset Absolute offset of boundaries and ranges relative to file, for cache keys.
* @param start Ordered ranges containing file data. Helpful if they point close to cOffset.
* @param cOffset Start offset to decompress.
* @param endCOffset End offset to decompress; estimate, partial CBs will be ignored.
* @param csd Stream data, to add the results.
* @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as
* they will not be used in future calls (see the class comment in
* EncodedReaderImpl about refcounts).
* @return Last buffer cached during decompression. Cache buffers are never removed from
* the master list, so they are safe to keep as iterators for various streams.
*/
public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset)
throws IOException {
if (csd.getCacheBuffers() == null) {
csd.setCacheBuffers(new ArrayList<MemoryBuffer>());
} else {
csd.getCacheBuffers().clear();
}
if (cOffset == endCOffset) return null;
boolean isCompressed = codec != null;
List<ProcCacheChunk> toDecompress = null;
List<ByteBuffer> toRelease = null;
List<IncompleteCb> badEstimates = null;
if (isCompressed) {
toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
toDecompress = new ArrayList<>();
badEstimates = new ArrayList<>();
}
// 1. Find our bearings in the stream. Normally, iter will already point either to where we
// want to be, or just before. However, RGs can overlap due to encoding, so we may have
// to return to a previous block.
DiskRangeList current = findExactPosition(start, cOffset);
if (isTracingEnabled) {
LOG.trace("Starting read for [" + cOffset + "," + endCOffset + ") at " + current);
}
CacheChunk lastUncompressed = null;
// 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
lastUncompressed = isCompressed ?
prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
: prepareRangesForUncompressedRead(
cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
// 2.5. Remember the bad estimates for future reference.
if (badEstimates != null && !badEstimates.isEmpty()) {
// Relies on the fact that cache does not actually store these.
DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]);
long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset);
assert result == null; // We don't expect conflicts from bad estimates.
}
if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
// 3. Allocate the buffers, prepare cache keys.
// At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
// data and some unallocated membufs for decompression. toDecompress contains all the work we
// need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
// has also been adjusted to point to these buffers instead of compressed data for the ranges.
MemoryBuffer[] targetBuffers = new MemoryBuffer[toDecompress.size()];
DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
int ix = 0;
for (ProcCacheChunk chunk : toDecompress) {
cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
targetBuffers[ix] = chunk.getBuffer();
++ix;
}
cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize);
// 4. Now decompress (or copy) the data into cache buffers.
for (ProcCacheChunk chunk : toDecompress) {
ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
if (chunk.isOriginalDataCompressed) {
decompressChunk(chunk.originalData, codec, dest);
} else {
copyUncompressedChunk(chunk.originalData, dest);
}
chunk.originalData = null;
if (isTracingEnabled) {
LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
}
cacheWrapper.reuseBuffer(chunk.getBuffer());
}
// 5. Release original compressed buffers to zero-copy reader if needed.
if (toRelease != null) {
assert dataReader.isTrackingDiskRanges();
for (ByteBuffer buffer : toRelease) {
dataReader.releaseBuffer(buffer);
}
}
// 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
}
// 7. It may happen that we know we won't use some compression buffers anymore.
// Release initial refcounts.
for (ProcCacheChunk chunk : toDecompress) {
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
}
return lastUncompressed;
}
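/**
* Walks the disk ranges in [cOffset, endCOffset): already-uncompressed CacheChunks are locked and
* added to the results as-is; IncompleteCbs end the walk; raw BufferChunks are parsed into
* compression blocks and queued for decompression (see addOneCompressionBuffer).
* @return The last uncompressed chunk processed, usable as an iteration hint by the caller.
*/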
private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
List<IncompleteCb> badEstimates) throws IOException {
if (cOffset > current.getOffset()) {
// Target compression block is in the middle of the range; slice the range in two.
current = current.split(cOffset).next;
}
long currentOffset = cOffset;
CacheChunk lastUncompressed = null;
while (true) {
DiskRangeList next = null;
if (current instanceof CacheChunk) {
// 2a. This is a decoded compression buffer, add as is.
CacheChunk cc = (CacheChunk)current;
if (isTracingEnabled) {
LOG.trace("Locking " + cc.getBuffer() + " due to reuse");
}
cacheWrapper.reuseBuffer(cc.getBuffer());
columnStreamData.getCacheBuffers().add(cc.getBuffer());
currentOffset = cc.getEnd();
if (isTracingEnabled) {
LOG.trace("Adding an already-uncompressed buffer " + cc.getBuffer());
}
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, cc);
lastUncompressed = cc;
next = current.next;
if (next != null && (endCOffset >= 0 && currentOffset < endCOffset)
&& next.getOffset() >= endCOffset) {
throw new IOException("Expected data at " + currentOffset + " (reading until "
+ endCOffset + "), but the next buffer starts at " + next.getOffset());
}
} else if (current instanceof IncompleteCb) {
// 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates.
if (isTracingEnabled) {
LOG.trace("Cannot read " + current);
}
next = null;
currentOffset = -1;
} else {
// 2c. This is a compressed buffer. We need to uncompress it; the buffer can comprise
// several disk ranges, so we might need to combine them.
BufferChunk bc = (BufferChunk)current;
ProcCacheChunk newCached = addOneCompressionBuffer(
bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
next = (newCached != null) ? newCached.next : null;
currentOffset = (next != null) ? next.getOffset() : -1;
}
if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
break;
}
current = next;
}
return lastUncompressed;
}
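/**
* Same walk as prepareRangesForCompressedRead, but for streams already processed by
* preReadUncompressedStream: every range must be a CacheChunk by now, so we only lock the
* buffers, add them to the results, and record the index base offset from the first chunk.
*/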
private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
throws IOException {
long currentOffset = cOffset;
CacheChunk lastUncompressed = null;
boolean isFirst = true;
while (true) {
DiskRangeList next = null;
assert current instanceof CacheChunk;
lastUncompressed = (CacheChunk)current;
if (isTracingEnabled) {
LOG.trace("Locking " + lastUncompressed.getBuffer() + " due to reuse");
}
cacheWrapper.reuseBuffer(lastUncompressed.getBuffer());
if (isFirst) {
columnStreamData.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
isFirst = false;
}
columnStreamData.getCacheBuffers().add(lastUncompressed.getBuffer());
currentOffset = lastUncompressed.getEnd();
if (isTracingEnabled) {
LOG.trace("Adding an uncompressed buffer " + lastUncompressed.getBuffer());
}
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, lastUncompressed);
next = current.next;
if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
break;
}
current = next;
}
return lastUncompressed;
}
/**
* To achieve some sort of consistent cache boundaries, we will cache streams deterministically:
* in segments starting at the stream start, each spanning either the stream size or some fixed
* size, whichever is smaller.
* If we are not reading the entire segment's worth of data, then we will not cache the partial
* RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is far too complicated
* to handle just for this case.
* We could avoid the copy in the non-zcr case and manage the buffer that was not allocated by
* our allocator. The uncompressed case is not mainline, though, so let's not complicate it.
*/
private DiskRangeList preReadUncompressedStream(long baseOffset,
DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
if (streamOffset == streamEnd) return null;
List<UncompressedCacheChunk> toCache = null;
List<ByteBuffer> toRelease = null;
// 1. Find our bearings in the stream.
DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
if (isTracingEnabled) {
LOG.trace("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current);
}
if (streamOffset > current.getOffset()) {
// Target compression block is in the middle of the range; slice the range in two.
current = current.split(streamOffset).next;
}
// Account for maximum cache buffer size.
long streamLen = streamEnd - streamOffset;
int partSize = determineUncompressedPartSize(),
partCount = (int)(streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0);
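// partCount is ceil(streamLen / partSize); e.g. with illustrative numbers, a 600000-byte stream
// and 262144-byte parts yield partCount = 3, the last part being short.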
CacheChunk lastUncompressed = null;
MemoryBuffer[] singleAlloc = new MemoryBuffer[1];
for (int i = 0; i < partCount; ++i) {
long partOffset = streamOffset + (i * partSize),
partEnd = Math.min(partOffset + partSize, streamEnd);
long hasEntirePartTo = partOffset; // We have 0 bytes of data for this part, for now.
if (current == null) {
break; // We have no data from this point on (could be unneeded), skip.
}
assert partOffset <= current.getOffset();
if (partOffset == current.getOffset() && current instanceof CacheChunk) {
// We assume cache chunks would always match the way we read, so check and skip it.
assert current.getOffset() == partOffset && current.getEnd() == partEnd;
lastUncompressed = (CacheChunk)current;
current = current.next;
continue;
}
if (current.getOffset() >= partEnd) {
continue; // We have no data at all for this part of the stream (could be unneeded), skip.
}
if (toRelease == null && dataReader.isTrackingDiskRanges()) {
toRelease = new ArrayList<ByteBuffer>();
}
// We have some disk buffers... see if we have entire part, etc.
UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
DiskRangeList next = current;
while (true) {
boolean noMoreDataForPart = (next == null || next.getOffset() >= partEnd);
if (noMoreDataForPart && hasEntirePartTo < partEnd && candidateCached != null) {
// We are missing a section at the end of the part... copy the start to non-cached.
lastUncompressed = copyAndReplaceCandidateToNonCached(
candidateCached, partOffset, hasEntirePartTo, cacheWrapper, singleAlloc);
candidateCached = null;
}
current = next;
if (noMoreDataForPart) break; // Done with this part.
boolean wasSplit = false;
if (current.getEnd() > partEnd) {
// If the current buffer contains multiple parts, split it.
current = current.split(partEnd);
wasSplit = true;
}
if (isTracingEnabled) {
LOG.trace("Processing uncompressed file data at ["
+ current.getOffset() + ", " + current.getEnd() + ")");
}
BufferChunk curBc = (BufferChunk)current;
if (!wasSplit && toRelease != null) {
toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
}
// Track if we still have the entire part.
long hadEntirePartTo = hasEntirePartTo;
// We have data until the end of current block if we had it until the beginning.
hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
if (hasEntirePartTo == -1) {
// We don't have the entire part; copy both whatever we intended to cache, and the rest,
// to an allocated buffer. We could try to optimize a bit if we have contiguous buffers
// with gaps, but it's probably not needed.
if (candidateCached != null) {
assert hadEntirePartTo != -1;
copyAndReplaceCandidateToNonCached(
candidateCached, partOffset, hadEntirePartTo, cacheWrapper, singleAlloc);
candidateCached = null;
}
lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cacheWrapper, singleAlloc);
next = lastUncompressed.next; // There may be more data after the gap.
} else {
// So far we have all the data from the beginning of the part.
if (candidateCached == null) {
candidateCached = new UncompressedCacheChunk(curBc);
} else {
candidateCached.addChunk(curBc);
}
next = current.next;
}
}
if (candidateCached != null) {
if (toCache == null) {
toCache = new ArrayList<>(partCount - i);
}
toCache.add(candidateCached);
}
}
// 3. Allocate the buffers, prepare cache keys.
if (toCache == null) return lastUncompressed; // Nothing to copy and cache.
MemoryBuffer[] targetBuffers =
toCache.size() == 1 ? singleAlloc : new MemoryBuffer[toCache.size()];
targetBuffers[0] = null;
DiskRange[] cacheKeys = new DiskRange[toCache.size()];
int ix = 0;
for (UncompressedCacheChunk chunk : toCache) {
cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
++ix;
}
cacheWrapper.getAllocator().allocateMultiple(
targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
// 4. Now copy the data into cache buffers.
ix = 0;
for (UncompressedCacheChunk candidateCached : toCache) {
candidateCached.setBuffer(targetBuffers[ix]);
ByteBuffer dest = candidateCached.getBuffer().getByteBufferRaw();
copyAndReplaceUncompressedChunks(candidateCached, dest, candidateCached);
candidateCached.clear();
lastUncompressed = candidateCached;
++ix;
}
// 5. Release original compressed buffers to zero-copy reader if needed.
if (toRelease != null) {
assert dataReader.isTrackingDiskRanges();
for (ByteBuffer buf : toRelease) {
dataReader.releaseBuffer(buf);
}
}
// 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toCache, targetBuffers, null);
}
return lastUncompressed;
}
private int determineUncompressedPartSize() {
// We will break the uncompressed data in the cache into chunks that are either the size of the
// prevalent ORC compression buffer (the default), or the maximum allocation (since we cannot
// allocate bigger chunks), whichever is less.
long orcCbSizeDefault = ((Number)OrcConf.BUFFER_SIZE.getDefaultValue()).longValue();
int maxAllocSize = cacheWrapper.getAllocator().getMaxAllocation();
return (int)Math.min(maxAllocSize, orcCbSizeDefault);
}
private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
int startPos = dest.position(), startLim = dest.limit();
dest.put(src); // Copy uncompressed data to cache.
// Put moves position forward by the size of the data.
int newPos = dest.position();
if (newPos > startLim) {
throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+ ") became [" + newPos + ", " + dest.limit() + ")");
}
dest.position(startPos);
dest.limit(newPos);
}
private static CacheChunk copyAndReplaceCandidateToNonCached(
UncompressedCacheChunk candidateCached, long partOffset,
long candidateEnd, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
// We thought we had the entire part to cache, but we don't; convert start to
// non-cached. Since we are at the first gap, the previous stuff must be contiguous.
singleAlloc[0] = null;
cacheWrapper.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
MemoryBuffer buffer = singleAlloc[0];
cacheWrapper.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, partOffset, candidateEnd);
copyAndReplaceUncompressedChunks(candidateCached, dest, tcc);
return tcc;
}
private static CacheChunk copyAndReplaceUncompressedToNonCached(
BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
singleAlloc[0] = null;
cacheWrapper.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
MemoryBuffer buffer = singleAlloc[0];
cacheWrapper.reuseBuffer(buffer);
ByteBuffer dest = buffer.getByteBufferRaw();
CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, bc.getOffset(), bc.getEnd());
copyUncompressedChunk(bc.getChunk(), dest);
bc.replaceSelfWith(tcc);
return tcc;
}
private static void copyAndReplaceUncompressedChunks(
UncompressedCacheChunk candidateCached, ByteBuffer dest, CacheChunk tcc) {
int startPos = dest.position(), startLim = dest.limit();
DiskRangeList next = null;
for (int i = 0; i < candidateCached.getCount(); ++i) {
BufferChunk chunk = (i == 0) ? candidateCached.getChunk() : (BufferChunk)next;
dest.put(chunk.getData());
next = chunk.next;
if (i == 0) {
chunk.replaceSelfWith(tcc);
} else {
chunk.removeSelf();
}
}
int newPos = dest.position();
if (newPos > startLim) {
throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+ ") became [" + newPos + ", " + dest.limit() + ")");
}
dest.position(startPos);
dest.limit(newPos);
}
private static void decompressChunk(
ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
int startPos = dest.position(), startLim = dest.limit();
codec.decompress(src, dest);
// Codec resets the position to 0 and limit to correct limit.
dest.position(startPos);
int newLim = dest.limit();
if (newLim > startLim) {
throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
+ ") became [" + dest.position() + ", " + newLim + ")");
}
}
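/** Returns the pooled chunk wrapper objects in the list to their object pools for reuse.
* Call only after the buffers themselves have been released (see releaseInitialRefcounts). */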
public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
while (current != null) {
if (current instanceof ProcCacheChunk) {
POOLS.pccPool.offer((ProcCacheChunk)current);
} else if (current instanceof CacheChunk) {
POOLS.tccPool.offer((CacheChunk)current);
}
current = current.next;
}
}
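/** Releases the initial refcount of the chunk if it ends at or before unlockUntilCOffset (i.e.
* no later RG of this stream can reuse it), then backtracks and releases any earlier chunks of
* the same stream that are still holding their initial refcounts. */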
private void ponderReleaseInitialRefcount(
long unlockUntilCOffset, long streamStartOffset, CacheChunk cc) {
// Don't release if the buffer contains any data beyond the acceptable boundary.
if (cc.getEnd() > unlockUntilCOffset) return;
assert cc.getBuffer() != null;
try {
releaseInitialRefcount(cc, false);
} catch (AssertionError e) {
LOG.error("BUG: releasing initial refcount; stream start " + streamStartOffset + ", "
+ "unlocking until " + unlockUntilCOffset + " from [" + cc + "]: " + e.getMessage());
throw e;
}
// Release all the previous buffers that we may not have been able to release due to reuse,
// as long as they are still in the same stream and are not already released.
DiskRangeList prev = cc.prev;
while (true) {
if ((prev == null) || (prev.getEnd() <= streamStartOffset)
|| !(prev instanceof CacheChunk)) break;
CacheChunk prevCc = (CacheChunk)prev;
if (prevCc.buffer == null) break;
try {
releaseInitialRefcount(prevCc, true);
} catch (AssertionError e) {
LOG.error("BUG: releasing initial refcount; stream start " + streamStartOffset + ", "
+ "unlocking until " + unlockUntilCOffset + " from [" + cc + "] and backtracked to ["
+ prevCc + "]: " + e.getMessage());
throw e;
}
prev = prev.prev;
}
}
private void releaseInitialRefcount(CacheChunk cc, boolean isBacktracking) {
// This is the last RG for which this buffer will be used; remove the initial refcount.
if (isTracingEnabled) {
LOG.trace("Unlocking " + cc.getBuffer() + " for the fetching thread"
+ (isBacktracking ? "; backtracking" : ""));
}
cacheWrapper.releaseBuffer(cc.getBuffer());
cc.setBuffer(null);
}
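/**
* Handles results that collided with concurrent cache puts. Bit (i % 64) of collisionMask[i / 64],
* counting from the LSB, is set iff the cache kept a pre-existing buffer for element i and put it
* into targetBuffers in place of ours; for such elements we discard our buffer and switch the
* result lists over to the cached one.
*/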
private void processCacheCollisions(long[] collisionMask,
List<? extends CacheChunk> toDecompress, MemoryBuffer[] targetBuffers,
List<MemoryBuffer> cacheBuffers) {
if (collisionMask == null) return;
assert collisionMask.length >= ((toDecompress.size() + 63) >>> 6); // Enough mask words for all elements.
// There are some elements that were cached in parallel, take care of them.
long maskVal = -1;
for (int i = 0; i < toDecompress.size(); ++i) {
if ((i & 63) == 0) {
maskVal = collisionMask[i >>> 6];
}
if ((maskVal & 1) == 1) {
// The cache found an old buffer for the key and put it into the array instead of our new one.
CacheChunk replacedChunk = toDecompress.get(i);
MemoryBuffer replacementBuffer = targetBuffers[i];
if (isTracingEnabled) {
LOG.trace("Discarding data due to cache collision: " + replacedChunk.getBuffer()
+ " replaced with " + replacementBuffer);
}
assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results "
+ "even though mask is [" + Long.toBinaryString(maskVal) + "]";
replacedChunk.handleCacheCollision(cacheWrapper, replacementBuffer, cacheBuffers);
}
maskVal >>= 1;
}
}
/** Finds compressed offset in a stream and makes sure iter points to its position.
This may be necessary for obscure combinations of compression and encoding boundaries. */
private static DiskRangeList findExactPosition(DiskRangeList ranges, long offset) {
if (offset < 0) return ranges;
return findIntersectingPosition(ranges, offset, offset);
}
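/** Finds the first range in the doubly-linked list that intersects [offset, end], moving
* backwards from the starting hint if necessary; assumes such a range exists. */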
private static DiskRangeList findIntersectingPosition(DiskRangeList ranges, long offset, long end) {
if (offset < 0) return ranges;
// We expect the offset to be valid. TODO: validate it rather than assuming.
while (ranges.getEnd() <= offset) {
ranges = ranges.next;
}
while (ranges.getOffset() > end) {
ranges = ranges.prev;
}
// We are now on some intersecting buffer, find the first intersecting buffer.
while (ranges.prev != null && ranges.prev.getEnd() > offset) {
ranges = ranges.prev;
}
return ranges;
}
/**
* Reads one compression block from the source; handles compression blocks read from
* multiple ranges (usually, that would only happen with zcr).
* Adds to cacheBuffers, toDecompress and toRelease (see the parameter docs for what each does).
* @param current BufferChunk where compression block starts.
* @param cacheBuffers The result buffer array to add pre-allocated target cache buffer.
* @param toDecompress The list of work to decompress - pairs of compressed buffers and the
* target buffers (same as the ones added to cacheBuffers).
* @param toRelease The list of buffers to release to zcr because they are no longer in use.
* @param badEstimates The list of bad estimates that cannot be decompressed.
* @return The resulting cache chunk.
*/
private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
ByteBuffer slice = null;
ByteBuffer compressed = current.getChunk();
long cbStartOffset = current.getOffset();
int b0 = compressed.get() & 0xff;
int b1 = compressed.get() & 0xff;
int b2 = compressed.get() & 0xff;
int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
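// The 3-byte CB header is a little-endian 24-bit value equal to (chunkLength << 1) | isOriginal.
// E.g., per the ORC spec, a 100000-byte compressed chunk has header bytes 0x40 0x0d 0x03:
// (0x03 << 15) | (0x0d << 7) | (0x40 >> 1) = 98304 + 1664 + 32 = 100000, and the low bit of 0x40
// is 0, meaning the chunk really is compressed.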
if (chunkLength > bufferSize) {
throw new IllegalArgumentException("Buffer size too small. size = " +
bufferSize + " needed = " + chunkLength);
}
int consumedLength = chunkLength + OutStream.HEADER_SIZE;
long cbEndOffset = cbStartOffset + consumedLength;
boolean isUncompressed = ((b0 & 0x01) == 1);
if (isTracingEnabled) {
LOG.trace("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total "
+ consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed");
}
if (compressed.remaining() >= chunkLength) {
// Simple case - CB fits entirely in the disk range.
slice = compressed.slice();
slice.limit(chunkLength);
ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
toRelease.add(compressed);
}
return cc;
}
if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
return null; // This is impossible to read from this chunk.
}
// TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
// We need to consolidate 2 or more buffers into one to decompress.
ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
int remaining = chunkLength - compressed.remaining();
int originalPos = compressed.position();
copy.put(compressed);
if (isTracingEnabled) {
LOG.trace("Removing partial CB " + current + " from ranges after copying its contents");
}
DiskRangeList next = current.next;
current.removeSelf();
if (dataReader.isTrackingDiskRanges()) {
if (originalPos == 0) {
dataReader.releaseBuffer(compressed); // We copied the entire buffer.
} else {
toRelease.add(compressed); // There might be slices depending on this buffer.
}
}
int extraChunkCount = 0;
while (true) {
if (!(next instanceof BufferChunk)) {
throw new IOException("Trying to extend compressed block into uncompressed block " + next);
}
compressed = next.getData();
++extraChunkCount;
if (compressed.remaining() >= remaining) {
// This is the last range for this compression block. Yay!
slice = compressed.slice();
slice.limit(remaining);
copy.put(slice);
ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
dataReader.releaseBuffer(compressed); // We copied the entire buffer.
}
return cc;
}
remaining -= compressed.remaining();
copy.put(compressed);
if (dataReader.isTrackingDiskRanges()) {
dataReader.releaseBuffer(compressed); // We copied the entire buffer.
}
DiskRangeList tmp = next;
next = next.hasContiguousNext() ? next.next : null;
if (next != null) {
if (isTracingEnabled) {
LOG.trace("Removing partial CB " + tmp + " from ranges after copying its contents");
}
tmp.removeSelf();
} else {
badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount));
return null; // This is impossible to read from this chunk.
}
}
}
private IncompleteCb addIncompleteCompressionBuffer(
long cbStartOffset, DiskRangeList target, int extraChunkCount) {
IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
if (isTracingEnabled) {
LOG.trace("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
+ icb + " in the buffers");
}
target.replaceSelfWith(icb);
return icb;
}
/**
* Adds one buffer with compressed data to the results of addOneCompressionBuffer (see its javadoc).
* @param fullCompressionBlock (fCB) Entire compression block, sliced or copied from disk data.
* @param isUncompressed Whether the data in the block is uncompressed.
* @param cbStartOffset Compressed start offset of the fCB.
* @param cbEndOffset Compressed end offset of the fCB.
* @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
* @param lastChunk The last BufferChunk the compression block was read from; its buffer position is advanced past the consumed bytes.
* @param toDecompress See addOneCompressionBuffer.
* @param cacheBuffers See addOneCompressionBuffer.
* @return New cache buffer.
*/
private ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
BufferChunk lastChunk, List<ProcCacheChunk> toDecompress, List<MemoryBuffer> cacheBuffers) {
// Prepare future cache buffer.
MemoryBuffer futureAlloc = cacheWrapper.getAllocator().createUnallocated();
// Add it to result in order we are processing.
cacheBuffers.add(futureAlloc);
// Add it to the list of work to decompress.
ProcCacheChunk cc = POOLS.pccPool.take();
cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
toDecompress.add(cc);
// Adjust the compression block position.
if (isTracingEnabled) {
LOG.trace("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes");
}
lastChunk.getChunk().position(lastChunk.getChunk().position() + lastChunkLength);
// Finally, put it in the ranges list for future use (if shared between RGs).
// Before anyone else accesses it, it would have been allocated and decompressed locally.
if (lastChunk.getChunk().remaining() <= 0) {
if (isTracingEnabled) {
LOG.trace("Replacing " + lastChunk + " with " + cc + " in the buffers");
}
lastChunk.replaceSelfWith(cc);
} else {
if (isTracingEnabled) {
LOG.trace("Adding " + cc + " before " + lastChunk + " in the buffers");
}
lastChunk.insertPartBefore(cc);
}
return cc;
}
private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
return isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
}
private static Pools createPools(PoolFactory pf) {
Pools pools = new Pools();
pools.pccPool = pf.createPool(1024, new PoolObjectHelper<ProcCacheChunk>() {
@Override
public ProcCacheChunk create() {
return new ProcCacheChunk();
}
@Override
public void resetBeforeOffer(ProcCacheChunk t) {
t.reset();
}
});
pools.tccPool = pf.createPool(1024, new PoolObjectHelper<CacheChunk>() {
@Override
public CacheChunk create() {
return new CacheChunk();
}
@Override
public void resetBeforeOffer(CacheChunk t) {
t.reset();
}
});
pools.ecbPool = pf.createEncodedColumnBatchPool();
pools.csdPool = pf.createColumnStreamDataPool();
return pools;
}
/** Pool factory that is used if another one isn't specified - just creates the objects. */
private static class NoopPoolFactory implements PoolFactory {
@Override
public <T> Pool<T> createPool(final int size, final PoolObjectHelper<T> helper) {
return new Pool<T>() {
@Override
public void offer(T t) {
}
@Override
public int size() {
return size;
}
@Override
public T take() {
return helper.create();
}
};
}
@Override
public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() {
return createPool(0, new PoolObjectHelper<OrcEncodedColumnBatch>() {
@Override
public OrcEncodedColumnBatch create() {
return new OrcEncodedColumnBatch();
}
@Override
public void resetBeforeOffer(OrcEncodedColumnBatch t) {
}
});
}
@Override
public Pool<ColumnStreamData> createColumnStreamDataPool() {
return createPool(0, new PoolObjectHelper<ColumnStreamData>() {
@Override
public ColumnStreamData create() {
return new ColumnStreamData();
}
@Override
public void resetBeforeOffer(ColumnStreamData t) {
}
});
}
}
}