blob: 5fb564e4ab896613a12ba36fa2949ca089e244e8 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scanner_context.h"
#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
#include <mutex>
#include <ostream>
#include <tuple>
#include <utility>
#include "common/config.h"
#include "common/status.h"
#include "olap/tablet.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/scanner_scheduler.h"
#include "vec/exec/scan/vscan_node.h"
namespace doris::vectorized {
using namespace std::chrono_literals;
// Builds the per-scan-operator context that is shared between the scan operator
// (consumer side) and the scanner scheduler (producer side).
//
// Parameters:
//   state                  - runtime state of the fragment instance; supplies
//                            batch size, exec env, query context and mem tracker.
//   local_state            - the owning scan operator's local state; profile
//                            counters are wired up later in init().
//   output_tuple_desc      - tuple layout of produced blocks; overridden by the
//                            front tuple of output_row_descriptor when present.
//   output_row_descriptor  - optional projection row descriptor (at most one
//                            tuple, enforced by the DCHECK below).
//   scanners               - all scanner delegates this context will schedule.
//   limit_                 - row limit of the scan; negative means "no limit".
//   dependency             - pipeline dependency used to wake/block the consumer.
//   ignore_data_distribution / is_file_scan_operator / num_parallel_instances -
//                            knobs that shape the thread-count heuristics in init().
//
// Throws doris::Exception on internal queue initialization failure.
ScannerContext::ScannerContext(
        RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
        const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
        const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
        std::shared_ptr<pipeline::Dependency> dependency, bool ignore_data_distribution,
        bool is_file_scan_operator, int num_parallel_instances)
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
          // Prefer the projection's tuple layout when a projection is present.
          _output_tuple_desc(output_row_descriptor
                                     ? output_row_descriptor->tuple_descriptors().front()
                                     : output_tuple_desc),
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _scanner_scheduler_global(state->exec_env()->scanner_scheduler()),
          // Keep weak references to every scanner for stats/stop handling.
          _all_scanners(scanners.begin(), scanners.end()),
          _ignore_data_distribution(ignore_data_distribution),
          _is_file_scan_operator(is_file_scan_operator),
          _num_parallel_instances(num_parallel_instances) {
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);
    _query_id = _state->get_query_ctx()->query_id();
    ctx_id = UniqueId::gen_uid().to_string();
    // _scanners is the pending-work queue; seed it with every scanner at once.
    if (!_scanners.enqueue_bulk(scanners.begin(), scanners.size())) [[unlikely]] {
        throw Exception(ErrorCode::INTERNAL_ERROR,
                        "Exception occurs during scanners initialization.");
    };
    // Normalize any negative limit to the canonical "-1 == no limit".
    if (limit < 0) {
        limit = -1;
    }
    MAX_SCALE_UP_RATIO = _state->scanner_scale_up_ratio();
    _query_thread_context = {_query_id, _state->query_mem_tracker(),
                             _state->get_query_ctx()->workload_group()};
    _dependency = dependency;
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
// After init function call, should not access _parent
//
// Finishes construction: wires profile counters, picks a scan-task scheduler,
// computes `_max_thread_num` (the scan concurrency for this operator), and
// submits the initial batch of scan tasks.
//
// Returns Status::OK() on success; InternalError when the query context is
// missing; or the error of the first failed scan-task submission.
Status ScannerContext::init() {
    // Profile counters are owned by the scan operator's local state; the
    // context only keeps pointers to them for cheap updates.
    _scanner_profile = _local_state->_scanner_profile;
    _scanner_sched_counter = _local_state->_scanner_sched_counter;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scale_up_scanners_counter = _local_state->_scale_up_scanners_counter;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;
#ifndef BE_TEST
    // 3. get thread token
    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }
    thread_token = _state->get_query_ctx()->get_token();
    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }
