| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| // |
// Uses bitshuffle and lz4 to encode blocks of fixed-size types,
// such as UINT8, INT8, UINT16, INT16, UINT32, INT32, FLOAT and DOUBLE.
// Reference:
// https://github.com/kiyo-masui/bitshuffle.git
| #ifndef KUDU_CFILE_BSHUF_BLOCK_H |
| #define KUDU_CFILE_BSHUF_BLOCK_H |
| |
| #include <sys/types.h> |
| |
| #include <algorithm> |
| #include <cstring> |
| #include <cstdint> |
| #include <ostream> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/cfile/bitshuffle_arch_wrapper.h" |
| #include "kudu/cfile/block_handle.h" |
| #include "kudu/cfile/block_encodings.h" |
| #include "kudu/cfile/cfile_util.h" |
| #include "kudu/common/columnblock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/rowid.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/common/types.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/alignment.h" |
| #include "kudu/util/coding.h" |
| #include "kudu/util/coding-inl.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| |
| namespace kudu { |
| namespace cfile { |
| |
| |
// Reports a failed bitshuffle call at FATAL severity and terminates the
// process. Callers pass the negative status code returned by the failing
// bitshuffle compress/decompress routine. Control never returns.
[[noreturn]] void AbortWithBitShuffleError(int64_t val);
| |
| // BshufBlockBuilder bitshuffles and compresses the bits of fixed |
| // size type blocks with lz4. |
| // |
| // The block format is as follows: |
| // |
| // 1. Header: (20 bytes total) |
| // |
| // <first_ordinal> [32-bit] |
| // The ordinal offset of the first element in the block. |
| // |
| // <num_elements> [32-bit] |
| // The number of elements encoded in the block. |
| // |
| // <compressed_size> [32-bit] |
| // The post-compression size of the block, including this header. |
| // |
| // <padded_num_elements> [32-bit] |
| // Padding is needed to meet the requirements of the bitshuffle |
| // library such that the input/output is a multiple of 8. Some |
| // ignored elements are appended to the end of the block if necessary |
| // to meet this requirement. |
| // |
| // This header field is the post-padding element count. |
| // |
| // <elem_size_bytes> [32-bit] |
| // The size of the elements, in bytes, as actually encoded. In the |
| // case that all of the data in a block can fit into a smaller |
| // integer type, then we may choose to encode that smaller type |
| // to save CPU costs. |
| // |
| // This is currently only implemented in the UINT32 block type. |
| // |
| // NOTE: all on-disk ints are encoded little-endian |
| // |
| // 2. Element data |
| // |
| // The header is followed by the bitshuffle-compressed element data. |
| // |
| template<DataType Type> |
| class BShufBlockBuilder final : public BlockBuilder { |
| public: |
| explicit BShufBlockBuilder(const WriterOptions* options) |
| : count_(0), |
| options_(options) { |
| Reset(); |
| } |
| |
| void Reset() OVERRIDE { |
| auto block_size = options_->storage_attributes.cfile_block_size; |
| count_ = 0; |
| data_.clear(); |
| data_.reserve(block_size); |
| DCHECK_EQ(reinterpret_cast<uintptr_t>(data_.data()) & (alignof(CppType) - 1), 0) |
| << "buffer must be naturally-aligned"; |
| buffer_.clear(); |
| buffer_.resize(kHeaderSize); |
| finished_ = false; |
| rem_elem_capacity_ = block_size / size_of_type; |
| } |
| |
| bool IsBlockFull() const override { |
| return rem_elem_capacity_ == 0; |
| } |
| |
| int Add(const uint8_t* vals_void, size_t count) OVERRIDE { |
| DCHECK(!finished_); |
| int to_add = std::min<int>(rem_elem_capacity_, count); |
| data_.append(vals_void, to_add * size_of_type); |
| count_ += to_add; |
| rem_elem_capacity_ -= to_add; |
| return to_add; |
| } |
| |
| size_t Count() const OVERRIDE { |
| return count_; |
| } |
| |
| Status GetFirstKey(void* key) const OVERRIDE { |
| DCHECK(finished_); |
| if (count_ == 0) { |
| return Status::NotFound("no keys in data block"); |
| } |
| memcpy(key, &first_key_, size_of_type); |
| return Status::OK(); |
| } |
| |
| Status GetLastKey(void* key) const OVERRIDE { |
| DCHECK(finished_); |
| if (count_ == 0) { |
| return Status::NotFound("no keys in data block"); |
| } |
| memcpy(key, &last_key_, size_of_type); |
| return Status::OK(); |
| } |
| |
| void Finish(rowid_t ordinal_pos, std::vector<Slice>* slices) OVERRIDE { |
| RememberFirstAndLastKey(); |
| *slices = { Finish(ordinal_pos, size_of_type) }; |
| } |
| |
| private: |
| typedef typename TypeTraits<Type>::cpp_type CppType; |
| |
| CppType cell(int idx) const { |
| DCHECK_GE(idx, 0); |
| return UnalignedLoad<CppType>(&data_[idx * size_of_type]); |
| } |
| |
| // Remember the last added key in 'last_key_'. This is done during |
| // Finish() because Finish() may rearrange/collapse the values in the |
| // data_ buffer during the encoding process. |
| void RememberFirstAndLastKey() { |
| if (count_ == 0) return; |
| first_key_ = cell(0); |
| last_key_ = cell(count_ - 1); |
| } |
| |
| Slice Finish(rowid_t ordinal_pos, int final_size_of_type) { |
| data_.resize(final_size_of_type * count_); |
| |
| // Do padding so that the input num of element is multiple of 8. |
| int num_elems_after_padding = KUDU_ALIGN_UP(count_, 8); |
| int padding_elems = num_elems_after_padding - count_; |
| int padding_bytes = padding_elems * final_size_of_type; |
| for (int i = 0; i < padding_bytes; i++) { |
| data_.push_back(0); |
| } |
| DCHECK_EQ(0, data_.length() % 8); |
| |
| buffer_.resize(kHeaderSize + |
| bitshuffle::compress_lz4_bound(num_elems_after_padding, final_size_of_type, 0)); |
| |
| InlineEncodeFixed32(&buffer_[0], ordinal_pos); |
| InlineEncodeFixed32(&buffer_[4], count_); |
| int64_t bytes = bitshuffle::compress_lz4(data_.data(), &buffer_[kHeaderSize], |
| num_elems_after_padding, final_size_of_type, 0); |
| if (PREDICT_FALSE(bytes < 0)) { |
| // This means the bitshuffle function fails. |
| // Ideally, this should not happen. |
| AbortWithBitShuffleError(bytes); |
| // It does not matter what will be returned here, |
| // since we have logged fatal in AbortWithBitShuffleError(). |
| return Slice(); |
| } |
| InlineEncodeFixed32(&buffer_[8], kHeaderSize + bytes); |
| InlineEncodeFixed32(&buffer_[12], num_elems_after_padding); |
| InlineEncodeFixed32(&buffer_[16], final_size_of_type); |
| finished_ = true; |
| return Slice(buffer_.data(), kHeaderSize + bytes); |
| } |
| |
| // Length of a header. |
| static const size_t kHeaderSize = sizeof(uint32_t) * 5; |
| enum { |
| size_of_type = TypeTraits<Type>::size |
| }; |
| |
| faststring data_; |
| faststring buffer_; |
| uint32_t count_; |
| int rem_elem_capacity_; |
| bool finished_; |
| CppType first_key_; |
| CppType last_key_; |
| const WriterOptions* options_; |
| }; |
| |
// Explicit specialization of the public Finish() for UINT32 blocks; its
// definition is not in this header (presumably the matching .cc file).
// UINT32 is the only block type that may be written with an
// elem_size_bytes smaller than the type's own size.
template<>
void BShufBlockBuilder<UINT32>::Finish(rowid_t ordinal_pos, std::vector<Slice>* slices);
| |
| template<DataType Type> |
| class BShufBlockDecoder final : public BlockDecoder { |
| public: |
| explicit BShufBlockDecoder(scoped_refptr<BlockHandle> block) |
| : block_(std::move(block)), |
| data_(block_->data()), |
| parsed_(false), |
| ordinal_pos_base_(0), |
| num_elems_(0), |
| compressed_size_(0), |
| num_elems_after_padding_(0), |
| size_of_elem_(0), |
| cur_idx_(0) { |
| } |
| |
| Status ParseHeader() OVERRIDE { |
| CHECK(!parsed_); |
| if (data_.size() < kHeaderSize) { |
| return Status::Corruption( |
| strings::Substitute("not enough bytes for header: bitshuffle block header " |
| "size ($0) less than expected header length ($1)", |
| data_.size(), kHeaderSize)); |
| } |
| |
| ordinal_pos_base_ = DecodeFixed32(&data_[0]); |
| num_elems_ = DecodeFixed32(&data_[4]); |
| compressed_size_ = DecodeFixed32(&data_[8]); |
| if (compressed_size_ != data_.size()) { |
| return Status::Corruption("Size Information unmatched"); |
| } |
| num_elems_after_padding_ = DecodeFixed32(&data_[12]); |
| if (num_elems_after_padding_ != KUDU_ALIGN_UP(num_elems_, 8)) { |
| return Status::Corruption("num of element information corrupted"); |
| } |
| size_of_elem_ = DecodeFixed32(&data_[16]); |
| switch (size_of_elem_) { |
| case 1: |
| case 2: |
| case 4: |
| case 8: |
| case 16: |
| break; |
| default: |
| return Status::Corruption(strings::Substitute("invalid size_of_elem: $0", size_of_elem_)); |
| } |
| |
| // Currently, only the UINT32 block encoder supports expanding size: |
| if (PREDICT_FALSE(Type != UINT32 && size_of_elem_ != size_of_type)) { |
| return Status::Corruption(strings::Substitute("size_of_elem $0 != size_of_type $1", |
| size_of_elem_, size_of_type)); |
| } |
| if (PREDICT_FALSE(size_of_elem_ > size_of_type)) { |
| return Status::Corruption(strings::Substitute("size_of_elem $0 > size_of_type $1", |
| size_of_elem_, size_of_type)); |
| } |
| |
| RETURN_NOT_OK(Expand()); |
| |
| parsed_ = true; |
| return Status::OK(); |
| } |
| |
| void SeekToPositionInBlock(uint pos) OVERRIDE { |
| CHECK(parsed_) << "Must call ParseHeader()"; |
| if (PREDICT_FALSE(num_elems_ == 0)) { |
| DCHECK_EQ(0, pos); |
| return; |
| } |
| |
| DCHECK_LE(pos, num_elems_); |
| cur_idx_ = pos; |
| } |
| |
| Status SeekAtOrAfterValue(const void* value_void, bool* exact) OVERRIDE { |
| CppType target = UnalignedLoad<CppType>(value_void); |
| int32_t left = 0; |
| int32_t right = num_elems_; |
| while (left != right) { |
| uint32_t mid = (left + right) / 2; |
| CppType mid_key = Decode<CppType>( |
| &decoded_[mid * size_of_type]); |
| if (mid_key == target) { |
| cur_idx_ = mid; |
| *exact = true; |
| return Status::OK(); |
| } else if (mid_key > target) { |
| right = mid; |
| } else { |
| left = mid + 1; |
| } |
| } |
| |
| *exact = false; |
| cur_idx_ = left; |
| if (cur_idx_ == num_elems_) { |
| return Status::NotFound("after last key in block"); |
| } |
| return Status::OK(); |
| } |
| |
| Status CopyNextValues(size_t* n, ColumnDataView* dst) OVERRIDE { |
| DCHECK_EQ(dst->stride(), sizeof(CppType)); |
| return CopyNextValuesToArray(n, dst->data()); |
| } |
| |
| // Copy the codewords to a temporary buffer. |
| // This API provides a more convenient way for the dictionary decoder to copy out |
| // integer codewords and then look up the strings. If we use the CopyNextValuesToArray() |
| // instead of CopyNextValues(), we do not need to create ColumnDataView and ColumnBlock |
| // object to wrap around the uint8_t pointer. |
| Status CopyNextValuesToArray(size_t* n, uint8_t* array) { |
| DCHECK(parsed_); |
| if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) { |
| *n = 0; |
| return Status::OK(); |
| } |
| |
| size_t max_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_)); |
| memcpy(array, &decoded_[cur_idx_ * size_of_type], max_fetch * size_of_type); |
| |
| *n = max_fetch; |
| cur_idx_ += max_fetch; |
| |
| return Status::OK(); |
| } |
| |
| size_t GetCurrentIndex() const OVERRIDE { |
| DCHECK(parsed_) << "must parse header first"; |
| return cur_idx_; |
| } |
| |
| virtual rowid_t GetFirstRowId() const OVERRIDE { |
| return ordinal_pos_base_; |
| } |
| |
| size_t Count() const OVERRIDE { |
| return num_elems_; |
| } |
| |
| bool HasNext() const OVERRIDE { |
| return (num_elems_ - cur_idx_) > 0; |
| } |
| |
| private: |
| template<typename T> |
| static T Decode(const uint8_t* ptr) { |
| T result; |
| memcpy(&result, ptr, sizeof(result)); |
| return result; |
| } |
| |
| Status Expand() { |
| if (num_elems_ > 0) { |
| int64_t bytes; |
| decoded_.resize(num_elems_after_padding_ * size_of_elem_); |
| uint8_t* in = const_cast<uint8_t*>(&data_[kHeaderSize]); |
| bytes = bitshuffle::decompress_lz4(in, decoded_.data(), num_elems_after_padding_, |
| size_of_elem_, 0); |
| if (PREDICT_FALSE(bytes < 0)) { |
| // Ideally, this should not happen. |
| AbortWithBitShuffleError(bytes); |
| return Status::RuntimeError("Unshuffle Process failed"); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // Min Length of a header. |
| static const size_t kHeaderSize = sizeof(uint32_t) * 5; |
| typedef typename TypeTraits<Type>::cpp_type CppType; |
| enum { |
| size_of_type = TypeTraits<Type>::size |
| }; |
| |
| scoped_refptr<BlockHandle> block_; |
| Slice data_; |
| bool parsed_; |
| |
| rowid_t ordinal_pos_base_; |
| uint32_t num_elems_; |
| uint32_t compressed_size_; |
| uint32_t num_elems_after_padding_; |
| |
| // The size of each decoded element. In the case that the input range was |
| // smaller than the type, this may be smaller than 'size_of_type'. |
| // Currently, this is always 1, 2, 4, or 8. |
| int size_of_elem_; |
| |
| size_t cur_idx_; |
| faststring decoded_; |
| }; |
| |
// Explicit specializations for UINT32 blocks, defined outside this header
// (presumably in the matching .cc file). UINT32 is the only type for which
// ParseHeader() accepts a size_of_elem_ different from size_of_type, so
// these cannot assume decoded_ holds full-width values.
template<>
Status BShufBlockDecoder<UINT32>::SeekAtOrAfterValue(const void* value_void, bool* exact);
template<>
Status BShufBlockDecoder<UINT32>::CopyNextValuesToArray(size_t* n, uint8_t* array);
| |
| |
| } // namespace cfile |
| } // namespace kudu |
| #endif |