| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "runtime/fragment_mgr.h" |
| |
| #include <bvar/latency_recorder.h> |
| #include <gperftools/profiler.h> |
| #include <thrift/protocol/TDebugProtocol.h> |
| |
| #include <memory> |
| #include <sstream> |
| |
| #include "agent/cgroups_mgr.h" |
| #include "common/object_pool.h" |
| #include "gen_cpp/FrontendService.h" |
| #include "gen_cpp/PaloInternalService_types.h" |
| #include "gen_cpp/PlanNodes_types.h" |
| #include "gen_cpp/QueryPlanExtra_types.h" |
| #include "gen_cpp/Types_types.h" |
| #include "gutil/strings/substitute.h" |
| #include "opentelemetry/trace/scope.h" |
| #include "runtime/client_cache.h" |
| #include "runtime/datetime_value.h" |
| #include "runtime/descriptors.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/plan_fragment_executor.h" |
| #include "runtime/runtime_filter_mgr.h" |
| #include "runtime/stream_load/load_stream_mgr.h" |
| #include "runtime/stream_load/stream_load_context.h" |
| #include "runtime/stream_load/stream_load_pipe.h" |
| #include "runtime/thread_context.h" |
| #include "service/backend_options.h" |
| #include "util/debug_util.h" |
| #include "util/doris_metrics.h" |
| #include "util/stopwatch.hpp" |
| #include "util/telemetry/telemetry.h" |
| #include "util/threadpool.h" |
| #include "util/thrift_util.h" |
| #include "util/uid_util.h" |
| #include "util/url_coding.h" |
| |
| namespace doris { |
| |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(plan_fragment_count, MetricUnit::NOUNIT); |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(timeout_canceled_fragment_count, MetricUnit::NOUNIT); |
| DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(fragment_thread_pool_queue_size, MetricUnit::NOUNIT); |
| bvar::LatencyRecorder g_fragmentmgr_prepare_latency("doris_FragmentMgr", "prepare"); |
| |
| std::string to_load_error_http_path(const std::string& file_name) { |
| if (file_name.empty()) { |
| return ""; |
| } |
| std::stringstream url; |
| url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port |
| << "/api/_load_error_log?" |
| << "file=" << file_name; |
| return url.str(); |
| } |
| |
| using apache::thrift::TException; |
| using apache::thrift::TProcessor; |
| using apache::thrift::transport::TTransportException; |
| |
class RuntimeProfile;

// Tracks the full lifecycle of one plan fragment instance on this backend:
// prepare() -> execute() -> (report via coordinator_callback) -> cancel/finish.
// One FragmentExecState exists per fragment instance and is owned (shared_ptr)
// by FragmentMgr::_fragment_map until the instance finishes.
class FragmentExecState {
public:
    // Constructor by using QueryFragmentsCtx
    FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
                      ExecEnv* exec_env, std::shared_ptr<QueryFragmentsCtx> fragments_ctx);

    // Constructor without a shared query context; coordinator address is
    // passed explicitly instead of being taken from the context.
    FragmentExecState(const TUniqueId& query_id, const TUniqueId& instance_id, int backend_num,
                      ExecEnv* exec_env, const TNetworkAddress& coord_addr);

    // Prepares the underlying executor. Must be called once before execute().
    Status prepare(const TExecPlanFragmentParams& params);

    // just no use now
    void callback(const Status& status, RuntimeProfile* profile, bool done);

    // URL under which "file_name" can be downloaded from this BE (with token).
    std::string to_http_path(const std::string& file_name);

    // Runs the fragment (open + close the executor), possibly waiting first
    // for the FE start-execution trigger. Blocking; called from a pool thread.
    Status execute();

    // Cancels execution; safe to call multiple times (only the first wins).
    Status cancel(const PPlanFragmentCancelReason& reason, const std::string& msg = "");
    bool is_canceled() { return _cancelled; }

    TUniqueId fragment_instance_id() const { return _fragment_instance_id; }

    TUniqueId query_id() const { return _query_id; }

    PlanFragmentExecutor* executor() { return &_executor; }

    const DateTimeValue& start_time() const { return _start_time; }

    // Keeps the runtime-filter merge controller entity alive for the duration
    // of this fragment instance.
    void set_merge_controller_handler(
            std::shared_ptr<RuntimeFilterMergeControllerEntity>& handler) {
        _merge_controller_handler = handler;
    }

    // Update status of this fragment execute
    // Only the FIRST non-OK status sticks; later errors are ignored so the
    // root cause is reported. Returns the (possibly unchanged) stored status.
    Status update_status(Status status) {
        std::lock_guard<std::mutex> l(_status_lock);
        if (!status.ok() && _exec_status.ok()) {
            _exec_status = status;
        }
        return _exec_status;
    }

    // Records the resource group of the submitting user (cgroup scheduling).
    void set_group(const TResourceInfo& info) {
        _set_rsc_info = true;
        _user = info.user;
        _group = info.group;
    }

    // True when the instance has been running longer than its query timeout.
    // A non-positive _timeout_second disables the timeout entirely.
    bool is_timeout(const DateTimeValue& now) const {
        if (_timeout_second <= 0) {
            return false;
        }
        if (now.second_diff(_start_time) > _timeout_second) {
            return true;
        }
        return false;
    }

    int get_timeout_second() const { return _timeout_second; }

    std::shared_ptr<QueryFragmentsCtx> get_fragments_ctx() { return _fragments_ctx; }

    void set_pipe(std::shared_ptr<StreamLoadPipe> pipe) { _pipe = pipe; }
    std::shared_ptr<StreamLoadPipe> get_pipe() const { return _pipe; }

    void set_need_wait_execution_trigger() { _need_wait_execution_trigger = true; }

private:
    // Reports execution status/profile back to the coordinator FE over RPC.
    void coordinator_callback(const Status& status, RuntimeProfile* profile, bool done);

    // This context is shared by all fragments of this host in a query
    //
    // NOTE: _fragments_ctx should be declared at the beginning of members so that
    // it's destructed last.
    // Because Objects create in _fragments_ctx.obj_pool maybe used during
    // destruction, if _fragments_ctx is destructed too early, it will coredump.
    std::shared_ptr<QueryFragmentsCtx> _fragments_ctx;

    // Id of this query
    TUniqueId _query_id;
    // Id of this instance
    TUniqueId _fragment_instance_id;
    // Used to report to coordinator which backend is over
    int _backend_num;
    ExecEnv* _exec_env;
    TNetworkAddress _coord_addr;

    PlanFragmentExecutor _executor;
    DateTimeValue _start_time;

    // Guards _exec_status and serializes cancel() bodies.
    std::mutex _status_lock;
    Status _exec_status;

    bool _set_rsc_info = false;
    std::string _user;
    std::string _group;

    int _timeout_second;
    std::atomic<bool> _cancelled {false};

    std::shared_ptr<RuntimeFilterMergeControllerEntity> _merge_controller_handler;
    // The pipe for data transfering, such as insert.
    std::shared_ptr<StreamLoadPipe> _pipe;

    // If set the true, this plan fragment will be executed only after FE send execution start rpc.
    bool _need_wait_execution_trigger = false;
};
| |
| FragmentExecState::FragmentExecState(const TUniqueId& query_id, |
| const TUniqueId& fragment_instance_id, int backend_num, |
| ExecEnv* exec_env, |
| std::shared_ptr<QueryFragmentsCtx> fragments_ctx) |
| : _fragments_ctx(std::move(fragments_ctx)), |
| _query_id(query_id), |
| _fragment_instance_id(fragment_instance_id), |
| _backend_num(backend_num), |
| _exec_env(exec_env), |
| _executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback), |
| this, std::placeholders::_1, std::placeholders::_2, |
| std::placeholders::_3)), |
| _set_rsc_info(false), |
| _timeout_second(-1) { |
| _start_time = DateTimeValue::local_time(); |
| _coord_addr = _fragments_ctx->coord_addr; |
| } |
| |
| FragmentExecState::FragmentExecState(const TUniqueId& query_id, |
| const TUniqueId& fragment_instance_id, int backend_num, |
| ExecEnv* exec_env, const TNetworkAddress& coord_addr) |
| : _query_id(query_id), |
| _fragment_instance_id(fragment_instance_id), |
| _backend_num(backend_num), |
| _exec_env(exec_env), |
| _coord_addr(coord_addr), |
| _executor(exec_env, std::bind<void>(std::mem_fn(&FragmentExecState::coordinator_callback), |
| this, std::placeholders::_1, std::placeholders::_2, |
| std::placeholders::_3)), |
| _timeout_second(-1) { |
| _start_time = DateTimeValue::local_time(); |
| } |
| |
| Status FragmentExecState::prepare(const TExecPlanFragmentParams& params) { |
| if (params.__isset.query_options) { |
| _timeout_second = params.query_options.query_timeout; |
| } |
| |
| if (_fragments_ctx == nullptr) { |
| if (params.__isset.resource_info) { |
| set_group(params.resource_info); |
| } |
| } |
| |
| if (_fragments_ctx == nullptr) { |
| return _executor.prepare(params); |
| } else { |
| return _executor.prepare(params, _fragments_ctx.get()); |
| } |
| } |
| |
// Runs the fragment to completion on the calling (pool) thread.
// Opens the executor, and always closes it afterwards so resources are
// released even when open() fails. Timing covers open+close and feeds the
// fragment request metrics.
Status FragmentExecState::execute() {
    if (_need_wait_execution_trigger) {
        // if _need_wait_execution_trigger is true, which means this instance
        // is prepared but need to wait for the signal to do the rest execution.
        if (!_fragments_ctx->wait_for_start()) {
            return cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "wait fragment start timeout");
        }
    }
#ifndef BE_TEST
    // A cancel may have arrived between prepare() and this point; bail out
    // before doing any work.
    if (_executor.runtime_state()->is_cancelled()) {
        return Status::Cancelled("cancelled before execution");
    }
#endif
    int64_t duration_ns = 0;
    {
        SCOPED_RAW_TIMER(&duration_ns);
        CgroupsMgr::apply_system_cgroup();
        opentelemetry::trace::Tracer::GetCurrentSpan()->AddEvent("start executing Fragment");
        Status st = _executor.open();
        WARN_IF_ERROR(st,
                      strings::Substitute("Got error while opening fragment $0, query id: $1",
                                          print_id(_fragment_instance_id), print_id(_query_id)));
        if (!st.ok()) {
            // Cancel so the failure is propagated/reported; close() below
            // still runs to release resources.
            cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "PlanFragmentExecutor open failed");
        }

        _executor.close();
    }
    DorisMetrics::instance()->fragment_requests_total->increment(1);
    DorisMetrics::instance()->fragment_request_duration_us->increment(duration_ns / 1000);
    return Status::OK();
}
| |
| Status FragmentExecState::cancel(const PPlanFragmentCancelReason& reason, const std::string& msg) { |
| if (!_cancelled) { |
| std::lock_guard<std::mutex> l(_status_lock); |
| if (reason == PPlanFragmentCancelReason::LIMIT_REACH) { |
| _executor.set_is_report_on_cancel(false); |
| } |
| _executor.cancel(reason, msg); |
| if (_pipe != nullptr) { |
| _pipe->cancel(PPlanFragmentCancelReason_Name(reason)); |
| } |
| _cancelled = true; |
| } |
| return Status::OK(); |
| } |
| |
// Intentionally empty: status reporting is done by coordinator_callback().
void FragmentExecState::callback(const Status& status, RuntimeProfile* profile, bool done) {}
| |
| std::string FragmentExecState::to_http_path(const std::string& file_name) { |
| std::stringstream url; |
| url << "http://" << BackendOptions::get_localhost() << ":" << config::webserver_port |
| << "/api/_download_load?" |
| << "token=" << _exec_env->token() << "&file=" << file_name; |
| return url.str(); |
| } |
| |
// There can only be one of these callbacks in-flight at any moment, because
// it is only invoked from the executor's reporting thread.
// Also, the reported status will always reflect the most recent execution status,
// including the final status when execution finishes.
//
// Builds a TReportExecStatusParams from the runtime state (brief for an
// in-flight load, full otherwise) and sends it to the coordinator FE.
// Any unrecoverable RPC failure cancels the local execution, since the
// coordinator can no longer track it.
void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfile* profile,
                                             bool done) {
    DCHECK(status.ok() || done); // if !status.ok() => done
    // Merge the incoming status into the sticky exec status (first error wins).
    Status exec_status = update_status(status);

    Status coord_status;
    FrontendServiceConnection coord(_exec_env->frontend_client_cache(), _coord_addr, &coord_status);
    if (!coord_status.ok()) {
        std::stringstream ss;
        UniqueId uid(_query_id.hi, _query_id.lo);
        ss << "couldn't get a client for " << _coord_addr << ", reason: " << coord_status;
        LOG(WARNING) << "query_id: " << uid << ", " << ss.str();
        update_status(Status::InternalError(ss.str()));
        return;
    }

    TReportExecStatusParams params;
    params.protocol_version = FrontendServiceVersion::V1;
    params.__set_query_id(_query_id);
    params.__set_backend_num(_backend_num);
    params.__set_fragment_instance_id(_fragment_instance_id);
    exec_status.set_t_status(&params);
    params.__set_done(done);

    RuntimeState* runtime_state = _executor.runtime_state();
    DCHECK(runtime_state != nullptr);
    if (runtime_state->query_type() == TQueryType::LOAD && !done && status.ok()) {
        // this is a load plan, and load is not finished, just make a brief report
        params.__set_loaded_rows(runtime_state->num_rows_load_total());
        params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
    } else {
        // Full report: final load report, a query report, or an error report.
        if (runtime_state->query_type() == TQueryType::LOAD) {
            params.__set_loaded_rows(runtime_state->num_rows_load_total());
            params.__set_loaded_bytes(runtime_state->num_bytes_load_total());
        }
        if (profile == nullptr) {
            params.__isset.profile = false;
        } else {
            profile->to_thrift(&params.profile);
            params.__isset.profile = true;
        }

        // URLs of files written by this fragment (e.g. SELECT ... INTO OUTFILE).
        if (!runtime_state->output_files().empty()) {
            params.__isset.delta_urls = true;
            for (auto& it : runtime_state->output_files()) {
                params.delta_urls.push_back(to_http_path(it));
            }
        }
        // Load counters: normal / filtered / unselected row totals, keyed by
        // the legacy "dpp.*" counter names the FE expects.
        if (runtime_state->num_rows_load_total() > 0 ||
            runtime_state->num_rows_load_filtered() > 0) {
            params.__isset.load_counters = true;

            static std::string s_dpp_normal_all = "dpp.norm.ALL";
            static std::string s_dpp_abnormal_all = "dpp.abnorm.ALL";
            static std::string s_unselected_rows = "unselected.rows";

            params.load_counters.emplace(s_dpp_normal_all,
                                         std::to_string(runtime_state->num_rows_load_success()));
            params.load_counters.emplace(s_dpp_abnormal_all,
                                         std::to_string(runtime_state->num_rows_load_filtered()));
            params.load_counters.emplace(s_unselected_rows,
                                         std::to_string(runtime_state->num_rows_load_unselected()));
        }
        if (!runtime_state->get_error_log_file_path().empty()) {
            params.__set_tracking_url(
                    to_load_error_http_path(runtime_state->get_error_log_file_path()));
        }
        if (!runtime_state->export_output_files().empty()) {
            params.__isset.export_files = true;
            params.export_files = runtime_state->export_output_files();
        }
        // Tablets successfully written / in error, for the FE to commit or
        // diagnose the transaction.
        if (!runtime_state->tablet_commit_infos().empty()) {
            params.__isset.commitInfos = true;
            params.commitInfos.reserve(runtime_state->tablet_commit_infos().size());
            for (auto& info : runtime_state->tablet_commit_infos()) {
                params.commitInfos.push_back(info);
            }
        }
        if (!runtime_state->error_tablet_infos().empty()) {
            params.__isset.errorTabletInfos = true;
            params.errorTabletInfos.reserve(runtime_state->error_tablet_infos().size());
            for (auto& info : runtime_state->error_tablet_infos()) {
                params.errorTabletInfos.push_back(info);
            }
        }

        // Send new errors to coordinator
        runtime_state->get_unreported_errors(&(params.error_log));
        params.__isset.error_log = (params.error_log.size() > 0);
    }

    if (_exec_env->master_info()->__isset.backend_id) {
        params.__set_backend_id(_exec_env->master_info()->backend_id);
    }

    TReportExecStatusResult res;
    Status rpc_status;

    VLOG_DEBUG << "reportExecStatus params is "
               << apache::thrift::ThriftDebugString(params).c_str();
    if (!exec_status.ok()) {
        LOG(WARNING) << "report error status: " << exec_status.to_string()
                     << " to coordinator: " << _coord_addr << ", query id: " << print_id(_query_id)
                     << ", instance id: " << print_id(_fragment_instance_id);
    }
    try {
        try {
            coord->reportExecStatus(res, params);
        } catch (TTransportException& e) {
            // Transient transport failure: reopen the connection and retry once.
            LOG(WARNING) << "Retrying ReportExecStatus. query id: " << print_id(_query_id)
                         << ", instance id: " << print_id(_fragment_instance_id) << " to "
                         << _coord_addr << ", err: " << e.what();
            rpc_status = coord.reopen();

            if (!rpc_status.ok()) {
                // we need to cancel the execution of this fragment
                update_status(rpc_status);
                _executor.cancel();
                return;
            }
            coord->reportExecStatus(res, params);
        }

        rpc_status = Status(res.status);
    } catch (TException& e) {
        // Non-transport thrift failure (or the retry itself failed).
        std::stringstream msg;
        msg << "ReportExecStatus() to " << _coord_addr << " failed:\n" << e.what();
        LOG(WARNING) << msg.str();
        rpc_status = Status::InternalError(msg.str());
    }

    if (!rpc_status.ok()) {
        // we need to cancel the execution of this fragment
        update_status(rpc_status);
        _executor.cancel();
    }
}
| |
| FragmentMgr::FragmentMgr(ExecEnv* exec_env) |
| : _exec_env(exec_env), |
| _fragment_map(), |
| _fragments_ctx_map(), |
| _stop_background_threads_latch(1) { |
| _entity = DorisMetrics::instance()->metric_registry()->register_entity("FragmentMgr"); |
| INT_UGAUGE_METRIC_REGISTER(_entity, timeout_canceled_fragment_count); |
| REGISTER_HOOK_METRIC(plan_fragment_count, [this]() { return _fragment_map.size(); }); |
| |
| auto s = Thread::create( |
| "FragmentMgr", "cancel_timeout_plan_fragment", [this]() { this->cancel_worker(); }, |
| &_cancel_thread); |
| CHECK(s.ok()) << s.to_string(); |
| |
| // TODO(zc): we need a better thread-pool |
| // now one user can use all the thread pool, others have no resource. |
| s = ThreadPoolBuilder("FragmentMgrThreadPool") |
| .set_min_threads(config::fragment_pool_thread_num_min) |
| .set_max_threads(config::fragment_pool_thread_num_max) |
| .set_max_queue_size(config::fragment_pool_queue_size) |
| .build(&_thread_pool); |
| |
| REGISTER_HOOK_METRIC(fragment_thread_pool_queue_size, |
| [this]() { return _thread_pool->get_queue_size(); }); |
| CHECK(s.ok()) << s.to_string(); |
| } |
| |
// Teardown order matters: deregister hooks first (they capture `this`),
// then stop the watchdog thread, then drain the worker pool, and only
// afterwards clear the maps while holding the lock.
FragmentMgr::~FragmentMgr() {
    DEREGISTER_HOOK_METRIC(plan_fragment_count);
    DEREGISTER_HOOK_METRIC(fragment_thread_pool_queue_size);
    _stop_background_threads_latch.count_down();
    if (_cancel_thread) {
        _cancel_thread->join();
    }
    // Stop all the worker, should wait for a while?
    // _thread_pool->wait_for();
    _thread_pool->shutdown();

    // Only me can delete
    {
        std::lock_guard<std::mutex> lock(_lock);
        _fragment_map.clear();
        _fragments_ctx_map.clear();
    }
}
| |
| static void empty_function(PlanFragmentExecutor* exec) {} |
| |
// Body of a pooled fragment-execution task: runs the fragment, decrements the
// per-query instance countdown, removes bookkeeping entries, then invokes the
// caller's finish callback. The callback runs AFTER the state is unregistered
// so callers observe a consistent view.
void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, FinishCallback cb) {
    std::string func_name {"PlanFragmentExecutor::_exec_actual"};
#ifndef BE_TEST
    auto span = exec_state->executor()->runtime_state()->get_tracer()->StartSpan(func_name);
    // Attach this thread to the query's memory tracker for the duration.
    SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state());
#else
    auto span = telemetry::get_noop_tracer()->StartSpan(func_name);
#endif
    auto scope = opentelemetry::trace::Scope {span};
    span->SetAttribute("query_id", print_id(exec_state->query_id()));
    span->SetAttribute("instance_id", print_id(exec_state->fragment_instance_id()));

    LOG_INFO(func_name)
            .tag("query_id", exec_state->query_id())
            .tag("instance_id", exec_state->fragment_instance_id())
            .tag("pthread_id", (uintptr_t)pthread_self());

    Status st = exec_state->execute();
    if (!st.ok()) {
        exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "exec_state execute failed");
    }

    std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
    bool all_done = false;
    if (fragments_ctx != nullptr) {
        // decrease the number of unfinished fragments
        all_done = fragments_ctx->countdown();
    }

    // remove exec state after this fragment finished
    {
        std::lock_guard<std::mutex> lock(_lock);
        _fragment_map.erase(exec_state->fragment_instance_id());
        // Last instance of the query on this host: drop the shared context too.
        if (all_done && fragments_ctx) {
            _fragments_ctx_map.erase(fragments_ctx->query_id);
        }
    }

    // Callback after remove from this id
    cb(exec_state->executor());
}
| |
// Entry point for executing a plan fragment. For transactional (txn) loads,
// a StreamLoadContext plus a proto-backed pipe is set up and execution is
// delegated to the stream load executor; otherwise falls through to the
// callback-based overload with a no-op callback.
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params) {
    if (params.txn_conf.need_txn) {
        // NOTE(review): raw new with no visible delete — if either
        // RETURN_IF_ERROR below fires, this context appears to leak.
        // Ownership/refcount may be taken inside execute_plan_fragment;
        // confirm before changing to a smart pointer.
        StreamLoadContext* stream_load_ctx = new StreamLoadContext(_exec_env);
        stream_load_ctx->db = params.txn_conf.db;
        stream_load_ctx->db_id = params.txn_conf.db_id;
        stream_load_ctx->table = params.txn_conf.tbl;
        stream_load_ctx->txn_id = params.txn_conf.txn_id;
        stream_load_ctx->id = UniqueId(params.params.query_id);
        stream_load_ctx->put_result.params = params;
        stream_load_ctx->use_streaming = true;
        stream_load_ctx->load_type = TLoadType::MANUL_LOAD;
        stream_load_ctx->load_src_type = TLoadSourceType::RAW;
        stream_load_ctx->label = params.import_label;
        stream_load_ctx->format = TFileFormatType::FORMAT_CSV_PLAIN;
        stream_load_ctx->timeout_second = 3600;
        stream_load_ctx->auth.auth_code_uuid = params.txn_conf.auth_code_uuid;
        stream_load_ctx->need_commit_self = true;
        stream_load_ctx->need_rollback = true;
        // total_length == -1 means read one message from pipe in once time, don't care the length.
        auto pipe = std::make_shared<StreamLoadPipe>(kMaxPipeBufferedBytes /* max_buffered_bytes */,
                                                     64 * 1024 /* min_chunk_size */,
                                                     -1 /* total_length */, true /* use_proto */);
        stream_load_ctx->body_sink = pipe;
        stream_load_ctx->max_filter_ratio = params.txn_conf.max_filter_ratio;

        // Register the pipe so incoming txn data can be routed to it.
        RETURN_IF_ERROR(_exec_env->load_stream_mgr()->put(stream_load_ctx->id, pipe));

        RETURN_IF_ERROR(_exec_env->stream_load_executor()->execute_plan_fragment(stream_load_ctx));
        // Attach the pipe to the now-registered fragment instance (no-op if
        // the instance is not in the map yet).
        set_pipe(params.params.fragment_instance_id, pipe);
        return Status::OK();
    } else {
        return exec_plan_fragment(params, std::bind<void>(&empty_function, std::placeholders::_1));
    }
}
| |
| Status FragmentMgr::start_query_execution(const PExecPlanFragmentStartRequest* request) { |
| std::lock_guard<std::mutex> lock(_lock); |
| TUniqueId query_id; |
| query_id.__set_hi(request->query_id().hi()); |
| query_id.__set_lo(request->query_id().lo()); |
| auto search = _fragments_ctx_map.find(query_id); |
| if (search == _fragments_ctx_map.end()) { |
| return Status::InternalError( |
| "Failed to get query fragments context. Query may be " |
| "timeout or be cancelled. host: {}", |
| BackendOptions::get_localhost()); |
| } |
| search->second->set_ready_to_execute(false); |
| return Status::OK(); |
| } |
| |
| void FragmentMgr::set_pipe(const TUniqueId& fragment_instance_id, |
| std::shared_ptr<StreamLoadPipe> pipe) { |
| { |
| std::lock_guard<std::mutex> lock(_lock); |
| auto iter = _fragment_map.find(fragment_instance_id); |
| if (iter != _fragment_map.end()) { |
| _fragment_map[fragment_instance_id]->set_pipe(std::move(pipe)); |
| } |
| } |
| } |
| |
| std::shared_ptr<StreamLoadPipe> FragmentMgr::get_pipe(const TUniqueId& fragment_instance_id) { |
| { |
| std::lock_guard<std::mutex> lock(_lock); |
| auto iter = _fragment_map.find(fragment_instance_id); |
| if (iter != _fragment_map.end()) { |
| return _fragment_map[fragment_instance_id]->get_pipe(); |
| } else { |
| return nullptr; |
| } |
| } |
| } |
| |
// Registers and asynchronously starts one fragment instance.
// Steps: dedup check -> obtain/create the shared QueryFragmentsCtx (with a
// double-checked re-lookup under the lock) -> create + prepare the
// FragmentExecState -> register runtime-filter controller -> insert into
// _fragment_map -> submit _exec_actual to the thread pool. On submit failure
// the registration is rolled back and the instance cancelled.
Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, FinishCallback cb) {
    auto tracer = telemetry::is_current_span_valid() ? telemetry::get_tracer("tracer")
                                                     : telemetry::get_noop_tracer();
    START_AND_SCOPE_SPAN(tracer, span, "FragmentMgr::exec_plan_fragment");
    const TUniqueId& fragment_instance_id = params.params.fragment_instance_id;
    {
        std::lock_guard<std::mutex> lock(_lock);
        auto iter = _fragment_map.find(fragment_instance_id);
        if (iter != _fragment_map.end()) {
            // Duplicated
            return Status::OK();
        }
    }

    std::shared_ptr<FragmentExecState> exec_state;
    std::shared_ptr<QueryFragmentsCtx> fragments_ctx;
    if (params.is_simplified_param) {
        // Get common components from _fragments_ctx_map
        std::lock_guard<std::mutex> lock(_lock);
        auto search = _fragments_ctx_map.find(params.params.query_id);
        if (search == _fragments_ctx_map.end()) {
            return Status::InternalError(
                    "Failed to get query fragments context. Query may be "
                    "timeout or be cancelled. host: {}",
                    BackendOptions::get_localhost());
        }
        fragments_ctx = search->second;
    } else {
        // This may be a first fragment request of the query.
        // Create the query fragments context.
        fragments_ctx.reset(new QueryFragmentsCtx(params.fragment_num_on_host, _exec_env));
        fragments_ctx->query_id = params.params.query_id;
        RETURN_IF_ERROR(DescriptorTbl::create(&(fragments_ctx->obj_pool), params.desc_tbl,
                                              &(fragments_ctx->desc_tbl)));
        fragments_ctx->coord_addr = params.coord;
        LOG(INFO) << "query_id: "
                  << UniqueId(fragments_ctx->query_id.hi, fragments_ctx->query_id.lo)
                  << " coord_addr " << fragments_ctx->coord_addr
                  << " total fragment num on current host: " << params.fragment_num_on_host;
        fragments_ctx->query_globals = params.query_globals;

        if (params.__isset.resource_info) {
            fragments_ctx->user = params.resource_info.user;
            fragments_ctx->group = params.resource_info.group;
            fragments_ctx->set_rsc_info = true;
        }

        fragments_ctx->timeout_second = params.query_options.query_timeout;
        _set_scan_concurrency(params, fragments_ctx.get());

        // Query-level memory limit; capped by the process limit below.
        bool has_query_mem_tracker =
                params.query_options.__isset.mem_limit && (params.query_options.mem_limit > 0);
        int64_t bytes_limit = has_query_mem_tracker ? params.query_options.mem_limit : -1;
        if (bytes_limit > MemInfo::mem_limit()) {
            VLOG_NOTICE << "Query memory limit " << PrettyPrinter::print(bytes_limit, TUnit::BYTES)
                        << " exceeds process memory limit of "
                        << PrettyPrinter::print(MemInfo::mem_limit(), TUnit::BYTES)
                        << ". Using process memory limit instead";
            bytes_limit = MemInfo::mem_limit();
        }
        if (params.query_options.query_type == TQueryType::SELECT) {
            fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                    MemTrackerLimiter::Type::QUERY,
                    fmt::format("Query#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
        } else if (params.query_options.query_type == TQueryType::LOAD) {
            fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                    MemTrackerLimiter::Type::LOAD,
                    fmt::format("Load#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
        } else { // EXTERNAL
            fragments_ctx->query_mem_tracker = std::make_shared<MemTrackerLimiter>(
                    MemTrackerLimiter::Type::LOAD,
                    fmt::format("External#Id={}", print_id(fragments_ctx->query_id)), bytes_limit);
        }
        if (params.query_options.__isset.is_report_success &&
            params.query_options.is_report_success) {
            fragments_ctx->query_mem_tracker->enable_print_log_usage();
        }

        {
            // Find _fragments_ctx_map again, in case some other request has already
            // create the query fragments context.
            std::lock_guard<std::mutex> lock(_lock);
            auto search = _fragments_ctx_map.find(params.params.query_id);
            if (search == _fragments_ctx_map.end()) {
                _fragments_ctx_map.insert(std::make_pair(fragments_ctx->query_id, fragments_ctx));
                LOG(INFO) << "Register query/load memory tracker, query/load id: "
                          << print_id(fragments_ctx->query_id)
                          << " limit: " << PrettyPrinter::print(bytes_limit, TUnit::BYTES);
            } else {
                // Already has a query fragments context, use it
                fragments_ctx = search->second;
            }
        }
    }
    fragments_ctx->fragment_ids.push_back(fragment_instance_id);

    exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
                                           params.params.fragment_instance_id, params.backend_num,
                                           _exec_env, fragments_ctx));
    if (params.__isset.need_wait_execution_trigger && params.need_wait_execution_trigger) {
        // set need_wait_execution_trigger means this instance will not actually being executed
        // until the execPlanFragmentStart RPC trigger to start it.
        exec_state->set_need_wait_execution_trigger();
    }

    int64_t duration_ns = 0;
    {
        SCOPED_RAW_TIMER(&duration_ns);
        RETURN_IF_ERROR(exec_state->prepare(params));
    }
    g_fragmentmgr_prepare_latency << (duration_ns / 1000);

    // Register this instance with the runtime-filter merge controller and keep
    // the entity alive via the exec state.
    std::shared_ptr<RuntimeFilterMergeControllerEntity> handler;
    _runtimefilter_controller.add_entity(params, &handler, exec_state->executor()->runtime_state());
    exec_state->set_merge_controller_handler(handler);

    {
        std::lock_guard<std::mutex> lock(_lock);
        _fragment_map.insert(std::make_pair(params.params.fragment_instance_id, exec_state));
        _cv.notify_all();
    }

    // Hand off to the worker pool, propagating the current trace span.
    auto st = _thread_pool->submit_func(
            [this, exec_state, cb, parent_span = opentelemetry::trace::Tracer::GetCurrentSpan()] {
                OpentelemetryScope scope {parent_span};
                _exec_actual(exec_state, cb);
            });
    if (!st.ok()) {
        {
            // Remove the exec state added
            std::lock_guard<std::mutex> lock(_lock);
            _fragment_map.erase(params.params.fragment_instance_id);
        }
        exec_state->cancel(PPlanFragmentCancelReason::INTERNAL_ERROR,
                           "push plan fragment to thread pool failed");
        return Status::InternalError(
                "push plan fragment {} to thread pool failed. err = {}, BE: {}",
                print_id(params.params.fragment_instance_id), st.to_string(),
                BackendOptions::get_localhost());
    }

    return Status::OK();
}
| |
| void FragmentMgr::_set_scan_concurrency(const TExecPlanFragmentParams& params, |
| QueryFragmentsCtx* fragments_ctx) { |
| #ifndef BE_TEST |
| // set thread token |
| // the thread token will be set if |
| // 1. the cpu_limit is set, or |
| // 2. the limit is very small ( < 1024) |
| // If the token is set, the scan task will use limited_scan_pool in scanner scheduler. |
| // Otherwise, the scan task will use local/remote scan pool in scanner scheduler |
| int concurrency = 1; |
| bool is_serial = false; |
| bool need_token = false; |
| if (params.query_options.__isset.resource_limit && |
| params.query_options.resource_limit.__isset.cpu_limit) { |
| concurrency = params.query_options.resource_limit.cpu_limit; |
| need_token = true; |
| } else { |
| concurrency = config::doris_scanner_thread_pool_thread_num; |
| } |
| if (params.__isset.fragment && params.fragment.__isset.plan && |
| params.fragment.plan.nodes.size() > 0) { |
| for (auto& node : params.fragment.plan.nodes) { |
| // Only for SCAN NODE |
| if (!_is_scan_node(node.node_type)) { |
| continue; |
| } |
| if (node.__isset.conjuncts && !node.conjuncts.empty()) { |
| // If the scan node has where predicate, do not set concurrency |
| continue; |
| } |
| if (node.limit > 0 && node.limit < 1024) { |
| concurrency = 1; |
| is_serial = true; |
| need_token = true; |
| break; |
| } |
| } |
| } |
| if (need_token) { |
| fragments_ctx->set_thread_token(concurrency, is_serial); |
| } |
| #endif |
| } |
| |
| bool FragmentMgr::_is_scan_node(const TPlanNodeType::type& type) { |
| return type == TPlanNodeType::OLAP_SCAN_NODE || type == TPlanNodeType::MYSQL_SCAN_NODE || |
| type == TPlanNodeType::SCHEMA_SCAN_NODE || type == TPlanNodeType::META_SCAN_NODE || |
| type == TPlanNodeType::BROKER_SCAN_NODE || type == TPlanNodeType::ES_SCAN_NODE || |
| type == TPlanNodeType::ES_HTTP_SCAN_NODE || type == TPlanNodeType::ODBC_SCAN_NODE || |
| type == TPlanNodeType::DATA_GEN_SCAN_NODE || type == TPlanNodeType::FILE_SCAN_NODE || |
| type == TPlanNodeType::JDBC_SCAN_NODE; |
| } |
| |
| Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason, |
| const std::string& msg) { |
| std::shared_ptr<FragmentExecState> exec_state; |
| { |
| std::lock_guard<std::mutex> lock(_lock); |
| auto iter = _fragment_map.find(fragment_id); |
| if (iter == _fragment_map.end()) { |
| // No match |
| return Status::OK(); |
| } |
| exec_state = iter->second; |
| } |
| exec_state->cancel(reason, msg); |
| |
| return Status::OK(); |
| } |
| |
| void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason, |
| const std::string& msg) { |
| std::vector<TUniqueId> cancel_fragment_ids; |
| { |
| std::lock_guard<std::mutex> lock(_lock); |
| auto ctx = _fragments_ctx_map.find(query_id); |
| if (ctx != _fragments_ctx_map.end()) { |
| cancel_fragment_ids = ctx->second->fragment_ids; |
| } |
| } |
| for (auto it : cancel_fragment_ids) { |
| cancel(it, reason, msg); |
| } |
| } |
| |
| bool FragmentMgr::query_is_canceled(const TUniqueId& query_id) { |
| std::lock_guard<std::mutex> lock(_lock); |
| auto ctx = _fragments_ctx_map.find(query_id); |
| if (ctx != _fragments_ctx_map.end()) { |
| for (auto it : ctx->second->fragment_ids) { |
| auto exec_state_iter = _fragment_map.find(it); |
| if (exec_state_iter != _fragment_map.end() && exec_state_iter->second) { |
| return exec_state_iter->second->is_canceled(); |
| } |
| } |
| } |
| return true; |
| } |
| |
| void FragmentMgr::cancel_worker() { |
| LOG(INFO) << "FragmentMgr cancel worker start working."; |
| do { |
| std::vector<TUniqueId> to_cancel; |
| std::vector<TUniqueId> to_cancel_queries; |
| DateTimeValue now = DateTimeValue::local_time(); |
| { |
| std::lock_guard<std::mutex> lock(_lock); |
| for (auto& it : _fragment_map) { |
| if (it.second->is_timeout(now)) { |
| to_cancel.push_back(it.second->fragment_instance_id()); |
| } |
| } |
| for (auto it = _fragments_ctx_map.begin(); it != _fragments_ctx_map.end();) { |
| if (it->second->is_timeout(now)) { |
| it = _fragments_ctx_map.erase(it); |
| } else { |
| ++it; |
| } |
| } |
| } |
| timeout_canceled_fragment_count->increment(to_cancel.size()); |
| for (auto& id : to_cancel) { |
| cancel(id, PPlanFragmentCancelReason::TIMEOUT); |
| LOG(INFO) << "FragmentMgr cancel worker going to cancel timeout fragment " |
| << print_id(id); |
| } |
| } while (!_stop_background_threads_latch.wait_for(std::chrono::seconds(1))); |
| LOG(INFO) << "FragmentMgr cancel worker is going to exit."; |
| } |
| |
| void FragmentMgr::debug(std::stringstream& ss) { |
| // Keep things simple |
| std::lock_guard<std::mutex> lock(_lock); |
| |
| ss << "FragmentMgr have " << _fragment_map.size() << " jobs.\n"; |
| ss << "job_id\t\tstart_time\t\texecute_time(s)\n"; |
| DateTimeValue now = DateTimeValue::local_time(); |
| for (auto& it : _fragment_map) { |
| ss << it.first << "\t" << it.second->start_time().debug_string() << "\t" |
| << now.second_diff(it.second->start_time()) << "\n"; |
| } |
| } |
| |
| /* |
| * 1. resolve opaqued_query_plan to thrift structure |
| * 2. build TExecPlanFragmentParams |
| */ |
| Status FragmentMgr::exec_external_plan_fragment(const TScanOpenParams& params, |
| const TUniqueId& fragment_instance_id, |
| std::vector<TScanColumnDesc>* selected_columns) { |
| const std::string& opaqued_query_plan = params.opaqued_query_plan; |
| std::string query_plan_info; |
| // base64 decode query plan |
| if (!base64_decode(opaqued_query_plan, &query_plan_info)) { |
| LOG(WARNING) << "open context error: base64_decode decode opaqued_query_plan failure"; |
| std::stringstream msg; |
| msg << "query_plan_info: " << query_plan_info |
| << " validate error, should not be modified after returned Doris FE processed"; |
| return Status::InvalidArgument(msg.str()); |
| } |
| TQueryPlanInfo t_query_plan_info; |
| const uint8_t* buf = (const uint8_t*)query_plan_info.data(); |
| uint32_t len = query_plan_info.size(); |
| // deserialize TQueryPlanInfo |
| auto st = deserialize_thrift_msg(buf, &len, false, &t_query_plan_info); |
| if (!st.ok()) { |
| LOG(WARNING) << "open context error: deserialize TQueryPlanInfo failure"; |
| std::stringstream msg; |
| msg << "query_plan_info: " << query_plan_info |
| << " deserialize error, should not be modified after returned Doris FE processed"; |
| return Status::InvalidArgument(msg.str()); |
| } |
| |
| // set up desc tbl |
| DescriptorTbl* desc_tbl = nullptr; |
| ObjectPool obj_pool; |
| st = DescriptorTbl::create(&obj_pool, t_query_plan_info.desc_tbl, &desc_tbl); |
| if (!st.ok()) { |
| LOG(WARNING) << "open context error: extract DescriptorTbl failure"; |
| std::stringstream msg; |
| msg << "query_plan_info: " << query_plan_info |
| << " create DescriptorTbl error, should not be modified after returned Doris FE " |
| "processed"; |
| return Status::InvalidArgument(msg.str()); |
| } |
| TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); |
| if (tuple_desc == nullptr) { |
| LOG(WARNING) << "open context error: extract TupleDescriptor failure"; |
| std::stringstream msg; |
| msg << "query_plan_info: " << query_plan_info |
| << " get TupleDescriptor error, should not be modified after returned Doris FE " |
| "processed"; |
| return Status::InvalidArgument(msg.str()); |
| } |
| // process selected columns form slots |
| for (const SlotDescriptor* slot : tuple_desc->slots()) { |
| TScanColumnDesc col; |
| col.__set_name(slot->col_name()); |
| col.__set_type(to_thrift(slot->type().type)); |
| selected_columns->emplace_back(std::move(col)); |
| } |
| |
| VLOG_QUERY << "BackendService execute open() TQueryPlanInfo: " |
| << apache::thrift::ThriftDebugString(t_query_plan_info); |
| // assign the param used to execute PlanFragment |
| TExecPlanFragmentParams exec_fragment_params; |
| exec_fragment_params.protocol_version = (PaloInternalServiceVersion::type)0; |
| exec_fragment_params.__set_is_simplified_param(false); |
| exec_fragment_params.__set_fragment(t_query_plan_info.plan_fragment); |
| exec_fragment_params.__set_desc_tbl(t_query_plan_info.desc_tbl); |
| |
| // assign the param used for executing of PlanFragment-self |
| TPlanFragmentExecParams fragment_exec_params; |
| fragment_exec_params.query_id = t_query_plan_info.query_id; |
| fragment_exec_params.fragment_instance_id = fragment_instance_id; |
| std::map<::doris::TPlanNodeId, std::vector<TScanRangeParams>> per_node_scan_ranges; |
| std::vector<TScanRangeParams> scan_ranges; |
| std::vector<int64_t> tablet_ids = params.tablet_ids; |
| TNetworkAddress address; |
| address.hostname = BackendOptions::get_localhost(); |
| address.port = doris::config::be_port; |
| std::map<int64_t, TTabletVersionInfo> tablet_info = t_query_plan_info.tablet_info; |
| for (auto tablet_id : params.tablet_ids) { |
| TPaloScanRange scan_range; |
| scan_range.db_name = params.database; |
| scan_range.table_name = params.table; |
| auto iter = tablet_info.find(tablet_id); |
| if (iter != tablet_info.end()) { |
| TTabletVersionInfo info = iter->second; |
| scan_range.tablet_id = tablet_id; |
| scan_range.version = std::to_string(info.version); |
| // Useless but it is required field in TPaloScanRange |
| scan_range.version_hash = "0"; |
| scan_range.schema_hash = std::to_string(info.schema_hash); |
| scan_range.hosts.push_back(address); |
| } else { |
| std::stringstream msg; |
| msg << "tablet_id: " << tablet_id << " not found"; |
| LOG(WARNING) << "tablet_id [ " << tablet_id << " ] not found"; |
| return Status::NotFound(msg.str()); |
| } |
| TScanRange doris_scan_range; |
| doris_scan_range.__set_palo_scan_range(scan_range); |
| TScanRangeParams scan_range_params; |
| scan_range_params.scan_range = doris_scan_range; |
| scan_ranges.push_back(scan_range_params); |
| } |
| per_node_scan_ranges.insert(std::make_pair((::doris::TPlanNodeId)0, scan_ranges)); |
| fragment_exec_params.per_node_scan_ranges = per_node_scan_ranges; |
| exec_fragment_params.__set_params(fragment_exec_params); |
| // batch_size for one RowBatch |
| TQueryOptions query_options; |
| query_options.batch_size = params.batch_size; |
| query_options.query_timeout = params.query_timeout; |
| query_options.mem_limit = params.mem_limit; |
| query_options.query_type = TQueryType::EXTERNAL; |
| exec_fragment_params.__set_query_options(query_options); |
| VLOG_ROW << "external exec_plan_fragment params is " |
| << apache::thrift::ThriftDebugString(exec_fragment_params).c_str(); |
| return exec_plan_fragment(exec_fragment_params); |
| } |
| |
| Status FragmentMgr::apply_filter(const PPublishFilterRequest* request, |
| butil::IOBufAsZeroCopyInputStream* attach_data) { |
| UniqueId fragment_instance_id = request->fragment_id(); |
| TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); |
| std::shared_ptr<FragmentExecState> fragment_state; |
| |
| { |
| std::unique_lock<std::mutex> lock(_lock); |
| if (!_fragment_map.count(tfragment_instance_id)) { |
| VLOG_NOTICE << "wait for fragment start execute, fragment-id:" << fragment_instance_id; |
| _cv.wait_for(lock, std::chrono::milliseconds(1000), |
| [&] { return _fragment_map.count(tfragment_instance_id); }); |
| } |
| |
| auto iter = _fragment_map.find(tfragment_instance_id); |
| if (iter == _fragment_map.end()) { |
| VLOG_CRITICAL << "unknown.... fragment-id:" << fragment_instance_id; |
| return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); |
| } |
| fragment_state = iter->second; |
| } |
| |
| DCHECK(fragment_state != nullptr); |
| RuntimeFilterMgr* runtime_filter_mgr = |
| fragment_state->executor()->runtime_state()->runtime_filter_mgr(); |
| |
| Status st = runtime_filter_mgr->update_filter(request, attach_data); |
| return st; |
| } |
| |
| Status FragmentMgr::merge_filter(const PMergeFilterRequest* request, |
| butil::IOBufAsZeroCopyInputStream* attach_data) { |
| UniqueId queryid = request->query_id(); |
| std::shared_ptr<RuntimeFilterMergeControllerEntity> filter_controller; |
| RETURN_IF_ERROR(_runtimefilter_controller.acquire(queryid, &filter_controller)); |
| |
| auto fragment_instance_id = filter_controller->instance_id(); |
| TUniqueId tfragment_instance_id = fragment_instance_id.to_thrift(); |
| std::shared_ptr<FragmentExecState> fragment_state; |
| { |
| std::unique_lock<std::mutex> lock(_lock); |
| auto iter = _fragment_map.find(tfragment_instance_id); |
| if (iter == _fragment_map.end()) { |
| VLOG_CRITICAL << "unknown fragment-id:" << fragment_instance_id; |
| return Status::InvalidArgument("fragment-id: {}", fragment_instance_id.to_string()); |
| } |
| |
| // hold reference to fragment_state, or else runtime_state can be destroyed |
| // when filter_controller->merge is still in progress |
| fragment_state = iter->second; |
| } |
| RETURN_IF_ERROR(filter_controller->merge(request, attach_data)); |
| return Status::OK(); |
| } |
| |
| } // namespace doris |