blob: 6e7946a082b249b83ca95e25014976c8e74b6dca [file]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/operator/olap_scan_operator.h"
#include <fmt/format.h>
#include <memory>
#include <numeric>
#include <optional>
#include <shared_mutex>
#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/cloud_tablet_hotspot.h"
#include "cloud/config.h"
#include "exec/operator/scan_operator.h"
#include "exec/runtime_filter/runtime_filter_consumer_helper.h"
#include "exec/scan/olap_scanner.h"
#include "exec/scan/parallel_scanner_builder.h"
#include "exprs/function/in.h"
#include "exprs/hybrid_set.h"
#include "exprs/score_runtime.h"
#include "exprs/vectorized_fn_call.h"
#include "exprs/vexpr.h"
#include "exprs/vexpr_context.h"
#include "exprs/virtual_slot_ref.h"
#include "exprs/vslot_ref.h"
#include "io/cache/block_file_cache_profile.h"
#include "runtime/query_cache/query_cache.h"
#include "runtime/runtime_profile.h"
#include "runtime/runtime_state.h"
#include "service/backend_options.h"
#include "storage/index/ann/ann_topn_runtime.h"
#include "storage/storage_engine.h"
#include "storage/tablet/tablet.h"
#include "storage/tablet/tablet_manager.h"
#include "util/to_string.h"
namespace doris {
// Initializes the local state of an OLAP scan from the plan's TOlapScanNode.
//
// Steps, in order:
//   1. Build the score top-n runtime when both score_sort_info and score_sort_limit are set.
//   2. Build the ANN top-n runtime when ann_sort_info or ann_sort_limit is set.
//   3. Forward score range filtering parameters (op + threshold) to the score runtime,
//      if one was created in step 1.
//   4. Run the base-class init, sync cloud tablets and attach partition boundaries.
//
// Returns the first non-OK status produced by expression-tree creation, Base::init() or
// _sync_cloud_tablets(). Returns InvalidArgument when a sort info carries no ordering
// expression or no sort direction, instead of reading from an empty vector.
Status OlapScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    const TOlapScanNode& olap_scan_node = _parent->cast<OlapScanOperatorX>()._olap_scan_node;

    if (olap_scan_node.__isset.score_sort_info && olap_scan_node.__isset.score_sort_limit) {
        const auto& score_sort_info = olap_scan_node.score_sort_info;
        // front() / operator[] on an empty vector is undefined behavior, so a malformed
        // plan must be rejected explicitly rather than dereferenced.
        if (score_sort_info.ordering_exprs.empty() || score_sort_info.is_asc_order.empty()) {
            return Status::InvalidArgument(
                    "score_sort_info must contain at least one ordering expr and one sort "
                    "direction");
        }
        const doris::TExpr& ordering_expr = score_sort_info.ordering_exprs.front();
        const bool asc = score_sort_info.is_asc_order[0];
        const size_t limit = olap_scan_node.score_sort_limit;
        std::shared_ptr<VExprContext> ordering_expr_ctx;
        RETURN_IF_ERROR(VExpr::create_expr_tree(ordering_expr, ordering_expr_ctx));
        _score_runtime = ScoreRuntime::create_shared(ordering_expr_ctx, asc, limit);
    }

    if (olap_scan_node.__isset.ann_sort_info || olap_scan_node.__isset.ann_sort_limit) {
        const auto& ann_sort_info = olap_scan_node.ann_sort_info;
        DCHECK(olap_scan_node.__isset.ann_sort_info);
        DCHECK(olap_scan_node.__isset.ann_sort_limit);
        DCHECK(ann_sort_info.ordering_exprs.size() == 1);
        // The DCHECKs above vanish in release builds; keep a runtime guard so an
        // incomplete ann_sort_info cannot reach front() on an empty vector.
        if (ann_sort_info.ordering_exprs.empty()) {
            return Status::InvalidArgument(
                    "ann_sort_info must contain exactly one ordering expr");
        }
        const doris::TExpr& ordering_expr = ann_sort_info.ordering_exprs.front();
        DCHECK(ordering_expr.nodes[0].__isset.slot_ref);
        DCHECK(ordering_expr.nodes[0].slot_ref.is_virtual_slot);
        DCHECK(ann_sort_info.is_asc_order.size() == 1);
        if (ann_sort_info.is_asc_order.empty()) {
            return Status::InvalidArgument(
                    "ann_sort_info must contain exactly one sort direction");
        }
        const bool asc = ann_sort_info.is_asc_order[0];
        const size_t limit = olap_scan_node.ann_sort_limit;
        std::shared_ptr<VExprContext> ordering_expr_ctx;
        RETURN_IF_ERROR(VExpr::create_expr_tree(ordering_expr, ordering_expr_ctx));
        _ann_topn_runtime =
                segment_v2::AnnTopNRuntime::create_shared(asc, limit, ordering_expr_ctx);
    }

    // Parse score range filtering parameters and set to ScoreRuntime.
    // They are only meaningful when a score runtime exists and both fields are present.
    if (olap_scan_node.__isset.score_range_info) {
        const auto& score_range_info = olap_scan_node.score_range_info;
        const bool has_range = score_range_info.__isset.op && score_range_info.__isset.threshold;
        if (has_range && _score_runtime) {
            _score_runtime->set_score_range_info(score_range_info.op,
                                                 score_range_info.threshold);
        }
    }

