feat(ts): Add `TS.DEL` command for deleting a range of samples (#3174)

`TS.DEL key fromTimestamp toTimestamp` deletes all samples in the given closed timestamp range and replies with the number of deleted samples. `-` and `+` are accepted as shorthand for the minimum timestamp (`0`) and the maximum timestamp:
```
127.0.0.1:6666> TS.CREATE temp:JLM LABELS type temp location JLM
127.0.0.1:6666> TS.MADD temp:JLM 1005 30 temp:JLM 1015 35 temp:JLM 1025 9999 temp:JLM 1035 40
127.0.0.1:6666> TS.DEL temp:JLM 1000 1030
(integer) 3
127.0.0.1:6666> TS.DEL temp:JLM - +
(integer) 1
```
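When a series has compaction rules, samples older than its retention period cannot be deleted and the command is rejected. A sketch of that case, mirroring the Go test below (hypothetical keys `src`/`dst`; the exact reply framing may differ):

```
127.0.0.1:6666> TS.CREATE src RETENTION 10
127.0.0.1:6666> TS.CREATE dst
127.0.0.1:6666> TS.CREATERULE src dst AGGREGATION sum 10
127.0.0.1:6666> TS.MADD src 5 5 src 12 12 src 15 15
127.0.0.1:6666> TS.DEL src 5 8
(error) When a series has compactions, deleting samples or compaction buckets beyond the series retention period is not possible
```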
diff --git a/src/commands/cmd_timeseries.cc b/src/commands/cmd_timeseries.cc
index 8c1fda7..1726ebd 100644
--- a/src/commands/cmd_timeseries.cc
+++ b/src/commands/cmd_timeseries.cc
@@ -1039,6 +1039,53 @@
   double value_ = 0;
 };
 
+class CommandTSDel : public Commander {
+ public:
+  Status Parse(const std::vector<std::string> &args) override {
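+    // Syntax: TS.DEL key fromTimestamp toTimestamp, where "-" / "+" select the minimum / maximum timestamp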
+    if (args.size() != 4) {
+      return {Status::RedisParseErr, "wrong number of arguments for 'ts.del' command"};
+    }
+    CommandParser parser(args, 2);
+    // Parse start timestamp
+    auto start_parse = parser.TakeInt<uint64_t>();
+    if (!start_parse.IsOK()) {
+      auto start_ts_str = parser.TakeStr();
+      if (!start_ts_str.IsOK() || start_ts_str.GetValue() != "-") {
+        return {Status::RedisParseErr, "wrong fromTimestamp"};
+      }
+      // "-" means use default start timestamp: 0
+    } else {
+      start_ts_ = start_parse.GetValue();
+    }
+    // Parse end timestamp
+    auto end_parse = parser.TakeInt<uint64_t>();
+    if (!end_parse.IsOK()) {
+      auto end_ts_str = parser.TakeStr();
+      if (!end_ts_str.IsOK() || end_ts_str.GetValue() != "+") {
+        return {Status::RedisParseErr, "wrong toTimestamp"};
+      }
+      // "+" means use default end timestamp: MAX_TIMESTAMP
+    } else {
+      end_ts_ = end_parse.GetValue();
+    }
+    return Commander::Parse(args);
+  }
+  Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
+    auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
+    uint64_t deleted_count = 0;
+    auto s = timeseries_db.Del(ctx, args_[1], start_ts_, end_ts_, &deleted_count);
+    if (!s.ok()) {
+      return {Status::RedisExecErr, s.ToString()};
+    }
+    *output = redis::Integer(deleted_count);
+    return Status::OK();
+  }
+
+ private:
+  uint64_t start_ts_ = 0;
+  uint64_t end_ts_ = TSSample::MAX_TIMESTAMP;
+};
+
 REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", -2, "write", 1, 1, 1),
                         MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 1),
                         MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, -3, 1),
@@ -1049,6 +1096,7 @@
                         MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", NO_KEY),
                         MakeCmdAttr<CommandTSMRange>("ts.mrange", -5, "read-only", NO_KEY),
                         MakeCmdAttr<CommandTSIncrByDecrBy>("ts.incrby", -3, "write", 1, 1, 1),
-                        MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3, "write", 1, 1, 1), );
+                        MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3, "write", 1, 1, 1),
+                        MakeCmdAttr<CommandTSDel>("ts.del", -4, "write", 1, 1, 1), );
 
 }  // namespace redis
diff --git a/src/types/redis_timeseries.cc b/src/types/redis_timeseries.cc
index b108cfb..540d582 100644
--- a/src/types/redis_timeseries.cc
+++ b/src/types/redis_timeseries.cc
@@ -320,6 +320,7 @@
 }
 
 void TSDownStreamMeta::AggregateLatestBucket(nonstd::span<const TSSample> samples) {
+  if (samples.empty()) return;
   double temp_v = 0.0;
   switch (aggregator.type) {
     case TSAggregatorType::SUM:
@@ -732,15 +733,18 @@
   return spans;
 }
 
-nonstd::span<const TSSample> TSAggregator::GetBucketByTimestamp(nonstd::span<const TSSample> samples,
-                                                                uint64_t ts) const {
+nonstd::span<const TSSample> TSAggregator::GetBucketByTimestamp(nonstd::span<const TSSample> samples, uint64_t ts,
+                                                                uint64_t less_than) const {
   if (type == TSAggregatorType::NONE || samples.empty()) {
     return {};
   }
   uint64_t start_bucket = CalculateAlignedBucketLeft(ts);
-  uint64_t end_bucket = CalculateAlignedBucketRight(ts);
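+  // Clamp the bucket's right edge to `less_than` so that only samples before it are returned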
+  uint64_t end_bucket = std::min(CalculateAlignedBucketRight(ts), less_than);
   auto lower = std::lower_bound(samples.begin(), samples.end(), TSSample{start_bucket, 0.0});
   auto upper = std::lower_bound(lower, samples.end(), TSSample{end_bucket, 0.0});
+  if (lower == upper) {
+    return {};
+  }
   return {lower, upper};
 }
 
@@ -830,8 +834,23 @@
 
 rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
                                          SampleBatch &sample_batch, std::vector<std::string> *new_chunks) {
+  auto batch = storage_->GetWriteBatchBase();
+  auto s = upsertCommonInBatch(ctx, ns_key, metadata, sample_batch, batch, new_chunks);
+  if (!s.ok()) return s;
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::upsertCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+                                                SampleBatch &sample_batch,
+                                                ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                                std::vector<std::string> *new_chunks) {
   auto all_batch_slice = sample_batch.AsSlice();
 
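+  // Nothing to upsert: when the caller expects new chunk keys, report none and return early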
+  if (all_batch_slice.GetSampleSpan().empty() && new_chunks != nullptr) {
+    new_chunks->clear();
+    return rocksdb::Status::OK();
+  }
+
   // In the enum `TSSubkeyType`, `LABEL` immediately follows `CHUNK`
   std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
   std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
@@ -867,7 +886,6 @@
     return rocksdb::Status::OK();
   }
 
-  auto batch = storage_->GetWriteBatchBase();
   WriteBatchLogData log_data(kRedisTimeSeries);
   auto s = batch->PutLogData(log_data.Encode());
   if (!s.ok()) return s;
@@ -953,7 +971,7 @@
     }
   }
 
-  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+  return rocksdb::Status::OK();
 }
 
 rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
@@ -1301,6 +1319,219 @@
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status TimeSeries::delRangeCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+                                           uint64_t from, uint64_t to, uint64_t *deleted, bool inclusive_to) {
+  auto batch = storage_->GetWriteBatchBase();
+  auto s = delRangeCommonInBatch(ctx, ns_key, metadata, from, to, batch, deleted, inclusive_to);
+  if (!s.ok()) return s;
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
+rocksdb::Status TimeSeries::delRangeCommonInBatch(engine::Context &ctx, const Slice &ns_key,
+                                                  TimeSeriesMetadata &metadata, uint64_t from, uint64_t to,
+                                                  ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                                  uint64_t *deleted, bool inclusive_to) {
+  *deleted = 0;
+  if (from > to || (from == to && !inclusive_to)) {
+    return rocksdb::Status::OK();
+  }
+  // In the enum `TSSubkeyType`, `LABEL` immediately follows `CHUNK`
+  std::string start_key = internalKeyFromChunkID(ns_key, metadata, from);
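+  // The common chunk-key prefix is `start_key` minus its trailing 8-byte chunk timestamp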
+  std::string prefix = start_key.substr(0, start_key.size() - sizeof(uint64_t));
+  std::string end_key;
+  if (to == TSSample::MAX_TIMESTAMP && inclusive_to) {
+    end_key = internalKeyFromLabelKey(ns_key, metadata, "");
+  } else if (inclusive_to) {
+    end_key = internalKeyFromChunkID(ns_key, metadata, to + 1);
+  } else {
+    end_key = internalKeyFromChunkID(ns_key, metadata, to);
+  }
+
+  uint64_t chunk_count = metadata.size;
+
+  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+  rocksdb::Slice upper_bound(end_key);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix);
+  read_options.iterate_lower_bound = &lower_bound;
+
+  WriteBatchLogData log_data(kRedisTimeSeries);
+  auto s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  auto iter = util::UniqueIterator(ctx, read_options);
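+  // The chunk covering `from` may begin before `from`, so first try the last chunk at or before `start_key`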
+  iter->SeekForPrev(start_key);
+  if (!iter->Valid()) {
+    iter->Seek(start_key);
+  } else if (!iter->key().starts_with(prefix)) {
+    iter->Next();
+  }
+  for (; iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
+    auto chunk = CreateTSChunkFromData(iter->value());
+    uint64_t deleted_temp = 0;
+    auto new_chunk_data = chunk->RemoveSamplesBetween(from, to, &deleted_temp, inclusive_to);
+    if (new_chunk_data.empty() || deleted_temp == 0) {
+      // No samples deleted
+      continue;
+    }
+    *deleted += deleted_temp;
+    auto new_chunk = CreateTSChunkFromData(new_chunk_data);
+    bool need_delete_old_key = false;
+    if (new_chunk->GetCount() == 0) {
+      // Delete the whole chunk
+      need_delete_old_key = true;
+      if (chunk_count > 0) chunk_count--;
+    } else {
+      auto new_key = internalKeyFromChunkID(ns_key, metadata, new_chunk->GetFirstTimestamp());
+      if (new_key != iter->key()) {
+        // Change the chunk key
+        need_delete_old_key = true;
+      }
+      s = batch->Put(new_key, new_chunk_data);
+      if (!s.ok()) return s;
+    }
+    if (need_delete_old_key) {
+      s = batch->Delete(iter->key());
+      if (!s.ok()) return s;
+    }
+  }
+  if (chunk_count != metadata.size) {
+    metadata.size = chunk_count;
+    std::string bytes;
+    metadata.Encode(&bytes);
+    s = batch->Put(metadata_cf_handle_, ns_key, bytes);
+    if (!s.ok()) return s;
+  }
+
+  return rocksdb::Status::OK();
+}
+
+rocksdb::Status TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+                                               std::vector<std::string> &ds_keys,
+                                               std::vector<TSDownStreamMeta> &ds_metas, uint64_t from, uint64_t to) {
+  if (from > to || ds_keys.empty()) return rocksdb::Status::OK();
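+  // Re-aggregate the two boundary buckets from the remaining source samples, delete the fully
+  // covered buckets in each downstream series, then refresh the latest-bucket state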
+
+  auto batch = storage_->GetWriteBatchBase();
+  WriteBatchLogData log_data(kRedisTimeSeries);
+  auto s = batch->PutLogData(log_data.Encode());
+  if (!s.ok()) return s;
+
+  // Calculate key boundaries for latest chunk retrieval
+  std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
+  std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
+  std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
+  // Configure read options for reverse iteration
+  rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
+  rocksdb::Slice upper_bound(chunk_upper_bound);
+  read_options.iterate_upper_bound = &upper_bound;
+  rocksdb::Slice lower_bound(prefix);
+  read_options.iterate_lower_bound = &lower_bound;
+  // Retrieve the latest chunk for boundary calculations
+  auto iter = util::UniqueIterator(ctx, read_options);
+  iter->SeekForPrev(end_key);
+
+  // If no chunks found
+  uint64_t last_chunk_start = 0;
+  uint64_t last_chunk_end = 0;
+  // Check if any chunks exist for the source time series.
+  bool has_chunk = iter->Valid() && iter->key().starts_with(prefix);
+  if (has_chunk) {
+    auto last_chunk = CreateTSChunkFromData(iter->value());
+    last_chunk_start = last_chunk->GetFirstTimestamp();
+    last_chunk_end = last_chunk->GetLastTimestamp();
+  }
+  iter->Reset();  // Release iterator resources
+
+  // Determine global time range needed for sample retrieval
+  uint64_t retrieve_start_ts = TSSample::MAX_TIMESTAMP;
+  uint64_t retrieve_end_ts = 0;
+  if (has_chunk) {
+    for (const auto &ds_meta : ds_metas) {
+      retrieve_start_ts = std::min(retrieve_start_ts, ds_meta.aggregator.CalculateAlignedBucketLeft(from));
+      retrieve_end_ts = std::max(retrieve_end_ts, ds_meta.aggregator.CalculateAlignedBucketRight(to) - 1);
+    }
+  }
+
+  // Retrieve samples needed for downstream recalculation
+  std::vector<TSSample> retrieved_samples;
+  if (has_chunk) {
+    TSRangeOption range_option;
+    range_option.start_ts = retrieve_start_ts;
+    range_option.end_ts = retrieve_end_ts;
+    s = rangeCommon(ctx, ns_key, metadata, range_option, &retrieved_samples, true);
+    if (!s.ok()) return s;
+  }
+
+  // Process each downstream rule
+  for (size_t i = 0; i < ds_keys.size(); i++) {
+    auto &ds_meta = ds_metas[i];
+    auto &agg = ds_meta.aggregator;
+
+    TimeSeriesMetadata meta;
+    auto ds_ns_key = AppendNamespacePrefix(downstreamKeyFromInternalKey(ds_keys[i]));
+    s = getTimeSeriesMetadata(ctx, ds_ns_key, &meta);
+    if (!s.ok()) return s;
+
+    // Calculate the range of buckets affected by this deletion.
+    uint64_t start_bucket = agg.CalculateAlignedBucketLeft(from);
+    uint64_t end_bucket = agg.CalculateAlignedBucketLeft(to);
+    CHECK(start_bucket <= ds_meta.latest_bucket_idx);
+
+    std::vector<TSSample> new_samples;  // To store re-aggregated boundary buckets.
+
+    // Recalculate the start bucket.
+    auto start_span = agg.GetBucketByTimestamp(retrieved_samples, start_bucket);
+    // If start_span is empty, the entire bucket will be deleted. Otherwise, it's re-aggregated,
+    // and the deletion starts from the next bucket.
+    uint64_t del_start = start_span.empty() ? start_bucket : start_bucket + 1;
+    if (!start_span.empty() && start_bucket < ds_meta.latest_bucket_idx) {
+      new_samples.push_back({start_bucket, agg.AggregateSamplesValue(start_span)});
+    }
+
+    // Recalculate the end bucket.
+    auto end_span = (start_bucket == end_bucket) ? start_span : agg.GetBucketByTimestamp(retrieved_samples, end_bucket);
+    // If end_span is empty, the bucket is included in the deletion. Otherwise, it's re-aggregated
+    // and excluded from deletion.
+    bool inclusive_end = end_span.empty();
+    if (!end_span.empty() && end_bucket < ds_meta.latest_bucket_idx && start_bucket != end_bucket) {
+      new_samples.push_back({end_bucket, agg.AggregateSamplesValue(end_span)});
+    }
+
+    // Update recalculated buckets
+    auto sample_batch = SampleBatch(std::move(new_samples), DuplicatePolicy::LAST);
+    s = upsertCommonInBatch(ctx, ds_ns_key, meta, sample_batch, batch);
+    if (!s.ok()) return s;
+
+    // Delete affected buckets in downstream
+    uint64_t deleted = 0;
+    s = delRangeCommonInBatch(ctx, ds_ns_key, meta, del_start, end_bucket, batch, &deleted, inclusive_end);
+    if (!s.ok()) return s;
+
+    // The remaining latest-bucket fix-ups only apply when the deletion reaches the latest bucket
+    if (end_bucket < ds_meta.latest_bucket_idx) continue;
+
+    if (!has_chunk) {
+      ds_meta.latest_bucket_idx = 0;
+    } else if (to > last_chunk_end) {
+      ds_meta.latest_bucket_idx = agg.CalculateAlignedBucketLeft(last_chunk_end);
+    }
+
+    // Reaggregate latest bucket if needed
+    ds_meta.ResetAuxs();
+    if (has_chunk && last_chunk_start > 0) {
+      auto span = agg.GetBucketByTimestamp(retrieved_samples, ds_meta.latest_bucket_idx, last_chunk_start - 1);
+      ds_meta.AggregateLatestBucket(span);
+    }
+
+    // Persist the updated downstream metadata
+    std::string bytes;
+    ds_meta.Encode(&bytes);
+    s = batch->Put(ds_keys[i], bytes);
+    if (!s.ok()) return s;
+  }
+
+  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
+}
+
 rocksdb::Status TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const TimeSeriesMetadata &metadata,
                                                     ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
                                                     const LabelKVList &labels) {
@@ -1841,4 +2072,41 @@
   return rocksdb::Status::OK();
 }
 
+rocksdb::Status TimeSeries::Del(engine::Context &ctx, const Slice &user_key, uint64_t from, uint64_t to,
+                                uint64_t *deleted) {
+  *deleted = 0;
+  std::string ns_key = AppendNamespacePrefix(user_key);
+  TimeSeriesMetadata metadata(false);
+  rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
+  if (!s.ok()) return s;
+
+  // Get downstream rules
+  std::vector<std::string> ds_keys;
+  std::vector<TSDownStreamMeta> ds_metas;
+  s = getDownStreamRules(ctx, ns_key, metadata, &ds_keys, &ds_metas);
+  if (!s.ok()) return s;
+
+  // Check retention and compaction rules
+  std::vector<TSSample> get_samples;
+  s = getCommon(ctx, ns_key, metadata, true, &get_samples);
+  if (!s.ok()) return s;
+  if (get_samples.empty()) return rocksdb::Status::OK();
+  uint64_t last_ts = get_samples.back().ts;
+  uint64_t retention_bound =
+      (metadata.retention_time > 0 && metadata.retention_time < last_ts) ? last_ts - metadata.retention_time : 0;
+  for (const auto &ds_meta : ds_metas) {
+    const auto &agg = ds_meta.aggregator;
+    if (agg.CalculateAlignedBucketLeft(from) < retention_bound) {
+      return rocksdb::Status::InvalidArgument(
+          "When a series has compactions, deleting samples or compaction buckets beyond the series retention period is "
+          "not possible");
+    }
+  }
+
+  s = delRangeCommon(ctx, ns_key, metadata, from, to, deleted);
+  if (!s.ok()) return s;
+  if (*deleted == 0) return rocksdb::Status::OK();
+  s = delRangeDownStream(ctx, ns_key, metadata, ds_keys, ds_metas, from, to);
+  return s;
+}
+
 }  // namespace redis
diff --git a/src/types/redis_timeseries.h b/src/types/redis_timeseries.h
index 5531235..cf5a031 100644
--- a/src/types/redis_timeseries.h
+++ b/src/types/redis_timeseries.h
@@ -80,8 +80,9 @@
   // Splits the given samples into buckets.
   std::vector<nonstd::span<const TSSample>> SplitSamplesToBuckets(nonstd::span<const TSSample> samples) const;
 
-  // Returns the samples in the bucket that contains the given timestamp.
-  nonstd::span<const TSSample> GetBucketByTimestamp(nonstd::span<const TSSample> samples, uint64_t ts) const;
+  // Returns the samples in the bucket containing `ts`, restricted to timestamps earlier than `less_than`.
+  nonstd::span<const TSSample> GetBucketByTimestamp(nonstd::span<const TSSample> samples, uint64_t ts,
+                                                    uint64_t less_than = TSSample::MAX_TIMESTAMP) const;
 
   // Calculates the aggregated value of the given samples according to the aggregator type
   double AggregateSamplesValue(nonstd::span<const TSSample> samples) const;
@@ -270,6 +271,7 @@
   rocksdb::Status MRange(engine::Context &ctx, const TSMRangeOption &option, std::vector<TSMRangeResult> *res);
   rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample sample, const TSCreateOption &option,
                          AddResult *res);
+  rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t from, uint64_t to, uint64_t *deleted);
 
  private:
   rocksdb::ColumnFamilyHandle *index_cf_handle_;
@@ -283,12 +285,23 @@
                                  LabelKVList *labels);
   rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
                                SampleBatch &sample_batch, std::vector<std::string> *new_chunks = nullptr);
+  rocksdb::Status upsertCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+                                      SampleBatch &sample_batch, ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                      std::vector<std::string> *new_chunks = nullptr);
   rocksdb::Status rangeCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
                               const TSRangeOption &option, std::vector<TSSample> *res, bool apply_retention = true);
   rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
                                    const std::vector<std::string> &new_chunks, SampleBatch &sample_batch);
   rocksdb::Status getCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
                             bool is_return_latest, std::vector<TSSample> *res);
+  rocksdb::Status delRangeCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, uint64_t from,
+                                 uint64_t to, uint64_t *deleted, bool inclusive_to = true);
+  rocksdb::Status delRangeCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+                                        uint64_t from, uint64_t to, ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
+                                        uint64_t *deleted, bool inclusive_to = true);
+  rocksdb::Status delRangeDownStream(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
+                                     std::vector<std::string> &ds_keys, std::vector<TSDownStreamMeta> &ds_metas,
+                                     uint64_t from, uint64_t to);
   rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const TimeSeriesMetadata &metadata,
                                           ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
                                           const LabelKVList &labels);
diff --git a/src/types/timeseries.cc b/src/types/timeseries.cc
index 950f856..e76a69e 100644
--- a/src/types/timeseries.cc
+++ b/src/types/timeseries.cc
@@ -478,24 +478,39 @@
   return res;
 }
 
-std::string UncompTSChunk::RemoveSamplesBetween(uint64_t from, uint64_t to) const {
+std::string TSChunk::RemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted, bool inclusive_to) const {
+  uint64_t temp = 0;
+  if (deleted == nullptr) deleted = &temp;
+  return doRemoveSamplesBetween(from, to, deleted, inclusive_to);
+}
+
+std::string UncompTSChunk::doRemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted,
+                                                  bool inclusive_to) const {
   if (from > to) {
+    *deleted = 0;
     return "";
   }
 
   // Find the range of samples to delete using binary search
   auto start_it = std::lower_bound(samples_.begin(), samples_.end(), TSSample{from, 0.0});
   if (start_it == samples_.end()) {
+    *deleted = 0;
     return "";
   }
-  auto end_it = std::upper_bound(samples_.begin(), samples_.end(), TSSample{to, 0.0});
+
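+  // `inclusive_to` decides whether samples at exactly `to` are removed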
+  auto end_it = inclusive_to ? std::upper_bound(start_it, samples_.end(), TSSample{to, 0.0})
+                             : std::lower_bound(start_it, samples_.end(), TSSample{to, 0.0});
 
   size_t start_idx = std::distance(samples_.begin(), start_it);
   size_t end_idx = std::distance(samples_.begin(), end_it);
 
+  *deleted = end_idx - start_idx;
+  if (*deleted == 0) {
+    return "";
+  }
   // Calculate buffer size: header + remaining samples
   const size_t header_size = TSChunk::MetaData::kEncodedSize;
-  const size_t remaining_count = metadata_.count - (end_idx - start_idx);
+  const size_t remaining_count = metadata_.count - *deleted;
   const size_t required_size = header_size + remaining_count * sizeof(TSSample);
 
   // Prepare new buffer
diff --git a/src/types/timeseries.h b/src/types/timeseries.h
index 224d43e..0734483 100644
--- a/src/types/timeseries.h
+++ b/src/types/timeseries.h
@@ -190,7 +190,8 @@
 
-  // Delete samples in [from, to] timestamp range
-  // Returns new chunk data without deleted samples. Returns empty string if no changes
+  // Delete samples in the [from, to] timestamp range, or [from, to) when `inclusive_to` is false.
+  // Returns the new chunk data without the deleted samples (empty string if nothing changed) and
+  // reports the number of removed samples through `deleted` when it is non-null
-  virtual std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const = 0;
+  std::string RemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted = nullptr,
+                                   bool inclusive_to = true) const;
 
   // Update sample value at specified timestamp
   // is_add_on controls whether to add to existing value or replace it
@@ -206,6 +207,9 @@
  protected:
   nonstd::span<const char> data_;
   MetaData metadata_;
+
+  virtual std::string doRemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted,
+                                             bool inclusive_to) const = 0;
 };
 
 class UncompTSChunk : public TSChunk {
@@ -219,12 +223,14 @@
   std::string UpsertSamples(SampleBatchSlice samples) const override;
   std::vector<std::string> UpsertSampleAndSplit(SampleBatchSlice batch, uint64_t preferred_chunk_size,
                                                 bool is_fix_split_mode) const override;
-  std::string RemoveSamplesBetween(uint64_t from, uint64_t to) const override;
   std::string UpdateSampleValue(uint64_t ts, double value, bool is_add_on) const override;
   TSSample GetLatestSample(uint32_t idx) const override;
 
   nonstd::span<const TSSample> GetSamplesSpan() const override { return samples_; }
 
+ protected:
+  std::string doRemoveSamplesBetween(uint64_t from, uint64_t to, uint64_t* deleted, bool inclusive_to) const override;
+
  private:
   nonstd::span<const TSSample> samples_;
 };
diff --git a/tests/cppunit/types/timeseries_test.cc b/tests/cppunit/types/timeseries_test.cc
index fbaf042..c448bc8 100644
--- a/tests/cppunit/types/timeseries_test.cc
+++ b/tests/cppunit/types/timeseries_test.cc
@@ -982,3 +982,92 @@
     EXPECT_TRUE(actual.empty());
   }
 }
+
+TEST_F(TimeSeriesTest, DelComprehensive) {
+  using TSCreateOption = redis::TSCreateOption;
+  using TSRangeOption = redis::TSRangeOption;
+  // Create time series
+  auto s = ts_db_->Create(*ctx_, "test1", TSCreateOption());
+  EXPECT_TRUE(s.ok());
+  s = ts_db_->Create(*ctx_, "test2", TSCreateOption());
+  EXPECT_TRUE(s.ok());
+  s = ts_db_->Create(*ctx_, "test3", TSCreateOption());
+  EXPECT_TRUE(s.ok());
+  s = ts_db_->Create(*ctx_, "test4", TSCreateOption());
+  EXPECT_TRUE(s.ok());
+
+  // Create rules
+  redis::TSAggregator aggregator;
+  redis::TSCreateRuleResult res = redis::TSCreateRuleResult::kOK;
+
+  aggregator.type = redis::TSAggregatorType::SUM;
+  aggregator.bucket_duration = 10;
+  s = ts_db_->CreateRule(*ctx_, "test1", "test2", aggregator, &res);
+  EXPECT_TRUE(s.ok());
+  aggregator.bucket_duration = 200;
+  s = ts_db_->CreateRule(*ctx_, "test1", "test4", aggregator, &res);
+  EXPECT_TRUE(s.ok());
+  aggregator.bucket_duration = 20;
+  aggregator.alignment = 10;
+  s = ts_db_->CreateRule(*ctx_, "test1", "test3", aggregator, &res);
+  EXPECT_TRUE(s.ok());
+
+  // Add samples
+  std::vector<TSSample> samples = {{1, 1},   {2, 2},   {11, 11}, {15, 15}, {16, 16}, {21, 21},
+                                   {24, 24}, {31, 31}, {35, 35}, {39, 39}, {42, 42}, {49, 49}};
+  std::vector<TSChunk::AddResult> results;
+  results.resize(samples.size());
+  s = ts_db_->MAdd(*ctx_, "test1", samples, &results);
+  EXPECT_TRUE(s.ok());
+
+  // Delete samples in the range [12, 40]
+  uint64_t deleted = 0;
+  s = ts_db_->Del(*ctx_, "test1", 12, 40, &deleted);
+  EXPECT_TRUE(s.ok());
+  EXPECT_EQ(deleted, 7);
+
+  // Validate downstream ranges
+  std::vector<std::string> keys = {"test2", "test3", "test4"};
+  auto check = [&](const std::vector<std::vector<TSSample>> &expected_samples) {
+    for (size_t i = 0; i < keys.size(); ++i) {
+      std::vector<TSSample> res;
+      TSRangeOption option;
+      option.start_ts = 0;
+      option.end_ts = TSSample::MAX_TIMESTAMP;
+      s = ts_db_->Range(*ctx_, keys[i], option, &res);
+      EXPECT_TRUE(s.ok());
+      EXPECT_EQ(res, expected_samples[i]);
+    }
+  };
+  std::vector<std::vector<TSSample>> expected_samples = {
+      {{0, 3}, {10, 11}},  // test2
+      {{0, 3}, {10, 11}},  // test3
+      {}                   // test4
+  };
+  check(expected_samples);
+
+  // Add a new sample; this closes the previous latest buckets in the downstream series
+  TSSample new_sample{50, 50};
+  TSChunk::AddResult add_result;
+  s = ts_db_->Add(*ctx_, "test1", new_sample, TSCreateOption(), &add_result);
+  EXPECT_TRUE(s.ok());
+  // Validate updated ranges
+  expected_samples = {
+      {{0, 3}, {10, 11}, {40, 91}},  // test2
+      {{0, 3}, {10, 11}, {30, 91}},  // test3
+      {}                             // test4
+  };
+  check(expected_samples);
+
+  // Add final sample
+  TSSample final_sample{200, 200};
+  s = ts_db_->Add(*ctx_, "test1", final_sample, TSCreateOption(), &add_result);
+  EXPECT_TRUE(s.ok());
+  // Validate final ranges
+  expected_samples = {
+      {{0, 3}, {10, 11}, {40, 91}, {50, 50}},  // test2
+      {{0, 3}, {10, 11}, {30, 91}, {50, 50}},  // test3
+      {{0, 155}}                               // test4
+  };
+  check(expected_samples);
+}
diff --git a/tests/gocase/unit/type/timeseries/timeseries_test.go b/tests/gocase/unit/type/timeseries/timeseries_test.go
index 7ca217f..8af9e2b 100644
--- a/tests/gocase/unit/type/timeseries/timeseries_test.go
+++ b/tests/gocase/unit/type/timeseries/timeseries_test.go
@@ -879,4 +879,32 @@
 		require.Equal(t, 1, len(res))
 		require.Equal(t, []interface{}{int64(1657811829000), 389.0}, res[0])
 	})
+
+	t.Run("TS.DEL Test", func(t *testing.T) {
+		srcKey := "del_test_src"
+		dstKey := "del_test_dst"
+		// Create source key with retention=10
+		require.NoError(t, rdb.Do(ctx, "ts.create", srcKey, "retention", "10").Err())
+		// Create destination key
+		require.NoError(t, rdb.Do(ctx, "ts.create", dstKey).Err())
+
+		// Test: Create rule successfully
+		require.NoError(t, rdb.Do(ctx, "ts.createrule", srcKey, dstKey, "aggregation", "sum", "10").Err())
+
+		// Test: Add samples
+		res := rdb.Do(ctx, "ts.madd", srcKey, "5", "5", srcKey, "8", "8", srcKey, "12", "12", srcKey, "13", "13", srcKey, "15", "15").Val().([]interface{})
+		require.Equal(t, []interface{}{int64(5), int64(8), int64(12), int64(13), int64(15)}, res)
+
+		// Test: Delete samples within retention period
+		deletedCount := rdb.Do(ctx, "ts.del", srcKey, "11", "14").Val().(int64)
+		require.Equal(t, int64(2), deletedCount) // Deletes 12 and 13
+
+		// Test: Try delete samples beyond retention period
+		_, err := rdb.Do(ctx, "ts.del", srcKey, "5", "8").Result()
+		require.ErrorContains(t, err, "When a series has compactions, deleting samples or compaction buckets beyond the series retention period is not possible")
+
+		// Test: Try delete all samples with range
+		_, err = rdb.Do(ctx, "ts.del", srcKey, "-", "+").Result()
+		require.ErrorContains(t, err, "When a series has compactions, deleting samples or compaction buckets beyond the series retention period is not possible")
+	})
 }