feat(ts): Support retention in SubKeyFilter for expired TS chunk deletion (#3177)

Part of #3048

---------

Co-authored-by: Twice <twice.mliu@gmail.com>
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index ab16d0e..44b9e58 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -110,8 +110,8 @@
           ikey.GetKey(), s.Msg());
     return rocksdb::CompactionFilter::Decision::kKeep;
   }
-  // bitmap will be checked in Filter
-  if (metadata.Type() == kRedisBitmap) {
+  // bitmap and timeseries will be checked in Filter
+  if (metadata.Type() == kRedisBitmap || metadata.Type() == kRedisTimeSeries) {
     return rocksdb::CompactionFilter::Decision::kUndetermined;
   }
 
@@ -133,6 +133,18 @@
     return false;
   }
 
+  if (metadata.Type() == kRedisTimeSeries && redis::TimeSeries::IsTSChunkKey(ikey)) {
+    TimeSeriesMetadata ts_metadata(false);
+    Slice input(cached_metadata_);
+    auto s = ts_metadata.Decode(&input);
+    if (!s.ok()) {
+      error("[compact_filter/subkey] Failed to decode timeseries metadata, namespace: {}, key: {}, err: {}",
+            ikey.GetNamespace(), ikey.GetKey(), s.ToString());
+      return false;
+    }
+    return redis::TimeSeries::IsChunkExpired(ts_metadata, value);
+  }
+
   return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value));
 }
 
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 16fb190..692f880 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -547,6 +547,7 @@
   PutFixed8(dst, static_cast<uint8_t>(chunk_type));
   PutFixed8(dst, static_cast<uint8_t>(duplicate_policy));
   PutSizedString(dst, source_key);
+  PutFixed64(dst, last_timestamp);
 }
 
 rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
@@ -564,6 +565,7 @@
   Slice source_key_slice;
   GetSizedString(input, &source_key_slice);
   source_key = source_key_slice.ToString();
+  GetFixed64(input, &last_timestamp);
 
   return rocksdb::Status::OK();
 }
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 2db3e9d..fd80e5a 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -388,6 +388,7 @@
   ChunkType chunk_type;
   DuplicatePolicy duplicate_policy;
   std::string source_key;
+  uint64_t last_timestamp = 0;  // Approximate last timestamp, used for compaction filter
 
   explicit TimeSeriesMetadata(bool generate_version = true)
       : Metadata(kRedisTimeSeries, generate_version),
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 7c5050e..34ec769 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -882,6 +882,7 @@
   read_options.iterate_lower_bound = &lower_bound;
 
   uint64_t chunk_count = metadata.size;
+  uint64_t recorded_last_timestamp = metadata.last_timestamp;
 
   // Get the latest chunk
   auto iter = util::UniqueIterator(ctx, read_options);
@@ -974,8 +975,13 @@
   if (!new_data_list.empty()) {
     chunk_count += new_data_list.size() - (metadata.size == 0 ? 0 : 1);
   }
-  if (chunk_count != metadata.size) {
+  if (new_data_list.size() > 1) {
+    auto chunk = CreateTSChunkFromData(new_data_list.back());
+    recorded_last_timestamp = chunk->GetLastTimestamp();
+  }
+  if (chunk_count != metadata.size || recorded_last_timestamp != metadata.last_timestamp) {
     metadata.size = chunk_count;
+    metadata.last_timestamp = recorded_last_timestamp;
     std::string bytes;
     metadata.Encode(&bytes);
     s = batch->Put(metadata_cf_handle_, ns_key, bytes);
@@ -1438,6 +1444,16 @@
     }
   }
   if (chunk_count != metadata.size) {
+    // Recompute the last timestamp
+    std::vector<TSSample> get_samples;
+    s = getCommon(ctx, ns_key, metadata, true, &get_samples);
+    if (!s.ok()) return s;
+    if (get_samples.empty()) {
+      metadata.last_timestamp = 0;
+    } else {
+      metadata.last_timestamp = get_samples.back().ts;
+    }
+
     metadata.size = chunk_count;
     std::string bytes;
     metadata.Encode(&bytes);
@@ -2154,4 +2170,19 @@
   return s;
 }
 