    RETURN_IF_ERROR(Base::init(state, info));
    RETURN_IF_ERROR(_sync_cloud_tablets(state));
    _attach_partition_boundaries();
    return Status::OK();
}
// Decides whether a binary predicate can be pushed down to storage.
//
// The call is accepted only when its function name is listed in `fn_name` and its second
// child is a constant expression; in that case the constant's value is written to
// `constant_val`. Every other shape yields UNACCEPTABLE and leaves `constant_val` untouched.
// A failure while materializing the constant column is raised through THROW_IF_ERROR.
PushDownType OlapScanLocalState::_should_push_down_binary_predicate(
        VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val,
        const std::set<std::string> fn_name) const {
    const bool is_listed_fn = fn_name.contains(fn_call->fn().name.function_name);
    if (!is_listed_fn) {
        return PushDownType::UNACCEPTABLE;
    }

    const auto& operands = fn_call->children();
    DCHECK(operands.size() == 2);
    DCHECK_EQ(VExpr::expr_without_cast(operands[0])->node_type(), TExprNodeType::SLOT_REF);

    if (!operands[1]->is_constant()) {
        // Only a constant second operand is handled.
        return PushDownType::UNACCEPTABLE;
    }

    std::shared_ptr<ColumnPtrWrapper> const_holder;
    THROW_IF_ERROR(operands[1]->get_const_col(expr_ctx, &const_holder));
    const auto* const_col = assert_cast<const ColumnConst*>(const_holder->column_ptr.get());
    constant_val = (*const_col)[0];
    return PushDownType::ACCEPTABLE;
}
// Registers the timers and counters this operator publishes in its runtime profile.
//
// Runs the base-class _init_profile() first, then creates the "SegmentIterator" and
// "IndexFilter" child profiles under _scanner_profile and attaches counters to
// custom_profile(), _scanner_profile and _segment_profile.
//
// NOTE(review): the ADD_CHILD_* macros refer to their parent counter by name; presumably the
// parent has to be registered before its children, so keep the statement order when editing
// — confirm against RuntimeProfile.
//
// Returns the base-class error if its _init_profile() fails, otherwise Status::OK().
Status OlapScanLocalState::_init_profile() {
RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::_init_profile());
// Rows read from storage.
// Include the rows read from doris page cache.
_scan_rows = ADD_COUNTER(custom_profile(), "ScanRows", TUnit::UNIT);
_tablets_pruned_by_rf_counter =
ADD_COUNTER(custom_profile(), "TabletsPrunedByRuntimeFilter", TUnit::UNIT);
// 1. init segment profile
_segment_profile.reset(new RuntimeProfile("SegmentIterator"));
_scanner_profile->add_child(_segment_profile.get(), true, nullptr);
// 2. init timer and counters
_reader_init_timer = ADD_TIMER(_scanner_profile, "ReaderInitTime");
_scanner_init_timer = ADD_TIMER(_scanner_profile, "ScannerInitTime");
_process_conjunct_timer = ADD_TIMER(custom_profile(), "ProcessConjunctTime");
_read_compressed_counter = ADD_COUNTER(_segment_profile, "CompressedBytesRead", TUnit::BYTES);
_read_uncompressed_counter =
ADD_COUNTER(_segment_profile, "UncompressedBytesRead", TUnit::BYTES);
_block_load_timer = ADD_TIMER(_segment_profile, "BlockLoadTime");
_block_load_counter = ADD_COUNTER(_segment_profile, "BlocksLoad", TUnit::UNIT);
// NOTE(review): "ScannerGetBlockTime" is not registered in this function; presumably the
// base-class _init_profile() creates it — confirm.
_block_fetch_timer = ADD_CHILD_TIMER(_scanner_profile, "BlockFetchTime", "ScannerGetBlockTime");
// Cloud-mode only: rowset synchronization counters, all children of "SyncRowsetTime".
if (config::is_cloud_mode()) {
static const char* sync_rowset_timer_name = "SyncRowsetTime";
_sync_rowset_timer = ADD_TIMER(_scanner_profile, sync_rowset_timer_name);
_sync_rowset_tablet_meta_cache_hit =
ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletMetaCacheHitCount",
TUnit::UNIT, sync_rowset_timer_name);
_sync_rowset_tablet_meta_cache_miss =
ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletMetaCacheMissCount",
TUnit::UNIT, sync_rowset_timer_name);
_sync_rowset_get_remote_tablet_meta_rpc_timer = ADD_CHILD_TIMER(
_scanner_profile, "SyncRowsetGetRemoteTabletMetaRpcTime", sync_rowset_timer_name);
// NOTE(review): "Totat" in the counter name below looks like a typo for "Total". The string
// is the emitted counter name, so renaming it would change the profile output.
_sync_rowset_tablets_rowsets_total_num =
ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetTabletsRowsetsTotatCount",
TUnit::UNIT, sync_rowset_timer_name);
_sync_rowset_get_remote_rowsets_num =
ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteRowsetsCount", TUnit::UNIT,
sync_rowset_timer_name);
_sync_rowset_get_remote_rowsets_rpc_timer = ADD_CHILD_TIMER(
_scanner_profile, "SyncRowsetGetRemoteRowsetsRpcTime", sync_rowset_timer_name);
_sync_rowset_get_local_delete_bitmap_rowsets_num =
ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetLocalDeleteBitmapRowsetsCount",
TUnit::UNIT, sync_rowset_timer_name);
_sync_rowset_get_remote_delete_bitmap_rowsets_num =
ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapRowsetsCount",
TUnit::UNIT, sync_rowset_timer_name);
_sync_rowset_get_remote_delete_bitmap_key_count =
ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapKeyCount",
TUnit::UNIT, sync_rowset_timer_name);
_sync_rowset_get_remote_delete_bitmap_bytes =
ADD_CHILD_COUNTER(_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapBytes",
TUnit::BYTES, sync_rowset_timer_name);
_sync_rowset_get_remote_delete_bitmap_rpc_timer = ADD_CHILD_TIMER(
_scanner_profile, "SyncRowsetGetRemoteDeleteBitmapRpcTime", sync_rowset_timer_name);
_sync_rowset_bthread_schedule_wait_timer = ADD_CHILD_TIMER(
_scanner_profile, "SyncRowsetBthreadScheduleWaitTime", sync_rowset_timer_name);
_sync_rowset_meta_lock_wait_timer = ADD_CHILD_TIMER(
_scanner_profile, "SyncRowsetMetaLockWaitTime", sync_rowset_timer_name);
_sync_rowset_sync_meta_lock_wait_timer = ADD_CHILD_TIMER(
_scanner_profile, "SyncRowsetSyncMetaLockWaitTime", sync_rowset_timer_name);
}
// Segment-level block init, row-range generation and predicate evaluation counters.
_block_init_timer = ADD_TIMER(_segment_profile, "BlockInitTime");
_block_init_seek_timer = ADD_TIMER(_segment_profile, "BlockInitSeekTime");
_block_init_seek_counter = ADD_COUNTER(_segment_profile, "BlockInitSeekCount", TUnit::UNIT);
_segment_generate_row_range_by_keys_timer =
ADD_TIMER(_segment_profile, "GenerateRowRangeByKeysTime");
_segment_generate_row_range_by_column_conditions_timer =
ADD_TIMER(_segment_profile, "GenerateRowRangeByColumnConditionsTime");
_segment_generate_row_range_by_bf_timer =
ADD_TIMER(_segment_profile, "GenerateRowRangeByBloomFilterIndexTime");
_collect_iterator_merge_next_timer = ADD_TIMER(_segment_profile, "CollectIteratorMergeTime");
_segment_generate_row_range_by_zonemap_timer =
ADD_TIMER(_segment_profile, "GenerateRowRangeByZoneMapIndexTime");
_segment_generate_row_range_by_dict_timer =
ADD_TIMER(_segment_profile, "GenerateRowRangeByDictTime");
_rows_vec_cond_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsVectorPredFiltered", TUnit::UNIT);
_rows_short_circuit_cond_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsShortCircuitPredFiltered", TUnit::UNIT);
_rows_expr_cond_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsExprPredFiltered", TUnit::UNIT);
_rows_vec_cond_input_counter =
ADD_COUNTER(_segment_profile, "RowsVectorPredInput", TUnit::UNIT);
_rows_short_circuit_cond_input_counter =
ADD_COUNTER(_segment_profile, "RowsShortCircuitPredInput", TUnit::UNIT);
_rows_expr_cond_input_counter = ADD_COUNTER(_segment_profile, "RowsExprPredInput", TUnit::UNIT);
_vec_cond_timer = ADD_TIMER(_segment_profile, "VectorPredEvalTime");
_short_cond_timer = ADD_TIMER(_segment_profile, "ShortPredEvalTime");
_expr_filter_timer = ADD_TIMER(_segment_profile, "ExprFilterEvalTime");
_predicate_column_read_timer = ADD_TIMER(_segment_profile, "PredicateColumnReadTime");
_non_predicate_column_read_timer = ADD_TIMER(_segment_profile, "NonPredicateColumnReadTime");
_predicate_column_read_seek_timer = ADD_TIMER(_segment_profile, "PredicateColumnReadSeekTime");
_predicate_column_read_seek_counter =
ADD_COUNTER(_segment_profile, "PredicateColumnReadSeekCount", TUnit::UNIT);
_lazy_read_timer = ADD_TIMER(_segment_profile, "LazyReadTime");
_lazy_read_seek_timer = ADD_TIMER(_segment_profile, "LazyReadSeekTime");
_lazy_read_seek_counter = ADD_COUNTER(_segment_profile, "LazyReadSeekCount", TUnit::UNIT);
_output_col_timer = ADD_TIMER(_segment_profile, "OutputColumnTime");
// Row-filtering counters. Note that "RowsDelFiltered" is attached to _scanner_profile while
// its neighbours are attached to _segment_profile.
_stats_filtered_counter = ADD_COUNTER(_segment_profile, "RowsStatsFiltered", TUnit::UNIT);
_stats_rp_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsZoneMapRuntimePredicateFiltered", TUnit::UNIT);
_bf_filtered_counter = ADD_COUNTER(_segment_profile, "RowsBloomFilterFiltered", TUnit::UNIT);
_dict_filtered_counter = ADD_COUNTER(_segment_profile, "SegmentDictFiltered", TUnit::UNIT);
_del_filtered_counter = ADD_COUNTER(_scanner_profile, "RowsDelFiltered", TUnit::UNIT);
_conditions_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsConditionsFiltered", TUnit::UNIT);
_key_range_filtered_counter =
ADD_COUNTER(_segment_profile, "RowsKeyRangeFiltered", TUnit::UNIT);
_io_timer = ADD_TIMER(_segment_profile, "IOTimer");
_decompressor_timer = ADD_TIMER(_segment_profile, "DecompressorTimer");
_total_pages_num_counter = ADD_COUNTER(_segment_profile, "TotalPagesNum", TUnit::UNIT);
_cached_pages_num_counter = ADD_COUNTER(_segment_profile, "CachedPagesNum", TUnit::UNIT);
_statistics_collect_timer = ADD_TIMER(_scanner_profile, "StatisticsCollectTime");
// Inverted index counters, all registered through the *_WITH_LEVEL macros with level 1.
_inverted_index_filter_counter =
ADD_COUNTER_WITH_LEVEL(_segment_profile, "RowsInvertedIndexFiltered", TUnit::UNIT, 1);
_inverted_index_filter_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexFilterTime", 1);
_inverted_index_query_cache_hit_counter =
ADD_COUNTER_WITH_LEVEL(_segment_profile, "InvertedIndexQueryCacheHit", TUnit::UNIT, 1);
_inverted_index_query_cache_miss_counter =
ADD_COUNTER_WITH_LEVEL(_segment_profile, "InvertedIndexQueryCacheMiss", TUnit::UNIT, 1);
_inverted_index_query_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexQueryTime", 1);
_inverted_index_query_null_bitmap_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexQueryNullBitmapTime", 1);
_inverted_index_query_bitmap_copy_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexQueryBitmapCopyTime", 1);
_inverted_index_searcher_open_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexSearcherOpenTime", 1);
_inverted_index_searcher_search_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexSearcherSearchTime", 1);
_inverted_index_searcher_search_init_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexSearcherSearchInitTime", 1);
_inverted_index_searcher_search_exec_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexSearcherSearchExecTime", 1);
_inverted_index_searcher_cache_hit_counter = ADD_COUNTER_WITH_LEVEL(
_segment_profile, "InvertedIndexSearcherCacheHit", TUnit::UNIT, 1);
_inverted_index_searcher_cache_miss_counter = ADD_COUNTER_WITH_LEVEL(
_segment_profile, "InvertedIndexSearcherCacheMiss", TUnit::UNIT, 1);
_inverted_index_downgrade_count_counter =
ADD_COUNTER_WITH_LEVEL(_segment_profile, "InvertedIndexDowngradeCount", TUnit::UNIT, 1);
_inverted_index_analyzer_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexAnalyzerTime", 1);
_inverted_index_lookup_timer =
ADD_TIMER_WITH_LEVEL(_segment_profile, "InvertedIndexLookupTimer", 1);
_output_index_result_column_timer = ADD_TIMER(_segment_profile, "OutputIndexResultColumnTime");
_filtered_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentFiltered", TUnit::UNIT);
_total_segment_counter = ADD_COUNTER(_segment_profile, "NumSegmentTotal", TUnit::UNIT);
_tablet_counter = ADD_COUNTER(custom_profile(), "TabletNum", TUnit::UNIT);
_key_range_counter = ADD_COUNTER(custom_profile(), "KeyRangesNum", TUnit::UNIT);
// Reader initialization breakdown: "TabletReaderInitTimer" is a child of "ReaderInitTime",
// and the tablet/block reader init timers below are children of "TabletReaderInitTimer".
_tablet_reader_init_timer =
ADD_CHILD_TIMER(_scanner_profile, "TabletReaderInitTimer", "ReaderInitTime");
_tablet_reader_capture_rs_readers_timer = ADD_CHILD_TIMER(
_scanner_profile, "TabletReaderCaptureRsReadersTimer", "TabletReaderInitTimer");
_tablet_reader_init_return_columns_timer = ADD_CHILD_TIMER(
_scanner_profile, "TabletReaderInitReturnColumnsTimer", "TabletReaderInitTimer");
_tablet_reader_init_keys_param_timer = ADD_CHILD_TIMER(
_scanner_profile, "TabletReaderInitKeysParamTimer", "TabletReaderInitTimer");
_tablet_reader_init_orderby_keys_param_timer = ADD_CHILD_TIMER(
_scanner_profile, "TabletReaderInitOrderbyKeysParamTimer", "TabletReaderInitTimer");
_tablet_reader_init_conditions_param_timer = ADD_CHILD_TIMER(
_scanner_profile, "TabletReaderInitConditionsParamTimer", "TabletReaderInitTimer");
_tablet_reader_init_delete_condition_param_timer = ADD_CHILD_TIMER(
_scanner_profile, "TabletReaderInitDeleteConditionParamTimer", "TabletReaderInitTimer");
_block_reader_vcollect_iter_init_timer = ADD_CHILD_TIMER(
_scanner_profile, "BlockReaderVcollectIterInitTimer", "TabletReaderInitTimer");
_block_reader_rs_readers_init_timer = ADD_CHILD_TIMER(
_scanner_profile, "BlockReaderRsReadersInitTimer", "TabletReaderInitTimer");
_block_reader_build_heap_init_timer = ADD_CHILD_TIMER(
_scanner_profile, "BlockReaderBuildHeapInitTimer", "TabletReaderInitTimer");
// Rowset reader and segment iterator timers, parented under "ScannerGetBlockTime",
// "RowsetReaderGetSegmentIteratorsTimer", "BlockFetchTime" or "SegmentIteratorInitTimer".
_rowset_reader_get_segment_iterators_timer = ADD_CHILD_TIMER(
_scanner_profile, "RowsetReaderGetSegmentIteratorsTimer", "ScannerGetBlockTime");
_delete_bitmap_get_agg_timer = ADD_CHILD_TIMER(_scanner_profile, "DeleteBitmapGetAggTime",
"RowsetReaderGetSegmentIteratorsTimer");
_rowset_reader_create_iterators_timer =
ADD_CHILD_TIMER(_scanner_profile, "RowsetReaderCreateIteratorsTimer",
"RowsetReaderGetSegmentIteratorsTimer");
_rowset_reader_init_iterators_timer = ADD_CHILD_TIMER(
_scanner_profile, "RowsetReaderInitIteratorsTimer", "ScannerGetBlockTime");
_rowset_reader_load_segments_timer = ADD_CHILD_TIMER(
_scanner_profile, "RowsetReaderLoadSegmentsTimer", "ScannerGetBlockTime");
_segment_iterator_init_timer =
ADD_CHILD_TIMER(_scanner_profile, "SegmentIteratorInitTimer", "BlockFetchTime");
_segment_iterator_init_return_column_iterators_timer =
ADD_CHILD_TIMER(_scanner_profile, "SegmentIteratorInitReturnColumnIteratorsTimer",
"SegmentIteratorInitTimer");
_segment_iterator_init_index_iterators_timer = ADD_CHILD_TIMER(
_scanner_profile, "SegmentIteratorInitIndexIteratorsTimer", "SegmentIteratorInitTimer");
_segment_iterator_init_segment_prefetchers_timer =
ADD_CHILD_TIMER(_scanner_profile, "SegmentIteratorInitSegmentPrefetchersTimer",
"SegmentIteratorInitTimer");
// These two timers span both iterator init and later lazy segment init paths,
// so their nearest stable ancestor is ScannerGetBlockTime instead of any
// narrower SegmentIterator/BlockFetch subphase.
_segment_create_column_readers_timer = ADD_CHILD_TIMER(
_scanner_profile, "SegmentCreateColumnReadersTimer", "ScannerGetBlockTime");
_segment_load_index_timer =
ADD_CHILD_TIMER(_scanner_profile, "SegmentLoadIndexTimer", "ScannerGetBlockTime");
// Separate "IndexFilter" child profile under the scanner profile.
_index_filter_profile = std::make_unique<RuntimeProfile>("IndexFilter");
_scanner_profile->add_child(_index_filter_profile.get(), true, nullptr);
/*
Sample of how the ANN counters show up under the SegmentIterator profile.
NOTE(review): this sample is older than the registrations below; for example
"AnnIndexTopNCosts" is registered here as "AnnIndexTopNSearchCosts", and
"AnnIndexTopNSearchCnt" is a UNIT counter, not a duration.
SegmentIterator:
- AnnIndexLoadCosts: 102.262us
- AnnIndexRangeSearchCosts: 0ns
- AnnIndexRangeSearchFiltered: 0
- AnnIndexTopNCosts: 658.303ms
- AnnIndexTopNFiltered: 9.49791M (9497910)
- AnnIndexTopNSearchCnt: 209ns
*/
_ann_range_search_filter_counter =
ADD_COUNTER(_segment_profile, "AnnIndexRangeSearchFiltered", TUnit::UNIT);
_ann_topn_filter_counter = ADD_COUNTER(_segment_profile, "AnnIndexTopNFiltered", TUnit::UNIT);
_ann_topn_search_costs = ADD_TIMER(_segment_profile, "AnnIndexTopNSearchCosts");
_ann_topn_search_cnt = ADD_COUNTER(_segment_profile, "AnnIndexTopNSearchCnt", TUnit::UNIT);
_ann_cache_hit_cnt = ADD_COUNTER(_segment_profile, "AnnIndexCacheHitCnt", TUnit::UNIT);
_ann_range_cache_hit_cnt =
ADD_COUNTER(_segment_profile, "AnnIndexRangeCacheHitCnt", TUnit::UNIT);
_ann_range_search_costs = ADD_TIMER(_segment_profile, "AnnIndexRangeSearchCosts");
_ann_range_search_cnt = ADD_COUNTER(_segment_profile, "AnnIndexRangeSearchCnt", TUnit::UNIT);
// Detailed ANN timers (TopN)
// Create child timers under AnnIndexTopNSearchCosts for better readability
_ann_topn_engine_search_costs = ADD_CHILD_TIMER(
_segment_profile, "AnnIndexTopNEngineSearchCosts", "AnnIndexTopNSearchCosts");
_ann_index_load_costs = ADD_TIMER(_segment_profile, "AnnIndexLoadCosts");
_ann_ivf_on_disk_load_costs = ADD_TIMER(_segment_profile, "AnnIvfOnDiskLoadCosts");
_ann_ivf_on_disk_cache_hit_cnt =
ADD_COUNTER(_segment_profile, "AnnIvfOnDiskCacheHitCnt", TUnit::UNIT);
_ann_ivf_on_disk_cache_miss_cnt =
ADD_COUNTER(_segment_profile, "AnnIvfOnDiskCacheMissCnt", TUnit::UNIT);
_ann_topn_post_process_costs = ADD_CHILD_TIMER(
_segment_profile, "AnnIndexTopNResultPostProcessCosts", "AnnIndexTopNSearchCosts");
_ann_topn_pre_process_costs = ADD_CHILD_TIMER(
_segment_profile, "AnnIndexTopNEnginePrepareCosts", "AnnIndexTopNSearchCosts");
// Detailed ANN timers (Range)
// Create child timers under AnnIndexRangeSearchCosts to mirror TopN hierarchy
_ann_range_engine_search_costs = ADD_CHILD_TIMER(
_segment_profile, "AnnIndexRangeEngineSearchCosts", "AnnIndexRangeSearchCosts");
_ann_range_post_process_costs = ADD_CHILD_TIMER(
_segment_profile, "AnnIndexRangeResultPostProcessCosts", "AnnIndexRangeSearchCosts");
_ann_range_pre_process_costs = ADD_CHILD_TIMER(
_segment_profile, "AnnIndexRangeEnginePrepareCosts", "AnnIndexRangeSearchCosts");
// Conversion inside FAISS wrappers (TopN and Range): engine-side convert timers, each a
// child of the matching *ResultPostProcessCosts timer.
_ann_topn_engine_convert_costs =
ADD_CHILD_TIMER(_segment_profile, "AnnIndexTopNEngineConvertCosts",
"AnnIndexTopNResultPostProcessCosts");
_ann_range_engine_convert_costs =
ADD_CHILD_TIMER(_segment_profile, "AnnIndexRangeEngineConvertCosts",
"AnnIndexRangeResultPostProcessCosts");
// Keep this as a child of post process to show the sum for Doris-side handling
_ann_topn_result_convert_costs =
ADD_CHILD_TIMER(_segment_profile, "AnnIndexTopNResultConvertCosts",
"AnnIndexTopNResultPostProcessCosts");
_ann_range_result_convert_costs =
ADD_CHILD_TIMER(_segment_profile, "AnnIndexRangeResultConvertCosts",
"AnnIndexRangeResultPostProcessCosts");
_ann_fallback_brute_force_cnt =
ADD_COUNTER(_segment_profile, "AnnIndexFallbackBruteForceCnt", TUnit::UNIT);
// Variant column counters.
_variant_scan_sparse_column_timer = ADD_TIMER(_segment_profile, "VariantScanSparseColumnTimer");
_variant_scan_sparse_column_bytes =
ADD_COUNTER(_segment_profile, "VariantScanSparseColumnBytes", TUnit::BYTES);
_variant_fill_path_from_sparse_column_timer =
ADD_TIMER(_segment_profile, "VariantFillPathFromSparseColumnTimer");
_variant_subtree_default_iter_count =
ADD_COUNTER(_segment_profile, "VariantSubtreeDefaultIterCount", TUnit::UNIT);
_variant_subtree_leaf_iter_count =
ADD_COUNTER(_segment_profile, "VariantSubtreeLeafIterCount", TUnit::UNIT);
_variant_subtree_hierarchical_iter_count =
ADD_COUNTER(_segment_profile, "VariantSubtreeHierarchicalIterCount", TUnit::UNIT);
_variant_subtree_sparse_iter_count =
ADD_COUNTER(_segment_profile, "VariantSubtreeSparseIterCount", TUnit::UNIT);
_variant_doc_value_column_iter_count =
ADD_COUNTER(_segment_profile, "VariantDocValueColumnIterCount", TUnit::UNIT);
// Adaptive batch prediction counters.
_adaptive_batch_predict_min_rows_counter =
ADD_COUNTER(_segment_profile, "AdaptiveBatchPredictMinRows", TUnit::UNIT);
_adaptive_batch_predict_max_rows_counter =
ADD_COUNTER(_segment_profile, "AdaptiveBatchPredictMaxRows", TUnit::UNIT);
return Status::OK();
}
// Returns true when `expr`, the expression wrapped by a runtime-filter wrapper, or any
// descendant has the given node type. The search is depth-first and stops at the first hit.
// `expr` must not be null (enforced with DORIS_CHECK).
static bool contains_expr_node_type(const VExprSPtr& expr, TExprNodeType::type node_type) {
    DORIS_CHECK(expr != nullptr);

    // The node itself, or the implementation hidden behind a runtime-filter wrapper.
    if (expr->node_type() == node_type ||
        (expr->is_rf_wrapper() && contains_expr_node_type(expr->get_impl(), node_type))) {
        return true;
    }

    for (const auto& sub_expr : expr->children()) {
        if (contains_expr_node_type(sub_expr, node_type)) {
            return true;
        }
    }
    return false;
}
// Rejects residual scan predicates (conjuncts left on the scan operator) that cannot be
// executed correctly at this level.
//
// Checks, in order, for each conjunct:
//   - a SEARCH_EXPR node anywhere in the tree  -> InvalidArgument;
//   - a MATCH_PRED node while the query option enable_match_without_inverted_index is
//     false                                    -> InvalidArgument.
// After all conjuncts pass, COUNT_ON_INDEX aggregation pushdown combined with a non-empty
// conjunct list is rejected as well. Returns Status::OK() otherwise.
static Status validate_residual_scan_conjuncts(RuntimeState* state,
                                               TPushAggOp::type push_down_agg_type,
                                               const VExprContextSPtrs& conjuncts) {
    for (const auto& ctx : conjuncts) {
        const auto& expr_root = ctx->root();

        if (contains_expr_node_type(expr_root, TExprNodeType::SEARCH_EXPR)) {
            return Status::InvalidArgument(
                    "SEARCH expression remains as a residual scan predicate. A valid search() "
                    "must bind at least one indexed field and be evaluated in SegmentIterator. "
                    "enable_segment_limit_pushdown only controls SegmentIterator LIMIT pushdown "
                    "and cannot make residual SEARCH executable.");
        }

        // Slow-path MATCH is allowed by the session, so a residual MATCH is fine.
        if (state->query_options().enable_match_without_inverted_index) {
            continue;
        }
        if (contains_expr_node_type(expr_root, TExprNodeType::MATCH_PRED)) {
            return Status::InvalidArgument(
                    "MATCH expression remains as a residual scan predicate and would fall back to "
                    "a disabled slow path because enable_match_without_inverted_index is false. "
                    "enable_segment_limit_pushdown only controls SegmentIterator LIMIT pushdown "
                    "and cannot make residual MATCH executable. Set "
                    "enable_match_without_inverted_index=true to allow slow MATCH execution.");
        }
    }

    const bool count_on_index = push_down_agg_type == TPushAggOp::COUNT_ON_INDEX;
    if (count_on_index && !conjuncts.empty()) {
        return Status::InvalidArgument(
                "COUNT_ON_INDEX pushdown cannot be used with residual scan predicates. "
                "Residual predicates must be evaluated before COUNT_ON_INDEX counts rows; "
                "otherwise the query may return incorrect results. "
                "enable_segment_limit_pushdown only controls SegmentIterator LIMIT pushdown and "
                "does not make COUNT_ON_INDEX safe with residual predicates. Set "
                "enable_count_on_index_pushdown=false to disable COUNT_ON_INDEX pushdown.");
    }
    return Status::OK();
}
// Processes the scan conjuncts under the ProcessConjunctTime timer.
//
// Delegates to the base-class implementation first. Unless that marked the scan as finished
// (_eos), it then validates the remaining residual conjuncts and builds the key ranges and
// filters. Returns the first non-OK status encountered.
Status OlapScanLocalState::_process_conjuncts(RuntimeState* state) {
    SCOPED_TIMER(_process_conjunct_timer);
    RETURN_IF_ERROR(ScanLocalState::_process_conjuncts(state));

    if (!ScanLocalState::_eos) {
        auto& scan_op = _parent->cast<OlapScanOperatorX>();
        RETURN_IF_ERROR(validate_residual_scan_conjuncts(state, scan_op._push_down_agg_type,
                                                         _conjuncts));
        RETURN_IF_ERROR(_build_key_ranges_and_filters());
    }
    return Status::OK();
}
bool OlapScanLocalState::_is_key_column(const std::string& key_name) {
// all column in dup_keys table or unique_keys with merge on write table olap scan node threat
// as key column
if (_storage_no_merge()) {
return true;
}
auto& p = _parent->cast<OlapScanOperatorX>();
auto res = std::find(p._olap_scan_node.key_column_name.begin(),
p._olap_scan_node.key_column_name.end(), key_name);
return res != p._olap_scan_node.key_column_name.end();
}
// Decides whether a function-call predicate can be pushed down as a function filter.
//
// Only the `like` function is supported. For every child that is a slot reference (casts
// ignored), the sibling child must be a constant whose column is a ColumnConst; its first
// value is written to `*constant_str`. On acceptance `*fn_ctx` receives the function
// context of `fn_call` and `pdt` is ACCEPTABLE; in every rejected case `pdt` is
// UNACCEPTABLE and `*fn_ctx` is left untouched.
//
// Returns a non-OK status only when materializing the constant column fails.
Status OlapScanLocalState::_should_push_down_function_filter(VectorizedFnCall* fn_call,
                                                             VExprContext* expr_ctx,
                                                             StringRef* constant_str,
                                                             doris::FunctionContext** fn_ctx,
                                                             PushDownType& pdt) {
    // Now only `like` function filters is supported to push down
    const bool is_like = fn_call->fn().name.function_name == "like";
    if (!is_like) {
        pdt = PushDownType::UNACCEPTABLE;
        return Status::OK();
    }

    const auto& operands = fn_call->children();
    doris::FunctionContext* like_ctx = expr_ctx->fn_context(fn_call->fn_context_index());
    DCHECK(like_ctx != nullptr);
    DCHECK(operands.size() == 2);

    for (size_t idx = 0; idx < operands.size(); ++idx) {
        const bool is_column =
                VExpr::expr_without_cast(operands[idx])->node_type() == TExprNodeType::SLOT_REF;
        if (!is_column) {
            continue;
        }

        // The operand on the other side of the column has to be a constant.
        const auto& other = operands[1 - idx];
        if (!other->is_constant()) {
            pdt = PushDownType::UNACCEPTABLE;
            return Status::OK();
        }

        DCHECK(is_string_type(other->data_type()->get_primitive_type()));
        std::shared_ptr<ColumnPtrWrapper> const_holder;
        RETURN_IF_ERROR(other->get_const_col(expr_ctx, &const_holder));
        const auto* const_col = check_and_get_column<ColumnConst>(const_holder->column_ptr.get());
        if (const_col == nullptr) {
            pdt = PushDownType::UNACCEPTABLE;
            return Status::OK();
        }
        *constant_str = const_col->get_data_at(0);
    }

    *fn_ctx = like_ctx;
    pdt = PushDownType::ACCEPTABLE;
    return Status::OK();
}
// Decides whether a common expression may be pushed down to SegmentIterator.
//
// The expression must eventually act on at least one scan slot. When the storage layer does
// not merge rows (DUP and UNIQUE-MOW/MOR-as-DUP), any such expression qualifies because it
// can be evaluated together with SegmentIterator lazy materialization. AGG and UNIQUE-MOR
// may still merge value columns above SegmentIterator, so there only expressions without
// non-key slots are pushed, which keeps filtering from observing pre-merge values.
bool OlapScanLocalState::_should_push_down_common_expr(const VExprSPtr& expr) {
    const bool touches_scan_slot = _check_expr_storage_filter(
            expr, ExprStorageFilterCheckMode::HAS_SEGMENT_EVALUABLE_EXPR);
    if (!touches_scan_slot) {
        return false;
    }
    return _storage_no_merge() ||
           !_check_expr_storage_filter(expr, ExprStorageFilterCheckMode::HAS_NON_KEY_SLOT);
}
// Recursively inspects `expr` and reports whether any node satisfies `mode`.
//
//   - Slot reference: true in HAS_SEGMENT_EVALUABLE_EXPR mode; in any other mode true only
//     when the referenced column is not a key column.
//   - Virtual slot reference: always true, because it may depend on non-key source columns.
//   - Anything else: true when at least one child satisfies the check.
bool OlapScanLocalState::_check_expr_storage_filter(const VExprSPtr& expr,
                                                    ExprStorageFilterCheckMode mode) {
    if (expr->is_slot_ref()) {
        const auto* column_ref = assert_cast<const VSlotRef*>(expr.get());
        if (mode == ExprStorageFilterCheckMode::HAS_SEGMENT_EVALUABLE_EXPR) {
            return true;
        }
        return !_is_key_column(column_ref->expr_name());
    }

    if (expr->is_virtual_slot_ref()) {
        // Treat virtual slot ref as non-key because it may depend on non-key source columns.
        return true;
    }

    for (const auto& sub_expr : expr->children()) {
        if (_check_expr_storage_filter(sub_expr, mode)) {
            return true;
        }
    }
    return false;
}
// Returns true when the scan node's key type is DUP_KEYS, when it is UNIQUE_KEYS with
// merge-on-write set and enabled, or when _read_mor_as_dup() is true.
bool OlapScanLocalState::_storage_no_merge() {
    const auto& scan_node = _parent->cast<OlapScanOperatorX>()._olap_scan_node;
    if (scan_node.keyType == TKeysType::DUP_KEYS) {
        return true;
    }
    const bool unique_merge_on_write = scan_node.keyType == TKeysType::UNIQUE_KEYS &&
                                       scan_node.__isset.enable_unique_key_merge_on_write &&
                                       scan_node.enable_unique_key_merge_on_write;
    return unique_merge_on_write || _read_mor_as_dup();
}
// Returns true only when the scan node explicitly carries read_mor_as_dup and it is set.
bool OlapScanLocalState::_read_mor_as_dup() {
    const auto& scan_node = _parent->cast<OlapScanOperatorX>()._olap_scan_node;
    if (!scan_node.__isset.read_mor_as_dup) {
        return false;
    }
    return scan_node.read_mor_as_dup;
}
Status OlapScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
if (_scan_ranges.empty()) {
_eos = true;
_scan_dependency->set_ready();
return Status::OK();
}
SCOPED_TIMER(_scanner_init_timer);
auto& p = _parent->cast<OlapScanOperatorX>();
for (auto uid : p._olap_scan_node.output_column_unique_ids) {
_output_column_ids.emplace(uid);
}
// Step 3: convert accumulated scan key pairs into OlapScanRange objects.
// Each OlapScanRange carries real begin/end OlapTuples with has_lower_bound = true.
RETURN_IF_ERROR(_scan_keys.get_key_range(&_cond_ranges));
// If no key predicates were pushed down, _cond_ranges is empty.
// Create a single default-constructed OlapScanRange (has_lower_bound = false)
// to represent a full table scan. Consumers detect this and skip pushing
// key range to the tablet reader.
if (_cond_ranges.empty()) {
_cond_ranges.emplace_back(new doris::OlapScanRange());
}
// Filter out tablets whose partitions have been pruned by runtime filters.
//
// TODO(rf-partition-prune): this happens after OlapScanLocalState::init()
// has already executed _sync_cloud_tablets() (in cloud mode that performs
// get_tablet() and waits for sync_rowsets() on every scan range) and after
// capture_read_source() has been called for every tablet. RFs that are
// ready at start therefore still pay the full per-tablet metadata / read
// source setup cost for partitions that are immediately dropped here, so
// the current feature only saves scanner construction and scan IO, not
// the expensive setup work that partition pruning was originally intended
// to avoid. To fix this we need to (a) add partition_id onto
// TPaloScanRange so BE knows the partition without first materializing
// the tablet, (b) acquire ready-at-start RFs before _sync_cloud_tablets()
// and run partition pruning there to filter _scan_ranges by partition_id
// so the heavy per-tablet work is skipped for pruned partitions.
if (_rf_partition_pruner.pruned_partition_count() > 0) {
DCHECK_EQ(_tablets.size(), _scan_ranges.size());
DCHECK_EQ(_tablets.size(), _read_sources.size());
size_t write_idx = 0;
for (size_t read_idx = 0; read_idx < _tablets.size(); ++read_idx) {
int64_t pid = _tablets[read_idx].tablet->partition_id();
if (!_rf_partition_pruner.is_partition_pruned(pid)) {
if (write_idx != read_idx) {
_tablets[write_idx] = std::move(_tablets[read_idx]);
_scan_ranges[write_idx] = std::move(_scan_ranges[read_idx]);
_read_sources[write_idx] = std::move(_read_sources[read_idx]);
}
++write_idx;
}
}
if (write_idx < _tablets.size()) {
COUNTER_SET(_tablets_pruned_by_rf_counter,
static_cast<int64_t>(_tablets.size() - write_idx));
_tablets.resize(write_idx);
_scan_ranges.resize(write_idx);
_read_sources.resize(write_idx);
}
if (_tablets.empty()) {
_eos = true;
_scan_dependency->set_ready();
return Status::OK();
}
}
bool enable_parallel_scan = state()->enable_parallel_scan();
bool read_row_binlog =
p._olap_scan_node.__isset.read_row_binlog && p._olap_scan_node.read_row_binlog;
// The flag of preagg's meaning is whether return pre agg data(or partial agg data)
// PreAgg ON: The storage layer returns partially aggregated data without additional processing. (Fast data reading)
// for example, if a table is select userid,count(*) from base table.
// And the user send a query like select userid,count(*) from base table group by userid.
// then the storage layer do not need do aggregation, it could just return the partial agg data, because the compute layer will do aggregation.
// PreAgg OFF: The storage layer must complete pre-aggregation and return fully aggregated data. (Slow data reading)
if (enable_parallel_scan && !p._should_run_serial &&
p._push_down_agg_type == TPushAggOp::NONE &&
(_storage_no_merge() || p._olap_scan_node.is_preaggregation)
// binlog<row> need to be read in order
&& !read_row_binlog) {
// Filter out the "full scan" placeholder range (has_lower_bound == false)
// so that only ranges with real key bounds are forwarded to the parallel scanner.
std::vector<OlapScanRange*> key_ranges;
for (auto& range : _cond_ranges) {
if (!range->has_lower_bound) {
continue;
}
key_ranges.emplace_back(range.get());
}
ParallelScannerBuilder scanner_builder(this, _tablets, _read_sources, _scanner_profile,
key_ranges, state(), p._limit, true,
p._olap_scan_node.is_preaggregation);
int max_scanners_count = state()->parallel_scan_max_scanners_count();
// If the `max_scanners_count` was not set,
// use `CpuInfo::num_cores()` as the default value.
if (max_scanners_count <= 0) {
max_scanners_count = CpuInfo::num_cores();
}
// Too small value of `min_rows_per_scanner` is meaningless.
auto min_rows_per_scanner =
std::max<int64_t>(1024, state()->parallel_scan_min_rows_per_scanner());
scanner_builder.set_max_scanners_count(max_scanners_count);
scanner_builder.set_min_rows_per_scanner(min_rows_per_scanner);
// If the session variable is set, force one scanner per segment.
if (state()->query_options().__isset.optimize_index_scan_parallelism &&
state()->query_options().optimize_index_scan_parallelism) {
// TODO: Use optimize_index_scan_parallelism for ann range search in the future.
// Currently, ann topn is enough
if (_ann_topn_runtime != nullptr) {
scanner_builder.set_scan_parallelism_by_per_segment(true);
}
}
RETURN_IF_ERROR(scanner_builder.build_scanners(*scanners));
for (auto& scanner : *scanners) {
auto* olap_scanner = assert_cast<OlapScanner*>(scanner.get());
RETURN_IF_ERROR(olap_scanner->init(state(), _conjuncts));
}
const OlapReaderStatistics* stats = scanner_builder.builder_stats();
io::FileCacheProfileReporter cache_profile(_segment_profile.get());
cache_profile.update(&stats->file_cache_stats);
DorisMetrics::instance()->query_scan_bytes_from_local->increment(
stats->file_cache_stats.bytes_read_from_local);
DorisMetrics::instance()->query_scan_bytes_from_remote->increment(
stats->file_cache_stats.bytes_read_from_remote);
return Status::OK();
}
int scanners_per_tablet = std::max(1, 64 / (int)_scan_ranges.size());
for (size_t scan_range_idx = 0; scan_range_idx < _scan_ranges.size(); scan_range_idx++) {
int64_t version = 0;
std::from_chars(_scan_ranges[scan_range_idx]->version.data(),
_scan_ranges[scan_range_idx]->version.data() +
_scan_ranges[scan_range_idx]->version.size(),
version);
std::vector<std::unique_ptr<doris::OlapScanRange>>* ranges = &_cond_ranges;
int size_based_scanners_per_tablet = 1;
if (config::doris_scan_range_max_mb > 0) {
size_based_scanners_per_tablet =
std::max(1, (int)(_tablets[scan_range_idx].tablet->tablet_footprint() /
(config::doris_scan_range_max_mb << 20)));
}
int ranges_per_scanner =
std::max(1, (int)ranges->size() /
std::min(scanners_per_tablet, size_based_scanners_per_tablet));
int64_t num_ranges = ranges->size();
for (int64_t i = 0; i < num_ranges;) {
std::vector<doris::OlapScanRange*> scanner_ranges;
scanner_ranges.push_back((*ranges)[i].get());
++i;
for (int64_t j = 1; i < num_ranges && j < ranges_per_scanner &&
(*ranges)[i]->end_include == (*ranges)[i - 1]->end_include;
++j, ++i) {
scanner_ranges.push_back((*ranges)[i].get());
}
COUNTER_UPDATE(_key_range_counter, scanner_ranges.size());
// `rs_reader` should not be shared by different scanners
for (auto& split : _read_sources[scan_range_idx].rs_splits) {
split.rs_reader = split.rs_reader->clone();
}
auto scanner =
OlapScanner::create_shared(this, OlapScanner::Params {
state(),
_scanner_profile.get(),
scanner_ranges,
_tablets[scan_range_idx].tablet,
version,
_read_sources[scan_range_idx],
p._limit,
p._olap_scan_node.is_preaggregation,
read_row_binlog,
});
RETURN_IF_ERROR(scanner->init(state(), _conjuncts));
scanners->push_back(std::move(scanner));
}
}
_tablets.clear();
_read_sources.clear();
return Status::OK();
}
/// Launches the asynchronous tablet lookup + rowset sync for every scan range (cloud mode only).
///
/// One task is created per scan range. Each task records how long it waited between creation
/// and execution, fetches the tablet, stores it in `_tablets[i]` together with the parsed
/// version, syncs its rowsets and reports the tablet to the hotspot tracker. The tasks are
/// handed to `cloud::bthread_fork_join`, whose result is kept in `_cloud_tablet_future`.
/// The task that brings `_pending_tablets_num` down to zero marks `_cloud_tablet_dependency`
/// ready and stops `_sync_cloud_tablets_watcher`.
///
/// The function is a no-op outside cloud mode or once `_sync_tablet` has been set.
///
/// @param state runtime state used to obtain the task execution context.
/// @return the error from `cloud::bthread_fork_join` if dispatching fails, otherwise OK.
Status OlapScanLocalState::_sync_cloud_tablets(RuntimeState* state) {
    if (!config::is_cloud_mode() || _sync_tablet) {
        return Status::OK();
    }

    _pending_tablets_num = _scan_ranges.size();
    if (_pending_tablets_num > 0) {
        _sync_cloud_tablets_watcher.start();
        _cloud_tablet_dependency = Dependency::create_shared(
                _parent->operator_id(), _parent->node_id(), "CLOUD_TABLET_DEP");

        const size_t range_count = _scan_ranges.size();
        _tablets.resize(range_count);
        std::vector<std::function<Status()>> tasks;
        _sync_statistics.resize(range_count);

        for (size_t i = 0; i < range_count; i++) {
            auto* sync_stats = &_sync_statistics[i];

            // The version arrives as text in the scan range; it stays 0 if parsing fails.
            int64_t version = 0;
            const auto& version_text = _scan_ranges[i]->version;
            std::from_chars(version_text.data(), version_text.data() + version_text.size(),
                            version);

            auto task_ctx = state->get_task_execution_context();
            auto task_create_time = std::chrono::steady_clock::now();
            tasks.emplace_back([this, sync_stats, version, i, task_ctx, task_create_time]() {
                // Account for the time the task spent waiting to be scheduled.
                auto task_start_time = std::chrono::steady_clock::now();
                if (sync_stats != nullptr) {
                    const auto schedule_delay =
                            std::chrono::duration_cast<std::chrono::nanoseconds>(
                                    task_start_time - task_create_time);
                    sync_stats->bthread_schedule_delay_ns += schedule_delay.count();
                }

                // If the task execution context can no longer be locked, bail out before
                // touching any member of this object (the pending counter included).
                // NOTE(review): presumably the local state may already be destroyed here -- confirm.
                auto task_lock = task_ctx.lock();
                if (task_lock == nullptr) {
                    return Status::OK();
                }

                // Runs on every exit path below: the last finishing task releases the
                // dependency and stops the stopwatch.
                Defer on_task_exit([&] {
                    if (_pending_tablets_num.fetch_sub(1) == 1) {
                        _cloud_tablet_dependency->set_ready();
                        _sync_cloud_tablets_watcher.stop();
                    }
                });

                auto tablet =
                        DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id, sync_stats));
                _tablets[i] = {std::move(tablet), version};

                SyncOptions sync_options;
                sync_options.query_version = version;
                sync_options.merge_schema = true;
                // FIXME(plat1ko): Avoid pointer cast
                auto cloud_tablet = std::dynamic_pointer_cast<CloudTablet>(_tablets[i].tablet);
                RETURN_IF_ERROR(cloud_tablet->sync_rowsets(sync_options, sync_stats));

                ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(
                        *_tablets[i].tablet);
                return Status::OK();
            });
        }

        RETURN_IF_ERROR(cloud::bthread_fork_join(std::move(tasks),
                                                 config::init_scanner_sync_rowsets_parallelism,
                                                 &_cloud_tablet_future));
    }

    // Only reached when dispatching succeeded (or there was nothing to dispatch).
    _sync_tablet = true;
    return Status::OK();
}
/// Resolves the tablet of every scan range and captures its read source.
///
/// Flow:
///  1. Returns immediately if `_prepared` is already set.
///  2. Cloud mode: if `_cloud_tablet_dependency` does not exist yet or is still blocked, returns
///     OK *without* setting `_prepared`, so a later call can finish the work. Otherwise it waits
///     on `_cloud_tablet_future`, folds the per-tablet sync statistics into the profile counters
///     and emits a warning + metrics when the whole sync exceeded
///     `config::sync_rowsets_slow_threshold_ms`.
///     Non-cloud mode: fetches each tablet through `ExecEnv::get_tablet`.
///  3. Captures a read source for versions `[0, version]` of every tablet and, unless the query
///     asks to skip delete predicates, fills them in.
///  4. Logs a warning if the whole procedure took longer than one second.
///
/// @param state runtime state of the calling task (not read directly by this function).
/// @return the first error reported by the cloud sync future, tablet lookup or read-source
///         capture; OK otherwise.
Status OlapScanLocalState::prepare(RuntimeState* state) {
    if (_prepared) {
        return Status::OK();
    }
    MonotonicStopWatch timer;
    timer.start();
    _read_sources.resize(_scan_ranges.size());
    if (config::is_cloud_mode()) {
        if (!_cloud_tablet_dependency ||
            _cloud_tablet_dependency->is_blocked_by(nullptr) != nullptr) {
            // Remote tablet still in-flight.
            return Status::OK();
        }
        COUNTER_UPDATE(_sync_rowset_timer, _sync_cloud_tablets_watcher.elapsed_time());
        RETURN_IF_ERROR(_cloud_tablet_future.get());
        auto total_rowsets = std::accumulate(
                _tablets.cbegin(), _tablets.cend(), 0LL,
                [](long long acc, const auto& tabletWithVersion) {
                    return acc + tabletWithVersion.tablet->tablet_meta()->all_rs_metas().size();
                });
        COUNTER_UPDATE(_sync_rowset_tablets_rowsets_total_num, total_rowsets);
        // Aggregate the statistics gathered by each per-tablet sync task.
        for (const auto& sync_stats : _sync_statistics) {
            COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_hit, sync_stats.tablet_meta_cache_hit);
            COUNTER_UPDATE(_sync_rowset_tablet_meta_cache_miss, sync_stats.tablet_meta_cache_miss);
            COUNTER_UPDATE(_sync_rowset_get_remote_tablet_meta_rpc_timer,
                           sync_stats.get_remote_tablet_meta_rpc_ns);
            COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_num, sync_stats.get_remote_rowsets_num);
            COUNTER_UPDATE(_sync_rowset_get_remote_rowsets_rpc_timer,
                           sync_stats.get_remote_rowsets_rpc_ns);
            COUNTER_UPDATE(_sync_rowset_get_local_delete_bitmap_rowsets_num,
                           sync_stats.get_local_delete_bitmap_rowsets_num);
            COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rowsets_num,
                           sync_stats.get_remote_delete_bitmap_rowsets_num);
            COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_key_count,
                           sync_stats.get_remote_delete_bitmap_key_count);
            COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_bytes,
                           sync_stats.get_remote_delete_bitmap_bytes);
            COUNTER_UPDATE(_sync_rowset_get_remote_delete_bitmap_rpc_timer,
                           sync_stats.get_remote_delete_bitmap_rpc_ns);
            COUNTER_UPDATE(_sync_rowset_bthread_schedule_wait_timer,
                           sync_stats.bthread_schedule_delay_ns);
            COUNTER_UPDATE(_sync_rowset_meta_lock_wait_timer, sync_stats.meta_lock_wait_ns);
            COUNTER_UPDATE(_sync_rowset_sync_meta_lock_wait_timer,
                           sync_stats.sync_meta_lock_wait_ns);
        }
        // The threshold and the metric are both expressed in milliseconds, so convert the
        // stopwatch reading (nanoseconds) to milliseconds before comparing / reporting.
        const auto time_ms = _sync_cloud_tablets_watcher.elapsed_time() / 1000 / 1000;
        if (time_ms >= config::sync_rowsets_slow_threshold_ms) {
            DorisMetrics::instance()->get_remote_tablet_slow_time_ms->increment(time_ms);
            DorisMetrics::instance()->get_remote_tablet_slow_cnt->increment(1);
            LOG_WARNING("get tablet takes too long")
                    .tag("query_id", print_id(PipelineXLocalState<>::_state->query_id()))
                    .tag("node_id", _parent->node_id())
                    .tag("total_time",
                         PrettyPrinter::print(_sync_cloud_tablets_watcher.elapsed_time(),
                                              TUnit::TIME_NS))
                    .tag("num_tablets", _tablets.size())
                    .tag("tablet_meta_cache_hit", _sync_rowset_tablet_meta_cache_hit->value())
                    .tag("tablet_meta_cache_miss", _sync_rowset_tablet_meta_cache_miss->value())
                    .tag("get_remote_tablet_meta_rpc_time",
                         PrettyPrinter::print(
                                 _sync_rowset_get_remote_tablet_meta_rpc_timer->value(),
                                 TUnit::TIME_NS))
                    .tag("remote_rowsets_num", _sync_rowset_get_remote_rowsets_num->value())
                    .tag("get_remote_rowsets_rpc_time",
                         PrettyPrinter::print(_sync_rowset_get_remote_rowsets_rpc_timer->value(),
                                              TUnit::TIME_NS))
                    .tag("local_delete_bitmap_rowsets_num",
                         _sync_rowset_get_local_delete_bitmap_rowsets_num->value())
                    .tag("remote_delete_bitmap_rowsets_num",
                         _sync_rowset_get_remote_delete_bitmap_rowsets_num->value())
                    .tag("remote_delete_bitmap_key_count",
                         _sync_rowset_get_remote_delete_bitmap_key_count->value())
                    .tag("remote_delete_bitmap_bytes",
                         PrettyPrinter::print(_sync_rowset_get_remote_delete_bitmap_bytes->value(),
                                              TUnit::BYTES))
                    .tag("get_remote_delete_bitmap_rpc_time",
                         PrettyPrinter::print(
                                 _sync_rowset_get_remote_delete_bitmap_rpc_timer->value(),
                                 TUnit::TIME_NS));
        }
    } else {
        // Local storage: tablets are looked up synchronously, one per scan range.
        _tablets.resize(_scan_ranges.size());
        for (size_t i = 0; i < _scan_ranges.size(); i++) {
            // The version stays 0 if the textual version cannot be parsed.
            int64_t version = 0;
            std::from_chars(_scan_ranges[i]->version.data(),
                            _scan_ranges[i]->version.data() + _scan_ranges[i]->version.size(),
                            version);
            auto tablet = DORIS_TRY(ExecEnv::get_tablet(_scan_ranges[i]->tablet_id));
            _tablets[i] = {std::move(tablet), version};
        }
    }
    for (size_t i = 0; i < _scan_ranges.size(); i++) {
        _read_sources[i] = DORIS_TRY(_tablets[i].tablet->capture_read_source(
                {0, _tablets[i].version},
                {.skip_missing_versions = _state->skip_missing_version(),
                 .enable_fetch_rowsets_from_peers = config::enable_fetch_rowsets_from_peer_replicas,
                 .capture_row_binlog = olap_scan_node().__isset.read_row_binlog &&
                                       olap_scan_node().read_row_binlog,
                 .enable_prefer_cached_rowset =
                         config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() : false,
                 .query_freshness_tolerance_ms =
                         config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() : -1}));
        if (!PipelineXLocalState<>::_state->skip_delete_predicate()) {
            _read_sources[i].fill_delete_predicates();
        }
        if (config::enable_mow_verbose_log &&
            _tablets[i].tablet->enable_unique_key_merge_on_write()) {
            LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}",
                     _tablets[i].tablet->tablet_id(),
                     print_id(PipelineXLocalState<>::_state->query_id()));
        }
    }
    timer.stop();
    double cost_secs = static_cast<double>(timer.elapsed_time()) / NANOS_PER_SEC;
    if (cost_secs > 1) {
        LOG_WARNING(
                "Try to hold tablets costs {} seconds, it costs too much. (Query-ID={}, NodeId={}, "
                "ScanRangeNum={})",
                cost_secs, print_id(PipelineXLocalState<>::_state->query_id()), _parent->node_id(),
                _scan_ranges.size());
    }
    _prepared = true;
    return Status::OK();
}
/// Opens the local state.
///
/// For every slot of the parent operator that carries a virtual column expression, this builds
/// the expression tree, prepares and opens it against the parent's intermediate row descriptor,
/// and records the expression context, the slot's data type and the slot's column position in
/// the intermediate row. Afterwards the optional score / ANN top-n runtimes are prepared and the
/// base `ScanLocalState::open` is invoked.
///
/// @param state runtime state forwarded to expression preparation and to the base class.
/// @return InternalError if a virtual slot cannot be located in the intermediate row descriptor,
///         or the first error produced by any prepare/open call; OK otherwise.
Status OlapScanLocalState::open(RuntimeState* state) {
    auto& p = _parent->cast<OlapScanOperatorX>();
    for (const auto& pair : p._slot_id_to_slot_desc) {
        const SlotDescriptor* slot_desc = pair.second;
        std::shared_ptr<doris::TExpr> virtual_col_expr = slot_desc->get_virtual_column_expr();
        if (!virtual_col_expr) {
            // Ordinary (non-virtual) slot: nothing to set up.
            continue;
        }
        std::shared_ptr<doris::VExprContext> virtual_column_expr_ctx;
        RETURN_IF_ERROR(VExpr::create_expr_tree(*virtual_col_expr, virtual_column_expr_ctx));
        RETURN_IF_ERROR(virtual_column_expr_ctx->prepare(state, p.intermediate_row_desc()));
        RETURN_IF_ERROR(virtual_column_expr_ctx->open(state));
        _slot_id_to_virtual_column_expr[slot_desc->id()] = virtual_column_expr_ctx;
        _slot_id_to_col_type[slot_desc->id()] = slot_desc->get_data_type_ptr();
        int col_pos = p.intermediate_row_desc().get_column_id(slot_desc->id());
        if (col_pos < 0) {
            // Report the descriptor that was actually searched (the intermediate one), so the
            // message matches the failed lookup.
            return Status::InternalError(
                    "Invalid virtual slot, can not find its information. Slot desc:\n{}\nRow "
                    "desc:\n{}",
                    slot_desc->debug_string(), p.intermediate_row_desc().debug_string());
        }
        _slot_id_to_index_in_block[slot_desc->id()] = col_pos;
    }
    if (_score_runtime) {
        RETURN_IF_ERROR(_score_runtime->prepare(state, p.intermediate_row_desc()));
    }
    if (_ann_topn_runtime) {
        RETURN_IF_ERROR(_ann_topn_runtime->prepare(state, p.intermediate_row_desc()));
    }
    RETURN_IF_ERROR(ScanLocalState<OlapScanLocalState>::open(state));
    return Status::OK();
}
/// Returns the `_olap_scan_node` member of the parent `OlapScanOperatorX`.
TOlapScanNode& OlapScanLocalState::olap_scan_node() const {
    auto& scan_operator = _parent->cast<OlapScanOperatorX>();
    return scan_operator._olap_scan_node;
}
/// Registers the scan ranges this instance has to read, unless the query cache already holds
/// the result.
///
/// The query cache is consulted only when the scan is not a row-binlog read, the cache digest
/// is non-empty and a forced refresh was not requested. On a cache hit no scan range is stored.
/// Otherwise every incoming range is copied into `_scan_ranges` and `_tablet_counter` is bumped
/// once per range.
///
/// @param state       runtime state (unused here).
/// @param scan_ranges ranges assigned to this instance; each is expected to carry a
///                    `palo_scan_range` (checked with DCHECK).
/// @throws doris::Exception (INTERNAL_ERROR) if the cache key cannot be built.
void OlapScanLocalState::set_scan_ranges(RuntimeState* state,
                                         const std::vector<TScanRangeParams>& scan_ranges) {
    const auto& cache_param = _parent->cast<OlapScanOperatorX>()._cache_param;
    // read binlog<row> scan should not participate in query cache.
    const bool read_row_binlog =
            olap_scan_node().__isset.read_row_binlog && olap_scan_node().read_row_binlog;

    bool hit_cache = false;
    if (!read_row_binlog && !cache_param.digest.empty() &&
        !cache_param.force_refresh_query_cache) {
        std::string cache_key;
        int64_t version = 0;
        auto status = QueryCache::build_cache_key(scan_ranges, cache_param, &cache_key, &version);
        if (!status.ok()) {
            throw doris::Exception(doris::ErrorCode::INTERNAL_ERROR, status.msg());
        }
        doris::QueryCacheHandle handle;
        hit_cache = QueryCache::instance()->lookup(cache_key, version, &handle);
    }
    if (hit_cache) {
        return;
    }

    _scan_ranges.reserve(_scan_ranges.size() + scan_ranges.size());
    for (const auto& scan_range : scan_ranges) {
        DCHECK(scan_range.scan_range.__isset.palo_scan_range);
        // make_unique keeps the copy owned at all times, so nothing leaks if the vector
        // insertion throws.
        _scan_ranges.push_back(
                std::make_unique<TPaloScanRange>(scan_range.scan_range.palo_scan_range));
        COUNTER_UPDATE(_tablet_counter, 1);
    }
}
static std::string tablets_id_to_string(
const std::vector<std::unique_ptr<TPaloScanRange>>& scan_ranges) {
if (scan_ranges.empty()) {
return "[empty]";
}
std::stringstream ss;
ss << "[" << scan_ranges[0]->tablet_id;
for (int i = 1; i < scan_ranges.size(); ++i) {
ss << ", " << scan_ranges[i]->tablet_id;
}
ss << "]";
return ss.str();
}
/// Maps a push-down aggregation operator to its display name.
/// MINMAX, COUNT and MIX map to their own names; every other value yields "NONE".
inline std::string push_down_agg_to_string(const TPushAggOp::type& op) {
    switch (op) {
    case TPushAggOp::MINMAX:
        return "MINMAX";
    case TPushAggOp::COUNT:
        return "COUNT";
    case TPushAggOp::MIX:
        return "MIX";
    default:
        return "NONE";
    }
}
/// Step 2 of the scan-key generation pipeline.
///
/// Iterate key columns in schema order; for each one, look up its ColumnValueRange
/// from _slot_id_to_value_range (populated by _normalize_conjuncts) and call
/// _scan_keys.extend_scan_key() to grow the multi-column prefix key set.
///
/// Example – table t(k1 INT, k2 INT, v INT), key columns = (k1, k2):
///   Input ColumnValueRanges:
///     k1: fixed_values = {1, 2}
///     k2: fixed_values = {10}
///   After extend_scan_key(k1):
///     _begin_scan_keys = [(1), (2)]          _end_scan_keys = [(1), (2)]
///   After extend_scan_key(k2):
///     _begin_scan_keys = [(1,10), (2,10)]    _end_scan_keys = [(1,10), (2,10)]
///
/// Loop terminates when:
///   - A key column has no predicate (break)
///   - A range column was appended (_has_range_value, cannot extend further)
///   - The ColumnValueRange is provably empty (eos)
///   - The fixed-value set exceeds max_scan_key_num (should_break or fall back to range)
///
/// At the end, _scan_keys.get_key_range() converts these into OlapScanRange objects.
///
/// Side effects: value ranges that were converted exactly are removed from
/// `_slot_id_to_value_range`, and the predicates they subsume are removed from
/// `_slot_id_to_predicates`. When an empty range is detected, `_eos` is set and
/// `_scan_dependency` is marked ready. For push-down aggregation types other than
/// NONE / COUNT_ON_INDEX no scan keys are built; the aggregation type is only recorded in the
/// profile.
///
/// @return the first error reported by `extend_scan_key`; OK otherwise.
Status OlapScanLocalState::_build_key_ranges_and_filters() {
    auto& p = _parent->cast<OlapScanOperatorX>();
    if (p._push_down_agg_type == TPushAggOp::NONE ||
        p._push_down_agg_type == TPushAggOp::COUNT_ON_INDEX) {
        const std::vector<std::string>& column_names = p._olap_scan_node.key_column_name;
        const std::vector<TPrimitiveType::type>& column_types = p._olap_scan_node.key_column_type;
        DCHECK(column_types.size() == column_names.size());
        // 1. construct scan key except last olap engine short key
        _scan_keys.set_is_convertible(p.limit() == -1);
        // we use `exact_range` to identify a key range is an exact range or not when we convert
        // it to `_scan_keys`. If `exact_range` is true, we can just discard it from `_olap_filters`.
        bool exact_range = true;
        // If the `_scan_keys` cannot extend by the range of column, should stop.
        bool should_break = false;
        bool eos = false;
        for (size_t column_index = 0; column_index < column_names.size() &&
                                      !_scan_keys.has_range_value() && !eos && !should_break;
             ++column_index) {
            // Resolve column name -> slot id with a single lookup.
            auto slot_id_iter = p._colname_to_slot_id.find(column_names[column_index]);
            if (slot_id_iter == p._colname_to_slot_id.end()) {
                break;
            }
            auto iter = _slot_id_to_value_range.find(slot_id_iter->second);
            if (_slot_id_to_value_range.end() == iter) {
                break;
            }
            DCHECK(_slot_id_to_predicates.count(iter->first) > 0);
            const auto& value_range = iter->second;
            std::optional<int> key_to_erase;
            bool is_fixed_value_range = false;
            RETURN_IF_ERROR(std::visit(
                    [&](auto&& range) {
                        // make a copy or range and pass to extend_scan_key, keep the range unchanged
                        // because extend_scan_key method may change the first parameter.
                        // but the original range may be converted to olap filters, if it's not a exact_range.
                        auto temp_range = range;
                        if (range.get_fixed_value_size() <= p._max_pushdown_conditions_per_column) {
                            RETURN_IF_ERROR(
                                    _scan_keys.extend_scan_key(temp_range, p._max_scan_key_num,
                                                               &exact_range, &eos, &should_break));
                            if (exact_range) {
                                key_to_erase = iter->first;
                                is_fixed_value_range = range.is_fixed_value_range();
                            }
                        } else {
                            // if exceed max_pushdown_conditions_per_column, use whole_value_rang instead
                            // and will not erase from _slot_id_to_value_range, it must be not exact_range
                            temp_range.set_whole_value_range();
                            RETURN_IF_ERROR(
                                    _scan_keys.extend_scan_key(temp_range, p._max_scan_key_num,
                                                               &exact_range, &eos, &should_break));
                        }
                        return Status::OK();
                    },
                    value_range));
            // The erase is deferred until the visit has finished because the visitor reads
            // through `iter` / `value_range`, which the erase invalidates.
            if (key_to_erase.has_value()) {
                _slot_id_to_value_range.erase(*key_to_erase);
                // Determine which predicates are subsumed by the scan key range and can
                // be removed. The rule depends on the ColumnValueRange type:
                //
                //   Fixed value range → scan keys are exact point lookups, so both
                //       comparison (EQ/LT/LE/GT/GE) and positive IN_LIST
                //       predicates are fully captured and can be erased.
                //
                //   Scope range → scan keys only capture [low, high] boundaries,
                //       so only comparison predicates are subsumed.
                //       IN_LIST predicates (whose values may NOT have been
                //       absorbed into the ColumnValueRange, e.g., because
                //       the value count exceeded max_pushdown_conditions_per_column)
                //       must be preserved.
                //
                // In either case, predicates with negation semantics (effective NE / NOT_IN_LIST)
                // are never subsumed by scan key ranges and must always be preserved.
                auto can_erase_predicate = [is_fixed_value_range](const ColumnPredicate& pred) {
                    PredicateType pt = pred.type();
                    bool opposite = pred.opposite();
                    // Effective NE: never subsumed by any scan key range.
                    if ((pt == PredicateType::NE && !opposite) ||
                        (pt == PredicateType::EQ && opposite)) {
                        return false;
                    }
                    // Comparison predicates (EQ/LT/LE/GT/GE) or IS_NULL/IS_NOT_NULL: subsumed by both
                    // fixed value and scope ranges.
                    if (PredicateTypeTraits::is_comparison(pt) || pt == PredicateType::IS_NULL ||
                        pt == PredicateType::IS_NOT_NULL) {
                        return true;
                    }
                    // Effective IN_LIST: only subsumed by fixed value range.
                    if ((pt == PredicateType::IN_LIST && !opposite) ||
                        (pt == PredicateType::NOT_IN_LIST && opposite)) {
                        return is_fixed_value_range;
                    }
                    // Everything else (BF, BITMAP, NOT_IN_LIST, etc.): keep.
                    return false;
                };
                // Look the predicate list up once and reuse the reference.
                auto& slot_predicates = _slot_id_to_predicates[*key_to_erase];
                std::vector<std::shared_ptr<ColumnPredicate>> new_predicates;
                for (const auto& it : slot_predicates) {
                    if (!can_erase_predicate(*it)) {
                        new_predicates.push_back(it);
                    }
                }
                if (new_predicates.empty()) {
                    // `slot_predicates` is invalidated by this erase and not used afterwards.
                    _slot_id_to_predicates.erase(*key_to_erase);
                } else {
                    slot_predicates = std::move(new_predicates);
                }
            }
        }
        if (eos) {
            _eos = true;
            _scan_dependency->set_ready();
        }
    } else {
        custom_profile()->add_info_string("PushDownAggregate",
                                          push_down_agg_to_string(p._push_down_agg_type));
    }
    if (state()->enable_profile()) {
        custom_profile()->add_info_string("KeyRanges", _scan_keys.debug_string());
        custom_profile()->add_info_string("TabletIds", tablets_id_to_string(_scan_ranges));
    }
    VLOG_CRITICAL << _scan_keys.debug_string();
    return Status::OK();
}
/// Builds the OLAP scan operator from its thrift plan node.
///
/// Besides forwarding to the `ScanOperatorX` base, the constructor:
///  - copies the thrift OLAP scan node and the query-cache parameters,
///  - takes the output tuple id from the scan node,
///  - when both `sort_info` and `sort_limit` are set, uses `sort_limit` as the per-scanner limit
///    (requiring that no plain limit is set and that `sort_limit` is positive),
///  - when the scan node carries column descriptors whose first entry has a non-negative unique
///    id, assembles `_tablet_schema` from those descriptors (plus the schema version and index
///    descriptors when present).
OlapScanOperatorX::OlapScanOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                                     const DescriptorTbl& descs, int parallel_tasks,
                                     const TQueryCacheParam& param)
        : ScanOperatorX<OlapScanLocalState>(pool, tnode, operator_id, descs, parallel_tasks),
          _olap_scan_node(tnode.olap_scan_node),
          _cache_param(param) {
    _output_tuple_id = tnode.olap_scan_node.tuple_id;

    const bool has_sort_limit =
            _olap_scan_node.__isset.sort_info && _olap_scan_node.__isset.sort_limit;
    if (has_sort_limit) {
        DORIS_CHECK(_limit < 0);
        DORIS_CHECK(_olap_scan_node.sort_limit > 0);
        _limit_per_scanner = _olap_scan_node.sort_limit;
    }

    DBUG_EXECUTE_IF("segment_iterator.topn_opt_1", {
        LOG(INFO) << "limit_per_scanner: " << _limit_per_scanner
                  << ", sort_limit: " << _olap_scan_node.sort_limit
                  << ", isset.sort_limit: " << _olap_scan_node.__isset.sort_limit;
    })

    // A schema is only assembled when usable column descriptors were shipped with the plan.
    const auto& column_descs = _olap_scan_node.columns_desc;
    const bool has_usable_columns = _olap_scan_node.__isset.columns_desc &&
                                    !column_descs.empty() &&
                                    column_descs[0].col_unique_id >= 0;
    if (!has_usable_columns) {
        return;
    }

    auto schema = std::make_shared<TabletSchema>();
    schema->clear_columns();
    for (const auto& desc : column_descs) {
        schema->append_column(TabletColumn(desc));
    }
    if (_olap_scan_node.__isset.schema_version) {
        schema->set_schema_version(_olap_scan_node.schema_version);
    }
    if (_olap_scan_node.__isset.indexes_desc) {
        schema->update_indexes_from_thrift(_olap_scan_node.indexes_desc);
    }
    _tablet_schema = std::move(schema);
}
// ======== Runtime Filter Partition Pruning ========
/// Prepares the operator and, when runtime-filter partition pruning is enabled and the plan
/// ships partition boundaries, parses those boundaries.
///
/// @param state runtime state supplying the query options.
/// @return the error from the base-class prepare or from boundary parsing; OK otherwise.
Status OlapScanOperatorX::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(ScanOperatorX<OlapScanLocalState>::prepare(state));

    const bool pruning_enabled = state->query_options().enable_runtime_filter_partition_prune;
    const bool has_boundaries = _olap_scan_node.__isset.partition_boundaries &&
                                !_olap_scan_node.partition_boundaries.empty();
    if (!pruning_enabled || !has_boundaries) {
        return Status::OK();
    }

    // Boundary parsing (VLiteral construction, ColumnPtr materialization and
    // ColumnValueRange build per boundary literal) is static for the plan, so it is done
    // here, once per fragment-on-host, instead of in per-instance LocalState::init where it
    // would be repeated parallel_tasks times. Per-instance pruners read the parsed result
    // through OperatorXBase::parsed_partition_boundaries().
    RETURN_IF_ERROR(_parsed_partition_boundaries.parse(_olap_scan_node.partition_boundaries,
                                                       _slot_id_to_slot_desc));
    return Status::OK();
}
/// Publishes the total partition count of the parent's parsed partition boundaries to
/// `_total_partitions_rf_counter`. Does nothing when the parent has no parsed boundaries or
/// they are empty.
void OlapScanLocalState::_attach_partition_boundaries() {
    const auto* boundaries = _parent->parsed_partition_boundaries();
    const bool has_boundaries = boundaries != nullptr && !boundaries->empty();
    if (has_boundaries) {
        COUNTER_SET(_total_partitions_rf_counter, boundaries->total_partitions());
    }
}
} // namespace doris