blob: ee1a4575dc6954b57d24e3d307d801b241ecedfa [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <algorithm>
#include <iostream>
#include <string.h>
#include <utility>
#include "ByteRLE.hh"
#include "orc/Exceptions.hh"
namespace orc {
// Shortest tail of identical bytes that the encoder turns into a repeat run.
const int MINIMUM_REPEAT = 3;
// Longest repeat run: the header byte stores (length - MINIMUM_REPEAT),
// which therefore ranges over 0..127.
const int MAXIMUM_REPEAT = 127 + MINIMUM_REPEAT;
// Capacity of the encoder's literal buffer; a literal run is written out as
// soon as it holds this many bytes.
const int MAX_LITERAL_SIZE = 128;
// Out-of-line destructor of the encoder base class; it has nothing to release.
ByteRleEncoder::~ByteRleEncoder() {
// PASS
}
/**
* Byte run-length encoder writing to a BufferedOutputStream.
* Incoming bytes are gathered into the pending run; a finished run is
* written as one header byte followed by either a single repeated value
* (repeat run) or the literal bytes themselves (literal run).
*/
class ByteRleEncoderImpl : public ByteRleEncoder {
public:
ByteRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output);
virtual ~ByteRleEncoderImpl() override;
/**
* Encode the next batch of values.
* @param data to be encoded
* @param numValues the number of values to be encoded
* @param notNull If the pointer is null, all values are read. If the
* pointer is not null, positions that are false are skipped.
*/
virtual void add(const char* data, uint64_t numValues,
const char* notNull) override;
/**
* Get size of buffer used so far.
*/
virtual uint64_t getBufferSize() const override;
/**
* Flush underlying BufferedOutputStream.
*/
virtual uint64_t flush() override;
/**
* Record the current stream position followed by the number of values
* held in the pending run.
*/
virtual void recordPosition(PositionRecorder* recorder) const override;
protected:
// destination of the encoded bytes
std::unique_ptr<BufferedOutputStream> outputStream;
// values of the pending run (array of MAX_LITERAL_SIZE bytes); for a repeat
// run only literals[0] is meaningful
char* literals;
// number of values in the pending run
int numLiterals;
// true while the pending run is a repeat run
bool repeat;
// length of the run of identical bytes ending at the most recent value
int tailRunLength;
// next write offset inside `buffer`
int bufferPosition;
// size of the chunk `buffer` points at
int bufferLength;
// chunk most recently obtained from outputStream->Next()
char* buffer;
// append one raw byte to the output, fetching a new chunk when needed
void writeByte(char c);
// write the pending run (header plus payload) and reset the run state
void writeValues();
// feed one value into the run-length state machine
void write(char c);
};
/**
 * Take ownership of the output stream and start with an empty pending run
 * and no output chunk; the first writeByte() call requests one.
 * @param output stream that receives the encoded bytes
 */
ByteRleEncoderImpl::ByteRleEncoderImpl(
    std::unique_ptr<BufferedOutputStream> output)
    : outputStream(std::move(output)),
      literals(new char[MAX_LITERAL_SIZE]),
      numLiterals(0),
      repeat(false),
      tailRunLength(0),
      bufferPosition(0),
      bufferLength(0),
      buffer(nullptr) {
  // every member is set up by the initializer list above
}
// Release the literal buffer allocated by the constructor.
ByteRleEncoderImpl::~ByteRleEncoderImpl() {
delete [] literals;
}
/**
 * Append one raw byte to the output stream.
 * When the current chunk is exhausted a new one is requested from the
 * stream first.
 * @param c the byte to write
 * @throws std::bad_alloc if the stream cannot provide more space
 */
void ByteRleEncoderImpl::writeByte(char c) {
  // Loop instead of testing once: if the stream hands back an empty chunk,
  // storing into it would write past the end of the buffer.
  while (bufferPosition == bufferLength) {
    int addedSize = 0;
    // Receive the chunk through a genuine void* rather than reinterpreting
    // char** as void**; this mirrors ByteRleDecoderImpl::nextBuffer().
    void* chunk = nullptr;
    if (!outputStream->Next(&chunk, &addedSize)) {
      throw std::bad_alloc();
    }
    buffer = static_cast<char*>(chunk);
    bufferPosition = 0;
    bufferLength = addedSize;
  }
  buffer[bufferPosition++] = c;
}
/**
 * Feed a batch of bytes into the encoder.
 * @param data values to encode
 * @param numValues number of slots in data (and in notNull, when given)
 * @param notNull optional mask; slots whose entry is false are skipped
 */
void ByteRleEncoderImpl::add(
    const char* data,
    uint64_t numValues,
    const char* notNull) {
  // no mask: every slot carries a value
  if (notNull == nullptr) {
    for (uint64_t idx = 0; idx != numValues; ++idx) {
      write(data[idx]);
    }
    return;
  }
  // masked: only encode the slots flagged as present
  for (uint64_t idx = 0; idx != numValues; ++idx) {
    if (notNull[idx]) {
      write(data[idx]);
    }
  }
}
/**
 * Write the pending run to the output and reset the run state.
 * A repeat run is a header of (length - MINIMUM_REPEAT) followed by the
 * repeated byte; a literal run is a header of -length followed by the
 * literal bytes. Does nothing when no values are pending.
 */
void ByteRleEncoderImpl::writeValues() {
  if (numLiterals == 0) {
    return;
  }
  if (!repeat) {
    writeByte(static_cast<char>(-numLiterals));
    const char* cursor = literals;
    const char* const stop = literals + numLiterals;
    while (cursor != stop) {
      writeByte(*cursor++);
    }
  } else {
    const int header = numLiterals - static_cast<int>(MINIMUM_REPEAT);
    writeByte(static_cast<char>(header));
    writeByte(literals[0]);
  }
  // start over with an empty run
  numLiterals = 0;
  tailRunLength = 0;
  repeat = false;
}
/**
 * Write the pending run, give the unused tail of the current chunk back to
 * the stream and flush it.
 * @return the value returned by the stream's flush()
 */
uint64_t ByteRleEncoderImpl::flush() {
  writeValues();
  const int unusedBytes = bufferLength - bufferPosition;
  outputStream->BackUp(unusedBytes);
  const uint64_t dataSize = outputStream->flush();
  // forget the chunk so the next writeByte() requests a fresh one
  bufferPosition = 0;
  bufferLength = 0;
  return dataSize;
}
/**
 * Feed one value into the run-length state machine.
 * Values accumulate as literals until MINIMUM_REPEAT identical bytes are
 * seen in a row; at that point any preceding literals are written out and
 * the encoder switches to a repeat run. Runs are written when they reach
 * their maximum size or when the repeated value changes.
 * @param value the byte to encode
 */
void ByteRleEncoderImpl::write(char value) {
  // first value of a fresh run
  if (numLiterals == 0) {
    literals[numLiterals++] = value;
    tailRunLength = 1;
    return;
  }

  if (repeat) {
    if (value != literals[0]) {
      // the repeat run is over; start a new run with this value
      writeValues();
      literals[numLiterals++] = value;
      tailRunLength = 1;
      return;
    }
    ++numLiterals;
    if (numLiterals == MAXIMUM_REPEAT) {
      writeValues();
    }
    return;
  }

  // literal mode: track how many identical bytes end the sequence
  tailRunLength = (value == literals[numLiterals - 1]) ? tailRunLength + 1 : 1;

  if (tailRunLength != MINIMUM_REPEAT) {
    literals[numLiterals++] = value;
    if (numLiterals == MAX_LITERAL_SIZE) {
      writeValues();
    }
    return;
  }

  // the tail just became long enough to be a repeat run
  if (numLiterals + 1 == MINIMUM_REPEAT) {
    // the whole pending run is the repeat; literals[0] already holds value
    repeat = true;
    ++numLiterals;
    return;
  }

  // write the literals that precede the repeated tail, then start the
  // repeat run with the MINIMUM_REPEAT values seen so far
  numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1);
  writeValues();
  literals[0] = value;
  repeat = true;
  numLiterals = MINIMUM_REPEAT;
}
// Report the size of the underlying output stream, as given by its getSize().
uint64_t ByteRleEncoderImpl::getBufferSize() const {
return outputStream->getSize();
}
/**
 * Record where the encoder currently stands.
 * For a compressed stream two entries describe the stream position (chunk
 * start and bytes used inside the chunk); otherwise a single byte offset
 * is recorded. The number of values in the pending run always follows.
 * @param recorder receives the position entries
 */
