blob: 724a43a81e92b303a581ebfc8bfac690ac13850c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <immintrin.h>
#include <string.h>
#include <algorithm>
#include <iostream>
#include <utility>
#include "storage/format/orc/byte-rle.h"
#include "storage/format/orc/exceptions.h"
namespace orc {
// Smallest run length encoded as a repeated run: a non-negative header byte h
// stands for h + MINIMUM_REPEAT copies of one value (see
// ByteRleDecoderImpl::readHeader).
const size_t MINIMUM_REPEAT = 3;
// Out-of-line definition of the decoder interface's destructor.
ByteRleDecoder::~ByteRleDecoder() {
  // Intentionally empty.
}
// Refills [bufferStart, bufferEnd) with the next chunk of the input stream.
//
// On success the window is guaranteed to be non-empty, so callers such as
// readByte() may dereference bufferStart immediately after a refill.
// A failed Next() is reported through LOG_ERROR.
void ByteRleDecoderImpl::nextBuffer() {
  int bufferLength = 0;
  const void *bufferPointer = nullptr;
  // NOTE(review): SeekableInputStream::Next() presumably follows the protobuf
  // ZeroCopyInputStream contract, which allows a successful call to return a
  // zero-length buffer. Skip such buffers instead of handing an empty window
  // back to readByte(), which would then read past bufferEnd.
  for (;;) {
    if (!inputStream->Next(&bufferPointer, &bufferLength)) {
      LOG_ERROR(ERRCODE_INTERNAL_ERROR, "bad read in nextBuffer");
      break;  // only reached if LOG_ERROR returns instead of throwing
    }
    if (bufferLength > 0) {
      break;
    }
  }
  bufferStart = static_cast<const char *>(bufferPointer);
  bufferEnd = bufferStart + bufferLength;
}
// Returns the next raw byte of the stream, refilling the buffer window
// first when it has been fully consumed.
signed char ByteRleDecoderImpl::readByte() {
  if (bufferEnd == bufferStart) nextBuffer();
  const signed char byte = *bufferStart;
  ++bufferStart;
  return byte;
}
// Decodes one run header.
//   header < 0  : literal run of -header values that follow in the stream.
//   header >= 0 : repeated run of header + MINIMUM_REPEAT copies of the single
//                 byte that follows the header.
void ByteRleDecoderImpl::readHeader() {
  const signed char header = readByte();
  repeating = header >= 0;
  if (repeating) {
    remainingValues = static_cast<size_t>(header) + MINIMUM_REPEAT;
    value = readByte();
  } else {
    remainingValues = static_cast<size_t>(-header);
  }
}
// Takes ownership of `input`. The decoder starts with no current run and an
// empty buffer window, so the first read pulls a header from the stream.
ByteRleDecoderImpl::ByteRleDecoderImpl(
    std::unique_ptr<SeekableInputStream> input) {
  bufferStart = nullptr;
  bufferEnd = nullptr;
  remainingValues = 0;
  repeating = false;
  value = 0;
  inputStream = std::move(input);
}
ByteRleDecoderImpl::~ByteRleDecoderImpl() {
  // Nothing to do explicitly; members clean up after themselves.
}
// Repositions the decoder using `location`: the stream is seeked first, then
// the run header at the new stream position is decoded, and finally the next
// entry of `location` gives the number of values to skip from there.
void ByteRleDecoderImpl::seek(PositionProvider &location) {
  inputStream->seek(location);
  // Drop whatever was left of the old buffer so the next read refills.
  bufferEnd = bufferStart;
  readHeader();
  const uint64_t valuesToSkip = location.next();
  // Qualified on purpose: subclasses override skip() with different units
  // (e.g. bits), but here the count is in bytes of this byte-RLE layer.
  ByteRleDecoderImpl::skip(valuesToSkip);
}
// Discards the next numValues decoded bytes, crossing run boundaries as
// needed. Repeated runs only require bookkeeping; literal runs must also
// advance past the skipped bytes in the underlying stream.
void ByteRleDecoderImpl::skip(uint64_t numValues) {
  while (numValues != 0) {
    if (remainingValues == 0) readHeader();
    const size_t runSkip =
        std::min(static_cast<size_t>(numValues), remainingValues);
    remainingValues -= runSkip;
    numValues -= runSkip;
    if (repeating) continue;  // the repeated value is stored only once
    for (size_t bytesLeft = runSkip; bytesLeft > 0;) {
      if (bufferStart == bufferEnd) nextBuffer();
      const size_t available = static_cast<size_t>(bufferEnd - bufferStart);
      const size_t step = std::min(bytesLeft, available);
      bufferStart += step;
      bytesLeft -= step;
    }
  }
}
// Decodes numValues bytes into data.
//
// When notNull is non-null, slots with notNull[i] == 0 are skipped: they
// consume no value from the stream and their data[i] is left untouched.
// Without nulls, repeated runs are filled with memset and literal runs are
// copied straight out of the stream buffers.
void ByteRleDecoderImpl::next(char *data, uint64_t numValues,
                              const char *notNull) {
  // First index >= from that is not marked null (or numValues).
  auto firstNonNull = [&](uint64_t from) {
    while (notNull != nullptr && from < numValues && !notNull[from]) {
      ++from;
    }
    return from;
  };
  uint64_t pos = firstNonNull(0);
  while (pos < numValues) {
    if (remainingValues == 0) {
      readHeader();
    }
    // Slots (null or not) covered by this pass, capped by the run length.
    const size_t span =
        std::min(static_cast<size_t>(numValues - pos), remainingValues);
    // Values actually taken from the run; null slots take none.
    uint64_t taken = 0;
    if (notNull != nullptr) {
      for (uint64_t k = pos; k < pos + span; ++k) {
        if (!notNull[k]) {
          continue;
        }
        data[k] = repeating ? value : readByte();
        ++taken;
      }
    } else if (repeating) {
      memset(data + pos, value, span);
      taken = span;
    } else {
      for (uint64_t copied = 0; copied < span;) {
        if (bufferStart == bufferEnd) {
          nextBuffer();
        }
        const uint64_t chunk =
            std::min(static_cast<uint64_t>(span - copied),
                     static_cast<uint64_t>(bufferEnd - bufferStart));
        memcpy(data + pos + copied, bufferStart, chunk);
        bufferStart += chunk;
        copied += chunk;
      }
      taken = span;
    }
    remainingValues -= taken;
    pos = firstNonNull(pos + span);
  }
}
// Factory for a plain byte-RLE decoder; the decoder takes ownership of
// `input`.
std::unique_ptr<ByteRleDecoder> createByteRleDecoder(
    std::unique_ptr<SeekableInputStream> input) {
  std::unique_ptr<ByteRleDecoder> decoder(
      new ByteRleDecoderImpl(std::move(input)));
  return decoder;
}
// Boolean decoder: values are bit-packed, most significant bit first, into
// bytes that are decoded by the byte-RLE layer of ByteRleDecoderImpl.
class BooleanRleDecoderImpl : public ByteRleDecoderImpl {
 public:
  explicit BooleanRleDecoderImpl(std::unique_ptr<SeekableInputStream> input);
  virtual ~BooleanRleDecoderImpl();

