blob: a0e8ceac4bf48726920412045c82f232ad99e006 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/query_context.h"
#include <fmt/core.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/RuntimeProfile_types.h>
#include <gen_cpp/Types_types.h>
#include <glog/logging.h>
#include <algorithm>
#include <exception>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>
#include "common/logging.h"
#include "common/status.h"
#include "olap/olap_common.h"
#include "pipeline/dependency.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/memory/heap_profiler.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/thread_context.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "util/mem_info.h"
#include "util/uid_util.h"
#include "vec/spill/spill_stream_manager.h"
namespace doris {
// Holds a ThreadPoolToken and releases it when this Runnable is destroyed by
// the lazy-release pool. This defers the token's (potentially slow) shutdown
// off the thread that destroys QueryContext; see ~QueryContext() for why
// releasing the token inline there is dangerous.
class DelayReleaseToken : public Runnable {
    ENABLE_FACTORY_CREATOR(DelayReleaseToken);

public:
    // Takes ownership of the token. Uses the member initializer list instead of
    // assignment in the constructor body, and `explicit` to forbid accidental
    // implicit conversion from a token.
    explicit DelayReleaseToken(std::unique_ptr<ThreadPoolToken>&& token)
            : token_(std::move(token)) {}
    ~DelayReleaseToken() override = default;
    // Intentionally a no-op: the work happens when the pool drops the last
    // reference and token_'s destructor runs.
    void run() override {}
    std::unique_ptr<ThreadPoolToken> token_;
};
// Returns the symbolic name of a QuerySource value, for logging/metrics.
// Unrecognized values map to "UNKNOWN".
const std::string toString(QuerySource queryType) {
    switch (queryType) {
    case QuerySource::INTERNAL_FRONTEND:
        return "INTERNAL_FRONTEND";
    case QuerySource::STREAM_LOAD:
        return "STREAM_LOAD";
    case QuerySource::GROUP_COMMIT_LOAD:
        // Fixed: previously returned "EXTERNAL_QUERY", which mislabeled group
        // commit loads in logs and matched no QuerySource enumerator name.
        return "GROUP_COMMIT_LOAD";
    case QuerySource::ROUTINE_LOAD:
        return "ROUTINE_LOAD";
    case QuerySource::EXTERNAL_CONNECTOR:
        return "EXTERNAL_CONNECTOR";
    default:
        return "UNKNOWN";
    }
}
// Builds the per-query task controller. shared_from_this() ensures the
// controller observes the QueryContext through its owning control block
// (the controller stores it weakly, see is_cancelled()/cancel()).
std::unique_ptr<TaskController> QueryContext::QueryTaskController::create(QueryContext* query_ctx) {
    auto owning_ctx = query_ctx->shared_from_this();
    return QueryContext::QueryTaskController::create_unique(std::move(owning_ctx));
}
bool QueryContext::QueryTaskController::is_cancelled() const {
auto query_ctx = query_ctx_.lock();
if (query_ctx == nullptr) {
return true;
}
return query_ctx->is_cancelled();
}
// Forwards a cancellation request to the query context, if it is still alive.
// Returns an error when the context has already been destroyed.
Status QueryContext::QueryTaskController::cancel(const Status& reason, int fragment_id) {
    auto ctx = query_ctx_.lock();
    if (!ctx) {
        return Status::InternalError("QueryContext is destroyed");
    }
    ctx->cancel(reason, fragment_id);
    return Status::OK();
}
// Factory for the query-level memory context; simple delegation to the
// ENABLE_FACTORY_CREATOR-generated create_unique().
std::unique_ptr<MemoryContext> QueryContext::QueryMemoryContext::create() {
    return QueryContext::QueryMemoryContext::create_unique();
}
// Public factory for QueryContext. Construction is split in two steps because
// init_query_task_controller() calls shared_from_this(), which is only valid
// once a shared_ptr already owns the object.
std::shared_ptr<QueryContext> QueryContext::create(TUniqueId query_id, ExecEnv* exec_env,
                                                   const TQueryOptions& query_options,
                                                   TNetworkAddress coord_addr, bool is_nereids,
                                                   TNetworkAddress current_connect_fe,
                                                   QuerySource query_type) {
    auto query_ctx = QueryContext::create_shared(query_id, exec_env, query_options, coord_addr,
                                                 is_nereids, current_connect_fe, query_type);
    query_ctx->init_query_task_controller();
    return query_ctx;
}
// Constructs the query context. Note the ordering: the resource context (and
// its query mem tracker) is created first, and the thread's mem tracker is
// switched to it, so allocations made by the rest of this constructor are
// attributed to the query.
QueryContext::QueryContext(TUniqueId query_id, ExecEnv* exec_env,
                           const TQueryOptions& query_options, TNetworkAddress coord_addr,
                           bool is_nereids, TNetworkAddress current_connect_fe,
                           QuerySource query_source)
        : _timeout_second(-1),
          _query_id(std::move(query_id)),
          _exec_env(exec_env),
          _is_nereids(is_nereids),
          _query_options(query_options),
          _query_source(query_source) {
    _init_resource_context();
    // Attribute all allocations below to this query's mem tracker.
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
    _query_watcher.start();
    _shared_hash_table_controller.reset(new vectorized::SharedHashTableController());
    _execution_dependency = pipeline::QueryGlobalDependency::create_unique("ExecutionDependency");
    // NOTE(review): the `true` argument presumably marks the dependency as
    // initially ready — confirm against QueryGlobalDependency::create_unique.
    _memory_sufficient_dependency =
            pipeline::QueryGlobalDependency::create_unique("MemorySufficientDependency", true);

    // Query-global runtime filter manager, constructed with an empty instance id.
    _runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(
            TUniqueId(), RuntimeFilterParamsContext::create(this), query_mem_tracker(), true);

    _timeout_second = query_options.execution_timeout;

    // Only SELECT / LOAD / EXTERNAL query types are expected here.
    bool is_query_type_valid = query_options.query_type == TQueryType::SELECT ||
                               query_options.query_type == TQueryType::LOAD ||
                               query_options.query_type == TQueryType::EXTERNAL;
    DCHECK_EQ(is_query_type_valid, true);

    this->coord_addr = coord_addr;
    // current_connect_fe is used for report query statistics
    this->current_connect_fe = current_connect_fe;
    // external query has no current_connect_fe
    if (query_options.query_type != TQueryType::EXTERNAL) {
        bool is_report_fe_addr_valid =
                !this->current_connect_fe.hostname.empty() && this->current_connect_fe.port != 0;
        DCHECK_EQ(is_report_fe_addr_valid, true);
    }
    // Monotonic arrival timestamp; paired with the metric decrement in ~QueryContext().
    clock_gettime(CLOCK_MONOTONIC, &this->_query_arrival_timestamp);
    DorisMetrics::instance()->query_ctx_cnt->increment(1);
}
// Creates the query/load mem tracker and wires it into the resource context.
// The effective limit is the session mem_limit, clamped to the process limit;
// pure load tasks always run with the process limit.
void QueryContext::_init_query_mem_tracker() {
    bool has_query_mem_limit = _query_options.__isset.mem_limit && (_query_options.mem_limit > 0);
    int64_t bytes_limit = has_query_mem_limit ? _query_options.mem_limit : -1;
    // Clamp to the process limit; -1 means "no per-query limit was set".
    if (bytes_limit > MemInfo::mem_limit() || bytes_limit == -1) {
        VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
                    << " exceeds process memory limit of "
                    << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
                    << " OR is -1. Using process memory limit instead.";
        bytes_limit = MemInfo::mem_limit();
    }
    // If the query is a pure load task(streamload, routine load, group commit), then it should not use
    // memlimit per query to limit their memory usage.
    if (is_pure_load_task()) {
        bytes_limit = MemInfo::mem_limit();
    }
    // Tracker type and label depend on the query type.
    std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
    if (_query_options.query_type == TQueryType::SELECT) {
        query_mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::QUERY, fmt::format("Query#Id={}", print_id(_query_id)),
                bytes_limit);
    } else if (_query_options.query_type == TQueryType::LOAD) {
        query_mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::LOAD, fmt::format("Load#Id={}", print_id(_query_id)),
                bytes_limit);
    } else if (_query_options.query_type == TQueryType::EXTERNAL) { // spark/flink/etc..
        query_mem_tracker = MemTrackerLimiter::create_shared(
                MemTrackerLimiter::Type::QUERY, fmt::format("External#Id={}", print_id(_query_id)),
                bytes_limit);
    } else {
        // Unreachable: the constructor DCHECKs the query type.
        LOG(FATAL) << "__builtin_unreachable";
        __builtin_unreachable();
    }
    if (_query_options.__isset.is_report_success && _query_options.is_report_success) {
        query_mem_tracker->enable_print_log_usage();
    }

    query_mem_tracker->set_enable_reserve_memory(_query_options.__isset.enable_reserve_memory &&
                                                 _query_options.enable_reserve_memory);
    // Remember the user-visible limit; _adjusted_mem_limit may be tuned later.
    _user_set_mem_limit = bytes_limit;
    _adjusted_mem_limit = bytes_limit;

    _resource_ctx->memory_context()->set_mem_tracker(query_mem_tracker);
}
// Builds the resource context skeleton: the context itself, its memory
// context, then the query mem tracker. Order matters — the tracker is stored
// into the memory context created on the previous line.
void QueryContext::_init_resource_context() {
    _resource_ctx = ResourceContext::create_shared();
    _resource_ctx->set_memory_context(QueryContext::QueryMemoryContext::create());
    _init_query_mem_tracker();
}
// Attaches the task controller to the resource context and registers the
// context with the global statistics manager. Called from QueryContext::create
// (not the constructor) because the controller needs shared_from_this().
void QueryContext::init_query_task_controller() {
    _resource_ctx->set_task_controller(QueryContext::QueryTaskController::create(this));
    _resource_ctx->task_controller()->set_task_id(_query_id);
    _resource_ctx->task_controller()->set_fe_addr(current_connect_fe);
    _resource_ctx->task_controller()->set_query_type(_query_options.query_type);
#ifndef BE_TEST
    // Registration is skipped in unit tests (no full ExecEnv available).
    _exec_env->runtime_query_statistics_mgr()->register_resource_context(print_id(_query_id),
                                                                        _resource_ctx);
#endif
}
// Tears the query down: finishes the task controller, reports the profile if
// requested, defers thread-token release, ends pipeline tracing, then releases
// members in a deliberate order before logging the single end-of-query line.
QueryContext::~QueryContext() {
    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_mem_tracker());
    // query mem tracker consumption is equal to 0, it means that after QueryContext is created,
    // it is found that query already exists in _query_ctx_map, and query mem tracker is not used.
    // query mem tracker consumption is not equal to 0 after use, because there is memory consumed
    // on query mem tracker, released on other trackers.
    std::string mem_tracker_msg;
    if (query_mem_tracker()->peak_consumption() != 0) {
        mem_tracker_msg = fmt::format(
                "deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                "PeakUsed={}",
                print_id(_query_id), MemCounter::print_bytes(query_mem_tracker()->limit()),
                MemCounter::print_bytes(query_mem_tracker()->consumption()),
                MemCounter::print_bytes(query_mem_tracker()->peak_consumption()));
    }
    // Capture the workload group id before the context is torn down; used only
    // by the pipeline tracer below.
    [[maybe_unused]] uint64_t group_id = 0;
    if (workload_group()) {
        group_id = workload_group()->id(); // before remove
    }

    _resource_ctx->task_controller()->finish();

    if (enable_profile()) {
        _report_query_profile();
    }

    // Not release the thread token in query context's destructor method, because the query
    // context may be destructed in the thread token itself. It is very dangerous and may core.
    // And also thread token need shutdown, it may take some time, may cause the thread that
    // release the token hang, the thread maybe a pipeline task scheduler thread.
    if (_thread_token) {
        Status submit_st = ExecEnv::GetInstance()->lazy_release_obj_pool()->submit(
                DelayReleaseToken::create_shared(std::move(_thread_token)));
        if (!submit_st.ok()) {
            LOG(WARNING) << "Failed to release query context thread token, query_id "
                         << print_id(_query_id) << ", error status " << submit_st;
        }
    }