void ByteRleEncoderImpl::recordPosition(PositionRecorder *recorder) const {
  const uint64_t streamSize = outputStream->getSize();
  const uint64_t pendingBytes = static_cast<uint64_t>(bufferPosition);
  if (!outputStream->isCompressed()) {
    // byte offset of the RLE run's start location: the stream size minus
    // the whole current chunk, plus the part of the chunk written so far
    const uint64_t chunkStart =
        streamSize - static_cast<uint64_t>(bufferLength);
    recorder->add(chunkStart + pendingBytes);
  } else {
    // start of the compression chunk in the stream
    recorder->add(streamSize);
    // number of decompressed bytes that need to be consumed
    recorder->add(pendingBytes);
  }
  recorder->add(static_cast<uint64_t>(numLiterals));
}
/**
 * Create a byte RLE encoder that writes to the given stream.
 * @param output stream that receives the encoded bytes; ownership moves to
 *        the encoder
 */
std::unique_ptr<ByteRleEncoder> createByteRleEncoder
                             (std::unique_ptr<BufferedOutputStream> output) {
  std::unique_ptr<ByteRleEncoder> encoder(
      new ByteRleEncoderImpl(std::move(output)));
  return encoder;
}
/**
* Boolean encoder layered on the byte RLE encoder: values are packed eight
* to a byte, most significant bit first, and each completed byte is fed to
* the byte-level encoder.
*/
class BooleanRleEncoderImpl : public ByteRleEncoderImpl {
public:
BooleanRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output);
virtual ~BooleanRleEncoderImpl() override;
/**
* Encode the next batch of values
* @param data to be encoded
* @param numValues the number of values to be encoded
* @param notNull If the pointer is null, all values are read. If the
* pointer is not null, positions that are false are skipped.
*/
virtual void add(const char* data, uint64_t numValues,
const char* notNull) override;
/**
* Flushing underlying BufferedOutputStream
*/
virtual uint64_t flush() override;
/**
* Record the byte-level position followed by the number of bits already
* used in the byte being assembled.
*/
virtual void recordPosition(PositionRecorder* recorder) const override;
private:
// number of bit slots still free in `current` (8 means it is empty)
int bitsRemained;
// byte being assembled from incoming boolean values
char current;
};
/**
 * Start with an empty byte under assembly: all eight bit slots free.
 * @param output stream that receives the encoded bytes
 */
