/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "redis_timeseries.h"
#include <algorithm>
#include <cctype>
#include <cmath>
#include <numeric>
#include <queue>
#include "commands/error_constants.h"
#include "db_util.h"
namespace redis {
// TODO: make these defaults configurable
constexpr uint64_t kDefaultRetentionTime = 0;
constexpr uint64_t kDefaultChunkSize = 1024;
constexpr auto kDefaultChunkType = TimeSeriesMetadata::ChunkType::UNCOMPRESSED;
constexpr auto kDefaultDuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy::BLOCK;
struct Reducer {
static inline double Sum(nonstd::span<const TSSample> samples) {
return std::accumulate(samples.begin(), samples.end(), 0.0,
[](double acc, const TSSample &sample) { return acc + sample.v; });
}
static inline double SquareSum(nonstd::span<const TSSample> samples) {
return std::accumulate(samples.begin(), samples.end(), 0.0,
[](double acc, const TSSample &sample) { return acc + sample.v * sample.v; });
}
static inline double Min(nonstd::span<const TSSample> samples) {
return std::min_element(samples.begin(), samples.end(),
[](const TSSample &a, const TSSample &b) { return a.v < b.v; })
->v;
}
static inline double Max(nonstd::span<const TSSample> samples) {
return std::max_element(samples.begin(), samples.end(),
[](const TSSample &a, const TSSample &b) { return a.v < b.v; })
->v;
}
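// Population variance via the sum/square-sum identity:
//   VarP(x) = (sum(x^2) - sum(x)^2 / n) / n
// e.g. for samples {1, 2, 3}: sum(x) = 6, sum(x^2) = 14, n = 3 -> (14 - 12) / 3 = 2/3.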
static inline double VarP(nonstd::span<const TSSample> samples) {
auto sample_size = static_cast<double>(samples.size());
double sum = Sum(samples);
double square_sum = SquareSum(samples);
return (square_sum - sum * sum / sample_size) / sample_size;
}
static inline double VarS(nonstd::span<const TSSample> samples) {
if (samples.size() <= 1) return 0.0;
auto sample_size = static_cast<double>(samples.size());
return VarP(samples) * sample_size / (sample_size - 1);
}
static inline double StdP(nonstd::span<const TSSample> samples) { return std::sqrt(VarP(samples)); }
static inline double StdS(nonstd::span<const TSSample> samples) { return std::sqrt(VarS(samples)); }
static inline double Range(nonstd::span<const TSSample> samples) {
if (samples.empty()) return 0.0;
auto [min, max] = std::minmax_element(samples.begin(), samples.end(),
[](const TSSample &a, const TSSample &b) { return a.v < b.v; });
return max->v - min->v;
}
};
std::vector<TSSample> AggregateSamplesByRangeOption(std::vector<TSSample> samples, const TSRangeOption &option) {
const auto &aggregator = option.aggregator;
std::vector<TSSample> res;
if (aggregator.type == TSAggregatorType::NONE || samples.empty()) {
res = std::move(samples);
return res;
}
auto spans = aggregator.SplitSamplesToBuckets(samples);
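// Bucket timestamp reporting, e.g. with bucket_duration = 60000 and a bucket
// starting at 120000: Start -> 120000, End -> 180000, Mid -> 150000.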
auto get_bucket_ts = [&](uint64_t left) -> uint64_t {
using BucketTimestampType = TSRangeOption::BucketTimestampType;
switch (option.bucket_timestamp_type) {
case BucketTimestampType::Start:
return left;
case BucketTimestampType::End:
return left + aggregator.bucket_duration;
case BucketTimestampType::Mid:
return left + aggregator.bucket_duration / 2;
default:
unreachable();
}
return 0;
};
res.reserve(spans.size());
uint64_t bucket_left = aggregator.CalculateAlignedBucketLeft(samples.front().ts);
for (size_t i = 0; i < spans.size(); i++) {
if (option.count_limit && res.size() >= option.count_limit) {
break;
}
TSSample sample;
if (i != 0) {
bucket_left = aggregator.CalculateAlignedBucketRight(bucket_left);
}
sample.ts = get_bucket_ts(bucket_left);
if (option.is_return_empty && spans[i].empty()) {
switch (aggregator.type) {
case TSAggregatorType::SUM:
case TSAggregatorType::COUNT:
sample.v = 0;
break;
case TSAggregatorType::LAST:
if (i == 0 || spans[i - 1].empty()) {
sample.v = TSSample::NAN_VALUE;
} else {
// spans[i] is empty here; carry over the last value of the previous non-empty bucket
sample.v = spans[i - 1].back().v;
}
break;
default:
sample.v = TSSample::NAN_VALUE;
}
} else if (!spans[i].empty()) {
sample.v = aggregator.AggregateSamplesValue(spans[i]);
} else {
continue;
}
res.emplace_back(sample);
}
return res;
}
LabelKVList ExtractSelectedLabels(LabelKVList labels, const std::set<std::string> &selected_labels) {
std::unordered_map<std::string_view, LabelKVPair *> labels_map;
labels_map.reserve(labels.size());
for (auto &label : labels) {
labels_map[label.k] = &label;
}
LabelKVList res;
res.reserve(selected_labels.size());
for (const auto &selected_key : selected_labels) {
auto it = labels_map.find(selected_key);
if (it != labels_map.end()) {
res.emplace_back(std::move(*(it->second)));
} else {
res.push_back({selected_key, ""});
}
}
return res;
}
std::vector<TSSample> GroupSamplesAndReduce(const std::vector<std::vector<TSSample>> &all_samples,
TSMRangeOption::GroupReducerType reducer_type) {
if (reducer_type == TSMRangeOption::GroupReducerType::NONE) {
return {};
}
struct SamplePtr {
const TSSample *sample;
size_t vector_idx;
size_t sample_idx;
bool operator>(const SamplePtr &other) const { return sample->ts > other.sample->ts; }
};
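// K-way merge: keep the head of each per-series vector in a min-heap keyed by
// timestamp, pop in global timestamp order, and reduce every run of samples
// sharing a timestamp into a single output sample.
// e.g. merging {(1, 2), (3, 4)} and {(1, 6)} with SUM yields {(1, 8), (3, 4)}.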
std::vector<TSSample> result;
std::priority_queue<SamplePtr, std::vector<SamplePtr>, std::greater<SamplePtr>> min_heap;
// Initialize the min-heap with the first element of each vector
for (size_t i = 0; i < all_samples.size(); ++i) {
if (!all_samples[i].empty()) {
min_heap.push({&all_samples[i][0], i, 0});
}
}
if (min_heap.empty()) {
return result;
}
auto reduce = [&](nonstd::span<const TSSample> samples) -> double {
auto sample_size = static_cast<double>(samples.size());
switch (reducer_type) {
case TSMRangeOption::GroupReducerType::SUM:
return Reducer::Sum(samples);
case TSMRangeOption::GroupReducerType::AVG:
return samples.empty() ? 0.0 : Reducer::Sum(samples) / sample_size;
case TSMRangeOption::GroupReducerType::MIN:
return Reducer::Min(samples);
case TSMRangeOption::GroupReducerType::MAX:
return Reducer::Max(samples);
case TSMRangeOption::GroupReducerType::RANGE:
return Reducer::Range(samples);
case TSMRangeOption::GroupReducerType::COUNT:
return sample_size;
case TSMRangeOption::GroupReducerType::STD_P:
return Reducer::StdP(samples);
case TSMRangeOption::GroupReducerType::STD_S:
return Reducer::StdS(samples);
case TSMRangeOption::GroupReducerType::VAR_P:
return Reducer::VarP(samples);
case TSMRangeOption::GroupReducerType::VAR_S:
return Reducer::VarS(samples);
case TSMRangeOption::GroupReducerType::NONE:
return 0.0;
}
return 0.0;
};
std::vector<TSSample> current_group;
current_group.reserve(all_samples.size());
while (!min_heap.empty()) {
// Get the top element from the min-heap
SamplePtr top = min_heap.top();
min_heap.pop();
// Check if the timestamp is the same as the current group
if (!current_group.empty() && top.sample->ts != current_group.back().ts) {
// Different timestamp, reduce the current group and start a new one
uint64_t group_ts = current_group.back().ts;
nonstd::span<const TSSample> group_span(current_group);
double reduced_value = reduce(group_span);
result.push_back({group_ts, reduced_value});
current_group.clear();
}
current_group.push_back(*top.sample);
// Push the next element from the same vector into the min-heap
size_t next_sample_idx = top.sample_idx + 1;
if (next_sample_idx < all_samples[top.vector_idx].size()) {
min_heap.push({&all_samples[top.vector_idx][next_sample_idx], top.vector_idx, next_sample_idx});
}
}
// Process the last group if it exists
if (!current_group.empty()) {
uint64_t group_ts = current_group.back().ts;
nonstd::span<const TSSample> group_span(current_group);
double reduced_value = reduce(group_span);
result.push_back({group_ts, reduced_value});
}
return result;
}
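// Streaming aggregation across buckets, driven by the aux state (see ResetAuxs):
//   SUM/MIN/MAX/COUNT/FIRST/LAST: f64_auxs[0] holds the running result
//   (FIRST/LAST additionally track the winning timestamp in u64_auxs[0]);
//   AVG: u64_auxs[0] = count, f64_auxs[0] = sum(x);
//   STD_*/VAR_*: u64_auxs[0] = count, f64_auxs[0] = sum(x), f64_auxs[1] = sum(x^2);
//   RANGE: f64_auxs[0] = min, f64_auxs[1] = max.
// Spans older than latest_bucket_idx are ignored; once a span for a newer
// bucket arrives, the bucket accumulated so far is finalized from the aux
// state and emitted, and the aux state is reset for the new bucket.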
std::vector<TSSample> TSDownStreamMeta::AggregateMultiBuckets(nonstd::span<const TSSample> samples,
bool skip_last_bucket) {
std::vector<TSSample> res;
auto bucket_spans = aggregator.SplitSamplesToBuckets(samples);
for (size_t i = 0; i < bucket_spans.size(); i++) {
const auto &span = bucket_spans[i];
if (span.empty()) {
continue;
}
auto bucket_idx = aggregator.CalculateAlignedBucketLeft(span.front().ts);
if (bucket_idx < latest_bucket_idx) {
continue;
}
if (bucket_idx > latest_bucket_idx) {
// Aggregate the previous bucket from aux info and push to result
TSSample sample;
sample.ts = latest_bucket_idx;
double v = 0.0;
double temp_n = 0.0;
switch (aggregator.type) {
case TSAggregatorType::SUM:
case TSAggregatorType::MIN:
case TSAggregatorType::MAX:
case TSAggregatorType::COUNT:
case TSAggregatorType::FIRST:
case TSAggregatorType::LAST:
sample.v = f64_auxs[0];
break;
case TSAggregatorType::AVG:
temp_n = static_cast<double>(u64_auxs[0]);
sample.v = f64_auxs[0] / temp_n;
break;
case TSAggregatorType::STD_P:
case TSAggregatorType::STD_S:
case TSAggregatorType::VAR_P:
case TSAggregatorType::VAR_S:
temp_n = static_cast<double>(u64_auxs[0]);
v = f64_auxs[1] - f64_auxs[0] * f64_auxs[0] / temp_n;
if (aggregator.type == TSAggregatorType::STD_S || aggregator.type == TSAggregatorType::VAR_S) {
if (u64_auxs[0] > 1) {
v = v / (temp_n - 1);
} else {
v = 0.0;
}
} else {
v = v / temp_n;
}
if (aggregator.type == TSAggregatorType::STD_P || aggregator.type == TSAggregatorType::STD_S) {
sample.v = std::sqrt(v);
} else {
sample.v = v;
}
break;
case TSAggregatorType::RANGE:
sample.v = f64_auxs[1] - f64_auxs[0];
break;
default:
unreachable();
}
res.push_back(sample);
// Reset aux info for the new bucket
ResetAuxs();
latest_bucket_idx = bucket_idx;
}
if (skip_last_bucket && i == bucket_spans.size() - 1) {
// Skip updating aux info for the last bucket
break;
}
AggregateLatestBucket(span);
}
return res;
}
void TSDownStreamMeta::AggregateLatestBucket(nonstd::span<const TSSample> samples) {
if (samples.empty()) return;
double temp_v = 0.0;
switch (aggregator.type) {
case TSAggregatorType::SUM:
f64_auxs[0] += Reducer::Sum(samples);
break;
case TSAggregatorType::MIN:
temp_v = Reducer::Min(samples);
f64_auxs[0] = std::isnan(f64_auxs[0]) ? temp_v : std::min(f64_auxs[0], temp_v);
break;
case TSAggregatorType::MAX:
temp_v = Reducer::Max(samples);
f64_auxs[0] = std::isnan(f64_auxs[0]) ? temp_v : std::max(f64_auxs[0], temp_v);
break;
case TSAggregatorType::COUNT:
f64_auxs[0] += static_cast<double>(samples.size());
break;
case TSAggregatorType::FIRST:
if (std::isnan(f64_auxs[0]) || samples.front().ts < u64_auxs[0]) {
f64_auxs[0] = samples.front().v;
u64_auxs[0] = samples.front().ts;
}
break;
case TSAggregatorType::LAST:
if (std::isnan(f64_auxs[0]) || samples.back().ts > u64_auxs[0]) {
f64_auxs[0] = samples.back().v;
u64_auxs[0] = samples.back().ts;
}
break;
case TSAggregatorType::AVG:
u64_auxs[0] += static_cast<uint64_t>(samples.size());
f64_auxs[0] += Reducer::Sum(samples);
break;
case TSAggregatorType::STD_P:
case TSAggregatorType::STD_S:
case TSAggregatorType::VAR_P:
case TSAggregatorType::VAR_S:
u64_auxs[0] += static_cast<uint64_t>(samples.size());
f64_auxs[0] += Reducer::Sum(samples);
f64_auxs[1] += Reducer::SquareSum(samples);
break;
case TSAggregatorType::RANGE:
if (std::isnan(f64_auxs[0])) {
f64_auxs[0] = Reducer::Min(samples);
f64_auxs[1] = Reducer::Max(samples);
} else {
f64_auxs[0] = std::min(f64_auxs[0], Reducer::Min(samples));
f64_auxs[1] = std::max(f64_auxs[1], Reducer::Max(samples));
}
break;
default:
unreachable();
}
}
void TSDownStreamMeta::ResetAuxs() {
auto type = aggregator.type;
switch (type) {
case TSAggregatorType::SUM:
f64_auxs = {0.0};
break;
case TSAggregatorType::MIN:
case TSAggregatorType::MAX:
f64_auxs = {TSSample::NAN_VALUE};
break;
case TSAggregatorType::COUNT:
f64_auxs = {0};
break;
case TSAggregatorType::FIRST:
u64_auxs = {TSSample::MAX_TIMESTAMP};
f64_auxs = {TSSample::NAN_VALUE};
break;
case TSAggregatorType::LAST:
u64_auxs = {0};
f64_auxs = {TSSample::NAN_VALUE};
break;
case TSAggregatorType::AVG:
u64_auxs = {0};
f64_auxs = {0.0};
break;
case TSAggregatorType::STD_P:
case TSAggregatorType::STD_S:
case TSAggregatorType::VAR_P:
case TSAggregatorType::VAR_S:
u64_auxs = {0};
f64_auxs = {0.0, 0.0};
break;
case TSAggregatorType::RANGE:
f64_auxs = {TSSample::NAN_VALUE, TSSample::NAN_VALUE};
break;
default:
unreachable();
}
}
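// Serialized layout (all fields fixed-width; Decode is the inverse):
//   [type:1][bucket_duration:8][alignment:8][latest_bucket_idx:8]
//   [u64_auxs count:1][f64_auxs count:1][u64_auxs...:8 each][f64_auxs...:8 each]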
void TSDownStreamMeta::Encode(std::string *dst) const {
PutFixed8(dst, static_cast<uint8_t>(aggregator.type));
PutFixed64(dst, aggregator.bucket_duration);
PutFixed64(dst, aggregator.alignment);
PutFixed64(dst, latest_bucket_idx);
PutFixed8(dst, static_cast<uint8_t>(u64_auxs.size()));
PutFixed8(dst, static_cast<uint8_t>(f64_auxs.size()));
for (const auto &aux : u64_auxs) {
PutFixed64(dst, aux);
}
for (const auto &aux : f64_auxs) {
PutDouble(dst, aux);
}
}
rocksdb::Status TSDownStreamMeta::Decode(Slice *input) {
if (input->size() < sizeof(uint8_t) * 3 + sizeof(uint64_t) * 3) {
return rocksdb::Status::InvalidArgument("TSDownStreamMeta size is too short");
}
GetFixed8(input, reinterpret_cast<uint8_t *>(&aggregator.type));
GetFixed64(input, &aggregator.bucket_duration);
GetFixed64(input, &aggregator.alignment);
GetFixed64(input, &latest_bucket_idx);
uint8_t u64_auxs_size = 0;
GetFixed8(input, &u64_auxs_size);
uint8_t f64_auxs_size = 0;
GetFixed8(input, &f64_auxs_size);
// Strict checking to prevent accidental overwrites
if (input->size() != sizeof(uint64_t) * u64_auxs_size + sizeof(double) * f64_auxs_size) {
return rocksdb::Status::InvalidArgument("Invalid auxinfo size");
}
for (uint8_t i = 0; i < u64_auxs_size; i++) {
uint64_t aux = 0;
GetFixed64(input, &aux);
u64_auxs.push_back(aux);
}
for (uint8_t i = 0; i < f64_auxs_size; i++) {
double aux = 0;
GetDouble(input, &aux);
f64_auxs.push_back(aux);
}
return rocksdb::Status::OK();
}
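// Reverse label index key layout:
//   [ns size:1][ns][TS_LABEL:1][label key size:4][label key]
//   [label value size:4][label value][user key]
// UpperBound() encodes [ns size:1][ns][TS_LABEL + 1:1], the exclusive upper
// bound of all reverse label index keys in the namespace.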
std::string TSRevLabelKey::Encode() const {
std::string encoded;
size_t total = 1 + ns.size() + 1 + 4 + label_key.size() + 4 + label_value.size() + user_key.size();
encoded.resize(total);
auto buf = encoded.data();
buf = EncodeFixed8(buf, static_cast<uint8_t>(ns.size()));
buf = EncodeBuffer(buf, ns);
buf = EncodeFixed8(buf, static_cast<uint8_t>(IndexKeyType::TS_LABEL));
buf = EncodeFixed32(buf, static_cast<uint32_t>(label_key.size()));
buf = EncodeBuffer(buf, label_key);
buf = EncodeFixed32(buf, static_cast<uint32_t>(label_value.size()));
buf = EncodeBuffer(buf, label_value);
EncodeBuffer(buf, user_key);
return encoded;
}
std::string TSRevLabelKey::UpperBound(Slice ns) {
std::string encoded;
size_t total = 1 + ns.size() + 1;
encoded.resize(total);
auto buf = encoded.data();
buf = EncodeFixed8(buf, static_cast<uint8_t>(ns.size()));
buf = EncodeBuffer(buf, ns);
EncodeFixed8(buf, static_cast<uint8_t>(IndexKeyType::TS_LABEL) + 1);
return encoded;
}
TSCreateOption::TSCreateOption()
: retention_time(kDefaultRetentionTime),
chunk_size(kDefaultChunkSize),
chunk_type(kDefaultChunkType),
duplicate_policy(kDefaultDuplicatePolicy) {}
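// Filter expressions follow the TS.MRANGE/TS.MGET FILTER syntax, e.g.:
//   label=value      the series' `label` equals `value`
//   label!=value     the series' `label` differs from `value`
//   label=(v1,v2)    `label` equals one of the listed values
//   label=           `label` must not exist
//   label!=          `label` must exist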
Status TSMQueryFilterParser::Parse(std::string_view expr) {
if (expr.empty()) return Status::OK();
// Locate "!=" or "="
const auto [op_pos, op_len] = findOperator(expr);
if (op_pos == std::string_view::npos) {
return {Status::RedisParseErr, "failed parsing labels"};
}
// Extract label and value
std::string_view label = expr.substr(0, op_pos);
label = trim(label);
std::string_view value_str = expr.substr(op_pos + op_len);
std::string_view op = expr.substr(op_pos, op_len); // "=" or "!="
if (op == "=") {
handleEquals(label, value_str);
} else if (op == "!=") {
handleNotEquals(label, value_str);
}
return Status::OK();
}
Status TSMQueryFilterParser::Check() const {
if (option_.labels_equals.empty() || !has_matcher_) {
return {Status::RedisParseErr, "please provide at least one matcher"};
}
return Status::OK();
}
std::pair<size_t, size_t> TSMQueryFilterParser::findOperator(std::string_view expr) {
char quote = 0;
for (size_t i = 0; i < expr.size(); i++) {
char c = expr[i];
if (c == '\'' || c == '"') {
if (quote == 0)
quote = c;
else if (quote == c)
quote = 0;
} else if (quote == 0) {
if (c == '!' && i + 1 < expr.size() && expr[i + 1] == '=') {
return {i, 2};
} else if (c == '=') {
return {i, 1};
}
}
}
return {std::string_view::npos, 0};
}
std::string_view TSMQueryFilterParser::trim(std::string_view s) {
while (!s.empty() && std::isspace(static_cast<unsigned char>(s.front()))) {
s.remove_prefix(1);
}
while (!s.empty() && std::isspace(static_cast<unsigned char>(s.back()))) {
s.remove_suffix(1);
}
return s;
}
std::string_view TSMQueryFilterParser::unquote(std::string_view s) {
if (s.size() >= 2) {
char first = s.front();
char last = s.back();
if ((first == '"' && last == '"') || (first == '\'' && last == '\'')) {
return s.substr(1, s.size() - 2);
}
}
return s;
}
std::vector<std::string_view> TSMQueryFilterParser::splitValueList(std::string_view list) {
std::vector<std::string_view> values;
if (list.empty()) return values;
char quote = 0;
int depth = 0;
size_t start = 0;
for (size_t i = 0; i <= list.size(); i++) {
if (i == list.size()) {
if (start < i) {
auto val = trim(unquote(list.substr(start, i - start)));
if (!val.empty()) {
values.push_back(val);
}
}
break;
}
char c = list[i];
if (c == '\'' || c == '"') {
if (quote == 0)
quote = c;
else if (quote == c)
quote = 0;
} else if (quote == 0) {
if (c == '(') {
depth++;
} else if (c == ')') {
if (depth > 0) depth--;
}
}
if (c == ',' && quote == 0 && depth == 0) {
auto val = trim(unquote(list.substr(start, i - start)));
if (!val.empty()) {
values.push_back(val);
}
start = i + 1;
}
}
return values;
}
void TSMQueryFilterParser::handleEquals(std::string_view label, std::string_view value_str) {
std::string label_str(label);
if (value_str.empty()) {
// Label must not exist: label=
option_.labels_equals[std::move(label_str)].clear();
} else {
has_matcher_ = true;
// If the label was already recorded with an empty value set, it means the label must not exist; skip it
if (option_.labels_equals.count(label_str) && option_.labels_equals[label_str].empty()) {
return;
}
std::set<std::string> values;
if (value_str.front() == '(' && value_str.back() == ')') {
// List: label=(v1,v2)
for (auto val : splitValueList(value_str.substr(1, value_str.size() - 2))) {
values.emplace(val);
}
} else {
// Single value: label=value
values.emplace(unquote(value_str));
}
option_.labels_equals[std::move(label_str)].merge(std::move(values));
}
}
void TSMQueryFilterParser::handleNotEquals(std::string_view label, std::string_view value_str) {
std::string label_str(label);
if (value_str.empty()) {
// Label must exist: label!=
option_.labels_not_equals[std::move(label_str)].insert(""); // Use empty string to mark a "label must exist" matcher
} else {
std::set<std::string> values;
if (value_str.front() == '(' && value_str.back() == ')') {
// List: label!=(v1,v2)
for (auto val : splitValueList(value_str.substr(1, value_str.size() - 2))) {
values.emplace(val);
}
} else {
// Single value: label!=value
values.emplace(unquote(value_str));
}
option_.labels_not_equals[std::move(label_str)].merge(std::move(values));
}
}
TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option) {
TimeSeriesMetadata metadata;
metadata.retention_time = option.retention_time;
metadata.chunk_size = option.chunk_size;
metadata.chunk_type = option.chunk_type;
metadata.duplicate_policy = option.duplicate_policy;
metadata.SetSourceKey(option.source_key);
return metadata;
}
TSDownStreamMeta CreateDownStreamMetaFromAgg(const TSAggregator &aggregator) {
TSDownStreamMeta meta;
meta.aggregator = aggregator;
meta.latest_bucket_idx = 0;
meta.ResetAuxs();
return meta;
}
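// Bucket alignment helpers. CalculateAlignedBucketLeft returns the start of
// the bucket containing `ts`; CalculateAlignedBucketRight returns the start of
// the next bucket (i.e. the exclusive end), clamped to [0, TSSample::MAX_TIMESTAMP].
// e.g. with alignment = 5 and bucket_duration = 10, ts = 27 falls in bucket
// [25, 35): left = 25, right = 35.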
uint64_t TSAggregator::CalculateAlignedBucketLeft(uint64_t ts) const {
uint64_t x = 0;
if (ts >= alignment) {
uint64_t diff = ts - alignment;
uint64_t k = diff / bucket_duration;
x = alignment + k * bucket_duration;
} else {
uint64_t diff = alignment - ts;
uint64_t m0 = diff / bucket_duration + (diff % bucket_duration == 0 ? 0 : 1);
if (m0 <= alignment / bucket_duration) {
x = alignment - m0 * bucket_duration;
}
}
return x;
}
uint64_t TSAggregator::CalculateAlignedBucketRight(uint64_t ts) const {
uint64_t x = TSSample::MAX_TIMESTAMP;
if (ts < alignment) {
uint64_t diff = alignment - ts;
uint64_t k = diff / bucket_duration;
x = alignment - k * bucket_duration;
} else {
uint64_t diff = ts - alignment;
uint64_t m0 = diff / bucket_duration + 1;
if (m0 <= (TSSample::MAX_TIMESTAMP - alignment) / bucket_duration) {
x = alignment + m0 * bucket_duration;
}
}
return x;
}
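// Splits a timestamp-sorted sample span into one contiguous sub-span per
// bucket, from the bucket of the first sample to the bucket of the last one;
// buckets with no samples in between yield empty spans.
// e.g. with alignment = 0, bucket_duration = 10 and samples at ts {1, 2, 25},
// the result is {[1, 2], [], [25]}.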
std::vector<nonstd::span<const TSSample>> TSAggregator::SplitSamplesToBuckets(
nonstd::span<const TSSample> samples) const {
std::vector<nonstd::span<const TSSample>> spans;
if (type == TSAggregatorType::NONE || samples.empty()) {
return spans;
}
uint64_t start_bucket = CalculateAlignedBucketLeft(samples.front().ts);
uint64_t end_bucket = CalculateAlignedBucketLeft(samples.back().ts);
uint64_t bucket_count = (end_bucket - start_bucket) / bucket_duration + 1;
spans.reserve(bucket_count);
auto it = samples.begin();
const auto end = samples.end();
uint64_t bucket_left = start_bucket;
while (it != end) {
uint64_t bucket_right = CalculateAlignedBucketRight(bucket_left);
auto lower = std::lower_bound(it, end, TSSample{bucket_left, 0.0});
auto upper = std::lower_bound(lower, end, TSSample{bucket_right, 0.0});
spans.emplace_back(lower, upper);
it = upper;
bucket_left = bucket_right;
}
return spans;
}
nonstd::span<const TSSample> TSAggregator::GetBucketByTimestamp(nonstd::span<const TSSample> samples, uint64_t ts,
uint64_t less_than) const {
if (type == TSAggregatorType::NONE || samples.empty()) {
return {};
}
uint64_t start_bucket = CalculateAlignedBucketLeft(ts);
uint64_t end_bucket = std::min(CalculateAlignedBucketRight(ts), less_than);
auto lower = std::lower_bound(samples.begin(), samples.end(), TSSample{start_bucket, 0.0});
auto upper = std::lower_bound(lower, samples.end(), TSSample{end_bucket, 0.0});
if (lower == upper) {
return {};
}
return {lower, upper};
}
double TSAggregator::AggregateSamplesValue(nonstd::span<const TSSample> samples) const {
double res = TSSample::NAN_VALUE;
if (samples.empty()) {
return res;
}
auto sample_size = static_cast<double>(samples.size());
switch (type) {
case TSAggregatorType::AVG:
res = Reducer::Sum(samples) / sample_size;
break;
case TSAggregatorType::SUM:
res = Reducer::Sum(samples);
break;
case TSAggregatorType::MIN:
res = Reducer::Min(samples);
break;
case TSAggregatorType::MAX:
res = Reducer::Max(samples);
break;
case TSAggregatorType::RANGE:
res = Reducer::Range(samples);
break;
case TSAggregatorType::COUNT:
res = sample_size;
break;
case TSAggregatorType::FIRST:
res = samples.front().v;
break;
case TSAggregatorType::LAST:
res = samples.back().v;
break;
case TSAggregatorType::STD_P:
res = Reducer::StdP(samples);
break;
case TSAggregatorType::STD_S:
res = Reducer::StdS(samples);
break;
case TSAggregatorType::VAR_P:
res = Reducer::VarP(samples);
break;
case TSAggregatorType::VAR_S:
res = Reducer::VarS(samples);
break;
default:
unreachable();
}
return res;
}
rocksdb::Status TimeSeries::getTimeSeriesMetadata(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata *metadata) {
return Database::GetMetadata(ctx, {kRedisTimeSeries}, ns_key, metadata);
}
rocksdb::Status TimeSeries::createTimeSeries(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata *metadata_out, const TSCreateOption *option) {
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTimeSeries, {"createTimeSeries"});
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
*metadata_out = CreateMetadataFromOption(option ? *option : TSCreateOption{});
std::string bytes;
metadata_out->Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
if (option && !option->labels.empty()) {
createLabelIndexInBatch(ns_key, *metadata_out, batch, option->labels);
}
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
rocksdb::Status TimeSeries::getOrCreateTimeSeries(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata *metadata_out, const TSCreateOption *option) {
auto s = getTimeSeriesMetadata(ctx, ns_key, metadata_out);
if (s.ok()) {
return s;
}
return createTimeSeries(ctx, ns_key, metadata_out, option);
}
rocksdb::Status TimeSeries::upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
SampleBatch &sample_batch, std::vector<std::string> *new_chunks) {
auto batch = storage_->GetWriteBatchBase();
auto s = upsertCommonInBatch(ctx, ns_key, metadata, sample_batch, batch, new_chunks);
if (!s.ok()) return s;
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
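// Rough upsert flow:
//  1. Locate the latest chunk via SeekForPrev on the maximal chunk key; if the
//     series has no chunks yet, start from an empty one.
//  2. Drop samples that fall outside the retention window relative to the
//     latest stored timestamp.
//  3. Walk the existing (sealed) chunks overlapping the batch and upsert the
//     matching slice into each, splitting chunks that outgrow chunk_size and
//     re-keying chunks whose first timestamp changed.
//  4. Upsert the remaining samples into the latest (unsealed) chunk and update
//     metadata.size if the chunk count changed.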
rocksdb::Status TimeSeries::upsertCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
SampleBatch &sample_batch,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
std::vector<std::string> *new_chunks) {
auto all_batch_slice = sample_batch.AsSlice();
if (all_batch_slice.GetSampleSpan().empty() && new_chunks != nullptr) {
new_chunks->clear();
return rocksdb::Status::OK();
}
// In the enum `TSSubkeyType`, `LABEL` is the value right after `CHUNK`, so an empty label key serves as the exclusive upper bound for chunk keys
std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(chunk_upper_bound);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
uint64_t chunk_count = metadata.size;
// Get the latest chunk
auto iter = util::UniqueIterator(ctx, read_options);
iter->SeekForPrev(end_key);
TSChunkPtr latest_chunk;
std::string latest_chunk_key, latest_chunk_value;
if (!iter->Valid() || !iter->key().starts_with(prefix)) {
// Create a new empty chunk if none exists yet
auto [chunk_ptr_, data_] = CreateEmptyOwnedTSChunk();
latest_chunk_value = std::move(data_);
latest_chunk = std::move(chunk_ptr_);
} else {
latest_chunk_key = iter->key().ToString();
latest_chunk_value = iter->value().ToString();
latest_chunk = CreateTSChunkFromData(latest_chunk_value);
}
// Filter out samples older than retention time
sample_batch.Expire(latest_chunk->GetLastTimestamp(), metadata.retention_time);
if (all_batch_slice.GetValidCount() == 0) {
return rocksdb::Status::OK();
}
WriteBatchLogData log_data(kRedisTimeSeries);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
// Get the first chunk
auto start_key = internalKeyFromChunkID(ns_key, metadata, all_batch_slice.GetFirstTimestamp());
iter->SeekForPrev(start_key);
if (!iter->Valid()) {
iter->Seek(start_key);
} else if (!iter->key().starts_with(prefix)) {
iter->Next();
}
// Process samples added to sealed chunks
uint64_t start_ts = 0;
uint64_t end_ts = TSSample::MAX_TIMESTAMP;
bool is_chunk = (iter->Valid() && iter->key().starts_with(prefix));
while (is_chunk) {
auto cur_chunk_data = iter->value().ToString();
auto cur_chunk_key = iter->key().ToString();
iter->Next();
is_chunk = (iter->Valid() && iter->key().starts_with(prefix));
if (!is_chunk) {
// Process last chunk
break;
}
end_ts = chunkIDFromInternalKey(iter->key());
auto chunk = CreateTSChunkFromData(cur_chunk_data);
auto sample_slice = all_batch_slice.SliceByTimestamps(start_ts, end_ts);
if (sample_slice.GetValidCount() == 0) {
continue;
}
auto new_data_list = chunk->UpsertSampleAndSplit(sample_slice, metadata.chunk_size, false);
for (size_t i = 0; i < new_data_list.size(); i++) {
auto &new_data = new_data_list[i];
auto new_chunk = CreateTSChunkFromData(new_data);
auto new_key = internalKeyFromChunkID(ns_key, metadata, new_chunk->GetFirstTimestamp());
// Samples older than the chunk head may change its first timestamp; in that case the old key must be replaced
if (i == 0 && new_key != cur_chunk_key) {
s = batch->Delete(cur_chunk_key);
if (!s.ok()) return s;
}
s = batch->Put(new_key, new_data);
if (!s.ok()) return s;
}
if (!new_data_list.empty()) {
chunk_count += new_data_list.size() - 1;
}
}
// Process samples added to the latest (unsealed) chunk
auto remained_samples = all_batch_slice.SliceByTimestamps(start_ts, TSSample::MAX_TIMESTAMP, true);
auto new_data_list = latest_chunk->UpsertSampleAndSplit(remained_samples, metadata.chunk_size, true);
for (size_t i = 0; i < new_data_list.size(); i++) {
auto &new_data = new_data_list[i];
auto new_chunk = CreateTSChunkFromData(new_data);
auto new_key = internalKeyFromChunkID(ns_key, metadata, new_chunk->GetFirstTimestamp());
if (i == 0 && new_key != latest_chunk_key) {
s = batch->Delete(latest_chunk_key);
if (!s.ok()) return s;
}
s = batch->Put(new_key, new_data);
if (!s.ok()) return s;
}
if (!new_data_list.empty()) {
chunk_count += new_data_list.size() - (metadata.size == 0 ? 0 : 1);
}
if (chunk_count != metadata.size) {
metadata.size = chunk_count;
std::string bytes;
metadata.Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
}
if (new_chunks) {
if (new_data_list.size()) {
*new_chunks = std::move(new_data_list);
} else {
*new_chunks = {std::move(latest_chunk_value)};
}
}
return rocksdb::Status::OK();
}
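// Range scan flow: find the latest chunk to derive the retention lower bound,
// narrow [start_ts, end_ts] accordingly, then iterate the chunks in that
// window and collect samples passing the timestamp/value filters, stopping
// early once count_limit is satisfied. Aggregation (if any) is applied
// afterwards by AggregateSamplesByRangeOption.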
rocksdb::Status TimeSeries::rangeCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
const TSRangeOption &option, std::vector<TSSample> *res, bool apply_retention) {
if (option.end_ts < option.start_ts) {
return rocksdb::Status::OK();
}
// In the enum `TSSubkeyType`, `LABEL` is the value right after `CHUNK`, so an empty label key serves as the exclusive upper bound for chunk keys
std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(chunk_upper_bound);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
// Get the latest chunk
auto iter = util::UniqueIterator(ctx, read_options);
iter->SeekForPrev(end_key);
if (!iter->Valid() || !iter->key().starts_with(prefix)) {
return rocksdb::Status::OK();
}
auto chunk = CreateTSChunkFromData(iter->value());
uint64_t last_timestamp = chunk->GetLastTimestamp();
uint64_t retention_bound =
(apply_retention && metadata.retention_time != 0 && last_timestamp > metadata.retention_time)
? last_timestamp - metadata.retention_time
: 0;
uint64_t start_timestamp = std::max(retention_bound, option.start_ts);
uint64_t end_timestamp = std::min(last_timestamp, option.end_ts);
// Update iterator options
auto start_key = internalKeyFromChunkID(ns_key, metadata, start_timestamp);
if (end_timestamp != TSSample::MAX_TIMESTAMP) {
end_key = internalKeyFromChunkID(ns_key, metadata, end_timestamp + 1);
}
upper_bound = Slice(end_key);
read_options.iterate_upper_bound = &upper_bound;
iter = util::UniqueIterator(ctx, read_options);
iter->SeekForPrev(start_key);
if (!iter->Valid()) {
iter->Seek(start_key);
} else if (!iter->key().starts_with(prefix)) {
iter->Next();
}
// Prepare to store results
std::vector<TSSample> temp_results;
const auto &aggregator = option.aggregator;
bool has_aggregator = aggregator.type != TSAggregatorType::NONE;
if (iter->Valid()) {
if (option.count_limit != 0 && !has_aggregator) {
temp_results.reserve(option.count_limit);
} else {
chunk = CreateTSChunkFromData(iter->value());
auto range = chunk->GetLastTimestamp() - chunk->GetFirstTimestamp() + 1;
auto estimate_chunks = std::min((end_timestamp - start_timestamp) / range, uint64_t(32));
temp_results.reserve(estimate_chunks * metadata.chunk_size);
}
}
// Get samples from chunks
uint64_t bucket_count = 0;
uint64_t last_bucket = 0;
bool is_not_enough = true;
for (; iter->Valid() && is_not_enough; iter->Next()) {
chunk = CreateTSChunkFromData(iter->value());
auto it = chunk->CreateIterator();
while (it->HasNext()) {
auto sample = it->Next().value();
// Early termination check
if (!has_aggregator && option.count_limit && temp_results.size() >= option.count_limit) {
is_not_enough = false;
break;
}
const bool in_time_range = sample->ts >= start_timestamp && sample->ts <= end_timestamp;
const bool not_time_filtered = option.filter_by_ts.empty() || option.filter_by_ts.count(sample->ts);
const bool value_in_range = !option.filter_by_value || (sample->v >= option.filter_by_value->first &&
sample->v <= option.filter_by_value->second);
if (!in_time_range || !not_time_filtered || !value_in_range) {
continue;
}
// Do checks for early termination when `count_limit` is set.
if (has_aggregator && option.count_limit > 0) {
const auto bucket = aggregator.CalculateAlignedBucketRight(sample->ts);
const bool is_empty_count = (last_bucket > 0 && option.is_return_empty);
const size_t increment = is_empty_count ? (bucket - last_bucket) / aggregator.bucket_duration : 1;
bucket_count += increment;
last_bucket = bucket;
if (bucket_count > option.count_limit) {
is_not_enough = false;
temp_results.push_back(*sample); // Ensure empty bucket is reported
break;
}
}
temp_results.push_back(*sample);
}
}
// Process compaction logic
*res = AggregateSamplesByRangeOption(std::move(temp_results), option);
return rocksdb::Status::OK();
}
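// Propagates newly written samples to downstream (compaction) series.
// Two cases are handled per downstream rule:
//  - samples that landed in already-sealed source chunks: the affected source
//    buckets are re-read and re-aggregated (or, for incremental aggregators
//    such as SUM/COUNT/MIN/MAX, folded in without re-reading the source);
//  - samples that landed in the latest chunk: buckets older than the newest
//    one are aggregated and flushed, while the newest bucket only updates the
//    downstream aux state.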
rocksdb::Status TimeSeries::upsertDownStream(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata,
const std::vector<std::string> &new_chunks, SampleBatch &sample_batch) {
// If no samples were actually written, there is nothing to propagate downstream
if (new_chunks.empty()) return rocksdb::Status::OK();
std::vector<std::string> downstream_keys;
std::vector<TSDownStreamMeta> downstream_metas;
auto s = getDownStreamRules(ctx, ns_key, metadata, &downstream_keys, &downstream_metas);
if (!s.ok()) return s;
if (downstream_keys.empty()) return rocksdb::Status::OK();
auto all_batch_slice = sample_batch.AsSlice();
uint64_t new_chunk_first_ts = CreateTSChunkFromData(new_chunks[0])->GetFirstTimestamp();
nonstd::span<const AddResult> add_results = all_batch_slice.GetAddResultSpan();
auto samples_span = all_batch_slice.GetSampleSpan();
std::vector<std::vector<TSSample>> all_agg_samples(downstream_metas.size());
std::vector<std::vector<TSSample>> all_agg_samples_inc(downstream_metas.size());
std::vector<uint64_t> last_buckets(downstream_metas.size());
std::vector<bool> is_meta_updates(downstream_metas.size(), false);
using AddResultType = TSChunk::AddResultType;
struct ProcessingInfo {
uint64_t start_ts;
uint64_t end_ts;
size_t sample_idx;
std::vector<size_t> downstream_indices;
};
std::vector<ProcessingInfo> processing_infos;
processing_infos.reserve(add_results.size());
for (size_t i = 0; i < add_results.size(); i++) {
const auto &add_result = add_results[i];
auto sample_ts = add_result.sample.ts;
const auto type = add_result.type;
if (type != AddResultType::kInsert && type != AddResultType::kUpdate) {
continue;
}
// Prepare info for samples added to sealed chunks
ProcessingInfo info;
info.sample_idx = i;
info.start_ts = TSSample::MAX_TIMESTAMP;
info.end_ts = 0;
for (size_t j = 0; j < downstream_metas.size(); j++) {
const auto &agg = downstream_metas[j].aggregator;
uint64_t latest_bucket_idx = downstream_metas[j].latest_bucket_idx;
uint64_t bkt_left = agg.CalculateAlignedBucketLeft(sample_ts);
// Skip samples whose timestamps are at or beyond the retrieval boundary
// Boundary is defined as the later of:
// - New chunk start time (new_chunk_first_ts)
// - Latest bucket index (latest_bucket_idx)
auto boundary = std::max(new_chunk_first_ts, latest_bucket_idx);
if (sample_ts >= boundary) {
continue;
}
// For these aggregator types, there is no need to retrieve source samples
if (IsIncrementalAggregatorType(agg.type)) {
info.downstream_indices.push_back(j);
continue;
}
if ((i > 0 && bkt_left == last_buckets[j])) {
continue;
}
info.downstream_indices.push_back(j);
uint64_t bkt_right = agg.CalculateAlignedBucketRight(sample_ts);
info.start_ts = std::min(info.start_ts, bkt_left);
info.end_ts = std::max(info.end_ts, bkt_right);
info.end_ts = std::min(info.end_ts, boundary - 1); // Exclusive. Boundary > 0
}
if (info.downstream_indices.size()) {
processing_infos.push_back(info);
}
}
// Process samples added to sealed chunks
for (const auto &info : processing_infos) {
const auto &add_result = add_results[info.sample_idx];
const auto &sample = samples_span[info.sample_idx];
TSRangeOption option;
option.start_ts = info.start_ts;
option.end_ts = info.end_ts;
std::vector<TSSample> retrieve_samples;
s = rangeCommon(ctx, ns_key, metadata, option, &retrieve_samples, false);
if (!s.ok()) return s;
for (size_t j : info.downstream_indices) {
auto &meta = downstream_metas[j];
const auto &agg = meta.aggregator;
uint64_t bkt_left = agg.CalculateAlignedBucketLeft(add_result.sample.ts);
if (IsIncrementalAggregatorType(agg.type)) {
std::vector<TSSample> sample_temp = {{bkt_left, add_result.sample.v}};
switch (agg.type) {
case TSAggregatorType::MIN:
case TSAggregatorType::MAX:
sample_temp[0].v = sample.v;
break;
case TSAggregatorType::COUNT:
sample_temp[0].v = 1.0;
break;
default:
break;
}
if (bkt_left == meta.latest_bucket_idx) {
meta.AggregateLatestBucket(sample_temp);
is_meta_updates[j] = true;
} else {
all_agg_samples_inc[j].push_back({bkt_left, sample_temp[0].v});
}
} else {
auto span = agg.GetBucketByTimestamp(retrieve_samples, bkt_left);
CHECK(!span.empty());
last_buckets[j] = bkt_left;
if (bkt_left == meta.latest_bucket_idx) {
meta.ResetAuxs();
meta.AggregateLatestBucket(span);
is_meta_updates[j] = true;
} else {
all_agg_samples[j].push_back({bkt_left, agg.AggregateSamplesValue(span)});
}
}
}
}
// Process samples added to the latest chunk
for (size_t i = 0; i < downstream_metas.size(); i++) {
auto &agg_samples = all_agg_samples[i];
auto &meta = downstream_metas[i];
const auto &agg = meta.aggregator;
if (new_chunks.size() > 1) {
is_meta_updates[i] = true;
}
// For every chunk except the last one (these chunks are sealed)
for (size_t j = 0; j < new_chunks.size() - 1; j++) {
auto chunk = CreateTSChunkFromData(new_chunks[j]);
auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan());
agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
}
// For the last (unsealed) chunk
auto chunk = CreateTSChunkFromData(new_chunks.back());
auto newest_bucket_idx = agg.CalculateAlignedBucketLeft(chunk->GetLastTimestamp());
if (meta.latest_bucket_idx < newest_bucket_idx) {
auto samples = meta.AggregateMultiBuckets(chunk->GetSamplesSpan(), true);
agg_samples.insert(agg_samples.end(), samples.begin(), samples.end());
is_meta_updates[i] = true;
}
}
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTimeSeries, {"upsertDownStream"});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
// Write downstream metadata
for (size_t i = 0; i < downstream_metas.size(); i++) {
if (!is_meta_updates[i]) {
continue;
}
const auto &meta = downstream_metas[i];
const auto &key = downstream_keys[i];
std::string bytes;
meta.Encode(&bytes);
s = batch->Put(key, bytes);
if (!s.ok()) return s;
}
// Write aggregated samples
for (size_t i = 0; i < downstream_metas.size(); i++) {
const auto &ds_key = downstream_keys[i];
auto key = downstreamKeyFromInternalKey(ds_key);
auto ns_key = AppendNamespacePrefix(key);
auto &agg_samples = all_agg_samples[i];
auto &agg_samples_inc = all_agg_samples_inc[i];
if (agg_samples.empty() && agg_samples_inc.empty()) {
continue;
}
TimeSeriesMetadata metadata;
s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
if (agg_samples.size()) {
auto sample_batch_t = SampleBatch(std::move(agg_samples), DuplicatePolicy::LAST);
s = upsertCommon(ctx, ns_key, metadata, sample_batch_t);
if (!s.ok()) return s;
}
if (agg_samples_inc.size()) {
const auto &agg = downstream_metas[i].aggregator;
DuplicatePolicy policy = DuplicatePolicy::LAST;
if (agg.type == TSAggregatorType::SUM || agg.type == TSAggregatorType::COUNT) {
policy = DuplicatePolicy::SUM;
} else if (agg.type == TSAggregatorType::MIN) {
policy = DuplicatePolicy::MIN;
} else if (agg.type == TSAggregatorType::MAX) {
policy = DuplicatePolicy::MAX;
}
auto sample_batch_t = SampleBatch(std::move(agg_samples_inc), policy);
s = upsertCommon(ctx, ns_key, metadata, sample_batch_t);
if (!s.ok()) return s;
}
}
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
rocksdb::Status TimeSeries::getCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata,
bool is_return_latest, std::vector<TSSample> *res) {
// In the enum `TSSubkeyType`, `LABEL` is the value right after `CHUNK`, so an empty label key serves as the exclusive upper bound for chunk keys
std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(chunk_upper_bound);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
// Get the latest chunk
auto iter = util::UniqueIterator(ctx, read_options);
iter->SeekForPrev(end_key);
if (!iter->Valid() || !iter->key().starts_with(prefix)) {
return rocksdb::Status::OK();
}
auto chunk = CreateTSChunkFromData(iter->value());
if (is_return_latest) {
// TODO: handle the `latest` option
}
res->push_back(chunk->GetLatestSample(0));
return rocksdb::Status::OK();
}
rocksdb::Status TimeSeries::delRangeCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
uint64_t from, uint64_t to, uint64_t *deleted, bool inclusive_to) {
auto batch = storage_->GetWriteBatchBase();
auto s = delRangeCommonInBatch(ctx, ns_key, metadata, from, to, batch, deleted, inclusive_to);
if (!s.ok()) return s;
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
rocksdb::Status TimeSeries::delRangeCommonInBatch(engine::Context &ctx, const Slice &ns_key,
TimeSeriesMetadata &metadata, uint64_t from, uint64_t to,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
uint64_t *deleted, bool inclusive_to) {
*deleted = 0;
if (from > to || (from == to && !inclusive_to)) {
return rocksdb::Status::OK();
}
// In the enum `TSSubkeyType`, `LABEL` is the value right after `CHUNK`, so an empty label key serves as the exclusive upper bound for chunk keys
std::string start_key = internalKeyFromChunkID(ns_key, metadata, from);
std::string prefix = start_key.substr(0, start_key.size() - sizeof(uint64_t));
std::string end_key;
if (to == TSSample::MAX_TIMESTAMP && inclusive_to) {
end_key = internalKeyFromLabelKey(ns_key, metadata, "");
} else if (inclusive_to) {
end_key = internalKeyFromChunkID(ns_key, metadata, to + 1);
} else {
end_key = internalKeyFromChunkID(ns_key, metadata, to);
}
uint64_t chunk_count = metadata.size;
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(end_key);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
WriteBatchLogData log_data(kRedisTimeSeries);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
auto iter = util::UniqueIterator(ctx, read_options);
iter->SeekForPrev(start_key);
if (!iter->Valid()) {
iter->Seek(start_key);
} else if (!iter->key().starts_with(prefix)) {
iter->Next();
}
for (; iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
auto chunk = CreateTSChunkFromData(iter->value());
uint64_t deleted_temp = 0;
auto new_chunk_data = chunk->RemoveSamplesBetween(from, to, &deleted_temp, inclusive_to);
if (new_chunk_data.empty() || deleted_temp == 0) {
// No samples deleted
continue;
}
*deleted += deleted_temp;
auto new_chunk = CreateTSChunkFromData(new_chunk_data);
bool need_delete_old_key = false;
if (new_chunk->GetCount() == 0) {
// Delete the whole chunk
need_delete_old_key = true;
if (chunk_count > 0) chunk_count--;
} else {
auto new_key = internalKeyFromChunkID(ns_key, metadata, new_chunk->GetFirstTimestamp());
if (new_key != iter->key()) {
// Change the chunk key
need_delete_old_key = true;
}
s = batch->Put(new_key, new_chunk_data);
if (!s.ok()) return s;
}
if (need_delete_old_key) {
s = batch->Delete(iter->key());
if (!s.ok()) return s;
}
}
if (chunk_count != metadata.size) {
metadata.size = chunk_count;
std::string bytes;
metadata.Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
}
return rocksdb::Status::OK();
}
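// Re-synchronizes downstream (compaction) series after a range deletion on the
// source: boundary buckets that still contain source samples are re-aggregated
// and rewritten, fully covered buckets are deleted, and the downstream
// latest-bucket aux state is rebuilt when the deletion reaches it.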
rocksdb::Status TimeSeries::delRangeDownStream(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata,
std::vector<std::string> &ds_keys,
std::vector<TSDownStreamMeta> &ds_metas, uint64_t from, uint64_t to) {
if (from > to || ds_keys.empty()) return rocksdb::Status::OK();
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTimeSeries);
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
// Calculate key boundaries for latest chunk retrieval
std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
// Configure read options for reverse iteration
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(chunk_upper_bound);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
// Retrieve the latest chunk for boundary calculations
auto iter = util::UniqueIterator(ctx, read_options);
iter->SeekForPrev(end_key);
// Timestamps of the latest source chunk; both stay 0 if no chunk is found
uint64_t last_chunk_start = 0;
uint64_t last_chunk_end = 0;
// Check if any chunks exist for the source time series.
bool has_chunk = iter->Valid() && iter->key().starts_with(prefix);
if (has_chunk) {
auto last_chunk = CreateTSChunkFromData(iter->value());
last_chunk_start = last_chunk->GetFirstTimestamp();
last_chunk_end = last_chunk->GetLastTimestamp();
}
iter->Reset(); // Release iterator resources
// Determine global time range needed for sample retrieval
uint64_t retrieve_start_ts = TSSample::MAX_TIMESTAMP;
uint64_t retrieve_end_ts = 0;
if (has_chunk) {
for (const auto &ds_meta : ds_metas) {
retrieve_start_ts = std::min(retrieve_start_ts, ds_meta.aggregator.CalculateAlignedBucketLeft(from));
retrieve_end_ts = std::max(retrieve_end_ts, ds_meta.aggregator.CalculateAlignedBucketRight(to) - 1);
}
}
// Retrieve samples needed for downstream recalculation
std::vector<TSSample> retrieved_samples;
if (has_chunk) {
TSRangeOption range_option;
range_option.start_ts = retrieve_start_ts;
range_option.end_ts = retrieve_end_ts;
s = rangeCommon(ctx, ns_key, metadata, range_option, &retrieved_samples, true);
if (!s.ok()) return s;
}
// Process each downstream rule
for (size_t i = 0; i < ds_keys.size(); i++) {
auto &ds_meta = ds_metas[i];
auto &agg = ds_meta.aggregator;
TimeSeriesMetadata meta;
auto ds_ns_key = AppendNamespacePrefix(downstreamKeyFromInternalKey(ds_keys[i]));
s = getTimeSeriesMetadata(ctx, ds_ns_key, &meta);
if (!s.ok()) return s;
// Calculate the range of buckets affected by this deletion.
uint64_t start_bucket = agg.CalculateAlignedBucketLeft(from);
uint64_t end_bucket = agg.CalculateAlignedBucketLeft(to);
CHECK(start_bucket <= ds_meta.latest_bucket_idx);
std::vector<TSSample> new_samples; // To store re-aggregated boundary buckets.
// Recalculate the start bucket.
auto start_span = agg.GetBucketByTimestamp(retrieved_samples, start_bucket);
// If start_span is empty, the entire bucket will be deleted. Otherwise, it's re-aggregated,
// and the deletion starts from the next bucket.
uint64_t del_start = start_span.empty() ? start_bucket : start_bucket + 1;
if (!start_span.empty() && start_bucket < ds_meta.latest_bucket_idx) {
new_samples.push_back({start_bucket, agg.AggregateSamplesValue(start_span)});
}
// Recalculate the end bucket.
auto end_span = (start_bucket == end_bucket) ? start_span : agg.GetBucketByTimestamp(retrieved_samples, end_bucket);
// If end_span is empty, the bucket is included in the deletion. Otherwise, it's re-aggregated
// and excluded from deletion.
bool inclusive_end = end_span.empty();
if (!end_span.empty() && end_bucket < ds_meta.latest_bucket_idx && start_bucket != end_bucket) {
new_samples.push_back({end_bucket, agg.AggregateSamplesValue(end_span)});
}
// Update recalculated buckets
auto sample_batch = SampleBatch(std::move(new_samples), DuplicatePolicy::LAST);
s = upsertCommonInBatch(ctx, ds_ns_key, meta, sample_batch, batch);
if (!s.ok()) return s;
// Delete affected buckets in downstream
uint64_t deleted = 0;
s = delRangeCommonInBatch(ctx, ds_ns_key, meta, del_start, end_bucket, batch, &deleted, inclusive_end);
if (!s.ok()) return s;
// Update latest bucket if deletion affects the end
if (end_bucket < ds_meta.latest_bucket_idx) continue;
if (!has_chunk) {
ds_meta.latest_bucket_idx = 0;
} else if (to > last_chunk_end) {
ds_meta.latest_bucket_idx = agg.CalculateAlignedBucketLeft(last_chunk_end);
}
// Reaggregate latest bucket if needed
ds_meta.ResetAuxs();
if (has_chunk && last_chunk_start > 0) {
auto span = agg.GetBucketByTimestamp(retrieved_samples, ds_meta.latest_bucket_idx, last_chunk_start - 1);
ds_meta.AggregateLatestBucket(span);
}
// Persist downstream metadata updates if needed
std::string bytes;
ds_meta.Encode(&bytes);
batch->Put(ds_keys[i], bytes);
}
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
rocksdb::Status TimeSeries::createLabelIndexInBatch(const Slice &ns_key, const TimeSeriesMetadata &metadata,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
const LabelKVList &labels) {
for (auto &label : labels) {
auto internal_key = internalKeyFromLabelKey(ns_key, metadata, label.k);
auto s = batch->Put(internal_key, label.v);
if (!s.ok()) return s;
}
auto [ns, user_key] = ExtractNamespaceKey(ns_key, storage_->IsSlotIdEncoded());
// Reverse index
for (auto &label : labels) {
auto rev_index_key = TSRevLabelKey(ns, label.k, label.v, user_key).Encode();
auto s = batch->Put(index_cf_handle_, rev_index_key, Slice());
if (!s.ok()) return s;
}
return rocksdb::Status::OK();
}
rocksdb::Status TimeSeries::getLabelKVList(engine::Context &ctx, const Slice &ns_key,
const TimeSeriesMetadata &metadata, LabelKVList *labels) {
// In the enum `TSSubkeyType`, `DOWNSTREAM` is the value right after `LABEL`, so an empty downstream key serves as the exclusive upper bound for label keys
std::string label_upper_bound = internalKeyFromDownstreamKey(ns_key, metadata, "");
std::string prefix = internalKeyFromLabelKey(ns_key, metadata, "");
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(label_upper_bound);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
auto iter = util::UniqueIterator(ctx, read_options);
labels->clear();
for (iter->Seek(lower_bound); iter->Valid(); iter->Next()) {
labels->push_back({labelKeyFromInternalKey(iter->key()), iter->value().ToString()});
}
return rocksdb::Status::OK();
}
rocksdb::Status TimeSeries::createDownStreamMetadataInBatch(engine::Context &ctx, const Slice &ns_src_key,
const Slice &dst_key,
const TimeSeriesMetadata &src_metadata,
const TSAggregator &aggregator,
ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch,
TSDownStreamMeta *ds_metadata) {
WriteBatchLogData log_data(kRedisTimeSeries, {"createDownStreamMetadata"});
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
*ds_metadata = CreateDownStreamMetaFromAgg(aggregator);
std::string bytes;
ds_metadata->Encode(&bytes);
auto ikey = internalKeyFromDownstreamKey(ns_src_key, src_metadata, dst_key);
s = batch->Put(ikey, bytes);
if (!s.ok()) return s;
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
rocksdb::Status TimeSeries::getDownStreamRules(engine::Context &ctx, const Slice &ns_src_key,
const TimeSeriesMetadata &src_metadata, std::vector<std::string> *keys,
std::vector<TSDownStreamMeta> *metas) {
std::string prefix = internalKeyFromDownstreamKey(ns_src_key, src_metadata, "");
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
auto iter = util::UniqueIterator(ctx, read_options);
keys->clear();
if (metas != nullptr) {
metas->clear();
}
for (iter->Seek(lower_bound); iter->Valid() && iter->key().starts_with(prefix); iter->Next()) {
keys->push_back(iter->key().ToString());
if (metas != nullptr) {
TSDownStreamMeta meta;
Slice slice = iter->value().ToStringView();
meta.Decode(&slice);
metas->push_back(meta);
}
}
return rocksdb::Status::OK();
}
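// Resolves the keys matching a FILTER expression: scan the reverse label index
// once per (label, value) pair in labels_equals to collect candidate user
// keys, then verify each candidate against the full labels_equals /
// labels_not_equals conditions using its stored labels.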
rocksdb::Status TimeSeries::getTSKeyByFilter(engine::Context &ctx, const TSMGetOption::FilterOption &filter,
std::vector<std::string> *user_keys, std::vector<LabelKVList> *labels_vec,
std::vector<TimeSeriesMetadata> *metas) {
std::set<std::string> temp_keys;
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
auto rev_index_upper_bound = TSRevLabelKey::UpperBound(namespace_);
for (const auto &[label_k, label_v_set] : filter.labels_equals) {
if (label_v_set.empty()) {
continue;
}
for (const auto &label_v : label_v_set) {
auto rev_label_key = TSRevLabelKey(namespace_, label_k, label_v);
auto rev_index_prefix = rev_label_key.Encode();
Slice lower_bound(rev_index_prefix);
read_options.iterate_lower_bound = &lower_bound;
Slice upper_bound(rev_index_upper_bound);
read_options.iterate_upper_bound = &upper_bound;
auto iter = util::UniqueIterator(ctx, read_options, index_cf_handle_);
for (iter->Seek(lower_bound); iter->Valid() && iter->key().starts_with(rev_index_prefix); iter->Next()) {
auto user_key = iter->key();
user_key.remove_prefix(rev_index_prefix.size());
temp_keys.emplace(user_key.data(), user_key.size());
}
}
}
// Filter
user_keys->clear();
user_keys->reserve(temp_keys.size());
if (labels_vec != nullptr) {
labels_vec->clear();
labels_vec->reserve(temp_keys.size());
}
if (metas != nullptr) {
metas->clear();
metas->reserve(temp_keys.size());
}
for (auto &user_key : temp_keys) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata;
auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
if (!s.ok()) continue;
LabelKVList labels;
getLabelKVList(ctx, ns_key, metadata, &labels);
std::unordered_map<std::string_view, std::string *> label_map;
for (auto &label : labels) {
label_map[label.k] = &label.v;
}
// Check labels_equals conditions
bool match = std::all_of(filter.labels_equals.begin(), filter.labels_equals.end(), [&label_map](const auto &kv) {
auto it = label_map.find(kv.first);
// If the labels_equals value set is empty, the label key must not exist
return (kv.second.empty() && it == label_map.end()) ||
(it != label_map.end() && kv.second.count(*(it->second)) > 0);
});
if (!match) continue;
// Check labels_not_equals conditions
match = std::all_of(filter.labels_not_equals.begin(), filter.labels_not_equals.end(), [&label_map](const auto &kv) {
auto it = label_map.find(kv.first);
const std::string &str = (it != label_map.end()) ? *(it->second) : "";
return kv.second.count(str) == 0;
});
if (!match) continue;
user_keys->push_back(user_key);
if (labels_vec != nullptr) {
labels_vec->push_back(std::move(labels));
}
if (metas != nullptr) {
metas->push_back(std::move(metadata));
}
}
return rocksdb::Status::OK();
}
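// Sub-key layouts (each wrapped in the series' InternalKey):
//   chunk:      [TSSubkeyType::CHUNK:1][chunk id = first timestamp:8]
//   label:      [TSSubkeyType::LABEL:1][label key]
//   downstream: [TSSubkeyType::DOWNSTREAM:1][destination key]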
std::string TimeSeries::internalKeyFromChunkID(const Slice &ns_key, const TimeSeriesMetadata &metadata,
uint64_t id) const {
std::string sub_key;
PutFixed8(&sub_key, static_cast<uint8_t>(TSSubkeyType::CHUNK));
PutFixed64(&sub_key, id);
return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
}
std::string TimeSeries::internalKeyFromLabelKey(const Slice &ns_key, const TimeSeriesMetadata &metadata,
Slice label_key) const {
std::string sub_key;
sub_key.resize(1 + label_key.size());
auto buf = sub_key.data();
buf = EncodeFixed8(buf, static_cast<uint8_t>(TSSubkeyType::LABEL));
EncodeBuffer(buf, label_key);
return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
}
std::string TimeSeries::internalKeyFromDownstreamKey(const Slice &ns_key, const TimeSeriesMetadata &metadata,
Slice downstream_key) const {
std::string sub_key;
sub_key.resize(1 + downstream_key.size());
auto buf = sub_key.data();
buf = EncodeFixed8(buf, static_cast<uint8_t>(TSSubkeyType::DOWNSTREAM));
EncodeBuffer(buf, downstream_key);
return InternalKey(ns_key, sub_key, metadata.version, storage_->IsSlotIdEncoded()).Encode();
}
uint64_t TimeSeries::chunkIDFromInternalKey(Slice internal_key) {
auto size = internal_key.size();
internal_key.remove_prefix(size - sizeof(uint64_t));
return DecodeFixed64(internal_key.data());
}
std::string TimeSeries::labelKeyFromInternalKey(Slice internal_key) const {
auto key = InternalKey(internal_key, storage_->IsSlotIdEncoded());
auto label_key = key.GetSubKey();
label_key.remove_prefix(sizeof(TSSubkeyType));
return label_key.ToString();
}
std::string TimeSeries::downstreamKeyFromInternalKey(Slice internal_key) const {
auto key = InternalKey(internal_key, storage_->IsSlotIdEncoded());
auto ds_key = key.GetSubKey();
ds_key.remove_prefix(sizeof(TSSubkeyType));
return ds_key.ToString();
}
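// Creates a new time series with the given options; returns InvalidArgument if the key already exists.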
rocksdb::Status TimeSeries::Create(engine::Context &ctx, const Slice &user_key, const TSCreateOption &option) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata;
auto s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
if (s.ok()) {
return rocksdb::Status::InvalidArgument("key already exists");
}
return createTimeSeries(ctx, ns_key, &metadata, &option);
}
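// Adds a single sample. The series is created on demand with `option`; the per-call duplicate
// policy (if any) overrides the series policy. The write is then propagated to any downstream
// compaction rules, and the per-sample AddResult is returned.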
rocksdb::Status TimeSeries::Add(engine::Context &ctx, const Slice &user_key, TSSample sample,
const TSCreateOption &option, AddResult *res, const DuplicatePolicy *on_dup_policy) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata(false);
rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
if (!s.ok()) return s;
auto sample_batch = SampleBatch({sample}, on_dup_policy ? *on_dup_policy : metadata.duplicate_policy);
std::vector<std::string> new_chunks;
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
if (!s.ok()) return s;
s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
if (!s.ok()) return s;
*res = sample_batch.GetFinalResults()[0];
return rocksdb::Status::OK();
}
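// Adds multiple samples to an existing series (the key must already exist), using the series
// duplicate policy, and returns one AddResult per sample. Downstream compaction rules are
// updated as well.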
rocksdb::Status TimeSeries::MAdd(engine::Context &ctx, const Slice &user_key, std::vector<TSSample> samples,
std::vector<AddResult> *res) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata(false);
rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
return s;
}
auto sample_batch = SampleBatch(std::move(samples), metadata.duplicate_policy);
std::vector<std::string> new_chunks;
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
if (!s.ok()) return s;
s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
if (!s.ok()) return s;
*res = sample_batch.GetFinalResults();
return rocksdb::Status::OK();
}
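// Collects TS.INFO-style data: the series metadata, an approximate sample count, the first and
// last timestamps, the labels, and the downstream compaction rules.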
rocksdb::Status TimeSeries::Info(engine::Context &ctx, const Slice &user_key, TSInfoResult *res) {
std::string ns_key = AppendNamespacePrefix(user_key);
rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &res->metadata);
if (!s.ok()) {
return s;
}
auto &metadata = res->metadata;
// Approximate total samples as chunk count * chunk capacity
res->total_samples = metadata.size * metadata.chunk_size;
// TODO: Estimate disk usage for the field `memoryUsage`
res->memory_usage = 0;
// Retrieve the first and last timestamp
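// Chunk subkeys are scanned below the first possible LABEL subkey (empty label key), which
// serves as the iteration upper bound here.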
std::string chunk_upper_bound = internalKeyFromLabelKey(ns_key, metadata, "");
std::string end_key = internalKeyFromChunkID(ns_key, metadata, TSSample::MAX_TIMESTAMP);
std::string prefix = end_key.substr(0, end_key.size() - sizeof(uint64_t));
rocksdb::ReadOptions read_options = ctx.DefaultScanOptions();
rocksdb::Slice upper_bound(chunk_upper_bound);
read_options.iterate_upper_bound = &upper_bound;
rocksdb::Slice lower_bound(prefix);
read_options.iterate_lower_bound = &lower_bound;
auto iter = util::UniqueIterator(ctx, read_options);
iter->SeekForPrev(end_key);
if (!iter->Valid() || !iter->key().starts_with(prefix)) {
// No chunk exists yet, so report zero for both timestamps
res->first_timestamp = 0;
res->last_timestamp = 0;
} else {
auto chunk = CreateTSChunkFromData(iter->value());
res->last_timestamp = chunk->GetLastTimestamp();
// Get the first timestamp
TSRangeOption range_option;
range_option.count_limit = 1;
std::vector<TSSample> samples;
s = rangeCommon(ctx, ns_key, metadata, range_option, &samples);
if (!s.ok()) return s;
CHECK(samples.size() == 1);
res->first_timestamp = samples[0].ts;
}
getLabelKVList(ctx, ns_key, metadata, &res->labels);
// Retrieve downstream (compaction) rules
std::vector<std::string> downstream_keys;
std::vector<TSDownStreamMeta> downstream_rules;
getDownStreamRules(ctx, ns_key, metadata, &downstream_keys, &downstream_rules);
for (size_t i = 0; i < downstream_keys.size(); i++) {
auto key = downstreamKeyFromInternalKey(downstream_keys[i]);
res->downstream_rules.emplace_back(std::move(key), std::move(downstream_rules[i]));
}
return rocksdb::Status::OK();
}
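// Runs a range query over a single series by delegating to rangeCommon with the given options.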
rocksdb::Status TimeSeries::Range(engine::Context &ctx, const Slice &user_key, const TSRangeOption &option,
std::vector<TSSample> *res) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata(false);
rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
return s;
}
s = rangeCommon(ctx, ns_key, metadata, option, res);
return s;
}
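// Fetches the latest sample(s) of an existing series; `is_return_latest` is forwarded to
// getCommon unchanged.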
rocksdb::Status TimeSeries::Get(engine::Context &ctx, const Slice &user_key, bool is_return_latest,
std::vector<TSSample> *res) {
res->clear();
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata(false);
rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
if (!s.ok()) {
return s;
}
s = getCommon(ctx, ns_key, metadata, is_return_latest, res);
return s;
}
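// Creates a compaction rule from src_key to dst_key. The rule is rejected (via *res) when the
// two keys are equal, either series is missing, the source is itself a compaction destination,
// or the destination already has a source or downstream rule. On success the rule metadata is
// stored under the source key and the destination records its source key, in a single write batch.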
rocksdb::Status TimeSeries::CreateRule(engine::Context &ctx, const Slice &src_key, const Slice &dst_key,
const TSAggregator &aggregator, TSCreateRuleResult *res) {
if (src_key == dst_key) {
*res = TSCreateRuleResult::kSrcEqDst;
return rocksdb::Status::OK();
}
std::string ns_src_key = AppendNamespacePrefix(src_key);
TimeSeriesMetadata src_metadata;
auto s = getTimeSeriesMetadata(ctx, ns_src_key, &src_metadata);
if (!s.ok()) {
*res = TSCreateRuleResult::kSrcNotExist;
return rocksdb::Status::OK();
}
TimeSeriesMetadata dst_metadata;
std::string ns_dst_key = AppendNamespacePrefix(dst_key);
s = getTimeSeriesMetadata(ctx, ns_dst_key, &dst_metadata);
if (!s.ok()) {
*res = TSCreateRuleResult::kDstNotExist;
return rocksdb::Status::OK();
}
if (src_metadata.source_key.size()) {
*res = TSCreateRuleResult::kSrcHasSourceRule;
return rocksdb::Status::OK();
}
if (dst_metadata.source_key.size()) {
*res = TSCreateRuleResult::kDstHasSourceRule;
return rocksdb::Status::OK();
}
std::vector<std::string> dst_ds_keys;
s = getDownStreamRules(ctx, ns_dst_key, dst_metadata, &dst_ds_keys);
if (!s.ok()) return s;
if (dst_ds_keys.size()) {
*res = TSCreateRuleResult::kDstHasDestRule;
return rocksdb::Status::OK();
}
// Create downstream metadata
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisTimeSeries);
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
TSDownStreamMeta downstream_metadata;
s = createDownStreamMetadataInBatch(ctx, ns_src_key, dst_key, src_metadata, aggregator, batch, &downstream_metadata);
if (!s.ok()) return s;
dst_metadata.SetSourceKey(src_key);
std::string bytes;
dst_metadata.Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_dst_key, bytes);
if (!s.ok()) return s;
*res = TSCreateRuleResult::kOK;
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
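// Resolves series by the label filter, then returns the latest sample(s) of each match together
// with its labels (all of them when with_labels is set, or only the selected_labels subset).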
rocksdb::Status TimeSeries::MGet(engine::Context &ctx, const TSMGetOption &option, bool is_return_latest,
std::vector<TSMGetResult> *res) {
std::vector<std::string> user_keys;
std::vector<LabelKVList> labels_vec;
std::vector<TimeSeriesMetadata> metas;
auto s = getTSKeyByFilter(ctx, option.filter, &user_keys, &labels_vec, &metas);
if (!s.ok()) return s;
res->resize(user_keys.size());
for (size_t i = 0; i < user_keys.size(); i++) {
std::string ns_key = AppendNamespacePrefix(user_keys[i]);
auto &res_i = (*res)[i];
auto &metadata = metas[i];
auto &labels = labels_vec[i];
s = getCommon(ctx, ns_key, metadata, is_return_latest, &res_i.samples);
if (!s.ok()) return s;
res_i.name = std::move(user_keys[i]);
if (option.with_labels) {
res_i.labels = std::move(labels);
} else if (!option.selected_labels.empty()) {
res_i.labels = ExtractSelectedLabels(std::move(labels), option.selected_labels);
}
}
return s;
}
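// Runs a range query over every series matching the label filter. With a GROUPBY label and a
// reducer, series are grouped by that label's value (labels are binary-searched, so they are
// expected to be sorted by key), the per-series results are combined with GroupSamplesAndReduce,
// and the member keys are reported as sources; otherwise one result is returned per series.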
rocksdb::Status TimeSeries::MRange(engine::Context &ctx, const TSMRangeOption &option,
std::vector<TSMRangeResult> *res) {
std::vector<std::string> user_keys;
std::vector<LabelKVList> labels_vec;
std::vector<TimeSeriesMetadata> metas;
auto s = getTSKeyByFilter(ctx, option.filter, &user_keys, &labels_vec, &metas);
if (!s.ok()) return s;
res->clear();
res->reserve(user_keys.size());
// Group
using GroupReducerType = TSMRangeOption::GroupReducerType;
bool is_group_by = option.group_by_label.size() && option.reducer != GroupReducerType::NONE;
std::map<std::string_view, std::vector<size_t>> group_map;
if (is_group_by) {
for (size_t i = 0; i < user_keys.size(); i++) {
auto &labels = labels_vec[i];
auto it = std::lower_bound(labels.begin(), labels.end(), option.group_by_label,
[](const LabelKVPair &label, const std::string &key) { return label.k < key; });
if (it != labels.end() && it->k == option.group_by_label) {
group_map[it->v].push_back(i);
}
}
if (group_map.empty()) {
// No matched group
return rocksdb::Status::OK();
}
}
if (is_group_by) {
for (const auto &[group_value, indices] : group_map) {
TSMRangeResult group_res;
// Labels
LabelKVList group_labels = {LabelKVPair{option.group_by_label, std::string(group_value)}};
if (option.with_labels) {
group_res.labels = std::move(group_labels);
} else if (option.selected_labels.size()) {
group_res.labels = ExtractSelectedLabels(std::move(group_labels), option.selected_labels);
}
// Samples
std::vector<std::vector<TSSample>> all_samples;
all_samples.reserve(indices.size());
for (size_t i : indices) {
std::vector<TSSample> samples;
s = rangeCommon(ctx, AppendNamespacePrefix(user_keys[i]), metas[i], option, &samples);
if (!s.ok()) return s;
all_samples.push_back(std::move(samples));
}
group_res.samples = GroupSamplesAndReduce(all_samples, option.reducer);
// Sources
for (size_t i : indices) {
group_res.source_keys.push_back(std::move(user_keys[i]));
}
// Name
group_res.name = group_value;
res->push_back(std::move(group_res));
}
} else {
for (size_t i = 0; i < user_keys.size(); i++) {
TSMRangeResult group_res;
// Labels
if (option.with_labels) {
group_res.labels = std::move(labels_vec[i]);
} else if (option.selected_labels.size()) {
group_res.labels = ExtractSelectedLabels(std::move(labels_vec[i]), option.selected_labels);
}
// Samples
s = rangeCommon(ctx, AppendNamespacePrefix(user_keys[i]), metas[i], option, &group_res.samples);
if (!s.ok()) return s;
// Name
group_res.name = std::move(user_keys[i]);
res->push_back(std::move(group_res));
}
}
return rocksdb::Status::OK();
}
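// Increments the latest value of the series (creating the series on demand). If the sample's
// timestamp is older than the current last sample, the write is rejected as kOld; otherwise the
// increment is added to the last value and upserted with the LAST duplicate policy, and
// downstream compaction rules are updated.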
rocksdb::Status TimeSeries::IncrBy(engine::Context &ctx, const Slice &user_key, TSSample sample,
const TSCreateOption &option, AddResult *res) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata(false);
rocksdb::Status s = getOrCreateTimeSeries(ctx, ns_key, &metadata, &option);
if (!s.ok()) return s;
std::vector<TSSample> get_samples;
s = getCommon(ctx, ns_key, metadata, true, &get_samples);
if (!s.ok()) return s;
if (get_samples.size() && sample < get_samples.back()) {
res->type = TSChunk::AddResultType::kOld;
return rocksdb::Status::OK();
}
if (get_samples.size()) {
sample.v += get_samples.back().v;
}
auto sample_batch = SampleBatch({sample}, DuplicatePolicy::LAST);
std::vector<std::string> new_chunks;
s = upsertCommon(ctx, ns_key, metadata, sample_batch, &new_chunks);
if (!s.ok()) return s;
s = upsertDownStream(ctx, ns_key, metadata, new_chunks, sample_batch);
if (!s.ok()) return s;
*res = sample_batch.GetFinalResults()[0];
return rocksdb::Status::OK();
}
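// Deletes the samples in the given timestamp range. When the series has compaction rules, the
// deletion is rejected if the aligned bucket containing `from` starts before the retention lower
// bound; otherwise the range is removed from the series and from its downstream series.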
rocksdb::Status TimeSeries::Del(engine::Context &ctx, const Slice &user_key, uint64_t from, uint64_t to,
uint64_t *deleted) {
std::string ns_key = AppendNamespacePrefix(user_key);
TimeSeriesMetadata metadata(false);
rocksdb::Status s = getTimeSeriesMetadata(ctx, ns_key, &metadata);
if (!s.ok()) return s;
// Get downstream rules
std::vector<std::string> ds_keys;
std::vector<TSDownStreamMeta> ds_metas;
s = getDownStreamRules(ctx, ns_key, metadata, &ds_keys, &ds_metas);
if (!s.ok()) return s;
// Check retention and compaction rules
std::vector<TSSample> get_samples;
s = getCommon(ctx, ns_key, metadata, true, &get_samples);
if (!s.ok()) return s;
if (get_samples.empty()) return rocksdb::Status::OK();
uint64_t last_ts = get_samples.back().ts;
uint64_t retention_bound =
(metadata.retention_time > 0 && metadata.retention_time < last_ts) ? last_ts - metadata.retention_time : 0;
for (const auto &ds_meta : ds_metas) {
const auto &agg = ds_meta.aggregator;
if (agg.CalculateAlignedBucketLeft(from) < retention_bound) {
return rocksdb::Status::InvalidArgument(
"When a series has compactions, deleting samples or compaction buckets beyond the series retention period is "
"not possible");
}
}
s = delRangeCommon(ctx, ns_key, metadata, from, to, deleted);
if (!s.ok()) return s;
if (*deleted == 0) return rocksdb::Status::OK();
s = delRangeDownStream(ctx, ns_key, metadata, ds_keys, ds_metas, from, to);
return s;
}
} // namespace redis