blob: 4aabf67c2618ad660b34d5f1203930db1552c6dd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "scanner_context.h"
#include <fmt/format.h>
#include <gen_cpp/Metrics_types.h>
#include <glog/logging.h>
#include <zconf.h>
#include <cstdint>
#include <ctime>
#include <memory>
#include <mutex>
#include <ostream>
#include <shared_mutex>
#include <tuple>
#include <utility>
#include "common/config.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/status.h"
#include "olap/tablet.h"
#include "pipeline/exec/scan_operator.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/runtime_state.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "util/time.h"
#include "util/uid_util.h"
#include "vec/core/block.h"
#include "vec/exec/scan/scan_node.h"
#include "vec/exec/scan/scanner_scheduler.h"
namespace doris::vectorized {
using namespace std::chrono_literals;
#include "common/compile_check_begin.h"
// Constructs the context that shuttles blocks between scanner workers and the
// scan operator.
// - `scanners` are wrapped into ScanTasks and parked in _pending_scanners;
//   nothing is submitted to the scheduler until init() runs.
// - `output_row_descriptor`, when non-null, supplies the (single) output tuple
//   and takes precedence over `output_tuple_desc`.
// - `limit_` < 0 is normalized to -1, meaning "no limit".
// Under BE_TEST the scheduler wiring is replaced and the maximum concurrency
// is taken directly from `num_parallel_instances`.
ScannerContext::ScannerContext(
        RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
        const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor,
        const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners, int64_t limit_,
        std::shared_ptr<pipeline::Dependency> dependency
#ifdef BE_TEST
        ,
        int num_parallel_instances
#endif
        )
        : HasTaskExecutionCtx(state),
          _state(state),
          _local_state(local_state),
          _output_tuple_desc(output_row_descriptor
                                     ? output_row_descriptor->tuple_descriptors().front()
                                     : output_tuple_desc),
          _output_row_descriptor(output_row_descriptor),
          _batch_size(state->batch_size()),
          limit(limit_),
          _all_scanners(scanners.begin(), scanners.end()),
#ifndef BE_TEST
          _scanner_scheduler(local_state->scan_scheduler(state)),
          _min_scan_concurrency_of_scan_scheduler(
                  _scanner_scheduler->get_min_active_scan_threads()),
          // Never ask for more concurrency than there are scanners.
          _max_scan_concurrency(std::min(local_state->max_scanners_concurrency(state),
                                         cast_set<int>(scanners.size()))),
#else
          _scanner_scheduler(state->get_query_ctx()->get_scan_scheduler()),
          _min_scan_concurrency_of_scan_scheduler(0),
          _max_scan_concurrency(num_parallel_instances),
#endif
          _min_scan_concurrency(local_state->min_scanners_concurrency(state)) {
    DCHECK(_state != nullptr);
    // At most one output tuple is supported when a row descriptor is given.
    DCHECK(_output_row_descriptor == nullptr ||
           _output_row_descriptor->tuple_descriptors().size() == 1);
    _query_id = _state->get_query_ctx()->query_id();
    _resource_ctx = _state->get_query_ctx()->resource_ctx();
    ctx_id = UniqueId::gen_uid().to_string();
    // Every scanner becomes one pending ScanTask; init() later submits the
    // first batch of them.
    for (auto& scanner : _all_scanners) {
        _pending_scanners.push(std::make_shared<ScanTask>(scanner));
    };
    if (limit < 0) {
        limit = -1;
    }
    _dependency = dependency;
    DorisMetrics::instance()->scanner_ctx_cnt->increment(1);
}
// NOTE: after init() has been called, _parent must not be accessed anymore.
// Finishes construction work that may fail or needs shared_from_this():
// hooks up profile counters, optionally registers a task with the task
// executor, sizes the transfer-queue memory budget, downgrades the scan
// concurrency for very wide tables, and submits the first round of scan
// tasks to the scheduler.
Status ScannerContext::init() {
#ifndef BE_TEST
    _scanner_profile = _local_state->_scanner_profile;
    _newly_create_free_blocks_num = _local_state->_newly_create_free_blocks_num;
    _scanner_memory_used_counter = _local_state->_memory_used_counter;
    // 3. get thread token
    if (!_state->get_query_ctx()) {
        return Status::InternalError("Query context of {} is not set",
                                     print_id(_state->query_id()));
    }
    if (_state->get_query_ctx()->get_scan_scheduler()) {
        _should_reset_thread_name = false;
    }
    auto scanner = _all_scanners.front().lock();
    DCHECK(scanner != nullptr);
    // For task-executor based schedulers, register one executor task per
    // scanner context so its concurrency can be managed centrally.
    if (auto* task_executor_scheduler =
                dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
        std::shared_ptr<TaskExecutor> task_executor = task_executor_scheduler->task_executor();
        vectorized::TaskId task_id(fmt::format("{}-{}", print_id(_state->query_id()), ctx_id));
        _task_handle = DORIS_TRY(task_executor->create_task(
                task_id, []() { return 0.0; },
                // Initial max concurrency: the config value if set, otherwise a
                // core-count based default with a floor of 48.
                config::task_executor_initial_max_concurrency_per_task > 0
                        ? config::task_executor_initial_max_concurrency_per_task
                        : std::max(48, CpuInfo::num_cores() * 2),
                std::chrono::milliseconds(100), std::nullopt));
    }
#endif
    // _max_bytes_in_queue controls the maximum memory that can be used by a single scan operator.
    // scan_queue_mem_limit on FE is 100MB by default, on backend we will make sure its actual value
    // is larger than 10MB.
    _max_bytes_in_queue = std::max(_state->scan_queue_mem_limit(), (int64_t)1024 * 1024 * 10);
    // Provide more memory for wide tables, increase proportionally by multiples of 300
    _max_bytes_in_queue *= _output_tuple_desc->slots().size() / 300 + 1;
    if (_all_scanners.empty()) {
        // Nothing to scan: mark the operator finished right away.
        _is_finished = true;
        _set_scanner_done();
    }
    // When the user does not specify scan_thread_num, we may try to downgrade
    // _max_thread_num, because we found that in a table with 5k columns the
    // column readers may occupy too much memory.
    // You can refer to https://github.com/apache/doris/issues/35340 for details.
    const int32_t max_column_reader_num = _state->max_column_reader_num();
    if (_max_scan_concurrency != 1 && max_column_reader_num > 0) {
        int32_t scan_column_num = cast_set<int32_t>(_output_tuple_desc->slots().size());
        int32_t current_column_num = scan_column_num * _max_scan_concurrency;
        if (current_column_num > max_column_reader_num) {
            int32_t new_max_thread_num = max_column_reader_num / scan_column_num;
            new_max_thread_num = new_max_thread_num <= 0 ? 1 : new_max_thread_num;
            if (new_max_thread_num < _max_scan_concurrency) {
                int32_t origin_max_thread_num = _max_scan_concurrency;
                _max_scan_concurrency = new_max_thread_num;
                LOG(INFO) << "downgrade query:" << print_id(_state->query_id())
                          << " scan's max_thread_num from " << origin_max_thread_num << " to "
                          << _max_scan_concurrency << ",column num: " << scan_column_num
                          << ", max_column_reader_num: " << max_column_reader_num;
            }
        }
    }
    COUNTER_SET(_local_state->_max_scan_concurrency, (int64_t)_max_scan_concurrency);
    COUNTER_SET(_local_state->_min_scan_concurrency, (int64_t)_min_scan_concurrency);
    // Kick off the first scheduling round; the scheduler pulls tasks from
    // _pending_scanners and must be called with _transfer_lock held.
    std::unique_lock<std::mutex> l(_transfer_lock);
    RETURN_IF_ERROR(_scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
    return Status::OK();
}
// Releases everything still cached in the context. The query's memory tracker
// is switched in first so the blocks freed below are accounted to the right
// tracker.
ScannerContext::~ScannerContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
    _tasks_queue.clear();
    // Drain the free-block pool; each dequeued block is destroyed when the
    // unique_ptr is overwritten by the next dequeue (or reset below).
    vectorized::BlockUPtr block;
    while (_free_blocks.try_dequeue(block)) {
        // do nothing
    }
    block.reset();
    DorisMetrics::instance()->scanner_ctx_cnt->increment(-1);
    // Deregister from the task executor if init() registered a handle and
    // stop_scanners() has not removed it already.
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
}
// Hands out a block for a scanner to fill. A cached free block is reused when
// one is available; otherwise a fresh block is allocated, but only while we
// are under the queue memory budget (or when the caller insists via `force`).
// May return nullptr when neither option applies.
vectorized::BlockUPtr ScannerContext::get_free_block(bool force) {
    vectorized::BlockUPtr reused = nullptr;
    if (_free_blocks.try_dequeue(reused)) {
        DCHECK(reused->mem_reuse());
        // The block leaves the free pool, so stop accounting for it here.
        // The caller of get_free_block will increase the memory usage again.
        _block_memory_usage -= reused->allocated_bytes();
        _scanner_memory_used_counter->set(_block_memory_usage);
        return reused;
    }
    if (force || _block_memory_usage < _max_bytes_in_queue) {
        _newly_create_free_blocks_num->update(1);
        return vectorized::Block::create_unique(_output_tuple_desc->slots(), 0);
    }
    return nullptr;
}
// Puts a consumed block back into the free pool so a later get_free_block()
// can reuse it instead of allocating.
void ScannerContext::return_free_block(vectorized::BlockUPtr block) {
    // If under low memory mode, should not return the freeblock, it will occupy too much memory.
    // Likewise drop blocks that cannot be reused or would exceed the budget.
    const bool pool_accepts = !_local_state->low_memory_mode() && block->mem_reuse() &&
                              _block_memory_usage < _max_bytes_in_queue;
    if (!pool_accepts) {
        return;
    }
    const size_t reused_bytes = block->allocated_bytes();
    _block_memory_usage += reused_bytes;
    _scanner_memory_used_counter->set(_block_memory_usage);
    block->clear_column_data();
    // Free blocks is used to improve memory efficiency. Failure during pushing back
    // free block will not incur any bad result so just ignore the return value.
    _free_blocks.enqueue(std::move(block));
}
// Hands one scan task to the scheduler for execution. Must be called with
// _transfer_lock held (witnessed by the unused transfer_lock parameter).
//
// _num_scheduled_scanners is increased no matter whether the scan_task is
// submitted successfully or not:
// - if submit failed, the task will be added back by
//   ScannerContext::push_back_scan_task and _num_scheduled_scanners will be
//   reduced there;
// - if submit succeeded, the task is also returned through
//   ScannerContext::push_back_scan_task once the worker finishes.
//   See ScannerScheduler::_scanner_scan.
Status ScannerContext::submit_scan_task(std::shared_ptr<ScanTask> scan_task,
                                        std::unique_lock<std::mutex>& /*transfer_lock*/) {
    _num_scheduled_scanners++;
    return _scanner_scheduler->submit(shared_from_this(), scan_task);
}
// Drops every cached free block, releasing the memory the pool holds.
void ScannerContext::clear_free_blocks() {
    clear_blocks(_free_blocks);
}
void ScannerContext::push_back_scan_task(std::shared_ptr<ScanTask> scan_task) {
if (scan_task->status_ok()) {
for (const auto& [block, _] : scan_task->cached_blocks) {
if (block->rows() > 0) {
Status st = validate_block_schema(block.get());
if (!st.ok()) {
scan_task->set_status(st);
break;
}
}
}
}
std::lock_guard<std::mutex> l(_transfer_lock);
if (!scan_task->status_ok()) {
_process_status = scan_task->get_status();
}
_tasks_queue.push_back(scan_task);
_num_scheduled_scanners--;
_dependency->set_ready();
}
// Moves one block from the transfer queue into `*block` and sets `*eos` when
// the whole scan is finished. Also the place where finished scan tasks are
// popped and the scheduler is asked for more work.
Status ScannerContext::get_block_from_queue(RuntimeState* state, vectorized::Block* block,
                                            bool* eos, int id) {
    if (state->is_cancelled()) {
        _set_scanner_done();
        return state->cancel_reason();
    }
    std::unique_lock l(_transfer_lock);
    // An error recorded by any scanner fails the whole scan.
    if (!_process_status.ok()) {
        _set_scanner_done();
        return _process_status;
    }
    std::shared_ptr<ScanTask> scan_task = nullptr;
    if (!_tasks_queue.empty() && !done()) {
        // https://en.cppreference.com/w/cpp/container/list/front
        // The behavior is undefined if the list is empty.
        scan_task = _tasks_queue.front();
    }
    if (scan_task != nullptr) {
        // The abnormal status of scanner may come from the execution of the scanner itself,
        // or come from the scanner scheduler, such as TooManyTasks.
        if (!scan_task->status_ok()) {
            // TODO: If the scanner status is TooManyTasks, maybe we can retry the scanner after a while.
            _process_status = scan_task->get_status();
            _set_scanner_done();
            return _process_status;
        }
        // No need to worry about small block, block is merged together when they are appended to cached_blocks.
        if (!scan_task->cached_blocks.empty()) {
            auto [current_block, block_size] = std::move(scan_task->cached_blocks.front());
            scan_task->cached_blocks.pop_front();
            _block_memory_usage -= block_size;
            // consume current block
            block->swap(*current_block);
            return_free_block(std::move(current_block));
        }
        VLOG_DEBUG << fmt::format(
                "ScannerContext {} get block from queue, task_queue size {}, current scan "
                "task remaing cached_block size {}, eos {}, scheduled tasks {}",
                ctx_id, _tasks_queue.size(), scan_task->cached_blocks.size(), scan_task->is_eos(),
                _num_scheduled_scanners);
        if (scan_task->cached_blocks.empty()) {
            // All Cached blocks are consumed, pop this task from task_queue.
            if (!_tasks_queue.empty()) {
                _tasks_queue.pop_front();
            }
            if (scan_task->is_eos()) {
                // 1. if eos, record a finished scanner.
                _num_finished_scanners++;
                // A slot freed up: let the scheduler pull a pending scanner.
                RETURN_IF_ERROR(
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), nullptr, l));
            } else {
                // Not eos: resubmit the same task so its scanner keeps producing.
                RETURN_IF_ERROR(
                        _scanner_scheduler->schedule_scan_task(shared_from_this(), scan_task, l));
            }
        }
    }
    // Every scanner reached eos and all output has been drained.
    if (_num_finished_scanners == _all_scanners.size() && _tasks_queue.empty()) {
        _set_scanner_done();
        _is_finished = true;
    }
    *eos = done();
    if (_tasks_queue.empty()) {
        // Nothing to read for now; block the pipeline dependency until
        // push_back_scan_task makes it ready again.
        _dependency->block();
    }
    return Status::OK();
}
// Checks that every column in `block` agrees with the output tuple descriptor
// on nullability — both against the column's declared type and against the
// corresponding slot. Returns INVALID_SCHEMA on the first mismatch.
Status ScannerContext::validate_block_schema(Block* block) {
    size_t pos = 0;
    for (const auto& slot : _output_tuple_desc->slots()) {
        const auto& entry = block->get_by_position(pos);
        ++pos;
        const bool col_nullable = entry.column->is_nullable();
        if (col_nullable != entry.type->is_nullable()) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match type nullable({}), "
                    "slot(id: {}, name:{})",
                    entry.name, col_nullable, entry.type->is_nullable(), slot->id(),
                    slot->col_name());
        }
        if (col_nullable != slot->is_nullable()) {
            return Status::Error<ErrorCode::INVALID_SCHEMA>(
                    "column(name: {}) nullable({}) does not match slot(id: {}, name: {}) "
                    "nullable({})",
                    entry.name, col_nullable, slot->id(), slot->col_name(),
                    slot->is_nullable());
        }
    }
    return Status::OK();
}
// Cooperatively stops all scanners (e.g. limit reached or query cancelled),
// drops queued output, deregisters the task-executor handle, and — when
// profiling is enabled — records per-scanner statistics into the profile.
// Idempotent: subsequent calls return immediately.
void ScannerContext::stop_scanners(RuntimeState* state) {
    std::lock_guard<std::mutex> l(_transfer_lock);
    if (_should_stop) {
        return;
    }
    _should_stop = true;
    _set_scanner_done();
    // Ask every still-alive scanner to stop producing.
    for (const std::weak_ptr<ScannerDelegate>& scanner : _all_scanners) {
        if (std::shared_ptr<ScannerDelegate> sc = scanner.lock()) {
            sc->_scanner->try_stop();
        }
    }
    _tasks_queue.clear();
    if (_task_handle) {
        if (auto* task_executor_scheduler =
                    dynamic_cast<TaskExecutorSimplifiedScanScheduler*>(_scanner_scheduler)) {
            static_cast<void>(task_executor_scheduler->task_executor()->remove_task(_task_handle));
        }
        _task_handle = nullptr;
    }
    // TODO yiguolei, call mark close to scanners
    if (state->enable_profile()) {
        std::stringstream scanner_statistics;
        std::stringstream scanner_rows_read;
        std::stringstream scanner_wait_worker_time;
        std::stringstream scanner_projection;
        scanner_statistics << "[";
        scanner_rows_read << "[";
        scanner_wait_worker_time << "[";
        scanner_projection << "[";
        // Scanners can be in 3 states:
        // state 1: in scanner context, not scheduled
        // state 2: in scanner worker pool's queue, scheduled but not running
        // state 3: scanner is running.
        for (auto& scanner_ref : _all_scanners) {
            auto scanner = scanner_ref.lock();
            if (scanner == nullptr) {
                continue;
            }
            // Add per-scanner running time before closing them.
            scanner_statistics << PrettyPrinter::print(scanner->_scanner->get_time_cost_ns(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_projection << PrettyPrinter::print(scanner->_scanner->projection_time(),
                                                       TUnit::TIME_NS)
                               << ", ";
            scanner_rows_read << PrettyPrinter::print(scanner->_scanner->get_rows_read(),
                                                      TUnit::UNIT)
                              << ", ";
            scanner_wait_worker_time
                    << PrettyPrinter::print(scanner->_scanner->get_scanner_wait_worker_timer(),
                                            TUnit::TIME_NS)
                    << ", ";
            // Since this iterates all scanners and some may still be running,
            // scanner close can not be called here.
        }
        scanner_statistics << "]";
        scanner_rows_read << "]";
        scanner_wait_worker_time << "]";
        scanner_projection << "]";
        _scanner_profile->add_info_string("PerScannerRunningTime", scanner_statistics.str());
        _scanner_profile->add_info_string("PerScannerRowsRead", scanner_rows_read.str());
        _scanner_profile->add_info_string("PerScannerWaitTime", scanner_wait_worker_time.str());
        _scanner_profile->add_info_string("PerScannerProjectionTime", scanner_projection.str());
    }
}
// Human-readable dump of the context state for logging.
// Fix: the labels "_num_running_scanners" and "_max_thread_num" did not match
// the members actually printed (_num_scheduled_scanners and
// _max_scan_concurrency), which made the debug output misleading; the labels
// now name the real members.
std::string ScannerContext::debug_string() {
    return fmt::format(
            "id: {}, total scanners: {}, pending tasks: {},"
            " _should_stop: {}, _is_finished: {}, free blocks: {},"
            " limit: {}, _num_scheduled_scanners: {}, _max_scan_concurrency: {},"
            " _max_bytes_in_queue: {}, query_id: {}",
            ctx_id, _all_scanners.size(), _tasks_queue.size(), _should_stop, _is_finished,
            _free_blocks.size_approx(), limit, _num_scheduled_scanners, _max_scan_concurrency,
            _max_bytes_in_queue, print_id(_query_id));
}
// Marks the read dependency permanently ready so the scan operator observes
// eos/error instead of blocking forever.
void ScannerContext::_set_scanner_done() {
    _dependency->set_always_ready();
}
// Adjusts the peak-running-scanner counter by `num` (may be negative when a
// scanner finishes).
void ScannerContext::update_peak_running_scanner(int num) {
    _local_state->_peak_running_scanner->add(num);
}
// Computes how many additional scan tasks this operator should submit now.
// Returns 0 when neither minimum-concurrency target is violated; otherwise
// the shortfall (possibly clamped in low memory mode).
int32_t ScannerContext::_get_margin(std::unique_lock<std::mutex>& transfer_lock,
                                    std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // margin_1 is used to ensure each scan operator could have at least _min_scan_concurrency scan tasks.
    const int32_t margin_1 =
            _min_scan_concurrency -
            (cast_set<int32_t>(_tasks_queue.size()) + _num_scheduled_scanners);
    // margin_2 is used to ensure the scan scheduler could have at least _min_scan_concurrency_of_scan_scheduler scan tasks.
    const int32_t margin_2 =
            _min_scan_concurrency_of_scan_scheduler -
            (_scanner_scheduler->get_active_threads() + _scanner_scheduler->get_queue_size());
    int32_t margin = std::max(margin_1, margin_2);
    // Both targets satisfied: nothing needs to be submitted now.
    if (margin <= 0) {
        return 0;
    }
    if (low_memory_mode()) {
        // In low memory mode, we will limit the number of running scanners to `low_memory_mode_scanners()`.
        // So that we will not submit too many scan tasks to scheduler.
        margin = std::min(low_memory_mode_scanners() - _num_scheduled_scanners, margin);
    }
    VLOG_DEBUG << fmt::format(
            "[{}|{}] schedule scan task, margin_1: {} = {} - ({} + {}), margin_2: {} = {} - "
            "({} + {}), margin: {}",
            print_id(_query_id), ctx_id, margin_1, _min_scan_concurrency, _tasks_queue.size(),
            _num_scheduled_scanners, margin_2, _min_scan_concurrency_of_scan_scheduler,
            _scanner_scheduler->get_active_threads(), _scanner_scheduler->get_queue_size(), margin);
    return margin;
}
// This function must be called with:
// 1. _transfer_lock held.
// 2. ScannerScheduler::_lock held.
//
// Pulls up to `margin` tasks (the shortfall computed by _get_margin) from
// _pending_scanners — giving priority to resubmitting `current_scan_task` —
// and submits them to the scheduler. A current_scan_task that cannot be
// resubmitted is parked back in _pending_scanners so it is never lost.
Status ScannerContext::schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
                                          std::unique_lock<std::mutex>& transfer_lock,
                                          std::unique_lock<std::shared_mutex>& scheduler_lock) {
    // A task handed back for rescheduling must have drained its cached blocks
    // and must not be at eos.
    if (current_scan_task &&
        (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos())) {
        throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
    }
    std::list<std::shared_ptr<ScanTask>> tasks_to_submit;
    int32_t margin = _get_margin(transfer_lock, scheduler_lock);
    // margin is less than zero. Means this scan operator could not submit any scan task for now.
    if (margin <= 0) {
        // Be careful with current scan task.
        // We need to add it back to task queue to make sure it could be resubmitted.
        if (current_scan_task) {
            // This usually happens when we should downgrade the concurrency.
            _pending_scanners.push(current_scan_task);
            VLOG_DEBUG << fmt::format(
                    "{} push back scanner to task queue, because diff <= 0, task_queue size "
                    "{}, _num_scheduled_scanners {}",
                    ctx_id, _tasks_queue.size(), _num_scheduled_scanners);
        }
#ifndef NDEBUG
        // This DCHECK is necessary.
        // We need to make sure each scan operator could have at least 1 scan tasks.
        // Or this scan operator will not be re-scheduled.
        if (!_pending_scanners.empty() && _num_scheduled_scanners == 0 && _tasks_queue.empty()) {
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
#endif
        return Status::OK();
    }
    bool first_pull = true;
    while (margin-- > 0) {
        std::shared_ptr<ScanTask> task_to_run;
        // Concurrency already in flight plus what this loop has queued so far.
        const int32_t current_concurrency = cast_set<int32_t>(
                _tasks_queue.size() + _num_scheduled_scanners + tasks_to_submit.size());
        VLOG_DEBUG << fmt::format("{} currenct concurrency: {} = {} + {} + {}", ctx_id,
                                  current_concurrency, _tasks_queue.size(), _num_scheduled_scanners,
                                  tasks_to_submit.size());
        if (first_pull) {
            // The first pull gives current_scan_task priority over pending scanners.
            task_to_run = _pull_next_scan_task(current_scan_task, current_concurrency);
            if (task_to_run == nullptr) {
                // In two situations we will get nullptr.
                // 1. current_concurrency already reached _max_scan_concurrency.
                // 2. all scanners are finished.
                if (current_scan_task) {
                    DCHECK(current_scan_task->cached_blocks.empty());
                    DCHECK(!current_scan_task->is_eos());
                    if (!current_scan_task->cached_blocks.empty() || current_scan_task->is_eos()) {
                        // This should not happen.
                        throw doris::Exception(ErrorCode::INTERNAL_ERROR,
                                               "Scanner scheduler logical error.");
                    }
                    // Current scan task is not eos, but we can not resubmit it.
                    // Add current_scan_task back to task queue, so that we have chance to resubmit it in the future.
                    _pending_scanners.push(current_scan_task);
                }
            }
            first_pull = false;
        } else {
            task_to_run = _pull_next_scan_task(nullptr, current_concurrency);
        }
        if (task_to_run) {
            tasks_to_submit.push_back(task_to_run);
        } else {
            break;
        }
    }
    if (tasks_to_submit.empty()) {
        return Status::OK();
    }
    VLOG_DEBUG << fmt::format("[{}:{}] submit {} scan tasks to scheduler, remaining scanner: {}",
                              print_id(_query_id), ctx_id, tasks_to_submit.size(),
                              _pending_scanners.size());
    for (auto& scan_task_iter : tasks_to_submit) {
        Status submit_status = submit_scan_task(scan_task_iter, transfer_lock);
        if (!submit_status.ok()) {
            // A failed submission fails the whole scan; mark done so the
            // operator observes the error instead of waiting forever.
            _process_status = submit_status;
            _set_scanner_done();
            return _process_status;
        }
    }
    return Status::OK();
}
// Picks the next scan task to submit: `current_scan_task` if it was handed
// back, otherwise the top pending scanner. Returns nullptr when concurrency is
// exhausted or no pending scanner is left; throws if current_scan_task still
// has cached blocks or already hit eos.
std::shared_ptr<ScanTask> ScannerContext::_pull_next_scan_task(
        std::shared_ptr<ScanTask> current_scan_task, int32_t current_concurrency) {
    if (current_concurrency >= _max_scan_concurrency) {
        VLOG_DEBUG << fmt::format(
                "ScannerContext {} current concurrency {} >= _max_scan_concurrency {}, "
                "skip pull",
                ctx_id, current_concurrency, _max_scan_concurrency);
        return nullptr;
    }
    if (current_scan_task != nullptr) {
        const bool resubmittable =
                current_scan_task->cached_blocks.empty() && !current_scan_task->is_eos();
        if (!resubmittable) {
            // This should not happen.
            throw doris::Exception(ErrorCode::INTERNAL_ERROR, "Scanner scheduler logical error.");
        }
        return current_scan_task;
    }
    if (_pending_scanners.empty()) {
        return nullptr;
    }
    std::shared_ptr<ScanTask> next_task = _pending_scanners.top();
    _pending_scanners.pop();
    return next_task;
}
// Delegates to the local state: true when the query is under memory pressure.
bool ScannerContext::low_memory_mode() const {
    return _local_state->low_memory_mode();
}
#include "common/compile_check_end.h"
} // namespace doris::vectorized