// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "runtime/fragment_mgr.h"
#include <brpc/controller.h>
#include <bvar/latency_recorder.h>
#include <fmt/format.h>
#include <gen_cpp/DorisExternalService_types.h>
#include <gen_cpp/FrontendService.h>
#include <gen_cpp/FrontendService_types.h>
#include <gen_cpp/HeartbeatService_types.h>
#include <gen_cpp/Metrics_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Planner_types.h>
#include <gen_cpp/QueryPlanExtra_types.h>
#include <gen_cpp/RuntimeProfile_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <pthread.h>
#include <sys/time.h>
#include <thrift/TApplicationException.h>
#include <thrift/Thrift.h>
#include <thrift/protocol/TDebugProtocol.h>
#include <thrift/transport/TTransportException.h>
#include <unistd.h>
#include <algorithm>
#include <cstddef>
#include <ctime>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <cstdint>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include "common/config.h"
#include "common/exception.h"
#include "common/logging.h"
#include "common/object_pool.h"
#include "common/status.h"
#include "common/utils.h"
#include "io/fs/stream_load_pipe.h"
#include "pipeline/pipeline_fragment_context.h"
#include "runtime/client_cache.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "runtime/frontend_info.h"
#include "runtime/primitive_type.h"
#include "runtime/query_context.h"
#include "runtime/runtime_query_statistics_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/stream_load/stream_load_executor.h"
#include "runtime/thread_context.h"
#include "runtime/types.h"
#include "runtime/workload_group/workload_group.h"
#include "runtime/workload_group/workload_group_manager.h"
#include "runtime_filter/runtime_filter_consumer.h"
#include "runtime_filter/runtime_filter_mgr.h"
#include "service/backend_options.h"
#include "util/brpc_client_cache.h"
#include "util/debug_points.h"
#include "util/debug_util.h"
#include "util/doris_metrics.h"
#include "util/network_util.h"
#include "util/runtime_profile.h"
#include "util/thread.h"
#include "util/threadpool.h"
#include "util/thrift_util.h"
#include "util/uid_util.h"
namespace doris {
#include "common/compile_check_begin.h"
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_instance_count, MetricUnit::NOUNIT);
DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT);
bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare");
bvar::Adder<uint64_t> g_fragment_executing_count("fragment_executing_count");
bvar::Status<uint64_t> g_fragment_last_active_time(
"fragment_last_active_time", duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count());
uint64_t get_fragment_executing_count() {
return g_fragment_executing_count.get_value();
}
uint64_t get_fragment_last_active_time() {
return g_fragment_last_active_time.get_value();
}
std::string to_load_error_http_path(const std::string& file_name) {
if (file_name.empty()) {
return "";
}
if (file_name.compare(0, 4, "http") == 0) {
return file_name;
}
std::stringstream url;
url << "http://" << get_host_port(BackendOptions::get_localhost(), config::webserver_port)
<< "/api/_load_error_log?"
<< "file=" << file_name;
return url.str();
}
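// Illustrative sketch (hypothetical values): on a BE whose webserver listens on
// 127.0.0.1:8040, to_load_error_http_path("error_log_123") yields
//   http://127.0.0.1:8040/api/_load_error_log?file=error_log_123
// while a file_name that already starts with "http" is returned unchanged.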
using apache::thrift::TException;
using apache::thrift::transport::TTransportException;
static Status _do_fetch_running_queries_rpc(const FrontendInfo& fe_info,
std::unordered_set<TUniqueId>& query_set) {
TFetchRunningQueriesResult rpc_result;
TFetchRunningQueriesRequest rpc_request;
Status client_status;
const int32_t timeout_ms = 3 * 1000;
FrontendServiceConnection rpc_client(ExecEnv::GetInstance()->frontend_client_cache(),
fe_info.info.coordinator_address, timeout_ms,
&client_status);
    // Failed to get a client; abort the fetch for this FE.
if (!client_status.ok()) {
LOG_WARNING("Failed to get client for {}, reason is {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
client_status.to_string());
return Status::InternalError("Failed to get client for {}, reason is {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
client_status.to_string());
}
// do rpc
try {
try {
rpc_client->fetchRunningQueries(rpc_result, rpc_request);
} catch (const apache::thrift::transport::TTransportException& e) {
LOG_WARNING("Transport exception reason: {}, reopening", e.what());
client_status = rpc_client.reopen(config::thrift_rpc_timeout_ms);
if (!client_status.ok()) {
LOG_WARNING("Reopen failed, reason: {}", client_status.to_string_no_stack());
return Status::InternalError("Reopen failed, reason: {}",
client_status.to_string_no_stack());
}
rpc_client->fetchRunningQueries(rpc_result, rpc_request);
}
} catch (apache::thrift::TException& e) {
        // This may happen while the cluster is upgrading or on any other network error.
LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address), e.what());
return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
e.what());
}
    // Guard against logic errors in the frontend.
if (!rpc_result.__isset.status || rpc_result.status.status_code != TStatusCode::OK) {
LOG_WARNING("Failed to fetch running queries from {}, reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
doris::to_string(rpc_result.status.status_code));
return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
doris::to_string(rpc_result.status.status_code));
}
if (!rpc_result.__isset.running_queries) {
return Status::InternalError("Failed to fetch running queries from {}, reason: {}",
PrintThriftNetworkAddress(fe_info.info.coordinator_address),
"running_queries is not set");
}
query_set = std::unordered_set<TUniqueId>(rpc_result.running_queries.begin(),
rpc_result.running_queries.end());
return Status::OK();
};
static std::map<int64_t, std::unordered_set<TUniqueId>> _get_all_running_queries_from_fe() {
const std::map<TNetworkAddress, FrontendInfo>& running_fes =
ExecEnv::GetInstance()->get_running_frontends();
std::map<int64_t, std::unordered_set<TUniqueId>> result;
std::vector<FrontendInfo> qualified_fes;
for (const auto& fe : running_fes) {
        // Only consider normal frontends. If any frontend looks abnormal,
        // return an empty result so the cancel worker does nothing.
if (fe.first.port != 0 && fe.second.info.process_uuid != 0) {
qualified_fes.push_back(fe.second);
} else {
return {};
}
}
for (const auto& fe_addr : qualified_fes) {
const int64_t process_uuid = fe_addr.info.process_uuid;
std::unordered_set<TUniqueId> query_set;
Status st = _do_fetch_running_queries_rpc(fe_addr, query_set);
if (!st.ok()) {
            // Return an empty result; the cancel worker will not do anything.
return {};
}
        // frontend_info and process_uuid have been checked in the rpc threads.
result[process_uuid] = query_set;
}
return result;
}
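// The two get_map_id overloads below pick a ConcurrentContextMap partition by
// hashing both 64-bit halves of the query id. A minimal sketch of the property
// they provide (hypothetical id values; HashUtil::hash is deterministic, so a
// given id always lands in the same partition):
//   TUniqueId id; // id.hi = 1, id.lo = 2
//   uint32_t p1 = get_map_id(id, 128);
//   uint32_t p2 = get_map_id(id, 128); // p1 == p2 on every call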
inline uint32_t get_map_id(const TUniqueId& query_id, size_t capacity) {
uint32_t value = HashUtil::hash(&query_id.lo, 8, 0);
value = HashUtil::hash(&query_id.hi, 8, value);
return value % capacity;
}
inline uint32_t get_map_id(std::pair<TUniqueId, int> key, size_t capacity) {
uint32_t value = HashUtil::hash(&key.first.lo, 8, 0);
value = HashUtil::hash(&key.first.hi, 8, value);
return value % capacity;
}
template <typename Key, typename Value, typename ValueType>
ConcurrentContextMap<Key, Value, ValueType>::ConcurrentContextMap() {
_internal_map.resize(config::num_query_ctx_map_partitions);
for (size_t i = 0; i < config::num_query_ctx_map_partitions; i++) {
_internal_map[i] = {std::make_unique<std::shared_mutex>(),
phmap::flat_hash_map<Key, Value>()};
}
}
template <typename Key, typename Value, typename ValueType>
Value ConcurrentContextMap<Key, Value, ValueType>::find(const Key& query_id) {
auto id = get_map_id(query_id, _internal_map.size());
{
std::shared_lock lock(*_internal_map[id].first);
auto& map = _internal_map[id].second;
auto search = map.find(query_id);
if (search != map.end()) {
return search->second;
}
return std::shared_ptr<ValueType>(nullptr);
}
}
template <typename Key, typename Value, typename ValueType>
Status ConcurrentContextMap<Key, Value, ValueType>::apply_if_not_exists(
const Key& query_id, std::shared_ptr<ValueType>& query_ctx, ApplyFunction&& function) {
auto id = get_map_id(query_id, _internal_map.size());
{
std::unique_lock lock(*_internal_map[id].first);
auto& map = _internal_map[id].second;
auto search = map.find(query_id);
if (search != map.end()) {
query_ctx = search->second.lock();
}
if (!query_ctx) {
return function(map);
}
return Status::OK();
}
}
template <typename Key, typename Value, typename ValueType>
void ConcurrentContextMap<Key, Value, ValueType>::erase(const Key& query_id) {
auto id = get_map_id(query_id, _internal_map.size());
{
std::unique_lock lock(*_internal_map[id].first);
auto& map = _internal_map[id].second;
map.erase(query_id);
}
}
template <typename Key, typename Value, typename ValueType>
void ConcurrentContextMap<Key, Value, ValueType>::insert(const Key& query_id,
std::shared_ptr<ValueType> query_ctx) {
auto id = get_map_id(query_id, _internal_map.size());
{
std::unique_lock lock(*_internal_map[id].first);
auto& map = _internal_map[id].second;
map.insert({query_id, query_ctx});
}
}
template <typename Key, typename Value, typename ValueType>
void ConcurrentContextMap<Key, Value, ValueType>::clear() {
for (auto& pair : _internal_map) {
std::unique_lock lock(*pair.first);
auto& map = pair.second;
map.clear();
}
}
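// Usage sketch for ConcurrentContextMap (illustrative only, not a production
// code path): every operation locks only the shard that owns the key, so
// lookups of keys on different shards do not contend.
//   ConcurrentContextMap<TUniqueId, std::weak_ptr<QueryContext>, QueryContext> ctx_map;
//   ctx_map.insert(query_id, query_ctx); // unique_lock on one shard
//   if (auto ctx = ctx_map.find(query_id).lock()) {
//       // use ctx while holding a strong reference
//   }
//   ctx_map.erase(query_id); // unique_lock on the same shard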
FragmentMgr::FragmentMgr(ExecEnv* exec_env)
: _exec_env(exec_env), _stop_background_threads_latch(1) {
_entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr");
INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count);
auto s = Thread::create(
"FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); },
&_cancel_thread);
CHECK(s.ok()) << s.to_string();
s = ThreadPoolBuilder("FragmentMgrAsyncWorkThreadPool")
.set_min_threads(config::fragment_mgr_async_work_pool_thread_num_min)
.set_max_threads(config::fragment_mgr_async_work_pool_thread_num_max)
.set_max_queue_size(config::fragment_mgr_async_work_pool_queue_size)
.build(&_thread_pool);
CHECK(s.ok()) << s.to_string();
}
FragmentMgr::~FragmentMgr() = default;
void FragmentMgr::stop() {
DEREGISTER_HOOK_METRIC(fragment_instance_count);
_stop_background_threads_latch.count_down();
if (_cancel_thread) {
_cancel_thread->join();
}
_thread_pool->shutdown();
    // Only this thread clears these maps.
_query_ctx_map.clear();
_pipeline_map.clear();
}
std::string FragmentMgr::to_http_path(const std::string& file_name) {
std::stringstream url;
url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port
<< "/api/_download_load?"
<< "token=" << _exec_env->token() << "&file=" << file_name;
return url.str();
}
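// Illustrative sketch (hypothetical values): with webserver_port 8040 and an
// exec env token "abc", to_http_path("err.log") produces
//   http://127.0.0.1:8040/api/_download_load?token=abc&file=err.log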
Status FragmentMgr::trigger_pipeline_context_report(
const ReportStatusRequest req, std::shared_ptr<pipeline::PipelineFragmentContext>&& ctx) {
return _thread_pool->submit_func([this, req, ctx]() {
SCOPED_ATTACH_TASK(ctx->get_query_ctx()->query_mem_tracker());
coordinator_callback(req);
if (!req.done) {
ctx->refresh_next_report_time();
}
});
}
// There can only be one of these callbacks in-flight at any moment, because
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution status,
// including the final status when execution finishes.
void FragmentMgr::coordinator_callback(const ReportStatusRequest& req) {
DCHECK(req.status.ok() || req.done); // if !status.ok() => done
if (req.coord_addr.hostname == "external") {
        // External queries (Flink/Spark reading tablets) do not need to report to the FE.
return;
}
int callback_retries = 10;
const int sleep_ms = 1000;
Status exec_status = req.status;
Status coord_status;
std::unique_ptr<FrontendServiceConnection> coord = nullptr;
do {
coord = std::make_unique<FrontendServiceConnection>(_exec_env->frontend_client_cache(),
req.coord_addr, &coord_status);
if (!coord_status.ok()) {
std::this_thread::sleep_for(std::chrono::milliseconds(sleep_ms));
}
} while (!coord_status.ok() && callback_retries-- > 0);
if (!coord_status.ok()) {
std::stringstream ss;
UniqueId uid(req.query_id.hi, req.query_id.lo);
static_cast<void>(req.cancel_fn(Status::InternalError(
"query_id: {}, couldn't get a client for {}, reason is {}", uid.to_string(),
PrintThriftNetworkAddress(req.coord_addr), coord_status.to_string())));
return;
}
TReportExecStatusParams params;
params.protocol_version = FrontendServiceVersion::V1;
params.__set_query_id(req.query_id);
params.__set_backend_num(req.backend_num);
params.__set_fragment_instance_id(req.fragment_instance_id);
params.__set_fragment_id(req.fragment_id);
params.__set_status(exec_status.to_thrift());
params.__set_done(req.done);
params.__set_query_type(req.runtime_state->query_type());
params.__isset.profile = false;
DCHECK(req.runtime_state != nullptr);
if (req.runtime_state->query_type() == TQueryType::LOAD) {
params.__set_loaded_rows(req.runtime_state->num_rows_load_total());
params.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
} else {
DCHECK(!req.runtime_states.empty());
if (!req.runtime_state->output_files().empty()) {
params.__isset.delta_urls = true;
for (auto& it : req.runtime_state->output_files()) {
params.delta_urls.push_back(to_http_path(it));
}
}
if (!params.delta_urls.empty()) {
params.__isset.delta_urls = true;
}
}
// load rows
static std::string s_dpp_normal_all = "dpp.norm.ALL";
static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
static std::string s_unselected_rows = "unselected.rows";
int64_t num_rows_load_success = 0;
int64_t num_rows_load_filtered = 0;
int64_t num_rows_load_unselected = 0;
if (req.runtime_state->num_rows_load_total() > 0 ||
req.runtime_state->num_rows_load_filtered() > 0 ||
req.runtime_state->num_finished_range() > 0) {
params.__isset.load_counters = true;
num_rows_load_success = req.runtime_state->num_rows_load_success();
num_rows_load_filtered = req.runtime_state->num_rows_load_filtered();
num_rows_load_unselected = req.runtime_state->num_rows_load_unselected();
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(req.runtime_state->fragment_instance_id());
t.__set_num_finished_range(cast_set<int>(req.runtime_state->num_finished_range()));
t.__set_loaded_rows(req.runtime_state->num_rows_load_total());
t.__set_loaded_bytes(req.runtime_state->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (rs->num_rows_load_total() > 0 || rs->num_rows_load_filtered() > 0 ||
rs->num_finished_range() > 0) {
params.__isset.load_counters = true;
num_rows_load_success += rs->num_rows_load_success();
num_rows_load_filtered += rs->num_rows_load_filtered();
num_rows_load_unselected += rs->num_rows_load_unselected();
params.__isset.fragment_instance_reports = true;
TFragmentInstanceReport t;
t.__set_fragment_instance_id(rs->fragment_instance_id());
t.__set_num_finished_range(cast_set<int>(rs->num_finished_range()));
t.__set_loaded_rows(rs->num_rows_load_total());
t.__set_loaded_bytes(rs->num_bytes_load_total());
params.fragment_instance_reports.push_back(t);
}
}
}
params.load_counters.emplace(s_dpp_normal_all, std::to_string(num_rows_load_success));
params.load_counters.emplace(s_dpp_abnormal_all, std::to_string(num_rows_load_filtered));
params.load_counters.emplace(s_unselected_rows, std::to_string(num_rows_load_unselected));
if (!req.load_error_url.empty()) {
params.__set_tracking_url(req.load_error_url);
}
if (!req.first_error_msg.empty()) {
params.__set_first_error_msg(req.first_error_msg);
}
for (auto* rs : req.runtime_states) {
if (rs->wal_id() > 0) {
params.__set_txn_id(rs->wal_id());
params.__set_label(rs->import_label());
}
}
if (!req.runtime_state->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files = req.runtime_state->export_output_files();
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (!rs->export_output_files().empty()) {
params.__isset.export_files = true;
params.export_files.insert(params.export_files.end(),
rs->export_output_files().begin(),
rs->export_output_files().end());
}
}
}
if (auto tci = req.runtime_state->tablet_commit_infos(); !tci.empty()) {
params.__isset.commitInfos = true;
params.commitInfos.insert(params.commitInfos.end(), tci.begin(), tci.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (auto rs_tci = rs->tablet_commit_infos(); !rs_tci.empty()) {
params.__isset.commitInfos = true;
params.commitInfos.insert(params.commitInfos.end(), rs_tci.begin(), rs_tci.end());
}
}
}
if (auto eti = req.runtime_state->error_tablet_infos(); !eti.empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.insert(params.errorTabletInfos.end(), eti.begin(), eti.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (auto rs_eti = rs->error_tablet_infos(); !rs_eti.empty()) {
params.__isset.errorTabletInfos = true;
params.errorTabletInfos.insert(params.errorTabletInfos.end(), rs_eti.begin(),
rs_eti.end());
}
}
}
if (auto hpu = req.runtime_state->hive_partition_updates(); !hpu.empty()) {
params.__isset.hive_partition_updates = true;
params.hive_partition_updates.insert(params.hive_partition_updates.end(), hpu.begin(),
hpu.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (auto rs_hpu = rs->hive_partition_updates(); !rs_hpu.empty()) {
params.__isset.hive_partition_updates = true;
params.hive_partition_updates.insert(params.hive_partition_updates.end(),
rs_hpu.begin(), rs_hpu.end());
}
}
}
if (auto icd = req.runtime_state->iceberg_commit_datas(); !icd.empty()) {
params.__isset.iceberg_commit_datas = true;
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(), icd.begin(),
icd.end());
} else if (!req.runtime_states.empty()) {
for (auto* rs : req.runtime_states) {
if (auto rs_icd = rs->iceberg_commit_datas(); !rs_icd.empty()) {
params.__isset.iceberg_commit_datas = true;
params.iceberg_commit_datas.insert(params.iceberg_commit_datas.end(),
rs_icd.begin(), rs_icd.end());
}
}
}
// Send new errors to coordinator
req.runtime_state->get_unreported_errors(&(params.error_log));
params.__isset.error_log = (!params.error_log.empty());
if (_exec_env->cluster_info()->backend_id != 0) {
params.__set_backend_id(_exec_env->cluster_info()->backend_id);
}
TReportExecStatusResult res;
Status rpc_status;
VLOG_DEBUG << "reportExecStatus params is "
<< apache::thrift::ThriftDebugString(params).c_str();
if (!exec_status.ok()) {
LOG(WARNING) << "report error status: " << exec_status.msg()
<< " to coordinator: " << req.coord_addr
<< ", query id: " << print_id(req.query_id);
}
try {
try {
(*coord)->reportExecStatus(res, params);
} catch ([[maybe_unused]] TTransportException& e) {
#ifndef ADDRESS_SANITIZER
LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(req.query_id)
<< ", instance id: " << print_id(req.fragment_instance_id) << " to "
<< req.coord_addr << ", err: " << e.what();
#endif
rpc_status = coord->reopen();
if (!rpc_status.ok()) {
// we need to cancel the execution of this fragment
req.cancel_fn(rpc_status);
return;
}
(*coord)->reportExecStatus(res, params);
}
rpc_status = Status::create<false>(res.status);
} catch (TException& e) {
rpc_status = Status::InternalError("ReportExecStatus() to {} failed: {}",
PrintThriftNetworkAddress(req.coord_addr), e.what());
}
if (!rpc_status.ok()) {
LOG_INFO("Going to cancel query {} since report exec status got rpc failed: {}",
print_id(req.query_id), rpc_status.to_string());
// we need to cancel the execution of this fragment
req.cancel_fn(rpc_status);
}
}
static void empty_function(RuntimeState*, Status*) {}
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
const QuerySource query_source,
const TPipelineFragmentParamsList& parent) {
if (params.txn_conf.need_txn) {
std::shared_ptr<StreamLoadContext> stream_load_ctx =
std::make_shared<StreamLoadContext>(_exec_env);
stream_load_ctx->db = params.txn_conf.db;
stream_load_ctx->db_id = params.txn_conf.db_id;
stream_load_ctx->table = params.txn_conf.tbl;
stream_load_ctx->txn_id = params.txn_conf.txn_id;
stream_load_ctx->id = UniqueId(params.query_id);
stream_load_ctx->put_result.__set_pipeline_params(params);
stream_load_ctx->use_streaming = true;
stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
stream_load_ctx->load_src_type = TLoadSourceType::RAW;
stream_load_ctx->label = params.import_label;
stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
stream_load_ctx->timeout_second = 3600;
stream_load_ctx->auth.token = params.txn_conf.token;
stream_load_ctx->need_commit_self = true;
stream_load_ctx->need_rollback = true;
auto pipe = std::make_shared<io::StreamLoadPipe>(
io::kMaxPipeBufferedBytes /* max_buffered_bytes */, 64 * 1024 /* min_chunk_size */,
-1 /* total_length */, true /* use_proto */);
stream_load_ctx->body_sink = pipe;
stream_load_ctx->pipe = pipe;
stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;
RETURN_IF_ERROR(
_exec_env->new_load_stream_mgr()->put(stream_load_ctx->id, stream_load_ctx));
RETURN_IF_ERROR(
_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx, parent));
return Status::OK();
} else {
return exec_plan_fragment(params, query_source, empty_function, parent);
}
}
// Stage 2: prepare has finished; wait for the FE's instruction to start execution.
Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) {
TUniqueId query_id;
query_id.__set_hi(request->query_id().hi());
query_id.__set_lo(request->query_id().lo());
auto q_ctx = get_query_ctx(query_id);
if (q_ctx) {
q_ctx->set_ready_to_execute(Status::OK());
LOG_INFO("Query {} start execution", print_id(query_id));
} else {
return Status::InternalError(
"Failed to get query fragments context. Query {} may be "
"timeout or be cancelled. host: {}",
print_id(query_id), BackendOptions::get_localhost());
}
return Status::OK();
}
void FragmentMgr::remove_pipeline_context(std::pair<TUniqueId, int> key) {
int64_t now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_executing_count << -1;
g_fragment_last_active_time.set_value(now);
_pipeline_map.erase(key);
}
void FragmentMgr::remove_query_context(const TUniqueId& key) {
#ifndef BE_TEST
_query_ctx_map.erase(key);
#endif
}
std::shared_ptr<QueryContext> FragmentMgr::get_query_ctx(const TUniqueId& query_id) {
auto val = _query_ctx_map.find(query_id);
if (auto q_ctx = val.lock()) {
return q_ctx;
}
return nullptr;
}
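// Lifetime note: _query_ctx_map stores std::weak_ptr<QueryContext>, so
// get_query_ctx() returns a strong reference only while some other owner
// (e.g. a running PipelineFragmentContext) still keeps the context alive;
// callers must handle a nullptr result.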
Status FragmentMgr::_get_or_create_query_ctx(const TPipelineFragmentParams& params,
const TPipelineFragmentParamsList& parent,
QuerySource query_source,
std::shared_ptr<QueryContext>& query_ctx) {
auto query_id = params.query_id;
DBUG_EXECUTE_IF("FragmentMgr._get_query_ctx.failed", {
return Status::InternalError("FragmentMgr._get_query_ctx.failed, query id {}",
print_id(query_id));
});
    // Search _query_ctx_map first, in case some other request has already
    // created the query fragments context.
query_ctx = get_query_ctx(query_id);
if (params.is_simplified_param) {
// Get common components from _query_ctx_map
if (!query_ctx) {
return Status::InternalError(
"Failed to get query fragments context. Query {} may be timeout or be "
"cancelled. host: {}",
print_id(query_id), BackendOptions::get_localhost());
}
} else {
if (!query_ctx) {
RETURN_IF_ERROR(_query_ctx_map.apply_if_not_exists(
query_id, query_ctx,
[&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map)
-> Status {
WorkloadGroupPtr workload_group_ptr = nullptr;
std::vector<uint64_t> wg_id_set;
if (params.__isset.workload_groups && !params.workload_groups.empty()) {
for (auto& wg : params.workload_groups) {
wg_id_set.push_back(wg.id);
}
}
workload_group_ptr = _exec_env->workload_group_mgr()->get_group(wg_id_set);
                    // The first fragment of the query has arrived; print logs.
LOG(INFO) << "query_id: " << print_id(query_id)
<< ", coord_addr: " << params.coord
<< ", total fragment num on current host: "
<< params.fragment_num_on_host
<< ", fe process uuid: " << params.query_options.fe_process_uuid
<< ", query type: " << params.query_options.query_type
<< ", report audit fe:" << params.current_connect_fe
<< ", use wg:" << workload_group_ptr->id() << ","
<< workload_group_ptr->name();
                    // This may be the first fragment request of the query.
// Create the query fragments context.
query_ctx = QueryContext::create(query_id, _exec_env, params.query_options,
params.coord, params.is_nereids,
params.current_connect_fe, query_source);
SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(query_ctx->query_mem_tracker());
RETURN_IF_ERROR(DescriptorTbl::create(
&(query_ctx->obj_pool), params.desc_tbl, &(query_ctx->desc_tbl)));
// set file scan range params
if (params.__isset.file_scan_params) {
query_ctx->file_scan_range_params_map = params.file_scan_params;
}
query_ctx->query_globals = params.query_globals;
if (params.__isset.resource_info) {
query_ctx->user = params.resource_info.user;
query_ctx->group = params.resource_info.group;
query_ctx->set_rsc_info = true;
}
if (params.__isset.ai_resources) {
query_ctx->set_ai_resources(params.ai_resources);
}
RETURN_IF_ERROR(query_ctx->set_workload_group(workload_group_ptr));
if (parent.__isset.runtime_filter_info) {
auto info = parent.runtime_filter_info;
if (info.__isset.runtime_filter_params) {
auto handler =
std::make_shared<RuntimeFilterMergeControllerEntity>();
RETURN_IF_ERROR(
handler->init(query_ctx, info.runtime_filter_params));
query_ctx->set_merge_controller_handler(handler);
query_ctx->runtime_filter_mgr()->set_runtime_filter_params(
info.runtime_filter_params);
}
if (info.__isset.topn_filter_descs) {
query_ctx->init_runtime_predicates(info.topn_filter_descs);
}
}
                    // The query context's destructor has side effects (for example,
                    // the query id is removed from the workload group's query set),
                    // so we cannot simply check for an existing entry and delete a
                    // temporary query context here.
map.insert({query_id, query_ctx});
return Status::OK();
}));
}
}
return Status::OK();
}
std::string FragmentMgr::dump_pipeline_tasks(int64_t duration) {
fmt::memory_buffer debug_string_buffer;
size_t i = 0;
{
fmt::format_to(debug_string_buffer,
"{} pipeline fragment contexts are still running! duration_limit={}\n",
_pipeline_map.num_items(), duration);
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
_pipeline_map.apply([&](phmap::flat_hash_map<
std::pair<TUniqueId, int>,
std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
-> Status {
std::set<TUniqueId> query_id_set;
for (auto& it : map) {
auto elapsed = it.second->elapsed_time() / 1000000000;
if (elapsed < duration) {
                    // Only display tasks which have been running for more than {duration} seconds.
continue;
}
if (!query_id_set.contains(it.first.first)) {
query_id_set.insert(it.first.first);
fmt::format_to(
debug_string_buffer, "QueryId: {}, global_runtime_filter_mgr: {}\n",
print_id(it.first.first),
it.second->get_query_ctx()->runtime_filter_mgr()->debug_string());
if (it.second->get_query_ctx()->get_merge_controller_handler()) {
fmt::format_to(debug_string_buffer, "{}\n",
it.second->get_query_ctx()
->get_merge_controller_handler()
->debug_string());
}
}
auto timeout_second = it.second->timeout_second();
fmt::format_to(
debug_string_buffer,
"No.{} (elapse_second={}s, query_timeout_second={}s, is_timeout={}): {}\n",
i, elapsed, timeout_second, it.second->is_timeout(now),
it.second->debug_string());
i++;
}
return Status::OK();
});
}
return fmt::to_string(debug_string_buffer);
}
std::string FragmentMgr::dump_pipeline_tasks(TUniqueId& query_id) {
if (auto q_ctx = get_query_ctx(query_id)) {
return q_ctx->print_all_pipeline_context();
} else {
return fmt::format(
"Dump pipeline tasks failed: Query context (query id = {}) not found. \n",
print_id(query_id));
}
}
Status FragmentMgr::exec_plan_fragment(const TPipelineFragmentParams& params,
QuerySource query_source, const FinishCallback& cb,
const TPipelineFragmentParamsList& parent) {
VLOG_ROW << "Query: " << print_id(params.query_id) << " exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(params).c_str();
    // Sometimes the TPipelineFragmentParams debug string is too long and glog
    // will truncate the log line, so print the query options separately for
    // debugging purposes.
    VLOG_ROW << "Query: " << print_id(params.query_id) << " query options is "
             << apache::thrift::ThriftDebugString(params.query_options).c_str();
std::shared_ptr<QueryContext> query_ctx;
RETURN_IF_ERROR(_get_or_create_query_ctx(params, parent, query_source, query_ctx));
SCOPED_ATTACH_TASK(query_ctx.get()->resource_ctx());
int64_t duration_ns = 0;
std::shared_ptr<pipeline::PipelineFragmentContext> context =
std::make_shared<pipeline::PipelineFragmentContext>(
query_ctx->query_id(), params, query_ctx, _exec_env, cb,
[this](const ReportStatusRequest& req, auto&& ctx) {
return this->trigger_pipeline_context_report(req, std::move(ctx));
});
{
SCOPED_RAW_TIMER(&duration_ns);
Status prepare_st = Status::OK();
ASSIGN_STATUS_IF_CATCH_EXCEPTION(prepare_st = context->prepare(_thread_pool.get()),
prepare_st);
DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.prepare_failed", {
prepare_st = Status::Aborted("FragmentMgr.exec_plan_fragment.prepare_failed");
});
if (!prepare_st.ok()) {
query_ctx->cancel(prepare_st, params.fragment_id);
return prepare_st;
}
}
g_fragmentmgr_prepare_latency << (duration_ns / 1000);
DBUG_EXECUTE_IF("FragmentMgr.exec_plan_fragment.failed",
{ return Status::Aborted("FragmentMgr.exec_plan_fragment.failed"); });
{
int64_t now = duration_cast<std::chrono::milliseconds>(
std::chrono::system_clock::now().time_since_epoch())
.count();
g_fragment_executing_count << 1;
g_fragment_last_active_time.set_value(now);
        // A (query_id, fragment_id) pair executes on only one BE; this lookup locks _pipeline_map.
auto res = _pipeline_map.find({params.query_id, params.fragment_id});
if (res != nullptr) {
return Status::InternalError(
"exec_plan_fragment query_id({}) input duplicated fragment_id({})",
print_id(params.query_id), params.fragment_id);
}
_pipeline_map.insert({params.query_id, params.fragment_id}, context);
}
if (!params.__isset.need_wait_execution_trigger || !params.need_wait_execution_trigger) {
query_ctx->set_ready_to_execute_only();
}
query_ctx->set_pipeline_context(params.fragment_id, context);
RETURN_IF_ERROR(context->submit());
return Status::OK();
}
void FragmentMgr::cancel_query(const TUniqueId query_id, const Status reason) {
std::shared_ptr<QueryContext> query_ctx = nullptr;
{
if (auto q_ctx = get_query_ctx(query_id)) {
query_ctx = q_ctx;
} else {
LOG(WARNING) << "Query " << print_id(query_id)
<< " does not exists, failed to cancel it";
return;
}
}
query_ctx->cancel(reason);
remove_query_context(query_id);
LOG(INFO) << "Query " << print_id(query_id)
<< " is cancelled and removed. Reason: " << reason.to_string();
}
void FragmentMgr::cancel_worker() {
LOG(INFO) << "FragmentMgr cancel worker start working.";
timespec check_invalid_query_last_timestamp;
clock_gettime(CLOCK_MONOTONIC, &check_invalid_query_last_timestamp);
do {
std::vector<TUniqueId> queries_lost_coordinator;
std::vector<TUniqueId> queries_timeout;
std::vector<TUniqueId> queries_pipeline_task_leak;
// Fe process uuid -> set<QueryId>
std::map<int64_t, std::unordered_set<TUniqueId>> running_queries_on_all_fes;
const std::map<TNetworkAddress, FrontendInfo>& running_fes =
ExecEnv::GetInstance()->get_running_frontends();
timespec now;
clock_gettime(CLOCK_MONOTONIC, &now);
if (config::enable_pipeline_task_leakage_detect &&
now.tv_sec - check_invalid_query_last_timestamp.tv_sec >
config::pipeline_task_leakage_detect_period_secs) {
check_invalid_query_last_timestamp = now;
running_queries_on_all_fes = _get_all_running_queries_from_fe();
} else {
running_queries_on_all_fes.clear();
}
std::vector<std::shared_ptr<pipeline::PipelineFragmentContext>> ctx;
_pipeline_map.apply(
[&](phmap::flat_hash_map<std::pair<TUniqueId, int>,
std::shared_ptr<pipeline::PipelineFragmentContext>>& map)
-> Status {
ctx.reserve(ctx.size() + map.size());
for (auto& pipeline_itr : map) {
ctx.push_back(pipeline_itr.second);
}
return Status::OK();
});
for (auto& c : ctx) {
c->clear_finished_tasks();
}
std::unordered_map<std::shared_ptr<PBackendService_Stub>, BrpcItem> brpc_stub_with_queries;
{
std::vector<std::shared_ptr<QueryContext>> contexts;
_query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>&
map) -> Status {
for (auto it = map.begin(); it != map.end();) {
if (auto q_ctx = it->second.lock()) {
contexts.push_back(q_ctx);
if (q_ctx->is_timeout(now)) {
LOG_WARNING("Query {} is timeout", print_id(it->first));
queries_timeout.push_back(it->first);
} else if (config::enable_brpc_connection_check) {
auto brpc_stubs = q_ctx->get_using_brpc_stubs();
for (auto& item : brpc_stubs) {
if (!brpc_stub_with_queries.contains(item.second)) {
brpc_stub_with_queries.emplace(item.second,
BrpcItem {item.first, {q_ctx}});
} else {
brpc_stub_with_queries[item.second].queries.emplace_back(q_ctx);
}
}
}
++it;
} else {
it = map.erase(it);
}
}
return Status::OK();
});
std::vector<std::shared_ptr<QueryContext>> {}.swap(contexts);
// We use a very conservative cancel strategy.
// 0. If there are no running frontends, do not cancel any queries.
            // 1. If the query's process uuid is zero, do not cancel.
            // 2. If the process uuids match, do not cancel.
            // 3. If the FE has a zero process uuid, do not cancel.
if (running_fes.empty() && _query_ctx_map.num_items() != 0) {
LOG_EVERY_N(WARNING, 10)
<< "Could not find any running frontends, maybe we are upgrading or "
"starting? "
<< "We will not cancel any outdated queries in this situation.";
} else {
std::vector<std::shared_ptr<QueryContext>> q_contexts;
_query_ctx_map.apply([&](phmap::flat_hash_map<TUniqueId,
std::weak_ptr<QueryContext>>& map)
-> Status {
for (const auto& it : map) {
if (auto q_ctx = it.second.lock()) {
q_contexts.push_back(q_ctx);
const int64_t fe_process_uuid = q_ctx->get_fe_process_uuid();
if (fe_process_uuid == 0) {
                        // Zero means this query came from an older-version FE or
                        // this FE is still starting up.
continue;
}
                    // If the query is not running on any frontend, cancel it.
if (auto itr = running_queries_on_all_fes.find(fe_process_uuid);
itr != running_queries_on_all_fes.end()) {
                        // The query was not found on this frontend and arrived before the last check.
if (itr->second.find(it.first) == itr->second.end() &&
                            // tv_nsec holds the nanoseconds elapsed past the second stored
                            // in tv_sec; comparing tv_sec is enough here.
q_ctx->get_query_arrival_timestamp().tv_sec <
check_invalid_query_last_timestamp.tv_sec &&
q_ctx->get_query_source() == QuerySource::INTERNAL_FRONTEND) {
queries_pipeline_task_leak.push_back(q_ctx->query_id());
LOG_INFO(
"Query {}, type {} is not found on any frontends, "
"maybe it "
"is leaked.",
print_id(q_ctx->query_id()),
toString(q_ctx->get_query_source()));
continue;
}
}
auto itr = running_fes.find(q_ctx->coord_addr);
if (itr != running_fes.end()) {
if (fe_process_uuid == itr->second.info.process_uuid ||
itr->second.info.process_uuid == 0) {
continue;
} else {
LOG_WARNING(
"Coordinator of query {} restarted, going to cancel "
"it.",
print_id(q_ctx->query_id()));
}
} else {
                        // In some rare cases, the rpc port of a follower is not updated in
                        // time, so the port of this follower will be zero even though it is
                        // actually still running and the BE has already received queries
                        // from it. So we need to check whether the host is in running_fes.
bool fe_host_is_standing =
std::any_of(running_fes.begin(), running_fes.end(),
[&q_ctx](const auto& fe) {
return fe.first.hostname ==
q_ctx->coord_addr.hostname &&
fe.first.port == 0;
});
if (fe_host_is_standing) {
                            LOG_WARNING(
                                    "Coordinator {}:{} of query {} is not found, but its host "
                                    "is still running with an unstable brpc port, not going "
                                    "to cancel it.",
                                    q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
                                    print_id(q_ctx->query_id()));
continue;
} else {
LOG_WARNING(
"Could not find target coordinator {}:{} of query {}, "
"going to "
"cancel it.",
q_ctx->coord_addr.hostname, q_ctx->coord_addr.port,
print_id(q_ctx->query_id()));
}
}
}
                // The coordinator of this query is already dead or the query
                // context has been released.
queries_lost_coordinator.push_back(it.first);
}
return Status::OK();
});
}
}
if (config::enable_brpc_connection_check) {
for (auto it : brpc_stub_with_queries) {
if (!it.first) {
LOG(WARNING) << "brpc stub is nullptr, skip it.";
continue;
}
_check_brpc_available(it.first, it.second);
}
}
if (!queries_lost_coordinator.empty()) {
LOG(INFO) << "There are " << queries_lost_coordinator.size()
<< " queries need to be cancelled, coordinator dead or restarted.";
}
for (const auto& qid : queries_timeout) {
        cancel_query(qid,
                     Status::Error<ErrorCode::TIMEOUT>(
                             "FragmentMgr cancel worker going to cancel timed-out instance"));
}
for (const auto& qid : queries_pipeline_task_leak) {
        // Cancel the query, and maybe report debug info to the FE so that we can
        // collect it via SQL or the HTTP API instead of searching logs.
cancel_query(qid, Status::Error<ErrorCode::ILLEGAL_STATE>(
"Potential pipeline task leakage"));
}
for (const auto& qid : queries_lost_coordinator) {
cancel_query(qid, Status::Error<ErrorCode::CANCELLED>(
"Source frontend is not running or restarted"));
}
} while (!_stop_background_threads_latch.wait_for(
std::chrono::seconds(config::fragment_mgr_cancel_worker_interval_seconds)));
LOG(INFO) << "FragmentMgr cancel worker is going to exit.";
}
void FragmentMgr::_check_brpc_available(const std::shared_ptr<PBackendService_Stub>& brpc_stub,
const BrpcItem& brpc_item) {
const std::string message = "hello doris!";
std::string error_message;
int32_t failed_count = 0;
const int64_t check_timeout_ms =
std::max<int64_t>(100, config::brpc_connection_check_timeout_ms);
while (true) {
PHandShakeRequest request;
request.set_hello(message);
PHandShakeResponse response;
brpc::Controller cntl;
cntl.set_timeout_ms(check_timeout_ms);
cntl.set_max_retry(10);
brpc_stub->hand_shake(&cntl, &request, &response, nullptr);
if (cntl.Failed()) {
error_message = cntl.ErrorText();
LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
<< brpc_item.network_address.port << " check failed: " << error_message;
} else if (response.has_status() && response.status().status_code() == 0) {
break;
} else {
error_message = response.DebugString();
LOG(WARNING) << "brpc stub: " << brpc_item.network_address.hostname << ":"
<< brpc_item.network_address.port << " check failed: " << error_message;
}
failed_count++;
if (failed_count == 2) {
for (const auto& query_wptr : brpc_item.queries) {
auto query = query_wptr.lock();
if (query && !query->is_cancelled()) {
query->cancel(Status::InternalError("brpc(dest: {}:{}) check failed: {}",
brpc_item.network_address.hostname,
brpc_item.network_address.port,
error_message));
}
}
LOG(WARNING) << "remove brpc stub from cache: " << brpc_item.network_address.hostname
<< ":" << brpc_item.network_address.port << ", error: " << error_message;
ExecEnv::GetInstance()->brpc_internal_client_cache()->erase(
brpc_item.network_address.hostname, brpc_item.network_address.port);
break;
}
}
}
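// Summary of the check above: the BE sends a lightweight hand_shake RPC to the
// peer; after two consecutive failures the connection is treated as dead, every
// query using the stub is cancelled, and the stub is evicted from the client
// cache so the next use reconnects.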
void FragmentMgr::debug(std::stringstream& ss) {}
/*
* 1. resolve opaqued_query_plan to thrift structure
* 2. build TPipelineFragmentParams
*/
Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params,
const TQueryPlanInfo& t_query_plan_info,
const TUniqueId& query_id,
const TUniqueId& fragment_instance_id,
std::vector<TScanColumnDesc>* selected_columns) {
// set up desc tbl
DescriptorTbl* desc_tbl = nullptr;
ObjectPool obj_pool;
Status st = DescriptorTbl::create(&obj_pool, t_query_plan_info.desc_tbl, &desc_tbl);
if (!st.ok()) {
LOG(WARNING) << "open context error: extract DescriptorTbl failure";
std::stringstream msg;
msg << " create DescriptorTbl error, should not be modified after returned Doris FE "
"processed";
return Status::InvalidArgument(msg.str());
}
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
if (tuple_desc == nullptr) {
LOG(WARNING) << "open context error: extract TupleDescriptor failure";
std::stringstream msg;
msg << " get TupleDescriptor error, should not be modified after returned Doris FE "
"processed";
return Status::InvalidArgument(msg.str());
}
    // process selected columns from slots
for (const SlotDescriptor* slot : tuple_desc->slots()) {
TScanColumnDesc col;
col.__set_name(slot->col_name());
col.__set_type(to_thrift(slot->type()->get_primitive_type()));
selected_columns->emplace_back(std::move(col));
}
VLOG_QUERY << "BackendService execute open() TQueryPlanInfo: "
<< apache::thrift::ThriftDebugString(t_query_plan_info);
    // assign the params used to execute the PlanFragment
TPipelineFragmentParams exec_fragment_params;
exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0;
exec_fragment_params.__set_is_simplified_param(false);
exec_fragment_params.__set_fragment(t_query_plan_info.plan_fragment);
exec_fragment_params.__set_desc_tbl(t_query_plan_info.desc_tbl);
    // assign the params used by the PlanFragment itself
TPipelineInstanceParams fragment_exec_params;
exec_fragment_params.query_id = query_id;
fragment_exec_params.fragment_instance_id = fragment_instance_id;
exec_fragment_params.coord.hostname = "external";
std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges;
std::vector<TScanRangeParams> scan_ranges;
std::vector<int64_t> tablet_ids = params.tablet_ids;
TNetworkAddress address;
address.hostname = BackendOptions::get_localhost();
address.port = doris::config::be_port;
std::map<int64_t, TTabletVersionInfo> tablet_info = t_query_plan_info.tablet_info;
for (auto tablet_id : params.tablet_ids) {
TPaloScanRange scan_range;
scan_range.db_name = params.database;
scan_range.table_name = params.table;
auto iter = tablet_info.find(tablet_id);
if (iter != tablet_info.end()) {
TTabletVersionInfo info = iter->second;
scan_range.tablet_id = tablet_id;
scan_range.version = std::to_string(info.version);
            // Unused, but it is a required field in TPaloScanRange
scan_range.version_hash = "0";
scan_range.schema_hash = std::to_string(info.schema_hash);
scan_range.hosts.push_back(address);
} else {
std::stringstream msg;
msg << "tablet_id: " << tablet_id << " not found";
LOG(WARNING) << "tablet_id [ " << tablet_id << " ] not found";
return Status::NotFound(msg.str());
}
TScanRange doris_scan_range;
doris_scan_range.__set_palo_scan_range(scan_range);
TScanRangeParams scan_range_params;
scan_range_params.scan_range = doris_scan_range;
scan_ranges.push_back(scan_range_params);
}
per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, scan_ranges));
fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges;
exec_fragment_params.local_params.push_back(fragment_exec_params);
TQueryOptions query_options;
query_options.batch_size = params.batch_size;
query_options.execution_timeout = params.execution_timeout;
query_options.mem_limit = params.mem_limit;
query_options.query_type = TQueryType::EXTERNAL;
query_options.be_exec_version = BeExecVersionManager::get_newest_version();
exec_fragment_params.__set_query_options(query_options);
VLOG_ROW << "external exec_plan_fragment params is "
<< apache::thrift::ThriftDebugString(exec_fragment_params).c_str();
TPipelineFragmentParamsList mocked;
return exec_plan_fragment(exec_fragment_params, QuerySource::EXTERNAL_CONNECTOR, mocked);
}
Status FragmentMgr::apply_filterv2(const PPublishFilterRequestV2* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
UniqueId queryid = request->query_id();
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
if (auto q_ctx = get_query_ctx(query_id)) {
SCOPED_ATTACH_TASK(q_ctx.get());
RuntimeFilterMgr* runtime_filter_mgr = q_ctx->runtime_filter_mgr();
DCHECK(runtime_filter_mgr != nullptr);
// 1. get the target filters
std::vector<std::shared_ptr<RuntimeFilterConsumer>> filters =
runtime_filter_mgr->get_consume_filters(request->filter_id());
// 2. create the filter wrapper to replace or ignore/disable the target filters
if (!filters.empty()) {
RETURN_IF_ERROR(filters[0]->assign(*request, attach_data));
std::ranges::for_each(filters, [&](auto& filter) { filter->signal(filters[0].get()); });
}
}
return Status::OK();
}
Status FragmentMgr::send_filter_size(const PSendFilterSizeRequest* request) {
UniqueId queryid = request->query_id();
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
if (config::enable_debug_points &&
DebugPoints::instance()->is_enable("FragmentMgr::send_filter_size.return_eof")) {
return Status::EndOfFile("inject FragmentMgr::send_filter_size.return_eof");
}
if (auto q_ctx = get_query_ctx(query_id)) {
return q_ctx->get_merge_controller_handler()->send_filter_size(q_ctx, request);
} else {
return Status::EndOfFile(
"Send filter size failed: Query context (query-id: {}) not found, maybe "
"finished",
queryid.to_string());
}
}
Status FragmentMgr::sync_filter_size(const PSyncFilterSizeRequest* request) {
UniqueId queryid = request->query_id();
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
if (auto q_ctx = get_query_ctx(query_id)) {
try {
return q_ctx->runtime_filter_mgr()->sync_filter_size(request);
} catch (const Exception& e) {
return Status::InternalError(
"Sync filter size failed: Query context (query-id: {}) error: {}",
queryid.to_string(), e.what());
}
} else {
return Status::EndOfFile(
"Sync filter size failed: Query context (query-id: {}) already finished",
queryid.to_string());
}
}
Status FragmentMgr::merge_filter(const PMergeFilterRequest* request,
butil::IOBufAsZeroCopyInputStream* attach_data) {
UniqueId queryid = request->query_id();
TUniqueId query_id;
query_id.__set_hi(queryid.hi);
query_id.__set_lo(queryid.lo);
if (auto q_ctx = get_query_ctx(query_id)) {
SCOPED_ATTACH_TASK(q_ctx.get());
if (!q_ctx->get_merge_controller_handler()) {
return Status::InternalError("Merge filter failed: Merge controller handler is null");
}
return q_ctx->get_merge_controller_handler()->merge(q_ctx, request, attach_data);
} else {
return Status::EndOfFile(
"Merge filter size failed: Query context (query-id: {}) already finished",
queryid.to_string());
}
}
void FragmentMgr::get_runtime_query_info(
std::vector<std::weak_ptr<ResourceContext>>* _resource_ctx_list) {
std::vector<std::shared_ptr<QueryContext>> contexts;
_query_ctx_map.apply(
[&](phmap::flat_hash_map<TUniqueId, std::weak_ptr<QueryContext>>& map) -> Status {
for (auto iter = map.begin(); iter != map.end();) {
if (auto q_ctx = iter->second.lock()) {
_resource_ctx_list->push_back(q_ctx->resource_ctx());
contexts.push_back(q_ctx);
iter++;
} else {
iter = map.erase(iter);
}
}
return Status::OK();
});
}
Status FragmentMgr::get_realtime_exec_status(const TUniqueId& query_id,
TReportExecStatusParams* exec_status) {
if (exec_status == nullptr) {
return Status::InvalidArgument("exes_status is nullptr");
}
std::shared_ptr<QueryContext> query_context = get_query_ctx(query_id);
if (query_context == nullptr) {
return Status::NotFound("Query {} not found or released", print_id(query_id));
}
*exec_status = query_context->get_realtime_exec_status();
return Status::OK();
}
Status FragmentMgr::get_query_statistics(const TUniqueId& query_id, TQueryStatistics* query_stats) {
if (query_stats == nullptr) {
return Status::InvalidArgument("query_stats is nullptr");
}
return ExecEnv::GetInstance()->runtime_query_statistics_mgr()->get_query_statistics(
print_id(query_id), query_stats);
}
#include "common/compile_check_end.h"
} // namespace doris