blob: c5c6f6a801701ff63378ca484eaa8681038e0282 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Adaptor.hh"
#include "Compression.hh"
#include "RLEv2.hh"
#include "RLEV2Util.hh"
namespace orc {
/**
 * Read bsz bytes from the input and assemble them into a single integer,
 * with the first byte read ending up in the most significant position.
 * @param bsz the number of bytes to consume
 * @return the assembled value
 */
int64_t RleDecoderV2::readLongBE(uint64_t bsz) {
  int64_t result = 0;
  for (uint64_t remaining = bsz; remaining > 0; --remaining) {
    // the byte read now belongs (remaining - 1) bytes above the lowest one
    const int64_t current = readByte();
    const uint64_t shift = (remaining - 1) * 8;
    result |= (current << shift);
  }
  return result;
}
/**
 * Read a variable-length integer with readVulong() and pass the result
 * through unZigZag() to obtain the signed value.
 */
inline int64_t RleDecoderV2::readVslong() {
  const uint64_t encoded = readVulong();
  return unZigZag(encoded);
}
uint64_t RleDecoderV2::readVulong() {
uint64_t ret = 0, b;
uint64_t offset = 0;
do {
b = readByte();
ret |= (0x7f & b) << offset;
offset += 7;
} while (b >= 0x80);
return ret;
}
/**
 * Build a decoder that takes ownership of the given input stream.
 * Every piece of decoding state starts out zeroed/null, and the two
 * scratch buffers are constructed from pool with a size argument of 0.
 * @param input the stream to decode; moved into the decoder
 * @param _isSigned stored in isSigned and consulted by the decode routines
 * @param pool passed to the unpacked and unpackedPatch buffers
 */
RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input,
                           bool _isSigned,
                           MemoryPool& pool)
    : inputStream(std::move(input)),
      isSigned(_isSigned),
      // current run header and progress
      firstByte(0),
      runLength(0),
      runRead(0),
      // no input bytes buffered yet
      bufferStart(nullptr),
      bufferEnd(nullptr),
      // per-run decoding state
      deltaBase(0),
      byteSize(0),
      firstValue(0),
      prevValue(0),
      bitSize(0),
      bitsLeft(0),
      curByte(0),
      // patched-base state
      patchBitSize(0),
      unpackedIdx(0),
      patchIdx(0),
      base(0),
      curGap(0),
      curPatch(0),
      patchMask(0),
      actualGap(0),
      unpacked(pool, 0),
      unpackedPatch(pool, 0) {
  // nothing to do beyond member initialization
}
/**
 * Reposition the decoder.
 * The input stream is repositioned from location first; the entry that
 * location yields afterwards is the number of values to discard.
 * @param location the position provider to read the target position from
 */
void RleDecoderV2::seek(PositionProvider& location) {
  // reposition the underlying stream
  inputStream->seek(location);

  // forget any buffered bytes and the run that was in progress
  bufferStart = nullptr;
  bufferEnd = nullptr;
  runLength = 0;
  runRead = 0;

  // discard the requested number of leading values
  skip(location.next());
}
/**
 * Discard the next numValues values by decoding them into a small scratch
 * array, at most 64 at a time.
 * Kept simple until performance tests show that an encoding-specific skip
 * is needed.
 * @param numValues the number of values to discard
 */
void RleDecoderV2::skip(uint64_t numValues) {
  const uint64_t CHUNK = 64;
  int64_t scratch[CHUNK];
  for (uint64_t left = numValues; left != 0; ) {
    const uint64_t batch = std::min(CHUNK, left);
    next(scratch, batch, nullptr);
    left -= batch;
  }
}
void RleDecoderV2::next(int64_t* const data,
const uint64_t numValues,
const char* const notNull) {
uint64_t nRead = 0;
while (nRead < numValues) {
// Skip any nulls before attempting to read first byte.
while (notNull && !notNull[nRead]) {
if (++nRead == numValues) {
return; // ended with null values
}
}
if (runRead == runLength) {
resetRun();
firstByte = readByte();
}
uint64_t offset = nRead, length = numValues - nRead;
EncodingType enc = static_cast<EncodingType>
((firstByte >> 6) & 0x03);
switch(static_cast<int64_t>(enc)) {
case SHORT_REPEAT:
nRead += nextShortRepeats(data, offset, length, notNull);
break;
case DIRECT:
nRead += nextDirect(data, offset, length, notNull);
break;
case PATCHED_BASE:
nRead += nextPatched(data, offset, length, notNull);
break;
case DELTA:
nRead += nextDelta(data, offset, length, notNull);
break;
default:
throw ParseError("unknown encoding");
}
}
}
/**
 * Emit values from a SHORT_REPEAT run, parsing the run header first when
 * the previous run has been used up.
 * @param data the output array
 * @param offset the first position of data to fill
 * @param numValues the maximum number of positions to cover
 * @param notNull if not null, positions with notNull[pos] == 0 are skipped
 * @return the number of positions covered, skipped ones included
 */
