blob: f613d735e716d572d6d78d20753fa18cc90ee72e [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tsfile_reader.h"
#include "common/schema.h"
#include "filter/time_operator.h"
#include "tsfile_executor.h"
using namespace common;
using namespace storage;
namespace storage {
TsFileReader::TsFileReader() : read_file_(nullptr), tsfile_executor_(nullptr) {}
TsFileReader::~TsFileReader() { close(); }
int TsFileReader::open(const std::string &file_path) {
int ret = E_OK;
read_file_ = new storage::ReadFile;
tsfile_executor_ = new storage::TsFileExecutor();
if (RET_FAIL(read_file_->open(file_path))) {
std::cout << "filed to open file " << ret << std::endl;
} else if (RET_FAIL(tsfile_executor_->init(read_file_))) {
std::cout << "filed to init " << ret << std::endl;
}
return ret;
}
int TsFileReader::close() {
int ret = E_OK;
if (tsfile_executor_ != nullptr) {
delete tsfile_executor_;
tsfile_executor_ = nullptr;
}
if (read_file_ != nullptr) {
read_file_->close();
delete read_file_;
read_file_ = nullptr;
}
return ret;
}
int TsFileReader::query(QueryExpression *qe, ResultSet *&ret_qds) {
return tsfile_executor_->execute(qe, ret_qds);
}
int TsFileReader::query(std::vector<std::string> &path_list, int64_t start_time,
int64_t end_time, ResultSet *&result_set) {
int ret = E_OK;
Filter *time_filter = new TimeBetween(start_time, end_time, false);
Expression *exp =
new storage::Expression(storage::GLOBALTIME_EXPR, time_filter);
std::vector<Path> path_list_vec;
for (const auto &path : path_list) {
path_list_vec.emplace_back(Path(path, true));
}
QueryExpression *query_expression =
QueryExpression::create(path_list_vec, exp);
ret = tsfile_executor_->execute(query_expression, result_set);
return ret;
}
void TsFileReader::destroy_query_data_set(storage::ResultSet *qds) {
tsfile_executor_->destroy_query_data_set(qds);
}
std::vector<std::string> TsFileReader::get_all_devices() {
TsFileMeta *tsfile_meta = tsfile_executor_->get_tsfile_meta();
std::vector<std::string> device_ids;
if (tsfile_meta != nullptr) {
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
get_all_devices(device_ids, tsfile_meta->index_node_, pa);
}
return device_ids;
}
int TsFileReader::get_all_devices(std::vector<std::string> &device_ids, MetaIndexNode *index_node, PageArena &pa) {
int ret = E_OK;
if (index_node != nullptr) {
if (index_node->node_type_ == LEAF_DEVICE) {
for (const auto &meta_index_entry : index_node->children_) {
device_ids.push_back(meta_index_entry->name_.to_std_string());
}
} else {
for (size_t idx = 0; idx < index_node->children_.size(); idx++) {
MetaIndexEntry* meta_index_entry = index_node->children_[idx];
int start_offset = meta_index_entry->offset_;
int end_offset = index_node->end_offset_;
if (idx + 1 < index_node->children_.size()) {
end_offset = index_node->children_[idx + 1]->offset_;
}
ASSERT(end_offset - start_offset > 0);
const int32_t read_size = (int32_t)end_offset - start_offset;
int32_t ret_read_len = 0;
char *data_buf = (char *)pa.alloc(read_size);
void *m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
return E_OOM;
}
MetaIndexNode *top_node = new (m_idx_node_buf) MetaIndexNode(&pa);
if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
ret_read_len))) {
} else if (RET_FAIL(top_node->deserialize_from(data_buf, read_size))) {
} else {
ret = get_all_devices(device_ids, top_node, pa);
}
top_node->destroy();
}
}
}
return ret;
}
int TsFileReader::get_timeseries_schema(
const std::string &device_id, std::vector<MeasurementSchema> &result) {
int ret = E_OK;
std::vector<ITimeseriesIndex *> timeseries_indexs;
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
if (RET_FAIL(tsfile_executor_->get_tsfile_io_reader()
->get_device_timeseries_meta_without_chunk_meta(
device_id, timeseries_indexs, pa))) {
} else {
for (auto timeseries_index : timeseries_indexs) {
MeasurementSchema ms(
timeseries_index->get_measurement_name().to_std_string(),
timeseries_index->get_data_type());
result.push_back(ms);
}
}
return E_OK;
}
ResultSet *TsFileReader::read_timeseries(
const std::string &device_name, std::vector<std::string> measurement_name) {
return nullptr;
}
} // namespace storage