blob: 321188ecbc509bf8b15c5b5970eaa9bbb6be55b9 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef COMMON_TSFILE_COMMON_H
#define COMMON_TSFILE_COMMON_H
#include <map>
#include <string>
#include <unordered_map>
#include "common/allocator/my_string.h"
#include "common/allocator/page_arena.h"
#include "common/config/config.h"
#include "common/container/list.h"
#include "device_id.h"
#include "reader/bloom_filter.h"
#include "statistic.h"
#include "utils/db_utils.h"
#include "utils/storage_utils.h"
namespace storage {
// File-format constants. They are only declared here; their definitions are
// provided outside this header.
extern const char *MAGIC_STRING_TSFILE;
extern const int MAGIC_STRING_TSFILE_LEN;

// Single-byte constants (version number and marker bytes, per their names).
extern const char VERSION_NUM_BYTE;
extern const char CHUNK_GROUP_HEADER_MARKER;
extern const char CHUNK_HEADER_MARKER;
extern const char ONLY_ONE_PAGE_CHUNK_HEADER_MARKER;
extern const char SEPARATOR_MARKER;
extern const char OPERATION_INDEX_RANGE;

// 64-bit signed identifier type for a TsFile.
using TsFileID = int64_t;
// TODO review the String.len_ used
// Note: tsfile_io_writer writes the fields of PageHeader one by one
// instead of serializing the PageHeader object as a whole.
// That is because statistic_ in PageHeader may be omitted if only
// one page exists in the chunk, and we only learn that after we have
// written the first page.
struct PageHeader {
    uint32_t uncompressed_size_;
    uint32_t compressed_size_;
    // Owned: allocated by StatisticFactory::alloc_statistic() in
    // deserialize_from() and released by reset() / the destructor.
    Statistic *statistic_;
    PageHeader()
        : uncompressed_size_(0), compressed_size_(0), statistic_(nullptr) {}
    ~PageHeader() { reset(); }
    /** Frees the owned statistic (if any) and zeroes both sizes. */
    void reset() {
        if (statistic_ != nullptr) {
            StatisticFactory::free(statistic_);
            statistic_ = nullptr;
        }
        uncompressed_size_ = 0;
        compressed_size_ = 0;
    }
    /**
     * Reads the page header from @p in: the uncompressed and compressed
     * sizes as unsigned var-ints, followed by a statistic of @p data_type
     * when @p deserialize_stat is true.
     * @return E_OK on success, E_OOM if the statistic cannot be allocated,
     *         otherwise the error code of the first failing read.
     */
    int deserialize_from(common::ByteStream &in, bool deserialize_stat,
                         common::TSDataType data_type) {
        int ret = error_info::E_OK;
        if (RET_FAIL(common::SerializationUtil::read_var_uint(
                uncompressed_size_, in))) {
        } else if (RET_FAIL(common::SerializationUtil::read_var_uint(
                       compressed_size_, in))) {
        } else if (deserialize_stat) {
            // Release a statistic left over from an earlier call; simply
            // overwriting the pointer below would leak it.
            if (statistic_ != nullptr) {
                StatisticFactory::free(statistic_);
                statistic_ = nullptr;
            }
            statistic_ = StatisticFactory::alloc_statistic(data_type);
            if (IS_NULL(statistic_)) {
                return E_OOM;
            } else if (RET_FAIL(statistic_->deserialize_from(in))) {
            }
        }
        return ret;
    }
    /** max page header size without statistics. */
    static int estimat_max_page_header_size_without_statistics() {
        // uncompressedSize, compressedSize
        // because we use unsigned varInt to encode these two integer, each
        // unsigned varInt will cost at most 5 bytes
        return 2 * (4 + 1);
    }
#ifndef NDEBUG
    friend std::ostream &operator<<(std::ostream &os, const PageHeader &h) {
        os << "{uncompressed_size_=" << h.uncompressed_size_
           << ", compressed_size_=" << h.compressed_size_;
        if (h.statistic_ == nullptr) {
            os << ", stat=nil}";
        } else {
            os << ", stat=" << h.statistic_->to_string() << "}";
        }
        return os;
    }
#endif
};
/*
 * Header of a chunk. On the wire it is: chunk type (1 char), measurement
 * name (var string), data size (unsigned var-int), then data type,
 * compression type and encoding as one byte each.
 */
struct ChunkHeader {
    ChunkHeader()
        : measurement_name_(""),
          data_size_(0),
          data_type_(common::INVALID_DATATYPE),
          compression_type_(common::INVALID_COMPRESSION),
          encoding_type_(common::INVALID_ENCODING),
          num_of_pages_(0),
          serialized_size_(0),
          chunk_type_(0) {}
    ~ChunkHeader() = default;

    // Clears the per-chunk counters and the chunk type; the measurement
    // name, data type, compression and encoding are left untouched.
    void reset() {
        chunk_type_ = 0;
        serialized_size_ = 0;
        num_of_pages_ = 0;
        data_size_ = 0;
    }

