// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include <bthread/types.h>
#include <atomic>
#include <cstdint>
#include <list>
#include <memory>
#include <mutex>
#include <stack>
#include <string>
#include <utility>
#include <vector>
#include "common/config.h"
#include "common/factory_creator.h"
#include "common/status.h"
#include "concurrentqueue.h"
#include "util/doris_metrics.h"
#include "util/runtime_profile.h"
#include "vec/core/block.h"
#include "vec/exec/executor/split_runner.h"
#include "vec/exec/scan/scanner.h"
namespace doris {
class RuntimeState;
class TupleDescriptor;
class WorkloadGroup;
namespace pipeline {
class ScanLocalStateBase;
class Dependency;
} // namespace pipeline
namespace vectorized {
class Scanner;
class ScannerDelegate;
class ScannerScheduler;
class TaskExecutor;
class TaskHandle;
class ScanTask {
public:
ScanTask(std::weak_ptr<ScannerDelegate> delegate_scanner) : scanner(delegate_scanner) {
_resource_ctx = thread_context()->resource_ctx();
DorisMetrics::instance()->scanner_task_cnt->increment(1);
}
ScanTask(std::shared_ptr<ResourceContext> resource_ctx,
std::weak_ptr<ScannerDelegate> delegate_scanner)
: _resource_ctx(std::move(resource_ctx)), scanner(delegate_scanner) {}
~ScanTask() {
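// Clear cached blocks under the query's memory tracker so that the memory
// released below is attributed to the owning query.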
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_resource_ctx->memory_context()->mem_tracker());
cached_blocks.clear();
DorisMetrics::instance()->scanner_task_cnt->increment(-1);
}
private:
// whether current scanner is finished
bool eos = false;
Status status = Status::OK();
std::shared_ptr<ResourceContext> _resource_ctx;
public:
std::weak_ptr<ScannerDelegate> scanner;
std::list<std::pair<vectorized::BlockUPtr, size_t>> cached_blocks;
bool is_first_schedule = true;
// Use weak_ptr to avoid circular references and potential memory leaks with SplitRunner.
// ScannerContext only needs to observe the lifetime of SplitRunner without owning it.
// When SplitRunner is destroyed, split_runner.lock() will return nullptr, ensuring safe access.
std::weak_ptr<SplitRunner> split_runner;
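// A minimal usage sketch of the weak_ptr pattern described above (illustrative
// only; no real SplitRunner API is called):
//
//   if (std::shared_ptr<SplitRunner> runner = split_runner.lock()) {
//       // The SplitRunner is still alive and stays alive for this scope.
//   } else {
//       // The SplitRunner has already been destroyed; do not touch it.
//   }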
void set_status(Status _status) {
if (_status.is<ErrorCode::END_OF_FILE>()) {
// Treat `END_OF_FILE` as normal completion rather than an error: just mark `eos`.
eos = true;
}
status = _status;
}
Status get_status() const { return status; }
bool status_ok() const { return status.ok() || status.is<ErrorCode::END_OF_FILE>(); }
bool is_eos() const { return eos; }
void set_eos(bool _eos) { eos = _eos; }
};
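// Usage sketch of the status/eos contract above (illustrative; the error message
// is made up): END_OF_FILE is folded into `eos` instead of surfacing as a failure,
// so `status_ok()` stays true once the scanner runs dry.
//
//   task->set_status(Status::Error<ErrorCode::END_OF_FILE>("no more data"));
//   DCHECK(task->is_eos());
//   DCHECK(task->status_ok()); // END_OF_FILE is not treated as an error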
// ScannerContext is responsible for recording the execution status
// of a group of Scanners corresponding to a ScanNode.
// It tracks how many scanners are currently scheduled and maintains
// a producer-consumer block queue between the scanners and the scan node.
//
// ScannerContext is also the scheduling unit of ScannerScheduler.
// ScannerScheduler schedules a ScannerContext at a time,
// and submits the Scanners to the scanner thread pool for data scanning.
class ScannerContext : public std::enable_shared_from_this<ScannerContext>,
public HasTaskExecutionCtx {
ENABLE_FACTORY_CREATOR(ScannerContext);
friend class ScannerScheduler;
public:
ScannerContext(RuntimeState* state, pipeline::ScanLocalStateBase* local_state,
const TupleDescriptor* output_tuple_desc,
const RowDescriptor* output_row_descriptor,
const std::list<std::shared_ptr<vectorized::ScannerDelegate>>& scanners,
int64_t limit_, std::shared_ptr<pipeline::Dependency> dependency
#ifdef BE_TEST
,
int num_parallel_instances
#endif
);
~ScannerContext() override;
Status init();
vectorized::BlockUPtr get_free_block(bool force);
void return_free_block(vectorized::BlockUPtr block);
void clear_free_blocks();
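// Sketch of the intended block-reuse cycle (illustrative only; whether a null
// block can come back when `force` is false is an assumption about the pool):
//
//   vectorized::BlockUPtr block = ctx->get_free_block(/*force=*/false);
//   if (block != nullptr) {
//       // ... fill the block, hand it downstream ...
//       ctx->return_free_block(std::move(block));
//   }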
inline void inc_block_usage(size_t usage) { _block_memory_usage += usage; }
int64_t block_memory_usage() { return _block_memory_usage; }
// Caller should make sure the pipeline task is still running when calling this function
void update_peak_running_scanner(int num);
// Get the next block from the block queue. Called by ScanNode/ScanOperator.
// Sets *eos to true if there is no more data to read.
Status get_block_from_queue(RuntimeState* state, vectorized::Block* block, bool* eos, int id);
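// A hedged consumer-side sketch (illustrative; `id` is the parallel instance id
// and `ctx` a shared_ptr to this context):
//
//   vectorized::Block block;
//   bool eos = false;
//   while (!eos) {
//       RETURN_IF_ERROR(ctx->get_block_from_queue(state, &block, &eos, id));
//       // ... hand the block to the downstream operator ...
//   }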
[[nodiscard]] Status validate_block_schema(Block* block);
// Submit the scanner to the thread pool in `ScannerScheduler`.
// The scan task appends scanned blocks to `ScanTask::cached_blocks`,
// records errors in `ScanTask::status`,
// and sets `ScanTask::eos` when the current scanner has no more data.
Status submit_scan_task(std::shared_ptr<ScanTask> scan_task, std::unique_lock<std::mutex>&);
// Push back a scan task.
void push_back_scan_task(std::shared_ptr<ScanTask> scan_task);
// Return true if this ScannerContext needs no further processing.
bool done() const { return _is_finished || _should_stop; }
std::string debug_string();
std::shared_ptr<TaskHandle> task_handle() const { return _task_handle; }
std::shared_ptr<ResourceContext> resource_ctx() const { return _resource_ctx; }
RuntimeState* state() { return _state; }
void stop_scanners(RuntimeState* state);
int batch_size() const { return _batch_size; }
// In low-memory mode, at most 4 scanners will be running and every scanner will
// cache at most 1MB of data, so every instance keeps at most an 8MB buffer.
bool low_memory_mode() const;
// TODO(yiguolei): add this as a session variable
int32_t low_memory_mode_scan_bytes_per_scanner() const {
return 1 * 1024 * 1024; // 1MB
}
int32_t low_memory_mode_scanners() const { return 4; }
pipeline::ScanLocalStateBase* local_state() const { return _local_state; }
// the unique id of this context
std::string ctx_id;
TUniqueId _query_id;
bool _should_reset_thread_name = true;
int32_t num_scheduled_scanners() {
std::lock_guard<std::mutex> l(_transfer_lock);
return _num_scheduled_scanners;
}
Status schedule_scan_task(std::shared_ptr<ScanTask> current_scan_task,
std::unique_lock<std::mutex>& transfer_lock,
std::unique_lock<std::shared_mutex>& scheduler_lock);
protected:
/// Four criteria determine whether to increase scanner parallelism:
/// 1. At least `SCALE_UP_DURATION` ms have passed since the last scale-up.
/// 2. At least half (`WAIT_BLOCK_DURATION_RATIO`) of the duration was spent waiting for blocks.
/// 3. `_free_blocks_memory_usage` < `_max_bytes_in_queue`, i.e. enough memory remains to scale up.
/// 4. Scale up at most `MAX_SCALE_UP_RATIO` times, up to `_max_thread_num`.
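/// A hedged sketch of the check implied by criteria 1-3 above (illustrative only;
/// `now_ms`, `last_scale_up_ms`, `wait_block_ms`, and `run_ms` are assumed names,
/// not the actual implementation):
///
///   bool may_scale_up =
///           now_ms - last_scale_up_ms >= SCALE_UP_DURATION &&          // 1
///           wait_block_ms >= run_ms * WAIT_BLOCK_DURATION_RATIO &&     // 2
///           _free_blocks_memory_usage < _max_bytes_in_queue;           // 3
///   // Criterion 4 then caps the new parallelism: grow by at most
///   // MAX_SCALE_UP_RATIO per step and never beyond _max_thread_num.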
void _set_scanner_done();
RuntimeState* _state = nullptr;
pipeline::ScanLocalStateBase* _local_state = nullptr;
// See the comments on the same fields in VScanNode.
const TupleDescriptor* _output_tuple_desc = nullptr;
const RowDescriptor* _output_row_descriptor = nullptr;
std::mutex _transfer_lock;
std::list<std::shared_ptr<ScanTask>> _tasks_queue;
Status _process_status = Status::OK();
std::atomic_bool _should_stop = false;
std::atomic_bool _is_finished = false;
// Lazy-allocated blocks for all scanners to share, for memory reuse.
moodycamel::ConcurrentQueue<vectorized::BlockUPtr> _free_blocks;
int _batch_size;
// The limit from SQL's limit clause
int64_t limit;
int64_t _max_bytes_in_queue = 0;
// Use a stack so that we can resubmit scanners in LIFO order, which may be more cache-friendly.
std::stack<std::shared_ptr<ScanTask>> _pending_scanners;
// Number of scanners currently submitted to the scheduler.
std::atomic_int _num_scheduled_scanners = 0;
// Number of scanners that reached eos or hit an error.
int32_t _num_finished_scanners = 0;
// Weak pointers to all scanners; used by stop_scanners().
std::vector<std::weak_ptr<ScannerDelegate>> _all_scanners;
std::shared_ptr<RuntimeProfile> _scanner_profile;
// This counter refers to the scan operator's local state.
RuntimeProfile::Counter* _scanner_memory_used_counter = nullptr;
RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr;
RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr;
std::shared_ptr<ResourceContext> _resource_ctx;
std::shared_ptr<pipeline::Dependency> _dependency = nullptr;
std::shared_ptr<doris::vectorized::TaskHandle> _task_handle;
std::atomic<int64_t> _block_memory_usage = 0;
// adaptive scan concurrency related
ScannerScheduler* _scanner_scheduler = nullptr;
MOCK_REMOVE(const) int32_t _min_scan_concurrency_of_scan_scheduler = 0;
// The overall goal is to make full use of system resources while avoiding
// queueing more tasks in the scheduler than necessary.
// Each scan operator may submit up to _max_scan_concurrency scanners to the
// scheduler when the scheduler has enough resources, so that a single query
// can fully utilize the available resources.
int32_t _max_scan_concurrency = 0;
MOCK_REMOVE(const) int32_t _min_scan_concurrency = 1;
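// Illustrative sketch, not the actual scheduling logic: the concurrency this
// context asks for is kept within [_min_scan_concurrency, _max_scan_concurrency],
// e.g. something like
//
//   int32_t target = std::clamp(desired_concurrency,   // `desired_concurrency`
//                               _min_scan_concurrency, // is a hypothetical name
//                               _max_scan_concurrency);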
std::shared_ptr<ScanTask> _pull_next_scan_task(std::shared_ptr<ScanTask> current_scan_task,
int32_t current_concurrency);
int32_t _get_margin(std::unique_lock<std::mutex>& transfer_lock,
std::unique_lock<std::shared_mutex>& scheduler_lock);
// TODO: Add implementation of runtime_info_feed_back
// adaptive scan concurrency related end
};
} // namespace vectorized
} // namespace doris