| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <glog/logging.h> |
| |
| #include <algorithm> |
| #include <cstddef> |
| #include <memory> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/factory_creator.h" |
| #include "common/status.h" |
| #include "olap/rowset/segment_v2/ann_index/range_search_runtime_info.h" |
| #include "olap/rowset/segment_v2/column_reader.h" |
| #include "olap/rowset/segment_v2/inverted_index_reader.h" |
| #include "runtime/runtime_state.h" |
| #include "runtime/types.h" |
| #include "udf/udf.h" |
| #include "vec/core/block.h" |
| #include "vec/exprs/vexpr_fwd.h" |
| |
| namespace doris { |
| class RowDescriptor; |
| class RuntimeState; |
| } // namespace doris |
| |
| namespace doris::segment_v2 { |
| class ColumnIterator; |
| } // namespace doris::segment_v2 |
| |
| namespace doris::vectorized { |
| |
| class InvertedIndexContext { |
| public: |
| InvertedIndexContext( |
| const std::vector<ColumnId>& col_ids, |
| const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& index_iterators, |
| const std::vector<vectorized::IndexFieldNameAndTypePair>& storage_name_and_type_vec, |
| std::unordered_map<ColumnId, std::unordered_map<const vectorized::VExpr*, bool>>& |
| common_expr_inverted_index_status) |
| : _col_ids(col_ids), |
| _index_iterators(index_iterators), |
| _storage_name_and_type(storage_name_and_type_vec), |
| _expr_inverted_index_status(common_expr_inverted_index_status) {} |
| |
| segment_v2::IndexIterator* get_inverted_index_iterator_by_column_id(int column_index) const { |
| if (column_index < 0 || column_index >= _col_ids.size()) { |
| return nullptr; |
| } |
| const auto& column_id = _col_ids[column_index]; |
| if (column_id >= _index_iterators.size()) { |
| return nullptr; |
| } |
| if (!_index_iterators[column_id]) { |
| return nullptr; |
| } |
| return _index_iterators[column_id].get(); |
| } |
| |
| const vectorized::IndexFieldNameAndTypePair* get_storage_name_and_type_by_column_id( |
| int column_index) const { |
| if (column_index < 0 || column_index >= _col_ids.size()) { |
| return nullptr; |
| } |
| const auto& column_id = _col_ids[column_index]; |
| if (column_id >= _storage_name_and_type.size()) { |
| return nullptr; |
| } |
| return &_storage_name_and_type[column_id]; |
| } |
| |
| bool has_inverted_index_result_for_expr(const vectorized::VExpr* expr) const { |
| return _inverted_index_result_bitmap.contains(expr); |
| } |
| |
| void set_inverted_index_result_for_expr(const vectorized::VExpr* expr, |
| segment_v2::InvertedIndexResultBitmap bitmap) { |
| _inverted_index_result_bitmap[expr] = std::move(bitmap); |
| } |
| |
| std::unordered_map<const vectorized::VExpr*, segment_v2::InvertedIndexResultBitmap>& |
| get_inverted_index_result_bitmap() { |
| return _inverted_index_result_bitmap; |
| } |
| |
| std::unordered_map<const vectorized::VExpr*, ColumnPtr>& get_inverted_index_result_column() { |
| return _inverted_index_result_column; |
| } |
| |
| const segment_v2::InvertedIndexResultBitmap* get_inverted_index_result_for_expr( |
| const vectorized::VExpr* expr) { |
| auto iter = _inverted_index_result_bitmap.find(expr); |
| if (iter == _inverted_index_result_bitmap.end()) { |
| return nullptr; |
| } |
| return &iter->second; |
| } |
| |
| void set_inverted_index_result_column_for_expr(const vectorized::VExpr* expr, |
| ColumnPtr column) { |
| _inverted_index_result_column[expr] = std::move(column); |
| } |
| |
| void set_true_for_inverted_index_status(const vectorized::VExpr* expr, int column_index) { |
| if (column_index < 0 || column_index >= _col_ids.size()) { |
| return; |
| } |
| const auto& column_id = _col_ids[column_index]; |
| if (_expr_inverted_index_status.contains(column_id)) { |
| if (_expr_inverted_index_status[column_id].contains(expr)) { |
| _expr_inverted_index_status[column_id][expr] = true; |
| } |
| } |
| } |
| |
| private: |
| // A reference to a vector of column IDs for the current expression's output columns. |
| const std::vector<ColumnId>& _col_ids; |
| |
| // A reference to a vector of unique pointers to index iterators. |
| const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& _index_iterators; |
| |
| // A reference to a vector of storage name and type pairs related to schema. |
| const std::vector<vectorized::IndexFieldNameAndTypePair>& _storage_name_and_type; |
| |
| // A map of expressions to their corresponding inverted index result bitmaps. |
| std::unordered_map<const vectorized::VExpr*, segment_v2::InvertedIndexResultBitmap> |
| _inverted_index_result_bitmap; |
| |
| // A map of expressions to their corresponding result columns. |
| std::unordered_map<const vectorized::VExpr*, ColumnPtr> _inverted_index_result_column; |
| |
| // A reference to a map of common expressions to their inverted index evaluation status. |
| std::unordered_map<ColumnId, std::unordered_map<const vectorized::VExpr*, bool>>& |
| _expr_inverted_index_status; |
| }; |
| |
| class VExprContext { |
| ENABLE_FACTORY_CREATOR(VExprContext); |
| |
| public: |
| VExprContext(VExprSPtr expr) : _root(std::move(expr)) {} |
| ~VExprContext(); |
| [[nodiscard]] Status prepare(RuntimeState* state, const RowDescriptor& row_desc); |
| [[nodiscard]] Status open(RuntimeState* state); |
| [[nodiscard]] Status clone(RuntimeState* state, VExprContextSPtr& new_ctx); |
| [[nodiscard]] Status execute(Block* block, int* result_column_id); |
| |
| VExprSPtr root() { return _root; } |
| void set_root(const VExprSPtr& expr) { _root = expr; } |
| void set_inverted_index_context(std::shared_ptr<InvertedIndexContext> inverted_index_context) { |
| _inverted_index_context = std::move(inverted_index_context); |
| } |
| |
| std::shared_ptr<InvertedIndexContext> get_inverted_index_context() const { |
| return _inverted_index_context; |
| } |
| |
| /// Creates a FunctionContext, and returns the index that's passed to fn_context() to |
| /// retrieve the created context. Exprs that need a FunctionContext should call this in |
| /// Prepare() and save the returned index. 'varargs_buffer_size', if specified, is the |
| /// size of the varargs buffer in the created FunctionContext (see udf-internal.h). |
| int register_function_context(RuntimeState* state, const DataTypePtr& return_type, |
| const std::vector<DataTypePtr>& arg_types); |
| |
| /// Retrieves a registered FunctionContext. 'i' is the index returned by the call to |
| /// register_function_context(). This should only be called by VExprs. |
| FunctionContext* fn_context(int i) { |
| if (i < 0 || i >= _fn_contexts.size()) { |
| throw Exception(ErrorCode::INTERNAL_ERROR, |
| "fn_context index invalid, index={}, _fn_contexts.size()={}", i, |
| _fn_contexts.size()); |
| } |
| return _fn_contexts[i].get(); |
| } |
| |
| // execute expr with inverted index which column a, b has inverted indexes |
| // but some situation although column b has indexes, but apply index is not useful, we should |
| // skip this expr, just do not apply index anymore. |
| |
| [[nodiscard]] Status evaluate_inverted_index(uint32_t segment_num_rows); |
| |
| bool all_expr_inverted_index_evaluated(); |
| |
| [[nodiscard]] static Status filter_block(VExprContext* vexpr_ctx, Block* block, |
| size_t column_to_keep); |
| |
| [[nodiscard]] static Status filter_block(const VExprContextSPtrs& expr_contexts, Block* block, |
| size_t column_to_keep); |
| |
| [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& ctxs, |
| const std::vector<IColumn::Filter*>* filters, |
| bool accept_null, Block* block, |
| IColumn::Filter* result_filter, |
| bool* can_filter_all); |
| |
| [[nodiscard]] static Status execute_conjuncts(const VExprContextSPtrs& conjuncts, Block* block, |
| ColumnUInt8& null_map, |
| IColumn::Filter& result_filter); |
| |
| static Status execute_conjuncts(const VExprContextSPtrs& ctxs, |
| const std::vector<IColumn::Filter*>* filters, Block* block, |
| IColumn::Filter* result_filter, bool* can_filter_all); |
| |
| [[nodiscard]] static Status execute_conjuncts_and_filter_block( |
| const VExprContextSPtrs& ctxs, Block* block, std::vector<uint32_t>& columns_to_filter, |
| int column_to_keep); |
| |
| static Status execute_conjuncts_and_filter_block(const VExprContextSPtrs& ctxs, Block* block, |
| std::vector<uint32_t>& columns_to_filter, |
| int column_to_keep, IColumn::Filter& filter); |
| |
| [[nodiscard]] static Status get_output_block_after_execute_exprs(const VExprContextSPtrs&, |
| const Block&, Block*, |
| bool do_projection = false); |
| |
| int get_last_result_column_id() const { |
| DCHECK(_last_result_column_id != -1); |
| return _last_result_column_id; |
| } |
| |
| FunctionContext::FunctionStateScope get_function_state_scope() const { |
| return _is_clone ? FunctionContext::THREAD_LOCAL : FunctionContext::FRAGMENT_LOCAL; |
| } |
| |
| void clone_fn_contexts(VExprContext* other); |
| |
| bool force_materialize_slot() const { return _force_materialize_slot; } |
| |
| void set_force_materialize_slot() { _force_materialize_slot = true; } |
| |
| VExprContext& operator=(const VExprContext& other) { |
| if (this == &other) { |
| return *this; |
| } |
| |
| _root = other._root; |
| _is_clone = other._is_clone; |
| _prepared = other._prepared; |
| _opened = other._opened; |
| |
| for (const auto& fn : other._fn_contexts) { |
| _fn_contexts.emplace_back(fn->clone()); |
| } |
| |
| _last_result_column_id = other._last_result_column_id; |
| _depth_num = other._depth_num; |
| return *this; |
| } |
| |
| VExprContext& operator=(VExprContext&& other) { |
| _root = other._root; |
| other._root = nullptr; |
| _is_clone = other._is_clone; |
| _prepared = other._prepared; |
| _opened = other._opened; |
| _fn_contexts = std::move(other._fn_contexts); |
| _last_result_column_id = other._last_result_column_id; |
| _depth_num = other._depth_num; |
| return *this; |
| } |
| |
| [[nodiscard]] static size_t get_memory_usage(const VExprContextSPtrs& contexts) { |
| size_t usage = 0; |
| std::for_each(contexts.cbegin(), contexts.cend(), |
| [&usage](auto&& context) { usage += context->_memory_usage; }); |
| return usage; |
| } |
| |
| [[nodiscard]] size_t get_memory_usage() const { return _memory_usage; } |
| |
| Status prepare_ann_range_search(const doris::VectorSearchUserParams& params); |
| |
| Status evaluate_ann_range_search( |
| const std::vector<std::unique_ptr<segment_v2::IndexIterator>>& cid_to_index_iterators, |
| const std::vector<ColumnId>& idx_to_cid, |
| const std::vector<std::unique_ptr<segment_v2::ColumnIterator>>& column_iterators, |
| roaring::Roaring& row_bitmap); |
| |
| private: |
| // Close method is called in vexpr context dector, not need call expicility |
| void close(); |
| |
| static void _reset_memory_usage(const VExprContextSPtrs& contexts); |
| |
| friend class VExpr; |
| |
| /// The expr tree this context is for. |
| VExprSPtr _root; |
| |
| /// True if this context came from a Clone() call. Used to manage FunctionStateScope. |
| bool _is_clone = false; |
| |
| /// Variables keeping track of current state. |
| bool _prepared = false; |
| bool _opened = false; |
| |
| /// FunctionContexts for each registered expression. The FunctionContexts are created |
| /// and owned by this VExprContext. |
| std::vector<std::unique_ptr<FunctionContext>> _fn_contexts; |
| |
| int _last_result_column_id = -1; |
| |
| /// The depth of expression-tree. |
| int _depth_num = 0; |
| |
| // This flag only works on VSlotRef. |
| // Force to materialize even if the slot need_materialize is false, we just ignore need_materialize flag |
| bool _force_materialize_slot = false; |
| |
| std::shared_ptr<InvertedIndexContext> _inverted_index_context; |
| size_t _memory_usage = 0; |
| |
| RangeSearchRuntimeInfo _ann_range_search_runtime; |
| bool _suitable_for_ann_index = true; |
| }; |
| } // namespace doris::vectorized |