feat: start compaction operation in CompactionFilter::Filter (#780)
diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp
index 86b12c0..4b81040 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -174,7 +174,7 @@
enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version);
if (operation != nullptr) {
operation->set_rules(std::move(rules));
- res.emplace_back(std::unique_ptr<compaction_operation>(operation));
+ res.emplace_back(std::shared_ptr<compaction_operation>(operation));
}
}
return res;
diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h
index f1cdf93..e20b3b4 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -159,7 +159,7 @@
FRIEND_TEST(compaction_filter_operation_test, create_operations);
};
-typedef std::vector<std::unique_ptr<compaction_operation>> compaction_operations;
+typedef std::vector<std::shared_ptr<compaction_operation>> compaction_operations;
compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version);
void register_compaction_operations();
} // namespace server
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index c049e83..e3f94a3 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -40,13 +40,15 @@
bool enabled,
int32_t pidx,
int32_t partition_version,
- bool validate_hash)
+ bool validate_hash,
+ compaction_operations &&compaction_ops)
: _pegasus_data_version(pegasus_data_version),
_default_ttl(default_ttl),
_enabled(enabled),
_partition_index(pidx),
_partition_version(partition_version),
- _validate_partition_hash(validate_hash)
+ _validate_partition_hash(validate_hash),
+ _user_specified_operations(std::move(compaction_ops))
{
}
@@ -60,6 +62,17 @@
return false;
}
+ // ignore empty write. Empty writes will deleted by the compaction of rocksdb. We don't need
+ // deal with it here.
+ if (key.size() < 2) {
+ return false;
+ }
+
+ if (!_user_specified_operations.empty() &&
+ user_specified_operation_filter(key, existing_value, new_value, value_changed)) {
+ return true;
+ }
+
uint32_t expire_ts =
pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
if (_default_ttl != 0 && expire_ts == 0) {
@@ -73,6 +86,22 @@
return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
}
+ bool user_specified_operation_filter(const rocksdb::Slice &key,
+ const rocksdb::Slice &existing_value,
+ std::string *new_value,
+ bool *value_changed) const
+ {
+ std::string hash_key, sort_key;
+ pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key);
+ for (const auto &op : _user_specified_operations) {
+ if (op->filter(hash_key, sort_key, existing_value, new_value, value_changed)) {
+ // return true if this data need to be deleted
+ return true;
+ }
+ }
+ return false;
+ }
+
const char *Name() const override { return "KeyWithTTLCompactionFilter"; }
// Check if the record is stale after partition split, which will split the partition into two
@@ -94,6 +123,7 @@
int32_t _partition_index;
int32_t _partition_version;
bool _validate_partition_hash;
+ compaction_operations _user_specified_operations;
};
class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
@@ -105,13 +135,20 @@
std::unique_ptr<rocksdb::CompactionFilter>
CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
{
+ compaction_operations tmp_filter_operations;
+ {
+ dsn::utils::auto_read_lock l(_lock);
+ tmp_filter_operations = _user_specified_operations;
+ }
+
return std::unique_ptr<KeyWithTTLCompactionFilter>(
new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
_default_ttl.load(),
_enabled.load(),
_partition_index.load(),
_partition_version.load(),
- _validate_partition_hash.load()));
+ _validate_partition_hash.load(),
+ std::move(tmp_filter_operations)));
}
const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }