blob: 35520c88d8f6f3bddc964ffddb29db4aa7b2b24d [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tsfile_writer.h"
#include <unistd.h>
#include "chunk_writer.h"
#include "common/config/config.h"
#ifdef ENABLE_THREADS
#include "common/thread_pool.h"
#endif
#include "file/restorable_tsfile_io_writer.h"
#include "file/tsfile_io_writer.h"
#include "file/write_file.h"
#include "utils/errno_define.h"
using namespace common;
namespace storage {
namespace libtsfile {
// Set by libtsfile_init() once global state is initialized; cleared again by
// libtsfile_destroy().
bool g_s_is_inited{false};
}  // namespace libtsfile
// Initialize process-wide libtsfile state: ModStat and the common
// configuration (init_common()).
//
// Idempotent: once initialization has run, further calls return E_OK without
// re-initializing. Call libtsfile_destroy() first if a fresh initialization is
// needed. (The flag used to be reset unconditionally before being tested,
// which made this guard dead code and re-ran initialization on every call.)
//
// @return E_OK.
int libtsfile_init() {
    if (libtsfile::g_s_is_inited) {
        return E_OK;
    }
    ModStat::get_instance().init();
    init_common();
    libtsfile::g_s_is_inited = true;
    return E_OK;
}
// Release process-wide libtsfile state and clear the init flag so that a
// subsequent libtsfile_init() performs initialization again.
void libtsfile_destroy() {
#ifdef ENABLE_THREADS
    // Free the global write thread pool and clear the handle so no dangling
    // pointer remains.
    if (common::g_write_thread_pool_ != nullptr) {
        delete common::g_write_thread_pool_;
        common::g_write_thread_pool_ = nullptr;
    }
#endif
    ModStat::get_instance().destroy();
    libtsfile::g_s_is_inited = false;
}
// Forward the maximum number of points per page to the global configuration.
void set_page_max_point_count(uint32_t page_max_point_count) {
    config_set_page_max_point_count(page_max_point_count);
}
// Forward the maximum fan-out of metadata index nodes to the global
// configuration.
void set_max_degree_of_index_node(uint32_t max_degree) {
    config_set_max_degree_of_index_node(max_degree);
}
// Forward the strict-page-size switch to the global configuration (read by
// the aligned tablet writer as g_config_value_.strict_page_size_).
void set_strict_page_size(bool enabled) {
    config_set_strict_page_size(enabled);
}
TsFileWriter::TsFileWriter()
: write_file_(nullptr),
io_writer_(nullptr),
schemas_(),
start_file_done_(false),
record_count_since_last_flush_(0),
record_count_for_next_mem_check_(
g_config_value_.record_count_for_next_mem_check_),
write_file_created_(false),
io_writer_owned_(true) {}
TsFileWriter::~TsFileWriter() {
    // All resource release is centralized in destroy().
    destroy();
}
void TsFileWriter::destroy() {
if (write_file_created_ && write_file_ != nullptr) {
delete write_file_;
write_file_ = nullptr;
}
if (io_writer_owned_ && io_writer_) {
delete io_writer_;
}
io_writer_ = nullptr;
DeviceSchemasMapIter dev_iter;
// cppcheck-suppress postfixOperator
for (dev_iter = schemas_.begin(); dev_iter != schemas_.end(); dev_iter++) {
MeasurementSchemaMap& ms_map =
dev_iter->second->measurement_schema_map_;
MeasurementSchemaMapIter ms_iter;
for (ms_iter = ms_map.begin(); ms_iter != ms_map.end(); ms_iter++) {
MeasurementSchema* ms = ms_iter->second;
if (ms != nullptr) {
if (ms->chunk_writer_ != nullptr) {
delete ms->chunk_writer_;
ms->chunk_writer_ = nullptr;
}
delete ms;
ms_iter->second = nullptr;
}
}
delete dev_iter->second;
dev_iter->second = nullptr;
}
schemas_.clear();
record_count_since_last_flush_ = 0;
}
// Attach this writer to an already-opened WriteFile supplied by the caller.
// The file is not owned (destroy() will not delete it); a fresh
// TsFileIOWriter is allocated and owned by this writer.
//
// @param write_file opened output file; must be non-null.
// @return E_OK, or E_INVALID_ARG if write_file is null or not opened.
int TsFileWriter::init(WriteFile* write_file) {
    const bool usable =
        write_file != nullptr && write_file->file_opened();
    if (!usable) {
        return E_INVALID_ARG;
    }
    write_file_ = write_file;
    write_file_created_ = false;
    io_writer_owned_ = true;
    io_writer_ = new TsFileIOWriter();
    io_writer_->init(write_file_);
    return E_OK;
}
// -----------------------------------------------------------------------------
// Recovery init: rebuild schemas_ from recovered chunk group metas (aligned
// with Java). Use each CGM's actual device_id from file as key so tree and
// table model both get correct lookups. Table model can still lazy-create from
// table_schema_map_ in do_check_schema_table when a new device appears.
// All new MeasurementSchemaGroup/MeasurementSchema are freed in destroy().
// -----------------------------------------------------------------------------
// Initialize this writer on top of a RestorableTsFileIOWriter.
//
// The writer borrows rw: io_writer_owned_ is set to false, so destroy() will
// not delete it. It also borrows the WriteFile rw exposes
// (write_file_created_ = false). schemas_ is rebuilt from the recovered chunk
// group metas so later writes for the same devices/measurements find their
// schemas. Chunk writers are NOT created here; they are created lazily by the
// do_check_schema* helpers.
//
// @param rw recovered IO writer; must be non-null and report can_write().
// @return E_OK on success, E_INVALID_ARG if rw is null or not writable.
int TsFileWriter::init(RestorableTsFileIOWriter* rw) {
if (rw == nullptr || !rw->can_write()) {
return E_INVALID_ARG;
}
write_file_ = rw->get_write_file();
write_file_created_ = false;
io_writer_owned_ = false;
io_writer_ = rw;
const std::vector<ChunkGroupMeta*>& recovered =
rw->get_recovered_chunk_group_metas();
for (ChunkGroupMeta* cgm : recovered) {
// Skip metas without a usable device id.
if (cgm == nullptr || cgm->device_id_ == nullptr) {
continue;
}
std::shared_ptr<IDeviceID> device_id = cgm->device_id_;
// Find existing group for same device (same device may have multiple
// CGMs from multiple flushes).
// Device ids are compared by value with a linear scan.
// NOTE(review): schemas_.find(device_id) is likely equivalent (other
// code relies on find() with freshly built ids) - confirm the map's
// comparator before replacing this O(n) scan.
DeviceSchemasMapIter it = schemas_.begin();
for (; it != schemas_.end(); ++it) {
if (it->first != nullptr && *it->first == *device_id) {
break;
}
}
MeasurementSchemaGroup* group = nullptr;
if (it != schemas_.end()) {
group = it->second;
} else {
// New device: alignment is queried from rw by the device's table
// name.
group = new MeasurementSchemaGroup;
group->is_aligned_ =
rw->is_device_aligned(device_id->get_table_name());
schemas_.insert(std::make_pair(device_id, group));
}
// Add measurement schemas from this CGM (skip time column: empty name).
// A measurement already present in the group keeps its first-seen
// schema.
for (auto iter = cgm->chunk_meta_list_.begin();
iter != cgm->chunk_meta_list_.end(); iter++) {
ChunkMeta* cm = iter.get();
if (cm == nullptr) {
continue;
}
std::string mname = cm->measurement_name_.to_std_string();
if (mname.empty()) {
continue;
}
if (group->measurement_schema_map_.find(mname) !=
group->measurement_schema_map_.end()) {
continue;
}
MeasurementSchema* ms = new MeasurementSchema(
mname, cm->data_type_, cm->encoding_, cm->compression_type_);
group->measurement_schema_map_.insert(std::make_pair(mname, ms));
}
}
// Mark the file start as already done; presumably the recovered file
// already carries its header - confirm against start-file logic.
start_file_done_ = true;
return E_OK;
}
void TsFileWriter::set_generate_table_schema(bool generate_table_schema) {
io_writer_->set_generate_table_schema(generate_table_schema);
}
// Register a table schema with the IO writer's schema registry.
//
// @return E_OK on success;
//         E_INVALID_ARG for a null schema, an empty table/column name, or
//         duplicate column names;
//         E_ALREADY_EXIST if a table with the same name is registered.
int TsFileWriter::register_table(
    const std::shared_ptr<TableSchema>& table_schema) {
    if (!table_schema) {
        return E_INVALID_ARG;
    }
    // Neither the table nor any of its columns may have an empty name.
    if (table_schema->get_table_name().empty()) {
        return E_INVALID_ARG;
    }
    const auto& column_names = table_schema->get_measurement_names();
    for (const auto& column_name : column_names) {
        if (column_name.empty()) {
            return E_INVALID_ARG;
        }
    }
    // TabletSchema construction cannot report errors, so duplicate column
    // names are detected here: the position index collapses duplicates and
    // ends up smaller than the name list.
    // TODO: Add Debug INFO if ErrorCode is not enough to describe problems.
    if (table_schema->get_column_pos_index_num() != column_names.size()) {
        return E_INVALID_ARG;
    }
    auto&& schema = io_writer_->get_schema();
    auto& registered_tables = schema->table_schema_map_;
    if (registered_tables.find(table_schema->get_table_name()) !=
        registered_tables.end()) {
        return E_ALREADY_EXIST;
    }
    schema->register_table_schema(table_schema);
    return E_OK;
}
// Return true if a filesystem entry already exists at file_path. This checks
// existence only (access(2) with F_OK), not permissions.
bool check_file_exist(const std::string& file_path) {
    const int rc = ::access(file_path.c_str(), F_OK);
    return rc == 0;
}
int TsFileWriter::open(const std::string& file_path, int flags, mode_t mode) {
if (check_file_exist(file_path)) {
return E_ALREADY_EXIST;
}
write_file_ = new WriteFile;
write_file_created_ = true;
io_writer_ = new TsFileIOWriter;
int ret = E_OK;
if (RET_FAIL(write_file_->create(file_path, flags, mode))) {
} else {
io_writer_->init(write_file_);
}
return ret;
}
int TsFileWriter::open(const std::string& file_path) {
return open(file_path, O_RDWR | O_CREAT | O_TRUNC, 0666);
}
int TsFileWriter::register_aligned_timeseries(
const std::string& device_id, const MeasurementSchema& measurement_schema) {
MeasurementSchema* ms = new MeasurementSchema(
measurement_schema.measurement_name_, measurement_schema.data_type_,
measurement_schema.encoding_, measurement_schema.compression_type_);
return register_timeseries(device_id, ms, true);
}
// Register every schema in measurement_schemas as an aligned timeseries of
// device_id, in order, stopping at the first error. On success each schema
// pointer is adopted by the writer.
int TsFileWriter::register_aligned_timeseries(
    const std::string& device_id,
    const std::vector<MeasurementSchema*>& measurement_schemas) {
    for (size_t idx = 0; idx < measurement_schemas.size(); ++idx) {
        const int status =
            register_timeseries(device_id, measurement_schemas[idx], true);
        if (status != E_OK) {
            return status;
        }
    }
    return E_OK;
}
int TsFileWriter::register_timeseries(
const std::string& device_id, const MeasurementSchema& measurement_schema) {
MeasurementSchema* ms = new MeasurementSchema(
measurement_schema.measurement_name_, measurement_schema.data_type_,
measurement_schema.encoding_, measurement_schema.compression_type_);
return register_timeseries(device_id, ms, false);
}
int TsFileWriter::register_timeseries(const std::string& device_path,
MeasurementSchema* measurement_schema,
bool is_aligned) {
std::shared_ptr<IDeviceID> device_id =
std::make_shared<StringArrayDeviceID>(device_path);
DeviceSchemasMapIter device_iter = schemas_.find(device_id);
if (device_iter != schemas_.end()) {
MeasurementSchemaMap& msm =
device_iter->second->measurement_schema_map_;
MeasurementSchemaMapInsertResult ins_res = msm.insert(std::make_pair(
measurement_schema->measurement_name_, measurement_schema));
if (UNLIKELY(!ins_res.second)) {
return E_ALREADY_EXIST;
}
} else {
MeasurementSchemaGroup* ms_group = new MeasurementSchemaGroup;
ms_group->is_aligned_ = is_aligned;
ms_group->measurement_schema_map_.insert(std::make_pair(
measurement_schema->measurement_name_, measurement_schema));
schemas_.insert(std::make_pair(device_id, ms_group));
}
return E_OK;
}
int TsFileWriter::register_timeseries(
const std::string& device_id,
const std::vector<MeasurementSchema*>& measurement_schema_vec) {
int ret = E_OK;
auto it = measurement_schema_vec.begin();
for (; it != measurement_schema_vec.end();
it++) { // cppcheck-suppress postfixOperator
ret = register_timeseries(device_id, *it);
if (ret != E_OK) {
return ret;
}
}
return ret;
}
struct MeasurementSchemaMapNamesGetter {
public:
explicit MeasurementSchemaMapNamesGetter(
const MeasurementSchemaMap& measurement_schema_map)
: measurement_schema_map_(
const_cast<MeasurementSchemaMap&>(measurement_schema_map)) {
measurement_name_idx_ = measurement_schema_map_.begin();
}
FORCE_INLINE uint32_t get_count() const {
return measurement_schema_map_.size();
}
FORCE_INLINE const std::string& next() {
ASSERT(measurement_name_idx_ != measurement_schema_map_.end());
std::string& ret = measurement_name_idx_->second->measurement_name_;
measurement_name_idx_++;
return ret;
}
private:
MeasurementSchemaMap& measurement_schema_map_;
MeasurementSchemaMap::iterator measurement_name_idx_;
};
// Measurement-name source that yields the names of a TsRecord's points in
// order. The record must outlive this object.
struct MeasurementNamesFromRecord {
   public:
    explicit MeasurementNamesFromRecord(const TsRecord& record)
        : record_(record), measurement_name_idx_(0) {}

    // Number of points carried by the record.
    FORCE_INLINE uint32_t get_count() const { return record_.points_.size(); }

    // Return the current point's measurement name, then advance.
    FORCE_INLINE const std::string& next() {
        const size_t current = measurement_name_idx_;
        measurement_name_idx_ += 1;
        return at(current);
    }

   private:
    const TsRecord& record_;
    size_t measurement_name_idx_;

    FORCE_INLINE const std::string& at(size_t idx) const {
        ASSERT(idx < record_.points_.size());
        return record_.points_[idx].measurement_name_;
    }
};
// Measurement-name source that yields a Tablet's column names in schema
// order. The tablet must outlive this object.
struct MeasurementNamesFromTablet {
   public:
    explicit MeasurementNamesFromTablet(const Tablet& tablet)
        : tablet_(tablet), measurement_name_idx_(0) {}

    // Number of columns described by the tablet's schema vector.
    FORCE_INLINE uint32_t get_count() const {
        return tablet_.schema_vec_->size();
    }

    // Return the current column's name, then advance.
    FORCE_INLINE const std::string& next() {
        const size_t current = measurement_name_idx_;
        measurement_name_idx_ += 1;
        return at(current);
    }

   private:
    const Tablet& tablet_;
    size_t measurement_name_idx_;

    FORCE_INLINE const std::string& at(size_t idx) const {
        ASSERT(idx < tablet_.schema_vec_->size());
        return tablet_.schema_vec_->at(idx).measurement_name_;
    }
};
// Fill a table-model tablet's column categories (and TAG column indexes)
// from its registered table schema, validating every column on the way.
//
// Does nothing if tablet.column_categories_ is already populated. All columns
// are validated before the tablet is modified, so a failure leaves it
// untouched. A later retry therefore re-validates instead of being skipped
// because of partially filled categories.
//
// @return E_OK;
//         E_TABLE_NOT_EXIST if the tablet's table is not registered;
//         E_COLUMN_NOT_EXIST if a column is missing from the table schema;
//         E_TYPE_NOT_MATCH if a column's data type differs from the schema.
int TsFileWriter::do_check_and_prepare_tablet(Tablet& tablet) {
    if (!tablet.column_categories_.empty()) {
        return common::E_OK;
    }
    auto& schema_map = io_writer_->get_schema()->table_schema_map_;
    auto table_schema_it = schema_map.find(tablet.get_table_name());
    if (table_schema_it == schema_map.end() || !table_schema_it->second) {
        return E_TABLE_NOT_EXIST;
    }
    const auto& table_schema = table_schema_it->second;
    const uint32_t column_cnt = tablet.get_column_count();

    // Pass 1: resolve and validate every column without touching the tablet.
    std::vector<int> schema_indexes;
    schema_indexes.reserve(column_cnt);
    for (uint32_t i = 0; i < column_cnt; i++) {
        auto& col_name = tablet.get_column_name(i);
        const int col_index = table_schema->find_column_index(col_name);
        if (col_index == -1) {
            return E_COLUMN_NOT_EXIST;
        }
        if (table_schema->get_data_types()[col_index] !=
            tablet.schema_vec_->at(i).data_type_) {
            return E_TYPE_NOT_MATCH;
        }
        schema_indexes.push_back(col_index);
    }

    // Pass 2: every column is valid; record categories and TAG positions.
    for (uint32_t i = 0; i < column_cnt; i++) {
        const common::ColumnCategory column_category =
            table_schema->get_column_categories()[schema_indexes[i]];
        tablet.column_categories_.emplace_back(column_category);
        if (column_category == ColumnCategory::TAG) {
            tablet.id_column_indexes_.push_back(i);
        }
    }
    return common::E_OK;
}
// Look up a registered table schema by name; returns nullptr when no table
// with that name is registered.
std::shared_ptr<TableSchema> TsFileWriter::get_table_schema(
    const std::string& table_name) const {
    const auto& tables = io_writer_->get_schema()->table_schema_map_;
    const auto found = tables.find(table_name);
    if (found != tables.end()) {
        return found->second;
    }
    return nullptr;
}
// Resolve the non-aligned chunk writer and data type for each measurement
// name yielded by measurement_names on device_id.
//
// Writers are created lazily and stay owned by their MeasurementSchema
// (freed in destroy()). Unregistered measurements produce a null writer and
// NULL_TYPE so the output vectors stay position-aligned with the names.
//
// @return E_OK;
//         E_DEVICE_NOT_EXIST if the device has no schema group;
//         E_INVALID_ARG if a newly created chunk writer fails to initialize.
//         In that case the failed writer is discarded so a later call
//         retries initialization instead of reusing it.
template <typename MeasurementNamesGetter>
int TsFileWriter::do_check_schema(
    std::shared_ptr<IDeviceID> device_id,
    MeasurementNamesGetter& measurement_names,
    SimpleVector<ChunkWriter*>& chunk_writers,
    SimpleVector<common::TSDataType>& data_types) {
    int ret = E_OK;
    DeviceSchemasMapIter dev_it = schemas_.find(device_id);
    MeasurementSchemaGroup* device_schema = nullptr;
    if (UNLIKELY(dev_it == schemas_.end()) ||
        IS_NULL(device_schema = dev_it->second)) {
        return E_DEVICE_NOT_EXIST;
    }
    MeasurementSchemaMap& msm = device_schema->measurement_schema_map_;
    const uint32_t measurement_count = measurement_names.get_count();
    for (uint32_t i = 0; i < measurement_count; i++) {
        auto ms_iter = msm.find(measurement_names.next());
        if (UNLIKELY(ms_iter == msm.end())) {
            chunk_writers.push_back(nullptr);
            data_types.push_back(common::NULL_TYPE);
            continue;
        }
        // In Java we will check data_type. But in C++, no check here,
        // because checks are performed at the chunk layer and page layer.
        MeasurementSchema* ms = ms_iter->second;
        if (IS_NULL(ms->chunk_writer_)) {
            ms->chunk_writer_ = new ChunkWriter;
            ret = ms->chunk_writer_->init(ms->measurement_name_,
                                          ms->data_type_, ms->encoding_,
                                          ms->compression_type_);
            if (!IS_SUCC(ret)) {
                // Only the writer created just now is discarded; writers
                // already collected in chunk_writers belong to their schemas
                // and must not be freed here.
                delete ms->chunk_writer_;
                ms->chunk_writer_ = nullptr;
                return common::E_INVALID_ARG;
            }
        }
        chunk_writers.push_back(ms->chunk_writer_);
        data_types.push_back(ms->data_type_);
    }
    return ret;
}
// Resolve the shared time chunk writer and the value chunk writer / data
// type for each measurement name yielded by measurement_names on the aligned
// device device_id.
//
// Writers are created lazily and stay owned by the schema group and its
// schemas. Unregistered measurements produce a null value writer and
// NULL_TYPE so the outputs stay position-aligned with the names.
//
// @return E_OK;
//         E_DEVICE_NOT_EXIST if the device has no schema group;
//         E_INVALID_ARG if a newly created value chunk writer fails to
//         initialize. That writer is discarded so a later call retries.
template <typename MeasurementNamesGetter>
int TsFileWriter::do_check_schema_aligned(
    std::shared_ptr<IDeviceID> device_id,
    MeasurementNamesGetter& measurement_names,
    storage::TimeChunkWriter*& time_chunk_writer,
    common::SimpleVector<storage::ValueChunkWriter*>& value_chunk_writers,
    SimpleVector<common::TSDataType>& data_types) {
    int ret = E_OK;
    auto dev_it = schemas_.find(device_id);
    MeasurementSchemaGroup* device_schema = nullptr;
    if (UNLIKELY(dev_it == schemas_.end()) ||
        IS_NULL(device_schema = dev_it->second)) {
        return E_DEVICE_NOT_EXIST;
    }
    if (IS_NULL(device_schema->time_chunk_writer_)) {
        device_schema->time_chunk_writer_ = new TimeChunkWriter();
        device_schema->time_chunk_writer_->init(
            "", g_config_value_.time_encoding_type_,
            g_config_value_.time_compress_type_);
    }
    time_chunk_writer = device_schema->time_chunk_writer_;
    MeasurementSchemaMap& msm = device_schema->measurement_schema_map_;
    const uint32_t measurement_count = measurement_names.get_count();
    for (uint32_t i = 0; i < measurement_count; i++) {
        auto ms_iter = msm.find(measurement_names.next());
        if (UNLIKELY(ms_iter == msm.end())) {
            value_chunk_writers.push_back(nullptr);
            data_types.push_back(common::NULL_TYPE);
            continue;
        }
        // Here we may check data_type against ms_iter. But in Java
        // libtsfile, no check here.
        MeasurementSchema* ms = ms_iter->second;
        if (IS_NULL(ms->value_chunk_writer_)) {
            ms->value_chunk_writer_ = new ValueChunkWriter;
            ret = ms->value_chunk_writer_->init(
                ms->measurement_name_, ms->data_type_, ms->encoding_,
                ms->compression_type_);
            if (!IS_SUCC(ret)) {
                // Discard only the writer created just now; the others are
                // owned by their schemas.
                delete ms->value_chunk_writer_;
                ms->value_chunk_writer_ = nullptr;
                return common::E_INVALID_ARG;
            }
        }
        value_chunk_writers.push_back(ms->value_chunk_writer_);
        data_types.push_back(ms->data_type_);
    }
    return ret;
}
// Resolve the time chunk writer and the value chunk writers of a
// table-model tablet's FIELD columns for device_id.
//
// If the device has no schema group yet, an aligned group is created from
// the FIELD columns of the tablet's table schema. Writers are created lazily
// and stay owned by the group and its schemas. A FIELD column unknown to the
// device's group yields a null writer.
// Requires tablet.column_categories_ to cover every column (see
// do_check_and_prepare_tablet()).
//
// @return E_OK;
//         E_TABLE_NOT_EXIST if the tablet's table is not registered;
//         E_INVALID_ARG if a newly created value chunk writer fails to
//         initialize. That writer is discarded so a later call retries.
int TsFileWriter::do_check_schema_table(
    std::shared_ptr<IDeviceID> device_id, Tablet& tablet,
    storage::TimeChunkWriter*& time_chunk_writer,
    common::SimpleVector<storage::ValueChunkWriter*>& value_chunk_writers) {
    int ret = E_OK;
    auto dev_it = schemas_.find(device_id);
    MeasurementSchemaGroup* device_schema = nullptr;
    auto& schema_map = io_writer_->get_schema()->table_schema_map_;
    auto table_schema_it = schema_map.find(tablet.get_table_name());
    if (UNLIKELY(table_schema_it == schema_map.end())) {
        return E_TABLE_NOT_EXIST;
    }
    auto table_schema = table_schema_it->second;
    if (UNLIKELY(dev_it == schemas_.end()) ||
        IS_NULL(device_schema = dev_it->second)) {
        // First write for this device: build an aligned group holding a copy
        // of every FIELD column schema of the table.
        device_schema = new MeasurementSchemaGroup;
        device_schema->is_aligned_ = true;
        for (uint32_t i = 0; i < table_schema->get_measurement_schemas().size();
             ++i) {
            if (table_schema->get_column_categories().at(i) !=
                common::ColumnCategory::FIELD) {
                continue;
            }
            auto table_column_schema =
                table_schema->get_measurement_schemas().at(i);
            auto device_column_schema = new MeasurementSchema(
                table_column_schema->measurement_name_,
                table_column_schema->data_type_,
                table_column_schema->encoding_,
                table_column_schema->compression_type_);
            if (!table_column_schema->props_.empty()) {
                device_column_schema->props_ = table_column_schema->props_;
            }
            device_schema->measurement_schema_map_
                [device_column_schema->measurement_name_] =
                device_column_schema;
        }
        schemas_[device_id] = device_schema;
    }
    // The time writer is created lazily. This covers a group just created
    // above and a group rebuilt during recovery, which has no writers yet.
    if (IS_NULL(device_schema->time_chunk_writer_)) {
        device_schema->time_chunk_writer_ = new TimeChunkWriter();
        device_schema->time_chunk_writer_->init(
            "", g_config_value_.time_encoding_type_,
            g_config_value_.time_compress_type_);
    }
    const uint32_t column_cnt = tablet.get_column_count();
    time_chunk_writer = device_schema->time_chunk_writer_;
    MeasurementSchemaMap& msm = device_schema->measurement_schema_map_;
    for (uint32_t i = 0; i < column_cnt; i++) {
        if (tablet.column_categories_.at(i) != common::ColumnCategory::FIELD) {
            continue;
        }
        auto ms_iter = msm.find(tablet.get_column_name(i));
        if (UNLIKELY(ms_iter == msm.end())) {
            value_chunk_writers.push_back(nullptr);
            continue;
        }
        // Here we may check data_type against ms_iter. But in Java
        // libtsfile, no check here.
        MeasurementSchema* ms = ms_iter->second;
        if (IS_NULL(ms->value_chunk_writer_)) {
            ms->value_chunk_writer_ = new ValueChunkWriter;
            ret = ms->value_chunk_writer_->init(
                ms->measurement_name_, ms->data_type_, ms->encoding_,
                ms->compression_type_);
            if (!IS_SUCC(ret)) {
                // Discard only the writer created just now; the others are
                // owned by their schemas.
                delete ms->value_chunk_writer_;
                ms->value_chunk_writer_ = nullptr;
                return common::E_INVALID_ARG;
            }
        }
        value_chunk_writers.push_back(ms->value_chunk_writer_);
    }
    return ret;
}
int64_t TsFileWriter::calculate_mem_size_for_all_group() {
int64_t mem_total_size = 0;
DeviceSchemasMapIter device_iter;
for (device_iter = schemas_.begin(); device_iter != schemas_.end();
device_iter++) {
MeasurementSchemaGroup* chunk_group = device_iter->second;
MeasurementSchemaMap& map = chunk_group->measurement_schema_map_;
for (MeasurementSchemaMapIter ms_iter = map.begin();
ms_iter != map.end(); ms_iter++) {
MeasurementSchema* m_schema = ms_iter->second;
if (!chunk_group->is_aligned_) {
ChunkWriter*& chunk_writer = m_schema->chunk_writer_;
if (chunk_writer != nullptr) {
mem_total_size +=
chunk_writer->estimate_max_series_mem_size();
}
} else {
ValueChunkWriter*& chunk_writer = m_schema->value_chunk_writer_;
if (chunk_writer != nullptr) {
mem_total_size +=
chunk_writer->estimate_max_series_mem_size();
}
}
}
if (chunk_group->is_aligned_) {
TimeChunkWriter*& time_chunk_writer =
chunk_group->time_chunk_writer_;
if (time_chunk_writer != nullptr) {
mem_total_size +=
time_chunk_writer->estimate_max_series_mem_size();
}
}
}
return mem_total_size;
}
/**
 * Check the memory occupied by all chunk writers and flush the buffered
 * chunk groups (flush()) if it exceeds
 * g_config_value_.chunk_group_size_threshold_.
 *
 * The check only runs once record_count_since_last_flush_ reaches
 * record_count_for_next_mem_check_. The next checkpoint is then
 * re-estimated from the observed records-per-byte ratio.
 *
 * If the estimated memory is not positive (e.g. no chunk writer has been
 * created yet), the estimate is skipped to avoid dividing by zero, and the
 * check runs again on the next call.
 *
 * @return E_OK, or the error code returned by flush().
 */
int TsFileWriter::check_memory_size_and_may_flush_chunks() {
    int ret = E_OK;
    if (record_count_since_last_flush_ < record_count_for_next_mem_check_) {
        return ret;
    }
    const int64_t mem_size = calculate_mem_size_for_all_group();
    if (mem_size <= 0) {
        return ret;
    }
    record_count_for_next_mem_check_ =
        record_count_since_last_flush_ *
        common::g_config_value_.chunk_group_size_threshold_ / mem_size;
    if (mem_size > common::g_config_value_.chunk_group_size_threshold_) {
        ret = flush();
    }
    return ret;
}
int TsFileWriter::write_record(const TsRecord& record) {
int ret = E_OK;
// std::vector<ChunkWriter*> chunk_writers;
SimpleVector<ChunkWriter*> chunk_writers;
SimpleVector<common::TSDataType> data_types;
MeasurementNamesFromRecord mnames_getter(record);
if (RET_FAIL(do_check_schema(
std::make_shared<StringArrayDeviceID>(record.device_id_),
mnames_getter, chunk_writers, data_types))) {
return ret;
}
ASSERT(chunk_writers.size() == record.points_.size());
for (uint32_t c = 0; c < chunk_writers.size(); c++) {
ChunkWriter* chunk_writer = chunk_writers[c];
if (IS_NULL(chunk_writer)) {
continue;
}
// ignore point writer failure
write_point(chunk_writer, record.timestamp_, data_types[c],
record.points_[c]);
}
record_count_since_last_flush_++;
ret = check_memory_size_and_may_flush_chunks();
return ret;
}
int TsFileWriter::write_record_aligned(const TsRecord& record) {
int ret = E_OK;
SimpleVector<ValueChunkWriter*> value_chunk_writers;
SimpleVector<common::TSDataType> data_types;
TimeChunkWriter* time_chunk_writer;
MeasurementNamesFromRecord mnames_getter(record);
if (RET_FAIL(do_check_schema_aligned(
std::make_shared<StringArrayDeviceID>(record.device_id_),
mnames_getter, time_chunk_writer, value_chunk_writers,
data_types))) {
return ret;
}
if (value_chunk_writers.size() != record.points_.size()) {
return E_INVALID_ARG;
}
int32_t time_pages_before = time_chunk_writer->num_of_pages();
std::vector<int32_t> value_pages_before(value_chunk_writers.size(), 0);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
if (!IS_NULL(value_chunk_writer)) {
value_pages_before[c] = value_chunk_writer->num_of_pages();
}
}
time_chunk_writer->write(record.timestamp_);
for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
if (IS_NULL(value_chunk_writer)) {
continue;
}
write_point_aligned(value_chunk_writer, record.timestamp_,
data_types[c], record.points_[c]);
}
if (RET_FAIL(maybe_seal_aligned_pages_together(
time_chunk_writer, value_chunk_writers, time_pages_before,
value_pages_before))) {
return ret;
}
return ret;
}
// Write one data point to a non-aligned chunk writer, selecting the union
// member (or text value) that matches data_type.
//
// @return the chunk writer's result, or E_INVALID_DATA_POINT for an
//         unsupported data type.
int TsFileWriter::write_point(ChunkWriter* chunk_writer, int64_t timestamp,
                              common::TSDataType data_type,
                              const DataPoint& point) {
    int ret = E_INVALID_DATA_POINT;
    switch (data_type) {
        case common::BOOLEAN:
            ret = chunk_writer->write(timestamp, point.u_.bool_val_);
            break;
        case common::INT32:
        case common::DATE:
            ret = chunk_writer->write(timestamp, point.u_.i32_val_);
            break;
        case common::INT64:
        case common::TIMESTAMP:
            ret = chunk_writer->write(timestamp, point.u_.i64_val_);
            break;
        case common::FLOAT:
            ret = chunk_writer->write(timestamp, point.u_.float_val_);
            break;
        case common::DOUBLE:
            ret = chunk_writer->write(timestamp, point.u_.double_val_);
            break;
        case common::STRING:
        case common::TEXT:
        case common::BLOB:
            ret = chunk_writer->write(timestamp, point.text_val_);
            break;
        default:
            break;
    }
    return ret;
}
// Write one data point to an aligned value chunk writer, selecting the
// union member (or text value) that matches data_type and forwarding the
// point's null flag.
//
// @return the value chunk writer's result, or E_INVALID_DATA_POINT for an
//         unsupported data type.
int TsFileWriter::write_point_aligned(ValueChunkWriter* value_chunk_writer,
                                      int64_t timestamp,
                                      common::TSDataType data_type,
                                      const DataPoint& point) {
    const bool is_null = point.isnull;
    int ret = E_INVALID_DATA_POINT;
    switch (data_type) {
        case common::BOOLEAN:
            ret = value_chunk_writer->write(timestamp, point.u_.bool_val_,
                                            is_null);
            break;
        case common::DATE:
        case common::INT32:
            ret = value_chunk_writer->write(timestamp, point.u_.i32_val_,
                                            is_null);
            break;
        case common::INT64:
        case common::TIMESTAMP:
            ret = value_chunk_writer->write(timestamp, point.u_.i64_val_,
                                            is_null);
            break;
        case common::FLOAT:
            ret = value_chunk_writer->write(timestamp, point.u_.float_val_,
                                            is_null);
            break;
        case common::DOUBLE:
            ret = value_chunk_writer->write(timestamp, point.u_.double_val_,
                                            is_null);
            break;
        case common::STRING:
        case common::TEXT:
        case common::BLOB:
            ret = value_chunk_writer->write(timestamp, point.text_val_,
                                            is_null);
            break;
        default:
            break;
    }
    return ret;
}
// Keep aligned pages in lockstep. If the time writer or any value writer
// gained a page since the given snapshot, seal the current (non-empty) page
// of every writer in the group.
//
// @return E_OK, or the first error from seal_current_page().
int TsFileWriter::maybe_seal_aligned_pages_together(
    TimeChunkWriter* time_chunk_writer,
    common::SimpleVector<ValueChunkWriter*>& value_chunk_writers,
    int32_t time_pages_before, const std::vector<int32_t>& value_pages_before) {
    bool page_was_added =
        time_chunk_writer->num_of_pages() > time_pages_before;
    uint32_t idx = 0;
    while (!page_was_added && idx < value_chunk_writers.size()) {
        ValueChunkWriter* writer = value_chunk_writers[idx];
        page_was_added = !IS_NULL(writer) &&
                         writer->num_of_pages() > value_pages_before[idx];
        ++idx;
    }
    if (!page_was_added) {
        return E_OK;
    }

    int ret = E_OK;
    if (time_chunk_writer->has_current_page_data()) {
        if (RET_FAIL(time_chunk_writer->seal_current_page())) {
            return ret;
        }
    }
    for (idx = 0; idx < value_chunk_writers.size(); ++idx) {
        ValueChunkWriter* writer = value_chunk_writers[idx];
        if (IS_NULL(writer) || !writer->has_current_page_data()) {
            continue;
        }
        if (RET_FAIL(writer->seal_current_page())) {
            return ret;
        }
    }
    return ret;
}
/**
 * Write an aligned (tree-model) tablet: one shared time column plus one value
 * column per registered measurement.
 *
 * Three insertion strategies are used:
 *  - strict_page_size_: row-by-row insertion. Whenever any writer of the
 *    aligned group starts a new page, all of them are sealed together.
 *  - non-strict, fixed-length columns only: column-by-column insertion in
 *    segments of page_writer_max_point_num_ points (clamped to >= 1, as in
 *    the varlen branch, so a zero setting cannot make the segment loops spin
 *    forever). Every writer is sealed at each segment boundary.
 *  - non-strict with STRING/TEXT/BLOB columns: the time writer auto-seals,
 *    and value writers are sealed manually at the recorded time-page
 *    boundaries.
 *
 * On success the tablet's rows count toward the memory-based flush check,
 * as in write_tablet() and write_record().
 *
 * @return E_OK, or the first error from schema checking, writing, sealing,
 *         or the memory check / flush.
 */
int TsFileWriter::write_tablet_aligned(const Tablet& tablet) {
    int ret = E_OK;
    SimpleVector<ValueChunkWriter*> value_chunk_writers;
    TimeChunkWriter* time_chunk_writer = nullptr;
    SimpleVector<common::TSDataType> data_types;
    MeasurementNamesFromTablet mnames_getter(tablet);
    if (RET_FAIL(do_check_schema_aligned(
            std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_),
            mnames_getter, time_chunk_writer, value_chunk_writers,
            data_types))) {
        return ret;
    }
    const uint32_t total_rows = tablet.get_cur_row_size();
    const bool strict_page_size = common::g_config_value_.strict_page_size_;
    // Decide whether we have string/blob/text columns.
    bool has_varlen_column = false;
    for (uint32_t i = 0; i < data_types.size(); i++) {
        if (data_types[i] == common::STRING || data_types[i] == common::TEXT ||
            data_types[i] == common::BLOB) {
            has_varlen_column = true;
            break;
        }
    }
    // Keep writers' seal-check behavior consistent across calls.
    time_chunk_writer->set_enable_page_seal_if_full(strict_page_size);
    for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
        if (!IS_NULL(value_chunk_writers[c])) {
            value_chunk_writers[c]->set_enable_page_seal_if_full(
                strict_page_size);
        }
    }

    if (strict_page_size) {
        // Strict mode: row-based insertion so aligned pages seal together
        // when either side becomes full.
        for (uint32_t row = 0; row < total_rows; row++) {
            int32_t time_pages_before = time_chunk_writer->num_of_pages();
            std::vector<int32_t> value_pages_before(
                value_chunk_writers.size(), 0);
            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
                ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
                if (!IS_NULL(value_chunk_writer)) {
                    value_pages_before[c] =
                        value_chunk_writer->num_of_pages();
                }
            }
            if (RET_FAIL(
                    time_chunk_writer->write(tablet.timestamps_[row]))) {
                return ret;
            }
            ASSERT(value_chunk_writers.size() == tablet.get_column_count());
            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
                ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
                if (IS_NULL(value_chunk_writer)) {
                    continue;
                }
                if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
                                                c, row, row + 1))) {
                    return ret;
                }
            }
            if (RET_FAIL(maybe_seal_aligned_pages_together(
                    time_chunk_writer, value_chunk_writers,
                    time_pages_before, value_pages_before))) {
                return ret;
            }
        }
    } else if (!has_varlen_column) {
        // Fixed-length columns only: split by point count so each split
        // triggers a page seal, avoiding the per-row page-size check.
        const uint32_t points_per_page = std::max<uint32_t>(
            1, common::g_config_value_.page_writer_max_point_num_);
        // Disable auto page sealing. We will seal pages at split boundaries.
        time_chunk_writer->set_enable_page_seal_if_full(false);
        for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
            if (!IS_NULL(value_chunk_writers[c])) {
                value_chunk_writers[c]->set_enable_page_seal_if_full(false);
            }
        }
        // Determine how many points we need to fill the current unsealed
        // time page (it may already contain data from previous tablets).
        uint32_t time_cur_points = time_chunk_writer->get_point_numer();
        if (time_cur_points >= points_per_page &&
            time_chunk_writer->has_current_page_data()) {
            // Close the already-full page together with all aligned value
            // pages.
            if (RET_FAIL(time_chunk_writer->seal_current_page())) {
                return ret;
            }
            for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
                ValueChunkWriter* value_chunk_writer = value_chunk_writers[c];
                if (!IS_NULL(value_chunk_writer) &&
                    value_chunk_writer->has_current_page_data()) {
                    if (RET_FAIL(value_chunk_writer->seal_current_page())) {
                        return ret;
                    }
                }
            }
            time_cur_points = 0;
        }
        const uint32_t first_seg_len =
            (time_cur_points > 0 && time_cur_points < points_per_page)
                ? (points_per_page - time_cur_points)
                : points_per_page;
        // 1) Write time in segments and seal all full segments (except the
        // last remaining segment).
        uint32_t seg_start = 0;
        uint32_t seg_len = first_seg_len;
        while (seg_start < total_rows) {
            const uint32_t seg_end = std::min(seg_start + seg_len, total_rows);
            if (RET_FAIL(time_write_column(time_chunk_writer, tablet,
                                           seg_start, seg_end))) {
                return ret;
            }
            seg_start = seg_end;
            if (seg_start < total_rows) {
                if (RET_FAIL(time_chunk_writer->seal_current_page())) {
                    return ret;
                }
            }
            seg_len = points_per_page;
        }
        // 2) Write each value column in the same segments.
        ASSERT(value_chunk_writers.size() == tablet.get_column_count());
        for (uint32_t col = 0; col < value_chunk_writers.size(); col++) {
            ValueChunkWriter* value_chunk_writer = value_chunk_writers[col];
            if (IS_NULL(value_chunk_writer)) {
                continue;
            }
            seg_start = 0;
            seg_len = first_seg_len;
            while (seg_start < total_rows) {
                const uint32_t seg_end =
                    std::min(seg_start + seg_len, total_rows);
                if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
                                                col, seg_start, seg_end))) {
                    return ret;
                }
                seg_start = seg_end;
                if (seg_start < total_rows) {
                    if (value_chunk_writer->has_current_page_data() &&
                        RET_FAIL(value_chunk_writer->seal_current_page())) {
                        return ret;
                    }
                }
                seg_len = points_per_page;
            }
        }
    } else {
        // General non-strict mode (varlen STRING/TEXT/BLOB columns present):
        // the time writer auto-seals to provide aligned page boundaries.
        // Value writers skip auto page sealing and are sealed manually at
        // the recorded time boundaries.
        // Attention: since value-side auto-seal is disabled, a varlen value
        // page that hits the memory threshold earlier is not sealed
        // immediately but at the next recorded time-page boundary. This may
        // exceed the strict page size limit in exchange for performance.
        time_chunk_writer->set_enable_page_seal_if_full(true);
        for (uint32_t c = 0; c < value_chunk_writers.size(); c++) {
            if (!IS_NULL(value_chunk_writers[c])) {
                value_chunk_writers[c]->set_enable_page_seal_if_full(false);
            }
        }
        std::vector<uint32_t> time_page_row_ends;
        const uint32_t page_max_points = std::max<uint32_t>(
            1, common::g_config_value_.page_writer_max_point_num_);
        time_page_row_ends.reserve(total_rows / page_max_points + 1);
        // Write time and record where a time page is sealed.
        for (uint32_t row = 0; row < total_rows; row++) {
            const int32_t pages_before = time_chunk_writer->num_of_pages();
            if (RET_FAIL(
                    time_chunk_writer->write(tablet.timestamps_[row]))) {
                return ret;
            }
            const int32_t pages_after = time_chunk_writer->num_of_pages();
            if (pages_after > pages_before) {
                const uint32_t boundary_end = row + 1;
                if (time_page_row_ends.empty() ||
                    time_page_row_ends.back() != boundary_end) {
                    time_page_row_ends.push_back(boundary_end);
                }
            }
        }
        // Write values column-by-column and seal at recorded boundaries.
        ASSERT(value_chunk_writers.size() == tablet.get_column_count());
        for (uint32_t col = 0; col < value_chunk_writers.size(); col++) {
            ValueChunkWriter* value_chunk_writer = value_chunk_writers[col];
            if (IS_NULL(value_chunk_writer)) {
                continue;
            }
            uint32_t seg_start = 0;
            for (uint32_t boundary_end : time_page_row_ends) {
                if (boundary_end <= seg_start) {
                    continue;
                }
                if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
                                                col, seg_start,
                                                boundary_end))) {
                    return ret;
                }
                if (value_chunk_writer->has_current_page_data() &&
                    RET_FAIL(value_chunk_writer->seal_current_page())) {
                    return ret;
                }
                seg_start = boundary_end;
            }
            if (seg_start < total_rows) {
                if (RET_FAIL(value_write_column(value_chunk_writer, tablet,
                                                col, seg_start,
                                                total_rows))) {
                    return ret;
                }
            }
        }
    }

    // All rows written: account for them so memory-based flushing also
    // applies to aligned tablets.
    record_count_since_last_flush_ += total_rows;
    ret = check_memory_size_and_may_flush_chunks();
    return ret;
}
int TsFileWriter::write_tablet(const Tablet& tablet) {
int ret = E_OK;
SimpleVector<ChunkWriter*> chunk_writers;
SimpleVector<common::TSDataType> data_types;
MeasurementNamesFromTablet mnames_getter(tablet);
if (RET_FAIL(do_check_schema(
std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_),
mnames_getter, chunk_writers, data_types))) {
return ret;
}
ASSERT(chunk_writers.size() == tablet.get_column_count());
for (uint32_t c = 0; c < chunk_writers.size(); c++) {
ChunkWriter* chunk_writer = chunk_writers[c];
if (IS_NULL(chunk_writer)) {
continue;
}
if (RET_FAIL(write_column(chunk_writer, tablet, c))) {
return ret;
}
}
record_count_since_last_flush_ += tablet.max_row_num_;
ret = check_memory_size_and_may_flush_chunks();
return ret;
}
// Tree-model tablet write: dispatches to the aligned or non-aligned path
// according to how the target device was registered.
// Returns E_NOT_EXIST when the device has no registered schema, otherwise
// the result of write_tablet_aligned() / write_tablet().
int TsFileWriter::write_tree(const Tablet& tablet) {
    auto device_id =
        std::make_shared<StringArrayDeviceID>(tablet.insert_target_name_);
    // One lookup: find() followed by operator[] searched the map twice, and
    // operator[] would default-insert if the two ever disagreed.
    auto it = schemas_.find(device_id);
    if (it == schemas_.end()) {
        return E_NOT_EXIST;
    }
    return it->second->is_aligned_ ? this->write_tablet_aligned(tablet)
                                   : this->write_tablet(tablet);
}
// Tree-model record write: dispatches to the aligned or non-aligned path
// according to how the record's device was registered.
// Returns E_NOT_EXIST when the device has no registered schema, otherwise
// the result of write_record_aligned() / write_record().
int TsFileWriter::write_tree(const TsRecord& record) {
    auto device_id = std::make_shared<StringArrayDeviceID>(record.device_id_);
    // Single lookup instead of find() + operator[].
    auto it = schemas_.find(device_id);
    if (it == schemas_.end()) {
        return E_NOT_EXIST;
    }
    return it->second->is_aligned_ ? this->write_record_aligned(record)
                                   : this->write_record(record);
}
// Table-model write. Splits `tablet` into per-device row ranges (see
// split_tablet_by_device) and, for each device, writes the time column and
// every FIELD column in identical row segments so time and value pages seal
// at the same row boundaries. Segments run in parallel on the write thread
// pool when ENABLE_THREADS and parallel_write_enabled_ are set.
// Afterwards the tablet's string buffers are reset so it can be reused.
// Returns E_TABLE_NOT_EXIST if the table is not registered; otherwise the
// first error from checks/column writes, or the memory-check/flush result.
int TsFileWriter::write_table(Tablet& tablet) {
    int ret = E_OK;
    if (io_writer_->get_schema()->table_schema_map_.find(
            tablet.insert_target_name_) ==
        io_writer_->get_schema()->table_schema_map_.end()) {
        ret = E_TABLE_NOT_EXIST;
        return ret;
    }
    if (RET_FAIL(do_check_and_prepare_tablet(tablet))) {
        return ret;
    }
    auto device_id_end_index_pairs = split_tablet_by_device(tablet);
    int start_idx = 0;
    for (auto& device_id_end_index_pair : device_id_end_index_pairs) {
        auto device_id = device_id_end_index_pair.first;
        int end_idx = device_id_end_index_pair.second;
        // The leading sentinel entry (and empty ranges) carry end 0.
        if (end_idx == 0) continue;
        SimpleVector<ValueChunkWriter*> value_chunk_writers;
        TimeChunkWriter* time_chunk_writer = nullptr;
        if (RET_FAIL(do_check_schema_table(device_id, tablet, time_chunk_writer,
                                           value_chunk_writers))) {
            return ret;
        }
        std::vector<uint32_t> field_columns;
        field_columns.reserve(tablet.get_column_count());
        for (uint32_t col = 0; col < tablet.get_column_count(); ++col) {
            if (tablet.column_categories_[col] ==
                common::ColumnCategory::FIELD) {
                field_columns.push_back(col);
            }
        }
        ASSERT(field_columns.size() == value_chunk_writers.size());
        // Precompute page boundaries from point counts — no serial write
        // needed. The first segment may be shorter if the time page already
        // holds data from a previous write_table call.
        const uint32_t page_max_points = std::max<uint32_t>(
            1, common::g_config_value_.page_writer_max_point_num_);
        const uint32_t si = static_cast<uint32_t>(start_idx);
        const uint32_t ei = static_cast<uint32_t>(end_idx);
        // If the current unsealed page is already at or past capacity (from
        // a previous write_table call), seal it before starting new segments.
        uint32_t time_cur_points = time_chunk_writer->get_point_numer();
        if (time_cur_points >= page_max_points) {
            if (time_chunk_writer->has_current_page_data()) {
                if (RET_FAIL(time_chunk_writer->seal_current_page())) {
                    return ret;
                }
            }
            for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
                if (!IS_NULL(value_chunk_writers[k]) &&
                    value_chunk_writers[k]->has_current_page_data()) {
                    if (RET_FAIL(value_chunk_writers[k]->seal_current_page())) {
                        return ret;
                    }
                }
            }
            time_cur_points = 0;
        }
        const uint32_t first_seg_cap =
            (time_cur_points > 0 && time_cur_points < page_max_points)
                ? (page_max_points - time_cur_points)
                : page_max_points;
        std::vector<uint32_t> page_boundaries;  // row indices where a page
                                                // should seal
        {
            uint32_t pos = si;
            uint32_t seg_cap = first_seg_cap;
            while (pos < ei) {
                uint32_t seg_end = std::min(pos + seg_cap, ei);
                if (seg_end < ei) {
                    page_boundaries.push_back(seg_end);
                }
                pos = seg_end;
                seg_cap = page_max_points;
            }
        }
        // We control page sealing explicitly at precomputed boundaries, so
        // auto-seal must be disabled during segmented writes — otherwise a
        // segment of exactly page_max_points would trigger auto-seal AND
        // our explicit seal, double-sealing (sealing an empty page → crash).
        // Note: with auto-seal off, the memory-based threshold
        // (page_writer_max_memory_bytes_) is not enforced within a segment.
        // For varlen columns (STRING/TEXT/BLOB), individual pages may exceed
        // the memory limit. Each segment is still bounded by
        // page_max_points rows, keeping pages within a reasonable size.
        // Auto-seal is re-enabled on EVERY exit path (errors included) so a
        // failed write never leaves a writer with page sealing switched off.
        auto write_time_in_segments = [this, &tablet, &page_boundaries, si,
                                       ei](TimeChunkWriter* tcw) -> int {
            int r = E_OK;
            tcw->set_enable_page_seal_if_full(false);
            uint32_t seg_start = si;
            for (uint32_t boundary : page_boundaries) {
                if ((r = time_write_column(tcw, tablet, seg_start,
                                           boundary)) != E_OK ||
                    (r = tcw->seal_current_page()) != E_OK) {
                    break;
                }
                seg_start = boundary;
            }
            if (r == E_OK && seg_start < ei) {
                r = time_write_column(tcw, tablet, seg_start, ei);
            }
            tcw->set_enable_page_seal_if_full(true);
            return r;
        };
        auto write_value_in_segments = [this, &tablet, &page_boundaries, si,
                                        ei](ValueChunkWriter* vcw,
                                            uint32_t col_idx) -> int {
            int r = E_OK;
            vcw->set_enable_page_seal_if_full(false);
            uint32_t seg_start = si;
            for (uint32_t boundary : page_boundaries) {
                if ((r = value_write_column(vcw, tablet, col_idx, seg_start,
                                            boundary)) != E_OK) {
                    break;
                }
                if (vcw->has_current_page_data() &&
                    (r = vcw->seal_current_page()) != E_OK) {
                    break;
                }
                seg_start = boundary;
            }
            if (r == E_OK && seg_start < ei) {
                r = value_write_column(vcw, tablet, col_idx, seg_start, ei);
            }
            vcw->set_enable_page_seal_if_full(true);
            return r;
        };
        // All columns (time + values) write the same row segments and seal
        // at the same boundaries — fully parallel.
#ifdef ENABLE_THREADS
        if (g_config_value_.parallel_write_enabled_) {
            std::vector<std::future<int>> futures;
            futures.push_back(g_write_thread_pool_->submit(
                [&write_time_in_segments, time_chunk_writer]() {
                    return write_time_in_segments(time_chunk_writer);
                }));
            for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
                ValueChunkWriter* vcw = value_chunk_writers[k];
                if (IS_NULL(vcw)) continue;
                uint32_t col_idx = field_columns[k];
                futures.push_back(g_write_thread_pool_->submit(
                    [&write_value_in_segments, vcw, col_idx]() {
                        return write_value_in_segments(vcw, col_idx);
                    }));
            }
            // Wait for every task (they capture locals by reference) and
            // keep the first error.
            for (auto& f : futures) {
                int r = f.get();
                if (r != E_OK && ret == E_OK) ret = r;
            }
            if (ret != E_OK) return ret;
        } else