#ifndef BE_TEST
    if (ExecEnv::GetInstance()->pipeline_tracer_context()->enabled()) [[unlikely]] {
        try {
            ExecEnv::GetInstance()->pipeline_tracer_context()->end_query(_query_id, group_id);
        } catch (std::exception& e) {
            LOG(WARNING) << "Dump trace log failed bacause " << e.what();
        }
    }
#endif
    // Release members explicitly while the query mem tracker is still switched in.
    _runtime_filter_mgr.reset();
    _execution_dependency.reset();
    _shared_hash_table_controller.reset();
    _runtime_predicates.clear();
    file_scan_range_params_map.clear();
    obj_pool.clear();
    _merge_controller_handler.reset();
#ifndef BE_TEST
    _exec_env->spill_stream_mgr()->async_cleanup_query(_query_id);
#endif
    DorisMetrics::instance()->query_ctx_cnt->increment(-1);
    // the only one msg shows query's end. any other msg should append to it if need.
    LOG_INFO("Query {} deconstructed, mem_tracker: {}", print_id(this->_query_id), mem_tracker_msg);
}
// Marks the query ready to execute and records `reason` as the execution
// status. A non-OK reason also flags the mem tracker as belonging to a
// cancelled query.
void QueryContext::set_ready_to_execute(Status reason) {
    set_execution_dependency_ready();
    _exec_status.update(reason);
    if (query_mem_tracker() && !reason.ok()) {
        // Inside this branch reason is known to be non-OK, so pass `true`
        // directly instead of re-evaluating !reason.ok() (was redundant).
        query_mem_tracker()->set_is_query_cancelled(true);
    }
}
// Unblocks execution without touching _exec_status (unlike set_ready_to_execute,
// which also records a status).
void QueryContext::set_ready_to_execute_only() {
    set_execution_dependency_ready();
}
// Satisfies the query-global execution dependency so pipeline tasks may run.
void QueryContext::set_execution_dependency_ready() {
    _execution_dependency->set_ready();
}
// Toggles the memory-sufficient dependency. When memory becomes sufficient,
// tasks are woken first and the paused reason is then cleared under the lock;
// when insufficient, tasks are blocked and the pause counter is bumped.
void QueryContext::set_memory_sufficient(bool sufficient) {
    if (!sufficient) {
        _memory_sufficient_dependency->block();
        ++_paused_count;
        return;
    }
    _memory_sufficient_dependency->set_ready();
    std::lock_guard l(_paused_mutex);
    _paused_reason = Status::OK();
}
// Cancels the query with `new_status`. Only the first cancellation takes
// effect; later calls return immediately. On memory-limit cancellations (with
// the dump option enabled) a heap profile is logged before fragments are
// cancelled. `fragment_id` is excluded from the fragment-level cancellation.
void QueryContext::cancel(Status new_status, int fragment_id) {
    if (!_exec_status.update(new_status)) {
        return;
    }

    // Tasks should be always runnable.
    _execution_dependency->set_always_ready();
    _memory_sufficient_dependency->set_always_ready();

    if ((new_status.is<ErrorCode::MEM_LIMIT_EXCEEDED>() ||
         new_status.is<ErrorCode::MEM_ALLOC_FAILED>()) &&
        _query_options.__isset.dump_heap_profile_when_mem_limit_exceeded &&
        _query_options.dump_heap_profile_when_mem_limit_exceeded) {
        // if query is cancelled because of query mem limit exceeded, dump heap profile
        // at the time of cancellation can get the most accurate memory usage for problem analysis
        auto wg = workload_group();
        auto log_str = fmt::format(
                "Query {} canceled because of memory limit exceeded, dumping memory "
                "detail profiles. wg: {}. {}",
                print_id(_query_id), wg ? wg->debug_string() : "null",
                doris::ProcessProfile::instance()->memory_profile()->process_memory_detail_str());
        LOG_LONG_STRING(INFO, log_str);

        std::string dot = HeapProfiler::instance()->dump_heap_profile_to_dot();
        if (!dot.empty()) {
            dot += "\n-------------------------------------------------------\n";
            dot += "Copy the text after `digraph` in the above output to "
                   "http://www.webgraphviz.com to generate a dot graph.\n"
                   "after start heap profiler, if there is no operation, will print `No nodes "
                   "to "
                   "print`."
                   "If there are many errors: `addr2line: Dwarf Error`,"
                   "or other FAQ, reference doc: "
                   "https://doris.apache.org/community/developer-guide/debug-tool/#4-qa\n";
            // Renamed from `log_str` — the original shadowed the outer local of
            // the same name, a bugprone pattern.
            auto dot_log_str =
                    fmt::format("Query {}, dump heap profile to dot: {}", print_id(_query_id), dot);
            LOG_LONG_STRING(INFO, dot_log_str);
        }
    }

    set_ready_to_execute(new_status);
    cancel_all_pipeline_context(new_status, fragment_id);
}
void QueryContext::cancel_all_pipeline_context(const Status& reason, int fragment_id) {
std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_cancel;
{
std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
if (fragment_id == f_id) {
continue;
}
ctx_to_cancel.push_back(f_context);
}
}
for (auto& f_context : ctx_to_cancel) {
if (auto pipeline_ctx = f_context.lock()) {
pipeline_ctx->cancel(reason);
}
}
}
// Formats a one-line summary of every live fragment context for diagnostics.
// Fix: the original read _fragment_id_to_pipeline_ctx.size() for the header
// line BEFORE taking _pipeline_map_write_lock, racing with
// set_pipeline_context(); both the size and the snapshot are now taken under
// the lock, and all formatting happens after it is released.
std::string QueryContext::print_all_pipeline_context() {
    std::vector<std::weak_ptr<pipeline::PipelineFragmentContext>> ctx_to_print;
    size_t ctx_count = 0;
    {
        std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
        ctx_count = _fragment_id_to_pipeline_ctx.size();
        for (auto& [f_id, f_context] : _fragment_id_to_pipeline_ctx) {
            ctx_to_print.push_back(f_context);
        }
    }

    fmt::memory_buffer debug_string_buffer;
    fmt::format_to(debug_string_buffer, "{} pipeline fragment contexts in query {}. \n",
                   ctx_count, print_id(_query_id));
    size_t i = 0;
    for (auto& f_context : ctx_to_print) {
        if (auto pipeline_ctx = f_context.lock()) {
            // Nanoseconds -> seconds for the elapsed time.
            auto elapsed = pipeline_ctx->elapsed_time() / 1000000000.0;
            fmt::format_to(debug_string_buffer,
                           "No.{} (elapse_second={}s, fragment_id={}) : {}\n", i, elapsed,
                           pipeline_ctx->get_fragment_id(), pipeline_ctx->debug_string());
            i++;
        }
    }
    return fmt::to_string(debug_string_buffer);
}
// Registers a fragment's pipeline context under its fragment id. Like the
// map's insert, emplace leaves an existing entry for the same id untouched.
void QueryContext::set_pipeline_context(
        const int fragment_id, std::shared_ptr<pipeline::PipelineFragmentContext> pip_ctx) {
    std::lock_guard<std::mutex> lock(_pipeline_map_write_lock);
    _fragment_id_to_pipeline_ctx.emplace(fragment_id, std::move(pip_ctx));
}
// Returns the workload group's task scheduler when one has been assigned to
// this query, otherwise the global pipeline task scheduler.
doris::pipeline::TaskScheduler* QueryContext::get_pipe_exec_scheduler() {
    if (workload_group() != nullptr && _task_scheduler != nullptr) {
        return _task_scheduler;
    }
    return _exec_env->pipeline_task_scheduler();
}
// Returns the workload-group memtable flush pool, or nullptr when no workload
// group is attached to this query.
ThreadPool* QueryContext::get_memtable_flush_pool() {
    return workload_group() ? _memtable_flush_pool : nullptr;
}
// Binds this query to a workload group and pulls the group's schedulers/pools
// (pipeline exec, local scan, memtable flush, remote scan) into the members
// used by get_pipe_exec_scheduler()/get_memtable_flush_pool().
void QueryContext::set_workload_group(WorkloadGroupPtr& wg) {
    _resource_ctx->set_workload_group(wg);
    // Should add query first, then the workload group will not be deleted.
    // see task_group_manager::delete_workload_group_by_ids
    workload_group()->add_mem_tracker_limiter(query_mem_tracker());
    workload_group()->get_query_scheduler(&_task_scheduler, &_scan_task_scheduler,
                                          &_memtable_flush_pool, &_remote_scan_task_scheduler);
}
// Stores a fragment's pipeline profiles (and optional load channel profile)
// for later reporting. An empty profile list is a caller bug: it is logged,
// DCHECK-failed in debug builds, and ignored in release builds.
void QueryContext::add_fragment_profile(
        int fragment_id, const std::vector<std::shared_ptr<TRuntimeProfileTree>>& pipeline_profiles,
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile) {
    if (pipeline_profiles.empty()) {
        std::string msg = fmt::format("Add pipeline profile failed, query {}, fragment {}",
                                      print_id(this->_query_id), fragment_id);
        LOG_ERROR(msg);
        DCHECK(false) << msg;
        return;
    }

#ifndef NDEBUG
    // Debug-only sanity check: null entries are not allowed in the profile list.
    for (const auto& profile : pipeline_profiles) {
        DCHECK(profile != nullptr)
                << fmt::format("Add pipeline profile failed, query {}, fragment {}",
                               print_id(this->_query_id), fragment_id);
    }
#endif

    std::lock_guard<std::mutex> l(_profile_mutex);
    VLOG_ROW << fmt::format(
            "Query add fragment profile, query {}, fragment {}, pipeline profile count {} ",
            print_id(this->_query_id), fragment_id, pipeline_profiles.size());

    _profile_map.emplace(fragment_id, pipeline_profiles);
    if (load_channel_profile != nullptr) {
        _load_channel_profile_map.emplace(fragment_id, load_channel_profile);
    }
}
// Registers every stored fragment profile (plus its load channel profile, if
// any) with the statistics manager, then triggers a report. Called from the
// destructor when profiling is enabled.
void QueryContext::_report_query_profile() {
    std::lock_guard<std::mutex> lg(_profile_mutex);

    for (auto& [fragment_id, fragment_profile] : _profile_map) {
        // Single lookup with find() — the original did contains() followed by
        // operator[], i.e. two lookups; operator[] would also default-insert a
        // null entry if ever used unguarded on a missing key.
        std::shared_ptr<TRuntimeProfileTree> load_channel_profile = nullptr;
        if (auto it = _load_channel_profile_map.find(fragment_id);
            it != _load_channel_profile_map.end()) {
            load_channel_profile = it->second;
        }

        ExecEnv::GetInstance()->runtime_query_statistics_mgr()->register_fragment_profile(
                _query_id, this->coord_addr, fragment_id, fragment_profile, load_channel_profile);
    }

    ExecEnv::GetInstance()->runtime_query_statistics_mgr()->trigger_report_profile();
}
// Sums the revocable (spillable) bytes across live fragments into
// *revocable_size, reports whether any task is still running via
// *has_running_task, and returns current query memory usage in *memory_usage.
// Stops summing as soon as a running task is seen.
void QueryContext::get_revocable_info(size_t* revocable_size, size_t* memory_usage,
                                      bool* has_running_task) const {
    *revocable_size = 0;
    // Fix: initialize the out-parameter. The original left *has_running_task
    // untouched when the fragment map was empty or every context was already
    // released, so callers could read an uninitialized bool.
    *has_running_task = false;
    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
        auto fragment_ctx = fragment_wptr.lock();
        if (!fragment_ctx) {
            continue;
        }

        *revocable_size += fragment_ctx->get_revocable_size(has_running_task);

        // Should wait for all tasks are not running before revoking memory.
        if (*has_running_task) {
            break;
        }
    }

    *memory_usage = query_mem_tracker()->consumption();
}
// Total revocable (spillable) bytes across all live fragments. A query with
// any task still running reports 0, so revocation waits for quiescence.
size_t QueryContext::get_revocable_size() const {
    size_t total = 0;
    for (const auto& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
        auto fragment_ctx = fragment_wptr.lock();
        if (fragment_ctx == nullptr) {
            continue;
        }

        bool fragment_has_running_task = false;
        total += fragment_ctx->get_revocable_size(&fragment_has_running_task);

        // Should wait for all tasks are not running before revoking memory.
        if (fragment_has_running_task) {
            return 0;
        }
    }
    return total;
}
// Spills memory from this query's tasks. Collects every revocable task, sorts
// them largest-first, chooses tasks until ~20% of current consumption is
// covered, then asks each chosen task to revoke. A SpillContext resumes the
// query (set_memory_sufficient(true)) once all chosen tasks report done.
Status QueryContext::revoke_memory() {
    std::vector<std::pair<size_t, pipeline::PipelineTask*>> tasks;
    // Hold the fragment contexts alive (shared_ptr) while their tasks are used.
    std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> fragments;
    for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
        auto fragment_ctx = fragment_wptr.lock();
        if (!fragment_ctx) {
            continue;
        }

        auto tasks_of_fragment = fragment_ctx->get_revocable_tasks();
        for (auto* task : tasks_of_fragment) {
            tasks.emplace_back(task->get_revocable_size(), task);
        }
        fragments.emplace_back(std::move(fragment_ctx));
    }

    // Largest revocable size first, so fewer tasks need to spill.
    std::sort(tasks.begin(), tasks.end(), [](auto&& l, auto&& r) { return l.first > r.first; });

    // Do not use memlimit, use current memory usage.
    // For example, if current limit is 1.6G, but current used is 1G, if reserve failed
    // should free 200MB memory, not 300MB
    const auto target_revoking_size = (int64_t)(query_mem_tracker()->consumption() * 0.2);
    size_t revoked_size = 0;
    size_t total_revokable_size = 0;

    std::vector<pipeline::PipelineTask*> chosen_tasks;
    for (auto&& [revocable_size, task] : tasks) {
        // Only revoke the largest task to ensure memory is used as much as possible
        // break;
        if (revoked_size < target_revoking_size) {
            chosen_tasks.emplace_back(task);
            revoked_size += revocable_size;
        }
        total_revokable_size += revocable_size;
    }

    // Weak reference only: the spill context must not keep a finished query alive.
    std::weak_ptr<QueryContext> this_ctx = shared_from_this();
    auto spill_context = std::make_shared<pipeline::SpillContext>(
            chosen_tasks.size(), _query_id, [this_ctx](pipeline::SpillContext* context) {
                auto query_context = this_ctx.lock();
                if (!query_context) {
                    return;
                }

                LOG(INFO) << query_context->debug_string() << ", context: " << ((void*)context)
                          << " all spill tasks done, resume it.";
                query_context->set_memory_sufficient(true);
            });

    LOG(INFO) << fmt::format(
            "{}, spill context: {}, revokable mem: {}/{}, tasks count: {}/{}", this->debug_string(),
            ((void*)spill_context.get()), PrettyPrinter::print_bytes(revoked_size),
            PrettyPrinter::print_bytes(total_revokable_size), chosen_tasks.size(), tasks.size());

    for (auto* task : chosen_tasks) {
        RETURN_IF_ERROR(task->revoke_memory(spill_context));
    }
    return Status::OK();
}
// Atomically decrements the count of in-flight spill (memory revoke) tasks;
// the current value is surfaced in debug_string() as RunningSpillTaskCnt.
void QueryContext::decrease_revoking_tasks_count() {
    _revoking_tasks_count.fetch_sub(1);
}
std::vector<pipeline::PipelineTask*> QueryContext::get_revocable_tasks() const {
std::vector<pipeline::PipelineTask*> tasks;
for (auto&& [fragment_id, fragment_wptr] : _fragment_id_to_pipeline_ctx) {
auto fragment_ctx = fragment_wptr.lock();
if (!fragment_ctx) {
continue;
}
auto tasks_of_fragment = fragment_ctx->get_revocable_tasks();
tasks.insert(tasks.end(), tasks_of_fragment.cbegin(), tasks_of_fragment.cend());
}
return tasks;
}
// One-line human-readable summary of the query's memory and spill state.
// _paused_mutex guards _paused_reason, which is read below.
std::string QueryContext::debug_string() {
    std::lock_guard l(_paused_mutex);
    auto tracker = query_mem_tracker();
    const auto used = PrettyPrinter::print(tracker->consumption(), TUnit::BYTES);
    const auto limit = PrettyPrinter::print(tracker->limit(), TUnit::BYTES);
    const auto peak = PrettyPrinter::print(tracker->peak_consumption(), TUnit::BYTES);
    const auto paused_secs = _memory_sufficient_dependency->watcher_elapse_time() / NANOS_PER_SEC;
    return fmt::format(
            "QueryId={}, Memory [Used={}, Limit={}, Peak={}], Spill[RunningSpillTaskCnt={}, "
            "TotalPausedPeriodSecs={}, LatestPausedReason={}]",
            print_id(_query_id), used, limit, peak, _revoking_tasks_count, paused_secs,
            _paused_reason.to_string());
}
// Collects an in-progress (realtime) profile from every live fragment context,
// keyed by fragment id. Fragments whose context is gone are skipped; a
// fragment that yields an empty profile is logged and DCHECK-failed.
std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>>
QueryContext::_collect_realtime_query_profile() const {
    std::unordered_map<int, std::vector<std::shared_ptr<TRuntimeProfileTree>>> res;

    for (const auto& [fragment_id, fragment_ctx_wptr] : _fragment_id_to_pipeline_ctx) {
        if (auto fragment_ctx = fragment_ctx_wptr.lock()) {
            // Fix: removed the original's `fragment_ctx == nullptr` re-check
            // inside this branch — a successful weak_ptr::lock() never yields
            // null, so that branch (and its error log) was dead code.
            auto profile = fragment_ctx->collect_realtime_profile();

            if (profile.empty()) {
                std::string err_msg = fmt::format(
                        "Get nothing when collecting profile, query {}, fragment_id: {}",
                        print_id(_query_id), fragment_id);
                LOG_ERROR(err_msg);
                DCHECK(false) << err_msg;
                continue;
            }

            res.insert(std::make_pair(fragment_id, profile));
        }
    }

    return res;
}
// Builds a TReportExecStatusParams snapshot of the query's in-progress
// profiles (is_done=false) for realtime reporting.
// NOTE(review): _load_channel_profile_map is read here without _profile_mutex
// while add_fragment_profile() writes under it — confirm callers serialize this.
TReportExecStatusParams QueryContext::get_realtime_exec_status() const {
    TReportExecStatusParams exec_status;

    auto realtime_query_profile = _collect_realtime_query_profile();
    std::vector<std::shared_ptr<TRuntimeProfileTree>> load_channel_profiles;

    // Iterate by const reference — the original's by-value range-for copied
    // each map entry per iteration, bumping the shared_ptr refcount needlessly
    // (clang-tidy performance-for-range-copy).
    for (const auto& [fragment_id, profile] : _load_channel_profile_map) {
        if (profile != nullptr) {
            load_channel_profiles.push_back(profile);
        }
    }

    exec_status = RuntimeQueryStatisticsMgr::create_report_exec_status_params(
            this->_query_id, std::move(realtime_query_profile), std::move(load_channel_profiles),
            /*is_done=*/false);

    return exec_status;
}
} // namespace doris