blob: 9912281f6d3a464e4847fc85391bdbdfab60ed3c [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "command_parser.h"
#include "commander.h"
#include "error_constants.h"
#include "server/server.h"
#include "types/redis_timeseries.h"
namespace {
constexpr const char *errBadRetention = "Couldn't parse RETENTION";
constexpr const char *errBadChunkSize = "invalid CHUNK_SIZE";
constexpr const char *errBadEncoding = "unknown ENCODING parameter";
constexpr const char *errDuplicatePolicy = "Unknown DUPLICATE_POLICY";
constexpr const char *errInvalidTimestamp = "invalid timestamp";
constexpr const char *errInvalidValue = "invalid value";
constexpr const char *errOldTimestamp = "Timestamp is older than retention";
constexpr const char *errDupBlock =
"Error at upsert, update is not supported when DUPLICATE_POLICY is set to BLOCK mode";
constexpr const char *errTSKeyNotFound = "the key is not a TSDB key";
constexpr const char *errTSInvalidAlign = "unknown ALIGN parameter";
constexpr const char *errTSMRangeArgsNum = "wrong number of arguments for 'ts.mrange' command";
using ChunkType = TimeSeriesMetadata::ChunkType;
using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy;
using TSAggregatorType = redis::TSAggregatorType;
using TSCreateRuleResult = redis::TSCreateRuleResult;
using GroupReducerType = redis::TSMRangeOption::GroupReducerType;
const std::unordered_map<ChunkType, std::string_view> kChunkTypeMap = {
{ChunkType::COMPRESSED, "compressed"},
{ChunkType::UNCOMPRESSED, "uncompressed"},
};
const std::unordered_map<DuplicatePolicy, std::string_view> kDuplicatePolicyMap = {
{DuplicatePolicy::BLOCK, "block"}, {DuplicatePolicy::FIRST, "first"}, {DuplicatePolicy::LAST, "last"},
{DuplicatePolicy::MIN, "min"}, {DuplicatePolicy::MAX, "max"}, {DuplicatePolicy::SUM, "sum"},
};
const std::unordered_map<TSAggregatorType, std::string_view> kAggregatorTypeMap = {
{TSAggregatorType::AVG, "avg"}, {TSAggregatorType::SUM, "sum"}, {TSAggregatorType::MIN, "min"},
{TSAggregatorType::MAX, "max"}, {TSAggregatorType::RANGE, "range"}, {TSAggregatorType::COUNT, "count"},
{TSAggregatorType::FIRST, "first"}, {TSAggregatorType::LAST, "last"}, {TSAggregatorType::STD_P, "std.p"},
{TSAggregatorType::STD_S, "std.s"}, {TSAggregatorType::VAR_P, "var.p"}, {TSAggregatorType::VAR_S, "var.s"},
};
const std::unordered_map<GroupReducerType, std::string_view> kGroupReducerTypeMap = {
{GroupReducerType::AVG, "avg"}, {GroupReducerType::SUM, "sum"}, {GroupReducerType::MIN, "min"},
{GroupReducerType::MAX, "max"}, {GroupReducerType::RANGE, "range"}, {GroupReducerType::COUNT, "count"},
{GroupReducerType::STD_P, "std.p"}, {GroupReducerType::STD_S, "std.s"}, {GroupReducerType::VAR_P, "var.p"},
{GroupReducerType::VAR_S, "var.s"},
};
std::string FormatAddResultAsRedisReply(TSChunk::AddResult res) {
using AddResultType = TSChunk::AddResultType;
switch (res.type) {
case AddResultType::kInsert:
case AddResultType::kUpdate:
case AddResultType::kSkip:
return redis::Integer(res.sample.ts);
case AddResultType::kOld:
return redis::Error({Status::NotOK, errOldTimestamp});
case AddResultType::kBlock:
return redis::Error({Status::NotOK, errDupBlock});
default:
unreachable();
}
return "";
}
std::string FormatTSSampleAsRedisReply(TSSample sample) {
std::string res = redis::MultiLen(2);
res += redis::Integer(sample.ts);
res += redis::Double(redis::RESP::v3, sample.v);
return res;
}
std::string_view FormatChunkTypeAsRedisReply(ChunkType chunk_type) {
auto it = kChunkTypeMap.find(chunk_type);
if (it == kChunkTypeMap.end()) {
unreachable();
}
return it->second;
}
std::string_view FormatDuplicatePolicyAsRedisReply(DuplicatePolicy policy) {
auto it = kDuplicatePolicyMap.find(policy);
if (it == kDuplicatePolicyMap.end()) {
unreachable();
}
return it->second;
}
std::string_view FormatAggregatorTypeAsRedisReply(TSAggregatorType aggregator) {
auto it = kAggregatorTypeMap.find(aggregator);
if (it == kAggregatorTypeMap.end()) {
unreachable();
}
return it->second;
}
std::string_view GroupReducerTypeToString(GroupReducerType reducer) {
auto it = kGroupReducerTypeMap.find(reducer);
if (it == kGroupReducerTypeMap.end()) {
unreachable();
}
return it->second;
}
std::string GroupSourceToString(const std::vector<std::string> &sources) {
std::string res;
size_t total_size = 0;
for (auto &src : sources) {
total_size += src.size();
}
res.reserve(total_size + sources.size());
for (size_t i = 0; i < sources.size(); i++) {
res += sources[i];
if (i != sources.size() - 1) {
res += ',';
}
}
return res;
}
std::string FormatCreateRuleResAsRedisReply(TSCreateRuleResult res) {
switch (res) {
case TSCreateRuleResult::kOK:
return redis::RESP_OK;
case TSCreateRuleResult::kSrcNotExist:
case TSCreateRuleResult::kDstNotExist:
return redis::Error({Status::NotOK, errTSKeyNotFound});
case TSCreateRuleResult::kSrcEqDst:
return redis::Error({Status::NotOK, "the source key and destination key should be different"});
case TSCreateRuleResult::kSrcHasSourceRule:
return redis::Error({Status::NotOK, "the source key already has a source rule"});
case TSCreateRuleResult::kDstHasSourceRule:
return redis::Error({Status::NotOK, "the destination key already has a src rule"});
case TSCreateRuleResult::kDstHasDestRule:
return redis::Error({Status::NotOK, "the destination key already has a dst rule"});
default:
unreachable();
}
return "";
}
std::string FormatTSLabelListAsRedisReply(const redis::LabelKVList &labels) {
std::vector<std::string> labels_str;
labels_str.reserve(labels.size());
for (const auto &label : labels) {
auto str = redis::Array(
{redis::BulkString(label.k), label.v.size() ? redis::BulkString(label.v) : redis::NilString(redis::RESP::v3)});
labels_str.push_back(str);
}
return redis::Array(labels_str);
}
} // namespace
namespace redis {
class KeywordCommandBase : public Commander {
public:
KeywordCommandBase() = default;
Status Parse(const std::vector<std::string> &args) override {
TSOptionsParser parser(std::next(args.begin(), static_cast<std::ptrdiff_t>(skip_num_)),
std::prev(args.end(), static_cast<std::ptrdiff_t>(tail_skip_num_)));
while (parser.Good()) {
auto &value = parser.RawTake();
auto value_upper = util::ToUpper(value);
if (containsKeyword(value_upper, true)) {
Status s = handlers_[value_upper](parser);
if (!s.IsOK()) return s;
if (required_keywords_.count(value_upper)) {
required_keywords_.erase(value_upper);
}
}
}
if (!required_keywords_.empty()) {
return {Status::InvalidArgument, required_keywords_.begin()->second};
}
return Commander::Parse(args);
}
protected:
using TSOptionsParser = CommandParser<CommandTokens::const_iterator>;
template <typename Handler>
void registerHandler(const std::string &keyword, Handler &&handler) {
handlers_.emplace(util::ToUpper(keyword), std::forward<Handler>(handler));
}
template <typename Handler>
void registerHandlerRequired(const std::string &keyword, Handler &&handler, std::string_view err_msg) {
auto it = handlers_.emplace(util::ToUpper(keyword), std::forward<Handler>(handler)).first;
required_keywords_.emplace(it->first, err_msg);
}
virtual void registerDefaultHandlers() = 0;
void setSkipNum(size_t num) { skip_num_ = num; }
void setTailSkipNum(size_t num) { tail_skip_num_ = num; }
bool containsKeyword(const std::string &keyword, bool is_upper = false) const {
if (is_upper) {
return handlers_.count(keyword);
} else {
return handlers_.count(util::ToUpper(keyword));
}
}
private:
size_t skip_num_ = 0;
size_t tail_skip_num_ = 0;
std::unordered_map<std::string_view, std::string> required_keywords_;
std::unordered_map<std::string, std::function<Status(TSOptionsParser &)>> handlers_;
};
class CommandTSCreateBase : public KeywordCommandBase {
public:
Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, [[maybe_unused]] Connection *conn,
[[maybe_unused]] std::string *output) override {
if (srv->GetConfig()->cluster_enabled && getCreateOption().labels.size()) {
return {Status::RedisExecErr, "Specifying LABELS is not supported in cluster mode"};
}
return Status::OK();
}
protected:
const TSCreateOption &getCreateOption() const { return create_option_; }
void registerDefaultHandlers() override {
registerHandler("RETENTION",
[this](TSOptionsParser &parser) { return handleRetention(parser, create_option_.retention_time); });
registerHandler("CHUNK_SIZE",
[this](TSOptionsParser &parser) { return handleChunkSize(parser, create_option_.chunk_size); });
registerHandler("ENCODING",
[this](TSOptionsParser &parser) { return handleEncoding(parser, create_option_.chunk_type); });
registerHandler("DUPLICATE_POLICY", [this](TSOptionsParser &parser) {
return handleDuplicatePolicy(parser, create_option_.duplicate_policy);
});
registerHandler("LABELS", [this](TSOptionsParser &parser) { return handleLabels(parser, create_option_.labels); });
}
static Status handleRetention(TSOptionsParser &parser, uint64_t &retention_time) {
auto parse_retention = parser.TakeInt<uint64_t>();
if (!parse_retention.IsOK()) {
return {Status::RedisParseErr, errBadRetention};
}
retention_time = parse_retention.GetValue();
return Status::OK();
}
static Status handleChunkSize(TSOptionsParser &parser, uint64_t &chunk_size) {
auto parse_chunk_size = parser.TakeInt<uint64_t>();
if (!parse_chunk_size.IsOK()) {
return {Status::RedisParseErr, errBadChunkSize};
}
chunk_size = parse_chunk_size.GetValue();
return Status::OK();
}
static Status handleEncoding(TSOptionsParser &parser, ChunkType &chunk_type) {
if (parser.EatEqICase("UNCOMPRESSED")) {
chunk_type = ChunkType::UNCOMPRESSED;
} else if (parser.EatEqICase("COMPRESSED")) {
chunk_type = ChunkType::COMPRESSED;
} else {
return {Status::RedisParseErr, errBadEncoding};
}
return Status::OK();
}
static Status handleDuplicatePolicy(TSOptionsParser &parser, DuplicatePolicy &duplicate_policy) {
if (parser.EatEqICase("BLOCK")) {
duplicate_policy = DuplicatePolicy::BLOCK;
} else if (parser.EatEqICase("FIRST")) {
duplicate_policy = DuplicatePolicy::FIRST;
} else if (parser.EatEqICase("LAST")) {
duplicate_policy = DuplicatePolicy::LAST;
} else if (parser.EatEqICase("MAX")) {
duplicate_policy = DuplicatePolicy::MAX;
} else if (parser.EatEqICase("MIN")) {
duplicate_policy = DuplicatePolicy::MIN;
} else if (parser.EatEqICase("SUM")) {
duplicate_policy = DuplicatePolicy::SUM;
} else {
return {Status::RedisParseErr, errDuplicatePolicy};
}
return Status::OK();
}
static Status handleLabels(TSOptionsParser &parser, LabelKVList &labels) {
while (parser.Good()) {
auto parse_key = parser.TakeStr();
auto parse_value = parser.TakeStr();
if (!parse_key.IsOK() || !parse_value.IsOK()) {
break;
}
labels.push_back({parse_key.GetValue(), parse_value.GetValue()});
}
return Status::OK();
}
private:
TSCreateOption create_option_;
};
class CommandTSCreate : public CommandTSCreateBase {
public:
CommandTSCreate() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 2) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
CommandTSCreateBase::setSkipNum(2);
return CommandTSCreateBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto sc = CommandTSCreateBase::Execute(ctx, srv, conn, output);
if (!sc.IsOK()) return sc;
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
auto s = timeseries_db.Create(ctx, args_[1], getCreateOption());
if (!s.ok() && s.IsInvalidArgument()) return {Status::RedisExecErr, errKeyAlreadyExists};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
*output = redis::RESP_OK;
return Status::OK();
}
};
class CommandTSInfo : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override { return Commander::Parse(args); }
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
TSInfoResult info;
auto s = timeseries_db.Info(ctx, args_[1], &info);
if (s.IsNotFound()) return {Status::RedisExecErr, errTSKeyNotFound};
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
*output = redis::MultiLen(24);
*output += redis::SimpleString("totalSamples");
*output += redis::Integer(info.total_samples);
*output += redis::SimpleString("memoryUsage");
*output += redis::Integer(info.memory_usage);
*output += redis::SimpleString("firstTimestamp");
*output += redis::Integer(info.first_timestamp);
*output += redis::SimpleString("lastTimestamp");
*output += redis::Integer(info.last_timestamp);
*output += redis::SimpleString("retentionTime");
*output += redis::Integer(info.metadata.retention_time);
*output += redis::SimpleString("chunkCount");
*output += redis::Integer(info.metadata.size);
*output += redis::SimpleString("chunkSize");
*output += redis::Integer(info.metadata.chunk_size);
*output += redis::SimpleString("chunkType");
*output += redis::SimpleString(FormatChunkTypeAsRedisReply(info.metadata.chunk_type));
*output += redis::SimpleString("duplicatePolicy");
*output += redis::SimpleString(FormatDuplicatePolicyAsRedisReply(info.metadata.duplicate_policy));
*output += redis::SimpleString("labels");
*output += FormatTSLabelListAsRedisReply(info.labels);
*output += redis::SimpleString("sourceKey");
*output += info.metadata.source_key.empty() ? redis::NilString(redis::RESP::v3)
: redis::BulkString(info.metadata.source_key);
*output += redis::SimpleString("rules");
std::vector<std::string> rules_str;
rules_str.reserve(info.downstream_rules.size());
for (const auto &[key, rule] : info.downstream_rules) {
const auto &aggregator = rule.aggregator;
auto str = redis::Array({redis::BulkString(key), redis::Integer(aggregator.bucket_duration),
redis::SimpleString(FormatAggregatorTypeAsRedisReply(aggregator.type)),
redis::Integer(aggregator.alignment)});
rules_str.push_back(str);
}
*output += redis::Array(rules_str);
return Status::OK();
}
};
class CommandTSAdd : public CommandTSCreateBase {
public:
CommandTSAdd() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 4) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
CommandParser parser(args, 2);
auto ts_parse = parser.TakeInt<uint64_t>();
if (!ts_parse.IsOK()) {
auto ts_str = parser.TakeStr();
if (!ts_str.IsOK() || ts_str.GetValue() != "*") {
return {Status::RedisParseErr, errInvalidTimestamp};
}
ts_ = util::GetTimeStampMS();
} else {
ts_ = ts_parse.GetValue();
}
auto value_parse = parser.TakeFloat<double>();
if (!value_parse.IsOK()) {
return {Status::RedisParseErr, errInvalidValue};
}
value_ = value_parse.GetValue();
CommandTSCreateBase::setSkipNum(4);
return CommandTSCreateBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto sc = CommandTSCreateBase::Execute(ctx, srv, conn, output);
if (!sc.IsOK()) return sc;
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
const auto &option = getCreateOption();
TSChunk::AddResult res;
auto s = timeseries_db.Add(ctx, args_[1], {ts_, value_}, option, &res,
is_on_dup_policy_set_ ? &on_dup_policy_ : nullptr);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
*output += FormatAddResultAsRedisReply(res);
return Status::OK();
}
protected:
void registerDefaultHandlers() override {
CommandTSCreateBase::registerDefaultHandlers();
registerHandler("ON_DUPLICATE", [this](TSOptionsParser &parser) { return handleOnDuplicatePolicy(parser); });
}
private:
DuplicatePolicy on_dup_policy_ = DuplicatePolicy::BLOCK;
bool is_on_dup_policy_set_ = false;
uint64_t ts_ = 0;
double value_ = 0;
Status handleOnDuplicatePolicy(TSOptionsParser &parser) {
if (parser.EatEqICase("BLOCK")) {
on_dup_policy_ = DuplicatePolicy::BLOCK;
} else if (parser.EatEqICase("FIRST")) {
on_dup_policy_ = DuplicatePolicy::FIRST;
} else if (parser.EatEqICase("LAST")) {
on_dup_policy_ = DuplicatePolicy::LAST;
} else if (parser.EatEqICase("MAX")) {
on_dup_policy_ = DuplicatePolicy::MAX;
} else if (parser.EatEqICase("MIN")) {
on_dup_policy_ = DuplicatePolicy::MIN;
} else if (parser.EatEqICase("SUM")) {
on_dup_policy_ = DuplicatePolicy::SUM;
} else {
return {Status::RedisParseErr, errDuplicatePolicy};
}
is_on_dup_policy_set_ = true;
return Status::OK();
}
};
class CommandTSMAdd : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 4 || (args.size() - 1) % 3 != 0) {
return {Status::RedisParseErr, errWrongNumOfArguments};
}
CommandParser parser(args, 1);
for (size_t i = 1; i < args.size(); i += 3) {
const auto &user_key = args[i];
parser.Skip(1);
auto ts_parse = parser.TakeInt<uint64_t>();
if (!ts_parse.IsOK()) {
return {Status::RedisParseErr, errInvalidTimestamp};
}
auto value_parse = parser.TakeFloat<double>();
if (!value_parse.IsOK()) {
return Status{Status::RedisParseErr, errInvalidValue};
}
userkey_samples_map_[user_key].push_back({ts_parse.GetValue(), value_parse.GetValue()});
userkey_indexes_map_[user_key].push_back(i / 3);
samples_count_ += 1;
}
return Commander::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
auto replies = std::vector<std::string>(samples_count_);
for (auto &[user_key, samples] : userkey_samples_map_) {
std::vector<TSChunk::AddResult> res;
auto count = samples.size();
auto s = timeseries_db.MAdd(ctx, user_key, std::move(samples), &res);
std::string err_reply;
if (!s.ok()) {
err_reply = s.IsNotFound() ? redis::Error({Status::NotOK, errTSKeyNotFound})
: redis::Error({Status::NotOK, s.ToString()});
}
for (size_t i = 0; i < count; i++) {
size_t idx = userkey_indexes_map_[user_key][i];
replies[idx] = s.ok() ? FormatAddResultAsRedisReply(res[i]) : err_reply;
}
}
*output = redis::MultiLen(samples_count_);
for (auto &reply : replies) {
if (reply.empty()) continue;
*output += reply;
}
return Status::OK();
}
private:
size_t samples_count_ = 0;
std::unordered_map<std::string_view, std::vector<TSSample>> userkey_samples_map_;
std::unordered_map<std::string_view, std::vector<size_t>> userkey_indexes_map_;
};
class CommandTSAggregatorBase : public KeywordCommandBase {
protected:
TSAggregator &getAggregator() { return aggregator_; }
void registerDefaultHandlers() override {
registerHandler("AGGREGATION", [this](TSOptionsParser &parser) { return handleAggregation(parser, aggregator_); });
registerHandler("ALIGN", [this](TSOptionsParser &parser) { return handleAlignCommon(parser, aggregator_); });
}
static Status handleAggregation(TSOptionsParser &parser, TSAggregator &aggregator) {
auto &type = aggregator.type;
if (parser.EatEqICase("AVG")) {
type = TSAggregatorType::AVG;
} else if (parser.EatEqICase("SUM")) {
type = TSAggregatorType::SUM;
} else if (parser.EatEqICase("MIN")) {
type = TSAggregatorType::MIN;
} else if (parser.EatEqICase("MAX")) {
type = TSAggregatorType::MAX;
} else if (parser.EatEqICase("RANGE")) {
type = TSAggregatorType::RANGE;
} else if (parser.EatEqICase("COUNT")) {
type = TSAggregatorType::COUNT;
} else if (parser.EatEqICase("FIRST")) {
type = TSAggregatorType::FIRST;
} else if (parser.EatEqICase("LAST")) {
type = TSAggregatorType::LAST;
} else if (parser.EatEqICase("STD.P")) {
type = TSAggregatorType::STD_P;
} else if (parser.EatEqICase("STD.S")) {
type = TSAggregatorType::STD_S;
} else if (parser.EatEqICase("VAR.P")) {
type = TSAggregatorType::VAR_P;
} else if (parser.EatEqICase("VAR.S")) {
type = TSAggregatorType::VAR_S;
} else {
return {Status::RedisParseErr, "Invalid aggregator type"};
}
auto duration = parser.TakeInt<uint64_t>();
if (!duration.IsOK()) {
return {Status::RedisParseErr, "Couldn't parse AGGREGATION"};
}
aggregator.bucket_duration = duration.GetValue();
if (aggregator.bucket_duration == 0) {
return {Status::RedisParseErr, "bucketDuration must be greater than zero"};
}
return Status::OK();
}
static Status handleAlignCommon(TSOptionsParser &parser, TSAggregator &aggregator) {
auto align = parser.TakeInt<uint64_t>();
if (!align.IsOK()) {
return {Status::RedisParseErr, errTSInvalidAlign};
}
aggregator.alignment = align.GetValue();
return Status::OK();
}
static Status handleLatest([[maybe_unused]] TSOptionsParser &parser, bool &is_return_latest) {
is_return_latest = true;
return Status::OK();
}
private:
TSAggregator aggregator_;
};
class CommandTSRangeBase : virtual public CommandTSAggregatorBase {
public:
explicit CommandTSRangeBase(size_t skip_num) : skip_num_(skip_num) {}
Status Parse(const std::vector<std::string> &args) override {
TSOptionsParser parser(std::next(args.begin(), static_cast<std::ptrdiff_t>(skip_num_)), args.end());
// Parse start timestamp
auto start_ts = parser.TakeInt<uint64_t>();
if (!start_ts.IsOK()) {
auto start_ts_str = parser.TakeStr();
if (!start_ts_str.IsOK() || start_ts_str.GetValue() != "-") {
return {Status::RedisParseErr, "wrong fromTimestamp"};
}
// "-" means use default start timestamp: 0
} else {
is_start_explicit_set_ = true;
option_.start_ts = start_ts.GetValue();
}
// Parse end timestamp
auto end_ts = parser.TakeInt<uint64_t>();
if (!end_ts.IsOK()) {
auto end_ts_str = parser.TakeStr();
if (!end_ts_str.IsOK() || end_ts_str.GetValue() != "+") {
return {Status::RedisParseErr, "wrong toTimestamp"};
}
// "+" means use default end timestamp: MAX_TIMESTAMP
} else {
is_end_explicit_set_ = true;
option_.end_ts = end_ts.GetValue();
}
KeywordCommandBase::setSkipNum(skip_num_ + 2);
auto s = KeywordCommandBase::Parse(args);
if (!s.IsOK()) return s;
if (is_alignment_explicit_set_ && option_.aggregator.type == TSAggregatorType::NONE) {
return {Status::RedisParseErr, "ALIGN parameter can only be used with AGGREGATION"};
}
return s;
}
protected:
const TSRangeOption &getRangeOption() const { return option_; }
void registerDefaultHandlers() override {
registerHandler("LATEST",
[this](TSOptionsParser &parser) { return handleLatest(parser, option_.is_return_latest); });
registerHandler("FILTER_BY_TS",
[this](TSOptionsParser &parser) { return handleFilterByTS(parser, option_.filter_by_ts); });
registerHandler("FILTER_BY_VALUE",
[this](TSOptionsParser &parser) { return handleFilterByValue(parser, option_.filter_by_value); });
registerHandler("COUNT", [this](TSOptionsParser &parser) { return handleCount(parser, option_.count_limit); });
registerHandler("ALIGN", [this](TSOptionsParser &parser) { return handleAlign(parser); });
registerHandler("AGGREGATION",
[this](TSOptionsParser &parser) { return handleAggregation(parser, option_.aggregator); });
registerHandler("BUCKETTIMESTAMP",
[this](TSOptionsParser &parser) { return handleBucketTimestamp(parser, option_); });
registerHandler("EMPTY", [this](TSOptionsParser &parser) { return handleEmpty(parser, option_); });
}
static Status handleFilterByTS(TSOptionsParser &parser, std::set<uint64_t> &filter_by_ts) {
filter_by_ts.clear();
while (parser.Good()) {
auto ts = parser.TakeInt<uint64_t>();
if (!ts.IsOK()) break;
filter_by_ts.insert(ts.GetValue());
}
return Status::OK();
}
static Status handleFilterByValue(TSOptionsParser &parser,
std::optional<std::pair<double, double>> &filter_by_value) {
auto min = parser.TakeFloat<double>();
auto max = parser.TakeFloat<double>();
if (!min.IsOK() || !max.IsOK()) {
return {Status::RedisParseErr, "Invalid min or max value"};
}
filter_by_value = std::make_optional(std::make_pair(min.GetValue(), max.GetValue()));
return Status::OK();
}
static Status handleCount(TSOptionsParser &parser, uint64_t &count_limit) {
auto count = parser.TakeInt<uint64_t>();
if (!count.IsOK()) {
return {Status::RedisParseErr, "Couldn't parse COUNT"};
}
count_limit = count.GetValue();
if (count_limit == 0) {
return {Status::RedisParseErr, "Invalid COUNT value"};
}
return Status::OK();
}
static Status handleBucketTimestamp(TSOptionsParser &parser, TSRangeOption &option) {
if (option.aggregator.type == TSAggregatorType::NONE) {
return {Status::RedisParseErr, "BUCKETTIMESTAMP flag should be the 3rd or 4th flag after AGGREGATION flag"};
}
using BucketTimestampType = TSRangeOption::BucketTimestampType;
if (parser.EatEqICase("START")) {
option.bucket_timestamp_type = BucketTimestampType::Start;
} else if (parser.EatEqICase("END")) {
option.bucket_timestamp_type = BucketTimestampType::End;
} else if (parser.EatEqICase("MID")) {
option.bucket_timestamp_type = BucketTimestampType::Mid;
} else {
return {Status::RedisParseErr, "unknown BUCKETTIMESTAMP parameter"};
}
return Status::OK();
}
static Status handleEmpty([[maybe_unused]] TSOptionsParser &parser, TSRangeOption &option) {
if (option.aggregator.type == TSAggregatorType::NONE) {
return {Status::RedisParseErr, "EMPTY flag should be the 3rd or 5th flag after AGGREGATION flag"};
}
option.is_return_empty = true;
return Status::OK();
}
private:
TSRangeOption option_;
size_t skip_num_;
bool is_start_explicit_set_ = false;
bool is_end_explicit_set_ = false;
bool is_alignment_explicit_set_ = false;
Status handleAlign(TSOptionsParser &parser) {
auto s = handleAlignCommon(parser, option_.aggregator);
if (s.IsOK()) {
is_alignment_explicit_set_ = true;
return Status::OK();
}
auto align_str = parser.TakeStr();
if (!align_str.IsOK()) {
return {Status::RedisParseErr, errTSInvalidAlign};
}
const auto &value = align_str.GetValue();
if (value == "-" || value == "+") {
bool is_explicit_set = value == "-" ? is_start_explicit_set_ : is_end_explicit_set_;
auto err_msg = value == "-" ? "start alignment can only be used with explicit start timestamp"
: "end alignment can only be used with explicit end timestamp";
if (!is_explicit_set) {
return {Status::RedisParseErr, err_msg};
}
option_.aggregator.alignment = value == "-" ? option_.start_ts : option_.end_ts;
} else {
return {Status::RedisParseErr, errTSInvalidAlign};
}
is_alignment_explicit_set_ = true;
return Status::OK();
}
};
class CommandTSRange : public CommandTSRangeBase {
public:
CommandTSRange() : CommandTSRangeBase(2) { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 4) {
return {Status::RedisParseErr, "wrong number of arguments for 'ts.range' command"};
}
return CommandTSRangeBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
std::vector<TSSample> res;
auto s = timeseries_db.Range(ctx, args_[1], getRangeOption(), &res);
if (!s.ok()) return {Status::RedisExecErr, errKeyNotFound};
std::vector<std::string> reply;
reply.reserve(res.size());
for (auto &sample : res) {
reply.push_back(FormatTSSampleAsRedisReply(sample));
}
*output = redis::Array(reply);
return Status::OK();
}
};
class CommandTSCreateRule : public CommandTSAggregatorBase {
public:
explicit CommandTSCreateRule() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() > 7) {
return {Status::NotOK, "wrong number of arguments for 'TS.CREATERULE' command"};
}
src_key_ = args[1];
dst_key_ = args[2];
CommandTSAggregatorBase::setSkipNum(3);
return CommandTSAggregatorBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
auto res = TSCreateRuleResult::kOK;
auto s = timeseries_db.CreateRule(ctx, src_key_, dst_key_, getAggregator(), &res);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
*output = FormatCreateRuleResAsRedisReply(res);
return Status::OK();
}
protected:
void registerDefaultHandlers() override {
registerHandlerRequired(
"AGGREGATION",
[this](TSOptionsParser &parser) -> Status {
auto s = handleAggregation(parser, getAggregator());
if (!s.IsOK()) return s;
if (parser.Good()) {
auto align_parse = parser.TakeInt<uint64_t>();
if (align_parse.IsOK()) {
getAggregator().alignment = align_parse.GetValue();
} else {
return {Status::RedisParseErr, errTSInvalidAlign};
}
}
return Status::OK();
},
"AGGREGATION is required");
}
private:
std::string src_key_;
std::string dst_key_;
};
class CommandTSGet : public CommandTSAggregatorBase {
public:
CommandTSGet() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 2) {
return {Status::RedisParseErr, "wrong number of arguments for 'ts.get' command"};
}
CommandTSAggregatorBase::setSkipNum(2);
return CommandTSAggregatorBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
std::vector<TSSample> res;
auto s = timeseries_db.Get(ctx, args_[1], is_return_latest_, &res);
if (!s.ok()) return {Status::RedisExecErr, errKeyNotFound};
std::vector<std::string> reply;
reply.reserve(res.size());
for (auto &sample : res) {
reply.push_back(FormatTSSampleAsRedisReply(sample));
}
*output = redis::Array(reply);
return Status::OK();
}
protected:
void registerDefaultHandlers() override {
registerHandler("LATEST", [this](TSOptionsParser &parser) { return handleLatest(parser, is_return_latest_); });
}
private:
bool is_return_latest_ = false;
};
class CommandTSMGetBase : virtual public CommandTSAggregatorBase {
protected:
const TSMGetOption &getMGetOption() const { return option_; }
void registerDefaultHandlers() override {
registerHandler("WITHLABELS",
[this](TSOptionsParser &parser) { return handleWithLabels(parser, option_.with_labels); });
registerHandler("SELECTED_LABELS",
[this](TSOptionsParser &parser) { return handleSelectedLabels(parser, option_.selected_labels); });
registerHandlerRequired(
"FILTER", [this](TSOptionsParser &parser) { return handleFilterExpr(parser, option_.filter); },
"missing FILTER argument");
}
static Status handleWithLabels([[maybe_unused]] TSOptionsParser &parser, bool &with_labels) {
with_labels = true;
return Status::OK();
}
Status handleSelectedLabels(TSOptionsParser &parser, std::set<std::string> &selected_labels) {
while (parser.Good()) {
auto &value = parser.RawPeek();
if (containsKeyword(value)) {
break;
}
selected_labels.emplace(parser.TakeStr().GetValue());
}
return Status::OK();
}
Status handleFilterExpr(TSOptionsParser &parser, TSMGetOption::FilterOption &filter_option) {
auto filter_parser = TSMQueryFilterParser(filter_option);
while (parser.Good()) {
auto &value = parser.RawPeek();
if (containsKeyword(value)) {
break;
}
auto s = filter_parser.Parse(parser.TakeStr().GetValue());
if (!s.IsOK()) return s;
}
return filter_parser.Check();
}
private:
TSMGetOption option_;
};
class CommandTSMGet : public CommandTSMGetBase {
public:
CommandTSMGet() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 3) {
return {Status::RedisParseErr, "wrong number of arguments for 'ts.mget' command"};
}
CommandTSMGetBase::setSkipNum(1);
return CommandTSMGetBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
if (srv->GetConfig()->cluster_enabled) {
return {Status::RedisExecErr, "TS.MGet is not supported in cluster mode"};
}
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
std::vector<TSMGetResult> results;
auto s = timeseries_db.MGet(ctx, getMGetOption(), is_return_latest_, &results);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
std::vector<std::string> reply;
reply.reserve(results.size());
for (auto &result : results) {
std::vector<std::string> entry(3);
entry[0] = redis::BulkString(result.name);
entry[1] = FormatTSLabelListAsRedisReply(result.labels);
std::vector<std::string> temp;
for (auto &sample : result.samples) {
temp.push_back(FormatTSSampleAsRedisReply(sample));
}
entry[2] = redis::Array(temp);
reply.push_back(redis::Array(entry));
}
*output = redis::Array(reply);
return Status::OK();
}
protected:
void registerDefaultHandlers() override {
CommandTSMGetBase::registerDefaultHandlers();
registerHandler("LATEST", [this](TSOptionsParser &parser) { return handleLatest(parser, is_return_latest_); });
}
private:
bool is_return_latest_ = false;
};
class CommandTSMRange : public CommandTSRangeBase, public CommandTSMGetBase {
public:
CommandTSMRange() : CommandTSRangeBase(1) { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 5) {
return {Status::RedisParseErr, errTSMRangeArgsNum};
}
auto s = CommandTSRangeBase::Parse(args);
if (!s.IsOK()) return s;
// Combine MGet and Range options
static_cast<TSRangeOption &>(option_) = getRangeOption();
static_cast<TSMGetOption &>(option_) = getMGetOption();
return Status::OK();
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
if (srv->GetConfig()->cluster_enabled) {
return {Status::RedisExecErr, "TS.MRANGE is not supported in cluster mode"};
}
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
std::vector<TSMRangeResult> results;
auto s = timeseries_db.MRange(ctx, option_, &results);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
std::vector<std::string> reply;
reply.reserve(results.size());
for (auto &result : results) {
std::vector<std::string> entry(3);
entry[0] =
redis::BulkString(option_.group_by_label.empty() ? result.name : option_.group_by_label + "=" + result.name);
if (option_.group_by_label.size() && option_.with_labels) {
result.labels.reserve(result.labels.size() + 2);
result.labels.push_back(LabelKVPair{"__reducer__", std::string(GroupReducerTypeToString(option_.reducer))});
result.labels.push_back(LabelKVPair{"__source__", GroupSourceToString(result.source_keys)});
}
entry[1] = FormatTSLabelListAsRedisReply(result.labels);
std::vector<std::string> temp;
for (auto &sample : result.samples) {
temp.push_back(FormatTSSampleAsRedisReply(sample));
}
entry[2] = redis::Array(temp);
reply.push_back(redis::Array(entry));
}
*output = redis::Array(reply);
return Status::OK();
}
protected:
void registerDefaultHandlers() override {
CommandTSMGetBase::registerDefaultHandlers();
CommandTSRangeBase::registerDefaultHandlers();
registerHandler("GROUPBY", [this](TSOptionsParser &parser) { return handleGroupBy(parser, option_); });
}
static Status handleGroupBy(TSOptionsParser &parser, TSMRangeOption &option) {
auto group_value_parse = parser.TakeStr();
if (group_value_parse.IsOK()) {
option.group_by_label = std::move(group_value_parse.GetValue());
} else {
return {Status::RedisParseErr, errTSMRangeArgsNum};
}
auto reduce_keyword_parse = parser.TakeStr();
if (!reduce_keyword_parse.IsOK() || reduce_keyword_parse.GetValue() != "REDUCE") {
return {Status::RedisParseErr, errTSMRangeArgsNum};
}
auto &type = option.reducer;
using GroupReducerType = TSMRangeOption::GroupReducerType;
if (parser.EatEqICase("AVG")) {
type = GroupReducerType::AVG;
} else if (parser.EatEqICase("SUM")) {
type = GroupReducerType::SUM;
} else if (parser.EatEqICase("MIN")) {
type = GroupReducerType::MIN;
} else if (parser.EatEqICase("MAX")) {
type = GroupReducerType::MAX;
} else if (parser.EatEqICase("RANGE")) {
type = GroupReducerType::RANGE;
} else if (parser.EatEqICase("COUNT")) {
type = GroupReducerType::COUNT;
} else if (parser.EatEqICase("STD.P")) {
type = GroupReducerType::STD_P;
} else if (parser.EatEqICase("STD.S")) {
type = GroupReducerType::STD_S;
} else if (parser.EatEqICase("VAR.P")) {
type = GroupReducerType::VAR_P;
} else if (parser.EatEqICase("VAR.S")) {
type = GroupReducerType::VAR_S;
} else if (parser.Good()) {
return {Status::RedisParseErr, "Invalid reducer type"};
} else {
return {Status::RedisParseErr, errTSMRangeArgsNum};
}
return Status::OK();
}
private:
TSMRangeOption option_;
};
class CommandTSIncrByDecrBy : public CommandTSCreateBase {
public:
CommandTSIncrByDecrBy() { registerDefaultHandlers(); }
Status Parse(const std::vector<std::string> &args) override {
CommandParser parser(args, 2);
auto value_parse = parser.TakeFloat<double>();
if (!value_parse.IsOK()) {
return {Status::RedisParseErr, errInvalidValue};
}
value_ = value_parse.GetValue();
if (util::ToUpper(args[0]) == "TS.DECRBY") {
value_ = -value_;
}
CommandTSCreateBase::setSkipNum(3);
return CommandTSCreateBase::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto sc = CommandTSCreateBase::Execute(ctx, srv, conn, output);
if (!sc.IsOK()) return sc;
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
const auto &option = getCreateOption();
if (!is_ts_set_) {
ts_ = util::GetTimeStampMS();
}
TSChunk::AddResult res;
auto s = timeseries_db.IncrBy(ctx, args_[1], {ts_, value_}, option, &res);
if (!s.ok()) return {Status::RedisExecErr, s.ToString()};
if (res.type == TSChunk::AddResultType::kOld) {
*output +=
redis::Error({Status::NotOK, "timestamp must be equal to or higher than the maximum existing timestamp"});
} else {
*output += FormatAddResultAsRedisReply(res);
}
return Status::OK();
}
protected:
void registerDefaultHandlers() override {
CommandTSCreateBase::registerDefaultHandlers();
registerHandler("TIMESTAMP", [this](TSOptionsParser &parser) {
auto s = handleTimeStamp(parser, ts_);
if (!s.IsOK()) return s;
is_ts_set_ = true;
return Status::OK();
});
}
static Status handleTimeStamp(TSOptionsParser &parser, uint64_t &ts) {
auto parse_timestamp = parser.TakeInt<uint64_t>();
if (!parse_timestamp.IsOK()) {
return {Status::RedisParseErr, errInvalidTimestamp};
}
ts = parse_timestamp.GetValue();
return Status::OK();
}
private:
bool is_ts_set_ = false;
uint64_t ts_ = 0;
double value_ = 0;
};
class CommandTSDel : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() < 4) {
return {Status::RedisParseErr, "wrong number of arguments for 'ts.del' command"};
}
CommandParser parser(args, 2);
// Parse start timestamp
auto start_parse = parser.TakeInt<uint64_t>();
if (!start_parse.IsOK()) {
auto start_ts_str = parser.TakeStr();
if (!start_ts_str.IsOK() || start_ts_str.GetValue() != "-") {
return {Status::RedisParseErr, "wrong fromTimestamp"};
}
// "-" means use default start timestamp: 0
} else {
start_ts_ = start_parse.GetValue();
}
// Parse end timestamp
auto end_parse = parser.TakeInt<uint64_t>();
if (!end_parse.IsOK()) {
auto end_ts_str = parser.TakeStr();
if (!end_ts_str.IsOK() || end_ts_str.GetValue() != "+") {
return {Status::RedisParseErr, "wrong toTimestamp"};
}
// "+" means use default end timestamp: MAX_TIMESTAMP
} else {
end_ts_ = end_parse.GetValue();
}
return Commander::Parse(args);
}
Status Execute(engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override {
auto timeseries_db = TimeSeries(srv->storage, conn->GetNamespace());
uint64_t deleted_count = 0;
auto s = timeseries_db.Del(ctx, args_[1], start_ts_, end_ts_, &deleted_count);
if (!s.ok()) {
return {Status::RedisExecErr, s.ToString()};
}
*output = redis::Integer(deleted_count);
return Status::OK();
}
private:
uint64_t start_ts_ = 0;
uint64_t end_ts_ = TSSample::MAX_TIMESTAMP;
};
REDIS_REGISTER_COMMANDS(Timeseries, MakeCmdAttr<CommandTSCreate>("ts.create", -2, "write", 1, 1, 1),
MakeCmdAttr<CommandTSAdd>("ts.add", -4, "write", 1, 1, 1),
MakeCmdAttr<CommandTSMAdd>("ts.madd", -4, "write", 1, -3, 1),
MakeCmdAttr<CommandTSRange>("ts.range", -4, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTSInfo>("ts.info", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTSGet>("ts.get", -2, "read-only", 1, 1, 1),
MakeCmdAttr<CommandTSCreateRule>("ts.createrule", -6, "write", 1, 2, 1),
MakeCmdAttr<CommandTSMGet>("ts.mget", -3, "read-only", NO_KEY),
MakeCmdAttr<CommandTSMRange>("ts.mrange", -5, "read-only", NO_KEY),
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.incrby", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTSIncrByDecrBy>("ts.decrby", -3, "write", 1, 1, 1),
MakeCmdAttr<CommandTSDel>("ts.del", -4, "write", 1, 1, 1), );
} // namespace redis