blob: e27dc73a3c30244ab0bb65a112eeb2be541f8557 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "exec/operator/file_scan_operator.h"
#include <fmt/format.h>
#include <memory>
#include "exec/operator/olap_scan_operator.h"
#include "exec/operator/scan_operator.h"
#include "exec/scan/file_scanner.h"
#include "exec/scan/scanner_context.h"
#include "format/format_common.h"
#include "storage/storage_engine.h"
#include "storage/tablet/tablet_manager.h"
namespace doris {
#include "common/compile_check_begin.h"
/// Decides whether a binary predicate can be pushed down to the file scanner.
///
/// @param fn_call      the predicate's function call; its first child is expected
///                     (debug-checked) to be a slot reference.
/// @param expr_ctx     expression context used to evaluate the constant child.
/// @param constant_val receives the constant's value when the predicate is accepted.
/// @param fn_name      names of the functions that are eligible for push-down.
/// @return PARTIAL_ACCEPTABLE when the function name is in `fn_name` and the second
///         child is a constant; UNACCEPTABLE otherwise.
PushDownType FileScanLocalState::_should_push_down_binary_predicate(
        VectorizedFnCall* fn_call, VExprContext* expr_ctx, Field& constant_val,
        const std::set<std::string> fn_name) const {
    const bool is_supported_fn = fn_name.contains(fn_call->fn().name.function_name);
    if (!is_supported_fn) {
        return PushDownType::UNACCEPTABLE;
    }

    const auto& children = fn_call->children();
    DCHECK(children.size() == 2);
    DCHECK_EQ(children[0]->node_type(), TExprNodeType::SLOT_REF);

    // Only a constant right-hand side can be handled.
    if (!children[1]->is_constant()) {
        return PushDownType::UNACCEPTABLE;
    }

    std::shared_ptr<ColumnPtrWrapper> wrapped_constant;
    THROW_IF_ERROR(children[1]->get_const_col(expr_ctx, &wrapped_constant));
    const auto* as_const_column =
            assert_cast<const ColumnConst*>(wrapped_constant->column_ptr.get());
    // Hand the value in row 0 of the constant column back to the caller.
    constant_val = (*as_const_column)[0];
    return PushDownType::PARTIAL_ACCEPTABLE;
}
/// Upper bound on the number of file scanners this local state may run concurrently.
///
/// A scan that should run serially (e.g. `select * from table limit 10`) gets exactly
/// one thread. Otherwise the base value is the session variable
/// `max_file_scanners_concurrency` when it is positive, or 16 when it is not, and that
/// base is multiplied by `query_parallel_instance_num / parallelism` of the parent
/// operator (i.e. a serial operator is scaled up by the number of parallel instances).
int FileScanLocalState::max_scanners_concurrency(RuntimeState* state) const {
    if (should_run_serial()) {
        return 1;
    }

    // User setting wins when positive; 16 is the built-in default.
    const auto user_limit = state->max_file_scanners_concurrency();
    const auto base_concurrency = user_limit > 0 ? user_limit : 16;

    // How many query instances each instance of this operator stands in for.
    const auto instance_ratio =
            state->query_parallel_instance_num() / _parent->parallelism(state);

    return base_concurrency * instance_ratio;
}
/// Lower bound on the number of file scanners this local state runs concurrently.
///
/// A scan that should run serially gets exactly one thread. Otherwise the base value
/// is the session variable `min_file_scanners_concurrency` when it is positive, or 1
/// when it is not, and that base is multiplied by
/// `query_parallel_instance_num / parallelism` of the parent operator (i.e. for a
/// serial operator the min concurrency is scaled up by the number of parallel
/// instances).
int FileScanLocalState::min_scanners_concurrency(RuntimeState* state) const {
    if (should_run_serial()) {
        return 1;
    }

    // User setting wins when positive; 1 is the built-in default.
    const auto user_floor = state->min_file_scanners_concurrency();
    const auto base_concurrency = user_floor > 0 ? user_floor : 1;

    // How many query instances each instance of this operator stands in for.
    const auto instance_ratio =
            state->query_parallel_instance_num() / _parent->parallelism(state);

    return base_concurrency * instance_ratio;
}
/// Returns the scheduler used for this scan: the query context's remote scan scheduler.
ScannerScheduler* FileScanLocalState::scan_scheduler(RuntimeState* state) const {
    auto query_ctx = state->get_query_ctx();
    return query_ctx->get_remote_scan_scheduler();
}
/// Creates the file scanners for this local state and appends them to `scanners`.
///
/// When the split source reports no scan ranges, the state is marked as finished
/// (`_eos`) and no scanner is created. Otherwise `_max_scanners` scanners are built,
/// all sharing one split source and one sharded key/value cache.
///
/// @param scanners output list that receives the initialized scanners.
/// @return the first error returned by a scanner's `init`, or OK.
Status FileScanLocalState::_init_scanners(std::list<ScannerSPtr>* scanners) {
    const bool nothing_to_scan = _split_source->num_scan_ranges() == 0;
    if (nothing_to_scan) {
        _eos = true;
        return Status::OK();
    }

    auto& id_file_map = state()->get_id_file_map();
    if (id_file_map != nullptr) {
        id_file_map->set_external_scan_params(state()->get_query_ctx(), _max_scanners);
    }

    auto& file_scan_op = _parent->cast<FileScanOperatorX>();

    // There's only one scan range for each backend in batch split mode.
    // Each backend only starts up one ScanNode instance.
    // The cache gets one shard per scanner, capped by this operator's share of the
    // remote scan threads, and never fewer than one shard.
    uint32_t cache_shards = std::min(
            ScannerScheduler::default_remote_scan_thread_num() / file_scan_op.parallelism(state()),
            _max_scanners);
    cache_shards = std::max(cache_shards, 1U);
    _kv_cache.reset(new ShardedKVCache(cache_shards));

    for (int scanner_idx = 0; scanner_idx < _max_scanners; ++scanner_idx) {
        auto file_scanner = FileScanner::create_unique(
                state(), this, file_scan_op._limit, _split_source, _scanner_profile.get(),
                _kv_cache.get(), &file_scan_op._colname_to_slot_id);
        RETURN_IF_ERROR(file_scanner->init(state(), _conjuncts));
        scanners->push_back(std::move(file_scanner));
    }
    return Status::OK();
}
/// Builds the suffix appended to this operator's name, identifying the scan by node
/// id and table name. The Nereids id is included unless it is -1.
std::string FileScanLocalState::name_suffix() const {
    const auto& table_name = _parent->cast<FileScanOperatorX>()._table_name;
    const auto planner_id = _parent->nereids_id();

    if (planner_id != -1) {
        return fmt::format("(nereids_id={}, id={}, table_name={})", planner_id,
                           _parent->node_id(), table_name);
    }
    return fmt::format("(id={}, table_name={})", _parent->node_id(), table_name);
}
/// Records the scan ranges assigned to this local state, creates the split source
/// the scanners will pull from, and sizes the scanner pool (`_max_scanners`).
///
/// Two modes are distinguished:
///  - Batch split mode: exactly one scan range is passed and it carries a
///    `split_source`. Splits are then fetched through a RemoteSplitSourceConnector.
///  - Otherwise: the given ranges are served by a LocalSplitSourceConnector and the
///    scanner count is additionally capped by the number of scan ranges.
///
/// For compatibility, when the first scan range carries `params`, its
/// `dest_tuple_id` overrides `_output_tuple_id`.
///
/// @param state       runtime state of the current fragment instance.
/// @param scan_ranges scan ranges assigned to this local state; may be empty.
void FileScanLocalState::set_scan_ranges(RuntimeState* state,
                                         const std::vector<TScanRangeParams>& scan_ranges) {
    auto& p = _parent->cast<FileScanOperatorX>();
    auto calc_max_scanners = [&](int parallel_instance_num) -> int {
        // NOTE(review): this is 0 when the remote scan thread count is smaller than
        // parallel_instance_num — confirm that a zero scanner count is acceptable.
        int max_scanners =
                ScannerScheduler::default_remote_scan_thread_num() / parallel_instance_num;
        // For external tables, each scanner is not bound to specific splits.
        // Instead, when a scanner is scheduled, it dynamically fetches the next scan range
        // from a unified split source for scanning.
        // Therefore, the number of scanners only needs to match "max_scanners_concurrency"
        // to ensure full-speed execution.
        // For 32 core node, the default "max_scanners_concurrency" should be 16
        max_scanners = std::min(max_scanners, max_scanners_concurrency(state));
        return max_scanners;
    };
    if (scan_ranges.size() == 1) {
        // Bind by reference: only a few fields are read, so copying the whole
        // file scan range struct would be wasted work.
        const auto& scan_range = scan_ranges[0].scan_range.ext_scan_range.file_scan_range;
        if (scan_range.__isset.split_source) {
            p._batch_split_mode = true;
            custom_profile()->add_info_string("BatchSplitMode", "true");
            const auto& split_source = scan_range.split_source;
            RuntimeProfile::Counter* get_split_timer = ADD_TIMER(custom_profile(), "GetSplitTime");
            _max_scanners = calc_max_scanners(p.parallelism(state));
            _split_source = std::make_shared<RemoteSplitSourceConnector>(
                    state, get_split_timer, split_source.split_source_id, split_source.num_splits,
                    _max_scanners);
        }
    }
    if (!p._batch_split_mode) {
        _max_scanners = calc_max_scanners(p.parallelism(state));
        if (_split_source == nullptr) {
            _split_source = std::make_shared<LocalSplitSourceConnector>(scan_ranges, _max_scanners);
        }
        // Currently the total number of splits in the batch split mode cannot be accurately
        // obtained, so the cap by split count is only applied outside batch split mode.
        _max_scanners = std::min(_max_scanners, _split_source->num_scan_ranges());
    }
    if (!scan_ranges.empty() &&
        scan_ranges[0].scan_range.ext_scan_range.file_scan_range.__isset.params) {
        // For compatibility.
        // In the new implementation, the tuple id is set in the prepare phase.
        _output_tuple_id =
                scan_ranges[0].scan_range.ext_scan_range.file_scan_range.params.dest_tuple_id;
    }
}
/// Initializes the local state: runs the base-class initialization, then copies the
/// output tuple id from the parent FileScanOperatorX. Time spent after the base
/// initialization is accounted to `_init_timer`.
Status FileScanLocalState::init(RuntimeState* state, LocalStateInfo& info) {
    RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::init(state, info));
    SCOPED_TIMER(_init_timer);
    // Seed the tuple id with the operator-level value.
    _output_tuple_id = _parent->cast<FileScanOperatorX>()._output_tuple_id;
    return Status::OK();
}
/// Processes the scan's conjuncts by delegating to the base class. No file-scan
/// specific handling exists yet; the placeholder below marks where reader push-down
/// would go once the scan is known not to be finished.
Status FileScanLocalState::_process_conjuncts(RuntimeState* state) {
    RETURN_IF_ERROR(ScanLocalState<FileScanLocalState>::_process_conjuncts(state));
    if (!Base::_eos) {
        // TODO: Push conjuncts down to reader.
    }
    return Status::OK();
}
/// Prepares the operator: runs the base-class preparation, then, when the query
/// context holds file scan range params registered for this node id, takes the
/// output tuple id from those params.
Status FileScanOperatorX::prepare(RuntimeState* state) {
    RETURN_IF_ERROR(ScanOperatorX<FileScanLocalState>::prepare(state));

    auto query_ctx = state->get_query_ctx();
    if (query_ctx == nullptr) {
        return Status::OK();
    }

    auto& params_by_node = query_ctx->file_scan_range_params_map;
    if (auto entry = params_by_node.find(node_id()); entry != params_by_node.end()) {
        _output_tuple_id = entry->second.dest_tuple_id;
    }
    return Status::OK();
}
} // namespace doris