| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/RuntimeProfile_types.h> |
| #include <gen_cpp/Types_types.h> |
| #include <glog/logging.h> |
| |
| #include <atomic> |
| #include <cstdint> |
| #include <memory> |
| #include <mutex> |
| #include <string> |
| #include <unordered_map> |
| |
| #include "common/config.h" |
| #include "common/factory_creator.h" |
| #include "common/object_pool.h" |
| #include "pipeline/dependency.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/memory/mem_tracker_limiter.h" |
| #include "runtime/runtime_filter_mgr.h" |
| #include "runtime/runtime_predicate.h" |
| #include "runtime/workload_management/resource_context.h" |
| #include "util/hash_util.hpp" |
| #include "util/threadpool.h" |
| #include "vec/exec/scan/scanner_scheduler.h" |
| #include "vec/runtime/shared_hash_table_controller.h" |
| #include "workload_group/workload_group.h" |
| |
| namespace doris { |
| |
| namespace pipeline { |
| class PipelineFragmentContext; |
| class PipelineTask; |
| } // namespace pipeline |
| |
| struct ReportStatusRequest { |
| const Status status; |
| std::vector<RuntimeState*> runtime_states; |
| bool done; |
| TNetworkAddress coord_addr; |
| TUniqueId query_id; |
| int fragment_id; |
| TUniqueId fragment_instance_id; |
| int backend_num; |
| RuntimeState* runtime_state; |
| std::function<void(const Status&)> cancel_fn; |
| }; |
| |
| enum class QuerySource { |
| INTERNAL_FRONTEND, |
| STREAM_LOAD, |
| GROUP_COMMIT_LOAD, |
| ROUTINE_LOAD, |
| EXTERNAL_CONNECTOR |
| }; |
| |
| const std::string toString(QuerySource query_source); |
| |
| // Saves the components shared by all fragments of a query. |
| // Some components, such as DescriptorTbl, may be very large and would slow down every |
| // fragment execution if they were deserialized each time. |
| class DescriptorTbl; |
| class QueryContext : public std::enable_shared_from_this<QueryContext> { |
| ENABLE_FACTORY_CREATOR(QueryContext); |
| |
| public: |
| class QueryTaskController : public TaskController { |
| ENABLE_FACTORY_CREATOR(QueryTaskController); |
| |
| public: |
| static std::unique_ptr<TaskController> create(QueryContext* query_ctx); |
| |
| bool is_cancelled() const override; |
| Status cancel(const Status& reason, int fragment_id); |
| Status cancel(const Status& reason) override { return cancel(reason, -1); } |
| |
| private: |
| QueryTaskController(const std::shared_ptr<QueryContext>& query_ctx) |
| : query_ctx_(query_ctx) {} |
| |
| const std::weak_ptr<QueryContext> query_ctx_; |
| }; |
| |
| class QueryMemoryContext : public MemoryContext { |
| ENABLE_FACTORY_CREATOR(QueryMemoryContext); |
| |
| public: |
| static std::unique_ptr<MemoryContext> create(); |
| |
| int64_t revokable_bytes() override { |
| // TODO |
| return 0; |
| } |
| |
| bool ready_do_revoke() override { |
| // TODO |
| return true; |
| } |
| |
| Status revoke(int64_t bytes) override { |
| // TODO |
| return Status::OK(); |
| } |
| |
| Status enter_arbitration(Status reason) override { |
| // TODO, pause the pipeline |
| return Status::OK(); |
| } |
| |
| Status leave_arbitration(Status reason) override { |
| // TODO, start pipeline |
| return Status::OK(); |
| } |
| |
| private: |
| QueryMemoryContext() = default; |
| }; |
| |
| static std::shared_ptr<QueryContext> create(TUniqueId query_id, ExecEnv* exec_env, |
| const TQueryOptions& query_options, |
| TNetworkAddress coord_addr, bool is_nereids, |
| TNetworkAddress current_connect_fe, |
| QuerySource query_type); |
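| // A minimal creation sketch (the argument values below are illustrative, not a |
| // prescribed setup): |
| //   std::shared_ptr<QueryContext> ctx = QueryContext::create( |
| //           query_id, ExecEnv::GetInstance(), query_options, coord_addr, |
| //           /*is_nereids=*/true, current_connect_fe, QuerySource::INTERNAL_FRONTEND); |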
| |
| // Prefer QueryContext::create; this constructor cannot be made private because ENABLE_FACTORY_CREATOR::create_shared needs to call it. |
| QueryContext(TUniqueId query_id, ExecEnv* exec_env, const TQueryOptions& query_options, |
| TNetworkAddress coord_addr, bool is_nereids, TNetworkAddress current_connect_fe, |
| QuerySource query_type); |
| |
| ~QueryContext(); |
| |
| void init_query_task_controller(); |
| |
| ExecEnv* exec_env() { return _exec_env; } |
| |
| bool is_timeout(timespec now) const { |
| if (_timeout_second <= 0) { |
| return false; |
| } |
| return _query_watcher.elapsed_time_seconds(now) > _timeout_second; |
| } |
| |
| void set_thread_token(int concurrency, bool is_serial) { |
| _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token( |
| is_serial ? ThreadPool::ExecutionMode::SERIAL |
| : ThreadPool::ExecutionMode::CONCURRENT, |
| concurrency); |
| } |
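| // Example (values are illustrative): limit this query's olap scanners to at most |
| // four concurrent threads from the limited scan pool: |
| //   query_ctx->set_thread_token(/*concurrency=*/4, /*is_serial=*/false); |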
| |
| ThreadPoolToken* get_token() { return _thread_token.get(); } |
| |
| void set_ready_to_execute(Status reason); |
| |
| [[nodiscard]] bool is_cancelled() const { return !_exec_status.ok(); } |
| |
| void cancel_all_pipeline_context(const Status& reason, int fragment_id = -1); |
| std::string print_all_pipeline_context(); |
| void set_pipeline_context(const int fragment_id, |
| std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx); |
| void cancel(Status new_status, int fragment_id = -1); |
| |
| [[nodiscard]] Status exec_status() { return _exec_status.status(); } |
| |
| void set_execution_dependency_ready(); |
| |
| void set_memory_sufficient(bool sufficient); |
| |
| void set_ready_to_execute_only(); |
| |
| std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() { |
| return _shared_hash_table_controller; |
| } |
| |
| bool has_runtime_predicate(int source_node_id) { |
| return _runtime_predicates.contains(source_node_id); |
| } |
| |
| vectorized::RuntimePredicate& get_runtime_predicate(int source_node_id) { |
| DCHECK(has_runtime_predicate(source_node_id)); |
| return _runtime_predicates.find(source_node_id)->second; |
| } |
| |
| void init_runtime_predicates(const std::vector<TTopnFilterDesc>& topn_filter_descs) { |
| for (const auto& desc : topn_filter_descs) { |
| _runtime_predicates.try_emplace(desc.source_node_id, desc); |
| } |
| } |
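| // Usage sketch (illustrative; `node_id` is a hypothetical scan node id): once |
| // init_runtime_predicates() has run, a scan node checks for its predicate before use: |
| //   if (query_ctx->has_runtime_predicate(node_id)) { |
| //       auto& pred = query_ctx->get_runtime_predicate(node_id); |
| //       // ... apply `pred` during the scan |
| //   } |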
| |
| void set_workload_group(WorkloadGroupPtr& wg); |
| |
| int execution_timeout() const { |
| return _query_options.__isset.execution_timeout ? _query_options.execution_timeout |
| : _query_options.query_timeout; |
| } |
| |
| int32_t runtime_filter_wait_time_ms() const { |
| return _query_options.runtime_filter_wait_time_ms; |
| } |
| |
| bool runtime_filter_wait_infinitely() const { |
| return _query_options.__isset.runtime_filter_wait_infinitely && |
| _query_options.runtime_filter_wait_infinitely; |
| } |
| |
| int be_exec_version() const { |
| if (!_query_options.__isset.be_exec_version) { |
| return 0; |
| } |
| return _query_options.be_exec_version; |
| } |
| |
| [[nodiscard]] int64_t get_fe_process_uuid() const { |
| return _query_options.__isset.fe_process_uuid ? _query_options.fe_process_uuid : 0; |
| } |
| |
| bool ignore_runtime_filter_error() const { |
| return _query_options.__isset.ignore_runtime_filter_error |
| ? _query_options.ignore_runtime_filter_error |
| : false; |
| } |
| |
| bool enable_force_spill() const { |
| return _query_options.__isset.enable_force_spill && _query_options.enable_force_spill; |
| } |
| |
| // Global runtime filter manager. Runtime filters that have a remote target or that |
| // need a local merge should register here. Before publish() or push_to_remote(), |
| // the runtime filter should finish its local merge work. |
| RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); } |
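| // Illustrative flow only (the producer-side calls are assumptions, not part of this |
| // header): a filter with a remote target registers with this manager, is merged |
| // locally across the query's instances, and only then published / pushed to remote: |
| //   RuntimeFilterMgr* mgr = query_ctx->runtime_filter_mgr(); |
| //   // register the filter with `mgr`, wait for the local merge, |
| //   // then publish() / push_to_remote(). |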
| |
| TUniqueId query_id() const { return _query_id; } |
| |
| vectorized::SimplifiedScanScheduler* get_scan_scheduler() { return _scan_task_scheduler; } |
| |
| vectorized::SimplifiedScanScheduler* get_remote_scan_scheduler() { |
| return _remote_scan_task_scheduler; |
| } |
| |
| pipeline::Dependency* get_execution_dependency() { return _execution_dependency.get(); } |
| pipeline::Dependency* get_memory_sufficient_dependency() { |
| return _memory_sufficient_dependency.get(); |
| } |
| |
| std::vector<pipeline::PipelineTask*> get_revocable_tasks() const; |
| |
| Status revoke_memory(); |
| |
| doris::pipeline::TaskScheduler* get_pipe_exec_scheduler(); |
| |
| ThreadPool* get_memtable_flush_pool(); |
| |
| void set_merge_controller_handler( |
| std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) { |
| _merge_controller_handler = handler; |
| } |
| |
| bool is_nereids() const { return _is_nereids; } |
| |
| WorkloadGroupPtr workload_group() const { return _resource_ctx->workload_group(); } |
| std::shared_ptr<MemTrackerLimiter> query_mem_tracker() const { |
| return _resource_ctx->memory_context()->mem_tracker(); |
| } |
| |
| void increase_revoking_tasks_count() { _revoking_tasks_count.fetch_add(1); } |
| |
| void decrease_revoking_tasks_count(); |
| |
| int get_revoking_tasks_count() const { return _revoking_tasks_count.load(); } |
| |
| void get_revocable_info(size_t* revocable_size, size_t* memory_usage, |
| bool* has_running_task) const; |
| size_t get_revocable_size() const; |
| |
| // Called by the workload group manager to set the query's memory limit based on its slots. |
| // If the user set a query memory limit explicitly, the smaller of the two should be used. |
| void set_mem_limit(int64_t new_mem_limit) { |
| _resource_ctx->memory_context()->mem_tracker()->set_limit(new_mem_limit); |
| } |
| |
| int64_t get_mem_limit() const { |
| return _resource_ctx->memory_context()->mem_tracker()->limit(); |
| } |
| |
| // The new memory limit must not exceed the memory limit set by the user. |
| void set_adjusted_mem_limit(int64_t new_mem_limit) { |
| _adjusted_mem_limit = std::min<int64_t>(new_mem_limit, _user_set_mem_limit); |
| } |
| |
| // The adjusted (expected) memory limit is the limit applied once the workload group has reached its own limit. |
| int64_t adjusted_mem_limit() { return _adjusted_mem_limit; } |
| |
| MemTrackerLimiter* get_mem_tracker() { |
| return _resource_ctx->memory_context()->mem_tracker().get(); |
| } |
| |
| int32_t get_slot_count() const { |
| return _query_options.__isset.query_slot_count ? _query_options.query_slot_count : 1; |
| } |
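| // Illustrative sketch of slot-based limiting (the arithmetic is an assumption about |
| // the workload group manager's policy, not something defined in this header): |
| //   int64_t per_slot = wg_mem_limit / wg_total_slots;  // hypothetical values |
| //   query_ctx->set_mem_limit(per_slot * query_ctx->get_slot_count()); |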
| |
| DescriptorTbl* desc_tbl = nullptr; |
| bool set_rsc_info = false; |
| std::string user; |
| std::string group; |
| TNetworkAddress coord_addr; |
| TNetworkAddress current_connect_fe; |
| TQueryGlobals query_globals; |
| |
| ObjectPool obj_pool; |
| |
| std::shared_ptr<ResourceContext> resource_ctx() { return _resource_ctx; } |
| |
| std::vector<TUniqueId> fragment_instance_ids; |
| |
| // plan node id -> TFileScanRangeParams |
| // only for file scan node |
| std::map<int, TFileScanRangeParams> file_scan_range_params_map; |
| |
| // Records the brpc stub used to reach `network_address` during this query; a given |
| // address must always map to the same stub (enforced by the DCHECK below). |
| void add_using_brpc_stub(const TNetworkAddress& network_address, |
| std::shared_ptr<PBackendService_Stub> brpc_stub) { |
| if (network_address.port == 0) { |
| return; |
| } |
| std::lock_guard<std::mutex> lock(_brpc_stubs_mutex); |
| if (!_using_brpc_stubs.contains(network_address)) { |
| _using_brpc_stubs.emplace(network_address, brpc_stub); |
| } |
| |
| DCHECK_EQ(_using_brpc_stubs[network_address].get(), brpc_stub.get()); |
| } |
| |
| std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> |
| get_using_brpc_stubs() { |
| std::lock_guard<std::mutex> lock(_brpc_stubs_mutex); |
| return _using_brpc_stubs; |
| } |
| |
| void set_low_memory_mode() { _low_memory_mode = true; } |
| |
| bool low_memory_mode() { return _low_memory_mode; } |
| |
| void disable_reserve_memory() { _enable_reserve_memory = false; } |
| |
| bool enable_reserve_memory() const { |
| return _query_options.__isset.enable_reserve_memory && |
| _query_options.enable_reserve_memory && _enable_reserve_memory; |
| } |
| |
| // Keep the highest-priority pause reason: QUERY_MEMORY_EXCEEDED is never overwritten, |
| // and WORKLOAD_GROUP_MEMORY_EXCEEDED may only be upgraded to QUERY_MEMORY_EXCEEDED. |
| void update_paused_reason(const Status& st) { |
| std::lock_guard l(_paused_mutex); |
| if (_paused_reason.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { |
| return; |
| } else if (_paused_reason.is<ErrorCode::WORKLOAD_GROUP_MEMORY_EXCEEDED>()) { |
| if (st.is<ErrorCode::QUERY_MEMORY_EXCEEDED>()) { |
| _paused_reason = st; |
| } |
| return; |
| } else { |
| _paused_reason = st; |
| } |
| } |
| |
| Status paused_reason() { |
| std::lock_guard l(_paused_mutex); |
| return _paused_reason; |
| } |
| |
| bool is_pure_load_task() { |
| return _query_source == QuerySource::STREAM_LOAD || |
| _query_source == QuerySource::ROUTINE_LOAD || |
| _query_source == QuerySource::GROUP_COMMIT_LOAD; |
| } |
| |
| std::string debug_string(); |
| |
| private: |
| int _timeout_second; |
| TUniqueId _query_id; |
| ExecEnv* _exec_env = nullptr; |
| MonotonicStopWatch _query_watcher; |
| bool _is_nereids = false; |
| |
| std::shared_ptr<ResourceContext> _resource_ctx; |
| |
| std::mutex _revoking_tasks_mutex; |
| std::atomic<int> _revoking_tasks_count = 0; |
| |
| // A token used to submit olap scanners to the "_limited_scan_thread_pool". |
| // The token is created from the "_limited_scan_thread_pool" of the exec env and is |
| // shared by all instances of this query, so we can control the maximum number of |
| // threads a query may use for scanning. |
| // If this token is not set, scanners are executed in the exec env's "_scan_thread_pool". |
| std::unique_ptr<ThreadPoolToken> _thread_token {nullptr}; |
| |
| void _init_resource_context(); |
| void _init_query_mem_tracker(); |
| |
| std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller; |
| std::unordered_map<int, vectorized::RuntimePredicate> _runtime_predicates; |
| |
| std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr; |
| const TQueryOptions _query_options; |
| |
| // All pipeline tasks use the same query context to report status, so we need `_exec_status` |
| // to record the real error message when the query fails. |
| AtomicStatus _exec_status; |
| |
| doris::pipeline::TaskScheduler* _task_scheduler = nullptr; |
| vectorized::SimplifiedScanScheduler* _scan_task_scheduler = nullptr; |
| ThreadPool* _memtable_flush_pool = nullptr; |
| vectorized::SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr; |
| // This dependency indicates whether the 2nd-phase RPC has been received from the FE. |
| std::unique_ptr<pipeline::Dependency> _execution_dependency; |
| // This dependency indicates whether memory is sufficient for execution. |
| std::unique_ptr<pipeline::Dependency> _memory_sufficient_dependency; |
| |
| // This shared_ptr is never dereferenced; it only keeps the object alive. |
| // The runtime filter manager holds a weak_ptr that references this object. |
| std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler; |
| |
| std::map<int, std::weak_ptr<pipeline::PipelineFragmentContext>> _fragment_id_to_pipeline_ctx; |
| std::mutex _pipeline_map_write_lock; |
| |
| std::mutex _paused_mutex; |
| Status _paused_reason; |
| std::atomic<int64_t> _paused_count = 0; |
| std::atomic<bool> _low_memory_mode = false; |
| std::atomic<bool> _enable_reserve_memory = true; |
| int64_t _user_set_mem_limit = 0; |
| std::atomic<int64_t> _adjusted_mem_limit = 0; |
| |
| std::mutex _profile_mutex; |
| timespec _query_arrival_timestamp; |
| // Distinguishes the query source. For a query that comes from the FE, the FE keeps some |
| // in-memory structures that help us manage the query. |
| QuerySource _query_source; |
| |
| std::mutex _brpc_stubs_mutex; |
| std::unordered_map<TNetworkAddress, std::shared_ptr<PBackendService_Stub>> _using_brpc_stubs; |
| |
| // When a pipeline fragment is closed, it registers its profile in this map via add_fragment_profile. |
| // flatten profile of one fragment: |
| // Pipeline 0 |
| // PipelineTask 0 |
| // Operator 1 |
| // Operator 2 |
| // Scanner |
| // PipelineTask 1 |
| // Operator 1 |
| // Operator 2 |
| // Scanner |
| // Pipeline 1 |
| // PipelineTask 2 |
| // Operator 3 |
| // PipelineTask 3 |
| // Operator 3 |
| // fragment_id -> list<profile> |
| std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> _profile_map; |
| std::unordered_map<int, std::shared_ptr<TRuntimeProfileTree>> _load_channel_profile_map; |
| |
| void _report_query_profile(); |
| |
| std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> |
| _collect_realtime_query_profile() const; |
| |
| public: |
| // When a pipeline fragment is closed, it registers its profile in this map via add_fragment_profile. |
| void add_fragment_profile( |
| int fragment_id, |
| const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profile, |
| std::shared_ptr<TRuntimeProfileTree> load_channel_profile); |
| |
| TReportExecStatusParams get_realtime_exec_status() const; |
| |
| bool enable_profile() const { |
| return _query_options.__isset.enable_profile && _query_options.enable_profile; |
| } |
| |
| timespec get_query_arrival_timestamp() const { return this->_query_arrival_timestamp; } |
| QuerySource get_query_source() const { return this->_query_source; } |
| }; |
| |
| } // namespace doris |