| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * License); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| */ |
| |
| #include "file/tsfile_io_reader.h" |
| |
| #include "common/allocator/alloc_base.h" |
| |
| using namespace common; |
| |
| namespace storage { |
| int TsFileIOReader::init(const std::string &file_path) { |
| int ret = E_OK; |
| read_file_ = new ReadFile; |
| read_file_created_ = true; |
| if (RET_FAIL(read_file_->open(file_path))) { |
| } |
| return ret; |
| } |
| |
| int TsFileIOReader::init(ReadFile *read_file) { |
| if (IS_NULL(read_file)) { |
| ASSERT(false); |
| return E_INVALID_ARG; |
| } |
| read_file_created_ = false; |
| read_file_ = read_file; |
| return E_OK; |
| } |
| |
| void TsFileIOReader::reset() { |
| if (read_file_ != nullptr) { |
| if (read_file_created_) { |
| read_file_->destroy(); |
| delete read_file_; |
| } |
| read_file_ = nullptr; |
| tsfile_meta_page_arena_.destroy(); |
| tsfile_meta_ready_ = false; |
| } |
| } |
| |
| int TsFileIOReader::alloc_ssi(std::shared_ptr<IDeviceID> device_id, |
| const std::string &measurement_name, |
| TsFileSeriesScanIterator *&ssi, |
| common::PageArena &pa, Filter *time_filter) { |
| int ret = E_OK; |
| if (RET_FAIL(load_tsfile_meta_if_necessary())) { |
| } else { |
| ssi = new TsFileSeriesScanIterator; |
| ssi->init(device_id, measurement_name, read_file_, time_filter, pa); |
| if (RET_FAIL(load_timeseries_index_for_ssi(device_id, measurement_name, |
| ssi))) { |
| } else if (time_filter != nullptr && |
| !filter_stasify(ssi->itimeseries_index_, time_filter)) { |
| ret = E_NO_MORE_DATA; |
| } else if (RET_FAIL(ssi->init_chunk_reader())) { |
| } |
| if (ret != E_OK) { |
| ssi->destroy(); |
| delete ssi; |
| ssi = nullptr; |
| } |
| } |
| return ret; |
| } |
| |
| void TsFileIOReader::revert_ssi(TsFileSeriesScanIterator *ssi) { |
| if (ssi != nullptr) { |
| ssi->destroy(); |
| delete ssi; |
| } |
| } |
| |
| int TsFileIOReader::get_device_timeseries_meta_without_chunk_meta( |
| std::shared_ptr<IDeviceID> device_id, |
| std::vector<ITimeseriesIndex *> ×eries_indexs, PageArena &pa) { |
| int ret = E_OK; |
| load_tsfile_meta_if_necessary(); |
| std::shared_ptr<IMetaIndexEntry> meta_index_entry; |
| int64_t end_offset; |
| std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> > |
| meta_index_entry_list; |
| if (RET_FAIL(load_device_index_entry( |
| std::make_shared<DeviceIDComparable>(device_id), meta_index_entry, |
| end_offset))) { |
| } else if (RET_FAIL(load_all_measurement_index_entry( |
| meta_index_entry->get_offset(), end_offset, pa, |
| meta_index_entry_list))) { |
| } else if (RET_FAIL(do_load_all_timeseries_index(meta_index_entry_list, pa, |
| timeseries_indexs))) { |
| } |
| return ret; |
| } |
| |
| bool TsFileIOReader::filter_stasify(ITimeseriesIndex *ts_index, |
| Filter *time_filter) { |
| ASSERT(ts_index->get_statistic() != nullptr); |
| return time_filter->satisfy(ts_index->get_statistic()); |
| } |
| |
| int TsFileIOReader::load_tsfile_meta_if_necessary() { |
| int ret = E_OK; |
| if (!tsfile_meta_ready_) { |
| if (RET_FAIL(load_tsfile_meta())) { |
| // log_err("load_tsfile_meta error, ret=%d", ret); |
| return ret; |
| } else { |
| tsfile_meta_ready_ = true; |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileIOReader::load_tsfile_meta() { |
| const int32_t TSFILE_READ_IO_SIZE = 1024; // TODO make it configurable |
| const int32_t TAIL_MAGIC_AND_META_SIZE_SIZE = |
| 10; // magic(6B) + meta_size(4B) |
| ASSERT(file_size() > 0); // > 13 |
| |
| int ret = E_OK; |
| uint32_t tsfile_meta_size = 0; |
| int32_t read_offset = 0; |
| int32_t ret_read_len = 0; |
| |
| // Step 1: reader the tsfile_meta_size |
| // 1.1 prepare reader buffer |
| int32_t alloc_size = UTIL_MIN(TSFILE_READ_IO_SIZE, file_size()); |
| char *read_buf = (char *)mem_alloc(alloc_size, MOD_TSFILE_READER); |
| if (IS_NULL(read_buf)) { |
| return E_OOM; |
| } |
| // 1.2 reader data from file |
| read_offset = file_size() - alloc_size; |
| ret_read_len = 0; |
| if (RET_FAIL(read_file_->read(read_offset, read_buf, alloc_size, |
| ret_read_len))) { |
| } else if (ret_read_len != alloc_size) { |
| ret = E_FILE_READ_ERR; |
| // log_err("do not reader enough data from tsfile, want-size=%d, |
| // reader-size=%d, file=%s", alloc_size, ret_read_len, |
| // get_file_path().c_str()); |
| } |
| // 1.3 deserialize tsfile_meta_size |
| if (IS_SUCC(ret)) { |
| // deserialize tsfile_meta_size |
| char *size_buf = read_buf + alloc_size - TAIL_MAGIC_AND_META_SIZE_SIZE; |
| tsfile_meta_size = SerializationUtil::read_ui32(size_buf); |
| ASSERT(tsfile_meta_size > 0 && tsfile_meta_size <= (1ll << 20)); |
| } |
| |
| // Step 2: reader TsFileMeta |
| if (IS_SUCC(ret)) { |
| // 2.1 prepare enough buffer (use the previous buffer if can). |
| char *tsfile_meta_buf = nullptr; |
| if (tsfile_meta_size + TAIL_MAGIC_AND_META_SIZE_SIZE > |
| (uint32_t)alloc_size) { |
| // prepare buffer to re-reader from start of tsfile_meta |
| char *old_read_buf = read_buf; |
| read_buf = (char *)mem_realloc(read_buf, tsfile_meta_size); |
| if (IS_NULL(read_buf)) { |
| read_buf = old_read_buf; |
| ret = E_OOM; |
| } else if (RET_FAIL(read_file_->read( |
| file_size() - tsfile_meta_size - |
| TAIL_MAGIC_AND_META_SIZE_SIZE, |
| read_buf, tsfile_meta_size, ret_read_len))) { |
| } else if (tsfile_meta_size != (uint32_t)ret_read_len) { |
| ret = E_FILE_READ_ERR; |
| // log_err("do not reader enough data from tsfile, want-size=%d, |
| // reader-size=%d, file=%s", tsfile_meta_size, ret_read_len, |
| // get_file_path().c_str()); |
| } else { |
| tsfile_meta_buf = read_buf; |
| } |
| } else { |
| // the previous buffer has contained the TsFileMeta data |
| tsfile_meta_buf = read_buf + alloc_size - tsfile_meta_size - |
| TAIL_MAGIC_AND_META_SIZE_SIZE; |
| // DEBUG_hex_dump_buf("tsfile_meta_buf=", tsfile_meta_buf, |
| // tsfile_meta_size); |
| } |
| if (IS_SUCC(ret)) { |
| ByteStream tsfile_meta_bs; |
| tsfile_meta_bs.wrap_from(tsfile_meta_buf, tsfile_meta_size); |
| if (RET_FAIL(tsfile_meta_.deserialize_from(tsfile_meta_bs))) { |
| } |
| #if DEBUG_SE |
| std::cout << "load tsfile_meta, ret=" << ret |
| << ", tsfile_meta_=" << tsfile_meta_ << std::endl; |
| #endif |
| } |
| } |
| mem_free(read_buf); |
| return ret; |
| } |
| |
| int TsFileIOReader::load_timeseries_index_for_ssi( |
| std::shared_ptr<IDeviceID> device_id, const std::string &measurement_name, |
| TsFileSeriesScanIterator *&ssi) { |
| int ret = E_OK; |
| std::shared_ptr<IMetaIndexEntry> device_index_entry; |
| int64_t device_ie_end_offset = 0; |
| std::shared_ptr<IMetaIndexEntry> measurement_index_entry; |
| int64_t measurement_ie_end_offset = 0; |
| // bool is_aligned = false; |
| if (RET_FAIL(load_device_index_entry( |
| std::make_shared<DeviceIDComparable>(device_id), device_index_entry, |
| device_ie_end_offset))) { |
| return ret; |
| } |
| auto &pa = ssi->timeseries_index_pa_; |
| |
| int start_offset = device_index_entry->get_offset(), |
| end_offset = device_ie_end_offset; |
| ASSERT(start_offset < end_offset); |
| const int32_t read_size = end_offset - start_offset; |
| int32_t ret_read_len = 0; |
| char *data_buf = (char *)pa.alloc(read_size); |
| void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); |
| if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { |
| return E_OOM; |
| } |
| auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); |
| auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr, |
| MetaIndexNode::self_deleter); |
| |
| if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, |
| ret_read_len))) { |
| return ret; |
| } else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) { |
| return ret; |
| } |
| |
| bool is_aligned = is_aligned_device(top_node); |
| TimeseriesIndex *timeseries_index = nullptr; |
| if (is_aligned) { |
| if (RET_FAIL( |
| get_time_column_metadata(top_node, timeseries_index, pa))) { |
| return ret; |
| } |
| } |
| |
| if (RET_FAIL(load_measurement_index_entry(measurement_name, top_node, |
| measurement_index_entry, |
| measurement_ie_end_offset))) { |
| return ret; |
| } else if (RET_FAIL(do_load_timeseries_index( |
| measurement_name, measurement_index_entry->get_offset(), |
| measurement_ie_end_offset, ssi->timeseries_index_pa_, |
| ssi->itimeseries_index_, is_aligned))) { |
| return ret; |
| } |
| if (is_aligned) { |
| auto *aligned_timeseries_index = |
| dynamic_cast<AlignedTimeseriesIndex *>(ssi->itimeseries_index_); |
| if (aligned_timeseries_index) { |
| aligned_timeseries_index->time_ts_idx_ = timeseries_index; |
| } |
| } |
| |
| #if DEBUG_SE |
| if (measurement_index_entry.name_.len_) { |
| std::cout << "load timeseries index: " |
| << *((TimeseriesIndex *)ssi->itimeseries_index_) << std::endl; |
| } else { |
| std::cout << "load aligned timeseries index: " |
| << *((AlignedTimeseriesIndex *)ssi->itimeseries_index_) |
| << std::endl; |
| } |
| #endif |
| return ret; |
| } |
| |
| int TsFileIOReader::load_device_index_entry( |
| std::shared_ptr<IComparable> device_name, |
| std::shared_ptr<IMetaIndexEntry> &device_index_entry, int64_t &end_offset) { |
| int ret = E_OK; |
| std::shared_ptr<DeviceIDComparable> device_id_comparable = |
| std::dynamic_pointer_cast<DeviceIDComparable>(device_name); |
| if (device_id_comparable == nullptr) { |
| return E_INVALID_DATA_POINT; |
| } |
| auto index_node = tsfile_meta_.table_metadata_index_node_map_ |
| [device_id_comparable->device_id_->get_table_name()]; |
| assert(tsfile_meta_.table_metadata_index_node_map_.find( |
| device_id_comparable->device_id_->get_table_name()) != |
| tsfile_meta_.table_metadata_index_node_map_.end()); |
| assert(index_node != nullptr); |
| if (index_node->node_type_ == LEAF_DEVICE) { |
| // FIXME |
| ret = index_node->binary_search_children( |
| device_name, true, device_index_entry, end_offset); |
| } else { |
| ret = search_from_internal_node(device_name, true, index_node, |
| device_index_entry, end_offset); |
| } |
| if (ret == E_NOT_EXIST) { |
| ret = E_DEVICE_NOT_EXIST; |
| } |
| #if DEBUG_SE |
| std::cout << "load_device_index_entry, device_index_entry={" |
| << device_index_entry << "}, end_offset=" << end_offset |
| << std::endl; |
| #endif |
| return ret; |
| } |
| |
| int TsFileIOReader::load_measurement_index_entry( |
| const std::string &measurement_name_str, |
| std::shared_ptr<MetaIndexNode> top_node, |
| std::shared_ptr<IMetaIndexEntry> &ret_measurement_index_entry, |
| int64_t &ret_end_offset) { |
| int ret = E_OK; |
| // search from top_node in top-down way |
| auto measurement_name = |
| std::make_shared<StringComparable>(measurement_name_str); |
| if (top_node->node_type_ == LEAF_MEASUREMENT) { |
| ret = top_node->binary_search_children( |
| measurement_name, /*exact*/ false, ret_measurement_index_entry, |
| ret_end_offset); |
| } else { |
| ret = search_from_internal_node(measurement_name, false, top_node, |
| ret_measurement_index_entry, |
| ret_end_offset); |
| } |
| if (ret == E_NOT_EXIST) { |
| ret = E_MEASUREMENT_NOT_EXIST; |
| } |
| return ret; |
| } |
| |
| int TsFileIOReader::load_all_measurement_index_entry( |
| int64_t start_offset, int64_t end_offset, common::PageArena &pa, |
| std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> > |
| &ret_measurement_index_entry) { |
| #if DEBUG_SE |
| std::cout << "load_measurement_index_entry: measurement_name_str= " |
| << ", start_offset=" << start_offset |
| << ", end_offset=" << end_offset << std::endl; |
| #endif |
| ASSERT(start_offset < end_offset); |
| int ret = E_OK; |
| // 1. load top measuremnt_index_node |
| const int32_t read_size = (int32_t)(end_offset - start_offset); |
| int32_t ret_read_len = 0; |
| char *data_buf = (char *)pa.alloc(read_size); |
| void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); |
| if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { |
| return E_OOM; |
| } |
| auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); |
| auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr, |
| MetaIndexNode::self_deleter); |
| if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, |
| ret_read_len))) { |
| } else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) { |
| } |
| #if DEBUG_SE |
| std::cout |
| << "load_measurement_index_entry deserialize MetaIndexNode, top_node=" |
| << *top_node << " at file pos " << start_offset << " to " << end_offset |
| << std::endl; |
| #endif |
| // 2. search from top_node in top-down way |
| if (IS_SUCC(ret)) { |
| get_all_leaf(top_node, ret_measurement_index_entry); |
| } |
| if (ret == E_NOT_EXIST) { |
| ret = E_MEASUREMENT_NOT_EXIST; |
| } |
| return ret; |
| } |
| |
| int TsFileIOReader::read_device_meta_index(int32_t start_offset, |
| int32_t end_offset, |
| common::PageArena &pa, |
| MetaIndexNode *&device_meta_index, |
| bool leaf) { |
| int ret = E_OK; |
| ASSERT(start_offset < end_offset); |
| const int32_t read_size = (int32_t)(end_offset - start_offset); |
| int32_t ret_read_len = 0; |
| char *data_buf = (char *)pa.alloc(read_size); |
| void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); |
| if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { |
| return E_OOM; |
| } |
| device_meta_index = new (m_idx_node_buf) MetaIndexNode(&pa); |
| if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, |
| ret_read_len))) { |
| } |
| if (!leaf) { |
| ret = device_meta_index->device_deserialize_from(data_buf, read_size); |
| } else { |
| ret = device_meta_index->deserialize_from(data_buf, read_size); |
| } |
| return ret; |
| } |
| |
| int TsFileIOReader::get_timeseries_indexes( |
| std::shared_ptr<IDeviceID> device_id, |
| const std::unordered_set<std::string> &measurement_names, |
| std::vector<ITimeseriesIndex *> ×eries_indexs, common::PageArena &pa) { |
| int ret = E_OK; |
| std::shared_ptr<IMetaIndexEntry> device_index_entry; |
| int64_t device_ie_end_offset = 0; |
| std::shared_ptr<IMetaIndexEntry> measurement_index_entry; |
| int64_t measurement_ie_end_offset = 0; |
| if (RET_FAIL(load_device_index_entry( |
| std::make_shared<DeviceIDComparable>(device_id), device_index_entry, |
| device_ie_end_offset))) { |
| return ret; |
| } |
| |
| int start_offset = device_index_entry->get_offset(), |
| end_offset = device_ie_end_offset; |
| ASSERT(start_offset < end_offset); |
| const int32_t read_size = end_offset - start_offset; |
| int32_t ret_read_len = 0; |
| char *data_buf = (char *)pa.alloc(read_size); |
| void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode)); |
| if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) { |
| return E_OOM; |
| } |
| auto *top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa); |
| auto top_node = std::shared_ptr<MetaIndexNode>(top_node_ptr, |
| MetaIndexNode::self_deleter); |
| |
| if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size, |
| ret_read_len))) { |
| return ret; |
| } else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) { |
| return ret; |
| } |
| |
| bool is_aligned = is_aligned_device(top_node); |
| TimeseriesIndex *timeseries_index = nullptr; |
| if (is_aligned) { |
| get_time_column_metadata(top_node, timeseries_index, pa); |
| } |
| |
| int64_t idx = 0; |
| for (const auto &measurement_name : measurement_names) { |
| if (RET_FAIL(load_measurement_index_entry(measurement_name, top_node, |
| measurement_index_entry, |
| measurement_ie_end_offset))) { |
| } else if (RET_FAIL(do_load_timeseries_index( |
| measurement_name, measurement_index_entry->get_offset(), |
| measurement_ie_end_offset, pa, timeseries_indexs[idx], |
| is_aligned))) { |
| } |
| if (is_aligned) { |
| AlignedTimeseriesIndex *aligned_timeseries_index = |
| dynamic_cast<AlignedTimeseriesIndex *>(timeseries_indexs[idx]); |
| if (aligned_timeseries_index) { |
| aligned_timeseries_index->time_ts_idx_ = timeseries_index; |
| } |
| } |
| |
| idx++; |
| } |
| return ret; |
| } |
| |
| /* |
| * @target_name device_name or measurement_name |
| * @index_node leaf device node or leaf measurement node |
| */ |
| int TsFileIOReader::search_from_leaf_node( |
| std::shared_ptr<IComparable> target_name, |
| std::shared_ptr<MetaIndexNode> index_node, |
| std::shared_ptr<IMetaIndexEntry> &ret_index_entry, |
| int64_t &ret_end_offset) { |
| int ret = E_OK; |
| ret = index_node->binary_search_children(target_name, true, ret_index_entry, |
| ret_end_offset); |
| return ret; |
| } |
| |
| int TsFileIOReader::search_from_internal_node( |
| std::shared_ptr<IComparable> target_name, bool is_device, |
| std::shared_ptr<MetaIndexNode> index_node, |
| std::shared_ptr<IMetaIndexEntry> &ret_index_entry, |
| int64_t &ret_end_offset) { |
| int ret = E_OK; |
| std::shared_ptr<IMetaIndexEntry> index_entry; |
| int64_t end_offset = 0; |
| |
| ASSERT(index_node->node_type_ == INTERNAL_MEASUREMENT || |
| index_node->node_type_ == INTERNAL_DEVICE); |
| if (RET_FAIL(index_node->binary_search_children( |
| target_name, /*exact=*/false, index_entry, end_offset))) { |
| return ret; |
| } |
| |
| while (IS_SUCC(ret)) { |
| // reader next level index node |
| const int read_size = end_offset - index_entry->get_offset(); |
| #if DEBUG_SE |
| std::cout << "search_from_internal_node, end_offset=" << end_offset |
| << ", index_entry.offset_=" << index_entry.get_offset() |
| << std::endl; |
| #endif |
| ASSERT(read_size > 0 && read_size < (1 << 30)); |
| PageArena cur_level_index_node_pa; |
| void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); |
| char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size); |
| if (IS_NULL(buf) || IS_NULL(data_buf)) { |
| return E_OOM; |
| } |
| MetaIndexNode *cur_level_index_node = |
| new (buf) MetaIndexNode(&cur_level_index_node_pa); |
| int32_t ret_read_len = 0; |
| if (RET_FAIL(read_file_->read(index_entry->get_offset(), data_buf, |
| read_size, ret_read_len))) { |
| } else if (read_size != ret_read_len) { |
| return E_TSFILE_CORRUPTED; |
| } |
| if (!is_device) { |
| ret = cur_level_index_node->deserialize_from(data_buf, read_size); |
| } else { |
| ret = cur_level_index_node->device_deserialize_from(data_buf, |
| read_size); |
| } |
| if (ret != E_OK) { |
| return ret; |
| } |
| if (cur_level_index_node->node_type_ == LEAF_DEVICE) { |
| ret = cur_level_index_node->binary_search_children( |
| target_name, /*exact=*/true, ret_index_entry, ret_end_offset); |
| cur_level_index_node->destroy(); |
| return ret; //// FIXME |
| } else if (cur_level_index_node->node_type_ == LEAF_MEASUREMENT) { |
| ret = cur_level_index_node->binary_search_children( |
| target_name, /*exact=*/false, ret_index_entry, ret_end_offset); |
| cur_level_index_node->destroy(); |
| return ret; //// FIXME |
| } else { |
| ret = cur_level_index_node->binary_search_children( |
| target_name, /*exact=*/false, index_entry, end_offset); |
| cur_level_index_node->destroy(); |
| } |
| } |
| return ret; |
| } |
| |
| bool TsFileIOReader::is_aligned_device( |
| std::shared_ptr<MetaIndexNode> measurement_node) { |
| auto entry = measurement_node->children_[0]; |
| return entry->get_name().is_null() || |
| entry->get_name().to_std_string() == ""; |
| } |
| |
| int TsFileIOReader::get_time_column_metadata( |
| std::shared_ptr<MetaIndexNode> measurement_node, |
| TimeseriesIndex *&ret_timeseries_index, PageArena &pa) { |
| int ret = E_OK; |
| if (!is_aligned_device(measurement_node)) { |
| return ret; |
| } |
| char *ti_buf = nullptr; |
| int start_idx = 0, end_idx = 0; |
| int ret_read_len = 0; |
| if (measurement_node->node_type_ == LEAF_MEASUREMENT) { |
| ByteStream buffer; |
| if (measurement_node->children_.size() > 1) { |
| start_idx = measurement_node->children_[0]->get_offset(); |
| end_idx = measurement_node->children_[1]->get_offset(); |
| ti_buf = pa.alloc(end_idx - start_idx); |
| if (RET_FAIL(read_file_->read(start_idx, ti_buf, |
| end_idx - start_idx, ret_read_len))) { |
| return ret; |
| } |
| } else { |
| start_idx = measurement_node->children_[0]->get_offset(); |
| end_idx = measurement_node->end_offset_; |
| ti_buf = pa.alloc(end_idx - start_idx); |
| if (RET_FAIL(read_file_->read(start_idx, ti_buf, |
| end_idx - start_idx, ret_read_len))) { |
| return ret; |
| } |
| } |
| buffer.wrap_from(ti_buf, end_idx - start_idx); |
| void *buf = pa.alloc(sizeof(TimeseriesIndex)); |
| if (IS_NULL(buf)) { |
| return E_OOM; |
| } |
| ret_timeseries_index = new (buf) TimeseriesIndex; |
| ret_timeseries_index->deserialize_from(buffer, &pa); |
| } else if (measurement_node->node_type_ == INTERNAL_MEASUREMENT) { |
| start_idx = measurement_node->children_[0]->get_offset(); |
| end_idx = measurement_node->children_[1]->get_offset(); |
| ti_buf = pa.alloc(end_idx - start_idx); |
| if (RET_FAIL(read_file_->read(start_idx, ti_buf, end_idx - start_idx, |
| ret_read_len))) { |
| return ret; |
| } |
| std::shared_ptr<MetaIndexNode> meta_index_node = |
| std::make_shared<MetaIndexNode>(&pa); |
| meta_index_node->deserialize_from(ti_buf, end_idx - start_idx); |
| return get_time_column_metadata(meta_index_node, ret_timeseries_index, |
| pa); |
| } |
| return ret; |
| } |
| |
| int TsFileIOReader::do_load_timeseries_index( |
| const std::string &measurement_name_str, int64_t start_offset, |
| int64_t end_offset, PageArena &in_timeseries_index_pa, |
| ITimeseriesIndex *&ret_timeseries_index, bool is_aligned) { |
| ASSERT(end_offset > start_offset); |
| int ret = E_OK; |
| int32_t read_size = (int32_t)(end_offset - start_offset); |
| int32_t ret_read_len = 0; |
| char *ti_buf = (char *)mem_alloc(read_size, MOD_TSFILE_READER); |
| if (IS_NULL(ti_buf)) { |
| return E_OOM; |
| } |
| if (RET_FAIL( |
| read_file_->read(start_offset, ti_buf, read_size, ret_read_len))) { |
| } else { |
| ByteStream bs; |
| bs.wrap_from(ti_buf, read_size); |
| const String target_measurement_name( |
| (char *)measurement_name_str.c_str(), |
| strlen(measurement_name_str.c_str())); |
| bool found = false; |
| #if DEBUG_SE |
| std::cout << "do_load_timeseries_index, reader file at " << start_offset |
| << " to " << end_offset << std::endl; |
| #endif |
| while (IS_SUCC(ret)) { |
| TimeseriesIndex cur_timeseries_index; |
| PageArena cur_timeseries_index_pa; |
| cur_timeseries_index_pa.init(512, MOD_TSFILE_READER); // TODO 512 |
| if (RET_FAIL(cur_timeseries_index.deserialize_from( |
| bs, &cur_timeseries_index_pa))) { |
| } else if (is_aligned && |
| cur_timeseries_index.get_measurement_name().equal_to( |
| target_measurement_name)) { |
| void *buf = in_timeseries_index_pa.alloc( |
| sizeof(AlignedTimeseriesIndex)); |
| if (IS_NULL(buf)) { |
| return E_OOM; |
| } |
| AlignedTimeseriesIndex *aligned_ts_idx = |
| new (buf) AlignedTimeseriesIndex; |
| buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); |
| if (IS_NULL(buf)) { |
| return E_OOM; |
| } |
| aligned_ts_idx->value_ts_idx_ = new (buf) TimeseriesIndex; |
| aligned_ts_idx->value_ts_idx_->clone_from( |
| cur_timeseries_index, &in_timeseries_index_pa); |
| ret_timeseries_index = aligned_ts_idx; |
| found = true; |
| break; |
| } else if (!is_aligned && |
| cur_timeseries_index.get_measurement_name().equal_to( |
| target_measurement_name)) { |
| void *buf = |
| in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); |
| auto ts_idx = new (buf) TimeseriesIndex; |
| ts_idx->clone_from(cur_timeseries_index, |
| &in_timeseries_index_pa); |
| ret_timeseries_index = ts_idx; |
| found = true; |
| break; |
| } |
| } // end while |
| if (!found) { |
| ret = E_NOT_EXIST; |
| } |
| } |
| mem_free(ti_buf); |
| return ret; |
| } |
| |
| int TsFileIOReader::do_load_all_timeseries_index( |
| std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> > |
| &index_node_entry_list, |
| common::PageArena &in_timeseries_index_pa, |
| std::vector<ITimeseriesIndex *> &ts_indexs) { |
| int ret = E_OK; |
| for (const auto &index_node_entry : index_node_entry_list) { |
| int64_t start_offset = index_node_entry.first->get_offset(), |
| end_offset = index_node_entry.second; |
| int32_t read_size = (int32_t)(end_offset - start_offset); |
| int32_t ret_read_len = 0; |
| char *ti_buf = in_timeseries_index_pa.alloc(read_size); |
| if (IS_NULL(ti_buf)) { |
| return E_OOM; |
| } |
| if (RET_FAIL(read_file_->read(start_offset, ti_buf, read_size, |
| ret_read_len))) { |
| return ret; |
| } |
| ByteStream bs; |
| bs.wrap_from(ti_buf, read_size); |
| while (bs.has_remaining()) { |
| void *buf = in_timeseries_index_pa.alloc(sizeof(TimeseriesIndex)); |
| auto ts_idx = new (buf) TimeseriesIndex; |
| if (RET_FAIL( |
| ts_idx->deserialize_from(bs, &in_timeseries_index_pa))) { |
| return ret; |
| } |
| if (ts_idx->get_measurement_name().len_ == 0) continue; |
| ts_indexs.push_back(ts_idx); |
| } |
| } |
| return ret; |
| } |
| |
| int TsFileIOReader::get_all_leaf( |
| std::shared_ptr<MetaIndexNode> index_node, |
| std::vector<std::pair<std::shared_ptr<IMetaIndexEntry>, int64_t> > |
| &index_node_entry_list) { |
| int ret = E_OK; |
| if (index_node->node_type_ == LEAF_MEASUREMENT || |
| index_node->node_type_ == LEAF_DEVICE) { |
| for (size_t i = 0; i < index_node->children_.size(); i++) { |
| if (i + 1 < index_node->children_.size()) { |
| index_node_entry_list.push_back( |
| std::make_pair(index_node->children_[i], |
| index_node->children_[i + 1]->get_offset())); |
| } else { |
| index_node_entry_list.push_back(std::make_pair( |
| index_node->children_[i], index_node->end_offset_)); |
| } |
| } |
| } else { |
| // read next level index node |
| for (size_t i = 0; i < index_node->children_.size(); i++) { |
| int64_t end_offset = index_node->end_offset_; |
| if (i + 1 < index_node->children_.size()) { |
| end_offset = index_node->children_[i + 1]->get_offset(); |
| } |
| const int read_size = |
| end_offset - index_node->children_[i]->get_offset(); |
| #if DEBUG_SE |
| std::cout << "search_from_internal_node, end_offset=" << end_offset |
| << ", index_entry.offset_=" |
| << index_node->children_[i]->get_offset() << std::endl; |
| #endif |
| ASSERT(read_size > 0 && read_size < (1 << 30)); |
| PageArena cur_level_index_node_pa; |
| void *buf = cur_level_index_node_pa.alloc(sizeof(MetaIndexNode)); |
| char *data_buf = (char *)cur_level_index_node_pa.alloc(read_size); |
| if (IS_NULL(buf) || IS_NULL(data_buf)) { |
| return E_OOM; |
| } |
| auto *cur_level_index_node_ptr = |
| new (buf) MetaIndexNode(&cur_level_index_node_pa); |
| auto cur_level_index_node = std::shared_ptr<MetaIndexNode>( |
| cur_level_index_node_ptr, MetaIndexNode::self_deleter); |
| |
| int32_t ret_read_len = 0; |
| if (RET_FAIL( |
| read_file_->read(index_node->children_[i]->get_offset(), |
| data_buf, read_size, ret_read_len))) { |
| } else if (read_size != ret_read_len) { |
| ret = E_TSFILE_CORRUPTED; |
| } else if (RET_FAIL(cur_level_index_node->deserialize_from( |
| data_buf, read_size))) { |
| } else { |
| ret = get_all_leaf(cur_level_index_node, index_node_entry_list); |
| } |
| } |
| } |
| return ret; |
| } |
// NOTE: everything between `#if 0` and the matching `#endif` is removed by
// the preprocessor and is never compiled. The functions below use members
// (chunk_reader_, chunk_meta_cursor_, timeseries_index_) and call
// signatures (e.g. load_device_index_entry(col_desc, ...)) that differ from
// the live code earlier in this file, so the block cannot be re-enabled
// as-is.
| #if 0 |
// Loads the timeseries index for (device_path, measurement_name) when
// needed, then returns the next page through get_next_page().
// ret_time_range is not used in this body.
| int TsFileIOReader::get_next(const std::string &device_path, |
| const std::string &measurement_name, |
| TsBlock *ret_tsblock, |
| TimeRange &ret_time_range) |
| { |
| int ret = E_OK; |
| if (RET_FAIL(load_timeseries_index_if_necessary(device_path, measurement_name))) { |
| return ret; |
| } |
| return get_next_page(ret_tsblock); |
| } |
| |
// Returns the next page of the current chunk. When the current chunk is
// exhausted it advances to the next chunk, or reports E_NO_MORE_DATA when
// has_next_chunk() is false.
| int TsFileIOReader::get_next_page(TsBlock *ret_tsblock) |
| { |
| int ret = E_OK; |
| |
| if (!chunk_reader_.has_more_data()) { |
| // has finished reading current chunk |
| if (has_next_chunk()) { |
| cursor_to_next_chunk(); |
| ChunkMeta *next_chunk_meta = chunk_meta_cursor_.get(); |
| if (RET_FAIL(init_next_chunk_reader(next_chunk_meta))) { |
| } |
| } else { |
| // has finished reading all chunks of this tsfile |
| ret = E_NO_MORE_DATA; |
| } |
| } // end if (!chunk_reader_.has_more_data()) |
| |
| if (IS_SUCC(ret)) { |
| ret = chunk_reader_.get_next_page(ret_tsblock); |
| } |
| return ret; |
| } |
| |
// Initializes chunk_reader_ for the column described by col_desc and loads
// the chunk described by cm.
| int TsFileIOReader::init_first_chunk_reader(ChunkMeta *cm, |
| ReadFile *read_file, |
| const ColumnDesc &col_desc) |
| { |
| ASSERT(!chunk_reader_.has_more_data()); |
| int ret = E_OK; |
| if (RET_FAIL(chunk_reader_.init(read_file, |
| timeseries_index_.get_measurement_name(), |
| col_desc.type_, |
| col_desc.encoding_))) { |
| } else if (RET_FAIL(chunk_reader_.load_by_meta(cm))) { |
| } |
| return ret; |
| } |
| |
// Resets chunk_reader_ and loads the chunk described by cm.
| int TsFileIOReader::init_next_chunk_reader(ChunkMeta *cm) |
| { |
| ASSERT(!chunk_reader_.has_more_data()); |
| chunk_reader_.reset(); |
| return chunk_reader_.load_by_meta(cm); |
| } |
| |
// NOTE(review): the body uses `col_desc`, which is not one of this
// function's parameters (device_path, measurement_name).
| int TsFileIOReader::load_timeseries_index_if_necessary(const std::string &device_path, |
| const std::string &measurement_name) |
| { |
| int ret = E_OK; |
| if (col_desc.ts_id_ != timeseries_index_.get_ts_id()) { |
| if (RET_FAIL(load_timeseries_index(col_desc))) { |
| //log_err("load timeseries_index error, ret=%d", ret); |
| } else { |
| chunk_meta_cursor_ = timeseries_index_.get_chunk_meta_list()->begin(); |
| ChunkMeta *next_chunk_meta = chunk_meta_cursor_.get(); |
| if (RET_FAIL(init_first_chunk_reader(next_chunk_meta, read_file_, col_desc))) { |
| } else { |
| cursor_to_next_chunk(); |
| } |
| } |
| } else { |
| // timeseries_index_ is ready |
| } |
| return ret; |
| } |
| |
| // TODO add a result cache for load_timeseries_index |
// Loads the TsFileMeta if needed, then resolves the device index entry,
// the measurement index entry and finally the timeseries index for
// col_desc.
| int TsFileIOReader::load_timeseries_index(const ColumnDesc &col_desc) |
| { |
| int ret = E_OK; |
| |
| if (RET_FAIL(load_tsfile_meta_if_necessary())) { |
| return ret; |
| } |
| |
| MetaIndexEntry device_index_entry; |
| int64_t device_ie_end_offset = 0; |
| MetaIndexEntry measurement_index_entry; |
| int64_t measurement_ie_end_offset = 0; |
| |
| if (RET_FAIL(load_device_index_entry(col_desc, device_index_entry, device_ie_end_offset))) { |
| } else if (RET_FAIL(load_measurement_index_entry(col_desc, device_index_entry.offset_, |
| device_ie_end_offset, measurement_index_entry, |
| measurement_ie_end_offset))) { |
| } else if (RET_FAIL(do_load_timeseries_index(col_desc, measurement_index_entry.offset_, |
| measurement_ie_end_offset))) { |
| } else { |
// NOTE(review): the macro below is spelled STORAGE_ENGIEN_DEBUG;
// presumably STORAGE_ENGINE_DEBUG was intended. Confirm before re-enabling.
| #if STORAGE_ENGIEN_DEBUG |
| std::cout << "load timeseries index: " << timeseries_index_ << std::endl; |
| #endif |
| } |
| return ret; |
| } |
| #endif |
| } // end namespace storage |