| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/Types_types.h> |
| |
| #include <atomic> |
| #include <memory> |
| #include <string> |
| |
| #include "common/config.h" |
| #include "common/factory_creator.h" |
| #include "common/object_pool.h" |
| #include "runtime/datetime_value.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/memory/mem_tracker_limiter.h" |
| #include "runtime/query_statistics.h" |
| #include "runtime/runtime_filter_mgr.h" |
| #include "runtime/runtime_predicate.h" |
| #include "runtime/runtime_query_statistics_mgr.h" |
| #include "task_group/task_group.h" |
| #include "util/pretty_printer.h" |
| #include "util/threadpool.h" |
| #include "vec/exec/scan/scanner_scheduler.h" |
| #include "vec/runtime/shared_hash_table_controller.h" |
| #include "vec/runtime/shared_scanner_controller.h" |
| |
| namespace doris { |
| |
| // Save the common components of fragments in a query. |
| // Some components like DescriptorTbl may be very large |
| // that will slow down each execution of fragments when DeSer them every time. |
| class DescriptorTbl; |
| class QueryContext { |
| ENABLE_FACTORY_CREATOR(QueryContext); |
| |
| public: |
| QueryContext(int total_fragment_num, ExecEnv* exec_env, const TQueryOptions& query_options) |
| : fragment_num(total_fragment_num), |
| timeout_second(-1), |
| _exec_env(exec_env), |
| _runtime_filter_mgr(new RuntimeFilterMgr(TUniqueId(), this)), |
| _query_options(query_options) { |
| _start_time = vectorized::VecDateTimeValue::local_time(); |
| _shared_hash_table_controller.reset(new vectorized::SharedHashTableController()); |
| _shared_scanner_controller.reset(new vectorized::SharedScannerController()); |
| } |
| |
| ~QueryContext() { |
| // query mem tracker consumption is equal to 0, it means that after QueryContext is created, |
| // it is found that query already exists in _query_ctx_map, and query mem tracker is not used. |
| // query mem tracker consumption is not equal to 0 after use, because there is memory consumed |
| // on query mem tracker, released on other trackers. |
| if (query_mem_tracker->peak_consumption() != 0) { |
| LOG(INFO) << fmt::format( |
| "Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, " |
| "PeakUsed={}", |
| print_id(query_id), MemTracker::print_bytes(query_mem_tracker->limit()), |
| MemTracker::print_bytes(query_mem_tracker->consumption()), |
| MemTracker::print_bytes(query_mem_tracker->peak_consumption())); |
| } |
| if (_task_group) { |
| _task_group->remove_mem_tracker_limiter(query_mem_tracker); |
| } |
| if (_exec_env && |
| _exec_env |
| ->runtime_query_statistics_mgr()) { // for BE ut FragmentMgrTest.Normal, Meaningless |
| _exec_env->runtime_query_statistics_mgr()->set_query_finished(print_id(query_id)); |
| } |
| } |
| |
| // Notice. For load fragments, the fragment_num sent by FE has a small probability of 0. |
| // this may be a bug, bug <= 1 in theory it shouldn't cause any problems at this stage. |
| bool countdown() { return fragment_num.fetch_sub(1) <= 1; } |
| |
| ExecEnv* exec_env() { return _exec_env; } |
| |
| bool is_timeout(const vectorized::VecDateTimeValue& now) const { |
| if (timeout_second <= 0) { |
| return false; |
| } |
| if (now.second_diff(_start_time) > timeout_second) { |
| return true; |
| } |
| return false; |
| } |
| |
| void set_thread_token(int concurrency, bool is_serial) { |
| _thread_token = _exec_env->scanner_scheduler()->new_limited_scan_pool_token( |
| is_serial ? ThreadPool::ExecutionMode::SERIAL |
| : ThreadPool::ExecutionMode::CONCURRENT, |
| concurrency); |
| } |
| |
| ThreadPoolToken* get_token() { return _thread_token.get(); } |
| |
| void set_ready_to_execute(bool is_cancelled) { |
| { |
| std::lock_guard<std::mutex> l(_start_lock); |
| _is_cancelled = is_cancelled; |
| _ready_to_execute = true; |
| } |
| if (query_mem_tracker && is_cancelled) { |
| query_mem_tracker->set_is_query_cancelled(is_cancelled); |
| } |
| _start_cond.notify_all(); |
| } |
| void set_ready_to_execute_only() { |
| { |
| std::lock_guard<std::mutex> l(_start_lock); |
| _ready_to_execute = true; |
| } |
| _start_cond.notify_all(); |
| } |
| |
| bool is_ready_to_execute() { |
| std::lock_guard<std::mutex> l(_start_lock); |
| return _ready_to_execute; |
| } |
| |
| bool wait_for_start() { |
| int wait_time = config::max_fragment_start_wait_time_seconds; |
| std::unique_lock<std::mutex> l(_start_lock); |
| while (!_ready_to_execute.load() && !_is_cancelled.load() && --wait_time > 0) { |
| _start_cond.wait_for(l, std::chrono::seconds(1)); |
| } |
| return _ready_to_execute.load() && !_is_cancelled.load(); |
| } |
| |
| std::shared_ptr<vectorized::SharedHashTableController> get_shared_hash_table_controller() { |
| return _shared_hash_table_controller; |
| } |
| |
| std::shared_ptr<vectorized::SharedScannerController> get_shared_scanner_controller() { |
| return _shared_scanner_controller; |
| } |
| |
| vectorized::RuntimePredicate& get_runtime_predicate() { return _runtime_predicate; } |
| |
| void set_task_group(taskgroup::TaskGroupPtr& tg) { _task_group = tg; } |
| |
| taskgroup::TaskGroup* get_task_group() const { return _task_group.get(); } |
| |
| int execution_timeout() const { |
| return _query_options.__isset.execution_timeout ? _query_options.execution_timeout |
| : _query_options.query_timeout; |
| } |
| |
| int32_t runtime_filter_wait_time_ms() const { |
| return _query_options.runtime_filter_wait_time_ms; |
| } |
| |
| bool enable_pipeline_exec() const { |
| return _query_options.__isset.enable_pipeline_engine && |
| _query_options.enable_pipeline_engine; |
| } |
| |
| int be_exec_version() const { |
| if (!_query_options.__isset.be_exec_version) { |
| return 0; |
| } |
| return _query_options.be_exec_version; |
| } |
| |
| RuntimeFilterMgr* runtime_filter_mgr() { return _runtime_filter_mgr.get(); } |
| |
| void register_query_statistics(std::shared_ptr<QueryStatistics> qs) { |
| _exec_env->runtime_query_statistics_mgr()->register_query_statistics(print_id(query_id), qs, |
| current_connect_fe); |
| } |
| |
| std::shared_ptr<QueryStatistics> get_query_statistics() { |
| return _exec_env->runtime_query_statistics_mgr()->get_runtime_query_statistics( |
| print_id(query_id)); |
| } |
| |
| void register_memory_statistics() { |
| if (query_mem_tracker) { |
| std::shared_ptr<QueryStatistics> qs = query_mem_tracker->get_query_statistics(); |
| std::string query_id_str = print_id(query_id); |
| if (qs) { |
| if (_exec_env && |
| _exec_env->runtime_query_statistics_mgr()) { // for ut FragmentMgrTest.normal |
| _exec_env->runtime_query_statistics_mgr()->register_query_statistics( |
| query_id_str, qs, current_connect_fe); |
| } |
| } else { |
| LOG(INFO) << " query " << query_id_str << " get memory query statistics failed "; |
| } |
| } |
| } |
| |
| void register_cpu_statistics() { |
| if (!_cpu_statistics) { |
| _cpu_statistics = std::make_shared<QueryStatistics>(); |
| if (_exec_env && |
| _exec_env->runtime_query_statistics_mgr()) { // for ut FragmentMgrTest.normal |
| _exec_env->runtime_query_statistics_mgr()->register_query_statistics( |
| print_id(query_id), _cpu_statistics, current_connect_fe); |
| } |
| } |
| } |
| |
| std::shared_ptr<QueryStatistics> get_cpu_statistics() { return _cpu_statistics; } |
| |
| public: |
| TUniqueId query_id; |
| DescriptorTbl* desc_tbl; |
| bool set_rsc_info = false; |
| std::string user; |
| std::string group; |
| TNetworkAddress coord_addr; |
| TNetworkAddress current_connect_fe; |
| TQueryGlobals query_globals; |
| |
| /// In the current implementation, for multiple fragments executed by a query on the same BE node, |
| /// we store some common components in QueryContext, and save QueryContext in FragmentMgr. |
| /// When all Fragments are executed, QueryContext needs to be deleted from FragmentMgr. |
| /// Here we use a counter to store the number of Fragments that have not yet been completed, |
| /// and after each Fragment is completed, this value will be reduced by one. |
| /// When the last Fragment is completed, the counter is cleared, and the worker thread of the last Fragment |
| /// will clean up QueryContext. |
| std::atomic<int> fragment_num; |
| int timeout_second; |
| ObjectPool obj_pool; |
| // MemTracker that is shared by all fragment instances running on this host. |
| std::shared_ptr<MemTrackerLimiter> query_mem_tracker; |
| |
| std::vector<TUniqueId> fragment_ids; |
| |
| // plan node id -> TFileScanRangeParams |
| // only for file scan node |
| std::map<int, TFileScanRangeParams> file_scan_range_params_map; |
| |
| private: |
| ExecEnv* _exec_env; |
| vectorized::VecDateTimeValue _start_time; |
| |
| // A token used to submit olap scanner to the "_limited_scan_thread_pool", |
| // This thread pool token is created from "_limited_scan_thread_pool" from exec env. |
| // And will be shared by all instances of this query. |
| // So that we can control the max thread that a query can be used to execute. |
| // If this token is not set, the scanner will be executed in "_scan_thread_pool" in exec env. |
| std::unique_ptr<ThreadPoolToken> _thread_token; |
| |
| std::mutex _start_lock; |
| std::condition_variable _start_cond; |
| // Only valid when _need_wait_execution_trigger is set to true in FragmentExecState. |
| // And all fragments of this query will start execution when this is set to true. |
| std::atomic<bool> _ready_to_execute {false}; |
| std::atomic<bool> _is_cancelled {false}; |
| |
| std::shared_ptr<vectorized::SharedHashTableController> _shared_hash_table_controller; |
| std::shared_ptr<vectorized::SharedScannerController> _shared_scanner_controller; |
| vectorized::RuntimePredicate _runtime_predicate; |
| |
| taskgroup::TaskGroupPtr _task_group; |
| std::unique_ptr<RuntimeFilterMgr> _runtime_filter_mgr; |
| const TQueryOptions _query_options; |
| |
| std::shared_ptr<QueryStatistics> _cpu_statistics = nullptr; |
| }; |
| |
| } // namespace doris |