blob: 90c782131efef9d99b6345a121f38fc386a2ff65 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "qds_without_timegenerator.h"
#include "utils/util_define.h"
using namespace common;
namespace storage {
int QDSWithoutTimeGenerator::init(TsFileIOReader* io_reader,
QueryExpression* qe) {
int ret = E_OK; // cppcheck-suppress unreadVariable
pa_.reset();
pa_.init(512, common::MOD_TSFILE_READER);
io_reader_ = io_reader;
qe_ = qe;
std::vector<Path> paths = qe_->selected_series_;
size_t origin_path_count = paths.size();
std::vector<Path> valid_paths;
std::vector<std::string> column_names;
std::vector<common::TSDataType> data_types;
column_names.reserve(origin_path_count);
data_types.reserve(origin_path_count);
Expression* global_time_expression = qe->expression_;
Filter* global_time_filter = nullptr;
if (global_time_expression != nullptr) {
global_time_filter = global_time_expression->filter_;
}
index_lookup_.insert({"time", 0});
for (size_t i = 0; i < origin_path_count; i++) {
TsFileSeriesScanIterator* ssi = nullptr;
ret = io_reader_->alloc_ssi(paths[i].device_id_, paths[i].measurement_,
ssi, pa_, global_time_filter);
if (ret != 0) {
return ret;
} else {
index_lookup_.insert({paths[i].measurement_, i + 1});
ssi_vec_.push_back(ssi);
valid_paths.push_back(paths[i]);
column_names.push_back(paths[i].full_path_);
}
}
size_t path_count = valid_paths.size();
row_record_ = new RowRecord(path_count + 1);
tsblocks_.resize(path_count);
time_iters_.resize(path_count);
value_iters_.resize(path_count);
for (size_t i = 0; i < path_count; i++) {
get_next_tsblock(i, true);
data_types.push_back(value_iters_[i] != nullptr
? value_iters_[i]->get_data_type()
: TSDataType::NULL_TYPE);
}
result_set_metadata_ =
std::make_shared<ResultSetMetadata>(column_names, data_types);
return E_OK; // ignore invalid timeseries
}
void QDSWithoutTimeGenerator::close() {
if (row_record_ != nullptr) {
delete row_record_;
row_record_ = nullptr;
}
for (size_t i = 0; i < time_iters_.size(); i++) {
delete time_iters_[i];
time_iters_[i] = nullptr;
}
time_iters_.clear();
for (size_t i = 0; i < value_iters_.size(); i++) {
delete value_iters_[i];
value_iters_[i] = nullptr;
}
value_iters_.clear();
heap_time_.clear();
ASSERT(ssi_vec_.size() == tsblocks_.size());
for (size_t i = 0; i < ssi_vec_.size(); i++) {
ssi_vec_[i]->revert_tsblock();
}
for (size_t i = 0; i < ssi_vec_.size(); i++) {
TsFileSeriesScanIterator* ssi = ssi_vec_[i];
io_reader_->revert_ssi(ssi);
}
ssi_vec_.clear();
if (qe_ != nullptr) {
delete qe_;
qe_ = nullptr;
}
pa_.destroy();
}
int QDSWithoutTimeGenerator::next(bool& has_next) {
row_record_->reset();
if (heap_time_.size() == 0) {
has_next = false;
return E_OK;
}
int64_t time = heap_time_.begin()->first;
row_record_->set_timestamp(time);
row_record_->get_field(0)->set_value(INT64, &time, get_len(INT64), pa_);
uint32_t count = heap_time_.count(time);
std::multimap<int64_t, uint32_t>::iterator iter = heap_time_.find(time);
for (uint32_t i = 0; i < count; ++i) {
uint32_t len = 0;
auto val_datatype = value_iters_[iter->second]->get_data_type();
void* val_ptr = value_iters_[iter->second]->read(&len);
row_record_->get_field(iter->second + 1)
->set_value(val_datatype, val_ptr, len, pa_);
value_iters_[iter->second]->next();
if (!time_iters_[iter->second]->end()) {
int64_t timev = *(int64_t*)(time_iters_[iter->second]->read(&len));
heap_time_.insert(std::make_pair(timev, iter->second));
time_iters_[iter->second]->next();
} else {
get_next_tsblock(iter->second, false);
}
std::multimap<int64_t, uint32_t>::iterator cur = iter;
iter++; // cppcheck-suppress postfixOperator
heap_time_.erase(cur);
}
has_next = true;
return E_OK;
}
bool QDSWithoutTimeGenerator::is_null(const std::string& column_name) {
auto iter = index_lookup_.find(column_name);
if (iter == index_lookup_.end()) {
return true;
} else {
return is_null(iter->second + 1);
}
}
bool QDSWithoutTimeGenerator::is_null(uint32_t column_index) {
return row_record_->get_field(column_index - 1) == nullptr ||
row_record_->get_field(column_index - 1)->type_ == NULL_TYPE;
}
RowRecord* QDSWithoutTimeGenerator::get_row_record() { return row_record_; }
std::shared_ptr<ResultSetMetadata> QDSWithoutTimeGenerator::get_metadata() {
return result_set_metadata_;
}
int QDSWithoutTimeGenerator::get_next_tsblock(uint32_t index, bool alloc_mem) {
if (tsblocks_[index] != nullptr) {
delete time_iters_[index];
time_iters_[index] = nullptr;
delete value_iters_[index];
value_iters_[index] = nullptr;
tsblocks_[index]->reset();
}
int ret = ssi_vec_[index]->get_next(tsblocks_[index], alloc_mem);
if (IS_SUCC(ret)) {
time_iters_[index] = new ColIterator(0, tsblocks_[index]);
uint32_t len = 0;
int64_t time = *(int64_t*)(time_iters_[index]->read(&len));
time_iters_[index]->next();
heap_time_.insert(std::pair<uint64_t, uint32_t>(time, index));
value_iters_[index] = new ColIterator(1, tsblocks_[index]);
} else {
if (time_iters_[index]) {
delete time_iters_[index];
time_iters_[index] = nullptr;
}
if (value_iters_[index]) {
delete value_iters_[index];
value_iters_[index] = nullptr;
}
if (tsblocks_[index]) {
ssi_vec_[index]->destroy();
tsblocks_[index] = nullptr;
}
ret = E_OK; // TODO
}
return ret;
}
} // namespace storage