BooleanRleEncoderImpl::BooleanRleEncoderImpl(
    std::unique_ptr<BufferedOutputStream> output)
    : ByteRleEncoderImpl(std::move(output)),
      bitsRemained(8),
      current(static_cast<char>(0)) {
  // nothing else to set up
}
// Nothing to release here; this class adds no resources of its own.
BooleanRleEncoderImpl::~BooleanRleEncoderImpl() {
// PASS
}
void BooleanRleEncoderImpl::add(
const char* data,
uint64_t numValues,
const char* notNull) {
for (uint64_t i = 0; i < numValues; ++i) {
if (bitsRemained == 0) {
write(current);
current = static_cast<char>(0);
bitsRemained = 8;
}
if (!notNull || notNull[i]) {
if (!data || data[i]) {
current =
static_cast<char>(current | (0x80 >> (8 - bitsRemained)));
}
--bitsRemained;
}
}
if (bitsRemained == 0) {
write(current);
current = static_cast<char>(0);
bitsRemained = 8;
}
}
/**
 * Emit the partially filled byte, if any, then flush the byte-level
 * encoder.
 * @return the value returned by ByteRleEncoderImpl::flush()
 */
uint64_t BooleanRleEncoderImpl::flush() {
  const bool hasPartialByte = (bitsRemained != 8);
  if (hasPartialByte) {
    write(current);
  }
  // begin a fresh byte for whatever is added after the flush
  current = static_cast<char>(0);
  bitsRemained = 8;
  return ByteRleEncoderImpl::flush();
}
/**
 * Record the byte-level position, then the number of bits already used in
 * the byte under assembly.
 * @param recorder receives the position entries
 */
