| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| #pragma once |
| |
| #include <cstddef> |
| #include <cstdint> |
| #include <memory> |
| #include <string> |
| #include <vector> |
| |
| #include <glog/logging.h> |
| |
| #include "kudu/cfile/block_encodings.h" |
| #include "kudu/cfile/block_handle.h" |
| #include "kudu/cfile/block_pointer.h" |
| #include "kudu/cfile/cfile.pb.h" |
| #include "kudu/common/iterator_stats.h" |
| #include "kudu/common/rowid.h" |
| #include "kudu/fs/block_id.h" |
| #include "kudu/fs/block_manager.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/ref_counted.h" |
| #include "kudu/util/compression/compression.pb.h" |
| #include "kudu/util/faststring.h" |
| #include "kudu/util/mem_tracker.h" |
| #include "kudu/util/object_pool.h" |
| #include "kudu/util/once.h" |
| #include "kudu/util/rle-encoding.h" |
| #include "kudu/util/slice.h" |
| #include "kudu/util/status.h" |
| |
| namespace kudu { |
| |
| class ColumnMaterializationContext; |
| class CompressionCodec; |
| class EncodedKey; |
| class SelectionVector; |
| class TypeInfo; |
| |
| namespace fs { |
| struct IOContext; |
| } // namespace fs |
| |
| template <typename T> class ArrayView; |
| |
| namespace cfile { |
| |
| class BinaryPlainBlockDecoder; |
| class CFileIterator; |
| class IndexTreeIterator; |
| class TypeEncodingInfo; |
| struct ReaderOptions; |
| |
| class CFileReader { |
| public: |
| // Fully open a cfile using a previously opened block. |
| // |
| // After this call, the reader is safe for use. |
| static Status Open(std::unique_ptr<fs::ReadableBlock> block, |
| ReaderOptions options, |
| std::unique_ptr<CFileReader>* reader); |
| |
| // Lazily open a cfile using a previously opened block. A lazy open does |
| // not incur additional I/O, nor does it validate the contents of the |
| // cfile. |
| // |
| // Init() must be called before most methods; the exceptions are documented |
| // below. |
| static Status OpenNoInit(std::unique_ptr<fs::ReadableBlock> block, |
| ReaderOptions options, |
| std::unique_ptr<CFileReader>* reader); |
| |
| // Fully opens a previously lazily opened cfile, parsing and validating |
| // its contents. |
| // |
| // May be called multiple times; subsequent calls will no-op. |
| Status Init(const fs::IOContext* io_context); |
| |
| enum CacheControl { |
| CACHE_BLOCK, |
| DONT_CACHE_BLOCK |
| }; |
| |
| // Can be called before Init(). |
| Status NewIterator(std::unique_ptr<CFileIterator>* iter, |
| CacheControl cache_control, |
| const fs::IOContext* io_context); |
| |
| // Reads the data block pointed to by `ptr`. Will pull the data block from |
| // the block cache if it exists, and reads from the filesystem block |
| // otherwise. |
| Status ReadBlock(const fs::IOContext* io_context, |
| const BlockPointer& ptr, |
| CacheControl cache_control, |
| scoped_refptr<BlockHandle>* ret) const; |
| |
| // Return the number of rows in this cfile. |
| // This is assumed to be reasonably fast (i.e does not scan |
| // the data) |
| Status CountRows(rowid_t* count) const; |
| |
| // Retrieve the given metadata entry into 'val'. |
| // Returns true if the entry was found, otherwise returns false. |
| // |
| // Note that this implementation is currently O(n), so should not be used |
| // in a hot path. |
| bool GetMetadataEntry(const std::string& key, std::string* val) const; |
| |
| // Can be called before Init(). |
| uint64_t file_size() const { |
| return file_size_; |
| } |
| |
| // Can be called before Init(). |
| const BlockId& block_id() const { |
| return block_->id(); |
| } |
| |
| const TypeInfo* type_info() const { |
| DCHECK(init_once_.init_succeeded()); |
| return type_info_; |
| } |
| |
| const TypeEncodingInfo* type_encoding_info() const { |
| DCHECK(init_once_.init_succeeded()); |
| return type_encoding_info_; |
| } |
| |
| bool is_nullable() const { |
| return footer().is_type_nullable(); |
| } |
| |
| const CFileHeaderPB& header() const { |
| DCHECK(init_once_.init_succeeded()); |
| return *DCHECK_NOTNULL(header_.get()); |
| } |
| |
| const CFileFooterPB& footer() const { |
| DCHECK(init_once_.init_succeeded()); |
| return *DCHECK_NOTNULL(footer_.get()); |
| } |
| |
| bool is_compressed() const { |
| return footer().compression() != NO_COMPRESSION; |
| } |
| |
| // Advanced access to the cfile. This is used by the |
| // delta reader code. TODO: think about reorganizing this: |
| // delta files can probably be done more cleanly. |
| |
| // Return true if there is a position-based index on this file. |
| bool has_posidx() const { return footer().has_posidx_info(); } |
| BlockPointer posidx_root() const { |
| DCHECK(has_posidx()); |
| return BlockPointer(footer().posidx_info().root_block()); |
| } |
| |
| // Return true if there is a value-based index on this file. |
| bool has_validx() const { return footer().has_validx_info(); } |
| BlockPointer validx_root() const { |
| DCHECK(has_validx()); |
| return BlockPointer(footer().validx_info().root_block()); |
| } |
| |
| // Can be called before Init(). |
| std::string ToString() const { return block_->id().ToString(); } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(CFileReader); |
| |
| static Status VerifyChecksum(ArrayView<const Slice> data, |
| const Slice& checksum); |
| |
| CFileReader(ReaderOptions options, |
| uint64_t file_size, |
| std::unique_ptr<fs::ReadableBlock> block); |
| |
| // Callback used in 'init_once_' to initialize this cfile. |
| Status InitOnce(const fs::IOContext* io_context); |
| |
| Status ReadAndParseHeader(); |
| Status ReadAndParseFooter(); |
| |
| // Return true if the file has checksum on the header, footer, and data blocks. |
| bool has_checksum() const; |
| |
| // Return true if has_checksum() returns true and the checksum verification |
| // is requested. |
| bool do_verify_checksum() const; |
| |
| // Returns the memory usage of the object including the object itself. |
| size_t memory_footprint() const; |
| |
| // Handles a corruption error. Functions that may return due to a CFile |
| // corruption should call this method before returning. |
| void HandleCorruption(const fs::IOContext* io_context) const; |
| |
| const std::unique_ptr<fs::ReadableBlock> block_; |
| const uint64_t file_size_; |
| |
| uint8_t cfile_version_; |
| |
| std::unique_ptr<CFileHeaderPB> header_; |
| std::unique_ptr<CFileFooterPB> footer_; |
| const CompressionCodec* codec_; |
| const TypeInfo* type_info_; |
| const TypeEncodingInfo* type_encoding_info_; |
| |
| bool do_verify_checksum_; |
| |
| KuduOnceLambda init_once_; |
| |
| ScopedTrackedConsumption mem_consumption_; |
| }; |
| |
| // Column Iterator interface used by the CFileSet. |
| // Implemented by the CFileIterator, DefaultColumnValueIterator |
| // and the ColumnValueTypeAdaptorIterator. |
| // It is used to fill the data requested by the projection. |
| class ColumnIterator { |
| public: |
| virtual ~ColumnIterator() {} |
| |
| // Seek to the given ordinal entry in the file. |
| // Entry 0 is the first entry written to the file. |
| // If provided seek point is past the end of the file, |
| // then returns a NotFound Status. |
| // TODO: do we ever want to be able to seek to the end of the file? |
| virtual Status SeekToOrdinal(rowid_t ord_idx) = 0; |
| |
| // Return true if this reader is currently seeked. |
| // If the iterator is not seeked, it is an error to call any functions except |
| // for seek (including GetCurrentOrdinal). |
| virtual bool seeked() const = 0; |
| |
| // Get the ordinal index that the iterator is currently pointed to. |
| // |
| // Prior to calling PrepareBatch(), this returns the position after the last |
| // seek. PrepareBatch() and Scan() do not change the position returned by this |
| // function. FinishBatch() advances the ordinal to the position of the next |
| // block to be prepared. |
| virtual rowid_t GetCurrentOrdinal() const = 0; |
| |
| // Prepare to read up to *n into the given column block. |
| // On return sets *n to the number of prepared rows, which is always |
| // <= the requested value. |
| // |
| // This assumes that dst->size() >= *n on input. |
| // |
| // If there are at least dst->size() values remaining in the underlying file, |
| // this will always return *n == dst->size(). In other words, this does not |
| // ever result in a "short read". |
| virtual Status PrepareBatch(size_t* n) = 0; |
| |
| // Copy values into the prepared column block. |
| // Any indirected values (eg strings) are copied into the ctx's block's |
| // arena. |
| // This does _not_ advance the position in the underlying file. Multiple |
| // calls to Scan() will re-read the same values. |
| // If decoder eval is supported and allowed, will additionally evaluate the |
| // column predicate. |
| virtual Status Scan(ColumnMaterializationContext* ctx) = 0; |
| |
| // Finish processing the current batch, advancing the iterators |
| // such that the next call to PrepareBatch() will start where the previous |
| // batch left off. |
| virtual Status FinishBatch() = 0; |
| |
| virtual const IteratorStats& io_statistics() const = 0; |
| }; |
| |
| // ColumnIterator that fills the ColumnBlock with the specified value. |
| // It is used by the CFileSet to handle the case of a column present |
| // in the projection schema but not in the base data. |
| // |
| // Example: |
| // DefaultColumnValueIterator iter; |
| // iter.Scan(&column_block); |
| class DefaultColumnValueIterator final : public ColumnIterator { |
| public: |
| DefaultColumnValueIterator(const TypeInfo* typeinfo, const void* value) |
| : typeinfo_(typeinfo), |
| value_(value), |
| ordinal_(0) { |
| } |
| |
| Status SeekToOrdinal(rowid_t ord_idx) override; |
| |
| bool seeked() const override { return true; } |
| |
| rowid_t GetCurrentOrdinal() const override { return ordinal_; } |
| |
| Status PrepareBatch(size_t* n) override; |
| Status Scan(ColumnMaterializationContext* ctx) override; |
| Status FinishBatch() override; |
| |
| const IteratorStats& io_statistics() const override { return io_stats_; } |
| |
| private: |
| const TypeInfo* typeinfo_; |
| const void* value_; |
| |
| size_t batch_; |
| rowid_t ordinal_; |
| IteratorStats io_stats_; |
| }; |
| |
| |
| class CFileIterator final : public ColumnIterator { |
| public: |
| CFileIterator(CFileReader* reader, |
| CFileReader::CacheControl cache_control, |
| const fs::IOContext* io_context); |
| ~CFileIterator(); |
| |
| // Seek to the first entry in the file. This works for both |
| // ordinal-indexed and value-indexed files. |
| Status SeekToFirst(); |
| |
| // Seek to the given ordinal entry in the file. |
| // Entry 0 is the first entry written to the file. |
| // If provided seek point is past the end of the file, |
| // then returns a NotFound Status. |
| // TODO: do we ever want to be able to seek to the end of the file? |
| Status SeekToOrdinal(rowid_t ord_idx) override; |
| |
| // Seek the index to the given row_key, or to the index entry immediately |
| // before it. Then (if the index is sparse) seek the data block to the |
| // value matching value or to the value immediately after it. |
| // |
| // Sets *exact_match to indicate whether the seek found the exact |
| // key requested. |
| // |
| // If this iterator was constructed without no value index, |
| // then this will return a NotSupported status. |
| Status SeekAtOrAfter(const EncodedKey& encoded_key, |
| bool* exact_match); |
| |
| // Return true if this reader is currently seeked. |
| // If the iterator is not seeked, it is an error to call any functions except |
| // for seek (including GetCurrentOrdinal). |
| bool seeked() const override { return seeked_; } |
| |
| // Get the ordinal index that the iterator is currently pointed to. |
| // |
| // Prior to calling PrepareBatch(), this returns the position after the last |
| // seek. PrepareBatch() and Scan() do not change the position returned by this |
| // function. FinishBatch() advances the ordinal to the position of the next |
| // block to be prepared. |
| rowid_t GetCurrentOrdinal() const override; |
| |
| // Prepare to read up to *n into the given column block. |
| // On return sets *n to the number of prepared rows, which is always |
| // <= the requested value. |
| // |
| // This assumes that dst->size() >= *n on input. |
| // |
| // If there are at least dst->size() values remaining in the underlying file, |
| // this will always return *n == dst->size(). In other words, this does not |
| // ever result in a "short read". |
| Status PrepareBatch(size_t* n) override; |
| |
| // Copy values into the prepared column block. |
| // Any indirected values (eg strings) are copied into the dst block's |
| // arena. |
| // This does _not_ advance the position in the underlying file. Multiple |
| // calls to Scan() will re-read the same values. |
| Status Scan(ColumnMaterializationContext* ctx) override; |
| |
| // Finish processing the current batch, advancing the iterators |
| // such that the next call to PrepareBatch() will start where the previous |
| // batch left off. |
| Status FinishBatch() override; |
| |
| // Return true if the next call to PrepareBatch will return at least one row. |
| bool HasNext() const; |
| |
| // Convenience method to prepare a batch, scan it, and finish it. |
| Status CopyNextValues(size_t* n, ColumnMaterializationContext* ctx); |
| |
| const IteratorStats& io_statistics() const override { |
| return io_stats_; |
| } |
| |
| // If the column is dictionary-coded, returns the decoder |
| // for the cfile's dictionary block. This is called by the |
| // BinaryDictBlockDecoder. |
| BinaryPlainBlockDecoder* GetDictDecoder() { return dict_decoder_.get(); } |
| |
| // If the column is dictionary-coded and a predicate on the column exists, |
| // returns the set of codewords that pass the predicate. Since a vocabulary |
| // is shared among the multiple BinaryDictBlockDecoders in a single cfile, |
| // the reader must expose an interface for all decoders to access the |
| // single set of predicate-satisfying codewords. |
| SelectionVector* GetCodeWordsMatchingPredicate() { |
| return codewords_matching_pred_.get(); |
| } |
| |
| private: |
| DISALLOW_COPY_AND_ASSIGN(CFileIterator); |
| |
| struct PreparedBlock { |
| BlockPointer dblk_ptr_; |
| scoped_refptr<BlockHandle> dblk_handle_; |
| std::unique_ptr<BlockDecoder> dblk_; |
| |
| // The rowid of the first row in this block. |
| rowid_t first_row_idx() const { |
| return dblk_->GetFirstRowId(); |
| } |
| |
| // The index of the seeked position, relative to the start of the block. |
| // In case of null bitmap present, dblk_->GetCurrentIndex() is not aligned |
| // with the row number, since null values are not written to the data block. |
| // check CFileIterator::SeekToPositionInBlock() |
| uint32_t idx_in_block_; |
| |
| // When the block is first read, it is seeked to the proper position |
| // and rewind_idx_ is set to that offset in the block. needs_rewind_ |
| // is initially false, but after any values are read from the block, |
| // it becomes true. This indicates that dblk_ is pointed at a later |
| // position in the block, and should be rewound if a second call to |
| // Scan() is made. |
| // rewind_idx is relative to the first entry in the block (i.e. not a rowid) |
| bool needs_rewind_; |
| uint32_t rewind_idx_; |
| |
| // Total number of rows in the block (nulls + not nulls) |
| uint32_t num_rows_in_block_; |
| |
| // Null bitmap and bitmap (RLE) decoder |
| Slice rle_bitmap; |
| RleDecoder<bool> rle_decoder_; |
| |
| rowid_t last_row_idx() const { |
| return first_row_idx() + num_rows_in_block_ - 1; |
| } |
| |
| std::string ToString() const; |
| }; |
| |
| // Seek the given PreparedBlock to the given index within it. |
| void SeekToPositionInBlock(PreparedBlock* pb, uint32_t idx_in_block); |
| |
| // Read the data block currently pointed to by idx_iter_ |
| // into the given PreparedBlock structure. |
| // |
| // This does not advance the iterator. |
| Status ReadCurrentDataBlock(const IndexTreeIterator& idx_iter, |
| PreparedBlock* prep_block); |
| |
| // Read the data block currently pointed to by idx_iter_, and enqueue |
| // it onto the end of the prepared_blocks_ deque. |
| Status QueueCurrentDataBlock(const IndexTreeIterator& idx_iter); |
| |
| // Fully initialize the underlying cfile reader if needed, and clear any |
| // seek-related state. |
| Status PrepareForNewSeek(); |
| |
| CFileReader* reader_; |
| |
| std::unique_ptr<IndexTreeIterator> posidx_iter_; |
| std::unique_ptr<IndexTreeIterator> validx_iter_; |
| |
| // Decoder for the dictionary block. |
| std::unique_ptr<BinaryPlainBlockDecoder> dict_decoder_; |
| scoped_refptr<BlockHandle> dict_block_handle_; |
| |
| // Set containing the codewords that match the predicate in a dictionary. |
| std::unique_ptr<SelectionVector> codewords_matching_pred_; |
| |
| // The currently in-use index iterator. This is equal to either |
| // posidx_iter_.get(), validx_iter_.get(), or NULL if not seeked. |
| IndexTreeIterator* seeked_; |
| |
| // Data blocks that contain data relevant to the currently Prepared |
| // batch of rows. |
| // These pointers are allocated from the prepared_block_pool_ below. |
| std::vector<PreparedBlock*> prepared_blocks_; |
| |
| ObjectPool<PreparedBlock> prepared_block_pool_; |
| typedef ObjectPool<PreparedBlock>::scoped_ptr pblock_pool_scoped_ptr; |
| |
| // True if PrepareBatch() has been called more recently than FinishBatch(). |
| bool prepared_; |
| |
| // Whether this iterator will ask the cfile to cache the blocks it requests or not. |
| const CFileReader::CacheControl cache_control_; |
| |
| // RowID of the current prepared batch, if prepared_ is true. |
| // Otherwise, the RowID of the next batch that will be prepared. |
| rowid_t last_prepare_idx_; |
| |
| // Number of rows in the current batch, if prepared_ is true. |
| // Otherwise, 0. |
| uint32_t last_prepare_count_; |
| |
| IteratorStats io_stats_; |
| |
| const fs::IOContext* io_context_; |
| |
| // a temporary buffer for encoding |
| faststring tmp_buf_; |
| }; |
| |
| } // namespace cfile |
| } // namespace kudu |