+bool TimeSeries::IsChunkExpired(const TimeSeriesMetadata &metadata, const Slice &chunk_value) {
+  auto chunk = CreateTSChunkFromData(chunk_value);
+  uint64_t latest_ts = metadata.last_timestamp;
+  uint64_t retention_bound =
+      (metadata.retention_time > 0 && metadata.retention_time < latest_ts) ? latest_ts - metadata.retention_time : 0;
+  return chunk->GetLastTimestamp() < retention_bound;
+}
+
+bool TimeSeries::IsTSChunkKey(const InternalKey &ikey) {
+  auto sub_key = ikey.GetSubKey();
+  auto type = TSSubkeyType::LABEL;
+  bool is_success = GetFixed8(&sub_key, reinterpret_cast<uint8_t *>(&type));
+  return is_success && type == TSSubkeyType::CHUNK;
+}
+
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index f895c8a..e0e5e5c 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -283,6 +283,8 @@
   rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample sample, const TSCreateOption &option,
                          AddResult *res);
   rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t from, uint64_t to, uint64_t *deleted);
+  static bool IsChunkExpired(const TimeSeriesMetadata &metadata, const Slice &chunk_value);
+  static bool IsTSChunkKey(const InternalKey &ikey);
 
  private:
   // Bundles the arguments for a downstream upsert operation
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index 4c1fd9f..7a9c9d9 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -256,3 +256,76 @@
     std::cout << "Encounter filesystem error: " << ec << std::endl;
   }
 }
+
+TEST(Compact, TSRetention) {
+  Config config;
+  config.db_dir = "compactdb_tsretention";
+  config.slot_id_encoded = false;
+
+  auto storage = std::make_unique<engine::Storage>(&config);
+  auto s = storage->Open();
+  assert(s.IsOK());
+
+  std::string ns = "test_compact_tsretention";
+  auto timeseries = std::make_unique<redis::TimeSeries>(storage.get(), ns);
+  engine::Context ctx(storage.get());
+
+  std::string ts_key = "ts_key";
+  redis::TSCreateOption create_option;
+  create_option.chunk_size = 3;
+  create_option.retention_time = 100;
+  ASSERT_TRUE(timeseries->Create(ctx, ts_key, create_option).ok());
+
+  rocksdb::DB* db = storage->GetDB();
+  rocksdb::ReadOptions read_options;
+  read_options.fill_cache = false;
+  auto get_all_chunks = [&]() {
+    auto iter = std::unique_ptr<rocksdb::Iterator>(
+        db->NewIterator(read_options, storage->GetCFHandle(ColumnFamilyID::PrimarySubkey)));
+    std::vector<uint64_t> chunk_ids;
+    for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+      Slice slice(iter->key());
+      slice.remove_prefix(slice.size() - sizeof(uint64_t));
+      uint64_t chunk_id = 0;
+      GetFixed64(&slice, &chunk_id);
+      chunk_ids.push_back(chunk_id);
+    }
+    return chunk_ids;
+  };
+
+  // Add two chunks
+  std::vector<TSSample> samples = {{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0}, {5, 5.0}, {10, 10.0}};
+  std::vector<TSChunk::AddResult> add_results;
+  ASSERT_TRUE(timeseries->MAdd(ctx, ts_key, samples, &add_results).ok());
+
+  // There should be two chunk keys
+  auto chunk_ids = get_all_chunks();
+  ASSERT_EQ(chunk_ids.size(), 2);
+  ASSERT_EQ(chunk_ids[0], 1);
+  ASSERT_EQ(chunk_ids[1], 4);
+
+  // Add a sample to make last_timestamp = 110, then the first chunk is expired
+  samples = {{110, 110.0}};
+  ASSERT_TRUE(timeseries->MAdd(ctx, ts_key, samples, &add_results).ok());
+  ASSERT_TRUE(storage->Compact(nullptr, nullptr, nullptr).ok());
+
+  // Check the first chunk is deleted
+  chunk_ids = get_all_chunks();
+  ASSERT_EQ(chunk_ids.size(), 2);
+  ASSERT_EQ(chunk_ids[0], 4);
+  ASSERT_EQ(chunk_ids[1], 110);
+
+  // Check samples should be kept
+  redis::TSRangeOption range_option;
+  std::vector<TSSample> range_result;
+  ASSERT_TRUE(timeseries->Range(ctx, ts_key, range_option, &range_result).ok());
+  ASSERT_EQ(range_result.size(), 2);
+  ASSERT_EQ(range_result[0].ts, 10);
+  ASSERT_EQ(range_result[1].ts, 110);
+
+  std::error_code ec;
+  std::filesystem::remove_all(config.db_dir, ec);
+  if (ec) {
+    std::cout << "Encounter filesystem error: " << ec << std::endl;
+  }
+}