uint64_t RleDecoderV2::nextShortRepeats(int64_t* const data,
                                        uint64_t offset,
                                        uint64_t numValues,
                                        const char* const notNull) {
  if (runRead == runLength) {
    // bits 3..5 of the header hold the value width in bytes, minus one
    byteSize = ((firstByte >> 3) & 0x07) + 1;

    // bits 0..2 hold the run length; MIN_REPEAT is added on top of it
    runLength = static_cast<uint64_t>(firstByte & 0x07) + MIN_REPEAT;
    runRead = 0;

    // the repeated value follows as byteSize bytes
    firstValue = readLongBE(byteSize);
    if (isSigned) {
      firstValue = unZigZag(static_cast<uint64_t>(firstValue));
    }
  }

  const uint64_t nRead = std::min(runLength - runRead, numValues);
  const uint64_t end = offset + nRead;
  for (uint64_t pos = offset; pos < end; ++pos) {
    // null positions do not consume a value of the run
    if (notNull && !notNull[pos]) {
      continue;
    }
    data[pos] = firstValue;
    ++runRead;
  }
  return nRead;
}
/**
 * Emit values from a DIRECT run, parsing the run header first when the
 * previous run has been used up.
 * @param data the output array
 * @param offset the first position of data to fill
 * @param numValues the maximum number of positions to cover
 * @param notNull if not null, positions with notNull[pos] == 0 are skipped
 * @return the number of positions covered, skipped ones included
 */
uint64_t RleDecoderV2::nextDirect(int64_t* const data,
                                  uint64_t offset,
                                  uint64_t numValues,
                                  const char* const notNull) {
  if (runRead == runLength) {
    // bits 1..5 of the header are the encoded bit width
    const unsigned char widthCode = (firstByte >> 1) & 0x1f;
    bitSize = decodeBitWidth(widthCode);

    // 9-bit run length: the low bit of the header, then the next byte
    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
    runLength |= readByte();
    // the stored length is one less than the real one
    ++runLength;
    runRead = 0;
  }

  const uint64_t nRead = std::min(runLength - runRead, numValues);
  runRead += readLongs(data, offset, nRead, bitSize, notNull);

  if (isSigned) {
    // convert the values that were just read, leaving null slots alone
    const uint64_t end = offset + nRead;
    for (uint64_t pos = offset; pos < end; ++pos) {
      if (!notNull || notNull[pos]) {
        data[pos] = unZigZag(static_cast<uint64_t>(data[pos]));
      }
    }
  }
  return nRead;
}
/**
 * Emit values from a PATCHED_BASE run.
 *
 * When the previous run has been used up (runRead == runLength), the run
 * header is parsed first, all runLength packed values are read into
 * `unpacked`, and the patch list is read into `unpackedPatch`. After that,
 * up to numValues positions of data are filled, starting at offset, with
 * base + value; the value at the position given by actualGap has the
 * current patch OR-ed in above its low bitSize bits.
 *
 * @param data the output array
 * @param offset the first position of data to fill
 * @param numValues the maximum number of positions to cover
 * @param notNull if not null, positions with notNull[pos] == 0 are skipped
 * @return the number of positions covered, skipped ones included
 * @throws ParseError if the patch list length is 0 or if the patch width
 *         plus the patch gap width exceeds 64
 */