    // Writes the header fields in wire order; stops at the first failure
    // and returns its error code.
    int serialize_to(common::ByteStream &out) {
        int ret = error_info::E_OK;
        do {
            if (RET_FAIL(common::SerializationUtil::write_char(chunk_type_,
                                                               out))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::write_var_str(
                    measurement_name_, out))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::write_var_uint(
                    data_size_, out))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::write_ui8(data_type_,
                                                              out))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::write_ui8(
                    compression_type_, out))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::write_ui8(
                    encoding_type_, out))) {
                break;
            }
        } while (false);
        return ret;
    }

    // Reads the header fields in wire order. On full success,
    // serialized_size_ records how many bytes were consumed since the
    // read position was marked at entry.
    int deserialize_from(common::ByteStream &in) {
        int ret = error_info::E_OK;
        in.mark_read_pos();
        do {
            if (RET_FAIL(common::SerializationUtil::read_char(chunk_type_,
                                                              in))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::read_var_str(
                    measurement_name_, in))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::read_var_uint(data_size_,
                                                                  in))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::read_char(
                    (char &)data_type_, in))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::read_char(
                    (char &)compression_type_, in))) {
                break;
            }
            if (RET_FAIL(common::SerializationUtil::read_char(
                    (char &)encoding_type_, in))) {
                break;
            }
            // There will meet data shortcut.
            serialized_size_ = static_cast<int32_t>(in.get_mark_len());
        } while (false);
        return ret;
    }

    std::string measurement_name_;
    uint32_t data_size_;
    common::TSDataType data_type_;
    common::CompressionType compression_type_;
    common::TSEncoding encoding_type_;
    int32_t num_of_pages_;
    int32_t serialized_size_;  // TODO seems no usage
    char chunk_type_;          // TODO give a description here
    static const int MIN_SERIALIZED_SIZE = 7;
};
struct ChunkMeta {
// std::string measurement_name_;
common::String measurement_name_;
common::TSDataType data_type_;
int64_t offset_of_chunk_header_;
Statistic *statistic_;
char mask_;
common::TSEncoding encoding_;
common::CompressionType compression_type_;
ChunkMeta()
: measurement_name_(),
data_type_(),
offset_of_chunk_header_(0),
statistic_(nullptr),
mask_(0) {}
int init(const common::String &measurement_name,
common::TSDataType data_type, int64_t offset_of_chunk_header,
Statistic *stat, char mask, common::TSEncoding encoding,
common::CompressionType compression_type, common::PageArena &pa) {
// TODO check parameter valid
measurement_name_.dup_from(measurement_name, pa);
data_type_ = data_type;
offset_of_chunk_header_ = offset_of_chunk_header;
statistic_ = stat;
mask_ = mask;
encoding_ = encoding;
compression_type_ = compression_type;
return error_info::E_OK;
}
FORCE_INLINE void clone_statistic_from(Statistic *stat) {
clone_statistic(stat, statistic_, data_type_);
}
FORCE_INLINE int clone_from(ChunkMeta &that, common::PageArena *pa) {
int ret = error_info::E_OK;
if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa))) {
return ret;
}
data_type_ = that.data_type_;
offset_of_chunk_header_ = that.offset_of_chunk_header_;
if (that.statistic_ != nullptr) {
statistic_ =
StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
if (IS_NULL(statistic_)) {
return E_OOM;
}
clone_statistic_from(that.statistic_);
}
mask_ = that.mask_;
return ret;
}
int serialize_to(common::ByteStream &out, bool serialize_statistic) {
int ret = error_info::E_OK;
if (RET_FAIL(common::SerializationUtil::write_i64(
offset_of_chunk_header_, out))) {
} else if (serialize_statistic) {
ret = statistic_->serialize_to(out);
}
return ret;
}
int deserialize_from(common::ByteStream &in, bool deserialize_stat,
common::PageArena *pa) {
int ret = error_info::E_OK;
if (RET_FAIL(common::SerializationUtil::read_i64(
offset_of_chunk_header_, in))) {
} else if (deserialize_stat) {
statistic_ =
StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
if (IS_NULL(statistic_)) {
ret = E_OOM;
} else {
ret = statistic_->deserialize_from(in);
}
}
return ret;
}
#ifndef NDEBUG
friend std::ostream &operator<<(std::ostream &os, const ChunkMeta &cm) {
os << "{measurement_name=" << cm.measurement_name_
<< ", data_type=" << cm.data_type_
<< ", offset_of_chunk_header=" << cm.offset_of_chunk_header_
<< ", mask=" << ((int)cm.mask_);
if (cm.statistic_ == nullptr) {
os << ", statistic=nil}";
} else {
os << ", statistic=" << cm.statistic_->to_string() << "}";
}
return os;
}
#endif
};
/*
 * The chunk metas recorded for one device. chunk_meta_list_ is constructed
 * with the PageArena handed to the constructor.
 */
struct ChunkGroupMeta {
    std::shared_ptr<IDeviceID> device_id_;
    common::SimpleList<ChunkMeta *> chunk_meta_list_;

    explicit ChunkGroupMeta(common::PageArena *pa_ptr)
        : chunk_meta_list_(pa_ptr) {}

    // Records the device this group belongs to; always returns 0.
    FORCE_INLINE int init(std::shared_ptr<IDeviceID> device_id) {
        device_id_.swap(device_id);
        return 0;
    }

    // Appends one chunk meta; forwards the list's status code.
    FORCE_INLINE int push(ChunkMeta *cm) {
        const int status = chunk_meta_list_.push_back(cm);
        return status;
    }
};
/*
 * Polymorphic read interface over a timeseries index. Every accessor has a
 * neutral default (nullptr, an empty String, INVALID_DATATYPE) that concrete
 * indexes override as appropriate.
 */
