blob: 6da094303e4148dc80856fee3c0d54cb0b71943a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tsfile_reader.h"
#include "common/schema.h"
#include "filter/time_operator.h"
#include "tsfile_executor.h"
using namespace common;
using namespace storage;
namespace storage {
TsFileReader::TsFileReader()
: read_file_(nullptr),
tsfile_executor_(nullptr),
table_query_executor_(nullptr) {}
TsFileReader::~TsFileReader() { close(); }
int TsFileReader::open(const std::string& file_path) {
int ret = E_OK;
read_file_ = new storage::ReadFile;
tsfile_executor_ = new storage::TsFileExecutor();
if (RET_FAIL(read_file_->open(file_path))) {
std::cout << "filed to open file " << ret << std::endl;
} else if (RET_FAIL(tsfile_executor_->init(read_file_))) {
std::cout << "filed to init " << ret << std::endl;
}
table_query_executor_ = new storage::TableQueryExecutor(read_file_);
return ret;
}
int TsFileReader::close() {
int ret = E_OK;
if (tsfile_executor_ != nullptr) {
delete tsfile_executor_;
tsfile_executor_ = nullptr;
}
if (table_query_executor_ != nullptr) {
delete table_query_executor_;
table_query_executor_ = nullptr;
}
if (read_file_ != nullptr) {
read_file_->close();
delete read_file_;
read_file_ = nullptr;
}
return ret;
}
int TsFileReader::query(QueryExpression* qe, ResultSet*& ret_qds) {
return tsfile_executor_->execute(qe, ret_qds);
}
int TsFileReader::query(std::vector<std::string>& path_list, int64_t start_time,
int64_t end_time, ResultSet*& result_set) {
int ret = E_OK;
Filter* time_filter = new TimeBetween(start_time, end_time, false);
Expression* exp =
new storage::Expression(storage::GLOBALTIME_EXPR, time_filter);
std::vector<Path> path_list_vec;
for (const auto& path : path_list) {
path_list_vec.emplace_back(Path(path, true));
}
QueryExpression* query_expression =
QueryExpression::create(path_list_vec, exp);
ret = tsfile_executor_->execute(query_expression, result_set);
return ret;
}
int TsFileReader::query(const std::string& table_name,
const std::vector<std::string>& columns_names,
int64_t start_time, int64_t end_time,
ResultSet*& result_set) {
int ret = E_OK;
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
if (tsfile_meta == nullptr) {
return E_TSFILE_WRITER_META_ERR;
}
std::shared_ptr<TableSchema> table_schema =
tsfile_meta->table_schemas_.at(to_lower(table_name));
if (table_schema == nullptr) {
return E_TABLE_NOT_EXIST;
}
std::vector<TSDataType> data_types = table_schema->get_data_types();
Filter* time_filter = new TimeBetween(start_time, end_time, false);
ret =
table_query_executor_->query(to_lower(table_name), columns_names,
time_filter, nullptr, nullptr, result_set);
return ret;
}
void TsFileReader::destroy_query_data_set(storage::ResultSet* qds) {
tsfile_executor_->destroy_query_data_set(qds);
}
std::vector<std::shared_ptr<IDeviceID>> TsFileReader::get_all_devices(
std::string table_name) {
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
std::vector<std::shared_ptr<IDeviceID>> device_ids;
if (tsfile_meta != nullptr) {
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
to_lowercase_inplace(table_name);
auto index_node =
tsfile_meta->table_metadata_index_node_map_[table_name];
get_all_devices(device_ids, index_node, pa);
}
return device_ids;
}
std::vector<std::shared_ptr<IDeviceID>> TsFileReader::get_all_device_ids() {
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
std::vector<std::shared_ptr<IDeviceID>> device_ids;
if (tsfile_meta != nullptr) {
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
for (auto entry : tsfile_meta->table_metadata_index_node_map_) {
auto index_node = entry.second;
get_all_devices(device_ids, index_node, pa);
}
}
return device_ids;
}
int TsFileReader::get_all_devices(
std::vector<std::shared_ptr<IDeviceID>>& device_ids,
std::shared_ptr<MetaIndexNode> index_node, PageArena& pa) {
int ret = E_OK;
if (index_node != nullptr) {
if (index_node->node_type_ == LEAF_DEVICE) {
for (const auto& meta_index_entry : index_node->children_) {
device_ids.push_back(meta_index_entry->get_device_id());
}
} else {
for (size_t idx = 0; idx < index_node->children_.size(); idx++) {
auto meta_index_entry = index_node->children_[idx];
int start_offset = meta_index_entry->get_offset();
int end_offset = index_node->end_offset_;
if (idx + 1 < index_node->children_.size()) {
end_offset = index_node->children_[idx + 1]->get_offset();
}
ASSERT(end_offset - start_offset > 0);
const int32_t read_size = (int32_t)end_offset - start_offset;
int32_t ret_read_len = 0;
char* data_buf = (char*)pa.alloc(read_size);
void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
return E_OOM;
}
auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
auto top_node = std::shared_ptr<MetaIndexNode>(
top_node_ptr, [](MetaIndexNode* ptr) {
if (ptr) {
ptr->~MetaIndexNode();
}
});
if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
ret_read_len))) {
} else if (RET_FAIL(top_node->device_deserialize_from(
data_buf, read_size))) {
} else {
ret = get_all_devices(device_ids, top_node, pa);
}
}
}
}
return ret;
}
int TsFileReader::get_timeseries_schema(
std::shared_ptr<IDeviceID> device_id,
std::vector<MeasurementSchema>& result) {
int ret = E_OK;
std::vector<ITimeseriesIndex*> timeseries_indexs;
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
if (RET_FAIL(tsfile_executor_->get_tsfile_io_reader()
->get_device_timeseries_meta_without_chunk_meta(
device_id, timeseries_indexs, pa))) {
} else {
for (auto timeseries_index : timeseries_indexs) {
MeasurementSchema ms(
timeseries_index->get_measurement_name().to_std_string(),
timeseries_index->get_data_type());
result.push_back(ms);
}
}
return E_OK;
}
ResultSet* TsFileReader::read_timeseries(
const std::shared_ptr<IDeviceID>& device_id,
const std::vector<std::string>& measurement_name) {
return nullptr;
}
std::shared_ptr<TableSchema> TsFileReader::get_table_schema(
const std::string& table_name) {
TsFileMeta* file_metadata = tsfile_executor_->get_tsfile_meta();
MetaIndexNode* table_root = nullptr;
std::shared_ptr<TableSchema> table_schema;
if (IS_FAIL(file_metadata->get_table_metaindex_node(to_lower(table_name),
table_root))) {
} else if (IS_FAIL(file_metadata->get_table_schema(to_lower(table_name),
table_schema))) {
}
return table_schema;
}
std::vector<std::shared_ptr<TableSchema>>
TsFileReader::get_all_table_schemas() {
TsFileMeta* file_metadata = tsfile_executor_->get_tsfile_meta();
std::vector<std::shared_ptr<TableSchema>> table_schemas;
for (const auto& table_schema : file_metadata->table_schemas_) {
table_schemas.push_back(table_schema.second);
}
return table_schemas;
}
} // namespace storage