blob: 724a43a81e92b303a581ebfc8bfac690ac13850c [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
#include <immintrin.h>
#include <string.h>
#include <algorithm>
#include <iostream>
#include <utility>
#include "storage/format/orc/byte-rle.h"
#include "storage/format/orc/exceptions.h"
namespace orc {
// Shortest run stored as a repeat: a non-negative header byte h encodes a
// run of h + MINIMUM_REPEAT copies of the byte that follows it.
constexpr size_t MINIMUM_REPEAT = 3;
ByteRleDecoder::~ByteRleDecoder() {
  // Intentionally empty.
}
// Fetches the next chunk from inputStream and points the read window
// [bufferStart, bufferEnd) at it. A failed Next() is reported via LOG_ERROR.
// NOTE(review): assumes LOG_ERROR does not return (e.g. it throws); if it can
// return, the window below is built from a null pointer — confirm.
void ByteRleDecoderImpl::nextBuffer() {
  int bufferLength = 0;
  const void *bufferPointer = nullptr;
  bool result = inputStream->Next(&bufferPointer, &bufferLength);
  if (!result) {
    LOG_ERROR(ERRCODE_INTERNAL_ERROR, "bad read in nextBuffer");
  }
  bufferStart = static_cast<const char *>(bufferPointer);
  bufferEnd = bufferStart + bufferLength;
}
signed char ByteRleDecoderImpl::readByte() {
if (bufferStart == bufferEnd) {
return *(bufferStart++);
// Reads one run header and primes the run state.
// A negative control byte ch starts a literal run of -ch raw bytes; a
// non-negative one starts a repeat run of ch + MINIMUM_REPEAT copies of the
// single byte that follows the header.
void ByteRleDecoderImpl::readHeader() {
  signed char ch = readByte();
  if (ch < 0) {
    remainingValues = static_cast<size_t>(-ch);
    repeating = false;
  } else {
    remainingValues = static_cast<size_t>(ch) + MINIMUM_REPEAT;
    repeating = true;
    value = readByte();
  }
}
std::unique_ptr<SeekableInputStream> input) {
inputStream = std::move(input);
repeating = false;
remainingValues = 0;
value = 0;
bufferStart = 0;
bufferEnd = 0;
ByteRleDecoderImpl::~ByteRleDecoderImpl() {
  // Intentionally empty.
}
// Repositions the decoder at a recorded position: seeks the underlying
// stream, discards the buffered window, reads the run header found there and
// skips the recorded number of values inside that run.
void ByteRleDecoderImpl::seek(PositionProvider &location) {
  // move the input stream
  // NOTE(review): assumes SeekableInputStream::seek(PositionProvider &)
  // consumes this stream's entries from `location` — confirm in the header.
  inputStream->seek(location);
  // force a re-read from the stream
  bufferEnd = bufferStart;
  // read a new header
  readHeader();
  // skip ahead the given number of records. The call is qualified because
  // skip() is virtual and BooleanRleDecoderImpl overrides it to count bits;
  // presumably the recorded offset here counts values of this byte stream.
  ByteRleDecoderImpl::skip(location.next());
}
void ByteRleDecoderImpl::skip(uint64_t numValues) {
while (numValues > 0) {
if (remainingValues == 0) {
size_t count = std::min(static_cast<size_t>(numValues), remainingValues);
remainingValues -= count;
numValues -= count;
// for literals we need to skip over count bytes, which may involve
// reading from the underlying stream
if (!repeating) {
size_t consumedBytes = count;
while (consumedBytes > 0) {
if (bufferStart == bufferEnd) {
size_t skipSize =
static_cast<size_t>(bufferEnd - bufferStart));
bufferStart += skipSize;
consumedBytes -= skipSize;
void ByteRleDecoderImpl::next(char *data, uint64_t numValues,
const char *notNull) {
uint64_t position = 0;
// skip over null values
while (notNull && position < numValues && !notNull[position]) {
position += 1;
while (position < numValues) {
// if we are out of values, read more
if (remainingValues == 0) {
// how many do we read out of this block?
size_t count =
std::min(static_cast<size_t>(numValues - position), remainingValues);
uint64_t consumed = 0;
if (repeating) {
if (notNull) {
for (uint64_t i = 0; i < count; ++i) {
if (notNull[position + i]) {
data[position + i] = value;
consumed += 1;
} else {
memset(data + position, value, count);
consumed = count;
} else {
if (notNull) {
for (uint64_t i = 0; i < count; ++i) {
if (notNull[position + i]) {
data[position + i] = readByte();
consumed += 1;
} else {
uint64_t i = 0;
while (i < count) {
if (bufferStart == bufferEnd) {
uint64_t copyBytes =
std::min(static_cast<uint64_t>(count - i),
static_cast<uint64_t>(bufferEnd - bufferStart));
memcpy(data + position + i, bufferStart, copyBytes);
bufferStart += copyBytes;
i += copyBytes;
consumed = count;
remainingValues -= consumed;
position += count;
// skip over any null values
while (notNull && position < numValues && !notNull[position]) {
position += 1;
// Factory for a plain byte-RLE decoder reading from `input`.
std::unique_ptr<ByteRleDecoder> createByteRleDecoder(
    std::unique_ptr<SeekableInputStream> input) {
  return std::unique_ptr<ByteRleDecoder>(
      new ByteRleDecoderImpl(std::move(input)));
}
// Decoder for bit-packed booleans layered on the byte-RLE decoder: each
// decoded byte carries eight values, consumed most significant bit first.
class BooleanRleDecoderImpl : public ByteRleDecoderImpl {
 public:
  // Public so createBooleanRleDecoder() can construct it.
  explicit BooleanRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
  virtual ~BooleanRleDecoderImpl();
  void seek(PositionProvider &) override;
  void skip(uint64_t numValues) override;
  void next(char *data, uint64_t numValues, const char *notNull) override;

 private:
  // Number of not-yet-returned bits left in lastByte (its low-order bits).
  size_t remainingBits = 0;
  // Most recently decoded byte whose bits are being handed out.
  char lastByte = 0;
};
std::unique_ptr<SeekableInputStream> input)
: ByteRleDecoderImpl(std::move(input)) {}
BooleanRleDecoderImpl::~BooleanRleDecoderImpl() {
  // Intentionally empty.
}
// Repositions at a recorded boolean position: the byte stream is seeked
// first, then the recorded count of bits already consumed from the current
// byte is applied.
// Throws ParseError if that count exceeds 8.
void BooleanRleDecoderImpl::seek(PositionProvider &location) {
  // Reposition the underlying byte stream first.
  ByteRleDecoderImpl::seek(location);
  // Bits of the current byte already consumed at the recorded position.
  // NOTE(review): assumes PositionProvider::next() yields that entry — confirm.
  uint64_t consumed = location.next();
  // Drop bits buffered before the seek; otherwise a seek with consumed == 0
  // would keep handing out stale bits from lastByte.
  remainingBits = 0;
  if (consumed > 8) {
    throw ParseError("bad position");
  }
  if (consumed != 0) {
    remainingBits = 8 - consumed;
    ByteRleDecoderImpl::next(&lastByte, 1, nullptr);
  }
}
// Skips numValues booleans: first from the buffered bits of lastByte, then
// whole bytes in the byte stream, then a partial byte if needed.
void BooleanRleDecoderImpl::skip(uint64_t numValues) {
  if (numValues <= remainingBits) {
    remainingBits -= numValues;
    return;
  }
  numValues -= remainingBits;
  // Whole bytes can be skipped without decoding their bits.
  const uint64_t bytesSkipped = numValues / 8;
  ByteRleDecoderImpl::skip(bytesSkipped);
  const uint64_t bitsIntoNextByte = numValues % 8;
  if (bitsIntoNextByte != 0) {
    // Land in the middle of a byte: load it and keep its unread bits.
    ByteRleDecoderImpl::next(&lastByte, 1, nullptr);
    remainingBits = 8 - bitsIntoNextByte;
  } else {
    // Byte-aligned: do not read ahead (the stream may end exactly here).
    remainingBits = 0;
  }
}
// Decodes numValues booleans into data, one byte (0 or 1) per value. Bits
// are taken MSB first from bytes returned by ByteRleDecoderImpl::next. When
// notNull is non-null, entries with notNull[i] == 0 are written as 0 and do
// not consume a bit.
//
// Outline:
//  (1) drain the unread low-order bits still held in lastByte;
//  (2) count the non-null slots left;
//  (3) decode the packed bytes for those slots into
//      data[position, position + bytesRead) and expand them in place. The
//      expansion walks from the end backwards, so packed bytes are read
//      before they are overwritten.
// Unused trailing bits of the last packed byte are kept in
// lastByte/remainingBits for the next call.
//
// NOTE(review): issues visible in this block that should be confirmed:
//  * `#ifdef AVX_OPT` occurs twice with no visible `#else`/`#endif`, and
//    `positionEnd` is declared twice in the same scope, so the block as shown
//    does not compile; the preprocessor lines appear to be missing.
//  * `dataSrc` is a `__restrict__` copy of `data`, yet bytes read through
//    dataSrc are also written through data (the indices can coincide near
//    `position`), which appears to break the restrict contract.
//  * `tmpBuf` is shifted/or-ed before it is initialized, and it is stored via
//    a `uint64_t *` cast of the char buffer (strict-aliasing and alignment
//    hazard); std::memcpy into data + i - 7 would be the portable form.
//  * The two unrolled loops in step 1 run back to back; only the second
//    one's bytes survive the 16 shifts. Presumably one is meant to be chosen
//    by an endianness guard that is not visible; the 0..7 order is the one
//    that matches a little-endian 64-bit store.
//  * The SIMD path tests 16-byte alignment but then stores with the
//    unaligned `_mm_storeu_si128`, so the scalar "address not aligned"
//    fallback looks unnecessary.
void BooleanRleDecoderImpl::next(char *__restrict__ data, uint64_t numValues,
const char *__restrict__ notNull) {
// next spot to fill in
uint64_t position = 0;
// use up any remaining bits
// (lastByte still holds remainingBits unread low-order bits, handed out from
// the highest of them down.)
if (notNull) {
while (remainingBits > 0 && position < numValues) {
if (notNull[position]) {
remainingBits -= 1;
data[position] =
(static_cast<unsigned char>(lastByte) >> remainingBits) & 0x1;
} else {
data[position] = 0;
position += 1;
} else {
while (remainingBits > 0 && position < numValues) {
remainingBits -= 1;
data[position++] =
(static_cast<unsigned char>(lastByte) >> remainingBits) & 0x1;
// count the number of nonNulls remaining
uint64_t nonNulls = numValues - position;
if (notNull) {
for (uint64_t i = position; i < numValues; ++i) {
if (!notNull[i]) {
nonNulls -= 1;
// fill in the remaining values
if (nonNulls == 0) {
while (position < numValues) {
data[position++] = 0;
} else if (position < numValues) {
// read the new bytes into the array
// (packed bytes land in data[position, position + bytesRead) and are
// expanded in place below.)
uint64_t bytesRead = (nonNulls + 7) / 8;
ByteRleDecoderImpl::next(data + position, bytesRead, nullptr);
lastByte = data[position + bytesRead - 1];
remainingBits = bytesRead * 8 - nonNulls;
// expand the array backwards so that we don't clobber the data
uint64_t bitsLeft = bytesRead * 8 - remainingBits;
if (notNull) {
for (int64_t i = static_cast<int64_t>(numValues) - 1;
i >= static_cast<int64_t>(position); --i) {
if (notNull[i]) {
// Value k (0-based) of the packed run is in byte k / 8 at bit 7 - k % 8;
// with k = bitsLeft - 1 that bit index equals (-bitsLeft) % 8.
uint64_t shiftPosn = (-bitsLeft) % 8;
data[i] = (data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
bitsLeft -= 1;
} else {
data[i] = 0;
} else {
// performance: edit the code below carefully
// NOTE(review): dataSrc aliases data although both are __restrict__.
const char *__restrict__ dataSrc = data;
int64_t i = static_cast<int64_t>(numValues) - 1;
// NOTE(review): no matching #else/#endif is visible for this #ifdef.
#ifdef AVX_OPT
int64_t positionEnd = i - (i - position + 1) % 16;
assert((positionEnd - position + 1) % 16 == 0);
int64_t positionEnd = static_cast<int64_t>(position) - 1;
// step 1: remove the back element to align to 16 byte e.g. 128 bit
for (; i > positionEnd;) {
uint8_t shiftPosn = (-bitsLeft) % 8;
data[i] = (dataSrc[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
--i, --bitsLeft;
if (shiftPosn == 7) break;
for (; i - 7 > positionEnd; i -= 8, bitsLeft -= 8) {
char tmpDataSrc = dataSrc[position + (bitsLeft - 1) / 8];
// NOTE(review): tmpBuf is read by `<<=` before it is initialized.
uint64_t tmpBuf;
#pragma clang loop unroll(full)
for (int8_t shiftPosn = 7; shiftPosn >= 0; shiftPosn--) {
tmpBuf <<= 8;
tmpBuf |= (char)(tmpDataSrc >> shiftPosn) & 0x1;
#pragma clang loop unroll(full)
for (int8_t shiftPosn = 0; shiftPosn <= 7; shiftPosn++) {
tmpBuf <<= 8;
tmpBuf |= (char)(tmpDataSrc >> shiftPosn) & 0x1;
// NOTE(review): type-punned, possibly misaligned 8-byte store into char data.
uint64_t *tmpPtr = (uint64_t *)&data[i - 7];
*tmpPtr = tmpBuf;
// end of step 1
#ifdef AVX_OPT
// step 2: simd
// intel cpus are all little endian
// 2 bytes src e.g. 16 bits expand to 16 bytes e.g. 128 bits
// todo: there could be more specific version for avx2, avx512
__m128i *tmpPtr = (__m128i *)&data[i - 15];
if ((uint64_t)tmpPtr % 16 == 0) {
// _mm128_store_si128 require aligned, otherwise exception
__m128i mask = _mm_set1_epi8(0x1);
for (; i - 15 >= static_cast<int64_t>(position);
i -= 16, bitsLeft -= 16) {
const char *tds = &dataSrc[position + (bitsLeft - 1) / 8 - 1];
__m128i src = _mm_set_epi8(0, 0, 0, 0, 0, 0, 0, tds[1], 0, 0, 0, 0, 0,
0, 0, tds[0]);
// src: byte 0 = tds[0] (earlier packed byte), byte 8 = tds[1]. Step j
// below moves src left by j bytes and each 64-bit lane right by 7 - j
// bits, so after masking, output bytes j and 8 + j hold bit 7 - j of
// tds[0] and tds[1] respectively.
// high to low in register
__m128i res = _mm_set1_epi8(0x0);
__m128i tmp;
// pay attention to shift right logically
tmp = _mm_slli_si128(src, 0); // shift the byte
tmp = _mm_srli_epi64(tmp, 7); // shift the bit
res = _mm_or_si128(res, tmp);
tmp = _mm_slli_si128(src, 1); // shift the byte
tmp = _mm_srli_epi64(tmp, 6); // shift the bit
res = _mm_or_si128(res, tmp);
tmp = _mm_slli_si128(src, 2); // shift the byte
tmp = _mm_srli_epi64(tmp, 5); // shift the bit
res = _mm_or_si128(res, tmp);
tmp = _mm_slli_si128(src, 3); // shift the byte
tmp = _mm_srli_epi64(tmp, 4); // shift the bit
res = _mm_or_si128(res, tmp);
tmp = _mm_slli_si128(src, 4); // shift the byte
tmp = _mm_srli_epi64(tmp, 3); // shift the bit
res = _mm_or_si128(res, tmp);
tmp = _mm_slli_si128(src, 5); // shift the byte
tmp = _mm_srli_epi64(tmp, 2); // shift the bit
res = _mm_or_si128(res, tmp);
tmp = _mm_slli_si128(src, 6); // shift the byte
tmp = _mm_srli_epi64(tmp, 1); // shift the bit
res = _mm_or_si128(res, tmp);
tmp = _mm_slli_si128(src, 7); // shift the byte
tmp = _mm_srli_epi64(tmp, 0); // shift the bit
res = _mm_or_si128(res, tmp);
res = _mm_and_si128(res, mask);
__m128i *tmpPtr = (__m128i *)&data[i - 15];
_mm_storeu_si128(tmpPtr, res);
} else { // address not aligned
int64_t positionEnd = static_cast<int64_t>(position) - 1;
for (; i > positionEnd;) {
uint8_t shiftPosn = (-bitsLeft) % 8;
data[i] = (dataSrc[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
--i, --bitsLeft;
if (shiftPosn == 7) break;
for (; i - 7 > positionEnd; i -= 8, bitsLeft -= 8) {
char tmpDataSrc = dataSrc[position + (bitsLeft - 1) / 8];
uint64_t tmpBuf;
#pragma clang loop unroll(full)
for (int8_t shiftPosn = 0; shiftPosn <= 7; shiftPosn++) {
tmpBuf <<= 8;
tmpBuf |= (char)(tmpDataSrc >> shiftPosn) & 0x1;
uint64_t *tmpPtr = (uint64_t *)&data[i - 7];
*tmpPtr = tmpBuf;
assert(bitsLeft == 0);
// Factory for a bit-packed boolean decoder reading from `input`.
std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder(
    std::unique_ptr<SeekableInputStream> input) {
  // The implicit derived-to-base conversion (BooleanRleDecoderImpl ->
  // ByteRleDecoderImpl -> ByteRleDecoder) adjusts the pointer correctly;
  // reinterpret_cast does not and is only accidentally right when the base
  // subobject sits at offset 0.
  return std::unique_ptr<ByteRleDecoder>(
      new BooleanRleDecoderImpl(std::move(input)));
}
// Factory for a byte-RLE encoder writing to a compressor selected by `kind`.
std::unique_ptr<ByteRleCoder> createByteRleCoder(CompressionKind kind) {
  std::unique_ptr<ByteRleCoder> coder(
      new ByteRleCoder(createBlockCompressor(kind)));
  // Return the local directly: it is moved (or elided) automatically, while
  // `return std::move(coder);` only inhibits copy elision.
  return coder;
}
std::unique_ptr<SeekableOutputStream> output)
: ByteRleCoder(std::move(output)) {
bitsRemained = 8;
current = static_cast<char>(0);
// No extra cleanup beyond what members and the base class do themselves.
BooleanRleEncoderImpl::~BooleanRleEncoderImpl() = default;
// Intended to pack numValues booleans into bytes, most significant bit
// first. `current` is the byte under construction, and `bitsRemained` is the
// number of free bit slots left in it. Entries with notNull[i] == 0 are
// skipped; when data is null, every non-null entry is encoded as true.
// NOTE(review): in the code as shown, bitsRemained is never decremented. As
// a result `0x80 >> (8 - bitsRemained)` keeps targeting the same bit, and
// bitsRemained never reaches 0 inside the loop. In addition, `current` is
// reset without the completed byte being emitted. The decrement and the
// byte-output calls appear to be missing — confirm against VCS history.
void BooleanRleEncoderImpl::write(const char *data, uint64_t numValues,
const char *notNull) {
for (uint64_t i = 0; i < numValues; ++i) {
// No free slot left in `current`: start a fresh byte.
if (bitsRemained == 0) {
current = static_cast<char>(0);
bitsRemained = 8;
// Null entries are skipped entirely.
if (!notNull || notNull[i]) {
// A null `data` pointer means "all true".
if (!data || data[i]) {
// Sets bit (bitsRemained - 1): 0x80 >> (8 - b) == 1 << (b - 1) for b in 1..8.
current = static_cast<char>(current | (0x80 >> (8 - bitsRemained)));
// Completely filled byte: reset the accumulator.
if (bitsRemained == 0) {
current = static_cast<char>(0);
bitsRemained = 8;
// Resets the bit accumulator if a byte is partially filled
// (bitsRemained != 8).
// NOTE(review): as shown, the partially filled `current` byte is dropped
// without being written anywhere and nothing further is flushed; the lines
// that emit `current` (and any base-class flush) appear to be missing —
// confirm before relying on flush() to persist trailing booleans.
void BooleanRleEncoderImpl::flush() {
if (bitsRemained != 8) {
bitsRemained = 8;
current = static_cast<char>(0);
// NOTE(review): only the signature of flushToStream is visible here; its
// body and closing brace are missing from this copy of the file. Presumably
// it writes buffered output to `stream` — restore it from version control
// and confirm before use.
void BooleanRleEncoderImpl::flushToStream(OutputStream *stream) {
// Factory for a bit-packed boolean encoder writing to a compressor selected
// by `kind`.
std::unique_ptr<BooleanRleEncoderImpl> createBooleanRleEncoderImpl(
    CompressionKind kind) {
  std::unique_ptr<BooleanRleEncoderImpl> coder(
      new BooleanRleEncoderImpl(createBlockCompressor(kind)));
  // Return the local directly: it is moved (or elided) automatically, while
  // `return std::move(coder);` only inhibits copy elision.
  return coder;
}
} // namespace orc