void BooleanRleEncoderImpl::recordPosition(PositionRecorder* recorder) const {
  ByteRleEncoderImpl::recordPosition(recorder);
  const int bitsUsed = 8 - bitsRemained;
  recorder->add(static_cast<uint64_t>(bitsUsed));
}
/**
 * Create a boolean RLE encoder that writes to the given stream.
 * @param output stream that receives the encoded bytes; ownership moves to
 *        the encoder
 */
std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder
                             (std::unique_ptr<BufferedOutputStream> output) {
  // Derived-to-base is an implicit pointer conversion; reinterpret_cast
  // performs no pointer adjustment and is not the right tool for an upcast.
  return std::unique_ptr<ByteRleEncoder>(
      new BooleanRleEncoderImpl(std::move(output)));
}
// Out-of-line destructor of the decoder base class; it has nothing to release.
ByteRleDecoder::~ByteRleDecoder() {
// PASS
}
/**
* Byte run-length decoder reading from a SeekableInputStream.
* A run starts with a header byte: a negative header announces that many
* literal bytes, a non-negative header announces (header + MINIMUM_REPEAT)
* copies of the byte that follows.
*/
class ByteRleDecoderImpl: public ByteRleDecoder {
public:
ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
virtual ~ByteRleDecoderImpl();
/**
* Seek to a particular spot.
*/
virtual void seek(PositionProvider&);
/**
* Seek over a given number of values.
*/
virtual void skip(uint64_t numValues);
/**
* Read a number of values into the batch.
*/
virtual void next(char* data, uint64_t numValues, char* notNull);
protected:
// fetch the next chunk from the input stream; throws ParseError on failure
inline void nextBuffer();
// return the next raw byte, refilling the chunk when it is exhausted
inline signed char readByte();
// read a run header and set remainingValues, repeating and value
inline void readHeader();
// source of the encoded bytes
std::unique_ptr<SeekableInputStream> inputStream;
// values left in the current run
size_t remainingValues;
// the repeated byte when the current run is a repeat run
char value;
// unread part of the current input chunk: [bufferStart, bufferEnd)
const char* bufferStart;
const char* bufferEnd;
// true when the current run is a repeat run, false for literals
bool repeating;
};
void ByteRleDecoderImpl::nextBuffer() {
int bufferLength;
const void* bufferPointer;
bool result = inputStream->Next(&bufferPointer, &bufferLength);
if (!result) {
throw ParseError("bad read in nextBuffer");
}
bufferStart = static_cast<const char*>(bufferPointer);
bufferEnd = bufferStart + bufferLength;
}
/**
 * Return the next raw byte of the input, fetching new chunks as needed.
 * @throws ParseError (from nextBuffer) if the stream has no more data
 */
