blob: a9d1226b0d06eedb039aa9fe62f5cf894d046726 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
#include "util/debug_util.h"
#include "util/time.h"
namespace doris {
// Registers one QueryStatistics instance under query_id.
//
// The first registration for a query creates its QueryStatisticsCtx and records
// fe_addr as the FE that report_runtime_query_statistics() sends the merged
// statistics to. Later registrations for the same query only append qs_ptr to
// the context's qs_list; the recorded FE address is not changed.
//
// @param query_id  key of the query (taken by value and moved into the map on
//                  first registration).
// @param qs_ptr    statistics object to include in the query's merged view.
// @param fe_addr   FE address used only when the context is created here.
void RuntimeQueryStatiticsMgr::register_query_statistics(std::string query_id,
                                                         std::shared_ptr<QueryStatistics> qs_ptr,
                                                         TNetworkAddress fe_addr) {
    std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
    auto it = _query_statistics_ctx_map.find(query_id);
    if (it == _query_statistics_ctx_map.end()) {
        // Build the context before inserting so a failed allocation never leaves
        // an empty (null) entry behind in the map.
        it = _query_statistics_ctx_map
                     .emplace(std::move(query_id), std::make_unique<QueryStatisticsCtx>(fe_addr))
                     .first;
    }
    // qs_ptr is a by-value sink; moving avoids an extra atomic refcount bump.
    it->second->qs_list.push_back(std::move(qs_ptr));
}
void RuntimeQueryStatiticsMgr::report_runtime_query_statistics() {
int64_t be_id = ExecEnv::GetInstance()->master_info()->backend_id;
// 1 get query statistics map
std::map<TNetworkAddress, std::map<std::string, TQueryStatistics>> fe_qs_map;
std::map<std::string, std::pair<bool, bool>> qs_status; // <finished, timeout>
{
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
int64_t current_time = MonotonicMillis();
int64_t conf_qs_timeout = config::query_statistics_reserve_timeout_ms;
for (auto& [query_id, qs_ctx_ptr] : _query_statistics_ctx_map) {
if (fe_qs_map.find(qs_ctx_ptr->fe_addr) == fe_qs_map.end()) {
std::map<std::string, TQueryStatistics> tmp_map;
fe_qs_map[qs_ctx_ptr->fe_addr] = std::move(tmp_map);
}
QueryStatistics tmp_qs;
for (auto& qs_ptr : qs_ctx_ptr->qs_list) {
tmp_qs.merge(*qs_ptr);
}
TQueryStatistics ret_t_qs;
tmp_qs.to_thrift(&ret_t_qs);
fe_qs_map.at(qs_ctx_ptr->fe_addr)[query_id] = ret_t_qs;
bool is_query_finished = qs_ctx_ptr->is_query_finished;
bool is_timeout_after_finish = false;
if (is_query_finished) {
is_timeout_after_finish =
(current_time - qs_ctx_ptr->query_finish_time) > conf_qs_timeout;
}
qs_status[query_id] = std::make_pair(is_query_finished, is_timeout_after_finish);
}
}
// 2 report query statistics to fe
std::map<TNetworkAddress, bool> rpc_result;
for (auto& [addr, qs_map] : fe_qs_map) {
rpc_result[addr] = false;
// 2.1 get client
Status coord_status;
FrontendServiceConnection coord(ExecEnv::GetInstance()->frontend_client_cache(), addr,
&coord_status);
std::string add_str = PrintThriftNetworkAddress(addr);
if (!coord_status.ok()) {
std::stringstream ss;
LOG(WARNING) << "could not get client " << add_str
<< " when report workload runtime stats, reason is "
<< coord_status.to_string();
continue;
}
// 2.2 send report
TReportWorkloadRuntimeStatusParams report_runtime_params;
report_runtime_params.__set_backend_id(be_id);
report_runtime_params.__set_query_statistics_map(qs_map);
TReportExecStatusParams params;
params.report_workload_runtime_status = report_runtime_params;
TReportExecStatusResult res;
Status rpc_status;
try {
coord->reportExecStatus(res, params);
rpc_result[addr] = true;
} catch (apache::thrift::TApplicationException& e) {
LOG(WARNING) << "fe " << add_str
<< " throw exception when report statistics, reason=" << e.what()
<< " , you can see fe log for details.";
} catch (apache::thrift::transport::TTransportException& e) {
LOG(WARNING) << "report workload runtime statistics to " << add_str
<< " failed, err: " << e.what();
rpc_status = coord.reopen();
if (!rpc_status.ok()) {
LOG(WARNING)
<< "reopen thrift client failed when report workload runtime statistics to"
<< add_str;
} else {
try {
coord->reportExecStatus(res, params);
rpc_result[addr] = true;
} catch (apache::thrift::transport::TTransportException& e2) {
LOG(WARNING) << "retry report workload runtime stats to " << add_str
<< " failed, err: " << e2.what();
}
}
}
}
// 3 when query is finished and (last rpc is send success), remove finished query statistics
if (fe_qs_map.size() == 0) {
return;
}
{
std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
for (auto& [addr, qs_map] : fe_qs_map) {
bool is_rpc_success = rpc_result[addr];
for (auto& [query_id, qs] : qs_map) {
auto& qs_status_pair = qs_status[query_id];
bool is_query_finished = qs_status_pair.first;
bool is_timeout_after_finish = qs_status_pair.second;
if ((is_rpc_success && is_query_finished) || is_timeout_after_finish) {
_query_statistics_ctx_map.erase(query_id);
}
}
}
}
}
// Flags query_id as finished and stamps its finish time with MonotonicMillis().
//
// report_runtime_query_statistics() reads both fields to decide when the query's
// context may be removed. A query can obtain its query context and then fail
// before any node/operator is created; such a query never registered statistics,
// has no context here, and the call does nothing.
void RuntimeQueryStatiticsMgr::set_query_finished(std::string query_id) {
    // Must hold the exclusive (write) lock: this mutates fields that the reporter
    // reads under the same mutex.
    std::lock_guard<std::shared_mutex> write_lock(_qs_ctx_map_lock);
    auto found = _query_statistics_ctx_map.find(query_id);
    if (found == _query_statistics_ctx_map.end()) {
        return;
    }
    auto* ctx = found->second.get();
    ctx->is_query_finished = true;
    ctx->query_finish_time = MonotonicMillis();
}
// Returns a newly allocated QueryStatistics holding the merge of every
// QueryStatistics registered for query_id. Returns nullptr when the query has
// no registered statistics. The result is a separate object built by merge(),
// not one of the registered instances.
std::shared_ptr<QueryStatistics> RuntimeQueryStatiticsMgr::get_runtime_query_statistics(
        std::string query_id) {
    std::shared_lock<std::shared_mutex> read_lock(_qs_ctx_map_lock);
    // Only use members that the standard treats as const for data-race purposes
    // (find) while holding the shared lock. operator[] on an associative container
    // is not one of them, so concurrent readers calling it would formally race.
    auto it = _query_statistics_ctx_map.find(query_id);
    if (it == _query_statistics_ctx_map.end()) {
        return nullptr;
    }
    std::shared_ptr<QueryStatistics> qs_ptr = std::make_shared<QueryStatistics>();
    for (const auto& qs : it->second->qs_list) {
        qs_ptr->merge(*qs);
    }
    return qs_ptr;
}
} // namespace doris