blob: bcb54d6eb3d8f69c919530e5259039f3c0f902a0 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hive.common.Pool;
import org.apache.hadoop.hive.common.Pool.PoolObjectHelper;
import org.apache.orc.CompressionCodec;
import org.apache.orc.DataReader;
import org.apache.orc.OrcConf;
import org.apache.orc.impl.OutStream;
import org.apache.orc.impl.RecordReaderUtils;
import org.apache.orc.impl.StreamName;
import org.apache.orc.StripeInformation;
import org.apache.orc.impl.BufferChunk;
import org.apache.orc.OrcProto;
* Encoded reader implementation.
* Note about refcounts on cache blocks.
* When we get or put blocks into cache, they are "locked" (refcount++), so they cannot be evicted.
* We send the MemoryBuffer-s to caller as part of RG data; one MemoryBuffer can be used for many
* RGs (e.g. a dictionary, or multiple RGs per block). Also, we want to "unlock" MemoryBuffer-s in
* cache as soon as possible. This is how we deal with this:
* For dictionary case:
* 1) There's a separate refcount on the ColumnStreamData object we send to the caller. In the
* dictionary case, it's increased per RG, and callers don't release MBs if the containing
* ColumnStreamData is not ready to be released. This is done because dictionary can have many
* buffers; decrefing all of them for all RGs is more expensive; plus, decrefing in cache
* may be more expensive due to cache policy/etc.
* For non-dictionary case:
* 1) All the ColumnStreamData-s for normal data always have refcount 1; we return them once.
* 2) At all times, every MB in such cases has +1 refcount for each time we return it as part of CSD.
* 3) When the caller is done, it therefore decrefs the CSD to 0, and decrefs all the MBs involved.
* 4) Additionally, we keep an extra +1 refcount "for the fetching thread". That way, if we return
* the MB to the caller, and the caller decrefs it, the MB can't be evicted and will be there if we
* want to reuse it for some other RG.
* 5) As we read (we always read RGs in order and forward in each stream; we assume they are stored
* physically in order in the file; AND that CBs are not shared between streams), we note which
* MBs cannot possibly be reused anymore (next RG starts in the next CB). We decref the refcount
* from (4) in such case.
* 6) Given that RG end boundaries in ORC are estimates, we can request data from cache and then
* not use it; thus, at the end we go through all the MBs, and release those not released by (5).
class EncodedReaderImpl implements EncodedReader {
  public static final Logger LOG = LoggerFactory.getLogger(EncodedReaderImpl.class);
  /** Guards the one-time publication of {@link #POOLS}. */
  private static final Object POOLS_CREATION_LOCK = new Object();
  /**
   * Object pools shared by all instances, created by the first constructor call.
   * Must be volatile: the constructor's fast-path check, CC_FACTORY and the read paths all
   * dereference this field without holding POOLS_CREATION_LOCK. Without volatile, another
   * thread could observe a non-null reference before the writes to the Pools object's
   * non-final fields (tccPool, ecbPool, ...) become visible to it.
   */
  private static volatile Pools POOLS;
/** Object pools shared by all reader instances; a single set is published through POOLS. */
private static class Pools {
// Pool of CacheChunk list nodes (handed out by CC_FACTORY).
Pool<CacheChunk> tccPool;
// Pool of ProcCacheChunk objects (chunks whose data still has to be decompressed/copied).
Pool<ProcCacheChunk> pccPool;
// Pool of per-RG column batches.
Pool<OrcEncodedColumnBatch> ecbPool;
// Pool of ColumnStreamData objects, for both per-RG and stripe-level stream data.
Pool<ColumnStreamData> csdPool;
/**
 * Factory passed to DataCache.getFileData to create list nodes for ranges found in cache.
 * The CacheChunk objects come from the shared tccPool.
 */
private final static DiskRangeListFactory CC_FACTORY = new DiskRangeListFactory() {
public DiskRangeList createCacheChunk(MemoryBuffer buffer, long offset, long end) {
CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, offset, end);
return tcc;
// Cache key for this file; when null, cache lookups and puts of decoded data are skipped.
private final Object fileKey;
// Reader used to fetch the ranges that are not found in the cache.
private final DataReader dataReader;
// Whether dataReader has been opened; it is opened lazily on the first disk read.
private boolean isDataReaderOpen = false;
// Compression codec; null means the file is not compressed.
private final CompressionCodec codec;
// Buffer size used when allocating target buffers and estimating RG end offsets.
private final int bufferSize;
// ORC type tree of the file, indexed by column id.
private final List<OrcProto.Type> types;
// Number of rows per row group; used to compute the RG count of a stripe.
private final long rowIndexStride;
// Cache of decoded file data; also provides the buffer allocator.
private final DataCache cacheWrapper;
// Enables trace-level logging of read decisions; set via setTracing().
private boolean isTracingEnabled;
/**
 * Creates a reader for one file.
 * @param fileKey Cache key for the file; when null, cache lookups and puts of decoded data
 *                are skipped.
 * @param types ORC type tree of the file, indexed by column id.
 * @param codec Compression codec, or null if the file is not compressed.
 * @param bufferSize Buffer size used for target-buffer allocation and RG end estimation.
 * @param strideRate Row index stride, i.e. the number of rows per row group.
 * @param cacheWrapper Cache used to look up and store decoded file data.
 * @param dataReader Reader used to fetch ranges that are not in the cache.
 * @param pf Factory for the shared object pools; a NoopPoolFactory is used when null.
 *           Ignored once the static pools have already been published.
 */
public EncodedReaderImpl(Object fileKey, List<OrcProto.Type> types, CompressionCodec codec,
int bufferSize, long strideRate, DataCache cacheWrapper, DataReader dataReader,
PoolFactory pf) throws IOException {
this.fileKey = fileKey;
this.codec = codec;
this.types = types;
this.bufferSize = bufferSize;
this.rowIndexStride = strideRate;
this.cacheWrapper = cacheWrapper;
this.dataReader = dataReader;
// Fast path: an earlier instance already published the static pools.
if (POOLS != null) return;
if (pf == null) {
pf = new NoopPoolFactory();
// The pools are built outside the lock; if another thread publishes first,
// this set is simply dropped.
Pools pools = createPools(pf);
synchronized (POOLS_CREATION_LOCK) {
// Re-check under the lock so that only one set of pools is ever published.
if (POOLS != null) return;
POOLS = pools;
/** Per-column read state: the column's encoding, its row index, and its data streams. */
private static final class ColumnReadContext {
public ColumnReadContext(int colIx, OrcProto.ColumnEncoding encoding,
OrcProto.RowIndex rowIndex) {
this.encoding = encoding;
this.rowIndex = rowIndex;
this.colIx = colIx;
streamCount = 0;
// Capacity of the streams array. Only DATA-area streams are added to a context;
// presumably all of those have kind numbers below ROW_INDEX — verify against the ORC proto.
public static final int MAX_STREAMS = OrcProto.Stream.Kind.ROW_INDEX_VALUE;
/** The number of streams that are part of this column. */
int streamCount = 0;
final StreamContext[] streams = new StreamContext[MAX_STREAMS];
/** Column encoding. */
OrcProto.ColumnEncoding encoding;
/** Column rowindex. */
OrcProto.RowIndex rowIndex;
/** Column index in the file. */
int colIx;
/**
 * Appends a stream to this column.
 * @param offset Offset of the stream relative to the stripe start.
 * @param stream Stream metadata (kind and length).
 * @param indexIx Position of this stream's entries in each RowIndexEntry's position list.
 * Throws ArrayIndexOutOfBoundsException if more than MAX_STREAMS streams are added.
 */
public void addStream(long offset, OrcProto.Stream stream, int indexIx) {
streams[streamCount++] = new StreamContext(stream, offset, indexIx);
/** Debug description of the column and each of its non-null streams. */
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" column_index: ").append(colIx);
sb.append(" encoding: ").append(encoding);
sb.append(" stream_count: ").append(streamCount);
// NOTE(review): no increment of i is visible in this listing, so every stream would be
// labeled stream_0 — confirm the increment was not lost.
int i = 0;
for (StreamContext sc : streams) {
if (sc != null) {
sb.append(" stream_").append(i).append(":").append(sc.toString());
return sb.toString();
/** Read state for one data stream of a column in the current stripe. */
private static final class StreamContext {
/**
 * @param stream Stream metadata; supplies the kind and length.
 * @param streamOffset Offset of the stream relative to the stripe start.
 * @param streamIndexOffset Position of this stream's entries in each RowIndexEntry.
 */
public StreamContext(OrcProto.Stream stream, long streamOffset, int streamIndexOffset) {
this.kind = stream.getKind();
this.length = stream.getLength();
this.offset = streamOffset;
this.streamIndexOffset = streamIndexOffset;
/** Offset of this stream relative to the stripe start, and the stream length. */
public long offset, length;
/** Index into RowIndexEntry positions at which this stream's RG start offset is stored. */
public int streamIndexOffset;
/** Kind of this stream. */
public OrcProto.Stream.Kind kind;
/** Iterators for the buffers; used to maintain position in per-rg reading. */
DiskRangeList bufferIter;
/** Saved stripe-level stream, to reuse for each RG (e.g. dictionaries). */
ColumnStreamData stripeLevelStream;
/** Debug description of this stream. */
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" kind: ").append(kind);
sb.append(" offset: ").append(offset);
sb.append(" length: ").append(length);
sb.append(" index_offset: ").append(streamIndexOffset);
return sb.toString();
/**
 * Reads the encoded data of the included columns of one stripe, taking it from the cache where
 * possible and from the data reader otherwise, and splits it into one OrcEncodedColumnBatch
 * per row group (RG).
 * @param stripeIx Index of the stripe; stored in each batch.
 * @param stripe Stripe metadata; supplies the stripe offset and the row count.
 * @param indexes Row indexes, indexed by column id.
 * @param encodings Column encodings, indexed by column id.
 * @param streamList Streams of the stripe; assumed sorted by column.
 * @param included Flags, indexed by column id, marking the columns to read.
 * @param colRgs Per column context (in stream order), the RGs to read; a null entry means
 *               every RG of that column is read.
 * @param consumer Presumably receives the per-RG batches; the hand-off call is not visible in
 *                 this listing — TODO confirm.
 * @throws IOException If reading or decoding the stripe data fails.
 */
public void readEncodedColumns(int stripeIx, StripeInformation stripe,
OrcProto.RowIndex[] indexes, List<OrcProto.ColumnEncoding> encodings, List<OrcProto.Stream> streamList,
boolean[] included, boolean[][] colRgs,
Consumer<OrcEncodedColumnBatch> consumer) throws IOException {
// Note: for now we don't have to setError here, caller will setError if we throw.
// We are also not supposed to call setDone, since we are only part of the operation.
long stripeOffset = stripe.getOffset();
// 1. Figure out what we have to read.
long offset = 0; // Stream offset in relation to the stripe.
// 1.1. Figure out which columns have a present stream
boolean[] hasNull = RecordReaderUtils.findPresentStreamsByColumn(streamList, types);
if (isTracingEnabled) {
LOG.trace("The following columns have PRESENT streams: " + arrayToString(hasNull));
// We assume stream list is sorted by column and that non-data
// streams do not interleave data streams for the same column.
// 1.2. With that in mind, determine disk ranges to read/get from cache (not by stream).
int colRgIx = -1, lastColIx = -1;
ColumnReadContext[] colCtxs = new ColumnReadContext[colRgs.length];
boolean[] includedRgs = null;
boolean isCompressed = (codec != null);
CreateHelper listToRead = new CreateHelper();
boolean hasIndexOnlyCols = false;
for (OrcProto.Stream stream : streamList) {
long length = stream.getLength();
int colIx = stream.getColumn();
OrcProto.Stream.Kind streamKind = stream.getKind();
// Streams of excluded columns and non-DATA-area streams are only skipped over.
if (!included[colIx] || StreamName.getArea(streamKind) != StreamName.Area.DATA) {
// We have a stream for included column, but in future it might have no data streams.
// It's more like "has at least one column included that has an index stream".
hasIndexOnlyCols = hasIndexOnlyCols | included[colIx];
if (isTracingEnabled) {
LOG.trace("Skipping stream: " + streamKind + " at " + offset + ", " + length);
offset += length;
// NOTE(review): a 'continue' is expected after skipping so that the stream is not also
// processed below; it is not visible in this listing.
ColumnReadContext ctx = null;
// First data stream of a new column: create its context in the next slot.
if (lastColIx != colIx) {
// NOTE(review): colRgIx starts at -1 and no increment is visible before it is used as an
// index here; confirm the increment was not lost.
assert colCtxs[colRgIx] == null;
lastColIx = colIx;
includedRgs = colRgs[colRgIx];
ctx = colCtxs[colRgIx] = new ColumnReadContext(
colIx, encodings.get(colIx), indexes[colIx]);
if (isTracingEnabled) {
LOG.trace("Creating context " + colRgIx + " for column " + colIx + ":" + ctx.toString());
} else {
ctx = colCtxs[colRgIx];
assert ctx != null;
int indexIx = RecordReaderUtils.getIndexPosition(ctx.encoding.getKind(),
types.get(colIx).getKind(), streamKind, isCompressed, hasNull[colIx]);
ctx.addStream(offset, stream, indexIx);
if (isTracingEnabled) {
LOG.trace("Adding stream for column " + colIx + ": " + streamKind + " at " + offset
+ ", " + length + ", index position " + indexIx);
// Read the whole stream when all RGs are needed, or when it is a dictionary (every RG uses it);
// otherwise read only the ranges covering the selected RGs.
if (includedRgs == null || RecordReaderUtils.isDictionary(streamKind, encodings.get(colIx))) {
RecordReaderUtils.addEntireStreamToRanges(offset, length, listToRead, true);
if (isTracingEnabled) {
LOG.trace("Will read whole stream " + streamKind + "; added to " + listToRead.getTail());
} else {
RecordReaderUtils.addRgFilteredStreamToRanges(stream, includedRgs,
codec != null, indexes[colIx], encodings.get(colIx), types.get(colIx),
bufferSize, hasNull[colIx], offset, length, listToRead, true);
offset += length;
// Without a file key the cache is not queried for this stripe.
boolean hasFileId = this.fileKey != null;
if (listToRead.get() == null) {
// No data to read for this stripe. Check if we have some included index-only columns.
// TODO: there may be a bug here. Could there be partial RG filtering on index-only column?
// NOTE(review): includedRgs holds the value left by the last data column seen in the loop above.
if (hasIndexOnlyCols && (includedRgs == null)) {
OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
ecb.init(fileKey, stripeIx, OrcEncodedColumnBatch.ALL_RGS, colRgs.length);
} else {
LOG.warn("Nothing to read for stripe [" + stripe + "]");
// 2. Now, read all of the ranges from cache or disk.
DiskRangeList.MutateHelper toRead = new DiskRangeList.MutateHelper(listToRead.get());
// NOTE(review): this guard checks isInfoEnabled() but logs at trace level; the argument of
// stringifyDiskRanges is also truncated in this listing.
if (isTracingEnabled && LOG.isInfoEnabled()) {
LOG.trace("Resulting disk ranges to read (file " + fileKey + "): "
+ RecordReaderUtils.stringifyDiskRanges(;
BooleanRef isAllInCache = new BooleanRef();
if (hasFileId) {
cacheWrapper.getFileData(fileKey,, stripeOffset, CC_FACTORY, isAllInCache);
if (isTracingEnabled && LOG.isInfoEnabled()) {
LOG.trace("Disk ranges after cache (file " + fileKey + ", base offset " + stripeOffset
+ "): " + RecordReaderUtils.stringifyDiskRanges(;
// Go to disk only if some ranges were not found in the cache; the reader is opened lazily.
if (!isAllInCache.value) {
// NOTE(review): the call that opens dataReader is not visible in this listing.
if (!isDataReaderOpen) {;
isDataReaderOpen = true;
dataReader.readFileData(, stripeOffset, cacheWrapper.getAllocator().isDirectAlloc());
// 3. For uncompressed case, we need some special processing before read.
DiskRangeList iter =; // Keep "toRead" list for future use, don't extract().
if (codec == null) {
for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
// NOTE(review): contexts are only created for columns that have DATA-area streams; a null
// entry here would throw NPE on ctx.streamCount — confirm every slot is populated.
ColumnReadContext ctx = colCtxs[colIxMod];
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
DiskRangeList newIter = preReadUncompressedStream(
stripeOffset, iter, sctx.offset, sctx.offset + sctx.length);
if (newIter != null) {
iter = newIter;
if (isTracingEnabled) {
LOG.trace("Disk ranges after pre-read (file " + fileKey + ", base offset "
+ stripeOffset + "): " + RecordReaderUtils.stringifyDiskRanges(;
iter =; // Reset the iter to start.
// 4. Finally, decompress data, map per RG, and return to caller.
// We go by RG and not by column because that is how data is processed.
// RG count, rounded up so that a partial last RG is included.
int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
for (int rgIx = 0; rgIx < rgCount; ++rgIx) {
boolean isLastRg = rgIx == rgCount - 1;
// Create the batch we will use to return data for this RG.
OrcEncodedColumnBatch ecb = POOLS.ecbPool.take();
ecb.init(fileKey, stripeIx, rgIx, colRgs.length);
boolean isRGSelected = true;
for (int colIxMod = 0; colIxMod < colRgs.length; ++colIxMod) {
// TODO: simplify this now that high-level cache has been removed.
if (colRgs[colIxMod] != null && !colRgs[colIxMod][rgIx]) {
// RG x col filtered.
// NOTE(review): a 'continue' presumably follows here; it is not visible in this listing.
isRGSelected = false;
ColumnReadContext ctx = colCtxs[colIxMod];
OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx),
nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1);
ecb.initColumn(colIxMod, ctx.colIx, OrcEncodedColumnBatch.MAX_DATA_STREAMS);
for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) {
StreamContext sctx = ctx.streams[streamIx];
ColumnStreamData cb = null;
if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) {
// This stream is for entire stripe and needed for every RG; uncompress once and reuse.
if (isTracingEnabled) {
LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for"
+ " column " + ctx.colIx + " RG " + rgIx + " at " + sctx.offset + ", " + sctx.length);
if (sctx.stripeLevelStream == null) {
sctx.stripeLevelStream = POOLS.csdPool.take();
// We will be using this for each RG while also sending RGs to processing.
// To avoid buffers being unlocked, run refcount one ahead; we will not increase
// it when building the last RG, so each RG processing will decref once, and the
// last one will unlock the buffers.
// For stripe-level streams we don't need the extra refcount on the block.
// See class comment about refcounts.
long unlockUntilCOffset = sctx.offset + sctx.length;
DiskRangeList lastCached = readEncodedStream(stripeOffset, iter,
sctx.offset, sctx.offset + sctx.length, sctx.stripeLevelStream,
unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
iter = lastCached;
// NOTE(review): per the comment above, non-last RGs should bump the stream refcount here;
// that statement is not visible in this listing.
if (!isLastRg) {
cb = sctx.stripeLevelStream;
} else {
// This stream can be separated by RG using index. Let's do that.
// Offset to where this RG begins.
long cOffset = sctx.offset + index.getPositions(sctx.streamIndexOffset);
// Offset relative to the beginning of the stream of where this RG ends.
long nextCOffsetRel = isLastRg ? sctx.length
: nextIndex.getPositions(sctx.streamIndexOffset);
// Offset before which this RG is guaranteed to end. Can only be estimated.
// We estimate the same way for compressed and uncompressed for now.
long endCOffset = sctx.offset + RecordReaderUtils.estimateRgEndOffset(
isCompressed, isLastRg, nextCOffsetRel, sctx.length, bufferSize);
// As we read, we can unlock initial refcounts for the buffers that end before
// the data that we need for this RG.
long unlockUntilCOffset = sctx.offset + nextCOffsetRel;
cb = createRgColumnStreamData(
rgIx, isLastRg, ctx.colIx, sctx, cOffset, endCOffset, isCompressed);
// Each stream resumes from where its previous RG left off; the first RG starts from iter.
boolean isStartOfStream = sctx.bufferIter == null;
DiskRangeList lastCached = readEncodedStream(stripeOffset,
(isStartOfStream ? iter : sctx.bufferIter), cOffset, endCOffset, cb,
unlockUntilCOffset, sctx.offset);
if (lastCached != null) {
sctx.bufferIter = iter = lastCached;
ecb.setStreamData(colIxMod, sctx.kind.getNumber(), cb);
// NOTE(review): the hand-off of ecb to the consumer for selected RGs is not visible here.
if (isRGSelected) {
if (isTracingEnabled) {
LOG.trace("Disk ranges after preparing all the data "
+ RecordReaderUtils.stringifyDiskRanges(;
// Release the unreleased buffers. See class comment about refcounts.
private static String arrayToString(boolean[] a) {
StringBuilder b = new StringBuilder();
for (int i = 0; i < a.length; ++i) {
b.append(a[i] ? "1" : "0");
return b.toString();
/**
 * Takes a ColumnStreamData from the pool to hold one RG's data for one stream, tracing the
 * requested compressed range [cOffset, endCOffset) when tracing is enabled.
 * NOTE(review): the class comment says non-dictionary CSDs are returned with refcount 1; no
 * refcount change is visible here — confirm it is not missing from this listing.
 */
private ColumnStreamData createRgColumnStreamData(int rgIx, boolean isLastRg,
int colIx, StreamContext sctx, long cOffset, long endCOffset, boolean isCompressed) {
ColumnStreamData cb = POOLS.csdPool.take();
if (isTracingEnabled) {
LOG.trace("Getting data for column "+ colIx + " " + (isLastRg ? "last " : "")
+ "RG " + rgIx + " stream " + sctx.kind + " at " + sctx.offset + ", "
+ sctx.length + " index position " + sctx.streamIndexOffset + ": " +
(isCompressed ? "" : "un") + "compressed [" + cOffset + ", " + endCOffset + ")");
return cb;
/**
 * Walks the range list from {@code current} and visits every CacheChunk that still has a
 * buffer, presumably to drop the extra "fetching thread" refcount described in point (6) of
 * the class comment. The advance expression and the release call are truncated in this
 * listing — TODO confirm.
 */
private void releaseInitialRefcounts(DiskRangeList current) {
while (current != null) {
DiskRangeList toFree = current;
current =;
// Only cache chunks hold cache buffers; other range types are skipped.
if (!(toFree instanceof CacheChunk)) continue;
CacheChunk cc = (CacheChunk)toFree;
if (cc.getBuffer() == null) continue;
MemoryBuffer buffer = cc.getBuffer();
public void setTracing(boolean isEnabled) {
this.isTracingEnabled = isEnabled;
// Closes this reader. NOTE(review): the body (presumably closing dataReader) is not visible in
// this listing.
public void close() throws IOException {
* Fake cache chunk used for uncompressed data. Used in preRead for uncompressed files.
* Makes assumptions about preRead code; for example, we add chunks here when they are
* already in the linked list, without unlinking. So, we record the start position in the
* original list, and then, when someone adds the next element, we merely increase the number
* of elements one has to traverse from that position to get the whole list.
private static class UncompressedCacheChunk extends CacheChunk {
// First BufferChunk of the contiguous run covered by this chunk.
private BufferChunk chunk;
// Number of list elements, starting at 'chunk', covered by this chunk (-1 after clear()).
private int count;
// Starts a run with a single BufferChunk; no MemoryBuffer is attached yet.
public UncompressedCacheChunk(BufferChunk bc) {
init(null, bc.getOffset(), bc.getEnd());
chunk = bc;
count = 1;
// Extends the run with the next BufferChunk, which must start exactly where this one ends.
// NOTE(review): per the class comment 'count' should grow here; no increment is visible in
// this listing.
public void addChunk(BufferChunk bc) {
assert bc.getOffset() == this.getEnd();
this.end = bc.getEnd();
public BufferChunk getChunk() {
return chunk;
public int getCount() {
return count;
// Called with a buffer that is already in the cache for this range; cacheBuffers is
// expected to be null in the pre-read path.
public void handleCacheCollision(DataCache cacheWrapper,
MemoryBuffer replacementBuffer, List<MemoryBuffer> cacheBuffers) {
assert cacheBuffers == null;
// This is done at pre-read stage where there's nothing special w/refcounts. Just release.
// Replace the buffer in our big range list, as well as in current results.
// Drops the reference to the source chunks and marks the count invalid.
public void clear() {
this.chunk = null;
this.count = -1;
* CacheChunk that is pre-created for new cache data; initially, it contains an original disk
* buffer and an unallocated MemoryBuffer object. Before we expose it, the MB is allocated,
* the data is decompressed, and original compressed data is discarded. The chunk lives on in
* the DiskRange list created for the request, and everyone treats it like regular CacheChunk.
private static class ProcCacheChunk extends CacheChunk {
/**
 * @param cbStartOffset Start of the range covered by this chunk.
 * @param cbEndOffset End of the range covered by this chunk.
 * @param isCompressed Whether originalData must be decompressed (true) or only copied.
 * @param originalData Source bytes to decode into targetBuffer.
 * @param targetBuffer Memory buffer that will hold the decoded data.
 * @param originalCbIndex Index of targetBuffer in the result list of cache buffers.
 */
public void init(long cbStartOffset, long cbEndOffset, boolean isCompressed,
ByteBuffer originalData, MemoryBuffer targetBuffer, int originalCbIndex) {
super.init(targetBuffer, cbStartOffset, cbEndOffset);
this.isOriginalDataCompressed = isCompressed;
this.originalData = originalData;
this.originalCbIndex = originalCbIndex;
// Drops the reference to the original data.
public void reset() {
this.originalData = null;
public String toString() {
return super.toString() + ", original is set " + (this.originalData != null)
+ ", buffer was replaced " + (originalCbIndex == -1);
// Swaps in the buffer that was already in cache for this range, both in this chunk and at
// originalCbIndex in the caller's cache buffer list.
public void handleCacheCollision(DataCache cacheWrapper, MemoryBuffer replacementBuffer,
List<MemoryBuffer> cacheBuffers) {
assert originalCbIndex >= 0;
// Had the put succeeded for our new buffer, it would have refcount of 2 - 1 from put,
// and 1 from notifyReused call above. "Old" buffer now has the 1 from put; new buffer
// is not in cache.
// Replace the buffer in our big range list, as well as in current results.
this.buffer = replacementBuffer;
cacheBuffers.set(originalCbIndex, replacementBuffer);
originalCbIndex = -1; // This can only happen once at decompress time.
/** Original data that will be turned into encoded cache data in this.buffer and reset. */
private ByteBuffer originalData = null;
/** Whether originalData is compressed. */
private boolean isOriginalDataCompressed;
/** Index of the MemoryBuffer corresponding to this object inside the result list. If we
* hit a cache collision, we will replace this memory buffer with the one from cache at
* this index, without having to look for it. */
private int originalCbIndex;
* Uncompresses part of the stream. RGs can overlap, so we cannot just go and decompress
* and remove what we have returned. We will keep iterator as a "hint" point.
* @param baseOffset Absolute offset of boundaries and ranges relative to file, for cache keys.
* @param start Ordered ranges containing file data. Helpful if they point close to cOffset.
* @param cOffset Start offset to decompress.
* @param endCOffset End offset to decompress; estimate, partial CBs will be ignored.
* @param csd Stream data, to add the results.
* @param unlockUntilCOffset The offset until which the buffers can be unlocked in cache, as
* they will not be used in future calls (see the class comment in
* EncodedReaderImpl about refcounts).
* @param streamOffset Start offset of the stream that contains [cOffset, endCOffset); used to
* compute offsets relative to the stream and to decide which initial
* refcounts can be released.
* @return Last buffer cached during decompression. Cache buffers are never removed from
* the master list, so they are safe to keep as iterators for various streams.
* @throws IOException If the expected data is missing or cannot be decoded.
public DiskRangeList readEncodedStream(long baseOffset, DiskRangeList start, long cOffset,
long endCOffset, ColumnStreamData csd, long unlockUntilCOffset, long streamOffset)
throws IOException {
// Make sure the stream data has a buffer list to append to.
// NOTE(review): the else-branch body is not visible in this listing.
if (csd.getCacheBuffers() == null) {
csd.setCacheBuffers(new ArrayList<MemoryBuffer>());
} else {
// Empty range: nothing to read.
if (cOffset == endCOffset) return null;
boolean isCompressed = codec != null;
// Work lists used only for compressed data; toRelease is only needed when the data reader
// tracks disk ranges (buffers must be handed back to it).
List<ProcCacheChunk> toDecompress = null;
List<ByteBuffer> toRelease = null;
List<IncompleteCb> badEstimates = null;
if (isCompressed) {
toRelease = !dataReader.isTrackingDiskRanges() ? null : new ArrayList<ByteBuffer>();
toDecompress = new ArrayList<>();
badEstimates = new ArrayList<>();
// 1. Find our bearings in the stream. Normally, iter will already point either to where we
// want to be, or just before. However, RGs can overlap due to encoding, so we may have
// to return to a previous block.
DiskRangeList current = findExactPosition(start, cOffset);
if (isTracingEnabled) {
LOG.trace("Starting read for [" + cOffset + "," + endCOffset + ") at " + current);
CacheChunk lastUncompressed = null;
// 2. Go thru the blocks; add stuff to results and prepare the decompression work (see below).
lastUncompressed = isCompressed ?
prepareRangesForCompressedRead(cOffset, endCOffset, streamOffset,
unlockUntilCOffset, current, csd, toRelease, toDecompress, badEstimates)
: prepareRangesForUncompressedRead(
cOffset, endCOffset, streamOffset, unlockUntilCOffset, current, csd);
// 2.5. Remember the bad estimates for future reference.
// NOTE(review): unlike step 6 below, this put is not guarded by fileKey != null.
if (badEstimates != null && !badEstimates.isEmpty()) {
// Relies on the fact that cache does not actually store these.
DiskRange[] cacheKeys = badEstimates.toArray(new DiskRange[badEstimates.size()]);
long[] result = cacheWrapper.putFileData(fileKey, cacheKeys, null, baseOffset);
assert result == null; // We don't expect conflicts from bad estimates.
if (toDecompress == null || toDecompress.isEmpty()) return lastUncompressed; // Nothing to do.
// 3. Allocate the buffers, prepare cache keys.
// At this point, we have read all the CBs we need to read. cacheBuffers contains some cache
// data and some unallocated membufs for decompression. toDecompress contains all the work we
// need to do, and each item points to one of the membufs in cacheBuffers as target. The iter
// has also been adjusted to point to these buffers instead of compressed data for the ranges.
MemoryBuffer[] targetBuffers = new MemoryBuffer[toDecompress.size()];
DiskRange[] cacheKeys = new DiskRange[toDecompress.size()];
int ix = 0;
// NOTE(review): no increment of ix is visible in this loop in this listing.
for (ProcCacheChunk chunk : toDecompress) {
cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
targetBuffers[ix] = chunk.getBuffer();
cacheWrapper.getAllocator().allocateMultiple(targetBuffers, bufferSize);
// 4. Now decompress (or copy) the data into cache buffers.
for (ProcCacheChunk chunk : toDecompress) {
ByteBuffer dest = chunk.getBuffer().getByteBufferRaw();
if (chunk.isOriginalDataCompressed) {
decompressChunk(chunk.originalData, codec, dest);
} else {
copyUncompressedChunk(chunk.originalData, dest);
// The source bytes are no longer needed once decoded into the cache buffer.
chunk.originalData = null;
if (isTracingEnabled) {
LOG.trace("Locking " + chunk.getBuffer() + " due to reuse (after decompression)");
// 5. Release original compressed buffers to zero-copy reader if needed.
// NOTE(review): the body of the release loop is not visible in this listing.
if (toRelease != null) {
assert dataReader.isTrackingDiskRanges();
for (ByteBuffer buffer : toRelease) {
// 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toDecompress, targetBuffers, csd.getCacheBuffers());
// 7. It may happen that we know we won't use some compression buffers anymore.
// Release initial refcounts.
for (ProcCacheChunk chunk : toDecompress) {
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, chunk);
return lastUncompressed;
/**
 * Compressed case: walks the ranges from {@code current}. Already-decoded cache chunks are
 * added to the stream data as is, and compressed buffers are queued for decompression.
 * @param cOffset Start offset to read; the first range is split here if it starts earlier.
 * @param endCOffset Estimated end offset; when negative, the walk only stops at the end of
 *                   the list or at an incomplete CB.
 * @param streamOffset Start offset of the stream being read.
 * @param unlockUntilCOffset Offset before which initial refcounts may be released.
 * @param current Range at or before cOffset to start from.
 * @param columnStreamData Stream data whose cache buffer list receives the results.
 * @param toRelease Receives disk buffers to hand back to the data reader (may be null).
 * @param toDecompress Receives the chunks that still have to be decompressed.
 * @param badEstimates Receives incomplete CBs caused by estimated RG end offsets.
 * @return The last cache chunk added or queued, or null if none.
 * @throws IOException If a gap is found before endCOffset.
 */
private CacheChunk prepareRangesForCompressedRead(long cOffset, long endCOffset,
long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData,
List<ByteBuffer> toRelease, List<ProcCacheChunk> toDecompress,
List<IncompleteCb> badEstimates) throws IOException {
if (cOffset > current.getOffset()) {
// Target compression block is in the middle of the range; slice the range in two.
current = current.split(cOffset).next;
long currentOffset = cOffset;
CacheChunk lastUncompressed = null;
while (true) {
DiskRangeList next = null;
if (current instanceof CacheChunk) {
// 2a. This is a decoded compression buffer, add as is.
CacheChunk cc = (CacheChunk)current;
if (isTracingEnabled) {
LOG.trace("Locking " + cc.getBuffer() + " due to reuse");
currentOffset = cc.getEnd();
if (isTracingEnabled) {
LOG.trace("Adding an already-uncompressed buffer " + cc.getBuffer());
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, cc);
lastUncompressed = cc;
next =;
// A gap between this chunk and endCOffset means data we need was never read.
if (next != null && (endCOffset >= 0 && currentOffset < endCOffset)
&& next.getOffset() >= endCOffset) {
throw new IOException("Expected data at " + currentOffset + " (reading until "
+ endCOffset + "), but the next buffer starts at " + next.getOffset());
} else if (current instanceof IncompleteCb) {
// 2b. This is a known incomplete CB caused by ORC CB end boundaries being estimates.
if (isTracingEnabled) {
LOG.trace("Cannot read " + current);
next = null;
currentOffset = -1;
} else {
// 2c. This is a compressed buffer. We need to uncompress it; the buffer can comprise
// several disk ranges, so we might need to combine them.
BufferChunk bc = (BufferChunk)current;
ProcCacheChunk newCached = addOneCompressionBuffer(
bc, columnStreamData.getCacheBuffers(), toDecompress, toRelease, badEstimates);
lastUncompressed = (newCached == null) ? lastUncompressed : newCached;
next = (newCached != null) ? : null;
currentOffset = (next != null) ? next.getOffset() : -1;
// Stop at the end of the list or once the estimated end offset is reached.
if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
current = next;
return lastUncompressed;
/**
 * Uncompressed case: walks the ranges from {@code current}, all of which are expected to be
 * CacheChunks already (uncompressed streams are pre-read into cache chunks beforehand).
 * Records the first chunk's offset, relative to streamOffset, as the index base offset.
 * @param cOffset Start offset to read.
 * @param endCOffset Estimated end offset; when negative, the walk only stops at the list end.
 * @param streamOffset Start offset of the stream being read.
 * @param unlockUntilCOffset Offset before which initial refcounts may be released.
 * @param current First range to visit.
 * @param columnStreamData Stream data that receives the index base offset.
 * @return The last cache chunk visited.
 */
private CacheChunk prepareRangesForUncompressedRead(long cOffset, long endCOffset,
long streamOffset, long unlockUntilCOffset, DiskRangeList current, ColumnStreamData columnStreamData)
throws IOException {
long currentOffset = cOffset;
CacheChunk lastUncompressed = null;
boolean isFirst = true;
while (true) {
DiskRangeList next = null;
assert current instanceof CacheChunk;
lastUncompressed = (CacheChunk)current;
if (isTracingEnabled) {
LOG.trace("Locking " + lastUncompressed.getBuffer() + " due to reuse");
// The first chunk may start before cOffset; record where it starts within the stream.
if (isFirst) {
columnStreamData.setIndexBaseOffset((int)(lastUncompressed.getOffset() - streamOffset));
isFirst = false;
currentOffset = lastUncompressed.getEnd();
if (isTracingEnabled) {
LOG.trace("Adding an uncompressed buffer " + lastUncompressed.getBuffer());
ponderReleaseInitialRefcount(unlockUntilCOffset, streamOffset, lastUncompressed);
next =;
// Stop at the end of the list or once the estimated end offset is reached.
if (next == null || (endCOffset >= 0 && currentOffset >= endCOffset)) {
current = next;
return lastUncompressed;
* Pre-reads one uncompressed stream in fixed-size parts. Parts that are entirely present are
* copied into newly allocated buffers and put in the cache; partially present parts are
* copied to non-cached buffers instead.
* To achieve some sort of consistent cache boundaries, we will cache streams deterministically;
* in segments starting w/stream start, and going for either stream size or some fixed size.
* If we are not reading the entire segment's worth of data, then we will not cache the partial
* RGs; the breakage of cache assumptions (no interleaving blocks, etc.) is way too much PITA
* to handle just for this case.
* We could avoid copy in non-zcr case and manage the buffer that was not allocated by our
* allocator. Uncompressed case is not mainline though so let's not complicate it.
* @param baseOffset Offset passed to the cache along with the cache keys.
* @param start List position from which to search for the stream's ranges.
* @param streamOffset Start offset of the stream, relative to the stripe.
* @param streamEnd End offset of the stream (callers pass offset + length).
* @return The last cache chunk found or created for the stream, or null if there was none.
private DiskRangeList preReadUncompressedStream(long baseOffset,
DiskRangeList start, long streamOffset, long streamEnd) throws IOException {
if (streamOffset == streamEnd) return null;
List<UncompressedCacheChunk> toCache = null;
List<ByteBuffer> toRelease = null;
// 1. Find our bearings in the stream.
DiskRangeList current = findIntersectingPosition(start, streamOffset, streamEnd);
if (isTracingEnabled) {
LOG.trace("Starting pre-read for [" + streamOffset + "," + streamEnd + ") at " + current);
if (streamOffset > current.getOffset()) {
// The stream starts in the middle of the range; slice the range in two.
current = current.split(streamOffset).next;
// Account for maximum cache buffer size.
long streamLen = streamEnd - streamOffset;
// partCount = ceil(streamLen / partSize).
int partSize = determineUncompressedPartSize(),
partCount = (int)(streamLen / partSize) + (((streamLen % partSize) != 0) ? 1 : 0);
CacheChunk lastUncompressed = null;
// Reusable one-element allocation array, shared by the copy helpers and step 3.
MemoryBuffer[] singleAlloc = new MemoryBuffer[1];
for (int i = 0; i < partCount; ++i) {
long partOffset = streamOffset + (i * partSize),
partEnd = Math.min(partOffset + partSize, streamEnd);
long hasEntirePartTo = partOffset; // We have 0 bytes of data for this part, for now.
if (current == null) {
break; // We have no data from this point on (could be unneeded), skip.
assert partOffset <= current.getOffset();
if (partOffset == current.getOffset() && current instanceof CacheChunk) {
// We assume cache chunks would always match the way we read, so check and skip it.
// NOTE(review): a 'continue' to the next part presumably follows; not visible here.
assert current.getOffset() == partOffset && current.getEnd() == partEnd;
lastUncompressed = (CacheChunk)current;
current =;
if (current.getOffset() >= partEnd) {
continue; // We have no data at all for this part of the stream (could be unneeded), skip.
if (toRelease == null && dataReader.isTrackingDiskRanges()) {
toRelease = new ArrayList<ByteBuffer>();
// We have some disk buffers... see if we have entire part, etc.
UncompressedCacheChunk candidateCached = null; // We will cache if we have the entire part.
DiskRangeList next = current;
while (true) {
boolean noMoreDataForPart = (next == null || next.getOffset() >= partEnd);
if (noMoreDataForPart && hasEntirePartTo < partEnd && candidateCached != null) {
// We are missing a section at the end of the part... copy the start to non-cached.
lastUncompressed = copyAndReplaceCandidateToNonCached(
candidateCached, partOffset, hasEntirePartTo, cacheWrapper, singleAlloc);
candidateCached = null;
current = next;
if (noMoreDataForPart) break; // Done with this part.
boolean wasSplit = false;
if (current.getEnd() > partEnd) {
// If the current buffer contains multiple parts, split it.
current = current.split(partEnd);
wasSplit = true;
if (isTracingEnabled) {
LOG.trace("Processing uncompressed file data at ["
+ current.getOffset() + ", " + current.getEnd() + ")");
BufferChunk curBc = (BufferChunk)current;
if (!wasSplit && toRelease != null) {
toRelease.add(curBc.getChunk()); // TODO: is it valid to give zcr the modified 2nd part?
// Track if we still have the entire part.
long hadEntirePartTo = hasEntirePartTo;
// We have data until the end of current block if we had it until the beginning.
hasEntirePartTo = (hasEntirePartTo == current.getOffset()) ? current.getEnd() : -1;
if (hasEntirePartTo == -1) {
// We don't have the entire part; copy both whatever we intended to cache, and the rest,
// to an allocated buffer. We could try to optimize a bit if we have contiguous buffers
// with gaps, but it's probably not needed.
if (candidateCached != null) {
assert hadEntirePartTo != -1;
// NOTE(review): the start of this call is not visible in this listing; presumably it is
// the same copyAndReplaceCandidateToNonCached call as above.
candidateCached, partOffset, hadEntirePartTo, cacheWrapper, singleAlloc);
candidateCached = null;
lastUncompressed = copyAndReplaceUncompressedToNonCached(curBc, cacheWrapper, singleAlloc);
next =; // There may be more data after the gap.
} else {
// So far we have all the data from the beginning of the part.
if (candidateCached == null) {
candidateCached = new UncompressedCacheChunk(curBc);
} else {
next =;
if (candidateCached != null) {
if (toCache == null) {
toCache = new ArrayList<>(partCount - i);
// 3. Allocate the buffers, prepare cache keys.
if (toCache == null) return lastUncompressed; // Nothing to copy and cache.
MemoryBuffer[] targetBuffers =
toCache.size() == 1 ? singleAlloc : new MemoryBuffer[toCache.size()];
// singleAlloc may still hold a buffer from the copy helpers above; clear it before reuse.
targetBuffers[0] = null;
DiskRange[] cacheKeys = new DiskRange[toCache.size()];
int ix = 0;
for (UncompressedCacheChunk chunk : toCache) {
cacheKeys[ix] = chunk; // Relies on the fact that cache does not actually store these.
// NOTE(review): the start of this statement (presumably the allocation call) is not visible.
targetBuffers, (int)(partCount == 1 ? streamLen : partSize));
// 4. Now copy the data into cache buffers.
ix = 0;
for (UncompressedCacheChunk candidateCached : toCache) {
ByteBuffer dest = candidateCached.getBuffer().getByteBufferRaw();
copyAndReplaceUncompressedChunks(candidateCached, dest, candidateCached);
lastUncompressed = candidateCached;
// 5. Release the original disk buffers to the zero-copy reader if needed.
if (toRelease != null) {
assert dataReader.isTrackingDiskRanges();
for (ByteBuffer buf : toRelease) {
// 6. Finally, put uncompressed data to cache.
if (fileKey != null) {
long[] collisionMask = cacheWrapper.putFileData(fileKey, cacheKeys, targetBuffers, baseOffset);
processCacheCollisions(collisionMask, toCache, targetBuffers, null);
return lastUncompressed;
private int determineUncompressedPartSize() {
// We will break the uncompressed data in the cache in the chunks that are the size
// of the prevalent ORC compression buffer (the default), or maximum allocation (since we
// cannot allocate bigger chunks), whichever is less.
long orcCbSizeDefault = ((Number)OrcConf.BUFFER_SIZE.getDefaultValue()).longValue();
int maxAllocSize = cacheWrapper.getAllocator().getMaxAllocation();
return (int)Math.min(maxAllocSize, orcCbSizeDefault);
private static void copyUncompressedChunk(ByteBuffer src, ByteBuffer dest) {
int startPos = dest.position(), startLim = dest.limit();
dest.put(src); // Copy uncompressed data to cache.
// Put moves position forward by the size of the data.
int newPos = dest.position();
if (newPos > startLim) {
throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+ ") became [" + newPos + ", " + dest.limit() + ")");
private static CacheChunk copyAndReplaceCandidateToNonCached(
UncompressedCacheChunk candidateCached, long partOffset,
long candidateEnd, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
// We thought we had the entire part to cache, but we don't; convert start to
// non-cached. Since we are at the first gap, the previous stuff must be contiguous.
singleAlloc[0] = null;
cacheWrapper.getAllocator().allocateMultiple(singleAlloc, (int)(candidateEnd - partOffset));
MemoryBuffer buffer = singleAlloc[0];
ByteBuffer dest = buffer.getByteBufferRaw();
CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, partOffset, candidateEnd);
copyAndReplaceUncompressedChunks(candidateCached, dest, tcc);
return tcc;
private static CacheChunk copyAndReplaceUncompressedToNonCached(
BufferChunk bc, DataCache cacheWrapper, MemoryBuffer[] singleAlloc) {
singleAlloc[0] = null;
cacheWrapper.getAllocator().allocateMultiple(singleAlloc, bc.getLength());
MemoryBuffer buffer = singleAlloc[0];
ByteBuffer dest = buffer.getByteBufferRaw();
CacheChunk tcc = POOLS.tccPool.take();
tcc.init(buffer, bc.getOffset(), bc.getEnd());
copyUncompressedChunk(bc.getChunk(), dest);
return tcc;
/**
 * Visits the {@code candidateCached.getCount()} contiguous chunks collected for a part.
 * The first chunk is {@code candidateCached.getChunk()}; the later ones are reached through the
 * disk range list links. Their data is meant to be copied into {@code dest}.
 *
 * <p>NOTE(review): in this copy of the file the loop does not copy anything into {@code dest}.
 * Both branches of the {@code if (i == 0)} are empty, {@code next =;} is truncated, and
 * {@code tcc} is never used. Presumably the intent is:
 * <ul>
 *   <li>copy each chunk's data into {@code dest};</li>
 *   <li>advance to the chunk's successor;</li>
 *   <li>replace the first chunk with {@code tcc} in the range list and unlink the others.</li>
 * </ul>
 * TODO confirm against upstream before relying on this method.
 *
 * @param candidateCached the contiguous chunks to process.
 * @param dest the target buffer.
 * @param tcc presumably the chunk that stands in for the copied data (unused in the visible code).
 * @throws AssertionError if dest's position ends up past its original limit.
 */
private static void copyAndReplaceUncompressedChunks(
UncompressedCacheChunk candidateCached, ByteBuffer dest, CacheChunk tcc) {
int startPos = dest.position(), startLim = dest.limit();
DiskRangeList next = null;
for (int i = 0; i < candidateCached.getCount(); ++i) {
// The first chunk comes from the candidate itself; later ones follow the list links.
BufferChunk chunk = (i == 0) ? candidateCached.getChunk() : (BufferChunk)next;
// NOTE(review): truncated statement, presumably `next = chunk.next;`.
next =;
if (i == 0) {
} else {
// Sanity check: copying must not have moved dest's position past its original limit.
int newPos = dest.position();
if (newPos > startLim) {
throw new AssertionError("After copying, buffer [" + startPos + ", " + startLim
+ ") became [" + newPos + ", " + dest.limit() + ")");
private static void decompressChunk(
ByteBuffer src, CompressionCodec codec, ByteBuffer dest) throws IOException {
int startPos = dest.position(), startLim = dest.limit();
codec.decompress(src, dest);
// Codec resets the position to 0 and limit to correct limit.
int newLim = dest.limit();
if (newLim > startLim) {
throw new AssertionError("After codec, buffer [" + startPos + ", " + startLim
+ ") became [" + dest.position() + ", " + newLim + ")");
public static void releaseCacheChunksIntoObjectPool(DiskRangeList current) {
while (current != null) {
if (current instanceof ProcCacheChunk) {
} else if (current instanceof CacheChunk) {
current =;
private void ponderReleaseInitialRefcount(
long unlockUntilCOffset, long streamStartOffset, CacheChunk cc) {
// Don't release if the buffer contains any data beyond the acceptable boundary.
if (cc.getEnd() > unlockUntilCOffset) return;
assert cc.getBuffer() != null;
try {
releaseInitialRefcount(cc, false);
} catch (AssertionError e) {
LOG.error("BUG: releasing initial refcount; stream start " + streamStartOffset + ", "
+ "unlocking until " + unlockUntilCOffset + " from [" + cc + "]: " + e.getMessage());
throw e;
// Release all the previous buffers that we may not have been able to release due to reuse,
// as long as they are still in the same stream and are not already released.
DiskRangeList prev = cc.prev;
while (true) {
if ((prev == null) || (prev.getEnd() <= streamStartOffset)
|| !(prev instanceof CacheChunk)) break;
CacheChunk prevCc = (CacheChunk)prev;
if (prevCc.buffer == null) break;
try {
releaseInitialRefcount(prevCc, true);
} catch (AssertionError e) {
LOG.error("BUG: releasing initial refcount; stream start " + streamStartOffset + ", "
+ "unlocking until " + unlockUntilCOffset + " from [" + cc + "] and backtracked to ["
+ prevCc + "]: " + e.getMessage());
throw e;
prev = prev.prev;
/**
 * Removes the fetching thread's initial refcount from {@code cc}'s buffer. It is called when
 * this is the last RG for which the buffer will be used.
 *
 * <p>NOTE(review): in this copy of the file only the trace logging is present. The statements
 * that actually release the buffer appear to be missing. Those statements presumably also
 * clear {@code cc}'s buffer, since ponderReleaseInitialRefcount treats a {@code null} buffer as
 * "already released". TODO confirm against upstream.
 *
 * @param cc the chunk whose buffer is unlocked.
 * @param isBacktracking whether this release happens while walking back over earlier chunks;
 *     in the visible code it only changes the trace message.
 */
private void releaseInitialRefcount(CacheChunk cc, boolean isBacktracking) {
// This is the last RG for which this buffer will be used. Remove the initial refcount.
if (isTracingEnabled) {
LOG.trace("Unlocking " + cc.getBuffer() + " for the fetching thread"
+ (isBacktracking ? "; backtracking" : ""));
private void processCacheCollisions(long[] collisionMask,
List<? extends CacheChunk> toDecompress, MemoryBuffer[] targetBuffers,
List<MemoryBuffer> cacheBuffers) {
if (collisionMask == null) return;
assert collisionMask.length >= (toDecompress.size() >>> 6);
// There are some elements that were cached in parallel, take care of them.
long maskVal = -1;
for (int i = 0; i < toDecompress.size(); ++i) {
if ((i & 63) == 0) {
maskVal = collisionMask[i >>> 6];
if ((maskVal & 1) == 1) {
// Cache has found an old buffer for the key and put it into array instead of our new one.
CacheChunk replacedChunk = toDecompress.get(i);
MemoryBuffer replacementBuffer = targetBuffers[i];
if (isTracingEnabled) {
LOG.trace("Discarding data due to cache collision: " + replacedChunk.getBuffer()
+ " replaced with " + replacementBuffer);
assert replacedChunk.getBuffer() != replacementBuffer : i + " was not replaced in the results "
+ "even though mask is [" + Long.toBinaryString(maskVal) + "]";
replacedChunk.handleCacheCollision(cacheWrapper, replacementBuffer, cacheBuffers);
maskVal >>= 1;
/** Finds compressed offset in a stream and makes sure iter points to its position.
This may be necessary for obscure combinations of compression and encoding boundaries. */
private static DiskRangeList findExactPosition(DiskRangeList ranges, long offset) {
if (offset < 0) return ranges;
return findIntersectingPosition(ranges, offset, offset);
private static DiskRangeList findIntersectingPosition(DiskRangeList ranges, long offset, long end) {
if (offset < 0) return ranges;
// We expect the offset to be valid TODO: rather, validate
while (ranges.getEnd() <= offset) {
ranges =;
while (ranges.getOffset() > end) {
ranges = ranges.prev;
// We are now on some intersecting buffer, find the first intersecting buffer.
while (ranges.prev != null && ranges.prev.getEnd() > offset) {
ranges = ranges.prev;
return ranges;
/**
 * Reads one compression block from the source; handles compression blocks read from
 * multiple ranges (usually, that would only happen with zcr).
 * Adds entries to cachedBuffers, toDecompress and toRelease (see below what each holds).
 *
 * <p>The block header is parsed from {@code current}'s buffer.
 * <ul>
 *   <li>If the whole block is available in {@code current}, it is sliced out directly.</li>
 *   <li>Otherwise the block's bytes are meant to be consolidated into a newly allocated buffer
 *       from {@code current} and the contiguous ranges that follow it.</li>
 *   <li>If no contiguous range follows before the block ends, an {@link IncompleteCb} is added
 *       to {@code badEstimates} and {@code null} is returned.</li>
 * </ul>
 *
 * @param current BufferChunk where compression block starts.
 * @param cacheBuffers The result buffer array to add pre-allocated target cache buffer.
 * @param toDecompress The list of work to decompress - pairs of compressed buffers and the
 *     target buffers (same as the ones added to cacheBuffers).
 * @param toRelease The list of buffers to release to zcr because they are no longer in use.
 * @param badEstimates The list of bad estimates that cannot be decompressed.
 * @return The resulting cache chunk, or {@code null} if the block cannot be read completely.
 * @throws IllegalArgumentException if the block's length exceeds {@code bufferSize}.
 * @throws IOException if the block would extend into a range that is not a BufferChunk.
 */
private ProcCacheChunk addOneCompressionBuffer(BufferChunk current,
List<MemoryBuffer> cacheBuffers, List<ProcCacheChunk> toDecompress,
List<ByteBuffer> toRelease, List<IncompleteCb> badEstimates) throws IOException {
ByteBuffer slice = null;
ByteBuffer compressed = current.getChunk();
long cbStartOffset = current.getOffset();
// Parse the 3-byte little-endian chunk header. Bit 0 of the first byte is the "uncompressed"
// flag; the remaining 23 bits hold the chunk length.
int b0 = compressed.get() & 0xff;
int b1 = compressed.get() & 0xff;
int b2 = compressed.get() & 0xff;
int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
if (chunkLength > bufferSize) {
throw new IllegalArgumentException("Buffer size too small. size = " +
bufferSize + " needed = " + chunkLength);
int consumedLength = chunkLength + OutStream.HEADER_SIZE;
long cbEndOffset = cbStartOffset + consumedLength;
boolean isUncompressed = ((b0 & 0x01) == 1);
if (isTracingEnabled) {
LOG.trace("Found CB at " + cbStartOffset + ", chunk length " + chunkLength + ", total "
+ consumedLength + ", " + (isUncompressed ? "not " : "") + "compressed");
if (compressed.remaining() >= chunkLength) {
// Simple case - CB fits entirely in the disk range.
slice = compressed.slice();
ProcCacheChunk cc = addOneCompressionBlockByteBuffer(slice, isUncompressed,
cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
// NOTE(review): the body of this if appears to be missing in this copy. Presumably the fully
// consumed buffer is handed to zcr here (e.g. added to toRelease). TODO confirm.
return cc;
// The block spills past current; without a contiguous next range it cannot be completed.
if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
return null; // This is impossible to read from this chunk.
// TODO: we could remove extra copy for isUncompressed case by copying directly to cache.
// We need to consolidate 2 or more buffers into one to decompress.
ByteBuffer copy = allocateBuffer(chunkLength, compressed.isDirect());
int remaining = chunkLength - compressed.remaining();
int originalPos = compressed.position();
// NOTE(review): nothing copies `compressed` into `copy` in this copy of the file, although the
// trace below says the contents were copied. TODO confirm.
if (isTracingEnabled) {
LOG.trace("Removing partial CB " + current + " from ranges after copying its contents");
// NOTE(review): truncated statement, presumably `current.next`. The removal of `current` from
// the range list that the trace mentions is not visible either.
DiskRangeList next =;
if (dataReader.isTrackingDiskRanges()) {
if (originalPos == 0) {
dataReader.releaseBuffer(compressed); // We copied the entire buffer.
} else {
toRelease.add(compressed); // There might be slices depending on this buffer.
// NOTE(review): extraChunkCount is never incremented in the visible code, so the value passed
// to addIncompleteCompressionBuffer below is always 0. TODO confirm.
int extraChunkCount = 0;
// Gather the rest of the block from the following contiguous ranges.
while (true) {
if (!(next instanceof BufferChunk)) {
throw new IOException("Trying to extend compressed block into uncompressed block " + next);
compressed = next.getData();
if (compressed.remaining() >= remaining) {
// This is the last range for this compression block. Yay!
slice = compressed.slice();
// NOTE(review): `slice` is not used afterwards, and nothing copies the final `remaining` bytes
// into `copy` before it is passed on. Presumably statements were lost here. TODO confirm.
ProcCacheChunk cc = addOneCompressionBlockByteBuffer(copy, isUncompressed,
cbStartOffset, cbEndOffset, remaining, (BufferChunk)next, toDecompress, cacheBuffers);
if (compressed.remaining() <= 0 && dataReader.isTrackingDiskRanges()) {
dataReader.releaseBuffer(compressed); // We copied the entire buffer.
return cc;
// This range is consumed entirely by the block.
remaining -= compressed.remaining();
// NOTE(review): presumably this whole range should be appended to `copy` here; no such copy
// is visible in this copy of the file.
if (dataReader.isTrackingDiskRanges()) {
dataReader.releaseBuffer(compressed); // We copied the entire buffer.
DiskRangeList tmp = next;
// NOTE(review): truncated ternary, presumably `next.next` when the next range is contiguous.
next = next.hasContiguousNext() ? : null;
if (next != null) {
if (isTracingEnabled) {
LOG.trace("Removing partial CB " + tmp + " from ranges after copying its contents");
// NOTE(review): the removal of `tmp` from the range list that the trace mentions is not visible.
} else {
badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount));
return null; // This is impossible to read from this chunk.
/**
 * Creates an {@link IncompleteCb} marker for a compression block that starts at
 * {@code cbStartOffset} but cannot be read completely. The marker covers data up to
 * {@code target.getEnd()}.
 *
 * <p>NOTE(review): the trace message says {@code target} is replaced with the marker in the
 * range list, but no such replacement is visible in this copy of the file. Presumably a
 * replace-in-list call was lost. TODO confirm against upstream.
 *
 * @param cbStartOffset compressed start offset of the incomplete block.
 * @param target the range at which the available data for the block ends.
 * @param extraChunkCount number of earlier chunks of the block; only used for tracing here.
 * @return the new marker.
 */
private IncompleteCb addIncompleteCompressionBuffer(
long cbStartOffset, DiskRangeList target, int extraChunkCount) {
IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
if (isTracingEnabled) {
LOG.trace("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
+ icb + " in the buffers");
return icb;
/**
 * Adds one buffer with compressed data to the results of addOneCompressionBuffer (see its
 * javadoc).
 *
 * @param fullCompressionBlock (fCB) Entire compression block, sliced or copied from disk data.
 * @param isUncompressed Whether the data in the block is uncompressed.
 * @param cbStartOffset Compressed start offset of the fCB.
 * @param cbEndOffset Compressed end offset of the fCB.
 * @param lastChunkLength The number of compressed bytes consumed from last *chunk* into fullCompressionBlock.
 * @param lastChunk The disk range chunk holding the end of the block; its buffer position is
 *     advanced by {@code lastChunkLength}.
 * @param toDecompress See addOneCompressionBuffer.
 * @param cacheBuffers See addOneCompressionBuffer.
 * @return The pooled {@link ProcCacheChunk} describing the block and its future cache buffer.
 */
private ProcCacheChunk addOneCompressionBlockByteBuffer(ByteBuffer fullCompressionBlock,
boolean isUncompressed, long cbStartOffset, long cbEndOffset, int lastChunkLength,
BufferChunk lastChunk, List<ProcCacheChunk> toDecompress, List<MemoryBuffer> cacheBuffers) {
// Prepare future cache buffer.
MemoryBuffer futureAlloc = cacheWrapper.getAllocator().createUnallocated();
// Add it to result in order we are processing.
// NOTE(review): no statement adds futureAlloc to cacheBuffers in this copy. Yet cc is
// initialized below with index cacheBuffers.size() - 1, which presumes it was just appended.
// TODO confirm.
// Add it to the list of work to decompress.
// NOTE(review): likewise, cc is never added to toDecompress in the visible code.
ProcCacheChunk cc = POOLS.pccPool.take();
cc.init(cbStartOffset, cbEndOffset, !isUncompressed,
fullCompressionBlock, futureAlloc, cacheBuffers.size() - 1);
// Adjust the compression block position.
if (isTracingEnabled) {
LOG.trace("Adjusting " + lastChunk + " to consume " + lastChunkLength + " compressed bytes");
lastChunk.getChunk().position(lastChunk.getChunk().position() + lastChunkLength);
// Finally, put it in the ranges list for future use (if shared between RGs).
// Before anyone else accesses it, it would have been allocated and decompressed locally.
if (lastChunk.getChunk().remaining() <= 0) {
// lastChunk is fully consumed.
// NOTE(review): only tracing is visible here; presumably lastChunk is replaced by cc in the
// range list.
if (isTracingEnabled) {
LOG.trace("Replacing " + lastChunk + " with " + cc + " in the buffers");
} else {
// lastChunk still has data after the block.
// NOTE(review): only tracing is visible here; presumably cc is inserted before lastChunk in
// the range list.
if (isTracingEnabled) {
LOG.trace("Adding " + cc + " before " + lastChunk + " in the buffers");
return cc;
private static ByteBuffer allocateBuffer(int size, boolean isDirect) {
return isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
/**
 * Creates the set of object pools built from {@code pf}.
 *
 * <p>The ProcCacheChunk and CacheChunk pools are created with a size argument of 1024, which is
 * presumably a capacity hint. Their helpers construct fresh objects on demand. The
 * encoded-column-batch and column-stream-data pools come straight from {@code pf}.
 *
 * <p>NOTE(review): both {@code resetBeforeOffer} implementations below are empty in this copy.
 * Chunks offered back to these pools would therefore presumably not be reset before reuse;
 * a reset call may have been lost. TODO confirm against upstream.
 *
 * @param pf factory that creates the underlying pools.
 * @return the populated {@link Pools} holder.
 */
private static Pools createPools(PoolFactory pf) {
Pools pools = new Pools();
// Pool of ProcCacheChunk objects.
pools.pccPool = pf.createPool(1024, new PoolObjectHelper<ProcCacheChunk>() {
public ProcCacheChunk create() {
return new ProcCacheChunk();
public void resetBeforeOffer(ProcCacheChunk t) {
// Pool of plain CacheChunk objects.
pools.tccPool = pf.createPool(1024, new PoolObjectHelper<CacheChunk>() {
public CacheChunk create() {
return new CacheChunk();
public void resetBeforeOffer(CacheChunk t) {
pools.ecbPool = pf.createEncodedColumnBatchPool();
pools.csdPool = pf.createColumnStreamDataPool();
return pools;
/** Pool factory that is used if another one isn't specified - just creates the objects. */
private static class NoopPoolFactory implements PoolFactory {
public <T> Pool<T> createPool(final int size, final PoolObjectHelper<T> helper) {
return new Pool<T>() {
public void offer(T t) {
public int size() {
return size;
public T take() {
return helper.create();
public Pool<OrcEncodedColumnBatch> createEncodedColumnBatchPool() {
return createPool(0, new PoolObjectHelper<OrcEncodedColumnBatch>() {
public OrcEncodedColumnBatch create() {
return new OrcEncodedColumnBatch();
public void resetBeforeOffer(OrcEncodedColumnBatch t) {
public Pool<ColumnStreamData> createColumnStreamDataPool() {
return createPool(0, new PoolObjectHelper<ColumnStreamData>() {
public ColumnStreamData create() {
return new ColumnStreamData();
public void resetBeforeOffer(ColumnStreamData t) {