| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/exec/scan/vscan_node.h" |
| |
| #include <gen_cpp/Exprs_types.h> |
| #include <gen_cpp/Metrics_types.h> |
| #include <gen_cpp/Opcodes_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <string.h> |
| |
| #include <algorithm> |
| #include <memory> |
| #include <mutex> |
| #include <ostream> |
| #include <variant> |
| |
| #include "common/config.h" |
| #include "common/consts.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "exec/olap_utils.h" |
| #include "exprs/bloom_filter_func.h" |
| #include "exprs/hybrid_set.h" |
| #include "exprs/runtime_filter.h" |
| #include "runtime/define_primitive_type.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/primitive_type.h" |
| #include "runtime/runtime_filter_mgr.h" |
| #include "runtime/types.h" |
| #include "udf/udf.h" |
| #include "util/defer_op.h" |
| #include "util/runtime_profile.h" |
| #include "vec/columns/column.h" |
| #include "vec/columns/column_const.h" |
| #include "vec/columns/column_vector.h" |
| #include "vec/common/string_ref.h" |
| #include "vec/core/block.h" |
| #include "vec/core/types.h" |
| #include "vec/exec/scan/pip_scanner_context.h" |
| #include "vec/exec/scan/scanner_scheduler.h" |
| #include "vec/exprs/vcompound_pred.h" |
| #include "vec/exprs/vectorized_fn_call.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/exprs/vexpr_context.h" |
| #include "vec/exprs/vin_predicate.h" |
| #include "vec/exprs/vslot_ref.h" |
| #include "vec/functions/in.h" |
| #include "vec/runtime/vdatetime_value.h" |
| |
| namespace doris::vectorized { |
| |
| #define RETURN_IF_PUSH_DOWN(stmt, status) \ |
| if (pdt == PushDownType::UNACCEPTABLE) { \ |
| status = stmt; \ |
| if (!status.ok()) { \ |
| return; \ |
| } \ |
| } else { \ |
| return; \ |
| } |
| |
| static bool ignore_cast(SlotDescriptor* slot, VExpr* expr) { |
| if (slot->type().is_string_type() && expr->type().is_string_type()) { |
| return true; |
| } |
| // Variant slot cast could be eliminated |
| // We could use predicate to speed up query, so ignore cast to build predicate |
| if (slot->type().is_variant_type()) { |
| return true; |
| } |
| if (slot->type().is_array_type()) { |
| if (slot->type().children[0].type == expr->type().type) { |
| return true; |
| } |
| if (slot->type().children[0].is_string_type() && expr->type().is_string_type()) { |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| Status VScanNode::init(const TPlanNode& tnode, RuntimeState* state) { |
| RETURN_IF_ERROR(ExecNode::init(tnode, state)); |
| RETURN_IF_ERROR(RuntimeFilterConsumer::init(state)); |
| _state = state; |
| _is_pipeline_scan = state->enable_pipeline_exec(); |
| |
| const TQueryOptions& query_options = state->query_options(); |
| if (query_options.__isset.max_scan_key_num) { |
| _max_scan_key_num = query_options.max_scan_key_num; |
| } else { |
| _max_scan_key_num = config::doris_max_scan_key_num; |
| } |
| if (query_options.__isset.max_pushdown_conditions_per_column) { |
| _max_pushdown_conditions_per_column = query_options.max_pushdown_conditions_per_column; |
| } else { |
| _max_pushdown_conditions_per_column = config::max_pushdown_conditions_per_column; |
| } |
| |
| // tnode.olap_scan_node.push_down_agg_type_opt field is deprecated |
| // Introduced a new field : tnode.push_down_agg_type_opt |
| // |
| // make it compatible here |
| if (tnode.__isset.push_down_agg_type_opt) { |
| _push_down_agg_type = tnode.push_down_agg_type_opt; |
| } else if (tnode.olap_scan_node.__isset.push_down_agg_type_opt) { |
| _push_down_agg_type = tnode.olap_scan_node.push_down_agg_type_opt; |
| |
| } else { |
| _push_down_agg_type = TPushAggOp::type::NONE; |
| } |
| |
| if (tnode.__isset.push_down_count) { |
| _push_down_count = tnode.push_down_count; |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status VScanNode::prepare(RuntimeState* state) { |
| RETURN_IF_ERROR(ExecNode::prepare(state)); |
| |
| // init profile for runtime filter |
| RuntimeFilterConsumer::_init_profile(_runtime_profile.get()); |
| |
| if (_is_pipeline_scan) { |
| if (_shared_scan_opt) { |
| _shared_scanner_controller = state->get_query_ctx()->get_shared_scanner_controller(); |
| auto [should_create_scanner, queue_id] = |
| _shared_scanner_controller->should_build_scanner_and_queue_id(id()); |
| _should_create_scanner = should_create_scanner; |
| _context_queue_id = queue_id; |
| } else { |
| _should_create_scanner = true; |
| _context_queue_id = 0; |
| } |
| } |
| |
| // 1: running at not pipeline mode will init profile. |
| // 2: the scan node should create scanner at pipeline mode will init profile. |
| // during pipeline mode with more instances, olap scan node maybe not new VScanner object, |
| // so the profile of VScanner and SegmentIterator infos are always empty, could not init those. |
| if (!_is_pipeline_scan || _should_create_scanner) { |
| RETURN_IF_ERROR(_init_profile()); |
| } |
| // if you want to add some profile in scan node, even it have not new VScanner object |
| // could add here, not in the _init_profile() function |
| _get_next_timer = ADD_TIMER(_runtime_profile, "GetNextTime"); |
| |
| _prepare_rf_timer(_runtime_profile.get()); |
| |
| _open_timer = ADD_TIMER(_runtime_profile, "OpenTime"); |
| _alloc_resource_timer = ADD_TIMER(_runtime_profile, "AllocateResourceTime"); |
| return Status::OK(); |
| } |
| |
| Status VScanNode::open(RuntimeState* state) { |
| SCOPED_TIMER(_exec_timer); |
| SCOPED_TIMER(_runtime_profile->total_time_counter()); |
| SCOPED_TIMER(_open_timer); |
| RETURN_IF_CANCELLED(state); |
| return ExecNode::open(state); |
| } |
| |
| Status VScanNode::alloc_resource(RuntimeState* state) { |
| SCOPED_TIMER(_exec_timer); |
| SCOPED_TIMER(_alloc_resource_timer); |
| if (_opened) { |
| return Status::OK(); |
| } |
| _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); |
| RETURN_IF_ERROR(ExecNode::alloc_resource(state)); |
| RETURN_IF_ERROR(_acquire_runtime_filter()); |
| RETURN_IF_ERROR(_process_conjuncts()); |
| |
| if (_is_pipeline_scan) { |
| if (_should_create_scanner) { |
| auto status = |
| !_eos ? _prepare_scanners(state->query_parallel_instance_num()) : Status::OK(); |
| if (_scanner_ctx) { |
| DCHECK(!_eos && _num_scanners->value() > 0); |
| RETURN_IF_ERROR(_scanner_ctx->init()); |
| RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); |
| } |
| if (_shared_scan_opt) { |
| LOG(INFO) << "instance shared scan enabled" |
| << print_id(state->fragment_instance_id()); |
| _shared_scanner_controller->set_scanner_context(id(), |
| _eos ? nullptr : _scanner_ctx); |
| } |
| RETURN_IF_ERROR(status); |
| } else if (_shared_scanner_controller->scanner_context_is_ready(id())) { |
| _scanner_ctx = _shared_scanner_controller->get_scanner_context(id()); |
| if (!_scanner_ctx) { |
| _eos = true; |
| } |
| } else { |
| return Status::WaitForScannerContext("Need wait for scanner context create"); |
| } |
| } else { |
| RETURN_IF_ERROR(!_eos ? _prepare_scanners(state->query_parallel_instance_num()) |
| : Status::OK()); |
| if (_scanner_ctx) { |
| RETURN_IF_ERROR(_scanner_ctx->init()); |
| RETURN_IF_ERROR(_state->exec_env()->scanner_scheduler()->submit(_scanner_ctx)); |
| } |
| } |
| |
| RETURN_IF_CANCELLED(state); |
| _opened = true; |
| return Status::OK(); |
| } |
| |
| Status VScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) { |
| SCOPED_TIMER(_exec_timer); |
| SCOPED_TIMER(_get_next_timer); |
| SCOPED_TIMER(_runtime_profile->total_time_counter()); |
| // in inverted index apply logic, in order to optimize query performance, |
| // we built some temporary columns into block, these columns only used in scan node level, |
| // remove them when query leave scan node to avoid other nodes use block->columns() to make a wrong decision |
| Defer drop_block_temp_column {[&]() { |
| std::unique_lock l(_block_lock); |
| auto all_column_names = block->get_names(); |
| for (auto& name : all_column_names) { |
| if (name.rfind(BeConsts::BLOCK_TEMP_COLUMN_PREFIX, 0) == 0) { |
| block->erase(name); |
| } |
| } |
| }}; |
| |
| if (state->is_cancelled()) { |
| // ISSUE: https://github.com/apache/doris/issues/16360 |
| // _scanner_ctx may be null here, see: `VScanNode::alloc_resource` (_eos == null) |
| if (_scanner_ctx) { |
| _scanner_ctx->set_status_on_error(Status::Cancelled("query cancelled")); |
| return _scanner_ctx->status(); |
| } else { |
| return Status::Cancelled("query cancelled"); |
| } |
| } |
| |
| if (_eos) { |
| *eos = true; |
| return Status::OK(); |
| } |
| |
| vectorized::BlockUPtr scan_block = nullptr; |
| RETURN_IF_ERROR(_scanner_ctx->get_block_from_queue(state, &scan_block, eos, _context_queue_id)); |
| if (*eos) { |
| DCHECK(scan_block == nullptr); |
| return Status::OK(); |
| } |
| |
| // get scanner's block memory |
| block->swap(*scan_block); |
| _scanner_ctx->return_free_block(std::move(scan_block)); |
| |
| reached_limit(block, eos); |
| if (*eos) { |
| // reach limit, stop the scanners. |
| _scanner_ctx->stop_scanners(state); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status VScanNode::_init_profile() { |
| // 1. counters for scan node |
| _rows_read_counter = ADD_COUNTER(_runtime_profile, "ScanRowsRead", TUnit::UNIT); |
| _byte_read_counter = ADD_COUNTER(_runtime_profile, "ScanByteRead", TUnit::BYTES); |
| _total_throughput_counter = |
| runtime_profile()->add_rate_counter("TotalReadThroughput", _rows_read_counter); |
| _num_scanners = ADD_COUNTER(_runtime_profile, "NumScanners", TUnit::UNIT); |
| |
| // 2. counters for scanners |
| _scanner_profile.reset(new RuntimeProfile("VScanner")); |
| runtime_profile()->add_child(_scanner_profile.get(), true, nullptr); |
| |
| _memory_usage_counter = ADD_LABEL_COUNTER(_scanner_profile, "MemoryUsage"); |
| _queued_blocks_memory_usage = |
| _scanner_profile->AddHighWaterMarkCounter("QueuedBlocks", TUnit::BYTES, "MemoryUsage"); |
| _free_blocks_memory_usage = |
| _scanner_profile->AddHighWaterMarkCounter("FreeBlocks", TUnit::BYTES, "MemoryUsage"); |
| _newly_create_free_blocks_num = |
| ADD_COUNTER(_scanner_profile, "NewlyCreateFreeBlocksNum", TUnit::UNIT); |
| // time of transfer thread to wait for block from scan thread |
| _scanner_wait_batch_timer = ADD_TIMER(_scanner_profile, "ScannerBatchWaitTime"); |
| _scanner_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerSchedCount", TUnit::UNIT); |
| _scanner_ctx_sched_counter = ADD_COUNTER(_scanner_profile, "ScannerCtxSchedCount", TUnit::UNIT); |
| _scanner_ctx_sched_time = ADD_TIMER(_scanner_profile, "ScannerCtxSchedTime"); |
| |
| _scan_timer = ADD_TIMER(_scanner_profile, "ScannerGetBlockTime"); |
| _scan_cpu_timer = ADD_TIMER(_scanner_profile, "ScannerCpuTime"); |
| _prefilter_timer = ADD_TIMER(_scanner_profile, "ScannerPrefilterTime"); |
| _convert_block_timer = ADD_TIMER(_scanner_profile, "ScannerConvertBlockTime"); |
| _filter_timer = ADD_TIMER(_scanner_profile, "ScannerFilterTime"); |
| |
| // time of scan thread to wait for worker thread of the thread pool |
| _scanner_wait_worker_timer = ADD_TIMER(_runtime_profile, "ScannerWorkerWaitTime"); |
| |
| _max_scanner_thread_num = ADD_COUNTER(_runtime_profile, "MaxScannerThreadNum", TUnit::UNIT); |
| |
| return Status::OK(); |
| } |
| |
| void VScanNode::_start_scanners(const std::list<std::shared_ptr<ScannerDelegate>>& scanners, |
| const int query_parallel_instance_num) { |
| if (_is_pipeline_scan) { |
| int max_queue_size = _shared_scan_opt ? std::max(query_parallel_instance_num, 1) : 1; |
| _scanner_ctx = pipeline::PipScannerContext::create_shared( |
| _state, this, _output_tuple_desc, _output_row_descriptor.get(), scanners, limit(), |
| _state->scan_queue_mem_limit(), max_queue_size); |
| } else { |
| _scanner_ctx = ScannerContext::create_shared(_state, this, _output_tuple_desc, |
| _output_row_descriptor.get(), scanners, |
| limit(), _state->scan_queue_mem_limit()); |
| } |
| } |
| |
| Status VScanNode::close(RuntimeState* state) { |
| if (is_closed()) { |
| return Status::OK(); |
| } |
| |
| RETURN_IF_ERROR(ExecNode::close(state)); |
| return Status::OK(); |
| } |
| |
| void VScanNode::release_resource(RuntimeState* state) { |
| if (_scanner_ctx) { |
| if (!state->enable_pipeline_exec() || _should_create_scanner) { |
| // stop and wait the scanner scheduler to be done |
| // _scanner_ctx may not be created for some short circuit case. |
| _scanner_ctx->stop_scanners(state); |
| } |
| } |
| _scanners.clear(); |
| ExecNode::release_resource(state); |
| } |
| |
| Status VScanNode::_normalize_conjuncts() { |
| // The conjuncts is always on output tuple, so use _output_tuple_desc; |
| std::vector<SlotDescriptor*> slots = _output_tuple_desc->slots(); |
| |
| auto init_value_range = [&](SlotDescriptor* slot, PrimitiveType type) { |
| switch (type) { |
| #define M(NAME) \ |
| case TYPE_##NAME: { \ |
| ColumnValueRange<TYPE_##NAME> range(slot->col_name(), slot->is_nullable(), \ |
| slot->type().precision, slot->type().scale); \ |
| _slot_id_to_value_range[slot->id()] = std::pair {slot, range}; \ |
| break; \ |
| } |
| #define APPLY_FOR_PRIMITIVE_TYPE(M) \ |
| M(TINYINT) \ |
| M(SMALLINT) \ |
| M(INT) \ |
| M(BIGINT) \ |
| M(LARGEINT) \ |
| M(CHAR) \ |
| M(DATE) \ |
| M(DATETIME) \ |
| M(DATEV2) \ |
| M(DATETIMEV2) \ |
| M(VARCHAR) \ |
| M(STRING) \ |
| M(HLL) \ |
| M(DECIMAL32) \ |
| M(DECIMAL64) \ |
| M(DECIMAL128I) \ |
| M(DECIMAL256) \ |
| M(DECIMALV2) \ |
| M(BOOLEAN) |
| APPLY_FOR_PRIMITIVE_TYPE(M) |
| #undef M |
| default: { |
| VLOG_CRITICAL << "Unsupported Normalize Slot [ColName=" << slot->col_name() << "]"; |
| break; |
| } |
| } |
| }; |
| |
| for (int slot_idx = 0; slot_idx < slots.size(); ++slot_idx) { |
| _colname_to_slot_id[slots[slot_idx]->col_name()] = slots[slot_idx]->id(); |
| _slot_id_to_slot_desc[slots[slot_idx]->id()] = slots[slot_idx]; |
| |
| auto type = slots[slot_idx]->type().type; |
| if (slots[slot_idx]->type().type == TYPE_ARRAY) { |
| type = slots[slot_idx]->type().children[0].type; |
| if (type == TYPE_ARRAY) { |
| continue; |
| } |
| } |
| init_value_range(slots[slot_idx], slots[slot_idx]->type().type); |
| } |
| |
| get_cast_types_for_variants(); |
| for (const auto& [colname, type] : _cast_types_for_variants) { |
| init_value_range(_slot_id_to_slot_desc[_colname_to_slot_id[colname]], type); |
| } |
| |
| for (auto it = _conjuncts.begin(); it != _conjuncts.end();) { |
| auto& conjunct = *it; |
| if (conjunct->root()) { |
| VExprSPtr new_root; |
| RETURN_IF_ERROR(_normalize_predicate(conjunct->root(), conjunct.get(), new_root)); |
| if (new_root) { |
| conjunct->set_root(new_root); |
| if (_should_push_down_common_expr() && |
| VExpr::is_acting_on_a_slot(*(conjunct->root()))) { |
| // We need to make sure conjunct is acting on a slot before push it down. |
| // Or it will not be executed by SegmentIterator::_vec_init_lazy_materialization |
| _common_expr_ctxs_push_down.emplace_back(conjunct); |
| it = _conjuncts.erase(it); |
| continue; |
| } |
| } else { |
| // Whole conjunct is pushed down as predicate column |
| _stale_expr_ctxs.emplace_back(conjunct); |
| it = _conjuncts.erase(it); |
| continue; |
| } |
| } |
| ++it; |
| } |
| |
| for (auto& it : _slot_id_to_value_range) { |
| std::visit( |
| [&](auto&& range) { |
| if (range.is_empty_value_range()) { |
| _eos = true; |
| } |
| }, |
| it.second.second); |
| _colname_to_value_range[it.second.first->col_name()] = it.second.second; |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status VScanNode::_normalize_predicate(const VExprSPtr& conjunct_expr_root, VExprContext* context, |
| VExprSPtr& output_expr) { |
| static constexpr auto is_leaf = [](auto&& expr) { return !expr->is_and_expr(); }; |
| auto in_predicate_checker = [](const VExprSPtrs& children, std::shared_ptr<VSlotRef>& slot, |
| VExprSPtr& child_contains_slot) { |
| if (children.empty() || |
| VExpr::expr_without_cast(children[0])->node_type() != TExprNodeType::SLOT_REF) { |
| // not a slot ref(column) |
| return false; |
| } |
| slot = std::dynamic_pointer_cast<VSlotRef>(VExpr::expr_without_cast(children[0])); |
| child_contains_slot = children[0]; |
| return true; |
| }; |
| auto eq_predicate_checker = [](const VExprSPtrs& children, std::shared_ptr<VSlotRef>& slot, |
| VExprSPtr& child_contains_slot) { |
| for (const auto& child : children) { |
| if (VExpr::expr_without_cast(child)->node_type() != TExprNodeType::SLOT_REF) { |
| // not a slot ref(column) |
| continue; |
| } |
| slot = std::dynamic_pointer_cast<VSlotRef>(VExpr::expr_without_cast(child)); |
| CHECK(slot != nullptr); |
| child_contains_slot = child; |
| return true; |
| } |
| return false; |
| }; |
| |
| if (conjunct_expr_root != nullptr) { |
| if (is_leaf(conjunct_expr_root)) { |
| auto impl = conjunct_expr_root->get_impl(); |
| // If impl is not null, which means this a conjuncts from runtime filter. |
| auto cur_expr = impl ? impl.get() : conjunct_expr_root.get(); |
| bool _is_runtime_filter_predicate = |
| _rf_vexpr_set.find(conjunct_expr_root) != _rf_vexpr_set.end(); |
| SlotDescriptor* slot = nullptr; |
| ColumnValueRangeType* range = nullptr; |
| PushDownType pdt = PushDownType::UNACCEPTABLE; |
| RETURN_IF_ERROR(_eval_const_conjuncts(cur_expr, context, &pdt)); |
| if (pdt == PushDownType::ACCEPTABLE) { |
| output_expr = nullptr; |
| return Status::OK(); |
| } |
| std::shared_ptr<VSlotRef> slotref; |
| for (const auto& child : cur_expr->children()) { |
| if (VExpr::expr_without_cast(child)->node_type() != TExprNodeType::SLOT_REF) { |
| // not a slot ref(column) |
| continue; |
| } |
| slotref = std::dynamic_pointer_cast<VSlotRef>(VExpr::expr_without_cast(child)); |
| } |
| if (_is_predicate_acting_on_slot(cur_expr, in_predicate_checker, &slot, &range) || |
| _is_predicate_acting_on_slot(cur_expr, eq_predicate_checker, &slot, &range)) { |
| Status status = Status::OK(); |
| std::visit( |
| [&](auto& value_range) { |
| Defer mark_runtime_filter_flag {[&]() { |
| value_range.mark_runtime_filter_predicate( |
| _is_runtime_filter_predicate); |
| }}; |
| RETURN_IF_PUSH_DOWN(_normalize_in_and_eq_predicate( |
| cur_expr, context, slot, value_range, &pdt), |
| status); |
| RETURN_IF_PUSH_DOWN(_normalize_not_in_and_not_eq_predicate( |
| cur_expr, context, slot, value_range, &pdt), |
| status); |
| RETURN_IF_PUSH_DOWN(_normalize_is_null_predicate( |
| cur_expr, context, slot, value_range, &pdt), |
| status); |
| RETURN_IF_PUSH_DOWN(_normalize_noneq_binary_predicate( |
| cur_expr, context, slot, value_range, &pdt), |
| status); |
| RETURN_IF_PUSH_DOWN(_normalize_match_predicate(cur_expr, context, slot, |
| value_range, &pdt), |
| status); |
| if (_is_key_column(slot->col_name())) { |
| RETURN_IF_PUSH_DOWN( |
| _normalize_bitmap_filter(cur_expr, context, slot, &pdt), |
| status); |
| RETURN_IF_PUSH_DOWN( |
| _normalize_bloom_filter(cur_expr, context, slot, &pdt), |
| status); |
| if (_state->enable_function_pushdown()) { |
| RETURN_IF_PUSH_DOWN(_normalize_function_filters( |
| cur_expr, context, slot, &pdt), |
| status); |
| } |
| } |
| }, |
| *range); |
| RETURN_IF_ERROR(status); |
| } |
| |
| if (pdt == PushDownType::UNACCEPTABLE && |
| TExprNodeType::COMPOUND_PRED == cur_expr->node_type()) { |
| static_cast<void>(_normalize_compound_predicate( |
| cur_expr, context, &pdt, _is_runtime_filter_predicate, in_predicate_checker, |
| eq_predicate_checker)); |
| output_expr = conjunct_expr_root; // remaining in conjunct tree |
| return Status::OK(); |
| } |
| |
| if (pdt == PushDownType::ACCEPTABLE && |
| TExprNodeType::MATCH_PRED == cur_expr->node_type()) { |
| // remaining it in the expr tree, in order to filter by function if the pushdown |
| // match_predicate failed to apply inverted index in the storage layer |
| output_expr = conjunct_expr_root; // remaining in conjunct tree |
| return Status::OK(); |
| } |
| |
| if (pdt == PushDownType::ACCEPTABLE && slotref != nullptr && |
| slotref->type().is_variant_type()) { |
| // remaining it in the expr tree, in order to filter by function if the pushdown |
| // predicate is not applied |
| output_expr = conjunct_expr_root; // remaining in conjunct tree |
| return Status::OK(); |
| } |
| |
| if (pdt == PushDownType::ACCEPTABLE && |
| (_is_key_column(slot->col_name()) || _storage_no_merge())) { |
| output_expr = nullptr; |
| return Status::OK(); |
| } else { |
| // for PARTIAL_ACCEPTABLE and UNACCEPTABLE, do not remove expr from the tree |
| output_expr = conjunct_expr_root; |
| return Status::OK(); |
| } |
| } else { |
| VExprSPtr left_child; |
| RETURN_IF_ERROR( |
| _normalize_predicate(conjunct_expr_root->children()[0], context, left_child)); |
| VExprSPtr right_child; |
| RETURN_IF_ERROR( |
| _normalize_predicate(conjunct_expr_root->children()[1], context, right_child)); |
| |
| if (left_child != nullptr && right_child != nullptr) { |
| conjunct_expr_root->set_children({left_child, right_child}); |
| output_expr = conjunct_expr_root; |
| return Status::OK(); |
| } else { |
| if (left_child == nullptr) { |
| conjunct_expr_root->children()[0]->close(context, |
| context->get_function_state_scope()); |
| } |
| if (right_child == nullptr) { |
| conjunct_expr_root->children()[1]->close(context, |
| context->get_function_state_scope()); |
| } |
| // here only close the and expr self, do not close the child |
| conjunct_expr_root->set_children({}); |
| conjunct_expr_root->close(context, context->get_function_state_scope()); |
| } |
| |
| // here do not close VExpr* now |
| output_expr = left_child != nullptr ? left_child : right_child; |
| return Status::OK(); |
| } |
| } |
| output_expr = conjunct_expr_root; |
| return Status::OK(); |
| } |
| |
| Status VScanNode::_normalize_bloom_filter(VExpr* expr, VExprContext* expr_ctx, SlotDescriptor* slot, |
| PushDownType* pdt) { |
| if (TExprNodeType::BLOOM_PRED == expr->node_type()) { |
| DCHECK(expr->children().size() == 1); |
| PushDownType temp_pdt = _should_push_down_bloom_filter(); |
| if (temp_pdt != PushDownType::UNACCEPTABLE) { |
| _filter_predicates.bloom_filters.emplace_back(slot->col_name(), |
| expr->get_bloom_filter_func()); |
| *pdt = temp_pdt; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status VScanNode::_normalize_bitmap_filter(VExpr* expr, VExprContext* expr_ctx, |
| SlotDescriptor* slot, PushDownType* pdt) { |
| if (TExprNodeType::BITMAP_PRED == expr->node_type()) { |
| DCHECK(expr->children().size() == 1); |
| PushDownType temp_pdt = _should_push_down_bitmap_filter(); |
| if (temp_pdt != PushDownType::UNACCEPTABLE) { |
| _filter_predicates.bitmap_filters.emplace_back(slot->col_name(), |
| expr->get_bitmap_filter_func()); |
| *pdt = temp_pdt; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status VScanNode::_normalize_function_filters(VExpr* expr, VExprContext* expr_ctx, |
| SlotDescriptor* slot, PushDownType* pdt) { |
| bool opposite = false; |
| VExpr* fn_expr = expr; |
| if (TExprNodeType::COMPOUND_PRED == expr->node_type() && |
| expr->fn().name.function_name == "not") { |
| fn_expr = fn_expr->children()[0].get(); |
| opposite = true; |
| } |
| |
| if (TExprNodeType::FUNCTION_CALL == fn_expr->node_type()) { |
| doris::FunctionContext* fn_ctx = nullptr; |
| StringRef val; |
| PushDownType temp_pdt; |
| RETURN_IF_ERROR(_should_push_down_function_filter( |
| reinterpret_cast<VectorizedFnCall*>(fn_expr), expr_ctx, &val, &fn_ctx, temp_pdt)); |
| if (temp_pdt != PushDownType::UNACCEPTABLE) { |
| std::string col = slot->col_name(); |
| _push_down_functions.emplace_back(opposite, col, fn_ctx, val); |
| *pdt = temp_pdt; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| bool VScanNode::_is_predicate_acting_on_slot( |
| VExpr* expr, |
| const std::function<bool(const VExprSPtrs&, std::shared_ptr<VSlotRef>&, VExprSPtr&)>& |
| checker, |
| SlotDescriptor** slot_desc, ColumnValueRangeType** range) { |
| std::shared_ptr<VSlotRef> slot_ref; |
| VExprSPtr child_contains_slot; |
| if (!checker(expr->children(), slot_ref, child_contains_slot)) { |
| // not a slot ref(column) |
| return false; |
| } |
| |
| auto entry = _slot_id_to_value_range.find(slot_ref->slot_id()); |
| if (_slot_id_to_value_range.end() == entry) { |
| return false; |
| } |
| // if the slot is a complex type(array/map/struct), we do not push down the predicate, because |
| // we delete pack these type into predict column, and origin pack action is wrong. we should |
| // make sense to push down this complex type after we delete predict column. |
| if (is_complex_type(remove_nullable(slot_ref->data_type()))) { |
| return false; |
| } |
| *slot_desc = entry->second.first; |
| DCHECK(child_contains_slot != nullptr); |
| if (child_contains_slot->type().type != (*slot_desc)->type().type || |
| child_contains_slot->type().precision != (*slot_desc)->type().precision || |
| child_contains_slot->type().scale != (*slot_desc)->type().scale) { |
| if (!ignore_cast(*slot_desc, child_contains_slot.get())) { |
| // the type of predicate not match the slot's type |
| return false; |
| } |
| } else if (child_contains_slot->type().is_datetime_type() && |
| child_contains_slot->node_type() == doris::TExprNodeType::CAST_EXPR) { |
| // Expr `CAST(CAST(datetime_col AS DATE) AS DATETIME) = datetime_literal` should not be |
| // push down. |
| return false; |
| } |
| *range = &(entry->second.second); |
| return true; |
| } |
| |
| Status VScanNode::_eval_const_conjuncts(VExpr* vexpr, VExprContext* expr_ctx, PushDownType* pdt) { |
| char* constant_val = nullptr; |
| if (vexpr->is_constant()) { |
| std::shared_ptr<ColumnPtrWrapper> const_col_wrapper; |
| RETURN_IF_ERROR(vexpr->get_const_col(expr_ctx, &const_col_wrapper)); |
| if (const ColumnConst* const_column = |
| check_and_get_column<ColumnConst>(const_col_wrapper->column_ptr)) { |
| constant_val = const_cast<char*>(const_column->get_data_at(0).data); |
| if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) { |
| *pdt = PushDownType::ACCEPTABLE; |
| _eos = true; |
| } |
| } else if (const ColumnVector<UInt8>* bool_column = |
| check_and_get_column<ColumnVector<UInt8>>( |
| const_col_wrapper->column_ptr)) { |
| // TODO: If `vexpr->is_constant()` is true, a const column is expected here. |
| // But now we still don't cover all predicates for const expression. |
| // For example, for query `SELECT col FROM tbl WHERE 'PROMOTION' LIKE 'AAA%'`, |
| // predicate `like` will return a ColumnVector<UInt8> which contains a single value. |
| LOG(WARNING) << "VExpr[" << vexpr->debug_string() |
| << "] should return a const column but actually is " |
| << const_col_wrapper->column_ptr->get_name(); |
| DCHECK_EQ(bool_column->size(), 1); |
| if (bool_column->size() == 1) { |
| constant_val = const_cast<char*>(bool_column->get_data_at(0).data); |
| if (constant_val == nullptr || !*reinterpret_cast<bool*>(constant_val)) { |
| *pdt = PushDownType::ACCEPTABLE; |
| _eos = true; |
| } |
| } else { |
| LOG(WARNING) << "Constant predicate in scan node should return a bool column with " |
| "`size == 1` but actually is " |
| << bool_column->size(); |
| } |
| } else { |
| LOG(WARNING) << "VExpr[" << vexpr->debug_string() |
| << "] should return a const column but actually is " |
| << const_col_wrapper->column_ptr->get_name(); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| template <PrimitiveType T> |
| Status VScanNode::_normalize_in_and_eq_predicate(VExpr* expr, VExprContext* expr_ctx, |
| SlotDescriptor* slot, ColumnValueRange<T>& range, |
| PushDownType* pdt) { |
| auto temp_range = ColumnValueRange<T>::create_empty_column_value_range( |
| slot->is_nullable(), slot->type().precision, slot->type().scale); |
| // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' |
| if (TExprNodeType::IN_PRED == expr->node_type()) { |
| HybridSetBase::IteratorBase* iter = nullptr; |
| auto hybrid_set = expr->get_set_func(); |
| |
| if (hybrid_set != nullptr) { |
| // runtime filter produce VDirectInPredicate |
| if (hybrid_set->size() <= _max_pushdown_conditions_per_column) { |
| iter = hybrid_set->begin(); |
| } else { |
| _filter_predicates.in_filters.emplace_back(slot->col_name(), expr->get_set_func()); |
| *pdt = PushDownType::ACCEPTABLE; |
| return Status::OK(); |
| } |
| } else { |
| // normal in predicate |
| VInPredicate* pred = static_cast<VInPredicate*>(expr); |
| PushDownType temp_pdt = _should_push_down_in_predicate(pred, expr_ctx, false); |
| if (temp_pdt == PushDownType::UNACCEPTABLE) { |
| return Status::OK(); |
| } |
| |
| // begin to push InPredicate value into ColumnValueRange |
| InState* state = reinterpret_cast<InState*>( |
| expr_ctx->fn_context(pred->fn_context_index()) |
| ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); |
| |
| // xx in (col, xx, xx) should not be push down |
| if (!state->use_set) { |
| return Status::OK(); |
| } |
| |
| iter = state->hybrid_set->begin(); |
| } |
| |
| while (iter->has_next()) { |
| // column in (nullptr) is always false so continue to |
| // dispose next item |
| if (nullptr == iter->get_value()) { |
| iter->next(); |
| continue; |
| } |
| auto value = const_cast<void*>(iter->get_value()); |
| RETURN_IF_ERROR(_change_value_range<true>( |
| temp_range, value, ColumnValueRange<T>::add_fixed_value_range, "")); |
| iter->next(); |
| } |
| range.intersection(temp_range); |
| *pdt = PushDownType::ACCEPTABLE; |
| } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { |
| DCHECK(expr->children().size() == 2); |
| auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; }; |
| |
| StringRef value; |
| int slot_ref_child = -1; |
| |
| PushDownType temp_pdt; |
| RETURN_IF_ERROR(_should_push_down_binary_predicate( |
| reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx, &value, &slot_ref_child, |
| eq_checker, temp_pdt)); |
| if (temp_pdt == PushDownType::UNACCEPTABLE) { |
| return Status::OK(); |
| } |
| DCHECK(slot_ref_child >= 0); |
| // where A = nullptr should return empty result set |
| auto fn_name = std::string(""); |
| if (value.data != nullptr) { |
| if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || |
| T == TYPE_HLL) { |
| auto val = StringRef(value.data, value.size); |
| RETURN_IF_ERROR(_change_value_range<true>( |
| temp_range, reinterpret_cast<void*>(&val), |
| ColumnValueRange<T>::add_fixed_value_range, fn_name)); |
| } else { |
| if (sizeof(typename PrimitiveTypeTraits<T>::CppType) != value.size) { |
| return Status::InternalError( |
| "PrimitiveType {} meet invalid input value_size={}, expect_size={}, " |
| "node_id={}", |
| T, value.size, sizeof(typename PrimitiveTypeTraits<T>::CppType), _id); |
| } |
| RETURN_IF_ERROR(_change_value_range<true>( |
| temp_range, reinterpret_cast<void*>(const_cast<char*>(value.data)), |
| ColumnValueRange<T>::add_fixed_value_range, fn_name)); |
| } |
| range.intersection(temp_range); |
| } |
| *pdt = temp_pdt; |
| } |
| |
| return Status::OK(); |
| } |
| |
| template <PrimitiveType T> |
| Status VScanNode::_normalize_not_in_and_not_eq_predicate(VExpr* expr, VExprContext* expr_ctx, |
| SlotDescriptor* slot, |
| ColumnValueRange<T>& range, |
| PushDownType* pdt) { |
| bool is_fixed_range = range.is_fixed_value_range(); |
| auto not_in_range = ColumnValueRange<T>::create_empty_column_value_range( |
| range.column_name(), slot->is_nullable(), slot->type().precision, slot->type().scale); |
| PushDownType temp_pdt = PushDownType::UNACCEPTABLE; |
| // 1. Normalize in conjuncts like 'where col in (v1, v2, v3)' |
| if (TExprNodeType::IN_PRED == expr->node_type()) { |
| VInPredicate* pred = static_cast<VInPredicate*>(expr); |
| if ((temp_pdt = _should_push_down_in_predicate(pred, expr_ctx, true)) == |
| PushDownType::UNACCEPTABLE) { |
| return Status::OK(); |
| } |
| |
| // begin to push InPredicate value into ColumnValueRange |
| InState* state = reinterpret_cast<InState*>( |
| expr_ctx->fn_context(pred->fn_context_index()) |
| ->get_function_state(FunctionContext::FRAGMENT_LOCAL)); |
| |
| // xx in (col, xx, xx) should not be push down |
| if (!state->use_set) { |
| return Status::OK(); |
| } |
| |
| HybridSetBase::IteratorBase* iter = state->hybrid_set->begin(); |
| auto fn_name = std::string(""); |
| if (!is_fixed_range && state->null_in_set) { |
| _eos = true; |
| } |
| while (iter->has_next()) { |
| // column not in (nullptr) is always true |
| if (nullptr == iter->get_value()) { |
| continue; |
| } |
| auto value = const_cast<void*>(iter->get_value()); |
| if (is_fixed_range) { |
| RETURN_IF_ERROR(_change_value_range<true>( |
| range, value, ColumnValueRange<T>::remove_fixed_value_range, fn_name)); |
| } else { |
| RETURN_IF_ERROR(_change_value_range<true>( |
| not_in_range, value, ColumnValueRange<T>::add_fixed_value_range, fn_name)); |
| } |
| iter->next(); |
| } |
| } else if (TExprNodeType::BINARY_PRED == expr->node_type()) { |
| DCHECK(expr->children().size() == 2); |
| |
| auto ne_checker = [](const std::string& fn_name) { return fn_name == "ne"; }; |
| StringRef value; |
| int slot_ref_child = -1; |
| RETURN_IF_ERROR(_should_push_down_binary_predicate( |
| reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx, &value, &slot_ref_child, |
| ne_checker, temp_pdt)); |
| if (temp_pdt == PushDownType::UNACCEPTABLE) { |
| return Status::OK(); |
| } |
| |
| DCHECK(slot_ref_child >= 0); |
| // where A = nullptr should return empty result set |
| if (value.data != nullptr) { |
| auto fn_name = std::string(""); |
| if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || |
| T == TYPE_HLL) { |
| auto val = StringRef(value.data, value.size); |
| if (is_fixed_range) { |
| RETURN_IF_ERROR(_change_value_range<true>( |
| range, reinterpret_cast<void*>(&val), |
| ColumnValueRange<T>::remove_fixed_value_range, fn_name)); |
| } else { |
| RETURN_IF_ERROR(_change_value_range<true>( |
| not_in_range, reinterpret_cast<void*>(&val), |
| ColumnValueRange<T>::add_fixed_value_range, fn_name)); |
| } |
| } else { |
| if (is_fixed_range) { |
| RETURN_IF_ERROR(_change_value_range<true>( |
| range, reinterpret_cast<void*>(const_cast<char*>(value.data)), |
| ColumnValueRange<T>::remove_fixed_value_range, fn_name)); |
| } else { |
| RETURN_IF_ERROR(_change_value_range<true>( |
| not_in_range, reinterpret_cast<void*>(const_cast<char*>(value.data)), |
| ColumnValueRange<T>::add_fixed_value_range, fn_name)); |
| } |
| } |
| } |
| } else { |
| return Status::OK(); |
| } |
| |
| if (is_fixed_range || |
| not_in_range.get_fixed_value_size() <= _max_pushdown_conditions_per_column) { |
| if (!is_fixed_range) { |
| _not_in_value_ranges.push_back(not_in_range); |
| } |
| *pdt = temp_pdt; |
| } |
| return Status::OK(); |
| } |
| |
| template <PrimitiveType T> |
| Status VScanNode::_normalize_is_null_predicate(VExpr* expr, VExprContext* expr_ctx, |
| SlotDescriptor* slot, ColumnValueRange<T>& range, |
| PushDownType* pdt) { |
| PushDownType temp_pdt = _should_push_down_is_null_predicate(); |
| if (temp_pdt == PushDownType::UNACCEPTABLE) { |
| return Status::OK(); |
| } |
| |
| if (TExprNodeType::FUNCTION_CALL == expr->node_type()) { |
| if (reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name == "is_null_pred") { |
| auto temp_range = ColumnValueRange<T>::create_empty_column_value_range( |
| slot->is_nullable(), slot->type().precision, slot->type().scale); |
| temp_range.set_contain_null(true); |
| range.intersection(temp_range); |
| *pdt = temp_pdt; |
| } else if (reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name == |
| "is_not_null_pred") { |
| auto temp_range = ColumnValueRange<T>::create_empty_column_value_range( |
| slot->is_nullable(), slot->type().precision, slot->type().scale); |
| temp_range.set_contain_null(false); |
| range.intersection(temp_range); |
| *pdt = temp_pdt; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| template <PrimitiveType T> |
| Status VScanNode::_normalize_noneq_binary_predicate(VExpr* expr, VExprContext* expr_ctx, |
| SlotDescriptor* slot, |
| ColumnValueRange<T>& range, PushDownType* pdt) { |
| if (TExprNodeType::BINARY_PRED == expr->node_type()) { |
| DCHECK(expr->children().size() == 2); |
| |
| auto noneq_checker = [](const std::string& fn_name) { |
| return fn_name != "ne" && fn_name != "eq" && fn_name != "eq_for_null"; |
| }; |
| StringRef value; |
| int slot_ref_child = -1; |
| PushDownType temp_pdt; |
| RETURN_IF_ERROR(_should_push_down_binary_predicate( |
| reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx, &value, &slot_ref_child, |
| noneq_checker, temp_pdt)); |
| if (temp_pdt != PushDownType::UNACCEPTABLE) { |
| DCHECK(slot_ref_child >= 0); |
| const std::string& fn_name = |
| reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name; |
| |
| // where A = nullptr should return empty result set |
| if (value.data != nullptr) { |
| if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || |
| T == TYPE_HLL) { |
| auto val = StringRef(value.data, value.size); |
| RETURN_IF_ERROR(_change_value_range<false>(range, reinterpret_cast<void*>(&val), |
| ColumnValueRange<T>::add_value_range, |
| fn_name, slot_ref_child)); |
| } else { |
| RETURN_IF_ERROR(_change_value_range<false>( |
| range, reinterpret_cast<void*>(const_cast<char*>(value.data)), |
| ColumnValueRange<T>::add_value_range, fn_name, slot_ref_child)); |
| } |
| *pdt = temp_pdt; |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status VScanNode::_normalize_compound_predicate( |
| vectorized::VExpr* expr, VExprContext* expr_ctx, PushDownType* pdt, |
| bool _is_runtime_filter_predicate, |
| const std::function<bool(const VExprSPtrs&, std::shared_ptr<VSlotRef>&, VExprSPtr&)>& |
| in_predicate_checker, |
| const std::function<bool(const VExprSPtrs&, std::shared_ptr<VSlotRef>&, VExprSPtr&)>& |
| eq_predicate_checker) { |
| if (TExprNodeType::COMPOUND_PRED == expr->node_type()) { |
| auto compound_fn_name = expr->fn().name.function_name; |
| auto children_num = expr->children().size(); |
| for (auto i = 0; i < children_num; ++i) { |
| auto child_expr = expr->children()[i].get(); |
| if (TExprNodeType::BINARY_PRED == child_expr->node_type()) { |
| SlotDescriptor* slot = nullptr; |
| ColumnValueRangeType* range_on_slot = nullptr; |
| if (_is_predicate_acting_on_slot(child_expr, in_predicate_checker, &slot, |
| &range_on_slot) || |
| _is_predicate_acting_on_slot(child_expr, eq_predicate_checker, &slot, |
| &range_on_slot)) { |
| ColumnValueRangeType active_range = |
| *range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range |
| std::visit( |
| [&](auto& value_range) { |
| Defer mark_runtime_filter_flag {[&]() { |
| value_range.mark_runtime_filter_predicate( |
| _is_runtime_filter_predicate); |
| }}; |
| static_cast<void>(_normalize_binary_in_compound_predicate( |
| child_expr, expr_ctx, slot, value_range, pdt)); |
| }, |
| active_range); |
| |
| _compound_value_ranges.emplace_back(active_range); |
| } |
| } else if (TExprNodeType::MATCH_PRED == child_expr->node_type()) { |
| SlotDescriptor* slot = nullptr; |
| ColumnValueRangeType* range_on_slot = nullptr; |
| if (_is_predicate_acting_on_slot(child_expr, in_predicate_checker, &slot, |
| &range_on_slot) || |
| _is_predicate_acting_on_slot(child_expr, eq_predicate_checker, &slot, |
| &range_on_slot)) { |
| ColumnValueRangeType active_range = |
| *range_on_slot; // copy, in order not to affect the range in the _colname_to_value_range |
| std::visit( |
| [&](auto& value_range) { |
| Defer mark_runtime_filter_flag {[&]() { |
| value_range.mark_runtime_filter_predicate( |
| _is_runtime_filter_predicate); |
| }}; |
| static_cast<void>(_normalize_match_in_compound_predicate( |
| child_expr, expr_ctx, slot, value_range, pdt)); |
| }, |
| active_range); |
| |
| _compound_value_ranges.emplace_back(active_range); |
| } |
| } else if (TExprNodeType::COMPOUND_PRED == child_expr->node_type()) { |
| static_cast<void>(_normalize_compound_predicate( |
| child_expr, expr_ctx, pdt, _is_runtime_filter_predicate, |
| in_predicate_checker, eq_predicate_checker)); |
| } |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| template <PrimitiveType T> |
| Status VScanNode::_normalize_binary_in_compound_predicate(vectorized::VExpr* expr, |
| VExprContext* expr_ctx, |
| SlotDescriptor* slot, |
| ColumnValueRange<T>& range, |
| PushDownType* pdt) { |
| DCHECK(expr->children().size() == 2); |
| if (TExprNodeType::BINARY_PRED == expr->node_type()) { |
| auto eq_checker = [](const std::string& fn_name) { return fn_name == "eq"; }; |
| auto ne_checker = [](const std::string& fn_name) { return fn_name == "ne"; }; |
| auto noneq_checker = [](const std::string& fn_name) { |
| return fn_name != "ne" && fn_name != "eq"; |
| }; |
| |
| StringRef value; |
| int slot_ref_child = -1; |
| PushDownType eq_pdt; |
| PushDownType ne_pdt; |
| PushDownType noneq_pdt; |
| RETURN_IF_ERROR(_should_push_down_binary_predicate( |
| reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx, &value, &slot_ref_child, |
| eq_checker, eq_pdt)); |
| RETURN_IF_ERROR(_should_push_down_binary_predicate( |
| reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx, &value, &slot_ref_child, |
| ne_checker, ne_pdt)); |
| RETURN_IF_ERROR(_should_push_down_binary_predicate( |
| reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx, &value, &slot_ref_child, |
| noneq_checker, noneq_pdt)); |
| if (eq_pdt == PushDownType::UNACCEPTABLE && ne_pdt == PushDownType::UNACCEPTABLE && |
| noneq_pdt == PushDownType::UNACCEPTABLE) { |
| return Status::OK(); |
| } |
| DCHECK(slot_ref_child >= 0); |
| const std::string& fn_name = |
| reinterpret_cast<VectorizedFnCall*>(expr)->fn().name.function_name; |
| if (eq_pdt == PushDownType::ACCEPTABLE || ne_pdt == PushDownType::ACCEPTABLE || |
| noneq_pdt == PushDownType::ACCEPTABLE) { |
| if (value.data != nullptr) { |
| if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || |
| T == TYPE_HLL) { |
| auto val = StringRef(value.data, value.size); |
| RETURN_IF_ERROR(_change_value_range<false>( |
| range, reinterpret_cast<void*>(&val), |
| ColumnValueRange<T>::add_compound_value_range, fn_name, |
| slot_ref_child)); |
| } else { |
| RETURN_IF_ERROR(_change_value_range<false>( |
| range, reinterpret_cast<void*>(const_cast<char*>(value.data)), |
| ColumnValueRange<T>::add_compound_value_range, fn_name, |
| slot_ref_child)); |
| } |
| } |
| *pdt = PushDownType::ACCEPTABLE; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| template <PrimitiveType T> |
| Status VScanNode::_normalize_match_in_compound_predicate(vectorized::VExpr* expr, |
| VExprContext* expr_ctx, |
| SlotDescriptor* slot, |
| ColumnValueRange<T>& range, |
| PushDownType* pdt) { |
| DCHECK(expr->children().size() == 2); |
| if (TExprNodeType::MATCH_PRED == expr->node_type()) { |
| RETURN_IF_ERROR(_normalize_match_predicate(expr, expr_ctx, slot, range, pdt)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| template <PrimitiveType T> |
| Status VScanNode::_normalize_match_predicate(VExpr* expr, VExprContext* expr_ctx, |
| SlotDescriptor* slot, ColumnValueRange<T>& range, |
| PushDownType* pdt) { |
| if (TExprNodeType::MATCH_PRED == expr->node_type()) { |
| DCHECK(expr->children().size() == 2); |
| |
| // create empty range as temp range, temp range should do intersection on range |
| auto temp_range = ColumnValueRange<T>::create_empty_column_value_range( |
| slot->is_nullable(), slot->type().precision, slot->type().scale); |
| // Normalize match conjuncts like 'where col match value' |
| |
| auto match_checker = [](const std::string& fn_name) { return is_match_condition(fn_name); }; |
| StringRef value; |
| int slot_ref_child = -1; |
| PushDownType temp_pdt; |
| RETURN_IF_ERROR(_should_push_down_binary_predicate( |
| reinterpret_cast<VectorizedFnCall*>(expr), expr_ctx, &value, &slot_ref_child, |
| match_checker, temp_pdt)); |
| if (temp_pdt != PushDownType::UNACCEPTABLE) { |
| DCHECK(slot_ref_child >= 0); |
| if (value.data != nullptr) { |
| using CppType = typename PrimitiveTypeTraits<T>::CppType; |
| if constexpr (T == TYPE_CHAR || T == TYPE_VARCHAR || T == TYPE_STRING || |
| T == TYPE_HLL) { |
| auto val = StringRef(value.data, value.size); |
| ColumnValueRange<T>::add_match_value_range(temp_range, |
| to_match_type(expr->op()), |
| reinterpret_cast<CppType*>(&val)); |
| } else { |
| ColumnValueRange<T>::add_match_value_range( |
| temp_range, to_match_type(expr->op()), |
| reinterpret_cast<CppType*>(const_cast<char*>(value.data))); |
| } |
| range.intersection(temp_range); |
| } |
| *pdt = temp_pdt; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| template <bool IsFixed, PrimitiveType PrimitiveType, typename ChangeFixedValueRangeFunc> |
| Status VScanNode::_change_value_range(ColumnValueRange<PrimitiveType>& temp_range, void* value, |
| const ChangeFixedValueRangeFunc& func, |
| const std::string& fn_name, int slot_ref_child) { |
| if constexpr (PrimitiveType == TYPE_DATE) { |
| VecDateTimeValue tmp_value; |
| memcpy(&tmp_value, value, sizeof(VecDateTimeValue)); |
| if constexpr (IsFixed) { |
| if (!tmp_value.check_loss_accuracy_cast_to_date()) { |
| func(temp_range, |
| reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>( |
| &tmp_value)); |
| } |
| } else { |
| if (tmp_value.check_loss_accuracy_cast_to_date()) { |
| if (fn_name == "lt" || fn_name == "ge") { |
| ++tmp_value; |
| } |
| } |
| func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), |
| reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>( |
| &tmp_value)); |
| } |
| } else if constexpr (PrimitiveType == TYPE_DATETIME) { |
| if constexpr (IsFixed) { |
| func(temp_range, |
| reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(value)); |
| } else { |
| func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), |
| reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>( |
| reinterpret_cast<char*>(value))); |
| } |
| } else if constexpr ((PrimitiveType == TYPE_DECIMALV2) || (PrimitiveType == TYPE_CHAR) || |
| (PrimitiveType == TYPE_VARCHAR) || (PrimitiveType == TYPE_HLL) || |
| (PrimitiveType == TYPE_DATETIMEV2) || (PrimitiveType == TYPE_TINYINT) || |
| (PrimitiveType == TYPE_SMALLINT) || (PrimitiveType == TYPE_INT) || |
| (PrimitiveType == TYPE_BIGINT) || (PrimitiveType == TYPE_LARGEINT) || |
| (PrimitiveType == TYPE_DECIMAL32) || (PrimitiveType == TYPE_DECIMAL64) || |
| (PrimitiveType == TYPE_DECIMAL128I) || |
| (PrimitiveType == TYPE_DECIMAL256) || (PrimitiveType == TYPE_STRING) || |
| (PrimitiveType == TYPE_BOOLEAN) || (PrimitiveType == TYPE_DATEV2)) { |
| if constexpr (IsFixed) { |
| func(temp_range, |
| reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(value)); |
| } else { |
| func(temp_range, to_olap_filter_type(fn_name, slot_ref_child), |
| reinterpret_cast<typename PrimitiveTypeTraits<PrimitiveType>::CppType*>(value)); |
| } |
| } else { |
| static_assert(always_false_v<PrimitiveType>); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status VScanNode::clone_conjunct_ctxs(VExprContextSPtrs& conjuncts) { |
| if (!_conjuncts.empty()) { |
| std::unique_lock l(_rf_locks); |
| conjuncts.resize(_conjuncts.size()); |
| for (size_t i = 0; i != _conjuncts.size(); ++i) { |
| RETURN_IF_ERROR(_conjuncts[i]->clone(_state, conjuncts[i])); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status VScanNode::_should_push_down_binary_predicate( |
| VectorizedFnCall* fn_call, VExprContext* expr_ctx, StringRef* constant_val, |
| int* slot_ref_child, const std::function<bool(const std::string&)>& fn_checker, |
| VScanNode::PushDownType& pdt) { |
| if (!fn_checker(fn_call->fn().name.function_name)) { |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } |
| |
| const auto& children = fn_call->children(); |
| DCHECK(children.size() == 2); |
| for (size_t i = 0; i < children.size(); i++) { |
| if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) { |
| // not a slot ref(column) |
| continue; |
| } |
| if (!children[1 - i]->is_constant()) { |
| // only handle constant value |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } else { |
| std::shared_ptr<ColumnPtrWrapper> const_col_wrapper; |
| RETURN_IF_ERROR(children[1 - i]->get_const_col(expr_ctx, &const_col_wrapper)); |
| if (const ColumnConst* const_column = |
| check_and_get_column<ColumnConst>(const_col_wrapper->column_ptr)) { |
| *slot_ref_child = i; |
| *constant_val = const_column->get_data_at(0); |
| } else { |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } |
| } |
| } |
| pdt = PushDownType::ACCEPTABLE; |
| return Status::OK(); |
| } |
| |
| VScanNode::PushDownType VScanNode::_should_push_down_in_predicate(VInPredicate* pred, |
| VExprContext* expr_ctx, |
| bool is_not_in) { |
| if (pred->is_not_in() != is_not_in) { |
| return PushDownType::UNACCEPTABLE; |
| } |
| return PushDownType::ACCEPTABLE; |
| } |
| |
| Status VScanNode::_prepare_scanners(const int query_parallel_instance_num) { |
| std::list<VScannerSPtr> scanners; |
| RETURN_IF_ERROR(_init_scanners(&scanners)); |
| // Init scanner wrapper |
| for (auto it = scanners.begin(); it != scanners.end(); ++it) { |
| _scanners.emplace_back(std::make_shared<ScannerDelegate>(*it)); |
| } |
| if (scanners.empty()) { |
| _eos = true; |
| } else { |
| for (auto& scanner : scanners) { |
| scanner->set_query_statistics(_query_statistics.get()); |
| } |
| COUNTER_SET(_num_scanners, static_cast<int64_t>(scanners.size())); |
| _start_scanners(_scanners, query_parallel_instance_num); |
| } |
| return Status::OK(); |
| } |
| } // namespace doris::vectorized |