blob: 41de9a5d585bf8150fa8a5a3ad8f26b2309333ee [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef STORAGE_SRC_STORAGE_FORMAT_ORC_RLE_V1_H_
#define STORAGE_SRC_STORAGE_FORMAT_ORC_RLE_V1_H_
#include <algorithm>
#include <memory>
#include <vector>
#include "storage/format/orc/rle.h"
namespace orc {
const uint64_t MINIMUM_REPEAT = 3;
const uint64_t BASE_128_MASK = 0x7f;
template <class IntType, class UIntType>
class RleDecoderV1 : public RleDecoder {
public:
RleDecoderV1(std::unique_ptr<SeekableInputStream> input, bool hasSigned)
: inputStream(std::move(input)), isSigned(hasSigned) {}
// Seek to a particular spot.
void seek(PositionProvider& location) override {
// move the input stream
inputStream->seek(location);
// force a re-read from the stream
bufferEnd = bufferStart;
// read a new header
readHeader();
// skip ahead the given number of records
skip(location.next());
}
// Seek over a given number of values.
void skip(uint64_t numValues) override {
while (numValues > 0) {
if (remainingValues == 0) {
readHeader();
}
uint64_t count = std::min(numValues, remainingValues);
remainingValues -= count;
numValues -= count;
if (repeating) {
value += delta * static_cast<int64_t>(count);
} else {
skipLongs(count);
}
}
}
// Read a number of values into the batch.
void next(void* data, uint64_t numValues, const char* notNull) override {
uint64_t position = 0;
IntType* dat = reinterpret_cast<IntType*>(data);
// skipNulls()
if (notNull) {
// Skip over null values.
while (position < numValues && !notNull[position]) {
++position;
}
}
while (position < numValues) {
// If we are out of values, read more.
if (remainingValues == 0) {
readHeader();
}
// How many do we read out of this block?
uint64_t count = std::min(numValues - position, remainingValues);
uint64_t consumed = 0;
if (repeating) {
if (notNull) {
for (uint64_t i = 0; i < count; ++i) {
if (notNull[position + i]) {
dat[position + i] = static_cast<IntType>(
value + static_cast<int64_t>(consumed) * delta);
consumed += 1;
}
}
} else {
for (uint64_t i = 0; i < count; ++i) {
dat[position + i] =
static_cast<IntType>(value + static_cast<int64_t>(i) * delta);
}
consumed = count;
}
value += static_cast<int64_t>(consumed) * delta;
} else {
if (notNull) {
for (uint64_t i = 0; i < count; ++i) {
if (notNull[position + i]) {
dat[position + i] =
isSigned ? static_cast<IntType>(unZigZag(readLong()))
: static_cast<IntType>(readLong());
++consumed;
}
}
} else {
if (isSigned) {
for (uint64_t i = 0; i < count; ++i) {
dat[position + i] = static_cast<IntType>(unZigZag(readLong()));
}
} else {
for (uint64_t i = 0; i < count; ++i) {
dat[position + i] = static_cast<IntType>(readLong());
}
}
consumed = count;
}
}
remainingValues -= consumed;
position += count;
// skipNulls()
if (notNull) {
// Skip over null values.
while (position < numValues && !notNull[position]) {
++position;
}
}
}
}
private:
signed char readByte() {
if (bufferStart == bufferEnd) {
int32_t bufferLength;
const void* bufferPointer;
if (!inputStream->Next(&bufferPointer, &bufferLength)) {
LOG_ERROR(ERRCODE_INTERNAL_ERROR, "bad read in readByte");
}
bufferStart = static_cast<const char*>(bufferPointer);
bufferEnd = bufferStart + bufferLength;
}
return *(bufferStart++);
}
void readHeader() {
signed char ch = readByte();
if (ch < 0) {
remainingValues = static_cast<uint64_t>(-ch);
repeating = false;
} else {
remainingValues = static_cast<uint64_t>(ch) + MINIMUM_REPEAT;
repeating = true;
delta = readByte();
value =
isSigned ? unZigZag(readLong()) : static_cast<int64_t>(readLong());
}
}
uint64_t readLong() {
uint64_t result = 0;
int64_t offset = 0;
signed char ch = readByte();
if (ch >= 0) {
result = static_cast<uint64_t>(ch);
} else {
result = static_cast<uint64_t>(ch) & BASE_128_MASK;
while ((ch = readByte()) < 0) {
offset += 7;
result |= (static_cast<uint64_t>(ch) & BASE_128_MASK) << offset;
}
result |= static_cast<uint64_t>(ch) << (offset + 7);
}
return result;
}
void skipLongs(uint64_t numValues) {
while (numValues > 0) {
if (readByte() >= 0) {
--numValues;
}
}
}
const std::unique_ptr<SeekableInputStream> inputStream;
const bool isSigned;
uint64_t remainingValues = 0;
int64_t value = 0;
const char* bufferStart = nullptr;
const char* bufferEnd = nullptr;
int64_t delta = 0;
bool repeating = false;
};
// Integer Run Length Encoding version 1
// Source link:
// https://cwiki.apache.org/confluence/display/Hive/LanguageManual+ORC#LanguageManualORC-IntegerRunLengthEncodingversion1
// //NOLINT
//
// In Hive 0.11 ORC files used Run Length Encoding version 1 (RLEv1),
// which provides a lightweight compression of signed or unsigned integer
// sequences. RLEv1 has two sub-encodings:
// 1) Run - a sequence of values that differ by a small fixed delta
// 2) Literals - a sequence of varint encoded values
//
// Runs start with an initial byte of 0x00 to 0x7f, which encodes the
// length of the run - 3. A second byte provides the fixed delta in the
// range of -128 to 127. Finally, the first value of the run is encoded
// as a base 128 varint.
// For example, if the sequence is 100 instances of 7 the encoding would
// start with 100 - 3, followed by a delta of 0, and a varint of 7 for
// an encoding of [0x61, 0x00, 0x07]. To encode the sequence of numbers
// running from 100 to 1, the first byte is 100 - 3, the delta is -1,
// and the varint is 100 for an encoding of [0x61, 0xff, 0x64].
//
// Literals start with an initial byte of 0x80 to 0xff, which corresponds
// to the negative of number of literals in the sequence. Following the
// header byte, the list of N varints is encoded. Thus, if there are
// no runs, the overhead is 1 byte for each 128 integers. The first 5
// prime numbers [2, 3, 4, 7, 11] would encoded as [0xfb, 0x02, 0x03,
// 0x04, 0x07, 0xb].
template <class IntType>
class RleCoderV1 : public RleCoder {
public:
explicit RleCoderV1(std::unique_ptr<SeekableOutputStream> stream,
bool hasSigned)
: output(std::move(stream)),
isSigned(hasSigned),
literals(MAX_LITERAL_SIZE) {}
void write(void* data, uint64_t numValues, const char* notNull) override {
IntType* d = reinterpret_cast<IntType*>(data);
for (uint64_t i = 0; i < numValues; i++) {
if ((notNull == nullptr) || notNull[i]) {
// LOG_INFO("write value %lld", static_cast<int64_t>(d[i]));
write(static_cast<int64_t>(d[i]));
}
}
}
void flushToStream(OutputStream* stream) override {
writeValues();
output->flushToStream(stream);
}
uint64_t getStreamSize() override { return output->getStreamSize(); }
void reset() override { output->reset(); }
uint64_t getEstimatedSpaceNeeded() override {
return output->getEstimatedSpaceNeeded() + numLiterals * sizeof(int64_t) +
sizeof(uint8_t) // delta
+ sizeof(uint8_t); // control bytes
}
private:
// Write the input value
// @param value The input value
// @return Void
void write(int64_t value) {
if (numLiterals == 0) {
literals[numLiterals++] = value;
tailRunLength = 1;
} else if (repeat) {
if (value == literals[0] + delta * numLiterals) {
numLiterals += 1;
if (numLiterals == MAX_REPEAT_SIZE) {
writeValues();
}
} else {
writeValues();
literals[numLiterals++] = value;
tailRunLength = 1;
}
} else {
if (tailRunLength == 1) {
delta = value - literals[numLiterals - 1];
if (delta < MIN_DELTA || delta > MAX_DELTA) {
tailRunLength = 1;
} else {
tailRunLength = 2;
}
} else if (value == literals[numLiterals - 1] + delta) {
tailRunLength += 1;
} else {
delta = value - literals[numLiterals - 1];
if (delta < MIN_DELTA || delta > MAX_DELTA) {
tailRunLength = 1;
} else {
tailRunLength = 2;
}
}
if (tailRunLength == MIN_REPEAT_SIZE) {
if (numLiterals + 1 == MIN_REPEAT_SIZE) {
repeat = true;
numLiterals += 1;
} else {
numLiterals -= MIN_REPEAT_SIZE - 1;
int64_t base = literals[numLiterals];
writeValues();
literals[0] = base;
repeat = true;
numLiterals = MIN_REPEAT_SIZE;
}
} else {
literals[numLiterals++] = value;
if (numLiterals == MAX_LITERAL_SIZE) {
writeValues();
}
}
}
}
void writeValues() {
if (numLiterals != 0) {
if (repeat) {
output->writeByte(numLiterals - MIN_REPEAT_SIZE);
output->writeByte((int8_t)delta);
if (isSigned) {
writeInt64(output.get(), literals[0]);
} else {
writeUInt64(output.get(), literals[0]);
}
} else {
output->writeByte(-numLiterals);
for (uint32_t i = 0; i < numLiterals; ++i) {
if (isSigned) {
writeInt64(output.get(), literals[i]);
} else {
writeUInt64(output.get(), literals[i]);
}
}
}
repeat = false;
numLiterals = 0;
tailRunLength = 0;
}
}
private:
std::unique_ptr<SeekableOutputStream> output;
const bool isSigned = false;
const int32_t MIN_REPEAT_SIZE = 3;
const int32_t MAX_DELTA = 127;
const int32_t MIN_DELTA = -128;
const int32_t MAX_LITERAL_SIZE = 128;
const int32_t MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
std::vector<int64_t> literals;
int32_t numLiterals = 0;
int64_t delta = 0;
bool repeat = false;
int32_t tailRunLength = 0;
};
} // namespace orc
#endif // STORAGE_SRC_STORAGE_FORMAT_ORC_RLE_V1_H_