class ITimeseriesIndex {
   public:
    ITimeseriesIndex() {}
    // Virtual: the class is used polymorphically, so deleting a derived
    // index through an ITimeseriesIndex* must run the derived destructor
    // (TimeseriesIndex releases its owned statistic there).
    virtual ~ITimeseriesIndex() {}
    virtual common::SimpleList<ChunkMeta *> *get_chunk_meta_list() const {
        return nullptr;
    }
    virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const {
        return nullptr;
    }
    virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const {
        return nullptr;
    }
    virtual common::String get_measurement_name() const {
        return common::String();
    }
    virtual common::TSDataType get_data_type() const {
        return common::INVALID_DATATYPE;
    }
    virtual Statistic *get_statistic() const { return nullptr; }
};
/*
* A TimeseriesIndex may have one or more chunk metas,
* that means we have such a map: <Timeseries, List<ChunkMeta>>.
*/
class TimeseriesIndex : public ITimeseriesIndex {
public:
static const uint32_t CHUNK_META_LIST_SERIALIZED_BUF_PAGE_SIZE = 128;
static const uint32_t PAGE_ARENA_PAGE_SIZE = 256;
static const common::AllocModID PAGE_ARENA_MOD_ID =
common::MOD_TIMESERIES_INDEX_OBJ;
public:
TimeseriesIndex()
: timeseries_meta_type_((char)255),
chunk_meta_list_data_size_(0),
measurement_name_(),
ts_id_(),
data_type_(common::INVALID_DATATYPE),
statistic_(nullptr),
statistic_from_pa_(false),
chunk_meta_list_serialized_buf_(
CHUNK_META_LIST_SERIALIZED_BUF_PAGE_SIZE, PAGE_ARENA_MOD_ID),
chunk_meta_list_(nullptr) {
// page_arena_.init(PAGE_ARENA_PAGE_SIZE, PAGE_ARENA_MOD_ID);
}
~TimeseriesIndex() { destroy(); }
void destroy() {
// page_arena_.destroy();
reset();
}
void reset() // FIXME reuse
{
timeseries_meta_type_ = 0;
chunk_meta_list_data_size_ = 0;
measurement_name_.reset();
ts_id_.reset();
data_type_ = common::VECTOR;
chunk_meta_list_serialized_buf_.reset();
if (statistic_ != nullptr && !statistic_from_pa_) {
StatisticFactory::free(statistic_);
statistic_ = nullptr;
}
}
int add_chunk_meta(ChunkMeta *chunk_meta, bool serialize_statistic);
FORCE_INLINE int set_measurement_name(common::String &measurement_name,
common::PageArena &pa) {
return measurement_name_.dup_from(measurement_name, pa);
}
FORCE_INLINE void set_measurement_name(common::String &measurement_name) {
measurement_name_.shallow_copy_from(measurement_name);
}
FORCE_INLINE virtual common::String get_measurement_name() const {
return measurement_name_;
}
virtual inline common::SimpleList<ChunkMeta *> *get_chunk_meta_list()
const {
return chunk_meta_list_;
}
FORCE_INLINE void set_ts_meta_type(char ts_meta_type) {
timeseries_meta_type_ = ts_meta_type;
}
FORCE_INLINE void set_data_type(common::TSDataType data_type) {
data_type_ = data_type;
}
FORCE_INLINE virtual common::TSDataType get_data_type() const {
return data_type_;
}
int init_statistic(common::TSDataType data_type) {
if (statistic_ != nullptr &&
!statistic_from_pa_) { // clear old statistic
StatisticFactory::free(statistic_);
statistic_ = nullptr;
}
statistic_ = StatisticFactory::alloc_statistic(data_type);
if (IS_NULL(statistic_)) {
return E_OOM;
}
statistic_->reset();
return error_info::E_OK;
}
virtual Statistic *get_statistic() const { return statistic_; }
common::TsID get_ts_id() const { return ts_id_; }
FORCE_INLINE void finish() {
chunk_meta_list_data_size_ =
chunk_meta_list_serialized_buf_.total_size();
}
int serialize_to(common::ByteStream &out) {
int ret = error_info::E_OK;
if (RET_FAIL(common::SerializationUtil::write_char(
timeseries_meta_type_, out))) {
} else if (RET_FAIL(common::SerializationUtil::write_mystring(
measurement_name_, out))) {
} else if (RET_FAIL(common::SerializationUtil::write_char(data_type_,
out))) {
} else if (RET_FAIL(common::SerializationUtil::write_var_uint(
chunk_meta_list_data_size_, out))) {
} else if (RET_FAIL(statistic_->serialize_to(out))) {
} else if (RET_FAIL(merge_byte_stream(
out, chunk_meta_list_serialized_buf_))) {
}
return ret;
}
int deserialize_from(common::ByteStream &in, common::PageArena *pa) {
int ret = error_info::E_OK;
if (RET_FAIL(common::SerializationUtil::read_char(timeseries_meta_type_,
in))) {
} else if (RET_FAIL(common::SerializationUtil::read_mystring(
measurement_name_, pa, in))) {
} else if (RET_FAIL(common::SerializationUtil::read_char(
(char &)data_type_, in))) {
} else if (RET_FAIL(common::SerializationUtil::read_var_uint(
chunk_meta_list_data_size_, in))) {
} else if (nullptr ==
(statistic_ = StatisticFactory::alloc_statistic_with_pa(
data_type_, pa))) {
ret = E_OOM;
} else if (RET_FAIL(statistic_->deserialize_from(in))) {
} else {
statistic_from_pa_ = true;
void *chunk_meta_list_buf = pa->alloc(sizeof(*chunk_meta_list_));
if (IS_NULL(chunk_meta_list_buf)) {
return E_OOM;
}
const bool deserialize_chunk_meta_statistic =
(timeseries_meta_type_ & 0x3F); // TODO
chunk_meta_list_ =
new (chunk_meta_list_buf) common::SimpleList<ChunkMeta *>(pa);
uint32_t start_pos = in.read_pos();
while (IS_SUCC(ret) &&
in.read_pos() < start_pos + chunk_meta_list_data_size_) {
void *cm_buf = pa->alloc(sizeof(ChunkMeta));
if (IS_NULL(cm_buf)) {
ret = E_OOM;
} else {
ChunkMeta *cm = new (cm_buf) ChunkMeta;
cm->measurement_name_.shallow_copy_from(
this->measurement_name_);
cm->data_type_ = this->data_type_;
cm->mask_ = 0; // TODO
if (RET_FAIL(cm->deserialize_from(
in, deserialize_chunk_meta_statistic, pa))) {
} else if (RET_FAIL(chunk_meta_list_->push_back(cm))) {
}
}
}
}
return ret;
}
int clone_from(const TimeseriesIndex &that, common::PageArena *pa) {
int ret = error_info::E_OK;
timeseries_meta_type_ = that.timeseries_meta_type_;
chunk_meta_list_data_size_ = that.chunk_meta_list_data_size_;
ts_id_ = that.ts_id_;
data_type_ = that.data_type_;
statistic_ = StatisticFactory::alloc_statistic_with_pa(data_type_, pa);
if (IS_NULL(statistic_)) {
return E_OOM;
}
clone_statistic(that.statistic_, this->statistic_, data_type_);
statistic_from_pa_ = true;
if (RET_FAIL(measurement_name_.dup_from(that.measurement_name_, *pa))) {
return ret;
}
if (that.chunk_meta_list_ != nullptr) {
void *buf = pa->alloc(sizeof(*chunk_meta_list_));
if (IS_NULL(buf)) {
return E_OOM;
}
chunk_meta_list_ = new (buf) common::SimpleList<ChunkMeta *>(pa);
common::SimpleList<ChunkMeta *>::Iterator it;
for (it = that.chunk_meta_list_->begin();
IS_SUCC(ret) && it != that.chunk_meta_list_->end(); it++) {
ChunkMeta *cm = it.get();
void *cm_buf = pa->alloc(sizeof(ChunkMeta));
if (IS_NULL(cm_buf)) {
return E_OOM;
} else {
ChunkMeta *my_cm = new (cm_buf) ChunkMeta;
if (RET_FAIL(my_cm->clone_from(*cm, pa))) {
} else if (RET_FAIL(chunk_meta_list_->push_back(my_cm))) {
}
}
}
} // end (that.chunk_meta_list_ != nullptr)
return ret;
}
#ifndef NDEBUG
friend std::ostream &operator<<(std::ostream &os,
const TimeseriesIndex &tsi) {
os << "{meta_type=" << (int)tsi.timeseries_meta_type_
<< ", chunk_meta_list_data_size=" << tsi.chunk_meta_list_data_size_
<< ", measurement_name=" << tsi.measurement_name_
<< ", ts_id=" << tsi.ts_id_.to_string()
<< ", data_type=" << common::get_data_type_name(tsi.data_type_)
<< ", statistic=" << tsi.statistic_->to_string();
if (tsi.chunk_meta_list_) {
os << ", chunk_meta_list={";
int count = 0;
common::SimpleList<ChunkMeta *>::Iterator it =
tsi.chunk_meta_list_->begin();
for (; it != tsi.chunk_meta_list_->end(); it++, count++) {
if (count != 0) {
os << ", ";
}
os << "[" << count << "]={" << *it.get() << "}";
}
os << "}";
}
return os;
}
#endif
private:
/*
* If this timeseries has more than one chunk meta, timeseries_meta_type_
* is 1. Otherwise timeseries_meta_type_ is 0. It also should OR with mask
* of chunk meta.
*/
char timeseries_meta_type_;
// Sum of chunk meta serialized size in List<ChunkMeta> of this timeseries.
uint32_t chunk_meta_list_data_size_;
// std::string measurement_name_;
common::String measurement_name_;
common::TsID ts_id_;
common::TSDataType data_type_;
/*
* If TimeseriesIndex has only one ChunkMeta, then
* TimeseriesIndex.statistic_ is duplicated with ChunkMeta.statistic_. In
* this case, we do not serialize ChunkMeta.statistic_.
*/
Statistic *statistic_;
bool statistic_from_pa_;
common::ByteStream chunk_meta_list_serialized_buf_;
// common::PageArena page_arena_;
common::SimpleList<ChunkMeta *> *chunk_meta_list_; // for deserialize_from
};
class AlignedTimeseriesIndex : public ITimeseriesIndex {
public:
TimeseriesIndex *time_ts_idx_;
TimeseriesIndex *value_ts_idx_;
AlignedTimeseriesIndex() {}
~AlignedTimeseriesIndex() {}
virtual common::SimpleList<ChunkMeta *> *get_time_chunk_meta_list() const {
return time_ts_idx_->get_chunk_meta_list();
}
virtual common::SimpleList<ChunkMeta *> *get_value_chunk_meta_list() const {
return value_ts_idx_->get_chunk_meta_list();
}
virtual common::String get_measurement_name() const {
return value_ts_idx_->get_measurement_name();
}
virtual common::TSDataType get_data_type() const {
return time_ts_idx_->get_data_type();
}
virtual Statistic *get_statistic() const {
return value_ts_idx_->get_statistic();
}
#ifndef NDEBUG
friend std::ostream &operator<<(std::ostream &os,
const AlignedTimeseriesIndex &tsi) {
os << "time_ts_idx=" << *tsi.time_ts_idx_;
os << ", value_ts_idx=" << *tsi.value_ts_idx_;
return os;
}
#endif
};
class TSMIterator {
public:
explicit TSMIterator(
common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list)
: chunk_group_meta_list_(chunk_group_meta_list),
chunk_group_meta_iter_(),
chunk_meta_iter_() {}
// sort => iterate
int init();
bool has_next() const;
int get_next(std::shared_ptr<IDeviceID> &ret_device_id,
common::String &ret_measurement_name,
TimeseriesIndex &ret_ts_index);
private:
common::SimpleList<ChunkGroupMeta *> &chunk_group_meta_list_;
common::SimpleList<ChunkGroupMeta *>::Iterator chunk_group_meta_iter_;
common::SimpleList<ChunkMeta *>::Iterator chunk_meta_iter_;
// timeseries measurenemnt chunk meta info
// map <device_name, <measurement_name, vector<chunk_meta>>>
std::map<std::shared_ptr<IDeviceID>,
std::map<common::String, std::vector<ChunkMeta *>>>
tsm_chunk_meta_info_;
// device iterator
std::map<std::shared_ptr<IDeviceID>,
std::map<common::String, std::vector<ChunkMeta *>>>::iterator
tsm_device_iter_;
// measurement iterator
std::map<common::String, std::vector<ChunkMeta *>>::iterator
tsm_measurement_iter_;
};
/* =============== TsFile Index ================ */
/*
 * Polymorphic ordering key used by the metadata index. Implementations
 * supply <, >, == and a textual form; compare() derives a three-way result.
 */
