blob: 1058eb0998f2d64bc67ad8d4402ba3830379512b [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
// Make use of bitshuffle and lz4 to encode the fixed size
// type blocks, such as UINT8, INT8, UINT16, INT16,
// UINT32, INT32, FLOAT, DOUBLE.
// Reference:
// https://github.com/kiyo-masui/bitshuffle.git
#ifndef KUDU_CFILE_BSHUF_BLOCK_H
#define KUDU_CFILE_BSHUF_BLOCK_H
#include <algorithm>
#include <stdint.h>
#include <bitshuffle.h>
#include "kudu/cfile/block_encodings.h"
#include "kudu/cfile/cfile_util.h"
#include "kudu/common/columnblock.h"
#include "kudu/gutil/strings/substitute.h"
#include "kudu/util/coding.h"
#include "kudu/util/coding-inl.h"
#include "kudu/util/hexdump.h"
namespace kudu {
namespace cfile {
// Log a FATAL error message and exit. Marked ATTRIBUTE_NORETURN, so control
// never comes back to the caller. Only declared in this header.
// The callers in this header pass, as 'val', the negative value returned by
// a failed bshuf_compress_lz4() / bshuf_decompress_lz4() call.
void AbortWithBitShuffleError(int64_t val) ATTRIBUTE_NORETURN;
// BShufBlockBuilder bitshuffles the bits of fixed-size-type blocks and
// compresses them with lz4.
//
// Header includes:
// 1. ordinal of the first element within the block (uint32_t, little endian).
// 2. number of elements within the block (uint32_t, little endian).
// 3. compressed_size, including the header size (uint32_t, little endian).
// 4. number of elements after padding. Padding is needed to meet the requirement
// of the bitshuffle library that the number of elements in the block be a
// multiple of 8. That means some pseudo elements are appended at the end of the
// block if necessary (uint32_t, little endian).
// 5. the size of the elements in bytes, as actually encoded. In the case that all of the
// data in a block can fit into a smaller integer type, we may choose to encode
// that smaller type to save CPU costs. This is currently done only for the UINT32
// block type (uint32_t, little endian).
template<DataType Type>
class BShufBlockBuilder : public BlockBuilder {
 public:
  // Only the 'options' pointer is stored; it is dereferenced again on every
  // Reset() and Add(), so it must stay valid for the builder's lifetime.
  explicit BShufBlockBuilder(const WriterOptions* options)
    : count_(0),
      options_(options) {
    Reset();
  }

  // Drops all buffered values and gets ready for a new block.
  void Reset() OVERRIDE {
    count_ = 0;
    data_.clear();
    data_.reserve(options_->storage_attributes.cfile_block_size);
    buffer_.clear();
    buffer_.resize(kMaxHeaderSize);
  }

  // Returns true once the estimated encoded size is strictly larger than 'limit'.
  bool IsBlockFull(size_t limit) const OVERRIDE {
    return EstimateEncodedSize() > limit;
  }

  // Appends up to 'count' values of CppType read from 'vals_void'.
  // Values are taken one at a time until the block reports itself full
  // (relative to the configured cfile_block_size).
  // Returns the number of values actually consumed.
  int Add(const uint8_t* vals_void, size_t count) OVERRIDE {
    const CppType* vals = reinterpret_cast<const CppType*>(vals_void);
    // Counted as size_t so the comparison against 'count' is not mixed-sign.
    size_t added = 0;
    // If the current block is full, stop adding more items.
    while (added < count &&
           !IsBlockFull(options_->storage_attributes.cfile_block_size)) {
      data_.append(reinterpret_cast<const uint8_t*>(vals), size_of_type);
      vals++;
      added++;
      count_++;
    }
    return static_cast<int>(added);
  }

  // Number of values added since the last Reset().
  size_t Count() const OVERRIDE {
    return count_;
  }

  // Copies the first buffered value (size_of_type bytes) into 'key'.
  // Returns NotFound if the block is empty.
  Status GetFirstKey(void* key) const OVERRIDE {
    if (count_ == 0) {
      return Status::NotFound("no keys in data block");
    }
    memcpy(key, &data_[0], size_of_type);
    return Status::OK();
  }

  // Encodes the buffered values at their natural width and returns the
  // finished block. The returned Slice points into this builder's buffer_.
  Slice Finish(rowid_t ordinal_pos) OVERRIDE {
    return Finish(ordinal_pos, size_of_type);
  }

 private:
  // Header size plus the uncompressed size of the (padded) values.
  size_t EstimateEncodedSize() const {
    int num = count_ + NumOfPaddingNeeded();
    // The result of bshuf_compress_lz4_bound(num, size_of_type, 0)
    // is always bigger than the original size (num * size_of_type).
    // However, the compression ratio in most cases is larger than 1,
    // Therefore, using the original size may be more accurate and
    // cause less overhead.
    return kMaxHeaderSize + num * size_of_type;
  }

  // Number of extra elements needed to round count_ up to a multiple of 8.
  uint32_t NumOfPaddingNeeded() const {
    return (count_ % 8 == 0) ? 0 : 8 - (count_ % 8);
  }

  // Pads, compresses and writes the header for the buffered values, treating
  // each value as 'final_size_of_type' bytes wide. The first
  // count_ * final_size_of_type bytes of data_ are taken as the input.
  Slice Finish(rowid_t ordinal_pos, int final_size_of_type) {
    // data_ contains only element bytes (the header lives in buffer_), so
    // trim it to exactly the encoded payload. Reserving header room here
    // would place the zero padding after 'kMaxHeaderSize' stale bytes, and
    // the compressor would then read those stale bytes as padding elements.
    data_.resize(static_cast<size_t>(final_size_of_type) * count_);

    // Do padding so that the input num of element is multiple of 8.
    const uint32_t padding_elems = NumOfPaddingNeeded();
    const uint32_t padding_bytes = padding_elems * final_size_of_type;
    for (uint32_t i = 0; i < padding_bytes; i++) {
      data_.push_back(0);
    }

    const int num_elems_after_padding = count_ + padding_elems;
    buffer_.resize(kMaxHeaderSize +
                   bshuf_compress_lz4_bound(num_elems_after_padding, final_size_of_type, 0));

    InlineEncodeFixed32(&buffer_[0], ordinal_pos);
    InlineEncodeFixed32(&buffer_[4], count_);
    int64_t bytes = bshuf_compress_lz4(data_.data(), &buffer_[kMaxHeaderSize],
                                       num_elems_after_padding, final_size_of_type, 0);
    if (PREDICT_FALSE(bytes < 0)) {
      // This means the bitshuffle function fails.
      // Ideally, this should not happen.
      AbortWithBitShuffleError(bytes);
      // It does not matter what will be returned here,
      // since we have logged fatal in AbortWithBitShuffleError().
      return Slice();
    }
    // The remaining header fields depend on the compressed length.
    InlineEncodeFixed32(&buffer_[8], kMaxHeaderSize + bytes);
    InlineEncodeFixed32(&buffer_[12], num_elems_after_padding);
    InlineEncodeFixed32(&buffer_[16], final_size_of_type);
    return Slice(buffer_.data(), kMaxHeaderSize + bytes);
  }

  // Length of a header.
  static const size_t kMaxHeaderSize = sizeof(uint32_t) * 5;

  typedef typename TypeTraits<Type>::cpp_type CppType;
  enum {
    size_of_type = TypeTraits<Type>::size
  };

  // Raw, uncompressed element bytes accumulated by Add().
  faststring data_;
  // Output buffer: header followed by the compressed payload.
  faststring buffer_;
  // Number of elements currently buffered in data_.
  uint32_t count_;
  const WriterOptions* options_;
};
// Explicit specialization of the public Finish() for UINT32 blocks. It is only
// declared here, so the definition has to be supplied outside this header.
// NOTE(review): presumably this is where a UINT32 block gets encoded with a
// smaller element size when all values fit -- confirm against the definition.
template<>
Slice BShufBlockBuilder<UINT32>::Finish(rowid_t ordinal_pos);
// Decoder for blocks of fixed-size values that were bitshuffled and
// lz4-compressed. ParseHeader() validates the 20-byte header and decompresses
// the whole block into an internal buffer; the seek/copy methods then operate
// on that buffer.
template<DataType Type>
class BShufBlockDecoder : public BlockDecoder {
 public:
  // NOTE(review): 'slice' is kept as a Slice; presumably it does not own the
  // bytes it refers to, so the caller must keep them alive -- confirm.
  explicit BShufBlockDecoder(Slice slice)
    : data_(std::move(slice)),
      parsed_(false),
      ordinal_pos_base_(0),
      num_elems_(0),
      compressed_size_(0),
      num_elems_after_padding_(0),
      size_of_elem_(0),  // Previously left uninitialized until ParseHeader().
      cur_idx_(0) {
  }

  // Reads and validates the five little-endian uint32 header fields, then
  // decompresses the payload via Expand(). Must be called exactly once.
  // Returns Corruption if any header field is inconsistent.
  Status ParseHeader() OVERRIDE {
    CHECK(!parsed_);
    if (data_.size() < kHeaderSize) {
      return Status::Corruption(
          strings::Substitute("not enough bytes for header: bitshuffle block header "
                              "size ($0) less than expected header length ($1)",
                              data_.size(), kHeaderSize));
    }

    ordinal_pos_base_ = DecodeFixed32(&data_[0]);
    num_elems_ = DecodeFixed32(&data_[4]);
    compressed_size_ = DecodeFixed32(&data_[8]);
    // The recorded size covers the header too, so it must equal the slice size.
    if (compressed_size_ != data_.size()) {
      return Status::Corruption("Size Information unmatched");
    }
    num_elems_after_padding_ = DecodeFixed32(&data_[12]);
    if (num_elems_after_padding_ != num_elems_ + NumOfPaddingNeeded()) {
      return Status::Corruption("num of element information corrupted");
    }
    size_of_elem_ = DecodeFixed32(&data_[16]);
    switch (size_of_elem_) {
      case 1:
      case 2:
      case 4:
      case 8:
        break;
      default:
        return Status::Corruption(strings::Substitute("invalid size_of_elem: $0", size_of_elem_));
    }

    // Currently, only the UINT32 block encoder supports expanding size:
    if (PREDICT_FALSE(Type != UINT32 && size_of_elem_ != size_of_type)) {
      return Status::Corruption(strings::Substitute("size_of_elem $0 != size_of_type $1",
                                                    size_of_elem_, size_of_type));
    }
    if (PREDICT_FALSE(size_of_elem_ > size_of_type)) {
      return Status::Corruption(strings::Substitute("size_of_elem $0 > size_of_type $1",
                                                    size_of_elem_, size_of_type));
    }

    RETURN_NOT_OK(Expand());
    parsed_ = true;
    return Status::OK();
  }

  // Moves the cursor to element index 'pos' (0 <= pos <= Count()).
  // On an empty block the cursor is left untouched.
  void SeekToPositionInBlock(uint pos) OVERRIDE {
    CHECK(parsed_) << "Must call ParseHeader()";
    if (PREDICT_FALSE(num_elems_ == 0)) {
      DCHECK_EQ(0, pos);
      return;
    }
    DCHECK_LE(pos, num_elems_);
    cur_idx_ = pos;
  }

  // Binary-searches the decoded values for '*value_void' (read as CppType).
  // On an exact hit, positions the cursor there and sets *exact = true.
  // Otherwise sets *exact = false and positions the cursor at the first
  // larger value, returning NotFound if there is none.
  Status SeekAtOrAfterValue(const void* value_void, bool* exact) OVERRIDE {
    CppType target = *reinterpret_cast<const CppType*>(value_void);
    int32_t left = 0;
    int32_t right = num_elems_;
    while (left != right) {
      // left + (right - left) / 2 cannot overflow, unlike (left + right) / 2.
      const int32_t mid = left + (right - left) / 2;
      // Byte offset computed in size_t so it cannot wrap in 32 bits.
      CppType mid_key = Decode<CppType>(
          &decoded_[static_cast<size_t>(mid) * size_of_type]);
      if (mid_key == target) {
        cur_idx_ = mid;
        *exact = true;
        return Status::OK();
      } else if (mid_key > target) {
        right = mid;
      } else {
        left = mid + 1;
      }
    }

    *exact = false;
    cur_idx_ = left;
    if (cur_idx_ == num_elems_) {
      return Status::NotFound("after last key in block");
    }
    return Status::OK();
  }

  // Copies up to *n values into 'dst'; see CopyNextValuesToArray().
  Status CopyNextValues(size_t* n, ColumnDataView* dst) OVERRIDE {
    DCHECK_EQ(dst->stride(), sizeof(CppType));
    return CopyNextValuesToArray(n, dst->data());
  }

  // Copy the codewords to a temporary buffer.
  // This API provides a more convenient way for the dictionary decoder to copy out
  // integer codewords and then look up the strings. If we use the CopyNextValuesToArray()
  // instead of CopyNextValues(), we do not need to create ColumnDataView and ColumnBlock
  // object to wrap around the uint8_t pointer.
  //
  // On return, *n holds the number of values actually copied (possibly 0) and
  // the cursor has advanced by that amount.
  Status CopyNextValuesToArray(size_t* n, uint8_t* array) {
    DCHECK(parsed_);
    if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) {
      *n = 0;
      return Status::OK();
    }

    size_t max_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_));
    memcpy(array, &decoded_[cur_idx_ * size_of_type], max_fetch * size_of_type);
    *n = max_fetch;
    cur_idx_ += max_fetch;
    return Status::OK();
  }

  size_t GetCurrentIndex() const OVERRIDE {
    DCHECK(parsed_) << "must parse header first";
    return cur_idx_;
  }

  // Ordinal of the first element, as read from the block header.
  rowid_t GetFirstRowId() const OVERRIDE {
    return ordinal_pos_base_;
  }

  size_t Count() const OVERRIDE {
    return num_elems_;
  }

  // True while the cursor is before the end of the block. Written as a
  // direct comparison: the unsigned difference (num_elems_ - cur_idx_)
  // would wrap to a huge positive value if the cursor were ever past the end.
  bool HasNext() const OVERRIDE {
    return cur_idx_ < num_elems_;
  }

 private:
  // Reads a T from a possibly unaligned address.
  template<typename T>
  static T Decode(const uint8_t* ptr) {
    T result;
    memcpy(&result, ptr, sizeof(result));
    return result;
  }

  // Return the number of padding elements needed to ensure that the
  // number of elements is a multiple of 8.
  uint32_t NumOfPaddingNeeded() const {
    return KUDU_ALIGN_UP(num_elems_, 8) - num_elems_;
  }

  // Decompresses the payload that follows the header into decoded_.
  // Does nothing for an empty block.
  Status Expand() {
    if (num_elems_ > 0) {
      int64_t bytes;
      // Widen before multiplying: both header-derived operands are 32-bit, and
      // a wrapped product would under-allocate the decompression target.
      decoded_.resize(static_cast<size_t>(num_elems_after_padding_) * size_of_type);
      uint8_t* in = const_cast<uint8_t*>(&data_[kHeaderSize]);
      bytes = bshuf_decompress_lz4(in, decoded_.data(), num_elems_after_padding_, size_of_type, 0);
      if (PREDICT_FALSE(bytes < 0)) {
        // Ideally, this should not happen.
        AbortWithBitShuffleError(bytes);
        return Status::RuntimeError("Unshuffle Process failed");
      }
    }
    return Status::OK();
  }

  // Min Length of a header.
  static const size_t kHeaderSize = sizeof(uint32_t) * 5;

  typedef typename TypeTraits<Type>::cpp_type CppType;
  enum {
    size_of_type = TypeTraits<Type>::size
  };

  // The encoded block (header + compressed payload).
  Slice data_;
  bool parsed_;

  // Header fields, filled in by ParseHeader().
  rowid_t ordinal_pos_base_;
  uint32_t num_elems_;
  uint32_t compressed_size_;
  uint32_t num_elems_after_padding_;

  // The size of each decoded element. In the case that the input range was
  // smaller than the type, this may be smaller than 'size_of_type'.
  // Currently, this is always 1, 2, 4, or 8.
  int size_of_elem_;

  // Index of the next element to be returned.
  size_t cur_idx_;

  // Decompressed element bytes, including trailing padding elements.
  faststring decoded_;
};
// Explicit specializations of three BShufBlockDecoder members for UINT32
// blocks. They are only declared here, so their definitions have to be
// supplied outside this header.
// NOTE(review): presumably these handle UINT32 blocks whose elements were
// encoded with a size smaller than 4 bytes -- confirm against the definitions.
template<>
Status BShufBlockDecoder<UINT32>::Expand();
template<>
Status BShufBlockDecoder<UINT32>::SeekAtOrAfterValue(const void* value_void, bool* exact);
template<>
Status BShufBlockDecoder<UINT32>::CopyNextValuesToArray(size_t* n, uint8_t* array);
} // namespace cfile
} // namespace kudu
#endif