| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #ifndef COMMON_TSFILE_COMMON_H |
| #define COMMON_TSFILE_COMMON_H |
| |
| #include <map> |
| #include <string> |
| #include <unordered_map> |
| |
| #include "common/allocator/my_string.h" |
| #include "common/allocator/page_arena.h" |
| #include "common/config/config.h" |
| #include "common/container/list.h" |
| #include "device_id.h" |
| #include "reader/bloom_filter.h" |
| #include "statistic.h" |
| #include "utils/db_utils.h" |
| #include "utils/storage_utils.h" |
| |
| namespace storage { |
| |
// File-format constants. These are declarations only; the definitions are
// not part of this header.
extern const char *MAGIC_STRING_TSFILE;
extern const int MAGIC_STRING_TSFILE_LEN;
extern const char VERSION_NUM_BYTE;

// Single-byte markers (declarations only).
extern const char CHUNK_GROUP_HEADER_MARKER;
extern const char CHUNK_HEADER_MARKER;
extern const char ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
extern const char SEPARATOR_MARKER;
extern const char OPERATION_INDEX_RANGE;

// 64-bit signed identifier type used for TsFiles.
using TsFileID = int64_t;
| |
| // TODO review the String.len_ used |
| |
| // Note, in tsfile_io_writer, we just writer fields of PageHeader |
| // instead of do a serialize of PageHeader object. |
| // That is because statistic_ in PageHeader may be omitted if only |
| // one page exists in the chunk but we know that fact after we writer |
| // the first page. |
| struct PageHeader { |
| uint32_t uncompressed_size_; |
| uint32_t compressed_size_; |
| Statistic *statistic_; |
| |
| PageHeader() |
| : uncompressed_size_(0), compressed_size_(0), statistic_(nullptr) {} |
| ~PageHeader() { reset(); } |
| void reset() { |
| if (statistic_ != nullptr) { |
| StatisticFactory::free(statistic_); |
| statistic_ = nullptr; |
| } |
| uncompressed_size_ = 0; |
| compressed_size_ = 0; |
| } |
| int deserialize_from(common::ByteStream &in, bool deserialize_stat, |
| common::TSDataType data_type) { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(common::SerializationUtil::read_var_uint( |
| uncompressed_size_, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_var_uint( |
| compressed_size_, in))) { |
| } else if (deserialize_stat) { |
| statistic_ = StatisticFactory::alloc_statistic(data_type); |
| if (IS_NULL(statistic_)) { |
| return E_OOM; |
| } else if (RET_FAIL(statistic_->deserialize_from(in))) { |
| } |
| } |
| return ret; |
| } |
| |
| /** max page header size without statistics. */ |
| static int estimat_max_page_header_size_without_statistics() { |
| // uncompressedSize, compressedSize |
| // because we use unsigned varInt to encode these two integer, each |
| // unsigned varInt will cost at most 5 bytes |
| return 2 * (4 + 1); |
| } |
| |
| #ifndef NDEBUG |
| friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) { |
| os << "{uncompressed_size_=" << h.uncompressed_size_ |
| << ", compressed_size_=" << h.uncompressed_size_; |
| if (h.statistic_ == nullptr) { |
| os << ", stat=nil}"; |
| } else { |
| os << ", stat=" << h.statistic_->to_string() << "}"; |
| } |
| return os; |
| } |
| #endif |
| }; |
| |
| struct ChunkHeader { |
| ChunkHeader() |
| : measurement_name_(""), |
| data_size_(0), |
| data_type_(common::INVALID_DATATYPE), |
| compression_type_(common::INVALID_COMPRESSION), |
| encoding_type_(common::INVALID_ENCODING), |
| num_of_pages_(0), |
| serialized_size_(0), |
| chunk_type_(0) {} |
| |
| void reset() { |
| data_size_ = 0; |
| num_of_pages_ = 0; |
| serialized_size_ = 0; |
| chunk_type_ = 0; |
| } |
| |
| ~ChunkHeader() = default; |
| |
| int serialize_to(common::ByteStream &out) { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(common::SerializationUtil::write_char(chunk_type_, out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_var_str( |
| measurement_name_, out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_var_uint( |
| data_size_, out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_ui8(data_type_, |
| out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_ui8( |
| compression_type_, out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_ui8( |
| encoding_type_, out))) { |
| } |
| return ret; |
| } |
| int deserialize_from(common::ByteStream &in) { |
| int ret = error_info::E_OK; |
| in.mark_read_pos(); |
| if (RET_FAIL(common::SerializationUtil::read_char(chunk_type_, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_var_str( |
| measurement_name_, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_var_uint(data_size_, |
| in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_char( |
| (char &)data_type_, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_char( |
| (char &)compression_type_, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_char( |
| (char &)encoding_type_, in))) { |
| } else { |
| // There will meet data shortcut. |
| serialized_size_ = static_cast<int32_t>(in.get_mark_len()); |
| } |
| return ret; |
| } |
| #ifndef NDEBUG |
| |
| #endif |
| |
| std::string measurement_name_; |
| uint32_t data_size_; |
| common::TSDataType data_type_; |
| common::CompressionType compression_type_; |
| common::TSEncoding encoding_type_; |
| int32_t num_of_pages_; |
| int32_t serialized_size_; // TODO seems no usage |
| char chunk_type_; // TODO give a description here |
| |
| static const int MIN_SERIALIZED_SIZE = 7; |
| }; |
| |
| struct ChunkMeta { |
| // std::string measurement_name_; |
| common::String measurement_name_; |
| common::TSDataType data_type_; |
| int64_t offset_of_chunk_header_; |
| Statistic *statistic_; |
| char mask_; |
| common::TSEncoding encoding_; |
| common::CompressionType compression_type_; |
| |
| ChunkMeta() |
| : measurement_name_(), |
| data_type_(), |
| offset_of_chunk_header_(0), |
| statistic_(nullptr), |
| mask_(0) {} |
| |
| int init(const common::String &measurement_name, |
| common::TSDataType data_type, int64_t offset_of_chunk_header, |
| Statistic *stat, char mask, common::TSEncoding encoding, |
| common::CompressionType compression_type, common::PageArena &pa) { |
| // TODO check parameter valid |
| measurement_name_.dup_from(measurement_name, pa); |
| data_type_ = data_type; |
| offset_of_chunk_header_ = offset_of_chunk_header; |
| statistic_ = stat; |
| mask_ = mask; |
| encoding_ = encoding; |
| compression_type_ = compression_type; |
| return error_info::E_OK; |
| } |
| FORCE_INLINE void clone_statistic_from(Statistic *stat) { |
| clone_statistic(stat, statistic_, data_type_); |
| } |
| FORCE_INLINE int clone_from(ChunkMeta &that, common::PageArena *pa) { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa))) { |
| return ret; |
| } |
| data_type_ = that.data_type_; |
| offset_of_chunk_header_ = that.offset_of_chunk_header_; |
| if (that.statistic_ != nullptr) { |
| statistic_ = |
| StatisticFactory::alloc_statistic_with_pa(data_type_, pa); |
| if (IS_NULL(statistic_)) { |
| return E_OOM; |
| } |
| clone_statistic_from(that.statistic_); |
| } |
| mask_ = that.mask_; |
| return ret; |
| } |
| int serialize_to(common::ByteStream &out, bool serialize_statistic) { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(common::SerializationUtil::write_i64( |
| offset_of_chunk_header_, out))) { |
| } else if (serialize_statistic) { |
| ret = statistic_->serialize_to(out); |
| } |
| return ret; |
| } |
| int deserialize_from(common::ByteStream &in, bool deserialize_stat, |
| common::PageArena *pa) { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(common::SerializationUtil::read_i64( |
| offset_of_chunk_header_, in))) { |
| } else if (deserialize_stat) { |
| statistic_ = |
| StatisticFactory::alloc_statistic_with_pa(data_type_, pa); |
| if (IS_NULL(statistic_)) { |
| ret = E_OOM; |
| } else { |
| ret = statistic_->deserialize_from(in); |
| } |
| } |
| return ret; |
| } |
| #ifndef NDEBUG |
| friend std::ostream &operator<<(std::ostream &os, const ChunkMeta &cm) { |
| os << "{measurement_name=" << cm.measurement_name_ |
| << ", data_type=" << cm.data_type_ |
| << ", offset_of_chunk_header=" << cm.offset_of_chunk_header_ |
| << ", mask=" << ((int)cm.mask_); |
| if (cm.statistic_ == nullptr) { |
| os << ", statistic=nil}"; |
| } else { |
| os << ", statistic=" << cm.statistic_->to_string() << "}"; |
| } |
| return os; |
| } |
| #endif |
| }; |
| |
| struct ChunkGroupMeta { |
| std::shared_ptr<IDeviceID> device_id_; |
| common::SimpleList<ChunkMeta *> chunk_meta_list_; |
| |
| explicit ChunkGroupMeta(common::PageArena *pa_ptr) |
| : chunk_meta_list_(pa_ptr) {} |
| |
| FORCE_INLINE int init(std::shared_ptr<IDeviceID> device_id) { |
| device_id_ = device_id; |
| return 0; |
| } |
| FORCE_INLINE int push(ChunkMeta *cm) { |
| return chunk_meta_list_.push_back(cm); |
| } |
| }; |
| |
| class ITimeseriesIndex { |
| public: |
| ITimeseriesIndex() {} |
| ~ITimeseriesIndex() {} |
| virtual common::SimpleList<ChunkMeta *> *get_chunk_meta_list() const { |
| return nullptr; |
| } |
| virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const { |
| return nullptr; |
| } |
| virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const { |
| return nullptr; |
| } |
| |
| virtual common::String get_measurement_name() const { |
| return common::String(); |
| } |
| virtual common::TSDataType get_data_type() const { |
| return common::INVALID_DATATYPE; |
| } |
| virtual Statistic *get_statistic() const { return nullptr; } |
| }; |
| |
| /* |
| * A TimeseriesIndex may have one or more chunk metas, |
| * that means we have such a map: <Timeseries, List<ChunkMeta>>. |
| */ |
| class TimeseriesIndex : public ITimeseriesIndex { |
| public: |
| static const uint32_t CHUNK_META_LIST_SERIALIZED_BUF_PAGE_SIZE = 128; |
| static const uint32_t PAGE_ARENA_PAGE_SIZE = 256; |
| static const common::AllocModID PAGE_ARENA_MOD_ID = |
| common::MOD_TIMESERIES_INDEX_OBJ; |
| |
| public: |
| TimeseriesIndex() |
| : timeseries_meta_type_((char)255), |
| chunk_meta_list_data_size_(0), |
| measurement_name_(), |
| ts_id_(), |
| data_type_(common::INVALID_DATATYPE), |
| statistic_(nullptr), |
| statistic_from_pa_(false), |
| chunk_meta_list_serialized_buf_( |
| CHUNK_META_LIST_SERIALIZED_BUF_PAGE_SIZE, PAGE_ARENA_MOD_ID), |
| chunk_meta_list_(nullptr) { |
| // page_arena_.init(PAGE_ARENA_PAGE_SIZE, PAGE_ARENA_MOD_ID); |
| } |
| ~TimeseriesIndex() { destroy(); } |
| void destroy() { |
| // page_arena_.destroy(); |
| reset(); |
| } |
| void reset() // FIXME reuse |
| { |
| timeseries_meta_type_ = 0; |
| chunk_meta_list_data_size_ = 0; |
| measurement_name_.reset(); |
| ts_id_.reset(); |
| data_type_ = common::VECTOR; |
| chunk_meta_list_serialized_buf_.reset(); |
| if (statistic_ != nullptr && !statistic_from_pa_) { |
| StatisticFactory::free(statistic_); |
| statistic_ = nullptr; |
| } |
| } |
| |
| int add_chunk_meta(ChunkMeta *chunk_meta, bool serialize_statistic); |
| FORCE_INLINE int set_measurement_name(common::String &measurement_name, |
| common::PageArena &pa) { |
| return measurement_name_.dup_from(measurement_name, pa); |
| } |
| FORCE_INLINE void set_measurement_name(common::String &measurement_name) { |
| measurement_name_.shallow_copy_from(measurement_name); |
| } |
| FORCE_INLINE virtual common::String get_measurement_name() const { |
| return measurement_name_; |
| } |
| virtual inline common::SimpleList<ChunkMeta *> *get_chunk_meta_list() |
| const { |
| return chunk_meta_list_; |
| } |
| FORCE_INLINE void set_ts_meta_type(char ts_meta_type) { |
| timeseries_meta_type_ = ts_meta_type; |
| } |
| FORCE_INLINE void set_data_type(common::TSDataType data_type) { |
| data_type_ = data_type; |
| } |
| FORCE_INLINE virtual common::TSDataType get_data_type() const { |
| return data_type_; |
| } |
| int init_statistic(common::TSDataType data_type) { |
| if (statistic_ != nullptr && |
| !statistic_from_pa_) { // clear old statistic |
| StatisticFactory::free(statistic_); |
| statistic_ = nullptr; |
| } |
| statistic_ = StatisticFactory::alloc_statistic(data_type); |
| if (IS_NULL(statistic_)) { |
| return E_OOM; |
| } |
| statistic_->reset(); |
| return error_info::E_OK; |
| } |
| virtual Statistic *get_statistic() const { return statistic_; } |
| common::TsID get_ts_id() const { return ts_id_; } |
| |
| FORCE_INLINE void finish() { |
| chunk_meta_list_data_size_ = |
| chunk_meta_list_serialized_buf_.total_size(); |
| } |
| |
| int serialize_to(common::ByteStream &out) { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(common::SerializationUtil::write_char( |
| timeseries_meta_type_, out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_mystring( |
| measurement_name_, out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_char(data_type_, |
| out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_var_uint( |
| chunk_meta_list_data_size_, out))) { |
| } else if (RET_FAIL(statistic_->serialize_to(out))) { |
| } else if (RET_FAIL(merge_byte_stream( |
| out, chunk_meta_list_serialized_buf_))) { |
| } |
| return ret; |
| } |
| |
| int deserialize_from(common::ByteStream &in, common::PageArena *pa) { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(common::SerializationUtil::read_char(timeseries_meta_type_, |
| in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_mystring( |
| measurement_name_, pa, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_char( |
| (char &)data_type_, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_var_uint( |
| chunk_meta_list_data_size_, in))) { |
| } else if (nullptr == |
| (statistic_ = StatisticFactory::alloc_statistic_with_pa( |
| data_type_, pa))) { |
| ret = E_OOM; |
| } else if (RET_FAIL(statistic_->deserialize_from(in))) { |
| } else { |
| statistic_from_pa_ = true; |
| void *chunk_meta_list_buf = pa->alloc(sizeof(*chunk_meta_list_)); |
| if (IS_NULL(chunk_meta_list_buf)) { |
| return E_OOM; |
| } |
| const bool deserialize_chunk_meta_statistic = |
| (timeseries_meta_type_ & 0x3F); // TODO |
| chunk_meta_list_ = |
| new (chunk_meta_list_buf) common::SimpleList<ChunkMeta *>(pa); |
| uint32_t start_pos = in.read_pos(); |
| while (IS_SUCC(ret) && |
| in.read_pos() < start_pos + chunk_meta_list_data_size_) { |
| void *cm_buf = pa->alloc(sizeof(ChunkMeta)); |
| if (IS_NULL(cm_buf)) { |
| ret = E_OOM; |
| } else { |
| ChunkMeta *cm = new (cm_buf) ChunkMeta; |
| cm->measurement_name_.shallow_copy_from( |
| this->measurement_name_); |
| cm->data_type_ = this->data_type_; |
| cm->mask_ = 0; // TODO |
| if (RET_FAIL(cm->deserialize_from( |
| in, deserialize_chunk_meta_statistic, pa))) { |
| } else if (RET_FAIL(chunk_meta_list_->push_back(cm))) { |
| } |
| } |
| } |
| } |
| return ret; |
| } |
| |
| int clone_from(const TimeseriesIndex &that, common::PageArena *pa) { |
| int ret = error_info::E_OK; |
| timeseries_meta_type_ = that.timeseries_meta_type_; |
| chunk_meta_list_data_size_ = that.chunk_meta_list_data_size_; |
| ts_id_ = that.ts_id_; |
| data_type_ = that.data_type_; |
| |
| statistic_ = StatisticFactory::alloc_statistic_with_pa(data_type_, pa); |
| if (IS_NULL(statistic_)) { |
| return E_OOM; |
| } |
| clone_statistic(that.statistic_, this->statistic_, data_type_); |
| statistic_from_pa_ = true; |
| |
| if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa))) { |
| return ret; |
| } |
| |
| if (that.chunk_meta_list_ != nullptr) { |
| void *buf = pa->alloc(sizeof(*chunk_meta_list_)); |
| if (IS_NULL(buf)) { |
| return E_OOM; |
| } |
| chunk_meta_list_ = new (buf) common::SimpleList<ChunkMeta *>(pa); |
| common::SimpleList<ChunkMeta *>::Iterator it; |
| for (it = that.chunk_meta_list_->begin(); |
| IS_SUCC(ret) && it != that.chunk_meta_list_->end(); it++) { |
| ChunkMeta *cm = it.get(); |
| void *cm_buf = pa->alloc(sizeof(ChunkMeta)); |
| if (IS_NULL(cm_buf)) { |
| return E_OOM; |
| } else { |
| ChunkMeta *my_cm = new (cm_buf) ChunkMeta; |
| if (RET_FAIL(my_cm->clone_from(*cm, pa))) { |
| } else if (RET_FAIL(chunk_meta_list_->push_back(my_cm))) { |
| } |
| } |
| } |
| } // end (that.chunk_meta_list_ != nullptr) |
| return ret; |
| } |
| #ifndef NDEBUG |
| friend std::ostream &operator<<(std::ostream &os, |
| const TimeseriesIndex &tsi) { |
| os << "{meta_type=" << (int)tsi.timeseries_meta_type_ |
| << ", chunk_meta_list_data_size=" << tsi.chunk_meta_list_data_size_ |
| << ", measurement_name=" << tsi.measurement_name_ |
| << ", ts_id=" << tsi.ts_id_.to_string() |
| << ", data_type=" << common::get_data_type_name(tsi.data_type_) |
| << ", statistic=" << tsi.statistic_->to_string(); |
| |
| if (tsi.chunk_meta_list_) { |
| os << ", chunk_meta_list={"; |
| int count = 0; |
| common::SimpleList<ChunkMeta *>::Iterator it = |
| tsi.chunk_meta_list_->begin(); |
| for (; it != tsi.chunk_meta_list_->end(); it++, count++) { |
| if (count != 0) { |
| os << ", "; |
| } |
| os << "[" << count << "]={" << *it.get() << "}"; |
| } |
| os << "}"; |
| } |
| return os; |
| } |
| #endif |
| private: |
| /* |
| * If this timeseries has more than one chunk meta, timeseries_meta_type_ |
| * is 1. Otherwise timeseries_meta_type_ is 0. It also should OR with mask |
| * of chunk meta. |
| */ |
| char timeseries_meta_type_; |
| |
| // Sum of chunk meta serialized size in List<ChunkMeta> of this timeseries. |
| uint32_t chunk_meta_list_data_size_; |
| |
| // std::string measurement_name_; |
| common::String measurement_name_; |
| common::TsID ts_id_; |
| common::TSDataType data_type_; |
| |
| /* |
| * If TimeseriesIndex has only one ChunkMeta, then |
| * TimeseriesIndex.statistic_ is duplicated with ChunkMeta.statistic_. In |
| * this case, we do not serialize ChunkMeta.statistic_. |
| */ |
| Statistic *statistic_; |
| bool statistic_from_pa_; |
| common::ByteStream chunk_meta_list_serialized_buf_; |
| // common::PageArena page_arena_; |
| common::SimpleList<ChunkMeta *> *chunk_meta_list_; // for deserialize_from |
| }; |
| |
| class AlignedTimeseriesIndex : public ITimeseriesIndex { |
| public: |
| TimeseriesIndex *time_ts_idx_; |
| TimeseriesIndex *value_ts_idx_; |
| |
| AlignedTimeseriesIndex() {} |
| ~AlignedTimeseriesIndex() {} |
| virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const { |
| return time_ts_idx_->get_chunk_meta_list(); |
| } |
| virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const { |
| return value_ts_idx_->get_chunk_meta_list(); |
| } |
| |
| virtual common::String get_measurement_name() const { |
| return value_ts_idx_->get_measurement_name(); |
| } |
| virtual common::TSDataType get_data_type() const { |
| return time_ts_idx_->get_data_type(); |
| } |
| virtual Statistic *get_statistic() const { |
| return value_ts_idx_->get_statistic(); |
| } |
| |
| #ifndef NDEBUG |
| friend std::ostream &operator<<(std::ostream &os, |
| const AlignedTimeseriesIndex &tsi) { |
| os << "time_ts_idx=" << *tsi.time_ts_idx_; |
| os << ", value_ts_idx=" << *tsi.value_ts_idx_; |
| return os; |
| } |
| #endif |
| }; |
| |
| class TSMIterator { |
| public: |
| explicit TSMIterator( |
| common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list) |
| : chunk_group_meta_list_(chunk_group_meta_list), |
| chunk_group_meta_iter_(), |
| chunk_meta_iter_() {} |
| |
| // sort => iterate |
| int init(); |
| bool has_next() const; |
| int get_next(std::shared_ptr<IDeviceID> &ret_device_id, |
| common::String &ret_measurement_name, |
| TimeseriesIndex &ret_ts_index); |
| |
| private: |
| common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list_; |
| common::SimpleList<ChunkGroupMeta *>::Iterator chunk_group_meta_iter_; |
| common::SimpleList<ChunkMeta *>::Iterator chunk_meta_iter_; |
| |
| // timeseries measurenemnt chunk meta info |
| // map <device_name, <measurement_name, vector<chunk_meta>>> |
| std::map<std::shared_ptr<IDeviceID>, |
| std::map<common::String, std::vector<ChunkMeta *>>> |
| tsm_chunk_meta_info_; |
| |
| // device iterator |
| std::map<std::shared_ptr<IDeviceID>, |
| std::map<common::String, std::vector<ChunkMeta *>>>::iterator |
| tsm_device_iter_; |
| |
| // measurement iterator |
| std::map<common::String, std::vector<ChunkMeta *>>::iterator |
| tsm_measurement_iter_; |
| }; |
| |
| /* =============== TsFile Index ================ */ |
/**
 * Abstract ordering key. Implementations provide <, > and == against another
 * IComparable plus a string rendering.
 */
struct IComparable {
    virtual ~IComparable() = default;
    virtual bool operator<(const IComparable &other) const = 0;
    virtual bool operator>(const IComparable &other) const = 0;
    virtual bool operator==(const IComparable &other) const = 0;

    /**
     * Three-way comparison built on operator< and operator==.
     * @return -1 when *this < other, 0 when *this == other, otherwise 1.
     */
    virtual int compare(const IComparable &other) {
        if (*this < other) {
            return -1;
        }
        return (*this == other) ? 0 : 1;
    }

    virtual std::string to_string() const = 0;
};
| |
| struct DeviceIDComparable : IComparable { |
| std::shared_ptr<IDeviceID> device_id_; |
| |
| explicit DeviceIDComparable(const std::shared_ptr<IDeviceID> &device_id) |
| : device_id_(device_id) {} |
| |
| bool operator<(const IComparable &other) const override { |
| const auto *other_device = |
| dynamic_cast<const DeviceIDComparable *>(&other); |
| if (!other_device) throw std::runtime_error("Incompatible comparison"); |
| return *device_id_ < *other_device->device_id_; |
| } |
| |
| bool operator>(const IComparable &other) const override { |
| const auto *other_device = |
| dynamic_cast<const DeviceIDComparable *>(&other); |
| if (!other_device) throw std::runtime_error("Incompatible comparison"); |
| return *device_id_ != *other_device->device_id_ && |
| !(*device_id_ < *other_device->device_id_); |
| } |
| |
| bool operator==(const IComparable &other) const override { |
| const auto *other_device = |
| dynamic_cast<const DeviceIDComparable *>(&other); |
| if (!other_device) throw std::runtime_error("Incompatible comparison"); |
| return *device_id_ == *other_device->device_id_; |
| } |
| |
| std::string to_string() const override { |
| return device_id_->get_device_name(); |
| } |
| }; |
| |
| struct StringComparable : IComparable { |
| std::string value_; |
| |
| explicit StringComparable(const std::string &value) : value_(value) {} |
| |
| bool operator<(const IComparable &other) const override { |
| const auto *other_string = |
| dynamic_cast<const StringComparable *>(&other); |
| if (!other_string) throw std::runtime_error("Incompatible comparison"); |
| return value_ < other_string->value_; |
| } |
| |
| bool operator>(const IComparable &other) const override { |
| const auto *other_string = |
| dynamic_cast<const StringComparable *>(&other); |
| if (!other_string) throw std::runtime_error("Incompatible comparison"); |
| return value_ > other_string->value_; |
| } |
| |
| bool operator==(const IComparable &other) const override { |
| const auto *other_string = |
| dynamic_cast<const StringComparable *>(&other); |
| if (!other_string) throw std::runtime_error("Incompatible comparison"); |
| return value_ == other_string->value_; |
| } |
| |
| std::string to_string() const override { return value_; } |
| }; |
| |
| struct IMetaIndexEntry { |
| static void self_destructor(IMetaIndexEntry *ptr) { |
| if (ptr) { |
| ptr->~IMetaIndexEntry(); |
| } |
| } |
| IMetaIndexEntry() = default; |
| virtual ~IMetaIndexEntry() = default; |
| |
| virtual int serialize_to(common::ByteStream &out); |
| virtual int deserialize_from(common::ByteStream &out, |
| common::PageArena *pa); |
| virtual int64_t get_offset() const = 0; |
| virtual bool is_device_level() const = 0; |
| virtual std::shared_ptr<IComparable> get_compare_key() const; |
| virtual common::String get_name() const; |
| virtual std::shared_ptr<IDeviceID> get_device_id() const; |
| virtual std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) = 0; |
| #ifndef NDEBUG |
| virtual void print(std::ostream &os) const {} |
| friend std::ostream &operator<<(std::ostream &os, |
| const IMetaIndexEntry &entry) { |
| entry.print(os); |
| return os; |
| } |
| #endif |
| }; |
| |
| struct DeviceMetaIndexEntry : IMetaIndexEntry { |
| std::shared_ptr<IDeviceID> device_id_; |
| int64_t offset_; |
| |
| DeviceMetaIndexEntry() = default; |
| |
| DeviceMetaIndexEntry(const std::shared_ptr<IDeviceID> &device_id, |
| const int64_t offset) |
| : device_id_(device_id), offset_(offset) {} |
| |
| ~DeviceMetaIndexEntry() override = default; |
| |
| static void self_deleter(DeviceMetaIndexEntry *ptr) { |
| if (ptr) { |
| ptr->~DeviceMetaIndexEntry(); |
| } |
| } |
| |
| int serialize_to(common::ByteStream &out) override { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(device_id_->serialize(out))) { |
| } else if (RET_FAIL( |
| common::SerializationUtil::write_i64(offset_, out))) { |
| } |
| return ret; |
| } |
| |
| std::shared_ptr<IDeviceID> &get_device_id() { return device_id_; } |
| |
| int deserialize_from(common::ByteStream &in, |
| common::PageArena *pa) override { |
| int ret = error_info::E_OK; |
| device_id_ = std::make_shared<StringArrayDeviceID>("init"); |
| if (RET_FAIL(device_id_->deserialize(in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_i64(offset_, in))) { |
| } |
| return ret; |
| } |
| |
| int64_t get_offset() const override { return offset_; } |
| |
| std::shared_ptr<IComparable> get_compare_key() const override { |
| return std::make_shared<DeviceIDComparable>(device_id_); |
| } |
| |
| bool is_device_level() const override { return true; } |
| common::String get_name() const override { return {}; } |
| std::shared_ptr<IDeviceID> get_device_id() const override { |
| return device_id_; |
| } |
| std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) override { |
| return std::make_shared<DeviceMetaIndexEntry>(device_id_, offset_); |
| } |
| #ifndef NDEBUG |
| void print(std::ostream &os) const override { |
| os << "name=" << device_id_ << ", offset=" << offset_; |
| } |
| #endif |
| }; |
| |
| struct MeasurementMetaIndexEntry : IMetaIndexEntry { |
| common::String name_; |
| int64_t offset_; |
| |
| ~MeasurementMetaIndexEntry() override = default; |
| |
| MeasurementMetaIndexEntry() = default; |
| MeasurementMetaIndexEntry(const common::String &name, const int64_t offset, |
| common::PageArena &pa) { |
| offset_ = offset; |
| name_.dup_from(name, pa); |
| } |
| |
| FORCE_INLINE int init(const std::string &str, const int64_t offset, |
| common::PageArena &pa) { |
| offset_ = offset; |
| return name_.dup_from(str, pa); |
| } |
| |
| int serialize_to(common::ByteStream &out) override { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(common::SerializationUtil::write_mystring(name_, out))) { |
| } else if (RET_FAIL( |
| common::SerializationUtil::write_i64(offset_, out))) { |
| } |
| return ret; |
| } |
| |
| int deserialize_from(common::ByteStream &in, |
| common::PageArena *pa) override { |
| int ret = error_info::E_OK; |
| if (RET_FAIL(common::SerializationUtil::read_mystring(name_, pa, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_i64(offset_, in))) { |
| } |
| return ret; |
| } |
| |
| int64_t get_offset() const override { return offset_; } |
| |
| std::shared_ptr<IComparable> get_compare_key() const override { |
| return std::make_shared<StringComparable>(name_.to_std_string()); |
| } |
| |
| bool is_device_level() const override { return false; } |
| |
| common::String get_name() const override { return name_; } |
| std::shared_ptr<IDeviceID> get_device_id() const override { |
| return nullptr; |
| } |
| std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) override { |
| return std::make_shared<MeasurementMetaIndexEntry>(name_, offset_, *pa); |
| } |
| #ifndef NDEBUG |
| void print(std::ostream &os) const override { |
| os << "name=" << name_ << ", offset=" << offset_; |
| } |
| #endif |
| }; |
| |
// Kind of a MetaIndexNode. MetaIndexNode writes and reads this value as a
// single char, so the numeric values are kept explicit.
enum MetaIndexNodeType {
    INTERNAL_DEVICE = 0,
    LEAF_DEVICE = 1,
    INTERNAL_MEASUREMENT = 2,
    LEAF_MEASUREMENT = 3,
    INVALID_META_NODE_TYPE = 4,
};
#ifndef NDEBUG
// Debug names, indexed by MetaIndexNodeType value.
static const char *meta_index_node_type_names[INVALID_META_NODE_TYPE + 1] = {
    "INTERNAL_DEVICE",         // 0
    "LEAF_DEVICE",             // 1
    "INTERNAL_MEASUREMENT",    // 2
    "LEAF_MEASUREMENT",        // 3
    "INVALID_META_NODE_TYPE",  // 4
};
#endif
| |
| struct MetaIndexNode { |
| // TODO use vector to support binary search |
| // common::SimpleList<MetaIndexEntry*> children_; |
| std::vector<std::shared_ptr<IMetaIndexEntry>> children_; |
| int64_t end_offset_; |
| MetaIndexNodeType node_type_; |
| common::PageArena *pa_; |
| |
| explicit MetaIndexNode(common::PageArena *pa) |
| : children_(), end_offset_(0), node_type_(), pa_(pa) {} |
| |
| std::shared_ptr<IMetaIndexEntry> peek() { |
| if (children_.empty()) { |
| return nullptr; |
| } |
| return children_[0]; |
| } |
| |
| ~MetaIndexNode() {} |
| |
| static void self_deleter(MetaIndexNode *ptr) { |
| if (ptr) { |
| ptr->~MetaIndexNode(); |
| } |
| } |
| |
| int binary_search_children( |
| std::shared_ptr<IComparable> key, bool exact_search, |
| std::shared_ptr<IMetaIndexEntry> &ret_index_entry, |
| int64_t &ret_end_offset); |
| |
| int serialize_to(common::ByteStream &out) { |
| int ret = error_info::E_OK; |
| #if DEBUG_SE |
| int64_t start_pos = out.total_size(); |
| #endif |
| if (RET_FAIL(common::SerializationUtil::write_var_uint(children_.size(), |
| out))) { |
| } else { |
| for (size_t i = 0; IS_SUCC(ret) && i < children_.size(); i++) { |
| auto entry = children_[i]; |
| if (RET_FAIL(entry->serialize_to(out))) { |
| } |
| } |
| if (IS_SUCC(ret)) { |
| if (RET_FAIL(common::SerializationUtil::write_i64(end_offset_, |
| out))) { |
| } else if (RET_FAIL(common::SerializationUtil::write_char( |
| node_type_, out))) { |
| } |
| } |
| } |
| #if DEBUG_SE |
| std::cout << "MetaIndexNode serialize_to. this=" << *this |
| << " at file pos: " << start_pos << " to " << out.total_size() |
| << std::endl; |
| #endif |
| return ret; |
| } |
| |
| int deserialize_from(char *buf, int len) { |
| common::ByteStream bs; |
| bs.wrap_from(buf, len); |
| return deserialize_from(bs); |
| } |
| int deserialize_from(common::ByteStream &in) { |
| int ret = error_info::E_OK; |
| uint32_t children_size = 0; |
| if (RET_FAIL( |
| common::SerializationUtil::read_var_uint(children_size, in))) { |
| return ret; |
| } |
| for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) { |
| void *entry_buf = pa_->alloc(sizeof(MeasurementMetaIndexEntry)); |
| if (IS_NULL(entry_buf)) { |
| return E_OOM; |
| } |
| auto entry = new (entry_buf) MeasurementMetaIndexEntry; |
| |
| if (RET_FAIL(entry->deserialize_from(in, pa_))) { |
| } else { |
| children_.push_back(std::shared_ptr<IMetaIndexEntry>( |
| entry, IMetaIndexEntry::self_destructor)); |
| } |
| } // end for |
| if (IS_SUCC(ret)) { |
| char node_type_ch = 0; |
| if (RET_FAIL( |
| common::SerializationUtil::read_i64(end_offset_, in))) { |
| } else if (RET_FAIL(common::SerializationUtil::read_char( |
| node_type_ch, in))) { |
| } else { |
| node_type_ = (MetaIndexNodeType)node_type_ch; |
| } |
| } |
| #if DEBUG_SE |
| std::cout << "MetaIndexNode deserialize_from. this=" << *this |
| << std::endl; |
| #endif |
| return ret; |
| } |
| int device_deserialize_from(char *buf, int len) { |
| common::ByteStream bs; |
| bs.wrap_from(buf, len); |
| return device_deserialize_from(bs); |
| } |
| // Reads a MetaIndexNode whose children are DeviceMetaIndexEntry objects. |
| // |
| // Consumes, in order: a var-uint child count, that many DeviceMetaIndexEntry |
| // records, an i64 end offset and a one-byte node type. Entries are |
| // placement-constructed in memory obtained from pa_. |
| // |
| // Returns error_info::E_OK on success, E_OOM when pa_->alloc yields null, or |
| // the first error code reported by a read/deserialize call. |
| int device_deserialize_from(common::ByteStream &in) { |
| int ret = error_info::E_OK; |
| uint32_t entry_count = 0; |
| if (RET_FAIL(common::SerializationUtil::read_var_uint(entry_count, in))) { |
| return ret; |
| } |
| for (uint32_t remaining = entry_count; remaining > 0 && IS_SUCC(ret); |
| --remaining) { |
| void *slot = pa_->alloc(sizeof(DeviceMetaIndexEntry)); |
| if (IS_NULL(slot)) { |
| return E_OOM; |
| } |
| std::shared_ptr<DeviceMetaIndexEntry> child( |
| new (slot) DeviceMetaIndexEntry(), DeviceMetaIndexEntry::self_deleter); |
| if (RET_FAIL(child->deserialize_from(in, pa_))) { |
| // ret now holds the error, so the loop condition ends the iteration. |
| continue; |
| } |
| children_.push_back(child); |
| } |
| // Trailer: the end offset first, then the node type byte. |
| if (IS_SUCC(ret)) { |
| if (RET_FAIL(common::SerializationUtil::read_i64(end_offset_, in))) { |
| // error code is already stored in ret |
| } |
| } |
| if (IS_SUCC(ret)) { |
| char type_byte = 0; |
| if (RET_FAIL(common::SerializationUtil::read_char(type_byte, in))) { |
| // error code is already stored in ret |
| } else { |
| node_type_ = (MetaIndexNodeType)type_byte; |
| } |
| } |
| #if DEBUG_SE |
| std::cout << "MetaIndexNode deserialize_from. this=" << *this |
| << std::endl; |
| #endif |
| return ret; |
| } |
| |
| #ifndef NDEBUG |
| // Debug printer: writes end_offset_, the name looked up in |
| // meta_index_node_type_names for node_type_, and every child entry with its |
| // index, to `os`. Returns `os` to allow chaining. |
| friend std::ostream &operator<<(std::ostream &os, |
| const MetaIndexNode &node) { |
| os << "end_offset=" << node.end_offset_ << ", node_type=" |
| << meta_index_node_type_names[node.node_type_]; |
| os << ", MetaIndexEntry children={"; |
| |
| size_t child_idx = 0; |
| for (const auto &child : node.children_) { |
| // The first element gets an empty separator, later ones ", ". |
| const char *separator = (child_idx == 0 ? "" : ", "); |
| os << separator << "[" << child_idx << "]={" << *child << "}"; |
| ++child_idx; |
| } |
| os << "}"; |
| return os; |
| } |
| #endif |
| |
| // True once the number of children has reached the configured |
| // common::g_config_value_.max_degree_of_index_node_. |
| FORCE_INLINE bool is_full() const { |
| const auto max_degree = common::g_config_value_.max_degree_of_index_node_; |
| return !(children_.size() < max_degree); |
| } |
| |
| // True when this node currently holds no child entries. |
| FORCE_INLINE bool is_empty() const { return children_.empty(); } |
| |
| // Appends `entry` to the end of children_. Always returns error_info::E_OK; |
| // no capacity check (see is_full) is performed here. |
| FORCE_INLINE int push_entry(std::shared_ptr<IMetaIndexEntry> entry) { |
| #if DEBUG_SE |
| std::cout << "MetaIndexNode.push_entry(" << *entry << ")" << std::endl; |
| #endif |
| children_.emplace_back(entry); |
| return error_info::E_OK; |
| } |
| // Drops every child reference and releases the vector's storage. |
| // |
| // The vector is swapped with an empty temporary instead of having its |
| // destructor invoked by hand: an explicit children_.~vector() ends the |
| // member's lifetime, which makes a second destroy() call or a later run of |
| // ~MetaIndexNode() undefined behaviour. After the swap children_ is a valid |
| // empty vector, so both cases are well-defined. |
| FORCE_INLINE void destroy() { |
| decltype(children_)().swap(children_); |
| } |
| }; |
| |
| class TableSchema; |
| |
| // File-level metadata of a tsfile: per-table meta index roots, table |
| // schemas, free-form properties, the meta offset and an optional bloom |
| // filter. |
| // |
| // NOTE(review): the destructor deletes the std::string values stored in |
| // tsfile_properties_, while copy construction/assignment are implicitly |
| // generated. Copying an instance would therefore delete those strings twice; |
| // avoid copying TsFileMeta objects. |
| struct TsFileMeta { |
| using DeviceNodeMap = |
| std::map<std::shared_ptr<IDeviceID>, std::shared_ptr<MetaIndexNode>, |
| IDeviceIDComparator>; |
| // Table name -> root MetaIndexNode of that table. |
| std::map<std::string, std::shared_ptr<MetaIndexNode>> |
| table_metadata_index_node_map_; |
| // Property name -> heap-allocated value; values are deleted in ~TsFileMeta. |
| std::unordered_map<std::string, std::string *> tsfile_properties_; |
| using TableSchemasMap = |
| std::unordered_map<std::string, std::shared_ptr<TableSchema>>; |
| // Table name -> schema of that table. |
| TableSchemasMap table_schemas_; |
| int64_t meta_offset_; |
| // May be null; destroy() is called on it in the destructor when set. |
| BloomFilter *bloom_filter_; |
| // Arena supplied through the explicit constructor; null by default. |
| common::PageArena *page_arena_; |
| |
| // Looks up the meta index node registered for `table_name`. |
| // On success stores the raw (non-owning) pointer in `ret_node` and returns |
| // error_info::E_OK; returns E_TABLE_NOT_EXIST and leaves `ret_node` |
| // untouched when the table is unknown. |
| int get_table_metaindex_node(const std::string &table_name, |
| MetaIndexNode *&ret_node) { |
| const auto found = table_metadata_index_node_map_.find(table_name); |
| if (found == table_metadata_index_node_map_.end()) { |
| return E_TABLE_NOT_EXIST; |
| } |
| ret_node = found->second.get(); |
| return error_info::E_OK; |
| } |
| |
| // Looks up the schema registered for `table_name`. |
| // On success copies the shared_ptr into `ret_schema` and returns |
| // error_info::E_OK; returns E_TABLE_NOT_EXIST and leaves `ret_schema` |
| // untouched when the table is unknown. |
| int get_table_schema(const std::string &table_name, |
| std::shared_ptr<TableSchema> &ret_schema) { |
| const auto found = table_schemas_.find(table_name); |
| if (found == table_schemas_.end()) { |
| return E_TABLE_NOT_EXIST; |
| } |
| ret_schema = found->second; |
| return error_info::E_OK; |
| } |
| |
| TsFileMeta() |
| : meta_offset_(0), bloom_filter_(nullptr), page_arena_(nullptr) {} |
| |
| explicit TsFileMeta(common::PageArena *pa) |
| : meta_offset_(0), bloom_filter_(nullptr), page_arena_(pa) {} |
| |
| // Calls destroy() on the bloom filter (if any), deletes every property |
| // value and clears both table maps. |
| ~TsFileMeta() { |
| if (bloom_filter_ != nullptr) { |
| bloom_filter_->destroy(); |
| } |
| // Bind by const reference: iterating by value copied each key string. |
| for (const auto &property : tsfile_properties_) { |
| delete property.second;  // deleting a null pointer is a no-op |
| } |
| table_metadata_index_node_map_.clear(); |
| table_schemas_.clear(); |
| } |
| |
| int serialize_to(common::ByteStream &out); |
| |
| int deserialize_from(common::ByteStream &in); |
| |
| #ifndef NDEBUG |
| // Debug printer: only meta_offset_ is written. |
| friend std::ostream &operator<<(std::ostream &os, |
| const TsFileMeta &tsfile_meta) { |
| os << "meta_offset=" << tsfile_meta.meta_offset_; |
| return os; |
| } |
| #endif |
| }; |
| |
| // Pairs a timeseries identifier with the [start_time, end_time] range that |
| // the timeseries covers in a tsfile. |
| struct TimeseriesTimeIndexEntry { |
| common::TsID ts_id_;    // identifier of the timeseries |
| TimeRange time_range_;  // [start_time, end_time] of that timeseries |
| }; |
| |
| } // end namespace storage |
| #endif // COMMON_TSFILE_COMMON_H |