| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| * |
| */ |
| |
| #pragma once |
| |
| #include <cstdint> |
| |
| #include "storage/redis_db.h" |
| #include "storage/redis_metadata.h" |
| #include "types/timeseries.h" |
| |
| namespace redis { |
| |
| enum class TSSubkeyType : uint8_t { |
| CHUNK = 0, |
| LABEL = 1, |
| DOWNSTREAM = 2, |
| }; |
| |
| // Enum prefix for new CF. |
| enum class IndexKeyType : uint8_t { |
| TS_LABEL = 0, |
| }; |
| |
| enum class TSAggregatorType : uint8_t { |
| NONE = 0, |
| SUM = 1, |
| MIN = 2, |
| MAX = 3, |
| COUNT = 4, |
| FIRST = 5, |
| LAST = 6, |
| AVG = 7, |
| RANGE = 8, |
| STD_P = 9, |
| STD_S = 10, |
| VAR_P = 11, |
| VAR_S = 12, |
| }; |
| |
| inline bool IsIncrementalAggregatorType(TSAggregatorType type) { |
| auto type_num = static_cast<uint8_t>(type); |
| return type_num >= 1 && type_num <= 4; |
| } |
| |
| struct TSAggregator { |
| TSAggregatorType type = TSAggregatorType::NONE; |
| uint64_t bucket_duration = 0; |
| uint64_t alignment = 0; |
| |
| TSAggregator() = default; |
| TSAggregator(TSAggregatorType type, uint64_t bucket_duration, uint64_t alignment) |
| : type(type), bucket_duration(bucket_duration), alignment(alignment) {} |
| |
| // Calculates the start timestamp of the aligned bucket that contains the given timestamp. |
| // E.g. `ts`=100, `duration`=30, `alignment`=20. |
| // The bucket containing `ts=100` starts at `80` (since 80 ≤ 100 < 110). Returns `80`. |
| uint64_t CalculateAlignedBucketLeft(uint64_t ts) const; |
| |
| // Calculates the end timestamp of the aligned bucket that contains the given timestamp. |
| uint64_t CalculateAlignedBucketRight(uint64_t ts) const; |
| |
| // Splits the given samples into buckets. |
| std::vector<nonstd::span<const TSSample>> SplitSamplesToBuckets(nonstd::span<const TSSample> samples) const; |
| |
| // Returns the samples earlier than `less_than` in the bucket that contains `ts`. |
| nonstd::span<const TSSample> GetBucketByTimestamp(nonstd::span<const TSSample> samples, uint64_t ts, |
| uint64_t less_than = TSSample::MAX_TIMESTAMP) const; |
| |
| // Calculates the aggregated value of the given samples according to the aggregator type |
| double AggregateSamplesValue(nonstd::span<const TSSample> samples) const; |
| }; |
| |
| struct TSDownStreamMeta { |
| TSAggregator aggregator; |
| uint64_t latest_bucket_idx; |
| |
| // store auxiliary info for each aggregator. |
| // e.g. for avg, need to store sum and count: u64_auxs={count}, f64_auxs={sum} |
| std::vector<uint64_t> u64_auxs; |
| std::vector<double> f64_auxs; |
| |
| TSDownStreamMeta() = default; |
| TSDownStreamMeta(TSAggregatorType agg_type, uint64_t bucket_duration, uint64_t alignment, uint64_t latest_bucket_idx) |
| : aggregator(agg_type, bucket_duration, alignment), latest_bucket_idx(latest_bucket_idx) {} |
| |
| // Aggregate samples and update the auxiliary info and latest_bucket_idx if needed. |
| // Returns the aggregated samples if there are new buckets. |
| // Note: Samples must be sorted by timestamp. |
| std::vector<TSSample> AggregateMultiBuckets(const std::vector<nonstd::span<const TSSample>> &bucket_spans, |
| bool skip_last_bucket = false); |
| |
| // Aggregate the samples to the latest bucket, update the auxiliary info. |
| void AggregateLatestBucket(nonstd::span<const TSSample> samples); |
| |
| // Reset auxiliary info. |
| void ResetAuxs(); |
| |
| void Encode(std::string *dst) const; |
| rocksdb::Status Decode(Slice *input); |
| }; |
| |
| struct IndexInternalKey { |
| Slice ns; |
| IndexKeyType type; |
| IndexInternalKey(Slice ns, IndexKeyType type) : ns(ns), type(type) {} |
| explicit IndexInternalKey(Slice input); |
| }; |
| |
| struct TSRevLabelKey : public IndexInternalKey { |
| Slice label_key; |
| Slice label_value; |
| Slice user_key; |
| |
| TSRevLabelKey(Slice ns, Slice label_key, Slice label_value, Slice user_key = Slice()) |
| : IndexInternalKey(ns, IndexKeyType::TS_LABEL), |
| label_key(label_key), |
| label_value(label_value), |
| user_key(user_key) {} |
| explicit TSRevLabelKey(Slice input); |
| |
| [[nodiscard]] std::string Encode() const; |
| static std::string UpperBound(Slice ns); |
| }; |
| |
| struct LabelKVPair { |
| std::string k; |
| std::string v; |
| }; |
| using LabelKVList = std::vector<LabelKVPair>; |
| |
| struct TSCreateOption { |
| uint64_t retention_time; |
| uint64_t chunk_size; |
| TimeSeriesMetadata::ChunkType chunk_type; |
| TimeSeriesMetadata::DuplicatePolicy duplicate_policy; |
| std::string source_key; |
| LabelKVList labels; |
| |
| TSCreateOption(); |
| }; |
| |
| struct TSInfoResult { |
| TimeSeriesMetadata metadata; |
| uint64_t total_samples; |
| uint64_t memory_usage; |
| uint64_t first_timestamp; |
| uint64_t last_timestamp; |
| std::vector<std::pair<std::string, TSDownStreamMeta>> downstream_rules; |
| LabelKVList labels; |
| }; |
| |
| struct TSRangeOption { |
| enum class BucketTimestampType : uint8_t { |
| Start = 0, |
| End = 1, |
| Mid = 2, |
| }; |
| uint64_t start_ts = 0; |
| uint64_t end_ts = TSSample::MAX_TIMESTAMP; |
| uint64_t count_limit = 0; |
| std::set<uint64_t> filter_by_ts; |
| std::optional<std::pair<double, double>> filter_by_value; |
| |
| // Used for comapction |
| TSAggregator aggregator; |
| bool is_return_latest = false; |
| bool is_return_empty = false; |
| BucketTimestampType bucket_timestamp_type = BucketTimestampType::Start; |
| }; |
| |
| struct TSMGetOption { |
| struct FilterOption { |
| std::unordered_map<std::string, std::set<std::string>> labels_equals; |
| std::unordered_map<std::string, std::set<std::string>> labels_not_equals; |
| }; |
| |
| bool with_labels = false; |
| std::set<std::string> selected_labels; |
| FilterOption filter; |
| }; |
| |
| struct TSMGetResult { |
| std::string name; // name of the source key or the group |
| LabelKVList labels; |
| std::vector<TSSample> samples; |
| }; |
| |
| class TSMQueryFilterParser { |
| public: |
| explicit TSMQueryFilterParser(TSMGetOption::FilterOption &option) : option_(option) {} |
| Status Parse(std::string_view expr); |
| Status Check() const; |
| |
| private: |
| TSMGetOption::FilterOption &option_; |
| bool has_matcher_ = false; |
| static std::pair<size_t, size_t> findOperator(std::string_view expr); |
| static std::string_view trim(std::string_view s); |
| static std::string_view unquote(std::string_view s); |
| static std::vector<std::string_view> splitValueList(std::string_view list); |
| void handleEquals(std::string_view label, std::string_view value_str); |
| void handleNotEquals(std::string_view label, std::string_view value_str); |
| }; |
| |
| struct TSMRangeOption : TSMGetOption, TSRangeOption { |
| enum class GroupReducerType : uint8_t { |
| NONE = 0, |
| AVG = 1, |
| SUM = 2, |
| MIN = 3, |
| MAX = 4, |
| RANGE = 5, |
| COUNT = 6, |
| STD_P = 7, |
| STD_S = 8, |
| VAR_P = 9, |
| VAR_S = 10, |
| }; |
| |
| GroupReducerType reducer = GroupReducerType::NONE; |
| std::string group_by_label; |
| }; |
| |
| struct TSMRangeResult : TSMGetResult { |
| std::vector<std::string> source_keys; |
| }; |
| |
| enum class TSCreateRuleResult : uint8_t { |
| kOK = 0, |
| kSrcNotExist = 1, |
| kDstNotExist = 2, |
| kSrcHasSourceRule = 3, |
| kDstHasSourceRule = 4, |
| kDstHasDestRule = 5, |
| kSrcEqDst = 6, |
| }; |
| |
| std::vector<TSSample> GroupSamplesAndReduce(const std::vector<std::vector<TSSample>> &all_samples, |
| TSMRangeOption::GroupReducerType reducer_type); |
| |
| TimeSeriesMetadata CreateMetadataFromOption(const TSCreateOption &option); |
| |
| class TimeSeries : public SubKeyScanner { |
| public: |
| using SampleBatch = TSChunk::SampleBatch; |
| using AddResult = TSChunk::AddResult; |
| using DuplicatePolicy = TimeSeriesMetadata::DuplicatePolicy; |
| |
| TimeSeries(engine::Storage *storage, const std::string &ns) |
| : SubKeyScanner(storage, ns), index_cf_handle_(storage->GetCFHandle(ColumnFamilyID::Index)) {} |
| rocksdb::Status Create(engine::Context &ctx, const Slice &user_key, const TSCreateOption &option); |
| rocksdb::Status Add(engine::Context &ctx, const Slice &user_key, TSSample sample, const TSCreateOption &option, |
| AddResult *res, const DuplicatePolicy *on_dup_policy = nullptr); |
| rocksdb::Status MAdd(engine::Context &ctx, const Slice &user_key, std::vector<TSSample> samples, |
| std::vector<AddResult> *res); |
| rocksdb::Status Info(engine::Context &ctx, const Slice &user_key, TSInfoResult *res); |
| rocksdb::Status Range(engine::Context &ctx, const Slice &user_key, const TSRangeOption &option, |
| std::vector<TSSample> *res); |
| rocksdb::Status Get(engine::Context &ctx, const Slice &user_key, bool is_return_latest, std::vector<TSSample> *res); |
| rocksdb::Status CreateRule(engine::Context &ctx, const Slice &src_key, const Slice &dst_key, |
| const TSAggregator &aggregator, TSCreateRuleResult *res); |
| rocksdb::Status MGet(engine::Context &ctx, const TSMGetOption &option, bool is_return_latest, |
| std::vector<TSMGetResult> *res); |
| rocksdb::Status MRange(engine::Context &ctx, const TSMRangeOption &option, std::vector<TSMRangeResult> *res); |
| rocksdb::Status IncrBy(engine::Context &ctx, const Slice &user_key, TSSample sample, const TSCreateOption &option, |
| AddResult *res); |
| rocksdb::Status Del(engine::Context &ctx, const Slice &user_key, uint64_t from, uint64_t to, uint64_t *deleted); |
| |
| private: |
| // Bundles the arguments for a downstream upsert operation |
| struct DownstreamUpsertArgs { |
| std::vector<std::string> new_chunks; // Newly created chunks |
| bool was_source_empty = false; // Whether the source time series was empty before upsert |
| SampleBatch *sample_batch = nullptr; // The sample batch that has been upserted to source |
| }; |
| |
| rocksdb::ColumnFamilyHandle *index_cf_handle_; |
| |
| rocksdb::Status getTimeSeriesMetadata(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata); |
| rocksdb::Status createTimeSeries(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata_out, |
| const TSCreateOption *options); |
| rocksdb::Status getOrCreateTimeSeries(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata *metadata_out, |
| const TSCreateOption *option = nullptr); |
| rocksdb::Status getLabelKVList(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata, |
| LabelKVList *labels); |
| rocksdb::Status upsertCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, |
| SampleBatch &sample_batch, DownstreamUpsertArgs *ds_args = nullptr); |
| rocksdb::Status upsertCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, |
| SampleBatch &sample_batch, ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, |
| DownstreamUpsertArgs *ds_args = nullptr); |
| rocksdb::Status rangeCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata, |
| const TSRangeOption &option, std::vector<TSSample> *res, bool apply_retention = true); |
| rocksdb::Status upsertDownStream(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata, |
| DownstreamUpsertArgs &ds_args); |
| rocksdb::Status getCommon(engine::Context &ctx, const Slice &ns_key, const TimeSeriesMetadata &metadata, |
| bool is_return_latest, std::vector<TSSample> *res); |
| rocksdb::Status delRangeCommon(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, uint64_t from, |
| uint64_t to, uint64_t *deleted, bool inclusive_to = true); |
| rocksdb::Status delRangeCommonInBatch(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, |
| uint64_t from, uint64_t to, ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, |
| uint64_t *deleted, bool inclusive_to = true); |
| rocksdb::Status delRangeDownStream(engine::Context &ctx, const Slice &ns_key, TimeSeriesMetadata &metadata, |
| std::vector<std::string> &ds_keys, std::vector<TSDownStreamMeta> &ds_metas, |
| uint64_t from, uint64_t to); |
| rocksdb::Status createLabelIndexInBatch(const Slice &ns_key, const TimeSeriesMetadata &metadata, |
| ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, |
| const LabelKVList &labels); |
| rocksdb::Status createDownStreamMetadataInBatch(engine::Context &ctx, const Slice &ns_src_key, const Slice &dst_key, |
| const TimeSeriesMetadata &src_metadata, |
| const TSAggregator &aggregator, |
| ObserverOrUniquePtr<rocksdb::WriteBatchBase> &batch, |
| TSDownStreamMeta *ds_metadata); |
| rocksdb::Status getDownStreamRules(engine::Context &ctx, const Slice &ns_src_key, |
| const TimeSeriesMetadata &src_metadata, std::vector<std::string> *keys, |
| std::vector<TSDownStreamMeta> *metas = nullptr); |
| rocksdb::Status getTSKeyByFilter(engine::Context &ctx, const TSMGetOption::FilterOption &filter, |
| std::vector<std::string> *user_keys, std::vector<LabelKVList> *labels_vec = nullptr, |
| std::vector<TimeSeriesMetadata> *metas = nullptr); |
| |
| std::string internalKeyFromChunkID(const Slice &ns_key, const TimeSeriesMetadata &metadata, uint64_t id) const; |
| std::string internalKeyFromLabelKey(const Slice &ns_key, const TimeSeriesMetadata &metadata, Slice label_key) const; |
| std::string internalKeyFromDownstreamKey(const Slice &ns_key, const TimeSeriesMetadata &metadata, |
| Slice downstream_key) const; |
| std::string labelKeyFromInternalKey(Slice internal_key) const; |
| std::string downstreamKeyFromInternalKey(Slice internal_key) const; |
| static uint64_t chunkIDFromInternalKey(Slice internal_key); |
| }; |
| |
| } // namespace redis |