| // Copyright (c) 2016-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| |
| #include "db/range_del_aggregator.h" |
| |
| #include <algorithm> |
| |
| namespace rocksdb { |
| |
| RangeDelAggregator::RangeDelAggregator( |
| const InternalKeyComparator& icmp, |
| const std::vector<SequenceNumber>& snapshots, |
| bool collapse_deletions /* = true */) |
| : upper_bound_(kMaxSequenceNumber), |
| icmp_(icmp), |
| collapse_deletions_(collapse_deletions) { |
| InitRep(snapshots); |
| } |
| |
| RangeDelAggregator::RangeDelAggregator(const InternalKeyComparator& icmp, |
| SequenceNumber snapshot, |
| bool collapse_deletions /* = false */) |
| : upper_bound_(snapshot), |
| icmp_(icmp), |
| collapse_deletions_(collapse_deletions) {} |
| |
| void RangeDelAggregator::InitRep(const std::vector<SequenceNumber>& snapshots) { |
| assert(rep_ == nullptr); |
| rep_.reset(new Rep()); |
| for (auto snapshot : snapshots) { |
| rep_->stripe_map_.emplace( |
| snapshot, |
| PositionalTombstoneMap(TombstoneMap( |
| stl_wrappers::LessOfComparator(icmp_.user_comparator())))); |
| } |
| // Data newer than any snapshot falls in this catch-all stripe |
| rep_->stripe_map_.emplace( |
| kMaxSequenceNumber, |
| PositionalTombstoneMap(TombstoneMap( |
| stl_wrappers::LessOfComparator(icmp_.user_comparator())))); |
| rep_->pinned_iters_mgr_.StartPinning(); |
| } |
| |
| bool RangeDelAggregator::ShouldDelete( |
| const Slice& internal_key, RangeDelAggregator::RangePositioningMode mode) { |
| if (rep_ == nullptr) { |
| return false; |
| } |
| ParsedInternalKey parsed; |
| if (!ParseInternalKey(internal_key, &parsed)) { |
| assert(false); |
| } |
| return ShouldDelete(parsed, mode); |
| } |
| |
// Returns true if the point key `parsed` is covered by a range tombstone in
// the snapshot stripe that `parsed.sequence` falls into. Covered means the
// tombstone's range contains the user key and its seqnum is strictly greater
// than the key's. Returns false when no tombstones were added (rep_ is null)
// or the stripe holds none.
//
// `mode` selects the lookup strategy:
//   - kFullScan (non-collapsed mode only): linear scan of the stripe's
//     tombstones in start-key order, checking each one's start key, end key
//     and seqnum.
//   - kForwardTraversal / kBackwardTraversal (collapsed mode only): move the
//     cached position (PositionalTombstoneMap::iter) forward/backward from
//     where the previous call left it. NOTE(review): this appears to assume
//     successive calls visit user keys monotonically in the matching
//     direction; the assertions after the switch check the outcome.
//   - kBinarySearch (collapsed mode only): reposition with upper_bound().
// The traversal modes fall back to kBinarySearch when the cached position is
// end(), which is how InvalidateTombstoneMapPositions() marks it stale.
bool RangeDelAggregator::ShouldDelete(
    const ParsedInternalKey& parsed,
    RangeDelAggregator::RangePositioningMode mode) {
  assert(IsValueType(parsed.type));
  if (rep_ == nullptr) {
    return false;
  }
  // Only tombstones in the key's own snapshot stripe are consulted.
  auto& positional_tombstone_map = GetPositionalTombstoneMap(parsed.sequence);
  const auto& tombstone_map = positional_tombstone_map.raw_map;
  if (tombstone_map.empty()) {
    return false;
  }
  // A reference to the cached position, so the updates below persist for the
  // next call.
  auto& tombstone_map_iter = positional_tombstone_map.iter;
  if (tombstone_map_iter == tombstone_map.end() &&
      (mode == kForwardTraversal || mode == kBackwardTraversal)) {
    // invalid (e.g., if AddTombstones() changed the deletions), so need to
    // reseek
    mode = kBinarySearch;
  }
  switch (mode) {
    case kFullScan:
      assert(!collapse_deletions_);
      // The maintained state (PositionalTombstoneMap::iter) isn't useful when
      // we linear scan from the beginning each time, but we maintain it anyways
      // for consistency.
      tombstone_map_iter = tombstone_map.begin();
      while (tombstone_map_iter != tombstone_map.end()) {
        const auto& tombstone = tombstone_map_iter->second;
        // Entries are ordered by start key, so once a tombstone starts after
        // the key, none of the remaining ones can cover it.
        if (icmp_.user_comparator()->Compare(parsed.user_key,
                                             tombstone.start_key_) < 0) {
          break;
        }
        // start_key_ <= key here; covered if key < end_key_ and the tombstone
        // is newer than the key.
        if (parsed.sequence < tombstone.seq_ &&
            icmp_.user_comparator()->Compare(parsed.user_key,
                                             tombstone.end_key_) < 0) {
          return true;
        }
        ++tombstone_map_iter;
      }
      return false;
    case kForwardTraversal:
      assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end());
      if (tombstone_map_iter == tombstone_map.begin() &&
          icmp_.user_comparator()->Compare(parsed.user_key,
                                           tombstone_map_iter->first) < 0) {
        // before start of deletion intervals
        return false;
      }
      // Advance to the last entry whose start key is <= the user key.
      while (std::next(tombstone_map_iter) != tombstone_map.end() &&
             icmp_.user_comparator()->Compare(
                 std::next(tombstone_map_iter)->first, parsed.user_key) <= 0) {
        ++tombstone_map_iter;
      }
      break;
    case kBackwardTraversal:
      assert(collapse_deletions_ && tombstone_map_iter != tombstone_map.end());
      // Step back until the entry's start key is <= the user key, stopping at
      // the first entry.
      while (tombstone_map_iter != tombstone_map.begin() &&
             icmp_.user_comparator()->Compare(parsed.user_key,
                                              tombstone_map_iter->first) < 0) {
        --tombstone_map_iter;
      }
      if (tombstone_map_iter == tombstone_map.begin() &&
          icmp_.user_comparator()->Compare(parsed.user_key,
                                           tombstone_map_iter->first) < 0) {
        // before start of deletion intervals
        return false;
      }
      break;
    case kBinarySearch:
      assert(collapse_deletions_);
      // First entry starting strictly after the key; its predecessor (if any)
      // is the interval containing the key.
      tombstone_map_iter =
          tombstone_map.upper_bound(parsed.user_key);
      if (tombstone_map_iter == tombstone_map.begin()) {
        // before start of deletion intervals
        return false;
      }
      --tombstone_map_iter;
      break;
  }
  // Collapsed-mode postcondition: tombstone_map_iter is the entry with the
  // greatest start key <= user key. In collapsed mode each entry's interval
  // runs until the next entry's start key, so this entry's seq_ decides
  // coverage (seq_ == 0 marks a gap and never deletes anything).
  assert(mode != kFullScan);
  assert(tombstone_map_iter != tombstone_map.end() &&
         icmp_.user_comparator()->Compare(tombstone_map_iter->first,
                                          parsed.user_key) <= 0);
  assert(std::next(tombstone_map_iter) == tombstone_map.end() ||
         icmp_.user_comparator()->Compare(
             parsed.user_key, std::next(tombstone_map_iter)->first) < 0);
  return parsed.sequence < tombstone_map_iter->second.seq_;
}
| |
| bool RangeDelAggregator::ShouldAddTombstones( |
| bool bottommost_level /* = false */) { |
| // TODO(andrewkr): can we just open a file and throw it away if it ends up |
| // empty after AddToBuilder()? This function doesn't take into subcompaction |
| // boundaries so isn't completely accurate. |
| if (rep_ == nullptr) { |
| return false; |
| } |
| auto stripe_map_iter = rep_->stripe_map_.begin(); |
| assert(stripe_map_iter != rep_->stripe_map_.end()); |
| if (bottommost_level) { |
| // For the bottommost level, keys covered by tombstones in the first |
| // (oldest) stripe have been compacted away, so the tombstones are obsolete. |
| ++stripe_map_iter; |
| } |
| while (stripe_map_iter != rep_->stripe_map_.end()) { |
| if (!stripe_map_iter->second.raw_map.empty()) { |
| return true; |
| } |
| ++stripe_map_iter; |
| } |
| return false; |
| } |
| |
| Status RangeDelAggregator::AddTombstones( |
| std::unique_ptr<InternalIterator> input) { |
| if (input == nullptr) { |
| return Status::OK(); |
| } |
| input->SeekToFirst(); |
| bool first_iter = true; |
| while (input->Valid()) { |
| if (first_iter) { |
| if (rep_ == nullptr) { |
| InitRep({upper_bound_}); |
| } else { |
| InvalidateTombstoneMapPositions(); |
| } |
| first_iter = false; |
| } |
| ParsedInternalKey parsed_key; |
| if (!ParseInternalKey(input->key(), &parsed_key)) { |
| return Status::Corruption("Unable to parse range tombstone InternalKey"); |
| } |
| RangeTombstone tombstone(parsed_key, input->value()); |
| AddTombstone(std::move(tombstone)); |
| input->Next(); |
| } |
| if (!first_iter) { |
| rep_->pinned_iters_mgr_.PinIterator(input.release(), false /* arena */); |
| } |
| return Status::OK(); |
| } |
| |
| void RangeDelAggregator::InvalidateTombstoneMapPositions() { |
| if (rep_ == nullptr) { |
| return; |
| } |
| for (auto stripe_map_iter = rep_->stripe_map_.begin(); |
| stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) { |
| stripe_map_iter->second.iter = stripe_map_iter->second.raw_map.end(); |
| } |
| } |
| |
// Inserts `tombstone` into the tombstone map of the snapshot stripe that its
// seqnum falls into. Always returns Status::OK().
//
// Non-collapsed mode: the tombstone is stored as-is, keyed by its start key.
//
// Collapsed mode: the map is a sequence of breakpoints. Each entry
// (start key -> seq_) covers user keys from its start key up to the next
// entry's start key, and seq_ == 0 denotes "not covered". The new tombstone
// [start, end) @ seq is expressed as two breakpoints, start@seq and end@0.
// These are merged with the existing breakpoints so that, where the new and
// existing intervals overlap, the larger seqnum wins (existing entries are
// only ever raised, never lowered).
Status RangeDelAggregator::AddTombstone(RangeTombstone tombstone) {
  auto& positional_tombstone_map = GetPositionalTombstoneMap(tombstone.seq_);
  auto& tombstone_map = positional_tombstone_map.raw_map;
  if (collapse_deletions_) {
    // In collapsed mode, we only fill the seq_ field in the TombstoneMap's
    // values. The end_key is unneeded because we assume the tombstone extends
    // until the next tombstone starts. For gaps between real tombstones and
    // for the last real tombstone, we denote end keys by inserting fake
    // tombstones with sequence number zero.
    std::vector<RangeTombstone> new_range_dels{
        tombstone, RangeTombstone(tombstone.end_key_, Slice(), 0)};
    auto new_range_dels_iter = new_range_dels.begin();
    // Position at the first overlapping existing tombstone; if none exists,
    // insert until we find an existing one overlapping a new point
    // Note: this points at the key of the entry that was first *before* any
    // insertion below, so new points are compared against the pre-existing
    // minimum, not against points inserted by this loop.
    const Slice* tombstone_map_begin = nullptr;
    if (!tombstone_map.empty()) {
      tombstone_map_begin = &tombstone_map.begin()->first;
    }
    auto last_range_dels_iter = new_range_dels_iter;
    // New points strictly before every existing entry (or all of them, if
    // the map is empty) cannot interact with existing intervals, so insert
    // them directly.
    while (new_range_dels_iter != new_range_dels.end() &&
           (tombstone_map_begin == nullptr ||
            icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_,
                                             *tombstone_map_begin) < 0)) {
      tombstone_map.emplace(
          new_range_dels_iter->start_key_,
          RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_));
      last_range_dels_iter = new_range_dels_iter;
      ++new_range_dels_iter;
    }
    if (new_range_dels_iter == new_range_dels.end()) {
      return Status::OK();
    }
    // above loop advances one too far
    new_range_dels_iter = last_range_dels_iter;
    // Position on the existing entry whose interval contains the current new
    // point (greatest start key <= the new point's start key).
    auto tombstone_map_iter =
        tombstone_map.upper_bound(new_range_dels_iter->start_key_);
    // if nothing overlapped we would've already inserted all the new points
    // and returned early
    assert(tombstone_map_iter != tombstone_map.begin());
    tombstone_map_iter--;

    // untermed_seq is non-kMaxSequenceNumber when we covered an existing point
    // but haven't seen its corresponding endpoint. It's used for (1) deciding
    // whether to forcibly insert the new interval's endpoint; and (2) possibly
    // raising the seqnum for the to-be-inserted element (we insert the max
    // seqnum between the next new interval and the unterminated interval).
    SequenceNumber untermed_seq = kMaxSequenceNumber;
    // Sweep the new and existing intervals together, like a merge, handling
    // one overlapping (existing, new) interval pair per iteration.
    while (tombstone_map_iter != tombstone_map.end() &&
           new_range_dels_iter != new_range_dels.end()) {
      // End of each current interval, i.e. the next breakpoint's key; nullptr
      // if the interval is the last one.
      const Slice *tombstone_map_iter_end = nullptr,
                  *new_range_dels_iter_end = nullptr;
      // NOTE(review): always true given the loop condition; kept as written.
      if (tombstone_map_iter != tombstone_map.end()) {
        auto next_tombstone_map_iter = std::next(tombstone_map_iter);
        if (next_tombstone_map_iter != tombstone_map.end()) {
          tombstone_map_iter_end = &next_tombstone_map_iter->first;
        }
      }
      if (new_range_dels_iter != new_range_dels.end()) {
        auto next_new_range_dels_iter = std::next(new_range_dels_iter);
        if (next_new_range_dels_iter != new_range_dels.end()) {
          new_range_dels_iter_end = &next_new_range_dels_iter->start_key_;
        }
      }

      // our positions in existing/new tombstone collections should always
      // overlap. The non-overlapping cases are handled above and below this
      // loop.
      assert(new_range_dels_iter_end == nullptr ||
             icmp_.user_comparator()->Compare(tombstone_map_iter->first,
                                              *new_range_dels_iter_end) < 0);
      assert(tombstone_map_iter_end == nullptr ||
             icmp_.user_comparator()->Compare(new_range_dels_iter->start_key_,
                                              *tombstone_map_iter_end) < 0);

      int new_to_old_start_cmp = icmp_.user_comparator()->Compare(
          new_range_dels_iter->start_key_, tombstone_map_iter->first);
      // nullptr end means extends infinitely rightwards, set new_to_old_end_cmp
      // accordingly so we can use common code paths later.
      int new_to_old_end_cmp;
      if (new_range_dels_iter_end == nullptr &&
          tombstone_map_iter_end == nullptr) {
        new_to_old_end_cmp = 0;
      } else if (new_range_dels_iter_end == nullptr) {
        new_to_old_end_cmp = 1;
      } else if (tombstone_map_iter_end == nullptr) {
        new_to_old_end_cmp = -1;
      } else {
        new_to_old_end_cmp = icmp_.user_comparator()->Compare(
            *new_range_dels_iter_end, *tombstone_map_iter_end);
      }

      if (new_to_old_start_cmp < 0) {
        // the existing one's left endpoint comes after, so raise/delete it if
        // it's covered.
        if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
          untermed_seq = tombstone_map_iter->second.seq_;
          // If the preceding breakpoint already carries the new seqnum, this
          // breakpoint would be redundant after raising, so erase it and step
          // back onto the predecessor instead.
          if (tombstone_map_iter != tombstone_map.begin() &&
              std::prev(tombstone_map_iter)->second.seq_ ==
                  new_range_dels_iter->seq_) {
            tombstone_map_iter = tombstone_map.erase(tombstone_map_iter);
            --tombstone_map_iter;
          } else {
            tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_;
          }
        }
      } else if (new_to_old_start_cmp > 0) {
        // The new point falls strictly inside the existing interval: split
        // that interval at the new point when the seqnum there must change.
        if (untermed_seq != kMaxSequenceNumber ||
            tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
          auto seq = tombstone_map_iter->second.seq_;
          // need to adjust this element if not intended to span beyond the new
          // element (i.e., was_tombstone_map_iter_raised == true), or if it
          // can be raised
          tombstone_map_iter = tombstone_map.emplace(
              new_range_dels_iter->start_key_,
              RangeTombstone(
                  Slice(), Slice(),
                  std::max(
                      untermed_seq == kMaxSequenceNumber ? 0 : untermed_seq,
                      new_range_dels_iter->seq_)));
          untermed_seq = seq;
        }
      } else {
        // their left endpoints coincide, so raise the existing one if needed
        if (tombstone_map_iter->second.seq_ < new_range_dels_iter->seq_) {
          untermed_seq = tombstone_map_iter->second.seq_;
          tombstone_map_iter->second.seq_ = new_range_dels_iter->seq_;
        }
      }

      // advance whichever one ends earlier, or both if their right endpoints
      // coincide
      if (new_to_old_end_cmp < 0) {
        ++new_range_dels_iter;
      } else if (new_to_old_end_cmp > 0) {
        ++tombstone_map_iter;
        untermed_seq = kMaxSequenceNumber;
      } else {
        ++new_range_dels_iter;
        ++tombstone_map_iter;
        untermed_seq = kMaxSequenceNumber;
      }
    }
    // Any new points left lie after every existing entry; insert as-is.
    while (new_range_dels_iter != new_range_dels.end()) {
      tombstone_map.emplace(
          new_range_dels_iter->start_key_,
          RangeTombstone(Slice(), Slice(), new_range_dels_iter->seq_));
      ++new_range_dels_iter;
    }
  } else {
    // Non-collapsed mode: keep the full tombstone, keyed by its start key.
    auto start_key = tombstone.start_key_;
    tombstone_map.emplace(start_key, std::move(tombstone));
  }
  return Status::OK();
}
| |
| RangeDelAggregator::PositionalTombstoneMap& |
| RangeDelAggregator::GetPositionalTombstoneMap(SequenceNumber seq) { |
| assert(rep_ != nullptr); |
| // The stripe includes seqnum for the snapshot above and excludes seqnum for |
| // the snapshot below. |
| StripeMap::iterator iter; |
| if (seq > 0) { |
| // upper_bound() checks strict inequality so need to subtract one |
| iter = rep_->stripe_map_.upper_bound(seq - 1); |
| } else { |
| iter = rep_->stripe_map_.begin(); |
| } |
| // catch-all stripe justifies this assertion in either of above cases |
| assert(iter != rep_->stripe_map_.end()); |
| return iter->second; |
| } |
| |
| // TODO(andrewkr): We should implement an iterator over range tombstones in our |
| // map. It'd enable compaction to open tables on-demand, i.e., only once range |
| // tombstones are known to be available, without the code duplication we have |
| // in ShouldAddTombstones(). It'll also allow us to move the table-modifying |
| // code into more coherent places: CompactionJob and BuildTable(). |
| void RangeDelAggregator::AddToBuilder( |
| TableBuilder* builder, const Slice* lower_bound, const Slice* upper_bound, |
| FileMetaData* meta, |
| CompactionIterationStats* range_del_out_stats /* = nullptr */, |
| bool bottommost_level /* = false */) { |
| if (rep_ == nullptr) { |
| return; |
| } |
| auto stripe_map_iter = rep_->stripe_map_.begin(); |
| assert(stripe_map_iter != rep_->stripe_map_.end()); |
| if (bottommost_level) { |
| // TODO(andrewkr): these are counted for each compaction output file, so |
| // lots of double-counting. |
| if (!stripe_map_iter->second.raw_map.empty()) { |
| range_del_out_stats->num_range_del_drop_obsolete += |
| static_cast<int64_t>(stripe_map_iter->second.raw_map.size()) - |
| (collapse_deletions_ ? 1 : 0); |
| range_del_out_stats->num_record_drop_obsolete += |
| static_cast<int64_t>(stripe_map_iter->second.raw_map.size()) - |
| (collapse_deletions_ ? 1 : 0); |
| } |
| // For the bottommost level, keys covered by tombstones in the first |
| // (oldest) stripe have been compacted away, so the tombstones are obsolete. |
| ++stripe_map_iter; |
| } |
| |
| // Note the order in which tombstones are stored is insignificant since we |
| // insert them into a std::map on the read path. |
| while (stripe_map_iter != rep_->stripe_map_.end()) { |
| bool first_added = false; |
| for (auto tombstone_map_iter = stripe_map_iter->second.raw_map.begin(); |
| tombstone_map_iter != stripe_map_iter->second.raw_map.end(); |
| ++tombstone_map_iter) { |
| RangeTombstone tombstone; |
| if (collapse_deletions_) { |
| auto next_tombstone_map_iter = std::next(tombstone_map_iter); |
| if (next_tombstone_map_iter == stripe_map_iter->second.raw_map.end() || |
| tombstone_map_iter->second.seq_ == 0) { |
| // it's a sentinel tombstone |
| continue; |
| } |
| tombstone.start_key_ = tombstone_map_iter->first; |
| tombstone.end_key_ = next_tombstone_map_iter->first; |
| tombstone.seq_ = tombstone_map_iter->second.seq_; |
| } else { |
| tombstone = tombstone_map_iter->second; |
| } |
| if (upper_bound != nullptr && |
| icmp_.user_comparator()->Compare(*upper_bound, |
| tombstone.start_key_) <= 0) { |
| // Tombstones starting at upper_bound or later only need to be included |
| // in the next table. Break because subsequent tombstones will start |
| // even later. |
| break; |
| } |
| if (lower_bound != nullptr && |
| icmp_.user_comparator()->Compare(tombstone.end_key_, |
| *lower_bound) <= 0) { |
| // Tombstones ending before or at lower_bound only need to be included |
| // in the prev table. Continue because subsequent tombstones may still |
| // overlap [lower_bound, upper_bound). |
| continue; |
| } |
| |
| auto ikey_and_end_key = tombstone.Serialize(); |
| builder->Add(ikey_and_end_key.first.Encode(), ikey_and_end_key.second); |
| if (!first_added) { |
| first_added = true; |
| InternalKey smallest_candidate = std::move(ikey_and_end_key.first); |
| if (lower_bound != nullptr && |
| icmp_.user_comparator()->Compare(smallest_candidate.user_key(), |
| *lower_bound) <= 0) { |
| // Pretend the smallest key has the same user key as lower_bound |
| // (the max key in the previous table or subcompaction) in order for |
| // files to appear key-space partitioned. |
| // |
| // Choose lowest seqnum so this file's smallest internal key comes |
| // after the previous file's/subcompaction's largest. The fake seqnum |
| // is OK because the read path's file-picking code only considers user |
| // key. |
| smallest_candidate = InternalKey(*lower_bound, 0, kTypeRangeDeletion); |
| } |
| if (meta->smallest.size() == 0 || |
| icmp_.Compare(smallest_candidate, meta->smallest) < 0) { |
| meta->smallest = std::move(smallest_candidate); |
| } |
| } |
| InternalKey largest_candidate = tombstone.SerializeEndKey(); |
| if (upper_bound != nullptr && |
| icmp_.user_comparator()->Compare(*upper_bound, |
| largest_candidate.user_key()) <= 0) { |
| // Pretend the largest key has the same user key as upper_bound (the |
| // min key in the following table or subcompaction) in order for files |
| // to appear key-space partitioned. |
| // |
| // Choose highest seqnum so this file's largest internal key comes |
| // before the next file's/subcompaction's smallest. The fake seqnum is |
| // OK because the read path's file-picking code only considers the user |
| // key portion. |
| // |
| // Note Seek() also creates InternalKey with (user_key, |
| // kMaxSequenceNumber), but with kTypeDeletion (0x7) instead of |
| // kTypeRangeDeletion (0xF), so the range tombstone comes before the |
| // Seek() key in InternalKey's ordering. So Seek() will look in the |
| // next file for the user key. |
| largest_candidate = InternalKey(*upper_bound, kMaxSequenceNumber, |
| kTypeRangeDeletion); |
| } |
| if (meta->largest.size() == 0 || |
| icmp_.Compare(meta->largest, largest_candidate) < 0) { |
| meta->largest = std::move(largest_candidate); |
| } |
| meta->smallest_seqno = std::min(meta->smallest_seqno, tombstone.seq_); |
| meta->largest_seqno = std::max(meta->largest_seqno, tombstone.seq_); |
| } |
| ++stripe_map_iter; |
| } |
| } |
| |
| bool RangeDelAggregator::IsEmpty() { |
| if (rep_ == nullptr) { |
| return true; |
| } |
| for (auto stripe_map_iter = rep_->stripe_map_.begin(); |
| stripe_map_iter != rep_->stripe_map_.end(); ++stripe_map_iter) { |
| if (!stripe_map_iter->second.raw_map.empty()) { |
| return false; |
| } |
| } |
| return true; |
| } |
| |
| } // namespace rocksdb |