blob: 3ae5cf28a538b053febf8f136ab6acc9c9932584 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tsfile_writer.h"
#include <unistd.h>
#include "chunk_writer.h"
#include "common/config/config.h"
#include "file/tsfile_io_writer.h"
#include "file/write_file.h"
#include "utils/errno_define.h"
using namespace common;
namespace storage {
int libtsfile_init() {
static bool g_s_is_inited = false;
if (g_s_is_inited) {
return E_OK;
}
ModStat::get_instance().init();
init_common();
g_s_is_inited = true;
return E_OK;
}
void libtsfile_destroy() { ModStat::get_instance().destroy(); }
void set_page_max_point_count(uint32_t page_max_ponint_count) {
config_set_page_max_point_count(page_max_ponint_count);
}
void set_max_degree_of_index_node(uint32_t max_degree_of_index_node) {
config_set_max_degree_of_index_node(max_degree_of_index_node);
}
TsFileWriter::TsFileWriter()
: write_file_(nullptr),
io_writer_(nullptr),
schemas_(),
start_file_done_(false),
record_count_since_last_flush_(0),
record_count_for_next_mem_check_(
g_config_value_.record_count_for_next_mem_check_),
write_file_created_(false) {}
TsFileWriter::~TsFileWriter() { destroy(); }
void TsFileWriter::destroy() {
if (write_file_created_ && write_file_ != nullptr) {
delete write_file_;
write_file_ = nullptr;
}
if (io_writer_) {
delete io_writer_;
io_writer_ = nullptr;
}
DeviceSchemasMapIter dev_iter;
// cppcheck-suppress postfixOperator
for (dev_iter = schemas_.begin(); dev_iter != schemas_.end(); dev_iter++) {
MeasurementSchemaMap &ms_map =
dev_iter->second->measurement_schema_map_;
MeasurementSchemaMapIter ms_iter;
for (ms_iter = ms_map.begin(); ms_iter != ms_map.end(); ms_iter++) {
MeasurementSchema *ms = ms_iter->second;
if (ms != nullptr) {
if (ms->chunk_writer_ != nullptr) {
delete ms->chunk_writer_;
ms->chunk_writer_ = nullptr;
}
delete ms;
ms_iter->second = nullptr;
}
}
delete dev_iter->second;
dev_iter->second = nullptr;
}
schemas_.clear();
record_count_since_last_flush_ = 0;
}
int TsFileWriter::init(WriteFile *write_file) {
if (write_file == nullptr) {
return E_INVALID_ARG;
} else if (!write_file->file_opened()) {
return E_INVALID_ARG;
}
write_file_ = write_file;
write_file_created_ = false;
io_writer_ = new TsFileIOWriter();
io_writer_->init(write_file_);
return E_OK;
}
void TsFileWriter::set_generate_table_schema(bool generate_table_schema) {
io_writer_->set_generate_table_schema(generate_table_schema);
}
int TsFileWriter::register_table(
const std::shared_ptr<TableSchema> &table_schema) {
if (!table_schema) return E_INVALID_ARG;
// Empty table name or column name is not allowed.
if (table_schema->get_table_name().empty()) {
return E_INVALID_ARG;
}
for (const auto &name : table_schema->get_measurement_names()) {
if (name.empty()) {
return E_INVALID_ARG;
}
}
// Because it is not possible to return an error code for duplicate name
// checks during the construction phase of TabletSchema, the duplicate name
// check has been moved to the table registration stage.
// TODO: Add Debug INFO if ErrorCode is not enough to describe problems.
if (table_schema->get_column_pos_index_num() !=
table_schema->get_measurement_names().size()) {
return E_INVALID_ARG;
}
if (io_writer_->get_schema()->table_schema_map_.find(
table_schema->get_table_name()) !=
io_writer_->get_schema()->table_schema_map_.end()) {
return E_ALREADY_EXIST;
}
io_writer_->get_schema()->register_table_schema(table_schema);
return E_OK;
}
bool check_file_exist(const std::string &file_path) {
return access(file_path.c_str(), F_OK) == 0;
}
int TsFileWriter::open(const std::string &file_path, int flags, mode_t mode) {
if (check_file_exist(file_path)) {
return E_ALREADY_EXIST;
}
write_file_ = new WriteFile;
write_file_created_ = true;
io_writer_ = new TsFileIOWriter;
int ret = E_OK;
if (RET_FAIL(write_file_->create(file_path, flags, mode))) {
} else {
io_writer_->init(write_file_);
}
return ret;
}
int TsFileWriter::open(const std::string &file_path) {
return open(file_path, O_RDWR | O_CREAT | O_TRUNC, 0666);
}
int TsFileWriter::register_aligned_timeseries(
const std::string &device_id, const MeasurementSchema &measurement_schema) {
MeasurementSchema *ms = new MeasurementSchema(
measurement_schema.measurement_name_, measurement_schema.data_type_,
measurement_schema.encoding_, measurement_schema.compression_type_);
return register_timeseries(device_id, ms, true);
}
int TsFileWriter::register_aligned_timeseries(
const std::string &device_id,
const std::vector<MeasurementSchema *> &measurement_schemas) {
int ret = E_OK;
for (auto it : measurement_schemas) {
ret = register_timeseries(device_id, it, true);
if (ret != E_OK) {
return ret;
}
}
return ret;
}
int TsFileWriter::register_timeseries(
const std::string &device_id, const MeasurementSchema &measurement_schema) {
MeasurementSchema *ms = new MeasurementSchema(
measurement_schema.measurement_name_, measurement_schema.data_type_,
measurement_schema.encoding_, measurement_schema.compression_type_);
return register_timeseries(device_id, ms, false);
}
int TsFileWriter::register_timeseries(const std::string &device_path,
MeasurementSchema *measurement_schema,
bool is_aligned) {
std::shared_ptr<IDeviceID> device_id =
std::make_shared<StringArrayDeviceID>(device_path);
DeviceSchemasMapIter device_iter = schemas_.find(device_id);
if (device_iter != schemas_.end()) {
MeasurementSchemaMap &msm =
device_iter->second->measurement_schema_map_;
MeasurementSchemaMapInsertResult ins_res = msm.insert(std::make_pair(
measurement_schema->measurement_name_, measurement_schema));
if (UNLIKELY(!ins_res.second)) {
return E_ALREADY_EXIST;
}
} else {
MeasurementSchemaGroup *ms_group = new MeasurementSchemaGroup;
ms_group->is_aligned_ = is_aligned;
ms_group->measurement_schema_map_.insert(std::make_pair(
measurement_schema->measurement_name_, measurement_schema));
schemas_.insert(std::make_pair(device_id, ms_group));
}
return E_OK;
}
int TsFileWriter::register_timeseries(
const std::string &device_id,
const std::vector<MeasurementSchema *> &measurement_schema_vec) {
int ret = E_OK;
auto it = measurement_schema_vec.begin();
for (; it != measurement_schema_vec.end();
it++) { // cppcheck-suppress postfixOperator
ret = register_timeseries(device_id, *it);
if (ret != E_OK) {
return ret;
}
}
return ret;
}
struct MeasurementSchemaMapNamesGetter {
public:
explicit MeasurementSchemaMapNamesGetter(
const MeasurementSchemaMap &measurement_schema_map)
: measurement_schema_map_(
const_cast<MeasurementSchemaMap &>(measurement_schema_map)) {
measurement_name_idx_ = measurement_schema_map_.begin();
}
FORCE_INLINE uint32_t get_count() const {
return measurement_schema_map_.size();
}
FORCE_INLINE const std::string &next() {
ASSERT(measurement_name_idx_ != measurement_schema_map_.end());
std::string &ret = measurement_name_idx_->second->measurement_name_;
measurement_name_idx_++;
return ret;
}
private:
MeasurementSchemaMap &measurement_schema_map_;
MeasurementSchemaMap::iterator measurement_name_idx_;
};
struct MeasurementNamesFromRecord {
public:
explicit MeasurementNamesFromRecord(const TsRecord &record)
: record_(record), measurement_name_idx_(0) {}
FORCE_INLINE uint32_t get_count() const { return record_.points_.size(); }
FORCE_INLINE const std::string &next() {
return this->at(measurement_name_idx_++);
}
private:
const TsRecord &record_;
size_t measurement_name_idx_;
FORCE_INLINE const std::string &at(size_t idx) const {
ASSERT(idx < record_.points_.size());
return record_.points_[idx].measurement_name_;
}
};
struct MeasurementNamesFromTablet {
explicit MeasurementNamesFromTablet(const Tablet &tablet)
: tablet_(tablet), measurement_name_idx_(0) {}
FORCE_INLINE uint32_t get_count() const {
return tablet_.schema_vec_->size();
}
FORCE_INLINE const std::string &next() {
return this->at(measurement_name_idx_++);
}
private:
const Tablet &tablet_;
size_t measurement_name_idx_;
FORCE_INLINE const std::string &at(size_t idx) const {
ASSERT(idx < tablet_.schema_vec_->size());
return tablet_.schema_vec_->at(idx).measurement_name_;
}
};
int TsFileWriter::do_check_and_prepare_tablet(Tablet &tablet) {
if (tablet.column_categories_.empty()) {
auto &schema_map = io_writer_->get_schema()->table_schema_map_;
auto table_schema_it = schema_map.find(tablet.get_table_name());
auto table_schema = table_schema_it->second;
uint32_t column_cnt = tablet.get_column_count();
for (uint32_t i = 0; i < column_cnt; i++) {
auto &col_name = tablet.get_column_name(i);
int col_index = table_schema->find_column_index(col_name);
if (col_index == -1) {
return E_COLUMN_NOT_EXIST;
}
if (table_schema->get_data_types()[col_index] !=
tablet.schema_vec_->at(i).data_type_) {
return E_TYPE_NOT_MATCH;
}
const common::ColumnCategory column_category =
table_schema->get_column_categories()[col_index];
tablet.column_categories_.emplace_back(column_category);
if (column_category == ColumnCategory::TAG) {
tablet.id_column_indexes_.push_back(i);
}
}
}
return common::E_OK;
}
template <typename MeasurementNamesGetter>
int TsFileWriter::do_check_schema(
std::shared_ptr<IDeviceID> device_id,
MeasurementNamesGetter &measurement_names,
SimpleVector<ChunkWriter *> &chunk_writers,
SimpleVector<common::TSDataType> &data_types) {
int ret = E_OK;
DeviceSchemasMapIter dev_it = schemas_.find(device_id);
MeasurementSchemaGroup *device_schema = nullptr;
if (UNLIKELY(dev_it == schemas_.end()) ||
IS_NULL(device_schema = dev_it->second)) {
return E_DEVICE_NOT_EXIST;
}
MeasurementSchemaMap &msm = device_schema->measurement_schema_map_;
uint32_t measurement_count = measurement_names.get_count();
// chunk_writers.reserve(measurement_count);
for (uint32_t i = 0; i < measurement_count; i++) {
auto ms_iter = msm.find(measurement_names.next());
if (UNLIKELY(ms_iter == msm.end())) {
chunk_writers.push_back(NULL);
data_types.push_back(common::NULL_TYPE);
} else {
// In Java we will check data_type. But in C++, no check here.
// Because checks are performed at the chunk layer and page layer
MeasurementSchema *ms = ms_iter->second;
if (IS_NULL(ms->chunk_writer_)) {
ms->chunk_writer_ = new ChunkWriter;
ret = ms->chunk_writer_->init(ms->measurement_name_,
ms->data_type_, ms->encoding_,
ms->compression_type_);
if (IS_SUCC(ret)) {
chunk_writers.push_back(ms->chunk_writer_);
} else {
for (size_t chunk_writer_idx = 0;
chunk_writer_idx < chunk_writers.size();
chunk_writer_idx++) {
if (!chunk_writers[chunk_writer_idx]) {
delete chunk_writers[chunk_writer_idx];
}
}
ret = common::E_INVALID_ARG;
return ret;
}
} else {
chunk_writers.push_back(ms->chunk_writer_);
}
data_types.push_back(ms->data_type_);
}
}
return ret;
}
template <typename MeasurementNamesGetter>
int TsFileWriter::do_check_schema_aligned(
std::shared_ptr<IDeviceID> device_id,
MeasurementNamesGetter &measurement_names,
storage::TimeChunkWriter *&time_chunk_writer,
common::SimpleVector<storage::ValueChunkWriter *> &value_chunk_writers,
SimpleVector<common::TSDataType> &data_types) {
int ret = E_OK;
auto dev_it = schemas_.find(device_id);
MeasurementSchemaGroup *device_schema = NULL;
if (UNLIKELY(dev_it == schemas_.end()) ||
IS_NULL(device_schema = dev_it->second)) {
return E_DEVICE_NOT_EXIST;
}
if (IS_NULL(device_schema->time_chunk_writer_)) {
device_schema->time_chunk_writer_ = new TimeChunkWriter();
device_schema->time_chunk_writer_->init(
"", g_config_value_.time_encoding_type_,
g_config_value_.time_compress_type_);
}
time_chunk_writer = device_schema->time_chunk_writer_;
MeasurementSchemaMap &msm = device_schema->measurement_schema_map_;
uint32_t measurement_count = measurement_names.get_count();
for (uint32_t i = 0; i < measurement_count; i++) {
auto ms_iter = msm.find(measurement_names.next());
if (UNLIKELY(ms_iter == msm.end())) {
value_chunk_writers.push_back(NULL);
data_types.push_back(common::NULL_TYPE);
} else {
// Here we may check data_type against ms_iter. But in Java
// libtsfile, no check here.
MeasurementSchema *ms = ms_iter->second;
if (IS_NULL(ms->value_chunk_writer_)) {
ms->value_chunk_writer_ = new ValueChunkWriter;
ret = ms->value_chunk_writer_->init(
ms->measurement_name_, ms->data_type_, ms->encoding_,
ms->compression_type_);
if (IS_SUCC(ret)) {
value_chunk_writers.push_back(ms->value_chunk_writer_);
} else {
value_chunk_writers.push_back(NULL);
for (size_t chunk_writer_idx = 0;
chunk_writer_idx < value_chunk_writers.size();
chunk_writer_idx++) {
if (!value_chunk_writers[chunk_writer_idx]) {
delete value_chunk_writers[chunk_writer_idx];
}
}
ret = common::E_INVALID_ARG;
return ret;
}
} else {
value_chunk_writers.push_back(ms->value_chunk_writer_);
}
data_types.push_back(ms->data_type_);
}
}
return ret;
}
int TsFileWriter::do_check_schema_table(
std::shared_ptr<IDeviceID> device_id, Tablet &tablet,
storage::TimeChunkWriter *&time_chunk_writer,
common::SimpleVector<storage::ValueChunkWriter *> &value_chunk_writers) {
int ret = E_OK;
auto dev_it = schemas_.find(device_id);
MeasurementSchemaGroup *device_schema = NULL;
auto &schema_map = io_writer_->get_schema()->table_schema_map_;
auto table_schema_it = schema_map.find(tablet.get_table_name());
if (UNLIKELY(table_schema_it == schema_map.end())) {
return E_TABLE_NOT_EXIST;
}
auto table_schema = table_schema_it->second;
if (UNLIKELY(dev_it == schemas_.end()) ||
IS_NULL(device_schema = dev_it->second)) {
device_schema = new MeasurementSchemaGroup;
device_schema->is_aligned_ = true;
device_schema->time_chunk_writer_ = new TimeChunkWriter();
device_schema->time_chunk_writer_->init(
"", g_config_value_.time_encoding_type_,
g_config_value_.time_compress_type_);
for (uint32_t i = 0; i < table_schema->get_measurement_schemas().size();
++i) {
if (table_schema->get_column_categories().at(i) ==
common::ColumnCategory::FIELD) {
auto table_column_schema =
table_schema->get_measurement_schemas().at(i);
auto device_column_schema = new MeasurementSchema(
table_column_schema->measurement_name_,
table_column_schema->data_type_,
table_column_schema->encoding_,
table_column_schema->compression_type_);
if (!table_column_schema->props_.empty()) {
device_column_schema->props_ = table_column_schema->props_;
}
device_schema->measurement_schema_map_
[device_column_schema->measurement_name_] =
device_column_schema;
}
}
schemas_[device_id] = device_schema;
}
uint32_t column_cnt = tablet.get_column_count();
time_chunk_writer = device_schema->time_chunk_writer_;
MeasurementSchemaMap &msm = device_schema->measurement_schema_map_;
for (uint32_t i = 0; i < column_cnt; i++) {
if (tablet.column_categories_.at(i) != common::ColumnCategory::FIELD) {
continue;
}
auto ms_iter = msm.find(tablet.get_column_name(i));
if (UNLIKELY(ms_iter == msm.end())) {
value_chunk_writers.push_back(NULL);
} else {
// Here we may check data_type against ms_iter. But in Java
// libtsfile, no check here.
MeasurementSchema *ms = ms_iter->second;
if (IS_NULL(ms->value_chunk_writer_)) {
ms->value_chunk_writer_ = new ValueChunkWriter;
ret = ms->value_chunk_writer_->init(
ms->measurement_name_, ms->data_type_, ms->encoding_,
ms->compression_type_);
if (IS_SUCC(ret)) {
value_chunk_writers.push_back(ms->value_chunk_writer_);
} else {
value_chunk_writers.push_back(NULL);
for (size_t chunk_writer_idx = 0;
chunk_writer_idx < value_chunk_writers.size();
chunk_writer_idx++) {
if (!value_chunk_writers[chunk_writer_idx]) {
delete value_chunk_writers[chunk_writer_idx];
}
}
ret = common::E_INVALID_ARG;
return ret;
}
} else {
value_chunk_writers.push_back(ms->value_chunk_writer_);
}
}
}
return ret;
}
int64_t TsFileWriter::calculate_mem_size_for_all_group() {
int64_t mem_total_size = 0;
DeviceSchemasMapIter device_iter;
for (device_iter = schemas_.begin(); device_iter != schemas_.end();
device_iter++) {
MeasurementSchemaGroup *chunk_group = device_iter->second;
MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
for (MeasurementSchemaMapIter ms_iter = map.begin();
ms_iter != map.end(); ms_iter++) {
MeasurementSchema *m_schema = ms_iter->second;
if (!chunk_group->is_aligned_) {
ChunkWriter *&chunk_writer = m_schema->chunk_writer_;
if (chunk_writer != nullptr) {
mem_total_size +=
chunk_writer->estimate_max_series_mem_size();
}
} else {
ValueChunkWriter *&chunk_writer = m_schema->value_chunk_writer_;
if (chunk_writer != nullptr) {
mem_total_size +=
chunk_writer->estimate_max_series_mem_size();
}
}
}
if (chunk_group->is_aligned_) {
TimeChunkWriter *&time_chunk_writer =
chunk_group->time_chunk_writer_;
if (time_chunk_writer != nullptr) {
mem_total_size +=
time_chunk_writer->estimate_max_series_mem_size();
}
}
}
return mem_total_size;
}
/**
* check occupied memory size, if it exceeds the chunkGroupSize threshold, flush
* them to given OutputStream.
*/
int TsFileWriter::check_memory_size_and_may_flush_chunks() {
int ret = E_OK;
if (record_count_since_last_flush_ >= record_count_for_next_mem_check_) {
int64_t mem_size = calculate_mem_size_for_all_group();
record_count_for_next_mem_check_ =
record_count_since_last_flush_ *
common::g_config_value_.chunk_group_size_threshold_ / mem_size;
if (mem_size > common::g_config_value_.chunk_group_size_threshold_) {
ret = flush();
}
}
return ret;
}
int TsFileWriter::write_record(const TsRecord &record) {
int ret = E_OK;
// std::vector<ChunkWriter*> chunk_writers;
SimpleVector<ChunkWriter *> chunk_writers;
SimpleVector<common::TSDataType> data_types;
MeasurementNamesFromRecord mnames_getter(record);
if (RET_FAIL(do_check_schema(
std::make_shared<StringArrayDeviceID>(record.device_id_),
mnames_getter, chunk_writers, data_types))) {
return ret;
}
ASSERT(chunk_writers.size() == record.points_.size());
for (uint32_t c = 0; c < chunk_writers.size(); c++) {
ChunkWriter *chunk_writer = chunk_writers[c];
if (IS_NULL(chunk_writer)) {
continue;
}
// ignore point writer failure
write_point(chunk_writer, record.timestamp_, data_types[c],
record.points_[c]);
}
record_count_since_last_flush_++;
ret = check_memory_size_and_may_flush_chunks();
return ret;
}
int TsFileWriter::write_record_aligned(const TsRecord &record) {
int ret = E_OK;
SimpleVector<ValueChunkWriter *> value_chunk_writers;
SimpleVector<common::TSDataType> data_types;
TimeChunkWriter *time_chunk_writer;
MeasurementNamesFromRecord mnames_getter(record);
if (RET_FAIL(do_check_schema_aligned(
std::make_shared<StringArrayDeviceID>(record.device_id_),
mnames_getter, time_chunk_writer, value_chunk_writers,
data_types))) {
return ret;
}
if (value_chunk_writers.size() != record.points_.size()) {
return E_INVALID_ARG;
}
time_chunk_writer->write(record.timestamp_);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter *value_chunk_writer = value_chunk_writers[c];
if (IS_NULL(value_chunk_writer)) {
continue;
}
write_point_aligned(value_chunk_writer, record.timestamp_,
data_types[c], record.points_[c]);
}
return ret;
}
int TsFileWriter::write_point(ChunkWriter *chunk_writer, int64_t timestamp,
common::TSDataType data_type,
const DataPoint &point) {
switch (data_type) {
case common::BOOLEAN:
return chunk_writer->write(timestamp, point.u_.bool_val_);
case common::DATE:
case common::INT32:
return chunk_writer->write(timestamp, point.u_.i32_val_);
case common::TIMESTAMP:
case common::INT64:
return chunk_writer->write(timestamp, point.u_.i64_val_);
case common::FLOAT:
return chunk_writer->write(timestamp, point.u_.float_val_);
case common::DOUBLE:
return chunk_writer->write(timestamp, point.u_.double_val_);
case common::BLOB:
case common::TEXT:
case common::STRING:
return chunk_writer->write(timestamp, *point.u_.str_val_);
default:
return E_INVALID_DATA_POINT;
}
}
int TsFileWriter::write_point_aligned(ValueChunkWriter *value_chunk_writer,
int64_t timestamp,
common::TSDataType data_type,
const DataPoint &point) {
bool isnull = point.isnull;
switch (data_type) {
case common::BOOLEAN:
return value_chunk_writer->write(timestamp, point.u_.bool_val_,
isnull);
case common::INT32:
case common::DATE:
return value_chunk_writer->write(timestamp, point.u_.i32_val_,
isnull);
case common::TIMESTAMP:
case common::INT64:
return value_chunk_writer->write(timestamp, point.u_.i64_val_,
isnull);
case common::FLOAT:
return value_chunk_writer->write(timestamp, point.u_.float_val_,
isnull);
case common::DOUBLE:
return value_chunk_writer->write(timestamp, point.u_.double_val_,
isnull);
case common::BLOB:
case common::TEXT:
case common::STRING:
return value_chunk_writer->write(timestamp, point.u_.str_val_,
isnull);
default:
return E_INVALID_DATA_POINT;
}
}
int TsFileWriter::write_tablet_aligned(const Tablet &tablet) {
int ret = E_OK;
SimpleVector<ValueChunkWriter *> value_chunk_writers;
TimeChunkWriter *time_chunk_writer = nullptr;
SimpleVector<common::TSDataType> data_types;
MeasurementNamesFromTablet mnames_getter(tablet);
if (RET_FAIL(do_check_schema_aligned(
std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_),
mnames_getter, time_chunk_writer, value_chunk_writers,
data_types))) {
return ret;
}
time_write_column(time_chunk_writer, tablet);
ASSERT(value_chunk_writers.size() == tablet.get_column_count());
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter *value_chunk_writer = value_chunk_writers[c];
if (IS_NULL(value_chunk_writer)) {
continue;
}
if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, 0,
tablet.get_cur_row_size()))) {
return ret;
}
}
return ret;
}
int TsFileWriter::write_tablet(const Tablet &tablet) {
int ret = E_OK;
SimpleVector<ChunkWriter *> chunk_writers;
SimpleVector<common::TSDataType> data_types;
MeasurementNamesFromTablet mnames_getter(tablet);
if (RET_FAIL(do_check_schema(
std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_),
mnames_getter, chunk_writers, data_types))) {
return ret;
}
ASSERT(chunk_writers.size() == tablet.get_column_count());
for (uint32_t c = 0; c < chunk_writers.size(); c++) {
ChunkWriter *chunk_writer = chunk_writers[c];
if (IS_NULL(chunk_writer)) {
continue;
}
if (RET_FAIL(write_column(chunk_writer, tablet, c))) {
return ret;
}
}
record_count_since_last_flush_ += tablet.max_row_num_;
ret = check_memory_size_and_may_flush_chunks();
return ret;
}
int TsFileWriter::write_tree(const Tablet &tablet) {
auto device_id =
std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_);
if (schemas_.find(device_id) != schemas_.end()) {
auto schema = schemas_[device_id];
if (schema->is_aligned_) {
return this->write_tablet_aligned(tablet);
}
return this->write_tablet(tablet);
}
return E_NOT_EXIST;
}
int TsFileWriter::write_tree(const TsRecord &record) {
auto device_id = std::make_shared<StringArrayDeviceID>(record.device_id_);
if (schemas_.find(device_id) != schemas_.end()) {
auto schema = schemas_[device_id];
if (schema->is_aligned_) {
return this->write_record_aligned(record);
}
return this->write_record(record);
}
return E_NOT_EXIST;
}
int TsFileWriter::write_table(Tablet &tablet) {
int ret = E_OK;
if (io_writer_->get_schema()->table_schema_map_.find(
tablet.insert_target_name_) ==
io_writer_->get_schema()->table_schema_map_.end()) {
ret = E_TABLE_NOT_EXIST;
return ret;
}
if (RET_FAIL(do_check_and_prepare_tablet(tablet))) {
return ret;
}
auto device_id_end_index_pairs = split_tablet_by_device(tablet);
int start_idx = 0;
for (auto &device_id_end_index_pair : device_id_end_index_pairs) {
auto device_id = device_id_end_index_pair.first;
int end_idx = device_id_end_index_pair.second;
if (end_idx == 0) continue;
if (table_aligned_) {
SimpleVector<ValueChunkWriter *> value_chunk_writers;
TimeChunkWriter *time_chunk_writer = nullptr;
if (RET_FAIL(do_check_schema_table(device_id, tablet,
time_chunk_writer,
value_chunk_writers))) {
return ret;
}
for (int i = start_idx; i < end_idx; i++) {
if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[i]))) {
return ret;
}
}
uint32_t field_col_count = 0;
for (uint32_t i = 0; i < tablet.get_column_count(); ++i) {
if (tablet.column_categories_[i] ==
common::ColumnCategory::FIELD) {
ValueChunkWriter *value_chunk_writer =
value_chunk_writers[field_col_count];
if (IS_NULL(value_chunk_writer)) {
continue;
}
if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
i, start_idx, end_idx))) {
return ret;
}
field_col_count++;
}
}
start_idx = end_idx;
} else {
MeasurementNamesFromTablet mnames_getter(tablet);
SimpleVector<ChunkWriter *> chunk_writers;
SimpleVector<common::TSDataType> data_types;
if (RET_FAIL(do_check_schema(device_id, mnames_getter,
chunk_writers, data_types))) {
return ret;
}
ASSERT(chunk_writers.size() == tablet.get_column_count());
for (uint32_t c = 0; c < chunk_writers.size(); c++) {
ChunkWriter *chunk_writer = chunk_writers[c];
if (IS_NULL(chunk_writer)) {
continue;
}
if (RET_FAIL(write_column(chunk_writer, tablet, c, start_idx,
device_id_end_index_pair.second))) {
return ret;
}
}
start_idx = device_id_end_index_pair.second;
}
}
record_count_since_last_flush_ += tablet.cur_row_size_;
ret = check_memory_size_and_may_flush_chunks();
return ret;
}
std::vector<std::pair<std::shared_ptr<IDeviceID>, int>>
TsFileWriter::split_tablet_by_device(const Tablet &tablet) {
std::vector<std::pair<std::shared_ptr<IDeviceID>, int>> result;
std::shared_ptr<IDeviceID> last_device_id =
std::make_shared<StringArrayDeviceID>("last_device_id");
for (uint32_t i = 0; i < tablet.get_cur_row_size(); i++) {
std::shared_ptr<IDeviceID> cur_device_id(tablet.get_device_id(i));
if (*cur_device_id != *last_device_id) {
result.emplace_back(std::move(last_device_id), i);
last_device_id = std::move(cur_device_id);
}
}
result.emplace_back(std::move(last_device_id), tablet.get_cur_row_size());
return result;
}
int TsFileWriter::write_column(ChunkWriter *chunk_writer, const Tablet &tablet,
int col_idx, uint32_t start_idx,
uint32_t end_idx) {
int ret = E_OK;
common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_;
int64_t *timestamps = tablet.timestamps_;
Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx];
end_idx = std::min(end_idx, tablet.max_row_num_);
if (data_type == common::BOOLEAN) {
ret = write_typed_column(chunk_writer, timestamps, col_values.bool_data,
col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::INT32) {
ret =
write_typed_column(chunk_writer, timestamps, col_values.int32_data,
col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::INT64) {
ret =
write_typed_column(chunk_writer, timestamps, col_values.int64_data,
col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::FLOAT) {
ret =
write_typed_column(chunk_writer, timestamps, col_values.float_data,
col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::DOUBLE) {
ret =
write_typed_column(chunk_writer, timestamps, col_values.double_data,
col_notnull_bitmap, start_idx, end_idx);
} else if (data_type == common::STRING) {
ret =
write_typed_column(chunk_writer, timestamps, col_values.string_data,
col_notnull_bitmap, start_idx, end_idx);
} else {
ASSERT(false);
}
return ret;
}
int TsFileWriter::time_write_column(TimeChunkWriter *time_chunk_writer,
const Tablet &tablet, uint32_t start_idx,
uint32_t end_idx) {
int64_t *timestamps = tablet.timestamps_;
int ret = E_OK;
if (IS_NULL(time_chunk_writer) || IS_NULL(timestamps)) {
return E_INVALID_ARG;
}
for (uint32_t r = start_idx; r < end_idx && r < tablet.max_row_num_; r++) {
if (RET_FAIL(time_chunk_writer->write(timestamps[r]))) {
break;
}
}
return ret;
}
int TsFileWriter::value_write_column(ValueChunkWriter *value_chunk_writer,
const Tablet &tablet, int col_idx,
uint32_t start_idx, uint32_t end_idx) {
int ret = E_OK;
TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_;
int64_t *timestamps = tablet.timestamps_;
Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
BitMap &col_notnull_bitmap = tablet.bitmaps_[col_idx];
switch (data_type) {
case common::BOOLEAN:
ret = write_typed_column(value_chunk_writer, timestamps,
(bool *)col_values.bool_data,
col_notnull_bitmap, start_idx, end_idx);
break;
case common::DATE:
case common::INT32:
ret = write_typed_column(value_chunk_writer, timestamps,
(int32_t *)col_values.int32_data,
col_notnull_bitmap, start_idx, end_idx);
break;
case common::TIMESTAMP:
case common::INT64:
ret = write_typed_column(value_chunk_writer, timestamps,
(int64_t *)col_values.int64_data,
col_notnull_bitmap, start_idx, end_idx);
break;
case common::FLOAT:
ret = write_typed_column(value_chunk_writer, timestamps,
(float *)col_values.float_data,
col_notnull_bitmap, start_idx, end_idx);
break;
case common::DOUBLE:
ret = write_typed_column(value_chunk_writer, timestamps,
(double *)col_values.double_data,
col_notnull_bitmap, start_idx, end_idx);
break;
case common::STRING:
case common::TEXT:
case common::BLOB:
ret = write_typed_column(value_chunk_writer, timestamps,
(common::String *)col_values.string_data,
col_notnull_bitmap, start_idx, end_idx);
break;
default:
ret = E_NOT_SUPPORT;
}
return ret;
}
#define DO_WRITE_TYPED_COLUMN() \
do { \
int ret = E_OK; \
for (uint32_t r = start_idx; r < end_idx; r++) { \
if (LIKELY(!col_notnull_bitmap.test(r))) { \
if (RET_FAIL( \
chunk_writer->write(timestamps[r], col_values[r]))) { \
return ret; \
} \
} \
} \
return ret; \
} while (false)
#define DO_VALUE_WRITE_TYPED_COLUMN() \
do { \
int ret = E_OK; \
for (uint32_t r = start_idx; r < end_idx; r++) { \
if (LIKELY(col_notnull_bitmap.test(r))) { \
if (RET_FAIL(value_chunk_writer->write( \
timestamps[r], col_values[r], true))) { \
return ret; \
} \
} else { \
if (RET_FAIL(value_chunk_writer->write( \
timestamps[r], col_values[r], false))) { \
return ret; \
} \
} \
} \
return ret; \
} while (false)
int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
int64_t *timestamps, bool *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
int64_t *timestamps, int32_t *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
int64_t *timestamps, int64_t *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
int64_t *timestamps, float *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
int64_t *timestamps, double *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter *chunk_writer,
int64_t *timestamps,
common::String *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, bool *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, int32_t *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, int64_t *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, float *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps, double *col_values,
BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter *value_chunk_writer,
int64_t *timestamps,
common::String *col_values,
common::BitMap &col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
// TODO make sure ret is meaningful to SDK user
int TsFileWriter::flush() {
int ret = E_OK;
if (!start_file_done_) {
if (RET_FAIL(io_writer_->start_file())) {
return ret;
}
start_file_done_ = true;
}
/* since @schemas_ used std::map which is rbtree underlying,
so map itself is ordered by device name. */
DeviceSchemasMapIter device_iter;
for (device_iter = schemas_.begin(); device_iter != schemas_.end();
device_iter++) { // cppcheck-suppress postfixOperator
if (check_chunk_group_empty(device_iter->second,
device_iter->second->is_aligned_)) {
continue;
}
bool is_aligned = device_iter->second->is_aligned_;
if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first,
is_aligned))) {
} else if (RET_FAIL(
flush_chunk_group(device_iter->second, is_aligned))) {
} else if (RET_FAIL(io_writer_->end_flush_chunk_group(is_aligned))) {
}
}
record_count_since_last_flush_ = 0;
return ret;
}
bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup *chunk_group,
bool is_aligned) {
if (chunk_group->is_aligned_ &&
chunk_group->time_chunk_writer_ != nullptr &&
chunk_group->time_chunk_writer_->hasData()) {
return false;
}
MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
ms_iter++) {
MeasurementSchema *m_schema = ms_iter->second;
if (is_aligned) {
if (m_schema->value_chunk_writer_ != NULL &&
m_schema->value_chunk_writer_->hasData()) {
return false;
}
} else {
if (m_schema->chunk_writer_ != NULL &&
m_schema->chunk_writer_->hasData()) {
// first condition is to avoid first flush empty chunk group
// second condition is to avoid repeated flush
return false;
}
}
}
return true;
}
#define FLUSH_CHUNK(writer, io_writer, name, data_type, encoding, compression, \
num_pages) \
if (RET_FAIL(writer->end_encode_chunk())) { \
} else if (RET_FAIL(io_writer->start_flush_chunk( \
writer->get_chunk_data(), name, data_type, encoding, \
compression, num_pages))) { \
} else if (RET_FAIL(io_writer->flush_chunk(writer->get_chunk_data()))) { \
} else if (RET_FAIL(io_writer->end_flush_chunk( \
writer->get_chunk_statistic()))) { \
} else { \
writer->reset(); \
}
int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup *chunk_group,
bool is_aligned) {
int ret = E_OK;
MeasurementSchemaMap &map = chunk_group->measurement_schema_map_;
if (chunk_group->is_aligned_) {
TimeChunkWriter *&time_chunk_writer = chunk_group->time_chunk_writer_;
ChunkHeader chunk_header = time_chunk_writer->get_chunk_header();
FLUSH_CHUNK(time_chunk_writer, io_writer_,
chunk_header.measurement_name_, chunk_header.data_type_,
chunk_header.encoding_type_, chunk_header.compression_type_,
time_chunk_writer->num_of_pages())
}
for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
ms_iter++) {
MeasurementSchema *m_schema = ms_iter->second;
if (!chunk_group->is_aligned_ && m_schema->chunk_writer_ != nullptr) {
ChunkWriter *&chunk_writer = m_schema->chunk_writer_;
FLUSH_CHUNK(chunk_writer, io_writer_, m_schema->measurement_name_,
m_schema->data_type_, m_schema->encoding_,
m_schema->compression_type_,
chunk_writer->num_of_pages())
} else if (m_schema->value_chunk_writer_ != nullptr) {
ValueChunkWriter *&value_chunk_writer =
m_schema->value_chunk_writer_;
FLUSH_CHUNK(value_chunk_writer, io_writer_,
m_schema->measurement_name_, m_schema->data_type_,
m_schema->encoding_, m_schema->compression_type_,
value_chunk_writer->num_of_pages())
}
}
return ret;
}
int TsFileWriter::close() { return io_writer_->end_file(); }
} // end namespace storage