| // Copyright (c) 2011-present, Facebook, Inc. All rights reserved. |
| // This source code is licensed under both the GPLv2 (found in the |
| // COPYING file in the root directory) and Apache 2.0 License |
| // (found in the LICENSE.Apache file in the root directory). |
| // |
| // Copyright (c) 2011 The LevelDB Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. See the AUTHORS file for names of contributors. |
| |
| #include "db/db_test_util.h" |
| #include "port/stack_trace.h" |
| |
| namespace rocksdb { |
| |
| static int cfilter_count = 0; |
| static int cfilter_skips = 0; |
| |
| // Replacement value written by ChangeFilter for every |
| // key it visits during compaction. |
| static std::string NEW_VALUE = "NewValue"; |
| |
| class DBTestCompactionFilter : public DBTestBase { |
| public: |
| DBTestCompactionFilter() : DBTestBase("/db_compaction_filter_test") {} |
| }; |
| |
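| // Keeps every key unchanged and counts filter invocations. |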
| class KeepFilter : public CompactionFilter { |
| public: |
| virtual bool Filter(int level, const Slice& key, const Slice& value, |
| std::string* new_value, bool* value_changed) const |
| override { |
| cfilter_count++; |
| return false; |
| } |
| |
| virtual const char* Name() const override { return "KeepFilter"; } |
| }; |
| |
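| // Drops every key it sees and counts filter invocations. |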
| class DeleteFilter : public CompactionFilter { |
| public: |
| virtual bool Filter(int level, const Slice& key, const Slice& value, |
| std::string* new_value, bool* value_changed) const |
| override { |
| cfilter_count++; |
| return true; |
| } |
| |
| virtual const char* Name() const override { return "DeleteFilter"; } |
| }; |
| |
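| // Drops keys that parse to an integer in (5, 105]. IgnoreSnapshots() |
| // lets the filter run even on keys visible via a live snapshot. |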
| class DeleteISFilter : public CompactionFilter { |
| public: |
| virtual bool Filter(int level, const Slice& key, const Slice& value, |
| std::string* new_value, |
| bool* value_changed) const override { |
| cfilter_count++; |
| int i = std::stoi(key.ToString()); |
| if (i > 5 && i <= 105) { |
| return true; |
| } |
| return false; |
| } |
| |
| virtual bool IgnoreSnapshots() const override { return true; } |
| |
| virtual const char* Name() const override { return "DeleteISFilter"; } |
| }; |
| |
| // Skips key x if floor(x/10) is even, using range skips |
| // (kRemoveAndSkipUntil). Requires that keys are zero-padded to length 10. |
| class SkipEvenFilter : public CompactionFilter { |
| public: |
| virtual Decision FilterV2(int level, const Slice& key, ValueType value_type, |
| const Slice& existing_value, std::string* new_value, |
| std::string* skip_until) const override { |
| cfilter_count++; |
| int i = std::stoi(key.ToString()); |
| if (i / 10 % 2 == 0) { |
| char key_str[100]; |
| snprintf(key_str, sizeof(key_str), "%010d", i / 10 * 10 + 10); |
| *skip_until = key_str; |
| ++cfilter_skips; |
| return Decision::kRemoveAndSkipUntil; |
| } |
| return Decision::kKeep; |
| } |
| |
| virtual bool IgnoreSnapshots() const override { return true; } |
| |
| virtual const char* Name() const override { return "SkipEvenFilter"; } |
| }; |
| |
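| // Advances the mock env clock by 1000 on every invocation and drops |
| // the key, simulating a slow filter. |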
| class DelayFilter : public CompactionFilter { |
| public: |
| explicit DelayFilter(DBTestBase* d) : db_test(d) {} |
| virtual bool Filter(int level, const Slice& key, const Slice& value, |
| std::string* new_value, |
| bool* value_changed) const override { |
| db_test->env_->addon_time_.fetch_add(1000); |
| return true; |
| } |
| |
| virtual const char* Name() const override { return "DelayFilter"; } |
| |
| private: |
| DBTestBase* db_test; |
| }; |
| |
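| // Drops a key iff its value equals *filtered_value_. |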
| class ConditionalFilter : public CompactionFilter { |
| public: |
| explicit ConditionalFilter(const std::string* filtered_value) |
| : filtered_value_(filtered_value) {} |
| virtual bool Filter(int level, const Slice& key, const Slice& value, |
| std::string* new_value, |
| bool* value_changed) const override { |
| return value.ToString() == *filtered_value_; |
| } |
| |
| virtual const char* Name() const override { return "ConditionalFilter"; } |
| |
| private: |
| const std::string* filtered_value_; |
| }; |
| |
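| // Keeps every key but rewrites its value to NEW_VALUE. |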
| class ChangeFilter : public CompactionFilter { |
| public: |
| explicit ChangeFilter() {} |
| |
| virtual bool Filter(int level, const Slice& key, const Slice& value, |
| std::string* new_value, bool* value_changed) const |
| override { |
| assert(new_value != nullptr); |
| *new_value = NEW_VALUE; |
| *value_changed = true; |
| return false; |
| } |
| |
| virtual const char* Name() const override { return "ChangeFilter"; } |
| }; |
| |
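| // Creates KeepFilter instances; can optionally verify the compaction |
| // context (full/manual flags, column family id) passed in by the DB. |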
| class KeepFilterFactory : public CompactionFilterFactory { |
| public: |
| explicit KeepFilterFactory(bool check_context = false, |
| bool check_context_cf_id = false) |
| : check_context_(check_context), |
| check_context_cf_id_(check_context_cf_id), |
| compaction_filter_created_(false) {} |
| |
| virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
| const CompactionFilter::Context& context) override { |
| if (check_context_) { |
| EXPECT_EQ(expect_full_compaction_.load(), context.is_full_compaction); |
| EXPECT_EQ(expect_manual_compaction_.load(), context.is_manual_compaction); |
| } |
| if (check_context_cf_id_) { |
| EXPECT_EQ(expect_cf_id_.load(), context.column_family_id); |
| } |
| compaction_filter_created_ = true; |
| return std::unique_ptr<CompactionFilter>(new KeepFilter()); |
| } |
| |
| bool compaction_filter_created() const { return compaction_filter_created_; } |
| |
| virtual const char* Name() const override { return "KeepFilterFactory"; } |
| bool check_context_; |
| bool check_context_cf_id_; |
| std::atomic_bool expect_full_compaction_; |
| std::atomic_bool expect_manual_compaction_; |
| std::atomic<uint32_t> expect_cf_id_; |
| bool compaction_filter_created_; |
| }; |
| |
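| // Hands out a DeleteFilter for manual compactions only; automatic |
| // compactions get no filter. |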
| class DeleteFilterFactory : public CompactionFilterFactory { |
| public: |
| virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
| const CompactionFilter::Context& context) override { |
| if (context.is_manual_compaction) { |
| return std::unique_ptr<CompactionFilter>(new DeleteFilter()); |
| } else { |
| return std::unique_ptr<CompactionFilter>(nullptr); |
| } |
| } |
| |
| virtual const char* Name() const override { return "DeleteFilterFactory"; } |
| }; |
| |
| // Delete Filter Factory which ignores snapshots |
| class DeleteISFilterFactory : public CompactionFilterFactory { |
| public: |
| virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
| const CompactionFilter::Context& context) override { |
| if (context.is_manual_compaction) { |
| return std::unique_ptr<CompactionFilter>(new DeleteISFilter()); |
| } else { |
| return std::unique_ptr<CompactionFilter>(nullptr); |
| } |
| } |
| |
| virtual const char* Name() const override { |
| return "DeleteISFilterFactory"; |
| } |
| }; |
| |
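| // Hands out a SkipEvenFilter for manual compactions only. |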
| class SkipEvenFilterFactory : public CompactionFilterFactory { |
| public: |
| virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
| const CompactionFilter::Context& context) override { |
| if (context.is_manual_compaction) { |
| return std::unique_ptr<CompactionFilter>(new SkipEvenFilter()); |
| } else { |
| return std::unique_ptr<CompactionFilter>(nullptr); |
| } |
| } |
| |
| virtual const char* Name() const override { return "SkipEvenFilterFactory"; } |
| }; |
| |
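| // Creates DelayFilter instances that advance the test env's mock clock. |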
| class DelayFilterFactory : public CompactionFilterFactory { |
| public: |
| explicit DelayFilterFactory(DBTestBase* d) : db_test(d) {} |
| virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
| const CompactionFilter::Context& context) override { |
| return std::unique_ptr<CompactionFilter>(new DelayFilter(db_test)); |
| } |
| |
| virtual const char* Name() const override { return "DelayFilterFactory"; } |
| |
| private: |
| DBTestBase* db_test; |
| }; |
| |
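| // Creates ConditionalFilter instances that drop keys whose value equals |
| // the captured filtered_value_. |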
| class ConditionalFilterFactory : public CompactionFilterFactory { |
| public: |
| explicit ConditionalFilterFactory(const Slice& filtered_value) |
| : filtered_value_(filtered_value.ToString()) {} |
| |
| virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
| const CompactionFilter::Context& context) override { |
| return std::unique_ptr<CompactionFilter>( |
| new ConditionalFilter(&filtered_value_)); |
| } |
| |
| virtual const char* Name() const override { |
| return "ConditionalFilterFactory"; |
| } |
| |
| private: |
| std::string filtered_value_; |
| }; |
| |
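| // Creates ChangeFilter instances that rewrite every value to NEW_VALUE. |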
| class ChangeFilterFactory : public CompactionFilterFactory { |
| public: |
| explicit ChangeFilterFactory() {} |
| |
| virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter( |
| const CompactionFilter::Context& context) override { |
| return std::unique_ptr<CompactionFilter>(new ChangeFilter()); |
| } |
| |
| virtual const char* Name() const override { return "ChangeFilterFactory"; } |
| }; |
| |
| #ifndef ROCKSDB_LITE |
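| // Exercises KeepFilter and DeleteFilter through manual compactions and |
| // verifies the filter invocation counts, the resulting file placement, |
| // and sequence-number zeroing at the bottommost level. |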
| TEST_F(DBTestCompactionFilter, CompactionFilter) { |
| Options options = CurrentOptions(); |
| options.max_open_files = -1; |
| options.num_levels = 3; |
| options.compaction_filter_factory = std::make_shared<KeepFilterFactory>(); |
| options = CurrentOptions(options); |
| CreateAndReopenWithCF({"pikachu"}, options); |
| |
| // Write 100K keys, these are written to a few files in L0. |
| const std::string value(10, 'x'); |
| for (int i = 0; i < 100000; i++) { |
| char key[100]; |
| snprintf(key, sizeof(key), "B%010d", i); |
| Put(1, key, value); |
| } |
| ASSERT_OK(Flush(1)); |
| |
| // Push all files to the highest level L2. Verify that |
| // the compaction at each level invokes the filter for |
| // all the keys in that level. |
| cfilter_count = 0; |
| dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); |
| ASSERT_EQ(cfilter_count, 100000); |
| cfilter_count = 0; |
| dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); |
| ASSERT_EQ(cfilter_count, 100000); |
| |
| ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); |
| ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); |
| ASSERT_NE(NumTableFilesAtLevel(2, 1), 0); |
| cfilter_count = 0; |
| |
| // All the files are in the lowest level. |
| // Verify that all but one record have sequence |
| // number zero. The most recent record is at the |
| // tip of the sequence history and cannot be |
| // zeroed out. |
| int count = 0; |
| int total = 0; |
| Arena arena; |
| { |
| InternalKeyComparator icmp(options.comparator); |
| RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); |
| ScopedArenaIterator iter( |
| dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1])); |
| iter->SeekToFirst(); |
| ASSERT_OK(iter->status()); |
| while (iter->Valid()) { |
| ParsedInternalKey ikey(Slice(), 0, kTypeValue); |
| ikey.sequence = -1;  // Sentinel; ParseInternalKey must overwrite it. |
| ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey)); |
| total++; |
| if (ikey.sequence != 0) { |
| count++; |
| } |
| iter->Next(); |
| } |
| } |
| ASSERT_EQ(total, 100000); |
| ASSERT_EQ(count, 1); |
| |
| // overwrite all the 100K keys once again. |
| for (int i = 0; i < 100000; i++) { |
| char key[100]; |
| snprintf(key, sizeof(key), "B%010d", i); |
| ASSERT_OK(Put(1, key, value)); |
| } |
| ASSERT_OK(Flush(1)); |
| |
| // Push all files to the highest level L2. This |
| // means that every key should pass through the |
| // compaction filter at least once. |
| cfilter_count = 0; |
| dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); |
| ASSERT_EQ(cfilter_count, 100000); |
| cfilter_count = 0; |
| dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); |
| ASSERT_EQ(cfilter_count, 100000); |
| ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); |
| ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); |
| ASSERT_NE(NumTableFilesAtLevel(2, 1), 0); |
| |
| // Create a new database with a compaction |
| // filter that deletes all keys. |
| options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(); |
| options.create_if_missing = true; |
| DestroyAndReopen(options); |
| CreateAndReopenWithCF({"pikachu"}, options); |
| |
| // write all the keys once again. |
| for (int i = 0; i < 100000; i++) { |
| char key[100]; |
| snprintf(key, sizeof(key), "B%010d", i); |
| ASSERT_OK(Put(1, key, value)); |
| } |
| ASSERT_OK(Flush(1)); |
| ASSERT_NE(NumTableFilesAtLevel(0, 1), 0); |
| ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); |
| ASSERT_EQ(NumTableFilesAtLevel(2, 1), 0); |
| |
| // Push all files to the highest level L2. This |
| // triggers the compaction filter to delete all keys, |
| // verify that at the end of the compaction process, |
| // nothing is left. |
| cfilter_count = 0; |
| dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); |
| ASSERT_EQ(cfilter_count, 100000); |
| cfilter_count = 0; |
| dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); |
| ASSERT_EQ(cfilter_count, 0); |
| ASSERT_EQ(NumTableFilesAtLevel(0, 1), 0); |
| ASSERT_EQ(NumTableFilesAtLevel(1, 1), 0); |
| |
| { |
| // Scan the entire database to ensure that nothing is left |
| std::unique_ptr<Iterator> iter( |
| db_->NewIterator(ReadOptions(), handles_[1])); |
| iter->SeekToFirst(); |
| count = 0; |
| while (iter->Valid()) { |
| count++; |
| iter->Next(); |
| } |
| ASSERT_EQ(count, 0); |
| } |
| |
| // Any record that survived would keep a nonzero |
| // sequence number, since it would be at the tip; |
| // in fact, no records should be left at all. |
| count = 0; |
| { |
| InternalKeyComparator icmp(options.comparator); |
| RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); |
| ScopedArenaIterator iter( |
| dbfull()->NewInternalIterator(&arena, &range_del_agg, handles_[1])); |
| iter->SeekToFirst(); |
| ASSERT_OK(iter->status()); |
| while (iter->Valid()) { |
| ParsedInternalKey ikey(Slice(), 0, kTypeValue); |
| ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey)); |
| ASSERT_NE(ikey.sequence, (unsigned)0); |
| count++; |
| iter->Next(); |
| } |
| ASSERT_EQ(count, 0); |
| } |
| } |
| |
| // Tests the edge case where compaction does not produce any output -- all |
| // entries are deleted. The compaction should create a bunch of 'DeleteFile' |
| // entries in VersionEdit, but no 'AddFile' entries. |
| TEST_F(DBTestCompactionFilter, CompactionFilterDeletesAll) { |
| Options options = CurrentOptions(); |
| options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(); |
| options.disable_auto_compactions = true; |
| options.create_if_missing = true; |
| DestroyAndReopen(options); |
| |
| // put some data |
| for (int table = 0; table < 4; ++table) { |
| for (int i = 0; i < 10 + table; ++i) { |
| Put(ToString(table * 100 + i), "val"); |
| } |
| Flush(); |
| } |
| |
| // This produces no output files, since the delete compaction filter |
| // drops every key. |
| ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
| ASSERT_EQ(0U, CountLiveFiles()); |
| |
| Reopen(options); |
| |
| std::unique_ptr<Iterator> itr(db_->NewIterator(ReadOptions())); |
| itr->SeekToFirst(); |
| // The db is empty. |
| ASSERT_TRUE(!itr->Valid()); |
| } |
| #endif // ROCKSDB_LITE |
| |
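| // Verifies that values rewritten by ChangeFilter during compaction are |
| // visible to subsequent reads. |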
| TEST_F(DBTestCompactionFilter, CompactionFilterWithValueChange) { |
| do { |
| Options options = CurrentOptions(); |
| options.num_levels = 3; |
| options.compaction_filter_factory = |
| std::make_shared<ChangeFilterFactory>(); |
| CreateAndReopenWithCF({"pikachu"}, options); |
| |
| // Write 100K+1 keys; these are written to a few files |
| // in L0. We do this so that the current snapshot points |
| // to the 100001st key. The compaction filter is not |
| // invoked on keys that are visible via a snapshot, |
| // because they cannot be deleted anyway. |
| const std::string value(10, 'x'); |
| for (int i = 0; i < 100001; i++) { |
| char key[100]; |
| snprintf(key, sizeof(key), "B%010d", i); |
| Put(1, key, value); |
| } |
| |
| // push all files to lower levels |
| ASSERT_OK(Flush(1)); |
| if (option_config_ != kUniversalCompactionMultiLevel && |
| option_config_ != kUniversalSubcompactions) { |
| dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); |
| dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); |
| } else { |
| dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, |
| nullptr); |
| } |
| |
| // re-write all data again |
| for (int i = 0; i < 100001; i++) { |
| char key[100]; |
| snprintf(key, sizeof(key), "B%010d", i); |
| Put(1, key, value); |
| } |
| |
| // Push all files to lower levels. This should |
| // invoke the compaction filter for all 100K+1 keys. |
| ASSERT_OK(Flush(1)); |
| if (option_config_ != kUniversalCompactionMultiLevel && |
| option_config_ != kUniversalSubcompactions) { |
| dbfull()->TEST_CompactRange(0, nullptr, nullptr, handles_[1]); |
| dbfull()->TEST_CompactRange(1, nullptr, nullptr, handles_[1]); |
| } else { |
| dbfull()->CompactRange(CompactRangeOptions(), handles_[1], nullptr, |
| nullptr); |
| } |
| |
| // verify that all keys now have the new value that |
| // was set by the compaction process. |
| for (int i = 0; i < 100001; i++) { |
| char key[100]; |
| snprintf(key, sizeof(key), "B%010d", i); |
| std::string newvalue = Get(1, key); |
| ASSERT_EQ(newvalue.compare(NEW_VALUE), 0); |
| } |
| } while (ChangeCompactOptions()); |
| } |
| |
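| // Verifies how compaction filters interact with merge: Filter() is only |
| // applied to put values, and its result is ignored when merge operands |
| // exist for the same key. |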
| TEST_F(DBTestCompactionFilter, CompactionFilterWithMergeOperator) { |
| std::string one, two, three, four; |
| PutFixed64(&one, 1); |
| PutFixed64(&two, 2); |
| PutFixed64(&three, 3); |
| PutFixed64(&four, 4); |
| |
| Options options = CurrentOptions(); |
| options.create_if_missing = true; |
| options.merge_operator = MergeOperators::CreateUInt64AddOperator(); |
| options.num_levels = 3; |
| // Filter out keys whose value is 2. |
| options.compaction_filter_factory = |
| std::make_shared<ConditionalFilterFactory>(two); |
| DestroyAndReopen(options); |
| |
| // In the same compaction, a put entry needs to be dropped by the |
| // compaction filter while a merge entry exists for the same key. The |
| // compaction filter result is ignored. |
| ASSERT_OK(db_->Put(WriteOptions(), "foo", two)); |
| ASSERT_OK(Flush()); |
| ASSERT_OK(db_->Merge(WriteOptions(), "foo", one)); |
| ASSERT_OK(Flush()); |
| std::string newvalue = Get("foo"); |
| ASSERT_EQ(newvalue, three); |
| dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); |
| newvalue = Get("foo"); |
| ASSERT_EQ(newvalue, three); |
| |
| // A put entry can be dropped by the compaction filter, leaving only |
| // merge entries for the key. |
| ASSERT_OK(db_->Put(WriteOptions(), "bar", two)); |
| ASSERT_OK(Flush()); |
| dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); |
| newvalue = Get("bar"); |
| ASSERT_EQ("NOT_FOUND", newvalue); |
| ASSERT_OK(db_->Merge(WriteOptions(), "bar", two)); |
| ASSERT_OK(Flush()); |
| dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); |
| newvalue = Get("bar"); |
| ASSERT_EQ(newvalue, two); |
| |
| // Compaction filter never applies to merge keys. |
| ASSERT_OK(db_->Put(WriteOptions(), "foobar", one)); |
| ASSERT_OK(Flush()); |
| ASSERT_OK(db_->Merge(WriteOptions(), "foobar", two)); |
| ASSERT_OK(Flush()); |
| newvalue = Get("foobar"); |
| ASSERT_EQ(newvalue, three); |
| dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); |
| newvalue = Get("foobar"); |
| ASSERT_EQ(newvalue, three); |
| |
| // In the same compaction, both the put entry and the merge operand for |
| // the key match the compaction filter, and a merge entry exists for the |
| // key. The compaction filter results are ignored for both. |
| ASSERT_OK(db_->Put(WriteOptions(), "barfoo", two)); |
| ASSERT_OK(Flush()); |
| ASSERT_OK(db_->Merge(WriteOptions(), "barfoo", two)); |
| ASSERT_OK(Flush()); |
| newvalue = Get("barfoo"); |
| ASSERT_EQ(newvalue, four); |
| dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); |
| newvalue = Get("barfoo"); |
| ASSERT_EQ(newvalue, four); |
| } |
| |
| #ifndef ROCKSDB_LITE |
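| // Verifies that a manual full compaction under universal compaction |
| // passes the expected context (manual, full, default cf id) to the |
| // filter factory. |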
| TEST_F(DBTestCompactionFilter, CompactionFilterContextManual) { |
| KeepFilterFactory* filter = new KeepFilterFactory(true, true); |
| |
| Options options = CurrentOptions(); |
| options.compaction_style = kCompactionStyleUniversal; |
| options.compaction_filter_factory.reset(filter); |
| options.compression = kNoCompression; |
| options.level0_file_num_compaction_trigger = 8; |
| Reopen(options); |
| int num_keys_per_file = 400; |
| for (int j = 0; j < 3; j++) { |
| // Write several keys. |
| const std::string value(10, 'x'); |
| for (int i = 0; i < num_keys_per_file; i++) { |
| char key[100]; |
| snprintf(key, sizeof(key), "B%08d%02d", i, j); |
| Put(key, value); |
| } |
| dbfull()->TEST_FlushMemTable(); |
| // Make sure next file is much smaller so automatic compaction will not |
| // be triggered. |
| num_keys_per_file /= 2; |
| } |
| dbfull()->TEST_WaitForCompact(); |
| |
| // Force a manual compaction |
| cfilter_count = 0; |
| filter->expect_manual_compaction_.store(true); |
| filter->expect_full_compaction_.store(true); |
| filter->expect_cf_id_.store(0); |
| dbfull()->CompactRange(CompactRangeOptions(), nullptr, nullptr); |
| ASSERT_EQ(cfilter_count, 700); |
| ASSERT_EQ(NumSortedRuns(0), 1); |
| ASSERT_TRUE(filter->compaction_filter_created()); |
| |
| // Verify total number of keys is correct after manual compaction. |
| { |
| int count = 0; |
| int total = 0; |
| Arena arena; |
| InternalKeyComparator icmp(options.comparator); |
| RangeDelAggregator range_del_agg(icmp, {} /* snapshots */); |
| ScopedArenaIterator iter( |
| dbfull()->NewInternalIterator(&arena, &range_del_agg)); |
| iter->SeekToFirst(); |
| ASSERT_OK(iter->status()); |
| while (iter->Valid()) { |
| ParsedInternalKey ikey(Slice(), 0, kTypeValue); |
| ikey.sequence = -1;  // Sentinel; ParseInternalKey must overwrite it. |
| ASSERT_TRUE(ParseInternalKey(iter->key(), &ikey)); |
| total++; |
| if (ikey.sequence != 0) { |
| count++; |
| } |
| iter->Next(); |
| } |
| ASSERT_EQ(total, 700); |
| ASSERT_EQ(count, 1); |
| } |
| } |
| #endif // ROCKSDB_LITE |
| |
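| // Verifies that the filter factory observes column_family_id == 1 for |
| // the "pikachu" column family during automatic compactions. |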
| TEST_F(DBTestCompactionFilter, CompactionFilterContextCfId) { |
| KeepFilterFactory* filter = new KeepFilterFactory(false, true); |
| filter->expect_cf_id_.store(1); |
| |
| Options options = CurrentOptions(); |
| options.compaction_filter_factory.reset(filter); |
| options.compression = kNoCompression; |
| options.level0_file_num_compaction_trigger = 2; |
| CreateAndReopenWithCF({"pikachu"}, options); |
| |
| int num_keys_per_file = 400; |
| for (int j = 0; j < 3; j++) { |
| // Write several keys. |
| const std::string value(10, 'x'); |
| for (int i = 0; i < num_keys_per_file; i++) { |
| char key[100]; |
| snprintf(key, sizeof(key), "B%08d%02d", i, j); |
| Put(1, key, value); |
| } |
| Flush(1); |
| // Make sure next file is much smaller so automatic compaction will not |
| // be triggered. |
| num_keys_per_file /= 2; |
| } |
| dbfull()->TEST_WaitForCompact(); |
| |
| ASSERT_TRUE(filter->compaction_filter_created()); |
| } |
| |
| #ifndef ROCKSDB_LITE |
| // Compaction filters should only be applied to records that are newer than the |
| // latest snapshot. This test inserts records and applies a delete filter. |
| TEST_F(DBTestCompactionFilter, CompactionFilterSnapshot) { |
| Options options = CurrentOptions(); |
| options.compaction_filter_factory = std::make_shared<DeleteFilterFactory>(); |
| options.disable_auto_compactions = true; |
| options.create_if_missing = true; |
| DestroyAndReopen(options); |
| |
| // Put some data. |
| const Snapshot* snapshot = nullptr; |
| for (int table = 0; table < 4; ++table) { |
| for (int i = 0; i < 10; ++i) { |
| Put(ToString(table * 100 + i), "val"); |
| } |
| Flush(); |
| |
| if (table == 0) { |
| snapshot = db_->GetSnapshot(); |
| } |
| } |
| assert(snapshot != nullptr); |
| |
| cfilter_count = 0; |
| ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
| // The filter should be invoked on, and delete, the 30 records that are |
| // not visible via the snapshot. |
| ASSERT_EQ(30U, cfilter_count); |
| |
| // Release the snapshot and compact again -> now all records should be |
| // removed. |
| db_->ReleaseSnapshot(snapshot); |
| ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
| ASSERT_EQ(0U, CountLiveFiles()); |
| } |
| |
| // Compaction filters should only be applied to records that are newer than |
| // the latest snapshot. However, if the compaction filter asks to ignore |
| // snapshots, records visible via a snapshot are processed as well. |
| TEST_F(DBTestCompactionFilter, CompactionFilterIgnoreSnapshot) { |
| std::string five = ToString(5); |
| Options options = CurrentOptions(); |
| options.compaction_filter_factory = std::make_shared<DeleteISFilterFactory>(); |
| options.disable_auto_compactions = true; |
| options.create_if_missing = true; |
| DestroyAndReopen(options); |
| |
| // Put some data. |
| const Snapshot* snapshot = nullptr; |
| for (int table = 0; table < 4; ++table) { |
| for (int i = 0; i < 10; ++i) { |
| Put(ToString(table * 100 + i), "val"); |
| } |
| Flush(); |
| |
| if (table == 0) { |
| snapshot = db_->GetSnapshot(); |
| } |
| } |
| assert(snapshot != nullptr); |
| |
| cfilter_count = 0; |
| ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
| // The filter is invoked on all 40 records, including the ones covered by |
| // the snapshot. It drops the 10 keys in (5, 105]: 6-9 and 100-105. |
| ASSERT_EQ(40U, cfilter_count); |
| |
| { |
| // Scan the database as of the snapshot: keys 6-9 were dropped despite |
| // being visible via the snapshot, so only keys 0-5 remain. |
| ReadOptions read_options; |
| read_options.snapshot = snapshot; |
| std::unique_ptr<Iterator> iter(db_->NewIterator(read_options)); |
| iter->SeekToFirst(); |
| int count = 0; |
| while (iter->Valid()) { |
| count++; |
| iter->Next(); |
| } |
| ASSERT_EQ(count, 6); |
| read_options.snapshot = nullptr; |
| std::unique_ptr<Iterator> iter1(db_->NewIterator(read_options)); |
| iter1->SeekToFirst(); |
| count = 0; |
| while (iter1->Valid()) { |
| count++; |
| iter1->Next(); |
| } |
| // We have deleted 10 of the 40 keys using the compaction filter: |
| // keys 6-9 (written before the snapshot) and 100-105 (after it). |
| ASSERT_EQ(count, 30); |
| } |
| |
| // Release the snapshot and compact again -> now all records should be |
| // removed. |
| db_->ReleaseSnapshot(snapshot); |
| } |
| #endif // ROCKSDB_LITE |
| |
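| // Exercises kRemoveAndSkipUntil: key ranges whose floor(key/10) is even |
| // are dropped wholesale; cfilter_skips counts the range skips issued. |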
| TEST_F(DBTestCompactionFilter, SkipUntil) { |
| Options options = CurrentOptions(); |
| options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>(); |
| options.disable_auto_compactions = true; |
| options.create_if_missing = true; |
| DestroyAndReopen(options); |
| |
| // Write four L0 files with increasing, non-overlapping key ranges. |
| for (int table = 0; table < 4; ++table) { |
| // Key ranges in tables are [0, 38], [106, 149], [212, 260], [318, 371]. |
| for (int i = table * 6; i < 39 + table * 11; ++i) { |
| char key[100]; |
| snprintf(key, sizeof(key), "%010d", table * 100 + i); |
| Put(key, std::to_string(table * 1000 + i)); |
| } |
| Flush(); |
| } |
| |
| cfilter_skips = 0; |
| ASSERT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
| // Number of skips in tables: 2, 3, 3, 3. |
| ASSERT_EQ(11, cfilter_skips); |
| |
| for (int table = 0; table < 4; ++table) { |
| for (int i = table * 6; i < 39 + table * 11; ++i) { |
| int k = table * 100 + i; |
| char key[100]; |
| snprintf(key, sizeof(key), "%010d", table * 100 + i); |
| auto expected = std::to_string(table * 1000 + i); |
| std::string val; |
| Status s = db_->Get(ReadOptions(), key, &val); |
| if (k / 10 % 2 == 0) { |
| ASSERT_TRUE(s.IsNotFound()); |
| } else { |
| ASSERT_OK(s); |
| ASSERT_EQ(expected, val); |
| } |
| } |
| } |
| } |
| |
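| // Runs the SkipEven filter with a prefix bloom filter configured and |
| // verifies that dropped keys stay dropped. |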
| TEST_F(DBTestCompactionFilter, SkipUntilWithBloomFilter) { |
| BlockBasedTableOptions table_options; |
| table_options.whole_key_filtering = false; |
| table_options.filter_policy.reset(NewBloomFilterPolicy(100, false)); |
| |
| Options options = CurrentOptions(); |
| options.table_factory.reset(NewBlockBasedTableFactory(table_options)); |
| options.prefix_extractor.reset(NewCappedPrefixTransform(9)); |
| options.compaction_filter_factory = std::make_shared<SkipEvenFilterFactory>(); |
| options.disable_auto_compactions = true; |
| options.create_if_missing = true; |
| DestroyAndReopen(options); |
| |
| Put("0000000010", "v10"); |
| Put("0000000020", "v20"); // skipped |
| Put("0000000050", "v50"); |
| Flush(); |
| |
| cfilter_skips = 0; |
| EXPECT_OK(db_->CompactRange(CompactRangeOptions(), nullptr, nullptr)); |
| EXPECT_EQ(1, cfilter_skips); |
| |
| Status s; |
| std::string val; |
| |
| s = db_->Get(ReadOptions(), "0000000010", &val); |
| ASSERT_OK(s); |
| EXPECT_EQ("v10", val); |
| |
| s = db_->Get(ReadOptions(), "0000000020", &val); |
| EXPECT_TRUE(s.IsNotFound()); |
| |
| s = db_->Get(ReadOptions(), "0000000050", &val); |
| ASSERT_OK(s); |
| EXPECT_EQ("v50", val); |
| } |
| |
| } // namespace rocksdb |
| |
| int main(int argc, char** argv) { |
| rocksdb::port::InstallStackTraceHandler(); |
| ::testing::InitGoogleTest(&argc, argv); |
| return RUN_ALL_TESTS(); |
| } |