feat(ts): Add support for deleting range of data and `TS.DEL` command (#3174)
```
127.0.0.1:6666> TS.CREATE temp:JLM LABELS type temp location JLM
127.0.0.1:6666> TS.MADD temp:JLM 1005 30 temp:JLM 1015 35 temp:JLM 1025 9999 temp:JLM 1035 40
127.0.0.1:6666> TS.DEL temp:JLM 1000 1030
(integer) 3
127.0.0.1:6666> TS.DEL temp:JLM - +
(integer) 1
```
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 8c1fda7..1726ebd 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -1039,6 +1039,53 @@
double value_ = 0;
};
+class CommandTSDel : public Commander {
+ public:
+ Status Parse(const std::vector<std::string> &args) override {
+ if (args.size() < 4) {
+ return {Status::RedisParseErr, "wrong number of arguments for 'ts.del' command"};
+ }
+ CommandParser parser(args, 2);
+ // Parse start timestamp
+ auto start_parse = parser.TakeInt<uint64_t>();
+ if (!start_parse.IsOK()) {
+ auto start_ts_str = parser.TakeStr();
+ if (!start_ts_str.IsOK() || start_ts_str.GetValue() != "-") {
+ return {Status::RedisParseErr, "wrong fromTimestamp"};
+ }
+ // "-" means use default start timestamp: 0
+ } else {
+ start_ts_ = start_parse.GetValue();
+ }
+ // Parse end timestamp
+ auto end_parse = parser.TakeInt<uint64_t>();
+ if (!end_parse.IsOK()) {
+ auto end_ts_str = parser.TakeStr();
+ if (!end_ts_str.IsOK() || end_ts_str.GetValue() != "+") {
+ return {Status::RedisParseErr, "wrong toTimestamp"};
+ }
+ // "+" means use default end timestamp: MAX_TIMESTAMP
+ } else {
+ end_ts_ = end_parse.GetValue();
+ }
+ return Commander::Parse(args);
+ }
+ Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+ auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+ uint64_t deleted_count = 0;
+ auto s = timeseries_db.Del(ctx, args_[1], start_ts_, end_ts_, &deleted_count);
+ if (!s.ok()) {
+ return {Status::RedisExecErr, s.ToString()};
+ }
+ *output = redis::Integer(deleted_count);
+ return Status::OK();
+ }
+
+ private:
+ uint64_t start_ts_ = 0;
+ uint64_t end_ts_ = TSSample::MAX_TIMESTAMP;
+};
+
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, -3, 1),
@@ -1049,6 +1096,7 @@
MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", NO_KEY),
MakeCmdAttr<CommandTSMRange>("ts.mrange", -5, "read-only", NO_KEY),
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.incrby", -3, "write", 1, 1, 1),
- MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3, "write", 1, 1, 1), );
+ MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3, "write", 1, 1, 1),
+ MakeCmdAttr<CommandTSDel>("ts.del", -4, "write", 1, 1, 1), );
} // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index b108cfb..540d582 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -320,6 +320,7 @@
}
void TSDownStreamMeta::AggregateLatestBucket(nonstd::span<const TSSample> samples) {
+ if (samples.empty()) return;
double temp_v = 0.0;
switch (aggregator.type) {
case TSAggregatorType::SUM:
@@ -732,15 +733,18 @@
return spans;
}
-nonstd::span<const TSSample> TSAggregator::GetBucketByTimestamp(nonstd::span<const TSSample> samples,
- uint64_t ts) const {
+nonstd::span<const TSSample> TSAggregator::GetBucketByTimestamp(nonstd::span<const TSSample> samples, uint64_t ts,
+ uint64_t less_than) const {
if (type == TSAggregatorType::NONE || samples.empty()) {
return {};
}
uint64_t start_bucket = CalculateAlignedBucketLeft(ts);
- uint64_t end_bucket = CalculateAlignedBucketRight(ts);
+ uint64_t end_bucket = std::min(CalculateAlignedBucketRight(ts), less_than);
auto lower = std::lower_bound(samples.begin(), samples.end(), TSSample{start_bucket, 0.0});
auto upper = std::lower_bound(lower, samples.end(), TSSample{end_bucket, 0.0});
+ if (lower == upper) {
+ return {};
+ }
return {lower, upper};
}
@@ -830,8 +834,23 @@
rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
SampleBatch &sample_batch, std::vector<std::string> *new_chunks) {
+ auto batch = storage_->GetWriteBatchBase();
+ auto s = upsertCommonInBatch(ctx, ns_key, metadata, sample_batch, batch, new_chunks);
+ if (!s.ok()) return s;
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::upsertCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+ SampleBatch &sample_batch,
+ ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+ std::vector<std::string> *new_chunks) {
auto all_batch_slice = sample_batch.AsSlice();
+ if (all_batch_slice.GetSampleSpan().empty() && new_chunks != nullptr) {
+ new_chunks->clear();
+ return rocksdb::Status::OK();
+ }
+
// In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
@@ -867,7 +886,6 @@
return rocksdb::Status::OK();
}
- auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTimeSeries);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
@@ -953,7 +971,7 @@
}
}
- return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+ return rocksdb::Status::OK();
}
rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
@@ -1301,6 +1319,219 @@
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::delRangeCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+ uint64_t from, uint64_t to, uint64_t *deleted, bool inclusive_to) {
+ auto batch = storage_->GetWriteBatchBase();
+ auto s = delRangeCommonInBatch(ctx, ns_key, metadata, from, to, batch, deleted, inclusive_to);
+ if (!s.ok()) return s;
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::delRangeCommonInBatch(engine::Context &ctx, const Slice &ns_key,
+ TimeSeriesMetadata &metadata, uint64_t from, uint64_t to,
+ ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+ uint64_t *deleted, bool inclusive_to) {
+ *deleted = 0;
+ if (from > to || (from == to && !inclusive_to)) {
+ return rocksdb::Status::OK();
+ }
+ // In the emun `TSSubkeyType`, `LABEL` is the next of `CHUNK`
+ std::string start_key = internalKeyFromChunkID(ns_key, metadata, from);
+ std::string prefix = start_key.substr(0, start_key.size() - sizeof(uint64_t));
+ std::string end_key;
+ if (to == TSSample::MAX_TIMESTAMP && inclusive_to) {
+ end_key = internalKeyFromLabelKey(ns_key, metadata, "");
+ } else if (inclusive_to) {
+ end_key = internalKeyFromChunkID(ns_key, metadata, to + 1);
+ } else {
+ end_key = internalKeyFromChunkID(ns_key, metadata, to);
+ }
+
+ uint64_t chunk_count = metadata.size;
+
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice upper_bound(end_key);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+
+ WriteBatchLogData log_data(kRedisTimeSeries);
+ auto s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ auto iter = util::UniqueIterator(ctx, read_options);
+ iter->SeekForPrev(start_key);
+ if (!iter->Valid()) {
+ iter->Seek(start_key);
+ } else if (!iter->key().starts_with(prefix)) {
+ iter->Next();
+ }
+ for (; iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
+ auto chunk = CreateTSChunkFromData(iter->value());
+ uint64_t deleted_temp = 0;
+ auto new_chunk_data = chunk->RemoveSamplesBetween(from, to, &deleted_temp, inclusive_to);
+ if (new_chunk_data.empty() || deleted_temp == 0) {
+ // No samples deleted
+ continue;
+ }
+ *deleted += deleted_temp;
+ auto new_chunk = CreateTSChunkFromData(new_chunk_data);
+ bool need_delete_old_key = false;
+ if (new_chunk->GetCount() == 0) {
+ // Delete the whole chunk
+ need_delete_old_key = true;
+ if (chunk_count > 0) chunk_count--;
+ } else {
+ auto new_key = internalKeyFromChunkID(ns_key, metadata, new_chunk->GetFirstTimestamp());
+ if (new_key != iter->key()) {
+ // Change the chunk key
+ need_delete_old_key = true;
+ }
+ s = batch->Put(new_key, new_chunk_data);
+ if (!s.ok()) return s;
+ }
+ if (need_delete_old_key) {
+ s = batch->Delete(iter->key());
+ if (!s.ok()) return s;
+ }
+ }
+ if (chunk_count != metadata.size) {
+ metadata.size = chunk_count;
+ std::string bytes;
+ metadata.Encode(&bytes);
+ s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+ if (!s.ok()) return s;
+ }
+
+ return rocksdb::Status::OK();
+}
+
+rocksdb::Status TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+ std::vector<std::string> &ds_keys,
+ std::vector<TSDownStreamMeta> &ds_metas, uint64_t from, uint64_t to) {
+ if (from > to || ds_keys.empty()) return rocksdb::Status::OK();
+
+ auto batch = storage_->GetWriteBatchBase();
+ WriteBatchLogData log_data(kRedisTimeSeries);
+ auto s = batch->PutLogData(log_data.Encode());
+ if (!s.ok()) return s;
+
+ // Calculate key boundaries for latest chunk retrieval
+ std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
+ std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
+ std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+ // Configure read options for reverse iteration
+ rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+ rocksdb::Slice upper_bound(chunk_upper_bound);
+ read_options.iterate_upper_bound = &upper_bound;
+ rocksdb::Slice lower_bound(prefix);
+ read_options.iterate_lower_bound = &lower_bound;
+ // Retrieve the latest chunk for boundary calculations
+ auto iter = util::UniqueIterator(ctx, read_options);
+ iter->SeekForPrev(end_key);
+
+ // If no chunks found
+ uint64_t last_chunk_start = 0;
+ uint64_t last_chunk_end = 0;
+ // Check if any chunks exist for the source time series.
+ bool has_chunk = iter->Valid() && iter->key().starts_with(prefix);
+ if (has_chunk) {
+ auto last_chunk = CreateTSChunkFromData(iter->value());
+ last_chunk_start = last_chunk->GetFirstTimestamp();
+ last_chunk_end = last_chunk->GetLastTimestamp();
+ }
+ iter->Reset(); // Release iterator resources
+
+ // Determine global time range needed for sample retrieval
+ uint64_t retrieve_start_ts = TSSample::MAX_TIMESTAMP;
+ uint64_t retrieve_end_ts = 0;
+ if (has_chunk) {
+ for (const auto &ds_meta : ds_metas) {
+ retrieve_start_ts = std::min(retrieve_start_ts, ds_meta.aggregator.CalculateAlignedBucketLeft(from));
+ retrieve_end_ts = std::max(retrieve_end_ts, ds_meta.aggregator.CalculateAlignedBucketRight(to) - 1);
+ }
+ }
+
+ // Retrieve samples needed for downstream recalculation
+ std::vector<TSSample> retrieved_samples;
+ if (has_chunk) {
+ TSRangeOption range_option;
+ range_option.start_ts = retrieve_start_ts;
+ range_option.end_ts = retrieve_end_ts;
+ s = rangeCommon(ctx, ns_key, metadata, range_option, &retrieved_samples, true);
+ if (!s.ok()) return s;
+ }
+
+ // Process each downstream rule
+ for (size_t i = 0; i < ds_keys.size(); i++) {
+ auto &ds_meta = ds_metas[i];
+ auto &agg = ds_meta.aggregator;
+
+ TimeSeriesMetadata meta;
+ auto ds_ns_key = AppendNamespacePrefix(downstreamKeyFromInternalKey(ds_keys[i]));
+ s = getTimeSeriesMetadata(ctx, ds_ns_key, &meta);
+ if (!s.ok()) return s;
+
+ // Calculate the range of buckets affected by this deletion.
+ uint64_t start_bucket = agg.CalculateAlignedBucketLeft(from);
+ uint64_t end_bucket = agg.CalculateAlignedBucketLeft(to);
+ CHECK(start_bucket <= ds_meta.latest_bucket_idx);
+
+ std::vector<TSSample> new_samples; // To store re-aggregated boundary buckets.
+
+ // Recalculate the start bucket.
+ auto start_span = agg.GetBucketByTimestamp(retrieved_samples, start_bucket);
+ // If start_span is empty, the entire bucket will be deleted. Otherwise, it's re-aggregated,
+ // and the deletion starts from the next bucket.
+ uint64_t del_start = start_span.empty() ? start_bucket : start_bucket + 1;
+ if (!start_span.empty() && start_bucket < ds_meta.latest_bucket_idx) {
+ new_samples.push_back({start_bucket, agg.AggregateSamplesValue(start_span)});
+ }
+
+ // Recalculate the end bucket.
+ auto end_span = (start_bucket == end_bucket) ? start_span : agg.GetBucketByTimestamp(retrieved_samples, end_bucket);
+ // If end_span is empty, the bucket is included in the deletion. Otherwise, it's re-aggregated
+ // and excluded from deletion.
+ bool inclusive_end = end_span.empty();
+ if (!end_span.empty() && end_bucket < ds_meta.latest_bucket_idx && start_bucket != end_bucket) {
+ new_samples.push_back({end_bucket, agg.AggregateSamplesValue(end_span)});
+ }
+
+ // Update recalculated buckets
+ auto sample_batch = SampleBatch(std::move(new_samples), DuplicatePolicy::LAST);
+ s = upsertCommonInBatch(ctx, ds_ns_key, meta, sample_batch, batch);
+ if (!s.ok()) return s;
+
+ // Delete affected buckets in downstream
+ uint64_t deleted = 0;
+ s = delRangeCommonInBatch(ctx, ds_ns_key, meta, del_start, end_bucket, batch, &deleted, inclusive_end);
+ if (!s.ok()) return s;
+
+ // Update latest bucket if deletion affects the end
+ if (end_bucket < ds_meta.latest_bucket_idx) continue;
+
+ if (!has_chunk) {
+ ds_meta.latest_bucket_idx = 0;
+ } else if (to > last_chunk_end) {
+ ds_meta.latest_bucket_idx = agg.CalculateAlignedBucketLeft(last_chunk_end);
+ }
+
+ // Reaggregate latest bucket if needed
+ ds_meta.ResetAuxs();
+ if (has_chunk && last_chunk_start > 0) {
+ auto span = agg.GetBucketByTimestamp(retrieved_samples, ds_meta.latest_bucket_idx, last_chunk_start - 1);
+ ds_meta.AggregateLatestBucket(span);
+ }
+
+ // Persist downstream metadata updates if needed
+ std::string bytes;
+ ds_meta.Encode(&bytes);
+ batch->Put(ds_keys[i], bytes);
+ }
+
+ return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
rocksdb::Status TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const TimeSeriesMetadata &metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
const LabelKVList &labels) {
@@ -1841,4 +2072,41 @@
return rocksdb::Status::OK();
}
+rocksdb::Status TimeSeries::Del(engine::Context &ctx, const Slice &user_key, uint64_t from, uint64_t to,
+ uint64_t *deleted) {
+ std::string ns_key = AppendNamespacePrefix(user_key);
+ TimeSeriesMetadata metadata(false);
+ rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+ if (!s.ok()) return s;
+
+ // Get downstream rules
+ std::vector<std::string> ds_keys;
+ std::vector<TSDownStreamMeta> ds_metas;
+ s = getDownStreamRules(ctx, ns_key, metadata, &ds_keys, &ds_metas);
+ if (!s.ok()) return s;
+
+ // Check retention and compaction rules
+ std::vector<TSSample> get_samples;
+ s = getCommon(ctx, ns_key, metadata, true, &get_samples);
+ if (!s.ok()) return s;
+ if (get_samples.empty()) return rocksdb::Status::OK();
+ uint64_t last_ts = get_samples.back().ts;
+ uint64_t retention_bound =
+ (metadata.retention_time > 0 && metadata.retention_time < last_ts) ? last_ts - metadata.retention_time : 0;
+ for (const auto &ds_meta : ds_metas) {
+ const auto &agg = ds_meta.aggregator;
+ if (agg.CalculateAlignedBucketLeft(from) < retention_bound) {
+ return rocksdb::Status::InvalidArgument(
+ "When a series has compactions, deleting samples or compaction buckets beyond the series retention period is "
+ "not possible");
+ }
+ }
+
+ s = delRangeCommon(ctx, ns_key, metadata, from, to, deleted);
+ if (!s.ok()) return s;
+ if (*deleted == 0) return rocksdb::Status::OK();
+ s = delRangeDownStream(ctx, ns_key, metadata, ds_keys, ds_metas, from, to);
+ return s;
+}
+
} // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 5531235..cf5a031 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -80,8 +80,9 @@
// Splits the given samples into buckets.
std::vector<nonstd::span<const TSSample>> SplitSamplesToBuckets(nonstd::span<const TSSample> samples) const;
- // Returns the samples in the bucket that contains the given timestamp.
- nonstd::span<const TSSample> GetBucketByTimestamp(nonstd::span<const TSSample> samples, uint64_t ts) const;
+ // Returns the samples earlier than `less_than` in the bucket that contains `ts`.
+ nonstd::span<const TSSample> GetBucketByTimestamp(nonstd::span<const TSSample> samples, uint64_t ts,
+ uint64_t less_than = TSSample::MAX_TIMESTAMP) const;
// Calculates the aggregated value of the given samples according to the aggregator type
double AggregateSamplesValue(nonstd::span<const TSSample> samples) const;
@@ -270,6 +271,7 @@
rocksdb::Status MRange(engine::Context &ctx, const TSMRangeOption &option, std::vector<TSMRangeResult> *res);
rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample sample, const TSCreateOption &option,
AddResult *res);
+ rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t from, uint64_t to, uint64_t *deleted);
private:
rocksdb::ColumnFamilyHandle *index_cf_handle_;
@@ -283,12 +285,23 @@
LabelKVList *labels);
rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
SampleBatch &sample_batch, std::vector<std::string> *new_chunks = nullptr);
+ rocksdb::Status upsertCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+ SampleBatch &sample_batch, ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+ std::vector<std::string> *new_chunks = nullptr);
rocksdb::Status rangeCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
const TSRangeOption &option, std::vector<TSSample> *res, bool apply_retention = true);
rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
const std::vector<std::string> &new_chunks, SampleBatch &sample_batch);
rocksdb::Status getCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
bool is_return_latest, std::vector<TSSample> *res);
+ rocksdb::Status delRangeCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, uint64_t from,
+ uint64_t to, uint64_t *deleted, bool inclusive_to = true);
+ rocksdb::Status delRangeCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+ uint64_t from, uint64_t to, ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+ uint64_t *deleted, bool inclusive_to = true);
+ rocksdb::Status delRangeDownStream(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+ std::vector<std::string> &ds_keys, std::vector<TSDownStreamMeta> &ds_metas,
+ uint64_t from, uint64_t to);
rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const TimeSeriesMetadata &metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
const LabelKVList &labels);
diff --git a/src/types/timeseries.cc b/src/types/timeseries.cc
index 950f856..e76a69e 100644
--- a/src/types/timeseries.cc
+++ b/src/types/timeseries.cc
@@ -478,24 +478,39 @@
return res;
}
-std::string UncompTSChunk::RemoveSamplesBetween(uint64_t from, uint64_t to) const {
+std::string TSChunk::RemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted, bool inclusive_to) const {
+ uint64_t temp = 0;
+ if (deleted == nullptr) deleted = &temp;
+ return doRemoveSamplesBetween(from, to, deleted, inclusive_to);
+}
+
+std::string UncompTSChunk::doRemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted,
+ bool inclusive_to) const {
if (from > to) {
+ *deleted = 0;
return "";
}
// Find the range of samples to delete using binary search
auto start_it = std::lower_bound(samples_.begin(), samples_.end(), TSSample{from, 0.0});
if (start_it == samples_.end()) {
+ *deleted = 0;
return "";
}
- auto end_it = std::upper_bound(samples_.begin(), samples_.end(), TSSample{to, 0.0});
+
+ auto end_it = inclusive_to ? std::upper_bound(start_it, samples_.end(), TSSample{to, 0.0})
+ : std::lower_bound(start_it, samples_.end(), TSSample{to, 0.0});
size_t start_idx = std::distance(samples_.begin(), start_it);
size_t end_idx = std::distance(samples_.begin(), end_it);
+ *deleted = end_idx - start_idx;
+ if (*deleted == 0) {
+ return "";
+ }
// Calculate buffer size: header + remaining samples
const size_t header_size = TSChunk::MetaData::kEncodedSize;
- const size_t remaining_count = metadata_.count - (end_idx - start_idx);
+ const size_t remaining_count = metadata_.count - *deleted;
const size_t required_size = header_size + remaining_count * sizeof(TSSample);
// Prepare new buffer
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
index 224d43e..0734483 100644
--- a/src/types/timeseries.h
+++ b/src/types/timeseries.h
@@ -190,7 +190,8 @@
// Delete samples in [from, to] timestamp range
// Returns new chunk data without deleted samples. Returns empty string if no changes
- virtual std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const = 0;
+ std::string RemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted = nullptr,
+ bool inclusive_to = true) const;
// Update sample value at specified timestamp
// is_add_on controls whether to add to existing value or replace it
@@ -206,6 +207,9 @@
protected:
nonstd::span<const char> data_;
MetaData metadata_;
+
+ virtual std::string doRemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted,
+ bool inclusive_to) const = 0;
};
class UncompTSChunk : public TSChunk {
@@ -219,12 +223,14 @@
std::string UpsertSamples(SampleBatchSlice samples) const override;
std::vector<std::string> UpsertSampleAndSplit(SampleBatchSlice batch, uint64_t preferred_chunk_size,
bool is_fix_split_mode) const override;
- std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const override;
std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) const override;
TSSample GetLatestSample(uint32_t idx) const override;
nonstd::span<const TSSample> GetSamplesSpan() const override { return samples_; }
+ protected:
+ std::string doRemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted, bool inclusive_to) const override;
+
private:
nonstd::span<const TSSample> samples_;
};
diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc
index fbaf042..c448bc8 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -982,3 +982,92 @@
EXPECT_TRUE(actual.empty());
}
}
+
+TEST_F(TimeSeriesTest, DelComprehensive) {
+ using TSCreateOption = redis::TSCreateOption;
+ using TSRangeOption = redis::TSRangeOption;
+ // Create time series
+ auto s = ts_db_->Create(*ctx_, "test1", TSCreateOption());
+ EXPECT_TRUE(s.ok());
+ s = ts_db_->Create(*ctx_, "test2", TSCreateOption());
+ EXPECT_TRUE(s.ok());
+ s = ts_db_->Create(*ctx_, "test3", TSCreateOption());
+ EXPECT_TRUE(s.ok());
+ s = ts_db_->Create(*ctx_, "test4", TSCreateOption());
+ EXPECT_TRUE(s.ok());
+
+ // Create rules
+ redis::TSAggregator aggregator;
+ redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+
+ aggregator.type = redis::TSAggregatorType::SUM;
+ aggregator.bucket_duration = 10;
+ s = ts_db_->CreateRule(*ctx_, "test1", "test2", aggregator, &res);
+ EXPECT_TRUE(s.ok());
+ aggregator.bucket_duration = 200;
+ s = ts_db_->CreateRule(*ctx_, "test1", "test4", aggregator, &res);
+ EXPECT_TRUE(s.ok());
+ aggregator.bucket_duration = 20;
+ aggregator.alignment = 10;
+ s = ts_db_->CreateRule(*ctx_, "test1", "test3", aggregator, &res);
+ EXPECT_TRUE(s.ok());
+
+ // Add samples
+ std::vector<TSSample> samples = {{1, 1}, {2, 2}, {11, 11}, {15, 15}, {16, 16}, {21, 21},
+ {24, 24}, {31, 31}, {35, 35}, {39, 39}, {42, 42}, {49, 49}};
+ std::vector<TSChunk::AddResult> results;
+ results.resize(samples.size());
+ s = ts_db_->MAdd(*ctx_, "test1", samples, &results);
+ EXPECT_TRUE(s.ok());
+
+ // Delete samples between timestamps 10 and 40
+ uint64_t deleted = 0;
+ s = ts_db_->Del(*ctx_, "test1", 12, 40, &deleted);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(deleted, 7);
+
+ // Validate downstream ranges
+ std::vector<std::string> keys = {"test2", "test3", "test4"};
+ auto check = [&](const std::vector<std::vector<TSSample>> &expected_samples) {
+ for (size_t i = 0; i < keys.size(); ++i) {
+ std::vector<TSSample> res;
+ TSRangeOption option;
+ option.start_ts = 0;
+ option.end_ts = TSSample::MAX_TIMESTAMP;
+ s = ts_db_->Range(*ctx_, keys[i], option, &res);
+ EXPECT_TRUE(s.ok());
+ EXPECT_EQ(res, expected_samples[i]);
+ }
+ };
+ std::vector<std::vector<TSSample>> expected_samples = {
+ {{0, 3}, {10, 11}}, // test2
+ {{0, 3}, {10, 11}}, // test3
+ {} // test4
+ };
+ check(expected_samples);
+
+ // Add new samples
+ TSSample new_sample{50, 50};
+ TSChunk::AddResult add_result;
+ s = ts_db_->Add(*ctx_, "test1", new_sample, TSCreateOption(), &add_result);
+ EXPECT_TRUE(s.ok());
+ // Validate updated ranges
+ expected_samples = {
+ {{0, 3}, {10, 11}, {40, 91}}, // test2
+ {{0, 3}, {10, 11}, {30, 91}}, // test3
+ {} // test4
+ };
+ check(expected_samples);
+
+ // Add final sample
+ TSSample final_sample{200, 200};
+ s = ts_db_->Add(*ctx_, "test1", final_sample, TSCreateOption(), &add_result);
+ EXPECT_TRUE(s.ok());
+ // Validate final ranges
+ expected_samples = {
+ {{0, 3}, {10, 11}, {40, 91}, {50, 50}}, // test2
+ {{0, 3}, {10, 11}, {30, 91}, {50, 50}}, // test3
+ {{0, 155}} // test4
+ };
+ check(expected_samples);
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 7ca217f..8af9e2b 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -879,4 +879,32 @@
require.Equal(t, 1, len(res))
require.Equal(t, []interface{}{int64(1657811829000), 389.0}, res[0])
})
+
+ t.Run("TS.Del Test", func(t *testing.T) {
+ srcKey := "del_test_src"
+ dstKey := "del_test_dst"
+ // Create source key with retention=10
+ require.NoError(t, rdb.Do(ctx, "ts.create", srcKey, "retention", "10").Err())
+ // Create destination key
+ require.NoError(t, rdb.Do(ctx, "ts.create", dstKey).Err())
+
+ // Test: Create rule successfully
+ require.NoError(t, rdb.Do(ctx, "ts.createrule", srcKey, dstKey, "aggregation", "sum", "10").Err())
+
+ // Test: Add samples
+ res := rdb.Do(ctx, "ts.madd", srcKey, "5", "5", srcKey, "8", "8", srcKey, "12", "12", srcKey, "13", "13", srcKey, "15", "15").Val().([]interface{})
+ assert.Equal(t, []interface{}{int64(5), int64(8), int64(12), int64(13), int64(15)}, res)
+
+ // Test: Delete samples within retention period
+ deletedCount := rdb.Do(ctx, "ts.del", srcKey, "11", "14").Val().(int64)
+ assert.Equal(t, int64(2), deletedCount) // Deletes 12 and 13
+
+ // Test: Try delete samples beyond retention period
+ _, err := rdb.Do(ctx, "ts.del", srcKey, "5", "8").Result()
+ require.ErrorContains(t, err, "When a series has compactions, deleting samples or compaction buckets beyond the series retention period is not possible")
+
+ // Test: Try delete all samples with range
+ _, err = rdb.Do(ctx, "ts.del", srcKey, "-", "+").Result()
+ require.ErrorContains(t, err, "When a series has compactions, deleting samples or compaction buckets beyond the series retention period is not possible")
+ })
}