struct IComparable {
    virtual ~IComparable() = default;
    virtual bool operator<(const IComparable &other) const = 0;
    virtual bool operator>(const IComparable &other) const = 0;
    virtual bool operator==(const IComparable &other) const = 0;
    // Returns -1 when this < other, 0 when they are equal, 1 otherwise.
    virtual int compare(const IComparable &other) {
        const IComparable &self = *this;
        if (self < other) {
            return -1;
        }
        return (self == other) ? 0 : 1;
    }
    virtual std::string to_string() const = 0;
};
struct DeviceIDComparable : IComparable {
std::shared_ptr<IDeviceID> device_id_;
explicit DeviceIDComparable(const std::shared_ptr<IDeviceID> &device_id)
: device_id_(device_id) {}
bool operator<(const IComparable &other) const override {
const auto *other_device =
dynamic_cast<const DeviceIDComparable *>(&other);
if (!other_device) throw std::runtime_error("Incompatible comparison");
return *device_id_ < *other_device->device_id_;
}
bool operator>(const IComparable &other) const override {
const auto *other_device =
dynamic_cast<const DeviceIDComparable *>(&other);
if (!other_device) throw std::runtime_error("Incompatible comparison");
return *device_id_ != *other_device->device_id_ &&
!(*device_id_ < *other_device->device_id_);
}
bool operator==(const IComparable &other) const override {
const auto *other_device =
dynamic_cast<const DeviceIDComparable *>(&other);
if (!other_device) throw std::runtime_error("Incompatible comparison");
return *device_id_ == *other_device->device_id_;
}
std::string to_string() const override {
return device_id_->get_device_name();
}
};
/*
 * IComparable wrapper around a std::string (lexicographic ordering).
 * Comparing against any other IComparable kind throws
 * std::runtime_error("Incompatible comparison").
 */
