| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "pipeline/exec/olap_scan_operator.h" |
| |
| #include <fmt/format.h> |
| |
| #include <memory> |
| #include <numeric> |
| |
| #include "cloud/cloud_meta_mgr.h" |
| #include "cloud/cloud_storage_engine.h" |
| #include "cloud/cloud_tablet.h" |
| #include "cloud/cloud_tablet_hotspot.h" |
| #include "cloud/config.h" |
| #include "io/cache/block_file_cache_profile.h" |
| #include "olap/parallel_scanner_builder.h" |
| #include "olap/rowset/segment_v2/ann_index/ann_topn_runtime.h" |
| #include "olap/storage_engine.h" |
| #include "olap/tablet_manager.h" |
| #include "pipeline/exec/scan_operator.h" |
| #include "pipeline/query_cache/query_cache.h" |
| #include "runtime/runtime_state.h" |
| #include "runtime_filter/runtime_filter_consumer_helper.h" |
| #include "service/backend_options.h" |
| #include "util/runtime_profile.h" |
| #include "util/to_string.h" |
| #include "vec/exec/scan/olap_scanner.h" |
| #include "vec/exprs/score_runtime.h" |
| #include "vec/exprs/vectorized_fn_call.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/exprs/vexpr_context.h" |
| #include "vec/exprs/vslot_ref.h" |
| #include "vec/functions/in.h" |
| |
| namespace doris::pipeline { |
| #include "common/compile_check_begin.h" |
| |
| Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { |
| const TOlapScanNode& olap_scan_node = _parent->cast<OlapScanOperatorX>()._olap_scan_node; |
| |
| if (olap_scan_node.__isset.score_sort_info && olap_scan_node.__isset.score_sort_limit) { |
| const doris::TExpr& ordering_expr = olap_scan_node.score_sort_info.ordering_exprs.front(); |
| const bool asc = olap_scan_node.score_sort_info.is_asc_order[0]; |
| const size_t limit = olap_scan_node.score_sort_limit; |
| std::shared_ptr<vectorized::VExprContext> ordering_expr_ctx; |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(ordering_expr, ordering_expr_ctx)); |
| _score_runtime = vectorized::ScoreRuntime::create_shared(ordering_expr_ctx, asc, limit); |
| } |
| |
| if (olap_scan_node.__isset.ann_sort_info || olap_scan_node.__isset.ann_sort_limit) { |
| DCHECK(olap_scan_node.__isset.ann_sort_info); |
| DCHECK(olap_scan_node.__isset.ann_sort_limit); |
| DCHECK(olap_scan_node.ann_sort_info.ordering_exprs.size() == 1); |
| const doris::TExpr& ordering_expr = olap_scan_node.ann_sort_info.ordering_exprs.front(); |
| DCHECK(ordering_expr.nodes[0].__isset.slot_ref); |
| DCHECK(ordering_expr.nodes[0].slot_ref.is_virtual_slot); |
| DCHECK(olap_scan_node.ann_sort_info.is_asc_order.size() == 1); |
| const bool asc = olap_scan_node.ann_sort_info.is_asc_order[0]; |
| const size_t limit = olap_scan_node.ann_sort_limit; |
| std::shared_ptr<vectorized::VExprContext> ordering_expr_ctx; |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(ordering_expr, ordering_expr_ctx)); |
| _ann_topn_runtime = |
| segment_v2::AnnTopNRuntime::create_shared(asc, limit, ordering_expr_ctx); |
| } |
| |
| RETURN_IF_ERROR(Base::init(state, info)); |
| RETURN_IF_ERROR(_sync_cloud_tablets(state)); |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::_init_profile() { |
| RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::_init_profile()); |
| // Rows read from storage. |
| // Include the rows read from doris page cache. |
| _scan_rows = ADD_COUNTER(custom_profile(), "ScanRows", TUnit::UNIT); |
| |
| // 1. init segment profile |
| _segment_profile.reset(new RuntimeProfile("SegmentIterator")); |
| _scanner_profile->add_child(_segment_profile.get(), true, nullptr); |
| |
| // 2. init timer and counters |
| _reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime"); |
| _scanner_init_timer = ADD_TIMER(_scanner_profile, "ScannerInitTime"); |
| _process_conjunct_timer = ADD_TIMER(custom_profile(), "ProcessConjunctTime"); |
| _read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES); |
| _read_uncompressed_counter = |
| ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES); |
| _block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime"); |
| _block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT); |
| _block_fetch_timer = ADD_TIMER(_scanner_profile, "BlockFetchTime"); |
| _delete_bitmap_get_agg_timer = ADD_TIMER(_scanner_profile, "DeleteBitmapGetAggTime"); |
| if (config::is_cloud_mode()) { |
| static const char* sync_rowset_timer_name = "SyncRowsetTime"; |
| _sync_rowset_timer = ADD_TIMER(_scanner_profile, sync_rowset_timer_name); |
| _sync_rowset_tablet_meta_cache_hit = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletMetaCacheHitCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_tablet_meta_cache_miss = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletMetaCacheMissCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_tablet_meta_rpc_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetGetRemoteTabletMetaRpcTime", sync_rowset_timer_name); |
| _sync_rowset_tablets_rowsets_total_num = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletsRowsetsTotatCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_rowsets_num = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteRowsetsCount", TUnit::UNIT, |
| sync_rowset_timer_name); |
| _sync_rowset_get_remote_rowsets_rpc_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetGetRemoteRowsetsRpcTime", sync_rowset_timer_name); |
| _sync_rowset_get_local_delete_bitmap_rowsets_num = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetLocalDeleteBitmapRowsetsCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_delete_bitmap_rowsets_num = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapRowsetsCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_delete_bitmap_key_count = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapKeyCount", |
| TUnit::UNIT, sync_rowset_timer_name); |
| _sync_rowset_get_remote_delete_bitmap_bytes = |
| ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapBytes", |
| TUnit::BYTES, sync_rowset_timer_name); |
| _sync_rowset_get_remote_delete_bitmap_rpc_timer = ADD_CHILD_TIMER( |
| _scanner_profile, "SyncRowsetGetRemoteDeleteBitmapRpcTime", sync_rowset_timer_name); |
| } |
| _block_init_timer = ADD_TIMER(_segment_profile, "BlockInitTime"); |
| _block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime"); |
| _block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT); |
| _segment_generate_row_range_by_keys_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByKeysTime"); |
| _segment_generate_row_range_by_column_conditions_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByColumnConditionsTime"); |
| _segment_generate_row_range_by_bf_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByBloomFilterIndexTime"); |
| _collect_iterator_merge_next_timer = ADD_TIMER(_segment_profile, "CollectIteratorMergeTime"); |
| _segment_generate_row_range_by_zonemap_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByZoneMapIndexTime"); |
| _segment_generate_row_range_by_dict_timer = |
| ADD_TIMER(_segment_profile, "GenerateRowRangeByDictTime"); |
| |
| _rows_vec_cond_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT); |
| _rows_short_circuit_cond_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsShortCircuitPredFiltered", TUnit::UNIT); |
| _rows_expr_cond_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsExprPredFiltered", TUnit::UNIT); |
| _rows_vec_cond_input_counter = |
| ADD_COUNTER(_segment_profile, "RowsVectorPredInput", TUnit::UNIT); |
| _rows_short_circuit_cond_input_counter = |
| ADD_COUNTER(_segment_profile, "RowsShortCircuitPredInput", TUnit::UNIT); |
| _rows_expr_cond_input_counter = ADD_COUNTER(_segment_profile, "RowsExprPredInput", TUnit::UNIT); |
| _vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime"); |
| _short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime"); |
| _expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime"); |
| _predicate_column_read_timer = ADD_TIMER(_segment_profile, "PredicateColumnReadTime"); |
| _non_predicate_column_read_timer = ADD_TIMER(_segment_profile, "NonPredicateColumnReadTime"); |
| _predicate_column_read_seek_timer = ADD_TIMER(_segment_profile, "PredicateColumnReadSeekTime"); |
| _predicate_column_read_seek_counter = |
| ADD_COUNTER(_segment_profile, "PredicateColumnReadSeekCount", TUnit::UNIT); |
| |
| _lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime"); |
| _lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime"); |
| _lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT); |
| |
| _output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime"); |
| |
| _stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT); |
| _stats_rp_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsZoneMapRuntimePredicateFiltered", TUnit::UNIT); |
| _bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT); |
| _dict_filtered_counter = ADD_COUNTER(_segment_profile, "SegmentDictFiltered", TUnit::UNIT); |
| _del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT); |
| _conditions_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT); |
| _key_range_filtered_counter = |
| ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT); |
| |
| _io_timer = ADD_TIMER(_segment_profile, "IOTimer"); |
| _decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer"); |
| |
| _total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT); |
| _cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT); |
| |
| _statistics_collect_timer = ADD_TIMER(_scanner_profile, "StatisticsCollectTime"); |
| _inverted_index_filter_counter = |
| ADD_COUNTER(_segment_profile, "RowsInvertedIndexFiltered", TUnit::UNIT); |
| _inverted_index_filter_timer = ADD_TIMER(_segment_profile, "InvertedIndexFilterTime"); |
| _inverted_index_query_cache_hit_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexQueryCacheHit", TUnit::UNIT); |
| _inverted_index_query_cache_miss_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexQueryCacheMiss", TUnit::UNIT); |
| _inverted_index_query_timer = ADD_TIMER(_segment_profile, "InvertedIndexQueryTime"); |
| _inverted_index_query_null_bitmap_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexQueryNullBitmapTime"); |
| _inverted_index_query_bitmap_copy_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexQueryBitmapCopyTime"); |
| _inverted_index_searcher_open_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexSearcherOpenTime"); |
| _inverted_index_searcher_search_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchTime"); |
| _inverted_index_searcher_search_init_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchInitTime"); |
| _inverted_index_searcher_search_exec_timer = |
| ADD_TIMER(_segment_profile, "InvertedIndexSearcherSearchExecTime"); |
| _inverted_index_searcher_cache_hit_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexSearcherCacheHit", TUnit::UNIT); |
| _inverted_index_searcher_cache_miss_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexSearcherCacheMiss", TUnit::UNIT); |
| _inverted_index_downgrade_count_counter = |
| ADD_COUNTER(_segment_profile, "InvertedIndexDowngradeCount", TUnit::UNIT); |
| _inverted_index_analyzer_timer = ADD_TIMER(_segment_profile, "InvertedIndexAnalyzerTime"); |
| _inverted_index_lookup_timer = ADD_TIMER(_segment_profile, "InvertedIndexLookupTimer"); |
| |
| _output_index_result_column_timer = ADD_TIMER(_segment_profile, "OutputIndexResultColumnTime"); |
| _filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT); |
| _total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT); |
| _tablet_counter = ADD_COUNTER(custom_profile(), "TabletNum", TUnit::UNIT); |
| _key_range_counter = ADD_COUNTER(custom_profile(), "KeyRangesNum", TUnit::UNIT); |
| _tablet_reader_init_timer = ADD_TIMER(_scanner_profile, "TabletReaderInitTimer"); |
| _tablet_reader_capture_rs_readers_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderCaptureRsReadersTimer"); |
| _tablet_reader_init_return_columns_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitReturnColumnsTimer"); |
| _tablet_reader_init_keys_param_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitKeysParamTimer"); |
| _tablet_reader_init_orderby_keys_param_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitOrderbyKeysParamTimer"); |
| _tablet_reader_init_conditions_param_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitConditionsParamTimer"); |
| _tablet_reader_init_delete_condition_param_timer = |
| ADD_TIMER(_scanner_profile, "TabletReaderInitDeleteConditionParamTimer"); |
| _block_reader_vcollect_iter_init_timer = |
| ADD_TIMER(_scanner_profile, "BlockReaderVcollectIterInitTimer"); |
| _block_reader_rs_readers_init_timer = |
| ADD_TIMER(_scanner_profile, "BlockReaderRsReadersInitTimer"); |
| _block_reader_build_heap_init_timer = |
| ADD_TIMER(_scanner_profile, "BlockReaderBuildHeapInitTimer"); |
| |
| _rowset_reader_get_segment_iterators_timer = |
| ADD_TIMER(_scanner_profile, "RowsetReaderGetSegmentIteratorsTimer"); |
| _rowset_reader_create_iterators_timer = |
| ADD_TIMER(_scanner_profile, "RowsetReaderCreateIteratorsTimer"); |
| _rowset_reader_init_iterators_timer = |
| ADD_TIMER(_scanner_profile, "RowsetReaderInitIteratorsTimer"); |
| _rowset_reader_load_segments_timer = |
| ADD_TIMER(_scanner_profile, "RowsetReaderLoadSegmentsTimer"); |
| |
| _segment_iterator_init_timer = ADD_TIMER(_scanner_profile, "SegmentIteratorInitTimer"); |
| _segment_iterator_init_return_column_iterators_timer = |
| ADD_TIMER(_scanner_profile, "SegmentIteratorInitReturnColumnIteratorsTimer"); |
| _segment_iterator_init_index_iterators_timer = |
| ADD_TIMER(_scanner_profile, "SegmentIteratorInitIndexIteratorsTimer"); |
| |
| _segment_create_column_readers_timer = |
| ADD_TIMER(_scanner_profile, "SegmentCreateColumnReadersTimer"); |
| _segment_load_index_timer = ADD_TIMER(_scanner_profile, "SegmentLoadIndexTimer"); |
| |
| _index_filter_profile = std::make_unique<RuntimeProfile>("IndexFilter"); |
| _scanner_profile->add_child(_index_filter_profile.get(), true, nullptr); |
| /* |
| SegmentIterator: |
| - AnnIndexLoadCosts: 102.262us |
| - AnnIndexRangeSearchCosts: 0ns |
| - AnnIndexRangeSearchFiltered: 0 |
| - AnnIndexTopNCosts: 658.303ms |
| - AnnIndexTopNFiltered: 9.49791M (9497910) |
| - AnnIndexTopNSearchCnt: 209ns |
| */ |
| _ann_range_search_filter_counter = |
| ADD_COUNTER(_segment_profile, "AnnIndexRangeSearchFiltered", TUnit::UNIT); |
| _ann_topn_filter_counter = ADD_COUNTER(_segment_profile, "AnnIndexTopNFiltered", TUnit::UNIT); |
| |
| _ann_topn_search_costs = ADD_TIMER(_segment_profile, "AnnIndexTopNSearchCosts"); |
| _ann_topn_search_cnt = ADD_COUNTER(_segment_profile, "AnnIndexTopNSearchCnt", TUnit::UNIT); |
| _ann_range_search_costs = ADD_TIMER(_segment_profile, "AnnIndexRangeSearchCosts"); |
| _ann_range_search_cnt = ADD_COUNTER(_segment_profile, "AnnIndexRangeSearchCnt", TUnit::UNIT); |
| |
| // Detailed ANN timers (TopN) |
| // Create child timers under AnnIndexTopNSearchCosts for better readability |
| _ann_topn_engine_search_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexTopNEngineSearchCosts", "AnnIndexTopNSearchCosts"); |
| _ann_index_load_costs = ADD_TIMER(_segment_profile, "AnnIndexLoadCosts"); |
| _ann_topn_post_process_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexTopNResultPostProcessCosts", "AnnIndexTopNSearchCosts"); |
| _ann_topn_pre_process_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexTopNEnginePrepareCosts", "AnnIndexTopNSearchCosts"); |
| // Detailed ANN timers (Range) |
| // Create child timers under AnnIndexRangeSearchCosts to mirror TopN hierarchy |
| _ann_range_engine_search_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexRangeEngineSearchCosts", "AnnIndexRangeSearchCosts"); |
| _ann_range_post_process_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexRangeResultPostProcessCosts", "AnnIndexRangeSearchCosts"); |
| _ann_range_pre_process_costs = ADD_CHILD_TIMER( |
| _segment_profile, "AnnIndexRangeEnginePrepareCosts", "AnnIndexRangeSearchCosts"); |
| // Conversion inside FAISS wrappers (TopN): two separate sub counters under post process |
| _ann_topn_engine_convert_costs = |
| ADD_CHILD_TIMER(_segment_profile, "AnnIndexTopNEngineConvertCosts", |
| "AnnIndexTopNResultPostProcessCosts"); |
| _ann_range_engine_convert_costs = |
| ADD_CHILD_TIMER(_segment_profile, "AnnIndexRangeEngineConvertCosts", |
| "AnnIndexRangeResultPostProcessCosts"); |
| // Keep this as a child of post process to show the sum for Doris-side handling |
| _ann_topn_result_convert_costs = |
| ADD_CHILD_TIMER(_segment_profile, "AnnIndexTopNResultConvertCosts", |
| "AnnIndexTopNResultPostProcessCosts"); |
| _ann_range_result_convert_costs = |
| ADD_CHILD_TIMER(_segment_profile, "AnnIndexRangeResultConvertCosts", |
| "AnnIndexRangeResultPostProcessCosts"); |
| _variant_scan_sparse_column_timer = ADD_TIMER(_segment_profile, "VariantScanSparseColumnTimer"); |
| _variant_scan_sparse_column_bytes = |
| ADD_COUNTER(_segment_profile, "VariantScanSparseColumnBytes", TUnit::BYTES); |
| _variant_fill_path_from_sparse_column_timer = |
| ADD_TIMER(_segment_profile, "VariantFillPathFromSparseColumnTimer"); |
| _variant_subtree_default_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantSubtreeDefaultIterCount", TUnit::UNIT); |
| _variant_subtree_leaf_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantSubtreeLeafIterCount", TUnit::UNIT); |
| _variant_subtree_hierarchical_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantSubtreeHierarchicalIterCount", TUnit::UNIT); |
| _variant_subtree_sparse_iter_count = |
| ADD_COUNTER(_segment_profile, "VariantSubtreeSparseIterCount", TUnit::UNIT); |
| |
| _condition_cache_hit_segment_counter = |
| ADD_COUNTER(_segment_profile, "ConditionCacheSegmentHit", TUnit::UNIT); |
| _condition_cache_filtered_rows_counter = |
| ADD_COUNTER(_segment_profile, "ConditionCacheFilteredRows", TUnit::UNIT); |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::_process_conjuncts(RuntimeState* state) { |
| SCOPED_TIMER(_process_conjunct_timer); |
| RETURN_IF_ERROR(ScanLocalState::_process_conjuncts(state)); |
| if (ScanLocalState::_eos) { |
| return Status::OK(); |
| } |
| RETURN_IF_ERROR(_build_key_ranges_and_filters()); |
| return Status::OK(); |
| } |
| |
| bool OlapScanLocalState::_is_key_column(const std::string& key_name) { |
| // all column in dup_keys table or unique_keys with merge on write table olap scan node threat |
| // as key column |
| if (_storage_no_merge()) { |
| return true; |
| } |
| |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| auto res = std::find(p._olap_scan_node.key_column_name.begin(), |
| p._olap_scan_node.key_column_name.end(), key_name); |
| return res != p._olap_scan_node.key_column_name.end(); |
| } |
| |
| Status OlapScanLocalState::_should_push_down_function_filter(vectorized::VectorizedFnCall* fn_call, |
| vectorized::VExprContext* expr_ctx, |
| StringRef* constant_str, |
| doris::FunctionContext** fn_ctx, |
| PushDownType& pdt) { |
| // Now only `like` function filters is supported to push down |
| if (fn_call->fn().name.function_name != "like") { |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } |
| |
| const auto& children = fn_call->children(); |
| doris::FunctionContext* func_cxt = expr_ctx->fn_context(fn_call->fn_context_index()); |
| DCHECK(func_cxt != nullptr); |
| DCHECK(children.size() == 2); |
| for (size_t i = 0; i < children.size(); i++) { |
| if (vectorized::VExpr::expr_without_cast(children[i])->node_type() != |
| TExprNodeType::SLOT_REF) { |
| // not a slot ref(column) |
| continue; |
| } |
| if (!children[1 - i]->is_constant()) { |
| // only handle constant value |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } else { |
| DCHECK(is_string_type(children[1 - i]->data_type()->get_primitive_type())); |
| std::shared_ptr<ColumnPtrWrapper> const_col_wrapper; |
| RETURN_IF_ERROR(children[1 - i]->get_const_col(expr_ctx, &const_col_wrapper)); |
| if (const auto* const_column = check_and_get_column<vectorized::ColumnConst>( |
| const_col_wrapper->column_ptr.get())) { |
| *constant_str = const_column->get_data_at(0); |
| } else { |
| pdt = PushDownType::UNACCEPTABLE; |
| return Status::OK(); |
| } |
| } |
| } |
| *fn_ctx = func_cxt; |
| pdt = PushDownType::ACCEPTABLE; |
| return Status::OK(); |
| } |
| |
| bool OlapScanLocalState::_should_push_down_common_expr() { |
| return state()->enable_common_expr_pushdown() && _storage_no_merge(); |
| } |
| |
| bool OlapScanLocalState::_storage_no_merge() { |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| return (p._olap_scan_node.keyType == TKeysType::DUP_KEYS || |
| (p._olap_scan_node.keyType == TKeysType::UNIQUE_KEYS && |
| p._olap_scan_node.__isset.enable_unique_key_merge_on_write && |
| p._olap_scan_node.enable_unique_key_merge_on_write)); |
| } |
| |
| Status OlapScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) { |
| if (_scan_ranges.empty()) { |
| _eos = true; |
| _scan_dependency->set_ready(); |
| return Status::OK(); |
| } |
| SCOPED_TIMER(_scanner_init_timer); |
| |
| if (!_conjuncts.empty() && _state->enable_profile()) { |
| std::string message; |
| for (auto& conjunct : _conjuncts) { |
| if (conjunct->root()) { |
| if (!message.empty()) { |
| message += ", "; |
| } |
| message += conjunct->root()->debug_string(); |
| } |
| } |
| custom_profile()->add_info_string("RemainedDownPredicates", message); |
| } |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| |
| for (auto uid : p._olap_scan_node.output_column_unique_ids) { |
| _output_column_ids.emplace(uid); |
| } |
| |
| // ranges constructed from scan keys |
| RETURN_IF_ERROR(_scan_keys.get_key_range(&_cond_ranges)); |
| // if we can't get ranges from conditions, we give it a total range |
| if (_cond_ranges.empty()) { |
| _cond_ranges.emplace_back(new doris::OlapScanRange()); |
| } |
| |
| bool enable_parallel_scan = state()->enable_parallel_scan(); |
| |
| // The flag of preagg's meaning is whether return pre agg data(or partial agg data) |
| // PreAgg ON: The storage layer returns partially aggregated data without additional processing. (Fast data reading) |
| // for example, if a table is select userid,count(*) from base table. |
| // And the user send a query like select userid,count(*) from base table group by userid. |
| // then the storage layer do not need do aggregation, it could just return the partial agg data, because the compute layer will do aggregation. |
| // PreAgg OFF: The storage layer must complete pre-aggregation and return fully aggregated data. (Slow data reading) |
| if (enable_parallel_scan && !p._should_run_serial && |
| p._push_down_agg_type == TPushAggOp::NONE && |
| (_storage_no_merge() || p._olap_scan_node.is_preaggregation)) { |
| std::vector<OlapScanRange*> key_ranges; |
| for (auto& range : _cond_ranges) { |
| if (range->begin_scan_range.size() == 1 && |
| range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { |
| continue; |
| } |
| key_ranges.emplace_back(range.get()); |
| } |
| |
| ParallelScannerBuilder scanner_builder(this, _tablets, _read_sources, _scanner_profile, |
| key_ranges, state(), p._limit, true, |
| p._olap_scan_node.is_preaggregation); |
| |
| int max_scanners_count = state()->parallel_scan_max_scanners_count(); |
| |
| // If the `max_scanners_count` was not set, |
| // use `CpuInfo::num_cores()` as the default value. |
| if (max_scanners_count <= 0) { |
| max_scanners_count = CpuInfo::num_cores(); |
| } |
| |
| // Too small value of `min_rows_per_scanner` is meaningless. |
| auto min_rows_per_scanner = |
| std::max<int64_t>(1024, state()->parallel_scan_min_rows_per_scanner()); |
| scanner_builder.set_max_scanners_count(max_scanners_count); |
| scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner); |
| // If the session variable is set, force one scanner per segment. |
| if (state()->query_options().__isset.optimize_index_scan_parallelism && |
| state()->query_options().optimize_index_scan_parallelism) { |
| // TODO: Use optimize_index_scan_parallelism for ann range search in the future. |
| // Currently, ann topn is enough |
| if (_ann_topn_runtime != nullptr) { |
| scanner_builder.set_scan_parallelism_by_per_segment(true); |
| } |
| } |
| |
| RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners)); |
| for (auto& scanner : *scanners) { |
| auto* olap_scanner = assert_cast<vectorized::OlapScanner*>(scanner.get()); |
| RETURN_IF_ERROR(olap_scanner->init(state(), _conjuncts)); |
| } |
| |
| const OlapReaderStatistics* stats = scanner_builder.builder_stats(); |
| io::FileCacheProfileReporter cache_profile(_segment_profile.get()); |
| cache_profile.update(&stats->file_cache_stats); |
| |
| DorisMetrics::instance()->query_scan_bytes_from_local->increment( |
| stats->file_cache_stats.bytes_read_from_local); |
| DorisMetrics::instance()->query_scan_bytes_from_remote->increment( |
| stats->file_cache_stats.bytes_read_from_remote); |
| |
| return Status::OK(); |
| } |
| |
| int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size()); |
| for (size_t scan_range_idx = 0; scan_range_idx < _scan_ranges.size(); scan_range_idx++) { |
| int64_t version = 0; |
| std::from_chars(_scan_ranges[scan_range_idx]->version.data(), |
| _scan_ranges[scan_range_idx]->version.data() + |
| _scan_ranges[scan_range_idx]->version.size(), |
| version); |
| std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges; |
| int size_based_scanners_per_tablet = 1; |
| |
| if (config::doris_scan_range_max_mb > 0) { |
| size_based_scanners_per_tablet = |
| std::max(1, (int)(_tablets[scan_range_idx].tablet->tablet_footprint() / |
| (config::doris_scan_range_max_mb << 20))); |
| } |
| int ranges_per_scanner = |
| std::max(1, (int)ranges->size() / |
| std::min(scanners_per_tablet, size_based_scanners_per_tablet)); |
| int64_t num_ranges = ranges->size(); |
| for (int64_t i = 0; i < num_ranges;) { |
| std::vector<doris::OlapScanRange*> scanner_ranges; |
| scanner_ranges.push_back((*ranges)[i].get()); |
| ++i; |
| for (int64_t j = 1; i < num_ranges && j < ranges_per_scanner && |
| (*ranges)[i]->end_include == (*ranges)[i - 1]->end_include; |
| ++j, ++i) { |
| scanner_ranges.push_back((*ranges)[i].get()); |
| } |
| |
| COUNTER_UPDATE(_key_range_counter, scanner_ranges.size()); |
| // `rs_reader` should not be shared by different scanners |
| for (auto& split : _read_sources[scan_range_idx].rs_splits) { |
| split.rs_reader = split.rs_reader->clone(); |
| } |
| auto scanner = vectorized::OlapScanner::create_shared( |
| this, vectorized::OlapScanner::Params { |
| state(), |
| _scanner_profile.get(), |
| scanner_ranges, |
| _tablets[scan_range_idx].tablet, |
| version, |
| _read_sources[scan_range_idx], |
| p._limit, |
| p._olap_scan_node.is_preaggregation, |
| }); |
| RETURN_IF_ERROR(scanner->init(state(), _conjuncts)); |
| scanners->push_back(std::move(scanner)); |
| } |
| } |
| _tablets.clear(); |
| _read_sources.clear(); |
| |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) { |
| if (config::is_cloud_mode() && !_sync_tablet) { |
| _pending_tablets_num = _scan_ranges.size(); |
| if (_pending_tablets_num > 0) { |
| _sync_cloud_tablets_watcher.start(); |
| _cloud_tablet_dependency = Dependency::create_shared( |
| _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP"); |
| _tablets.resize(_scan_ranges.size()); |
| std::vector<std::function<Status()>> tasks; |
| _sync_statistics.resize(_scan_ranges.size()); |
| for (size_t i = 0; i < _scan_ranges.size(); i++) { |
| auto* sync_stats = &_sync_statistics[i]; |
| int64_t version = 0; |
| std::from_chars(_scan_ranges[i]->version.data(), |
| _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), |
| version); |
| auto task_ctx = state->get_task_execution_context(); |
| tasks.emplace_back([this, sync_stats, version, i, task_ctx]() { |
| auto task_lock = task_ctx.lock(); |
| if (task_lock == nullptr) { |
| return Status::OK(); |
| } |
| Defer defer([&] { |
| if (_pending_tablets_num.fetch_sub(1) == 1) { |
| _cloud_tablet_dependency->set_ready(); |
| _sync_cloud_tablets_watcher.stop(); |
| } |
| }); |
| auto tablet = |
| DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats)); |
| _tablets[i] = {std::move(tablet), version}; |
| SyncOptions options; |
| options.query_version = version; |
| options.merge_schema = true; |
| RETURN_IF_ERROR(std::dynamic_pointer_cast<CloudTablet>(_tablets[i].tablet) |
| ->sync_rowsets(options, sync_stats)); |
| // FIXME(plat1ko): Avoid pointer cast |
| ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count( |
| *_tablets[i].tablet); |
| return Status::OK(); |
| }); |
| } |
| RETURN_IF_ERROR(cloud::bthread_fork_join(std::move(tasks), |
| config::init_scanner_sync_rowsets_parallelism, |
| &_cloud_tablet_future)); |
| } |
| _sync_tablet = true; |
| } |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::prepare(RuntimeState* state) { |
| if (_prepared) { |
| return Status::OK(); |
| } |
| MonotonicStopWatch timer; |
| timer.start(); |
| _read_sources.resize(_scan_ranges.size()); |
| |
| if (config::is_cloud_mode()) { |
| if (!_cloud_tablet_dependency || |
| _cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) { |
| // Remote tablet still in-flight. |
| return Status::OK(); |
| } |
| COUNTER_UPDATE(_sync_rowset_timer, _sync_cloud_tablets_watcher.elapsed_time()); |
| RETURN_IF_ERROR(_cloud_tablet_future.get()); |
| auto total_rowsets = std::accumulate( |
| _tablets.cbegin(), _tablets.cend(), 0LL, |
| [](long long acc, const auto& tabletWithVersion) { |
| return acc + tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size(); |
| }); |
| COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets); |
| for (const auto& sync_stats : _sync_statistics) { |
| COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, sync_stats.tablet_meta_cache_hit); |
| COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, sync_stats.tablet_meta_cache_miss); |
| COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer, |
| sync_stats.get_remote_tablet_meta_rpc_ns); |
| COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_num, sync_stats.get_remote_rowsets_num); |
| COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_rpc_timer, |
| sync_stats.get_remote_rowsets_rpc_ns); |
| COUNTER_UPDATE(_sync_rowset_get_local_delete_bitmap_rowsets_num, |
| sync_stats.get_local_delete_bitmap_rowsets_num); |
| COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rowsets_num, |
| sync_stats.get_remote_delete_bitmap_rowsets_num); |
| COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_key_count, |
| sync_stats.get_remote_delete_bitmap_key_count); |
| COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_bytes, |
| sync_stats.get_remote_delete_bitmap_bytes); |
| COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer, |
| sync_stats.get_remote_delete_bitmap_rpc_ns); |
| } |
| auto time_ms = _sync_cloud_tablets_watcher.elapsed_time_microseconds(); |
| if (time_ms >= config::sync_rowsets_slow_threshold_ms) { |
| DorisMetrics::instance()->get_remote_tablet_slow_time_ms->increment(time_ms); |
| DorisMetrics::instance()->get_remote_tablet_slow_cnt->increment(1); |
| LOG_WARNING("get tablet takes too long") |
| .tag("query_id", print_id(PipelineXLocalState<>::_state->query_id())) |
| .tag("node_id", _parent->node_id()) |
| .tag("total_time", |
| PrettyPrinter::print(_sync_cloud_tablets_watcher.elapsed_time(), |
| TUnit::TIME_NS)) |
| .tag("num_tablets", _tablets.size()) |
| .tag("tablet_meta_cache_hit", _sync_rowset_tablet_meta_cache_hit->value()) |
| .tag("tablet_meta_cache_miss", _sync_rowset_tablet_meta_cache_miss->value()) |
| .tag("get_remote_tablet_meta_rpc_time", |
| PrettyPrinter::print( |
| _sync_rowset_get_remote_tablet_meta_rpc_timer->value(), |
| TUnit::TIME_NS)) |
| .tag("remote_rowsets_num", _sync_rowset_get_remote_rowsets_num->value()) |
| .tag("get_remote_rowsets_rpc_time", |
| PrettyPrinter::print(_sync_rowset_get_remote_rowsets_rpc_timer->value(), |
| TUnit::TIME_NS)) |
| .tag("local_delete_bitmap_rowsets_num", |
| _sync_rowset_get_local_delete_bitmap_rowsets_num->value()) |
| .tag("remote_delete_bitmap_rowsets_num", |
| _sync_rowset_get_remote_delete_bitmap_rowsets_num->value()) |
| .tag("remote_delete_bitmap_key_count", |
| _sync_rowset_get_remote_delete_bitmap_key_count->value()) |
| .tag("remote_delete_bitmap_bytes", |
| PrettyPrinter::print(_sync_rowset_get_remote_delete_bitmap_bytes->value(), |
| TUnit::BYTES)) |
| .tag("get_remote_delete_bitmap_rpc_time", |
| PrettyPrinter::print( |
| _sync_rowset_get_remote_delete_bitmap_rpc_timer->value(), |
| TUnit::TIME_NS)); |
| } |
| } else { |
| _tablets.resize(_scan_ranges.size()); |
| for (size_t i = 0; i < _scan_ranges.size(); i++) { |
| int64_t version = 0; |
| std::from_chars(_scan_ranges[i]->version.data(), |
| _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(), |
| version); |
| auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id)); |
| _tablets[i] = {std::move(tablet), version}; |
| } |
| } |
| |
| for (size_t i = 0; i < _scan_ranges.size(); i++) { |
| _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source( |
| {0, _tablets[i].version}, |
| {.skip_missing_versions = _state->skip_missing_version(), |
| .enable_fetch_rowsets_from_peers = config::enable_fetch_rowsets_from_peer_replicas, |
| .enable_prefer_cached_rowset = |
| config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false, |
| .query_freshness_tolerance_ms = |
| config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1})); |
| if (!PipelineXLocalState<>::_state->skip_delete_predicate()) { |
| _read_sources[i].fill_delete_predicates(); |
| } |
| if (config::enable_mow_verbose_log && |
| _tablets[i].tablet->enable_unique_key_merge_on_write()) { |
| LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}", |
| _tablets[i].tablet->tablet_id(), |
| print_id(PipelineXLocalState<>::_state->query_id())); |
| } |
| } |
| timer.stop(); |
| double cost_secs = static_cast<double>(timer.elapsed_time()) / NANOS_PER_SEC; |
| if (cost_secs > 1) { |
| LOG_WARNING( |
| "Try to hold tablets costs {} seconds, it costs too much. (Query-ID={}, NodeId={}, " |
| "ScanRangeNum={})", |
| cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(), |
| _scan_ranges.size()); |
| } |
| _prepared = true; |
| return Status::OK(); |
| } |
| |
| Status OlapScanLocalState::open(RuntimeState* state) { |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| for (const auto& pair : p._slot_id_to_slot_desc) { |
| const SlotDescriptor* slot_desc = pair.second; |
| std::shared_ptr<doris::TExpr> virtual_col_expr = slot_desc->get_virtual_column_expr(); |
| if (virtual_col_expr) { |
| std::shared_ptr<doris::vectorized::VExprContext> virtual_column_expr_ctx; |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(*virtual_col_expr, |
| virtual_column_expr_ctx)); |
| RETURN_IF_ERROR(virtual_column_expr_ctx->prepare(state, p.intermediate_row_desc())); |
| RETURN_IF_ERROR(virtual_column_expr_ctx->open(state)); |
| |
| _slot_id_to_virtual_column_expr[slot_desc->id()] = virtual_column_expr_ctx; |
| _slot_id_to_col_type[slot_desc->id()] = slot_desc->get_data_type_ptr(); |
| int col_pos = p.intermediate_row_desc().get_column_id(slot_desc->id()); |
| if (col_pos < 0) { |
| return Status::InternalError( |
| "Invalid virtual slot, can not find its information. Slot desc:\n{}\nRow " |
| "desc:\n{}", |
| slot_desc->debug_string(), p.row_desc().debug_string()); |
| } else { |
| _slot_id_to_index_in_block[slot_desc->id()] = col_pos; |
| } |
| } |
| } |
| |
| if (_score_runtime) { |
| RETURN_IF_ERROR(_score_runtime->prepare(state, p.intermediate_row_desc())); |
| } |
| |
| if (_ann_topn_runtime) { |
| RETURN_IF_ERROR(_ann_topn_runtime->prepare(state, p.intermediate_row_desc())); |
| } |
| |
| RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::open(state)); |
| |
| return Status::OK(); |
| } |
| |
| TOlapScanNode& OlapScanLocalState::olap_scan_node() const { |
| return _parent->cast<OlapScanOperatorX>()._olap_scan_node; |
| } |
| |
| void OlapScanLocalState::set_scan_ranges(RuntimeState* state, |
| const std::vector<TScanRangeParams>& scan_ranges) { |
| const auto& cache_param = _parent->cast<OlapScanOperatorX>()._cache_param; |
| bool hit_cache = false; |
| if (!cache_param.digest.empty() && !cache_param.force_refresh_query_cache) { |
| std::string cache_key; |
| int64_t version = 0; |
| auto status = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version); |
| if (!status.ok()) { |
| throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, status.msg()); |
| } |
| doris::QueryCacheHandle handle; |
| hit_cache = QueryCache::instance()->lookup(cache_key, version, &handle); |
| } |
| |
| if (!hit_cache) { |
| for (auto& scan_range : scan_ranges) { |
| DCHECK(scan_range.scan_range.__isset.palo_scan_range); |
| _scan_ranges.emplace_back(new TPaloScanRange(scan_range.scan_range.palo_scan_range)); |
| COUNTER_UPDATE(_tablet_counter, 1); |
| } |
| } |
| } |
| |
| static std::string predicates_to_string( |
| const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>& |
| slot_id_to_predicates) { |
| fmt::memory_buffer debug_string_buffer; |
| for (const auto& [slot_id, predicates] : slot_id_to_predicates) { |
| if (predicates.empty()) { |
| continue; |
| } |
| fmt::format_to(debug_string_buffer, "Slot ID: {}: [", slot_id); |
| for (const auto& predicate : predicates) { |
| fmt::format_to(debug_string_buffer, "{{{}}}, ", predicate->debug_string()); |
| } |
| fmt::format_to(debug_string_buffer, "] "); |
| } |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| static std::string tablets_id_to_string( |
| const std::vector<std::unique_ptr<TPaloScanRange>>& scan_ranges) { |
| if (scan_ranges.empty()) { |
| return "[empty]"; |
| } |
| std::stringstream ss; |
| ss << "[" << scan_ranges[0]->tablet_id; |
| for (int i = 1; i < scan_ranges.size(); ++i) { |
| ss << ", " << scan_ranges[i]->tablet_id; |
| } |
| ss << "]"; |
| return ss.str(); |
| } |
| |
| inline std::string push_down_agg_to_string(const TPushAggOp::type& op) { |
| if (op == TPushAggOp::MINMAX) { |
| return "MINMAX"; |
| } else if (op == TPushAggOp::COUNT) { |
| return "COUNT"; |
| } else if (op == TPushAggOp::MIX) { |
| return "MIX"; |
| } else { |
| return "NONE"; |
| } |
| } |
| |
| Status OlapScanLocalState::_build_key_ranges_and_filters() { |
| auto& p = _parent->cast<OlapScanOperatorX>(); |
| if (p._push_down_agg_type == TPushAggOp::NONE || |
| p._push_down_agg_type == TPushAggOp::COUNT_ON_INDEX) { |
| const std::vector<std::string>& column_names = p._olap_scan_node.key_column_name; |
| const std::vector<TPrimitiveType::type>& column_types = p._olap_scan_node.key_column_type; |
| DCHECK(column_types.size() == column_names.size()); |
| |
| // 1. construct scan key except last olap engine short key |
| _scan_keys.set_is_convertible(p.limit() == -1); |
| |
| // we use `exact_range` to identify a key range is an exact range or not when we convert |
| // it to `_scan_keys`. If `exact_range` is true, we can just discard it from `_olap_filters`. |
| bool exact_range = true; |
| |
| // If the `_scan_keys` cannot extend by the range of column, should stop. |
| bool should_break = false; |
| |
| bool eos = false; |
| for (int column_index = 0; column_index < column_names.size() && |
| !_scan_keys.has_range_value() && !eos && !should_break; |
| ++column_index) { |
| if (p._colname_to_slot_id.find(column_names[column_index]) == |
| p._colname_to_slot_id.end()) { |
| break; |
| } |
| auto iter = |
| _slot_id_to_value_range.find(p._colname_to_slot_id[column_names[column_index]]); |
| if (_slot_id_to_value_range.end() == iter) { |
| break; |
| } |
| DCHECK(_slot_id_to_predicates.count(iter->first) > 0); |
| const auto& value_range = iter->second; |
| |
| RETURN_IF_ERROR(std::visit( |
| [&](auto&& range) { |
| // make a copy or range and pass to extend_scan_key, keep the range unchanged |
| // because extend_scan_key method may change the first parameter. |
| // but the original range may be converted to olap filters, if it's not a exact_range. |
| auto temp_range = range; |
| if (range.get_fixed_value_size() <= p._max_pushdown_conditions_per_column) { |
| RETURN_IF_ERROR( |
| _scan_keys.extend_scan_key(temp_range, p._max_scan_key_num, |
| &exact_range, &eos, &should_break)); |
| if (exact_range) { |
| auto key = iter->first; |
| _slot_id_to_value_range.erase(key); |
| |
| std::vector<std::shared_ptr<ColumnPredicate>> new_predicates; |
| for (const auto& it : _slot_id_to_predicates[key]) { |
| if (it->type() == PredicateType::NOT_IN_LIST || |
| it->type() == PredicateType::NE) { |
| new_predicates.push_back(it); |
| } |
| } |
| if (new_predicates.empty()) { |
| _slot_id_to_predicates.erase(key); |
| } else { |
| _slot_id_to_predicates[key] = new_predicates; |
| } |
| } |
| } else { |
| // if exceed max_pushdown_conditions_per_column, use whole_value_rang instead |
| // and will not erase from _slot_id_to_value_range, it must be not exact_range |
| temp_range.set_whole_value_range(); |
| RETURN_IF_ERROR( |
| _scan_keys.extend_scan_key(temp_range, p._max_scan_key_num, |
| &exact_range, &eos, &should_break)); |
| } |
| return Status::OK(); |
| }, |
| value_range)); |
| } |
| if (eos) { |
| _eos = true; |
| _scan_dependency->set_ready(); |
| } |
| } else { |
| custom_profile()->add_info_string("PushDownAggregate", |
| push_down_agg_to_string(p._push_down_agg_type)); |
| } |
| |
| if (state()->enable_profile()) { |
| custom_profile()->add_info_string("PushDownPredicates", |
| predicates_to_string(_slot_id_to_predicates)); |
| custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string()); |
| custom_profile()->add_info_string("TabletIds", tablets_id_to_string(_scan_ranges)); |
| } |
| VLOG_CRITICAL << _scan_keys.debug_string(); |
| |
| return Status::OK(); |
| } |
| |
| OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
| const DescriptorTbl& descs, int parallel_tasks, |
| const TQueryCacheParam& param) |
| : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks), |
| _olap_scan_node(tnode.olap_scan_node), |
| _cache_param(param) { |
| _output_tuple_id = tnode.olap_scan_node.tuple_id; |
| if (_olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit) { |
| _limit_per_scanner = _olap_scan_node.sort_limit; |
| } |
| DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", { |
| LOG(INFO) << "limit_per_scanner: " << _limit_per_scanner |
| << ", sort_limit: " << _olap_scan_node.sort_limit |
| << ", isset.sort_limit: " << _olap_scan_node.__isset.sort_limit; |
| }) |
| |
| if (_olap_scan_node.__isset.columns_desc && !_olap_scan_node.columns_desc.empty() && |
| _olap_scan_node.columns_desc[0].col_unique_id >= 0) { |
| _tablet_schema = std::make_shared<TabletSchema>(); |
| _tablet_schema->clear_columns(); |
| for (const auto& column_desc : _olap_scan_node.columns_desc) { |
| _tablet_schema->append_column(TabletColumn(column_desc)); |
| } |
| if (_olap_scan_node.__isset.schema_version) { |
| _tablet_schema->set_schema_version(_olap_scan_node.schema_version); |
| } |
| if (_olap_scan_node.__isset.indexes_desc) { |
| _tablet_schema->update_indexes_from_thrift(_olap_scan_node.indexes_desc); |
| } |
| } |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris::pipeline |