#endif
        {
            if (RET_FAIL(write_time_in_segments(time_chunk_writer))) {
                return ret;
            }
            for (uint32_t k = 0; k < value_chunk_writers.size(); k++) {
                ValueChunkWriter* vcw = value_chunk_writers[k];
                if (IS_NULL(vcw)) continue;
                if (RET_FAIL(write_value_in_segments(vcw, field_columns[k]))) {
                    return ret;
                }
            }
        }
        start_idx = end_idx;
    }
    record_count_since_last_flush_ += tablet.cur_row_size_;
    // Reset string column buffers so the tablet can be reused for the next
    // batch without accumulating memory across writes.
    tablet.reset_string_columns();
    ret = check_memory_size_and_may_flush_chunks();
    return ret;
}
// Splits the tablet's rows into per-device ranges.
// The result starts with a ("last_device_id", 0) sentinel; each following
// entry pairs a device id with the exclusive end row of its range (the range
// starts at the previous entry's end). A tablet without ID columns maps to a
// single device built from insert_target_name_. A tablet WITH ID columns but
// no rows yields an empty vector.
std::vector<std::pair<std::shared_ptr<IDeviceID>, int>>
TsFileWriter::split_tablet_by_device(const Tablet& tablet) {
    std::vector<std::pair<std::shared_ptr<IDeviceID>, int>> result;
    if (tablet.id_column_indexes_.empty()) {
        auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id");
        result.emplace_back(std::move(sentinel), 0);
        // Stack storage instead of new/delete: the original deleted the
        // string right after construction, so the constructor copies it and
        // does not keep the pointer; this also cannot leak on a throw.
        std::string table_name(tablet.insert_target_name_);
        std::vector<std::string*> id_array(1, &table_name);
        auto res = std::make_shared<StringArrayDeviceID>(id_array);
        result.emplace_back(std::move(res), tablet.get_cur_row_size());
        return result;
    }
    const uint32_t row_count = tablet.get_cur_row_size();
    if (row_count == 0) return result;
    auto sentinel = std::make_shared<StringArrayDeviceID>("last_device_id");
    result.emplace_back(std::move(sentinel), 0);
    auto boundaries = tablet.find_all_device_boundaries();
    uint32_t seg_start = 0;
    for (uint32_t b : boundaries) {
        // get_device_id() result is adopted by the shared_ptr.
        std::shared_ptr<IDeviceID> dev_id(tablet.get_device_id(seg_start));
        result.emplace_back(std::move(dev_id), b);
        seg_start = b;
    }
    std::shared_ptr<IDeviceID> last_id(tablet.get_device_id(seg_start));
    result.emplace_back(std::move(last_id), row_count);
    return result;
}
// Writes rows [start_idx, min(end_idx, max_row_num_)) of tablet column
// `col_idx` to a non-aligned ChunkWriter, skipping null cells.
// Type mapping mirrors value_write_column(): DATE is stored as int32,
// TIMESTAMP as int64, TEXT/BLOB through the string column. Unsupported types
// now return E_NOT_SUPPORT instead of silently reporting success.
int TsFileWriter::write_column(ChunkWriter* chunk_writer, const Tablet& tablet,
                               int col_idx, uint32_t start_idx,
                               uint32_t end_idx) {
    int ret = E_OK;
    common::TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_;
    int64_t* timestamps = tablet.timestamps_;
    Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
    BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx];
    end_idx = std::min(end_idx, tablet.max_row_num_);
    switch (data_type) {
        case common::BOOLEAN:
            ret = write_typed_column(chunk_writer, timestamps,
                                     col_values.bool_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::DATE:
        case common::INT32:
            ret = write_typed_column(chunk_writer, timestamps,
                                     col_values.int32_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::TIMESTAMP:
        case common::INT64:
            ret = write_typed_column(chunk_writer, timestamps,
                                     col_values.int64_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::FLOAT:
            ret = write_typed_column(chunk_writer, timestamps,
                                     col_values.float_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::DOUBLE:
            ret = write_typed_column(chunk_writer, timestamps,
                                     col_values.double_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::STRING:
        case common::TEXT:
        case common::BLOB:
            ret = write_typed_column(chunk_writer, timestamps,
                                     col_values.string_col, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        default:
            // Previously ASSERT(false) + E_OK, which silently dropped the
            // column in release builds.
            ret = E_NOT_SUPPORT;
            break;
    }
    return ret;
}
// Appends the timestamps of rows [start_idx, end_idx) to the time chunk,
// never reading past the tablet's max_row_num_.
// Returns E_INVALID_ARG for a null writer or null timestamp array, the first
// error reported by the writer, or E_OK.
int TsFileWriter::time_write_column(TimeChunkWriter* time_chunk_writer,
                                    const Tablet& tablet, uint32_t start_idx,
                                    uint32_t end_idx) {
    int64_t* timestamps = tablet.timestamps_;
    if (IS_NULL(time_chunk_writer) || IS_NULL(timestamps)) {
        return E_INVALID_ARG;
    }
    const uint32_t stop = std::min(end_idx, tablet.max_row_num_);
    int ret = E_OK;
    for (uint32_t row = start_idx; row < stop; ++row) {
        if (RET_FAIL(time_chunk_writer->write(timestamps[row]))) {
            return ret;
        }
    }
    return ret;
}
// Writes rows [start_idx, min(end_idx, max_row_num_)) of tablet column
// `col_idx` to an aligned ValueChunkWriter. Every row is forwarded together
// with its bitmap bit (see the ValueChunkWriter write_typed_column overloads).
// DATE is stored as int32, TIMESTAMP as int64, STRING/TEXT/BLOB via the
// string column. Returns E_NOT_SUPPORT for any other type.
int TsFileWriter::value_write_column(ValueChunkWriter* value_chunk_writer,
                                     const Tablet& tablet, int col_idx,
                                     uint32_t start_idx, uint32_t end_idx) {
    int ret = E_OK;
    TSDataType data_type = tablet.schema_vec_->at(col_idx).data_type_;
    int64_t* timestamps = tablet.timestamps_;
    Tablet::ValueMatrixEntry col_values = tablet.value_matrix_[col_idx];
    BitMap& col_notnull_bitmap = tablet.bitmaps_[col_idx];
    // Clamp like write_column() and time_write_column() so an end index past
    // the tablet's capacity can never index out of bounds.
    end_idx = std::min(end_idx, tablet.max_row_num_);
    switch (data_type) {
        case common::BOOLEAN:
            ret = write_typed_column(value_chunk_writer, timestamps,
                                     col_values.bool_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::DATE:
        case common::INT32:
            ret = write_typed_column(value_chunk_writer, timestamps,
                                     col_values.int32_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::TIMESTAMP:
        case common::INT64:
            ret = write_typed_column(value_chunk_writer, timestamps,
                                     col_values.int64_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::FLOAT:
            ret = write_typed_column(value_chunk_writer, timestamps,
                                     col_values.float_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::DOUBLE:
            ret = write_typed_column(value_chunk_writer, timestamps,
                                     col_values.double_data, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        case common::STRING:
        case common::TEXT:
        case common::BLOB:
            ret = write_typed_column(value_chunk_writer, timestamps,
                                     col_values.string_col, col_notnull_bitmap,
                                     start_idx, end_idx);
            break;
        default:
            ret = E_NOT_SUPPORT;
    }
    return ret;
}
// Shared body of the ChunkWriter overloads of write_typed_column(). Expects
// chunk_writer, timestamps, col_values, col_notnull_bitmap, start_idx and
// end_idx in scope. Rows whose bitmap bit tests false are written as
// (timestamp, value); rows whose bit tests true are skipped. Returns from the
// enclosing function with the first writer error, or E_OK.
#define DO_WRITE_TYPED_COLUMN()                                        \
    do {                                                               \
        int ret = E_OK;                                                \
        for (uint32_t row = start_idx; row < end_idx; ++row) {         \
            if (LIKELY(!col_notnull_bitmap.test(row)) &&               \
                RET_FAIL(chunk_writer->write(timestamps[row],          \
                                             col_values[row]))) {      \
                break;                                                 \
            }                                                          \
        }                                                              \
        return ret;                                                    \
    } while (false)
// Shared body of the ValueChunkWriter overloads of write_typed_column().
// Expects value_chunk_writer, timestamps, col_values, col_notnull_bitmap,
// start_idx and end_idx in scope. Every row is written; the row's bitmap bit
// is forwarded as the writer's third argument (presumably the is-null flag).
// Returns from the enclosing function with the first writer error, or E_OK.
#define DO_VALUE_WRITE_TYPED_COLUMN()                                  \
    do {                                                               \
        int ret = E_OK;                                                \
        for (uint32_t row = start_idx; row < end_idx; ++row) {         \
            const bool is_null = col_notnull_bitmap.test(row);         \
            if (RET_FAIL(value_chunk_writer->write(                    \
                    timestamps[row], col_values[row], is_null))) {     \
                break;                                                 \
            }                                                          \
        }                                                              \
        return ret;                                                    \
    } while (false)
// Typed ChunkWriter overloads (non-aligned path). Each one expands
// DO_WRITE_TYPED_COLUMN(): rows in [start_idx, end_idx) whose bitmap bit
// tests false are written as (timestamps[r], col_values[r]); rows whose bit
// tests true are skipped. Returns the first non-E_OK code, otherwise E_OK.
int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,
int64_t* timestamps, bool* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,
int64_t* timestamps, int32_t* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,
int64_t* timestamps, int64_t* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,
int64_t* timestamps, float* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,
int64_t* timestamps, double* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_WRITE_TYPED_COLUMN();
}
// String-column ChunkWriter overload (non-aligned path). For each row in
// [start_idx, end_idx) whose bitmap bit tests false, builds a String view of
// buffer[offsets[row], offsets[row + 1]) and writes it with its timestamp.
// Rows whose bit tests true are skipped. Returns the first error, or E_OK.
int TsFileWriter::write_typed_column(ChunkWriter* chunk_writer,
                                     int64_t* timestamps,
                                     Tablet::StringColumn* string_col,
                                     BitMap& col_notnull_bitmap,
                                     uint32_t start_idx, uint32_t end_idx) {
    int ret = E_OK;
    for (uint32_t row = start_idx; row < end_idx; ++row) {
        if (col_notnull_bitmap.test(row)) {
            continue;
        }
        const auto begin = string_col->offsets[row];
        const uint32_t len =
            static_cast<uint32_t>(string_col->offsets[row + 1] - begin);
        common::String val(string_col->buffer + begin, len);
        if (RET_FAIL(chunk_writer->write(timestamps[row], val))) {
            return ret;
        }
    }
    return ret;
}
// Typed ValueChunkWriter overloads (aligned path). Each one expands
// DO_VALUE_WRITE_TYPED_COLUMN(): unlike the ChunkWriter overloads, EVERY row
// in [start_idx, end_idx) is passed to the writer, with the row's bitmap bit
// forwarded as the third argument. Returns the first error, otherwise E_OK.
int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
int64_t* timestamps, bool* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
int64_t* timestamps, int32_t* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
int64_t* timestamps, int64_t* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
int64_t* timestamps, float* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
int64_t* timestamps, double* col_values,
BitMap& col_notnull_bitmap,
uint32_t start_idx, uint32_t end_idx) {
DO_VALUE_WRITE_TYPED_COLUMN();
}
// String-column ValueChunkWriter overload (aligned path). Every row in
// [start_idx, end_idx) is written: a String view of
// buffer[offsets[row], offsets[row + 1]) is passed together with the row's
// bitmap bit as the writer's third argument. Returns the first error, or E_OK.
int TsFileWriter::write_typed_column(ValueChunkWriter* value_chunk_writer,
                                     int64_t* timestamps,
                                     Tablet::StringColumn* string_col,
                                     common::BitMap& col_notnull_bitmap,
                                     uint32_t start_idx, uint32_t end_idx) {
    int ret = E_OK;
    for (uint32_t row = start_idx; row < end_idx; ++row) {
        const auto begin = string_col->offsets[row];
        common::String val(
            string_col->buffer + begin,
            static_cast<uint32_t>(string_col->offsets[row + 1] - begin));
        const bool is_null = col_notnull_bitmap.test(row);
        if (RET_FAIL(value_chunk_writer->write(timestamps[row], val, is_null))) {
            return ret;
        }
    }
    return ret;
}
// TODO make sure ret is meaningful to SDK user
int TsFileWriter::flush() {
int ret = E_OK;
if (!start_file_done_) {
if (RET_FAIL(io_writer_->start_file())) {
return ret;
}
start_file_done_ = true;
}
/* since @schemas_ used std::map which is rbtree underlying,
so map itself is ordered by device name. */
DeviceSchemasMapIter device_iter;
for (device_iter = schemas_.begin(); device_iter != schemas_.end();
device_iter++) { // cppcheck-suppress postfixOperator
if (check_chunk_group_empty(device_iter->second,
device_iter->second->is_aligned_)) {
continue;
}
bool is_aligned = device_iter->second->is_aligned_;
if (RET_FAIL(io_writer_->start_flush_chunk_group(device_iter->first,
is_aligned))) {
} else if (RET_FAIL(
flush_chunk_group(device_iter->second, is_aligned))) {
} else if (RET_FAIL(io_writer_->end_flush_chunk_group(is_aligned))) {
}
}
record_count_since_last_flush_ = 0;
return ret;
}
// Returns true when nothing in `chunk_group` needs flushing.
// An aligned group (per chunk_group->is_aligned_) with time data is never
// empty. Otherwise each measurement is inspected: when `is_aligned` is true
// its value_chunk_writer_ is checked, else its chunk_writer_. Any writer that
// is non-null and reports data makes the group non-empty.
bool TsFileWriter::check_chunk_group_empty(MeasurementSchemaGroup* chunk_group,
                                           bool is_aligned) {
    TimeChunkWriter* time_writer = chunk_group->time_chunk_writer_;
    if (chunk_group->is_aligned_ && time_writer != nullptr &&
        time_writer->hasData()) {
        return false;
    }
    MeasurementSchemaMap& schemas = chunk_group->measurement_schema_map_;
    for (MeasurementSchemaMapIter it = schemas.begin(); it != schemas.end();
         ++it) {
        const MeasurementSchema* m_schema = it->second;
        // Null check first skips never-written measurements; hasData() skips
        // writers already flushed (avoids empty or repeated chunk groups).
        const bool has_data =
            is_aligned ? (m_schema->value_chunk_writer_ != nullptr &&
                          m_schema->value_chunk_writer_->hasData())
                       : (m_schema->chunk_writer_ != nullptr &&
                          m_schema->chunk_writer_->hasData());
        if (has_data) {
            return false;
        }
    }
    return true;
}
// Flushes one writer's encoded chunk through `io_writer` as a sequence:
// end_encode_chunk -> start_flush_chunk -> flush_chunk -> end_flush_chunk.
// Each step's result lands in the caller's `ret` via RET_FAIL and the chain
// stops at the first error; on full success the writer is reset().
// The expansion is a bare if/else chain (not do/while(0)), so call sites use
// it without a trailing semicolon inside braces. It does not return by itself:
// the caller must inspect `ret` afterwards. `writer` is evaluated several
// times, so pass a simple lvalue.
#define FLUSH_CHUNK(writer, io_writer, name, data_type, encoding, compression, \
num_pages) \
if (RET_FAIL(writer->end_encode_chunk())) { \
} else if (RET_FAIL(io_writer->start_flush_chunk( \
writer->get_chunk_data(), name, data_type, encoding, \
compression, num_pages))) { \
} else if (RET_FAIL(io_writer->flush_chunk(writer->get_chunk_data()))) { \
} else if (RET_FAIL(io_writer->end_flush_chunk( \
writer->get_chunk_statistic()))) { \
} else { \
writer->reset(); \
}
// Flushes every chunk of one chunk group through io_writer_. For aligned
// groups the time chunk goes first; then each measurement's chunk follows
// (its ChunkWriter for non-aligned groups, otherwise its ValueChunkWriter).
// Returns at the first failing chunk. Previously a later successful flush
// overwrote `ret`, masking the error from the caller.
// Note: the `is_aligned` parameter is unused; chunk_group->is_aligned_ decides.
int TsFileWriter::flush_chunk_group(MeasurementSchemaGroup* chunk_group,
                                    bool is_aligned) {
    int ret = E_OK;
    MeasurementSchemaMap& map = chunk_group->measurement_schema_map_;
    // check_chunk_group_empty() tolerates a null time writer, so do not
    // dereference it unconditionally here either.
    if (chunk_group->is_aligned_ &&
        chunk_group->time_chunk_writer_ != nullptr) {
        TimeChunkWriter*& time_chunk_writer = chunk_group->time_chunk_writer_;
        ChunkHeader chunk_header = time_chunk_writer->get_chunk_header();
        FLUSH_CHUNK(time_chunk_writer, io_writer_,
                    chunk_header.measurement_name_, chunk_header.data_type_,
                    chunk_header.encoding_type_, chunk_header.compression_type_,
                    time_chunk_writer->num_of_pages())
        if (ret != E_OK) {
            return ret;
        }
    }
    for (MeasurementSchemaMapIter ms_iter = map.begin(); ms_iter != map.end();
         ms_iter++) {
        MeasurementSchema* m_schema = ms_iter->second;
        if (!chunk_group->is_aligned_ && m_schema->chunk_writer_ != nullptr) {
            ChunkWriter*& chunk_writer = m_schema->chunk_writer_;
            FLUSH_CHUNK(chunk_writer, io_writer_, m_schema->measurement_name_,
                        m_schema->data_type_, m_schema->encoding_,
                        m_schema->compression_type_,
                        chunk_writer->num_of_pages())
        } else if (m_schema->value_chunk_writer_ != nullptr) {
            ValueChunkWriter*& value_chunk_writer =
                m_schema->value_chunk_writer_;
            FLUSH_CHUNK(value_chunk_writer, io_writer_,
                        m_schema->measurement_name_, m_schema->data_type_,
                        m_schema->encoding_, m_schema->compression_type_,
                        value_chunk_writer->num_of_pages())
        }
        if (ret != E_OK) {
            return ret;
        }
    }
    return ret;
}
// Finalizes the file via io_writer_->end_file() and returns its result.
// This does not call flush() itself. NOTE(review): callers presumably flush
// pending chunk data beforehand; confirm.
int TsFileWriter::close() {
    const int end_ret = io_writer_->end_file();
    return end_ret;
}
} // end namespace storage