signed char ByteRleDecoderImpl::readByte() {
  // Loop instead of testing once: if the stream returns an empty chunk, a
  // single refill would leave bufferStart == bufferEnd and the dereference
  // below would read past the chunk. skip() and next() already cope with
  // empty chunks by looping.
  while (bufferStart == bufferEnd) {
    nextBuffer();
  }
  return *(bufferStart++);
}
/**
 * Read the header of the next run.
 * A negative header means that many literal bytes follow; a non-negative
 * header means the next byte is repeated (header + MINIMUM_REPEAT) times.
 */
void ByteRleDecoderImpl::readHeader() {
  const signed char header = readByte();
  repeating = (header >= 0);
  if (repeating) {
    remainingValues = static_cast<size_t>(header) + MINIMUM_REPEAT;
    value = readByte();
  } else {
    remainingValues = static_cast<size_t>(-header);
  }
}
/**
 * Take ownership of the input stream and start with no current run and no
 * input chunk; both are fetched on first use.
 * @param input stream holding the encoded bytes
 */
ByteRleDecoderImpl::ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream>
                                       input)
    : inputStream(std::move(input)),
      remainingValues(0),
      value(0),
      bufferStart(nullptr),
      bufferEnd(nullptr),
      repeating(false) {
  // every member is set up by the initializer list above
}
// Nothing to release explicitly; inputStream is held by a unique_ptr member.
ByteRleDecoderImpl::~ByteRleDecoderImpl() {
// PASS
}
/**
 * Seek to a particular spot.
 * The location first positions the input stream and then supplies the
 * number of values to skip from the start of the run found there.
 * @param location provider of the recorded position
 */
void ByteRleDecoderImpl::seek(PositionProvider& location) {
  // move the input stream
  inputStream->seek(location);
  // force a re-read from the stream
  bufferEnd = bufferStart;
  // Drop the current run so the next skip()/next() reads a fresh header.
  // Reading the header eagerly here would throw ParseError when the
  // position is the very end of the stream, even though no value is
  // requested from it.
  remainingValues = 0;
  // skip ahead the given number of records
  ByteRleDecoderImpl::skip(location.next());
}
/**
 * Seek over a given number of values.
 * Repeat runs are skipped by adjusting the run counter only; literal runs
 * also advance the read pointer, fetching new input chunks as needed.
 * @param numValues number of values to discard
 */
void ByteRleDecoderImpl::skip(uint64_t numValues) {
  while (numValues > 0) {
    if (remainingValues == 0) {
      readHeader();
    }
    // Take the minimum in 64 bits before narrowing. Casting numValues to
    // size_t first would truncate it where size_t is 32 bits wide; a
    // multiple of 2^32 would then yield count == 0 and loop forever.
    const size_t count = static_cast<size_t>(
        std::min(numValues, static_cast<uint64_t>(remainingValues)));
    remainingValues -= count;
    numValues -= count;
    // for literals we need to skip over count bytes, which may involve
    // reading from the underlying stream
    if (!repeating) {
      size_t consumedBytes = count;
      while (consumedBytes > 0) {
        if (bufferStart == bufferEnd) {
          nextBuffer();
        }
        const size_t skipSize =
            std::min(consumedBytes,
                     static_cast<size_t>(bufferEnd - bufferStart));
        bufferStart += skipSize;
        consumedBytes -= skipSize;
      }
    }
  }
}
/**
* Read a number of values into the batch.
* @param data output array with numValues slots
* @param numValues number of slots to fill
* @param notNull optional mask; slots whose entry is false are left
* untouched and consume no value from the stream
*/
void ByteRleDecoderImpl::next(char* data, uint64_t numValues,
char* notNull) {
// index of the next output slot to handle
uint64_t position = 0;
// skip over null values
while (notNull && position < numValues && !notNull[position]) {
position += 1;
}
while (position < numValues) {
// if we are out of values, read more
if (remainingValues == 0) {
readHeader();
}
// how many do we read out of this block?
size_t count = std::min(static_cast<size_t>(numValues - position),
remainingValues);
// number of run values actually used; with a mask this can be smaller
// than count because null slots inside the span take no value
uint64_t consumed = 0;
if (repeating) {
if (notNull) {
for(uint64_t i=0; i < count; ++i) {
if (notNull[position + i]) {
data[position + i] = value;
consumed += 1;
}
}
} else {
// no mask: the whole span receives the repeated byte
memset(data + position, value, count);
consumed = count;
}
} else {
if (notNull) {
for(uint64_t i=0; i < count; ++i) {
if (notNull[position + i]) {
data[position + i] = readByte();
consumed += 1;
}
}
} else {
// no mask: bulk-copy the literals, one input chunk at a time
uint64_t i = 0;
while (i < count) {
if (bufferStart == bufferEnd) {
nextBuffer();
}
uint64_t copyBytes =
std::min(static_cast<uint64_t>(count - i),
static_cast<uint64_t>(bufferEnd - bufferStart));
memcpy(data + position + i, bufferStart, copyBytes);
bufferStart += copyBytes;
i += copyBytes;
}
consumed = count;
}
}
// the run shrinks by the values used, the output advances by the span
remainingValues -= consumed;
position += count;
// skip over any null values
while (notNull && position < numValues && !notNull[position]) {
position += 1;
}
}
}
/**
 * Create a byte RLE decoder that reads from the given stream.
 * @param input stream holding the encoded bytes; ownership moves to the
 *        decoder
 */