struct StringComparable : IComparable {
    std::string value_;
    explicit StringComparable(const std::string &value) : value_(value) {}
    bool operator<(const IComparable &other) const override {
        return value_ < peer_value(other);
    }
    bool operator>(const IComparable &other) const override {
        return value_ > peer_value(other);
    }
    bool operator==(const IComparable &other) const override {
        return value_ == peer_value(other);
    }
    std::string to_string() const override { return value_; }

   private:
    // Downcasts @p other and returns its string; throws when @p other is
    // not a StringComparable.
    static const std::string &peer_value(const IComparable &other) {
        const auto *peer = dynamic_cast<const StringComparable *>(&other);
        if (peer == nullptr) {
            throw std::runtime_error("Incompatible comparison");
        }
        return peer->value_;
    }
};
/*
 * Abstract entry of a metadata index node. Concrete entries are either
 * device-level (DeviceMetaIndexEntry) or measurement-level
 * (MeasurementMetaIndexEntry). The non-pure virtuals are defined outside
 * this header.
 */
struct IMetaIndexEntry {
    // Runs the virtual destructor in place without releasing storage. It is
    // the shared_ptr deleter for entries placement-constructed in PageArena
    // memory.
    static void self_destructor(IMetaIndexEntry *ptr) {
        if (ptr != nullptr) {
            ptr->~IMetaIndexEntry();
        }
    }
    IMetaIndexEntry() = default;
    virtual ~IMetaIndexEntry() = default;
    virtual int serialize_to(common::ByteStream &out);
    // Reads the entry from @p in; @p pa supplies memory for owned data.
    virtual int deserialize_from(common::ByteStream &in,
                                 common::PageArena *pa);
    virtual int64_t get_offset() const = 0;
    virtual bool is_device_level() const = 0;
    virtual std::shared_ptr<IComparable> get_compare_key() const;
    virtual common::String get_name() const;
    virtual std::shared_ptr<IDeviceID> get_device_id() const;
    virtual std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) = 0;
