blob: 273f09a44fe8551793506f3f1ed8f53723211f67 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "file/tsfile_io_reader.h"
#include "common/allocator/alloc_base.h"
using namespace common;
namespace storage {
int TsFileIOReader::init(const std::string &file_path) {
int ret = E_OK;
read_file_ = new ReadFile;
read_file_created_ = true;
if (RET_FAIL(read_file_->open(file_path))) {
}
return ret;
}
int TsFileIOReader::init(ReadFile *read_file) {
if (IS_NULL(read_file)) {
ASSERT(false);
return E_INVALID_ARG;
}
read_file_created_ = false;
read_file_ = read_file;
return E_OK;
}
void TsFileIOReader::reset() {
if (read_file_ != nullptr) {
if (read_file_created_) {
read_file_->destroy();
delete read_file_;
}
read_file_ = nullptr;
tsfile_meta_page_arena_.destroy();
tsfile_meta_ready_ = false;
}
}
int TsFileIOReader::alloc_ssi(std::shared_ptr<IDeviceID> device_id,
const std::string &measurement_name,
TsFileSeriesScanIterator *&ssi,
common::PageArena &pa, Filter *time_filter) {
int ret = E_OK;
if (RET_FAIL(load_tsfile_meta_if_necessary())) {
} else {
ssi = new TsFileSeriesScanIterator;
ssi->init(device_id, measurement_name, read_file_, time_filter, pa);
if (RET_FAIL(load_timeseries_index_for_ssi(device_id, measurement_name,
ssi))) {
} else if (time_filter != nullptr &&
!filter_stasify(ssi->itimeseries_index_, time_filter)) {
ret = E_NO_MORE_DATA;
} else if (RET_FAIL(ssi->init_chunk_reader())) {
}
if (ret != E_OK) {
ssi->destroy();
delete ssi;
ssi = nullptr;
}
}
return ret;
}
void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator *ssi) {
if (ssi != nullptr) {
ssi->destroy();
delete ssi;
}
}
int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta(
std::shared_ptr<IDeviceID> device_id,
std::vector<ITimeseriesIndex *> &timeseries_indexs, PageArena &pa) {
int ret = E_OK;
load_tsfile_meta_if_necessary();
std::shared_ptr<IMetaIndexEntry> meta_index_entry;
int64_t end_offset;
std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> >
meta_index_entry_list;
if (RET_FAIL(load_device_index_entry(
std::make_shared<DeviceIDComparable>(device_id), meta_index_entry,
end_offset))) {
} else if (RET_FAIL(load_all_measurement_index_entry(
meta_index_entry->get_offset(), end_offset, pa,
meta_index_entry_list))) {
} else if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa,
timeseries_indexs))) {
}
return ret;
}
bool TsFileIOReader::filter_stasify(ITimeseriesIndex *ts_index,
Filter *time_filter) {
ASSERT(ts_index->get_statistic() != nullptr);
return time_filter->satisfy(ts_index->get_statistic());
}
int TsFileIOReader::load_tsfile_meta_if_necessary() {
int ret = E_OK;
if (!tsfile_meta_ready_) {
if (RET_FAIL(load_tsfile_meta())) {
// log_err("load_tsfile_meta error, ret=%d", ret);
return ret;
} else {
tsfile_meta_ready_ = true;
}
}
return ret;
}
int TsFileIOReader::load_tsfile_meta() {
const int32_t TSFILE_READ_IO_SIZE = 1024; // TODO make it configurable
const int32_t TAIL_MAGIC_AND_META_SIZE_SIZE =
10; // magic(6B) + meta_size(4B)
ASSERT(file_size() > 0); // > 13
int ret = E_OK;
uint32_t tsfile_meta_size = 0;
int32_t read_offset = 0;
int32_t ret_read_len = 0;
// Step 1: reader the tsfile_meta_size
// 1.1 prepare reader buffer
int32_t alloc_size = UTIL_MIN(TSFILE_READ_IO_SIZE, file_size());
char *read_buf = (char *)mem_alloc(alloc_size, MOD_TSFILE_READER);
if (IS_NULL(read_buf)) {
return E_OOM;
}
// 1.2 reader data from file
read_offset = file_size() - alloc_size;
ret_read_len = 0;
if (RET_FAIL(read_file_->read(read_offset, read_buf, alloc_size,
ret_read_len))) {
} else if (ret_read_len != alloc_size) {
ret = E_FILE_READ_ERR;
// log_err("do not reader enough data from tsfile, want-size=%d,
// reader-size=%d, file=%s", alloc_size, ret_read_len,
// get_file_path().c_str());
}
// 1.3 deserialize tsfile_meta_size
if (IS_SUCC(ret)) {
// deserialize tsfile_meta_size
char *size_buf = read_buf + alloc_size - TAIL_MAGIC_AND_META_SIZE_SIZE;
tsfile_meta_size = SerializationUtil::read_ui32(size_buf);
ASSERT(tsfile_meta_size > 0 && tsfile_meta_size <= (1ll << 20));
}
// Step 2: reader TsFileMeta
if (IS_SUCC(ret)) {
// 2.1 prepare enough buffer (use the previous buffer if can).
char *tsfile_meta_buf = nullptr;
if (tsfile_meta_size + TAIL_MAGIC_AND_META_SIZE_SIZE >
(uint32_t)alloc_size) {
// prepare buffer to re-reader from start of tsfile_meta
char *old_read_buf = read_buf;
read_buf = (char *)mem_realloc(read_buf, tsfile_meta_size);
if (IS_NULL(read_buf)) {
read_buf = old_read_buf;
ret = E_OOM;
} else if (RET_FAIL(read_file_->read(
file_size() - tsfile_meta_size -
TAIL_MAGIC_AND_META_SIZE_SIZE,
read_buf, tsfile_meta_size, ret_read_len))) {
} else if (tsfile_meta_size != (uint32_t)ret_read_len) {
ret = E_FILE_READ_ERR;
// log_err("do not reader enough data from tsfile, want-size=%d,
// reader-size=%d, file=%s", tsfile_meta_size, ret_read_len,
// get_file_path().c_str());
} else {
tsfile_meta_buf = read_buf;
}
} else {
// the previous buffer has contained the TsFileMeta data
tsfile_meta_buf = read_buf + alloc_size - tsfile_meta_size -
TAIL_MAGIC_AND_META_SIZE_SIZE;
// DEBUG_hex_dump_buf("tsfile_meta_buf=", tsfile_meta_buf,
// tsfile_meta_size);
}
if (IS_SUCC(ret)) {
ByteStream tsfile_meta_bs;
tsfile_meta_bs.wrap_from(tsfile_meta_buf, tsfile_meta_size);
if (RET_FAIL(tsfile_meta_.deserialize_from(tsfile_meta_bs))) {
}
#if DEBUG_SE
std::cout << "load tsfile_meta, ret=" << ret
<< ", tsfile_meta_=" << tsfile_meta_ << std::endl;
#endif
}
}
mem_free(read_buf);
return ret;
}
int TsFileIOReader::load_timeseries_index_for_ssi(
std::shared_ptr<IDeviceID> device_id, const std::string &measurement_name,
TsFileSeriesScanIterator *&ssi) {
int ret = E_OK;
std::shared_ptr<IMetaIndexEntry> device_index_entry;
int64_t device_ie_end_offset = 0;
std::shared_ptr<IMetaIndexEntry> measurement_index_entry;
int64_t measurement_ie_end_offset = 0;
// bool is_aligned = false;
if (RET_FAIL(load_device_index_entry(
std::make_shared<DeviceIDComparable>(device_id), device_index_entry,
device_ie_end_offset))) {
return ret;
}
auto &pa = ssi->timeseries_index_pa_;
int start_offset = device_index_entry->get_offset(),
end_offset = device_ie_end_offset;
ASSERT(start_offset < end_offset);
const int32_t read_size = end_offset - start_offset;
int32_t ret_read_len = 0;
char *data_buf = (char *)pa.alloc(read_size);
void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
return E_OOM;
}
auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr,
MetaIndexNode::self_deleter);
if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
ret_read_len))) {
return ret;
} else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) {
return ret;
}
bool is_aligned = is_aligned_device(top_node);
TimeseriesIndex *timeseries_index = nullptr;
if (is_aligned) {
if (RET_FAIL(
get_time_column_metadata(top_node, timeseries_index, pa))) {
return ret;
}
}
if (RET_FAIL(load_measurement_index_entry(measurement_name, top_node,
measurement_index_entry,
measurement_ie_end_offset))) {
return ret;
} else if (RET_FAIL(do_load_timeseries_index(
measurement_name, measurement_index_entry->get_offset(),
measurement_ie_end_offset, ssi->timeseries_index_pa_,
ssi->itimeseries_index_, is_aligned))) {
return ret;
}
if (is_aligned) {
auto *aligned_timeseries_index =
dynamic_cast<AlignedTimeseriesIndex *>(ssi->itimeseries_index_);
if (aligned_timeseries_index) {
aligned_timeseries_index->time_ts_idx_ = timeseries_index;
}
}
#if DEBUG_SE
if (measurement_index_entry.name_.len_) {
std::cout << "load timeseries index: "
<< *((TimeseriesIndex *)ssi->itimeseries_index_) << std::endl;
} else {
std::cout << "load aligned timeseries index: "
<< *((AlignedTimeseriesIndex *)ssi->itimeseries_index_)
<< std::endl;
}
#endif
return ret;
}
int TsFileIOReader::load_device_index_entry(
std::shared_ptr<IComparable> device_name,
std::shared_ptr<IMetaIndexEntry> &device_index_entry, int64_t &end_offset) {
int ret = E_OK;
std::shared_ptr<DeviceIDComparable> device_id_comparable =
std::dynamic_pointer_cast<DeviceIDComparable>(device_name);
if (device_id_comparable == nullptr) {
return E_INVALID_DATA_POINT;
}
auto index_node = tsfile_meta_.table_metadata_index_node_map_
[device_id_comparable->device_id_->get_table_name()];
assert(tsfile_meta_.table_metadata_index_node_map_.find(
device_id_comparable->device_id_->get_table_name()) !=
tsfile_meta_.table_metadata_index_node_map_.end());
assert(index_node != nullptr);
if (index_node->node_type_ == LEAF_DEVICE) {
// FIXME
ret = index_node->binary_search_children(
device_name, true, device_index_entry, end_offset);
} else {
ret = search_from_internal_node(device_name, true, index_node,
device_index_entry, end_offset);
}
if (ret == E_NOT_EXIST) {
ret = E_DEVICE_NOT_EXIST;
}
#if DEBUG_SE
std::cout << "load_device_index_entry, device_index_entry={"
<< device_index_entry << "}, end_offset=" << end_offset
<< std::endl;
#endif
return ret;
}
int TsFileIOReader::load_measurement_index_entry(
const std::string &measurement_name_str,
std::shared_ptr<MetaIndexNode> top_node,
std::shared_ptr<IMetaIndexEntry> &ret_measurement_index_entry,
int64_t &ret_end_offset) {
int ret = E_OK;
// search from top_node in top-down way
auto measurement_name =
std::make_shared<StringComparable>(measurement_name_str);
if (top_node->node_type_ == LEAF_MEASUREMENT) {
ret = top_node->binary_search_children(
measurement_name, /*exact*/ false, ret_measurement_index_entry,
ret_end_offset);
} else {
ret = search_from_internal_node(measurement_name, false, top_node,
ret_measurement_index_entry,
ret_end_offset);
}
if (ret == E_NOT_EXIST) {
ret = E_MEASUREMENT_NOT_EXIST;
}
return ret;
}
int TsFileIOReader::load_all_measurement_index_entry(
int64_t start_offset, int64_t end_offset, common::PageArena &pa,
std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> >
&ret_measurement_index_entry) {
#if DEBUG_SE
std::cout << "load_measurement_index_entry: measurement_name_str= "
<< ", start_offset=" << start_offset
<< ", end_offset=" << end_offset << std::endl;
#endif
ASSERT(start_offset < end_offset);
int ret = E_OK;
// 1. load top measuremnt_index_node
const int32_t read_size = (int32_t)(end_offset - start_offset);
int32_t ret_read_len = 0;
char *data_buf = (char *)pa.alloc(read_size);
void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
return E_OOM;
}
auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr,
MetaIndexNode::self_deleter);
if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
ret_read_len))) {
} else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) {
}
#if DEBUG_SE
std::cout
<< "load_measurement_index_entry deserialize MetaIndexNode, top_node="
<< *top_node << " at file pos " << start_offset << " to " << end_offset
<< std::endl;
#endif
// 2. search from top_node in top-down way
if (IS_SUCC(ret)) {
get_all_leaf(top_node, ret_measurement_index_entry);
}
if (ret == E_NOT_EXIST) {
ret = E_MEASUREMENT_NOT_EXIST;
}
return ret;
}
int TsFileIOReader::read_device_meta_index(int32_t start_offset,
int32_t end_offset,
common::PageArena &pa,
MetaIndexNode *&device_meta_index,
bool leaf) {
int ret = E_OK;
ASSERT(start_offset < end_offset);
const int32_t read_size = (int32_t)(end_offset - start_offset);
int32_t ret_read_len = 0;
char *data_buf = (char *)pa.alloc(read_size);
void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
return E_OOM;
}
device_meta_index = new (m_idx_node_buf) MetaIndexNode(&pa);
if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
ret_read_len))) {
}
if (!leaf) {
ret = device_meta_index->device_deserialize_from(data_buf, read_size);
} else {
ret = device_meta_index->deserialize_from(data_buf, read_size);
}
return ret;
}
int TsFileIOReader::get_timeseries_indexes(
std::shared_ptr<IDeviceID> device_id,
const std::unordered_set<std::string> &measurement_names,
std::vector<ITimeseriesIndex *> &timeseries_indexs, common::PageArena &pa) {
int ret = E_OK;
std::shared_ptr<IMetaIndexEntry> device_index_entry;
int64_t device_ie_end_offset = 0;
std::shared_ptr<IMetaIndexEntry> measurement_index_entry;
int64_t measurement_ie_end_offset = 0;
if (RET_FAIL(load_device_index_entry(
std::make_shared<DeviceIDComparable>(device_id), device_index_entry,
device_ie_end_offset))) {
return ret;
}
int start_offset = device_index_entry->get_offset(),
end_offset = device_ie_end_offset;
ASSERT(start_offset < end_offset);
const int32_t read_size = end_offset - start_offset;
int32_t ret_read_len = 0;
char *data_buf = (char *)pa.alloc(read_size);
void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
return E_OOM;
}
auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr,
MetaIndexNode::self_deleter);
if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
ret_read_len))) {
return ret;
} else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) {
return ret;
}
bool is_aligned = is_aligned_device(top_node);
TimeseriesIndex *timeseries_index = nullptr;
if (is_aligned) {
get_time_column_metadata(top_node, timeseries_index, pa);
}
int64_t idx = 0;
for (const auto &measurement_name : measurement_names) {
if (RET_FAIL(load_measurement_index_entry(measurement_name, top_node,
measurement_index_entry,
measurement_ie_end_offset))) {
} else if (RET_FAIL(do_load_timeseries_index(
measurement_name, measurement_index_entry->get_offset(),
measurement_ie_end_offset, pa, timeseries_indexs[idx],
is_aligned))) {
}
if (is_aligned) {
AlignedTimeseriesIndex *aligned_timeseries_index =
dynamic_cast<AlignedTimeseriesIndex *>(timeseries_indexs[idx]);
if (aligned_timeseries_index) {
aligned_timeseries_index->time_ts_idx_ = timeseries_index;
}
}
idx++;
}
return ret;
}
/*
* @target_name device_name or measurement_name
* @index_node leaf device node or leaf measurement node
*/
int TsFileIOReader::search_from_leaf_node(
std::shared_ptr<IComparable> target_name,
std::shared_ptr<MetaIndexNode> index_node,
std::shared_ptr<IMetaIndexEntry> &ret_index_entry,
int64_t &ret_end_offset) {
int ret = E_OK;
ret = index_node->binary_search_children(target_name, true, ret_index_entry,
ret_end_offset);
return ret;
}
int TsFileIOReader::search_from_internal_node(
std::shared_ptr<IComparable> target_name, bool is_device,
std::shared_ptr<MetaIndexNode> index_node,
std::shared_ptr<IMetaIndexEntry> &ret_index_entry,
int64_t &ret_end_offset) {
int ret = E_OK;
std::shared_ptr<IMetaIndexEntry> index_entry;
int64_t end_offset = 0;
ASSERT(index_node->node_type_ == INTERNAL_MEASUREMENT ||
index_node->node_type_ == INTERNAL_DEVICE);
if (RET_FAIL(index_node->binary_search_children(
target_name, /*exact=*/false, index_entry, end_offset))) {
return ret;
}
while (IS_SUCC(ret)) {
// reader next level index node
const int read_size = end_offset - index_entry->get_offset();
#if DEBUG_SE
std::cout << "search_from_internal_node, end_offset=" << end_offset
<< ", index_entry.offset_=" << index_entry.get_offset()
<< std::endl;
#endif
ASSERT(read_size > 0 && read_size < (1 << 30));
PageArena cur_level_index_node_pa;
void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode));
char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size);
if (IS_NULL(buf) || IS_NULL(data_buf)) {
return E_OOM;
}
MetaIndexNode *cur_level_index_node =
new (buf) MetaIndexNode(&cur_level_index_node_pa);
int32_t ret_read_len = 0;
if (RET_FAIL(read_file_->read(index_entry->get_offset(), data_buf,
read_size, ret_read_len))) {
} else if (read_size != ret_read_len) {
return E_TSFILE_CORRUPTED;
}
if (!is_device) {
ret = cur_level_index_node->deserialize_from(data_buf, read_size);
} else {
ret = cur_level_index_node->device_deserialize_from(data_buf,
read_size);
}
if (ret != E_OK) {
return ret;
}
if (cur_level_index_node->node_type_ == LEAF_DEVICE) {
ret = cur_level_index_node->binary_search_children(
target_name, /*exact=*/true, ret_index_entry, ret_end_offset);
cur_level_index_node->destroy();
return ret; //// FIXME
} else if (cur_level_index_node->node_type_ == LEAF_MEASUREMENT) {
ret = cur_level_index_node->binary_search_children(
target_name, /*exact=*/false, ret_index_entry, ret_end_offset);
cur_level_index_node->destroy();
return ret; //// FIXME
} else {
ret = cur_level_index_node->binary_search_children(
target_name, /*exact=*/false, index_entry, end_offset);
cur_level_index_node->destroy();
}
}
return ret;
}
bool TsFileIOReader::is_aligned_device(
std::shared_ptr<MetaIndexNode> measurement_node) {
auto entry = measurement_node->children_[0];
return entry->get_name().is_null() ||
entry->get_name().to_std_string() == "";
}
int TsFileIOReader::get_time_column_metadata(
std::shared_ptr<MetaIndexNode> measurement_node,
TimeseriesIndex *&ret_timeseries_index, PageArena &pa) {
int ret = E_OK;
if (!is_aligned_device(measurement_node)) {
return ret;
}
char *ti_buf = nullptr;
int start_idx = 0, end_idx = 0;
int ret_read_len = 0;
if (measurement_node->node_type_ == LEAF_MEASUREMENT) {
ByteStream buffer;
if (measurement_node->children_.size() > 1) {
start_idx = measurement_node->children_[0]->get_offset();
end_idx = measurement_node->children_[1]->get_offset();
ti_buf = pa.alloc(end_idx - start_idx);
if (RET_FAIL(read_file_->read(start_idx, ti_buf,
end_idx - start_idx, ret_read_len))) {
return ret;
}
} else {
start_idx = measurement_node->children_[0]->get_offset();
end_idx = measurement_node->end_offset_;
ti_buf = pa.alloc(end_idx - start_idx);
if (RET_FAIL(read_file_->read(start_idx, ti_buf,
end_idx - start_idx, ret_read_len))) {
return ret;
}
}
buffer.wrap_from(ti_buf, end_idx - start_idx);
void *buf = pa.alloc(sizeof(TimeseriesIndex));
if (IS_NULL(buf)) {
return E_OOM;
}
ret_timeseries_index = new (buf) TimeseriesIndex;
ret_timeseries_index->deserialize_from(buffer, &pa);
} else if (measurement_node->node_type_ == INTERNAL_MEASUREMENT) {
start_idx = measurement_node->children_[0]->get_offset();
end_idx = measurement_node->children_[1]->get_offset();
ti_buf = pa.alloc(end_idx - start_idx);
if (RET_FAIL(read_file_->read(start_idx, ti_buf, end_idx - start_idx,
ret_read_len))) {
return ret;
}
std::shared_ptr<MetaIndexNode> meta_index_node =
std::make_shared<MetaIndexNode>(&pa);
meta_index_node->deserialize_from(ti_buf, end_idx - start_idx);
return get_time_column_metadata(meta_index_node, ret_timeseries_index,
pa);
}
return ret;
}
int TsFileIOReader::do_load_timeseries_index(
const std::string &measurement_name_str, int64_t start_offset,
int64_t end_offset, PageArena &in_timeseries_index_pa,
ITimeseriesIndex *&ret_timeseries_index, bool is_aligned) {
ASSERT(end_offset > start_offset);
int ret = E_OK;
int32_t read_size = (int32_t)(end_offset - start_offset);
int32_t ret_read_len = 0;
char *ti_buf = (char *)mem_alloc(read_size, MOD_TSFILE_READER);
if (IS_NULL(ti_buf)) {
return E_OOM;
}
if (RET_FAIL(
read_file_->read(start_offset, ti_buf, read_size, ret_read_len))) {
} else {
ByteStream bs;
bs.wrap_from(ti_buf, read_size);
const String target_measurement_name(
(char *)measurement_name_str.c_str(),
strlen(measurement_name_str.c_str()));
bool found = false;
#if DEBUG_SE
std::cout << "do_load_timeseries_index, reader file at " << start_offset
<< " to " << end_offset << std::endl;
#endif
while (IS_SUCC(ret)) {
TimeseriesIndex cur_timeseries_index;
PageArena cur_timeseries_index_pa;
cur_timeseries_index_pa.init(512, MOD_TSFILE_READER); // TODO 512
if (RET_FAIL(cur_timeseries_index.deserialize_from(
bs, &cur_timeseries_index_pa))) {
} else if (is_aligned &&
cur_timeseries_index.get_measurement_name().equal_to(
target_measurement_name)) {
void *buf = in_timeseries_index_pa.alloc(
sizeof(AlignedTimeseriesIndex));
if (IS_NULL(buf)) {
return E_OOM;
}
AlignedTimeseriesIndex *aligned_ts_idx =
new (buf) AlignedTimeseriesIndex;
buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex));
if (IS_NULL(buf)) {
return E_OOM;
}
aligned_ts_idx->value_ts_idx_ = new (buf) TimeseriesIndex;
aligned_ts_idx->value_ts_idx_->clone_from(
cur_timeseries_index, &in_timeseries_index_pa);
ret_timeseries_index = aligned_ts_idx;
found = true;
break;
} else if (!is_aligned &&
cur_timeseries_index.get_measurement_name().equal_to(
target_measurement_name)) {
void *buf =
in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex));
auto ts_idx = new (buf) TimeseriesIndex;
ts_idx->clone_from(cur_timeseries_index,
&in_timeseries_index_pa);
ret_timeseries_index = ts_idx;
found = true;
break;
}
} // end while
if (!found) {
ret = E_NOT_EXIST;
}
}
mem_free(ti_buf);
return ret;
}
int TsFileIOReader::do_load_all_timeseries_index(
std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> >
&index_node_entry_list,
common::PageArena &in_timeseries_index_pa,
std::vector<ITimeseriesIndex *> &ts_indexs) {
int ret = E_OK;
for (const auto &index_node_entry : index_node_entry_list) {
int64_t start_offset = index_node_entry.first->get_offset(),
end_offset = index_node_entry.second;
int32_t read_size = (int32_t)(end_offset - start_offset);
int32_t ret_read_len = 0;
char *ti_buf = in_timeseries_index_pa.alloc(read_size);
if (IS_NULL(ti_buf)) {
return E_OOM;
}
if (RET_FAIL(read_file_->read(start_offset, ti_buf, read_size,
ret_read_len))) {
return ret;
}
ByteStream bs;
bs.wrap_from(ti_buf, read_size);
while (bs.has_remaining()) {
void *buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex));
auto ts_idx = new (buf) TimeseriesIndex;
if (RET_FAIL(
ts_idx->deserialize_from(bs, &in_timeseries_index_pa))) {
return ret;
}
if (ts_idx->get_measurement_name().len_ == 0) continue;
ts_indexs.push_back(ts_idx);
}
}
return ret;
}
int TsFileIOReader::get_all_leaf(
std::shared_ptr<MetaIndexNode> index_node,
std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> >
&index_node_entry_list) {
int ret = E_OK;
if (index_node->node_type_ == LEAF_MEASUREMENT ||
index_node->node_type_ == LEAF_DEVICE) {
for (size_t i = 0; i < index_node->children_.size(); i++) {
if (i + 1 < index_node->children_.size()) {
index_node_entry_list.push_back(
std::make_pair(index_node->children_[i],
index_node->children_[i + 1]->get_offset()));
} else {
index_node_entry_list.push_back(std::make_pair(
index_node->children_[i], index_node->end_offset_));
}
}
} else {
// read next level index node
for (size_t i = 0; i < index_node->children_.size(); i++) {
int64_t end_offset = index_node->end_offset_;
if (i + 1 < index_node->children_.size()) {
end_offset = index_node->children_[i + 1]->get_offset();
}
const int read_size =
end_offset - index_node->children_[i]->get_offset();
#if DEBUG_SE
std::cout << "search_from_internal_node, end_offset=" << end_offset
<< ", index_entry.offset_="
<< index_node->children_[i]->get_offset() << std::endl;
#endif
ASSERT(read_size > 0 && read_size < (1 << 30));
PageArena cur_level_index_node_pa;
void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode));
char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size);
if (IS_NULL(buf) || IS_NULL(data_buf)) {
return E_OOM;
}
auto *cur_level_index_node_ptr =
new (buf) MetaIndexNode(&cur_level_index_node_pa);
auto cur_level_index_node = std::shared_ptr<MetaIndexNode>(
cur_level_index_node_ptr, MetaIndexNode::self_deleter);
int32_t ret_read_len = 0;
if (RET_FAIL(
read_file_->read(index_node->children_[i]->get_offset(),
data_buf, read_size, ret_read_len))) {
} else if (read_size != ret_read_len) {
ret = E_TSFILE_CORRUPTED;
} else if (RET_FAIL(cur_level_index_node->deserialize_from(
data_buf, read_size))) {
} else {
ret = get_all_leaf(cur_level_index_node, index_node_entry_list);
}
}
}
return ret;
}
#if 0
int TsFileIOReader::get_next(const std::string &device_path,
const std::string &measurement_name,
TsBlock *ret_tsblock,
TimeRange &ret_time_range)
{
int ret = E_OK;
if (RET_FAIL(load_timeseries_index_if_necessary(device_path, measurement_name))) {
return ret;
}
return get_next_page(ret_tsblock);
}
int TsFileIOReader::get_next_page(TsBlock *ret_tsblock)
{
int ret = E_OK;
if (!chunk_reader_.has_more_data()) {
// has finished reading current chunk
if (has_next_chunk()) {
cursor_to_next_chunk();
ChunkMeta *next_chunk_meta = chunk_meta_cursor_.get();
if (RET_FAIL(init_next_chunk_reader(next_chunk_meta))) {
}
} else {
// has finished reading all chunks of this tsfile
ret = E_NO_MORE_DATA;
}
} // end if (!chunk_reader_.has_more_data())
if (IS_SUCC(ret)) {
ret = chunk_reader_.get_next_page(ret_tsblock);
}
return ret;
}
int TsFileIOReader::init_first_chunk_reader(ChunkMeta *cm,
ReadFile *read_file,
const ColumnDesc &col_desc)
{
ASSERT(!chunk_reader_.has_more_data());
int ret = E_OK;
if (RET_FAIL(chunk_reader_.init(read_file,
timeseries_index_.get_measurement_name(),
col_desc.type_,
col_desc.encoding_))) {
} else if (RET_FAIL(chunk_reader_.load_by_meta(cm))) {
}
return ret;
}
int TsFileIOReader::init_next_chunk_reader(ChunkMeta *cm)
{
ASSERT(!chunk_reader_.has_more_data());
chunk_reader_.reset();
return chunk_reader_.load_by_meta(cm);
}
int TsFileIOReader::load_timeseries_index_if_necessary(const std::string &device_path,
const std::string &measurement_name)
{
int ret = E_OK;
if (col_desc.ts_id_ != timeseries_index_.get_ts_id()) {
if (RET_FAIL(load_timeseries_index(col_desc))) {
//log_err("load timeseries_index error, ret=%d", ret);
} else {
chunk_meta_cursor_ = timeseries_index_.get_chunk_meta_list()->begin();
ChunkMeta *next_chunk_meta = chunk_meta_cursor_.get();
if (RET_FAIL(init_first_chunk_reader(next_chunk_meta, read_file_, col_desc))) {
} else {
cursor_to_next_chunk();
}
}
} else {
// timeseries_index_ is ready
}
return ret;
}
// TODO add a result cache for load_timeseries_index
int TsFileIOReader::load_timeseries_index(const ColumnDesc &col_desc)
{
int ret = E_OK;
if (RET_FAIL(load_tsfile_meta_if_necessary())) {
return ret;
}
MetaIndexEntry device_index_entry;
int64_t device_ie_end_offset = 0;
MetaIndexEntry measurement_index_entry;
int64_t measurement_ie_end_offset = 0;
if (RET_FAIL(load_device_index_entry(col_desc, device_index_entry, device_ie_end_offset))) {
} else if (RET_FAIL(load_measurement_index_entry(col_desc, device_index_entry.offset_,
device_ie_end_offset, measurement_index_entry,
measurement_ie_end_offset))) {
} else if (RET_FAIL(do_load_timeseries_index(col_desc, measurement_index_entry.offset_,
measurement_ie_end_offset))) {
} else {
#if STORAGE_ENGIEN_DEBUG
std::cout << "load timeseries index: " << timeseries_index_ << std::endl;
#endif
}
return ret;
}
#endif
} // end namespace storage