feat(ts): Support retention in SubKeyFilter for expired TS chunk deletion (#3177)
Part of #3048
---------
Co-authored-by: Twice <twice.mliu@gmail.com>
diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc
index ab16d0e..44b9e58 100644
--- a/src/storage/compact_filter.cc
+++ b/src/storage/compact_filter.cc
@@ -110,8 +110,8 @@
ikey.GetKey(), s.Msg());
return rocksdb::CompactionFilter::Decision::kKeep;
}
- // bitmap will be checked in Filter
- if (metadata.Type() == kRedisBitmap) {
+ // bitmap and timeseries will be checked in Filter
+ if (metadata.Type() == kRedisBitmap || metadata.Type() == kRedisTimeSeries) {
return rocksdb::CompactionFilter::Decision::kUndetermined;
}
@@ -133,6 +133,18 @@
return false;
}
+ if (metadata.Type() == kRedisTimeSeries && redis::TimeSeries::IsTSChunkKey(ikey)) {
+ TimeSeriesMetadata ts_metadata(false);
+ Slice input(cached_metadata_);
+ auto s = ts_metadata.Decode(&input);
+ if (!s.ok()) {
+ error("[compact_filter/subkey] Failed to decode timeseries metadata, namespace: {}, key: {}, err: {}",
+ ikey.GetNamespace(), ikey.GetKey(), s.ToString());
+ return false;
+ }
+ return redis::TimeSeries::IsChunkExpired(ts_metadata, value);
+ }
+
return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value));
}
diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc
index 16fb190..692f880 100644
--- a/src/storage/redis_metadata.cc
+++ b/src/storage/redis_metadata.cc
@@ -547,6 +547,7 @@
PutFixed8(dst, static_cast<uint8_t>(chunk_type));
PutFixed8(dst, static_cast<uint8_t>(duplicate_policy));
PutSizedString(dst, source_key);
+ PutFixed64(dst, last_timestamp);
}
rocksdb::Status TimeSeriesMetadata::Decode(Slice *input) {
@@ -564,6 +565,7 @@
Slice source_key_slice;
GetSizedString(input, &source_key_slice);
source_key = source_key_slice.ToString();
+ GetFixed64(input, &last_timestamp);
return rocksdb::Status::OK();
}
diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h
index 2db3e9d..fd80e5a 100644
--- a/src/storage/redis_metadata.h
+++ b/src/storage/redis_metadata.h
@@ -388,6 +388,7 @@
ChunkType chunk_type;
DuplicatePolicy duplicate_policy;
std::string source_key;
+ uint64_t last_timestamp = 0; // Approximate last timestamp, used for compaction filter
explicit TimeSeriesMetadata(bool generate_version = true)
: Metadata(kRedisTimeSeries, generate_version),
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index 7c5050e..34ec769 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -882,6 +882,7 @@
read_options.iterate_lower_bound = &lower_bound;
uint64_t chunk_count = metadata.size;
+ uint64_t recorded_last_timestamp = metadata.last_timestamp;
// Get the latest chunk
auto iter = util::UniqueIterator(ctx, read_options);
@@ -974,8 +975,13 @@
if (!new_data_list.empty()) {
chunk_count += new_data_list.size() - (metadata.size == 0 ? 0 : 1);
}
- if (chunk_count != metadata.size) {
+ if (new_data_list.size() > 1) {
+ auto chunk = CreateTSChunkFromData(new_data_list.back());
+ recorded_last_timestamp = chunk->GetLastTimestamp();
+ }
+ if (chunk_count != metadata.size || recorded_last_timestamp != metadata.last_timestamp) {
metadata.size = chunk_count;
+ metadata.last_timestamp = recorded_last_timestamp;
std::string bytes;
metadata.Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
@@ -1438,6 +1444,16 @@
}
}
if (chunk_count != metadata.size) {
+ // Record the last timestamp
+ std::vector<TSSample> get_samples;
+ s = getCommon(ctx, ns_key, metadata, true, &get_samples);
+ if (!s.ok()) return s;
+ if (get_samples.empty()) {
+ metadata.last_timestamp = 0;
+ } else {
+ metadata.last_timestamp = get_samples.back().ts;
+ }
+
metadata.size = chunk_count;
std::string bytes;
metadata.Encode(&bytes);
@@ -2154,4 +2170,19 @@
return s;
}
+bool TimeSeries::IsChunkExpired(const TimeSeriesMetadata &metadata, const Slice &chunk_value) {
+ auto chunk = CreateTSChunkFromData(chunk_value);
+ uint64_t latest_ts = metadata.last_timestamp;
+ uint64_t retention_bound =
+ (metadata.retention_time > 0 && metadata.retention_time < latest_ts) ? latest_ts - metadata.retention_time : 0;
+ return chunk->GetLastTimestamp() < retention_bound;
+}
+
+bool TimeSeries::IsTSChunkKey(const InternalKey &ikey) {
+ auto sub_key = ikey.GetSubKey();
+ auto type = TSSubkeyType::LABEL;
+ bool is_success = GetFixed8(&sub_key, reinterpret_cast<uint8_t *>(&type));
+ return is_success && type == TSSubkeyType::CHUNK;
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index f895c8a..e0e5e5c 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -283,6 +283,8 @@
rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample sample, const TSCreateOption &option,
AddResult *res);
rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t from, uint64_t to, uint64_t *deleted);
+ static bool IsChunkExpired(const TimeSeriesMetadata &metadata, const Slice &chunk_value);
+ static bool IsTSChunkKey(const InternalKey &ikey);
private:
// Bundles the arguments for a downstream upsert operation
diff --git a/tests/cppunit/compact_test.cc b/tests/cppunit/compact_test.cc
index 4c1fd9f..7a9c9d9 100644
--- a/tests/cppunit/compact_test.cc
+++ b/tests/cppunit/compact_test.cc
@@ -256,3 +256,76 @@
std::cout << "Encounter filesystem error: " << ec << std::endl;
}
}
+
+TEST(Compact, TSRetention) {
+ Config config;
+ config.db_dir = "compactdb_tsretention";
+ config.slot_id_encoded = false;
+
+ auto storage = std::make_unique<engine::Storage>(&config);
+ auto s = storage->Open();
+ assert(s.IsOK());
+
+ std::string ns = "test_compact_tsretention";
+ auto timeseries = std::make_unique<redis::TimeSeries>(storage.get(), ns);
+ engine::Context ctx(storage.get());
+
+ std::string ts_key = "ts_key";
+ redis::TSCreateOption create_option;
+ create_option.chunk_size = 3;
+ create_option.retention_time = 100;
+ ASSERT_TRUE(timeseries->Create(ctx, ts_key, create_option).ok());
+
+ rocksdb::DB* db = storage->GetDB();
+ rocksdb::ReadOptions read_options;
+ read_options.fill_cache = false;
+ auto get_all_chunks = [&]() {
+ auto iter = std::unique_ptr<rocksdb::Iterator>(
+ db->NewIterator(read_options, storage->GetCFHandle(ColumnFamilyID::PrimarySubkey)));
+ std::vector<uint64_t> chunk_ids;
+ for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+ Slice slice(iter->key());
+ slice.remove_prefix(slice.size() - sizeof(uint64_t));
+ uint64_t chunk_id = 0;
+ GetFixed64(&slice, &chunk_id);
+ chunk_ids.push_back(chunk_id);
+ }
+ return chunk_ids;
+ };
+
+ // Add two chunks
+ std::vector<TSSample> samples = {{1, 1.0}, {2, 2.0}, {3, 3.0}, {4, 4.0}, {5, 5.0}, {10, 10.0}};
+ std::vector<TSChunk::AddResult> add_results;
+ ASSERT_TRUE(timeseries->MAdd(ctx, ts_key, samples, &add_results).ok());
+
+ // There should be two chunk keys
+ auto chunk_ids = get_all_chunks();
+ ASSERT_EQ(chunk_ids.size(), 2);
+ ASSERT_EQ(chunk_ids[0], 1);
+ ASSERT_EQ(chunk_ids[1], 4);
+
+ // Add a sample to make last_timestamp = 110, then the first chunk is expired
+ samples = {{110, 110.0}};
+ ASSERT_TRUE(timeseries->MAdd(ctx, ts_key, samples, &add_results).ok());
+ ASSERT_TRUE(storage->Compact(nullptr, nullptr, nullptr).ok());
+
+ // Check the first chunk is deleted
+ chunk_ids = get_all_chunks();
+ ASSERT_EQ(chunk_ids.size(), 2);
+ ASSERT_EQ(chunk_ids[0], 4);
+ ASSERT_EQ(chunk_ids[1], 110);
+
+ // Check that the remaining samples are kept
+ redis::TSRangeOption range_option;
+ std::vector<TSSample> range_result;
+ ASSERT_TRUE(timeseries->Range(ctx, ts_key, range_option, &range_result).ok());
+ ASSERT_EQ(range_result.size(), 2);
+ ASSERT_EQ(range_result[0].ts, 10);
+ ASSERT_EQ(range_result[1].ts, 110);
+
+ std::error_code ec;
+ std::filesystem::remove_all(config.db_dir, ec);
+ if (ec) {
+ std::cout << "Encounter filesystem error: " << ec << std::endl;
+ }
+}