std::unique_ptr<ByteRleDecoder> createByteRleDecoder
                             (std::unique_ptr<SeekableInputStream> input) {
  std::unique_ptr<ByteRleDecoder> decoder(
      new ByteRleDecoderImpl(std::move(input)));
  return decoder;
}
/**
* Boolean decoder layered on the byte RLE decoder: each decoded byte holds
* eight values, handed out starting with the most significant bit.
*/
class BooleanRleDecoderImpl: public ByteRleDecoderImpl {
public:
BooleanRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
virtual ~BooleanRleDecoderImpl();
/**
* Seek to a particular spot.
*/
virtual void seek(PositionProvider&);
/**
* Seek over a given number of values.
*/
virtual void skip(uint64_t numValues);
/**
* Read a number of values into the batch.
*/
virtual void next(char* data, uint64_t numValues, char* notNull);
protected:
// number of bits of lastByte that have not been handed out yet
size_t remainingBits;
// most recently decoded byte; its low remainingBits bits are still unread
char lastByte;
};
/**
 * Start with no partially consumed byte.
 * @param input stream holding the encoded bytes
 */
BooleanRleDecoderImpl::BooleanRleDecoderImpl
                                (std::unique_ptr<SeekableInputStream> input)
    : ByteRleDecoderImpl(std::move(input)),
      remainingBits(0),
      lastByte(0) {
  // nothing else to set up
}
// Nothing to release here; this class adds no resources of its own.
BooleanRleDecoderImpl::~BooleanRleDecoderImpl() {
// PASS
}
/**
 * Seek to a particular spot.
 * After the byte-level seek, the location supplies the number of bits
 * already consumed from the byte at that position.
 * @param location provider of the recorded position
 * @throws ParseError if more than 8 consumed bits are reported
 */
void BooleanRleDecoderImpl::seek(PositionProvider& location) {
  ByteRleDecoderImpl::seek(location);
  const uint64_t bitsConsumed = location.next();
  remainingBits = 0;
  if (bitsConsumed > 8) {
    throw ParseError("bad position");
  }
  if (bitsConsumed == 0) {
    return;  // positioned on a byte boundary, nothing to preload
  }
  // load the partially consumed byte and remember how much of it is left
  remainingBits = 8 - bitsConsumed;
  ByteRleDecoderImpl::next(&lastByte, 1, nullptr);
}
/**
 * Seek over a given number of values.
 * Bits left in the current byte are used first, then whole bytes are
 * skipped at the byte level, and a final partial byte is loaded if needed.
 * @param numValues number of boolean values to discard
 */
