// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "runtime/runtime_query_statistics_mgr.h"

#include <fmt/format.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/RuntimeProfile_types.h>
#include <gen_cpp/Status_types.h>
#include <gen_cpp/Types_types.h>
#include <thrift/TApplicationException.h>
#include <thrift/transport/TTransportException.h>

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <map>
#include <memory>
#include <mutex>
#include <shared_mutex>
#include <sstream>
#include <string>
#include <thread>
#include <tuple>
#include <unordered_map>
#include <utility>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "exec/schema_scanner/schema_scanner_helper.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "util/debug_util.h"
#include "util/threadpool.h"
#include "util/thrift_client.h"
#include "util/time.h"
#include "util/uid_util.h"
#include "vec/core/block.h"

namespace doris {
// TODO: Currently this function is only used to report profiles.
// In the future, all exec status and query statistics should be reported
// through this function.
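// Sends one reportExecStatus RPC carrying `req` to the coordinator FE at `coor_addr` and stores
// the reply in `res`. On a transport error the connection is reopened once and the RPC is
// retried (except in ASAN builds, where the error is returned immediately).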
static Status _do_report_exec_stats_rpc(const TNetworkAddress& coor_addr,
                                        const TReportExecStatusParams& req,
                                        TReportExecStatusResult& res) {
    Status client_status;
    FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
                                         coor_addr, config::thrift_rpc_timeout_ms, &client_status);
    if (!client_status.ok()) {
        LOG_WARNING(
                "Could not get rpc client of {} when reporting profiles, reason is {}, "
                "not reporting, profile will be lost",
                PrintThriftNetworkAddress(coor_addr), client_status.to_string());
        return Status::RpcError("Get rpc client failed");
    }

    VLOG_DEBUG << "Sending profile";

    try {
        try {
            rpc_client->reportExecStatus(res, req);
        } catch (const apache::thrift::transport::TTransportException& e) {
#ifndef ADDRESS_SANITIZER
            LOG_WARNING("Transport exception from {}, reason: {}, reopening",
                        PrintThriftNetworkAddress(coor_addr), e.what());
            client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
            if (!client_status.ok()) {
                LOG_WARNING("Reopen failed, reason: {}", client_status.to_string());
                return Status::RpcError("Open rpc client failed");
            }

            rpc_client->reportExecStatus(res, req);
#else
            return Status::RpcError("Transport exception when reporting query profile, {}",
                                    e.what());
#endif
        }
    } catch (apache::thrift::TApplicationException& e) {
        if (e.getType() == e.UNKNOWN_METHOD) {
            LOG_WARNING(
                    "Failed to report query profile to {} due to {}, usually because the frontend "
                    "is not upgraded, check the version",
                    PrintThriftNetworkAddress(coor_addr), e.what());
        } else {
            LOG_WARNING(
                    "Failed to report query profile to {}, reason: {}, you can see fe log for "
                    "details.",
                    PrintThriftNetworkAddress(coor_addr), e.what());
        }
        return Status::RpcError("Send stats failed");
    } catch (apache::thrift::TException& e) {
        LOG_WARNING("Failed to report query profile to {}, reason: {}",
                    PrintThriftNetworkAddress(coor_addr), e.what());
        std::this_thread::sleep_for(
                std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
        // just reopen to invalidate this connection
        static_cast<void>(rpc_client.reopen(config::thrift_rpc_timeout_ms));
        return Status::RpcError("Transport exception when reporting query profile");
    } catch (std::exception& e) {
        LOG_WARNING(
                "Failed to report query profile to {}, reason: {}, you can see fe log for details.",
                PrintThriftNetworkAddress(coor_addr), e.what());
        return Status::RpcError("Send report query profile failed");
    }

    return Status::OK();
}

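// Reports the collected query profiles to their coordinator FEs. The maps are passed by value
// because this function runs asynchronously on the profile reporting thread pool
// (see trigger_profile_reporting()).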
static void _report_query_profiles_function(
        std::unordered_map<
                TUniqueId,
                std::tuple<
                        TNetworkAddress,
                        std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>>>
                profile_copy,
        std::unordered_map<std::pair<TUniqueId, int32_t>, std::shared_ptr<TRuntimeProfileTree>>
                load_channel_profile_copy) {
    // query_id -> {coordinator_addr, {fragment_id -> std::vector<pipeline_profile>}}
    for (auto& entry : profile_copy) {
        const auto& query_id = entry.first;
        const auto& coor_addr = std::get<0>(entry.second);
        auto& fragment_profile_map = std::get<1>(entry.second);

        if (fragment_profile_map.empty()) {
            auto msg = fmt::format("Query {} does not have profile", print_id(query_id));
            DCHECK(false) << msg;
            LOG_ERROR(msg);
            continue;
        }

        std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;
        for (const auto& load_channel_profile : load_channel_profile_copy) {
            if (load_channel_profile.second == nullptr) {
                auto msg = fmt::format(
                        "Register fragment profile {} {} failed, load channel profile is null",
                        print_id(query_id), -1);
                DCHECK(false) << msg;
                LOG_ERROR(msg);
                continue;
            }

            load_channel_profiles.push_back(load_channel_profile.second);
        }

        TReportExecStatusParams req = RuntimeQueryStatisticsMgr::create_report_exec_status_params(
                query_id, std::move(fragment_profile_map), std::move(load_channel_profiles),
                /*is_done=*/true);
        TReportExecStatusResult res;

        auto rpc_status = _do_report_exec_stats_rpc(coor_addr, req, res);

        if (res.status.status_code != TStatusCode::OK || !rpc_status.ok()) {
            LOG_WARNING("Query {} send profile to {} failed", print_id(query_id),
                        PrintThriftNetworkAddress(coor_addr));
        } else {
            LOG_INFO("Send {} profile succeeded", print_id(query_id));
        }
    }
}

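// Packs the per-fragment pipeline profiles and the load channel profiles of one query into a
// TReportExecStatusParams. The input profile trees are moved from, so the caller must not reuse
// them afterwards.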
TReportExecStatusParams RuntimeQueryStatisticsMgr::create_report_exec_status_params(
        const TUniqueId& query_id,
        std::unordered_map<int32_t, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
                fragment_id_to_profile,
        std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles, bool is_done) {
    // This function will clear the data of fragment_id_to_profile and load_channel_profiles.
    TQueryProfile profile;
    profile.__set_query_id(query_id);

    std::map<int32_t, std::vector<TDetailedReportParams>> fragment_id_to_profile_req;

    for (const auto& entry : fragment_id_to_profile) {
        int32_t fragment_id = entry.first;
        const std::vector<std::shared_ptr<TRuntimeProfileTree>>& fragment_profile = entry.second;
        std::vector<TDetailedReportParams> detailed_params;
        bool is_first = true;
        for (const auto& pipeline_profile : fragment_profile) {
            if (pipeline_profile == nullptr) {
                auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
                                       print_id(query_id), fragment_id);
                DCHECK(false) << msg;
                LOG_ERROR(msg);
                continue;
            }

            TDetailedReportParams tmp;
            THRIFT_MOVE_VALUES(tmp, profile, *pipeline_profile);
            // The first profile of a fragment is the fragment-level profile.
            tmp.__set_is_fragment_level(is_first);
            is_first = false;
            // tmp.fragment_instance_id is not needed for pipeline x
            detailed_params.push_back(std::move(tmp));
        }

        fragment_id_to_profile_req[fragment_id] = std::move(detailed_params);
    }

    if (fragment_id_to_profile_req.empty()) {
        LOG_WARNING("No fragment profile found for query {}", print_id(query_id));
    }

    profile.__set_fragment_id_to_profile(fragment_id_to_profile_req);

    std::vector<TRuntimeProfileTree> load_channel_profiles_req;
    for (const auto& load_channel_profile : load_channel_profiles) {
        if (load_channel_profile == nullptr) {
            auto msg = fmt::format(
                    "Register fragment profile {} {} failed, load channel profile is null",
                    print_id(query_id), -1);
            DCHECK(false) << msg;
            LOG_ERROR(msg);
            continue;
        }

        load_channel_profiles_req.push_back(std::move(*load_channel_profile));
    }

    if (!load_channel_profiles_req.empty()) {
        THRIFT_MOVE_VALUES(profile, load_channel_profiles, load_channel_profiles_req);
    }

    TReportExecStatusParams req;
    THRIFT_MOVE_VALUES(req, query_profile, profile);
    req.__set_backend_id(ExecEnv::GetInstance()->cluster_info()->backend_id);
    // Set an invalid query id to avoid API compatibility issues during upgrade.
    req.__set_query_id(TUniqueId());
    req.__set_done(is_done);

    return req;
}

Status RuntimeQueryStatisticsMgr::start_report_thread() {
    if (started.load()) {
        DCHECK(false) << "report thread has been started";
        LOG_ERROR("report thread has been started");
        return Status::InternalError("Report thread has been started");
    }

    started.store(true);
    ThreadPoolBuilder profile_report_thread_pool_builder("ReportProfileThreadPool");

    return profile_report_thread_pool_builder.set_max_threads(config::report_exec_status_thread_num)
            .build(&_thread_pool);
}

// 1. lock the profile_map.
// 2. swap the profile_map and load_channel_profile_map into local variables.
// 3. unlock the profile_map.
// 4. create a profile reporting task and add it to the thread pool.
void RuntimeQueryStatisticsMgr::trigger_profile_reporting() {
    decltype(_profile_map) profile_copy;
    decltype(_load_channel_profile_map) load_channel_profile_copy;

    {
        std::unique_lock<std::mutex> lg(_profile_map_lock);
        _profile_map.swap(profile_copy);
        _load_channel_profile_map.swap(load_channel_profile_copy);
    }

    // ATTN: The local maps are captured by value so that the data stays alive until the
    // reporting task has finished running on the thread pool.
    auto st = _thread_pool->submit_func([profile_copy, load_channel_profile_copy]() {
        _report_query_profiles_function(profile_copy, load_channel_profile_copy);
    });

    if (!st.ok()) {
        LOG_WARNING("Failed to submit profile reporting task, reason: {}", st.to_string());
        // If the thread pool is full, the profile is not reported and will be lost.
        return;
    }
}

void RuntimeQueryStatisticsMgr::stop_report_thread() {
    if (!started) {
        return;
    }

    LOG_INFO("All report threads are going to stop");
    _thread_pool->shutdown();
    LOG_INFO("All report threads stopped");
}

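// Buffers the pipeline profiles of one fragment (and its optional load channel profile) for
// `query_id` until trigger_profile_reporting() ships them to the coordinator FE. Null profiles
// are rejected.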
void RuntimeQueryStatisticsMgr::register_fragment_profile(
        const TUniqueId& query_id, const TNetworkAddress& coor_addr, int32_t fragment_id,
        std::vector<std::shared_ptr<TRuntimeProfileTree>> p_profiles,
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
    for (const auto& p : p_profiles) {
        if (p == nullptr) {
            auto msg = fmt::format("Register fragment profile {} {} failed, profile is null",
                                   print_id(query_id), fragment_id);
            DCHECK(false) << msg;
            LOG_ERROR(msg);
            return;
        }
    }

    std::unique_lock<std::mutex> lg(_profile_map_lock);

    if (!_profile_map.contains(query_id)) {
        _profile_map[query_id] = std::make_tuple(
                coor_addr,
                std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>());
    }

    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>&
            fragment_profile_map = std::get<1>(_profile_map[query_id]);
    fragment_profile_map.insert(std::make_pair(fragment_id, p_profiles));

    if (load_channel_profile != nullptr) {
        _load_channel_profile_map[std::make_pair(query_id, fragment_id)] = load_channel_profile;
    }

    LOG_INFO("Register profile done for query {}, fragment {}, profiles {}", print_id(query_id),
             fragment_id, p_profiles.size());
}

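// Tracks the ResourceContext of a running query or load task so that its statistics can later be
// reported to FE by report_runtime_query_statistics() and exposed through
// get_active_be_tasks_block().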
void RuntimeQueryStatisticsMgr::register_resource_context(
        std::string query_id, std::shared_ptr<ResourceContext> resource_ctx) {
    std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
    // Note: `group_commit_insert` submits multiple load tasks in sequence with the same
    // `query_id`. If the next load task starts before the QueryStatistics of the previous one
    // has been reported to FE, `register_resource_context` will find that the `query_id`
    // already exists in `_resource_contexts_map`, and overwriting the stored `resource_ctx`
    // here means the statistics of the previous load task will not be reported to FE.
    // DCHECK(_resource_contexts_map.find(query_id) == _resource_contexts_map.end());
    _resource_contexts_map[query_id] = resource_ctx;
}

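// Reports the runtime statistics of all tracked (non-external) tasks to their FEs:
// 1. snapshot the statistics under the lock, 2. send one reportExecStatus RPC per FE,
// 3. drop the entries of finished tasks that were reported successfully or whose retention
// timed out. Presumably invoked periodically by a background daemon thread; the call interval
// is configured outside this file.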
void RuntimeQueryStatisticsMgr::report_runtime_query_statistics() {
    int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;
    // 1. collect the query statistics map
    // <fe_addr, <query_id, <query_statistics, is_query_finished>>>
    std::map<TNetworkAddress, std::map<std::string, std::pair<TQueryStatistics, bool>>> fe_qs_map;
    std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout>
    {
        std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
        int64_t current_time = MonotonicMillis();
        int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;

        for (auto iter = _resource_contexts_map.begin(); iter != _resource_contexts_map.end();) {
            std::string query_id = iter->first;
            auto resource_ctx = iter->second;
            bool is_query_finished = resource_ctx->task_controller()->is_finished();
            bool is_timeout_after_finish = false;
            if (is_query_finished) {
                is_timeout_after_finish =
                        (current_time - resource_ctx->task_controller()->finish_time()) >
                        conf_qs_timeout;
            }

            // External queries do not need to be reported to FE, so remove them directly.
            if (resource_ctx->task_controller()->query_type() == TQueryType::EXTERNAL &&
                is_query_finished) {
                iter = _resource_contexts_map.erase(iter);
            } else {
                if (resource_ctx->task_controller()->query_type() != TQueryType::EXTERNAL) {
                    if (fe_qs_map.find(resource_ctx->task_controller()->fe_addr()) ==
                        fe_qs_map.end()) {
                        std::map<std::string, std::pair<TQueryStatistics, bool>> tmp_map;
                        fe_qs_map[resource_ctx->task_controller()->fe_addr()] = std::move(tmp_map);
                    }

                    TQueryStatistics ret_t_qs;
                    resource_ctx->to_thrift_query_statistics(&ret_t_qs);
                    fe_qs_map.at(resource_ctx->task_controller()->fe_addr())[query_id] =
                            std::make_pair(ret_t_qs, is_query_finished);
                    qs_status[query_id] =
                            std::make_pair(is_query_finished, is_timeout_after_finish);
                }

                iter++;
            }
        }
    }

    // 2. report query statistics to each FE
    std::map<TNetworkAddress, bool> rpc_result;
    for (auto& [addr, qs_map] : fe_qs_map) {
        rpc_result[addr] = false;
        // 2.1 get client
        Status coord_status;
        FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
                                        config::thrift_rpc_timeout_ms, &coord_status);
        std::string add_str = PrintThriftNetworkAddress(addr);
        if (!coord_status.ok()) {
            LOG(WARNING) << "[report_query_statistics]could not get client " << add_str
                         << " when reporting workload runtime stats, reason:"
                         << coord_status.to_string();
            continue;
        }

        auto reopen_coord = [&coord]() -> Status {
            std::this_thread::sleep_for(
                    std::chrono::milliseconds(config::thrift_client_retry_interval_ms * 2));
            // just reopen to invalidate this connection
            return coord.reopen(config::thrift_rpc_timeout_ms);
        };

        // 2.2 send report
        TReportWorkloadRuntimeStatusParams report_runtime_params;
        report_runtime_params.__set_backend_id(be_id);

        // Build the query statistics map with TQueryStatisticsResult
        std::map<std::string, TQueryStatisticsResult> query_stats_result_map;
        for (const auto& [query_id, query_stats_pair] : qs_map) {
            TQueryStatisticsResult stats_result;
            stats_result.__set_statistics(query_stats_pair.first);      // TQueryStatistics
            stats_result.__set_query_finished(query_stats_pair.second); // is_query_finished
            query_stats_result_map[query_id] = stats_result;
        }

        report_runtime_params.__set_query_statistics_result_map(query_stats_result_map);

        TReportExecStatusParams params;
        params.__set_report_workload_runtime_status(report_runtime_params);

        TReportExecStatusResult res;
        Status rpc_status;

        try {
            try {
                coord->reportExecStatus(res, params);
                rpc_result[addr] = true;
            } catch (apache::thrift::transport::TTransportException& e) {
                rpc_status = reopen_coord();
#ifndef ADDRESS_SANITIZER
                LOG_WARNING(
                        "[report_query_statistics] report to fe {} failed, reason:{}, try reopen.",
                        add_str, e.what());
#else
                std::cerr << "thrift error, reason=" << e.what();
#endif
                if (rpc_status.ok()) {
                    coord->reportExecStatus(res, params);
                    rpc_result[addr] = true;
                }
            }
        } catch (apache::thrift::TApplicationException& e) {
            LOG_WARNING(
                    "[report_query_statistics]fe {} threw an exception when reporting statistics, "
                    "reason:{}, you can see fe log for details.",
                    add_str, e.what());
            rpc_status = reopen_coord();
        } catch (apache::thrift::TException& e) {
            LOG_WARNING(
                    "[report_query_statistics]report workload runtime statistics to {} failed, "
                    "reason: {}",
                    add_str, e.what());
            rpc_status = reopen_coord();
        } catch (std::exception& e) {
            LOG_WARNING(
                    "[report_query_statistics]unknown exception when reporting workload runtime "
                    "statistics to {}, reason:{}",
                    add_str, e.what());
        }

        if (!rpc_status.ok()) {
            LOG_WARNING(
                    "[report_query_statistics]reopen thrift client failed when reporting "
                    "workload runtime statistics to {}, reason: {}",
                    add_str, rpc_status.to_string());
        }
    }

    // 3. when a query is finished and the last rpc was sent successfully, or the retention
    //    timeout after finish has passed, remove its query statistics
    if (fe_qs_map.empty()) {
        return;
    }

    {
        std::lock_guard<std::shared_mutex> write_lock(_resource_contexts_map_lock);
        for (auto& [addr, qs_map] : fe_qs_map) {
            bool is_rpc_success = rpc_result[addr];
            for (auto& [query_id, qs] : qs_map) {
                auto& qs_status_pair = qs_status[query_id];
                bool is_query_finished = qs_status_pair.first;
                bool is_timeout_after_finish = qs_status_pair.second;
                if ((is_rpc_success && is_query_finished) || is_timeout_after_finish) {
                    _resource_contexts_map.erase(query_id);
                }
            }
        }
    }
}

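// Appends one row per tracked task to `block`, following the column order defined by
// SchemaBackendActiveTasksScanner::_s_tbls_columns (backend id, FE host, workload group id,
// query id, runtime, CPU time, scan/memory/shuffle/spill counters and the query type).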
void RuntimeQueryStatisticsMgr::get_active_be_tasks_block(vectorized::Block* block) {
    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
    int64_t be_id = ExecEnv::GetInstance()->cluster_info()->backend_id;

    // The block's schema comes from SchemaBackendActiveTasksScanner::_s_tbls_columns.
    for (auto& [query_id, resource_ctx] : _resource_contexts_map) {
        TQueryStatistics tqs;
        resource_ctx->to_thrift_query_statistics(&tqs);
        SchemaScannerHelper::insert_int64_value(0, be_id, block);
        SchemaScannerHelper::insert_string_value(
                1, resource_ctx->task_controller()->fe_addr().hostname, block);
        auto wg = resource_ctx->workload_group();
        SchemaScannerHelper::insert_int64_value(2, wg ? wg->id() : -1, block);
        SchemaScannerHelper::insert_string_value(3, query_id, block);

        int64_t task_time =
                resource_ctx->task_controller()->is_finished()
                        ? resource_ctx->task_controller()->finish_time() -
                                  resource_ctx->task_controller()->start_time()
                        : MonotonicMillis() - resource_ctx->task_controller()->start_time();
        SchemaScannerHelper::insert_int64_value(4, task_time, block);
        SchemaScannerHelper::insert_int64_value(5, tqs.cpu_ms, block);
        SchemaScannerHelper::insert_int64_value(6, tqs.scan_rows, block);
        SchemaScannerHelper::insert_int64_value(7, tqs.scan_bytes, block);
        SchemaScannerHelper::insert_int64_value(8, tqs.max_peak_memory_bytes, block);
        SchemaScannerHelper::insert_int64_value(9, tqs.current_used_memory_bytes, block);
        SchemaScannerHelper::insert_int64_value(10, tqs.shuffle_send_bytes, block);
        SchemaScannerHelper::insert_int64_value(11, tqs.shuffle_send_rows, block);

        std::stringstream ss;
        ss << resource_ctx->task_controller()->query_type();
        SchemaScannerHelper::insert_string_value(12, ss.str(), block);
        SchemaScannerHelper::insert_int64_value(13, tqs.spill_write_bytes_to_local_storage, block);
        SchemaScannerHelper::insert_int64_value(14, tqs.spill_read_bytes_from_local_storage, block);
    }
}

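// Returns the current TQueryStatistics snapshot of a tracked query, or InternalError if the
// query id is not registered.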
Status RuntimeQueryStatisticsMgr::get_query_statistics(const std::string& query_id,
                                                       TQueryStatistics* query_stats) {
    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);

    auto resource_ctx = _resource_contexts_map.find(query_id);
    if (resource_ctx == _resource_contexts_map.end()) {
        return Status::InternalError("failed to find query with id {}", query_id);
    }

    resource_ctx->second->to_thrift_query_statistics(query_stats);
    return Status::OK();
}

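// Collects the ResourceContext of every tracked task into `resource_ctxs`.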
void RuntimeQueryStatisticsMgr::get_tasks_resource_context(
        std::vector<std::shared_ptr<ResourceContext>>& resource_ctxs) {
    std::shared_lock<std::shared_mutex> read_lock(_resource_contexts_map_lock);
    for (auto& iter : _resource_contexts_map) {
        resource_ctxs.push_back(iter.second);
    }
}

} // namespace doris