blob: 50761199d031f2cf38c11286dba1f672e1f04622 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifndef COMMON_SCHEMA_H
#define COMMON_SCHEMA_H
#include <algorithm>
#include <cctype>
#include <iterator>
#include <map> // use unordered_map instead
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/db_common.h"
#include "writer/time_chunk_writer.h"
#include "writer/value_chunk_writer.h"
// Forward declaration of ChunkWriter. MeasurementSchema below only holds a
// `storage::ChunkWriter *`, for which a declaration is sufficient.
namespace storage {
class ChunkWriter;
}
namespace storage {
/*
 * Schema information for one measurement (column): its name, data type,
 * encoding, compression and free-form string properties, plus writer
 * pointers attached by the write path.
 */
struct MeasurementSchema {
    std::string measurement_name_;  // for example: "s1"
    common::TSDataType data_type_;
    common::TSEncoding encoding_;
    common::CompressionType compression_type_;
    // Writer pointers; this struct never allocates or deletes them.
    // NOTE(review): ownership lives elsewhere — confirm with the writer code.
    storage::ChunkWriter *chunk_writer_;
    ValueChunkWriter *value_chunk_writer_;
    // Free-form key/value properties, serialized after the fixed fields.
    std::map<std::string, std::string> props_;

    MeasurementSchema()
        : measurement_name_(),
          data_type_(common::INVALID_DATATYPE),
          encoding_(common::INVALID_ENCODING),
          compression_type_(common::INVALID_COMPRESSION),
          chunk_writer_(nullptr),
          value_chunk_writer_(nullptr) {}

    // Uses the default encoding for `data_type` and no compression.
    MeasurementSchema(const std::string &measurement_name,
                      common::TSDataType data_type)
        : measurement_name_(measurement_name),
          data_type_(data_type),
          encoding_(get_default_encoding_for_type(data_type)),
          compression_type_(common::UNCOMPRESSED),
          chunk_writer_(nullptr),
          value_chunk_writer_(nullptr) {}

    MeasurementSchema(const std::string &measurement_name,
                      common::TSDataType data_type, common::TSEncoding encoding,
                      common::CompressionType compression_type)
        : measurement_name_(measurement_name),
          data_type_(data_type),
          encoding_(encoding),
          compression_type_(compression_type),
          chunk_writer_(nullptr),
          value_chunk_writer_(nullptr) {}

    /**
     * Writes this schema to `out` in this order:
     *   name (str), data type (ui8), encoding (ui8), compression (ui8),
     *   property count (ui32), then each property as key (str), value (str).
     * @return common::E_OK on success, otherwise the first failing code.
     */
    int serialize_to(common::ByteStream &out) {
        int ret = common::E_OK;
        if (RET_FAIL(
                common::SerializationUtil::write_str(measurement_name_, out))) {
        } else if (RET_FAIL(
                       common::SerializationUtil::write_ui8(data_type_, out))) {
        } else if (RET_FAIL(
                       common::SerializationUtil::write_ui8(encoding_, out))) {
        } else if (RET_FAIL(common::SerializationUtil::write_ui8(
                       compression_type_, out))) {
        } else if (RET_FAIL(common::SerializationUtil::write_ui32(
                       static_cast<uint32_t>(props_.size()), out))) {
        } else {
            // Properties are written only after the count was written
            // successfully. (The original looped only on failure.)
            for (const auto &prop : props_) {
                if (RET_FAIL(common::SerializationUtil::write_str(prop.first,
                                                                  out))) {
                } else if (RET_FAIL(common::SerializationUtil::write_str(
                               prop.second, out))) {
                }
                if (IS_FAIL(ret)) {
                    break;
                }
            }
        }
        return ret;
    }

