| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "exec/pipeline/pipeline_fragment_context.h" |
| |
| #include <gen_cpp/DataSinks_types.h> |
| #include <gen_cpp/PaloInternalService_types.h> |
| #include <gen_cpp/PlanNodes_types.h> |
| #include <pthread.h> |
| |
| #include <algorithm> |
| #include <cstdlib> |
| // IWYU pragma: no_include <bits/chrono.h> |
| #include <fmt/format.h> |
| |
| #include <chrono> // IWYU pragma: keep |
| #include <map> |
| #include <memory> |
| #include <ostream> |
| #include <utility> |
| |
| #include "cloud/config.h" |
| #include "common/cast_set.h" |
| #include "common/config.h" |
| #include "common/exception.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "exec/exchange/local_exchange_sink_operator.h" |
| #include "exec/exchange/local_exchange_source_operator.h" |
| #include "exec/exchange/local_exchanger.h" |
| #include "exec/exchange/vdata_stream_mgr.h" |
| #include "exec/operator/aggregation_sink_operator.h" |
| #include "exec/operator/aggregation_source_operator.h" |
| #include "exec/operator/analytic_sink_operator.h" |
| #include "exec/operator/analytic_source_operator.h" |
| #include "exec/operator/assert_num_rows_operator.h" |
| #include "exec/operator/blackhole_sink_operator.h" |
| #include "exec/operator/cache_sink_operator.h" |
| #include "exec/operator/cache_source_operator.h" |
| #include "exec/operator/datagen_operator.h" |
| #include "exec/operator/dict_sink_operator.h" |
| #include "exec/operator/distinct_streaming_aggregation_operator.h" |
| #include "exec/operator/empty_set_operator.h" |
| #include "exec/operator/es_scan_operator.h" |
| #include "exec/operator/exchange_sink_operator.h" |
| #include "exec/operator/exchange_source_operator.h" |
| #include "exec/operator/file_scan_operator.h" |
| #include "exec/operator/group_commit_block_sink_operator.h" |
| #include "exec/operator/group_commit_scan_operator.h" |
| #include "exec/operator/hashjoin_build_sink.h" |
| #include "exec/operator/hashjoin_probe_operator.h" |
| #include "exec/operator/hive_table_sink_operator.h" |
| #include "exec/operator/iceberg_delete_sink_operator.h" |
| #include "exec/operator/iceberg_merge_sink_operator.h" |
| #include "exec/operator/iceberg_table_sink_operator.h" |
| #include "exec/operator/jdbc_scan_operator.h" |
| #include "exec/operator/jdbc_table_sink_operator.h" |
| #include "exec/operator/local_merge_sort_source_operator.h" |
| #include "exec/operator/materialization_opertor.h" |
| #include "exec/operator/maxcompute_table_sink_operator.h" |
| #include "exec/operator/memory_scratch_sink_operator.h" |
| #include "exec/operator/meta_scan_operator.h" |
| #include "exec/operator/multi_cast_data_stream_sink.h" |
| #include "exec/operator/multi_cast_data_stream_source.h" |
| #include "exec/operator/nested_loop_join_build_operator.h" |
| #include "exec/operator/nested_loop_join_probe_operator.h" |
| #include "exec/operator/olap_scan_operator.h" |
| #include "exec/operator/olap_table_sink_operator.h" |
| #include "exec/operator/olap_table_sink_v2_operator.h" |
| #include "exec/operator/partition_sort_sink_operator.h" |
| #include "exec/operator/partition_sort_source_operator.h" |
| #include "exec/operator/partitioned_aggregation_sink_operator.h" |
| #include "exec/operator/partitioned_aggregation_source_operator.h" |
| #include "exec/operator/partitioned_hash_join_probe_operator.h" |
| #include "exec/operator/partitioned_hash_join_sink_operator.h" |
| #include "exec/operator/rec_cte_anchor_sink_operator.h" |
| #include "exec/operator/rec_cte_scan_operator.h" |
| #include "exec/operator/rec_cte_sink_operator.h" |
| #include "exec/operator/rec_cte_source_operator.h" |
| #include "exec/operator/repeat_operator.h" |
| #include "exec/operator/result_file_sink_operator.h" |
| #include "exec/operator/result_sink_operator.h" |
| #include "exec/operator/schema_scan_operator.h" |
| #include "exec/operator/select_operator.h" |
| #include "exec/operator/set_probe_sink_operator.h" |
| #include "exec/operator/set_sink_operator.h" |
| #include "exec/operator/set_source_operator.h" |
| #include "exec/operator/sort_sink_operator.h" |
| #include "exec/operator/sort_source_operator.h" |
| #include "exec/operator/spill_iceberg_table_sink_operator.h" |
| #include "exec/operator/spill_sort_sink_operator.h" |
| #include "exec/operator/spill_sort_source_operator.h" |
| #include "exec/operator/streaming_aggregation_operator.h" |
| #include "exec/operator/table_function_operator.h" |
| #include "exec/operator/tvf_table_sink_operator.h" |
| #include "exec/operator/union_sink_operator.h" |
| #include "exec/operator/union_source_operator.h" |
| #include "exec/pipeline/dependency.h" |
| #include "exec/pipeline/pipeline_task.h" |
| #include "exec/pipeline/task_scheduler.h" |
| #include "exec/runtime_filter/runtime_filter_mgr.h" |
| #include "exec/sort/topn_sorter.h" |
| #include "exec/spill/spill_file.h" |
| #include "io/fs/stream_load_pipe.h" |
| #include "load/stream_load/new_load_stream_mgr.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/fragment_mgr.h" |
| #include "runtime/result_block_buffer.h" |
| #include "runtime/result_buffer_mgr.h" |
| #include "runtime/runtime_state.h" |
| #include "runtime/thread_context.h" |
| #include "service/backend_options.h" |
| #include "util/countdown_latch.h" |
| #include "util/debug_util.h" |
| #include "util/uid_util.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| PipelineFragmentContext::PipelineFragmentContext( |
| TUniqueId query_id, const TPipelineFragmentParams& request, |
| std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env, |
| const std::function<void(RuntimeState*, Status*)>& call_back, |
| report_status_callback report_status_cb) |
| : _query_id(std::move(query_id)), |
| _fragment_id(request.fragment_id), |
| _exec_env(exec_env), |
| _query_ctx(std::move(query_ctx)), |
| _call_back(call_back), |
| _is_report_on_cancel(true), |
| _report_status_cb(std::move(report_status_cb)), |
| _params(request), |
| _parallel_instances(_params.__isset.parallel_instances ? _params.parallel_instances : 0), |
| _need_notify_close(request.__isset.need_notify_close ? request.need_notify_close |
| : false) { |
| _fragment_watcher.start(); |
| } |
| |
| PipelineFragmentContext::~PipelineFragmentContext() { |
| LOG_INFO("PipelineFragmentContext::~PipelineFragmentContext") |
| .tag("query_id", print_id(_query_id)) |
| .tag("fragment_id", _fragment_id); |
| _release_resource(); |
| { |
// Memory released at query end is recorded in the query mem tracker.
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); |
| _runtime_state.reset(); |
| _query_ctx.reset(); |
| } |
| } |
| |
| bool PipelineFragmentContext::is_timeout(timespec now) const { |
| if (_timeout <= 0) { |
| return false; |
| } |
| return _fragment_watcher.elapsed_time_seconds(now) > _timeout; |
| } |
| |
// Must not hold a fragment-wide lock in this method, because it calls QueryContext::cancel,
// and QueryContext::cancel calls back into fragment context cancel. Running components of
// this fragment (e.g. the exchange sink buffer) may also call QueryContext::cancel, so
// holding a lock here could deadlock.
| void PipelineFragmentContext::cancel(const Status reason) { |
| LOG_INFO("PipelineFragmentContext::cancel") |
| .tag("query_id", print_id(_query_id)) |
| .tag("fragment_id", _fragment_id) |
| .tag("reason", reason.to_string()); |
| { |
| std::lock_guard<std::mutex> l(_task_mutex); |
| if (_closed_tasks >= _total_tasks) { |
// All tasks in this PipelineFragmentContext have already closed.
| return; |
| } |
| } |
// Timeout is a special error code; print the fragment's debug string to help debug timeout issues.
| if (reason.is<ErrorCode::TIMEOUT>()) { |
| auto dbg_str = fmt::format("PipelineFragmentContext is cancelled due to timeout:\n{}", |
| debug_string()); |
| LOG_LONG_STRING(WARNING, dbg_str); |
| } |
| |
// `ILLEGAL_STATE` means the query this fragment belongs to was not found in the FE (it may have finished)
| if (reason.is<ErrorCode::ILLEGAL_STATE>()) { |
| LOG_WARNING("PipelineFragmentContext is cancelled due to illegal state : {}", |
| debug_string()); |
| } |
| |
| if (reason.is<ErrorCode::MEM_LIMIT_EXCEEDED>() || reason.is<ErrorCode::MEM_ALLOC_FAILED>()) { |
| print_profile("cancel pipeline, reason: " + reason.to_string()); |
| } |
| |
| if (auto error_url = get_load_error_url(); !error_url.empty()) { |
| _query_ctx->set_load_error_url(error_url); |
| } |
| |
| if (auto first_error_msg = get_first_error_msg(); !first_error_msg.empty()) { |
| _query_ctx->set_first_error_msg(first_error_msg); |
| } |
| |
| _query_ctx->cancel(reason, _fragment_id); |
| if (reason.is<ErrorCode::LIMIT_REACH>()) { |
| _is_report_on_cancel = false; |
| } else { |
| for (auto& id : _fragment_instance_ids) { |
| LOG(WARNING) << "PipelineFragmentContext cancel instance: " << print_id(id); |
| } |
| } |
// Get the pipe from the new load stream manager and cancel it, otherwise the fragment may hang waiting to read from the pipe.
// For stream load, the fragment's query_id equals the load id; it is set by the FE.
| auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id); |
| if (stream_load_ctx != nullptr) { |
| stream_load_ctx->pipe->cancel(reason.to_string()); |
| // Set error URL here because after pipe is cancelled, stream load execution may return early. |
| // We need to set the error URL at this point to ensure error information is properly |
| // propagated to the client. |
| stream_load_ctx->error_url = get_load_error_url(); |
| stream_load_ctx->first_error_msg = get_first_error_msg(); |
| } |
| |
| for (auto& tasks : _tasks) { |
| for (auto& task : tasks) { |
| task.first->terminate(); |
| } |
| } |
| } |
| |
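// Creates a new pipeline and registers it in `_pipelines`. If `idx >= 0`, the pipeline is
// inserted at that position (used when splitting an existing pipeline for a local exchange);
// otherwise it is appended. When `parent` is given, the new pipeline becomes one of its
// children and its task count is min(parent's task count, _num_instances).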
| PipelinePtr PipelineFragmentContext::add_pipeline(PipelinePtr parent, int idx) { |
| PipelineId id = _next_pipeline_id++; |
| auto pipeline = std::make_shared<Pipeline>( |
| id, parent ? std::min(parent->num_tasks(), _num_instances) : _num_instances, |
| parent ? parent->num_tasks() : _num_instances); |
| if (idx >= 0) { |
| _pipelines.insert(_pipelines.begin() + idx, pipeline); |
| } else { |
| _pipelines.emplace_back(pipeline); |
| } |
| if (parent) { |
| parent->set_children(pipeline); |
| } |
| return pipeline; |
| } |
| |
| Status PipelineFragmentContext::_build_and_prepare_full_pipeline(ThreadPool* thread_pool) { |
| { |
| SCOPED_TIMER(_build_pipelines_timer); |
| // 2. Build pipelines with operators in this fragment. |
| auto root_pipeline = add_pipeline(); |
| RETURN_IF_ERROR(_build_pipelines(_runtime_state->obj_pool(), *_query_ctx->desc_tbl, |
| &_root_op, root_pipeline)); |
| |
| // 3. Create sink operator |
| if (!_params.fragment.__isset.output_sink) { |
| return Status::InternalError("No output sink in this fragment!"); |
| } |
| RETURN_IF_ERROR(_create_data_sink(_runtime_state->obj_pool(), _params.fragment.output_sink, |
| _params.fragment.output_exprs, _params, |
| root_pipeline->output_row_desc(), _runtime_state.get(), |
| *_desc_tbl, root_pipeline->id())); |
| RETURN_IF_ERROR(_sink->init(_params.fragment.output_sink)); |
| RETURN_IF_ERROR(root_pipeline->set_sink(_sink)); |
| |
| for (PipelinePtr& pipeline : _pipelines) { |
| DCHECK(pipeline->sink() != nullptr) << pipeline->operators().size(); |
| RETURN_IF_ERROR(pipeline->sink()->set_child(pipeline->operators().back())); |
| } |
| } |
| // 4. Build local exchanger |
| if (_runtime_state->enable_local_shuffle()) { |
| SCOPED_TIMER(_plan_local_exchanger_timer); |
| RETURN_IF_ERROR(_plan_local_exchange(_params.num_buckets, |
| _params.bucket_seq_to_instance_idx, |
| _params.shuffle_idx_to_instance_idx)); |
| } |
| |
| // 5. Initialize global states in pipelines. |
| for (PipelinePtr& pipeline : _pipelines) { |
| SCOPED_TIMER(_prepare_all_pipelines_timer); |
| pipeline->children().clear(); |
| RETURN_IF_ERROR(pipeline->prepare(_runtime_state.get())); |
| } |
| |
| { |
| SCOPED_TIMER(_build_tasks_timer); |
| // 6. Build pipeline tasks and initialize local state. |
| RETURN_IF_ERROR(_build_pipeline_tasks(thread_pool)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status PipelineFragmentContext::prepare(ThreadPool* thread_pool) { |
| if (_prepared) { |
| return Status::InternalError("Already prepared"); |
| } |
| if (_params.__isset.query_options && _params.query_options.__isset.execution_timeout) { |
| _timeout = _params.query_options.execution_timeout; |
| } |
| |
| _fragment_level_profile = std::make_unique<RuntimeProfile>("PipelineContext"); |
| _prepare_timer = ADD_TIMER(_fragment_level_profile, "PrepareTime"); |
| SCOPED_TIMER(_prepare_timer); |
| _build_pipelines_timer = ADD_TIMER(_fragment_level_profile, "BuildPipelinesTime"); |
| _init_context_timer = ADD_TIMER(_fragment_level_profile, "InitContextTime"); |
_plan_local_exchanger_timer = ADD_TIMER(_fragment_level_profile, "PlanLocalExchangerTime");
| _build_tasks_timer = ADD_TIMER(_fragment_level_profile, "BuildTasksTime"); |
| _prepare_all_pipelines_timer = ADD_TIMER(_fragment_level_profile, "PrepareAllPipelinesTime"); |
| { |
| SCOPED_TIMER(_init_context_timer); |
| cast_set(_num_instances, _params.local_params.size()); |
| _total_instances = |
| _params.__isset.total_instances ? _params.total_instances : _num_instances; |
| |
| auto* fragment_context = this; |
| |
| if (_params.query_options.__isset.is_report_success) { |
| fragment_context->set_is_report_success(_params.query_options.is_report_success); |
| } |
| |
| // 1. Set up the global runtime state. |
| _runtime_state = RuntimeState::create_unique( |
| _params.query_id, _params.fragment_id, _params.query_options, |
| _query_ctx->query_globals, _exec_env, _query_ctx.get()); |
| _runtime_state->set_task_execution_context(shared_from_this()); |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_runtime_state->query_mem_tracker()); |
| if (_params.__isset.backend_id) { |
| _runtime_state->set_backend_id(_params.backend_id); |
| } |
| if (_params.__isset.import_label) { |
| _runtime_state->set_import_label(_params.import_label); |
| } |
| if (_params.__isset.db_name) { |
| _runtime_state->set_db_name(_params.db_name); |
| } |
| if (_params.__isset.load_job_id) { |
| _runtime_state->set_load_job_id(_params.load_job_id); |
| } |
| |
| if (_params.is_simplified_param) { |
| _desc_tbl = _query_ctx->desc_tbl; |
| } else { |
| DCHECK(_params.__isset.desc_tbl); |
| RETURN_IF_ERROR(DescriptorTbl::create(_runtime_state->obj_pool(), _params.desc_tbl, |
| &_desc_tbl)); |
| } |
| _runtime_state->set_desc_tbl(_desc_tbl); |
| _runtime_state->set_num_per_fragment_instances(_params.num_senders); |
| _runtime_state->set_load_stream_per_node(_params.load_stream_per_node); |
| _runtime_state->set_total_load_streams(_params.total_load_streams); |
| _runtime_state->set_num_local_sink(_params.num_local_sink); |
| |
| // init fragment_instance_ids |
| const auto target_size = _params.local_params.size(); |
| _fragment_instance_ids.resize(target_size); |
| for (size_t i = 0; i < _params.local_params.size(); i++) { |
| auto fragment_instance_id = _params.local_params[i].fragment_instance_id; |
| _fragment_instance_ids[i] = fragment_instance_id; |
| } |
| } |
| |
| RETURN_IF_ERROR(_build_and_prepare_full_pipeline(thread_pool)); |
| |
| _init_next_report_time(); |
| |
| _prepared = true; |
| return Status::OK(); |
| } |
| |
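// Builds the pipeline tasks for a single fragment instance. A pipeline with
// `num_tasks() == 1` is serial, so only instance 0 creates a task for it (see the
// `num_tasks() > 1 || instance_idx == 0` check below). Each task gets its own
// RuntimeState, and all tasks of one instance share one RuntimeFilterMgr.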
| Status PipelineFragmentContext::_build_pipeline_tasks_for_instance( |
| int instance_idx, |
| const std::vector<std::shared_ptr<RuntimeProfile>>& pipeline_id_to_profile) { |
| const auto& local_params = _params.local_params[instance_idx]; |
| auto fragment_instance_id = local_params.fragment_instance_id; |
| auto runtime_filter_mgr = std::make_unique<RuntimeFilterMgr>(false); |
| std::map<PipelineId, PipelineTask*> pipeline_id_to_task; |
| auto get_shared_state = [&](PipelinePtr pipeline) |
| -> std::map<int, std::pair<std::shared_ptr<BasicSharedState>, |
| std::vector<std::shared_ptr<Dependency>>>> { |
| std::map<int, std::pair<std::shared_ptr<BasicSharedState>, |
| std::vector<std::shared_ptr<Dependency>>>> |
| shared_state_map; |
| for (auto& op : pipeline->operators()) { |
| auto source_id = op->operator_id(); |
| if (auto iter = _op_id_to_shared_state.find(source_id); |
| iter != _op_id_to_shared_state.end()) { |
| shared_state_map.insert({source_id, iter->second}); |
| } |
| } |
| for (auto sink_to_source_id : pipeline->sink()->dests_id()) { |
| if (auto iter = _op_id_to_shared_state.find(sink_to_source_id); |
| iter != _op_id_to_shared_state.end()) { |
| shared_state_map.insert({sink_to_source_id, iter->second}); |
| } |
| } |
| return shared_state_map; |
| }; |
| |
| for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { |
| auto& pipeline = _pipelines[pip_idx]; |
| if (pipeline->num_tasks() > 1 || instance_idx == 0) { |
| auto task_runtime_state = RuntimeState::create_unique( |
| local_params.fragment_instance_id, _params.query_id, _params.fragment_id, |
| _params.query_options, _query_ctx->query_globals, _exec_env, _query_ctx.get()); |
| { |
| // Initialize runtime state for this task |
| task_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker()); |
| |
| task_runtime_state->set_task_execution_context(shared_from_this()); |
| task_runtime_state->set_be_number(local_params.backend_num); |
| if (_need_notify_close) { |
// Recursive CTE requires child runtime filters to wait indefinitely to make sure all RPCs are done
| task_runtime_state->set_force_make_rf_wait_infinite(); |
| } |
| |
| if (_params.__isset.backend_id) { |
| task_runtime_state->set_backend_id(_params.backend_id); |
| } |
| if (_params.__isset.import_label) { |
| task_runtime_state->set_import_label(_params.import_label); |
| } |
| if (_params.__isset.db_name) { |
| task_runtime_state->set_db_name(_params.db_name); |
| } |
| if (_params.__isset.load_job_id) { |
| task_runtime_state->set_load_job_id(_params.load_job_id); |
| } |
| if (_params.__isset.wal_id) { |
| task_runtime_state->set_wal_id(_params.wal_id); |
| } |
| if (_params.__isset.content_length) { |
| task_runtime_state->set_content_length(_params.content_length); |
| } |
| |
| task_runtime_state->set_desc_tbl(_desc_tbl); |
| task_runtime_state->set_per_fragment_instance_idx(local_params.sender_id); |
| task_runtime_state->set_num_per_fragment_instances(_params.num_senders); |
| task_runtime_state->resize_op_id_to_local_state(max_operator_id()); |
| task_runtime_state->set_max_operator_id(max_operator_id()); |
| task_runtime_state->set_load_stream_per_node(_params.load_stream_per_node); |
| task_runtime_state->set_total_load_streams(_params.total_load_streams); |
| task_runtime_state->set_num_local_sink(_params.num_local_sink); |
| |
| task_runtime_state->set_runtime_filter_mgr(runtime_filter_mgr.get()); |
| } |
| auto cur_task_id = _total_tasks++; |
| task_runtime_state->set_task_id(cur_task_id); |
| task_runtime_state->set_task_num(pipeline->num_tasks()); |
| auto task = std::make_shared<PipelineTask>( |
| pipeline, cur_task_id, task_runtime_state.get(), |
| std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this()), |
| pipeline_id_to_profile[pip_idx].get(), get_shared_state(pipeline), |
| instance_idx); |
| pipeline->incr_created_tasks(instance_idx, task.get()); |
| pipeline_id_to_task.insert({pipeline->id(), task.get()}); |
| _tasks[instance_idx].emplace_back( |
| std::pair<std::shared_ptr<PipelineTask>, std::unique_ptr<RuntimeState>> { |
| std::move(task), std::move(task_runtime_state)}); |
| } |
| } |
| |
| /** |
| * Build DAG for pipeline tasks. |
| * For example, we have |
| * |
| * ExchangeSink (Pipeline1) JoinBuildSink (Pipeline2) |
| * \ / |
| * JoinProbeOperator1 (Pipeline1) JoinBuildSink (Pipeline3) |
| * \ / |
| * JoinProbeOperator2 (Pipeline1) |
| * |
* In this fragment, we have three pipelines, and Pipeline1 depends on Pipeline2 and Pipeline3.
* To build this DAG, `_dag` manages dependencies between pipelines by pipeline ID and
* `pipeline_id_to_task` is used to find the task by a unique pipeline ID.
| * |
| * Finally, we have two upstream dependencies in Pipeline1 corresponding to JoinProbeOperator1 |
| * and JoinProbeOperator2. |
| */ |
| for (auto& _pipeline : _pipelines) { |
| if (pipeline_id_to_task.contains(_pipeline->id())) { |
| auto* task = pipeline_id_to_task[_pipeline->id()]; |
| DCHECK(task != nullptr); |
| |
| // If this task has upstream dependency, then inject it into this task. |
| if (_dag.contains(_pipeline->id())) { |
| auto& deps = _dag[_pipeline->id()]; |
| for (auto& dep : deps) { |
| if (pipeline_id_to_task.contains(dep)) { |
| auto ss = pipeline_id_to_task[dep]->get_sink_shared_state(); |
| if (ss) { |
| task->inject_shared_state(ss); |
| } else { |
| pipeline_id_to_task[dep]->inject_shared_state( |
| task->get_source_shared_state()); |
| } |
| } |
| } |
| } |
| } |
| } |
| for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { |
| if (pipeline_id_to_task.contains(_pipelines[pip_idx]->id())) { |
| auto* task = pipeline_id_to_task[_pipelines[pip_idx]->id()]; |
| DCHECK(pipeline_id_to_profile[pip_idx]); |
| std::vector<TScanRangeParams> scan_ranges; |
| auto node_id = _pipelines[pip_idx]->operators().front()->node_id(); |
| if (local_params.per_node_scan_ranges.contains(node_id)) { |
| scan_ranges = local_params.per_node_scan_ranges.find(node_id)->second; |
| } |
| RETURN_IF_ERROR_OR_CATCH_EXCEPTION(task->prepare(scan_ranges, local_params.sender_id, |
| _params.fragment.output_sink)); |
| } |
| } |
| { |
| std::lock_guard<std::mutex> l(_state_map_lock); |
| _runtime_filter_mgr_map[instance_idx] = std::move(runtime_filter_mgr); |
| } |
| return Status::OK(); |
| } |
| |
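// Builds tasks for every instance. When the instance count exceeds
// `parallel_prepare_threshold`, per-instance preparation is offloaded to `thread_pool`
// and a countdown latch is used to wait for all submitted jobs.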
| Status PipelineFragmentContext::_build_pipeline_tasks(ThreadPool* thread_pool) { |
| _total_tasks = 0; |
| _closed_tasks = 0; |
| const auto target_size = _params.local_params.size(); |
| _tasks.resize(target_size); |
| _runtime_filter_mgr_map.resize(target_size); |
| for (size_t pip_idx = 0; pip_idx < _pipelines.size(); pip_idx++) { |
| _pip_id_to_pipeline[_pipelines[pip_idx]->id()] = _pipelines[pip_idx].get(); |
| } |
| auto pipeline_id_to_profile = _runtime_state->build_pipeline_profile(_pipelines.size()); |
| |
| if (target_size > 1 && |
| (_runtime_state->query_options().__isset.parallel_prepare_threshold && |
| target_size > _runtime_state->query_options().parallel_prepare_threshold)) { |
// If instance parallelism is large enough (> parallel_prepare_threshold), prepare all tasks using multiple threads
| std::vector<Status> prepare_status(target_size); |
| int submitted_tasks = 0; |
| Status submit_status; |
| CountDownLatch latch((int)target_size); |
| for (int i = 0; i < target_size; i++) { |
| submit_status = thread_pool->submit_func([&, i]() { |
| SCOPED_ATTACH_TASK(_query_ctx.get()); |
| prepare_status[i] = _build_pipeline_tasks_for_instance(i, pipeline_id_to_profile); |
| latch.count_down(); |
| }); |
| if (LIKELY(submit_status.ok())) { |
| submitted_tasks++; |
| } else { |
| break; |
| } |
| } |
| latch.arrive_and_wait(target_size - submitted_tasks); |
| if (UNLIKELY(!submit_status.ok())) { |
| return submit_status; |
| } |
| for (int i = 0; i < submitted_tasks; i++) { |
| if (!prepare_status[i].ok()) { |
| return prepare_status[i]; |
| } |
| } |
| } else { |
| for (int i = 0; i < target_size; i++) { |
| RETURN_IF_ERROR(_build_pipeline_tasks_for_instance(i, pipeline_id_to_profile)); |
| } |
| } |
| _pipeline_parent_map.clear(); |
| _op_id_to_shared_state.clear(); |
| |
| return Status::OK(); |
| } |
| |
| void PipelineFragmentContext::_init_next_report_time() { |
| auto interval_s = config::pipeline_status_report_interval; |
| if (_is_report_success && interval_s > 0 && _timeout > interval_s) { |
| VLOG_FILE << "enable period report: fragment id=" << _fragment_id; |
| uint64_t report_fragment_offset = (uint64_t)(rand() % interval_s) * NANOS_PER_SEC; |
// We don't want to wait longer than it takes to run the entire fragment. Backdating the
// previous report time by one interval makes the first report due after only the random
// offset, which spreads report times across fragments.
| _previous_report_time = |
| MonotonicNanos() + report_fragment_offset - (uint64_t)(interval_s)*NANOS_PER_SEC; |
| _disable_period_report = false; |
| } |
| } |
| |
| void PipelineFragmentContext::refresh_next_report_time() { |
| auto disable = _disable_period_report.load(std::memory_order_acquire); |
| DCHECK(disable == true); |
| _previous_report_time.store(MonotonicNanos(), std::memory_order_release); |
| _disable_period_report.compare_exchange_strong(disable, false); |
| } |
| |
| void PipelineFragmentContext::trigger_report_if_necessary() { |
| if (!_is_report_success) { |
| return; |
| } |
| auto disable = _disable_period_report.load(std::memory_order_acquire); |
| if (disable) { |
| return; |
| } |
| int32_t interval_s = config::pipeline_status_report_interval; |
| if (interval_s <= 0) { |
LOG(WARNING) << "config::pipeline_status_report_interval is less than or equal to "
"zero, do not trigger report.";
| } |
| uint64_t next_report_time = _previous_report_time.load(std::memory_order_acquire) + |
| (uint64_t)(interval_s)*NANOS_PER_SEC; |
| if (MonotonicNanos() > next_report_time) { |
| if (!_disable_period_report.compare_exchange_strong(disable, true, |
| std::memory_order_acq_rel)) { |
| return; |
| } |
| if (VLOG_FILE_IS_ON) { |
| VLOG_FILE << "Reporting " |
| << "profile for query_id " << print_id(_query_id) |
| << ", fragment id: " << _fragment_id; |
| |
| std::stringstream ss; |
| _runtime_state->runtime_profile()->compute_time_in_profile(); |
| _runtime_state->runtime_profile()->pretty_print(&ss); |
| if (_runtime_state->load_channel_profile()) { |
| _runtime_state->load_channel_profile()->pretty_print(&ss); |
| } |
| |
| VLOG_FILE << "Query " << print_id(get_query_id()) << " fragment " << get_fragment_id() |
| << " profile:\n" |
| << ss.str(); |
| } |
| auto st = send_report(false); |
| if (!st.ok()) { |
| disable = true; |
| _disable_period_report.compare_exchange_strong(disable, false, |
| std::memory_order_acq_rel); |
| } |
| } |
| } |
| |
| Status PipelineFragmentContext::_build_pipelines(ObjectPool* pool, const DescriptorTbl& descs, |
| OperatorPtr* root, PipelinePtr cur_pipe) { |
| if (_params.fragment.plan.nodes.empty()) { |
| throw Exception(ErrorCode::INTERNAL_ERROR, "Invalid plan which has no plan node!"); |
| } |
| |
| int node_idx = 0; |
| |
| RETURN_IF_ERROR(_create_tree_helper(pool, _params.fragment.plan.nodes, descs, nullptr, |
| &node_idx, root, cur_pipe, 0, false, false)); |
| |
| if (node_idx + 1 != _params.fragment.plan.nodes.size()) { |
| return Status::InternalError( |
| "Plan tree only partially reconstructed. Not all thrift nodes were used."); |
| } |
| return Status::OK(); |
| } |
| |
| Status PipelineFragmentContext::_create_tree_helper( |
| ObjectPool* pool, const std::vector<TPlanNode>& tnodes, const DescriptorTbl& descs, |
| OperatorPtr parent, int* node_idx, OperatorPtr* root, PipelinePtr& cur_pipe, int child_idx, |
| const bool followed_by_shuffled_operator, const bool require_bucket_distribution) { |
| // propagate error case |
| if (*node_idx >= tnodes.size()) { |
| return Status::InternalError( |
| "Failed to reconstruct plan tree from thrift. Node id: {}, number of nodes: {}", |
| *node_idx, tnodes.size()); |
| } |
| const TPlanNode& tnode = tnodes[*node_idx]; |
| |
| int num_children = tnodes[*node_idx].num_children; |
| bool current_followed_by_shuffled_operator = followed_by_shuffled_operator; |
| bool current_require_bucket_distribution = require_bucket_distribution; |
// TODO: The logic for creating the CacheOperator is confusing now
| OperatorPtr op = nullptr; |
| OperatorPtr cache_op = nullptr; |
| RETURN_IF_ERROR(_create_operator(pool, tnodes[*node_idx], descs, op, cur_pipe, |
| parent == nullptr ? -1 : parent->node_id(), child_idx, |
| followed_by_shuffled_operator, |
| current_require_bucket_distribution, cache_op)); |
// Initialization must be done here. For example, group-by expressions in agg will be used
// to decide whether a local shuffle should be planned, so they must be initialized here.
| RETURN_IF_ERROR(op->init(tnode, _runtime_state.get())); |
| // assert(parent != nullptr || (node_idx == 0 && root_expr != nullptr)); |
| if (parent != nullptr) { |
| // add to parent's child(s) |
| RETURN_IF_ERROR(parent->set_child(cache_op ? cache_op : op)); |
| } else { |
| *root = op; |
| } |
| /** |
* `ExchangeType::HASH_SHUFFLE` should be used if an operator is followed by a shuffled operator (a shuffled hash join, or a union operator followed by co-located operators).
*
* For plan:
* LocalExchange(id=0) -> Aggregation(id=1) -> ShuffledHashJoin(id=2)
* Exchange(id=3) -> ShuffledHashJoinBuild(id=2)
* We must ensure the data distribution of `LocalExchange(id=0)` is the same as that of `Exchange(id=3)`.
*
* If an operator is followed by a local exchange without shuffle (e.g. passthrough), a
* shuffled local exchanger will be inserted before the join, so the operator is not
* considered to be followed by a shuffled join.
| */ |
| auto required_data_distribution = |
| cur_pipe->operators().empty() |
| ? cur_pipe->sink()->required_data_distribution(_runtime_state.get()) |
| : op->required_data_distribution(_runtime_state.get()); |
| current_followed_by_shuffled_operator = |
| ((followed_by_shuffled_operator || |
| (cur_pipe->operators().empty() ? cur_pipe->sink()->is_shuffled_operator() |
| : op->is_shuffled_operator())) && |
| Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) || |
| (followed_by_shuffled_operator && |
| required_data_distribution.distribution_type == ExchangeType::NOOP); |
| |
| current_require_bucket_distribution = |
| ((require_bucket_distribution || |
| (cur_pipe->operators().empty() ? cur_pipe->sink()->is_colocated_operator() |
| : op->is_colocated_operator())) && |
| Pipeline::is_hash_exchange(required_data_distribution.distribution_type)) || |
| (require_bucket_distribution && |
| required_data_distribution.distribution_type == ExchangeType::NOOP); |
| |
| if (num_children == 0) { |
| _use_serial_source = op->is_serial_operator(); |
| } |
// Rely on the fact that `tnodes` is a preorder traversal of the plan
| for (int i = 0; i < num_children; i++) { |
| ++*node_idx; |
| RETURN_IF_ERROR(_create_tree_helper(pool, tnodes, descs, op, node_idx, nullptr, cur_pipe, i, |
| current_followed_by_shuffled_operator, |
| current_require_bucket_distribution)); |
| |
| // we are expecting a child, but have used all nodes |
| // this means we have been given a bad tree and must fail |
| if (*node_idx >= tnodes.size()) { |
| return Status::InternalError( |
| "Failed to reconstruct plan tree from thrift. Node id: {}, number of " |
| "nodes: {}", |
| *node_idx, tnodes.size()); |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
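// After a pipeline is split by a local exchange, the sink side inherits the source side's
// original task count, while the source side is widened to `_num_instances` tasks and
// takes over the new data distribution.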
| void PipelineFragmentContext::_inherit_pipeline_properties( |
| const DataDistribution& data_distribution, PipelinePtr pipe_with_source, |
| PipelinePtr pipe_with_sink) { |
| pipe_with_sink->set_num_tasks(pipe_with_source->num_tasks()); |
| pipe_with_source->set_num_tasks(_num_instances); |
| pipe_with_source->set_data_distribution(data_distribution); |
| } |
| |
| Status PipelineFragmentContext::_add_local_exchange_impl( |
| int idx, ObjectPool* pool, PipelinePtr cur_pipe, PipelinePtr new_pip, |
| DataDistribution data_distribution, bool* do_local_exchange, int num_buckets, |
| const std::map<int, int>& bucket_seq_to_instance_idx, |
| const std::map<int, int>& shuffle_idx_to_instance_idx) { |
| auto& operators = cur_pipe->operators(); |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| auto local_exchange_id = next_operator_id(); |
| // 1. Create a new pipeline with local exchange sink. |
| DataSinkOperatorPtr sink; |
| auto sink_id = next_sink_operator_id(); |
| |
| /** |
* `bucket_seq_to_instance_idx` is empty if no scan operator is contained in this fragment,
* so co-located operators (e.g. Agg, Analytic) should use `HASH_SHUFFLE` instead of `BUCKET_HASH_SHUFFLE`.
| */ |
| const bool followed_by_shuffled_operator = |
| operators.size() > idx ? operators[idx]->followed_by_shuffled_operator() |
| : cur_pipe->sink()->followed_by_shuffled_operator(); |
| const bool use_global_hash_shuffle = bucket_seq_to_instance_idx.empty() && |
| !shuffle_idx_to_instance_idx.contains(-1) && |
| followed_by_shuffled_operator && !_use_serial_source; |
| sink = std::make_shared<LocalExchangeSinkOperatorX>( |
| sink_id, local_exchange_id, use_global_hash_shuffle ? _total_instances : _num_instances, |
| data_distribution.partition_exprs, bucket_seq_to_instance_idx); |
| if (bucket_seq_to_instance_idx.empty() && |
| data_distribution.distribution_type == ExchangeType::BUCKET_HASH_SHUFFLE) { |
| data_distribution.distribution_type = ExchangeType::HASH_SHUFFLE; |
| } |
| RETURN_IF_ERROR(new_pip->set_sink(sink)); |
| RETURN_IF_ERROR(new_pip->sink()->init(_runtime_state.get(), data_distribution.distribution_type, |
| num_buckets, use_global_hash_shuffle, |
| shuffle_idx_to_instance_idx)); |
| |
| // 2. Create and initialize LocalExchangeSharedState. |
| std::shared_ptr<LocalExchangeSharedState> shared_state = |
| LocalExchangeSharedState::create_shared(_num_instances); |
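// Each exchanger below is bounded by `local_exchange_free_blocks_limit`, which caps the
// exchanger's pool of recycled blocks; 0 is passed when the query option is unset.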
| switch (data_distribution.distribution_type) { |
| case ExchangeType::HASH_SHUFFLE: |
| shared_state->exchanger = ShuffleExchanger::create_unique( |
| std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, |
| use_global_hash_shuffle ? _total_instances : _num_instances, |
| _runtime_state->query_options().__isset.local_exchange_free_blocks_limit |
| ? cast_set<int>( |
| _runtime_state->query_options().local_exchange_free_blocks_limit) |
| : 0); |
| break; |
| case ExchangeType::BUCKET_HASH_SHUFFLE: |
| shared_state->exchanger = BucketShuffleExchanger::create_unique( |
| std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, num_buckets, |
| _runtime_state->query_options().__isset.local_exchange_free_blocks_limit |
| ? cast_set<int>( |
| _runtime_state->query_options().local_exchange_free_blocks_limit) |
| : 0); |
| break; |
| case ExchangeType::PASSTHROUGH: |
| shared_state->exchanger = PassthroughExchanger::create_unique( |
| cur_pipe->num_tasks(), _num_instances, |
| _runtime_state->query_options().__isset.local_exchange_free_blocks_limit |
| ? cast_set<int>( |
| _runtime_state->query_options().local_exchange_free_blocks_limit) |
| : 0); |
| break; |
| case ExchangeType::BROADCAST: |
| shared_state->exchanger = BroadcastExchanger::create_unique( |
| cur_pipe->num_tasks(), _num_instances, |
| _runtime_state->query_options().__isset.local_exchange_free_blocks_limit |
| ? cast_set<int>( |
| _runtime_state->query_options().local_exchange_free_blocks_limit) |
| : 0); |
| break; |
| case ExchangeType::PASS_TO_ONE: |
| if (_runtime_state->enable_share_hash_table_for_broadcast_join()) { |
// If the shared hash table is enabled for broadcast join, the hash table will be built by only one task
| shared_state->exchanger = PassToOneExchanger::create_unique( |
| cur_pipe->num_tasks(), _num_instances, |
| _runtime_state->query_options().__isset.local_exchange_free_blocks_limit |
| ? cast_set<int>(_runtime_state->query_options() |
| .local_exchange_free_blocks_limit) |
| : 0); |
| } else { |
| shared_state->exchanger = BroadcastExchanger::create_unique( |
| cur_pipe->num_tasks(), _num_instances, |
| _runtime_state->query_options().__isset.local_exchange_free_blocks_limit |
| ? cast_set<int>(_runtime_state->query_options() |
| .local_exchange_free_blocks_limit) |
| : 0); |
| } |
| break; |
| case ExchangeType::ADAPTIVE_PASSTHROUGH: |
| shared_state->exchanger = AdaptivePassthroughExchanger::create_unique( |
| std::max(cur_pipe->num_tasks(), _num_instances), _num_instances, |
| _runtime_state->query_options().__isset.local_exchange_free_blocks_limit |
| ? cast_set<int>( |
| _runtime_state->query_options().local_exchange_free_blocks_limit) |
| : 0); |
| break; |
| default: |
| return Status::InternalError("Unsupported local exchange type : " + |
| std::to_string((int)data_distribution.distribution_type)); |
| } |
| shared_state->create_source_dependencies(_num_instances, local_exchange_id, local_exchange_id, |
| "LOCAL_EXCHANGE_OPERATOR"); |
| shared_state->create_sink_dependency(sink_id, local_exchange_id, "LOCAL_EXCHANGE_SINK"); |
| _op_id_to_shared_state.insert({local_exchange_id, {shared_state, shared_state->sink_deps}}); |
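// The shared state registered here is looked up by operator id when tasks are built
// (see `get_shared_state` in `_build_pipeline_tasks_for_instance`), and the map is
// cleared once all tasks exist.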
| |
| // 3. Set two pipelines' operator list. For example, split pipeline [Scan - AggSink] to |
| // pipeline1 [Scan - LocalExchangeSink] and pipeline2 [LocalExchangeSource - AggSink]. |
| |
| // 3.1 Initialize new pipeline's operator list. |
| std::copy(operators.begin(), operators.begin() + idx, |
| std::inserter(new_pip->operators(), new_pip->operators().end())); |
| |
| // 3.2 Erase unused operators in previous pipeline. |
| operators.erase(operators.begin(), operators.begin() + idx); |
| |
| // 4. Initialize LocalExchangeSource and insert it into this pipeline. |
| OperatorPtr source_op; |
| source_op = std::make_shared<LocalExchangeSourceOperatorX>(pool, local_exchange_id); |
| RETURN_IF_ERROR(source_op->set_child(new_pip->operators().back())); |
| RETURN_IF_ERROR(source_op->init(data_distribution.distribution_type)); |
| if (!operators.empty()) { |
| RETURN_IF_ERROR(operators.front()->set_child(nullptr)); |
| RETURN_IF_ERROR(operators.front()->set_child(source_op)); |
| } |
| operators.insert(operators.begin(), source_op); |
| |
| // 5. Set children for two pipelines separately. |
| std::vector<std::shared_ptr<Pipeline>> new_children; |
| std::vector<PipelineId> edges_with_source; |
| for (auto child : cur_pipe->children()) { |
| bool found = false; |
| for (auto op : new_pip->operators()) { |
| if (child->sink()->node_id() == op->node_id()) { |
| new_pip->set_children(child); |
| found = true; |
| }; |
| } |
| if (!found) { |
| new_children.push_back(child); |
| edges_with_source.push_back(child->id()); |
| } |
| } |
| new_children.push_back(new_pip); |
| edges_with_source.push_back(new_pip->id()); |
| |
| // 6. Set DAG for new pipelines. |
| if (!new_pip->children().empty()) { |
| std::vector<PipelineId> edges_with_sink; |
| for (auto child : new_pip->children()) { |
| edges_with_sink.push_back(child->id()); |
| } |
| _dag.insert({new_pip->id(), edges_with_sink}); |
| } |
| cur_pipe->set_children(new_children); |
| _dag[downstream_pipeline_id] = edges_with_source; |
| RETURN_IF_ERROR(new_pip->sink()->set_child(new_pip->operators().back())); |
| RETURN_IF_ERROR(cur_pipe->sink()->set_child(nullptr)); |
| RETURN_IF_ERROR(cur_pipe->sink()->set_child(cur_pipe->operators().back())); |
| |
| // 7. Inherit properties from current pipeline. |
| _inherit_pipeline_properties(data_distribution, cur_pipe, new_pip); |
| return Status::OK(); |
| } |
| |
| Status PipelineFragmentContext::_add_local_exchange( |
| int pip_idx, int idx, int node_id, ObjectPool* pool, PipelinePtr cur_pipe, |
| DataDistribution data_distribution, bool* do_local_exchange, int num_buckets, |
| const std::map<int, int>& bucket_seq_to_instance_idx, |
| const std::map<int, int>& shuffle_idx_to_instance_idx) { |
| if (_num_instances <= 1 || cur_pipe->num_tasks_of_parent() <= 1) { |
| return Status::OK(); |
| } |
| |
| if (!cur_pipe->need_to_local_exchange(data_distribution, idx)) { |
| return Status::OK(); |
| } |
| *do_local_exchange = true; |
| |
| auto& operators = cur_pipe->operators(); |
| auto total_op_num = operators.size(); |
| auto new_pip = add_pipeline(cur_pipe, pip_idx + 1); |
| RETURN_IF_ERROR(_add_local_exchange_impl( |
| idx, pool, cur_pipe, new_pip, data_distribution, do_local_exchange, num_buckets, |
| bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx)); |
| |
| CHECK(total_op_num + 1 == cur_pipe->operators().size() + new_pip->operators().size()) |
| << "total_op_num: " << total_op_num |
| << " cur_pipe->operators().size(): " << cur_pipe->operators().size() |
| << " new_pip->operators().size(): " << new_pip->operators().size(); |
| |
// There are some local shuffles with relatively heavy operations on the sink.
// If the local sink concurrency is 1 and the local source concurrency is n, the sink becomes a bottleneck.
// Therefore, a local passthrough is inserted to increase the concurrency of the sink:
// op -> local sink(1) -> local source(n)
// becomes
// op -> passthrough sink(1) -> passthrough source(n) -> local sink(n) -> local source(n)
| if (cur_pipe->num_tasks() > 1 && new_pip->num_tasks() == 1 && |
| Pipeline::heavy_operations_on_the_sink(data_distribution.distribution_type)) { |
| RETURN_IF_ERROR(_add_local_exchange_impl( |
| cast_set<int>(new_pip->operators().size()), pool, new_pip, |
| add_pipeline(new_pip, pip_idx + 2), DataDistribution(ExchangeType::PASSTHROUGH), |
| do_local_exchange, num_buckets, bucket_seq_to_instance_idx, |
| shuffle_idx_to_instance_idx)); |
| } |
| return Status::OK(); |
| } |
| |
| Status PipelineFragmentContext::_plan_local_exchange( |
| int num_buckets, const std::map<int, int>& bucket_seq_to_instance_idx, |
| const std::map<int, int>& shuffle_idx_to_instance_idx) { |
| for (int pip_idx = cast_set<int>(_pipelines.size()) - 1; pip_idx >= 0; pip_idx--) { |
| _pipelines[pip_idx]->init_data_distribution(_runtime_state.get()); |
| // Set property if child pipeline is not join operator's child. |
| if (!_pipelines[pip_idx]->children().empty()) { |
| for (auto& child : _pipelines[pip_idx]->children()) { |
| if (child->sink()->node_id() == |
| _pipelines[pip_idx]->operators().front()->node_id()) { |
| _pipelines[pip_idx]->set_data_distribution(child->data_distribution()); |
| } |
| } |
| } |
| |
// If `num_buckets == 0`, the fragment is colocated by an exchange node rather than a scan
// node, so `_num_instances` is used in place of `num_buckets` to prevent dividing by 0
// while still keeping the colocate plan after the local shuffle.
| RETURN_IF_ERROR(_plan_local_exchange(num_buckets, pip_idx, _pipelines[pip_idx], |
| bucket_seq_to_instance_idx, |
| shuffle_idx_to_instance_idx)); |
| } |
| return Status::OK(); |
| } |
| |
| Status PipelineFragmentContext::_plan_local_exchange( |
| int num_buckets, int pip_idx, PipelinePtr pip, |
| const std::map<int, int>& bucket_seq_to_instance_idx, |
| const std::map<int, int>& shuffle_idx_to_instance_idx) { |
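// Start from index 1: a local exchange, when required, is inserted in front of
// `ops[idx]`, splitting the pipeline between `ops[idx - 1]` and `ops[idx]`.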
| int idx = 1; |
| bool do_local_exchange = false; |
| do { |
| auto& ops = pip->operators(); |
| do_local_exchange = false; |
| // Plan local exchange for each operator. |
| for (; idx < ops.size();) { |
| if (ops[idx]->required_data_distribution(_runtime_state.get()).need_local_exchange()) { |
| RETURN_IF_ERROR(_add_local_exchange( |
| pip_idx, idx, ops[idx]->node_id(), _runtime_state->obj_pool(), pip, |
| ops[idx]->required_data_distribution(_runtime_state.get()), |
| &do_local_exchange, num_buckets, bucket_seq_to_instance_idx, |
| shuffle_idx_to_instance_idx)); |
| } |
| if (do_local_exchange) { |
// If a local exchange is needed for the current operator, this pipeline is split into
// two pipelines by a local exchange sink/source. Then we need to process the remaining
// operators in this pipeline, so set idx to 2 (index 0 is the local exchange source and
// index 1 is the current operator, which was already processed) and continue planning
// local exchanges.
| idx = 2; |
| break; |
| } |
| idx++; |
| } |
| } while (do_local_exchange); |
| if (pip->sink()->required_data_distribution(_runtime_state.get()).need_local_exchange()) { |
| RETURN_IF_ERROR(_add_local_exchange( |
| pip_idx, idx, pip->sink()->node_id(), _runtime_state->obj_pool(), pip, |
| pip->sink()->required_data_distribution(_runtime_state.get()), &do_local_exchange, |
| num_buckets, bucket_seq_to_instance_idx, shuffle_idx_to_instance_idx)); |
| } |
| return Status::OK(); |
| } |
| |
| Status PipelineFragmentContext::_create_data_sink(ObjectPool* pool, const TDataSink& thrift_sink, |
| const std::vector<TExpr>& output_exprs, |
| const TPipelineFragmentParams& params, |
| const RowDescriptor& row_desc, |
| RuntimeState* state, DescriptorTbl& desc_tbl, |
| PipelineId cur_pipeline_id) { |
| switch (thrift_sink.type) { |
| case TDataSinkType::DATA_STREAM_SINK: { |
| if (!thrift_sink.__isset.stream_sink) { |
| return Status::InternalError("Missing data stream sink."); |
| } |
| _sink = std::make_shared<ExchangeSinkOperatorX>( |
| state, row_desc, next_sink_operator_id(), thrift_sink.stream_sink, |
| params.destinations, _fragment_instance_ids); |
| break; |
| } |
| case TDataSinkType::RESULT_SINK: { |
| if (!thrift_sink.__isset.result_sink) { |
| return Status::InternalError("Missing data buffer sink."); |
| } |
| |
| _sink = std::make_shared<ResultSinkOperatorX>(next_sink_operator_id(), row_desc, |
| output_exprs, thrift_sink.result_sink); |
| break; |
| } |
| case TDataSinkType::DICTIONARY_SINK: { |
| if (!thrift_sink.__isset.dictionary_sink) { |
| return Status::InternalError("Missing dict sink."); |
| } |
| |
| _sink = std::make_shared<DictSinkOperatorX>(next_sink_operator_id(), row_desc, output_exprs, |
| thrift_sink.dictionary_sink); |
| break; |
| } |
| case TDataSinkType::GROUP_COMMIT_OLAP_TABLE_SINK: |
| case TDataSinkType::OLAP_TABLE_SINK: { |
| if (state->query_options().enable_memtable_on_sink_node && |
| !_has_inverted_index_v1_or_partial_update(thrift_sink.olap_table_sink) && |
| !config::is_cloud_mode()) { |
| _sink = std::make_shared<OlapTableSinkV2OperatorX>(pool, next_sink_operator_id(), |
| row_desc, output_exprs); |
| } else { |
| _sink = std::make_shared<OlapTableSinkOperatorX>(pool, next_sink_operator_id(), |
| row_desc, output_exprs); |
| } |
| break; |
| } |
| case TDataSinkType::GROUP_COMMIT_BLOCK_SINK: { |
| DCHECK(thrift_sink.__isset.olap_table_sink); |
| DCHECK(state->get_query_ctx() != nullptr); |
| state->get_query_ctx()->query_mem_tracker()->is_group_commit_load = true; |
| _sink = std::make_shared<GroupCommitBlockSinkOperatorX>(next_sink_operator_id(), row_desc, |
| output_exprs); |
| break; |
| } |
| case TDataSinkType::HIVE_TABLE_SINK: { |
| if (!thrift_sink.__isset.hive_table_sink) { |
| return Status::InternalError("Missing hive table sink."); |
| } |
| _sink = std::make_shared<HiveTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc, |
| output_exprs); |
| break; |
| } |
| case TDataSinkType::ICEBERG_TABLE_SINK: { |
| if (!thrift_sink.__isset.iceberg_table_sink) { |
| return Status::InternalError("Missing iceberg table sink."); |
| } |
| if (thrift_sink.iceberg_table_sink.__isset.sort_info) { |
| _sink = std::make_shared<SpillIcebergTableSinkOperatorX>(pool, next_sink_operator_id(), |
| row_desc, output_exprs); |
| } else { |
| _sink = std::make_shared<IcebergTableSinkOperatorX>(pool, next_sink_operator_id(), |
| row_desc, output_exprs); |
| } |
| break; |
| } |
| case TDataSinkType::ICEBERG_DELETE_SINK: { |
| if (!thrift_sink.__isset.iceberg_delete_sink) { |
| return Status::InternalError("Missing iceberg delete sink."); |
| } |
| _sink = std::make_shared<IcebergDeleteSinkOperatorX>(pool, next_sink_operator_id(), |
| row_desc, output_exprs); |
| break; |
| } |
| case TDataSinkType::ICEBERG_MERGE_SINK: { |
| if (!thrift_sink.__isset.iceberg_merge_sink) { |
| return Status::InternalError("Missing iceberg merge sink."); |
| } |
| _sink = std::make_shared<IcebergMergeSinkOperatorX>(pool, next_sink_operator_id(), row_desc, |
| output_exprs); |
| break; |
| } |
| case TDataSinkType::MAXCOMPUTE_TABLE_SINK: { |
| if (!thrift_sink.__isset.max_compute_table_sink) { |
| return Status::InternalError("Missing max compute table sink."); |
| } |
| _sink = std::make_shared<MCTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc, |
| output_exprs); |
| break; |
| } |
| case TDataSinkType::JDBC_TABLE_SINK: { |
| if (!thrift_sink.__isset.jdbc_table_sink) { |
| return Status::InternalError("Missing data jdbc sink."); |
| } |
| if (config::enable_java_support) { |
| _sink = std::make_shared<JdbcTableSinkOperatorX>(row_desc, next_sink_operator_id(), |
| output_exprs); |
| } else { |
| return Status::InternalError( |
| "Jdbc table sink is not enabled, you can change be config " |
| "enable_java_support to true and restart be."); |
| } |
| break; |
| } |
| case TDataSinkType::MEMORY_SCRATCH_SINK: { |
| if (!thrift_sink.__isset.memory_scratch_sink) { |
| return Status::InternalError("Missing data buffer sink."); |
| } |
| |
| _sink = std::make_shared<MemoryScratchSinkOperatorX>(row_desc, next_sink_operator_id(), |
| output_exprs); |
| break; |
| } |
| case TDataSinkType::RESULT_FILE_SINK: { |
| if (!thrift_sink.__isset.result_file_sink) { |
| return Status::InternalError("Missing result file sink."); |
| } |
| |
// If destinations are set, the result file sink is not the top sink and sends to those destinations
| if (params.__isset.destinations && !params.destinations.empty()) { |
| _sink = std::make_shared<ResultFileSinkOperatorX>( |
| next_sink_operator_id(), row_desc, thrift_sink.result_file_sink, |
| params.destinations, output_exprs, desc_tbl); |
| } else { |
| _sink = std::make_shared<ResultFileSinkOperatorX>(next_sink_operator_id(), row_desc, |
| output_exprs); |
| } |
| break; |
| } |
| case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: { |
| DCHECK(thrift_sink.__isset.multi_cast_stream_sink); |
| DCHECK_GT(thrift_sink.multi_cast_stream_sink.sinks.size(), 0); |
| auto sink_id = next_sink_operator_id(); |
| const int multi_cast_node_id = sink_id; |
| auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size(); |
| // one sink has multiple sources. |
| std::vector<int> sources; |
| for (int i = 0; i < sender_size; ++i) { |
| auto source_id = next_operator_id(); |
| sources.push_back(source_id); |
| } |
| |
| _sink = std::make_shared<MultiCastDataStreamSinkOperatorX>( |
| sink_id, multi_cast_node_id, sources, pool, thrift_sink.multi_cast_stream_sink); |
| for (int i = 0; i < sender_size; ++i) { |
| auto new_pipeline = add_pipeline(); |
// Row descriptor used by the exchange sink
| RowDescriptor* exchange_row_desc = nullptr; |
| { |
| const auto& tmp_row_desc = |
| !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty() |
| ? RowDescriptor(state->desc_tbl(), |
| {thrift_sink.multi_cast_stream_sink.sinks[i] |
| .output_tuple_id}) |
| : row_desc; |
| exchange_row_desc = pool->add(new RowDescriptor(tmp_row_desc)); |
| } |
| auto source_id = sources[i]; |
| OperatorPtr source_op; |
| // 1. create and set the source operator of multi_cast_data_stream_source for new pipeline |
| source_op = std::make_shared<MultiCastDataStreamerSourceOperatorX>( |
| /*node_id*/ source_id, /*consumer_id*/ i, pool, |
| thrift_sink.multi_cast_stream_sink.sinks[i], row_desc, |
| /*operator_id=*/source_id); |
| RETURN_IF_ERROR(new_pipeline->add_operator( |
| source_op, params.__isset.parallel_instances ? params.parallel_instances : 0)); |
| // 2. create and set sink operator of data stream sender for new pipeline |
| |
| DataSinkOperatorPtr sink_op; |
| sink_op = std::make_shared<ExchangeSinkOperatorX>( |
| state, *exchange_row_desc, next_sink_operator_id(), |
| thrift_sink.multi_cast_stream_sink.sinks[i], |
| thrift_sink.multi_cast_stream_sink.destinations[i], _fragment_instance_ids); |
| |
| RETURN_IF_ERROR(new_pipeline->set_sink(sink_op)); |
| { |
| TDataSink* t = pool->add(new TDataSink()); |
| t->stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i]; |
| RETURN_IF_ERROR(sink_op->init(*t)); |
| } |
| |
| // 3. set dependency dag |
| _dag[new_pipeline->id()].push_back(cur_pipeline_id); |
| } |
| if (sources.empty()) { |
| return Status::InternalError("size of sources must be greater than 0"); |
| } |
| break; |
| } |
| case TDataSinkType::BLACKHOLE_SINK: { |
| if (!thrift_sink.__isset.blackhole_sink) { |
| return Status::InternalError("Missing blackhole sink."); |
| } |
| |
| _sink.reset(new BlackholeSinkOperatorX(next_sink_operator_id())); |
| break; |
| } |
| case TDataSinkType::TVF_TABLE_SINK: { |
| if (!thrift_sink.__isset.tvf_table_sink) { |
| return Status::InternalError("Missing TVF table sink."); |
| } |
| _sink = std::make_shared<TVFTableSinkOperatorX>(pool, next_sink_operator_id(), row_desc, |
| output_exprs); |
| break; |
| } |
| default: |
return Status::InternalError("Unsupported sink type in pipeline: {}", thrift_sink.type);
| } |
| return Status::OK(); |
| } |
| |
| // NOLINTBEGIN(readability-function-size) |
| // NOLINTBEGIN(readability-function-cognitive-complexity) |
| Status PipelineFragmentContext::_create_operator(ObjectPool* pool, const TPlanNode& tnode, |
| const DescriptorTbl& descs, OperatorPtr& op, |
| PipelinePtr& cur_pipe, int parent_idx, |
| int child_idx, |
| const bool followed_by_shuffled_operator, |
| const bool require_bucket_distribution, |
| OperatorPtr& cache_op) { |
| std::vector<DataSinkOperatorPtr> sink_ops; |
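// The Defer below guarantees `update_operator` runs on every exit path (including early
// returns) for the created operator and any sinks created along the way.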
| Defer defer = Defer([&]() { |
| if (op) { |
| op->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution); |
| } |
| for (auto& s : sink_ops) { |
| s->update_operator(tnode, followed_by_shuffled_operator, require_bucket_distribution); |
| } |
| }); |
// Operators are constructed directly from the Thrift nodes, which are given in preorder
// traversal order; therefore, a stack-like structure is needed here.
| _pipeline_parent_map.pop(cur_pipe, parent_idx, child_idx); |
| std::stringstream error_msg; |
| bool enable_query_cache = _params.fragment.__isset.query_cache_param; |
| |
| bool fe_with_old_version = false; |
| switch (tnode.node_type) { |
| case TPlanNodeType::OLAP_SCAN_NODE: { |
| op = std::make_shared<OlapScanOperatorX>( |
| pool, tnode, next_operator_id(), descs, _num_instances, |
| enable_query_cache ? _params.fragment.query_cache_param : TQueryCacheParam {}); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| fe_with_old_version = !tnode.__isset.is_serial_operator; |
| break; |
| } |
| case TPlanNodeType::GROUP_COMMIT_SCAN_NODE: { |
| DCHECK(_query_ctx != nullptr); |
| _query_ctx->query_mem_tracker()->is_group_commit_load = true; |
| op = std::make_shared<GroupCommitOperatorX>(pool, tnode, next_operator_id(), descs, |
| _num_instances); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| fe_with_old_version = !tnode.__isset.is_serial_operator; |
| break; |
| } |
| case TPlanNodeType::JDBC_SCAN_NODE: { |
| if (config::enable_java_support) { |
| op = std::make_shared<JDBCScanOperatorX>(pool, tnode, next_operator_id(), descs, |
| _num_instances); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| } else { |
| return Status::InternalError( |
| "Jdbc scan node is disabled, you can change be config enable_java_support " |
| "to true and restart be."); |
| } |
| fe_with_old_version = !tnode.__isset.is_serial_operator; |
| break; |
| } |
| case TPlanNodeType::FILE_SCAN_NODE: { |
| op = std::make_shared<FileScanOperatorX>(pool, tnode, next_operator_id(), descs, |
| _num_instances); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| fe_with_old_version = !tnode.__isset.is_serial_operator; |
| break; |
| } |
| case TPlanNodeType::ES_SCAN_NODE: |
| case TPlanNodeType::ES_HTTP_SCAN_NODE: { |
| op = std::make_shared<EsScanOperatorX>(pool, tnode, next_operator_id(), descs, |
| _num_instances); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| fe_with_old_version = !tnode.__isset.is_serial_operator; |
| break; |
| } |
| case TPlanNodeType::EXCHANGE_NODE: { |
| int num_senders = _params.per_exch_num_senders.contains(tnode.node_id) |
| ? _params.per_exch_num_senders.find(tnode.node_id)->second |
| : 0; |
| DCHECK_GT(num_senders, 0); |
| op = std::make_shared<ExchangeSourceOperatorX>(pool, tnode, next_operator_id(), descs, |
| num_senders); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| fe_with_old_version = !tnode.__isset.is_serial_operator; |
| break; |
| } |
| case TPlanNodeType::AGGREGATION_NODE: { |
| if (tnode.agg_node.grouping_exprs.empty() && |
| descs.get_tuple_descriptor(tnode.agg_node.output_tuple_id)->slots().empty()) { |
return Status::InternalError("Illegal aggregate node " + std::to_string(tnode.node_id) +
": group by and output are empty");
| } |
| bool need_create_cache_op = |
| enable_query_cache && tnode.node_id == _params.fragment.query_cache_param.node_id; |
| auto create_query_cache_operator = [&](PipelinePtr& new_pipe) { |
| auto cache_node_id = _params.local_params[0].per_node_scan_ranges.begin()->first; |
| auto cache_source_id = next_operator_id(); |
| op = std::make_shared<CacheSourceOperatorX>(pool, cache_node_id, cache_source_id, |
| _params.fragment.query_cache_param); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
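| // _dag maps a downstream pipeline id to the ids of the upstream pipelines it depends |
| // on; register the new pipeline as a dependency of the current one. |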
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| new_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(new_pipe->id()); |
| |
| DataSinkOperatorPtr cache_sink(new CacheSinkOperatorX( |
| next_sink_operator_id(), op->node_id(), op->operator_id())); |
| RETURN_IF_ERROR(new_pipe->set_sink(cache_sink)); |
| return Status::OK(); |
| }; |
| const bool group_by_limit_opt = |
| tnode.agg_node.__isset.agg_sort_info_by_group_key && tnode.limit > 0; |
| |
| /// PartitionedAggSourceOperatorX does not support the "group by limit opt" (#29641) yet. |
| /// If `group_by_limit_opt` is true, spilling is likely unnecessary anyway. |
| const bool enable_spill = _runtime_state->enable_spill() && |
| !tnode.agg_node.grouping_exprs.empty() && !group_by_limit_opt; |
| const bool is_streaming_agg = tnode.agg_node.__isset.use_streaming_preaggregation && |
| tnode.agg_node.use_streaming_preaggregation && |
| !tnode.agg_node.grouping_exprs.empty(); |
| // TODO: distinct streaming agg does not support spill. |
| const bool can_use_distinct_streaming_agg = |
| (!enable_spill || is_streaming_agg) && tnode.agg_node.aggregate_functions.empty() && |
| !tnode.agg_node.__isset.agg_sort_info_by_group_key && |
| _params.query_options.__isset.enable_distinct_streaming_aggregation && |
| _params.query_options.enable_distinct_streaming_aggregation; |
| |
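| // Three shapes are possible here: distinct streaming agg and streaming agg each run as |
| // a single in-pipeline operator, while the general case splits into an agg sink on a |
| // new upstream pipeline and an agg source on the current pipeline. |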
| if (can_use_distinct_streaming_agg) { |
| if (need_create_cache_op) { |
| PipelinePtr new_pipe; |
| RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); |
| |
| cache_op = op; |
| op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(), |
| tnode, descs); |
| RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances)); |
| RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); |
| cur_pipe = new_pipe; |
| } else { |
| op = std::make_shared<DistinctStreamingAggOperatorX>(pool, next_operator_id(), |
| tnode, descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| } |
| } else if (is_streaming_agg) { |
| if (need_create_cache_op) { |
| PipelinePtr new_pipe; |
| RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); |
| cache_op = op; |
| op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode, |
| descs); |
| RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); |
| RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances)); |
| cur_pipe = new_pipe; |
| } else { |
| op = std::make_shared<StreamingAggOperatorX>(pool, next_operator_id(), tnode, |
| descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| } |
| } else { |
| // Create a new pipeline to hold the query cache operator. |
| PipelinePtr new_pipe; |
| if (need_create_cache_op) { |
| RETURN_IF_ERROR(create_query_cache_operator(new_pipe)); |
| cache_op = op; |
| } |
| |
| if (enable_spill) { |
| op = std::make_shared<PartitionedAggSourceOperatorX>(pool, tnode, |
| next_operator_id(), descs); |
| } else { |
| op = std::make_shared<AggSourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| } |
| if (need_create_cache_op) { |
| RETURN_IF_ERROR(cur_pipe->operators().front()->set_child(op)); |
| RETURN_IF_ERROR(new_pipe->add_operator(op, _parallel_instances)); |
| cur_pipe = new_pipe; |
| } else { |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| } |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| cur_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(cur_pipe->id()); |
| |
| if (enable_spill) { |
| sink_ops.push_back(std::make_shared<PartitionedAggSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); |
| } else { |
| sink_ops.push_back(std::make_shared<AggSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); |
| } |
| RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); |
| } |
| break; |
| } |
| case TPlanNodeType::HASH_JOIN_NODE: { |
| const auto is_broadcast_join = tnode.hash_join_node.__isset.is_broadcast_join && |
| tnode.hash_join_node.is_broadcast_join; |
| const auto enable_spill = _runtime_state->enable_spill(); |
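| // The spillable (partitioned) hash join wraps plain inner build/probe operators, which |
| // rebuild and probe the hash table per partition once data has been spilled. |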
| if (enable_spill && !is_broadcast_join) { |
| auto tnode_ = tnode; |
| tnode_.runtime_filters.clear(); |
| auto inner_probe_operator = |
| std::make_shared<HashJoinProbeOperatorX>(pool, tnode_, 0, descs); |
| |
| // The probe-side inner sink operator builds the hash table on the probe side when data |
| // has been spilled, so it uses `tnode_`, which has no runtime filters. |
| auto probe_side_inner_sink_operator = |
| std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode_, descs); |
| |
| RETURN_IF_ERROR(inner_probe_operator->init(tnode_, _runtime_state.get())); |
| RETURN_IF_ERROR(probe_side_inner_sink_operator->init(tnode_, _runtime_state.get())); |
| |
| auto probe_operator = std::make_shared<PartitionedHashJoinProbeOperatorX>( |
| pool, tnode_, next_operator_id(), descs); |
| probe_operator->set_inner_operators(probe_side_inner_sink_operator, |
| inner_probe_operator); |
| op = std::move(probe_operator); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| PipelinePtr build_side_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); |
| |
| auto inner_sink_operator = |
| std::make_shared<HashJoinBuildSinkOperatorX>(pool, 0, 0, tnode, descs); |
| auto sink_operator = std::make_shared<PartitionedHashJoinSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode_, descs); |
| RETURN_IF_ERROR(inner_sink_operator->init(tnode, _runtime_state.get())); |
| |
| sink_operator->set_inner_operators(inner_sink_operator, inner_probe_operator); |
| sink_ops.push_back(std::move(sink_operator)); |
| RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode_, _runtime_state.get())); |
| |
| _pipeline_parent_map.push(op->node_id(), cur_pipe); |
| _pipeline_parent_map.push(op->node_id(), build_side_pipe); |
| } else { |
| op = std::make_shared<HashJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| PipelinePtr build_side_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); |
| |
| sink_ops.push_back(std::make_shared<HashJoinBuildSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); |
| RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); |
| |
| _pipeline_parent_map.push(op->node_id(), cur_pipe); |
| _pipeline_parent_map.push(op->node_id(), build_side_pipe); |
| } |
| if (is_broadcast_join && _runtime_state->enable_share_hash_table_for_broadcast_join()) { |
| std::shared_ptr<HashJoinSharedState> shared_state = |
| HashJoinSharedState::create_shared(_num_instances); |
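| // With a shared broadcast hash table, each instance gets its own build-side dependency, |
| // but all dependencies point at the same shared state so only one hash table is built. |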
| for (int i = 0; i < _num_instances; i++) { |
| auto sink_dep = std::make_shared<Dependency>(op->operator_id(), op->node_id(), |
| "HASH_JOIN_BUILD_DEPENDENCY"); |
| sink_dep->set_shared_state(shared_state.get()); |
| shared_state->sink_deps.push_back(sink_dep); |
| } |
| shared_state->create_source_dependencies(_num_instances, op->operator_id(), |
| op->node_id(), "HASH_JOIN_PROBE"); |
| _op_id_to_shared_state.insert( |
| {op->operator_id(), {shared_state, shared_state->sink_deps}}); |
| } |
| break; |
| } |
| case TPlanNodeType::CROSS_JOIN_NODE: { |
| op = std::make_shared<NestedLoopJoinProbeOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| PipelinePtr build_side_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); |
| |
| sink_ops.push_back(std::make_shared<NestedLoopJoinBuildSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); |
| RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); |
| _pipeline_parent_map.push(op->node_id(), cur_pipe); |
| _pipeline_parent_map.push(op->node_id(), build_side_pipe); |
| break; |
| } |
| case TPlanNodeType::UNION_NODE: { |
| int child_count = tnode.num_children; |
| op = std::make_shared<UnionSourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
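| // Each UNION child gets its own upstream pipeline whose UnionSinkOperatorX feeds the |
| // shared UnionSourceOperatorX on the current pipeline. |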
| for (int i = 0; i < child_count; i++) { |
| PipelinePtr build_side_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(build_side_pipe->id()); |
| sink_ops.push_back(std::make_shared<UnionSinkOperatorX>( |
| i, next_sink_operator_id(), op->operator_id(), pool, tnode, descs)); |
| RETURN_IF_ERROR(build_side_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(build_side_pipe->sink()->init(tnode, _runtime_state.get())); |
| // Preset child pipelines: any pipeline that finds this node as its parent will build on the prepared pipeline. |
| _pipeline_parent_map.push(op->node_id(), build_side_pipe); |
| } |
| break; |
| } |
| case TPlanNodeType::SORT_NODE: { |
| const auto should_spill = _runtime_state->enable_spill() && |
| tnode.sort_node.algorithm == TSortAlgorithm::FULL_SORT; |
| const bool use_local_merge = |
| tnode.sort_node.__isset.use_local_merge && tnode.sort_node.use_local_merge; |
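| // Three source variants: a spillable source for full sort with spill enabled, a |
| // local-merge source when the plan requests local merging, and the plain sort source. |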
| if (should_spill) { |
| op = std::make_shared<SpillSortSourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| } else if (use_local_merge) { |
| op = std::make_shared<LocalMergeSortSourceOperatorX>(pool, tnode, next_operator_id(), |
| descs); |
| } else { |
| op = std::make_shared<SortSourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| } |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| cur_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(cur_pipe->id()); |
| |
| if (should_spill) { |
| sink_ops.push_back(std::make_shared<SpillSortSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); |
| } else { |
| sink_ops.push_back(std::make_shared<SortSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); |
| } |
| RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); |
| break; |
| } |
| case TPlanNodeType::PARTITION_SORT_NODE: { |
| op = std::make_shared<PartitionSortSourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| cur_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(cur_pipe->id()); |
| |
| sink_ops.push_back(std::make_shared<PartitionSortSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); |
| RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); |
| break; |
| } |
| case TPlanNodeType::ANALYTIC_EVAL_NODE: { |
| op = std::make_shared<AnalyticSourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| cur_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(cur_pipe->id()); |
| |
| sink_ops.push_back(std::make_shared<AnalyticSinkOperatorX>( |
| pool, next_sink_operator_id(), op->operator_id(), tnode, descs)); |
| RETURN_IF_ERROR(cur_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(cur_pipe->sink()->init(tnode, _runtime_state.get())); |
| break; |
| } |
| case TPlanNodeType::MATERIALIZATION_NODE: { |
| op = std::make_shared<MaterializationOperator>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| case TPlanNodeType::INTERSECT_NODE: { |
| RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(pool, tnode, descs, op, |
| cur_pipe, sink_ops)); |
| break; |
| } |
| case TPlanNodeType::EXCEPT_NODE: { |
| RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(pool, tnode, descs, op, |
| cur_pipe, sink_ops)); |
| break; |
| } |
| case TPlanNodeType::REPEAT_NODE: { |
| op = std::make_shared<RepeatOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| case TPlanNodeType::TABLE_FUNCTION_NODE: { |
| op = std::make_shared<TableFunctionOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| case TPlanNodeType::ASSERT_NUM_ROWS_NODE: { |
| op = std::make_shared<AssertNumRowsOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| case TPlanNodeType::EMPTY_SET_NODE: { |
| op = std::make_shared<EmptySetSourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| case TPlanNodeType::DATA_GEN_SCAN_NODE: { |
| op = std::make_shared<DataGenSourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| fe_with_old_version = !tnode.__isset.is_serial_operator; |
| break; |
| } |
| case TPlanNodeType::SCHEMA_SCAN_NODE: { |
| op = std::make_shared<SchemaScanOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| case TPlanNodeType::META_SCAN_NODE: { |
| op = std::make_shared<MetaScanOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| case TPlanNodeType::SELECT_NODE: { |
| op = std::make_shared<SelectOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| case TPlanNodeType::REC_CTE_NODE: { |
| op = std::make_shared<RecCTESourceOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| |
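| // Recursive CTEs get two upstream pipelines: the anchor side provides the initial |
| // (non-recursive) input, and the recursive side presumably feeds each iteration's |
| // output back through the source. |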
| PipelinePtr anchor_side_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(anchor_side_pipe->id()); |
| |
| DataSinkOperatorPtr anchor_sink; |
| anchor_sink = std::make_shared<RecCTEAnchorSinkOperatorX>(next_sink_operator_id(), |
| op->operator_id(), tnode, descs); |
| RETURN_IF_ERROR(anchor_side_pipe->set_sink(anchor_sink)); |
| RETURN_IF_ERROR(anchor_side_pipe->sink()->init(tnode, _runtime_state.get())); |
| _pipeline_parent_map.push(op->node_id(), anchor_side_pipe); |
| |
| PipelinePtr rec_side_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(rec_side_pipe->id()); |
| |
| DataSinkOperatorPtr rec_sink; |
| rec_sink = std::make_shared<RecCTESinkOperatorX>(next_sink_operator_id(), op->operator_id(), |
| tnode, descs); |
| RETURN_IF_ERROR(rec_side_pipe->set_sink(rec_sink)); |
| RETURN_IF_ERROR(rec_side_pipe->sink()->init(tnode, _runtime_state.get())); |
| _pipeline_parent_map.push(op->node_id(), rec_side_pipe); |
| |
| break; |
| } |
| case TPlanNodeType::REC_CTE_SCAN_NODE: { |
| op = std::make_shared<RecCTEScanOperatorX>(pool, tnode, next_operator_id(), descs); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| break; |
| } |
| default: |
| return Status::InternalError("Unsupported exec type in pipeline: {}", |
| print_plan_node_type(tnode.node_type)); |
| } |
| if (_params.__isset.parallel_instances && fe_with_old_version) { |
| cur_pipe->set_num_tasks(_params.parallel_instances); |
| op->set_serial_operator(); |
| } |
| |
| return Status::OK(); |
| } |
| // NOLINTEND(readability-function-cognitive-complexity) |
| // NOLINTEND(readability-function-size) |
| |
| template <bool is_intersect> |
| Status PipelineFragmentContext::_build_operators_for_set_operation_node( |
| ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs, OperatorPtr& op, |
| PipelinePtr& cur_pipe, std::vector<DataSinkOperatorPtr>& sink_ops) { |
| op.reset(new SetSourceOperatorX<is_intersect>(pool, tnode, next_operator_id(), descs)); |
| RETURN_IF_ERROR(cur_pipe->add_operator(op, _parallel_instances)); |
| |
| const auto downstream_pipeline_id = cur_pipe->id(); |
| if (!_dag.contains(downstream_pipeline_id)) { |
| _dag.insert({downstream_pipeline_id, {}}); |
| } |
| |
| for (int child_id = 0; child_id < tnode.num_children; child_id++) { |
| PipelinePtr probe_side_pipe = add_pipeline(cur_pipe); |
| _dag[downstream_pipeline_id].push_back(probe_side_pipe->id()); |
| |
| if (child_id == 0) { |
| sink_ops.push_back(std::make_shared<SetSinkOperatorX<is_intersect>>( |
| child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs)); |
| } else { |
| sink_ops.push_back(std::make_shared<SetProbeSinkOperatorX<is_intersect>>( |
| child_id, next_sink_operator_id(), op->operator_id(), pool, tnode, descs)); |
| } |
| RETURN_IF_ERROR(probe_side_pipe->set_sink(sink_ops.back())); |
| RETURN_IF_ERROR(probe_side_pipe->sink()->init(tnode, _runtime_state.get())); |
| // Prepare child pipelines: any pipeline that finds this node as its parent will build on the prepared pipeline. |
| _pipeline_parent_map.push(op->node_id(), probe_side_pipe); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status PipelineFragmentContext::submit() { |
| if (_submitted) { |
| return Status::InternalError("submitted"); |
| } |
| _submitted = true; |
| |
| int submit_tasks = 0; |
| Status st; |
| auto* scheduler = _query_ctx->get_pipe_exec_scheduler(); |
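| // Submit tasks one by one; on failure, cancel the fragment and trim _total_tasks to the |
| // number already submitted so _close_fragment_instance fires once those tasks finish. |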
| for (auto& task : _tasks) { |
| for (auto& t : task) { |
| st = scheduler->submit(t.first); |
| DBUG_EXECUTE_IF("PipelineFragmentContext.submit.failed", |
| { st = Status::Aborted("PipelineFragmentContext.submit.failed"); }); |
| if (!st) { |
| cancel(Status::InternalError("failed to submit context to executor")); |
| std::lock_guard<std::mutex> l(_task_mutex); |
| _total_tasks = submit_tasks; |
| break; |
| } |
| submit_tasks++; |
| } |
| } |
| if (!st.ok()) { |
| std::lock_guard<std::mutex> l(_task_mutex); |
| if (_closed_tasks >= _total_tasks) { |
| _close_fragment_instance(); |
| } |
| return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(), |
| BackendOptions::get_localhost()); |
| } else { |
| return st; |
| } |
| } |
| |
| void PipelineFragmentContext::print_profile(const std::string& extra_info) { |
| if (_runtime_state->enable_profile()) { |
| std::stringstream ss; |
| for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) { |
| runtime_profile_ptr->pretty_print(&ss); |
| } |
| |
| if (_runtime_state->load_channel_profile()) { |
| _runtime_state->load_channel_profile()->pretty_print(&ss); |
| } |
| |
| auto profile_str = |
| fmt::format("Query {} fragment {} {}, profile, {}", print_id(this->_query_id), |
| this->_fragment_id, extra_info, ss.str()); |
| LOG_LONG_STRING(INFO, profile_str); |
| } |
| } |
| // If all pipeline tasks bound to the fragment instance have finished, the fragment |
| // instance can be closed. |
| void PipelineFragmentContext::_close_fragment_instance() { |
| if (_is_fragment_instance_closed) { |
| return; |
| } |
| Defer defer_op {[&]() { |
| _is_fragment_instance_closed = true; |
| _notify_cv.notify_all(); |
| }}; |
| _fragment_level_profile->total_time_counter()->update(_fragment_watcher.elapsed_time()); |
| if (!_need_notify_close) { |
| auto st = send_report(true); |
| if (!st) { |
| LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}", |
| print_id(_query_id), _fragment_id, st.to_string()); |
| } |
| } |
| // Printing the profile to the info log is a temporary solution for stream load and |
| // external connectors. Since stream load has no coordinator-like component on the FE, |
| // the backend cannot report the profile to the FE, and the profile cannot be shown the |
| // same way as for other queries. So we print the profile content to the info log. |
| |
| if (_runtime_state->enable_profile() && |
| (_query_ctx->get_query_source() == QuerySource::STREAM_LOAD || |
| _query_ctx->get_query_source() == QuerySource::EXTERNAL_CONNECTOR || |
| _query_ctx->get_query_source() == QuerySource::GROUP_COMMIT_LOAD)) { |
| std::stringstream ss; |
| // Compute _local_time_percent before pretty-printing the runtime profile. |
| // Before this operation the output looked like: |
| // UNION_NODE (id=0):(Active: 56.720us, non-child: 00.00%) |
| // After it: |
| // UNION_NODE (id=0):(Active: 56.720us, non-child: 82.53%) |
| // This makes it easy to see an exec node's execution time excluding its children. |
| for (auto runtime_profile_ptr : _runtime_state->pipeline_id_to_profile()) { |
| runtime_profile_ptr->pretty_print(&ss); |
| } |
| |
| if (_runtime_state->load_channel_profile()) { |
| _runtime_state->load_channel_profile()->pretty_print(&ss); |
| } |
| |
| LOG_INFO("Query {} fragment {} profile:\n {}", print_id(_query_id), _fragment_id, ss.str()); |
| } |
| |
| if (_query_ctx->enable_profile()) { |
| _query_ctx->add_fragment_profile(_fragment_id, collect_realtime_profile(), |
| collect_realtime_load_channel_profile()); |
| } |
| |
| if (!_need_notify_close) { |
| // all submitted tasks done |
| _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); |
| } |
| } |
| |
| void PipelineFragmentContext::decrement_running_task(PipelineId pipeline_id) { |
| // If all tasks of this pipeline have been closed, its upstream tasks are no longer needed, so make them runnable here. |
| DCHECK(_pip_id_to_pipeline.contains(pipeline_id)); |
| if (_pip_id_to_pipeline[pipeline_id]->close_task()) { |
| if (_dag.contains(pipeline_id)) { |
| for (auto dep : _dag[pipeline_id]) { |
| _pip_id_to_pipeline[dep]->make_all_runnable(pipeline_id); |
| } |
| } |
| } |
| std::lock_guard<std::mutex> l(_task_mutex); |
| ++_closed_tasks; |
| if (_closed_tasks >= _total_tasks) { |
| _close_fragment_instance(); |
| } |
| } |
| |
| std::string PipelineFragmentContext::get_load_error_url() { |
| if (const auto& str = _runtime_state->get_error_log_file_path(); !str.empty()) { |
| return to_load_error_http_path(str); |
| } |
| for (auto& tasks : _tasks) { |
| for (auto& task : tasks) { |
| if (const auto& str = task.second->get_error_log_file_path(); !str.empty()) { |
| return to_load_error_http_path(str); |
| } |
| } |
| } |
| return ""; |
| } |
| |
| std::string PipelineFragmentContext::get_first_error_msg() { |
| if (const auto& str = _runtime_state->get_first_error_msg(); !str.empty()) { |
| return str; |
| } |
| for (auto& tasks : _tasks) { |
| for (auto& task : tasks) { |
| if (const auto& str = task.second->get_first_error_msg(); !str.empty()) { |
| return str; |
| } |
| } |
| } |
| return ""; |
| } |
| |
| Status PipelineFragmentContext::send_report(bool done) { |
| Status exec_status = _query_ctx->exec_status(); |
| // If the plan finished successfully but _is_report_success is false, |
| // there is no need to send a report. |
| // Load sets _is_report_success to true because it wants to track |
| // the progress. |
| if (!_is_report_success && done && exec_status.ok()) { |
| return Status::NeedSendAgain(""); |
| } |
| |
| // If both _is_report_success and _is_report_on_cancel are false, no report is needed |
| // regardless of whether the query succeeded or failed. |
| // This may happen when a query limit is reached and an internal |
| // cancellation is being processed. |
| // When the limit is reached the fragment is also cancelled, but _is_report_on_cancel is |
| // set to false to avoid sending a faulty report to the FE. |
| if (!_is_report_success && !_is_report_on_cancel) { |
| return Status::NeedSendAgain(""); |
| } |
| |
| std::vector<RuntimeState*> runtime_states; |
| |
| for (auto& tasks : _tasks) { |
| for (auto& task : tasks) { |
| runtime_states.push_back(task.second.get()); |
| } |
| } |
| |
| std::string load_error_url = _query_ctx->get_load_error_url().empty() |
| ? get_load_error_url() |
| : _query_ctx->get_load_error_url(); |
| std::string first_error_msg = _query_ctx->get_first_error_msg().empty() |
| ? get_first_error_msg() |
| : _query_ctx->get_first_error_msg(); |
| |
| ReportStatusRequest req {.status = exec_status, |
| .runtime_states = runtime_states, |
| .done = done || !exec_status.ok(), |
| .coord_addr = _query_ctx->coord_addr, |
| .query_id = _query_id, |
| .fragment_id = _fragment_id, |
| .fragment_instance_id = TUniqueId(), |
| .backend_num = -1, |
| .runtime_state = _runtime_state.get(), |
| .load_error_url = load_error_url, |
| .first_error_msg = first_error_msg, |
| .cancel_fn = [this](const Status& reason) { cancel(reason); }}; |
| |
| return _report_status_cb( |
| req, std::dynamic_pointer_cast<PipelineFragmentContext>(shared_from_this())); |
| } |
| |
| size_t PipelineFragmentContext::get_revocable_size(bool* has_running_task) const { |
| size_t res = 0; |
| // _tasks is only cleared in ~PipelineFragmentContext, so it is safe |
| // to traverse the vector here. |
| for (const auto& task_instances : _tasks) { |
| for (const auto& task : task_instances) { |
| if (task.first->is_running()) { |
| LOG_EVERY_N(INFO, 50) << "Query: " << print_id(_query_id) |
| << " is running, task: " << (void*)task.first.get() |
| << ", is_running: " << task.first->is_running(); |
| *has_running_task = true; |
| return 0; |
| } |
| |
| size_t revocable_size = task.first->get_revocable_size(); |
| if (revocable_size >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) { |
| res += revocable_size; |
| } |
| } |
| } |
| return res; |
| } |
| |
| std::vector<PipelineTask*> PipelineFragmentContext::get_revocable_tasks() const { |
| std::vector<PipelineTask*> revocable_tasks; |
| for (const auto& task_instances : _tasks) { |
| for (const auto& task : task_instances) { |
| size_t revocable_size_ = task.first->get_revocable_size(); |
| |
| if (revocable_size_ >= SpillFile::MIN_SPILL_WRITE_BATCH_MEM) { |
| revocable_tasks.emplace_back(task.first.get()); |
| } |
| } |
| } |
| return revocable_tasks; |
| } |
| |
| std::string PipelineFragmentContext::debug_string() { |
| std::lock_guard<std::mutex> l(_task_mutex); |
| fmt::memory_buffer debug_string_buffer; |
| fmt::format_to(debug_string_buffer, |
| "PipelineFragmentContext Info: _closed_tasks={}, _total_tasks={}, " |
| "need_notify_close={}, has_task_execution_ctx_ref_count={}\n", |
| _closed_tasks, _total_tasks, _need_notify_close, |
| _has_task_execution_ctx_ref_count); |
| for (size_t j = 0; j < _tasks.size(); j++) { |
| fmt::format_to(debug_string_buffer, "Tasks in instance {}:\n", j); |
| for (size_t i = 0; i < _tasks[j].size(); i++) { |
| fmt::format_to(debug_string_buffer, "Task {}: {}\n", i, |
| _tasks[j][i].first->debug_string()); |
| } |
| } |
| |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| std::vector<std::shared_ptr<TRuntimeProfileTree>> |
| PipelineFragmentContext::collect_realtime_profile() const { |
| std::vector<std::shared_ptr<TRuntimeProfileTree>> res; |
| |
| // There is no mutex protecting pipeline_id_to_profile, so this function |
| // must only be invoked after the fragment context has been prepared. |
| if (!_prepared) { |
| std::string msg = |
| "Query " + print_id(_query_id) + " collecting profile, but it is not prepared"; |
| DCHECK(false) << msg; |
| LOG_ERROR(msg); |
| return res; |
| } |
| |
| // Make sure the first profile is the fragment-level profile. |
| auto fragment_profile = std::make_shared<TRuntimeProfileTree>(); |
| _fragment_level_profile->to_thrift(fragment_profile.get(), _runtime_state->profile_level()); |
| res.push_back(fragment_profile); |
| |
| // pipeline_id_to_profile is initialized in the prepare stage. |
| for (auto pipeline_profile : _runtime_state->pipeline_id_to_profile()) { |
| auto profile_ptr = std::make_shared<TRuntimeProfileTree>(); |
| pipeline_profile->to_thrift(profile_ptr.get(), _runtime_state->profile_level()); |
| res.push_back(profile_ptr); |
| } |
| |
| return res; |
| } |
| |
| std::shared_ptr<TRuntimeProfileTree> |
| PipelineFragmentContext::collect_realtime_load_channel_profile() const { |
| // There is no mutex protecting pipeline_id_to_profile, so this function |
| // must only be invoked after the fragment context has been prepared. |
| if (!_prepared) { |
| std::string msg = |
| "Query " + print_id(_query_id) + " collecting profile, but it is not prepared"; |
| DCHECK(false) << msg; |
| LOG_ERROR(msg); |
| return nullptr; |
| } |
| |
| for (const auto& tasks : _tasks) { |
| for (const auto& task : tasks) { |
| if (task.second->load_channel_profile() == nullptr) { |
| continue; |
| } |
| |
| auto tmp_load_channel_profile = std::make_shared<TRuntimeProfileTree>(); |
| |
| task.second->load_channel_profile()->to_thrift(tmp_load_channel_profile.get(), |
| _runtime_state->profile_level()); |
| _runtime_state->load_channel_profile()->update(*tmp_load_channel_profile); |
| } |
| } |
| |
| auto load_channel_profile = std::make_shared<TRuntimeProfileTree>(); |
| _runtime_state->load_channel_profile()->to_thrift(load_channel_profile.get(), |
| _runtime_state->profile_level()); |
| return load_channel_profile; |
| } |
| |
| Status PipelineFragmentContext::wait_close(bool close) { |
| if (_exec_env->new_load_stream_mgr()->get(_query_id) != nullptr) { |
| return Status::InternalError("stream load do not support reset"); |
| } |
| if (!_need_notify_close) { |
| return Status::InternalError("_need_notify_close is false, do not support reset"); |
| } |
| |
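| // Block until the fragment instance is closed and no task execution context references |
| // remain, waking up once per second to observe cancellation. |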
| { |
| std::unique_lock<std::mutex> lock(_task_mutex); |
| while (!(_is_fragment_instance_closed.load() && !_has_task_execution_ctx_ref_count)) { |
| if (_query_ctx->is_cancelled()) { |
| return Status::Cancelled("Query has been cancelled"); |
| } |
| _notify_cv.wait_for(lock, std::chrono::seconds(1)); |
| } |
| } |
| |
| if (close) { |
| auto st = send_report(true); |
| if (!st) { |
| LOG(WARNING) << fmt::format("Failed to send report for query {}, fragment {}: {}", |
| print_id(_query_id), _fragment_id, st.to_string()); |
| } |
| _exec_env->fragment_mgr()->remove_pipeline_context({_query_id, _fragment_id}); |
| } |
| return Status::OK(); |
| } |
| |
| Status PipelineFragmentContext::set_to_rerun() { |
| { |
| std::lock_guard<std::mutex> l(_task_mutex); |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); |
| for (auto& tasks : _tasks) { |
| for (const auto& task : tasks) { |
| task.first->runtime_state()->reset_to_rerun(); |
| } |
| } |
| } |
| _release_resource(); |
| _runtime_state->reset_to_rerun(); |
| return Status::OK(); |
| } |
| |
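| // Resets submission state and rebuilds the full pipeline tree so the fragment can be |
| // executed again (likely paired with set_to_rerun). |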
| Status PipelineFragmentContext::rebuild(ThreadPool* thread_pool) { |
| _submitted = false; |
| _is_fragment_instance_closed = false; |
| return _build_and_prepare_full_pipeline(thread_pool); |
| } |
| |
| void PipelineFragmentContext::_release_resource() { |
| std::lock_guard<std::mutex> l(_task_mutex); |
| // Memory released at query end is recorded in the query mem tracker. |
| SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_ctx->query_mem_tracker()); |
| auto st = _query_ctx->exec_status(); |
| for (auto& _task : _tasks) { |
| if (!_task.empty()) { |
| _call_back(_task.front().first->runtime_state(), &st); |
| } |
| } |
| _tasks.clear(); |
| _dag.clear(); |
| _pip_id_to_pipeline.clear(); |
| _pipelines.clear(); |
| _sink.reset(); |
| _root_op.reset(); |
| _runtime_filter_mgr_map.clear(); |
| _op_id_to_shared_state.clear(); |
| } |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |