| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/exprs/vexpr_context.h" |
| |
#include <algorithm>
#include <cstdint>
#include <cstring>
#include <exception>
#include <numeric>
#include <string>
| |
| #include "common/compiler_util.h" // IWYU pragma: keep |
| #include "common/exception.h" |
| #include "common/status.h" |
| #include "olap/olap_common.h" |
| #include "olap/rowset/segment_v2/column_reader.h" |
| #include "runtime/runtime_state.h" |
| #include "runtime/thread_context.h" |
| #include "udf/udf.h" |
| #include "util/simd/bits.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_const.h" |
| #include "vec/core/column_numbers.h" |
| #include "vec/core/column_with_type_and_name.h" |
| #include "vec/core/columns_with_type_and_name.h" |
| #include "vec/exprs/vexpr.h" |
| |
| namespace doris { |
| class RowDescriptor; |
| } // namespace doris |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_begin.h" |
| |
| VExprContext::~VExprContext() { |
| // In runtime filter, only create expr context to get expr root, will not call |
| // prepare or open, so that it is not need to call close. And call close may core |
| // because the function context in expr is not set. |
| if (!_prepared || !_opened) { |
| return; |
| } |
| try { |
| close(); |
| } catch (const Exception& e) { |
| LOG(WARNING) << "Exception occurs when expr context deconstruct: " << e.to_string(); |
| } |
| } |
| |
| Status VExprContext::execute(vectorized::Block* block, int* result_column_id) { |
| Status st; |
| RETURN_IF_CATCH_EXCEPTION({ |
| st = _root->execute(this, block, result_column_id); |
| _last_result_column_id = *result_column_id; |
| // We should first check the status, as some expressions might incorrectly set result_column_id, even if the st is not ok. |
| if (st.ok() && _last_result_column_id != -1) { |
| block->get_by_position(*result_column_id).column->sanity_check(); |
| RETURN_IF_ERROR( |
| block->get_by_position(*result_column_id).check_type_and_column_match()); |
| } |
| }); |
| return st; |
| } |
| |
| Status VExprContext::execute(const Block* block, ColumnPtr& result_column) { |
| Status st; |
| RETURN_IF_CATCH_EXCEPTION( |
| { st = _root->execute_column(this, block, block->rows(), result_column); }); |
| return st; |
| } |
| |
| Status VExprContext::execute(const Block* block, ColumnWithTypeAndName& result_data) { |
| Status st; |
| ColumnPtr result_column; |
| RETURN_IF_CATCH_EXCEPTION( |
| { st = _root->execute_column(this, block, block->rows(), result_column); }); |
| RETURN_IF_ERROR(st); |
| result_data.column = result_column; |
| result_data.type = execute_type(block); |
| result_data.name = _root->expr_name(); |
| return Status::OK(); |
| } |
| |
| DataTypePtr VExprContext::execute_type(const Block* block) { |
| return _root->execute_type(block); |
| } |
| |
| Status VExprContext::execute_const_expr(ColumnWithTypeAndName& result) { |
| Status st; |
| RETURN_IF_CATCH_EXCEPTION({ st = _root->execute_column(this, nullptr, 1, result.column); }); |
| RETURN_IF_ERROR(st); |
| result.type = _root->execute_type(nullptr); |
| result.name = _root->expr_name(); |
| return Status::OK(); |
| } |
| |
| [[nodiscard]] const std::string& VExprContext::expr_name() const { |
| return _root->expr_name(); |
| } |
| |
| bool VExprContext::is_blockable() const { |
| return _root->is_blockable(); |
| } |
| |
| Status VExprContext::prepare(RuntimeState* state, const RowDescriptor& row_desc) { |
| _prepared = true; |
| Status st; |
| RETURN_IF_CATCH_EXCEPTION({ st = _root->prepare(state, row_desc, this); }); |
| return st; |
| } |
| |
| Status VExprContext::open(RuntimeState* state) { |
| DCHECK(_prepared); |
| if (_opened) { |
| return Status::OK(); |
| } |
| _opened = true; |
| // Fragment-local state is only initialized for original contexts. Clones inherit the |
| // original's fragment state and only need to have thread-local state initialized. |
| FunctionContext::FunctionStateScope scope = |
| _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; |
| Status st; |
| RETURN_IF_CATCH_EXCEPTION({ st = _root->open(state, this, scope); }); |
| return st; |
| } |
| |
| void VExprContext::close() { |
| // Sometimes expr context may not have a root, then it need not call close |
| if (_root == nullptr) { |
| return; |
| } |
| FunctionContext::FunctionStateScope scope = |
| _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; |
| _root->close(this, scope); |
| } |
| |
| Status VExprContext::clone(RuntimeState* state, VExprContextSPtr& new_ctx) { |
| DCHECK(_prepared) << "expr context not prepared"; |
| DCHECK(_opened); |
| DCHECK(new_ctx.get() == nullptr); |
| |
| new_ctx = std::make_shared<VExprContext>(_root); |
| for (auto& _fn_context : _fn_contexts) { |
| new_ctx->_fn_contexts.push_back(_fn_context->clone()); |
| } |
| |
| new_ctx->_is_clone = true; |
| new_ctx->_prepared = true; |
| new_ctx->_opened = true; |
| // segment_v2::AnnRangeSearchRuntime should be cloned as well. |
| // The object of segment_v2::AnnRangeSearchRuntime is not shared by threads. |
| new_ctx->_ann_range_search_runtime = this->_ann_range_search_runtime; |
| |
| return _root->open(state, new_ctx.get(), FunctionContext::THREAD_LOCAL); |
| } |
| |
| void VExprContext::clone_fn_contexts(VExprContext* other) { |
| for (auto& _fn_context : _fn_contexts) { |
| other->_fn_contexts.push_back(_fn_context->clone()); |
| } |
| } |
| |
| int VExprContext::register_function_context(RuntimeState* state, const DataTypePtr& return_type, |
| const std::vector<DataTypePtr>& arg_types) { |
| _fn_contexts.push_back(FunctionContext::create_context(state, return_type, arg_types)); |
| _fn_contexts.back()->set_check_overflow_for_decimal(state->check_overflow_for_decimal()); |
| _fn_contexts.back()->set_enable_strict_mode(state->enable_strict_mode()); |
| return static_cast<int>(_fn_contexts.size()) - 1; |
| } |
| |
| Status VExprContext::evaluate_inverted_index(uint32_t segment_num_rows) { |
| Status st; |
| RETURN_IF_CATCH_EXCEPTION({ st = _root->evaluate_inverted_index(this, segment_num_rows); }); |
| return st; |
| } |
| |
| bool VExprContext::all_expr_inverted_index_evaluated() { |
| return _index_context->has_index_result_for_expr(_root.get()); |
| } |
| |
| Status VExprContext::filter_block(VExprContext* vexpr_ctx, Block* block) { |
| if (vexpr_ctx == nullptr || block->rows() == 0) { |
| return Status::OK(); |
| } |
| ColumnPtr filter_column; |
| RETURN_IF_ERROR(vexpr_ctx->execute(block, filter_column)); |
| size_t filter_column_id = block->columns(); |
| block->insert({filter_column, vexpr_ctx->execute_type(block), "filter_column"}); |
| vexpr_ctx->_memory_usage = filter_column->allocated_bytes(); |
| return Block::filter_block(block, filter_column_id, filter_column_id); |
| } |
| |
| Status VExprContext::filter_block(const VExprContextSPtrs& expr_contexts, Block* block, |
| size_t column_to_keep) { |
| if (expr_contexts.empty() || block->rows() == 0) { |
| return Status::OK(); |
| } |
| |
| ColumnNumbers columns_to_filter(column_to_keep); |
| std::iota(columns_to_filter.begin(), columns_to_filter.end(), 0); |
| |
| return execute_conjuncts_and_filter_block(expr_contexts, block, columns_to_filter, |
| static_cast<int>(column_to_keep)); |
| } |
| |
| Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs, |
| const std::vector<IColumn::Filter*>* filters, Block* block, |
| IColumn::Filter* result_filter, bool* can_filter_all) { |
| return execute_conjuncts(ctxs, filters, false, block, result_filter, can_filter_all); |
| } |
| |
// TODO: Performance Optimization
// Evaluates every conjunct in `ctxs` against `block` and ANDs each per-row result into
// `result_filter`, which must already hold block->rows() entries. Entries that are 0 on entry
// stay 0. Afterwards the optional extra `filters` are ANDed in as well.
//
// For a nullable predicate result, a NULL row passes when `accept_null` is true and is rejected
// otherwise.
//
// `*can_filter_all` starts as false. It is set to true (and the function returns early) as soon
// as no row can survive. That happens on:
// - an empty nullable result;
// - a constant result whose get_bool(0) is false (the filter is then zeroed explicitly);
// - an accumulated filter with no 0x1 byte left.
// Early returns leave `result_filter` holding whatever had been accumulated up to that point.
//
// Runtime-filter wrapper exprs additionally report (filtered rows, input rows) to
// do_judge_selectivity(); the surviving-row counts are computed only for those exprs.
Status VExprContext::execute_conjuncts(const VExprContextSPtrs& ctxs,
                                       const std::vector<IColumn::Filter*>* filters,
                                       bool accept_null, const Block* block,
                                       IColumn::Filter* result_filter, bool* can_filter_all) {
    size_t rows = block->rows();
    DCHECK_EQ(result_filter->size(), rows);
    *can_filter_all = false;
    auto* __restrict result_filter_data = result_filter->data();
    for (const auto& ctx : ctxs) {
        // Statistics are only required when an rf wrapper exists in the expr.
        bool is_rf_wrapper = ctx->root()->is_rf_wrapper();
        ColumnPtr filter_column;
        RETURN_IF_ERROR(ctx->execute(block, filter_column));
        if (const auto* nullable_column = check_and_get_column<ColumnNullable>(*filter_column)) {
            size_t column_size = nullable_column->size();
            // NOTE(review): presumably only reached when the block has no rows.
            if (column_size == 0) {
                *can_filter_all = true;
                return Status::OK();
            } else {
                const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr();
                const IColumn::Filter& filter =
                        assert_cast<const ColumnUInt8&>(*nested_column).get_data();
                const auto* __restrict filter_data = filter.data();
                const auto* __restrict null_map_data = nullable_column->get_null_map_data().data();

                // Rows still passing before this predicate (only counted for rf wrappers).
                size_t input_rows =
                        rows - (is_rf_wrapper
                                        ? simd::count_zero_num((int8_t*)result_filter_data, rows)
                                        : 0);

                if (accept_null) {
                    // NULL rows pass; non-NULL rows pass when the predicate is true.
                    for (size_t i = 0; i < rows; ++i) {
                        result_filter_data[i] &= (null_map_data[i]) || filter_data[i];
                    }
                } else {
                    // NULL rows are rejected; non-NULL rows pass when the predicate is true.
                    for (size_t i = 0; i < rows; ++i) {
                        result_filter_data[i] &= (!null_map_data[i]) & filter_data[i];
                    }
                }

                // Rows still passing after this predicate (only counted for rf wrappers).
                size_t output_rows =
                        rows - (is_rf_wrapper
                                        ? simd::count_zero_num((int8_t*)result_filter_data, rows)
                                        : 0);

                if (is_rf_wrapper) {
                    ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows);
                }

                // rf wrappers already know the surviving count; other exprs look for any 0x1
                // byte (filter entries are presumably 0/1).
                if ((is_rf_wrapper && output_rows == 0) ||
                    (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) {
                    *can_filter_all = true;
                    return Status::OK();
                }
            }
        } else if (const auto* const_column = check_and_get_column<ColumnConst>(*filter_column)) {
            // filter all
            // NOTE(review): this branch does not consult `accept_null`. If a constant NULL
            // makes get_bool(0) return false, every row is filtered even when NULLs should be
            // accepted, unlike the non-constant nullable branch above. Confirm this is intended.
            if (!const_column->get_bool(0)) {
                *can_filter_all = true;
                memset(result_filter_data, 0, result_filter->size());
                return Status::OK();
            }
        } else {
            // Non-nullable, non-constant result: expected to be a UInt8 column.
            const IColumn::Filter& filter =
                    assert_cast<const ColumnUInt8&>(*filter_column).get_data();
            const auto* __restrict filter_data = filter.data();

            size_t input_rows =
                    rows -
                    (is_rf_wrapper ? simd::count_zero_num((int8_t*)result_filter_data, rows) : 0);

            for (size_t i = 0; i < rows; ++i) {
                result_filter_data[i] &= filter_data[i];
            }

            size_t output_rows =
                    rows -
                    (is_rf_wrapper ? simd::count_zero_num((int8_t*)result_filter_data, rows) : 0);

            if (is_rf_wrapper) {
                ctx->root()->do_judge_selectivity(input_rows - output_rows, input_rows);
            }

            if ((is_rf_wrapper && output_rows == 0) ||
                (!is_rf_wrapper && memchr(result_filter_data, 0x1, rows) == nullptr)) {
                *can_filter_all = true;
                return Status::OK();
            }
        }
    }
    // Extra, precomputed filters are ANDed in after all conjuncts.
    // NOTE(review): iterates filter->size() entries, so each extra filter is assumed to hold
    // exactly `rows` entries; a longer one would write past result_filter.
    if (filters != nullptr) {
        for (auto* filter : *filters) {
            auto* __restrict filter_data = filter->data();
            const size_t size = filter->size();
            for (size_t i = 0; i < size; ++i) {
                result_filter_data[i] &= filter_data[i];
            }
            if (memchr(result_filter_data, 0x1, size) == nullptr) {
                *can_filter_all = true;
                return Status::OK();
            }
        }
    }
    return Status::OK();
}
| |
| Status VExprContext::execute_conjuncts(const VExprContextSPtrs& conjuncts, const Block* block, |
| ColumnUInt8& null_map, IColumn::Filter& filter) { |
| const auto& rows = block->rows(); |
| if (rows == 0) { |
| return Status::OK(); |
| } |
| if (null_map.size() != rows) { |
| return Status::InternalError("null_map.size()!=rows, null_map.size()={}, rows={}", |
| null_map.size(), rows); |
| } |
| |
| auto* final_null_map = null_map.get_data().data(); |
| auto* final_filter_ptr = filter.data(); |
| |
| for (const auto& conjunct : conjuncts) { |
| ColumnPtr result_column; |
| RETURN_IF_ERROR(conjunct->execute(block, result_column)); |
| auto [filter_column, is_const] = unpack_if_const(result_column); |
| const auto* nullable_column = assert_cast<const ColumnNullable*>(filter_column.get()); |
| if (!is_const) { |
| const ColumnPtr& nested_column = nullable_column->get_nested_column_ptr(); |
| const IColumn::Filter& result = |
| assert_cast<const ColumnUInt8&>(*nested_column).get_data(); |
| const auto* __restrict filter_data = result.data(); |
| const auto* __restrict null_map_data = nullable_column->get_null_map_data().data(); |
| DCHECK_EQ(rows, nullable_column->size()); |
| |
| for (size_t i = 0; i != rows; ++i) { |
| // null and null => null |
| // null and true => null |
| // null and false => false |
| final_null_map[i] = (final_null_map[i] & (null_map_data[i] | filter_data[i])) | |
| (null_map_data[i] & (final_null_map[i] | final_filter_ptr[i])); |
| final_filter_ptr[i] = final_filter_ptr[i] & filter_data[i]; |
| } |
| } else { |
| bool filter_data = nullable_column->get_bool(0); |
| bool null_map_data = nullable_column->is_null_at(0); |
| for (size_t i = 0; i != rows; ++i) { |
| // null and null => null |
| // null and true => null |
| // null and false => false |
| final_null_map[i] = (final_null_map[i] & (null_map_data | filter_data)) | |
| (null_map_data & (final_null_map[i] | final_filter_ptr[i])); |
| final_filter_ptr[i] = final_filter_ptr[i] & filter_data; |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| // TODO Performance Optimization |
| // need exception safety |
| Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs& ctxs, Block* block, |
| std::vector<uint32_t>& columns_to_filter, |
| int column_to_keep) { |
| IColumn::Filter result_filter(block->rows(), 1); |
| bool can_filter_all; |
| |
| _reset_memory_usage(ctxs); |
| |
| RETURN_IF_ERROR( |
| execute_conjuncts(ctxs, nullptr, false, block, &result_filter, &can_filter_all)); |
| |
| // Accumulate the usage of `result_filter` into the first context. |
| if (!ctxs.empty()) { |
| ctxs[0]->_memory_usage += result_filter.allocated_bytes(); |
| } |
| if (can_filter_all) { |
| for (auto& col : columns_to_filter) { |
| block->get_by_position(col).column->assume_mutable()->clear(); |
| } |
| } else { |
| try { |
| Block::filter_block_internal(block, columns_to_filter, result_filter); |
| } catch (const Exception& e) { |
| std::string str; |
| for (auto ctx : ctxs) { |
| if (str.length()) { |
| str += ","; |
| } |
| str += ctx->root()->debug_string(); |
| } |
| |
| return Status::InternalError( |
| "filter_block_internal meet exception, exprs=[{}], exception={}", str, |
| e.what()); |
| } |
| } |
| Block::erase_useless_column(block, column_to_keep); |
| return Status::OK(); |
| } |
| |
| Status VExprContext::execute_conjuncts_and_filter_block(const VExprContextSPtrs& ctxs, Block* block, |
| std::vector<uint32_t>& columns_to_filter, |
| int column_to_keep, |
| IColumn::Filter& filter) { |
| _reset_memory_usage(ctxs); |
| filter.resize_fill(block->rows(), 1); |
| bool can_filter_all; |
| RETURN_IF_ERROR(execute_conjuncts(ctxs, nullptr, false, block, &filter, &can_filter_all)); |
| |
| // Accumulate the usage of `result_filter` into the first context. |
| if (!ctxs.empty()) { |
| ctxs[0]->_memory_usage += filter.allocated_bytes(); |
| } |
| if (can_filter_all) { |
| for (auto& col : columns_to_filter) { |
| // NOLINTNEXTLINE(performance-move-const-arg) |
| std::move(*block->get_by_position(col).column).assume_mutable()->clear(); |
| } |
| } else { |
| RETURN_IF_CATCH_EXCEPTION(Block::filter_block_internal(block, columns_to_filter, filter)); |
| } |
| |
| Block::erase_useless_column(block, column_to_keep); |
| return Status::OK(); |
| } |
| |
| // do_projection: for some query(e.g. in MultiCastDataStreamerSourceOperator::get_block()), |
| // output_vexpr_ctxs will output the same column more than once, and if the output_block |
| // is mem-reused later, it will trigger DCHECK_EQ(d.column->use_count(), 1) failure when |
| // doing Block::clear_column_data, set do_projection to true to copy the column data to |
| // avoid this problem. |
| Status VExprContext::get_output_block_after_execute_exprs( |
| const VExprContextSPtrs& output_vexpr_ctxs, const Block& input_block, Block* output_block, |
| bool do_projection) { |
| auto rows = input_block.rows(); |
| vectorized::ColumnsWithTypeAndName result_columns; |
| _reset_memory_usage(output_vexpr_ctxs); |
| |
| for (const auto& vexpr_ctx : output_vexpr_ctxs) { |
| ColumnPtr result_column; |
| RETURN_IF_ERROR(vexpr_ctx->execute(&input_block, result_column)); |
| |
| auto type = vexpr_ctx->execute_type(&input_block); |
| const auto& name = vexpr_ctx->expr_name(); |
| |
| vexpr_ctx->_memory_usage += result_column->allocated_bytes(); |
| if (do_projection) { |
| result_columns.emplace_back(result_column->clone_resized(rows), type, name); |
| |
| } else { |
| result_columns.emplace_back(result_column, type, name); |
| } |
| } |
| *output_block = {result_columns}; |
| return Status::OK(); |
| } |
| |
| void VExprContext::_reset_memory_usage(const VExprContextSPtrs& contexts) { |
| std::for_each(contexts.begin(), contexts.end(), |
| [](auto&& context) { context->_memory_usage = 0; }); |
| } |
| |
| void VExprContext::prepare_ann_range_search(const doris::VectorSearchUserParams& params) { |
| if (_root == nullptr) { |
| return; |
| } |
| |
| _root->prepare_ann_range_search(params, _ann_range_search_runtime, _suitable_for_ann_index); |
| VLOG_DEBUG << fmt::format("Prepare ann range search result {}, _suitable_for_ann_index {}", |
| this->_ann_range_search_runtime.to_string(), |
| this->_suitable_for_ann_index); |
| return; |
| } |
| |
| Status VExprContext::evaluate_ann_range_search( |
| const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& cid_to_index_iterators, |
| const std::vector<ColumnId>& idx_to_cid, |
| const std::vector<std::unique_ptr<segment_v2::ColumnIterator>>& column_iterators, |
| const std::unordered_map<vectorized::VExprContext*, |
| std::unordered_map<ColumnId, vectorized::VExpr*>>& |
| common_expr_to_slotref_map, |
| roaring::Roaring& row_bitmap, segment_v2::AnnIndexStats& ann_index_stats) { |
| if (_root == nullptr) { |
| return Status::OK(); |
| } |
| |
| RETURN_IF_ERROR(_root->evaluate_ann_range_search( |
| _ann_range_search_runtime, cid_to_index_iterators, idx_to_cid, column_iterators, |
| row_bitmap, ann_index_stats)); |
| |
| if (!_root->ann_range_search_executedd()) { |
| return Status::OK(); |
| } |
| |
| if (!_root->ann_dist_is_fulfilled()) { |
| // Do not perform index scan in this case. |
| return Status::OK(); |
| } |
| |
| auto src_col_idx = _ann_range_search_runtime.src_col_idx; |
| auto slot_ref_map_it = common_expr_to_slotref_map.find(this); |
| if (slot_ref_map_it == common_expr_to_slotref_map.end()) { |
| return Status::OK(); |
| } |
| auto& slot_ref_map = slot_ref_map_it->second; |
| ColumnId cid = idx_to_cid[src_col_idx]; |
| if (slot_ref_map.find(cid) == slot_ref_map.end()) { |
| return Status::OK(); |
| } |
| const VExpr* slot_ref_expr_addr = slot_ref_map.find(cid)->second; |
| _index_context->set_true_for_index_status(slot_ref_expr_addr, idx_to_cid[cid]); |
| |
| VLOG_DEBUG << fmt::format( |
| "Evaluate ann range search for expr {}, src_col_idx {}, cid {}, row_bitmap " |
| "cardinality {}", |
| _root->debug_string(), src_col_idx, cid, row_bitmap.cardinality()); |
| return Status::OK(); |
| } |
| |
| uint64_t VExprContext::get_digest(uint64_t seed) const { |
| return _root->get_digest(seed); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris::vectorized |