blob: 06e7e7e4202f2ce63087b877a9a5b1823614c46a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef COMMON_SCHEMA_H
#define COMMON_SCHEMA_H
#include <writer/chunk_writer.h>
#include <algorithm>
#include <map> // use unordered_map instead
#include <memory>
#include <string>
#include <unordered_map>
#include "common/db_common.h"
#include "writer/time_chunk_writer.h"
#include "writer/value_chunk_writer.h"
namespace storage {
class ChunkWriter;
class ValueChunkWriter;
class TimeChunkWriter;
} // namespace storage
namespace storage {
/* schema information for one measurement */
struct MeasurementSchema {
std::string measurement_name_; // for example: "s1"
common::TSDataType data_type_;
common::TSEncoding encoding_;
common::CompressionType compression_type_;
storage::ChunkWriter *chunk_writer_;
ValueChunkWriter *value_chunk_writer_;
std::map<std::string, std::string> props_;
MeasurementSchema()
: measurement_name_(),
data_type_(common::INVALID_DATATYPE),
encoding_(common::INVALID_ENCODING),
compression_type_(common::INVALID_COMPRESSION),
chunk_writer_(nullptr),
value_chunk_writer_(nullptr) {}
MeasurementSchema(const std::string &measurement_name,
common::TSDataType data_type)
: measurement_name_(measurement_name),
data_type_(data_type),
encoding_(common::get_value_encoder(data_type)),
compression_type_(common::get_default_compressor()),
chunk_writer_(nullptr),
value_chunk_writer_(nullptr) {}
MeasurementSchema(const std::string &measurement_name,
common::TSDataType data_type, common::TSEncoding encoding,
common::CompressionType compression_type)
: measurement_name_(measurement_name),
data_type_(data_type),
encoding_(encoding),
compression_type_(compression_type),
chunk_writer_(nullptr),
value_chunk_writer_(nullptr) {}
~MeasurementSchema() {
if (chunk_writer_ != nullptr) {
delete chunk_writer_;
chunk_writer_ = nullptr;
}
if (value_chunk_writer_ != nullptr) {
delete value_chunk_writer_;
value_chunk_writer_ = nullptr;
}
}
int serialize_to(common::ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(
common::SerializationUtil::write_str(measurement_name_, out))) {
} else if (RET_FAIL(
common::SerializationUtil::write_ui8(data_type_, out))) {
} else if (RET_FAIL(
common::SerializationUtil::write_ui8(encoding_, out))) {
} else if (RET_FAIL(common::SerializationUtil::write_ui8(
compression_type_, out))) {
}
if (ret == common::E_OK) {
if (RET_FAIL(common::SerializationUtil::write_ui32(props_.size(),
out))) {
for (const auto &prop : props_) {
if (RET_FAIL(common::SerializationUtil::write_str(
prop.first, out))) {
} else if (RET_FAIL(common::SerializationUtil::write_str(
prop.second, out))) {
}
if (IS_FAIL(ret)) break;
}
}
}
return ret;
}
int deserialize_from(common::ByteStream &in) {
int ret = common::E_OK;
uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
encoding = common::TSEncoding::INVALID_ENCODING,
compression_type = common::CompressionType::INVALID_COMPRESSION;
if (RET_FAIL(
common::SerializationUtil::read_str(measurement_name_, in))) {
} else if (RET_FAIL(
common::SerializationUtil::read_ui8(data_type, in))) {
} else if (RET_FAIL(
common::SerializationUtil::read_ui8(encoding, in))) {
} else if (RET_FAIL(common::SerializationUtil::read_ui8(
compression_type, in))) {
}
data_type_ = static_cast<common::TSDataType>(data_type);
encoding_ = static_cast<common::TSEncoding>(encoding);
compression_type_ =
static_cast<common::CompressionType>(compression_type);
uint32_t props_size;
if (ret == common::E_OK) {
if (RET_FAIL(
common::SerializationUtil::read_ui32(props_size, in))) {
for (uint32_t i = 0; i < props_.size(); ++i) {
std::string key, value;
if (RET_FAIL(
common::SerializationUtil::read_str(key, in))) {
} else if (RET_FAIL(common::SerializationUtil::read_str(
value, in))) {
}
props_.insert(std::make_pair(key, value));
if (IS_FAIL(ret)) break;
}
}
}
return ret;
}
};
typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
typedef std::map<std::string, MeasurementSchema *>::iterator
MeasurementSchemaMapIter;
typedef std::pair<MeasurementSchemaMapIter, bool>
MeasurementSchemaMapInsertResult;
/* schema information for a device */
struct MeasurementSchemaGroup {
// measurement_name -> MeasurementSchema
MeasurementSchemaMap measurement_schema_map_;
bool is_aligned_ = false;
TimeChunkWriter *time_chunk_writer_ = nullptr;
~MeasurementSchemaGroup() {
if (time_chunk_writer_ != nullptr) {
delete time_chunk_writer_;
time_chunk_writer_ = nullptr;
}
}
};
/**
* @brief Represents the schema information for an entire table.
*
* This class holds the metadata necessary to describe how a specific table is
* structured, including its name and the schemas of all its columns.
*/
class TableSchema {
public:
TableSchema() = default;
/**
* Constructs a TableSchema object with the given table name, column
* schemas, and column categories.
*
* @param table_name The name of the table. Must be a non-empty string.
* This name is used to identify the table within the
* system.
* @param column_schemas A vector containing ColumnSchema objects.
* Each ColumnSchema defines the schema for one column
* in the table.
*/
TableSchema(const std::string &table_name,
const std::vector<common::ColumnSchema> &column_schemas)
: table_name_(table_name) {
to_lowercase_inplace(table_name_);
for (const common::ColumnSchema &column_schema : column_schemas) {
column_schemas_.emplace_back(std::make_shared<MeasurementSchema>(
column_schema.get_column_name(),
column_schema.get_data_type()));
column_categories_.emplace_back(
column_schema.get_column_category());
}
int idx = 0;
for (const auto &measurement_schema : column_schemas_) {
to_lowercase_inplace(measurement_schema->measurement_name_);
column_pos_index_.insert(
std::make_pair(measurement_schema->measurement_name_, idx++));
}
}
TableSchema(const std::string &table_name,
const std::vector<MeasurementSchema *> &column_schemas,
const std::vector<common::ColumnCategory> &column_categories)
: table_name_(table_name), column_categories_(column_categories) {
to_lowercase_inplace(table_name_);
for (const auto column_schema : column_schemas) {
if (column_schema != nullptr) {
column_schemas_.emplace_back(
std::shared_ptr<MeasurementSchema>(column_schema));
}
}
int idx = 0;
for (const auto &measurement_schema : column_schemas_) {
to_lowercase_inplace(measurement_schema->measurement_name_);
column_pos_index_.insert(
std::make_pair(measurement_schema->measurement_name_, idx++));
}
}
TableSchema(TableSchema &&other) noexcept
: table_name_(std::move(other.table_name_)),
column_schemas_(std::move(other.column_schemas_)),
column_categories_(std::move(other.column_categories_)) {}
TableSchema(const TableSchema &other) noexcept
: table_name_(other.table_name_),
column_categories_(other.column_categories_) {
for (const auto &column_schema : other.column_schemas_) {
// Just call default construction
column_schemas_.emplace_back(
std::make_shared<MeasurementSchema>(*column_schema));
}
int idx = 0;
for (const auto &measurement_schema : column_schemas_) {
column_pos_index_.insert(
std::make_pair(measurement_schema->measurement_name_, idx++));
}
}
int serialize_to(common::ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(common::SerializationUtil::write_var_uint(
column_schemas_.size(), out))) {
} else {
for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size();
i++) {
auto column_schema = column_schemas_[i];
auto column_category = column_categories_[i];
if (RET_FAIL(column_schema->serialize_to(out))) {
} else if (RET_FAIL(common::SerializationUtil::write_i32(
static_cast<int32_t>(column_category), out))) {
}
}
}
return ret;
}
int deserialize(common::ByteStream &in) {
int ret = common::E_OK;
uint32_t num_columns;
if (RET_FAIL(
common::SerializationUtil::read_var_uint(num_columns, in))) {
} else {
for (size_t i = 0; IS_SUCC(ret) && i < num_columns; i++) {
auto column_schema = std::make_shared<MeasurementSchema>();
int32_t column_category = 0;
if (RET_FAIL(column_schema->deserialize_from(in))) {
} else if (RET_FAIL(common::SerializationUtil::read_i32(
column_category, in))) {
}
column_schemas_.emplace_back(column_schema);
column_categories_.emplace_back(
static_cast<common::ColumnCategory>(column_category));
}
}
return ret;
}
~TableSchema() { column_schemas_.clear(); }
const std::string &get_table_name() { return table_name_; }
void set_table_name(const std::string &table_name) {
table_name_ = table_name;
}
std::vector<std::string> get_measurement_names() const {
std::vector<std::string> ret(column_schemas_.size());
for (size_t i = 0; i < column_schemas_.size(); i++) {
ret[i] = column_schemas_[i]->measurement_name_;
}
return ret;
}
int32_t get_columns_num() const { return column_schemas_.size(); }
int find_column_index(const std::string &column_name) {
std::string lower_case_column_name = to_lower(column_name);
auto it = column_pos_index_.find(lower_case_column_name);
if (it != column_pos_index_.end()) {
return it->second;
} else {
int index = -1;
for (size_t i = 0; i < column_schemas_.size(); ++i) {
if (column_schemas_[i]->measurement_name_ ==
lower_case_column_name) {
index = static_cast<int>(i);
break;
}
}
if (index != -1) {
column_pos_index_[lower_case_column_name] = index;
}
return index;
}
}
size_t get_column_pos_index_num() const { return column_pos_index_.size(); }
void update(ChunkGroupMeta *chunk_group_meta) {
for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
auto &chunk_meta = iter.get();
if (chunk_meta->data_type_ == common::VECTOR) {
continue;
}
int column_idx = find_column_index(
chunk_meta->measurement_name_.to_std_string());
if (column_idx == -1) {
auto measurement_schema = std::make_shared<MeasurementSchema>(
chunk_meta->measurement_name_.to_std_string(),
chunk_meta->data_type_, chunk_meta->encoding_,
chunk_meta->compression_type_);
column_schemas_.emplace_back(measurement_schema);
column_categories_.emplace_back(common::ColumnCategory::FIELD);
column_pos_index_.insert(std::make_pair(
chunk_meta->measurement_name_.to_std_string(),
column_schemas_.size() - 1));
} else {
auto origin_measurement_schema = column_schemas_.at(column_idx);
if (origin_measurement_schema->data_type_ !=
chunk_meta->data_type_) {
origin_measurement_schema->data_type_ =
common::TSDataType::STRING;
}
}
}
}
std::vector<common::TSDataType> get_data_types() const {
std::vector<common::TSDataType> ret;
for (const auto &measurement_schema : column_schemas_) {
ret.emplace_back(measurement_schema->data_type_);
}
return ret;
}
std::vector<common::ColumnCategory> get_column_categories() const {
return column_categories_;
}
std::vector<std::shared_ptr<MeasurementSchema> > get_measurement_schemas()
const {
return column_schemas_;
}
common::ColumnSchema get_column_schema(const std::string &column_name) {
int column_idx = find_column_index(column_name);
if (column_idx == -1) {
return common::ColumnSchema();
} else {
return common::ColumnSchema(
column_schemas_[column_idx]->measurement_name_,
column_schemas_[column_idx]->data_type_,
column_schemas_[column_idx]->compression_type_,
column_schemas_[column_idx]->encoding_,
column_categories_[column_idx]);
}
}
int32_t find_id_column_order(const std::string &column_name) {
std::string lower_case_column_name = to_lower(column_name);
int column_order = 0;
for (size_t i = 0; i < column_schemas_.size(); ++i) {
if (column_schemas_[i]->measurement_name_ ==
lower_case_column_name &&
column_categories_[i] == common::ColumnCategory::TAG) {
return column_order;
} else if (column_categories_[i] == common::ColumnCategory::TAG) {
column_order++;
}
}
return -1;
}
private:
std::string table_name_;
std::vector<std::shared_ptr<MeasurementSchema> > column_schemas_;
std::vector<common::ColumnCategory> column_categories_;
std::map<std::string, int> column_pos_index_;
};
struct Schema {
typedef std::unordered_map<std::string, std::shared_ptr<TableSchema> >
TableSchemasMap;
TableSchemasMap table_schema_map_;
void update_table_schema(ChunkGroupMeta *chunk_group_meta) {
std::shared_ptr<IDeviceID> device_id = chunk_group_meta->device_id_;
auto table_name = device_id->get_table_name();
if (table_schema_map_.find(table_name) == table_schema_map_.end()) {
table_schema_map_[table_name] = std::make_shared<TableSchema>();
}
table_schema_map_[table_name]->update(chunk_group_meta);
}
void register_table_schema(
const std::shared_ptr<TableSchema> &table_schema) {
table_schema_map_[table_schema->get_table_name()] = table_schema;
}
};
} // end namespace storage
#endif // COMMON_SCHEMA_H