| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "kudu/common/generic_iterators.h" |
| |
| #include <sys/types.h> |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <cstdint> |
| #include <deque> |
| #include <iterator> |
| #include <memory> |
| #include <mutex> |
| #include <numeric> |
| #include <optional> |
| #include <ostream> |
| #include <string> |
| #include <type_traits> |
| #include <typeinfo> |
| #include <unordered_map> |
| #include <utility> |
| |
| #include <boost/heap/skew_heap.hpp> |
| #include <boost/intrusive/list.hpp> |
| #include <boost/intrusive/list_hook.hpp> |
| #include <boost/iterator/iterator_facade.hpp> |
| #include <gflags/gflags.h> |
| #include <glog/logging.h> |
| |
| #include "kudu/common/column_materialization_context.h" |
| #include "kudu/common/column_predicate.h" |
| #include "kudu/common/columnblock.h" |
| #include "kudu/common/common.pb.h" |
| #include "kudu/common/iterator.h" |
| #include "kudu/common/iterator_stats.h" |
| #include "kudu/common/predicate_effectiveness.h" |
| #include "kudu/common/row.h" |
| #include "kudu/common/rowblock.h" |
| #include "kudu/common/rowblock_memory.h" |
| #include "kudu/common/scan_spec.h" |
| #include "kudu/common/schema.h" |
| #include "kudu/gutil/casts.h" |
| #include "kudu/gutil/macros.h" |
| #include "kudu/gutil/map-util.h" |
| #include "kudu/gutil/port.h" |
| #include "kudu/gutil/strings/join.h" |
| #include "kudu/gutil/strings/substitute.h" |
| #include "kudu/util/flag_tags.h" |
| #include "kudu/util/locks.h" |
| #include "kudu/util/memory/arena.h" |
| #include "kudu/util/object_pool.h" |
| |
| namespace boost { |
| namespace heap { |
| template <class T> struct compare; |
| } // namespace heap |
| } // namespace boost |
| |
| using std::deque; |
| using std::get; |
| using std::sort; |
| using std::string; |
| using std::unique_ptr; |
| using std::vector; |
| using strings::Substitute; |
| |
| DEFINE_bool(materializing_iterator_do_pushdown, true, |
| "Should MaterializingIterator do predicate pushdown"); |
| TAG_FLAG(materializing_iterator_do_pushdown, hidden); |
| DEFINE_bool(materializing_iterator_decoder_eval, true, |
| "Should MaterializingIterator do decoder-level evaluation"); |
| TAG_FLAG(materializing_iterator_decoder_eval, hidden); |
| TAG_FLAG(materializing_iterator_decoder_eval, runtime); |
| |
| namespace kudu { |
| namespace { |
| void AddIterStats(const RowwiseIterator& iter, |
| vector<IteratorStats>* stats) { |
| vector<IteratorStats> iter_stats; |
| iter.GetIteratorStats(&iter_stats); |
| DCHECK_EQ(stats->size(), iter_stats.size()); |
| for (int i = 0; i < iter_stats.size(); i++) { |
| (*stats)[i] += iter_stats[i]; |
| } |
| } |
| } // anonymous namespace |
| |
| |
| typedef std::pair<int32_t, ColumnPredicate> ColumnIdxAndPredicate; |
| |
| //////////////////////////////////////////////////////////// |
| // MergeIterator |
| //////////////////////////////////////////////////////////// |
| |
| // This is sized to a power of 2 to improve BitmapCopy performance when copying |
| // a RowBlock. |
| // |
| // TODO(todd): this should be sized by # bytes, not # rows. |
| static const int kMergeRowBuffer = 1024; |
| |
| // MergeIterState wraps a RowwiseIterator for use by the MergeIterator. |
| class MergeIterState : public boost::intrusive::list_base_hook<> { |
| public: |
| explicit MergeIterState(IterWithBounds iwb) |
| : iwb_(std::move(iwb)), |
| memory_(1024), |
| next_row_idx_(0) |
| {} |
| |
| // Fetches the next row from the iterator's current block, or the iterator's |
| // absolute lower bound if a block has not yet been pulled. |
| // |
| // Does not advance the iterator. |
| const RowBlockRow& next_row() const { |
| if (read_block_) { |
| DCHECK_LT(next_row_idx_, read_block_->nrows()); |
| return next_row_; |
| } |
| DCHECK(decoded_bounds_); |
| return decoded_bounds_->lower; |
| } |
| |
| // Fetches the last row from the iterator's current block, or the iterator's |
| // absolute upper bound if a block has not yet been pulled. |
| // |
| // Does not advance the iterator. |
| const RowBlockRow& last_row() const { |
| if (read_block_) { |
| return last_row_; |
| } |
| DCHECK(decoded_bounds_); |
| return decoded_bounds_->upper; |
| } |
| |
| // Finishes construction of the MergeIterState by decoding the bounds if they |
| // exist. If not, we have to pull a block immediately: after Init() is |
| // finished it must be safe to call next_row() and last_row(). |
| // |
| // Decoded bound allocations are done against the arena in 'decoded_bounds_memory'. |
| Status Init(RowBlockMemory* decoded_bounds_memory) { |
| DCHECK(!read_block_); |
| |
| if (iwb_.encoded_bounds) { |
| decoded_bounds_.emplace(&schema(), decoded_bounds_memory); |
| decoded_bounds_->lower = decoded_bounds_->block.row(0); |
| decoded_bounds_->upper = decoded_bounds_->block.row(1); |
| RETURN_NOT_OK(schema().DecodeRowKey( |
| iwb_.encoded_bounds->first, &decoded_bounds_->lower, &decoded_bounds_memory->arena)); |
| RETURN_NOT_OK(schema().DecodeRowKey( |
| iwb_.encoded_bounds->second, &decoded_bounds_->upper, &decoded_bounds_memory->arena)); |
| } else { |
| RETURN_NOT_OK(PullNextBlock()); |
| } |
| |
| return Status::OK(); |
| } |
| |
| // Returns true if the underlying iterator is fully exhausted. |
| bool IsFullyExhausted() const { |
| return (!read_block_ || read_block_->nrows() == 0) && !iwb_.iter->HasNext(); |
| } |
| |
| // Advances to the next row in the underlying iterator. |
| // |
| // If successful, 'pulled_new_block' is true if this block was exhausted and a |
| // new block was pulled from the underlying iterator. |
| Status Advance(size_t num_rows, bool* pulled_new_block); |
| |
| // Adds statistics about the underlying iterator to the given vector. |
| void AddStats(vector<IteratorStats>* stats) const { |
| AddIterStats(*iwb_.iter, stats); |
| } |
| |
| // Returns the number of rows remaining in the current block. |
| size_t remaining_in_block() const { |
| DCHECK(read_block_); |
| return read_block_->nrows() - next_row_idx_; |
| } |
| |
| // Returns the schema from the underlying iterator. |
| const Schema& schema() const { |
| return DCHECK_NOTNULL(iwb_.iter)->schema(); |
| } |
| |
| // Pulls the next block from the underlying iterator. |
| Status PullNextBlock(); |
| |
| // Copies as many rows as possible from the current block of buffered rows to |
| // 'dst' (starting at 'dst_offset'). |
| // |
| // If successful, 'num_rows_copied' will be set to the number of rows copied. |
| Status CopyBlock(RowBlock* dst, size_t dst_offset, size_t* num_rows_copied); |
| |
| // Returns true if the current block in the underlying iterator is exhausted. |
| bool IsBlockExhausted() const { |
| return !read_block_ || read_block_->nrows() == next_row_idx_; |
| } |
| |
| string ToString() const { |
| return Substitute("[$0,$1]: $2", |
| schema().DebugRowKey(next_row()), |
| schema().DebugRowKey(last_row()), |
| iwb_.iter->ToString()); |
| } |
| |
| private: |
| // The iterator (and optional bounds) whose rows are to be merged with other |
| // iterators. |
| // |
| // Must already be Init'ed at MergeIterState construction time. |
| const IterWithBounds iwb_; |
| |
| // Allocates memory for read_block_. |
| RowBlockMemory memory_; |
| |
| // Optional rowset bounds, decoded during Init(). |
| struct DecodedBounds { |
| // 'block' must be constructed immediately; the bounds themselves can be |
| // initialized later. |
| DecodedBounds(const Schema* schema, RowBlockMemory* mem) |
| : block(schema, /*nrows=*/2, mem) {} |
| |
| RowBlock block; |
| RowBlockRow lower; |
| RowBlockRow upper; |
| }; |
| std::optional<DecodedBounds> decoded_bounds_; |
| |
| // Current block of buffered rows from the iterator. |
| // |
| // The memory backing the rows was allocated out of the arena. |
| unique_ptr<RowBlock> read_block_; |
| |
| // The row currently pointed to by the iterator. |
| RowBlockRow next_row_; |
| |
| // The last row available in read_block_. |
| RowBlockRow last_row_; |
| |
| // The index of the row currently pointed to by the iterator. Guaranteed to be |
| // a selected row. |
| size_t next_row_idx_; |
| |
| DISALLOW_COPY_AND_ASSIGN(MergeIterState); |
| }; |
| |
| Status MergeIterState::Advance(size_t num_rows, bool* pulled_new_block) { |
| DCHECK_GE(read_block_->nrows(), next_row_idx_ + num_rows); |
| |
| next_row_idx_ += num_rows; |
| |
| // If advancing didn't exhaust this block outright, find the next selected row. |
| size_t idx; |
| if (!IsBlockExhausted() && |
| read_block_->selection_vector()->FindFirstRowSelected(next_row_idx_, &idx)) { |
| next_row_idx_ = idx; |
| next_row_.Reset(read_block_.get(), next_row_idx_); |
| *pulled_new_block = false; |
| return Status::OK(); |
| } |
| |
| // We either exhausted the block outright, or all subsequent rows were |
| // deselected. Either way, we need to pull the next block. |
| next_row_idx_ = read_block_->nrows(); |
| memory_.Reset(); |
| RETURN_NOT_OK(PullNextBlock()); |
| *pulled_new_block = true; |
| return Status::OK(); |
| } |
| |
| Status MergeIterState::PullNextBlock() { |
| CHECK(IsBlockExhausted()) |
| << "should not pull next block until current block is exhausted"; |
| |
| if (!read_block_) { |
| read_block_.reset(new RowBlock(&schema(), kMergeRowBuffer, &memory_)); |
| } |
| while (iwb_.iter->HasNext()) { |
| RETURN_NOT_OK(iwb_.iter->NextBlock(read_block_.get())); |
| |
| SelectionVector* selection = read_block_->selection_vector(); |
| DCHECK_EQ(selection->nrows(), read_block_->nrows()); |
| DCHECK_LE(selection->CountSelected(), read_block_->nrows()); |
| size_t rows_valid = selection->CountSelected(); |
| VLOG(2) << Substitute("$0/$1 rows selected", rows_valid, read_block_->nrows()); |
| if (rows_valid == 0) { |
| // Short-circuit: this block is entirely unselected and can be skipped. |
| continue; |
| } |
| |
| // Seek next_row_ and last_row_ to the first and last selected rows |
| // respectively (which could be identical). |
| |
| CHECK(selection->FindFirstRowSelected(0, &next_row_idx_)); |
| next_row_.Reset(read_block_.get(), next_row_idx_); |
| |
// We use a signed type (ssize_t) to avoid underflow when finding last_row_.
| // |
| // TODO(adar): this can be simplified if there was a BitmapFindLastSet(). |
| for (ssize_t row_idx = read_block_->nrows() - 1; row_idx >= 0; row_idx--) { |
| if (selection->IsRowSelected(row_idx)) { |
| last_row_.Reset(read_block_.get(), row_idx); |
| VLOG(1) << "Pulled new block: " << ToString(); |
| return Status::OK(); |
| } |
| } |
| |
| LOG(FATAL) << "unreachable code"; // guaranteed by the short-circuit above |
| } |
| |
| VLOG(1) << "Fully exhausted iter " << iwb_.iter->ToString(); |
| next_row_idx_ = 0; |
| read_block_.reset(); |
| return Status::OK(); |
| } |
| |
| Status MergeIterState::CopyBlock(RowBlock* dst, size_t dst_offset, |
| size_t* num_rows_copied) { |
| DCHECK(read_block_); |
| DCHECK(!IsBlockExhausted()); |
| |
| size_t num_rows_to_copy = std::min(remaining_in_block(), |
| dst->nrows() - dst_offset); |
| VLOG(3) << Substitute( |
| "Copying $0 rows from RowBlock (s:$1,o:$2) to RowBlock (s:$3,o:$4): $5", |
| num_rows_to_copy, read_block_->nrows(), next_row_idx_, dst->nrows(), |
| dst_offset, ToString()); |
| RETURN_NOT_OK(read_block_->CopyTo(dst, next_row_idx_, |
| dst_offset, num_rows_to_copy)); |
| *num_rows_copied = num_rows_to_copy; |
| return Status::OK(); |
| } |
| |
| // An iterator which merges the results of other iterators, comparing |
| // based on keys. |
| // |
| // Two heaps are used to optimize the merge process. To explain how it works, |
| // let's start with an explanation of a traditional heap-based merge: there |
| // exist N sorted lists of elements and the goal is to produce a single sorted |
| // list containing all of the elements. |
| // |
| // To begin: |
| // - For each list, peek the first element into a per-list buffer. |
| // - Add all of the lists to a min-heap ordered on the per-list buffers. This |
| // means that the heap's top-most entry (accessible in O(1) time) will be the |
| // list containing the smallest not-yet-consumed element. |
| // |
| // To perform the merge, loop while the min-heap isn't empty: |
| // - Pop the top-most list from the min-heap. |
| // - Copy that list's peeked element to the output. |
| // - Peek the list's next element. If the list is empty, discard it. |
| // - If the list has more elements, push it back into the min-heap. |
| // |
| // This algorithm runs in O(n log n) time and is generally superior to a naive |
| // O(n^2) merge. However, it requires peeked elements to remain resident in |
| // memory during the merge. |
| // |
| // The MergeIterator's sub-iterators operate much like the lists described |
| // above: elements correspond to rows and sorting is based on rows' primary |
| // keys. However, there are several important differences that open the door for |
| // further optimization: |
| // 1. Each sub-iterator corresponds to a Kudu rowset, and DiskRowSets' smallest |
| // and largest possible primary keys (i.e. bounds) are known ahead of time. |
| // 2. When iterating on DiskRowSets, peeking even one row means decoding a page |
| // of columnar data for each projected column. This decoded data remains |
| // resident in memory so that we needn't repeat the decoding for each row, |
| // but it means we have a strong motivation to minimize the set of peeked |
| // sub-iterators in order to minimize memory consumption. |
// 2a. Related to #2, there's little reason not to peek more than one row at a time,
| // since the predominant source of memory usage is in the decoded pages |
| // rather than the buffered rows. |
| // |
| // The aggressive peeking allows us to tweak the list model slightly: instead |
| // of treating sub-iterators as continuous sequences of single rows, we can |
| // think of each as a sequence of discrete row "runs". Each run has a lower |
| // bound (the key of the first row in the run) and an upper bound (the key of |
| // the last row). One run in each sub-iterator is NEXT in that its rows have |
| // been peeked and are resident in memory. When we draw rows from the |
| // sub-iterator, we'll draw them from this run. We can use the bounds to |
| // establish overlapping relationships between runs across sub-iterators. In |
| // theory, the less overlap, the fewer runs need to be considered when merging. |
| // |
| // To exploit this, we need to formally define the concept of a "merge window". |
| // The window describes, at any given time, the key space interval where we |
| // expect to find the row with the smallest key. A sub-iterator whose NEXT |
| // overlaps with the merge window is said to be actively participating in the merge. |
| // |
| // The merge window is defined as follows: |
| // 1. The window's start is the smallest lower bound of all sub-iterators. We |
| // refer to the sub-iterator that owns this lower bound as LOW. |
// 2. The window's end is the smallest upper bound of all sub-iterators whose
| // lower bounds are less than or equal to LOW's upper bound. |
| // 2a. The window's end could be LOW's upper bound itself, if it is the smallest |
| // upper bound, but this isn't necessarily the case. |
| // 3. The merge window's dimensions change as the merge proceeds, though it |
| // only ever moves "to the right" (i.e. the window start/end only increase). |
| // |
| // Armed with the merge window concept, we can bifurcate the sub-iterators into |
| // a set whose NEXTs overlap with the merge window and a set whose NEXTs do not. |
// We store the first set in a HOT min-heap (ordered by each sub-iterator's
// NEXT's lower bound). Now the merge steady state resembles that of a
// traditional heap-based merge: the top-most sub-iterator is popped from HOT,
// its smallest row is copied to the output, the sub-iterator is advanced, and
// it is pushed back to HOT. The bifurcation hasn't yielded any algorithmic
// improvements, but the more non-overlapped the input (i.e. the more compacted
// the tablet), the fewer sub-iterators will be in HOT, and thus fewer
// comparisons and less heap motion will take place.
| // |
| // How do sub-iterators move between the two sets? At the outset, we examine all |
| // sub-iterators to find the initial merge window and bifurcate accordingly. In |
| // the steady state, we need to move to HOT whenever the end of the merge window |
| // moves; that's a sign that the window may now overlap with a NEXT belonging to |
| // a sub-iterator in the second set. The end of the merge window moves when a |
| // sub-iterator is fully exhausted (i.e. all rows have been copied to the |
| // output), or when a sub-iterator finishes its NEXT and needs to peek again. |
| // |
| // But which sub-iterators should be moved? To answer this question efficiently, |
| // we need another heap: we'll define a COLD min-heap for sub-iterators in the |
// second set, ordered by each sub-iterator's NEXT's lower bound. At any given
// time, the NEXT belonging to the top-most sub-iterator in COLD is nearest the
// merge window.
| // |
| // When the merge window's end has moved and we need to refill HOT, the |
| // top-most sub-iterator in COLD is the best candidate. To figure out whether |
// it should be moved, we compare its NEXT's lower bound against the upper
// bound of HOT's top-most sub-iterator: if the lower bound is less than or
// equal to that upper bound, we move the sub-iterator from COLD to HOT. On the
// flip side, when a sub-iterator from HOT finishes its NEXT and peeks again,
// we also need to check whether it has exited the merge window. The approach
// is similar: if its NEXT's lower bound is greater than the upper bound of
// HOT's top-most sub-iterator, it's time to move it to COLD.
| // |
| // There's one more piece to this puzzle: the absolute bounds that are known |
| // ahead of time are used as stand-ins for NEXT's lower and upper bounds. This |
| // helps defer peeking for as long as possible, at least until the sub-iterator |
| // moves from COLD to HOT. After that, peeked memory must remain resident until |
| // the sub-iterator is fully exhausted. |
| // |
| // TODO(awong): there is a variant of this algorithm that defines another |
| // container to further optimize the size of the merge window: HOTMAXES, an |
| // ordered set for sub-iterators in HOT, ordered by each sub-iterator's upper |
| // bound. At any given time, the first iterator in HOTMAXES represents the |
| // optimal end of the merge window, allowing us to move elements onto COLD via |
// comparison to the first value of HOTMAXES. Some experiments defined this
// using std::set and found that its maintenance sometimes costs more than it
// saves. Experiment with a less memory-hungry data structure (maybe
// absl::btree?).
| // |
| // For another description of this algorithm including pseudocode, see |
| // https://docs.google.com/document/d/1uP0ubjM6ulnKVCRrXtwT_dqrTWjF9tlFSRk0JN2e_O0/edit# |
| class MergeIterator : public RowwiseIterator { |
| public: |
| // Constructs a MergeIterator of the given iterators. |
| // |
| // The iterators must have matching schemas and should not yet be initialized. |
| // |
| // Note: the iterators must be constructed using a projection that includes |
| // all key columns; otherwise a CHECK will fire at initialization time. |
| MergeIterator(MergeIteratorOptions opts, vector<IterWithBounds> iters); |
| |
| virtual ~MergeIterator(); |
| |
// Initializes the merge iterator, including its sub-iterators.
| Status Init(ScanSpec *spec) OVERRIDE; |
| |
| virtual bool HasNext() const OVERRIDE; |
| |
| virtual string ToString() const OVERRIDE; |
| |
| virtual const Schema& schema() const OVERRIDE; |
| |
| virtual void GetIteratorStats(vector<IteratorStats>* stats) const OVERRIDE; |
| |
| virtual Status NextBlock(RowBlock* dst) OVERRIDE; |
| |
| private: |
| // Materializes as much of the next block's worth of data into 'dst' at offset |
| // 'dst_row_idx' as possible. Only permitted when there's only one |
| // sub-iterator in the hot heap. |
| // |
| // On success, the selection vector in 'dst' and 'dst_row_idx' are both updated. |
| Status MaterializeBlock(RowBlock* dst, size_t* dst_row_idx); |
| |
| // Finds the next row and materializes it into 'dst' at offset 'dst_row_idx'. |
| // |
| // On success, the selection vector in 'dst' and 'dst_row_idx' are both updated. |
| Status MaterializeOneRow(RowBlock* dst, size_t* dst_row_idx); |
| |
// Calls Init() on all of the sub-iterators, wrapping them in
// predicate-evaluating iterators if necessary and setting up additional
// per-iterator bookkeeping.
| Status InitSubIterators(ScanSpec *spec); |
| |
| // Advances 'state' by 'num_rows_to_advance', destroying it and/or updating |
| // 'hot_' and 'cold_' if necessary. |
| Status AdvanceAndReheap(MergeIterState* state, size_t num_rows_to_advance); |
| |
| // Moves sub-iterators from cold_ to hot_ if they now overlap with the merge |
| // window. Should be called whenever the merge window moves. |
| Status RefillHotHeap(); |
| |
| // Destroys a fully exhausted sub-iterator. |
| void DestroySubIterator(MergeIterState* state); |
| |
| const MergeIteratorOptions opts_; |
| |
| // Initialized during Init. |
| unique_ptr<Schema> schema_; |
| |
| bool initted_; |
| |
| // Holds the sub-iterators until Init is called, at which point this is |
| // cleared. This is required because we can't create a MergeIterState of an |
| // uninitialized sub-iterator. |
| vector<IterWithBounds> orig_iters_; |
| |
| // See UnionIterator::iters_lock_ for details on locking. This follows the same |
| // pattern. |
| mutable rw_spinlock states_lock_; |
| boost::intrusive::list<MergeIterState> states_; // NOLINT(*) |
| |
| // Statistics (keyed by projection column index) accumulated so far by any |
| // fully-consumed sub-iterators. |
| vector<IteratorStats> finished_iter_stats_by_col_; |
| |
| // The number of iterators, used by ToString(). |
| const int num_orig_iters_; |
| |
| // When the underlying iterators are initialized, each needs its own |
| // copy of the scan spec in order to do its own pushdown calculations, etc. |
| // The copies are allocated from this pool so they can be automatically freed |
// when the MergeIterator goes out of scope.
| ObjectPool<ScanSpec> scan_spec_copies_; |
| |
| // Arena dedicated to MergeIterState bounds decoding. |
| // |
| // Each MergeIterState has an arena for buffered row data, but it is reset |
| // every time a new block is pulled. This single arena ensures that a |
| // MergeIterState's decoded bounds remain allocated for its lifetime. |
| RowBlockMemory decoded_bounds_memory_; |
| |
| // Min-heap that orders sub-iterators by their next row key. A call to top() |
| // will yield the sub-iterator with the smallest next row key. |
| struct MergeIterStateComparator { |
| bool operator()(const MergeIterState* a, const MergeIterState* b) const { |
| // This is counter-intuitive, but it's because boost::heap defaults to |
| // a max-heap; the comparator must be inverted to yield a min-heap. |
| return a->schema().Compare(a->next_row(), b->next_row()) > 0; |
| } |
| }; |
| typedef boost::heap::skew_heap< |
| MergeIterState*, boost::heap::compare<MergeIterStateComparator>> MergeStateMinHeap; |
| |
| // The min-heaps as described in the algorithm above. |
| // |
| // Note that none of these containers "own" the objects they contain: the |
| // MergeIterStates are all owned by states_. Care must be taken to remove |
| // entries from the containers in such a way that does not access the |
| // corresponding objects, even if they are destroyed (e.g. if an iterator |
| // state becomes fully exhausted while still in the containers). |
| // |
| // Boost offers a variety of different heap data structures[1]. Perf testing |
| // via generic_iterators-test (TestMerge and TestMergeNonOverlapping with |
| // num_iters=10, num_lists=1000, and num_rows=1000) shows that while all heaps |
| // perform more or less equally well for non-overlapping input, skew heaps |
| // outperform the rest for overlapping input. Basic priority queues (i.e. |
| // boost::heap::priority_queue and std::priority_queue) were excluded as they |
| // do not offer ordered iteration. |
| // |
| // 1. https://www.boost.org/doc/libs/1_69_0/doc/html/heap/data_structures.html |
| MergeStateMinHeap hot_; |
| MergeStateMinHeap cold_; |
| }; |
| |
| MergeIterator::MergeIterator(MergeIteratorOptions opts, |
| vector<IterWithBounds> iters) |
| : opts_(opts), |
| initted_(false), |
| orig_iters_(std::move(iters)), |
| num_orig_iters_(orig_iters_.size()), |
| decoded_bounds_memory_(1024) { |
| CHECK_GT(orig_iters_.size(), 0); |
| } |
| |
| MergeIterator::~MergeIterator() { |
| states_.clear_and_dispose([](MergeIterState* s) { delete s; }); |
| } |
| |
| Status MergeIterator::Init(ScanSpec *spec) { |
| CHECK(!initted_); |
| |
| // Initialize the iterators and the per-iterator merge states. |
| // |
| // When this method finishes, orig_iters_ has been cleared and states_ has |
| // been populated. |
| RETURN_NOT_OK(InitSubIterators(spec)); |
| |
| // Verify that the schemas match in debug builds. |
| // |
| // It's important to do the verification after initializing the iterators, as |
| // they may not know their own schemas until they've been initialized (in the |
| // case of a union of unions). |
| schema_.reset(new Schema(states_.front().schema())); |
| CHECK_GT(schema_->num_key_columns(), 0); |
| finished_iter_stats_by_col_.resize(schema_->num_columns()); |
| #ifndef NDEBUG |
| for (const auto& s : states_) { |
| if (s.schema() != *schema_) { |
| return Status::InvalidArgument( |
| Substitute("Schemas do not match: $0 vs. $1", |
| schema_->ToString(), s.schema().ToString())); |
| } |
| } |
| #endif |
| |
| if (opts_.include_deleted_rows && |
| schema_->first_is_deleted_virtual_column_idx() == Schema::kColumnNotFound) { |
| return Status::InvalidArgument("Merge iterator cannot deduplicate deleted " |
| "rows without an IS_DELETED column"); |
| } |
| |
| // Before we copy any rows, clean up any iterators which were empty |
| // to start with. Otherwise, HasNext() won't properly return false |
| // if we were passed only empty iterators. |
| states_.remove_and_dispose_if( |
| [](const MergeIterState& s) { return PREDICT_FALSE(s.IsFullyExhausted()); }, |
| [](MergeIterState* s) { delete s; }); |
| |
| // Establish the merge window and initialize the ordered iterator containers. |
| for (auto& s : states_) { |
| cold_.push(&s); |
| } |
| RETURN_NOT_OK(RefillHotHeap()); |
| |
| initted_ = true; |
| return Status::OK(); |
| } |
| |
| bool MergeIterator::HasNext() const { |
| CHECK(initted_); |
| return !states_.empty(); |
| } |
| |
| Status MergeIterator::InitSubIterators(ScanSpec *spec) { |
| // Initialize all the sub iterators. |
| for (auto& i : orig_iters_) { |
| ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr; |
| RETURN_NOT_OK(InitAndMaybeWrap(&i.iter, spec_copy)); |
| unique_ptr<MergeIterState> state(new MergeIterState(std::move(i))); |
| RETURN_NOT_OK(state->Init(&decoded_bounds_memory_)); |
| states_.push_back(*state.release()); |
| } |
| orig_iters_.clear(); |
| |
| // Since we handle predicates in all the wrapped iterators, we can clear |
| // them here. |
| if (spec != nullptr) { |
| spec->RemovePredicates(); |
| } |
| return Status::OK(); |
| } |
| |
| Status MergeIterator::AdvanceAndReheap(MergeIterState* state, |
| size_t num_rows_to_advance) { |
| DCHECK_EQ(state, hot_.top()); |
| bool pulled_new_block = false; |
| RETURN_NOT_OK(state->Advance(num_rows_to_advance, &pulled_new_block)); |
| hot_.pop(); |
| |
| if (state->IsFullyExhausted()) { |
| DestroySubIterator(state); |
| |
| // This sub-iterator's removal means the end of the merge window may have shifted. |
| RETURN_NOT_OK(RefillHotHeap()); |
| } else if (pulled_new_block) { |
| // This sub-iterator has a new block, which means the end of the merge window |
| // may have shifted. |
| if (!hot_.empty() && |
| schema_->Compare(hot_.top()->last_row(), state->next_row()) < 0) { |
| // The new block lies beyond the new end of the merge window. |
| VLOG(2) << "Block finished, became cold: " << state->ToString(); |
| cold_.push(state); |
| } else { |
| // The new block is still within the merge window. |
| VLOG(2) << "Block finished, still hot: " << state->ToString(); |
| hot_.push(state); |
| } |
| RETURN_NOT_OK(RefillHotHeap()); |
| } else { |
| // The sub-iterator's block's upper bound remains the same; the merge window |
| // has not changed. |
| hot_.push(state); |
| } |
| return Status::OK(); |
| } |
| |
| Status MergeIterator::RefillHotHeap() { |
| VLOG(2) << "Refilling hot heap"; |
| while (!cold_.empty() && |
| (hot_.empty() || |
| schema_->Compare(hot_.top()->last_row(), cold_.top()->next_row()) >= 0)) { |
| MergeIterState* warmest = cold_.top(); |
| cold_.pop(); |
| |
| // This will only be true once per sub-iterator, when it becomes hot for the |
| // very first time. |
| if (warmest->IsBlockExhausted()) { |
| RETURN_NOT_OK(warmest->PullNextBlock()); |
| |
| // After pulling a block, we can't just assume 'warmest' overlaps the |
| // merge window: there could have been a huge gap between the pulled block |
| // and the sub-iterator's absolute bounds. In other words, although the |
| // bounds told us that 'warmest' was the best candidate, the block is the |
| // ultimate source of truth. |
| // |
| // To deal with this, we pretend 'warmest' doesn't overlap and restart the |
| // algorithm. In the worst case (little to no gap between the block and |
| // the bounds), we'll pop 'warmest' right back out again. |
| if (warmest->IsFullyExhausted()) { |
| DestroySubIterator(warmest); |
| } else { |
| cold_.push(warmest); |
| } |
| continue; |
| } |
| VLOG(2) << "Became hot: " << warmest->ToString(); |
| hot_.push(warmest); |
| } |
| if (VLOG_IS_ON(2)) { |
| VLOG(2) << "Done refilling hot heap"; |
| for (const auto* c : cold_) { |
| VLOG(2) << "Still cold: " << c->ToString(); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| void MergeIterator::DestroySubIterator(MergeIterState* state) { |
| DCHECK(state->IsFullyExhausted()); |
| |
| std::lock_guard<rw_spinlock> l(states_lock_); |
| state->AddStats(&finished_iter_stats_by_col_); |
| states_.erase_and_dispose(states_.iterator_to(*state), |
| [](MergeIterState* s) { delete s; }); |
| } |
| |
| Status MergeIterator::NextBlock(RowBlock* dst) { |
| VLOG(3) << "Called NextBlock (" << dst->nrows() << " rows) on " << ToString(); |
| CHECK(initted_); |
| DCHECK_SCHEMA_EQ(*dst->schema(), schema()); |
| |
| if (dst->arena()) { |
| dst->arena()->Reset(); |
| } |
| |
| size_t dst_row_idx = 0; |
| while (dst_row_idx < dst->nrows()) { |
| // If the hot heap is empty, we must be out of sub-iterators. |
| if (PREDICT_FALSE(hot_.empty())) { |
| DCHECK(states_.empty()); |
| break; |
| } |
| |
| // If there's just one hot sub-iterator, we can copy its entire block |
| // instead of copying row-by-row. |
| // |
| // When N sub-iterators fully overlap, there'll only be one hot sub-iterator |
| // when consuming the very last row. A block copy for this case is more |
| // overhead than just copying out the last row. |
| // |
| // TODO(adar): this can be further optimized by "attaching" data to 'dst' |
| // rather than copying it. |
| if (hot_.size() == 1 && hot_.top()->remaining_in_block() > 1) { |
| RETURN_NOT_OK(MaterializeBlock(dst, &dst_row_idx)); |
| } else { |
| RETURN_NOT_OK(MaterializeOneRow(dst, &dst_row_idx)); |
| } |
| } |
| |
| if (dst_row_idx < dst->nrows()) { |
| dst->Resize(dst_row_idx); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status MergeIterator::MaterializeBlock(RowBlock* dst, size_t* dst_row_idx) { |
| DCHECK_EQ(1, hot_.size()); |
| |
| MergeIterState* state = hot_.top(); |
| size_t num_rows_copied; |
| RETURN_NOT_OK(state->CopyBlock(dst, *dst_row_idx, &num_rows_copied)); |
| RETURN_NOT_OK(AdvanceAndReheap(state, num_rows_copied)); |
| |
| // CopyBlock() already updated dst's SelectionVector. |
| *dst_row_idx += num_rows_copied; |
| return Status::OK(); |
| } |
| |
| // TODO(todd): this is an obvious spot to add codegen - there's a ton of branching |
| // and such around the comparisons. A simple experiment indicated there's some |
| // 2x to be gained. |
| Status MergeIterator::MaterializeOneRow(RowBlock* dst, size_t* dst_row_idx) { |
// We need a vector to track the iterators whose next_row() contains the
// smallest row key at a given moment during the merge: there may be multiple
// deleted rows with the same row key across multiple rowsets, plus up to one
// live instance, and all of these must be deduplicated.
| vector<MergeIterState*> smallest; |
| smallest.reserve(hot_.size()); |
| |
| // Find the set of sub-iterators whose matching next row keys are the smallest |
| // across all sub-iterators. |
| // |
| // Note: heap ordered iteration isn't the same as a total ordering. For |
| // example, the two absolute smallest keys might be in the same sub-iterator |
| // rather than in the first two sub-iterators yielded via ordered iteration. |
| // However, the goal here is to identify a group of matching keys for the |
| // purpose of deduplication, and we're guaranteed that such matching keys |
| // cannot exist in the same sub-iterator (i.e. the same rowset). |
| for (auto iter = hot_.ordered_begin(); iter != hot_.ordered_end(); ++iter) { |
| MergeIterState* state = *iter; |
| if (!smallest.empty() && |
| schema_->Compare(state->next_row(), smallest[0]->next_row()) != 0) { |
| break; |
| } |
| smallest.emplace_back(state); |
| } |
| |
| MergeIterState* row_to_return_iter = nullptr; |
| if (!opts_.include_deleted_rows) { |
| // Since deleted rows are not included here, there can only be a single |
| // instance of any given row key in 'smallest'. |
| CHECK_EQ(1, smallest.size()) << "expected only a single smallest row"; |
| row_to_return_iter = smallest[0]; |
| } else { |
| // Deduplicate any deleted rows. Row instance de-duplication criteria: |
| // 1. If there is a non-deleted instance, return that instance. |
| // 2. If all rows are deleted, any instance will suffice because we |
| // don't guarantee that we will return valid field values for deleted |
| // rows. |
| int live_rows_found = 0; |
| for (const auto& s : smallest) { |
| bool is_deleted = |
| *schema_->ExtractColumnFromRow<IS_DELETED>( |
| s->next_row(), schema_->first_is_deleted_virtual_column_idx()); |
| if (!is_deleted) { |
| // We found the single live instance of the row. |
| row_to_return_iter = s; |
| #ifndef NDEBUG |
| live_rows_found++; // In DEBUG mode, do a sanity-check count of the live rows. |
| #else |
| break; // In RELEASE mode, short-circuit the loop. |
| #endif |
| } |
| } |
| DCHECK_LE(live_rows_found, 1) << "expected at most one live row"; |
| |
| // If all instances of a given row are deleted then return an arbitrary |
| // deleted instance. |
| if (row_to_return_iter == nullptr) { |
| row_to_return_iter = smallest[0]; |
| DCHECK(*schema_->ExtractColumnFromRow<IS_DELETED>( |
| row_to_return_iter->next_row(), schema_->first_is_deleted_virtual_column_idx())) |
| << "expected deleted row"; |
| } |
| } |
| VLOG(3) << Substitute("Copying row $0 from $1", |
| *dst_row_idx, row_to_return_iter->ToString()); |
| RowBlockRow dst_row = dst->row(*dst_row_idx); |
| RETURN_NOT_OK(CopyRow(row_to_return_iter->next_row(), &dst_row, dst->arena())); |
| |
| // Advance all matching sub-iterators and remove any that are exhausted. |
// Since all sub-iterators in 'smallest' hold the same row key, each of these
// calls effectively operates on hot_.top(): after each advance, reheaping
// brings the next matching sub-iterator to the top of the hot heap.
| for (auto& s : smallest) { |
| RETURN_NOT_OK(AdvanceAndReheap(s, /*num_rows_to_advance=*/1)); |
| } |
| |
| dst->selection_vector()->SetRowSelected(*dst_row_idx); |
| (*dst_row_idx)++; |
| return Status::OK(); |
| } |
| |
| string MergeIterator::ToString() const { |
| return Substitute("Merge($0 iters)", num_orig_iters_); |
| } |
| |
| const Schema& MergeIterator::schema() const { |
| CHECK(initted_); |
| return *schema_; |
| } |
| |
| void MergeIterator::GetIteratorStats(vector<IteratorStats>* stats) const { |
| shared_lock<rw_spinlock> l(states_lock_); |
| CHECK(initted_); |
| *stats = finished_iter_stats_by_col_; |
| |
| for (const auto& s : states_) { |
| s.AddStats(stats); |
| } |
| } |
| |
| unique_ptr<RowwiseIterator> NewMergeIterator( |
| MergeIteratorOptions opts, vector<IterWithBounds> iters) { |
| return unique_ptr<RowwiseIterator>(new MergeIterator(opts, std::move(iters))); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // UnionIterator |
| //////////////////////////////////////////////////////////// |
| |
| // An iterator which unions the results of other iterators. |
| // This is different from MergeIterator in that it lays the results out end-to-end |
| // rather than merging them based on keys. Hence it is more efficient since there is |
| // no comparison needed, and the key column does not need to be read if it is not |
| // part of the projection. |
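// For example (hypothetical): if sub-iterator 1 yields rows with keys in
// [0,100) and sub-iterator 2 yields keys in [100,200), the union simply
// returns all of sub-iterator 1's rows followed by all of sub-iterator 2's;
// no cross-iterator ordering is enforced.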
| class UnionIterator : public RowwiseIterator { |
| public: |
| // Constructs a UnionIterator of the given iterators. |
| // |
| // The iterators must have matching schemas and should not yet be initialized. |
| explicit UnionIterator(vector<IterWithBounds> iters); |
| |
| Status Init(ScanSpec *spec) OVERRIDE; |
| |
| bool HasNext() const OVERRIDE; |
| |
| string ToString() const OVERRIDE; |
| |
| const Schema &schema() const OVERRIDE { |
| CHECK(initted_); |
| CHECK(schema_.get() != NULL) << "Bad schema in " << ToString(); |
| return *CHECK_NOTNULL(schema_.get()); |
| } |
| |
| virtual void GetIteratorStats(vector<IteratorStats>* stats) const OVERRIDE; |
| |
| virtual Status NextBlock(RowBlock* dst) OVERRIDE; |
| |
| private: |
| void PrepareBatch(); |
| Status MaterializeBlock(RowBlock* dst); |
| void FinishBatch(); |
| Status InitSubIterators(ScanSpec *spec); |
| |
| // Pop the front iterator from iters_ and accumulate its statistics into |
| // finished_iter_stats_by_col_. |
| void PopFront(); |
| |
| // Schema: initialized during Init() |
| unique_ptr<Schema> schema_; |
| |
| bool initted_; |
| |
| // Lock protecting 'iters_' and 'finished_iter_stats_by_col_'. |
| // |
| // Scanners are mostly accessed by the thread doing the scanning, but the HTTP endpoint |
| // which lists running scans may occasionally need to read as well. |
| // |
| // The "owner" thread of the scanner doesn't need to acquire this in read mode, since |
| // it's the only thread which might write. However, it does need to acquire in write |
| // mode when changing 'iters_'. |
| mutable rw_spinlock iters_lock_; |
| deque<IterWithBounds> iters_; |
| |
| // Statistics (keyed by projection column index) accumulated so far by any |
| // fully-consumed sub-iterators. |
| vector<IteratorStats> finished_iter_stats_by_col_; |
| |
| // When the underlying iterators are initialized, each needs its own |
| // copy of the scan spec in order to do its own pushdown calculations, etc. |
| // The copies are allocated from this pool so they can be automatically freed |
| // when the UnionIterator goes out of scope. |
| ObjectPool<ScanSpec> scan_spec_copies_; |
| }; |
| |
| UnionIterator::UnionIterator(vector<IterWithBounds> iters) |
| : initted_(false), |
| iters_(std::make_move_iterator(iters.begin()), |
| std::make_move_iterator(iters.end())) { |
| CHECK_GT(iters_.size(), 0); |
| } |
| |
| Status UnionIterator::Init(ScanSpec *spec) { |
| CHECK(!initted_); |
| |
| // Initialize the underlying iterators |
| RETURN_NOT_OK(InitSubIterators(spec)); |
| |
| // Verify that the schemas match in debug builds. |
| // |
| // It's important to do the verification after initializing the iterators, as |
| // they may not know their own schemas until they've been initialized (in the |
| // case of a union of unions). |
| schema_.reset(new Schema(iters_.front().iter->schema())); |
| finished_iter_stats_by_col_.resize(schema_->num_columns()); |
| #ifndef NDEBUG |
| for (const auto& i : iters_) { |
| if (i.iter->schema() != *schema_) { |
| return Status::InvalidArgument( |
| Substitute("Schemas do not match: $0 vs. $1", |
| schema_->ToString(), i.iter->schema().ToString())); |
| } |
| } |
| #endif |
| |
| initted_ = true; |
| return Status::OK(); |
| } |
| |
| |
| Status UnionIterator::InitSubIterators(ScanSpec *spec) { |
| for (auto& i : iters_) { |
| ScanSpec *spec_copy = spec != nullptr ? scan_spec_copies_.Construct(*spec) : nullptr; |
| RETURN_NOT_OK(InitAndMaybeWrap(&i.iter, spec_copy)); |
| |
| // The union iterator doesn't care about these, so let's drop them now to |
| // free some memory. |
| i.encoded_bounds.reset(); |
| } |
| // Since we handle predicates in all the wrapped iterators, we can clear |
| // them here. |
| if (spec != nullptr) { |
| spec->RemovePredicates(); |
| } |
| return Status::OK(); |
| } |
| |
| bool UnionIterator::HasNext() const { |
| CHECK(initted_); |
| for (const auto& i : iters_) { |
| if (i.iter->HasNext()) return true; |
| } |
| |
| return false; |
| } |
| |
| Status UnionIterator::NextBlock(RowBlock* dst) { |
| CHECK(initted_); |
| PrepareBatch(); |
| RETURN_NOT_OK(MaterializeBlock(dst)); |
| FinishBatch(); |
| return Status::OK(); |
| } |
| |
| void UnionIterator::PrepareBatch() { |
| CHECK(initted_); |
| |
| while (!iters_.empty() && |
| !iters_.front().iter->HasNext()) { |
| PopFront(); |
| } |
| } |
| |
| Status UnionIterator::MaterializeBlock(RowBlock *dst) { |
| return iters_.front().iter->NextBlock(dst); |
| } |
| |
| void UnionIterator::FinishBatch() { |
| if (!iters_.front().iter->HasNext()) { |
| // Iterator exhausted, remove it. |
| PopFront(); |
| } |
| } |
| |
| void UnionIterator::PopFront() { |
| std::lock_guard<rw_spinlock> l(iters_lock_); |
| AddIterStats(*iters_.front().iter, &finished_iter_stats_by_col_); |
| iters_.pop_front(); |
| } |
| |
| string UnionIterator::ToString() const { |
| string s; |
| s.append("Union("); |
| s += JoinMapped(iters_, [](const IterWithBounds& i) { |
| return i.iter->ToString(); |
| }, ","); |
| s.append(")"); |
| return s; |
| } |
| |
| void UnionIterator::GetIteratorStats(vector<IteratorStats>* stats) const { |
| CHECK(initted_); |
| shared_lock<rw_spinlock> l(iters_lock_); |
| *stats = finished_iter_stats_by_col_; |
| if (!iters_.empty()) { |
| AddIterStats(*iters_.front().iter, stats); |
| } |
| } |
| |
| unique_ptr<RowwiseIterator> NewUnionIterator(vector<IterWithBounds> iters) { |
| return unique_ptr<RowwiseIterator>(new UnionIterator(std::move(iters))); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // MaterializingIterator |
| //////////////////////////////////////////////////////////// |
| |
| // An iterator which wraps a ColumnwiseIterator, materializing it into full rows. |
| // |
| // Column predicates are pushed down into this iterator. While materializing a |
| // block, columns with associated predicates are materialized first, and the |
| // predicates evaluated. If the predicates succeed in filtering out an entire |
| // batch, then other columns may avoid doing any IO. |
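//
// For example (hypothetical column names): with a predicate 'a < 10' and a
// projection of (a, b, c), each batch first materializes only column 'a' and
// evaluates the predicate against it. If no rows in the batch survive,
// columns 'b' and 'c' are never read for that batch.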
| class MaterializingIterator : public RowwiseIterator { |
| public: |
| explicit MaterializingIterator(unique_ptr<ColumnwiseIterator> iter); |
| |
| // Initialize the iterator, performing predicate pushdown as described above. |
| Status Init(ScanSpec *spec) OVERRIDE; |
| |
| bool HasNext() const OVERRIDE; |
| |
| string ToString() const OVERRIDE; |
| |
| const Schema &schema() const OVERRIDE { |
| return iter_->schema(); |
| } |
| |
| virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE { |
| iter_->GetIteratorStats(stats); |
| predicates_effectiveness_ctx_.PopulateIteratorStatsWithDisabledPredicates( |
| col_idx_predicates_, stats); |
| } |
| |
| virtual Status NextBlock(RowBlock* dst) OVERRIDE; |
| |
| const IteratorPredicateEffectivenessContext& effectiveness_context() const { |
| return predicates_effectiveness_ctx_; |
| } |
| |
| private: |
| Status MaterializeBlock(RowBlock *dst); |
| |
| unique_ptr<ColumnwiseIterator> iter_; |
| |
| // List of (column index, predicate) in order of most to least selective, with |
| // ties broken by the index. |
| vector<ColumnIdxAndPredicate> col_idx_predicates_; |
| |
| // List of column indexes without predicates to materialize. |
| vector<int32_t> non_predicate_column_indexes_; |
| |
// Predicate effectiveness contexts help disable ineffective column predicates.
| IteratorPredicateEffectivenessContext predicates_effectiveness_ctx_; |
| |
| // Set only by test code to disallow pushdown. |
| bool disallow_pushdown_for_tests_; |
| bool disallow_decoder_eval_; |
| }; |
| |
| MaterializingIterator::MaterializingIterator(unique_ptr<ColumnwiseIterator> iter) |
| : iter_(std::move(iter)), |
| disallow_pushdown_for_tests_(!FLAGS_materializing_iterator_do_pushdown), |
| disallow_decoder_eval_(!FLAGS_materializing_iterator_decoder_eval) { |
| } |
| |
| Status MaterializingIterator::Init(ScanSpec *spec) { |
| RETURN_NOT_OK(iter_->Init(spec)); |
| |
| const int32_t num_columns = schema().num_columns(); |
| col_idx_predicates_.clear(); |
| non_predicate_column_indexes_.clear(); |
| if (PREDICT_TRUE(!disallow_pushdown_for_tests_) && spec != nullptr && |
| !spec->predicates().empty()) { |
| col_idx_predicates_.reserve(spec->predicates().size()); |
| DCHECK_GE(num_columns, spec->predicates().size()); |
| non_predicate_column_indexes_.reserve(num_columns - spec->predicates().size()); |
| |
| for (const auto& col_pred : spec->predicates()) { |
| const ColumnPredicate& pred = col_pred.second; |
| int col_idx = schema().find_column(pred.column().name()); |
| if (col_idx == Schema::kColumnNotFound) { |
| return Status::InvalidArgument("No such column", col_pred.first); |
| } |
| VLOG(1) << "Pushing down predicate " << pred.ToString(); |
| col_idx_predicates_.emplace_back(col_idx, col_pred.second); |
| } |
| |
| for (int32_t col_idx = 0; col_idx < num_columns; col_idx++) { |
| if (!ContainsKey(spec->predicates(), schema().column(col_idx).name())) { |
| non_predicate_column_indexes_.emplace_back(col_idx); |
| } |
| } |
| |
| // Since we'll evaluate these predicates ourselves, remove them from the |
| // scan spec so higher layers don't repeat our work. |
| spec->RemovePredicates(); |
| } else { |
| non_predicate_column_indexes_.resize(num_columns); |
| std::iota(non_predicate_column_indexes_.begin(), |
| non_predicate_column_indexes_.end(), 0); |
| } |
| |
| // Sort the predicates by selectivity so that the most selective are evaluated |
| // earlier, with ties broken by the column index. |
| sort(col_idx_predicates_.begin(), col_idx_predicates_.end(), |
| [] (const ColumnIdxAndPredicate& left, |
| const ColumnIdxAndPredicate& right) { |
| int comp = SelectivityComparator(left.second, right.second); |
| return comp ? comp < 0 : left.first < right.first; |
| }); |
| |
// It's important to initialize the effectiveness contexts after sorting the
// predicates to get the right order of predicate indices.
| for (int pred_idx = 0; pred_idx < col_idx_predicates_.size(); pred_idx++) { |
| const auto* predicate = &col_idx_predicates_[pred_idx].second; |
| if (IsColumnPredicateDisableable(predicate->predicate_type())) { |
| predicates_effectiveness_ctx_.AddDisableablePredicate(pred_idx, predicate); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| bool MaterializingIterator::HasNext() const { |
| return iter_->HasNext(); |
| } |
| |
| Status MaterializingIterator::NextBlock(RowBlock* dst) { |
| size_t n = dst->row_capacity(); |
| if (dst->arena()) { |
| dst->arena()->Reset(); |
| } |
| |
| RETURN_NOT_OK(iter_->PrepareBatch(&n)); |
| dst->Resize(n); |
| RETURN_NOT_OK(MaterializeBlock(dst)); |
| RETURN_NOT_OK(iter_->FinishBatch()); |
| |
| return Status::OK(); |
| } |
| |
| Status MaterializingIterator::MaterializeBlock(RowBlock *dst) { |
// Initialize the selection vector indicating which rows have been deleted.
| RETURN_NOT_OK(iter_->InitializeSelectionVector(dst->selection_vector())); |
| |
| // It's relatively common to delete large sequential chunks of rows. |
| // We can fast-path that case and avoid reading any column data. |
| if (!dst->selection_vector()->AnySelected()) { |
| DVLOG(1) << "Fast path over " << dst->nrows() << " deleted rows"; |
| return Status::OK(); |
| } |
| |
| predicates_effectiveness_ctx_.IncrementNextBlockCount(); |
| for (int i = 0; i < col_idx_predicates_.size(); i++) { |
| const auto& col_pred = col_idx_predicates_[i]; |
| const auto& col_idx = get<0>(col_pred); |
| const auto& predicate = get<1>(col_pred); |
| |
| // Materialize the column itself into the row block. |
| ColumnBlock dst_col(dst->column_block(col_idx)); |
| ColumnMaterializationContext ctx(col_idx, |
| &predicate, |
| &dst_col, |
| dst->selection_vector()); |
// Predicates of type None should have been short-circuited by the scan spec.
| DCHECK(ctx.pred()->predicate_type() != PredicateType::None); |
| auto* effectiveness_ctx = IsColumnPredicateDisableable(predicate.predicate_type()) ? |
| &predicates_effectiveness_ctx_[i] : nullptr; |
// Predicate evaluation is disabled for a predicate already determined to be
// ineffective, both at the decoder level and outside it.
//
// These flags indicate whether the predicate has been disabled or not. If the
// predicate is not disableable (currently only Bloom filter predicates are
// disableable), both flags are false.
| bool disableable_predicate_disabled = |
| effectiveness_ctx && !effectiveness_ctx->IsPredicateEnabled(); |
| bool disableable_predicate_enabled = |
| effectiveness_ctx && effectiveness_ctx->IsPredicateEnabled(); |
| |
| if (disallow_decoder_eval_ || disableable_predicate_disabled) { |
// Either decoder-level evaluation is disallowed, or this column predicate was
// determined to be ineffective; disable decoder-level evaluation in both cases.
| ctx.SetDecoderEvalNotSupported(); |
| } |
| |
| // Determine the number of rows filtered out by this predicate, if disableable. |
| // |
// Currently we don't have a mechanism to get the precise number of rows filtered out by a
// particular predicate, and adding such a stat could in fact slow down the filtering.
| // This count of rows filtered out is not precise as the rows filtered out by predicates |
| // earlier in the sort order get more credit than the ones later in the order. Nevertheless it |
| // still helps measure whether the predicate is effective in filtering out rows. |
| auto num_rows_before = disableable_predicate_enabled ? |
| dst->selection_vector()->CountSelected() : 0; |
| |
| RETURN_NOT_OK(iter_->MaterializeColumn(&ctx)); |
| if (ctx.DecoderEvalNotSupported() && !disableable_predicate_disabled) { |
| predicate.Evaluate(dst_col, dst->selection_vector()); |
| } |
| if (disableable_predicate_enabled) { |
| auto num_rows_rejected = num_rows_before - dst->selection_vector()->CountSelected(); |
| DCHECK_GE(num_rows_rejected, 0); |
| DCHECK_LE(num_rows_rejected, num_rows_before); |
| effectiveness_ctx->rows_rejected += num_rows_rejected; |
| effectiveness_ctx->rows_read += dst->nrows(); |
| } |
| |
| // If after evaluating this predicate the entire row block has been filtered |
| // out, we don't need to materialize other columns at all. |
| if (!dst->selection_vector()->AnySelected()) { |
| DVLOG(1) << "0/" << dst->nrows() << " passed predicate"; |
| return Status::OK(); |
| } |
| } |
| predicates_effectiveness_ctx_.DisableIneffectivePredicates(); |
| |
| for (size_t col_idx : non_predicate_column_indexes_) { |
| // Materialize the column itself into the row block. |
| ColumnBlock dst_col(dst->column_block(col_idx)); |
| ColumnMaterializationContext ctx(col_idx, |
| nullptr, |
| &dst_col, |
| dst->selection_vector()); |
| RETURN_NOT_OK(iter_->MaterializeColumn(&ctx)); |
| } |
| |
| DVLOG(1) << dst->selection_vector()->CountSelected() << "/" |
| << dst->nrows() << " passed predicate"; |
| return Status::OK(); |
| } |
| |
| string MaterializingIterator::ToString() const { |
| string s; |
| s.append("Materializing(").append(iter_->ToString()).append(")"); |
| return s; |
| } |
| |
| unique_ptr<RowwiseIterator> NewMaterializingIterator( |
| unique_ptr<ColumnwiseIterator> iter) { |
| return unique_ptr<RowwiseIterator>( |
| new MaterializingIterator(std::move(iter))); |
| } |
| |
| //////////////////////////////////////////////////////////// |
| // PredicateEvaluatingIterator |
| //////////////////////////////////////////////////////////// |
| |
| // An iterator which wraps another iterator and evaluates any predicates that the |
| // wrapped iterator did not itself handle during push down. |
| class PredicateEvaluatingIterator : public RowwiseIterator { |
| public: |
| // Construct the evaluating iterator. |
| // This is only called from ::InitAndMaybeWrap() |
| // REQUIRES: base_iter is already Init()ed. |
| explicit PredicateEvaluatingIterator(unique_ptr<RowwiseIterator> base_iter); |
| |
| // Initialize the iterator. |
| // POSTCONDITION: spec->predicates().empty() |
| Status Init(ScanSpec *spec) OVERRIDE; |
| |
| virtual Status NextBlock(RowBlock *dst) OVERRIDE; |
| |
| bool HasNext() const OVERRIDE; |
| |
| string ToString() const OVERRIDE; |
| |
| const Schema &schema() const OVERRIDE { |
| return base_iter_->schema(); |
| } |
| |
| virtual void GetIteratorStats(std::vector<IteratorStats>* stats) const OVERRIDE { |
| base_iter_->GetIteratorStats(stats); |
| predicates_effectiveness_ctx_.PopulateIteratorStatsWithDisabledPredicates( |
| col_idx_predicates_, stats); |
| } |
| |
// Returns the column predicates tracked by this iterator. The returned
// pointers are only valid for the lifetime of the iterator.
| vector<const ColumnPredicate*> col_predicates() const { |
| vector<const ColumnPredicate*> result; |
| result.reserve(col_idx_predicates_.size()); |
| std::transform(col_idx_predicates_.begin(), |
| col_idx_predicates_.end(), |
| std::back_inserter(result), |
| [](const ColumnIdxAndPredicate& idx_and_pred) { return &idx_and_pred.second; }); |
| return result; |
| } |
| |
| const IteratorPredicateEffectivenessContext& effectiveness_context() const { |
| return predicates_effectiveness_ctx_; |
| } |
| |
| private: |
| |
| unique_ptr<RowwiseIterator> base_iter_; |
| |
| // List of (column index, predicate) in order of most to least selective, with |
| // ties broken by the column index. |
| vector<ColumnIdxAndPredicate> col_idx_predicates_; |
| |
// Predicate effectiveness contexts help disable ineffective column predicates.
| IteratorPredicateEffectivenessContext predicates_effectiveness_ctx_; |
| }; |
| |
| PredicateEvaluatingIterator::PredicateEvaluatingIterator(unique_ptr<RowwiseIterator> base_iter) |
| : base_iter_(std::move(base_iter)) { |
| } |
| |
| Status PredicateEvaluatingIterator::Init(ScanSpec *spec) { |
| // base_iter_ already Init()ed before this is constructed. |
| CHECK_NOTNULL(spec); |
| |
// Gather any predicates that the base iterator did not push down, and remove
// them from the spec.
| col_idx_predicates_.clear(); |
| col_idx_predicates_.reserve(spec->predicates().size()); |
| for (const auto& col_pred : spec->predicates()) { |
| const auto& col_name = col_pred.first; |
| const ColumnPredicate& pred = col_pred.second; |
| DCHECK_EQ(col_name, pred.column().name()); |
| int col_idx = schema().find_column(col_name); |
| if (col_idx == Schema::kColumnNotFound) { |
| return Status::InvalidArgument("No such column", col_name); |
| } |
| VLOG(1) << "Pushing down predicate " << pred.ToString(); |
| col_idx_predicates_.emplace_back(col_idx, pred); |
| } |
| spec->RemovePredicates(); |
| |
| // Sort the predicates by selectivity so that the most selective are evaluated |
| // earlier, with ties broken by the column index. |
| sort(col_idx_predicates_.begin(), col_idx_predicates_.end(), |
| [] (const ColumnIdxAndPredicate& left, |
| const ColumnIdxAndPredicate& right) { |
| int comp = SelectivityComparator(left.second, right.second); |
| return comp ? comp < 0 : left.first < right.first; |
| }); |
| |
// It's important to initialize the effectiveness contexts after sorting the
// predicates to get the right order of predicate indices.
| for (int pred_idx = 0; pred_idx < col_idx_predicates_.size(); pred_idx++) { |
| const auto* predicate = &col_idx_predicates_[pred_idx].second; |
| if (IsColumnPredicateDisableable(predicate->predicate_type())) { |
| predicates_effectiveness_ctx_.AddDisableablePredicate(pred_idx, predicate); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| bool PredicateEvaluatingIterator::HasNext() const { |
| return base_iter_->HasNext(); |
| } |
| |
| Status PredicateEvaluatingIterator::NextBlock(RowBlock *dst) { |
| RETURN_NOT_OK(base_iter_->NextBlock(dst)); |
| |
| predicates_effectiveness_ctx_.IncrementNextBlockCount(); |
| for (int i = 0; i < col_idx_predicates_.size(); i++) { |
| const auto& col_idx = col_idx_predicates_[i].first; |
| const auto& predicate = col_idx_predicates_[i].second; |
| DCHECK_NE(col_idx, Schema::kColumnNotFound); |
| |
| auto* effectiveness_ctx = IsColumnPredicateDisableable(predicate.predicate_type()) ? |
| &predicates_effectiveness_ctx_[i] : nullptr; |
| if (effectiveness_ctx && !effectiveness_ctx->IsPredicateEnabled()) { |
| // Column predicate determined to be disabled. |
| continue; |
| } |
| |
| // Determine the number of rows filtered out by this predicate. |
| // |
// Currently we don't have a mechanism to get the precise number of rows filtered out by a
// particular predicate, and adding such a stat could in fact slow down the filtering.
| // This count of rows filtered out is not precise as the rows filtered out by predicates |
| // earlier in the sort order get more credit than the ones later in the order. Nevertheless it |
| // still helps measure whether the predicate is effective in filtering out rows. |
| auto num_rows_before = effectiveness_ctx ? |
| dst->selection_vector()->CountSelected() : 0; |
| |
| predicate.Evaluate(dst->column_block(col_idx), dst->selection_vector()); |
| |
| if (effectiveness_ctx) { |
| auto num_rows_rejected = num_rows_before - dst->selection_vector()->CountSelected(); |
| DCHECK_GE(num_rows_rejected, 0); |
| DCHECK_LE(num_rows_rejected, num_rows_before); |
| effectiveness_ctx->rows_rejected += num_rows_rejected; |
| effectiveness_ctx->rows_read += dst->nrows(); |
| } |
| |
| // If after evaluating this predicate, the entire row block has now been |
| // filtered out, we don't need to evaluate any further predicates. |
| if (!dst->selection_vector()->AnySelected()) { |
| break; |
| } |
| } |
| predicates_effectiveness_ctx_.DisableIneffectivePredicates(); |
| |
| return Status::OK(); |
| } |
| |
| string PredicateEvaluatingIterator::ToString() const { |
| return Substitute("PredicateEvaluating($0)", base_iter_->ToString()); |
| } |
| |
| Status InitAndMaybeWrap(unique_ptr<RowwiseIterator>* base_iter, |
| ScanSpec *spec) { |
| RETURN_NOT_OK((*base_iter)->Init(spec)); |
| |
| if (spec != nullptr && !spec->predicates().empty()) { |
| // Underlying iterator did not accept all predicates. Wrap it. |
| unique_ptr<RowwiseIterator> wrapper(new PredicateEvaluatingIterator(std::move(*base_iter))); |
| RETURN_NOT_OK(wrapper->Init(spec)); |
| *base_iter = std::move(wrapper); |
| } |
| return Status::OK(); |
| } |
| |
| vector<const ColumnPredicate*> GetIteratorPredicatesForTests( |
| const unique_ptr<RowwiseIterator>& iter) { |
| PredicateEvaluatingIterator* pred_eval = |
| down_cast<PredicateEvaluatingIterator*>(iter.get()); |
| return pred_eval->col_predicates(); |
| } |
| |
| const IteratorPredicateEffectivenessContext& GetIteratorPredicateEffectivenessCtxForTests( |
| const std::unique_ptr<RowwiseIterator>& iter) { |
| auto* iter_ptr = iter.get(); |
// Using dynamic_cast like below is not recommended, but it's okay here for the
// following reasons:
//   - This function is only used for tests.
//   - PredicateEvaluatingIterator and MaterializingIterator are hidden from public access.
//   - Introducing an effectiveness context in the base RowwiseIterator would be
//     unnecessary for its other derived classes.
| if (auto* pred_iter = dynamic_cast<PredicateEvaluatingIterator*>(iter_ptr)) { |
| return pred_iter->effectiveness_context(); |
| } |
| if (auto* pred_iter = dynamic_cast<MaterializingIterator*>(iter_ptr)) { |
| return pred_iter->effectiveness_context(); |
| } |
| LOG(FATAL) << "Effectiveness context not available for iterator type: " |
| << typeid(iter_ptr).name(); |
| } |
| |
| } // namespace kudu |