blob: b29ee8706f4672963f6e18a91bbb02b1dea27dab [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "RLEv1.hh"
#include "RLEv2.hh"
#include "orc/Exceptions.hh"
namespace orc {
RleEncoder::~RleEncoder() {
// PASS
}
RleDecoder::~RleDecoder() {
// PASS
}
std::unique_ptr<RleEncoder> createRleEncoder(std::unique_ptr<BufferedOutputStream> output,
bool isSigned, RleVersion version, MemoryPool&,
bool alignedBitpacking) {
switch (static_cast<int64_t>(version)) {
case RleVersion_1:
return std::make_unique<RleEncoderV1>(std::move(output), isSigned);
case RleVersion_2:
return std::make_unique<RleEncoderV2>(std::move(output), isSigned, alignedBitpacking);
default:
throw NotImplementedYet("Not implemented yet");
}
}
std::unique_ptr<RleDecoder> createRleDecoder(std::unique_ptr<SeekableInputStream> input,
bool isSigned, RleVersion version, MemoryPool& pool,
ReaderMetrics* metrics) {
switch (static_cast<int64_t>(version)) {
case RleVersion_1:
return std::make_unique<RleDecoderV1>(std::move(input), isSigned, metrics);
case RleVersion_2:
return std::make_unique<RleDecoderV2>(std::move(input), isSigned, pool, metrics);
default:
throw NotImplementedYet("Not implemented yet");
}
}
template <typename T>
void RleEncoder::add(const T* data, uint64_t numValues, const char* notNull) {
for (uint64_t i = 0; i < numValues; ++i) {
if (!notNull || notNull[i]) {
write(static_cast<int64_t>(data[i]));
}
}
}
void RleEncoder::add(const int64_t* data, uint64_t numValues, const char* notNull) {
add<int64_t>(data, numValues, notNull);
}
void RleEncoder::add(const int32_t* data, uint64_t numValues, const char* notNull) {
add<int32_t>(data, numValues, notNull);
}
void RleEncoder::add(const int16_t* data, uint64_t numValues, const char* notNull) {
add<int16_t>(data, numValues, notNull);
}
void RleEncoder::writeVslong(int64_t val) {
writeVulong((static_cast<uint64_t>(val) << 1) ^ (val >> 63));
}
void RleEncoder::writeVulong(int64_t val) {
while (true) {
if ((val & ~0x7f) == 0) {
writeByte(static_cast<char>(val));
return;
} else {
writeByte(static_cast<char>(0x80 | (val & 0x7f)));
// cast val to unsigned so as to force 0-fill right shift
val = (static_cast<uint64_t>(val) >> 7);
}
}
}
void RleEncoder::writeByte(char c) {
if (bufferPosition == bufferLength) {
int addedSize = 0;
if (!outputStream->Next(reinterpret_cast<void**>(&buffer), &addedSize)) {
throw std::bad_alloc();
}
bufferPosition = 0;
bufferLength = static_cast<size_t>(addedSize);
}
buffer[bufferPosition++] = c;
}
void RleEncoder::recordPosition(PositionRecorder* recorder) const {
uint64_t flushedSize = outputStream->getSize();
uint64_t unusedBufferSize = static_cast<uint64_t>(bufferLength - bufferPosition);
if (outputStream->isCompressed()) {
recorder->add(flushedSize);
// There are multiple blocks in the input buffer, but bufferPosition only records the
// effective length of the last block. We need rawInputBufferSize to record the total length
// of all variable blocks.
recorder->add(outputStream->getRawInputBufferSize() - unusedBufferSize);
} else {
recorder->add(flushedSize - unusedBufferSize);
}
recorder->add(static_cast<uint64_t>(numLiterals));
}
void RleEncoder::finishEncode() {
outputStream->BackUp(static_cast<int>(bufferLength - bufferPosition));
outputStream->finishStream();
bufferLength = bufferPosition = 0;
}
} // namespace orc