blob: f35dd4fe5d124f88954c0c1c02ba702b5fde6f75 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "Adaptor.hh"
#include "BpackingDefault.hh"
#if defined(ORC_HAVE_RUNTIME_AVX512)
#include "BpackingAvx512.hh"
#endif
#include "Compression.hh"
#include "Dispatch.hh"
#include "RLEV2Util.hh"
#include "RLEv2.hh"
#include "Utils.hh"
namespace orc {
unsigned char RleDecoderV2::readByte() {
SCOPED_MINUS_STOPWATCH(metrics, DecodingLatencyUs);
if (bufferStart_ == bufferEnd_) {
int bufferLength;
const void* bufferPointer;
if (!inputStream_->Next(&bufferPointer, &bufferLength)) {
throw ParseError("bad read in RleDecoderV2::readByte");
}
bufferStart_ = const_cast<char*>(static_cast<const char*>(bufferPointer));
bufferEnd_ = bufferStart_ + bufferLength;
}
unsigned char result = static_cast<unsigned char>(*bufferStart_++);
return result;
}
int64_t RleDecoderV2::readLongBE(uint64_t bsz) {
int64_t ret = 0, val;
uint64_t n = bsz;
while (n > 0) {
n--;
val = readByte();
ret |= (val << (n * 8));
}
return ret;
}
inline int64_t RleDecoderV2::readVslong() {
return unZigZag(readVulong());
}
uint64_t RleDecoderV2::readVulong() {
uint64_t ret = 0, b;
uint64_t offset = 0;
do {
b = readByte();
ret |= (0x7f & b) << offset;
offset += 7;
} while (b >= 0x80);
return ret;
}
struct UnpackDynamicFunction {
using FunctionType = decltype(&BitUnpack::readLongs);
static std::vector<std::pair<DispatchLevel, FunctionType>> implementations() {
#if defined(ORC_HAVE_RUNTIME_AVX512)
return {{DispatchLevel::NONE, BitUnpackDefault::readLongs},
{DispatchLevel::AVX512, BitUnpackAVX512::readLongs}};
#else
return {{DispatchLevel::NONE, BitUnpackDefault::readLongs}};
#endif
}
};
void RleDecoderV2::readLongs(int64_t* data, uint64_t offset, uint64_t len, uint64_t fbs) {
static DynamicDispatch<UnpackDynamicFunction> dispatch;
return dispatch.func(this, data, offset, len, fbs);
}
RleDecoderV2::RleDecoderV2(std::unique_ptr<SeekableInputStream> input, bool isSigned,
MemoryPool& pool, ReaderMetrics* metrics)
: RleDecoder(metrics),
inputStream_(std::move(input)),
isSigned_(isSigned),
firstByte_(0),
bufferStart_(nullptr),
bufferEnd_(bufferStart_),
runLength_(0),
runRead_(0),
bitsLeft_(0),
curByte_(0),
unpackedPatch_(pool, 0),
literals_(pool, MAX_LITERAL_SIZE) {
// PASS
}
void RleDecoderV2::seek(PositionProvider& location) {
// move the input stream
inputStream_->seek(location);
// clear state
bufferEnd_ = bufferStart_ = nullptr;
runRead_ = runLength_ = 0;
// skip ahead the given number of records
skip(location.next());
}
void RleDecoderV2::skip(uint64_t numValues) {
// simple for now, until perf tests indicate something encoding specific is
// needed
const uint64_t N = 64;
int64_t dummy[N];
while (numValues) {
uint64_t nRead = std::min(N, numValues);
next(dummy, nRead, nullptr);
numValues -= nRead;
}
}
template <typename T>
void RleDecoderV2::next(T* const data, const uint64_t numValues, const char* const notNull) {
SCOPED_STOPWATCH(metrics, DecodingLatencyUs, DecodingCall);
uint64_t nRead = 0;
while (nRead < numValues) {
// Skip any nulls before attempting to read first byte.
while (notNull && !notNull[nRead]) {
if (++nRead == numValues) {
return; // ended with null values
}
}
if (runRead_ == runLength_) {
resetRun();
firstByte_ = readByte();
}
uint64_t offset = nRead, length = numValues - nRead;
EncodingType enc = static_cast<EncodingType>((firstByte_ >> 6) & 0x03);
switch (static_cast<int64_t>(enc)) {
case SHORT_REPEAT:
nRead += nextShortRepeats(data, offset, length, notNull);
break;
case DIRECT:
nRead += nextDirect(data, offset, length, notNull);
break;
case PATCHED_BASE:
nRead += nextPatched(data, offset, length, notNull);
break;
case DELTA:
nRead += nextDelta(data, offset, length, notNull);
break;
default:
throw ParseError("unknown encoding");
}
}
}
void RleDecoderV2::next(int64_t* data, uint64_t numValues, const char* notNull) {
next<int64_t>(data, numValues, notNull);
}
void RleDecoderV2::next(int32_t* data, uint64_t numValues, const char* notNull) {
next<int32_t>(data, numValues, notNull);
}
void RleDecoderV2::next(int16_t* data, uint64_t numValues, const char* notNull) {
next<int16_t>(data, numValues, notNull);
}
template <typename T>
uint64_t RleDecoderV2::nextShortRepeats(T* const data, uint64_t offset, uint64_t numValues,
const char* const notNull) {
if (runRead_ == runLength_) {
// extract the number of fixed bytes
uint64_t byteSize = (firstByte_ >> 3) & 0x07;
byteSize += 1;
runLength_ = firstByte_ & 0x07;
// run lengths values are stored only after MIN_REPEAT value is met
runLength_ += MIN_REPEAT;
runRead_ = 0;
// read the repeated value which is store using fixed bytes
literals_[0] = readLongBE(byteSize);
if (isSigned_) {
literals_[0] = unZigZag(static_cast<uint64_t>(literals_[0]));
}
}
uint64_t nRead = std::min(runLength_ - runRead_, numValues);
if (notNull) {
for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
if (notNull[pos]) {
data[pos] = static_cast<T>(literals_[0]);
++runRead_;
}
}
} else {
for (uint64_t pos = offset; pos < offset + nRead; ++pos) {
data[pos] = static_cast<T>(literals_[0]);
++runRead_;
}
}
return nRead;
}
template <typename T>
uint64_t RleDecoderV2::nextDirect(T* const data, uint64_t offset, uint64_t numValues,
const char* const notNull) {
if (runRead_ == runLength_) {
// extract the number of fixed bits
unsigned char fbo = (firstByte_ >> 1) & 0x1f;
uint32_t bitSize = decodeBitWidth(fbo);
// extract the run length
runLength_ = static_cast<uint64_t>(firstByte_ & 0x01) << 8;
runLength_ |= readByte();
// runs are one off
runLength_ += 1;
runRead_ = 0;
readLongs(literals_.data(), 0, runLength_, bitSize);
if (isSigned_) {
for (uint64_t i = 0; i < runLength_; ++i) {
literals_[i] = unZigZag(static_cast<uint64_t>(literals_[i]));
}
}
}
return copyDataFromBuffer(data, offset, numValues, notNull);
}
void RleDecoderV2::adjustGapAndPatch(uint32_t patchBitSize, int64_t patchMask, int64_t* resGap,
int64_t* resPatch, uint64_t* patchIdx) {
uint64_t idx = *patchIdx;
uint64_t gap = static_cast<uint64_t>(unpackedPatch_[idx]) >> patchBitSize;
int64_t patch = unpackedPatch_[idx] & patchMask;
int64_t actualGap = 0;
// special case: gap is >255 then patch value will be 0.
// if gap is <=255 then patch value cannot be 0
while (gap == 255 && patch == 0) {
actualGap += 255;
++idx;
gap = static_cast<uint64_t>(unpackedPatch_[idx]) >> patchBitSize;
patch = unpackedPatch_[idx] & patchMask;
}
// add the left over gap
actualGap += gap;
*resGap = actualGap;
*resPatch = patch;
*patchIdx = idx;
}
template <typename T>
uint64_t RleDecoderV2::nextPatched(T* const data, uint64_t offset, uint64_t numValues,
const char* const notNull) {
if (runRead_ == runLength_) {
// extract the number of fixed bits
unsigned char fbo = (firstByte_ >> 1) & 0x1f;
uint32_t bitSize = decodeBitWidth(fbo);
// extract the run length
runLength_ = static_cast<uint64_t>(firstByte_ & 0x01) << 8;
runLength_ |= readByte();
// runs are one off
runLength_ += 1;
runRead_ = 0;
// extract the number of bytes occupied by base
uint64_t thirdByte = readByte();
uint64_t byteSize = (thirdByte >> 5) & 0x07;
// base width is one off
byteSize += 1;
// extract patch width
uint32_t pwo = thirdByte & 0x1f;
uint32_t patchBitSize = decodeBitWidth(pwo);
// read fourth byte and extract patch gap width
uint64_t fourthByte = readByte();
uint32_t pgw = (fourthByte >> 5) & 0x07;
// patch gap width is one off
pgw += 1;
// extract the length of the patch list
size_t pl = fourthByte & 0x1f;
if (pl == 0) {
throw ParseError("Corrupt PATCHED_BASE encoded data (pl==0)!");
}
// read the next base width number of bytes to extract base value
int64_t base = readLongBE(byteSize);
int64_t mask = (static_cast<int64_t>(1) << ((byteSize * 8) - 1));
// if mask of base value is 1 then base is negative value else positive
if ((base & mask) != 0) {
base = base & ~mask;
base = -base;
}
readLongs(literals_.data(), 0, runLength_, bitSize);
// any remaining bits are thrown out
resetReadLongs();
// TODO: something more efficient than resize
unpackedPatch_.resize(pl);
// TODO: Skip corrupt?
// if ((patchBitSize + pgw) > 64 && !skipCorrupt) {
if ((patchBitSize + pgw) > 64) {
throw ParseError(
"Corrupt PATCHED_BASE encoded data "
"(patchBitSize + pgw > 64)!");
}
uint32_t cfb = getClosestFixedBits(patchBitSize + pgw);
readLongs(unpackedPatch_.data(), 0, pl, cfb);
// any remaining bits are thrown out
resetReadLongs();
// apply the patch directly when decoding the packed data
int64_t patchMask = ((static_cast<int64_t>(1) << patchBitSize) - 1);
int64_t gap = 0;
int64_t patch = 0;
uint64_t patchIdx = 0;
adjustGapAndPatch(patchBitSize, patchMask, &gap, &patch, &patchIdx);
for (uint64_t i = 0; i < runLength_; ++i) {
if (static_cast<int64_t>(i) != gap) {
// no patching required. add base to unpacked value to get final value
literals_[i] += base;
} else {
// extract the patch value
int64_t patchedVal = literals_[i] | (patch << bitSize);
// add base to patched value
literals_[i] = base + patchedVal;
// increment the patch to point to next entry in patch list
++patchIdx;
if (patchIdx < unpackedPatch_.size()) {
adjustGapAndPatch(patchBitSize, patchMask, &gap, &patch, &patchIdx);
// next gap is relative to the current gap
gap += i;
}
}
}
}
return copyDataFromBuffer(data, offset, numValues, notNull);
}
template <typename T>
uint64_t RleDecoderV2::nextDelta(T* const data, uint64_t offset, uint64_t numValues,
const char* const notNull) {
if (runRead_ == runLength_) {
// extract the number of fixed bits
unsigned char fbo = (firstByte_ >> 1) & 0x1f;
uint32_t bitSize;
if (fbo != 0) {
bitSize = decodeBitWidth(fbo);
} else {
bitSize = 0;
}
// extract the run length
runLength_ = static_cast<uint64_t>(firstByte_ & 0x01) << 8;
runLength_ |= readByte();
++runLength_; // account for first value
runRead_ = 0;
int64_t prevValue;
// read the first value stored as vint
if (isSigned_) {
prevValue = readVslong();
} else {
prevValue = static_cast<int64_t>(readVulong());
}
literals_[0] = prevValue;
// read the fixed delta value stored as vint (deltas can be negative even
// if all number are positive)
int64_t deltaBase = readVslong();
if (bitSize == 0) {
// add fixed deltas to adjacent values
for (uint64_t i = 1; i < runLength_; ++i) {
literals_[i] = literals_[i - 1] + deltaBase;
}
} else {
prevValue = literals_[1] = prevValue + deltaBase;
if (runLength_ < 2) {
std::stringstream ss;
ss << "Illegal run length for delta encoding: " << runLength_;
throw ParseError(ss.str());
}
// write the unpacked values, add it to previous value and store final
// value to result buffer. if the delta base value is negative then it
// is a decreasing sequence else an increasing sequence.
// read deltas using the literals buffer.
readLongs(literals_.data(), 2, runLength_ - 2, bitSize);
if (deltaBase < 0) {
for (uint64_t i = 2; i < runLength_; ++i) {
prevValue = literals_[i] = prevValue - literals_[i];
}
} else {
for (uint64_t i = 2; i < runLength_; ++i) {
prevValue = literals_[i] = prevValue + literals_[i];
}
}
}
}
return copyDataFromBuffer(data, offset, numValues, notNull);
}
template <typename T>
uint64_t RleDecoderV2::copyDataFromBuffer(T* data, uint64_t offset, uint64_t numValues,
const char* notNull) {
uint64_t nRead = std::min(runLength_ - runRead_, numValues);
if (notNull) {
for (uint64_t i = offset; i < (offset + nRead); ++i) {
if (notNull[i]) {
data[i] = static_cast<T>(literals_[runRead_++]);
}
}
} else {
for (uint64_t i = offset; i < (offset + nRead); ++i) {
data[i] = static_cast<T>(literals_[runRead_++]);
}
}
return nRead;
}
} // namespace orc