#ifndef NDEBUG
    // Debug hook used by operator<<; prints nothing by default.
    virtual void print(std::ostream &os) const {}
    friend std::ostream &operator<<(std::ostream &os,
                                    const IMetaIndexEntry &entry) {
        entry.print(os);
        return os;
    }
#endif
};
/*
 * Device-level index entry: a device id plus the file offset (offset_)
 * of the index data it points at.
 */
struct DeviceMetaIndexEntry : IMetaIndexEntry {
    std::shared_ptr<IDeviceID> device_id_;
    // Defaults to 0 so a default-constructed entry has a defined offset.
    int64_t offset_ = 0;
    DeviceMetaIndexEntry() = default;
    DeviceMetaIndexEntry(const std::shared_ptr<IDeviceID> &device_id,
                         const int64_t offset)
        : device_id_(device_id), offset_(offset) {}
    ~DeviceMetaIndexEntry() override = default;
    // In-place destructor call without freeing storage (arena deleter).
    static void self_deleter(DeviceMetaIndexEntry *ptr) {
        if (ptr) {
            ptr->~DeviceMetaIndexEntry();
        }
    }
    // Writes the device id then the offset as an i64.
    int serialize_to(common::ByteStream &out) override {
        int ret = error_info::E_OK;
        if (RET_FAIL(device_id_->serialize(out))) {
        } else if (RET_FAIL(
                       common::SerializationUtil::write_i64(offset_, out))) {
        }
        return ret;
    }
    std::shared_ptr<IDeviceID> &get_device_id() { return device_id_; }
    // Inverse of serialize_to(); the device id is always materialized as a
    // StringArrayDeviceID. @p pa is unused.
    int deserialize_from(common::ByteStream &in,
                         common::PageArena *pa) override {
        int ret = error_info::E_OK;
        device_id_ = std::make_shared<StringArrayDeviceID>("init");
        if (RET_FAIL(device_id_->deserialize(in))) {
        } else if (RET_FAIL(common::SerializationUtil::read_i64(offset_, in))) {
        }
        return ret;
    }
    int64_t get_offset() const override { return offset_; }
    std::shared_ptr<IComparable> get_compare_key() const override {
        return std::make_shared<DeviceIDComparable>(device_id_);
    }
    bool is_device_level() const override { return true; }
    common::String get_name() const override { return {}; }
    std::shared_ptr<IDeviceID> get_device_id() const override {
        return device_id_;
    }
    std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) override {
        return std::make_shared<DeviceMetaIndexEntry>(device_id_, offset_);
    }
#ifndef NDEBUG
    // Prints the device name (streaming the shared_ptr itself would only
    // print its address).
    void print(std::ostream &os) const override {
        os << "name=";
        if (device_id_ == nullptr) {
            os << "nil";
        } else {
            os << device_id_->get_device_name();
        }
        os << ", offset=" << offset_;
    }
#endif
};
/*
 * Measurement-level index entry: a measurement name (copied into a
 * PageArena) plus the file offset (offset_) of the index data it points at.
 */
