| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/olap/block_reader.h" |
| |
| #include <gen_cpp/olap_file.pb.h> |
| #include <glog/logging.h> |
| #include <stdint.h> |
| |
| #include <algorithm> |
| #include <boost/iterator/iterator_facade.hpp> |
| #include <memory> |
| #include <ostream> |
| #include <string> |
| |
| // IWYU pragma: no_include <opentelemetry/common/threadlocal.h> |
| #include "cloud/config.h" |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/status.h" |
| #include "exprs/function_filter.h" |
| #include "olap/like_column_predicate.h" |
| #include "olap/olap_common.h" |
| #include "olap/olap_define.h" |
| #include "olap/rowset/rowset.h" |
| #include "olap/rowset/rowset_reader_context.h" |
| #include "olap/tablet.h" |
| #include "olap/tablet_schema.h" |
| #include "runtime/runtime_state.h" |
| #include "vec/aggregate_functions/aggregate_function_reader.h" |
| #include "vec/columns/column_nullable.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/data_types/data_type_number.h" |
| #include "vec/olap/vcollect_iterator.h" |
| |
| namespace doris { |
| class ColumnPredicate; |
| } // namespace doris |
| |
| namespace doris::vectorized { |
| using namespace ErrorCode; |
| |
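// Tear down the per-column aggregate state created in _init_agg_state():
// destroy each aggregate function's state, then free its raw buffer.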
| BlockReader::~BlockReader() { |
| for (int i = 0; i < _agg_functions.size(); ++i) { |
| _agg_functions[i]->destroy(_agg_places[i]); |
| delete[] _agg_places[i]; |
| } |
| } |
| |
| Status BlockReader::next_block_with_aggregation(Block* block, bool* eof) { |
| auto res = (this->*_next_block_func)(block, eof); |
| if (!config::is_cloud_mode()) { |
| if (!res.ok()) [[unlikely]] { |
| static_cast<Tablet*>(_tablet.get())->report_error(res); |
| } |
| } |
| return res; |
| } |
| |
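// Returns true when the rowsets cannot be read as a single monotonically
// ascending, key-disjoint sequence: overlapping segments within a rowset, a
// missing first key, or a first key that is not strictly greater than the
// previous rowset's last key all force the merging read path.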
| bool BlockReader::_rowsets_not_mono_asc_disjoint(const ReaderParams& read_params) { |
| std::string pre_rs_last_key; |
| bool pre_rs_key_bounds_truncated {false}; |
| const std::vector<RowSetSplits>& rs_splits = read_params.rs_splits; |
| for (const auto& rs_split : rs_splits) { |
| if (rs_split.rs_reader->rowset()->num_rows() == 0) { |
| continue; |
| } |
| if (rs_split.rs_reader->rowset()->is_segments_overlapping()) { |
| return true; |
| } |
| std::string rs_first_key; |
| bool has_first_key = rs_split.rs_reader->rowset()->first_key(&rs_first_key); |
| if (!has_first_key) { |
| return true; |
| } |
| bool cur_rs_key_bounds_truncated { |
| rs_split.rs_reader->rowset()->is_segments_key_bounds_truncated()}; |
| if (!Slice::lhs_is_strictly_less_than_rhs(Slice {pre_rs_last_key}, |
| pre_rs_key_bounds_truncated, Slice {rs_first_key}, |
| cur_rs_key_bounds_truncated)) { |
| return true; |
| } |
| bool has_last_key = rs_split.rs_reader->rowset()->last_key(&pre_rs_last_key); |
| pre_rs_key_bounds_truncated = cur_rs_key_bounds_truncated; |
| CHECK(has_last_key); |
| } |
| return false; |
| } |
| |
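// Capture the rowset readers, decide whether a merge is needed, initialize
// each reader (unless topn_next() manages that itself), and build the heap of
// child iterators; finally position _next_row on the first row when possible.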
| Status BlockReader::_init_collect_iter(const ReaderParams& read_params) { |
| auto res = _capture_rs_readers(read_params); |
| if (!res.ok()) { |
        LOG(WARNING) << "failed to init reader in _capture_rs_readers, res:" << res
| << ", tablet_id:" << read_params.tablet->tablet_id() |
| << ", schema_hash:" << read_params.tablet->schema_hash() |
| << ", reader_type:" << int(read_params.reader_type) |
| << ", version:" << read_params.version; |
| return res; |
| } |
    // Check whether the rowsets are non-overlapping; this decides the merge strategy.
| { |
| SCOPED_RAW_TIMER(&_stats.block_reader_vcollect_iter_init_timer_ns); |
| _is_rowsets_overlapping = _rowsets_not_mono_asc_disjoint(read_params); |
| _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key, |
| read_params.read_orderby_key_reverse); |
| } |
| |
| std::vector<RowsetReaderSharedPtr> valid_rs_readers; |
| RuntimeState* runtime_state = read_params.runtime_state; |
| |
| { |
| SCOPED_RAW_TIMER(&_stats.block_reader_rs_readers_init_timer_ns); |
| for (int i = 0; i < read_params.rs_splits.size(); ++i) { |
| if (runtime_state != nullptr && runtime_state->is_cancelled()) { |
| return runtime_state->cancel_reason(); |
| } |
| |
| auto& rs_split = read_params.rs_splits[i]; |
| |
            // _vcollect_iter.topn_next() will initialize the rs_reader by itself
| if (!_vcollect_iter.use_topn_next()) { |
| RETURN_IF_ERROR(rs_split.rs_reader->init(&_reader_context, rs_split)); |
| } |
| |
| Status res = _vcollect_iter.add_child(rs_split); |
| if (!res.ok() && !res.is<END_OF_FILE>()) { |
| LOG(WARNING) << "failed to add child to iterator, err=" << res; |
| return res; |
| } |
| if (res.ok()) { |
| valid_rs_readers.push_back(rs_split.rs_reader); |
| } |
| } |
| } |
| { |
| SCOPED_RAW_TIMER(&_stats.block_reader_build_heap_init_timer_ns); |
| RETURN_IF_ERROR(_vcollect_iter.build_heap(valid_rs_readers)); |
        // _vcollect_iter.topn_next() cannot use current_row
| if (!_vcollect_iter.use_topn_next()) { |
| auto status = _vcollect_iter.current_row(&_next_row); |
| _eof = status.is<END_OF_FILE>(); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
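// AGG_KEYS setup: allocate a staging block for buffered rows and, for each
// aggregate column, create the reader-suffix aggregate function plus a raw
// state buffer sized by size_of_data().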
| Status BlockReader::_init_agg_state(const ReaderParams& read_params) { |
| if (_eof) { |
| return Status::OK(); |
| } |
| |
| _stored_data_columns = |
| _next_row.block->create_same_struct_block(_reader_context.batch_size)->mutate_columns(); |
| |
| _stored_has_null_tag.resize(_stored_data_columns.size()); |
| _stored_has_variable_length_tag.resize(_stored_data_columns.size()); |
| |
| auto& tablet_schema = *_tablet_schema; |
| for (auto idx : _agg_columns_idx) { |
| auto column = tablet_schema.column( |
| read_params.origin_return_columns->at(_return_columns_loc[idx])); |
| AggregateFunctionPtr function = column.get_aggregate_function( |
| vectorized::AGG_READER_SUFFIX, read_params.get_be_exec_version()); |
| |
        // Avoid a coredump when something goes wrong (e.g. a column mismatch).
| if (!function) { |
            return Status::InternalError(
                    "Failed to init reader while initializing agg state: "
| "tablet_id: {}, schema_hash: {}, reader_type: {}, version: {}", |
| read_params.tablet->tablet_id(), read_params.tablet->schema_hash(), |
| int(read_params.reader_type), read_params.version.to_string()); |
| } |
| _agg_functions.push_back(function); |
| // create aggregate data |
| AggregateDataPtr place = new char[function->size_of_data()]; |
| SAFE_CREATE(function->create(place), { |
| _agg_functions.pop_back(); |
| delete[] place; |
| }); |
| _agg_places.push_back(place); |
| |
        // Calculate the variable-length tag for types like string, array, and map.
| _stored_has_variable_length_tag[idx] = _stored_data_columns[idx]->is_variable_length(); |
| } |
| |
| return Status::OK(); |
| } |
| |
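// Top-level reader setup: build the return-column position mappings, set up
// the collect iterator, then choose the next-block implementation based on
// the tablet's keys type.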
| Status BlockReader::init(const ReaderParams& read_params) { |
| RETURN_IF_ERROR(TabletReader::init(read_params)); |
| |
| int32_t return_column_size = read_params.origin_return_columns->size(); |
    // _return_columns_loc is a mapping that records, for each cid in
    // _return_columns, its position in _origin_return_columns.
| _return_columns_loc.resize(read_params.return_columns.size()); |
| for (int i = 0; i < return_column_size; ++i) { |
| auto cid = read_params.origin_return_columns->at(i); |
| // For each original cid, find the index in return_columns |
| for (int j = 0; j < read_params.return_columns.size(); ++j) { |
| if (read_params.return_columns[j] == cid) { |
| if (j < _tablet->num_key_columns() || _tablet->keys_type() != AGG_KEYS) { |
| _normal_columns_idx.emplace_back(j); |
| } else { |
| _agg_columns_idx.emplace_back(j); |
| } |
| _return_columns_loc[j] = i; |
| break; |
| } |
| } |
| } |
| /* |
| where abs() |
| */ |
| auto status = _init_collect_iter(read_params); |
| if (!status.ok()) [[unlikely]] { |
| if (!config::is_cloud_mode()) { |
| static_cast<Tablet*>(_tablet.get())->report_error(status); |
| } |
| return status; |
| } |
| |
| if (_direct_mode) { |
| _next_block_func = &BlockReader::_direct_next_block; |
| return Status::OK(); |
| } |
| |
| switch (tablet()->keys_type()) { |
    // DUP_KEYS keeps duplicate rows as-is, so it needs no merge and reads blocks directly.
| case KeysType::DUP_KEYS: |
| _next_block_func = &BlockReader::_direct_next_block; |
| break; |
| case KeysType::UNIQUE_KEYS: |
| if (read_params.reader_type == ReaderType::READER_QUERY && |
| _reader_context.enable_unique_key_merge_on_write) { |
| _next_block_func = &BlockReader::_direct_next_block; |
| } else { |
| _next_block_func = &BlockReader::_unique_key_next_block; |
| if (_filter_delete) { |
| _delete_filter_column = ColumnUInt8::create(); |
| } |
| } |
| break; |
| case KeysType::AGG_KEYS: |
| _next_block_func = &BlockReader::_agg_key_next_block; |
| RETURN_IF_ERROR(_init_agg_state(read_params)); |
| break; |
| default: |
| DCHECK(false) << "No next row function for type:" << tablet()->keys_type(); |
| break; |
| } |
| |
| return Status::OK(); |
| } |
| |
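// Direct read path: no merge or aggregation; fetch the next block straight
// from the collect iterator and optionally record row locations.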
| Status BlockReader::_direct_next_block(Block* block, bool* eof) { |
| auto res = _vcollect_iter.next(block); |
| if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) { |
| return res; |
| } |
| *eof = res.is<END_OF_FILE>(); |
| _eof = *eof; |
| if (UNLIKELY(_reader_context.record_rowids)) { |
| res = _vcollect_iter.current_block_row_locations(&_block_row_locations); |
        if (UNLIKELY(!res.ok() && !res.is<END_OF_FILE>())) {
| return res; |
| } |
| DCHECK_EQ(_block_row_locations.size(), block->rows()); |
| } |
| return Status::OK(); |
| } |
| |
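// Currently a stub: returns OK without producing any rows.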
| Status BlockReader::_direct_agg_key_next_block(Block* block, bool* eof) { |
| return Status::OK(); |
| } |
| |
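// AGG_KEYS read path: emit one output row per distinct key. Consecutive rows
// that share a key are buffered through _append_agg_data() and merged;
// _agg_data_counters tracks how many source rows feed each output row.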
| Status BlockReader::_agg_key_next_block(Block* block, bool* eof) { |
| if (UNLIKELY(_eof)) { |
| *eof = true; |
| return Status::OK(); |
| } |
| |
| auto target_block_row = 0; |
| auto merged_row = 0; |
| auto target_columns = block->mutate_columns(); |
| RETURN_IF_ERROR(_insert_data_normal(target_columns)); |
| target_block_row++; |
| _append_agg_data(target_columns); |
| /* |
| colK, cloA |
| select colA from tbl where abs(colA) > 10; |
| |
| block: colA |
| |
| */ |
| while (true) { |
| auto res = _vcollect_iter.next(&_next_row); |
| if (UNLIKELY(res.is<END_OF_FILE>())) { |
| _eof = true; |
| *eof = true; |
| break; |
| } |
| if (UNLIKELY(!res.ok())) { |
| LOG(WARNING) << "next failed: " << res; |
| return res; |
| } |
| |
| if (!_get_next_row_same()) { |
| if (target_block_row == _reader_context.batch_size) { |
| break; |
| } |
| _agg_data_counters.push_back(_last_agg_data_counter); |
| _last_agg_data_counter = 0; |
| |
| RETURN_IF_ERROR(_insert_data_normal(target_columns)); |
| |
| target_block_row++; |
| } else { |
| merged_row++; |
| } |
| |
| _append_agg_data(target_columns); |
| } |
| |
| _agg_data_counters.push_back(_last_agg_data_counter); |
| _last_agg_data_counter = 0; |
| _update_agg_data(target_columns); |
| block->set_columns(std::move(target_columns)); |
| |
| _merged_rows += merged_row; |
| return Status::OK(); |
| } |
| |
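// UNIQUE_KEYS read path: the collect iterator yields the highest version of
// each key first, so every returned row is inserted directly; when
// _delete_sign_available, rows marked by the delete sign are filtered out
// afterwards.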
| Status BlockReader::_unique_key_next_block(Block* block, bool* eof) { |
| if (UNLIKELY(_eof)) { |
| *eof = true; |
| return Status::OK(); |
| } |
| |
| auto target_block_row = 0; |
| auto target_columns = block->mutate_columns(); |
| if (UNLIKELY(_reader_context.record_rowids)) { |
| _block_row_locations.resize(_reader_context.batch_size); |
| } |
| |
| do { |
| RETURN_IF_ERROR(_insert_data_normal(target_columns)); |
| |
| if (UNLIKELY(_reader_context.record_rowids)) { |
| _block_row_locations[target_block_row] = _vcollect_iter.current_row_location(); |
| } |
| target_block_row++; |
| |
        // Versions are returned in reverse order, so the first row carries the
        // highest version. For UNIQUE_KEYS the highest version is the final
        // result, so there is no need to merge the lower versions.
| auto res = _vcollect_iter.next(&_next_row); |
| if (UNLIKELY(res.is<END_OF_FILE>())) { |
| _eof = true; |
| *eof = true; |
| if (UNLIKELY(_reader_context.record_rowids)) { |
| _block_row_locations.resize(target_block_row); |
| } |
| break; |
| } |
| |
| if (UNLIKELY(!res.ok())) { |
| LOG(WARNING) << "next failed: " << res; |
| return res; |
| } |
| } while (target_block_row < _reader_context.batch_size); |
| |
| if (_delete_sign_available) { |
| int delete_sign_idx = _reader_context.tablet_schema->field_index(DELETE_SIGN); |
| DCHECK(delete_sign_idx > 0); |
| if (delete_sign_idx <= 0 || delete_sign_idx >= target_columns.size()) { |
            LOG(WARNING) << "tablet_id: " << tablet()->tablet_id() << " delete sign idx "
                         << delete_sign_idx
                         << " is invalid, skip filtering deletes in base compaction";
| return Status::OK(); |
| } |
| MutableColumnPtr delete_filter_column = (*std::move(_delete_filter_column)).mutate(); |
| reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->resize(target_block_row); |
| |
| auto* __restrict filter_data = |
| reinterpret_cast<ColumnUInt8*>(delete_filter_column.get())->get_data().data(); |
| auto* __restrict delete_data = |
| reinterpret_cast<ColumnInt8*>(target_columns[delete_sign_idx].get()) |
| ->get_data() |
| .data(); |
| int delete_count = 0; |
| for (int i = 0; i < target_block_row; ++i) { |
| bool sign = (delete_data[i] == 0); |
| filter_data[i] = sign; |
| if (UNLIKELY(!sign)) { |
| if (UNLIKELY(_reader_context.record_rowids)) { |
| _block_row_locations[i].row_id = -1; |
| delete_count++; |
| } |
| } |
| } |
| auto target_columns_size = target_columns.size(); |
| ColumnWithTypeAndName column_with_type_and_name {_delete_filter_column, |
| std::make_shared<DataTypeUInt8>(), |
| "__DORIS_COMPACTION_FILTER__"}; |
| block->set_columns(std::move(target_columns)); |
| block->insert(column_with_type_and_name); |
| RETURN_IF_ERROR(Block::filter_block(block, target_columns_size, target_columns_size)); |
| _stats.rows_del_filtered += target_block_row - block->rows(); |
| DCHECK(block->try_get_by_name("__DORIS_COMPACTION_FILTER__") == nullptr); |
| if (UNLIKELY(_reader_context.record_rowids)) { |
| DCHECK_EQ(_block_row_locations.size(), block->rows() + delete_count); |
| } |
| } else { |
| block->set_columns(std::move(target_columns)); |
| } |
| return Status::OK(); |
| } |
| |
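// Copy the normal (non-aggregate) columns of the current row into the output
// columns, mapping each source position through _return_columns_loc.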
| Status BlockReader::_insert_data_normal(MutableColumns& columns) { |
| auto block = _next_row.block.get(); |
| |
| RETURN_IF_CATCH_EXCEPTION({ |
| for (auto idx : _normal_columns_idx) { |
| columns[_return_columns_loc[idx]]->insert_from(*block->get_by_position(idx).column, |
| _next_row.row_pos); |
| } |
| }); |
| return Status::OK(); |
| } |
| |
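// Buffer a reference to the current row. Flush early when the row is the
// last one of its source block (the reference would become invalid) or when
// batch_size references have accumulated.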
| void BlockReader::_append_agg_data(MutableColumns& columns) { |
| _stored_row_ref.push_back(_next_row); |
| _last_agg_data_counter++; |
| |
    // Execute the aggregation once `batch_size` rows are buffered or the
    // buffered row refs are about to become invalid.
| bool is_last = (_next_row.block->rows() == _next_row.row_pos + 1); |
| if (is_last || _stored_row_ref.size() == _reader_context.batch_size) { |
| _update_agg_data(columns); |
| } |
| } |
| |
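// Flush the buffered references: copy them into the staging columns, run the
// aggregates over every completed key group, and leave the trailing group
// open since its key may continue into the next batch.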
| void BlockReader::_update_agg_data(MutableColumns& columns) { |
| // copy data to stored block |
| size_t copy_size = _copy_agg_data(); |
| |
| // calculate has_null_tag |
| for (auto idx : _agg_columns_idx) { |
| _stored_has_null_tag[idx] = _stored_data_columns[idx]->has_null(copy_size); |
| } |
| |
    // Aggregate each completed key group and insert the results.
| int counter_sum = 0; |
| for (int counter : _agg_data_counters) { |
| _update_agg_value(columns, counter_sum, counter_sum + counter - 1); |
| counter_sum += counter; |
| } |
| |
    // The last key may continue in the next block, so do not insert its result yet.
| if (_last_agg_data_counter) { |
| _update_agg_value(columns, counter_sum, counter_sum + _last_agg_data_counter - 1, false); |
| _last_agg_data_counter = 0; |
| } |
| |
| _agg_data_counters.clear(); |
| } |
| |
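// Materialize buffered references into _stored_data_columns. Fixed-length
// columns are overwritten in place via replace_column_data(), grouped by
// source block; variable-length columns are cleared and rebuilt in order.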
| size_t BlockReader::_copy_agg_data() { |
| size_t copy_size = _stored_row_ref.size(); |
| |
| for (size_t i = 0; i < copy_size; i++) { |
| auto& ref = _stored_row_ref[i]; |
| _temp_ref_map[ref.block.get()].emplace_back(ref.row_pos, i); |
| } |
| |
| for (auto idx : _agg_columns_idx) { |
| auto& dst_column = _stored_data_columns[idx]; |
| if (_stored_has_variable_length_tag[idx]) { |
            // Variable-length types must be rebuilt in insertion order.
| dst_column->clear(); |
| for (size_t i = 0; i < copy_size; i++) { |
| auto& ref = _stored_row_ref[i]; |
| dst_column->insert_from(*ref.block->get_by_position(idx).column, ref.row_pos); |
| } |
| } else { |
| for (auto& it : _temp_ref_map) { |
| if (!it.second.empty()) { |
| auto& src_column = *it.first->get_by_position(idx).column; |
| for (auto& pos : it.second) { |
| dst_column->replace_column_data(src_column, pos.first, pos.second); |
| } |
| } |
| } |
| } |
| } |
| |
| for (auto& it : _temp_ref_map) { |
| it.second.clear(); |
| } |
| _stored_row_ref.clear(); |
| |
| return copy_size; |
| } |
| |
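// Apply each aggregate function to rows [begin, end] of the staging columns.
// When is_close is true the key group is complete: emit the result into the
// output column and reset the aggregate state.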
| void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end, bool is_close) { |
| for (int i = 0; i < _agg_columns_idx.size(); i++) { |
| auto idx = _agg_columns_idx[i]; |
| |
| AggregateFunctionPtr function = _agg_functions[i]; |
| AggregateDataPtr place = _agg_places[i]; |
| auto* column_ptr = _stored_data_columns[idx].get(); |
| |
| if (begin <= end) { |
| function->add_batch_range(begin, end, place, const_cast<const IColumn**>(&column_ptr), |
| &_arena, _stored_has_null_tag[idx]); |
| } |
| |
| if (is_close) { |
| function->insert_result_into(place, *columns[_return_columns_loc[idx]]); |
| // reset aggregate data |
| function->reset(place); |
| } |
| } |
| if (is_close) { |
| _arena.clear(); |
| } |
| } |
| |
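// Whether _next_row shares its key with the previously read row: either the
// row reference is flagged directly, or the block's same-bit for that row
// position is set.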
| bool BlockReader::_get_next_row_same() { |
| if (_next_row.is_same) { |
| return true; |
| } else { |
| auto* block = _next_row.block.get(); |
| return block->get_same_bit(_next_row.row_pos); |
| } |
| } |
| |
| } // namespace doris::vectorized |