| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "pipeline/exec/file_scan_operator.h" |
| |
| #include <fmt/format.h> |
| |
| #include <memory> |
| |
| #include "olap/storage_engine.h" |
| #include "olap/tablet_manager.h" |
| #include "pipeline/exec/olap_scan_operator.h" |
| #include "pipeline/exec/scan_operator.h" |
| #include "vec/exec/format/format_common.h" |
| #include "vec/exec/scan/file_scanner.h" |
| #include "vec/exec/scan/scanner_context.h" |
| |
| namespace doris::pipeline { |
| #include "common/compile_check_begin.h" |
| |
| int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const { |
| // For select * from table limit 10; should just use one thread. |
| if (should_run_serial()) { |
| return 1; |
| } |
| /* |
| * The max concurrency of file scanners for each FileScanLocalState is determined by: |
| * 1. User specified max_file_scanners_concurrency which is set through session variable. |
| * 2. Default: 16 |
| * |
| * If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator. |
| */ |
| return (state->max_file_scanners_concurrency() > 0 ? state->max_file_scanners_concurrency() |
| : 16) * |
| (state->query_parallel_instance_num() / _parent->parallelism(state)); |
| } |
| |
| int FileScanLocalState::min_scanners_concurrency(RuntimeState* state) const { |
| if (should_run_serial()) { |
| return 1; |
| } |
| /* |
| * The min concurrency of scanners for each FileScanLocalState is determined by: |
| * 1. User specified min_file_scanners_concurrency which is set through session variable. |
| * 2. Default: 1 |
| * |
| * If this is a serial operator, the max concurrency should multiply by the number of parallel instances of the operator. |
| */ |
| return (state->min_file_scanners_concurrency() > 0 ? state->min_file_scanners_concurrency() |
| : 1) * |
| (state->query_parallel_instance_num() / _parent->parallelism(state)); |
| } |
| |
| vectorized::ScannerScheduler* FileScanLocalState::scan_scheduler(RuntimeState* state) const { |
| return state->get_query_ctx()->get_remote_scan_scheduler(); |
| } |
| |
| Status FileScanLocalState::_init_scanners(std::list<vectorized::ScannerSPtr>* scanners) { |
| if (_split_source->num_scan_ranges() == 0) { |
| _eos = true; |
| return Status::OK(); |
| } |
| |
| auto& id_file_map = state()->get_id_file_map(); |
| if (id_file_map != nullptr) { |
| id_file_map->set_external_scan_params(state()->get_query_ctx(), _max_scanners); |
| } |
| |
| auto& p = _parent->cast<FileScanOperatorX>(); |
| // There's only one scan range for each backend in batch split mode. Each backend only starts up one ScanNode instance. |
| uint32_t shard_num = std::min( |
| vectorized::ScannerScheduler::default_remote_scan_thread_num() / p.parallelism(state()), |
| _max_scanners); |
| shard_num = std::max(shard_num, 1U); |
| _kv_cache.reset(new vectorized::ShardedKVCache(shard_num)); |
| for (int i = 0; i < _max_scanners; ++i) { |
| std::unique_ptr<vectorized::FileScanner> scanner = vectorized::FileScanner::create_unique( |
| state(), this, p._limit, _split_source, _scanner_profile.get(), _kv_cache.get(), |
| &p._colname_to_slot_id); |
| RETURN_IF_ERROR(scanner->init(state(), _conjuncts)); |
| scanners->push_back(std::move(scanner)); |
| } |
| return Status::OK(); |
| } |
| |
| std::string FileScanLocalState::name_suffix() const { |
| return fmt::format("(nereids_id={}. table_name={})" + operator_name_suffix, |
| std::to_string(_parent->nereids_id()), |
| _parent->cast<FileScanOperatorX>()._table_name, |
| std::to_string(_parent->node_id())); |
| } |
| |
| void FileScanLocalState::set_scan_ranges(RuntimeState* state, |
| const std::vector<TScanRangeParams>& scan_ranges) { |
| auto& p = _parent->cast<FileScanOperatorX>(); |
| |
| auto calc_max_scanners = [&](int parallel_instance_num) -> int { |
| int max_scanners = vectorized::ScannerScheduler::default_remote_scan_thread_num() / |
| parallel_instance_num; |
| if (should_run_serial()) { |
| max_scanners = 1; |
| } |
| return max_scanners; |
| }; |
| |
| if (scan_ranges.size() == 1) { |
| auto scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range; |
| if (scan_range.__isset.split_source) { |
| p._batch_split_mode = true; |
| custom_profile()->add_info_string("BatchSplitMode", "true"); |
| auto split_source = scan_range.split_source; |
| RuntimeProfile::Counter* get_split_timer = ADD_TIMER(custom_profile(), "GetSplitTime"); |
| |
| _max_scanners = calc_max_scanners(p.parallelism(state)); |
| _split_source = std::make_shared<vectorized::RemoteSplitSourceConnector>( |
| state, get_split_timer, split_source.split_source_id, split_source.num_splits, |
| _max_scanners); |
| } |
| } |
| |
| if (!p._batch_split_mode) { |
| _max_scanners = calc_max_scanners(p.parallelism(state)); |
| if (_split_source == nullptr) { |
| _split_source = std::make_shared<vectorized::LocalSplitSourceConnector>(scan_ranges, |
| _max_scanners); |
| } |
| // currently the total number of splits in the bach split mode cannot be accurately obtained, |
| // so we don't do it in the batch split mode. |
| _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges()); |
| } |
| |
| if (!scan_ranges.empty() && |
| scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) { |
| // for compatibility. |
| // in new implement, the tuple id is set in prepare phase |
| _output_tuple_id = |
| scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.dest_tuple_id; |
| } |
| } |
| |
| Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) { |
| RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::init(state, info)); |
| SCOPED_TIMER(_init_timer); |
| auto& p = _parent->cast<FileScanOperatorX>(); |
| _output_tuple_id = p._output_tuple_id; |
| return Status::OK(); |
| } |
| |
| Status FileScanLocalState::_process_conjuncts(RuntimeState* state) { |
| RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts(state)); |
| if (Base::_eos) { |
| return Status::OK(); |
| } |
| // TODO: Push conjuncts down to reader. |
| return Status::OK(); |
| } |
| |
| Status FileScanOperatorX::prepare(RuntimeState* state) { |
| RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state)); |
| if (state->get_query_ctx() != nullptr && |
| state->get_query_ctx()->file_scan_range_params_map.contains(node_id())) { |
| TFileScanRangeParams& params = |
| state->get_query_ctx()->file_scan_range_params_map[node_id()]; |
| _output_tuple_id = params.dest_tuple_id; |
| } |
| return Status::OK(); |
| } |
| |
| } // namespace doris::pipeline |