blob: f97570885852ec34181c91a3c16b99f4605ff380 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "tsfile_reader.h"
#include "common/schema.h"
#include "filter/time_operator.h"
#include "tsfile_executor.h"
using namespace common;
using namespace storage;
namespace storage {
TsFileReader::TsFileReader()
: read_file_(nullptr),
tsfile_executor_(nullptr),
table_query_executor_(nullptr) {}
TsFileReader::~TsFileReader() { close(); }
int TsFileReader::open(const std::string& file_path) {
int ret = E_OK;
read_file_ = new storage::ReadFile;
tsfile_executor_ = new storage::TsFileExecutor();
if (RET_FAIL(read_file_->open(file_path))) {
std::cout << "filed to open file " << ret << std::endl;
} else if (RET_FAIL(tsfile_executor_->init(read_file_))) {
std::cout << "filed to init " << ret << std::endl;
}
table_query_executor_ = new storage::TableQueryExecutor(read_file_);
return ret;
}
int TsFileReader::close() {
int ret = E_OK;
if (tsfile_executor_ != nullptr) {
delete tsfile_executor_;
tsfile_executor_ = nullptr;
}
if (table_query_executor_ != nullptr) {
delete table_query_executor_;
table_query_executor_ = nullptr;
}
if (read_file_ != nullptr) {
read_file_->close();
delete read_file_;
read_file_ = nullptr;
}
return ret;
}
int TsFileReader::query(QueryExpression* qe, ResultSet*& ret_qds) {
return tsfile_executor_->execute(qe, ret_qds);
}
int TsFileReader::query(std::vector<std::string>& path_list, int64_t start_time,
int64_t end_time, ResultSet*& result_set) {
int ret = E_OK;
Filter* time_filter = new TimeBetween(start_time, end_time, false);
Expression* exp =
new storage::Expression(storage::GLOBALTIME_EXPR, time_filter);
std::vector<Path> path_list_vec;
for (const auto& path : path_list) {
path_list_vec.emplace_back(Path(path, true));
}
QueryExpression* query_expression =
QueryExpression::create(path_list_vec, exp);
ret = tsfile_executor_->execute(query_expression, result_set);
return ret;
}
int TsFileReader::query(const std::string& table_name,
const std::vector<std::string>& columns_names,
int64_t start_time, int64_t end_time,
ResultSet*& result_set) {
return this->query(table_name, columns_names, start_time, end_time,
result_set, nullptr);
}
int TsFileReader::query(const std::string& table_name,
const std::vector<std::string>& columns_names,
int64_t start_time, int64_t end_time,
ResultSet*& result_set, Filter* tag_filter) {
int ret = E_OK;
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
if (tsfile_meta == nullptr) {
return E_TSFILE_WRITER_META_ERR;
}
std::shared_ptr<TableSchema> table_schema =
tsfile_meta->table_schemas_.at(to_lower(table_name));
if (table_schema == nullptr) {
return E_TABLE_NOT_EXIST;
}
Filter* time_filter = new TimeBetween(start_time, end_time, false);
ret = table_query_executor_->query(to_lower(table_name), columns_names,
time_filter, tag_filter, nullptr,
result_set);
return ret;
}
int TsFileReader::query_table_on_tree(
const std::vector<std::string>& measurement_names, int64_t star_time,
int64_t end_time, ResultSet*& result_set) {
int ret = E_OK;
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
if (tsfile_meta == nullptr) {
return E_TSFILE_WRITER_META_ERR;
}
auto device_ids = this->get_all_device_ids();
std::vector<std::shared_ptr<IDeviceID>> satisfied_device_ids;
std::unordered_set<std::string> measurement_names_set_to_query;
size_t device_max_len = 0;
if (measurement_names.empty()) {
for (auto& device_name : device_ids) {
std::vector<MeasurementSchema> schemas;
this->get_timeseries_schema(device_name, schemas);
satisfied_device_ids.push_back(device_name);
for (auto& schema : schemas) {
measurement_names_set_to_query.insert(schema.measurement_name_);
}
device_name->split_table_name();
if (device_name->get_split_seg_num() > device_max_len) {
device_max_len = device_name->get_split_seg_num();
}
}
} else {
std::unordered_set<std::string> found_measurement_names;
std::unordered_set<std::string> required_measurement_names(
measurement_names.begin(), measurement_names.end());
for (auto& device_name : device_ids) {
std::vector<MeasurementSchema> schemas;
this->get_timeseries_schema(device_name, schemas);
bool device_has_required_measurement_names = false;
for (auto& schema : schemas) {
if (required_measurement_names.find(schema.measurement_name_) !=
required_measurement_names.end()) {
found_measurement_names.insert(schema.measurement_name_);
device_has_required_measurement_names = true;
}
}
if (device_has_required_measurement_names) {
device_name->split_table_name();
satisfied_device_ids.push_back(device_name);
if (device_name->get_split_seg_num() > device_max_len) {
device_max_len = device_name->get_split_seg_num();
}
}
}
if (found_measurement_names.size() <
required_measurement_names.size()) {
return E_COLUMN_NOT_EXIST;
}
measurement_names_set_to_query = found_measurement_names;
}
std::vector<std::string> measurement_names_to_query;
// Get all columns.
if (measurement_names.empty() && !measurement_names_set_to_query.empty()) {
for (auto& measurement_name : measurement_names_set_to_query) {
measurement_names_to_query.push_back(measurement_name);
}
} else {
measurement_names_to_query = measurement_names;
}
std::vector<std::string> columns_names(device_max_len);
for (int i = 0; i < device_max_len; i++) {
columns_names[i] = "col_" + std::to_string(i);
}
Filter* time_filter = new TimeBetween(star_time, end_time, false);
ret = table_query_executor_->query_on_tree(
satisfied_device_ids, columns_names, measurement_names_to_query,
time_filter, result_set);
return ret;
}
void TsFileReader::destroy_query_data_set(storage::ResultSet* qds) {
tsfile_executor_->destroy_query_data_set(qds);
}
std::vector<std::shared_ptr<IDeviceID>> TsFileReader::get_all_devices(
std::string table_name) {
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
std::vector<std::shared_ptr<IDeviceID>> device_ids;
if (tsfile_meta != nullptr) {
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
to_lowercase_inplace(table_name);
auto index_node =
tsfile_meta->table_metadata_index_node_map_[table_name];
get_all_devices(device_ids, index_node, pa);
}
return device_ids;
}
std::vector<std::shared_ptr<IDeviceID>> TsFileReader::get_all_device_ids() {
TsFileMeta* tsfile_meta = tsfile_executor_->get_tsfile_meta();
std::vector<std::shared_ptr<IDeviceID>> device_ids;
if (tsfile_meta != nullptr) {
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
for (auto entry : tsfile_meta->table_metadata_index_node_map_) {
auto index_node = entry.second;
get_all_devices(device_ids, index_node, pa);
}
}
return device_ids;
}
int TsFileReader::get_all_devices(
std::vector<std::shared_ptr<IDeviceID>>& device_ids,
std::shared_ptr<MetaIndexNode> index_node, PageArena& pa) {
int ret = E_OK;
if (index_node != nullptr) {
if (index_node->node_type_ == LEAF_DEVICE) {
for (const auto& meta_index_entry : index_node->children_) {
device_ids.push_back(meta_index_entry->get_device_id());
}
} else {
for (size_t idx = 0; idx < index_node->children_.size(); idx++) {
auto meta_index_entry = index_node->children_[idx];
int start_offset = meta_index_entry->get_offset();
int end_offset = index_node->end_offset_;
if (idx + 1 < index_node->children_.size()) {
end_offset = index_node->children_[idx + 1]->get_offset();
}
ASSERT(end_offset - start_offset > 0);
const int32_t read_size = (int32_t)end_offset - start_offset;
int32_t ret_read_len = 0;
char* data_buf = (char*)pa.alloc(read_size);
void* m_idx_node_buf = pa.alloc(sizeof(MetaIndexNode));
if (IS_NULL(data_buf) || IS_NULL(m_idx_node_buf)) {
return E_OOM;
}
auto* top_node_ptr = new (m_idx_node_buf) MetaIndexNode(&pa);
auto top_node = std::shared_ptr<MetaIndexNode>(
top_node_ptr, [](MetaIndexNode* ptr) {
if (ptr) {
ptr->~MetaIndexNode();
}
});
if (RET_FAIL(read_file_->read(start_offset, data_buf, read_size,
ret_read_len))) {
} else if (RET_FAIL(top_node->device_deserialize_from(
data_buf, read_size))) {
} else {
ret = get_all_devices(device_ids, top_node, pa);
}
}
}
}
return ret;
}
int TsFileReader::get_timeseries_schema(
std::shared_ptr<IDeviceID> device_id,
std::vector<MeasurementSchema>& result) {
int ret = E_OK;
std::vector<ITimeseriesIndex*> timeseries_indexs;
PageArena pa;
pa.init(512, MOD_TSFILE_READER);
if (RET_FAIL(tsfile_executor_->get_tsfile_io_reader()
->get_device_timeseries_meta_without_chunk_meta(
device_id, timeseries_indexs, pa))) {
} else {
for (auto timeseries_index : timeseries_indexs) {
MeasurementSchema ms(
timeseries_index->get_measurement_name().to_std_string(),
timeseries_index->get_data_type());
result.push_back(ms);
}
}
return E_OK;
}
ResultSet* TsFileReader::read_timeseries(
const std::shared_ptr<IDeviceID>& device_id,
const std::vector<std::string>& measurement_name) {
return nullptr;
}
std::shared_ptr<TableSchema> TsFileReader::get_table_schema(
const std::string& table_name) {
TsFileMeta* file_metadata = tsfile_executor_->get_tsfile_meta();
MetaIndexNode* table_root = nullptr;
std::shared_ptr<TableSchema> table_schema;
if (IS_FAIL(file_metadata->get_table_metaindex_node(to_lower(table_name),
table_root))) {
} else if (IS_FAIL(file_metadata->get_table_schema(to_lower(table_name),
table_schema))) {
}
return table_schema;
}
std::vector<std::shared_ptr<TableSchema>>
TsFileReader::get_all_table_schemas() {
TsFileMeta* file_metadata = tsfile_executor_->get_tsfile_meta();
std::vector<std::shared_ptr<TableSchema>> table_schemas;
for (const auto& table_schema : file_metadata->table_schemas_) {
table_schemas.push_back(table_schema.second);
}
return table_schemas;
}
} // namespace storage