| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/operator/olap_scan_operator.h" |
| |
| #include <fmt/format.h> |
| |
| #include <memory> |
| #include <numeric> |
| #include <optional> |
| |
| #include "cloud/cloud_meta_mgr.h" |
| #include "cloud/cloud_storage_engine.h" |
| #include "cloud/cloud_tablet.h" |
| #include "cloud/cloud_tablet_hotspot.h" |
| #include "cloud/config.h" |
| #include "exec/operator/scan_operator.h" |
| #include "exec/runtime_filter/runtime_filter_consumer_helper.h" |
| #include "exec/scan/olap_scanner.h" |
| #include "exec/scan/parallel_scanner_builder.h" |
| #include "exprs/function/in.h" |
| #include "exprs/score_runtime.h" |
| #include "exprs/vectorized_fn_call.h" |
| #include "exprs/vexpr.h" |
| #include "exprs/vexpr_context.h" |
| #include "exprs/vslot_ref.h" |
| #include "io/cache/block_file_cache_profile.h" |
| #include "runtime/query_cache/query_cache.h" |
| #include "runtime/runtime_profile.h" |
| #include "runtime/runtime_state.h" |
| #include "service/backend_options.h" |
| #include "storage/index/ann/ann_topn_runtime.h" |
| #include "storage/storage_engine.h" |
| #include "storage/tablet/tablet_manager.h" |
| #include "util/to_string.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| |
| Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { |
| const TOlapScanNode& olap_scan_node = _parent->cast<OlapScanOperatorX>()._olap_scan_node; |
| |
| if (olap_scan_node.__isset.score_sort_info && olap_scan_node.__isset.score_sort_limit) { |
| const doris::TExpr& ordering_expr = olap_scan_node.score_sort_info.ordering_exprs.front(); |
| const bool asc = olap_scan_node.score_sort_info.is_asc_order[0]; |
| const size_t limit = olap_scan_node.score_sort_limit; |
| std::shared_ptr<VExprContext> ordering_expr_ctx; |
| RETURN_IF_ERROR(VExpr::create_expr_tree(ordering_expr, ordering_expr_ctx)); |
| _score_runtime = ScoreRuntime::create_shared(ordering_expr_ctx, asc, limit); |
| } |
| |
| if (olap_scan_node.__isset.ann_sort_info || olap_scan_node.__isset.ann_sort_limit) { |
| DCHECK(olap_scan_node.__isset.ann_sort_info); |
| DCHECK(olap_scan_node.__isset.ann_sort_limit); |
| DCHECK(olap_scan_node.ann_sort_info.ordering_exprs.size() == 1); |
| const doris::TExpr& ordering_expr = olap_scan_node.ann_sort_info.ordering_exprs.front(); |
| DCHECK(ordering_expr.nodes[0].__isset.slot_ref); |
| DCHECK(ordering_expr.nodes[0].slot_ref.is_virtual_slot); |
| DCHECK(olap_scan_node.ann_sort_info.is_asc_order.size() == 1); |
| const bool asc = olap_scan_node.ann_sort_info.is_asc_order[0]; |
| const size_t limit = olap_scan_node.ann_sort_limit; |
| std::shared_ptr<VExprContext> ordering_expr_ctx; |
| RETURN_IF_ERROR(VExpr::create_expr_tree(ordering_expr, ordering_expr_ctx)); |
| _ann_topn_runtime = |
| segment_v2::AnnTopNRuntime::create_shared(asc, limit, ordering_expr_ctx); |
| } |
| |
| // Parse score range filtering parameters and set to ScoreRuntime |
| if (olap_scan_node.__isset.score_range_info) { |
| const auto& score_range_info = olap_scan_node.score_range_info; |
| if (score_range_info.__isset.op && score_range_info.__isset.threshold) { |
| if (_score_runtime) { |
| _score_runtime->set_score_range_info(score_range_info.op, |
| score_range_info.threshold); |
| } |
| } |
| } |
| |
| RETURN_IF_ERROR(Base::init(state, info)); |
| RETURN_IF_ERROR(_sync_cloud_tablets(state)); |
| return Status::OK(); |
| } |
| |
| PushDownType OlapScanLocalState::_should_push_down_binary_predicate( |
| VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val, |
| const std::set<std::string> fn_name) const { |
| if (!fn_name.contains(fn_call->fn().name.function_name)) { |
| return PushDownType::UNACCEPTABLE; |
| } |
| const auto& children = fn_call->children(); |
| DCHECK(children.size() == 2); |
| DCHECK_EQ(VExpr::expr_without_cast(children[0])->node_type(), TExprNodeType::SLOT_REF); |
| if (children[1]->is_constant()) { |
| std::shared_ptr<ColumnPtrWrapper> const_col_wrapper; |
| THROW_IF_ERROR(children[1]->get_const_col(expr_ctx, &const_col_wrapper)); |
| const auto* const_column = |
| assert_cast<const ColumnConst*>(const_col_wrapper->column_ptr.get()); |
| constant_val = const_column->operator[](0); |
| return PushDownType::ACCEPTABLE; |
| } else { |
| // only handle constant value |
| return PushDownType::UNACCEPTABLE; |
| } |
| } |
| |
| Status OlapScanLocalState::_init_profile() { |
| RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::_init_profile()); |
| // Rows read from storage. |
| // Include the rows read from doris page cache. |
| _scan_rows = ADD_COUNTER(custom_profile(), "ScanRows", TUnit::UNIT); |
| |
| // 1. init segment profile |
| _segment_profile.reset(new RuntimeProfile("SegmentIterator")); |
| _scanner_profile->add_child(_segment_profile.get(), true, nullptr); |
| |
| // 2. init timer and counters |
| _reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime"); |
| _scanner_init_timer = ADD_TIMER(_scanner_profile, "ScannerInitTime"); |
| _process_conjunct_timer = ADD_TIMER(custom_profile(), "ProcessConjunctTime"); |
| _read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES); |
| _read_uncompressed_counter = |
| ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES); |
| _block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime"); |
| _block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT); |
| _block_fetch_timer = ADD_TIMER(_scanner_profile, "BlockFetchTime"); |
| _delete_bitmap_get_agg_timer = ADD_TIMER(_scanner_profile, "DeleteBitmapGetAggTime"); |
| if (config::is_cloud_mode()) { |
| static const char* sync_rowset_timer_name = "SyncRowsetTime"; |
| _sync_rowset_timer = ADD_TIMER(_scanner_profile, sync_rowset_timer_name); |
| _sync_rowset_tablet_meta_cache_hit = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletMetaCacheHitCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_tablet_meta_cache_miss = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletMetaCacheMissCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_tablet_meta_rpc_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetGetRemoteTabletMetaRpcTime", sync_rowset_timer_name); |
| _sync_rowset_tablets_rowsets_total_num = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletsRowsetsTotatCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_rowsets_num = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteRowsetsCount", TUnit::UNIT, |
| sync_rowset_timer_name); |
| _sync_rowset_get_remote_rowsets_rpc_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetGetRemoteRowsetsRpcTime", sync_rowset_timer_name); |
| _sync_rowset_get_local_delete_bitmap_rowsets_num = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetLocalDeleteBitmapRowsetsCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_delete_bitmap_rowsets_num = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapRowsetsCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_delete_bitmap_key_count = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapKeyCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_delete_bitmap_bytes = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapBytes", |
| TUnit::BYTES, sync_rowset_timer_name); |
| _sync_rowset_get_remote_delete_bitmap_rpc_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetGetRemoteDeleteBitmapRpcTime", sync_rowset_timer_name); |
| _sync_rowset_bthread_schedule_wait_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetBthreadScheduleWaitTime", sync_rowset_timer_name); |
| _sync_rowset_meta_lock_wait_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetMetaLockWaitTime", sync_rowset_timer_name); |
| _sync_rowset_sync_meta_lock_wait_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetSyncMetaLockWaitTime", sync_rowset_timer_name); |
| } |
| _block_init_timer = ADD_TIMER(_segment_profile, "BlockInitTime"); |
| _block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime"); |
| _block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT); |
| _segment_generate_row_range_by_keys_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByKeysTime"); |
| _segment_generate_row_range_by_column_conditions_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByColumnConditionsTime"); |
| _segment_generate_row_range_by_bf_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByBloomFilterIndexTime"); |
| _collect_iterator_merge_next_timer = ADD_TIMER(_segment_profile, "CollectIteratorMergeTime"); |
| _segment_generate_row_range_by_zonemap_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByZoneMapIndexTime"); |
| _segment_generate_row_range_by_dict_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByDictTime"); |
| |
| _rows_vec_cond_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT); |
| _rows_short_circuit_cond_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsShortCircuitPredFiltered", TUnit::UNIT); |
| _rows_expr_cond_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsExprPredFiltered", TUnit::UNIT); |
| _rows_vec_cond_input_counter = |
| ADD_COUNTER(_segment_profile, "RowsVectorPredInput", TUnit::UNIT); |
| _rows_short_circuit_cond_input_counter = |
| ADD_COUNTER(_segment_profile, "RowsShortCircuitPredInput", TUnit::UNIT); |
| _rows_expr_cond_input_counter = ADD_COUNTER(_segment_profile, "RowsExprPredInput", TUnit::UNIT); |
| _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime"); |
| _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime"); |
| _expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime"); |
| _predicate_column_read_timer = ADD_TIMER(_segment_profile, "PredicateColumnReadTime"); |
| _non_predicate_column_read_timer = ADD_TIMER(_segment_profile, "NonPredicateColumnReadTime"); |
| _predicate_column_read_seek_timer = ADD_TIMER(_segment_profile, "PredicateColumnReadSeekTime"); |
| _predicate_column_read_seek_counter = |
| ADD_COUNTER(_segment_profile, "PredicateColumnReadSeekCount", TUnit::UNIT); |
| |
| _lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime"); |
| _lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime"); |
| _lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT); |
| |
| _output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime"); |
| |
| _stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT); |
| _stats_rp_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsZoneMapRuntimePredicateFiltered", TUnit::UNIT); |
| _bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT); |
| _dict_filtered_counter = ADD_COUNTER(_segment_profile, "SegmentDictFiltered", TUnit::UNIT); |
| _del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT); |
| _conditions_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT); |
| _key_range_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT); |
| |
| _io_timer = ADD_TIMER(_segment_profile, "IOTimer"); |
| _decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer"); |
| |
| _total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT); |
| _cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT); |
| |
| _statistics_collect_timer = ADD_TIMER(_scanner_profile, "StatisticsCollectTime"); |
| _inverted_index_filter_counter = |
| ADD_COUNTER(_segment_profile, "RowsInvertedIndexFiltered", TUnit::UNIT); |
| _inverted_index_filter_timer = ADD_TIMER(_segment_profile, "InvertedIndexFilterTime"); |
| _inverted_index_query_cache_hit_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexQueryCacheHit", TUnit::UNIT); |
| _inverted_index_query_cache_miss_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexQueryCacheMiss", TUnit::UNIT); |
| _inverted_index_query_timer = ADD_TIMER(_segment_profile, "InvertedIndexQueryTime"); |
| _inverted_index_query_null_bitmap_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexQueryNullBitmapTime"); |
| _inverted_index_query_bitmap_copy_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexQueryBitmapCopyTime"); |
| _inverted_index_searcher_open_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexSearcherOpenTime"); |
| _inverted_index_searcher_search_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchTime"); |
| _inverted_index_searcher_search_init_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchInitTime"); |
| _inverted_index_searcher_search_exec_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchExecTime"); |
| _inverted_index_searcher_cache_hit_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexSearcherCacheHit", TUnit::UNIT); |
| _inverted_index_searcher_cache_miss_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexSearcherCacheMiss", TUnit::UNIT); |
| _inverted_index_downgrade_count_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexDowngradeCount", TUnit::UNIT); |
| _inverted_index_analyzer_timer = ADD_TIMER(_segment_profile, "InvertedIndexAnalyzerTime"); |
| _inverted_index_lookup_timer = ADD_TIMER(_segment_profile, "InvertedIndexLookupTimer"); |
| |
| _output_index_result_column_timer = ADD_TIMER(_segment_profile, "OutputIndexResultColumnTime"); |
| _filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT); |
| _total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT); |
| _tablet_counter = ADD_COUNTER(custom_profile(), "TabletNum", TUnit::UNIT); |
| _key_range_counter = ADD_COUNTER(custom_profile(), "KeyRangesNum", TUnit::UNIT); |
| _tablet_reader_init_timer = ADD_TIMER(_scanner_profile, "TabletReaderInitTimer"); |
| _tablet_reader_capture_rs_readers_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderCaptureRsReadersTimer"); |
| _tablet_reader_init_return_columns_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitReturnColumnsTimer"); |
| _tablet_reader_init_keys_param_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitKeysParamTimer"); |
| _tablet_reader_init_orderby_keys_param_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitOrderbyKeysParamTimer"); |
| _tablet_reader_init_conditions_param_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitConditionsParamTimer"); |
| _tablet_reader_init_delete_condition_param_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitDeleteConditionParamTimer"); |
| _block_reader_vcollect_iter_init_timer = |
| ADD_TIMER(_scanner_profile, "BlockReaderVcollectIterInitTimer"); |
| _block_reader_rs_readers_init_timer = |
| ADD_TIMER(_scanner_profile, "BlockReaderRsReadersInitTimer"); |
| _block_reader_build_heap_init_timer = |
| ADD_TIMER(_scanner_profile, "BlockReaderBuildHeapInitTimer"); |
| |
| _rowset_reader_get_segment_iterators_timer = |
| ADD_TIMER(_scanner_profile, "RowsetReaderGetSegmentIteratorsTimer"); |
| _rowset_reader_create_iterators_timer = |
| ADD_TIMER(_scanner_profile, "RowsetReaderCreateIteratorsTimer"); |
| _rowset_reader_init_iterators_timer = |
| ADD_TIMER(_scanner_profile, "RowsetReaderInitIteratorsTimer"); |
| _rowset_reader_load_segments_timer = |
| ADD_TIMER(_scanner_profile, "RowsetReaderLoadSegmentsTimer"); |
| |
| _segment_iterator_init_timer = ADD_TIMER(_scanner_profile, "SegmentIteratorInitTimer"); |
| _segment_iterator_init_return_column_iterators_timer = |
| ADD_TIMER(_scanner_profile, "SegmentIteratorInitReturnColumnIteratorsTimer"); |
| _segment_iterator_init_index_iterators_timer = |
| ADD_TIMER(_scanner_profile, "SegmentIteratorInitIndexIteratorsTimer"); |
| _segment_iterator_init_segment_prefetchers_timer = |
| ADD_TIMER(_scanner_profile, "SegmentIteratorInitSegmentPrefetchersTimer"); |
| |
| _segment_create_column_readers_timer = |
| ADD_TIMER(_scanner_profile, "SegmentCreateColumnReadersTimer"); |
| _segment_load_index_timer = ADD_TIMER(_scanner_profile, "SegmentLoadIndexTimer"); |
| |
| _index_filter_profile = std::make_unique<RuntimeProfile>("IndexFilter"); |
| _scanner_profile->add_child(_index_filter_profile.get(), true, nullptr); |
| /* |
| SegmentIterator: |
| - AnnIndexLoadCosts: 102.262us |
| - AnnIndexRangeSearchCosts: 0ns |
| - AnnIndexRangeSearchFiltered: 0 |
| - AnnIndexTopNCosts: 658.303ms |
| - AnnIndexTopNFiltered: 9.49791M (9497910) |
| - AnnIndexTopNSearchCnt: 209ns |
| */ |
| _ann_range_search_filter_counter = |
| ADD_COUNTER(_segment_profile, "AnnIndexRangeSearchFiltered", TUnit::UNIT); |
| _ann_topn_filter_counter = ADD_COUNTER(_segment_profile, "AnnIndexTopNFiltered", TUnit::UNIT); |
| |
| _ann_topn_search_costs = ADD_TIMER(_segment_profile, "AnnIndexTopNSearchCosts"); |
| _ann_topn_search_cnt = ADD_COUNTER(_segment_profile, "AnnIndexTopNSearchCnt", TUnit::UNIT); |
| _ann_range_search_costs = ADD_TIMER(_segment_profile, "AnnIndexRangeSearchCosts"); |
| _ann_range_search_cnt = ADD_COUNTER(_segment_profile, "AnnIndexRangeSearchCnt", TUnit::UNIT); |
| |
| // Detailed ANN timers (TopN) |
| // Create child timers under AnnIndexTopNSearchCosts for better readability |
| _ann_topn_engine_search_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexTopNEngineSearchCosts", "AnnIndexTopNSearchCosts"); |
| _ann_index_load_costs = ADD_TIMER(_segment_profile, "AnnIndexLoadCosts"); |
| _ann_topn_post_process_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexTopNResultPostProcessCosts", "AnnIndexTopNSearchCosts"); |
| _ann_topn_pre_process_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexTopNEnginePrepareCosts", "AnnIndexTopNSearchCosts"); |
| // Detailed ANN timers (Range) |
| // Create child timers under AnnIndexRangeSearchCosts to mirror TopN hierarchy |
| _ann_range_engine_search_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexRangeEngineSearchCosts", "AnnIndexRangeSearchCosts"); |
| _ann_range_post_process_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexRangeResultPostProcessCosts", "AnnIndexRangeSearchCosts"); |
| _ann_range_pre_process_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexRangeEnginePrepareCosts", "AnnIndexRangeSearchCosts"); |
| // Conversion inside FAISS wrappers (TopN): two separate sub counters under post process |
| _ann_topn_engine_convert_costs = |
| ADD_CHILD_TIMER(_segment_profile, "AnnIndexTopNEngineConvertCosts", |
| "AnnIndexTopNResultPostProcessCosts"); |
| _ann_range_engine_convert_costs = |
| ADD_CHILD_TIMER(_segment_profile, "AnnIndexRangeEngineConvertCosts", |
| "AnnIndexRangeResultPostProcessCosts"); |
| // Keep this as a child of post process to show the sum for Doris-side handling |
| _ann_topn_result_convert_costs = |
| ADD_CHILD_TIMER(_segment_profile, "AnnIndexTopNResultConvertCosts", |
| "AnnIndexTopNResultPostProcessCosts"); |
| _ann_range_result_convert_costs = |
| ADD_CHILD_TIMER(_segment_profile, "AnnIndexRangeResultConvertCosts", |
| "AnnIndexRangeResultPostProcessCosts"); |
| _ann_fallback_brute_force_cnt = |
| ADD_COUNTER(_segment_profile, "AnnIndexFallbackBruteForceCnt", TUnit::UNIT); |
| _variant_scan_sparse_column_timer = ADD_TIMER(_segment_profile, "VariantScanSparseColumnTimer"); |
| _variant_scan_sparse_column_bytes = |
| ADD_COUNTER(_segment_profile, "VariantScanSparseColumnBytes", TUnit::BYTES); |
| _variant_fill_path_from_sparse_column_timer = |
| ADD_TIMER(_segment_profile, "VariantFillPathFromSparseColumnTimer"); |
| _variant_subtree_default_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantSubtreeDefaultIterCount", TUnit::UNIT); |
| _variant_subtree_leaf_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantSubtreeLeafIterCount", TUnit::UNIT); |
| _variant_subtree_hierarchical_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantSubtreeHierarchicalIterCount", TUnit::UNIT); |
| _variant_subtree_sparse_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantSubtreeSparseIterCount", TUnit::UNIT); |
| _variant_doc_value_column_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantDocValueColumnIterCount", TUnit::UNIT); |
| |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::_process_conjuncts(RuntimeState* state) { |
| SCOPED_TIMER(_process_conjunct_timer); |
| RETURN_IF_ERROR(ScanLocalState::_process_conjuncts(state)); |
| if (ScanLocalState::_eos) { |
| return Status::OK(); |
| } |
| RETURN_IF_ERROR(_build_key_ranges_and_filters()); |
| return Status::OK(); |
| } |
| |
| bool OlapScanLocalState::_is_key_column(const std::string& key_name) { |
| // all column in dup_keys table or unique_keys with merge on write table olap scan node threat |
| // as key column |
| if (_storage_no_merge()) { |
| return true; |
| } |
| |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| auto res = std::find(p._olap_scan_node.key_column_name.begin(), |
| p._olap_scan_node.key_column_name.end(), key_name); |
| return res != p._olap_scan_node.key_column_name.end(); |
| } |
| |
| Status OlapScanLocalState::_should_push_down_function_filter(VectorizedFnCall* fn_call, |
| VExprContext* expr_ctx, |
| StringRef* constant_str, |
| doris::FunctionContext** fn_ctx, |
| PushDownType& pdt) { |
| // Now only `like` function filters is supported to push down |
| if (fn_call->fn().name.function_name != "like") { |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } |
| |
| const auto& children = fn_call->children(); |
| doris::FunctionContext* func_cxt = expr_ctx->fn_context(fn_call->fn_context_index()); |
| DCHECK(func_cxt != nullptr); |
| DCHECK(children.size() == 2); |
| for (size_t i = 0; i < children.size(); i++) { |
| if (VExpr::expr_without_cast(children[i])->node_type() != TExprNodeType::SLOT_REF) { |
| // not a slot ref(column) |
| continue; |
| } |
| if (!children[1 - i]->is_constant()) { |
| // only handle constant value |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } else { |
| DCHECK(is_string_type(children[1 - i]->data_type()->get_primitive_type())); |
| std::shared_ptr<ColumnPtrWrapper> const_col_wrapper; |
| RETURN_IF_ERROR(children[1 - i]->get_const_col(expr_ctx, &const_col_wrapper)); |
| if (const auto* const_column = |
| check_and_get_column<ColumnConst>(const_col_wrapper->column_ptr.get())) { |
| *constant_str = const_column->get_data_at(0); |
| } else { |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } |
| } |
| } |
| *fn_ctx = func_cxt; |
| pdt = PushDownType::ACCEPTABLE; |
| return Status::OK(); |
| } |
| |
| bool OlapScanLocalState::_should_push_down_common_expr() { |
| return state()->enable_common_expr_pushdown() && _storage_no_merge(); |
| } |
| |
| bool OlapScanLocalState::_storage_no_merge() { |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| return (p._olap_scan_node.keyType == TKeysType::DUP_KEYS || |
| (p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS && |
| p._olap_scan_node.__isset.enable_unique_key_merge_on_write && |
| p._olap_scan_node.enable_unique_key_merge_on_write)) || |
| _read_mor_as_dup(); |
| } |
| |
| bool OlapScanLocalState::_read_mor_as_dup() { |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| return p._olap_scan_node.__isset.read_mor_as_dup && p._olap_scan_node.read_mor_as_dup; |
| } |
| |
| Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) { |
| if (_scan_ranges.empty()) { |
| _eos = true; |
| _scan_dependency->set_ready(); |
| return Status::OK(); |
| } |
| SCOPED_TIMER(_scanner_init_timer); |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| |
| for (auto uid : p._olap_scan_node.output_column_unique_ids) { |
| _output_column_ids.emplace(uid); |
| } |
| |
| // Step 3: convert accumulated scan key pairs into OlapScanRange objects. |
| // Each OlapScanRange carries real begin/end OlapTuples with has_lower_bound = true. |
| RETURN_IF_ERROR(_scan_keys.get_key_range(&_cond_ranges)); |
| // If no key predicates were pushed down, _cond_ranges is empty. |
| // Create a single default-constructed OlapScanRange (has_lower_bound = false) |
| // to represent a full table scan. Consumers detect this and skip pushing |
| // key range to the tablet reader. |
| if (_cond_ranges.empty()) { |
| _cond_ranges.emplace_back(new doris::OlapScanRange()); |
| } |
| |
| bool enable_parallel_scan = state()->enable_parallel_scan(); |
| |
| // The flag of preagg's meaning is whether return pre agg data(or partial agg data) |
| // PreAgg ON: The storage layer returns partially aggregated data without additional processing. (Fast data reading) |
| // for example, if a table is select userid,count(*) from base table. |
| // And the user send a query like select userid,count(*) from base table group by userid. |
| // then the storage layer do not need do aggregation, it could just return the partial agg data, because the compute layer will do aggregation. |
| // PreAgg OFF: The storage layer must complete pre-aggregation and return fully aggregated data. (Slow data reading) |
| if (enable_parallel_scan && !p._should_run_serial && |
| p._push_down_agg_type == TPushAggOp::NONE && |
| (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) { |
| // Filter out the "full scan" placeholder range (has_lower_bound == false) |
| // so that only ranges with real key bounds are forwarded to the parallel scanner. |
| std::vector<OlapScanRange*> key_ranges; |
| for (auto& range : _cond_ranges) { |
| if (!range->has_lower_bound) { |
| continue; |
| } |
| key_ranges.emplace_back(range.get()); |
| } |
| |
| ParallelScannerBuilder scanner_builder(this, _tablets, _read_sources, _scanner_profile, |
| key_ranges, state(), p._limit, true, |
| p._olap_scan_node.is_preaggregation); |
| |
| int max_scanners_count = state()->parallel_scan_max_scanners_count(); |
| |
| // If the `max_scanners_count` was not set, |
| // use `CpuInfo::num_cores()` as the default value. |
| if (max_scanners_count <= 0) { |
| max_scanners_count = CpuInfo::num_cores(); |
| } |
| |
| // Too small value of `min_rows_per_scanner` is meaningless. |
| auto min_rows_per_scanner = |
| std::max<int64_t>(1024, state()->parallel_scan_min_rows_per_scanner()); |
| scanner_builder.set_max_scanners_count(max_scanners_count); |
| scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner); |
| // If the session variable is set, force one scanner per segment. |
| if (state()->query_options().__isset.optimize_index_scan_parallelism && |
| state()->query_options().optimize_index_scan_parallelism) { |
| // TODO: Use optimize_index_scan_parallelism for ann range search in the future. |
| // Currently, ann topn is enough |
| if (_ann_topn_runtime != nullptr) { |
| scanner_builder.set_scan_parallelism_by_per_segment(true); |
| } |
| } |
| |
| RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners)); |
| for (auto& scanner : *scanners) { |
| auto* olap_scanner = assert_cast<OlapScanner*>(scanner.get()); |
| RETURN_IF_ERROR(olap_scanner->init(state(), _conjuncts)); |
| } |
| |
| const OlapReaderStatistics* stats = scanner_builder.builder_stats(); |
| io::FileCacheProfileReporter cache_profile(_segment_profile.get()); |
| cache_profile.update(&stats->file_cache_stats); |
| |
| DorisMetrics::instance()->query_scan_bytes_from_local->increment( |
| stats->file_cache_stats.bytes_read_from_local); |
| DorisMetrics::instance()->query_scan_bytes_from_remote->increment( |
| stats->file_cache_stats.bytes_read_from_remote); |
| |
| return Status::OK(); |
| } |
| |
| int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); |
| for (size_t scan_range_idx = 0; scan_range_idx < _scan_ranges.size(); scan_range_idx++) { |
| int64_t version = 0; |
| std::from_chars(_scan_ranges[scan_range_idx]->version.data(), |
| _scan_ranges[scan_range_idx]->version.data() + |
| _scan_ranges[scan_range_idx]->version.size(), |
| version); |
| std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges; |
| int size_based_scanners_per_tablet = 1; |
| |
| if (config::doris_scan_range_max_mb > 0) { |
| size_based_scanners_per_tablet = |
| std::max(1, (int)(_tablets[scan_range_idx].tablet->tablet_footprint() / |
| (config::doris_scan_range_max_mb << 20))); |
| } |
| int ranges_per_scanner = |
| std::max(1, (int)ranges->size() / |
| std::min(scanners_per_tablet, size_based_scanners_per_tablet)); |
| int64_t num_ranges = ranges->size(); |
| for (int64_t i = 0; i < num_ranges;) { |
| std::vector<doris::OlapScanRange*> scanner_ranges; |
| scanner_ranges.push_back((*ranges)[i].get()); |
| ++i; |
| for (int64_t j = 1; i < num_ranges && j < ranges_per_scanner && |
| (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include; |
| ++j, ++i) { |
| scanner_ranges.push_back((*ranges)[i].get()); |
| } |
| |
| COUNTER_UPDATE(_key_range_counter, scanner_ranges.size()); |
| // `rs_reader` should not be shared by different scanners |
| for (auto& split : _read_sources[scan_range_idx].rs_splits) { |
| split.rs_reader = split.rs_reader->clone(); |
| } |
| auto scanner = |
| OlapScanner::create_shared(this, OlapScanner::Params { |
| state(), |
| _scanner_profile.get(), |
| scanner_ranges, |
| _tablets[scan_range_idx].tablet, |
| version, |
| _read_sources[scan_range_idx], |
| p._limit, |
| p._olap_scan_node.is_preaggregation, |
| }); |
| RETURN_IF_ERROR(scanner->init(state(), _conjuncts)); |
| scanners->push_back(std::move(scanner)); |
| } |
| } |
| _tablets.clear(); |
| _read_sources.clear(); |
| |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { |
| if (config::is_cloud_mode() && !_sync_tablet) { |
| _pending_tablets_num = _scan_ranges.size(); |
| if (_pending_tablets_num > 0) { |
| _sync_cloud_tablets_watcher.start(); |
| _cloud_tablet_dependency = Dependency::create_shared( |
| _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); |
| _tablets.resize(_scan_ranges.size()); |
| std::vector<std::function<Status()>> tasks; |
| _sync_statistics.resize(_scan_ranges.size()); |
| for (size_t i = 0; i < _scan_ranges.size(); i++) { |
| auto* sync_stats = &_sync_statistics[i]; |
| int64_t version = 0; |
| std::from_chars(_scan_ranges[i]->version.data(), |
| _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), |
| version); |
| auto task_ctx = state->get_task_execution_context(); |
| auto task_create_time = std::chrono::steady_clock::now(); |
| tasks.emplace_back([this, sync_stats, version, i, task_ctx, task_create_time]() { |
| // Record bthread scheduling delay |
| auto task_start_time = std::chrono::steady_clock::now(); |
| if (sync_stats) { |
| sync_stats->bthread_schedule_delay_ns += |
| std::chrono::duration_cast<std::chrono::nanoseconds>( |
| task_start_time - task_create_time) |
| .count(); |
| } |
| auto task_lock = task_ctx.lock(); |
| if (task_lock == nullptr) { |
| return Status::OK(); |
| } |
| Defer defer([&] { |
| if (_pending_tablets_num.fetch_sub(1) == 1) { |
| _cloud_tablet_dependency->set_ready(); |
| _sync_cloud_tablets_watcher.stop(); |
| } |
| }); |
| auto tablet = |
| DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); |
| _tablets[i] = {std::move(tablet), version}; |
| SyncOptions options; |
| options.query_version = version; |
| options.merge_schema = true; |
| RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablets[i].tablet) |
| ->sync_rowsets(options, sync_stats)); |
| // FIXME(plat1ko): Avoid pointer cast |
| ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count( |
| *_tablets[i].tablet); |
| return Status::OK(); |
| }); |
| } |
| RETURN_IF_ERROR(cloud::bthread_fork_join(std::move(tasks), |
| config::init_scanner_sync_rowsets_parallelism, |
| &_cloud_tablet_future)); |
| } |
| _sync_tablet = true; |
| } |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::prepare(RuntimeState* state) { |
| if (_prepared) { |
| return Status::OK(); |
| } |
| MonotonicStopWatch timer; |
| timer.start(); |
| _read_sources.resize(_scan_ranges.size()); |
| |
| if (config::is_cloud_mode()) { |
| if (!_cloud_tablet_dependency || |
| _cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) { |
| // Remote tablet still in-flight. |
| return Status::OK(); |
| } |
| COUNTER_UPDATE(_sync_rowset_timer, _sync_cloud_tablets_watcher.elapsed_time()); |
| RETURN_IF_ERROR(_cloud_tablet_future.get()); |
| auto total_rowsets = std::accumulate( |
| _tablets.cbegin(), _tablets.cend(), 0LL, |
| [](long long acc, const auto& tabletWithVersion) { |
| return acc + tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size(); |
| }); |
| COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets); |
| for (const auto& sync_stats : _sync_statistics) { |
| COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, sync_stats.tablet_meta_cache_hit); |
| COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, sync_stats.tablet_meta_cache_miss); |
| COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer, |
| sync_stats.get_remote_tablet_meta_rpc_ns); |
| COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_num, sync_stats.get_remote_rowsets_num); |
| COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_rpc_timer, |
| sync_stats.get_remote_rowsets_rpc_ns); |
| COUNTER_UPDATE(_sync_rowset_get_local_delete_bitmap_rowsets_num, |
| sync_stats.get_local_delete_bitmap_rowsets_num); |
| COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rowsets_num, |
| sync_stats.get_remote_delete_bitmap_rowsets_num); |
| COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_key_count, |
| sync_stats.get_remote_delete_bitmap_key_count); |
| COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_bytes, |
| sync_stats.get_remote_delete_bitmap_bytes); |
| COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer, |
| sync_stats.get_remote_delete_bitmap_rpc_ns); |
| COUNTER_UPDATE(_sync_rowset_bthread_schedule_wait_timer, |
| sync_stats.bthread_schedule_delay_ns); |
| COUNTER_UPDATE(_sync_rowset_meta_lock_wait_timer, sync_stats.meta_lock_wait_ns); |
| COUNTER_UPDATE(_sync_rowset_sync_meta_lock_wait_timer, |
| sync_stats.sync_meta_lock_wait_ns); |
| } |
| auto time_ms = _sync_cloud_tablets_watcher.elapsed_time_microseconds(); |
| if (time_ms >= config::sync_rowsets_slow_threshold_ms) { |
| DorisMetrics::instance()->get_remote_tablet_slow_time_ms->increment(time_ms); |
| DorisMetrics::instance()->get_remote_tablet_slow_cnt->increment(1); |
| LOG_WARNING("get tablet takes too long") |
| .tag("query_id", print_id(PipelineXLocalState<>::_state->query_id())) |
| .tag("node_id", _parent->node_id()) |
| .tag("total_time", |
| PrettyPrinter::print(_sync_cloud_tablets_watcher.elapsed_time(), |
| TUnit::TIME_NS)) |
| .tag("num_tablets", _tablets.size()) |
| .tag("tablet_meta_cache_hit", _sync_rowset_tablet_meta_cache_hit->value()) |
| .tag("tablet_meta_cache_miss", _sync_rowset_tablet_meta_cache_miss->value()) |
| .tag("get_remote_tablet_meta_rpc_time", |
| PrettyPrinter::print( |
| _sync_rowset_get_remote_tablet_meta_rpc_timer->value(), |
| TUnit::TIME_NS)) |
| .tag("remote_rowsets_num", _sync_rowset_get_remote_rowsets_num->value()) |
| .tag("get_remote_rowsets_rpc_time", |
| PrettyPrinter::print(_sync_rowset_get_remote_rowsets_rpc_timer->value(), |
| TUnit::TIME_NS)) |
| .tag("local_delete_bitmap_rowsets_num", |
| _sync_rowset_get_local_delete_bitmap_rowsets_num->value()) |
| .tag("remote_delete_bitmap_rowsets_num", |
| _sync_rowset_get_remote_delete_bitmap_rowsets_num->value()) |
| .tag("remote_delete_bitmap_key_count", |
| _sync_rowset_get_remote_delete_bitmap_key_count->value()) |
| .tag("remote_delete_bitmap_bytes", |
| PrettyPrinter::print(_sync_rowset_get_remote_delete_bitmap_bytes->value(), |
| TUnit::BYTES)) |
| .tag("get_remote_delete_bitmap_rpc_time", |
| PrettyPrinter::print( |
| _sync_rowset_get_remote_delete_bitmap_rpc_timer->value(), |
| TUnit::TIME_NS)); |
| } |
| } else { |
| _tablets.resize(_scan_ranges.size()); |
| for (size_t i = 0; i < _scan_ranges.size(); i++) { |
| int64_t version = 0; |
| std::from_chars(_scan_ranges[i]->version.data(), |
| _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), |
| version); |
| auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id)); |
| _tablets[i] = {std::move(tablet), version}; |
| } |
| } |
| |
| for (size_t i = 0; i < _scan_ranges.size(); i++) { |
| _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source( |
| {0, _tablets[i].version}, |
| {.skip_missing_versions = _state->skip_missing_version(), |
| .enable_fetch_rowsets_from_peers = config::enable_fetch_rowsets_from_peer_replicas, |
| .enable_prefer_cached_rowset = |
| config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false, |
| .query_freshness_tolerance_ms = |
| config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1})); |
| if (!PipelineXLocalState<>::_state->skip_delete_predicate()) { |
| _read_sources[i].fill_delete_predicates(); |
| } |
| if (config::enable_mow_verbose_log && |
| _tablets[i].tablet->enable_unique_key_merge_on_write()) { |
| LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}", |
| _tablets[i].tablet->tablet_id(), |
| print_id(PipelineXLocalState<>::_state->query_id())); |
| } |
| } |
| timer.stop(); |
| double cost_secs = static_cast<double>(timer.elapsed_time()) / NANOS_PER_SEC; |
| if (cost_secs > 1) { |
| LOG_WARNING( |
| "Try to hold tablets costs {} seconds, it costs too much. (Query-ID={}, NodeId={}, " |
| "ScanRangeNum={})", |
| cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(), |
| _scan_ranges.size()); |
| } |
| _prepared = true; |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::open(RuntimeState* state) { |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| for (const auto& pair : p._slot_id_to_slot_desc) { |
| const SlotDescriptor* slot_desc = pair.second; |
| std::shared_ptr<doris::TExpr> virtual_col_expr = slot_desc->get_virtual_column_expr(); |
| if (virtual_col_expr) { |
| std::shared_ptr<doris::VExprContext> virtual_column_expr_ctx; |
| RETURN_IF_ERROR(VExpr::create_expr_tree(*virtual_col_expr, virtual_column_expr_ctx)); |
| RETURN_IF_ERROR(virtual_column_expr_ctx->prepare(state, p.intermediate_row_desc())); |
| RETURN_IF_ERROR(virtual_column_expr_ctx->open(state)); |
| |
| _slot_id_to_virtual_column_expr[slot_desc->id()] = virtual_column_expr_ctx; |
| _slot_id_to_col_type[slot_desc->id()] = slot_desc->get_data_type_ptr(); |
| int col_pos = p.intermediate_row_desc().get_column_id(slot_desc->id()); |
| if (col_pos < 0) { |
| return Status::InternalError( |
| "Invalid virtual slot, can not find its information. Slot desc:\n{}\nRow " |
| "desc:\n{}", |
| slot_desc->debug_string(), p.row_desc().debug_string()); |
| } else { |
| _slot_id_to_index_in_block[slot_desc->id()] = col_pos; |
| } |
| } |
| } |
| |
| if (_score_runtime) { |
| RETURN_IF_ERROR(_score_runtime->prepare(state, p.intermediate_row_desc())); |
| } |
| |
| if (_ann_topn_runtime) { |
| RETURN_IF_ERROR(_ann_topn_runtime->prepare(state, p.intermediate_row_desc())); |
| } |
| |
| RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::open(state)); |
| |
| return Status::OK(); |
| } |
| |
| TOlapScanNode& OlapScanLocalState::olap_scan_node() const { |
| return _parent->cast<OlapScanOperatorX>()._olap_scan_node; |
| } |
| |
| void OlapScanLocalState::set_scan_ranges(RuntimeState* state, |
| const std::vector<TScanRangeParams>& scan_ranges) { |
| const auto& cache_param = _parent->cast<OlapScanOperatorX>()._cache_param; |
| bool hit_cache = false; |
| if (!cache_param.digest.empty() && !cache_param.force_refresh_query_cache) { |
| std::string cache_key; |
| int64_t version = 0; |
| auto status = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version); |
| if (!status.ok()) { |
| throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, status.msg()); |
| } |
| doris::QueryCacheHandle handle; |
| hit_cache = QueryCache::instance()->lookup(cache_key, version, &handle); |
| } |
| |
| if (!hit_cache) { |
| for (auto& scan_range : scan_ranges) { |
| DCHECK(scan_range.scan_range.__isset.palo_scan_range); |
| _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); |
| COUNTER_UPDATE(_tablet_counter, 1); |
| } |
| } |
| } |
| |
| static std::string tablets_id_to_string( |
| const std::vector<std::unique_ptr<TPaloScanRange>>& scan_ranges) { |
| if (scan_ranges.empty()) { |
| return "[empty]"; |
| } |
| std::stringstream ss; |
| ss << "[" << scan_ranges[0]->tablet_id; |
| for (int i = 1; i < scan_ranges.size(); ++i) { |
| ss << ", " << scan_ranges[i]->tablet_id; |
| } |
| ss << "]"; |
| return ss.str(); |
| } |
| |
| inline std::string push_down_agg_to_string(const TPushAggOp::type& op) { |
| if (op == TPushAggOp::MINMAX) { |
| return "MINMAX"; |
| } else if (op == TPushAggOp::COUNT) { |
| return "COUNT"; |
| } else if (op == TPushAggOp::MIX) { |
| return "MIX"; |
| } else { |
| return "NONE"; |
| } |
| } |
| |
| /// Step 2 of the scan-key generation pipeline. |
| /// |
| /// Iterate key columns in schema order; for each one, look up its ColumnValueRange |
| /// from _slot_id_to_value_range (populated by _normalize_conjuncts) and call |
| /// _scan_keys.extend_scan_key() to grow the multi-column prefix key set. |
| /// |
| /// Example – table t(k1 INT, k2 INT, v INT), key columns = (k1, k2): |
| /// Input ColumnValueRanges: |
| /// k1: fixed_values = {1, 2} |
| /// k2: fixed_values = {10} |
| /// After extend_scan_key(k1): |
| /// _begin_scan_keys = [(1), (2)] _end_scan_keys = [(1), (2)] |
| /// After extend_scan_key(k2): |
| /// _begin_scan_keys = [(1,10), (2,10)] _end_scan_keys = [(1,10), (2,10)] |
| /// |
| /// Loop terminates when: |
| /// - A key column has no predicate (break) |
| /// - A range column was appended (_has_range_value, cannot extend further) |
| /// - The ColumnValueRange is provably empty (eos) |
| /// - The fixed-value set exceeds max_scan_key_num (should_break or fall back to range) |
| /// |
| /// At the end, _scan_keys.get_key_range() converts these into OlapScanRange objects. |
| Status OlapScanLocalState::_build_key_ranges_and_filters() { |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| if (p._push_down_agg_type == TPushAggOp::NONE || |
| p._push_down_agg_type == TPushAggOp::COUNT_ON_INDEX) { |
| const std::vector<std::string>& column_names = p._olap_scan_node.key_column_name; |
| const std::vector<TPrimitiveType::type>& column_types = p._olap_scan_node.key_column_type; |
| DCHECK(column_types.size() == column_names.size()); |
| |
| // 1. construct scan key except last olap engine short key |
| _scan_keys.set_is_convertible(p.limit() == -1); |
| |
| // we use `exact_range` to identify a key range is an exact range or not when we convert |
| // it to `_scan_keys`. If `exact_range` is true, we can just discard it from `_olap_filters`. |
| bool exact_range = true; |
| |
| // If the `_scan_keys` cannot extend by the range of column, should stop. |
| bool should_break = false; |
| |
| bool eos = false; |
| for (int column_index = 0; column_index < column_names.size() && |
| !_scan_keys.has_range_value() && !eos && !should_break; |
| ++column_index) { |
| if (p._colname_to_slot_id.find(column_names[column_index]) == |
| p._colname_to_slot_id.end()) { |
| break; |
| } |
| auto iter = |
| _slot_id_to_value_range.find(p._colname_to_slot_id[column_names[column_index]]); |
| if (_slot_id_to_value_range.end() == iter) { |
| break; |
| } |
| DCHECK(_slot_id_to_predicates.count(iter->first) > 0); |
| const auto& value_range = iter->second; |
| |
| std::optional<int> key_to_erase; |
| |
| RETURN_IF_ERROR(std::visit( |
| [&](auto&& range) { |
| // make a copy or range and pass to extend_scan_key, keep the range unchanged |
| // because extend_scan_key method may change the first parameter. |
| // but the original range may be converted to olap filters, if it's not a exact_range. |
| auto temp_range = range; |
| if (range.get_fixed_value_size() <= p._max_pushdown_conditions_per_column) { |
| RETURN_IF_ERROR( |
| _scan_keys.extend_scan_key(temp_range, p._max_scan_key_num, |
| &exact_range, &eos, &should_break)); |
| if (exact_range) { |
| key_to_erase = iter->first; |
| } |
| } else { |
| // if exceed max_pushdown_conditions_per_column, use whole_value_rang instead |
| // and will not erase from _slot_id_to_value_range, it must be not exact_range |
| temp_range.set_whole_value_range(); |
| RETURN_IF_ERROR( |
| _scan_keys.extend_scan_key(temp_range, p._max_scan_key_num, |
| &exact_range, &eos, &should_break)); |
| } |
| return Status::OK(); |
| }, |
| value_range)); |
| |
| // Perform the erase operation after the visit is complete, still under lock |
| if (key_to_erase.has_value()) { |
| _slot_id_to_value_range.erase(*key_to_erase); |
| |
| std::vector<std::shared_ptr<ColumnPredicate>> new_predicates; |
| for (const auto& it : _slot_id_to_predicates[*key_to_erase]) { |
| if (!it->could_be_erased()) { |
| new_predicates.push_back(it); |
| } |
| } |
| if (new_predicates.empty()) { |
| _slot_id_to_predicates.erase(*key_to_erase); |
| } else { |
| _slot_id_to_predicates[*key_to_erase] = new_predicates; |
| } |
| } |
| // lock is released here when it goes out of scope |
| } |
| if (eos) { |
| _eos = true; |
| _scan_dependency->set_ready(); |
| } |
| } else { |
| custom_profile()->add_info_string("PushDownAggregate", |
| push_down_agg_to_string(p._push_down_agg_type)); |
| } |
| |
| if (state()->enable_profile()) { |
| custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string()); |
| custom_profile()->add_info_string("TabletIds", tablets_id_to_string(_scan_ranges)); |
| } |
| VLOG_CRITICAL << _scan_keys.debug_string(); |
| |
| return Status::OK(); |
| } |
| |
| OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
| const DescriptorTbl& descs, int parallel_tasks, |
| const TQueryCacheParam& param) |
| : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks), |
| _olap_scan_node(tnode.olap_scan_node), |
| _cache_param(param) { |
| _output_tuple_id = tnode.olap_scan_node.tuple_id; |
| if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) { |
| _limit_per_scanner = _olap_scan_node.sort_limit; |
| } |
| DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", { |
| LOG(INFO) << "limit_per_scanner: " << _limit_per_scanner |
| << ", sort_limit: " << _olap_scan_node.sort_limit |
| << ", isset.sort_limit: " << _olap_scan_node.__isset.sort_limit; |
| }) |
| |
| if (_olap_scan_node.__isset.columns_desc && !_olap_scan_node.columns_desc.empty() && |
| _olap_scan_node.columns_desc[0].col_unique_id >= 0) { |
| _tablet_schema = std::make_shared<TabletSchema>(); |
| _tablet_schema->clear_columns(); |
| for (const auto& column_desc : _olap_scan_node.columns_desc) { |
| _tablet_schema->append_column(TabletColumn(column_desc)); |
| } |
| if (_olap_scan_node.__isset.schema_version) { |
| _tablet_schema->set_schema_version(_olap_scan_node.schema_version); |
| } |
| if (_olap_scan_node.__isset.indexes_desc) { |
| _tablet_schema->update_indexes_from_thrift(_olap_scan_node.indexes_desc); |
| } |
| } |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |