blob: af7f242934846703635dc8a6d6b39b75cd61d417 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "qds_without_timegenerator.h"
#include "utils/util_define.h"
using namespace common;
namespace storage {
int QDSWithoutTimeGenerator::init(TsFileIOReader *io_reader,
QueryExpression *qe) {
int ret = E_OK; // cppcheck-suppress unreadVariable
io_reader_ = io_reader;
qe_ = qe;
std::vector<Path> paths = qe_->selected_series_;
size_t origin_path_count = paths.size();
std::vector<Path> valid_paths;
Expression *global_time_expression = qe->expression_;
Filter *global_time_filter = nullptr;
if (global_time_expression != nullptr) {
global_time_filter = global_time_expression->filter_;
}
for (size_t i = 0; i < origin_path_count; i++) {
TsFileSeriesScanIterator *ssi = nullptr;
ret = io_reader_->alloc_ssi(paths[i].device_, paths[i].measurement_,
ssi, global_time_filter);
if (ret != 0) {
return ret;
} else {
ssi_vec_.push_back(ssi);
valid_paths.push_back(paths[i]);
}
}
size_t path_count = valid_paths.size();
row_record_ = new RowRecord(path_count);
tsblocks_.resize(path_count);
time_iters_.resize(path_count);
value_iters_.resize(path_count);
for (size_t i = 0; i < path_count; i++) {
get_next_tsblock(i, true);
}
return E_OK; // ignore invalid timeseries
}
void QDSWithoutTimeGenerator::destroy() {
if (row_record_ != nullptr) {
delete row_record_;
row_record_ = nullptr;
}
for (size_t i = 0; i < time_iters_.size(); i++) {
delete time_iters_[i];
time_iters_[i] = nullptr;
}
time_iters_.clear();
for (size_t i = 0; i < value_iters_.size(); i++) {
delete value_iters_[i];
value_iters_[i] = nullptr;
}
value_iters_.clear();
heap_time_.clear();
ASSERT(ssi_vec_.size() == tsblocks_.size());
for (size_t i = 0; i < ssi_vec_.size(); i++) {
ssi_vec_[i]->revert_tsblock();
}
for (size_t i = 0; i < ssi_vec_.size(); i++) {
TsFileSeriesScanIterator *ssi = ssi_vec_[i];
io_reader_->revert_ssi(ssi);
}
ssi_vec_.clear();
}
RowRecord *QDSWithoutTimeGenerator::get_next() {
row_record_->reset();
if (heap_time_.size() == 0) {
return nullptr;
}
int64_t time = heap_time_.begin()->first;
row_record_->set_timestamp(time);
uint32_t count = heap_time_.count(time);
std::multimap<int64_t, uint32_t>::iterator iter = heap_time_.find(time);
for (uint32_t i = 0; i < count; ++i) {
uint32_t len = 0;
row_record_->get_field(iter->second)
->set_value(value_iters_[iter->second]->get_data_type(),
value_iters_[iter->second]->read(&len));
value_iters_[iter->second]->next();
if (!time_iters_[iter->second]->end()) {
int64_t timev = *(int64_t *)(time_iters_[iter->second]->read(&len));
heap_time_.insert(std::make_pair(timev, iter->second));
time_iters_[iter->second]->next();
} else {
get_next_tsblock(iter->second, false);
}
std::multimap<int64_t, uint32_t>::iterator cur = iter;
iter++; // cppcheck-suppress postfixOperator
heap_time_.erase(cur);
}
return row_record_;
}
int QDSWithoutTimeGenerator::get_next_tsblock(uint32_t index, bool alloc_mem) {
if (tsblocks_[index] != nullptr) {
delete time_iters_[index];
time_iters_[index] = nullptr;
delete value_iters_[index];
value_iters_[index] = nullptr;
tsblocks_[index]->reset();
}
int ret = ssi_vec_[index]->get_next(tsblocks_[index], alloc_mem);
if (IS_SUCC(ret)) {
time_iters_[index] = new ColIterator(0, tsblocks_[index]);
uint32_t len = 0;
int64_t time = *(int64_t *)(time_iters_[index]->read(&len));
time_iters_[index]->next();
heap_time_.insert(std::pair<uint64_t, uint32_t>(time, index));
value_iters_[index] = new ColIterator(1, tsblocks_[index]);
} else {
if (time_iters_[index]) {
delete time_iters_[index];
time_iters_[index] = nullptr;
}
if (value_iters_[index]) {
delete value_iters_[index];
value_iters_[index] = nullptr;
}
if (tsblocks_[index]) {
ssi_vec_[index]->destroy();
tsblocks_[index] = nullptr;
}
ret = E_OK; // TODO
}
return ret;
}
} // namespace storage