// Copyright (c) 2011-present, Facebook, Inc. All rights reserved.
// This source code is licensed under both the GPLv2 (found in the
// COPYING file in the root directory) and Apache 2.0 License
// (found in the LICENSE.Apache file in the root directory).
#pragma once

#include <algorithm>
#include <atomic>
#include <deque>
#include <memory>
#include <string>
#include <vector>

#include "db/compaction.h"
#include "db/compaction_iteration_stats.h"
#include "db/merge_helper.h"
#include "db/pinned_iterators_manager.h"
#include "db/range_del_aggregator.h"
#include "options/cf_options.h"
#include "rocksdb/compaction_filter.h"

| namespace rocksdb { |
| |
| class CompactionEventListener; |
| |
| class CompactionIterator { |
| public: |
| // A wrapper around Compaction. Has a much smaller interface, only what |
| // CompactionIterator uses. Tests can override it. |
| class CompactionProxy { |
| public: |
| explicit CompactionProxy(const Compaction* compaction) |
| : compaction_(compaction) {} |
| |
| virtual ~CompactionProxy() = default; |
| virtual int level(size_t compaction_input_level = 0) const { |
| return compaction_->level(); |
| } |
| virtual bool KeyNotExistsBeyondOutputLevel( |
| const Slice& user_key, std::vector<size_t>* level_ptrs) const { |
| return compaction_->KeyNotExistsBeyondOutputLevel(user_key, level_ptrs); |
| } |
| virtual bool bottommost_level() const { |
| return compaction_->bottommost_level(); |
| } |
| virtual int number_levels() const { return compaction_->number_levels(); } |
| virtual Slice GetLargestUserKey() const { |
| return compaction_->GetLargestUserKey(); |
| } |
| virtual bool allow_ingest_behind() const { |
| return compaction_->immutable_cf_options()->allow_ingest_behind; |
| } |
| |
| protected: |
| CompactionProxy() = default; |
| |
| private: |
| const Compaction* compaction_; |
| }; |
| |
| CompactionIterator(InternalIterator* input, const Comparator* cmp, |
| MergeHelper* merge_helper, SequenceNumber last_sequence, |
| std::vector<SequenceNumber>* snapshots, |
| SequenceNumber earliest_write_conflict_snapshot, Env* env, |
| bool expect_valid_internal_key, |
| RangeDelAggregator* range_del_agg, |
| const Compaction* compaction = nullptr, |
| const CompactionFilter* compaction_filter = nullptr, |
| CompactionEventListener* compaction_listener = nullptr, |
| const std::atomic<bool>* shutting_down = nullptr); |
| |
| // Constructor with custom CompactionProxy, used for tests. |
| CompactionIterator(InternalIterator* input, const Comparator* cmp, |
| MergeHelper* merge_helper, SequenceNumber last_sequence, |
| std::vector<SequenceNumber>* snapshots, |
| SequenceNumber earliest_write_conflict_snapshot, Env* env, |
| bool expect_valid_internal_key, |
| RangeDelAggregator* range_del_agg, |
| std::unique_ptr<CompactionProxy> compaction, |
| const CompactionFilter* compaction_filter = nullptr, |
| CompactionEventListener* compaction_listener = nullptr, |
| const std::atomic<bool>* shutting_down = nullptr); |
| |
| ~CompactionIterator(); |
| |
| void ResetRecordCounts(); |
| |
| // Seek to the beginning of the compaction iterator output. |
| // |
| // REQUIRED: Call only once. |
| void SeekToFirst(); |
| |
| // Produces the next record in the compaction. |
| // |
| // REQUIRED: SeekToFirst() has been called. |
| void Next(); |
| |
| // Getters |
| const Slice& key() const { return key_; } |
| const Slice& value() const { return value_; } |
| const Status& status() const { return status_; } |
| const ParsedInternalKey& ikey() const { return ikey_; } |
| bool Valid() const { return valid_; } |
| const Slice& user_key() const { return current_user_key_; } |
| const CompactionIterationStats& iter_stats() const { return iter_stats_; } |
| |
| private: |
| // Processes the input stream to find the next output |
| void NextFromInput(); |
| |
| // Do last preparations before presenting the output to the callee. At this |
| // point this only zeroes out the sequence number if possible for better |
| // compression. |
| void PrepareOutput(); |
| |
| // Given a sequence number, return the sequence number of the |
| // earliest snapshot that this sequence number is visible in. |
| // The snapshots themselves are arranged in ascending order of |
| // sequence numbers. |
| // Employ a sequential search because the total number of |
| // snapshots are typically small. |
| inline SequenceNumber findEarliestVisibleSnapshot( |
| SequenceNumber in, SequenceNumber* prev_snapshot); |
| |
| InternalIterator* input_; |
| const Comparator* cmp_; |
| MergeHelper* merge_helper_; |
| const std::vector<SequenceNumber>* snapshots_; |
| const SequenceNumber earliest_write_conflict_snapshot_; |
| Env* env_; |
| bool expect_valid_internal_key_; |
| RangeDelAggregator* range_del_agg_; |
| std::unique_ptr<CompactionProxy> compaction_; |
| const CompactionFilter* compaction_filter_; |
| #ifndef ROCKSDB_LITE |
| CompactionEventListener* compaction_listener_; |
| #endif // ROCKSDB_LITE |
| const std::atomic<bool>* shutting_down_; |
| bool bottommost_level_; |
| bool valid_ = false; |
| bool visible_at_tip_; |
| SequenceNumber earliest_snapshot_; |
| SequenceNumber latest_snapshot_; |
| bool ignore_snapshots_; |
| |
| // State |
| // |
| // Points to a copy of the current compaction iterator output (current_key_) |
| // if valid_. |
| Slice key_; |
| // Points to the value in the underlying iterator that corresponds to the |
| // current output. |
| Slice value_; |
| // The status is OK unless compaction iterator encounters a merge operand |
| // while not having a merge operator defined. |
| Status status_; |
| // Stores the user key, sequence number and type of the current compaction |
| // iterator output (or current key in the underlying iterator during |
| // NextFromInput()). |
| ParsedInternalKey ikey_; |
| // Stores whether ikey_.user_key is valid. If set to false, the user key is |
| // not compared against the current key in the underlying iterator. |
| bool has_current_user_key_ = false; |
| bool at_next_ = false; // If false, the iterator |
| // Holds a copy of the current compaction iterator output (or current key in |
| // the underlying iterator during NextFromInput()). |
| IterKey current_key_; |
| Slice current_user_key_; |
| SequenceNumber current_user_key_sequence_; |
| SequenceNumber current_user_key_snapshot_; |
| |
| // True if the iterator has already returned a record for the current key. |
| bool has_outputted_key_ = false; |
| |
| // truncated the value of the next key and output it without applying any |
| // compaction rules. This is used for outputting a put after a single delete. |
| bool clear_and_output_next_key_ = false; |
| |
| MergeOutputIterator merge_out_iter_; |
| // PinnedIteratorsManager used to pin input_ Iterator blocks while reading |
| // merge operands and then releasing them after consuming them. |
| PinnedIteratorsManager pinned_iters_mgr_; |
| std::string compaction_filter_value_; |
| InternalKey compaction_filter_skip_until_; |
| // "level_ptrs" holds indices that remember which file of an associated |
| // level we were last checking during the last call to compaction-> |
| // KeyNotExistsBeyondOutputLevel(). This allows future calls to the function |
| // to pick off where it left off since each subcompaction's key range is |
| // increasing so a later call to the function must be looking for a key that |
| // is in or beyond the last file checked during the previous call |
| std::vector<size_t> level_ptrs_; |
| CompactionIterationStats iter_stats_; |
| |
| bool IsShuttingDown() { |
| // This is a best-effort facility, so memory_order_relaxed is sufficient. |
| return shutting_down_ && shutting_down_->load(std::memory_order_relaxed); |
| } |
| }; |
| } // namespace rocksdb |