feat: start compaction operation in CompactionFilter::Filter (#780)

diff --git a/src/server/compaction_operation.cpp b/src/server/compaction_operation.cpp
index 86b12c0..4b81040 100644
--- a/src/server/compaction_operation.cpp
+++ b/src/server/compaction_operation.cpp
@@ -174,7 +174,7 @@
             enum_to_string(op.type), dsn::PROVIDER_TYPE_MAIN, op.params, data_version);
         if (operation != nullptr) {
             operation->set_rules(std::move(rules));
-            res.emplace_back(std::unique_ptr<compaction_operation>(operation));
+            res.emplace_back(std::shared_ptr<compaction_operation>(operation));
         }
     }
     return res;
diff --git a/src/server/compaction_operation.h b/src/server/compaction_operation.h
index f1cdf93..e20b3b4 100644
--- a/src/server/compaction_operation.h
+++ b/src/server/compaction_operation.h
@@ -159,7 +159,7 @@
     FRIEND_TEST(compaction_filter_operation_test, create_operations);
 };
 
-typedef std::vector<std::unique_ptr<compaction_operation>> compaction_operations;
+typedef std::vector<std::shared_ptr<compaction_operation>> compaction_operations;
 compaction_operations create_compaction_operations(const std::string &json, uint32_t data_version);
 void register_compaction_operations();
 } // namespace server
diff --git a/src/server/key_ttl_compaction_filter.h b/src/server/key_ttl_compaction_filter.h
index c049e83..e3f94a3 100644
--- a/src/server/key_ttl_compaction_filter.h
+++ b/src/server/key_ttl_compaction_filter.h
@@ -40,13 +40,15 @@
                                bool enabled,
                                int32_t pidx,
                                int32_t partition_version,
-                               bool validate_hash)
+                               bool validate_hash,
+                               compaction_operations &&compaction_ops)
         : _pegasus_data_version(pegasus_data_version),
           _default_ttl(default_ttl),
           _enabled(enabled),
           _partition_index(pidx),
           _partition_version(partition_version),
-          _validate_partition_hash(validate_hash)
+          _validate_partition_hash(validate_hash),
+          _user_specified_operations(std::move(compaction_ops))
     {
     }
 
@@ -60,6 +62,17 @@
             return false;
         }
 
+        // ignore empty write. Empty writes will deleted by the compaction of rocksdb. We don't need
+        // deal with it here.
+        if (key.size() < 2) {
+            return false;
+        }
+
+        if (!_user_specified_operations.empty() &&
+            user_specified_operation_filter(key, existing_value, new_value, value_changed)) {
+            return true;
+        }
+
         uint32_t expire_ts =
             pegasus_extract_expire_ts(_pegasus_data_version, utils::to_string_view(existing_value));
         if (_default_ttl != 0 && expire_ts == 0) {
@@ -73,6 +86,22 @@
         return check_if_ts_expired(utils::epoch_now(), expire_ts) || check_if_stale_split_data(key);
     }
 
+    bool user_specified_operation_filter(const rocksdb::Slice &key,
+                                         const rocksdb::Slice &existing_value,
+                                         std::string *new_value,
+                                         bool *value_changed) const
+    {
+        std::string hash_key, sort_key;
+        pegasus_restore_key(dsn::blob(key.data(), 0, key.size()), hash_key, sort_key);
+        for (const auto &op : _user_specified_operations) {
+            if (op->filter(hash_key, sort_key, existing_value, new_value, value_changed)) {
+                // return true if this data need to be deleted
+                return true;
+            }
+        }
+        return false;
+    }
+
     const char *Name() const override { return "KeyWithTTLCompactionFilter"; }
 
     // Check if the record is stale after partition split, which will split the partition into two
@@ -94,6 +123,7 @@
     int32_t _partition_index;
     int32_t _partition_version;
     bool _validate_partition_hash;
+    compaction_operations _user_specified_operations;
 };
 
 class KeyWithTTLCompactionFilterFactory : public rocksdb::CompactionFilterFactory
@@ -105,13 +135,20 @@
     std::unique_ptr<rocksdb::CompactionFilter>
     CreateCompactionFilter(const rocksdb::CompactionFilter::Context & /*context*/) override
     {
+        compaction_operations tmp_filter_operations;
+        {
+            dsn::utils::auto_read_lock l(_lock);
+            tmp_filter_operations = _user_specified_operations;
+        }
+
         return std::unique_ptr<KeyWithTTLCompactionFilter>(
             new KeyWithTTLCompactionFilter(_pegasus_data_version.load(),
                                            _default_ttl.load(),
                                            _enabled.load(),
                                            _partition_index.load(),
                                            _partition_version.load(),
-                                           _validate_partition_hash.load()));
+                                           _validate_partition_hash.load(),
+                                           std::move(tmp_filter_operations)));
     }
     const char *Name() const override { return "KeyWithTTLCompactionFilterFactory"; }