blob: abd08b821b6a97486d7c09887ab534f69802ef5d [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tsfile_io_writer.h"
#include <fcntl.h>
#include "common/global.h"
#include "common/logger/elog.h"
#include "writer/chunk_writer.h"
using namespace common;
namespace storage {
#if 0
#define OFFSET_DEBUG(msg) \
std::cout << "OFFSET_DEBUG: " << msg \
<< ". offset=" << write_stream_.total_size() << std::endl
#else
#define OFFSET_DEBUG(msg) void(msg)
#endif
#ifndef LIBTSFILE_SDK
int TsFileIOWriter::init() {
int ret = E_OK;
const uint32_t page_size = 1024;
meta_allocator_.init(page_size, MOD_TSFILE_WRITER_META);
chunk_meta_count_ = 0;
file_ = new WriteFile;
write_file_created_ = true;
FileID file_id;
file_id.seq_ = get_cur_timestamp();
file_id.version_ = 0;
file_id.merge_ = 0;
if (RET_FAIL(file_->create(file_id, O_RDWR | O_CREAT, 0644))) {
// log_err("file open error, ret=%d", ret);
}
return ret;
}
#endif
int TsFileIOWriter::init(WriteFile *write_file) {
int ret = E_OK;
const uint32_t page_size = 1024;
meta_allocator_.init(page_size, MOD_TSFILE_WRITER_META);
chunk_meta_count_ = 0;
file_ = write_file;
return ret;
}
void TsFileIOWriter::destroy() {
meta_allocator_.destroy();
write_stream_.destroy();
if (write_file_created_ && file_ != nullptr) {
delete file_;
file_ = nullptr;
}
}
int TsFileIOWriter::start_file() {
int ret = E_OK;
if (RET_FAIL(write_buf(MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN))) {
} else if (RET_FAIL(write_byte(VERSION_NUM_BYTE))) {
} else if (RET_FAIL(flush_stream_to_file())) {
}
return ret;
}
int TsFileIOWriter::start_flush_chunk_group(const std::string &device_name,
bool is_aligned) {
int ret = write_byte(CHUNK_GROUP_HEADER_MARKER);
if (ret != common::E_OK) {
return ret;
}
ret = write_string(device_name);
if (ret != common::E_OK) {
return ret;
}
cur_device_name_ = device_name;
ASSERT(cur_chunk_group_meta_ == nullptr);
use_prev_alloc_cgm_ = false;
for (auto iter = chunk_group_meta_list_.begin();
iter != chunk_group_meta_list_.end(); iter++) {
common::String cur_device_name((char *)cur_device_name_.c_str(),
cur_device_name_.size());
if (iter.get()->device_name_.equal_to(cur_device_name)) {
use_prev_alloc_cgm_ = true;
cur_chunk_group_meta_ = iter.get();
break;
}
}
if (!use_prev_alloc_cgm_) {
void *buf = meta_allocator_.alloc(sizeof(*cur_chunk_group_meta_));
if (IS_NULL(buf)) {
ret = E_OOM;
} else {
cur_chunk_group_meta_ = new (buf) ChunkGroupMeta(&meta_allocator_);
ret = cur_chunk_group_meta_->init(device_name, meta_allocator_);
}
}
return ret;
}
int TsFileIOWriter::start_flush_chunk(ByteStream &chunk_data,
ColumnDesc &col_desc,
int32_t num_of_pages) {
std::string measurement_name = col_desc.get_measurement_name_str();
return start_flush_chunk(chunk_data, measurement_name, col_desc.type_,
col_desc.encoding_, col_desc.compression_,
num_of_pages, col_desc.ts_id_);
}
int TsFileIOWriter::start_flush_chunk(common::ByteStream &chunk_data,
std::string &measurement_name,
common::TSDataType data_type,
common::TSEncoding encoding,
common::CompressionType compression,
int32_t num_of_pages,
common::TsID ts_id) {
int ret = E_OK;
// Step 1. record chunk meta
const int mask = 0; // for common chunk
ASSERT(cur_chunk_meta_ == nullptr);
void *buf1 = meta_allocator_.alloc(sizeof(*cur_chunk_meta_));
void *buf2 = meta_allocator_.alloc(get_typed_statistic_sizeof(data_type));
if (IS_NULL(buf1) || IS_NULL(buf2)) {
ret = E_OOM;
} else {
cur_chunk_meta_ = new (buf1) ChunkMeta();
Statistic *chunk_statistic_copy =
placement_new_statistic(data_type, buf2);
String mname((char *)measurement_name.c_str(),
strlen(measurement_name.c_str()));
ret = cur_chunk_meta_->init(mname, data_type, cur_file_position(),
chunk_statistic_copy, ts_id, mask,
meta_allocator_);
}
// Step 2. serialize chunk header to write_stream_
if (IS_SUCC(ret)) {
ChunkHeader chunk_header;
chunk_header.measurement_name_ = measurement_name,
chunk_header.data_size_ = chunk_data.total_size();
chunk_header.data_type_ = data_type;
chunk_header.compression_type_ = compression;
chunk_header.encoding_type_ = encoding;
chunk_header.num_of_pages_ = num_of_pages;
chunk_header.chunk_type_ =
(num_of_pages <= 1 ? ONLY_ONE_PAGE_CHUNK_HEADER_MARKER
: CHUNK_HEADER_MARKER);
OFFSET_DEBUG("start flush chunk header");
ret = chunk_header.serialize_to(write_stream_);
OFFSET_DEBUG("after flush chunk header");
#if DEBUG_SE
std::cout << "TsFileIOWriter writer ChunkHeader: " << chunk_header
<< std::endl;
#endif
}
return ret;
}
int TsFileIOWriter::flush_chunk(ByteStream &chunk_data) {
int ret = E_OK;
if (RET_FAIL(write_chunk_data(chunk_data))) {
// log_err("writer chunk data error, ret=%d", ret);
} else if (RET_FAIL(flush_stream_to_file())) {
// log_err("flush stream error, ret=%d", ret);
}
return ret;
}
int TsFileIOWriter::write_chunk_data(ByteStream &chunk_data) {
OFFSET_DEBUG("before writer chunk data");
// may print chunk_data here for debug.
#if DEBUG_SE
DEBUG_print_byte_stream("WriteChunkData chunk_data=", chunk_data);
std::cout << "WriteChunkData: write_stream_.total_size="
<< write_stream_.total_size()
<< ", chunk_data.total_size=" << chunk_data.total_size()
<< std::endl;
#endif
int ret = merge_byte_stream(write_stream_, chunk_data, true);
OFFSET_DEBUG("before writer chunk data");
return ret;
}
int TsFileIOWriter::end_flush_chunk(Statistic *chunk_statistic) {
int ret = E_OK;
chunk_meta_count_++;
cur_chunk_meta_->clone_statistic_from(chunk_statistic);
if (RET_FAIL(cur_chunk_group_meta_->push(cur_chunk_meta_))) {
} else {
cur_chunk_meta_ = nullptr;
}
return ret;
}
int TsFileIOWriter::end_flush_chunk_group(bool is_aligned) {
if (use_prev_alloc_cgm_) {
cur_chunk_group_meta_ = nullptr;
return common::E_OK;
}
int ret = chunk_group_meta_list_.push_back(cur_chunk_group_meta_);
cur_chunk_group_meta_ = nullptr;
return ret;
}
int TsFileIOWriter::end_file() {
int ret = E_OK;
OFFSET_DEBUG("before end file");
if (RET_FAIL(write_log_index_range())) {
std::cout << "writer range index error, ret =" << ret << std::endl;
} else if (RET_FAIL(write_file_index())) {
std::cout << "writer file index error, ret = " << ret << std::endl;
} else if (RET_FAIL(write_file_footer())) {
std::cout << "writer file footer error, ret = " << ret << std::endl;
} else if (RET_FAIL(sync_file())) {
std::cout << "sync file error, ret = " << ret << std::endl;
} else if (RET_FAIL(close_file())) {
std::cout << "close file error, ret = " << ret << std::endl;
}
return ret;
}
int TsFileIOWriter::write_log_index_range() {
int ret = E_OK;
// Currently, Java IoTDB leaves it as default value.
const int64_t min_plan_index = 0; // 0x7fffffffffffffff;
const int64_t max_plan_index = 0; // 0x8000000000000000;
// OFFSET_DEBUG("before writer log index");
if (RET_FAIL(write_byte(OPERATION_INDEX_RANGE))) {
std::cout << "writer byte error " << ret << std::endl;
} else if (RET_FAIL(SerializationUtil::write_i64(min_plan_index,
write_stream_))) {
std::cout << "min index error " << ret << std::endl;
} else if (RET_FAIL(SerializationUtil::write_i64(max_plan_index,
write_stream_))) {
std::cout << "max index error " << ret << std::endl;
}
return ret;
}
#if DEBUG_SE
void debug_print_chunk_group_meta(ChunkGroupMeta *cgm) {
std::cout << "ChunkGroupMeta = {device_name_=" << cgm->device_name_
<< ", chunk_meta_list_={";
SimpleList<ChunkMeta *>::Iterator cm_it = cgm->chunk_meta_list_.begin();
for (; cm_it != cgm->chunk_meta_list_.end(); cm_it++) {
ChunkMeta *cm = cm_it.get();
std::cout << "{measurement=" << cm->measurement_name_
<< ", offset_of_chunk_header=" << cm->offset_of_chunk_header_
<< ", statistic=" << cm->statistic_->to_string() << "}}}";
}
std::cout << std::endl;
}
void debug_print_chunk_group_meta_list(
common::SimpleList<ChunkGroupMeta *> &cgm_list) {
SimpleList<ChunkGroupMeta *>::Iterator cgm_it = cgm_list.begin();
for (; cgm_it != cgm_list.end(); cgm_it++) {
ChunkGroupMeta *cgm = cgm_it.get();
debug_print_chunk_group_meta(cgm);
}
}
#endif
// TODO better memory management.
int TsFileIOWriter::write_file_index() {
#if DEBUG_SE
debug_print_chunk_group_meta_list(chunk_group_meta_list_);
#endif
int ret = E_OK;
int64_t meta_offset = 0;
BloomFilter filter;
// TODO: better memory manage for this while-loop, cur_index_node_queue
FileIndexWritingMemManager writing_mm;
String device_name;
String measurement_name;
TimeseriesIndex ts_index;
String prev_device_name;
int entry_count_in_cur_device = 0;
MetaIndexEntry *meta_index_entry = nullptr;
MetaIndexNode *cur_index_node = nullptr;
SimpleList<MetaIndexNode *> *cur_index_node_queue = nullptr;
DeviceNodeMap device_map;
TSMIterator tsm_iter(chunk_group_meta_list_);
if (RET_FAIL(write_separator_marker(meta_offset))) {
return ret;
} else if (RET_FAIL(init_bloom_filter(filter))) {
return ret;
}
if (RET_FAIL(tsm_iter.init())) {
return ret;
}
while (IS_SUCC(ret) && tsm_iter.has_next()) {
ts_index.reset(); // TODO reuse
if (RET_FAIL(
tsm_iter.get_next(device_name, measurement_name, ts_index))) {
break;
}
#if DEBUG_SE
std::cout << "tsm_iter get next = {device_name=" << device_name
<< ", measurement_name=" << measurement_name
<< ", ts_index=" << ts_index << std::endl;
#endif
// prepare if it is an entry of a new device
if (!prev_device_name.equal_to(device_name)) {
if (!prev_device_name.is_null()) {
if (RET_FAIL(add_cur_index_node_to_queue(
cur_index_node, cur_index_node_queue))) {
} else if (RET_FAIL(add_device_node(
device_map, prev_device_name,
cur_index_node_queue, writing_mm))) {
}
}
if (IS_SUCC(ret)) {
if (RET_FAIL(alloc_meta_index_node_queue(
writing_mm, cur_index_node_queue))) {
} else if (RET_FAIL(alloc_and_init_meta_index_node(
writing_mm, cur_index_node, LEAF_MEASUREMENT))) {
}
}
prev_device_name.shallow_copy_from(device_name);
entry_count_in_cur_device = 0;
}
// pick an entry as index entry every max_degree_of_index_node entries.
if (IS_SUCC(ret) && entry_count_in_cur_device %
g_config_value_.max_degree_of_index_node_ ==
0) {
if (cur_index_node->is_full()) {
if (RET_FAIL(add_cur_index_node_to_queue(
cur_index_node, cur_index_node_queue))) {
} else if (RET_FAIL(alloc_and_init_meta_index_node(
writing_mm, cur_index_node, LEAF_MEASUREMENT))) {
}
}
if (IS_SUCC(ret)) {
if (RET_FAIL(alloc_and_init_meta_index_entry(
writing_mm, meta_index_entry, measurement_name))) {
} else if (RET_FAIL(
cur_index_node->push_entry(meta_index_entry))) {
}
}
}
if (IS_SUCC(ret)) {
OFFSET_DEBUG("before ts_index written");
if (ts_index.get_data_type() != common::VECTOR) {
ret = filter.add_path_entry(device_name, measurement_name);
}
if (RET_FAIL(ts_index.serialize_to(write_stream_))) {
} else {
#if DEBUG_SE
std::cout << "ts_index.serialize. ts_index=" << ts_index
<< " file_pos=" << cur_file_position() << std::endl;
#endif
entry_count_in_cur_device++;
// add_ts_time_index_entry(ts_index);
}
}
} // end while
if (ret == E_NO_MORE_DATA) {
ret = E_OK;
}
ASSERT(ret == E_OK);
if (IS_SUCC(ret)) { // iter finish
ASSERT(cur_index_node != nullptr);
ASSERT(cur_index_node_queue != nullptr);
if (RET_FAIL(add_cur_index_node_to_queue(cur_index_node,
cur_index_node_queue))) {
} else if (RET_FAIL(add_device_node(device_map, prev_device_name,
cur_index_node_queue,
writing_mm))) {
}
}
if (IS_SUCC(ret)) {
MetaIndexNode *device_index_root_node = nullptr;
if (RET_FAIL(build_device_level(device_map, device_index_root_node,
writing_mm))) {
} else {
TsFileMeta tsfile_meta;
tsfile_meta.index_node_ = device_index_root_node;
tsfile_meta.meta_offset_ = meta_offset;
int64_t tsfile_meta_offset = cur_file_position();
uint32_t size = 0; // cppcheck-suppress unreadVariable
OFFSET_DEBUG("before tsfile_meta written");
if (RET_FAIL(tsfile_meta.serialize_to(write_stream_))) {
} else if (RET_FAIL(filter.serialize_to(write_stream_))) {
} else {
int64_t tsfile_meta_end_offset = cur_file_position();
size = (uint32_t)(tsfile_meta_end_offset - tsfile_meta_offset);
ret = SerializationUtil::write_ui32(size, write_stream_);
}
#if DEBUG_SE
std::cout << "writer tsfile_meta: " << tsfile_meta
<< ", tsfile_meta_offset=" << tsfile_meta_offset
<< ", size=" << size << std::endl;
#endif
tsfile_meta.index_node_ =
nullptr; // memory management is delegated to writing_mm
}
}
return ret;
}
int TsFileIOWriter::write_separator_marker(int64_t &meta_offset) {
meta_offset = cur_file_position();
return write_byte(SEPARATOR_MARKER);
}
int TsFileIOWriter::init_bloom_filter(BloomFilter &filter) {
int ret = E_OK;
int32_t path_count = get_path_count(chunk_group_meta_list_);
if (RET_FAIL(filter.init(
g_config_value_.tsfile_index_bloom_filter_error_percent_,
path_count))) {
}
return ret;
}
int32_t TsFileIOWriter::get_path_count(
common::SimpleList<ChunkGroupMeta *> &cgm_list) {
int32_t path_count = 0;
String prev_measurement;
SimpleList<ChunkGroupMeta *>::Iterator cgm_it = cgm_list.begin();
for (; cgm_it != cgm_list.end(); cgm_it++) {
ChunkGroupMeta *cgm = cgm_it.get();
SimpleList<ChunkMeta *>::Iterator cm_it = cgm->chunk_meta_list_.begin();
for (; cm_it != cgm->chunk_meta_list_.end(); cm_it++) {
ChunkMeta *cm = cm_it.get();
// TODO currently, we do not have multi chunks of same timeseries in
// tsfile.
if (!cm->measurement_name_.equal_to(prev_measurement)) {
path_count++;
prev_measurement = cm->measurement_name_;
}
}
}
return path_count;
}
int TsFileIOWriter::write_file_footer() {
int ret = E_OK;
if (RET_FAIL(write_buf(MAGIC_STRING_TSFILE, MAGIC_STRING_TSFILE_LEN))) {
} else if (RET_FAIL(flush_stream_to_file())) {
}
return ret;
}
int TsFileIOWriter::build_device_level(DeviceNodeMap &device_map,
MetaIndexNode *&ret_root,
FileIndexWritingMemManager &wmm) {
int ret = E_OK;
SimpleList<MetaIndexNode *> node_queue(1024,
MOD_TSFILE_WRITER_META); // FIXME
DeviceNodeMapIterator device_map_iter;
MetaIndexNode *cur_index_node = nullptr;
if (RET_FAIL(
alloc_and_init_meta_index_node(wmm, cur_index_node, LEAF_DEVICE))) {
return ret;
}
for (device_map_iter = device_map.begin();
device_map_iter != device_map.end() && IS_SUCC(ret);
device_map_iter++) {
String device_name;
device_name.shallow_copy_from(device_map_iter->first);
MetaIndexEntry *entry = nullptr;
if (cur_index_node->is_full()) {
cur_index_node->end_offset_ = cur_file_position();
#if DEBUG_SE
std::cout << "TsFileIOWriter::build_device_level, cur_index_node="
<< *cur_index_node << std::endl;
#endif
if (RET_FAIL(node_queue.push_back(cur_index_node))) {
} else if (RET_FAIL(alloc_and_init_meta_index_node(
wmm, cur_index_node, LEAF_DEVICE))) {
}
}
if (RET_FAIL(
alloc_and_init_meta_index_entry(wmm, entry, device_name))) {
} else if (RET_FAIL(
device_map_iter->second->serialize_to(write_stream_))) {
} else if (RET_FAIL(cur_index_node->push_entry(entry))) {
}
} // end for
if (IS_SUCC(ret)) {
if (!cur_index_node->is_empty()) {
cur_index_node->end_offset_ = cur_file_position();
ret = node_queue.push_back(cur_index_node);
}
}
if (IS_SUCC(ret)) {
if (node_queue.size() > 0) {
if (RET_FAIL(generate_root(&node_queue, ret_root, INTERNAL_DEVICE,
wmm))) {
}
} else {
ret_root = cur_index_node;
ret_root->end_offset_ = cur_file_position();
ret_root->node_type_ = LEAF_DEVICE;
}
}
return ret;
}
int TsFileIOWriter::alloc_and_init_meta_index_entry(
FileIndexWritingMemManager &wmm, MetaIndexEntry *&ret_entry, String &name) {
void *buf = wmm.pa_.alloc(sizeof(MetaIndexEntry));
if (IS_NULL(buf)) {
return E_OOM;
}
ret_entry = new (buf) MetaIndexEntry;
ret_entry->name_.shallow_copy_from(name);
ret_entry->offset_ = cur_file_position();
#if DEBUG_SE
std::cout << "alloc_and_init_meta_index_entry, MetaIndexEntry="
<< *ret_entry << std::endl;
#endif
return E_OK;
}
int TsFileIOWriter::alloc_and_init_meta_index_node(
FileIndexWritingMemManager &wmm, MetaIndexNode *&ret_node,
const MetaIndexNodeType node_type) {
void *buf = wmm.pa_.alloc(sizeof(MetaIndexNode));
if (IS_NULL(buf)) {
return E_OOM;
}
ret_node = new (buf) MetaIndexNode(&wmm.pa_);
ret_node->node_type_ = node_type;
wmm.all_index_nodes_.push_back(ret_node);
#if DEBUG_SE
std::cout << "alloc_and_init_meta_index_node, node=" << *ret_node
<< std::endl;
#endif
return E_OK;
}
int TsFileIOWriter::add_cur_index_node_to_queue(
MetaIndexNode *node, SimpleList<MetaIndexNode *> *queue) {
node->end_offset_ = cur_file_position();
#if DEBUG_SE
std::cout << "add_cur_index_node_to_queue, node=" << (void *)node << " = "
<< *node << ", set offset=" << cur_file_position() << std::endl;
#endif
return queue->push_back(node);
}
int TsFileIOWriter::alloc_meta_index_node_queue(
FileIndexWritingMemManager &wmm, SimpleList<MetaIndexNode *> *&queue) {
void *buf = wmm.pa_.alloc(sizeof(*queue));
if (IS_NULL(buf)) {
return E_OOM;
}
queue = new (buf) SimpleList<MetaIndexNode *>(&wmm.pa_);
return E_OK;
}
int TsFileIOWriter::add_device_node(
DeviceNodeMap &device_map, String device_name,
SimpleList<MetaIndexNode *> *measurement_index_node_queue,
FileIndexWritingMemManager &wmm) {
ASSERT(measurement_index_node_queue->size() > 0);
int ret = E_OK;
DeviceNodeMapIterator find_iter = device_map.find(device_name);
if (find_iter != device_map.end()) {
return E_ALREADY_EXIST;
}
MetaIndexNode *root = nullptr;
if (RET_FAIL(generate_root(measurement_index_node_queue, root,
INTERNAL_MEASUREMENT, wmm))) {
} else {
std::pair<String, MetaIndexNode *> ins_pair;
ins_pair.first.shallow_copy_from(device_name);
ins_pair.second = root;
std::pair<DeviceNodeMapIterator, bool> ins_res =
device_map.insert(ins_pair);
if (!ins_res.second) {
ASSERT(false);
}
}
return ret;
}
int TsFileIOWriter::generate_root(SimpleList<MetaIndexNode *> *node_queue,
MetaIndexNode *&root_node,
MetaIndexNodeType node_type,
FileIndexWritingMemManager &wmm) {
int ret = E_OK;
ASSERT(node_queue->size() > 0);
if (node_queue->size() == 1) {
root_node = node_queue->front();
return ret;
}
const uint32_t LIST_PAGE_SIZE = 256;
const AllocModID mid = MOD_TSFILE_WRITER_META;
SimpleList<MetaIndexNode *> list_x(LIST_PAGE_SIZE, mid);
SimpleList<MetaIndexNode *> list_y(LIST_PAGE_SIZE, mid);
if (RET_FAIL(clone_node_list(node_queue, &list_x))) {
return ret;
}
SimpleList<MetaIndexNode *> *from = &list_x;
SimpleList<MetaIndexNode *> *to = &list_y;
MetaIndexNode *cur_index_node = nullptr;
if (RET_FAIL(
alloc_and_init_meta_index_node(wmm, cur_index_node, node_type))) {
}
uint32_t from_size = from->size();
while (IS_SUCC(ret)) {
to->clear();
SimpleList<MetaIndexNode *>::Iterator from_iter;
for (from_iter = from->begin();
IS_SUCC(ret) && from_iter != from->end(); from_iter++) {
MetaIndexNode *iter_node = from_iter.get();
MetaIndexEntry *entry = nullptr;
String name;
if (RET_FAIL(iter_node->get_first_child_name(name))) {
} else if (RET_FAIL(
alloc_and_init_meta_index_entry(wmm, entry, name))) {
} else if (cur_index_node->is_full()) {
cur_index_node->end_offset_ = cur_file_position();
if (RET_FAIL(to->push_back(cur_index_node))) {
} else {
#if DEBUG_SE
std::cout
<< "generate root, alloc_and_init_meta_index_node. "
"cur_index_node="
<< *cur_index_node << std::endl;
#endif
if (RET_FAIL(alloc_and_init_meta_index_node(
wmm, cur_index_node, node_type))) {
}
}
}
if (IS_SUCC(ret)) {
if (RET_FAIL(cur_index_node->push_entry(entry))) {
} else {
OFFSET_DEBUG("before writer index_node in generate_root");
ret = iter_node->serialize_to(write_stream_);
OFFSET_DEBUG("after writer index_node in generate_root");
}
}
} // end for
if (IS_SUCC(ret)) {
if (!cur_index_node->is_empty()) {
cur_index_node->end_offset_ = cur_file_position();
if (RET_FAIL(to->push_back(cur_index_node))) {
}
#if DEBUG_SE
std::cout << "genereate root 2, "
"alloc_and_init_meta_index_node. cur_index_node="
<< *cur_index_node << std::endl;
#endif
if (RET_FAIL(alloc_and_init_meta_index_node(wmm, cur_index_node,
node_type))) {
}
}
}
if (IS_SUCC(ret)) {
ASSERT(from_size > to->size());
if (to->size() == 1) {
root_node = to->front();
break;
} else {
swap_list(from, to);
}
}
} // end while
return ret;
}
int TsFileIOWriter::clone_node_list(SimpleList<MetaIndexNode *> *src,
SimpleList<MetaIndexNode *> *dest) {
int ret = E_OK;
SimpleList<MetaIndexNode *>::Iterator it;
for (it = src->begin(); IS_SUCC(ret) && it != src->end(); it++) {
ret = dest->push_back(it.get());
}
return ret;
}
// #if DEBUG_SE
// void DEBUG_print_byte_stream_buf(const char *tag,
// const char *buf,
// const uint32_t len,
// bool reset_print)
// {
// static int print_count = 0;
// if (reset_print) {
// print_count = 0;
// }
//
// for (uint32_t i = 0; i < len; i++) {
// if (print_count++ % 16 == 0) {
// printf("\n%s: BUF=", tag);
// }
// printf("%02x ", (uint8_t)buf[i]);
// }
// printf("\n\n");
// }
// #endif
/*
* TODO:
* when finish flushing stream to file, reclaim memory used by stream
*/
int TsFileIOWriter::flush_stream_to_file() {
int ret = E_OK;
while (true) {
ByteStream::Buffer b =
write_stream_consumer_.get_next_buf(write_stream_);
if (b.buf_ == nullptr) {
break;
} else {
if (RET_FAIL(file_->write(b.buf_, b.len_))) {
break;
}
}
}
write_stream_.purge_prev_pages();
return ret;
}
void TsFileIOWriter::add_ts_time_index_entry(TimeseriesIndex &ts_index) {
TimeseriesTimeIndexEntry time_index_entry;
time_index_entry.ts_id_ = ts_index.get_ts_id();
time_index_entry.time_range_.start_time_ =
ts_index.get_statistic()->start_time_;
time_index_entry.time_range_.end_time_ =
ts_index.get_statistic()->end_time_;
ts_time_index_vector_.push_back(time_index_entry);
}
} // namespace storage