| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "vec/exec/scan/olap_scanner.h" |
| |
| #include <gen_cpp/Descriptors_types.h> |
| #include <gen_cpp/PlanNodes_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <glog/logging.h> |
| #include <stdlib.h> |
| #include <thrift/protocol/TDebugProtocol.h> |
| |
| #include <algorithm> |
| #include <atomic> |
| #include <iterator> |
| #include <ostream> |
| #include <set> |
| |
| #include "cloud/cloud_storage_engine.h" |
| #include "cloud/cloud_tablet_hotspot.h" |
| #include "cloud/config.h" |
| #include "common/config.h" |
| #include "common/consts.h" |
| #include "common/logging.h" |
| #include "exec/olap_utils.h" |
| #include "exprs/function_filter.h" |
| #include "io/cache/block_file_cache_profile.h" |
| #include "io/io_common.h" |
| #include "olap/id_manager.h" |
| #include "olap/inverted_index_profile.h" |
| #include "olap/olap_common.h" |
| #include "olap/olap_tuple.h" |
| #include "olap/schema_cache.h" |
| #include "olap/storage_engine.h" |
| #include "olap/tablet_schema.h" |
| #include "pipeline/exec/olap_scan_operator.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/runtime_state.h" |
| #include "service/backend_options.h" |
| #include "util/doris_metrics.h" |
| #include "util/runtime_profile.h" |
| #include "vec/common/schema_util.h" |
| #include "vec/core/block.h" |
| #include "vec/exec/scan/scan_node.h" |
| #include "vec/exprs/vexpr.h" |
| #include "vec/exprs/vexpr_context.h" |
| #include "vec/json/path_in_data.h" |
| #include "vec/olap/block_reader.h" |
| |
| namespace doris::vectorized { |
| #include "common/compile_check_avoid_begin.h" |
| |
| using ReadSource = TabletReadSource; |
| |
| OlapScanner::OlapScanner(pipeline::ScanLocalStateBase* parent, OlapScanner::Params&& params) |
| : Scanner(params.state, parent, params.limit, params.profile), |
| _key_ranges(std::move(params.key_ranges)), |
| _tablet_reader_params({.tablet = std::move(params.tablet), |
| .tablet_schema {}, |
| .aggregation = params.aggregation, |
| .version = {0, params.version}, |
| .start_key {}, |
| .end_key {}, |
| .predicates {}, |
| .function_filters {}, |
| .delete_predicates {}, |
| .target_cast_type_for_variants {}, |
| .all_access_paths {}, |
| .predicate_access_paths {}, |
| .rs_splits {}, |
| .return_columns {}, |
| .output_columns {}, |
| .remaining_conjunct_roots {}, |
| .common_expr_ctxs_push_down {}, |
| .topn_filter_source_node_ids {}, |
| .filter_block_conjuncts {}, |
| .key_group_cluster_key_idxes {}, |
| .virtual_column_exprs {}, |
| .vir_cid_to_idx_in_block {}, |
| .vir_col_idx_to_type {}, |
| .score_runtime {}, |
| .collection_statistics {}, |
| .ann_topn_runtime {}, |
| .condition_cache_digest = parent->get_condition_cache_digest()}) { |
| _tablet_reader_params.set_read_source(std::move(params.read_source), |
| _state->skip_delete_bitmap()); |
| _has_prepared = false; |
| _vector_search_params = params.state->get_vector_search_params(); |
| } |
| |
| static std::string read_columns_to_string(TabletSchemaSPtr tablet_schema, |
| const std::vector<uint32_t>& read_columns) { |
| // avoid too long for one line, |
| // it is hard to display in `show profile` stmt if one line is too long. |
| const int col_per_line = 10; |
| int i = 0; |
| std::string read_columns_string; |
| read_columns_string += "["; |
| for (auto it = read_columns.cbegin(); it != read_columns.cend(); it++) { |
| if (it != read_columns.cbegin()) { |
| read_columns_string += ", "; |
| } |
| read_columns_string += tablet_schema->columns().at(*it)->name(); |
| if (i >= col_per_line) { |
| read_columns_string += "\n"; |
| i = 0; |
| } else { |
| ++i; |
| } |
| } |
| read_columns_string += "]"; |
| return read_columns_string; |
| } |
| |
| Status OlapScanner::prepare() { |
| auto* local_state = static_cast<pipeline::OlapScanLocalState*>(_local_state); |
| auto& tablet = _tablet_reader_params.tablet; |
| auto& tablet_schema = _tablet_reader_params.tablet_schema; |
| DBUG_EXECUTE_IF("CloudTablet.capture_rs_readers.return.e-230", { |
| LOG_WARNING("CloudTablet.capture_rs_readers.return e-230 init") |
| .tag("tablet_id", tablet->tablet_id()); |
| return Status::Error<false>(-230, "injected error"); |
| }); |
| |
| for (auto& ctx : local_state->_common_expr_ctxs_push_down) { |
| VExprContextSPtr context; |
| RETURN_IF_ERROR(ctx->clone(_state, context)); |
| _common_expr_ctxs_push_down.emplace_back(context); |
| context->prepare_ann_range_search(_vector_search_params); |
| } |
| |
| for (auto pair : local_state->_slot_id_to_virtual_column_expr) { |
| // Scanner will be executed in a different thread, so we need to clone the context. |
| VExprContextSPtr context; |
| RETURN_IF_ERROR(pair.second->clone(_state, context)); |
| _slot_id_to_virtual_column_expr[pair.first] = context; |
| } |
| |
| _slot_id_to_index_in_block = local_state->_slot_id_to_index_in_block; |
| _slot_id_to_col_type = local_state->_slot_id_to_col_type; |
| _score_runtime = local_state->_score_runtime; |
| |
| _score_runtime = local_state->_score_runtime; |
| // All scanners share the same ann_topn_runtime. |
| _ann_topn_runtime = local_state->_ann_topn_runtime; |
| |
| // set limit to reduce end of rowset and segment mem use |
| _tablet_reader = std::make_unique<BlockReader>(); |
| // batch size is passed down to segment iterator, use _state->batch_size() |
| // instead of _parent->limit(), because if _parent->limit() is a very small |
| // value (e.g. select a from t where a .. and b ... limit 1), |
| // it will be very slow when reading data in segment iterator |
| _tablet_reader->set_batch_size(_state->batch_size()); |
| TabletSchemaSPtr cached_schema; |
| std::string schema_key; |
| { |
| TOlapScanNode& olap_scan_node = local_state->olap_scan_node(); |
| |
| const auto check_can_use_cache = [&]() { |
| if (!(olap_scan_node.__isset.schema_version && olap_scan_node.__isset.columns_desc && |
| !olap_scan_node.columns_desc.empty() && |
| olap_scan_node.columns_desc[0].col_unique_id >= 0 && // Why check first column? |
| tablet->tablet_schema()->num_variant_columns() == 0 && |
| tablet->tablet_schema()->num_virtual_columns() == 0)) { |
| return false; |
| } |
| |
| const bool has_pruned_column = |
| std::ranges::any_of(_output_tuple_desc->slots(), [](const auto& slot) { |
| if ((slot->type()->get_primitive_type() == PrimitiveType::TYPE_STRUCT || |
| slot->type()->get_primitive_type() == PrimitiveType::TYPE_MAP || |
| slot->type()->get_primitive_type() == PrimitiveType::TYPE_ARRAY) && |
| !slot->all_access_paths().empty()) { |
| return true; |
| } |
| return false; |
| }); |
| return !has_pruned_column; |
| }(); |
| |
| if (check_can_use_cache) { |
| schema_key = |
| SchemaCache::get_schema_key(tablet->tablet_id(), olap_scan_node.columns_desc, |
| olap_scan_node.schema_version); |
| cached_schema = SchemaCache::instance()->get_schema(schema_key); |
| } |
| if (cached_schema && cached_schema->num_virtual_columns() == 0) { |
| tablet_schema = cached_schema; |
| } else { |
| // If schema is not cached or cached schema has virtual columns, |
| // we need to create a new TabletSchema. |
| tablet_schema = std::make_shared<TabletSchema>(); |
| tablet_schema->copy_from(*tablet->tablet_schema()); |
| if (olap_scan_node.__isset.columns_desc && !olap_scan_node.columns_desc.empty() && |
| olap_scan_node.columns_desc[0].col_unique_id >= 0) { |
| // Originally scanner get TabletSchema from tablet object in BE. |
| // To support lightweight schema change for adding / dropping columns, |
| // tabletschema is bounded to rowset and tablet's schema maybe outdated, |
| // so we have to use schema from a query plan witch FE puts it in query plans. |
| tablet_schema->clear_columns(); |
| for (const auto& column_desc : olap_scan_node.columns_desc) { |
| tablet_schema->append_column(TabletColumn(column_desc)); |
| } |
| if (olap_scan_node.__isset.schema_version) { |
| tablet_schema->set_schema_version(olap_scan_node.schema_version); |
| } |
| } |
| if (olap_scan_node.__isset.indexes_desc) { |
| tablet_schema->update_indexes_from_thrift(olap_scan_node.indexes_desc); |
| } |
| } |
| |
| if (_tablet_reader_params.rs_splits.empty()) { |
| // Non-pipeline mode, Tablet : Scanner = 1 : 1 |
| // acquire tablet rowset readers at the beginning of the scan node |
| // to prevent this case: when there are lots of olap scanners to run for example 10000 |
| // the rowsets maybe compacted when the last olap scanner starts |
| ReadSource read_source; |
| |
| if (config::is_cloud_mode()) { |
| // FIXME(plat1ko): Avoid pointer cast |
| ExecEnv::GetInstance()->storage_engine().to_cloud().tablet_hotspot().count(*tablet); |
| } |
| |
| auto maybe_read_source = tablet->capture_read_source( |
| _tablet_reader_params.version, |
| { |
| .skip_missing_versions = _state->skip_missing_version(), |
| .enable_fetch_rowsets_from_peers = |
| config::enable_fetch_rowsets_from_peer_replicas, |
| .enable_prefer_cached_rowset = |
| config::is_cloud_mode() ? _state->enable_prefer_cached_rowset() |
| : false, |
| .query_freshness_tolerance_ms = |
| config::is_cloud_mode() ? _state->query_freshness_tolerance_ms() |
| : -1, |
| }); |
| if (!maybe_read_source) { |
| LOG(WARNING) << "fail to init reader. res=" << maybe_read_source.error(); |
| return maybe_read_source.error(); |
| } |
| |
| read_source = std::move(maybe_read_source.value()); |
| |
| if (config::enable_mow_verbose_log && tablet->enable_unique_key_merge_on_write()) { |
| LOG_INFO("finish capture_rs_readers for tablet={}, query_id={}", |
| tablet->tablet_id(), print_id(_state->query_id())); |
| } |
| |
| if (!_state->skip_delete_predicate()) { |
| read_source.fill_delete_predicates(); |
| } |
| _tablet_reader_params.set_read_source(std::move(read_source)); |
| } |
| |
| // Initialize tablet_reader_params |
| RETURN_IF_ERROR(_init_tablet_reader_params( |
| local_state->_parent->cast<pipeline::OlapScanOperatorX>()._slot_id_to_slot_desc, |
| _key_ranges, local_state->_slot_id_to_predicates, |
| local_state->_push_down_functions)); |
| } |
| |
| // add read columns in profile |
| if (_state->enable_profile()) { |
| _profile->add_info_string("ReadColumns", |
| read_columns_to_string(tablet_schema, _return_columns)); |
| } |
| |
| // Add newly created tablet schema to schema cache if it does not have virtual columns. |
| if (cached_schema == nullptr && !schema_key.empty() && |
| tablet_schema->num_virtual_columns() == 0 && !tablet_schema->has_pruned_columns()) { |
| SchemaCache::instance()->insert_schema(schema_key, tablet_schema); |
| } |
| |
| if (_tablet_reader_params.score_runtime) { |
| _tablet_reader_params.collection_statistics = std::make_shared<CollectionStatistics>(); |
| RETURN_IF_ERROR(_tablet_reader_params.collection_statistics->collect( |
| _state, _tablet_reader_params.rs_splits, _tablet_reader_params.tablet_schema, |
| _tablet_reader_params.common_expr_ctxs_push_down)); |
| } |
| |
| _has_prepared = true; |
| return Status::OK(); |
| } |
| |
| Status OlapScanner::open(RuntimeState* state) { |
| RETURN_IF_ERROR(Scanner::open(state)); |
| SCOPED_TIMER(_local_state->cast<pipeline::OlapScanLocalState>()._reader_init_timer); |
| |
| auto res = _tablet_reader->init(_tablet_reader_params); |
| if (!res.ok()) { |
| res.append("failed to initialize storage reader. tablet=" + |
| std::to_string(_tablet_reader_params.tablet->tablet_id()) + |
| ", backend=" + BackendOptions::get_localhost()); |
| return res; |
| } |
| |
| // Do not hold rs_splits any more to release memory. |
| _tablet_reader_params.rs_splits.clear(); |
| |
| return Status::OK(); |
| } |
| |
| // it will be called under tablet read lock because capture rs readers need |
| Status OlapScanner::_init_tablet_reader_params( |
| const phmap::flat_hash_map<int, SlotDescriptor*>& slot_id_to_slot_desc, |
| const std::vector<OlapScanRange*>& key_ranges, |
| const phmap::flat_hash_map<int, std::vector<std::shared_ptr<ColumnPredicate>>>& |
| slot_to_predicates, |
| const std::vector<FunctionFilter>& function_filters) { |
| // if the table with rowset [0-x] or [0-1] [2-y], and [0-1] is empty |
| const bool single_version = _tablet_reader_params.has_single_version(); |
| |
| if (_state->skip_storage_engine_merge()) { |
| _tablet_reader_params.direct_mode = true; |
| _tablet_reader_params.aggregation = true; |
| } else { |
| auto push_down_agg_type = _local_state->get_push_down_agg_type(); |
| _tablet_reader_params.direct_mode = _tablet_reader_params.aggregation || single_version || |
| (push_down_agg_type != TPushAggOp::NONE && |
| push_down_agg_type != TPushAggOp::COUNT_ON_INDEX); |
| } |
| |
| RETURN_IF_ERROR(_init_variant_columns()); |
| RETURN_IF_ERROR(_init_return_columns()); |
| |
| _tablet_reader_params.reader_type = ReaderType::READER_QUERY; |
| _tablet_reader_params.push_down_agg_type_opt = _local_state->get_push_down_agg_type(); |
| |
| // TODO: If a new runtime filter arrives after `_conjuncts` move to `_common_expr_ctxs_push_down`, |
| if (_common_expr_ctxs_push_down.empty()) { |
| for (auto& conjunct : _conjuncts) { |
| _tablet_reader_params.remaining_conjunct_roots.emplace_back(conjunct->root()); |
| } |
| } else { |
| for (auto& ctx : _common_expr_ctxs_push_down) { |
| _tablet_reader_params.remaining_conjunct_roots.emplace_back(ctx->root()); |
| } |
| } |
| |
| _tablet_reader_params.common_expr_ctxs_push_down = _common_expr_ctxs_push_down; |
| _tablet_reader_params.virtual_column_exprs = _virtual_column_exprs; |
| _tablet_reader_params.vir_cid_to_idx_in_block = _vir_cid_to_idx_in_block; |
| _tablet_reader_params.vir_col_idx_to_type = _vir_col_idx_to_type; |
| _tablet_reader_params.score_runtime = _score_runtime; |
| _tablet_reader_params.output_columns = |
| ((pipeline::OlapScanLocalState*)_local_state)->_output_column_ids; |
| _tablet_reader_params.ann_topn_runtime = _ann_topn_runtime; |
| for (const auto& ele : |
| ((pipeline::OlapScanLocalState*)_local_state)->_cast_types_for_variants) { |
| _tablet_reader_params.target_cast_type_for_variants[ele.first] = ele.second; |
| }; |
| auto& tablet_schema = _tablet_reader_params.tablet_schema; |
| for (auto& predicates : slot_to_predicates) { |
| const int sid = predicates.first; |
| DCHECK(slot_id_to_slot_desc.contains(sid)); |
| int32_t index = |
| tablet_schema->field_index(slot_id_to_slot_desc.find(sid)->second->col_name()); |
| if (index < 0) { |
| throw Exception( |
| Status::InternalError("Column {} not found in tablet schema", |
| slot_id_to_slot_desc.find(sid)->second->col_name())); |
| } |
| for (auto& predicate : predicates.second) { |
| _tablet_reader_params.predicates.push_back(predicate->clone(index)); |
| } |
| } |
| |
| std::copy(function_filters.cbegin(), function_filters.cend(), |
| std::inserter(_tablet_reader_params.function_filters, |
| _tablet_reader_params.function_filters.begin())); |
| |
| // Merge the columns in delete predicate that not in latest schema in to current tablet schema |
| for (auto& del_pred : _tablet_reader_params.delete_predicates) { |
| tablet_schema->merge_dropped_columns(*del_pred->tablet_schema()); |
| } |
| |
| // Range |
| for (auto* key_range : key_ranges) { |
| if (key_range->begin_scan_range.size() == 1 && |
| key_range->begin_scan_range.get_value(0) == NEGATIVE_INFINITY) { |
| continue; |
| } |
| |
| _tablet_reader_params.start_key_include = key_range->begin_include; |
| _tablet_reader_params.end_key_include = key_range->end_include; |
| |
| _tablet_reader_params.start_key.push_back(key_range->begin_scan_range); |
| _tablet_reader_params.end_key.push_back(key_range->end_scan_range); |
| } |
| |
| _tablet_reader_params.profile = _local_state->custom_profile(); |
| _tablet_reader_params.runtime_state = _state; |
| |
| _tablet_reader_params.origin_return_columns = &_return_columns; |
| _tablet_reader_params.tablet_columns_convert_to_null_set = &_tablet_columns_convert_to_null_set; |
| |
| if (_tablet_reader_params.direct_mode) { |
| _tablet_reader_params.return_columns = _return_columns; |
| } else { |
| // we need to fetch all key columns to do the right aggregation on storage engine side. |
| for (size_t i = 0; i < tablet_schema->num_key_columns(); ++i) { |
| _tablet_reader_params.return_columns.push_back(i); |
| } |
| for (auto index : _return_columns) { |
| if (tablet_schema->column(index).is_key()) { |
| continue; |
| } |
| _tablet_reader_params.return_columns.push_back(index); |
| } |
| // expand the sequence column |
| if (tablet_schema->has_sequence_col()) { |
| bool has_replace_col = false; |
| for (auto col : _return_columns) { |
| if (tablet_schema->column(col).aggregation() == |
| FieldAggregationMethod::OLAP_FIELD_AGGREGATION_REPLACE) { |
| has_replace_col = true; |
| break; |
| } |
| } |
| if (auto sequence_col_idx = tablet_schema->sequence_col_idx(); |
| has_replace_col && std::find(_return_columns.begin(), _return_columns.end(), |
| sequence_col_idx) == _return_columns.end()) { |
| _tablet_reader_params.return_columns.push_back(sequence_col_idx); |
| } |
| } |
| } |
| |
| _tablet_reader_params.use_page_cache = _state->enable_page_cache(); |
| |
| DBUG_EXECUTE_IF("NewOlapScanner::_init_tablet_reader_params.block", DBUG_BLOCK); |
| |
| if (!_state->skip_storage_engine_merge()) { |
| auto* olap_scan_local_state = (pipeline::OlapScanLocalState*)_local_state; |
| TOlapScanNode& olap_scan_node = olap_scan_local_state->olap_scan_node(); |
| // order by table keys optimization for topn |
| // will only read head/tail of data file since it's already sorted by keys |
| if (olap_scan_node.__isset.sort_info && !olap_scan_node.sort_info.is_asc_order.empty()) { |
| _limit = _local_state->limit_per_scanner(); |
| _tablet_reader_params.read_orderby_key = true; |
| if (!olap_scan_node.sort_info.is_asc_order[0]) { |
| _tablet_reader_params.read_orderby_key_reverse = true; |
| } |
| _tablet_reader_params.read_orderby_key_num_prefix_columns = |
| olap_scan_node.sort_info.is_asc_order.size(); |
| _tablet_reader_params.read_orderby_key_limit = _limit; |
| |
| if (_tablet_reader_params.read_orderby_key_limit > 0 && |
| olap_scan_local_state->_storage_no_merge()) { |
| _tablet_reader_params.filter_block_conjuncts = _conjuncts; |
| _conjuncts.clear(); |
| } |
| } |
| |
| // set push down topn filter |
| _tablet_reader_params.topn_filter_source_node_ids = |
| olap_scan_local_state->get_topn_filter_source_node_ids(_state, true); |
| if (!_tablet_reader_params.topn_filter_source_node_ids.empty()) { |
| _tablet_reader_params.topn_filter_target_node_id = |
| olap_scan_local_state->parent()->node_id(); |
| } |
| } |
| |
| // If this is a Two-Phase read query, and we need to delay the release of Rowset |
| // by rowset->update_delayed_expired_timestamp().This could expand the lifespan of Rowset |
| if (tablet_schema->field_index(BeConsts::ROWID_COL) >= 0) { |
| constexpr static int delayed_s = 60; |
| for (auto rs_reader : _tablet_reader_params.rs_splits) { |
| uint64_t delayed_expired_timestamp = |
| UnixSeconds() + _tablet_reader_params.runtime_state->execution_timeout() + |
| delayed_s; |
| rs_reader.rs_reader->rowset()->update_delayed_expired_timestamp( |
| delayed_expired_timestamp); |
| ExecEnv::GetInstance()->storage_engine().add_quering_rowset( |
| rs_reader.rs_reader->rowset()); |
| } |
| } |
| |
| if (tablet_schema->has_global_row_id()) { |
| auto& id_file_map = _state->get_id_file_map(); |
| for (auto rs_reader : _tablet_reader_params.rs_splits) { |
| id_file_map->add_temp_rowset(rs_reader.rs_reader->rowset()); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status OlapScanner::_init_variant_columns() { |
| auto& tablet_schema = _tablet_reader_params.tablet_schema; |
| if (tablet_schema->num_variant_columns() == 0) { |
| return Status::OK(); |
| } |
| // Parent column has path info to distinction from each other |
| for (auto* slot : _output_tuple_desc->slots()) { |
| if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { |
| // Such columns are not exist in frontend schema info, so we need to |
| // add them into tablet_schema for later column indexing. |
| TabletColumn subcol = TabletColumn::create_materialized_variant_column( |
| tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(), |
| slot->column_paths(), slot->col_unique_id(), |
| assert_cast<const vectorized::DataTypeVariant&>(*remove_nullable(slot->type())) |
| .variant_max_subcolumns_count()); |
| if (tablet_schema->field_index(*subcol.path_info_ptr()) < 0) { |
| tablet_schema->append_column(subcol, TabletSchema::ColumnType::VARIANT); |
| } |
| } |
| } |
| schema_util::inherit_column_attributes(tablet_schema); |
| return Status::OK(); |
| } |
| |
| Status OlapScanner::_init_return_columns() { |
| for (auto* slot : _output_tuple_desc->slots()) { |
| // variant column using path to index a column |
| int32_t index = 0; |
| auto& tablet_schema = _tablet_reader_params.tablet_schema; |
| if (slot->type()->get_primitive_type() == PrimitiveType::TYPE_VARIANT) { |
| index = tablet_schema->field_index(PathInData( |
| tablet_schema->column_by_uid(slot->col_unique_id()).name_lower_case(), |
| slot->column_paths())); |
| } else { |
| index = slot->col_unique_id() >= 0 ? tablet_schema->field_index(slot->col_unique_id()) |
| : tablet_schema->field_index(slot->col_name()); |
| } |
| |
| if (index < 0) { |
| return Status::InternalError( |
| "field name is invalid. field={}, field_name_to_index={}, col_unique_id={}", |
| slot->col_name(), tablet_schema->get_all_field_names(), slot->col_unique_id()); |
| } |
| |
| if (slot->get_virtual_column_expr()) { |
| ColumnId virtual_column_cid = index; |
| _virtual_column_exprs[virtual_column_cid] = _slot_id_to_virtual_column_expr[slot->id()]; |
| size_t idx_in_block = _slot_id_to_index_in_block[slot->id()]; |
| _vir_cid_to_idx_in_block[virtual_column_cid] = idx_in_block; |
| _vir_col_idx_to_type[idx_in_block] = _slot_id_to_col_type[slot->id()]; |
| |
| VLOG_DEBUG << fmt::format( |
| "Virtual column, slot id: {}, cid {}, column index: {}, type: {}", slot->id(), |
| virtual_column_cid, _vir_cid_to_idx_in_block[virtual_column_cid], |
| _vir_col_idx_to_type[idx_in_block]->get_name()); |
| } |
| |
| const auto& column = tablet_schema->column(index); |
| if (!slot->all_access_paths().empty()) { |
| _tablet_reader_params.all_access_paths.insert( |
| {column.unique_id(), slot->all_access_paths()}); |
| } |
| |
| if (!slot->predicate_access_paths().empty()) { |
| _tablet_reader_params.predicate_access_paths.insert( |
| {column.unique_id(), slot->predicate_access_paths()}); |
| } |
| |
| if ((slot->type()->get_primitive_type() == PrimitiveType::TYPE_STRUCT || |
| slot->type()->get_primitive_type() == PrimitiveType::TYPE_MAP || |
| slot->type()->get_primitive_type() == PrimitiveType::TYPE_ARRAY) && |
| !slot->all_access_paths().empty()) { |
| tablet_schema->add_pruned_columns_data_type(column.unique_id(), slot->type()); |
| } |
| |
| _return_columns.push_back(index); |
| if (slot->is_nullable() && !tablet_schema->column(index).is_nullable()) { |
| _tablet_columns_convert_to_null_set.emplace(index); |
| } else if (!slot->is_nullable() && tablet_schema->column(index).is_nullable()) { |
| return Status::Error<ErrorCode::INVALID_SCHEMA>( |
| "slot(id: {}, name: {})'s nullable does not match " |
| "column(tablet id: {}, index: {}, name: {}) ", |
| slot->id(), slot->col_name(), tablet_schema->table_id(), index, |
| tablet_schema->column(index).name()); |
| } |
| } |
| |
| if (_return_columns.empty()) { |
| return Status::InternalError("failed to build storage scanner, no materialized slot!"); |
| } |
| |
| return Status::OK(); |
| } |
| |
| doris::TabletStorageType OlapScanner::get_storage_type() { |
| if (config::is_cloud_mode()) { |
| // we don't have cold storage in cloud mode, all storage is treated as local |
| return doris::TabletStorageType::STORAGE_TYPE_LOCAL; |
| } |
| int local_reader = 0; |
| for (const auto& reader : _tablet_reader_params.rs_splits) { |
| local_reader += reader.rs_reader->rowset()->is_local(); |
| } |
| int total_reader = _tablet_reader_params.rs_splits.size(); |
| |
| if (local_reader == total_reader) { |
| return doris::TabletStorageType::STORAGE_TYPE_LOCAL; |
| } else if (local_reader == 0) { |
| return doris::TabletStorageType::STORAGE_TYPE_REMOTE; |
| } |
| return doris::TabletStorageType::STORAGE_TYPE_REMOTE_AND_LOCAL; |
| } |
| |
| Status OlapScanner::_get_block_impl(RuntimeState* state, Block* block, bool* eof) { |
| // Read one block from block reader |
| // ATTN: Here we need to let the _get_block_impl method guarantee the semantics of the interface, |
| // that is, eof can be set to true only when the returned block is empty. |
| RETURN_IF_ERROR(_tablet_reader->next_block_with_aggregation(block, eof)); |
| if (block->rows() > 0) { |
| _tablet_reader_params.tablet->read_block_count.fetch_add(1, std::memory_order_relaxed); |
| *eof = false; |
| } |
| return Status::OK(); |
| } |
| |
| Status OlapScanner::close(RuntimeState* state) { |
| if (!_try_close()) { |
| return Status::OK(); |
| } |
| RETURN_IF_ERROR(Scanner::close(state)); |
| return Status::OK(); |
| } |
| |
| void OlapScanner::update_realtime_counters() { |
| pipeline::OlapScanLocalState* local_state = |
| static_cast<pipeline::OlapScanLocalState*>(_local_state); |
| const OlapReaderStatistics& stats = _tablet_reader->stats(); |
| COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read); |
| COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read); |
| COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read); |
| COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read); |
| |
| // Make sure the scan bytes and scan rows counter in audit log is the same as the counter in |
| // doris metrics. |
| // ScanBytes is the uncompressed bytes read from local + remote |
| // bytes_read_from_local is the compressed bytes read from local |
| // bytes_read_from_remote is the compressed bytes read from remote |
| // scan bytes > bytes_read_from_local + bytes_read_from_remote |
| _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_rows(stats.raw_rows_read); |
| _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes( |
| stats.uncompressed_bytes_read); |
| |
| // In case of no cache, we still need to update the IO stats. uncompressed bytes read == local + remote |
| if (stats.file_cache_stats.bytes_read_from_local == 0 && |
| stats.file_cache_stats.bytes_read_from_remote == 0) { |
| _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage( |
| stats.compressed_bytes_read); |
| DorisMetrics::instance()->query_scan_bytes_from_local->increment( |
| stats.compressed_bytes_read); |
| } else { |
| _state->get_query_ctx()->resource_ctx()->io_context()->update_scan_bytes_from_local_storage( |
| stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local); |
| _state->get_query_ctx() |
| ->resource_ctx() |
| ->io_context() |
| ->update_scan_bytes_from_remote_storage( |
| stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote); |
| |
| DorisMetrics::instance()->query_scan_bytes_from_local->increment( |
| stats.file_cache_stats.bytes_read_from_local - _bytes_read_from_local); |
| DorisMetrics::instance()->query_scan_bytes_from_remote->increment( |
| stats.file_cache_stats.bytes_read_from_remote - _bytes_read_from_remote); |
| } |
| |
| _tablet_reader->mutable_stats()->compressed_bytes_read = 0; |
| _tablet_reader->mutable_stats()->uncompressed_bytes_read = 0; |
| _tablet_reader->mutable_stats()->raw_rows_read = 0; |
| |
| _bytes_read_from_local = _tablet_reader->stats().file_cache_stats.bytes_read_from_local; |
| _bytes_read_from_remote = _tablet_reader->stats().file_cache_stats.bytes_read_from_remote; |
| } |
| |
| void OlapScanner::_collect_profile_before_close() { |
| // Please don't directly enable the profile here, we need to set QueryStatistics using the counter inside. |
| if (_has_updated_counter) { |
| return; |
| } |
| _has_updated_counter = true; |
| _tablet_reader->update_profile(_profile); |
| |
| Scanner::_collect_profile_before_close(); |
| |
| // Update counters for OlapScanner |
| // Update counters from tablet reader's stats |
| auto& stats = _tablet_reader->stats(); |
| auto* local_state = (pipeline::OlapScanLocalState*)_local_state; |
| COUNTER_UPDATE(local_state->_io_timer, stats.io_ns); |
| COUNTER_UPDATE(local_state->_read_compressed_counter, stats.compressed_bytes_read); |
| COUNTER_UPDATE(local_state->_scan_bytes, stats.uncompressed_bytes_read); |
| COUNTER_UPDATE(local_state->_decompressor_timer, stats.decompress_ns); |
| COUNTER_UPDATE(local_state->_read_uncompressed_counter, stats.uncompressed_bytes_read); |
| COUNTER_UPDATE(local_state->_block_load_timer, stats.block_load_ns); |
| COUNTER_UPDATE(local_state->_block_load_counter, stats.blocks_load); |
| COUNTER_UPDATE(local_state->_block_fetch_timer, stats.block_fetch_ns); |
| COUNTER_UPDATE(local_state->_delete_bitmap_get_agg_timer, stats.delete_bitmap_get_agg_ns); |
| COUNTER_UPDATE(local_state->_scan_rows, stats.raw_rows_read); |
| COUNTER_UPDATE(local_state->_vec_cond_timer, stats.vec_cond_ns); |
| COUNTER_UPDATE(local_state->_short_cond_timer, stats.short_cond_ns); |
| COUNTER_UPDATE(local_state->_expr_filter_timer, stats.expr_filter_ns); |
| COUNTER_UPDATE(local_state->_block_init_timer, stats.block_init_ns); |
| COUNTER_UPDATE(local_state->_block_init_seek_timer, stats.block_init_seek_ns); |
| COUNTER_UPDATE(local_state->_block_init_seek_counter, stats.block_init_seek_num); |
| COUNTER_UPDATE(local_state->_segment_generate_row_range_by_keys_timer, |
| stats.generate_row_ranges_by_keys_ns); |
| COUNTER_UPDATE(local_state->_segment_generate_row_range_by_column_conditions_timer, |
| stats.generate_row_ranges_by_column_conditions_ns); |
| COUNTER_UPDATE(local_state->_segment_generate_row_range_by_bf_timer, |
| stats.generate_row_ranges_by_bf_ns); |
| COUNTER_UPDATE(local_state->_collect_iterator_merge_next_timer, |
| stats.collect_iterator_merge_next_timer); |
| COUNTER_UPDATE(local_state->_segment_generate_row_range_by_zonemap_timer, |
| stats.generate_row_ranges_by_zonemap_ns); |
| COUNTER_UPDATE(local_state->_segment_generate_row_range_by_dict_timer, |
| stats.generate_row_ranges_by_dict_ns); |
| COUNTER_UPDATE(local_state->_predicate_column_read_timer, stats.predicate_column_read_ns); |
| COUNTER_UPDATE(local_state->_non_predicate_column_read_timer, stats.non_predicate_read_ns); |
| COUNTER_UPDATE(local_state->_predicate_column_read_seek_timer, |
| stats.predicate_column_read_seek_ns); |
| COUNTER_UPDATE(local_state->_predicate_column_read_seek_counter, |
| stats.predicate_column_read_seek_num); |
| COUNTER_UPDATE(local_state->_lazy_read_timer, stats.lazy_read_ns); |
| COUNTER_UPDATE(local_state->_lazy_read_seek_timer, stats.block_lazy_read_seek_ns); |
| COUNTER_UPDATE(local_state->_lazy_read_seek_counter, stats.block_lazy_read_seek_num); |
| COUNTER_UPDATE(local_state->_output_col_timer, stats.output_col_ns); |
| COUNTER_UPDATE(local_state->_rows_vec_cond_filtered_counter, stats.rows_vec_cond_filtered); |
| COUNTER_UPDATE(local_state->_rows_short_circuit_cond_filtered_counter, |
| stats.rows_short_circuit_cond_filtered); |
| COUNTER_UPDATE(local_state->_rows_expr_cond_filtered_counter, stats.rows_expr_cond_filtered); |
| COUNTER_UPDATE(local_state->_rows_vec_cond_input_counter, stats.vec_cond_input_rows); |
| COUNTER_UPDATE(local_state->_rows_short_circuit_cond_input_counter, |
| stats.short_circuit_cond_input_rows); |
| COUNTER_UPDATE(local_state->_rows_expr_cond_input_counter, stats.expr_cond_input_rows); |
| COUNTER_UPDATE(local_state->_stats_filtered_counter, stats.rows_stats_filtered); |
| COUNTER_UPDATE(local_state->_stats_rp_filtered_counter, stats.rows_stats_rp_filtered); |
| COUNTER_UPDATE(local_state->_dict_filtered_counter, stats.segment_dict_filtered); |
| COUNTER_UPDATE(local_state->_bf_filtered_counter, stats.rows_bf_filtered); |
| COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_filtered); |
| COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_del_by_bitmap); |
| COUNTER_UPDATE(local_state->_del_filtered_counter, stats.rows_vec_del_cond_filtered); |
| COUNTER_UPDATE(local_state->_conditions_filtered_counter, stats.rows_conditions_filtered); |
| COUNTER_UPDATE(local_state->_key_range_filtered_counter, stats.rows_key_range_filtered); |
| COUNTER_UPDATE(local_state->_total_pages_num_counter, stats.total_pages_num); |
| COUNTER_UPDATE(local_state->_cached_pages_num_counter, stats.cached_pages_num); |
| COUNTER_UPDATE(local_state->_inverted_index_filter_counter, stats.rows_inverted_index_filtered); |
| COUNTER_UPDATE(local_state->_inverted_index_filter_timer, stats.inverted_index_filter_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_query_cache_hit_counter, |
| stats.inverted_index_query_cache_hit); |
| COUNTER_UPDATE(local_state->_inverted_index_query_cache_miss_counter, |
| stats.inverted_index_query_cache_miss); |
| COUNTER_UPDATE(local_state->_inverted_index_query_timer, stats.inverted_index_query_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_query_null_bitmap_timer, |
| stats.inverted_index_query_null_bitmap_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_query_bitmap_copy_timer, |
| stats.inverted_index_query_bitmap_copy_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_searcher_open_timer, |
| stats.inverted_index_searcher_open_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_searcher_search_timer, |
| stats.inverted_index_searcher_search_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_searcher_search_init_timer, |
| stats.inverted_index_searcher_search_init_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_searcher_search_exec_timer, |
| stats.inverted_index_searcher_search_exec_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_hit_counter, |
| stats.inverted_index_searcher_cache_hit); |
| COUNTER_UPDATE(local_state->_inverted_index_searcher_cache_miss_counter, |
| stats.inverted_index_searcher_cache_miss); |
| COUNTER_UPDATE(local_state->_inverted_index_downgrade_count_counter, |
| stats.inverted_index_downgrade_count); |
| COUNTER_UPDATE(local_state->_inverted_index_analyzer_timer, |
| stats.inverted_index_analyzer_timer); |
| COUNTER_UPDATE(local_state->_inverted_index_lookup_timer, stats.inverted_index_lookup_timer); |
| COUNTER_UPDATE(local_state->_variant_scan_sparse_column_timer, |
| stats.variant_scan_sparse_column_timer_ns); |
| COUNTER_UPDATE(local_state->_variant_scan_sparse_column_bytes, |
| stats.variant_scan_sparse_column_bytes); |
| COUNTER_UPDATE(local_state->_variant_fill_path_from_sparse_column_timer, |
| stats.variant_fill_path_from_sparse_column_timer_ns); |
| COUNTER_UPDATE(local_state->_variant_subtree_default_iter_count, |
| stats.variant_subtree_default_iter_count); |
| COUNTER_UPDATE(local_state->_variant_subtree_leaf_iter_count, |
| stats.variant_subtree_leaf_iter_count); |
| COUNTER_UPDATE(local_state->_variant_subtree_hierarchical_iter_count, |
| stats.variant_subtree_hierarchical_iter_count); |
| COUNTER_UPDATE(local_state->_variant_subtree_sparse_iter_count, |
| stats.variant_subtree_sparse_iter_count); |
| |
| InvertedIndexProfileReporter inverted_index_profile; |
| inverted_index_profile.update(local_state->_index_filter_profile.get(), |
| &stats.inverted_index_stats); |
| |
| // only cloud deploy mode will use file cache. |
| if (config::is_cloud_mode() && config::enable_file_cache) { |
| io::FileCacheProfileReporter cache_profile(local_state->_segment_profile.get()); |
| cache_profile.update(&stats.file_cache_stats); |
| _state->get_query_ctx()->resource_ctx()->io_context()->update_bytes_write_into_cache( |
| stats.file_cache_stats.bytes_write_into_cache); |
| } |
| COUNTER_UPDATE(local_state->_output_index_result_column_timer, |
| stats.output_index_result_column_timer); |
| COUNTER_UPDATE(local_state->_filtered_segment_counter, stats.filtered_segment_number); |
| COUNTER_UPDATE(local_state->_total_segment_counter, stats.total_segment_number); |
| COUNTER_UPDATE(local_state->_condition_cache_hit_segment_counter, |
| stats.condition_cache_hit_seg_nums); |
| COUNTER_UPDATE(local_state->_condition_cache_filtered_rows_counter, |
| stats.condition_cache_filtered_rows); |
| |
| COUNTER_UPDATE(local_state->_tablet_reader_init_timer, stats.tablet_reader_init_timer_ns); |
| COUNTER_UPDATE(local_state->_tablet_reader_capture_rs_readers_timer, |
| stats.tablet_reader_capture_rs_readers_timer_ns); |
| COUNTER_UPDATE(local_state->_tablet_reader_init_return_columns_timer, |
| stats.tablet_reader_init_return_columns_timer_ns); |
| COUNTER_UPDATE(local_state->_tablet_reader_init_keys_param_timer, |
| stats.tablet_reader_init_keys_param_timer_ns); |
| COUNTER_UPDATE(local_state->_tablet_reader_init_orderby_keys_param_timer, |
| stats.tablet_reader_init_orderby_keys_param_timer_ns); |
| COUNTER_UPDATE(local_state->_tablet_reader_init_conditions_param_timer, |
| stats.tablet_reader_init_conditions_param_timer_ns); |
| COUNTER_UPDATE(local_state->_tablet_reader_init_delete_condition_param_timer, |
| stats.tablet_reader_init_delete_condition_param_timer_ns); |
| COUNTER_UPDATE(local_state->_block_reader_vcollect_iter_init_timer, |
| stats.block_reader_vcollect_iter_init_timer_ns); |
| COUNTER_UPDATE(local_state->_block_reader_rs_readers_init_timer, |
| stats.block_reader_rs_readers_init_timer_ns); |
| COUNTER_UPDATE(local_state->_block_reader_build_heap_init_timer, |
| stats.block_reader_build_heap_init_timer_ns); |
| |
| COUNTER_UPDATE(local_state->_rowset_reader_get_segment_iterators_timer, |
| stats.rowset_reader_get_segment_iterators_timer_ns); |
| COUNTER_UPDATE(local_state->_rowset_reader_create_iterators_timer, |
| stats.rowset_reader_create_iterators_timer_ns); |
| COUNTER_UPDATE(local_state->_rowset_reader_init_iterators_timer, |
| stats.rowset_reader_init_iterators_timer_ns); |
| COUNTER_UPDATE(local_state->_rowset_reader_load_segments_timer, |
| stats.rowset_reader_load_segments_timer_ns); |
| |
| COUNTER_UPDATE(local_state->_segment_iterator_init_timer, stats.segment_iterator_init_timer_ns); |
| COUNTER_UPDATE(local_state->_segment_iterator_init_return_column_iterators_timer, |
| stats.segment_iterator_init_return_column_iterators_timer_ns); |
| COUNTER_UPDATE(local_state->_segment_iterator_init_index_iterators_timer, |
| stats.segment_iterator_init_index_iterators_timer_ns); |
| |
| COUNTER_UPDATE(local_state->_segment_create_column_readers_timer, |
| stats.segment_create_column_readers_timer_ns); |
| COUNTER_UPDATE(local_state->_segment_load_index_timer, stats.segment_load_index_timer_ns); |
| |
| // Update metrics |
| DorisMetrics::instance()->query_scan_bytes->increment( |
| local_state->_read_uncompressed_counter->value()); |
| DorisMetrics::instance()->query_scan_rows->increment(local_state->_scan_rows->value()); |
| auto& tablet = _tablet_reader_params.tablet; |
| tablet->query_scan_bytes->increment(local_state->_read_uncompressed_counter->value()); |
| tablet->query_scan_rows->increment(local_state->_scan_rows->value()); |
| tablet->query_scan_count->increment(1); |
| |
| COUNTER_UPDATE(local_state->_ann_range_search_filter_counter, |
| stats.rows_ann_index_range_filtered); |
| COUNTER_UPDATE(local_state->_ann_topn_filter_counter, stats.rows_ann_index_topn_filtered); |
| COUNTER_UPDATE(local_state->_ann_index_load_costs, stats.ann_index_load_ns); |
| COUNTER_UPDATE(local_state->_ann_range_search_costs, stats.ann_index_range_search_ns); |
| COUNTER_UPDATE(local_state->_ann_range_search_cnt, stats.ann_index_range_search_cnt); |
| COUNTER_UPDATE(local_state->_ann_range_engine_search_costs, stats.ann_range_engine_search_ns); |
| // Engine prepare before search |
| COUNTER_UPDATE(local_state->_ann_range_pre_process_costs, stats.ann_range_pre_process_ns); |
| // Post process parent: Doris result process + engine convert |
| COUNTER_UPDATE(local_state->_ann_range_post_process_costs, |
| stats.ann_range_result_convert_ns + stats.ann_range_engine_convert_ns); |
| // Engine convert (child under post-process) |
| COUNTER_UPDATE(local_state->_ann_range_engine_convert_costs, stats.ann_range_engine_convert_ns); |
| // Doris-side result convert (child under post-process) |
| COUNTER_UPDATE(local_state->_ann_range_result_convert_costs, stats.ann_range_result_convert_ns); |
| |
| COUNTER_UPDATE(local_state->_ann_topn_search_costs, stats.ann_topn_search_ns); |
| COUNTER_UPDATE(local_state->_ann_topn_search_cnt, stats.ann_index_topn_search_cnt); |
| |
| // Detailed ANN timers |
| // ANN TopN timers with hierarchy |
| // Engine search time (FAISS) |
| COUNTER_UPDATE(local_state->_ann_topn_engine_search_costs, |
| stats.ann_index_topn_engine_search_ns); |
| // Engine prepare time (allocations/buffer setup before search) |
| COUNTER_UPDATE(local_state->_ann_topn_pre_process_costs, |
| stats.ann_index_topn_engine_prepare_ns); |
| // Post process parent includes Doris result processing + engine convert |
| COUNTER_UPDATE(local_state->_ann_topn_post_process_costs, |
| stats.ann_index_topn_result_process_ns + stats.ann_index_topn_engine_convert_ns); |
| // Engine-side conversion time inside FAISS wrappers (child under post-process) |
| COUNTER_UPDATE(local_state->_ann_topn_engine_convert_costs, |
| stats.ann_index_topn_engine_convert_ns); |
| |
| // Doris-side result convert costs (show separately as another child counter); use pure process time |
| COUNTER_UPDATE(local_state->_ann_topn_result_convert_costs, |
| stats.ann_index_topn_result_process_ns); |
| |
| // Overhead counter removed; precise instrumentation is reported via engine_prepare above. |
| } |
| |
| #include "common/compile_check_avoid_end.h" |
| } // namespace doris::vectorized |