#endif
    _local_state->_runtime_profile->add_info_string("UseSpecificThreadToken",
                                                    thread_token == nullptr ? "False" : "True");

    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan instance.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);
    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;

    // TODO: Where is the proper position to place this code?
    if (_all_scanners.empty()) {
        _is_finished = true;
        _set_scanner_done();
        // BUGFIX: must return here. The code below calls _all_scanners.front(),
        // which is undefined behavior on an empty container. With no scanners
        // there is nothing to schedule anyway.
        return Status::OK();
    }

    // This is a track implementation.
    // The logic is kept only for the purpose of the potential performance issue.
    bool submit_many_scan_tasks_for_potential_performance_issue = true;
    auto scanner = _all_scanners.front().lock();
    DCHECK(scanner != nullptr);

    // A query could have remote scan task and local scan task at the same time.
    // So we need to compute the _scanner_scheduler in each scan operator instead of query context.
    SimplifiedScanScheduler* simple_scan_scheduler = _state->get_query_ctx()->get_scan_scheduler();
    SimplifiedScanScheduler* remote_scan_task_scheduler =
            _state->get_query_ctx()->get_remote_scan_scheduler();
    if (scanner->_scanner->get_storage_type() == TabletStorageType::STORAGE_TYPE_LOCAL) {
        // scan_scheduler could be empty if query does not have a workload group.
        if (simple_scan_scheduler) {
            _scanner_scheduler = simple_scan_scheduler;
        } else {
            _scanner_scheduler = _scanner_scheduler_global->get_local_scan_thread_pool();
        }
    } else {
        // remote_scan_task_scheduler could be empty if query does not have a workload group.
        if (remote_scan_task_scheduler) {
            _scanner_scheduler = remote_scan_task_scheduler;
        } else {
            _scanner_scheduler = _scanner_scheduler_global->get_remote_scan_thread_pool();
        }
    }

    // _scanner_scheduler will be used to submit scan task.
    // file_scan_operator currently has a performance issue if we submit too many scan tasks to scheduler.
    // we should fix this problem in the future.
    if (_scanner_scheduler->get_queue_size() * 2 > config::doris_scanner_thread_pool_queue_size ||
        _is_file_scan_operator) {
        submit_many_scan_tasks_for_potential_performance_issue = false;
    }

    // _max_thread_num controls how many scanners of this ScanOperator can be submitted to scheduler at a time.
    // The overall target of our system is to make full utilization of the resources.
    // At the same time, we dont want too many tasks are queued by scheduler, that is not necessary.
    // So, first of all, we try to make sure _max_thread_num of a ScanNode of a query on a single backend is less than
    // 2 * config::doris_scanner_thread_pool_thread_num, so that we can make all io threads busy.
    // For example, on a 64-core machine, the default value of config::doris_scanner_thread_pool_thread_num will be 64*2 =128.
    // and the num_parallel_instances of this scan operator will be 64/2=32.
    // For a query who has one scan nodes, the _max_thread_num of each scan node instance will be 4 * 128 / 32 = 16.
    // We have 32 instances of this scan operator, so for the ScanNode, we have 16 * 32 = 8 * 64 = 512 scanner tasks can be submitted at a time.
    _max_thread_num = _state->num_scanner_threads() > 0 ? _state->num_scanner_threads() : 0;

    if (_max_thread_num == 0) {
        // NOTE: When ignore_data_distribution is true, the parallelism
        // of the scan operator is regarded as 1 (actually maybe not).
        // That will make the number of scan task can be submitted to the scheduler
        // in a very large value. This logic is kept from the older implementation.
        if (submit_many_scan_tasks_for_potential_performance_issue || _ignore_data_distribution) {
            _max_thread_num = config::doris_scanner_thread_pool_thread_num / 1;
        } else {
            const size_t factor = _is_file_scan_operator ? 1 : 4;
            _max_thread_num = factor * (config::doris_scanner_thread_pool_thread_num /
                                        _num_parallel_instances);
            // In some rare cases, user may set num_parallel_instances to 1 by hand to make many queries
            // executable in parallel. We need to make sure the _max_thread_num is smaller than previous value.
            _max_thread_num =
                    std::min(_max_thread_num, config::doris_scanner_thread_pool_thread_num);
        }
    }

    _max_thread_num = _max_thread_num == 0 ? 1 : _max_thread_num;
    // In some situation, there are not too many big tablets involved, so we can reduce the thread number.
    _max_thread_num = std::min(_max_thread_num, (int32_t)_all_scanners.size());

    // 1. Calculate max concurrency
    // For select * from table limit 10; should just use one thread.
    if (_local_state->should_run_serial()) {
        _max_thread_num = 1;
    }

    VLOG_CRITICAL << "debug num_scanner_threads: " << _state->num_scanner_threads()
                  << ", potential_performance_issue: "
                  << submit_many_scan_tasks_for_potential_performance_issue
                  << ", _ignore_data_distribution: " << _ignore_data_distribution
                  << ", _is_file_scan_operator: " << _is_file_scan_operator
                  << ", doris_scanner_thread_pool_thread_num: "
                  << config::doris_scanner_thread_pool_thread_num
                  << ", _num_parallel_instances: " << _num_parallel_instances
                  << ", _all_scanners.size: " << _all_scanners.size()
                  << ", should_run_serial: " << _local_state->should_run_serial()
                  << ", _max_thread_num: " << _max_thread_num
                  << ", query id: " << print_id(_state->query_id());

    // when user not specify scan_thread_num, so we can try downgrade _max_thread_num.
    // because we found in a table with 5k columns, column reader may occupy too much memory.
    // you can refer https://github.com/apache/doris/issues/35340 for details.
    int32_t max_column_reader_num = _state->query_options().max_column_reader_num;
    if (_max_thread_num != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = _output_tuple_desc->slots().size();
        int32_t current_column_num = scan_column_num * _max_thread_num;
        if (current_column_num > max_column_reader_num) {
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_thread_num) {
                int32_t origin_max_thread_num = _max_thread_num;
                _max_thread_num = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_thread_num << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }

    COUNTER_SET(_local_state->_max_scanner_thread_num, (int64_t)_max_thread_num);

    // submit `_max_thread_num` running scanners to `ScannerScheduler`
    // When a running scanners is finished, it will submit one of the remaining scanners.
    for (int i = 0; i < _max_thread_num; ++i) {
        std::weak_ptr<ScannerDelegate> next_scanner;
        if (_scanners.try_dequeue(next_scanner)) {
            RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(next_scanner)));
            _num_running_scanners++;
        }
    }

    return Status::OK();
}
// Hands out a reusable Block for a scanner to fill.
//
// Preference order:
//   1. Pop a pooled block from `_free_blocks` (its size is subtracted from
//      `_block_memory_usage`; the caller re-adds usage once it fills the block).
//   2. Allocate a fresh empty block when under the memory budget, or when
//      `force` is true regardless of the budget.
//   3. Otherwise return nullptr.
vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
    vectorized::BlockUPtr pooled = nullptr;
    if (_free_blocks.try_dequeue(pooled)) {
        DCHECK(pooled->mem_reuse());
        // A free block is reused, so the memory usage should be decreased.
        // The caller of get_free_block will increase the memory usage.
        _block_memory_usage -= pooled->allocated_bytes();
        _scanner_memory_used_counter->set(static_cast<int64_t>(_block_memory_usage));
        return pooled;
    }
    if (force || _block_memory_usage < _max_bytes_in_queue) {
        _newly_create_free_blocks_num->update(1);
        return vectorized::Block::create_unique(_output_tuple_desc->slots(), 0,
                                                true /*ignore invalid slots*/);
    }
    // Budget exhausted and allocation not forced.
    return nullptr;
}
// Returns a block to the free pool for later reuse.
//
// The block is pooled only if it is reusable and the pool is still under its
// memory budget; otherwise it simply goes out of scope and is freed.
void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
    if (!block->mem_reuse() || _block_memory_usage >= _max_bytes_in_queue) {
        return; // drop it; the unique_ptr destructor releases the memory
    }
    // Pooled blocks count against the queue memory budget while parked.
    _block_memory_usage += block->allocated_bytes();
    _scanner_memory_used_counter->set(static_cast<int64_t>(_block_memory_usage));
    block->clear_column_data();
    _free_blocks.enqueue(std::move(block));
}
// Hands one scan task to the global scanner scheduler.
//
// Bumps the scheduling counters first, then submits; returns whatever status
// the scheduler produces (e.g. TooManyTasks).
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task) {
    _scanner_sched_counter->update(1);
    ++_num_scheduled_scanners;
    // shared_from_this() keeps this context alive while the task is in flight.
    return _scanner_scheduler_global->submit(shared_from_this(), scan_task);
}
// Called by a scanner worker to publish a finished/partial scan task to the
// consumer queue, waking the pipeline dependency.
//
// Schema validation of the cached blocks happens before taking the lock; a
// mismatch is recorded on the task itself and surfaces as _process_status.
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
    if (scan_task->status_ok()) {
        for (const auto& [cached_block, unused_size] : scan_task->cached_blocks) {
            if (cached_block->rows() == 0) {
                continue; // empty blocks carry no data to validate
            }
            if (Status st = validate_block_schema(cached_block.get()); !st.ok()) {
                scan_task->set_status(st);
                break;
            }
        }
    }

    std::lock_guard<std::mutex> guard(_transfer_lock);
    if (!scan_task->status_ok()) {
        _process_status = scan_task->get_status();
    }
    if (_last_scale_up_time == 0) {
        _last_scale_up_time = UnixMillis();
    }
    // An empty queue after a previous fetch means the consumer has been waiting
    // for data since then; accumulate that wait for the scale-up heuristic.
    if (_tasks_queue.empty() && _last_fetch_time != 0) {
        _total_wait_block_time += UnixMillis() - _last_fetch_time;
    }
    _num_scheduled_scanners--;
    _tasks_queue.emplace_back(scan_task);
    _dependency->set_ready();
}
// Consumer-side entry point: moves one produced block (if any) into `block`,
// reschedules or retires the producing scanner, and maintains the memory and
// scale-up accounting.
//
// Parameters:
//   state - runtime state, checked for cancellation first.
//   block - output; receives the next cached block via swap when available.
//   eos   - set to done() (finished, stopped, or failed).
//   id    - unused here (kept for interface compatibility).
//
// Returns OK, the cancel reason, a failed scanner's status, or a failed
// task-submission status.
Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Block* block,
                                            bool* eos, int id) {
    if (state->is_cancelled()) {
        _set_scanner_done();
        return state->cancel_reason();
    }
    std::unique_lock l(_transfer_lock);
    if (!_process_status.ok()) {
        _set_scanner_done();
        return _process_status;
    }
    if (!_tasks_queue.empty() && !done()) {
        _last_fetch_time = UnixMillis();
        auto scan_task = _tasks_queue.front();
        DCHECK(scan_task);
        // The abnormal status of scanner may come from the execution of the scanner itself,
        // or come from the scanner scheduler, such as TooManyTasks.
        if (!scan_task->status_ok()) {
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
            _process_status = scan_task->get_status();
            _set_scanner_done();
            return _process_status;
        }
        if (!scan_task->cached_blocks.empty()) {
            auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
            scan_task->cached_blocks.pop_front();
            // Track the smallest observed block size; used by _try_to_scale_up
            // to estimate how many more scanners fit in the memory budget.
            if (_estimated_block_size > block_size) {
                _estimated_block_size = block_size;
            }
            _block_memory_usage -= block_size;
            // consume current block
            block->swap(*current_block);
            return_free_block(std::move(current_block));
        } else {
            // This scan task do not have any cached blocks.
            _tasks_queue.pop_front();
            // current scanner is finished, and no more data to read
            if (scan_task->is_eos()) {
                _num_finished_scanners++;
                std::weak_ptr<ScannerDelegate> next_scanner;
                // submit one of the remaining scanners
                if (_scanners.try_dequeue(next_scanner)) {
                    auto submit_status = submit_scan_task(std::make_shared<ScanTask>(next_scanner));
                    if (!submit_status.ok()) {
                        _process_status = submit_status;
                        _set_scanner_done();
                        return _process_status;
                    }
                } else {
                    // no more scanner to be scheduled
                    // `_free_blocks` serve all running scanners, maybe it's too large for the remaining scanners
                    int free_blocks_for_each = _free_blocks.size_approx() / _num_running_scanners;
                    _num_running_scanners--;
                    for (int i = 0; i < free_blocks_for_each; ++i) {
                        vectorized::BlockUPtr removed_block;
                        if (_free_blocks.try_dequeue(removed_block)) {
                            // BUGFIX: account for the block actually removed from the
                            // pool. The original code subtracted the output `block`'s
                            // size here, corrupting _block_memory_usage.
                            _block_memory_usage -= removed_block->allocated_bytes();
                        }
                    }
                }
            } else {
                // resubmit current running scanner to read the next block
                Status submit_status = submit_scan_task(scan_task);
                if (!submit_status.ok()) {
                    _process_status = submit_status;
                    _set_scanner_done();
                    return _process_status;
                }
            }
        }
        // scale up
        RETURN_IF_ERROR(_try_to_scale_up());
    }

    if (_num_finished_scanners == _all_scanners.size() && _tasks_queue.empty()) {
        _set_scanner_done();
        _is_finished = true;
    }
    *eos = done();

    if (_tasks_queue.empty()) {
        // Nothing to consume; block the pipeline task until a producer calls
        // push_back_scan_task (which sets the dependency ready again).
        _dependency->block();
    }
    return Status::OK();
}
// Heuristically increases scanner parallelism when the consumer spends most of
// its time waiting for blocks. Must be called with _transfer_lock held (it is
// invoked from get_block_from_queue, and the comment below relies on that).
// Returns OK, or the error from a failed scan-task submission.
Status ScannerContext::_try_to_scale_up() {
    // Four criteria to determine whether to increase the parallelism of the scanners
    // 1. It ran for at least `SCALE_UP_DURATION` ms after last scale up
    // 2. Half(`WAIT_BLOCK_DURATION_RATIO`) of the duration is waiting to get blocks
    // 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, remains enough memory to scale up
    // 4. At most scale up `MAX_SCALE_UP_RATIO` times to `_max_thread_num`
    if (MAX_SCALE_UP_RATIO > 0 && _scanners.size_approx() > 0 &&
        (_num_running_scanners < _max_thread_num * MAX_SCALE_UP_RATIO) &&
        (_last_fetch_time - _last_scale_up_time > SCALE_UP_DURATION) && // duration > 5000ms
        (_total_wait_block_time > (_last_fetch_time - _last_scale_up_time) *
                                          WAIT_BLOCK_DURATION_RATIO)) { // too large lock time
        // Fraction of the window since the last scale-up spent waiting for blocks.
        // The denominator is positive: guarded by `> SCALE_UP_DURATION` above.
        double wait_ratio =
                (double)_total_wait_block_time / (_last_fetch_time - _last_scale_up_time);
        if (_last_wait_duration_ratio > 0 && wait_ratio > _last_wait_duration_ratio * 0.8) {
            // when _last_wait_duration_ratio > 0, it has scaled up before.
            // we need to determine if the scale-up is effective:
            // the wait duration ratio after last scaling up should less than 80% of `_last_wait_duration_ratio`
            return Status::OK();
        }

        bool is_scale_up = false;
        // calculate the number of scanners that can be scheduled
        int num_add = int(std::min(_num_running_scanners * SCALE_UP_RATIO,
                                   _max_thread_num * MAX_SCALE_UP_RATIO - _num_running_scanners));
        // Cap by remaining memory budget, using the smallest block size seen so
        // far as a per-scanner cost estimate.
        if (_estimated_block_size > 0) {
            int most_add = (_max_bytes_in_queue - _block_memory_usage) / _estimated_block_size;
            num_add = std::min(num_add, most_add);
        }
        for (int i = 0; i < num_add; ++i) {
            // get enough memory to launch one more scanner.
            std::weak_ptr<ScannerDelegate> scale_up_scanner;
            if (_scanners.try_dequeue(scale_up_scanner)) {
                // Just return error to caller.
                // Because _try_to_scale_up is called under _transfer_lock locked, if we add the scanner
                // to the block queue, we will get a deadlock.
                RETURN_IF_ERROR(submit_scan_task(std::make_shared<ScanTask>(scale_up_scanner)));
                _num_running_scanners++;
                _scale_up_scanners_counter->update(1);
                is_scale_up = true;
            } else {
                // No pending scanners left to launch.
                break;
            }
        }

        // Reset the measurement window only if we actually added scanners, so
        // the next call can judge whether this scale-up helped.
        if (is_scale_up) {
            _last_wait_duration_ratio = wait_ratio;
            _last_scale_up_time = UnixMillis();
            _total_wait_block_time = 0;
        }
    }
    return Status::OK();
}
// Checks that a produced block matches the output tuple descriptor: for every
// materialized slot, the column's nullability must agree with both its data
// type and the slot definition. Non-materialized slots occupy no column, so
// the block position index only advances on materialized slots.
//
// Returns OK on match, or Status::Error<INVALID_SCHEMA> describing the first
// mismatching column.
Status ScannerContext::validate_block_schema(Block* block) {
    size_t pos = 0;
    for (const auto& slot : _output_tuple_desc->slots()) {
        if (!slot->need_materialize()) {
            continue;
        }
        auto& data = block->get_by_position(pos++);
        const bool column_nullable = data.column->is_nullable();
        if (column_nullable != data.type->is_nullable()) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match type nullable({}), slot(id: {}, "
                    "name:{})",
                    data.name, column_nullable, data.type->is_nullable(), slot->id(),
                    slot->col_name());
        }
        if (column_nullable != slot->is_nullable()) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) "
                    "nullable({})",
                    data.name, column_nullable, slot->id(), slot->col_name(),
                    slot->is_nullable());
        }
    }
    return Status::OK();
}
// Stops every scanner owned by this context (idempotent) and, when profiling
// is enabled, records per-scanner statistics into the scanner profile before
// the scanners are closed elsewhere.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> guard(_transfer_lock);
    if (_should_stop) {
        return; // a previous call already stopped everything
    }
    _should_stop = true;
    _set_scanner_done();
    // Ask each still-alive scanner to stop reading as soon as possible.
    for (const std::weak_ptr<ScannerDelegate>& weak_scanner : _all_scanners) {
        if (std::shared_ptr<ScannerDelegate> delegate = weak_scanner.lock()) {
            delegate->_scanner->try_stop();
        }
    }
    _tasks_queue.clear();
    // TODO yiguolei, call mark close to scanners
    if (!state->enable_profile()) {
        return;
    }

    std::stringstream scanner_statistics;
    std::stringstream scanner_rows_read;
    std::stringstream scanner_wait_worker_time;
    std::stringstream scanner_projection;
    scanner_statistics << "[";
    scanner_rows_read << "[";
    scanner_wait_worker_time << "[";
    scanner_projection << "[";
    // Scanners can in 3 state
    // state 1: in scanner context, not scheduled
    // state 2: in scanner worker pool's queue, scheduled but not running
    // state 3: scanner is running.
    for (const auto& scanner_ref : _all_scanners) {
        std::shared_ptr<ScannerDelegate> delegate = scanner_ref.lock();
        if (delegate == nullptr) {
            continue;
        }
        // Add per scanner running time before close them
        scanner_statistics << PrettyPrinter::print(delegate->_scanner->get_time_cost_ns(),
                                                   TUnit::TIME_NS)
                           << ", ";
        scanner_projection << PrettyPrinter::print(delegate->_scanner->projection_time(),
                                                   TUnit::TIME_NS)
                           << ", ";
        scanner_rows_read << PrettyPrinter::print(delegate->_scanner->get_rows_read(), TUnit::UNIT)
                          << ", ";
        scanner_wait_worker_time
                << PrettyPrinter::print(delegate->_scanner->get_scanner_wait_worker_timer(),
                                        TUnit::TIME_NS)
                << ", ";
        // since there are all scanners, some scanners is running, so that could not call scanner
        // close here.
    }
    scanner_statistics << "]";
    scanner_rows_read << "]";
    scanner_wait_worker_time << "]";
    scanner_projection << "]";
    _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
    _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
    _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
    _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
}
std::string ScannerContext::debug_string() {
return fmt::format(
"id: {}, total scanners: {}, pending tasks: {},"
" _should_stop: {}, _is_finished: {}, free blocks: {},"
" limit: {}, _num_running_scanners: {}, _max_thread_num: {},"
" _max_bytes_in_queue: {}, query_id: {}",
ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
_free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_thread_num,
_max_bytes_in_queue, print_id(_query_id));
}
// Marks the pipeline dependency as permanently ready so the consumer task is
// woken up (and stays runnable) to observe eos / error / stop state.
void ScannerContext::_set_scanner_done() {
    _dependency->set_always_ready();
}
// Applies a delta `num` to the peak-running-scanner counter owned by the scan
// operator's local state.
void ScannerContext::update_peak_running_scanner(int num) {
    _local_state->_peak_running_scanner->add(num);
}
} // namespace doris::vectorized