| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "table/merging_iterator.h" |
| #include <string> |
| #include <vector> |
| #include "db/pinned_iterators_manager.h" |
| #include "monitoring/perf_context_imp.h" |
| #include "rocksdb/comparator.h" |
| #include "rocksdb/iterator.h" |
| #include "rocksdb/options.h" |
| #include "table/internal_iterator.h" |
| #include "table/iter_heap.h" |
| #include "table/iterator_wrapper.h" |
| #include "util/arena.h" |
| #include "util/autovector.h" |
| #include "util/heap.h" |
| #include "util/stop_watch.h" |
| #include "util/sync_point.h" |
| |
| namespace rocksdb { |
| // Without anonymous namespace here, we fail the warning -Wmissing-prototypes |
| namespace { |
| typedef BinaryHeap<IteratorWrapper*, MaxIteratorComparator> MergerMaxIterHeap; |
| typedef BinaryHeap<IteratorWrapper*, MinIteratorComparator> MergerMinIterHeap; |
| } // namespace |
| |
| const size_t kNumIterReserve = 4; |
| |
| class MergingIterator : public InternalIterator { |
| public: |
| MergingIterator(const Comparator* comparator, InternalIterator** children, |
| int n, bool is_arena_mode, bool prefix_seek_mode) |
| : is_arena_mode_(is_arena_mode), |
| comparator_(comparator), |
| current_(nullptr), |
| direction_(kForward), |
| minHeap_(comparator_), |
| prefix_seek_mode_(prefix_seek_mode), |
| pinned_iters_mgr_(nullptr) { |
| children_.resize(n); |
| for (int i = 0; i < n; i++) { |
| children_[i].Set(children[i]); |
| } |
| for (auto& child : children_) { |
| if (child.Valid()) { |
| minHeap_.push(&child); |
| } |
| } |
| current_ = CurrentForward(); |
| } |
| |
| virtual void AddIterator(InternalIterator* iter) { |
| assert(direction_ == kForward); |
| children_.emplace_back(iter); |
| if (pinned_iters_mgr_) { |
| iter->SetPinnedItersMgr(pinned_iters_mgr_); |
| } |
| auto new_wrapper = children_.back(); |
| if (new_wrapper.Valid()) { |
| minHeap_.push(&new_wrapper); |
| current_ = CurrentForward(); |
| } |
| } |
| |
| virtual ~MergingIterator() { |
| for (auto& child : children_) { |
| child.DeleteIter(is_arena_mode_); |
| } |
| } |
| |
| virtual bool Valid() const override { return (current_ != nullptr); } |
| |
| virtual void SeekToFirst() override { |
| ClearHeaps(); |
| for (auto& child : children_) { |
| child.SeekToFirst(); |
| if (child.Valid()) { |
| minHeap_.push(&child); |
| } |
| } |
| direction_ = kForward; |
| current_ = CurrentForward(); |
| } |
| |
| virtual void SeekToLast() override { |
| ClearHeaps(); |
| InitMaxHeap(); |
| for (auto& child : children_) { |
| child.SeekToLast(); |
| if (child.Valid()) { |
| maxHeap_->push(&child); |
| } |
| } |
| direction_ = kReverse; |
| current_ = CurrentReverse(); |
| } |
| |
| virtual void Seek(const Slice& target) override { |
| ClearHeaps(); |
| for (auto& child : children_) { |
| { |
| PERF_TIMER_GUARD(seek_child_seek_time); |
| child.Seek(target); |
| } |
| PERF_COUNTER_ADD(seek_child_seek_count, 1); |
| |
| if (child.Valid()) { |
| PERF_TIMER_GUARD(seek_min_heap_time); |
| minHeap_.push(&child); |
| } |
| } |
| direction_ = kForward; |
| { |
| PERF_TIMER_GUARD(seek_min_heap_time); |
| current_ = CurrentForward(); |
| } |
| } |
| |
| virtual void SeekForPrev(const Slice& target) override { |
| ClearHeaps(); |
| InitMaxHeap(); |
| |
| for (auto& child : children_) { |
| { |
| PERF_TIMER_GUARD(seek_child_seek_time); |
| child.SeekForPrev(target); |
| } |
| PERF_COUNTER_ADD(seek_child_seek_count, 1); |
| |
| if (child.Valid()) { |
| PERF_TIMER_GUARD(seek_max_heap_time); |
| maxHeap_->push(&child); |
| } |
| } |
| direction_ = kReverse; |
| { |
| PERF_TIMER_GUARD(seek_max_heap_time); |
| current_ = CurrentReverse(); |
| } |
| } |
| |
| virtual void Next() override { |
| assert(Valid()); |
| |
| // Ensure that all children are positioned after key(). |
| // If we are moving in the forward direction, it is already |
| // true for all of the non-current children since current_ is |
| // the smallest child and key() == current_->key(). |
| if (direction_ != kForward) { |
| // Otherwise, advance the non-current children. We advance current_ |
| // just after the if-block. |
| ClearHeaps(); |
| for (auto& child : children_) { |
| if (&child != current_) { |
| child.Seek(key()); |
| if (child.Valid() && comparator_->Equal(key(), child.key())) { |
| child.Next(); |
| } |
| } |
| if (child.Valid()) { |
| minHeap_.push(&child); |
| } |
| } |
| direction_ = kForward; |
| // The loop advanced all non-current children to be > key() so current_ |
| // should still be strictly the smallest key. |
| assert(current_ == CurrentForward()); |
| } |
| |
| // For the heap modifications below to be correct, current_ must be the |
| // current top of the heap. |
| assert(current_ == CurrentForward()); |
| |
| // as the current points to the current record. move the iterator forward. |
| current_->Next(); |
| if (current_->Valid()) { |
| // current is still valid after the Next() call above. Call |
| // replace_top() to restore the heap property. When the same child |
| // iterator yields a sequence of keys, this is cheap. |
| minHeap_.replace_top(current_); |
| } else { |
| // current stopped being valid, remove it from the heap. |
| minHeap_.pop(); |
| } |
| current_ = CurrentForward(); |
| } |
| |
| virtual void Prev() override { |
| assert(Valid()); |
| // Ensure that all children are positioned before key(). |
| // If we are moving in the reverse direction, it is already |
| // true for all of the non-current children since current_ is |
| // the largest child and key() == current_->key(). |
| if (direction_ != kReverse) { |
| // Otherwise, retreat the non-current children. We retreat current_ |
| // just after the if-block. |
| ClearHeaps(); |
| InitMaxHeap(); |
| for (auto& child : children_) { |
| if (&child != current_) { |
| if (!prefix_seek_mode_) { |
| child.Seek(key()); |
| if (child.Valid()) { |
| // Child is at first entry >= key(). Step back one to be < key() |
| TEST_SYNC_POINT_CALLBACK("MergeIterator::Prev:BeforePrev", |
| &child); |
| child.Prev(); |
| } else { |
| // Child has no entries >= key(). Position at last entry. |
| TEST_SYNC_POINT("MergeIterator::Prev:BeforeSeekToLast"); |
| child.SeekToLast(); |
| } |
| } else { |
| child.SeekForPrev(key()); |
| if (child.Valid() && comparator_->Equal(key(), child.key())) { |
| child.Prev(); |
| } |
| } |
| } |
| if (child.Valid()) { |
| maxHeap_->push(&child); |
| } |
| } |
| direction_ = kReverse; |
| if (!prefix_seek_mode_) { |
| // Note that we don't do assert(current_ == CurrentReverse()) here |
| // because it is possible to have some keys larger than the seek-key |
| // inserted between Seek() and SeekToLast(), which makes current_ not |
| // equal to CurrentReverse(). |
| current_ = CurrentReverse(); |
| } |
| // The loop advanced all non-current children to be < key() so current_ |
| // should still be strictly the smallest key. |
| assert(current_ == CurrentReverse()); |
| } |
| |
| // For the heap modifications below to be correct, current_ must be the |
| // current top of the heap. |
| assert(current_ == CurrentReverse()); |
| |
| current_->Prev(); |
| if (current_->Valid()) { |
| // current is still valid after the Prev() call above. Call |
| // replace_top() to restore the heap property. When the same child |
| // iterator yields a sequence of keys, this is cheap. |
| maxHeap_->replace_top(current_); |
| } else { |
| // current stopped being valid, remove it from the heap. |
| maxHeap_->pop(); |
| } |
| current_ = CurrentReverse(); |
| } |
| |
| virtual Slice key() const override { |
| assert(Valid()); |
| return current_->key(); |
| } |
| |
| virtual Slice value() const override { |
| assert(Valid()); |
| return current_->value(); |
| } |
| |
| virtual Status status() const override { |
| Status s; |
| for (auto& child : children_) { |
| s = child.status(); |
| if (!s.ok()) { |
| break; |
| } |
| } |
| return s; |
| } |
| |
| virtual void SetPinnedItersMgr( |
| PinnedIteratorsManager* pinned_iters_mgr) override { |
| pinned_iters_mgr_ = pinned_iters_mgr; |
| for (auto& child : children_) { |
| child.SetPinnedItersMgr(pinned_iters_mgr); |
| } |
| } |
| |
| virtual bool IsKeyPinned() const override { |
| assert(Valid()); |
| return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && |
| current_->IsKeyPinned(); |
| } |
| |
| virtual bool IsValuePinned() const override { |
| assert(Valid()); |
| return pinned_iters_mgr_ && pinned_iters_mgr_->PinningEnabled() && |
| current_->IsValuePinned(); |
| } |
| |
| private: |
| // Clears heaps for both directions, used when changing direction or seeking |
| void ClearHeaps(); |
| // Ensures that maxHeap_ is initialized when starting to go in the reverse |
| // direction |
| void InitMaxHeap(); |
| |
| bool is_arena_mode_; |
| const Comparator* comparator_; |
| autovector<IteratorWrapper, kNumIterReserve> children_; |
| |
| // Cached pointer to child iterator with the current key, or nullptr if no |
| // child iterators are valid. This is the top of minHeap_ or maxHeap_ |
| // depending on the direction. |
| IteratorWrapper* current_; |
| // Which direction is the iterator moving? |
| enum Direction { |
| kForward, |
| kReverse |
| }; |
| Direction direction_; |
| MergerMinIterHeap minHeap_; |
| bool prefix_seek_mode_; |
| |
| // Max heap is used for reverse iteration, which is way less common than |
| // forward. Lazily initialize it to save memory. |
| std::unique_ptr<MergerMaxIterHeap> maxHeap_; |
| PinnedIteratorsManager* pinned_iters_mgr_; |
| |
| IteratorWrapper* CurrentForward() const { |
| assert(direction_ == kForward); |
| return !minHeap_.empty() ? minHeap_.top() : nullptr; |
| } |
| |
| IteratorWrapper* CurrentReverse() const { |
| assert(direction_ == kReverse); |
| assert(maxHeap_); |
| return !maxHeap_->empty() ? maxHeap_->top() : nullptr; |
| } |
| }; |
| |
| void MergingIterator::ClearHeaps() { |
| minHeap_.clear(); |
| if (maxHeap_) { |
| maxHeap_->clear(); |
| } |
| } |
| |
| void MergingIterator::InitMaxHeap() { |
| if (!maxHeap_) { |
| maxHeap_.reset(new MergerMaxIterHeap(comparator_)); |
| } |
| } |
| |
| InternalIterator* NewMergingIterator(const Comparator* cmp, |
| InternalIterator** list, int n, |
| Arena* arena, bool prefix_seek_mode) { |
| assert(n >= 0); |
| if (n == 0) { |
| return NewEmptyInternalIterator(arena); |
| } else if (n == 1) { |
| return list[0]; |
| } else { |
| if (arena == nullptr) { |
| return new MergingIterator(cmp, list, n, false, prefix_seek_mode); |
| } else { |
| auto mem = arena->AllocateAligned(sizeof(MergingIterator)); |
| return new (mem) MergingIterator(cmp, list, n, true, prefix_seek_mode); |
| } |
| } |
| } |
| |
| MergeIteratorBuilder::MergeIteratorBuilder(const Comparator* comparator, |
| Arena* a, bool prefix_seek_mode) |
| : first_iter(nullptr), use_merging_iter(false), arena(a) { |
| auto mem = arena->AllocateAligned(sizeof(MergingIterator)); |
| merge_iter = |
| new (mem) MergingIterator(comparator, nullptr, 0, true, prefix_seek_mode); |
| } |
| |
| void MergeIteratorBuilder::AddIterator(InternalIterator* iter) { |
| if (!use_merging_iter && first_iter != nullptr) { |
| merge_iter->AddIterator(first_iter); |
| use_merging_iter = true; |
| } |
| if (use_merging_iter) { |
| merge_iter->AddIterator(iter); |
| } else { |
| first_iter = iter; |
| } |
| } |
| |
| InternalIterator* MergeIteratorBuilder::Finish() { |
| if (!use_merging_iter) { |
| return first_iter; |
| } else { |
| auto ret = merge_iter; |
| merge_iter = nullptr; |
| return ret; |
| } |
| } |
| |
| } // namespace rocksdb |