| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include <algorithm> |
| #include <iostream> |
| #include <string.h> |
| #include <utility> |
| |
| #include "ByteRLE.hh" |
| #include "orc/Exceptions.hh" |
| |
| namespace orc { |
| |
| const int MINIMUM_REPEAT = 3; |
| const int MAXIMUM_REPEAT = 127 + MINIMUM_REPEAT; |
| const int MAX_LITERAL_SIZE = 128; |
| |
| ByteRleEncoder::~ByteRleEncoder() { |
| // PASS |
| } |
| |
| class ByteRleEncoderImpl : public ByteRleEncoder { |
| public: |
| ByteRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output); |
| virtual ~ByteRleEncoderImpl() override; |
| |
| /** |
| * Encode the next batch of values. |
| * @param data to be encoded |
| * @param numValues the number of values to be encoded |
| * @param notNull If the pointer is null, all values are read. If the |
| * pointer is not null, positions that are false are skipped. |
| */ |
| virtual void add(const char* data, uint64_t numValues, |
| const char* notNull) override; |
| |
| /** |
| * Get size of buffer used so far. |
| */ |
| virtual uint64_t getBufferSize() const override; |
| |
| /** |
| * Flush underlying BufferedOutputStream. |
| */ |
| virtual uint64_t flush() override; |
| |
| virtual void recordPosition(PositionRecorder* recorder) const override; |
| |
| protected: |
| std::unique_ptr<BufferedOutputStream> outputStream; |
| char* literals; |
| int numLiterals; |
| bool repeat; |
| int tailRunLength; |
| int bufferPosition; |
| int bufferLength; |
| char* buffer; |
| |
| void writeByte(char c); |
| void writeValues(); |
| void write(char c); |
| }; |
| |
| ByteRleEncoderImpl::ByteRleEncoderImpl( |
| std::unique_ptr<BufferedOutputStream> output) |
| : outputStream(std::move(output)) { |
| literals = new char[MAX_LITERAL_SIZE]; |
| numLiterals = 0; |
| tailRunLength = 0; |
| repeat = false; |
| bufferPosition = 0; |
| bufferLength = 0; |
| buffer = nullptr; |
| } |
| |
| ByteRleEncoderImpl::~ByteRleEncoderImpl() { |
| // PASS |
| delete [] literals; |
| } |
| |
| void ByteRleEncoderImpl::writeByte(char c) { |
| if (bufferPosition == bufferLength) { |
| int addedSize = 0; |
| if (!outputStream->Next(reinterpret_cast<void **>(&buffer), &addedSize)) { |
| throw std::bad_alloc(); |
| } |
| bufferPosition = 0; |
| bufferLength = addedSize; |
| } |
| buffer[bufferPosition++] = c; |
| } |
| |
| void ByteRleEncoderImpl::add( |
| const char* data, |
| uint64_t numValues, |
| const char* notNull) { |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (!notNull || notNull[i]) { |
| write(data[i]); |
| } |
| } |
| } |
| |
| void ByteRleEncoderImpl::writeValues() { |
| if (numLiterals != 0) { |
| if (repeat) { |
| writeByte( |
| static_cast<char>(numLiterals - static_cast<int>(MINIMUM_REPEAT))); |
| writeByte(literals[0]); |
| } else { |
| writeByte(static_cast<char>(-numLiterals)); |
| for (int i = 0; i < numLiterals; ++i) { |
| writeByte(literals[i]); |
| } |
| } |
| repeat = false; |
| tailRunLength = 0; |
| numLiterals = 0; |
| } |
| } |
| |
| uint64_t ByteRleEncoderImpl::flush() { |
| writeValues(); |
| outputStream->BackUp(bufferLength - bufferPosition); |
| uint64_t dataSize = outputStream->flush(); |
| bufferLength = bufferPosition = 0; |
| return dataSize; |
| } |
| |
| void ByteRleEncoderImpl::write(char value) { |
| if (numLiterals == 0) { |
| literals[numLiterals++] = value; |
| tailRunLength = 1; |
| } else if (repeat) { |
| if (value == literals[0]) { |
| numLiterals += 1; |
| if (numLiterals == MAXIMUM_REPEAT) { |
| writeValues(); |
| } |
| } else { |
| writeValues(); |
| literals[numLiterals++] = value; |
| tailRunLength = 1; |
| } |
| } else { |
| if (value == literals[numLiterals - 1]) { |
| tailRunLength += 1; |
| } else { |
| tailRunLength = 1; |
| } |
| if (tailRunLength == MINIMUM_REPEAT) { |
| if (numLiterals + 1 == MINIMUM_REPEAT) { |
| repeat = true; |
| numLiterals += 1; |
| } else { |
| numLiterals -= static_cast<int>(MINIMUM_REPEAT - 1); |
| writeValues(); |
| literals[0] = value; |
| repeat = true; |
| numLiterals = MINIMUM_REPEAT; |
| } |
| } else { |
| literals[numLiterals++] = value; |
| if (numLiterals == MAX_LITERAL_SIZE) { |
| writeValues(); |
| } |
| } |
| } |
| } |
| |
| uint64_t ByteRleEncoderImpl::getBufferSize() const { |
| return outputStream->getSize(); |
| } |
| |
| void ByteRleEncoderImpl::recordPosition(PositionRecorder *recorder) const { |
| uint64_t flushedSize = outputStream->getSize(); |
| uint64_t unflushedSize = static_cast<uint64_t>(bufferPosition); |
| if (outputStream->isCompressed()) { |
| // start of the compression chunk in the stream |
| recorder->add(flushedSize); |
| // number of decompressed bytes that need to be consumed |
| recorder->add(unflushedSize); |
| } else { |
| flushedSize -= static_cast<uint64_t>(bufferLength); |
| // byte offset of the RLE run’s start location |
| recorder->add(flushedSize + unflushedSize); |
| } |
| recorder->add(static_cast<uint64_t>(numLiterals)); |
| } |
| |
| std::unique_ptr<ByteRleEncoder> createByteRleEncoder |
| (std::unique_ptr<BufferedOutputStream> output) { |
| return std::unique_ptr<ByteRleEncoder>(new ByteRleEncoderImpl |
| (std::move(output))); |
| } |
| |
| class BooleanRleEncoderImpl : public ByteRleEncoderImpl { |
| public: |
| BooleanRleEncoderImpl(std::unique_ptr<BufferedOutputStream> output); |
| virtual ~BooleanRleEncoderImpl() override; |
| |
| /** |
| * Encode the next batch of values |
| * @param data to be encoded |
| * @param numValues the number of values to be encoded |
| * @param notNull If the pointer is null, all values are read. If the |
| * pointer is not null, positions that are false are skipped. |
| */ |
| virtual void add(const char* data, uint64_t numValues, |
| const char* notNull) override; |
| |
| /** |
| * Flushing underlying BufferedOutputStream |
| */ |
| virtual uint64_t flush() override; |
| |
| virtual void recordPosition(PositionRecorder* recorder) const override; |
| |
| private: |
| int bitsRemained; |
| char current; |
| |
| }; |
| |
| BooleanRleEncoderImpl::BooleanRleEncoderImpl( |
| std::unique_ptr<BufferedOutputStream> output) |
| : ByteRleEncoderImpl(std::move(output)) { |
| bitsRemained = 8; |
| current = static_cast<char>(0); |
| } |
| |
| BooleanRleEncoderImpl::~BooleanRleEncoderImpl() { |
| // PASS |
| } |
| |
| void BooleanRleEncoderImpl::add( |
| const char* data, |
| uint64_t numValues, |
| const char* notNull) { |
| for (uint64_t i = 0; i < numValues; ++i) { |
| if (bitsRemained == 0) { |
| write(current); |
| current = static_cast<char>(0); |
| bitsRemained = 8; |
| } |
| if (!notNull || notNull[i]) { |
| if (!data || data[i]) { |
| current = |
| static_cast<char>(current | (0x80 >> (8 - bitsRemained))); |
| } |
| --bitsRemained; |
| } |
| } |
| if (bitsRemained == 0) { |
| write(current); |
| current = static_cast<char>(0); |
| bitsRemained = 8; |
| } |
| } |
| |
| uint64_t BooleanRleEncoderImpl::flush() { |
| if (bitsRemained != 8) { |
| write(current); |
| } |
| bitsRemained = 8; |
| current = static_cast<char>(0); |
| return ByteRleEncoderImpl::flush(); |
| } |
| |
| void BooleanRleEncoderImpl::recordPosition(PositionRecorder* recorder) const { |
| ByteRleEncoderImpl::recordPosition(recorder); |
| recorder->add(static_cast<uint64_t>(8 - bitsRemained)); |
| } |
| |
| std::unique_ptr<ByteRleEncoder> createBooleanRleEncoder |
| (std::unique_ptr<BufferedOutputStream> output) { |
| BooleanRleEncoderImpl* encoder = |
| new BooleanRleEncoderImpl(std::move(output)) ; |
| return std::unique_ptr<ByteRleEncoder>( |
| reinterpret_cast<ByteRleEncoder*>(encoder)); |
| } |
| |
| ByteRleDecoder::~ByteRleDecoder() { |
| // PASS |
| } |
| |
| class ByteRleDecoderImpl: public ByteRleDecoder { |
| public: |
| ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream> input); |
| |
| virtual ~ByteRleDecoderImpl(); |
| |
| /** |
| * Seek to a particular spot. |
| */ |
| virtual void seek(PositionProvider&); |
| |
| /** |
| * Seek over a given number of values. |
| */ |
| virtual void skip(uint64_t numValues); |
| |
| /** |
| * Read a number of values into the batch. |
| */ |
| virtual void next(char* data, uint64_t numValues, char* notNull); |
| |
| protected: |
| inline void nextBuffer(); |
| inline signed char readByte(); |
| inline void readHeader(); |
| |
| std::unique_ptr<SeekableInputStream> inputStream; |
| size_t remainingValues; |
| char value; |
| const char* bufferStart; |
| const char* bufferEnd; |
| bool repeating; |
| }; |
| |
| void ByteRleDecoderImpl::nextBuffer() { |
| int bufferLength; |
| const void* bufferPointer; |
| bool result = inputStream->Next(&bufferPointer, &bufferLength); |
| if (!result) { |
| throw ParseError("bad read in nextBuffer"); |
| } |
| bufferStart = static_cast<const char*>(bufferPointer); |
| bufferEnd = bufferStart + bufferLength; |
| } |
| |
| signed char ByteRleDecoderImpl::readByte() { |
| if (bufferStart == bufferEnd) { |
| nextBuffer(); |
| } |
| return *(bufferStart++); |
| } |
| |
| void ByteRleDecoderImpl::readHeader() { |
| signed char ch = readByte(); |
| if (ch < 0) { |
| remainingValues = static_cast<size_t>(-ch); |
| repeating = false; |
| } else { |
| remainingValues = static_cast<size_t>(ch) + MINIMUM_REPEAT; |
| repeating = true; |
| value = readByte(); |
| } |
| } |
| |
| ByteRleDecoderImpl::ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream> |
| input) { |
| inputStream = std::move(input); |
| repeating = false; |
| remainingValues = 0; |
| value = 0; |
| bufferStart = nullptr; |
| bufferEnd = nullptr; |
| } |
| |
| ByteRleDecoderImpl::~ByteRleDecoderImpl() { |
| // PASS |
| } |
| |
| void ByteRleDecoderImpl::seek(PositionProvider& location) { |
| // move the input stream |
| inputStream->seek(location); |
| // force a re-read from the stream |
| bufferEnd = bufferStart; |
| // read a new header |
| readHeader(); |
| // skip ahead the given number of records |
| ByteRleDecoderImpl::skip(location.next()); |
| } |
| |
| void ByteRleDecoderImpl::skip(uint64_t numValues) { |
| while (numValues > 0) { |
| if (remainingValues == 0) { |
| readHeader(); |
| } |
| size_t count = std::min(static_cast<size_t>(numValues), remainingValues); |
| remainingValues -= count; |
| numValues -= count; |
| // for literals we need to skip over count bytes, which may involve |
| // reading from the underlying stream |
| if (!repeating) { |
| size_t consumedBytes = count; |
| while (consumedBytes > 0) { |
| if (bufferStart == bufferEnd) { |
| nextBuffer(); |
| } |
| size_t skipSize = std::min(static_cast<size_t>(consumedBytes), |
| static_cast<size_t>(bufferEnd - |
| bufferStart)); |
| bufferStart += skipSize; |
| consumedBytes -= skipSize; |
| } |
| } |
| } |
| } |
| |
| void ByteRleDecoderImpl::next(char* data, uint64_t numValues, |
| char* notNull) { |
| uint64_t position = 0; |
| // skip over null values |
| while (notNull && position < numValues && !notNull[position]) { |
| position += 1; |
| } |
| while (position < numValues) { |
| // if we are out of values, read more |
| if (remainingValues == 0) { |
| readHeader(); |
| } |
| // how many do we read out of this block? |
| size_t count = std::min(static_cast<size_t>(numValues - position), |
| remainingValues); |
| uint64_t consumed = 0; |
| if (repeating) { |
| if (notNull) { |
| for(uint64_t i=0; i < count; ++i) { |
| if (notNull[position + i]) { |
| data[position + i] = value; |
| consumed += 1; |
| } |
| } |
| } else { |
| memset(data + position, value, count); |
| consumed = count; |
| } |
| } else { |
| if (notNull) { |
| for(uint64_t i=0; i < count; ++i) { |
| if (notNull[position + i]) { |
| data[position + i] = readByte(); |
| consumed += 1; |
| } |
| } |
| } else { |
| uint64_t i = 0; |
| while (i < count) { |
| if (bufferStart == bufferEnd) { |
| nextBuffer(); |
| } |
| uint64_t copyBytes = |
| std::min(static_cast<uint64_t>(count - i), |
| static_cast<uint64_t>(bufferEnd - bufferStart)); |
| memcpy(data + position + i, bufferStart, copyBytes); |
| bufferStart += copyBytes; |
| i += copyBytes; |
| } |
| consumed = count; |
| } |
| } |
| remainingValues -= consumed; |
| position += count; |
| // skip over any null values |
| while (notNull && position < numValues && !notNull[position]) { |
| position += 1; |
| } |
| } |
| } |
| |
| std::unique_ptr<ByteRleDecoder> createByteRleDecoder |
| (std::unique_ptr<SeekableInputStream> input) { |
| return std::unique_ptr<ByteRleDecoder>(new ByteRleDecoderImpl |
| (std::move(input))); |
| } |
| |
| class BooleanRleDecoderImpl: public ByteRleDecoderImpl { |
| public: |
| BooleanRleDecoderImpl(std::unique_ptr<SeekableInputStream> input); |
| |
| virtual ~BooleanRleDecoderImpl(); |
| |
| /** |
| * Seek to a particular spot. |
| */ |
| virtual void seek(PositionProvider&); |
| |
| /** |
| * Seek over a given number of values. |
| */ |
| virtual void skip(uint64_t numValues); |
| |
| /** |
| * Read a number of values into the batch. |
| */ |
| virtual void next(char* data, uint64_t numValues, char* notNull); |
| |
| protected: |
| size_t remainingBits; |
| char lastByte; |
| }; |
| |
| BooleanRleDecoderImpl::BooleanRleDecoderImpl |
| (std::unique_ptr<SeekableInputStream> input |
| ): ByteRleDecoderImpl(std::move(input)) { |
| remainingBits = 0; |
| lastByte = 0; |
| } |
| |
| BooleanRleDecoderImpl::~BooleanRleDecoderImpl() { |
| // PASS |
| } |
| |
| void BooleanRleDecoderImpl::seek(PositionProvider& location) { |
| ByteRleDecoderImpl::seek(location); |
| uint64_t consumed = location.next(); |
| remainingBits = 0; |
| if (consumed > 8) { |
| throw ParseError("bad position"); |
| } |
| if (consumed != 0) { |
| remainingBits = 8 - consumed; |
| ByteRleDecoderImpl::next(&lastByte, 1, nullptr); |
| } |
| } |
| |
| void BooleanRleDecoderImpl::skip(uint64_t numValues) { |
| if (numValues <= remainingBits) { |
| remainingBits -= numValues; |
| } else { |
| numValues -= remainingBits; |
| uint64_t bytesSkipped = numValues / 8; |
| ByteRleDecoderImpl::skip(bytesSkipped); |
| if (numValues % 8 != 0) { |
| ByteRleDecoderImpl::next(&lastByte, 1, nullptr); |
| remainingBits = 8 - (numValues % 8); |
| } else { |
| remainingBits = 0; |
| } |
| } |
| } |
| |
| void BooleanRleDecoderImpl::next(char* data, uint64_t numValues, |
| char* notNull) { |
| // next spot to fill in |
| uint64_t position = 0; |
| |
| // use up any remaining bits |
| if (notNull) { |
| while(remainingBits > 0 && position < numValues) { |
| if (notNull[position]) { |
| remainingBits -= 1; |
| data[position] = (static_cast<unsigned char>(lastByte) >> |
| remainingBits) & 0x1; |
| } else { |
| data[position] = 0; |
| } |
| position += 1; |
| } |
| } else { |
| while(remainingBits > 0 && position < numValues) { |
| remainingBits -= 1; |
| data[position++] = (static_cast<unsigned char>(lastByte) >> |
| remainingBits) & 0x1; |
| } |
| } |
| |
| // count the number of nonNulls remaining |
| uint64_t nonNulls = numValues - position; |
| if (notNull) { |
| for(uint64_t i=position; i < numValues; ++i) { |
| if (!notNull[i]) { |
| nonNulls -= 1; |
| } |
| } |
| } |
| |
| // fill in the remaining values |
| if (nonNulls == 0) { |
| while (position < numValues) { |
| data[position++] = 0; |
| } |
| } else if (position < numValues) { |
| // read the new bytes into the array |
| uint64_t bytesRead = (nonNulls + 7) / 8; |
| ByteRleDecoderImpl::next(data + position, bytesRead, nullptr); |
| lastByte = data[position + bytesRead - 1]; |
| remainingBits = bytesRead * 8 - nonNulls; |
| // expand the array backwards so that we don't clobber the data |
| uint64_t bitsLeft = bytesRead * 8 - remainingBits; |
| if (notNull) { |
| for(int64_t i=static_cast<int64_t>(numValues) - 1; |
| i >= static_cast<int64_t>(position); --i) { |
| if (notNull[i]) { |
| uint64_t shiftPosn = (-bitsLeft) % 8; |
| data[i] = (data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1; |
| bitsLeft -= 1; |
| } else { |
| data[i] = 0; |
| } |
| } |
| } else { |
| for(int64_t i=static_cast<int64_t>(numValues) - 1; |
| i >= static_cast<int64_t>(position); --i, --bitsLeft) { |
| uint64_t shiftPosn = (-bitsLeft) % 8; |
| data[i] = (data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1; |
| } |
| } |
| } |
| } |
| |
| std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder |
| (std::unique_ptr<SeekableInputStream> input) { |
| BooleanRleDecoderImpl* decoder = |
| new BooleanRleDecoderImpl(std::move(input)); |
| return std::unique_ptr<ByteRleDecoder>( |
| reinterpret_cast<ByteRleDecoder*>(decoder)); |
| } |
| } |