struct MeasurementMetaIndexEntry : IMetaIndexEntry {
    common::String name_;
    // Defaults to 0: entries are also placement-constructed without an
    // initializer before deserialize_from() fills them.
    int64_t offset_ = 0;
    ~MeasurementMetaIndexEntry() override = default;
    MeasurementMetaIndexEntry() = default;
    // Deep-copies @p name into @p pa (a copy failure cannot be reported
    // from a constructor).
    MeasurementMetaIndexEntry(const common::String &name, const int64_t offset,
                              common::PageArena &pa)
        : offset_(offset) {
        name_.dup_from(name, pa);
    }
    // Sets the offset and deep-copies @p str into @p pa; returns the copy
    // status.
    FORCE_INLINE int init(const std::string &str, const int64_t offset,
                          common::PageArena &pa) {
        offset_ = offset;
        return name_.dup_from(str, pa);
    }
    // Writes the name then the offset as an i64.
    int serialize_to(common::ByteStream &out) override {
        int ret = error_info::E_OK;
        if (RET_FAIL(common::SerializationUtil::write_mystring(name_, out))) {
        } else if (RET_FAIL(
                       common::SerializationUtil::write_i64(offset_, out))) {
        }
        return ret;
    }
    // Inverse of serialize_to(); the name is read into @p pa.
    int deserialize_from(common::ByteStream &in,
                         common::PageArena *pa) override {
        int ret = error_info::E_OK;
        if (RET_FAIL(common::SerializationUtil::read_mystring(name_, pa, in))) {
        } else if (RET_FAIL(common::SerializationUtil::read_i64(offset_, in))) {
        }
        return ret;
    }
    int64_t get_offset() const override { return offset_; }
    std::shared_ptr<IComparable> get_compare_key() const override {
        return std::make_shared<StringComparable>(name_.to_std_string());
    }
    bool is_device_level() const override { return false; }
    common::String get_name() const override { return name_; }
    std::shared_ptr<IDeviceID> get_device_id() const override {
        return nullptr;
    }
    std::shared_ptr<IMetaIndexEntry> clone(common::PageArena *pa) override {
        return std::make_shared<MeasurementMetaIndexEntry>(name_, offset_, *pa);
    }
#ifndef NDEBUG
    void print(std::ostream &os) const override {
        os << "name=" << name_ << ", offset=" << offset_;
    }
#endif
};
// Kind of a MetaIndexNode: internal or leaf node, on the device or the
// measurement level. The numeric values are read from and written to the
// file as a single char (see MetaIndexNode), so they must stay fixed.
enum MetaIndexNodeType {
    INTERNAL_DEVICE = 0,
    LEAF_DEVICE = 1,
    INTERNAL_MEASUREMENT = 2,
    LEAF_MEASUREMENT = 3,
    INVALID_META_NODE_TYPE = 4,
};
#ifndef NDEBUG
// Debug names indexed by MetaIndexNodeType value (one slot per enumerator).
static const char *meta_index_node_type_names[INVALID_META_NODE_TYPE + 1] = {
    "INTERNAL_DEVICE",
    "LEAF_DEVICE",
    "INTERNAL_MEASUREMENT",
    "LEAF_MEASUREMENT",
    "INVALID_META_NODE_TYPE",
};
#endif
/*
 * A node of the metadata index tree: its child entries, the end offset
 * (end_offset_) of the region it indexes, and its node type. Entries
 * created while deserializing are placement-constructed in pa_.
 */
struct MetaIndexNode {
    // TODO use vector to support binary search
    // common::SimpleList<MetaIndexEntry*> children_;
    std::vector<std::shared_ptr<IMetaIndexEntry>> children_;
    int64_t end_offset_;
    MetaIndexNodeType node_type_;
    common::PageArena *pa_;
    explicit MetaIndexNode(common::PageArena *pa)
        : children_(), end_offset_(0), node_type_(), pa_(pa) {}
    // Returns the first child, or nullptr when there are none.
    std::shared_ptr<IMetaIndexEntry> peek() {
        if (children_.empty()) {
            return nullptr;
        }
        return children_[0];
    }
    ~MetaIndexNode() {}
    // In-place destructor call without freeing storage (arena deleter).
    static void self_deleter(MetaIndexNode *ptr) {
        if (ptr) {
            ptr->~MetaIndexNode();
        }
    }
    int binary_search_children(
        std::shared_ptr<IComparable> key, bool exact_search,
        std::shared_ptr<IMetaIndexEntry> &ret_index_entry,
        int64_t &ret_end_offset);
    // Writes child count (var-uint), each child, end_offset_ (i64) and the
    // node type (char).
    int serialize_to(common::ByteStream &out) {
        int ret = error_info::E_OK;
#if DEBUG_SE
        int64_t start_pos = out.total_size();
#endif
        if (RET_FAIL(common::SerializationUtil::write_var_uint(children_.size(),
                                                               out))) {
        } else {
            for (size_t i = 0; IS_SUCC(ret) && i < children_.size(); i++) {
                auto entry = children_[i];
                if (RET_FAIL(entry->serialize_to(out))) {
                }
            }
            if (IS_SUCC(ret)) {
                if (RET_FAIL(common::SerializationUtil::write_i64(end_offset_,
                                                                  out))) {
                } else if (RET_FAIL(common::SerializationUtil::write_char(
                               node_type_, out))) {
                }
            }
        }
#if DEBUG_SE
        std::cout << "MetaIndexNode serialize_to. this=" << *this
                  << " at file pos: " << start_pos << " to " << out.total_size()
                  << std::endl;
#endif
        return ret;
    }
    int deserialize_from(char *buf, int len) {
        common::ByteStream bs;
        bs.wrap_from(buf, len);
        return deserialize_from(bs);
    }
    // Reads a node whose children are measurement-level entries.
    int deserialize_from(common::ByteStream &in) {
        return deserialize_entries_from<MeasurementMetaIndexEntry>(in);
    }
    int device_deserialize_from(char *buf, int len) {
        common::ByteStream bs;
        bs.wrap_from(buf, len);
        return device_deserialize_from(bs);
    }
    // Reads a node whose children are device-level entries.
    int device_deserialize_from(common::ByteStream &in) {
        return deserialize_entries_from<DeviceMetaIndexEntry>(in);
    }
#ifndef NDEBUG
    friend std::ostream &operator<<(std::ostream &os,
                                    const MetaIndexNode &node) {
        // node_type_ comes straight from a file byte; clamp it before
        // indexing the name table so a corrupt value cannot read past it.
        const int type_idx = static_cast<int>(node.node_type_);
        const bool known_type = type_idx >= INTERNAL_DEVICE &&
                                type_idx < INVALID_META_NODE_TYPE;
        os << "end_offset=" << node.end_offset_ << ", node_type="
           << meta_index_node_type_names[known_type ? type_idx
                                                    : INVALID_META_NODE_TYPE];
        os << ", MetaIndexEntry children={";
        for (size_t i = 0; i < node.children_.size(); i++) {
            os << (i == 0 ? "" : ", ") << "[" << i << "]={"
               << *node.children_[i] << "}";
        }
        os << "}";
        return os;
    }
#endif
    FORCE_INLINE bool is_full() const {
        return children_.size() >=
               common::g_config_value_.max_degree_of_index_node_;
    }
    FORCE_INLINE bool is_empty() const { return children_.size() == 0; }
    FORCE_INLINE int push_entry(std::shared_ptr<IMetaIndexEntry> entry) {
#if DEBUG_SE
        std::cout << "MetaIndexNode.push_entry(" << *entry << ")" << std::endl;
#endif
        children_.push_back(entry);
        return error_info::E_OK;
    }
    // Drops all children and releases the vector's storage. Swapping with
    // an empty vector leaves children_ valid, so the regular destructor can
    // still run afterwards; an explicit ~vector() call would destroy it
    // twice.
    FORCE_INLINE void destroy() {
        std::vector<std::shared_ptr<IMetaIndexEntry>>().swap(children_);
    }

