| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #ifndef KUDU_CFILE_PLAIN_BLOCK_H |
| #define KUDU_CFILE_PLAIN_BLOCK_H |
| |
| #include <algorithm> |
| #include <string> |
| #include <vector> |
| |
| #include "kudu/cfile/block_encodings.h" |
| #include "kudu/cfile/block_handle.h" |
| #include "kudu/cfile/cfile_util.h" |
| #include "kudu/common/columnblock.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/util/coding.h" |
| #include "kudu/util/coding-inl.h" |
| #include "kudu/util/hexdump.h" |
| |
| namespace kudu { |
| namespace cfile { |
| |
| template<typename Type> |
| inline Type Decode(const uint8_t *ptr) { |
| Type result; |
| memcpy(&result, ptr, sizeof(result)); |
| return result; |
| } |
| |
| static const size_t kPlainBlockHeaderSize = sizeof(uint32_t) * 2; |
| |
| // |
| // A plain encoder for generic fixed size data types. |
| // |
| template<DataType Type> |
| class PlainBlockBuilder final : public BlockBuilder { |
| public: |
| explicit PlainBlockBuilder(const WriterOptions *options) |
| : options_(options) { |
| // Reserve enough space for the block, plus a bit of slop since |
| // we often overrun the block by a few values. |
| buffer_.reserve(kPlainBlockHeaderSize + options_->storage_attributes.cfile_block_size + 1024); |
| Reset(); |
| } |
| |
| virtual int Add(const uint8_t *vals_void, size_t count) OVERRIDE { |
| int old_size = buffer_.size(); |
| buffer_.resize(old_size + count * kCppTypeSize); |
| memcpy(&buffer_[old_size], vals_void, count * kCppTypeSize); |
| count_ += count; |
| return count; |
| } |
| |
| virtual bool IsBlockFull() const override { |
| return buffer_.size() > options_->storage_attributes.cfile_block_size; |
| } |
| |
| virtual void Finish(rowid_t ordinal_pos, std::vector<Slice>* slices) OVERRIDE { |
| InlineEncodeFixed32(&buffer_[0], count_); |
| InlineEncodeFixed32(&buffer_[4], ordinal_pos); |
| *slices = { Slice(buffer_) }; |
| } |
| |
| virtual void Reset() OVERRIDE { |
| count_ = 0; |
| buffer_.clear(); |
| buffer_.resize(kPlainBlockHeaderSize); |
| } |
| |
| virtual size_t Count() const OVERRIDE { |
| return count_; |
| } |
| |
| virtual Status GetFirstKey(void *key) const OVERRIDE { |
| DCHECK_GT(count_, 0); |
| UnalignedStore(key, Decode<CppType>(&buffer_[kPlainBlockHeaderSize])); |
| return Status::OK(); |
| } |
| |
| virtual Status GetLastKey(void *key) const OVERRIDE { |
| DCHECK_GT(count_, 0); |
| size_t idx = kPlainBlockHeaderSize + (count_ - 1) * kCppTypeSize; |
| UnalignedStore(key, Decode<CppType>(&buffer_[idx])); |
| return Status::OK(); |
| } |
| |
| private: |
| faststring buffer_; |
| const WriterOptions *options_; |
| size_t count_; |
| typedef typename TypeTraits<Type>::cpp_type CppType; |
| enum { |
| kCppTypeSize = TypeTraits<Type>::size |
| }; |
| |
| }; |
| |
| // |
| // A plain decoder for generic fixed size data types. |
| // |
| template<DataType Type> |
| class PlainBlockDecoder final : public BlockDecoder { |
| public: |
| explicit PlainBlockDecoder(scoped_refptr<BlockHandle> block) |
| : block_(std::move(block)), |
| data_(block_->data()), |
| parsed_(false), |
| num_elems_(0), |
| ordinal_pos_base_(0), |
| cur_idx_(0) { |
| } |
| |
| virtual Status ParseHeader() OVERRIDE { |
| CHECK(!parsed_); |
| |
| if (data_.size() < kPlainBlockHeaderSize) { |
| return Status::Corruption( |
| "not enough bytes for header in PlainBlockDecoder"); |
| } |
| |
| num_elems_ = DecodeFixed32(&data_[0]); |
| ordinal_pos_base_ = DecodeFixed32(&data_[4]); |
| |
| if (data_.size() != kPlainBlockHeaderSize + num_elems_ * size_of_type) { |
| return Status::Corruption( |
| std::string("unexpected data size. ") + "\nFirst 100 bytes: " |
| + HexDump( |
| Slice(data_.data(), |
| (data_.size() < 100 ? data_.size() : 100)))); |
| } |
| |
| parsed_ = true; |
| |
| SeekToPositionInBlock(0); |
| |
| return Status::OK(); |
| } |
| |
| virtual void SeekToPositionInBlock(uint pos) OVERRIDE { |
| CHECK(parsed_) << "Must call ParseHeader()"; |
| |
| if (PREDICT_FALSE(num_elems_ == 0)) { |
| DCHECK_EQ(0, pos); |
| return; |
| } |
| |
| DCHECK_LE(pos, num_elems_); |
| cur_idx_ = pos; |
| } |
| |
| virtual Status SeekAtOrAfterValue(const void *value, bool *exact_match) OVERRIDE { |
| DCHECK(value != NULL); |
| |
| CppType target = UnalignedLoad<CppType>(value); |
| |
| uint32_t left = 0; |
| uint32_t right = num_elems_; |
| while (left != right) { |
| uint32_t mid = (left + right) / 2; |
| CppType mid_key = Decode<CppType>( |
| &data_[kPlainBlockHeaderSize + mid * size_of_type]); |
| // assumes CppType has an implementation of operator<() |
| if (mid_key < target) { |
| left = mid + 1; |
| } else if (mid_key > target) { |
| right = mid; |
| } else { |
| cur_idx_ = mid; |
| *exact_match = true; |
| return Status::OK(); |
| } |
| } |
| |
| *exact_match = false; |
| cur_idx_ = left; |
| if (cur_idx_ == num_elems_) { |
| return Status::NotFound("after last key in block"); |
| } |
| |
| return Status::OK(); |
| } |
| |
| virtual Status CopyNextValues(size_t *n, ColumnDataView *dst) OVERRIDE { |
| DCHECK(parsed_); |
| DCHECK_LE(*n, dst->nrows()); |
| DCHECK_EQ(dst->stride(), sizeof(CppType)); |
| |
| if (PREDICT_FALSE(*n == 0 || cur_idx_ >= num_elems_)) { |
| *n = 0; |
| return Status::OK(); |
| } |
| |
| size_t max_fetch = std::min(*n, static_cast<size_t>(num_elems_ - cur_idx_)); |
| memcpy(dst->data(), |
| &data_[kPlainBlockHeaderSize + cur_idx_ * size_of_type], |
| max_fetch * size_of_type); |
| cur_idx_ += max_fetch; |
| *n = max_fetch; |
| return Status::OK(); |
| } |
| |
| virtual bool HasNext() const OVERRIDE { |
| return cur_idx_ < num_elems_; |
| } |
| |
| virtual size_t Count() const OVERRIDE { |
| return num_elems_; |
| } |
| |
| virtual size_t GetCurrentIndex() const OVERRIDE { |
| return cur_idx_; |
| } |
| |
| virtual rowid_t GetFirstRowId() const OVERRIDE { |
| return ordinal_pos_base_; |
| } |
| |
| private: |
| scoped_refptr<BlockHandle> block_; |
| Slice data_; |
| bool parsed_; |
| uint32_t num_elems_; |
| rowid_t ordinal_pos_base_; |
| uint32_t cur_idx_; |
| typedef typename TypeTraits<Type>::cpp_type CppType; |
| enum { |
| size_of_type = TypeTraits<Type>::size |
| }; |
| |
| }; |
| |
| } // namespace cfile |
| } // namespace kudu |
| |
| #endif |