uint64_t RleDecoderV2::nextPatched(int64_t* const data,
uint64_t offset,
uint64_t numValues,
const char* const notNull) {
if (runRead == runLength) {
// extract the number of fixed bits (bits 1..5 of the header byte)
unsigned char fbo = (firstByte >> 1) & 0x1f;
bitSize = decodeBitWidth(fbo);
// extract the run length: low bit of the header byte, then the next byte
runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
runLength |= readByte();
// runs are one off
runLength += 1;
runRead = 0;
// extract the number of bytes occupied by base (top three bits)
uint64_t thirdByte = readByte();
byteSize = (thirdByte >> 5) & 0x07;
// base width is one off
byteSize += 1;
// extract patch width (low five bits of the third byte)
uint32_t pwo = thirdByte & 0x1f;
patchBitSize = decodeBitWidth(pwo);
// read fourth byte and extract patch gap width (top three bits)
uint64_t fourthByte = readByte();
uint32_t pgw = (fourthByte >> 5) & 0x07;
// patch gap width is one off
pgw += 1;
// extract the length of the patch list (low five bits)
size_t pl = fourthByte & 0x1f;
if (pl == 0) {
throw ParseError("Corrupt PATCHED_BASE encoded data (pl==0)!");
}
// read the next base width number of bytes to extract base value
base = readLongBE(byteSize);
// mask selects the most significant bit of the byteSize-byte base
int64_t mask = (static_cast<int64_t>(1) << ((byteSize * 8) - 1));
// the base is stored as sign and magnitude: if the masked bit is set,
// clear it and negate what remains
if ((base & mask) != 0) {
base = base & ~mask;
base = -base;
}
// TODO: something more efficient than resize
unpacked.resize(runLength);
unpackedIdx = 0;
// read every packed value of the run up front
readLongs(unpacked.data(), 0, runLength, bitSize);
// any remaining bits are thrown out
resetReadLongs();
// TODO: something more efficient than resize
unpackedPatch.resize(pl);
patchIdx = 0;
// TODO: Skip corrupt?
// if ((patchBitSize + pgw) > 64 && !skipCorrupt) {
if ((patchBitSize + pgw) > 64) {
throw ParseError("Corrupt PATCHED_BASE encoded data "
"(patchBitSize + pgw > 64)!");
}
// each patch list entry is read with the width that getClosestFixedBits()
// returns for (patch width + gap width)
uint32_t cfb = getClosestFixedBits(patchBitSize + pgw);
readLongs(unpackedPatch.data(), 0, pl, cfb);
// any remaining bits are thrown out
resetReadLongs();
// apply the patch directly when decoding the packed data
// patchMask has the low patchBitSize bits set
patchMask = ((static_cast<int64_t>(1) << patchBitSize) - 1);
// NOTE(review): adjustGapAndPatch() is defined elsewhere; presumably it
// loads curPatch and actualGap for the entry at patchIdx - confirm
adjustGapAndPatch();
}
// number of output positions handled by this call
uint64_t nRead = std::min(runLength - runRead, numValues);
for(uint64_t pos = offset; pos < offset + nRead; ++pos) {
// skip null positions
if (notNull && !notNull[pos]) {
continue;
}
if (static_cast<int64_t>(unpackedIdx) != actualGap) {
// no patching required. add base to unpacked value to get final value
data[pos] = base + unpacked[unpackedIdx];
} else {
// extract the patch value: the patch bits sit above the low bitSize bits
int64_t patchedVal = unpacked[unpackedIdx] | (curPatch << bitSize);
// add base to patched value
data[pos] = base + patchedVal;
// increment the patch to point to next entry in patch list
++patchIdx;
if (patchIdx < unpackedPatch.size()) {
adjustGapAndPatch();
// next gap is relative to the current gap
actualGap += unpackedIdx;
}
}
++runRead;
++unpackedIdx;
}
return nRead;
}
/**
 * Emit values from a DELTA run, parsing the run header first when the
 * previous run has been used up.
 *
 * The first value of the run is firstValue. With a bit width of 0 every
 * later value is the previous one plus deltaBase. Otherwise the second
 * value is firstValue + deltaBase and each later value is the previous one
 * plus (or, when deltaBase is negative, minus) a packed delta.
 *
 * @param data the output array
 * @param offset the first position of data to fill
 * @param numValues the maximum number of positions to cover
 * @param notNull if not null, positions with notNull[pos] == 0 are skipped
 * @return the number of positions covered, skipped ones included
 */
uint64_t RleDecoderV2::nextDelta(int64_t* const data,
                                 uint64_t offset,
                                 uint64_t numValues,
                                 const char* const notNull) {
  if (runRead == runLength) {
    // bits 1..5 of the header are the encoded bit width; a code of 0 means
    // that no packed deltas are stored
    const unsigned char widthCode = (firstByte >> 1) & 0x1f;
    bitSize = (widthCode != 0) ? decodeBitWidth(widthCode) : 0;

    // 9-bit run length: the low bit of the header, then the next byte
    runLength = static_cast<uint64_t>(firstByte & 0x01) << 8;
    runLength |= readByte();
    ++runLength;  // account for the first value
    deltaBase = 0;
    runRead = 0;

    // the first value is stored as a vint
    firstValue = isSigned ? static_cast<int64_t>(readVslong())
                          : static_cast<int64_t>(readVulong());
    prevValue = firstValue;

    // the fixed delta is stored as a signed vint (deltas can be negative
    // even if all numbers are positive)
    deltaBase = static_cast<int64_t>(readVslong());
  }

  const uint64_t nRead = std::min(runLength - runRead, numValues);
  const uint64_t end = offset + nRead;

  // advance to the first non-null position
  uint64_t pos = offset;
  while (pos < end && notNull && !notNull[pos]) {
    ++pos;
  }

  // the first value of the run is emitted as is
  if (runRead == 0 && pos < end) {
    data[pos] = firstValue;
    ++pos;
    ++runRead;
  }

  if (bitSize == 0) {
    // fixed delta: every value is the previous one plus deltaBase
    while (pos < end) {
      if (!notNull || notNull[pos]) {
        prevValue = data[pos] = prevValue + deltaBase;
        ++runRead;
      }
      ++pos;
    }
    return nRead;
  }

  // advance to the next non-null position
  while (pos < end && notNull && !notNull[pos]) {
    ++pos;
  }

  // the second value of the run is the first value plus the delta base
  if (runRead < 2 && pos < end) {
    prevValue = data[pos] = firstValue + deltaBase;
    ++pos;
    ++runRead;
  }

  // Read the packed deltas into the output, then turn them into values in
  // place: a negative delta base marks a decreasing sequence, anything
  // else an increasing one.
  const uint64_t remaining = end - pos;
  runRead += readLongs(data, pos, remaining, bitSize, notNull);

  const bool decreasing = deltaBase < 0;
  while (pos < end) {
    if (!notNull || notNull[pos]) {
      prevValue = data[pos] =
          decreasing ? prevValue - data[pos] : prevValue + data[pos];
    }
    ++pos;
  }
  return nRead;
}
} // namespace orc