   private:
    // Shared body of deserialize_from()/device_deserialize_from(): reads
    // the child count, that many EntryT entries (placement-constructed in
    // pa_), then end_offset_ and the node type.
    template <typename EntryT>
    int deserialize_entries_from(common::ByteStream &in) {
        int ret = error_info::E_OK;
        uint32_t children_size = 0;
        if (RET_FAIL(
                common::SerializationUtil::read_var_uint(children_size, in))) {
            return ret;
        }
        for (uint32_t i = 0; i < children_size && IS_SUCC(ret); i++) {
            void *entry_buf = pa_->alloc(sizeof(EntryT));
            if (IS_NULL(entry_buf)) {
                return E_OOM;
            }
            // Hand the entry to a shared_ptr immediately so it is destroyed
            // even when its deserialization fails.
            std::shared_ptr<IMetaIndexEntry> entry(
                new (entry_buf) EntryT(), IMetaIndexEntry::self_destructor);
            if (RET_FAIL(entry->deserialize_from(in, pa_))) {
            } else {
                children_.push_back(entry);
            }
        }  // end for
        if (IS_SUCC(ret)) {
            char node_type_ch = 0;
            if (RET_FAIL(
                    common::SerializationUtil::read_i64(end_offset_, in))) {
            } else if (RET_FAIL(common::SerializationUtil::read_char(
                           node_type_ch, in))) {
            } else {
                node_type_ = (MetaIndexNodeType)node_type_ch;
            }
        }
#if DEBUG_SE
        std::cout << "MetaIndexNode deserialize_from. this=" << *this
                  << std::endl;
#endif
        return ret;
    }
};
class TableSchema;
struct TsFileMeta {
typedef std::map<std::shared_ptr<IDeviceID>, std::shared_ptr<MetaIndexNode>,
IDeviceIDComparator>
DeviceNodeMap;
std::map<std::string, std::shared_ptr<MetaIndexNode>>
table_metadata_index_node_map_;
std::unordered_map<std::string, std::string *> tsfile_properties_;
typedef std::unordered_map<std::string, std::shared_ptr<TableSchema>>
TableSchemasMap;
TableSchemasMap table_schemas_;
int64_t meta_offset_;
BloomFilter *bloom_filter_;
common::PageArena *page_arena_;
int get_table_metaindex_node(const std::string &table_name,
MetaIndexNode *&ret_node) {
std::map<std::string, std::shared_ptr<MetaIndexNode>>::iterator it =
table_metadata_index_node_map_.find(table_name);
if (it == table_metadata_index_node_map_.end()) {
return E_TABLE_NOT_EXIST;
}
ret_node = it->second.get();
return error_info::E_OK;
}
int get_table_schema(const std::string &table_name,
std::shared_ptr<TableSchema> &ret_schema) {
TableSchemasMap::iterator it = table_schemas_.find(table_name);
if (it == table_schemas_.end()) {
return E_TABLE_NOT_EXIST;
}
ret_schema = it->second;
return error_info::E_OK;
}
TsFileMeta()
: meta_offset_(0), bloom_filter_(nullptr), page_arena_(nullptr) {}
explicit TsFileMeta(common::PageArena *pa)
: meta_offset_(0), bloom_filter_(nullptr), page_arena_(pa) {}
~TsFileMeta() {
if (bloom_filter_ != nullptr) {
bloom_filter_->destroy();
}
for (auto properties : tsfile_properties_) {
if (properties.second != nullptr) {
delete properties.second;
}
}
table_metadata_index_node_map_.clear();
table_schemas_.clear();
}
int serialize_to(common::ByteStream &out);
int deserialize_from(common::ByteStream &in);
#ifndef NDEBUG
friend std::ostream &operator<<(std::ostream &os,
const TsFileMeta &tsfile_meta) {
os << "meta_offset=" << tsfile_meta.meta_offset_;
return os;
}
#endif
};
// Timeseries ID and its [start_time, end_time] in a tsfile.
// Plain data holder: no constructor or methods are defined here.
struct TimeseriesTimeIndexEntry {
// Identifier of the timeseries this entry describes.
common::TsID ts_id_;
// Time span of that timeseries in the file (TimeRange is declared
// elsewhere; presumably inclusive per the comment above — confirm).
TimeRange time_range_;
};
} // end namespace storage
#endif // COMMON_TSFILE_COMMON_H