/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
| |
| #include "single_device_tsblock_reader.h" |
| |
| namespace storage { |
| |
| SingleDeviceTsBlockReader::SingleDeviceTsBlockReader( |
| DeviceQueryTask* device_query_task, uint32_t block_size, |
| IMetadataQuerier* metadata_querier, TsFileIOReader* tsfile_io_reader, |
| Filter* time_filter, Filter* field_filter) |
| : device_query_task_(device_query_task), |
| field_filter_(field_filter), |
| block_size_(block_size), |
| tuple_desc_(), |
| tsfile_io_reader_(tsfile_io_reader) {} |
| |
| int SingleDeviceTsBlockReader::init(DeviceQueryTask* device_query_task, |
| uint32_t block_size, Filter* time_filter, |
| Filter* field_filter) { |
| int ret = common::E_OK; |
| pa_.init(512, common::AllocModID::MOD_TSFILE_READER); |
| tuple_desc_.reset(); |
| auto table_schema = device_query_task->get_table_schema(); |
| tuple_desc_.push_back(common::g_time_column_schema); |
| for (const auto& column_name : device_query_task_->get_column_names()) { |
| common::ColumnSchema column_schema( |
| table_schema->get_column_schema(column_name)); |
| if (column_schema.is_valid() && |
| column_schema.data_type_ != common::VECTOR) { |
| tuple_desc_.push_back(column_schema); |
| } |
| } |
| time_column_index_ = 0; |
| if (RET_FAIL(common::TsBlock::create_tsblock(&tuple_desc_, current_block_, |
| block_size))) { |
| return ret; |
| } |
| col_appenders_.resize(tuple_desc_.get_column_count()); |
| for (uint32_t i = 0; i < tuple_desc_.get_column_count(); i++) { |
| col_appenders_[i] = new common::ColAppender(i, current_block_); |
| } |
| row_appender_ = new common::RowAppender(current_block_); |
| std::vector<ITimeseriesIndex*> time_series_indexs( |
| device_query_task_->get_column_mapping() |
| ->get_measurement_columns() |
| .size()); |
| if (RET_FAIL(tsfile_io_reader_->get_timeseries_indexes( |
| device_query_task->get_device_id(), |
| device_query_task->get_column_mapping()->get_measurement_columns(), |
| time_series_indexs, pa_))) { |
| return ret; |
| } |
| for (const auto& time_series_index : time_series_indexs) { |
| construct_column_context(time_series_index, time_filter); |
| } |
| |
| // There is no data in this single device tsblock reader. |
| if (field_column_contexts_.empty()) { |
| delete current_block_; |
| current_block_ = nullptr; |
| return common::E_OK; |
| } |
| |
| for (const auto& id_column : |
| device_query_task->get_column_mapping()->get_id_columns()) { |
| const auto& column_pos_in_result = |
| device_query_task->get_column_mapping()->get_column_pos(id_column); |
| int column_pos_in_id = table_schema->find_id_column_order(id_column) + |
| (!table_schema->is_virtual_table()); |
| id_column_contexts_.insert(std::make_pair( |
| id_column, |
| IdColumnContext(column_pos_in_result, column_pos_in_id))); |
| } |
| return ret; |
| } |
| |
| int SingleDeviceTsBlockReader::has_next(bool& has_next) { |
| if (!last_block_returned_) { |
| has_next = true; |
| return common::E_OK; |
| } |
| |
| if (field_column_contexts_.empty()) { |
| has_next = false; |
| return common::E_OK; |
| } |
| |
| for (auto col_appender : col_appenders_) { |
| col_appender->reset(); |
| } |
| |
| current_block_->reset(); |
| |
| bool next_time_set = false; |
| next_time_ = -1; |
| |
| std::vector<MeasurementColumnContext*> min_time_columns; |
| while (current_block_->get_row_count() < block_size_) { |
| for (auto& column_context : field_column_contexts_) { |
| int64_t time; |
| if (IS_FAIL(column_context.second->get_current_time(time))) { |
| continue; |
| } |
| if (!next_time_set || time < next_time_) { |
| next_time_set = true; |
| next_time_ = time; |
| min_time_columns.clear(); |
| min_time_columns.push_back(column_context.second); |
| } else if (time == next_time_) { |
| min_time_columns.push_back(column_context.second); |
| } |
| } |
| if (IS_FAIL(fill_measurements(min_time_columns))) { |
| has_next = false; |
| return common::E_OK; |
| } else { |
| next_time_set = false; |
| next_time_ = -1; |
| } |
| |
| if (field_column_contexts_.empty()) { |
| break; |
| } |
| } |
| int ret = common::E_OK; |
| if (current_block_->get_row_count() > 0) { |
| if (RET_FAIL(fill_ids())) { |
| return ret; |
| } |
| current_block_->fill_trailling_nulls(); |
| last_block_returned_ = false; |
| has_next = true; |
| return ret; |
| } |
| has_next = false; |
| return ret; // return value is not used |
| } |
| |
| int SingleDeviceTsBlockReader::fill_measurements( |
| std::vector<MeasurementColumnContext*>& column_contexts) { |
| int ret = common::E_OK; |
| if (field_filter_ == |
| nullptr /*TODO: || field_filter_->satisfy(column_contexts)*/) { |
| row_appender_->add_row(); |
| if (!col_appenders_[time_column_index_]->add_row()) { |
| assert(false); |
| } |
| col_appenders_[time_column_index_]->append((const char*)&next_time_, |
| sizeof(next_time_)); |
| for (auto& column_context : column_contexts) { |
| column_context->fill_into(col_appenders_); |
| if (RET_FAIL(advance_column(column_context))) { |
| break; |
| } |
| } |
| |
| // Align all columns, filling with nulls where data is missing. |
| uint32_t row_count = |
| col_appenders_[time_column_index_]->get_col_row_count(); |
| for (auto& col_appender : col_appenders_) { |
| if (tuple_desc_.get_column_category( |
| col_appender->get_column_index()) != |
| common::ColumnCategory::FIELD) { |
| continue; |
| } |
| while (col_appender->get_col_row_count() < row_count) { |
| col_appender->add_row(); |
| col_appender->append_null(); |
| } |
| } |
| } |
| return ret; |
| } |
| |
| int SingleDeviceTsBlockReader::advance_column( |
| MeasurementColumnContext* column_context) { |
| int ret = column_context->move_iter(); |
| if (ret == common::E_NO_MORE_DATA) { |
| column_context->remove_from(field_column_contexts_); |
| ret = common::E_OK; |
| } |
| return ret; |
| } |
| |
| void SingleMeasurementColumnContext::remove_from( |
| std::map<std::string, MeasurementColumnContext*>& column_context_map) { |
| auto iter = column_context_map.find(column_name_); |
| if (iter != column_context_map.end()) { |
| delete iter->second; |
| column_context_map.erase(iter); |
| } |
| } |
| |
| int SingleDeviceTsBlockReader::fill_ids() { |
| int ret = common::E_OK; |
| for (const auto& entry : id_column_contexts_) { |
| const auto& id_column_context = entry.second; |
| for (int32_t pos : id_column_context.pos_in_result_) { |
| std::string* device_tag = nullptr; |
| auto device_id = device_query_task_->get_device_id(); |
| int32_t pos_in_device_id = id_column_context.pos_in_device_id_; |
| if (pos_in_device_id >= 0 && static_cast<size_t>(pos_in_device_id) < |
| device_id->get_split_seg_num()) { |
| device_tag = device_id->get_split_segname_at(pos_in_device_id); |
| } |
| |
| if (device_tag == nullptr) { |
| ret = col_appenders_[pos + 1]->fill_null( |
| current_block_->get_row_count()); |
| if (ret != common::E_OK) { |
| return ret; |
| } |
| continue; |
| } |
| |
| if (RET_FAIL(col_appenders_[pos + 1]->fill( |
| device_tag->c_str(), device_tag->length(), |
| current_block_->get_row_count()))) { |
| return ret; |
| } |
| } |
| } |
| return ret; |
| } |
| |
| int SingleDeviceTsBlockReader::next(common::TsBlock*& ret_block) { |
| bool next = false; |
| has_next(next); |
| if (!next) { |
| return common::E_NO_MORE_DATA; |
| } |
| last_block_returned_ = true; |
| ret_block = current_block_; |
| return common::E_OK; |
| } |
| |
| void SingleDeviceTsBlockReader::close() { |
| for (auto& column_context : field_column_contexts_) { |
| delete column_context.second; |
| } |
| for (auto& col_appender : col_appenders_) { |
| if (col_appender) { |
| delete col_appender; |
| col_appender = nullptr; |
| } |
| } |
| if (row_appender_) { |
| delete row_appender_; |
| row_appender_ = nullptr; |
| } |
| if (device_query_task_) { |
| device_query_task_->~DeviceQueryTask(); |
| } |
| if (current_block_) { |
| delete current_block_; |
| current_block_ = nullptr; |
| } |
| } |
| |
| int SingleDeviceTsBlockReader::construct_column_context( |
| const ITimeseriesIndex* time_series_index, Filter* time_filter) { |
| int ret = common::E_OK; |
| if (time_series_index == nullptr || |
| (time_series_index->get_data_type() != common::TSDataType::VECTOR && |
| time_series_index->get_chunk_meta_list()->empty())) { |
| } else if (time_series_index->get_data_type() == common::VECTOR) { |
| const AlignedTimeseriesIndex* aligned_time_series_index = |
| dynamic_cast<const AlignedTimeseriesIndex*>(time_series_index); |
| if (aligned_time_series_index == nullptr) { |
| assert(false); |
| } |
| // Todo: when multi value index is supported in aligned time series |
| // index, we need to change the column context to |
| // VectorMeasurementColumnContext |
| SingleMeasurementColumnContext* column_context = |
| new SingleMeasurementColumnContext(tsfile_io_reader_); |
| // May no more data. just return to avoid null pointer. |
| if (RET_FAIL(column_context->init( |
| device_query_task_, time_series_index, time_filter, |
| device_query_task_->get_column_mapping()->get_column_pos( |
| time_series_index->get_measurement_name().to_std_string()), |
| pa_))) { |
| delete column_context; |
| return ret; |
| } |
| field_column_contexts_.insert(std::make_pair( |
| time_series_index->get_measurement_name().to_std_string(), |
| column_context)); |
| } else { |
| SingleMeasurementColumnContext* column_context = |
| new SingleMeasurementColumnContext(tsfile_io_reader_); |
| if (RET_FAIL(column_context->init( |
| device_query_task_, time_series_index, time_filter, |
| device_query_task_->get_column_mapping()->get_column_pos( |
| time_series_index->get_measurement_name().to_std_string()), |
| pa_))) { |
| delete column_context; |
| return ret; |
| } |
| |
| field_column_contexts_.insert(std::make_pair( |
| time_series_index->get_measurement_name().to_std_string(), |
| column_context)); |
| } |
| return ret; |
| } |
| |
| int SingleMeasurementColumnContext::init( |
| DeviceQueryTask* device_query_task, |
| const ITimeseriesIndex* time_series_index, Filter* time_filter, |
| const std::vector<int32_t>& pos_in_result, common::PageArena& pa) { |
| int ret = common::E_OK; |
| pos_in_result_ = pos_in_result; |
| column_name_ = time_series_index->get_measurement_name().to_std_string(); |
| if (RET_FAIL(tsfile_io_reader_->alloc_ssi( |
| device_query_task->get_device_id(), |
| time_series_index->get_measurement_name().to_std_string(), ssi_, pa, |
| time_filter))) { |
| } else if (RET_FAIL(get_next_tsblock(true))) { |
| } |
| return ret; |
| } |
| |
| int SingleMeasurementColumnContext::get_next_tsblock(bool alloc_mem) { |
| int ret = common::E_OK; |
| if (tsblock_ != nullptr) { |
| if (time_iter_) { |
| delete time_iter_; |
| time_iter_ = nullptr; |
| } |
| if (value_iter_) { |
| delete value_iter_; |
| value_iter_ = nullptr; |
| } |
| tsblock_->reset(); |
| } |
| if (RET_FAIL(ssi_->get_next(tsblock_, alloc_mem))) { |
| if (time_iter_) { |
| delete time_iter_; |
| time_iter_ = nullptr; |
| } |
| if (value_iter_) { |
| delete value_iter_; |
| value_iter_ = nullptr; |
| } |
| if (tsblock_) { |
| ssi_->destroy(); |
| tsblock_ = nullptr; |
| } |
| } else { |
| time_iter_ = new common::ColIterator(0, tsblock_); |
| value_iter_ = new common::ColIterator(1, tsblock_); |
| } |
| return ret; |
| } |
| |
| int SingleMeasurementColumnContext::get_current_time(int64_t& time) { |
| if (time_iter_->end()) { |
| return common::E_NO_MORE_DATA; |
| } |
| uint32_t len = 0; |
| time = *(int64_t*)(time_iter_->read(&len)); |
| return common::E_OK; |
| } |
| |
| int SingleMeasurementColumnContext::get_current_value(char*& value, |
| uint32_t& len) { |
| if (value_iter_->end()) { |
| return common::E_NO_MORE_DATA; |
| } |
| bool is_null = false; |
| value = value_iter_->read(&len, &is_null); |
| return common::E_OK; |
| } |
| |
| int SingleMeasurementColumnContext::move_iter() { |
| int ret = common::E_OK; |
| time_iter_->next(); |
| value_iter_->next(); |
| if (time_iter_->end()) { |
| if (RET_FAIL(get_next_tsblock(false))) { |
| return ret; |
| } |
| } |
| return ret; |
| } |
| |
| void SingleMeasurementColumnContext::fill_into( |
| std::vector<common::ColAppender*>& col_appenders) { |
| char* val = nullptr; |
| uint32_t len = 0; |
| if (IS_FAIL(get_current_value(val, len))) { |
| return; |
| } |
| for (int32_t pos : pos_in_result_) { |
| col_appenders[pos + 1]->add_row(); |
| if (val == nullptr) { |
| col_appenders[pos + 1]->append_null(); |
| } else { |
| col_appenders[pos + 1]->append(val, len); |
| } |
| } |
| } |
| |
| } // namespace storage |