  // Seeks the byte layer, then reads one more position entry: how many bits
  // of the current byte have already been consumed.
  void seek(PositionProvider &) override;
  // Skips numValues booleans.
  void skip(uint64_t numValues) override;
  // Decodes numValues booleans into data, one 0/1 byte per value.
  void next(char *data, uint64_t numValues, const char *notNull) override;

 protected:
  // Number of low-order bits of lastByte not yet handed out.
  size_t remainingBits{0};
  // Most recently decoded packed byte.
  char lastByte{0};
};
// Wraps a byte-RLE decoder over `input`; the bit-level state starts from the
// in-class initializers (no partially consumed byte).
BooleanRleDecoderImpl::BooleanRleDecoderImpl(
    std::unique_ptr<SeekableInputStream> input)
    : ByteRleDecoderImpl(std::move(input)) {}
BooleanRleDecoderImpl::~BooleanRleDecoderImpl() {
  // Nothing beyond the base-class cleanup.
}
// Repositions the boolean decoder.
//
// The byte layer is seeked first. The following position entry is the number
// of bits (0..8) of the byte at that point that were already consumed. If it
// is non-zero, that byte is decoded into lastByte and only its unconsumed
// low-order bits remain available.
//
// Throws ParseError if the bit offset exceeds 8.
void BooleanRleDecoderImpl::seek(PositionProvider &location) {
  ByteRleDecoderImpl::seek(location);
  uint64_t consumed = location.next();
  // Bits buffered before the seek belong to the old position. Without this
  // reset a byte-aligned seek (consumed == 0) would keep returning bits of
  // the stale lastByte from next().
  remainingBits = 0;
  if (consumed > 8) {
    throw ParseError("bad position");
  }
  if (consumed != 0) {
    remainingBits = 8 - consumed;
    ByteRleDecoderImpl::next(&lastByte, 1, nullptr);
  }
}
// Skips numValues booleans.
//
// Bits still buffered in lastByte are used first. Whole bytes are then
// skipped in the byte-RLE layer. A byte is decoded into lastByte only when
// the skip ends part-way through it. When the skip ends exactly on a byte
// boundary, nothing extra is read. That keeps a skip to the very end of the
// stream from trying to decode a byte that does not exist.
void BooleanRleDecoderImpl::skip(uint64_t numValues) {
  if (numValues <= remainingBits) {
    remainingBits -= numValues;
  } else {
    numValues -= remainingBits;
    uint64_t bytesSkipped = numValues / 8;
    ByteRleDecoderImpl::skip(bytesSkipped);
    const uint64_t bitsIntoByte = numValues % 8;
    if (bitsIntoByte != 0) {
      ByteRleDecoderImpl::next(&lastByte, 1, nullptr);
      remainingBits = 8 - bitsIntoByte;
    } else {
      remainingBits = 0;
    }
  }
}
// Decodes numValues booleans into data, writing one byte (0 or 1) per value.
//
// Booleans are bit-packed most-significant-bit first into bytes that are
// decoded by the byte-RLE layer. Bits left in lastByte from an earlier call
// are consumed first. When notNull is non-null, a slot with notNull[i] == 0
// consumes no bit and is set to 0. Bits of the last decoded byte that are not
// needed now stay in lastByte/remainingBits for the next call.
//
// data      output buffer of numValues bytes. The packed bytes are first
//           decoded into its unfilled tail and then expanded in place.
// numValues number of slots to fill.
// notNull   optional per-slot presence flags (nullptr means all present).
void BooleanRleDecoderImpl::next(char *__restrict__ data, uint64_t numValues,
                                 const char *__restrict__ notNull) {
  // next spot to fill in
  uint64_t position = 0;
  // use up any remaining bits
  if (notNull) {
    while (remainingBits > 0 && position < numValues) {
      if (notNull[position]) {
        remainingBits -= 1;
        data[position] =
            (static_cast<unsigned char>(lastByte) >> remainingBits) & 0x1;
      } else {
        data[position] = 0;
      }
      position += 1;
    }
  } else {
    while (remainingBits > 0 && position < numValues) {
      remainingBits -= 1;
      data[position++] =
          (static_cast<unsigned char>(lastByte) >> remainingBits) & 0x1;
    }
  }
  // count the number of nonNulls remaining
  uint64_t nonNulls = numValues - position;
  if (notNull) {
    for (uint64_t i = position; i < numValues; ++i) {
      if (!notNull[i]) {
        nonNulls -= 1;
      }
    }
  }
  // fill in the remaining values
  if (nonNulls == 0) {
    while (position < numValues) {
      data[position++] = 0;
    }
  } else if (position < numValues) {
    // read the packed bytes into the front of the unfilled region
    uint64_t bytesRead = (nonNulls + 7) / 8;
    ByteRleDecoderImpl::next(data + position, bytesRead, nullptr);
    lastByte = data[position + bytesRead - 1];
    remainingBits = bytesRead * 8 - nonNulls;
    // expand the array backwards so that we don't clobber the data
    uint64_t bitsLeft = bytesRead * 8 - remainingBits;
    if (notNull) {
      for (int64_t i = static_cast<int64_t>(numValues) - 1;
           i >= static_cast<int64_t>(position); --i) {
        if (notNull[i]) {
          uint64_t shiftPosn = (-bitsLeft) % 8;
          data[i] = (data[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
          bitsLeft -= 1;
        } else {
          data[i] = 0;
        }
      }
    } else {
      // performance: edit the code below carefully
      //
      // With no nulls, bitsLeft == i - position + 1 at every step. The source
      // byte for slot i is therefore data[position + (i - position) / 8] <= i.
      // Walking i downwards, each packed byte is read before its own slot is
      // overwritten. dataSrc deliberately aliases data. It must NOT be
      // __restrict__-qualified, because the same bytes are written through
      // `data` (a restrict violation is undefined behavior and would let the
      // compiler reorder those reads after the writes).
      const char *dataSrc = data;
      int64_t i = static_cast<int64_t>(numValues) - 1;
#ifdef AVX_OPT
      // Leave a multiple of 16 values (down to index `position`) for step 2.
      int64_t positionEnd =
          i - static_cast<int64_t>((numValues - position) % 16);
      assert((positionEnd - static_cast<int64_t>(position) + 1) % 16 == 0);
#else
      int64_t positionEnd = static_cast<int64_t>(position) - 1;
#endif
      // step 1a: expand single values until the one just written was the
      // first (MSB) bit of its source byte, leaving bitsLeft a multiple of 8
      for (; i > positionEnd;) {
        uint8_t shiftPosn = (-bitsLeft) % 8;
        data[i] = (dataSrc[position + (bitsLeft - 1) / 8] >> shiftPosn) & 0x1;
        --i, --bitsLeft;
        if (shiftPosn == 7) break;
      }
      // step 1b: expand one whole source byte into 8 values per iteration
      for (; i - 7 > positionEnd; i -= 8, bitsLeft -= 8) {
        const char tmpDataSrc = dataSrc[position + (bitsLeft - 1) / 8];
        // Every byte of tmpBuf is shifted in below. It is still initialized,
        // because reading an indeterminate value is undefined behavior.
        uint64_t tmpBuf = 0;
#ifdef IS_BIG_ENDIAN
#pragma clang loop unroll(full)
        for (int8_t shiftPosn = 7; shiftPosn >= 0; shiftPosn--) {
          tmpBuf <<= 8;
          tmpBuf |= static_cast<uint64_t>((tmpDataSrc >> shiftPosn) & 0x1);
        }
#else
#pragma clang loop unroll(full)
        for (int8_t shiftPosn = 0; shiftPosn <= 7; shiftPosn++) {
          tmpBuf <<= 8;
          tmpBuf |= static_cast<uint64_t>((tmpDataSrc >> shiftPosn) & 0x1);
        }
#endif
        // memcpy rather than a uint64_t* store. data[i - 7] has no alignment
        // guarantee, and char storage must not be written through a uint64_t
        // lvalue. Compilers lower this to a single 8-byte store.
        memcpy(&data[i - 7], &tmpBuf, sizeof(tmpBuf));
      }
      // end of step 1
#ifdef AVX_OPT
      // step 2: simd
      // intel cpus are all little endian
      // 2 bytes src e.g. 16 bits expand to 16 bytes e.g. 128 bits
      // _mm_storeu_si128 has no alignment requirement, so no alignment check
      // or scalar fallback is needed.
      // todo: there could be more specific version for avx2, avx512
      const __m128i mask = _mm_set1_epi8(0x1);
      for (; i - 15 >= static_cast<int64_t>(position);
           i -= 16, bitsLeft -= 16) {
        const char *tds = &dataSrc[position + (bitsLeft - 1) / 8 - 1];
        // tds[0] feeds output bytes 0..7 (low lane), tds[1] bytes 8..15
        __m128i src = _mm_set_epi8(0, 0, 0, 0, 0, 0, 0, tds[1], 0, 0, 0, 0, 0,
                                   0, 0, tds[0]);
        // high to low in register
        __m128i res = _mm_set1_epi8(0x0);
        {
          // Byte k of each 64-bit lane receives bit (7 - k) of the lane's
          // source byte in its lowest bit; other bits are masked off below.
          // pay attention to shift right logically
          __m128i tmp;
          tmp = _mm_slli_si128(src, 0);  // shift the byte
          tmp = _mm_srli_epi64(tmp, 7);  // shift the bit
          res = _mm_or_si128(res, tmp);
          tmp = _mm_slli_si128(src, 1);  // shift the byte
          tmp = _mm_srli_epi64(tmp, 6);  // shift the bit
          res = _mm_or_si128(res, tmp);
          tmp = _mm_slli_si128(src, 2);  // shift the byte
          tmp = _mm_srli_epi64(tmp, 5);  // shift the bit
          res = _mm_or_si128(res, tmp);
          tmp = _mm_slli_si128(src, 3);  // shift the byte
          tmp = _mm_srli_epi64(tmp, 4);  // shift the bit
          res = _mm_or_si128(res, tmp);
          tmp = _mm_slli_si128(src, 4);  // shift the byte
          tmp = _mm_srli_epi64(tmp, 3);  // shift the bit
          res = _mm_or_si128(res, tmp);
          tmp = _mm_slli_si128(src, 5);  // shift the byte
          tmp = _mm_srli_epi64(tmp, 2);  // shift the bit
          res = _mm_or_si128(res, tmp);
          tmp = _mm_slli_si128(src, 6);  // shift the byte
          tmp = _mm_srli_epi64(tmp, 1);  // shift the bit
          res = _mm_or_si128(res, tmp);
          tmp = _mm_slli_si128(src, 7);  // shift the byte
          tmp = _mm_srli_epi64(tmp, 0);  // shift the bit
          res = _mm_or_si128(res, tmp);
        }
        res = _mm_and_si128(res, mask);
        _mm_storeu_si128(reinterpret_cast<__m128i *>(&data[i - 15]), res);
      }
#endif
      assert(bitsLeft == 0);
    }
  }
}
// Factory for a boolean (bit-packed) RLE decoder; the decoder takes ownership
// of `input`.
std::unique_ptr<ByteRleDecoder> createBooleanRleDecoder(
    std::unique_ptr<SeekableInputStream> input) {
  // BooleanRleDecoderImpl publicly derives from ByteRleDecoderImpl, which
  // createByteRleDecoder already converts implicitly to ByteRleDecoder. Use
  // that derived-to-base conversion, which adjusts the pointer correctly,
  // instead of reinterpret_cast, which performs no adjustment.
  return std::unique_ptr<ByteRleDecoder>(
      new BooleanRleDecoderImpl(std::move(input)));
}
// Factory for a byte-RLE encoder writing into a block compressor for `kind`.
std::unique_ptr<ByteRleCoder> createByteRleCoder(CompressionKind kind) {
  std::unique_ptr<ByteRleCoder> coder(
      new ByteRleCoder(createBlockCompressor(kind)));
  // Return the local by name: NRVO / implicit move applies, whereas
  // std::move(coder) is a pessimizing move.
  return coder;
}
// Wraps a byte-RLE coder over `output`. The bit accumulator starts empty: no
// bits set and all 8 slots free.
BooleanRleEncoderImpl::BooleanRleEncoderImpl(
    std::unique_ptr<SeekableOutputStream> output)
    : ByteRleCoder(std::move(output)) {
  current = static_cast<char>(0);
  bitsRemained = 8;
}
BooleanRleEncoderImpl::~BooleanRleEncoderImpl() {
  // Nothing beyond the base-class cleanup.
}
void BooleanRleEncoderImpl::write(const char *data, uint64_t numValues,
const char *notNull) {
for (uint64_t i = 0; i < numValues; ++i) {
if (bitsRemained == 0) {
ByteRleCoder::write(current);
current = static_cast<char>(0);
bitsRemained = 8;
}
if (!notNull || notNull[i]) {
if (!data || data[i]) {
current = static_cast<char>(current | (0x80 >> (8 - bitsRemained)));
}
--bitsRemained;
}
}
if (bitsRemained == 0) {
ByteRleCoder::write(current);
current = static_cast<char>(0);
bitsRemained = 8;
}
}
// Emits any partially filled byte (its unused low bits stay 0), resets the
// accumulator, then flushes the byte-RLE layer.
void BooleanRleEncoderImpl::flush() {
  const bool hasPartialByte = bitsRemained != 8;
  if (hasPartialByte) ByteRleCoder::write(current);
  current = static_cast<char>(0);
  bitsRemained = 8;
  ByteRleCoder::flush();
}
// Writes out any pending bits via flush(), then lets the byte-RLE layer copy
// its encoded output to `stream`.
void BooleanRleEncoderImpl::flushToStream(OutputStream *stream) {
  this->flush();
  ByteRleCoder::flushToStream(stream);
}
// Factory for a boolean (bit-packed) RLE encoder writing into a block
// compressor for `kind`.
std::unique_ptr<BooleanRleEncoderImpl> createBooleanRleEncoderImpl(
    CompressionKind kind) {
  std::unique_ptr<BooleanRleEncoderImpl> coder(
      new BooleanRleEncoderImpl(createBlockCompressor(kind)));
  // Return the local by name: NRVO / implicit move applies, whereas
  // std::move(coder) is a pessimizing move.
  return coder;
}
} // namespace orc