blob: 719a4a07ef2ff2594768c0bc6a394922ee88039a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef STORAGE_SRC_STORAGE_FORMAT_ORC_BYTE_RLE_H_
#define STORAGE_SRC_STORAGE_FORMAT_ORC_BYTE_RLE_H_
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>
#include "storage/format/orc/rle.h"
#include "storage/format/orc/seekable-input-stream.h"
#include "storage/format/orc/seekable-output-stream.h"
namespace orc {
// Abstract decoder for byte run-length encoded streams. Concrete decoders are
// handed out by createByteRleDecoder() and createBooleanRleDecoder().
class ByteRleDecoder {
public:
virtual ~ByteRleDecoder();
// Seek to a particular spot in the encoded stream.
// @param pos Provider of the recorded position to seek to
virtual void seek(PositionProvider& pos) = 0; // NOLINT
// Skip over a given number of values without returning them.
// @param numValues The number of values to skip
virtual void skip(uint64_t numValues) = 0;
// Read a number of values into the batch.
// @param data the array to read into (presumably at least numValues long —
// confirm against implementations)
// @param numValues the number of values to read
// @param notNull If the pointer is null, all values are read. If the
// pointer is not null, positions whose notNull entry is false (zero) are
// skipped.
virtual void next(char* data, uint64_t numValues, const char* notNull) = 0;
};
// Create a byte RLE decoder.
// @param input the input stream to read from; passed as a unique_ptr by value,
// so ownership moves into the factory
// @return The decoder
std::unique_ptr<ByteRleDecoder> createByteRleDecoder(
std::unique_ptr<SeekableInputStream> input);
// Create a boolean RLE decoder.
// Unlike the other RLE decoders, the boolean decoder sets the data to 0
// if the value is masked by notNull. This is required for the notNull stream
// processing to properly apply multiple masks from nested types.
// @param input the input stream to read from; passed as a unique_ptr by value,
// so ownership moves into the factory
// @return The boolean RLE decoder
std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder(
std::unique_ptr<SeekableInputStream> input);
class ByteRleDecoderImpl : public ByteRleDecoder {
public:
explicit ByteRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
virtual ~ByteRleDecoderImpl();
void seek(PositionProvider&) override;
void skip(uint64_t numValues) override;
void next(char* data, uint64_t numValues, const char* notNull) override;
protected:
inline void nextBuffer();
inline signed char readByte();
inline void readHeader();
std::unique_ptr<SeekableInputStream> inputStream;
size_t remainingValues;
char value;
const char* bufferStart;
const char* bufferEnd;
bool repeating;
};
// Run length byte encoder. A control byte is written before
// each run with positive values 0 to 127 meaning 2 to 129 repetitions. If the
// bytes is -1 to -128, 1 to 128 literal byte values follow.
class ByteRleCoder : public RleCoder {
public:
explicit ByteRleCoder(std::unique_ptr<SeekableOutputStream> stream)
: output(std::move(stream)), literals(MAX_LITERAL_SIZE) {}
~ByteRleCoder() {}
void flushToStream(OutputStream* os) override {
writeValues();
output->flushToStream(os);
}
void flush() { writeValues(); }
uint64_t getStreamSize() override { return output->getStreamSize(); }
void reset() override {
output->reset();
repeat = false;
tailRunLength = 0;
numLiterals = 0;
}
uint64_t getEstimatedSpaceNeeded() override {
// This is the maximal space used.
// It might not be accurate.
return output->getEstimatedSpaceNeeded() + sizeof(int8_t) * numLiterals +
sizeof(int8_t) /* control byte*/;
}
void write(void* data, uint64_t numValues, const char* notNull) override {
int8_t* d = reinterpret_cast<int8_t*>(data);
if (notNull) {
for (uint64_t i = 0; i < numValues; i++) {
if (notNull[i]) {
write(d[i]);
}
}
} else {
for (uint64_t i = 0; i < numValues; i++) {
write(d[i]);
}
}
}
void write(int8_t value) {
if (numLiterals == 0) {
literals[numLiterals++] = value;
tailRunLength = 1;
} else if (repeat) {
if (value == literals[0]) {
numLiterals += 1;
if (numLiterals == MAX_REPEAT_SIZE) {
writeValues();
}
} else {
writeValues();
literals[numLiterals++] = value;
tailRunLength = 1;
}
} else {
if (value == literals[numLiterals - 1]) {
tailRunLength += 1;
} else {
tailRunLength = 1;
}
if (tailRunLength == MIN_REPEAT_SIZE) {
if (numLiterals + 1 == MIN_REPEAT_SIZE) {
repeat = true;
numLiterals += 1;
} else {
numLiterals -= MIN_REPEAT_SIZE - 1;
writeValues();
literals[0] = value;
repeat = true;
numLiterals = MIN_REPEAT_SIZE;
}
} else {
literals[numLiterals++] = value;
if (numLiterals == MAX_LITERAL_SIZE) {
writeValues();
}
}
}
}
private:
void writeValues() {
if (numLiterals != 0) {
if (repeat) {
assert(numLiterals - MIN_REPEAT_SIZE >= 0);
output->write<int8_t>(numLiterals - MIN_REPEAT_SIZE);
output->write(reinterpret_cast<const char*>(literals.data()),
sizeof(int8_t) * 1);
} else {
output->write<int8_t>(-numLiterals);
output->write(reinterpret_cast<const char*>(literals.data()),
numLiterals * sizeof(int8_t));
}
repeat = false;
tailRunLength = 0;
numLiterals = 0;
}
}
private:
const int32_t MIN_REPEAT_SIZE = 3;
const int32_t MAX_LITERAL_SIZE = 128;
const int32_t MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
std::unique_ptr<SeekableOutputStream> output;
std::vector<int8_t> literals;
int32_t numLiterals = 0;
bool repeat = false;
int32_t tailRunLength = 0;
};
// Create a byte RLE coder.
// @param kind The compression kind; presumably selects how the coder's
// underlying output stream is compressed — confirm against the definition.
// @return The coder
std::unique_ptr<ByteRleCoder> createByteRleCoder(CompressionKind kind);
class BooleanRleEncoderImpl : public ByteRleCoder {
public:
BooleanRleEncoderImpl(std::unique_ptr<SeekableOutputStream> output);
virtual ~BooleanRleEncoderImpl() override;
virtual void write(const char* data, uint64_t numValues, const char* notNull);
virtual void flush();
virtual void flushToStream(OutputStream* stream) override;
private:
int bitsRemained;
char current;
};
// Create a boolean RLE encoder.
// @param kind The compression kind; presumably selects how the encoder's
// underlying output stream is compressed — confirm against the definition.
// @return The boolean RLE encoder
std::unique_ptr<BooleanRleEncoderImpl> createBooleanRleEncoderImpl(
CompressionKind kind);
} // end of namespace orc
#endif // STORAGE_SRC_STORAGE_FORMAT_ORC_BYTE_RLE_H_