| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "tsfile_writer.h" |
| |
| #include <unistd.h> |
| |
| #include "chunk_writer.h" |
| #include "common/config/config.h" |
| #ifdef ENABLE_THREADS |
| #include "common/thread_pool.h" |
| #endif |
| #include "file/restorable_tsfile_io_writer.h" |
| #include "file/tsfile_io_writer.h" |
| #include "file/write_file.h" |
| #include "utils/errno_define.h" |
| |
| using namespace common; |
| |
| namespace storage { |
| |
namespace libtsfile {
// Set by libtsfile_init() once initialization has completed; cleared again by
// libtsfile_destroy().
bool g_s_is_inited{false};
}  // namespace libtsfile
| |
| int libtsfile_init() { |
| libtsfile::g_s_is_inited = false; |
| if (libtsfile::g_s_is_inited) { |
| return E_OK; |
| } |
| ModStat::get_instance().init(); |
| |
| init_common(); |
| |
| libtsfile::g_s_is_inited = true; |
| return E_OK; |
| } |
| |
| void libtsfile_destroy() { |
| #ifdef ENABLE_THREADS |
| delete common::g_write_thread_pool_; |
| common::g_write_thread_pool_ = nullptr; |
| #endif |
| ModStat::get_instance().destroy(); |
| libtsfile::g_s_is_inited = false; |
| } |
| |
| void set_page_max_point_count(uint32_t page_max_ponint_count) { |
| config_set_page_max_point_count(page_max_ponint_count); |
| } |
| void set_max_degree_of_index_node(uint32_t max_degree_of_index_node) { |
| config_set_max_degree_of_index_node(max_degree_of_index_node); |
| } |
| |
| void set_strict_page_size(bool strict_page_size) { |
| config_set_strict_page_size(strict_page_size); |
| } |
| |
// Construct an unopened writer; call open() or one of the init() overloads
// before writing. Defaults: no file attached, no IO writer yet, and
// io_writer_owned_ = true (the IO writer created by open()/init(WriteFile*)
// belongs to this object). write_file_ is only owned once open() creates it
// (write_file_created_).
TsFileWriter::TsFileWriter()
    : write_file_(nullptr),
      io_writer_(nullptr),
      schemas_(),
      start_file_done_(false),
      record_count_since_last_flush_(0),
      // Record count at which check_memory_size_and_may_flush_chunks() first
      // estimates memory; it is re-derived after each check.
      record_count_for_next_mem_check_(
          g_config_value_.record_count_for_next_mem_check_),
      write_file_created_(false),
      io_writer_owned_(true) {}

// Release everything this writer owns (see destroy()).
TsFileWriter::~TsFileWriter() { destroy(); }
| |
| void TsFileWriter::destroy() { |
| if (write_file_created_ && write_file_ != nullptr) { |
| delete write_file_; |
| write_file_ = nullptr; |
| } |
| if (io_writer_owned_ && io_writer_) { |
| delete io_writer_; |
| } |
| io_writer_ = nullptr; |
| DeviceSchemasMapIter dev_iter; |
| // cppcheck-suppress postfixOperator |
| for (dev_iter = schemas_.begin(); dev_iter != schemas_.end(); dev_iter++) { |
| MeasurementSchemaMap& ms_map = |
| dev_iter->second->measurement_schema_map_; |
| MeasurementSchemaMapIter ms_iter; |
| for (ms_iter = ms_map.begin(); ms_iter != ms_map.end(); ms_iter++) { |
| MeasurementSchema* ms = ms_iter->second; |
| if (ms != nullptr) { |
| if (ms->chunk_writer_ != nullptr) { |
| delete ms->chunk_writer_; |
| ms->chunk_writer_ = nullptr; |
| } |
| delete ms; |
| ms_iter->second = nullptr; |
| } |
| } |
| delete dev_iter->second; |
| dev_iter->second = nullptr; |
| } |
| schemas_.clear(); |
| record_count_since_last_flush_ = 0; |
| } |
| |
| int TsFileWriter::init(WriteFile* write_file) { |
| if (write_file == nullptr) { |
| return E_INVALID_ARG; |
| } else if (!write_file->file_opened()) { |
| return E_INVALID_ARG; |
| } |
| write_file_ = write_file; |
| write_file_created_ = false; |
| io_writer_owned_ = true; |
| io_writer_ = new TsFileIOWriter(); |
| io_writer_->init(write_file_); |
| return E_OK; |
| } |
| |
// -----------------------------------------------------------------------------
// Recovery init: rebuild schemas_ from the chunk group metas recovered by a
// RestorableTsFileIOWriter, mirroring the Java implementation. Each recovered
// CGM's own device_id (as read from the file) is used as the map key, so both
// tree-model and table-model lookups resolve correctly. For the table model,
// do_check_schema_table can still lazily create a group from
// table_schema_map_ when a device not present in the file shows up later.
// Every MeasurementSchemaGroup / MeasurementSchema allocated here is freed in
// destroy().
// -----------------------------------------------------------------------------
/**
 * Resume writing into a file recovered by `rw`.
 *
 * Ownership: this writer owns neither `rw` (io_writer_owned_ = false) nor its
 * WriteFile (write_file_created_ = false). start_file_done_ is set to true,
 * presumably so the file-start step is skipped for an existing file — confirm.
 *
 * @param rw recovered writer; must be non-null and can_write().
 * @return E_INVALID_ARG if rw is null or not writable, otherwise E_OK.
 */
int TsFileWriter::init(RestorableTsFileIOWriter* rw) {
    if (rw == nullptr || !rw->can_write()) {
        return E_INVALID_ARG;
    }
    write_file_ = rw->get_write_file();
    write_file_created_ = false;
    io_writer_owned_ = false;
    io_writer_ = rw;

    const std::vector<ChunkGroupMeta*>& recovered =
        rw->get_recovered_chunk_group_metas();
    for (ChunkGroupMeta* cgm : recovered) {
        // Skip entries without a usable device identity.
        if (cgm == nullptr || cgm->device_id_ == nullptr) {
            continue;
        }
        std::shared_ptr<IDeviceID> device_id = cgm->device_id_;

        // Find existing group for same device (same device may have multiple
        // CGMs from multiple flushes). This is a linear scan that compares
        // device ids by value (*it->first == *device_id).
        // NOTE(review): schemas_.find() may be equivalent and cheaper if the
        // map's comparator also compares by value — confirm before switching.
        DeviceSchemasMapIter it = schemas_.begin();
        for (; it != schemas_.end(); ++it) {
            if (it->first != nullptr && *it->first == *device_id) {
                break;
            }
        }

        MeasurementSchemaGroup* group = nullptr;
        if (it != schemas_.end()) {
            group = it->second;
        } else {
            group = new MeasurementSchemaGroup;
            // Alignment is queried by the device's table name.
            group->is_aligned_ =
                rw->is_device_aligned(device_id->get_table_name());
            schemas_.insert(std::make_pair(device_id, group));
        }

        // Add measurement schemas from this CGM (skip time column: empty name).
        // The first schema seen for a measurement name wins; later CGMs of the
        // same device do not overwrite it.
        for (auto iter = cgm->chunk_meta_list_.begin();
             iter != cgm->chunk_meta_list_.end(); iter++) {
            ChunkMeta* cm = iter.get();
            if (cm == nullptr) {
                continue;
            }
            std::string mname = cm->measurement_name_.to_std_string();
            if (mname.empty()) {
                continue;
            }
            if (group->measurement_schema_map_.find(mname) !=
                group->measurement_schema_map_.end()) {
                continue;
            }
            MeasurementSchema* ms = new MeasurementSchema(
                mname, cm->data_type_, cm->encoding_, cm->compression_type_);
            group->measurement_schema_map_.insert(std::make_pair(mname, ms));
        }
    }

    start_file_done_ = true;
    return E_OK;
}
| |
// Forward the flag to the underlying IO writer. io_writer_ is dereferenced
// unconditionally, so open() or init() must have been called first.
// NOTE(review): presumably controls whether table schemas are emitted into
// the file metadata — confirm in TsFileIOWriter.
void TsFileWriter::set_generate_table_schema(bool generate_table_schema) {
    io_writer_->set_generate_table_schema(generate_table_schema);
}
| |
| int TsFileWriter::register_table( |
| const std::shared_ptr<TableSchema>& table_schema) { |
| if (!table_schema) return E_INVALID_ARG; |
| |
| // Empty table name or column name is not allowed. |
| if (table_schema->get_table_name().empty()) { |
| return E_INVALID_ARG; |
| } |
| for (const auto& name : table_schema->get_measurement_names()) { |
| if (name.empty()) { |
| return E_INVALID_ARG; |
| } |
| } |
| |
| // Because it is not possible to return an error code for duplicate name |
| // checks during the construction phase of TabletSchema, the duplicate name |
| // check has been moved to the table registration stage. |
| |
| // TODO: Add Debug INFO if ErrorCode is not enough to describe problems. |
| if (table_schema->get_column_pos_index_num() != |
| table_schema->get_measurement_names().size()) { |
| return E_INVALID_ARG; |
| } |
| |
| if (io_writer_->get_schema()->table_schema_map_.find( |
| table_schema->get_table_name()) != |
| io_writer_->get_schema()->table_schema_map_.end()) { |
| return E_ALREADY_EXIST; |
| } |
| io_writer_->get_schema()->register_table_schema(table_schema); |
| return E_OK; |
| } |
| |
/**
 * Report whether a filesystem entry exists at `file_path`.
 *
 * Uses POSIX access(F_OK): true when the path resolves, false otherwise
 * (including an empty path or a permission-denied lookup).
 */
bool check_file_exist(const std::string& file_path) {
    const char* const raw_path = file_path.c_str();
    const int status = access(raw_path, F_OK);
    return status == 0;
}
| |
| int TsFileWriter::open(const std::string& file_path, int flags, mode_t mode) { |
| if (check_file_exist(file_path)) { |
| return E_ALREADY_EXIST; |
| } |
| write_file_ = new WriteFile; |
| write_file_created_ = true; |
| io_writer_ = new TsFileIOWriter; |
| int ret = E_OK; |
| if (RET_FAIL(write_file_->create(file_path, flags, mode))) { |
| } else { |
| io_writer_->init(write_file_); |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::open(const std::string& file_path) { |
| return open(file_path, O_RDWR | O_CREAT | O_TRUNC, 0666); |
| } |
| |
| int TsFileWriter::register_aligned_timeseries( |
| const std::string& device_id, const MeasurementSchema& measurement_schema) { |
| MeasurementSchema* ms = new MeasurementSchema( |
| measurement_schema.measurement_name_, measurement_schema.data_type_, |
| measurement_schema.encoding_, measurement_schema.compression_type_); |
| return register_timeseries(device_id, ms, true); |
| } |
| |
| int TsFileWriter::register_aligned_timeseries( |
| const std::string& device_id, |
| const std::vector<MeasurementSchema*>& measurement_schemas) { |
| int ret = E_OK; |
| for (auto it : measurement_schemas) { |
| ret = register_timeseries(device_id, it, true); |
| if (ret != E_OK) { |
| return ret; |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::register_timeseries( |
| const std::string& device_id, const MeasurementSchema& measurement_schema) { |
| MeasurementSchema* ms = new MeasurementSchema( |
| measurement_schema.measurement_name_, measurement_schema.data_type_, |
| measurement_schema.encoding_, measurement_schema.compression_type_); |
| return register_timeseries(device_id, ms, false); |
| } |
| |
| int TsFileWriter::register_timeseries(const std::string& device_path, |
| MeasurementSchema* measurement_schema, |
| bool is_aligned) { |
| std::shared_ptr<IDeviceID> device_id = |
| std::make_shared<StringArrayDeviceID>(device_path); |
| DeviceSchemasMapIter device_iter = schemas_.find(device_id); |
| if (device_iter != schemas_.end()) { |
| MeasurementSchemaMap& msm = |
| device_iter->second->measurement_schema_map_; |
| MeasurementSchemaMapInsertResult ins_res = msm.insert(std::make_pair( |
| measurement_schema->measurement_name_, measurement_schema)); |
| if (UNLIKELY(!ins_res.second)) { |
| return E_ALREADY_EXIST; |
| } |
| } else { |
| MeasurementSchemaGroup* ms_group = new MeasurementSchemaGroup; |
| ms_group->is_aligned_ = is_aligned; |
| ms_group->measurement_schema_map_.insert(std::make_pair( |
| measurement_schema->measurement_name_, measurement_schema)); |
| schemas_.insert(std::make_pair(device_id, ms_group)); |
| } |
| return E_OK; |
| } |
| |
| int TsFileWriter::register_timeseries( |
| const std::string& device_id, |
| const std::vector<MeasurementSchema*>& measurement_schema_vec) { |
| int ret = E_OK; |
| auto it = measurement_schema_vec.begin(); |
| for (; it != measurement_schema_vec.end(); |
| it++) { // cppcheck-suppress postfixOperator |
| ret = register_timeseries(device_id, *it); |
| if (ret != E_OK) { |
| return ret; |
| } |
| } |
| return ret; |
| } |
| |
| struct MeasurementSchemaMapNamesGetter { |
| public: |
| explicit MeasurementSchemaMapNamesGetter( |
| const MeasurementSchemaMap& measurement_schema_map) |
| : measurement_schema_map_( |
| const_cast<MeasurementSchemaMap&>(measurement_schema_map)) { |
| measurement_name_idx_ = measurement_schema_map_.begin(); |
| } |
| |
| FORCE_INLINE uint32_t get_count() const { |
| return measurement_schema_map_.size(); |
| } |
| |
| FORCE_INLINE const std::string& next() { |
| ASSERT(measurement_name_idx_ != measurement_schema_map_.end()); |
| std::string& ret = measurement_name_idx_->second->measurement_name_; |
| measurement_name_idx_++; |
| return ret; |
| } |
| |
| private: |
| MeasurementSchemaMap& measurement_schema_map_; |
| MeasurementSchemaMap::iterator measurement_name_idx_; |
| }; |
| |
| struct MeasurementNamesFromRecord { |
| public: |
| explicit MeasurementNamesFromRecord(const TsRecord& record) |
| : record_(record), measurement_name_idx_(0) {} |
| FORCE_INLINE uint32_t get_count() const { return record_.points_.size(); } |
| |
| FORCE_INLINE const std::string& next() { |
| return this->at(measurement_name_idx_++); |
| } |
| |
| private: |
| const TsRecord& record_; |
| size_t measurement_name_idx_; |
| FORCE_INLINE const std::string& at(size_t idx) const { |
| ASSERT(idx < record_.points_.size()); |
| return record_.points_[idx].measurement_name_; |
| } |
| }; |
| |
| struct MeasurementNamesFromTablet { |
| explicit MeasurementNamesFromTablet(const Tablet& tablet) |
| : tablet_(tablet), measurement_name_idx_(0) {} |
| FORCE_INLINE uint32_t get_count() const { |
| return tablet_.schema_vec_->size(); |
| } |
| FORCE_INLINE const std::string& next() { |
| return this->at(measurement_name_idx_++); |
| } |
| |
| private: |
| const Tablet& tablet_; |
| size_t measurement_name_idx_; |
| FORCE_INLINE const std::string& at(size_t idx) const { |
| ASSERT(idx < tablet_.schema_vec_->size()); |
| return tablet_.schema_vec_->at(idx).measurement_name_; |
| } |
| }; |
| |
| int TsFileWriter::do_check_and_prepare_tablet(Tablet& tablet) { |
| if (tablet.column_categories_.empty()) { |
| auto& schema_map = io_writer_->get_schema()->table_schema_map_; |
| auto table_schema_it = schema_map.find(tablet.get_table_name()); |
| auto table_schema = table_schema_it->second; |
| uint32_t column_cnt = tablet.get_column_count(); |
| for (uint32_t i = 0; i < column_cnt; i++) { |
| auto& col_name = tablet.get_column_name(i); |
| int col_index = table_schema->find_column_index(col_name); |
| if (col_index == -1) { |
| return E_COLUMN_NOT_EXIST; |
| } |
| if (table_schema->get_data_types()[col_index] != |
| tablet.schema_vec_->at(i).data_type_) { |
| return E_TYPE_NOT_MATCH; |
| } |
| const common::ColumnCategory column_category = |
| table_schema->get_column_categories()[col_index]; |
| tablet.column_categories_.emplace_back(column_category); |
| if (column_category == ColumnCategory::TAG) { |
| tablet.id_column_indexes_.push_back(i); |
| } |
| } |
| } |
| return common::E_OK; |
| } |
| |
| std::shared_ptr<TableSchema> TsFileWriter::get_table_schema( |
| const std::string& table_name) const { |
| auto& schema_map = io_writer_->get_schema()->table_schema_map_; |
| auto it = schema_map.find(table_name); |
| if (it == schema_map.end()) return nullptr; |
| return it->second; |
| } |
| |
| template <typename MeasurementNamesGetter> |
| int TsFileWriter::do_check_schema( |
| std::shared_ptr<IDeviceID> device_id, |
| MeasurementNamesGetter& measurement_names, |
| SimpleVector<ChunkWriter*>& chunk_writers, |
| SimpleVector<common::TSDataType>& data_types) { |
| int ret = E_OK; |
| DeviceSchemasMapIter dev_it = schemas_.find(device_id); |
| MeasurementSchemaGroup* device_schema = nullptr; |
| if (UNLIKELY(dev_it == schemas_.end()) || |
| IS_NULL(device_schema = dev_it->second)) { |
| return E_DEVICE_NOT_EXIST; |
| } |
| MeasurementSchemaMap& msm = device_schema->measurement_schema_map_; |
| uint32_t measurement_count = measurement_names.get_count(); |
| // chunk_writers.reserve(measurement_count); |
| for (uint32_t i = 0; i < measurement_count; i++) { |
| auto ms_iter = msm.find(measurement_names.next()); |
| if (UNLIKELY(ms_iter == msm.end())) { |
| chunk_writers.push_back(NULL); |
| data_types.push_back(common::NULL_TYPE); |
| } else { |
| // In Java we will check data_type. But in C++, no check here. |
| // Because checks are performed at the chunk layer and page layer |
| MeasurementSchema* ms = ms_iter->second; |
| if (IS_NULL(ms->chunk_writer_)) { |
| ms->chunk_writer_ = new ChunkWriter; |
| ret = ms->chunk_writer_->init(ms->measurement_name_, |
| ms->data_type_, ms->encoding_, |
| ms->compression_type_); |
| if (IS_SUCC(ret)) { |
| chunk_writers.push_back(ms->chunk_writer_); |
| } else { |
| for (size_t chunk_writer_idx = 0; |
| chunk_writer_idx < chunk_writers.size(); |
| chunk_writer_idx++) { |
| if (!chunk_writers[chunk_writer_idx]) { |
| delete chunk_writers[chunk_writer_idx]; |
| } |
| } |
| ret = common::E_INVALID_ARG; |
| return ret; |
| } |
| } else { |
| chunk_writers.push_back(ms->chunk_writer_); |
| } |
| data_types.push_back(ms->data_type_); |
| } |
| } |
| return ret; |
| } |
| |
| template <typename MeasurementNamesGetter> |
| int TsFileWriter::do_check_schema_aligned( |
| std::shared_ptr<IDeviceID> device_id, |
| MeasurementNamesGetter& measurement_names, |
| storage::TimeChunkWriter*& time_chunk_writer, |
| common::SimpleVector<storage::ValueChunkWriter*>& value_chunk_writers, |
| SimpleVector<common::TSDataType>& data_types) { |
| int ret = E_OK; |
| auto dev_it = schemas_.find(device_id); |
| MeasurementSchemaGroup* device_schema = NULL; |
| if (UNLIKELY(dev_it == schemas_.end()) || |
| IS_NULL(device_schema = dev_it->second)) { |
| return E_DEVICE_NOT_EXIST; |
| } |
| if (IS_NULL(device_schema->time_chunk_writer_)) { |
| device_schema->time_chunk_writer_ = new TimeChunkWriter(); |
| device_schema->time_chunk_writer_->init( |
| "", g_config_value_.time_encoding_type_, |
| g_config_value_.time_compress_type_); |
| } |
| time_chunk_writer = device_schema->time_chunk_writer_; |
| MeasurementSchemaMap& msm = device_schema->measurement_schema_map_; |
| uint32_t measurement_count = measurement_names.get_count(); |
| for (uint32_t i = 0; i < measurement_count; i++) { |
| auto ms_iter = msm.find(measurement_names.next()); |
| if (UNLIKELY(ms_iter == msm.end())) { |
| value_chunk_writers.push_back(NULL); |
| data_types.push_back(common::NULL_TYPE); |
| } else { |
| // Here we may check data_type against ms_iter. But in Java |
| // libtsfile, no check here. |
| MeasurementSchema* ms = ms_iter->second; |
| if (IS_NULL(ms->value_chunk_writer_)) { |
| ms->value_chunk_writer_ = new ValueChunkWriter; |
| ret = ms->value_chunk_writer_->init( |
| ms->measurement_name_, ms->data_type_, ms->encoding_, |
| ms->compression_type_); |
| if (IS_SUCC(ret)) { |
| value_chunk_writers.push_back(ms->value_chunk_writer_); |
| } else { |
| value_chunk_writers.push_back(NULL); |
| for (size_t chunk_writer_idx = 0; |
| chunk_writer_idx < value_chunk_writers.size(); |
| chunk_writer_idx++) { |
| if (!value_chunk_writers[chunk_writer_idx]) { |
| delete value_chunk_writers[chunk_writer_idx]; |
| } |
| } |
| ret = common::E_INVALID_ARG; |
| return ret; |
| } |
| } else { |
| value_chunk_writers.push_back(ms->value_chunk_writer_); |
| } |
| data_types.push_back(ms->data_type_); |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::do_check_schema_table( |
| std::shared_ptr<IDeviceID> device_id, Tablet& tablet, |
| storage::TimeChunkWriter*& time_chunk_writer, |
| common::SimpleVector<storage::ValueChunkWriter*>& value_chunk_writers) { |
| int ret = E_OK; |
| |
| auto dev_it = schemas_.find(device_id); |
| MeasurementSchemaGroup* device_schema = NULL; |
| |
| auto& schema_map = io_writer_->get_schema()->table_schema_map_; |
| auto table_schema_it = schema_map.find(tablet.get_table_name()); |
| if (UNLIKELY(table_schema_it == schema_map.end())) { |
| return E_TABLE_NOT_EXIST; |
| } |
| auto table_schema = table_schema_it->second; |
| |
| if (UNLIKELY(dev_it == schemas_.end()) || |
| IS_NULL(device_schema = dev_it->second)) { |
| device_schema = new MeasurementSchemaGroup; |
| device_schema->is_aligned_ = true; |
| device_schema->time_chunk_writer_ = new TimeChunkWriter(); |
| device_schema->time_chunk_writer_->init( |
| "", g_config_value_.time_encoding_type_, |
| g_config_value_.time_compress_type_); |
| |
| for (uint32_t i = 0; i < table_schema->get_measurement_schemas().size(); |
| ++i) { |
| if (table_schema->get_column_categories().at(i) == |
| common::ColumnCategory::FIELD) { |
| auto table_column_schema = |
| table_schema->get_measurement_schemas().at(i); |
| auto device_column_schema = new MeasurementSchema( |
| table_column_schema->measurement_name_, |
| table_column_schema->data_type_, |
| table_column_schema->encoding_, |
| table_column_schema->compression_type_); |
| if (!table_column_schema->props_.empty()) { |
| device_column_schema->props_ = table_column_schema->props_; |
| } |
| device_schema->measurement_schema_map_ |
| [device_column_schema->measurement_name_] = |
| device_column_schema; |
| } |
| } |
| schemas_[device_id] = device_schema; |
| } |
| |
| // After recovery, device_schema may exist but time_chunk_writer_ not yet |
| // created |
| if (IS_NULL(device_schema->time_chunk_writer_)) { |
| device_schema->time_chunk_writer_ = new TimeChunkWriter(); |
| device_schema->time_chunk_writer_->init( |
| "", g_config_value_.time_encoding_type_, |
| g_config_value_.time_compress_type_); |
| } |
| |
| uint32_t column_cnt = tablet.get_column_count(); |
| time_chunk_writer = device_schema->time_chunk_writer_; |
| MeasurementSchemaMap& msm = device_schema->measurement_schema_map_; |
| |
| for (uint32_t i = 0; i < column_cnt; i++) { |
| if (tablet.column_categories_.at(i) != common::ColumnCategory::FIELD) { |
| continue; |
| } |
| auto ms_iter = msm.find(tablet.get_column_name(i)); |
| if (UNLIKELY(ms_iter == msm.end())) { |
| value_chunk_writers.push_back(NULL); |
| } else { |
| // Here we may check data_type against ms_iter. But in Java |
| // libtsfile, no check here. |
| MeasurementSchema* ms = ms_iter->second; |
| if (IS_NULL(ms->value_chunk_writer_)) { |
| ms->value_chunk_writer_ = new ValueChunkWriter; |
| ret = ms->value_chunk_writer_->init( |
| ms->measurement_name_, ms->data_type_, ms->encoding_, |
| ms->compression_type_); |
| if (IS_SUCC(ret)) { |
| value_chunk_writers.push_back(ms->value_chunk_writer_); |
| } else { |
| value_chunk_writers.push_back(NULL); |
| for (size_t chunk_writer_idx = 0; |
| chunk_writer_idx < value_chunk_writers.size(); |
| chunk_writer_idx++) { |
| if (!value_chunk_writers[chunk_writer_idx]) { |
| delete value_chunk_writers[chunk_writer_idx]; |
| } |
| } |
| ret = common::E_INVALID_ARG; |
| return ret; |
| } |
| } else { |
| value_chunk_writers.push_back(ms->value_chunk_writer_); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| int64_t TsFileWriter::calculate_mem_size_for_all_group() { |
| int64_t mem_total_size = 0; |
| DeviceSchemasMapIter device_iter; |
| for (device_iter = schemas_.begin(); device_iter != schemas_.end(); |
| device_iter++) { |
| MeasurementSchemaGroup* chunk_group = device_iter->second; |
| MeasurementSchemaMap& map = chunk_group->measurement_schema_map_; |
| for (MeasurementSchemaMapIter ms_iter = map.begin(); |
| ms_iter != map.end(); ms_iter++) { |
| MeasurementSchema* m_schema = ms_iter->second; |
| if (!chunk_group->is_aligned_) { |
| ChunkWriter*& chunk_writer = m_schema->chunk_writer_; |
| if (chunk_writer != nullptr) { |
| mem_total_size += |
| chunk_writer->estimate_max_series_mem_size(); |
| } |
| } else { |
| ValueChunkWriter*& chunk_writer = m_schema->value_chunk_writer_; |
| if (chunk_writer != nullptr) { |
| mem_total_size += |
| chunk_writer->estimate_max_series_mem_size(); |
| } |
| } |
| } |
| if (chunk_group->is_aligned_) { |
| TimeChunkWriter*& time_chunk_writer = |
| chunk_group->time_chunk_writer_; |
| if (time_chunk_writer != nullptr) { |
| mem_total_size += |
| time_chunk_writer->estimate_max_series_mem_size(); |
| } |
| } |
| } |
| return mem_total_size; |
| } |
| |
| /** |
| * check occupied memory size, if it exceeds the chunkGroupSize threshold, flush |
| * them to given OutputStream. |
| */ |
| int TsFileWriter::check_memory_size_and_may_flush_chunks() { |
| int ret = E_OK; |
| if (record_count_since_last_flush_ >= record_count_for_next_mem_check_) { |
| int64_t mem_size = calculate_mem_size_for_all_group(); |
| record_count_for_next_mem_check_ = |
| record_count_since_last_flush_ * |
| common::g_config_value_.chunk_group_size_threshold_ / mem_size; |
| if (mem_size > common::g_config_value_.chunk_group_size_threshold_) { |
| ret = flush(); |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::write_record(const TsRecord& record) { |
| int ret = E_OK; |
| // std::vector<ChunkWriter*> chunk_writers; |
| SimpleVector<ChunkWriter*> chunk_writers; |
| SimpleVector<common::TSDataType> data_types; |
| MeasurementNamesFromRecord mnames_getter(record); |
| if (RET_FAIL(do_check_schema( |
| std::make_shared<StringArrayDeviceID>(record.device_id_), |
| mnames_getter, chunk_writers, data_types))) { |
| return ret; |
| } |
| |
| ASSERT(chunk_writers.size() == record.points_.size()); |
| for (uint32_t c = 0; c < chunk_writers.size(); c++) { |
| ChunkWriter* chunk_writer = chunk_writers[c]; |
| if (IS_NULL(chunk_writer)) { |
| continue; |
| } |
| // ignore point writer failure |
| write_point(chunk_writer, record.timestamp_, data_types[c], |
| record.points_[c]); |
| } |
| |
| record_count_since_last_flush_++; |
| ret = check_memory_size_and_may_flush_chunks(); |
| return ret; |
| } |
| |
| int TsFileWriter::write_record_aligned(const TsRecord& record) { |
| int ret = E_OK; |
| SimpleVector<ValueChunkWriter*> value_chunk_writers; |
| SimpleVector<common::TSDataType> data_types; |
| TimeChunkWriter* time_chunk_writer; |
| MeasurementNamesFromRecord mnames_getter(record); |
| if (RET_FAIL(do_check_schema_aligned( |
| std::make_shared<StringArrayDeviceID>(record.device_id_), |
| mnames_getter, time_chunk_writer, value_chunk_writers, |
| data_types))) { |
| return ret; |
| } |
| if (value_chunk_writers.size() != record.points_.size()) { |
| return E_INVALID_ARG; |
| } |
| int32_t time_pages_before = time_chunk_writer->num_of_pages(); |
| std::vector<int32_t> value_pages_before(value_chunk_writers.size(), 0); |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; |
| if (!IS_NULL(value_chunk_writer)) { |
| value_pages_before[c] = value_chunk_writer->num_of_pages(); |
| } |
| } |
| time_chunk_writer->write(record.timestamp_); |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; |
| if (IS_NULL(value_chunk_writer)) { |
| continue; |
| } |
| write_point_aligned(value_chunk_writer, record.timestamp_, |
| data_types[c], record.points_[c]); |
| } |
| if (RET_FAIL(maybe_seal_aligned_pages_together( |
| time_chunk_writer, value_chunk_writers, time_pages_before, |
| value_pages_before))) { |
| return ret; |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::write_point(ChunkWriter* chunk_writer, int64_t timestamp, |
| common::TSDataType data_type, |
| const DataPoint& point) { |
| switch (data_type) { |
| case common::BOOLEAN: |
| return chunk_writer->write(timestamp, point.u_.bool_val_); |
| case common::DATE: |
| case common::INT32: |
| return chunk_writer->write(timestamp, point.u_.i32_val_); |
| case common::TIMESTAMP: |
| case common::INT64: |
| return chunk_writer->write(timestamp, point.u_.i64_val_); |
| case common::FLOAT: |
| return chunk_writer->write(timestamp, point.u_.float_val_); |
| case common::DOUBLE: |
| return chunk_writer->write(timestamp, point.u_.double_val_); |
| case common::BLOB: |
| case common::TEXT: |
| case common::STRING: |
| return chunk_writer->write(timestamp, point.text_val_); |
| default: |
| return E_INVALID_DATA_POINT; |
| } |
| } |
| |
| int TsFileWriter::write_point_aligned(ValueChunkWriter* value_chunk_writer, |
| int64_t timestamp, |
| common::TSDataType data_type, |
| const DataPoint& point) { |
| bool isnull = point.isnull; |
| switch (data_type) { |
| case common::BOOLEAN: |
| return value_chunk_writer->write(timestamp, point.u_.bool_val_, |
| isnull); |
| case common::INT32: |
| case common::DATE: |
| return value_chunk_writer->write(timestamp, point.u_.i32_val_, |
| isnull); |
| case common::TIMESTAMP: |
| case common::INT64: |
| return value_chunk_writer->write(timestamp, point.u_.i64_val_, |
| isnull); |
| case common::FLOAT: |
| return value_chunk_writer->write(timestamp, point.u_.float_val_, |
| isnull); |
| case common::DOUBLE: |
| return value_chunk_writer->write(timestamp, point.u_.double_val_, |
| isnull); |
| case common::BLOB: |
| case common::TEXT: |
| case common::STRING: |
| return value_chunk_writer->write(timestamp, point.text_val_, |
| isnull); |
| default: |
| return E_INVALID_DATA_POINT; |
| } |
| } |
| |
| int TsFileWriter::maybe_seal_aligned_pages_together( |
| TimeChunkWriter* time_chunk_writer, |
| common::SimpleVector<ValueChunkWriter*>& value_chunk_writers, |
| int32_t time_pages_before, const std::vector<int32_t>& value_pages_before) { |
| bool should_seal_all = |
| time_chunk_writer->num_of_pages() > time_pages_before; |
| for (uint32_t c = 0; c < value_chunk_writers.size() && !should_seal_all; |
| c++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; |
| if (!IS_NULL(value_chunk_writer) && |
| value_chunk_writer->num_of_pages() > value_pages_before[c]) { |
| should_seal_all = true; |
| break; |
| } |
| } |
| if (!should_seal_all) { |
| return E_OK; |
| } |
| |
| int ret = E_OK; |
| if (time_chunk_writer->has_current_page_data() && |
| RET_FAIL(time_chunk_writer->seal_current_page())) { |
| return ret; |
| } |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; |
| if (!IS_NULL(value_chunk_writer) && |
| value_chunk_writer->has_current_page_data() && |
| RET_FAIL(value_chunk_writer->seal_current_page())) { |
| return ret; |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::write_tablet_aligned(const Tablet& tablet) { |
| int ret = E_OK; |
| SimpleVector<ValueChunkWriter*> value_chunk_writers; |
| TimeChunkWriter* time_chunk_writer = nullptr; |
| SimpleVector<common::TSDataType> data_types; |
| MeasurementNamesFromTablet mnames_getter(tablet); |
| if (RET_FAIL(do_check_schema_aligned( |
| std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_), |
| mnames_getter, time_chunk_writer, value_chunk_writers, |
| data_types))) { |
| return ret; |
| } |
| const uint32_t total_rows = tablet.get_cur_row_size(); |
| const bool strict_page_size = common::g_config_value_.strict_page_size_; |
| |
| // Decide whether we have string/blob/text columns. |
| bool has_varlen_column = false; |
| for (uint32_t i = 0; i < data_types.size(); i++) { |
| if (data_types[i] == common::STRING || data_types[i] == common::TEXT || |
| data_types[i] == common::BLOB) { |
| has_varlen_column = true; |
| break; |
| } |
| } |
| |
| // Keep writers' seal-check behavior consistent across calls. |
| time_chunk_writer->set_enable_page_seal_if_full(strict_page_size); |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| if (!IS_NULL(value_chunk_writers[c])) { |
| value_chunk_writers[c]->set_enable_page_seal_if_full( |
| strict_page_size); |
| } |
| } |
| |
| if (strict_page_size) { |
| // Strict mode: keep the original row-based insertion to ensure aligned |
| // pages seal together when either side becomes full. |
| for (uint32_t row = 0; row < total_rows; row++) { |
| int32_t time_pages_before = time_chunk_writer->num_of_pages(); |
| std::vector<int32_t> value_pages_before(value_chunk_writers.size(), |
| 0); |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; |
| if (!IS_NULL(value_chunk_writer)) { |
| value_pages_before[c] = value_chunk_writer->num_of_pages(); |
| } |
| } |
| |
| if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[row]))) { |
| return ret; |
| } |
| ASSERT(value_chunk_writers.size() == tablet.get_column_count()); |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; |
| if (IS_NULL(value_chunk_writer)) { |
| continue; |
| } |
| if (RET_FAIL(value_write_column(value_chunk_writer, tablet, c, |
| row, row + 1))) { |
| return ret; |
| } |
| } |
| if (RET_FAIL(maybe_seal_aligned_pages_together( |
| time_chunk_writer, value_chunk_writers, time_pages_before, |
| value_pages_before))) { |
| return ret; |
| } |
| } |
| return ret; |
| } |
| |
| // Non-strict mode: switch to column-based insertion. |
| if (!has_varlen_column) { |
| // Optimization: when there is no string/blob/text column, we only need |
| // to split by point-number so that each split will trigger a page |
| // seal (and avoid the per-row page-size check). |
| const uint32_t points_per_page = |
| common::g_config_value_.page_writer_max_point_num_; |
| |
| // Disable auto page sealing. We will seal pages at split boundaries. |
| time_chunk_writer->set_enable_page_seal_if_full(false); |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| if (!IS_NULL(value_chunk_writers[c])) { |
| value_chunk_writers[c]->set_enable_page_seal_if_full(false); |
| } |
| } |
| |
| // Determine how many points we need to fill the current unsealed time |
| // page (it may already contain data from previous tablets). |
| uint32_t time_cur_points = time_chunk_writer->get_point_numer(); |
| if (time_cur_points >= points_per_page && |
| time_chunk_writer->has_current_page_data()) { |
| // Close the already-full page together with all aligned value |
| // pages. |
| if (RET_FAIL(time_chunk_writer->seal_current_page())) { |
| return ret; |
| } |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[c]; |
| if (!IS_NULL(value_chunk_writer) && |
| value_chunk_writer->has_current_page_data()) { |
| if (RET_FAIL(value_chunk_writer->seal_current_page())) { |
| return ret; |
| } |
| } |
| } |
| time_cur_points = 0; |
| } |
| const uint32_t first_seg_len = |
| (time_cur_points > 0 && time_cur_points < points_per_page) |
| ? (points_per_page - time_cur_points) |
| : points_per_page; |
| |
| // 1) Write time in segments and seal all full segments (except the |
| // last remaining segment). |
| uint32_t seg_start = 0; |
| uint32_t seg_len = first_seg_len; |
| while (seg_start < total_rows) { |
| const uint32_t seg_end = std::min(seg_start + seg_len, total_rows); |
| if (RET_FAIL(time_write_column(time_chunk_writer, tablet, seg_start, |
| seg_end))) { |
| return ret; |
| } |
| seg_start = seg_end; |
| if (seg_start < total_rows) { |
| if (RET_FAIL(time_chunk_writer->seal_current_page())) { |
| return ret; |
| } |
| } |
| seg_len = points_per_page; |
| } |
| |
| // 2) Write each value column in the same segments. |
| ASSERT(value_chunk_writers.size() == tablet.get_column_count()); |
| for (uint32_t col = 0; col < value_chunk_writers.size(); col++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[col]; |
| if (IS_NULL(value_chunk_writer)) { |
| continue; |
| } |
| |
| seg_start = 0; |
| seg_len = first_seg_len; |
| while (seg_start < total_rows) { |
| const uint32_t seg_end = |
| std::min(seg_start + seg_len, total_rows); |
| if (RET_FAIL(value_write_column(value_chunk_writer, tablet, col, |
| seg_start, seg_end))) { |
| return ret; |
| } |
| seg_start = seg_end; |
| if (seg_start < total_rows) { |
| if (value_chunk_writer->has_current_page_data() && |
| RET_FAIL(value_chunk_writer->seal_current_page())) { |
| return ret; |
| } |
| } |
| seg_len = points_per_page; |
| } |
| } |
| return ret; |
| } |
| |
| // General non-strict (may have varlen STRING/TEXT/BLOB columns): |
| // time auto-seals to provide aligned page boundaries; value writers |
| // skip auto page sealing and are sealed manually at time boundaries. |
| // Attention: since value-side auto-seal is disabled, if a varlen value |
| // page hits the memory threshold earlier, it may not seal immediately |
| // and instead will be sealed later at the recorded time-page boundaries |
| // (this may sacrifice the strict page size limit for performance). |
| time_chunk_writer->set_enable_page_seal_if_full(true); |
| for (uint32_t c = 0; c < value_chunk_writers.size(); c++) { |
| if (!IS_NULL(value_chunk_writers[c])) { |
| value_chunk_writers[c]->set_enable_page_seal_if_full(false); |
| } |
| } |
| |
| std::vector<uint32_t> time_page_row_ends; |
| const uint32_t page_max_points = std::max<uint32_t>( |
| 1, common::g_config_value_.page_writer_max_point_num_); |
| time_page_row_ends.reserve(total_rows / page_max_points + 1); |
| |
| // Write time and record where a time page is sealed. |
| for (uint32_t row = 0; row < total_rows; row++) { |
| const int32_t pages_before = time_chunk_writer->num_of_pages(); |
| if (RET_FAIL(time_chunk_writer->write(tablet.timestamps_[row]))) { |
| return ret; |
| } |
| const int32_t pages_after = time_chunk_writer->num_of_pages(); |
| if (pages_after > pages_before) { |
| const uint32_t boundary_end = row + 1; |
| if (time_page_row_ends.empty() || |
| time_page_row_ends.back() != boundary_end) { |
| time_page_row_ends.push_back(boundary_end); |
| } |
| } |
| } |
| |
| // Write values column-by-column and seal at recorded boundaries. |
| ASSERT(value_chunk_writers.size() == tablet.get_column_count()); |
| for (uint32_t col = 0; col < value_chunk_writers.size(); col++) { |
| ValueChunkWriter* value_chunk_writer = value_chunk_writers[col]; |
| if (IS_NULL(value_chunk_writer)) { |
| continue; |
| } |
| uint32_t seg_start = 0; |
| for (uint32_t boundary_end : time_page_row_ends) { |
| if (boundary_end <= seg_start) { |
| continue; |
| } |
| if (RET_FAIL(value_write_column(value_chunk_writer, tablet, col, |
| seg_start, boundary_end))) { |
| return ret; |
| } |
| if (value_chunk_writer->has_current_page_data() && |
| RET_FAIL(value_chunk_writer->seal_current_page())) { |
| return ret; |
| } |
| seg_start = boundary_end; |
| } |
| if (seg_start < total_rows) { |
| if (RET_FAIL(value_write_column(value_chunk_writer, tablet, col, |
| seg_start, total_rows))) { |
| return ret; |
| } |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::write_tablet(const Tablet& tablet) { |
| int ret = E_OK; |
| SimpleVector<ChunkWriter*> chunk_writers; |
| SimpleVector<common::TSDataType> data_types; |
| MeasurementNamesFromTablet mnames_getter(tablet); |
| if (RET_FAIL(do_check_schema( |
| std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_), |
| mnames_getter, chunk_writers, data_types))) { |
| return ret; |
| } |
| ASSERT(chunk_writers.size() == tablet.get_column_count()); |
| for (uint32_t c = 0; c < chunk_writers.size(); c++) { |
| ChunkWriter* chunk_writer = chunk_writers[c]; |
| if (IS_NULL(chunk_writer)) { |
| continue; |
| } |
| if (RET_FAIL(write_column(chunk_writer, tablet, c))) { |
| return ret; |
| } |
| } |
| |
| record_count_since_last_flush_ += tablet.max_row_num_; |
| ret = check_memory_size_and_may_flush_chunks(); |
| return ret; |
| } |
| |
| int TsFileWriter::write_tree(const Tablet& tablet) { |
| auto device_id = |
| std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_); |
| if (schemas_.find(device_id) != schemas_.end()) { |
| auto schema = schemas_[device_id]; |
| if (schema->is_aligned_) { |
| return this->write_tablet_aligned(tablet); |
| } |
| return this->write_tablet(tablet); |
| } |
| return E_NOT_EXIST; |
| } |
| |
| int TsFileWriter::write_tree(const TsRecord& record) { |
| auto device_id = std::make_shared<StringArrayDeviceID>(record.device_id_); |
| if (schemas_.find(device_id) != schemas_.end()) { |
| auto schema = schemas_[device_id]; |
| if (schema->is_aligned_) { |
| return this->write_record_aligned(record); |
| } |
| return this->write_record(record); |
| } |
| return E_NOT_EXIST; |
| } |
| |
| int TsFileWriter::write_table(Tablet& tablet) { |
| int ret = E_OK; |
| if (io_writer_->get_schema()->table_schema_map_.find( |
| tablet.insert_target_name_) == |
| io_writer_->get_schema()->table_schema_map_.end()) { |
| ret = E_TABLE_NOT_EXIST; |
| return ret; |
| } |
| if (RET_FAIL(do_check_and_prepare_tablet(tablet))) { |
| return ret; |
| } |
| |
| auto device_id_end_index_pairs = split_tablet_by_device(tablet); |
| int start_idx = 0; |
| for (auto& device_id_end_index_pair : device_id_end_index_pairs) { |
| auto device_id = device_id_end_index_pair.first; |
| int end_idx = device_id_end_index_pair.second; |
| if (end_idx == 0) continue; |
| |
| SimpleVector<ValueChunkWriter*> value_chunk_writers; |
| TimeChunkWriter* time_chunk_writer = nullptr; |
| if (RET_FAIL(do_check_schema_table(device_id, tablet, time_chunk_writer, |
| value_chunk_writers))) { |
| return ret; |
| } |
| |
| std::vector<uint32_t> field_columns; |
| field_columns.reserve(tablet.get_column_count()); |
| for (uint32_t col = 0; col < tablet.get_column_count(); ++col) { |
| if (tablet.column_categories_[col] == |
| common::ColumnCategory::FIELD) { |
| field_columns.push_back(col); |
| } |
| } |
| ASSERT(field_columns.size() == value_chunk_writers.size()); |
| |
| // Precompute page boundaries from point counts — no serial write |
| // needed. The first segment may be shorter if the time page already |
| // holds data from a previous write_table call. |
| const uint32_t page_max_points = std::max<uint32_t>( |
| 1, common::g_config_value_.page_writer_max_point_num_); |
| const uint32_t si = static_cast<uint32_t>(start_idx); |
| const uint32_t ei = static_cast<uint32_t>(end_idx); |
| |
| // If the current unsealed page is already at or past capacity (from |
| // a previous write_table call), seal it before starting new segments. |
| uint32_t time_cur_points = time_chunk_writer->get_point_numer(); |
| if (time_cur_points >= page_max_points) { |
| if (time_chunk_writer->has_current_page_data()) { |
| if (RET_FAIL(time_chunk_writer->seal_current_page())) { |
| return ret; |
| } |
| } |
| for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { |
| if (!IS_NULL(value_chunk_writers[k]) && |
| value_chunk_writers[k]->has_current_page_data()) { |
| if (RET_FAIL(value_chunk_writers[k]->seal_current_page())) { |
| return ret; |
| } |
| } |
| } |
| time_cur_points = 0; |
| } |
| const uint32_t first_seg_cap = |
| (time_cur_points > 0 && time_cur_points < page_max_points) |
| ? (page_max_points - time_cur_points) |
| : page_max_points; |
| |
| std::vector<uint32_t> page_boundaries; // row indices where a page |
| // should seal |
| { |
| uint32_t pos = si; |
| uint32_t seg_cap = first_seg_cap; |
| while (pos < ei) { |
| uint32_t seg_end = std::min(pos + seg_cap, ei); |
| if (seg_end < ei) { |
| page_boundaries.push_back(seg_end); |
| } |
| pos = seg_end; |
| seg_cap = page_max_points; |
| } |
| } |
| |
| // We control page sealing explicitly at precomputed boundaries, so |
| // auto-seal must be disabled during segmented writes — otherwise a |
| // segment of exactly page_max_points would trigger auto-seal AND |
| // our explicit seal, double-sealing (sealing an empty page → crash). |
| // Note: with auto-seal off, the memory-based threshold |
| // (page_writer_max_memory_bytes_) is not enforced within a segment. |
| // For varlen columns (STRING/TEXT/BLOB), individual pages may exceed |
| // the memory limit. Each segment is still bounded by |
| // page_max_points rows, keeping pages within a reasonable size. |
| auto write_time_in_segments = [this, &tablet, &page_boundaries, si, |
| ei](TimeChunkWriter* tcw) -> int { |
| int r = E_OK; |
| tcw->set_enable_page_seal_if_full(false); |
| uint32_t seg_start = si; |
| for (uint32_t boundary : page_boundaries) { |
| if ((r = time_write_column(tcw, tablet, seg_start, boundary)) != |
| E_OK) |
| return r; |
| if ((r = tcw->seal_current_page()) != E_OK) return r; |
| seg_start = boundary; |
| } |
| if (seg_start < ei) { |
| r = time_write_column(tcw, tablet, seg_start, ei); |
| } |
| tcw->set_enable_page_seal_if_full(true); |
| return r; |
| }; |
| |
| auto write_value_in_segments = [this, &tablet, &page_boundaries, si, |
| ei](ValueChunkWriter* vcw, |
| uint32_t col_idx) -> int { |
| int r = E_OK; |
| vcw->set_enable_page_seal_if_full(false); |
| uint32_t seg_start = si; |
| for (uint32_t boundary : page_boundaries) { |
| if ((r = value_write_column(vcw, tablet, col_idx, seg_start, |
| boundary)) != E_OK) |
| return r; |
| if (vcw->has_current_page_data() && |
| (r = vcw->seal_current_page()) != E_OK) |
| return r; |
| seg_start = boundary; |
| } |
| if (seg_start < ei) { |
| r = value_write_column(vcw, tablet, col_idx, seg_start, ei); |
| } |
| vcw->set_enable_page_seal_if_full(true); |
| return r; |
| }; |
| |
| // All columns (time + values) write the same row segments and seal |
| // at the same boundaries — fully parallel. |
| #ifdef ENABLE_THREADS |
| if (g_config_value_.parallel_write_enabled_) { |
| std::vector<std::future<int>> futures; |
| futures.push_back(g_write_thread_pool_->submit( |
| [&write_time_in_segments, time_chunk_writer]() { |
| return write_time_in_segments(time_chunk_writer); |
| })); |
| for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { |
| ValueChunkWriter* vcw = value_chunk_writers[k]; |
| if (IS_NULL(vcw)) continue; |
| uint32_t col_idx = field_columns[k]; |
| futures.push_back(g_write_thread_pool_->submit( |
| [&write_value_in_segments, vcw, col_idx]() { |
| return write_value_in_segments(vcw, col_idx); |
| })); |
| } |
| for (auto& f : futures) { |
| int r = f.get(); |
| if (r != E_OK && ret == E_OK) ret = r; |
| } |
| if (ret != E_OK) return ret; |
| } else |
| #endif |
| { |
| if (RET_FAIL(write_time_in_segments(time_chunk_writer))) { |
| return ret; |
| } |
| for (uint32_t k = 0; k < value_chunk_writers.size(); k++) { |
| ValueChunkWriter* vcw = value_chunk_writers[k]; |
| if (IS_NULL(vcw)) continue; |
| if (RET_FAIL(write_value_in_segments(vcw, field_columns[k]))) { |
| return ret; |
| } |
| } |
| } |
| start_idx = end_idx; |
| } |
| record_count_since_last_flush_ += tablet.cur_row_size_; |
| // Reset string column buffers so the tablet can be reused for the next |
| // batch without accumulating memory across writes. |
| tablet.reset_string_columns(); |
| ret = check_memory_size_and_may_flush_chunks(); |
| return ret; |
| } |
| |
| std::vector<std::pair<std::shared_ptr<IDeviceID>, int>> |
| TsFileWriter::split_tablet_by_device(const Tablet& tablet) { |
| std::vector<std::pair<std::shared_ptr<IDeviceID>, int>> result; |
| |
| if (tablet.id_column_indexes_.empty()) { |
| auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id"); |
| result.emplace_back(std::move(sentinel), 0); |
| std::vector<std::string*> id_array; |
| id_array.push_back(new std::string(tablet.insert_target_name_)); |
| auto res = std::make_shared<StringArrayDeviceID>(id_array); |
| delete id_array[0]; |
| result.emplace_back(std::move(res), tablet.get_cur_row_size()); |
| return result; |
| } |
| |
| const uint32_t row_count = tablet.get_cur_row_size(); |
| if (row_count == 0) return result; |
| |
| auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id"); |
| result.emplace_back(std::move(sentinel), 0); |
| |
| auto boundaries = tablet.find_all_device_boundaries(); |
| |
| uint32_t seg_start = 0; |
| for (uint32_t b : boundaries) { |
| std::shared_ptr<IDeviceID> dev_id(tablet.get_device_id(seg_start)); |
| result.emplace_back(std::move(dev_id), b); |
| seg_start = b; |
| } |
| std::shared_ptr<IDeviceID> last_id(tablet.get_device_id(seg_start)); |
| result.emplace_back(std::move(last_id), row_count); |
| return result; |
| } |
| |
| int TsFileWriter::write_column(ChunkWriter* chunk_writer, const Tablet& tablet, |
| int col_idx, uint32_t start_idx, |
| uint32_t end_idx) { |
| int ret = E_OK; |
| |
| common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_; |
| int64_t* timestamps = tablet.timestamps_; |
| Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx]; |
| BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx]; |
| end_idx = std::min(end_idx, tablet.max_row_num_); |
| |
| if (data_type == common::BOOLEAN) { |
| ret = write_typed_column(chunk_writer, timestamps, col_values.bool_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| } else if (data_type == common::INT32) { |
| ret = |
| write_typed_column(chunk_writer, timestamps, col_values.int32_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| } else if (data_type == common::INT64) { |
| ret = |
| write_typed_column(chunk_writer, timestamps, col_values.int64_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| } else if (data_type == common::FLOAT) { |
| ret = |
| write_typed_column(chunk_writer, timestamps, col_values.float_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| } else if (data_type == common::DOUBLE) { |
| ret = |
| write_typed_column(chunk_writer, timestamps, col_values.double_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| } else if (data_type == common::STRING) { |
| ret = |
| write_typed_column(chunk_writer, timestamps, col_values.string_col, |
| col_notnull_bitmap, start_idx, end_idx); |
| } else { |
| ASSERT(false); |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::time_write_column(TimeChunkWriter* time_chunk_writer, |
| const Tablet& tablet, uint32_t start_idx, |
| uint32_t end_idx) { |
| int64_t* timestamps = tablet.timestamps_; |
| int ret = E_OK; |
| if (IS_NULL(time_chunk_writer) || IS_NULL(timestamps)) { |
| return E_INVALID_ARG; |
| } |
| for (uint32_t r = start_idx; r < end_idx && r < tablet.max_row_num_; r++) { |
| if (RET_FAIL(time_chunk_writer->write(timestamps[r]))) { |
| break; |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::value_write_column(ValueChunkWriter* value_chunk_writer, |
| const Tablet& tablet, int col_idx, |
| uint32_t start_idx, uint32_t end_idx) { |
| int ret = E_OK; |
| |
| TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_; |
| int64_t* timestamps = tablet.timestamps_; |
| Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx]; |
| BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx]; |
| switch (data_type) { |
| case common::BOOLEAN: |
| ret = write_typed_column(value_chunk_writer, timestamps, |
| (bool*)col_values.bool_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| break; |
| case common::DATE: |
| case common::INT32: |
| ret = write_typed_column(value_chunk_writer, timestamps, |
| (int32_t*)col_values.int32_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| break; |
| case common::TIMESTAMP: |
| case common::INT64: |
| ret = write_typed_column(value_chunk_writer, timestamps, |
| (int64_t*)col_values.int64_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| break; |
| case common::FLOAT: |
| ret = write_typed_column(value_chunk_writer, timestamps, |
| (float*)col_values.float_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| break; |
| case common::DOUBLE: |
| ret = write_typed_column(value_chunk_writer, timestamps, |
| (double*)col_values.double_data, |
| col_notnull_bitmap, start_idx, end_idx); |
| break; |
| case common::STRING: |
| case common::TEXT: |
| case common::BLOB: |
| ret = write_typed_column(value_chunk_writer, timestamps, |
| col_values.string_col, col_notnull_bitmap, |
| start_idx, end_idx); |
| break; |
| default: |
| ret = E_NOT_SUPPORT; |
| } |
| return ret; |
| } |
| |
// Body shared by the primitive ChunkWriter overloads of write_typed_column.
// Expects chunk_writer, timestamps, col_values, col_notnull_bitmap,
// start_idx and end_idx to be in scope at the expansion site.
// Rows in [start_idx, end_idx) whose bitmap bit is NOT set are written as
// (timestamps[r], col_values[r]); rows with a set bit are skipped.
// NOTE(review): despite the bitmap's name, a set bit appears to mark a null
// cell — confirm against BitMap / Tablet.
// The macro returns from the enclosing function: it returns the first
// non-E_OK result, or E_OK. Comments cannot go inside the body because
// line continuations would splice them into the macro.
#define DO_WRITE_TYPED_COLUMN()                                          \
    do {                                                                 \
        int ret = E_OK;                                                  \
        for (uint32_t r = start_idx; r < end_idx; r++) {                 \
            if (LIKELY(!col_notnull_bitmap.test(r))) {                   \
                if (RET_FAIL(                                            \
                        chunk_writer->write(timestamps[r], col_values[r]))) { \
                    return ret;                                          \
                }                                                        \
            }                                                            \
        }                                                                \
        return ret;                                                      \
    } while (false)
| |
// Body shared by the primitive ValueChunkWriter overloads of
// write_typed_column. Expects value_chunk_writer, timestamps, col_values,
// col_notnull_bitmap, start_idx and end_idx to be in scope.
// Every row in [start_idx, end_idx) is written, including rows whose bit is
// set. The third write() argument is true exactly when the row's bitmap bit
// is set (it appears to be an is-null flag — confirm in ValueChunkWriter).
// The macro returns from the enclosing function: it returns the first
// non-E_OK result, or E_OK.
#define DO_VALUE_WRITE_TYPED_COLUMN()                            \
    do {                                                         \
        int ret = E_OK;                                          \
        for (uint32_t r = start_idx; r < end_idx; r++) {         \
            if (LIKELY(col_notnull_bitmap.test(r))) {            \
                if (RET_FAIL(value_chunk_writer->write(          \
                        timestamps[r], col_values[r], true))) {  \
                    return ret;                                  \
                }                                                \
            } else {                                             \
                if (RET_FAIL(value_chunk_writer->write(          \
                        timestamps[r], col_values[r], false))) { \
                    return ret;                                  \
                }                                                \
            }                                                    \
        }                                                        \
        return ret;                                              \
    } while (false)
| |
| int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, |
| int64_t* timestamps, bool* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, |
| int64_t* timestamps, int32_t* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, |
| int64_t* timestamps, int64_t* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, |
| int64_t* timestamps, float* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, |
| int64_t* timestamps, double* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer, |
| int64_t* timestamps, |
| Tablet::StringColumn* string_col, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| int ret = E_OK; |
| for (uint32_t r = start_idx; r < end_idx; r++) { |
| if (LIKELY(!col_notnull_bitmap.test(r))) { |
| common::String val( |
| string_col->buffer + string_col->offsets[r], |
| static_cast<uint32_t>(string_col->offsets[r + 1] - |
| string_col->offsets[r])); |
| if (RET_FAIL(chunk_writer->write(timestamps[r], val))) { |
| return ret; |
| } |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, |
| int64_t* timestamps, bool* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_VALUE_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, |
| int64_t* timestamps, int32_t* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_VALUE_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, |
| int64_t* timestamps, int64_t* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_VALUE_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, |
| int64_t* timestamps, float* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_VALUE_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, |
| int64_t* timestamps, double* col_values, |
| BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| DO_VALUE_WRITE_TYPED_COLUMN(); |
| } |
| |
| int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer, |
| int64_t* timestamps, |
| Tablet::StringColumn* string_col, |
| common::BitMap& col_notnull_bitmap, |
| uint32_t start_idx, uint32_t end_idx) { |
| int ret = E_OK; |
| for (uint32_t r = start_idx; r < end_idx; r++) { |
| common::String val(string_col->buffer + string_col->offsets[r], |
| static_cast<uint32_t>(string_col->offsets[r + 1] - |
| string_col->offsets[r])); |
| if (LIKELY(col_notnull_bitmap.test(r))) { |
| if (RET_FAIL(value_chunk_writer->write(timestamps[r], val, true))) { |
| return ret; |
| } |
| } else { |
| if (RET_FAIL( |
| value_chunk_writer->write(timestamps[r], val, false))) { |
| return ret; |
| } |
| } |
| } |
| return ret; |
| } |
| |
| // TODO make sure ret is meaningful to SDK user |
| int TsFileWriter::flush() { |
| int ret = E_OK; |
| if (!start_file_done_) { |
| if (RET_FAIL(io_writer_->start_file())) { |
| return ret; |
| } |
| start_file_done_ = true; |
| } |
| |
| /* since @schemas_ used std::map which is rbtree underlying, |
| so map itself is ordered by device name. */ |
| DeviceSchemasMapIter device_iter; |
| for (device_iter = schemas_.begin(); device_iter != schemas_.end(); |
| device_iter++) { // cppcheck-suppress postfixOperator |
| if (check_chunk_group_empty(device_iter->second, |
| device_iter->second->is_aligned_)) { |
| continue; |
| } |
| bool is_aligned = device_iter->second->is_aligned_; |
| |
| if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first, |
| is_aligned))) { |
| } else if (RET_FAIL( |
| flush_chunk_group(device_iter->second, is_aligned))) { |
| } else if (RET_FAIL(io_writer_->end_flush_chunk_group(is_aligned))) { |
| } |
| } |
| record_count_since_last_flush_ = 0; |
| return ret; |
| } |
| |
| bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup* chunk_group, |
| bool is_aligned) { |
| if (chunk_group->is_aligned_ && |
| chunk_group->time_chunk_writer_ != nullptr && |
| chunk_group->time_chunk_writer_->hasData()) { |
| return false; |
| } |
| MeasurementSchemaMap& map = chunk_group->measurement_schema_map_; |
| for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end(); |
| ms_iter++) { |
| MeasurementSchema* m_schema = ms_iter->second; |
| if (is_aligned) { |
| if (m_schema->value_chunk_writer_ != NULL && |
| m_schema->value_chunk_writer_->hasData()) { |
| return false; |
| } |
| } else { |
| if (m_schema->chunk_writer_ != NULL && |
| m_schema->chunk_writer_->hasData()) { |
| // first condition is to avoid first flush empty chunk group |
| // second condition is to avoid repeated flush |
| return false; |
| } |
| } |
| } |
| return true; |
| } |
| |
// Finalizes one chunk writer and streams it through io_writer.
// Order of operations: end_encode_chunk, then start_flush_chunk (header
// fields), flush_chunk (data), end_flush_chunk (statistics). On success the
// writer is reset(). Each step assigns `ret` (via RET_FAIL) and the chain
// stops at the first failure, leaving the failing code in `ret`; the caller
// must have `int ret` in scope and inspect it afterwards.
// NOTE: this expands to a bare if/else-if chain (no do/while wrapper) and
// call sites use it without a trailing semicolon. Do not use it as the body
// of an unbraced `if`, or a following `else` would bind to it.
#define FLUSH_CHUNK(writer, io_writer, name, data_type, encoding, compression, \
                    num_pages)                                                 \
    if (RET_FAIL(writer->end_encode_chunk())) {                                \
    } else if (RET_FAIL(io_writer->start_flush_chunk(                          \
                   writer->get_chunk_data(), name, data_type, encoding,        \
                   compression, num_pages))) {                                 \
    } else if (RET_FAIL(io_writer->flush_chunk(writer->get_chunk_data()))) {   \
    } else if (RET_FAIL(io_writer->end_flush_chunk(                            \
                   writer->get_chunk_statistic()))) {                          \
    } else {                                                                   \
        writer->reset();                                                       \
    }
| |
| int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup* chunk_group, |
| bool is_aligned) { |
| int ret = E_OK; |
| MeasurementSchemaMap& map = chunk_group->measurement_schema_map_; |
| |
| if (chunk_group->is_aligned_) { |
| TimeChunkWriter*& time_chunk_writer = chunk_group->time_chunk_writer_; |
| ChunkHeader chunk_header = time_chunk_writer->get_chunk_header(); |
| FLUSH_CHUNK(time_chunk_writer, io_writer_, |
| chunk_header.measurement_name_, chunk_header.data_type_, |
| chunk_header.encoding_type_, chunk_header.compression_type_, |
| time_chunk_writer->num_of_pages()) |
| } |
| |
| for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end(); |
| ms_iter++) { |
| MeasurementSchema* m_schema = ms_iter->second; |
| if (!chunk_group->is_aligned_ && m_schema->chunk_writer_ != nullptr) { |
| ChunkWriter*& chunk_writer = m_schema->chunk_writer_; |
| FLUSH_CHUNK(chunk_writer, io_writer_, m_schema->measurement_name_, |
| m_schema->data_type_, m_schema->encoding_, |
| m_schema->compression_type_, |
| chunk_writer->num_of_pages()) |
| } else if (m_schema->value_chunk_writer_ != nullptr) { |
| ValueChunkWriter*& value_chunk_writer = |
| m_schema->value_chunk_writer_; |
| FLUSH_CHUNK(value_chunk_writer, io_writer_, |
| m_schema->measurement_name_, m_schema->data_type_, |
| m_schema->encoding_, m_schema->compression_type_, |
| value_chunk_writer->num_of_pages()) |
| } |
| } |
| |
| return ret; |
| } |
| |
| int TsFileWriter::close() { return io_writer_->end_file(); } |
| |
| } // end namespace storage |