void BooleanRleDecoderImpl::skip(uint64_t numValues) {
  // the skip fits inside the byte we already hold
  if (numValues <= remainingBits) {
    remainingBits -= numValues;
    return;
  }
  numValues -= remainingBits;
  ByteRleDecoderImpl::skip(numValues / 8);
  const uint64_t leftoverBits = numValues % 8;
  if (leftoverBits == 0) {
    remainingBits = 0;
    return;
  }
  // the skip ends inside a byte: load it and keep its unread bits
  ByteRleDecoderImpl::next(&lastByte, 1, nullptr);
  remainingBits = 8 - leftoverBits;
}
/**
* Read a number of values into the batch.
* Every slot of data is written: non-null slots receive 0 or 1, null slots
* receive 0.
* @param data output array with numValues slots
* @param numValues number of slots to fill
* @param notNull optional mask; slots whose entry is false consume no bit
*/
void BooleanRleDecoderImpl::next(char* data, uint64_t numValues,
char* notNull) {
// next spot to fill in
uint64_t position = 0;
// use up any remaining bits
if (notNull) {
while(remainingBits > 0 && position < numValues) {
if (notNull[position]) {
// bits are handed out from the most significant unread one downwards
remainingBits -= 1;
data[position] = (static_cast<unsigned char>(lastByte) >>
remainingBits) & 0x1;
} else {
data[position] = 0;
}
position += 1;
}
} else {
while(remainingBits > 0 && position < numValues) {
remainingBits -= 1;
data[position++] = (static_cast<unsigned char>(lastByte) >>
remainingBits) & 0x1;
}
}
// count the number of nonNulls remaining
uint64_t nonNulls = numValues - position;
if (notNull) {
for(uint64_t i=position; i < numValues; ++i) {
if (!notNull[i]) {
nonNulls -= 1;
}
}
}
// fill in the remaining values
if (nonNulls == 0) {
while (position < numValues) {
data[position++] = 0;
}
} else if (position < numValues) {
// read the new bytes into the array
// (the packed bytes are staged in the output array itself, starting at
// position; ceil(nonNulls / 8) bytes are needed)
uint64_t bytesRead = (nonNulls + 7) / 8;
ByteRleDecoderImpl::next(data + position, bytesRead, nullptr);
// keep the last byte: its unused low bits serve the next call
lastByte = data[position + bytesRead - 1];
remainingBits = bytesRead * 8 - nonNulls;
// expand the array backwards so that we don't clobber the data
// bitsLeft counts the bits not yet expanded; it starts out equal to
// nonNulls
uint64_t bitsLeft = bytesRead * 8 - remainingBits;
if (notNull) {
for(int64_t i=static_cast<int64_t>(numValues) - 1;
i >= static_cast<int64_t>(position); --i) {
if (notNull[i]) {
// bit number (bitsLeft - 1) lives in byte (bitsLeft - 1) / 8; the
// unsigned (-bitsLeft) % 8 is the right shift that brings it to bit 0
uint64_t shiftPosn = (-bitsLeft) % 8;
data[i] = (data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
bitsLeft -= 1;
} else {
data[i] = 0;
}
}
} else {
for(int64_t i=static_cast<int64_t>(numValues) - 1;
i >= static_cast<int64_t>(position); --i, --bitsLeft) {
// same extraction as above; every slot consumes one bit
uint64_t shiftPosn = (-bitsLeft) % 8;
data[i] = (data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
}
}
}
}
/**
 * Create a boolean RLE decoder that reads from the given stream.
 * @param input stream holding the encoded bytes; ownership moves to the
 *        decoder
 */
std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder
                             (std::unique_ptr<SeekableInputStream> input) {
  // Derived-to-base is an implicit pointer conversion; reinterpret_cast
  // performs no pointer adjustment and is not the right tool for an upcast.
  return std::unique_ptr<ByteRleDecoder>(
      new BooleanRleDecoderImpl(std::move(input)));
}
}