blob: 0710b587315771ee0e57b35ec4a23691fc5b9ee3 [file]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* License); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include "qds_without_timegenerator.h"
#include "utils/util_define.h"
using namespace common;
namespace storage {
int QDSWithoutTimeGenerator::init(TsFileIOReader* io_reader,
QueryExpression* qe) {
remaining_offset_ = 0;
remaining_limit_ = -1;
is_single_path_ = false;
return init_internal(io_reader, qe);
}
int QDSWithoutTimeGenerator::init(TsFileIOReader* io_reader,
QueryExpression* qe, int offset, int limit) {
remaining_offset_ = offset;
remaining_limit_ = limit;
is_single_path_ = false;
return init_internal(io_reader, qe);
}
int QDSWithoutTimeGenerator::init_internal(TsFileIOReader* io_reader,
QueryExpression* qe) {
int ret = E_OK; // cppcheck-suppress unreadVariable
pa_.reset();
pa_.init(512, common::MOD_TSFILE_READER);
io_reader_ = io_reader;
qe_ = qe;
std::vector<Path> paths = qe_->selected_series_;
size_t origin_path_count = paths.size();
std::vector<Path> valid_paths;
std::vector<std::string> column_names;
std::vector<common::TSDataType> data_types;
column_names.reserve(origin_path_count);
data_types.reserve(origin_path_count);
Expression* global_time_expression = qe->expression_;
Filter* global_time_filter = nullptr;
if (global_time_expression != nullptr) {
global_time_filter = global_time_expression->filter_;
}
index_lookup_.insert({"time", 0});
for (size_t i = 0; i < origin_path_count; i++) {
TsFileSeriesScanIterator* ssi = nullptr;
ret = io_reader_->alloc_ssi(paths[i].device_id_, paths[i].measurement_,
ssi, pa_, global_time_filter);
if (ret != 0) {
return ret;
} else {
index_lookup_.insert({paths[i].measurement_, i + 1});
if (paths[i].full_path_ != paths[i].measurement_) {
index_lookup_.insert({paths[i].full_path_, i + 1});
}
ssi_vec_.push_back(ssi);
valid_paths.push_back(paths[i]);
column_names.push_back(paths[i].full_path_);
}
}
size_t path_count = valid_paths.size();
is_single_path_ = (path_count == 1);
// Only push offset/limit to SSI for single-path; multi-path applies at
// merge.
for (size_t i = 0; i < path_count; i++) {
ssi_vec_[i]->set_row_range(is_single_path_ ? remaining_offset_ : 0,
is_single_path_ ? remaining_limit_ : -1);
}
row_record_ = new RowRecord(path_count + 1);
tsblocks_.resize(path_count);
time_iters_.resize(path_count);
value_iters_.resize(path_count);
for (size_t i = 0; i < path_count; i++) {
get_next_tsblock(i, true);
data_types.push_back(value_iters_[i] != nullptr
? value_iters_[i]->get_data_type()
: TSDataType::NULL_TYPE);
}
// Single-path: SSI may have consumed offset/limit by skipping chunks/pages
// during first get_next_tsblock(); sync so QDS does not double-apply.
if (is_single_path_) {
remaining_offset_ = ssi_vec_[0]->get_row_offset();
remaining_limit_ = ssi_vec_[0]->get_row_limit();
}
result_set_metadata_ =
std::make_shared<ResultSetMetadata>(column_names, data_types);
return E_OK; // ignore invalid timeseries
}
void QDSWithoutTimeGenerator::close() {
if (row_record_ != nullptr) {
delete row_record_;
row_record_ = nullptr;
}
for (size_t i = 0; i < time_iters_.size(); i++) {
delete time_iters_[i];
time_iters_[i] = nullptr;
}
time_iters_.clear();
for (size_t i = 0; i < value_iters_.size(); i++) {
delete value_iters_[i];
value_iters_[i] = nullptr;
}
value_iters_.clear();
heap_time_.clear();
ASSERT(ssi_vec_.size() == tsblocks_.size());
for (size_t i = 0; i < ssi_vec_.size(); i++) {
ssi_vec_[i]->revert_tsblock();
}
for (size_t i = 0; i < ssi_vec_.size(); i++) {
TsFileSeriesScanIterator* ssi = ssi_vec_[i];
io_reader_->revert_ssi(ssi);
}
ssi_vec_.clear();
if (qe_ != nullptr) {
delete qe_;
qe_ = nullptr;
}
pa_.destroy();
}
int QDSWithoutTimeGenerator::next(bool& has_next) {
// For single path, apply offset/limit at row level.
if (is_single_path_) {
while (true) {
row_record_->reset();
if (heap_time_.size() == 0) {
has_next = false;
return E_OK;
}
if (remaining_limit_ == 0) {
has_next = false;
return E_OK;
}
int64_t time = heap_time_.begin()->first;
bool skip_row = remaining_offset_ > 0;
if (skip_row) {
remaining_offset_--;
} else {
row_record_->set_timestamp(time);
row_record_->get_field(0)->set_value(INT64, &time,
get_len(INT64), pa_);
}
uint32_t len = 0;
uint32_t idx = heap_time_.begin()->second;
bool is_null_val = false;
auto val_datatype = value_iters_[idx]->get_data_type();
void* val_ptr = value_iters_[idx]->read(&len, &is_null_val);
if (!skip_row) {
if (!is_null_val) {
row_record_->get_field(idx + 1)->set_value(
val_datatype, val_ptr, len, pa_);
}
}
value_iters_[idx]->next();
heap_time_.erase(heap_time_.begin());
if (!time_iters_[idx]->end()) {
int64_t timev = *(int64_t*)(time_iters_[idx]->read(&len));
heap_time_.insert(std::make_pair(timev, idx));
time_iters_[idx]->next();
} else {
get_next_tsblock(idx, false);
}
if (skip_row) {
continue;
}
// Emit this row and decrement limit.
if (remaining_limit_ > 0) {
remaining_limit_--;
}
has_next = true;
return E_OK;
}
}
// Multi-path: apply offset/limit at merge layer.
while (true) {
row_record_->reset();
if (heap_time_.size() == 0) {
has_next = false;
return E_OK;
}
// Check limit (limit == 0 means no more rows needed).
if (remaining_limit_ == 0) {
has_next = false;
return E_OK;
}
int64_t time = heap_time_.begin()->first;
row_record_->set_timestamp(time);
row_record_->get_field(0)->set_value(INT64, &time, get_len(INT64), pa_);
uint32_t count = heap_time_.count(time);
std::multimap<int64_t, uint32_t>::iterator iter = heap_time_.find(time);
for (uint32_t i = 0; i < count; ++i) {
uint32_t len = 0;
bool is_null_val = false;
auto val_datatype = value_iters_[iter->second]->get_data_type();
void* val_ptr =
value_iters_[iter->second]->read(&len, &is_null_val);
if (!is_null_val) {
row_record_->get_field(iter->second + 1)
->set_value(val_datatype, val_ptr, len, pa_);
}
value_iters_[iter->second]->next();
if (!time_iters_[iter->second]->end()) {
int64_t timev =
*(int64_t*)(time_iters_[iter->second]->read(&len));
heap_time_.insert(std::make_pair(timev, iter->second));
time_iters_[iter->second]->next();
} else {
// Pass merge_cursor (current time) as min_time_hint
// to help SSI skip chunks/pages that are entirely before
// the current merge position.
get_next_tsblock_with_hint(iter->second, false, time);
}
std::multimap<int64_t, uint32_t>::iterator cur = iter;
iter++; // cppcheck-suppress postfixOperator
heap_time_.erase(cur);
}
// Apply offset: skip this row.
if (remaining_offset_ > 0) {
remaining_offset_--;
continue;
}
// Emit this row and decrement limit.
if (remaining_limit_ > 0) {
remaining_limit_--;
}
has_next = true;
return E_OK;
}
}
bool QDSWithoutTimeGenerator::is_null(const std::string& column_name) {
auto iter = index_lookup_.find(column_name);
if (iter == index_lookup_.end()) {
return true;
} else {
return is_null(iter->second + 1);
}
}
bool QDSWithoutTimeGenerator::is_null(uint32_t column_index) {
return row_record_->get_field(column_index - 1) == nullptr ||
row_record_->get_field(column_index - 1)->type_ == NULL_TYPE;
}
RowRecord* QDSWithoutTimeGenerator::get_row_record() { return row_record_; }
std::shared_ptr<ResultSetMetadata> QDSWithoutTimeGenerator::get_metadata() {
return result_set_metadata_;
}
int QDSWithoutTimeGenerator::get_next_tsblock(uint32_t index, bool alloc_mem) {
if (tsblocks_[index] != nullptr) {
delete time_iters_[index];
time_iters_[index] = nullptr;
delete value_iters_[index];
value_iters_[index] = nullptr;
tsblocks_[index]->reset();
}
int ret = ssi_vec_[index]->get_next(tsblocks_[index], alloc_mem);
if (IS_SUCC(ret)) {
time_iters_[index] = new ColIterator(0, tsblocks_[index]);
uint32_t len = 0;
int64_t time = *(int64_t*)(time_iters_[index]->read(&len));
time_iters_[index]->next();
heap_time_.insert(std::pair<uint64_t, uint32_t>(time, index));
value_iters_[index] = new ColIterator(1, tsblocks_[index]);
} else if (ret == E_NO_MORE_DATA) {
if (time_iters_[index]) {
delete time_iters_[index];
time_iters_[index] = nullptr;
}
if (value_iters_[index]) {
delete value_iters_[index];
value_iters_[index] = nullptr;
}
if (tsblocks_[index]) {
ssi_vec_[index]->destroy();
tsblocks_[index] = nullptr;
}
ret = E_OK;
}
return ret;
}
int QDSWithoutTimeGenerator::get_next_tsblock_with_hint(uint32_t index,
bool alloc_mem,
int64_t min_time_hint) {
if (tsblocks_[index] != nullptr) {
delete time_iters_[index];
time_iters_[index] = nullptr;
delete value_iters_[index];
value_iters_[index] = nullptr;
tsblocks_[index]->reset();
}
int ret = ssi_vec_[index]->get_next(tsblocks_[index], alloc_mem, nullptr,
min_time_hint);
if (IS_SUCC(ret)) {
time_iters_[index] = new ColIterator(0, tsblocks_[index]);
uint32_t len = 0;
int64_t time = *(int64_t*)(time_iters_[index]->read(&len));
time_iters_[index]->next();
heap_time_.insert(std::pair<uint64_t, uint32_t>(time, index));
value_iters_[index] = new ColIterator(1, tsblocks_[index]);
} else if (ret == E_NO_MORE_DATA) {
if (time_iters_[index]) {
delete time_iters_[index];
time_iters_[index] = nullptr;
}
if (value_iters_[index]) {
delete value_iters_[index];
value_iters_[index] = nullptr;
}
if (tsblocks_[index]) {
ssi_vec_[index]->destroy();
tsblocks_[index] = nullptr;
}
ret = E_OK;
}
return ret;
}
} // namespace storage