    /**
     * Reads a schema in the format produced by serialize_to(). Any
     * previously held properties are replaced, not merged.
     * @return common::E_OK on success, otherwise the first failing code.
     */
    int deserialize_from(common::ByteStream &in) {
        int ret = common::E_OK;
        uint8_t data_type = common::TSDataType::INVALID_DATATYPE,
                encoding = common::TSEncoding::INVALID_ENCODING,
                compression_type = common::CompressionType::INVALID_COMPRESSION;
        if (RET_FAIL(
                common::SerializationUtil::read_str(measurement_name_, in))) {
        } else if (RET_FAIL(
                       common::SerializationUtil::read_ui8(data_type, in))) {
        } else if (RET_FAIL(
                       common::SerializationUtil::read_ui8(encoding, in))) {
        } else if (RET_FAIL(common::SerializationUtil::read_ui8(
                       compression_type, in))) {
        }
        data_type_ = static_cast<common::TSDataType>(data_type);
        encoding_ = static_cast<common::TSEncoding>(encoding);
        compression_type_ = static_cast<common::CompressionType>(compression_type);
        if (IS_FAIL(ret)) {
            return ret;
        }

        uint32_t props_size = 0;
        if (RET_FAIL(common::SerializationUtil::read_ui32(props_size, in))) {
            return ret;
        }
        props_.clear();
        // Loop on the count read from the stream (the original used
        // props_.size()), and store a pair only if both halves were read.
        for (uint32_t i = 0; i < props_size; ++i) {
            std::string key;
            std::string value;
            if (RET_FAIL(common::SerializationUtil::read_str(key, in))) {
            } else if (RET_FAIL(
                           common::SerializationUtil::read_str(value, in))) {
            }
            if (IS_FAIL(ret)) {
                break;
            }
            props_.emplace(std::move(key), std::move(value));
        }
        return ret;
    }
};
typedef std::map<std::string, MeasurementSchema *> MeasurementSchemaMap;
typedef std::map<std::string, MeasurementSchema *>::iterator
MeasurementSchemaMapIter;
typedef std::pair<MeasurementSchemaMapIter, bool>
MeasurementSchemaMapInsertResult;
/*
 * Schema information for a device: the schemas of all its measurements, an
 * aligned flag, and a time-chunk writer pointer.
 * NOTE(review): time_chunk_writer_ is presumably only used when is_aligned_
 * is true — confirm with the writer code.
 */
struct MeasurementSchemaGroup {
    // Maps measurement name -> MeasurementSchema. This struct has no
    // destructor, so it never frees the pointed-to schemas.
    MeasurementSchemaMap measurement_schema_map_;
    bool is_aligned_{false};
    TimeChunkWriter *time_chunk_writer_{nullptr};
};
/* Role of a column within a table schema. */
enum class ColumnCategory {
    TAG = 0,
    FIELD = 1,
};
class TableSchema {
public:
static void to_lowercase_inplace(std::string &str) {
std::transform(str.begin(), str.end(), str.begin(),
[](unsigned char c) -> unsigned char { return std::tolower(c); });
}
TableSchema() = default;
TableSchema(const std::string &table_name,
const std::vector<MeasurementSchema*>
&column_schemas,
const std::vector<ColumnCategory> &column_categories)
: table_name_(table_name),
column_categories_(column_categories) {
to_lowercase_inplace(table_name_);
for (const auto column_schema : column_schemas) {
if (column_schema != nullptr) {
column_schemas_.emplace_back(std::shared_ptr<MeasurementSchema>(column_schema));
}
}
int idx = 0;
for (const auto &measurement_schema: column_schemas_) {
to_lowercase_inplace(measurement_schema->measurement_name_);
column_pos_index_.insert(
std::make_pair(measurement_schema->measurement_name_, idx++));
}
}
TableSchema(TableSchema &&other) noexcept
: table_name_(std::move(other.table_name_)),
column_schemas_(std::move(other.column_schemas_)),
column_categories_(std::move(other.column_categories_)) {
}
TableSchema(const TableSchema &other) = default;
int serialize_to(common::ByteStream &out) {
int ret = common::E_OK;
if (RET_FAIL(common::SerializationUtil::write_var_uint(
column_schemas_.size(), out))) {
} else {
for (size_t i = 0; IS_SUCC(ret) && i < column_schemas_.size();
i++) {
auto column_schema = column_schemas_[i];
auto column_category = column_categories_[i];
if (RET_FAIL(column_schema->serialize_to(out))) {
} else if (RET_FAIL(common::SerializationUtil::write_i8(
static_cast<int8_t>(column_category), out))) {
}
}
}
return ret;
}
int deserialize(common::ByteStream &in) {
int ret = common::E_OK;
uint32_t num_columns;
if (RET_FAIL(common::SerializationUtil::read_var_uint(
num_columns, in))) {
} else {
for (size_t i = 0; IS_SUCC(ret) && i < num_columns;
i++) {
auto column_schema = std::make_shared<MeasurementSchema>();
int8_t column_category = 0;
if (RET_FAIL(column_schema->deserialize_from(in))) {
} else if (RET_FAIL(common::SerializationUtil::read_i8(
column_category, in))) {
}
column_schemas_.emplace_back(column_schema);
column_categories_.emplace_back(static_cast<ColumnCategory>(column_category));
}
}
return ret;
}
~TableSchema() {
column_schemas_.clear();
}
const std::string &get_table_name() { return table_name_; }
std::vector<std::string> get_measurement_names() const {
std::vector<std::string> ret(column_schemas_.size());
for (size_t i = 0; i < column_schemas_.size(); i++) {
ret[i] = column_schemas_[i]->measurement_name_;
}
return ret;
}
int find_column_index(const std::string &column_name) {
std::string lower_case_column_name = to_lower(column_name);
auto it = column_pos_index_.find(lower_case_column_name);
if (it != column_pos_index_.end()) {
return it->second;
} else {
int index = -1;
for (size_t i = 0; i < column_schemas_.size(); ++i) {
if (to_lower(column_schemas_[i]->measurement_name_) ==
lower_case_column_name) {
index = static_cast<int>(i);
break;
}
}
column_pos_index_[lower_case_column_name] = index;
return index;
}
}
void update(ChunkGroupMeta *chunk_group_meta) {
for (auto iter = chunk_group_meta->chunk_meta_list_.begin();
iter != chunk_group_meta->chunk_meta_list_.end(); iter++) {
auto &chunk_meta = iter.get();
int column_idx =
find_column_index(chunk_meta->measurement_name_.to_std_string());
if (column_idx == -1) {
auto measurement_schema = std::make_shared<MeasurementSchema>(
chunk_meta->measurement_name_.to_std_string(),
chunk_meta->data_type_, chunk_meta->encoding_,
chunk_meta->compression_type_);
column_schemas_.emplace_back(measurement_schema);
column_categories_.emplace_back(ColumnCategory::FIELD);
column_pos_index_.insert(
std::make_pair(chunk_meta->measurement_name_.to_std_string(),
column_schemas_.size() - 1));
} else {
auto origin_measurement_schema = column_schemas_.at(column_idx);
if (origin_measurement_schema->data_type_ !=
chunk_meta->data_type_) {
origin_measurement_schema->data_type_ =
common::TSDataType::STRING;
}
}
}
}
std::vector<common::TSDataType> get_data_types() const {
std::vector<common::TSDataType> ret;
for (const auto &measurement_schema: column_schemas_) {
ret.emplace_back(measurement_schema->data_type_);
}
return ret;
}
std::vector<ColumnCategory> get_column_categories() const {
return column_categories_;
}
std::vector<std::shared_ptr<MeasurementSchema> > get_measurement_schemas()
const {
return column_schemas_;
}
private:
static std::string to_lower(const std::string &str) {
std::string result;
std::transform(str.begin(), str.end(), std::back_inserter(result),
[](unsigned char c) -> unsigned char { return std::tolower(c); });
return result;
}
std::string table_name_;
std::vector<std::shared_ptr<MeasurementSchema> > column_schemas_;
std::vector<ColumnCategory> column_categories_;
std::map<std::string, int> column_pos_index_;
};
struct Schema {
typedef std::unordered_map<std::string, std::shared_ptr<TableSchema> >
TableSchemasMap;
TableSchemasMap table_schema_map_;
void update_table_schema(ChunkGroupMeta *chunk_group_meta) {
std::shared_ptr<IDeviceID> device_id = chunk_group_meta->device_id_;
auto table_name = device_id->get_table_name();
if (table_schema_map_.find(table_name) == table_schema_map_.end()) {
table_schema_map_[table_name] = std::make_shared<TableSchema>();
}
table_schema_map_[table_name]->update(chunk_group_meta);
}
};
} // end namespace storage
#endif // COMMON_SCHEMA_H