blob: 0d5d14a18b314896fbed91c864912cfe083925dc [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "pipeline_fragment_context.h"
#include <gen_cpp/DataSinks_types.h>
#include <gen_cpp/PaloInternalService_types.h>
#include <gen_cpp/PlanNodes_types.h>
#include <gen_cpp/Planner_types.h>
#include <opentelemetry/nostd/shared_ptr.h>
#include <opentelemetry/trace/span.h>
#include <opentelemetry/trace/span_context.h>
#include <opentelemetry/trace/tracer.h>
#include <pthread.h>
#include <stdlib.h>
// IWYU pragma: no_include <bits/chrono.h>
#include <chrono> // IWYU pragma: keep
#include <map>
#include <ostream>
#include <typeinfo>
#include <utility>
#include "common/config.h"
#include "common/logging.h"
#include "exec/data_sink.h"
#include "exec/exec_node.h"
#include "exec/scan_node.h"
#include "io/fs/stream_load_pipe.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
#include "pipeline/exec/const_value_operator.h"
#include "pipeline/exec/data_queue.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_sink_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_source_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/empty_source_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/mysql_scan_operator.h" // IWYU pragma: keep
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_file_sink_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/scan_operator.h"
#include "pipeline/exec/schema_scan_operator.h"
#include "pipeline/exec/select_operator.h"
#include "pipeline/exec/set_probe_sink_operator.h" // IWYU pragma: keep
#include "pipeline/exec/set_sink_operator.h" // IWYU pragma: keep
#include "pipeline/exec/set_source_operator.h" // IWYU pragma: keep
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_sink_operator.h"
#include "pipeline/exec/streaming_aggregation_source_operator.h"
#include "pipeline/exec/table_function_operator.h"
#include "pipeline/exec/table_sink_operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/exec/union_source_operator.h"
#include "pipeline_task.h"
#include "runtime/exec_env.h"
#include "runtime/fragment_mgr.h"
#include "runtime/runtime_filter_mgr.h"
#include "runtime/runtime_state.h"
#include "runtime/stream_load/new_load_stream_mgr.h"
#include "runtime/stream_load/stream_load_context.h"
#include "runtime/thread_context.h"
#include "service/backend_options.h"
#include "task_scheduler.h"
#include "util/container_util.hpp"
#include "util/debug_util.h"
#include "util/telemetry/telemetry.h"
#include "util/uid_util.h"
#include "vec/common/assert_cast.h"
#include "vec/exec/join/vhash_join_node.h"
#include "vec/exec/scan/new_es_scan_node.h"
#include "vec/exec/scan/new_file_scan_node.h"
#include "vec/exec/scan/new_jdbc_scan_node.h"
#include "vec/exec/scan/new_odbc_scan_node.h"
#include "vec/exec/scan/new_olap_scan_node.h"
#include "vec/exec/scan/vmeta_scan_node.h"
#include "vec/exec/scan/vscan_node.h"
#include "vec/exec/vaggregation_node.h"
#include "vec/exec/vexchange_node.h"
#include "vec/exec/vunion_node.h"
#include "vec/runtime/vdata_stream_mgr.h"
namespace doris::pipeline {
// Creates the context for one pipeline fragment instance. Only identifiers, callbacks and the
// shared query state are captured here; the runtime state, plan tree, pipelines and tasks are
// built later by prepare().
//
// call_back:        invoked from the destructor with the runtime state (nullptr if prepare()
//                   never created one) and a pointer to the final execution status.
// report_status_cb: callback through which send_report() delivers status/profile reports.
PipelineFragmentContext::PipelineFragmentContext(
const TUniqueId& query_id, const TUniqueId& instance_id, const int fragment_id,
int backend_num, std::shared_ptr<QueryContext> query_ctx, ExecEnv* exec_env,
const std::function<void(RuntimeState*, Status*)>& call_back,
const report_status_callback& report_status_cb)
: _query_id(query_id),
_fragment_instance_id(instance_id),
_fragment_id(fragment_id),
_backend_num(backend_num),
_exec_env(exec_env),
_cancel_reason(PPlanFragmentCancelReason::INTERNAL_ERROR),
_query_ctx(std::move(query_ctx)),
_call_back(call_back),
_report_thread_active(false),
_report_status_cb(report_status_cb),
_is_report_on_cancel(true) {
// When the query belongs to a task group, remember its entity; submit() then uses the
// task-group scheduler instead of the default pipeline task scheduler.
if (_query_ctx->get_task_group()) {
_task_group_entity = _query_ctx->get_task_group()->task_entity();
}
// _stop_report_thread() waits on this future for the report thread to finish.
_report_thread_future = _report_thread_promise.get_future();
// Started as early as possible: prepare() records the elapsed time as "StartTime" and
// _close_action() uses it for the total time counter.
_fragment_watcher.start();
}
// Hands the final execution status to the completion callback. When a runtime state exists, the
// callback runs attached to that state's task and the runtime state is released afterwards;
// otherwise the callback receives a null runtime state. The report thread must already be stopped.
PipelineFragmentContext::~PipelineFragmentContext() {
    if (_runtime_state == nullptr) {
        _call_back(_runtime_state.get(), &_exec_status);
    } else {
        // Memory released at query end is recorded in the query mem tracker; most of it is
        // held by _runtime_state, so attach to its task while releasing.
        SCOPED_ATTACH_TASK(_runtime_state.get());
        _call_back(_runtime_state.get(), &_exec_status);
        _runtime_state.reset();
    }
    DCHECK(!_report_thread_active);
}
// Cancels this fragment instance. Only the first call has an effect: the cancelled flag is
// checked once without the lock and again under _status_lock.
//
// Any reason other than LIMIT_REACH stores Status::Cancelled(msg) in _exec_status. The method also:
//   - marks the runtime state as cancelled and propagates _exec_status as its process status;
//   - cancels the pipe of a stream load registered under the query id;
//   - records the reason and message;
//   - wakes wait_for_start() through the query context;
//   - calls vstream_mgr()->cancel() for this instance.
// Acquires _status_lock itself, so it must not be called while holding that (non-recursive) mutex.
// NOTE(review): _runtime_state is dereferenced unconditionally; calling this before prepare()
// has created the runtime state would crash. Confirm no caller can do that.
void PipelineFragmentContext::cancel(const PPlanFragmentCancelReason& reason,
const std::string& msg) {
if (!_runtime_state->is_cancelled()) {
std::lock_guard<std::mutex> l(_status_lock);
// Re-check under the lock: another thread may have cancelled in between.
if (_runtime_state->is_cancelled()) {
return;
}
// Reaching a LIMIT is not an error, so the execution status is left untouched.
if (reason != PPlanFragmentCancelReason::LIMIT_REACH) {
_exec_status = Status::Cancelled(msg);
}
_runtime_state->set_is_cancelled(msg);
LOG_WARNING("Query {} instance {} cancelled, reason {}, message {}", print_id(_query_id),
print_id(_fragment_instance_id), PPlanFragmentCancelReason_Name(reason),
msg.substr(0, 50));
// Uncomment the code below to print detailed task information when debugging.
//
// for (auto& task : _tasks) {
// LOG(WARNING) << task->debug_string();
// }
_runtime_state->set_process_status(_exec_status);
// Get pipe from new load stream manager and send cancel to it or the fragment may hang to wait read from pipe
// For stream load the fragment's query_id == load id, it is set in FE.
auto stream_load_ctx = _exec_env->new_load_stream_mgr()->get(_query_id);
if (stream_load_ctx != nullptr) {
stream_load_ctx->pipe->cancel(msg);
}
_cancel_reason = reason;
_cancel_msg = msg;
// To notify wait_for_start()
_query_ctx->set_ready_to_execute(true);
// must close stream_mgr to avoid dead lock in Exchange Node
_exec_env->vstream_mgr()->cancel(_fragment_instance_id);
// Cancel the result queue manager used by spark doris connector
// TODO pipeline incomp
// _exec_env->result_queue_mgr()->update_queue_status(id, Status::Aborted(msg));
}
}
// Creates a pipeline with the next free pipeline id, registers it in _pipelines and returns it.
// Pipelines should not be added once the context is prepared, submitted or cancelled.
PipelinePtr PipelineFragmentContext::add_pipeline() {
    const PipelineId new_id = _next_pipeline_id++;
    _pipelines.emplace_back(std::make_shared<Pipeline>(new_id, weak_from_this()));
    return _pipelines.back();
}
// Prepares fragment instance `idx` of `request` for execution. Returns an error if called twice.
//
// Steps:
//   1. Create the RuntimeState from request.local_params[idx].
//   2. Obtain the descriptor table: from the QueryContext when request.is_simplified_param is
//      set, otherwise from request.desc_tbl.
//   3. Create and prepare the ExecNode tree.
//   4. Assign exchange sender counts and scan ranges.
//   5. Create the DataSink if the fragment has an output sink.
//   6. Translate the plan into pipelines and wrap the DataSink as the root pipeline's sink.
//   7. Build and prepare one PipelineTask per pipeline.
//   8. Attach the profiles.
// _prepared is set only after all of these steps succeed.
Status PipelineFragmentContext::prepare(const doris::TPipelineFragmentParams& request,
const size_t idx) {
if (_prepared) {
return Status::InternalError("Already prepared");
}
const auto& local_params = request.local_params[idx];
_runtime_profile.reset(new RuntimeProfile("PipelineContext"));
// Time between construction (watcher start) and the start of prepare().
_start_timer = ADD_TIMER(_runtime_profile, "StartTime");
COUNTER_UPDATE(_start_timer, _fragment_watcher.elapsed_time());
_prepare_timer = ADD_TIMER(_runtime_profile, "PrepareTime");
SCOPED_TIMER(_prepare_timer);
auto* fragment_context = this;
// Use a real tracer only when the caller is inside a valid trace span.
OpentelemetryTracer tracer = telemetry::get_noop_tracer();
if (opentelemetry::trace::Tracer::GetCurrentSpan()->GetContext().IsValid()) {
tracer = telemetry::get_tracer(print_id(_query_id));
}
LOG_INFO("PipelineFragmentContext::prepare")
.tag("query_id", print_id(_query_id))
.tag("instance_id", print_id(local_params.fragment_instance_id))
.tag("backend_num", local_params.backend_num)
.tag("pthread_id", (uintptr_t)pthread_self());
// 1. init _runtime_state
_runtime_state = RuntimeState::create_unique(local_params, request.query_id,
request.fragment_id, request.query_options,
_query_ctx->query_globals, _exec_env);
_runtime_state->set_query_ctx(_query_ctx.get());
_runtime_state->set_query_mem_tracker(_query_ctx->query_mem_tracker);
_runtime_state->set_tracer(std::move(tracer));
// TODO: should be combined with the plan_fragment_executor prepare function
SCOPED_ATTACH_TASK(get_runtime_state());
_runtime_state->runtime_filter_mgr()->init();
_runtime_state->set_be_number(local_params.backend_num);
// Copy optional load/job attributes into the runtime state when FE provided them.
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
}
if (request.__isset.import_label) {
_runtime_state->set_import_label(request.import_label);
}
if (request.__isset.db_name) {
_runtime_state->set_db_name(request.db_name);
}
if (request.__isset.load_job_id) {
_runtime_state->set_load_job_id(request.load_job_id);
}
if (request.query_options.__isset.is_report_success) {
fragment_context->set_is_report_success(request.query_options.is_report_success);
}
// A simplified request carries no descriptor table; reuse the one cached in the query context.
if (request.is_simplified_param) {
_desc_tbl = _query_ctx->desc_tbl;
} else {
DCHECK(request.__isset.desc_tbl);
RETURN_IF_ERROR(
DescriptorTbl::create(_runtime_state->obj_pool(), request.desc_tbl, &_desc_tbl));
}
_runtime_state->set_desc_tbl(_desc_tbl);
// 2. Create ExecNode to build pipeline with PipelineFragmentContext
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(
ExecNode::create_tree(_runtime_state.get(), _runtime_state->obj_pool(),
request.fragment.plan, *_desc_tbl, &_root_plan));
// Set senders of exchange nodes before pipeline build
std::vector<ExecNode*> exch_nodes;
_root_plan->collect_nodes(TPlanNodeType::EXCHANGE_NODE, &exch_nodes);
for (ExecNode* exch_node : exch_nodes) {
DCHECK_EQ(exch_node->type(), TPlanNodeType::EXCHANGE_NODE);
int num_senders = find_with_default(request.per_exch_num_senders, exch_node->id(), 0);
DCHECK_GT(num_senders, 0);
static_cast<vectorized::VExchangeNode*>(exch_node)->set_num_senders(num_senders);
}
// All prepare work do in exec node tree
RETURN_IF_ERROR(_root_plan->prepare(_runtime_state.get()));
// set scan ranges
std::vector<ExecNode*> scan_nodes;
std::vector<TScanRangeParams> no_scan_ranges;
_root_plan->collect_scan_nodes(&scan_nodes);
VLOG_CRITICAL << "scan_nodes.size()=" << scan_nodes.size();
VLOG_CRITICAL << "params.per_node_scan_ranges.size()="
<< local_params.per_node_scan_ranges.size();
// set scan range in ScanNode
// Nodes without an entry in per_node_scan_ranges get an empty range list.
for (int i = 0; i < scan_nodes.size(); ++i) {
// TODO(cmy): this "if...else" should be removed once all ScanNode are derived from VScanNode.
ExecNode* node = scan_nodes[i];
// The listed node types are handled as VScanNode and additionally receive the shared-scan
// flag; every other scan node goes through the ScanNode interface.
if (typeid(*node) == typeid(vectorized::NewOlapScanNode) ||
typeid(*node) == typeid(vectorized::NewFileScanNode) ||
typeid(*node) == typeid(vectorized::NewOdbcScanNode) ||
typeid(*node) == typeid(vectorized::NewEsScanNode) ||
typeid(*node) == typeid(vectorized::VMetaScanNode) ||
typeid(*node) == typeid(vectorized::NewJdbcScanNode)) {
auto* scan_node = static_cast<vectorized::VScanNode*>(scan_nodes[i]);
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
no_scan_ranges);
const bool shared_scan =
find_with_default(local_params.per_node_shared_scans, scan_node->id(), false);
scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges);
scan_node->set_shared_scan(_runtime_state.get(), shared_scan);
} else {
ScanNode* scan_node = static_cast<ScanNode*>(node);
auto scan_ranges = find_with_default(local_params.per_node_scan_ranges, scan_node->id(),
no_scan_ranges);
scan_node->set_scan_ranges(_runtime_state.get(), scan_ranges);
VLOG_CRITICAL << "scan_node_Id=" << scan_node->id()
<< " size=" << scan_ranges.get().size();
}
}
_runtime_state->set_per_fragment_instance_idx(local_params.sender_id);
_runtime_state->set_num_per_fragment_instances(request.num_senders);
if (request.fragment.__isset.output_sink) {
RETURN_IF_ERROR_OR_CATCH_EXCEPTION(DataSink::create_data_sink(
_runtime_state->obj_pool(), request.fragment.output_sink,
request.fragment.output_exprs, request, idx, _root_plan->row_desc(),
_runtime_state.get(), &_sink, *_desc_tbl));
}
// 3. Translate the plan tree into pipelines, starting from the root pipeline.
_root_pipeline = fragment_context->add_pipeline();
_root_pipeline->set_is_root_pipeline();
RETURN_IF_ERROR(_build_pipelines(_root_plan, _root_pipeline));
// Wrap the DataSink as the sink operator of the root pipeline.
if (_sink) {
RETURN_IF_ERROR(_create_sink(request.local_params[idx].sender_id,
request.fragment.output_sink, _runtime_state.get()));
}
// 4. Create and prepare the tasks, then attach the profiles.
RETURN_IF_ERROR(_build_pipeline_tasks(request));
if (_sink) {
_runtime_state->runtime_profile()->add_child(_sink->profile(), true, nullptr);
}
_runtime_state->runtime_profile()->add_child(_root_plan->runtime_profile(), true, nullptr);
_runtime_state->runtime_profile()->add_child(_runtime_profile.get(), true, nullptr);
_prepared = true;
return Status::OK();
}
// Creates one PipelineTask per pipeline in _pipelines, then prepares all tasks.
//
// For each pipeline, the method:
//   - builds the sink operator and initializes it with request.fragment.output_sink;
//   - builds the remaining operators;
//   - creates the task, using the running _total_tasks counter as its index;
//   - links the sink to the task's root operator;
//   - registers the pipeline profile under _runtime_profile.
// Finally, the profiles of the multi-cast data stream senders (if any) are attached under the
// sink's profile.
Status PipelineFragmentContext::_build_pipeline_tasks(
const doris::TPipelineFragmentParams& request) {
_total_tasks = 0;
for (PipelinePtr& pipeline : _pipelines) {
// if sink
auto sink = pipeline->sink()->build_operator();
// TODO pipeline 1 need to add new interface for exec node and operator
// NOTE(review): the sink of every pipeline (not only the root) is initialized with the
// fragment's output sink, and the result of init() is not checked; if init() returns a
// Status, consider RETURN_IF_ERROR.
sink->init(request.fragment.output_sink);
RETURN_IF_ERROR(pipeline->build_operators());
auto task = std::make_unique<PipelineTask>(pipeline, _total_tasks++, _runtime_state.get(),
sink, this, pipeline->pipeline_profile());
sink->set_child(task->get_root());
_tasks.emplace_back(std::move(task));
_runtime_profile->add_child(pipeline->pipeline_profile(), true, nullptr);
}
// Prepare only after all tasks exist.
for (auto& task : _tasks) {
RETURN_IF_ERROR(task->prepare(_runtime_state.get()));
}
// register the profile of child data stream sender
// (_multi_cast_stream_sink_senders is only filled by _create_sink() when _sink exists.)
for (auto& sender : _multi_cast_stream_sink_senders) {
_sink->profile()->add_child(sender->profile(), true, nullptr);
}
return Status::OK();
}
void PipelineFragmentContext::_stop_report_thread() {
if (!_report_thread_active) {
return;
}
_report_thread_active = false;
_stop_report_thread_cv.notify_one();
// Wait infinitly to ensure that the report task is finished and the this variable
// is not used in report thread.
_report_thread_future.wait();
}
// Body of the periodic status report thread of this fragment instance.
//
// Marks the report thread as active, notifies _report_thread_started_cv, waits a random jitter
// and then calls send_report(false) every config::status_report_interval seconds until
// _stop_report_thread() clears _report_thread_active. If the interval is not positive, the
// thread logs a warning and exits without reporting.
void PipelineFragmentContext::report_profile() {
    SCOPED_ATTACH_TASK(_runtime_state.get());
    VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id();
    _report_thread_active = true;
    std::unique_lock<std::mutex> l(_report_thread_lock);
    // Signal waiters on _report_thread_started_cv that the report thread is running.
    _report_thread_started_cv.notify_one();
    // Jitter the reporting time of remote fragments by a random amount between
    // 0 and the report_interval. This way, the coordinator doesn't get all the
    // updates at once so its better for contention as well as smoother progress
    // reporting.
    // Only when the interval is positive: `rand() % 0` is a division by zero. A non-positive
    // interval is handled by the loop below, which logs a warning and exits immediately.
    const auto initial_report_interval = config::status_report_interval;
    if (initial_report_interval > 0) {
        int report_fragment_offset = rand() % initial_report_interval;
        // We don't want to wait longer than it takes to run the entire fragment.
        _stop_report_thread_cv.wait_for(l, std::chrono::seconds(report_fragment_offset));
    }
    while (_report_thread_active) {
        if (config::status_report_interval > 0) {
            // wait_for can return because the timeout occurred or the condition variable
            // was signaled. We can't rely on its return value to distinguish between the
            // two cases (e.g. there is a race here where the wait timed out but before grabbing
            // the lock, the condition variable was signaled). Instead, we will use an external
            // flag, _report_thread_active, to coordinate this.
            _stop_report_thread_cv.wait_for(l,
                                            std::chrono::seconds(config::status_report_interval));
        } else {
            LOG(WARNING) << "config::status_report_interval is equal to or less than zero, exiting "
                            "reporting thread.";
            break;
        }
        if (VLOG_FILE_IS_ON) {
            VLOG_FILE << "Reporting " << (!_report_thread_active ? "final " : " ")
                      << "profile for instance " << _runtime_state->fragment_instance_id();
            std::stringstream ss;
            _runtime_state->runtime_profile()->compute_time_in_profile();
            _runtime_state->runtime_profile()->pretty_print(&ss);
            if (_runtime_state->load_channel_profile()) {
                // _runtime_state->load_channel_profile()->compute_time_in_profile(); // TODO load channel profile add timer
                _runtime_state->load_channel_profile()->pretty_print(&ss);
            }
            VLOG_FILE << ss.str();
        }
        if (!_report_thread_active) {
            break;
        }
        send_report(false);
    }
    VLOG_FILE << "exiting reporting thread: instance_id=" << _runtime_state->fragment_instance_id();
}
// TODO: use virtual functions on ExecNode instead of this switch to do the dispatch.
//
// Recursively translates the ExecNode subtree rooted at `node` into pipeline operators.
//
// Streaming nodes (select, repeat, assert-num-rows, table function, the probe side of joins)
// append their operator to `cur_pipe` after their child has been built into the same pipeline.
// Blocking nodes (aggregation, sort, partition sort, analytic, union children, the build side of
// joins, set operations) get a new pipeline for their input that ends in a sink operator. The
// matching source operator is then appended to `cur_pipe`.
//
// Returns an error for node types not supported by the pipeline engine, for an aggregation node
// with neither group-by exprs nor output slots, or when adding an operator/sink fails.
Status PipelineFragmentContext::_build_pipelines(ExecNode* node, PipelinePtr cur_pipe) {
    auto node_type = node->type();
    switch (node_type) {
    // for source
    case TPlanNodeType::OLAP_SCAN_NODE:
    case TPlanNodeType::JDBC_SCAN_NODE:
    case TPlanNodeType::ODBC_SCAN_NODE:
    case TPlanNodeType::FILE_SCAN_NODE:
    case TPlanNodeType::META_SCAN_NODE:
    case TPlanNodeType::ES_HTTP_SCAN_NODE:
    case TPlanNodeType::ES_SCAN_NODE: {
        OperatorBuilderPtr operator_t = std::make_shared<ScanOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
        break;
    }
    case TPlanNodeType::MYSQL_SCAN_NODE: {
#ifdef DORIS_WITH_MYSQL
        OperatorBuilderPtr operator_t =
                std::make_shared<MysqlScanOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
        break;
#else
        return Status::InternalError(
                "Don't support MySQL table, you should rebuild Doris with WITH_MYSQL option ON");
#endif
    }
    case TPlanNodeType::SCHEMA_SCAN_NODE: {
        OperatorBuilderPtr operator_t =
                std::make_shared<SchemaScanOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
        break;
    }
    case TPlanNodeType::EXCHANGE_NODE: {
        OperatorBuilderPtr operator_t =
                std::make_shared<ExchangeSourceOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
        break;
    }
    case TPlanNodeType::EMPTY_SET_NODE: {
        OperatorBuilderPtr operator_t =
                std::make_shared<EmptySetSourceOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
        break;
    }
    case TPlanNodeType::DATA_GEN_SCAN_NODE: {
        OperatorBuilderPtr operator_t = std::make_shared<DataGenOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(operator_t));
        break;
    }
    case TPlanNodeType::UNION_NODE: {
        auto* union_node = assert_cast<vectorized::VUnionNode*>(node);
        if (union_node->children_count() == 0 &&
            union_node->get_first_materialized_child_idx() == 0) { // only have const expr
            OperatorBuilderPtr builder =
                    std::make_shared<ConstValueOperatorBuilder>(node->id(), node);
            RETURN_IF_ERROR(cur_pipe->add_operator(builder));
        } else {
            // Every child gets its own pipeline feeding a shared DataQueue, which the union
            // source in cur_pipe reads from.
            int child_count = union_node->children_count();
            auto data_queue = std::make_shared<DataQueue>(child_count);
            for (int child_id = 0; child_id < child_count; ++child_id) {
                auto new_child_pipeline = add_pipeline();
                RETURN_IF_ERROR(_build_pipelines(union_node->child(child_id), new_child_pipeline));
                OperatorBuilderPtr child_sink_builder = std::make_shared<UnionSinkOperatorBuilder>(
                        union_node->id(), child_id, union_node, data_queue);
                RETURN_IF_ERROR(new_child_pipeline->set_sink(child_sink_builder));
            }
            OperatorBuilderPtr source_builder = std::make_shared<UnionSourceOperatorBuilder>(
                    node->id(), union_node, data_queue);
            RETURN_IF_ERROR(cur_pipe->add_operator(source_builder));
        }
        break;
    }
    case TPlanNodeType::AGGREGATION_NODE: {
        // assert_cast (as for the other node types) rather than an unchecked dynamic_cast:
        // agg_node is dereferenced unconditionally below.
        auto* agg_node = assert_cast<vectorized::AggregationNode*>(node);
        auto new_pipe = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipe));
        if (agg_node->is_probe_expr_ctxs_empty() && node->row_desc().num_slots() == 0) {
            return Status::InternalError("Illegal aggregate node " +
                                         std::to_string(agg_node->id()) +
                                         ": group by and output is empty");
        }
        if (agg_node->is_aggregate_evaluators_empty() && !agg_node->is_probe_expr_ctxs_empty()) {
            // Group-by without aggregate functions: distinct streaming aggregation.
            auto data_queue = std::make_shared<DataQueue>(1);
            OperatorBuilderPtr pre_agg_sink =
                    std::make_shared<DistinctStreamingAggSinkOperatorBuilder>(node->id(), agg_node,
                                                                              data_queue);
            RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
            OperatorBuilderPtr pre_agg_source =
                    std::make_shared<DistinctStreamingAggSourceOperatorBuilder>(
                            node->id(), agg_node, data_queue);
            RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
        } else if (agg_node->is_streaming_preagg() && !agg_node->is_probe_expr_ctxs_empty()) {
            auto data_queue = std::make_shared<DataQueue>(1);
            OperatorBuilderPtr pre_agg_sink = std::make_shared<StreamingAggSinkOperatorBuilder>(
                    node->id(), agg_node, data_queue);
            RETURN_IF_ERROR(new_pipe->set_sink(pre_agg_sink));
            OperatorBuilderPtr pre_agg_source = std::make_shared<StreamingAggSourceOperatorBuilder>(
                    node->id(), agg_node, data_queue);
            RETURN_IF_ERROR(cur_pipe->add_operator(pre_agg_source));
        } else {
            OperatorBuilderPtr agg_sink =
                    std::make_shared<AggSinkOperatorBuilder>(node->id(), agg_node);
            RETURN_IF_ERROR(new_pipe->set_sink(agg_sink));
            OperatorBuilderPtr agg_source =
                    std::make_shared<AggSourceOperatorBuilder>(node->id(), agg_node);
            RETURN_IF_ERROR(cur_pipe->add_operator(agg_source));
        }
        break;
    }
    case TPlanNodeType::SORT_NODE: {
        auto new_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
        OperatorBuilderPtr sort_sink = std::make_shared<SortSinkOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(new_pipeline->set_sink(sort_sink));
        OperatorBuilderPtr sort_source =
                std::make_shared<SortSourceOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(sort_source));
        break;
    }
    case TPlanNodeType::PARTITION_SORT_NODE: {
        auto new_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
        OperatorBuilderPtr partition_sort_sink =
                std::make_shared<PartitionSortSinkOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(new_pipeline->set_sink(partition_sort_sink));
        OperatorBuilderPtr partition_sort_source =
                std::make_shared<PartitionSortSourceOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(partition_sort_source));
        break;
    }
    case TPlanNodeType::ANALYTIC_EVAL_NODE: {
        auto new_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(0), new_pipeline));
        OperatorBuilderPtr analytic_sink =
                std::make_shared<AnalyticSinkOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(new_pipeline->set_sink(analytic_sink));
        OperatorBuilderPtr analytic_source =
                std::make_shared<AnalyticSourceOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(analytic_source));
        break;
    }
    case TPlanNodeType::REPEAT_NODE: {
        RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
        OperatorBuilderPtr builder = std::make_shared<RepeatOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(builder));
        break;
    }
    case TPlanNodeType::ASSERT_NUM_ROWS_NODE: {
        RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
        OperatorBuilderPtr builder =
                std::make_shared<AssertNumRowsOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(builder));
        break;
    }
    case TPlanNodeType::TABLE_FUNCTION_NODE: {
        RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
        OperatorBuilderPtr builder =
                std::make_shared<TableFunctionOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(builder));
        break;
    }
    case TPlanNodeType::HASH_JOIN_NODE: {
        auto* join_node = assert_cast<vectorized::HashJoinNode*>(node);
        auto new_pipe = add_pipeline();
        if (join_node->should_build_hash_table()) {
            RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
        } else {
            // The right child is not built into a pipeline; an empty source with its row
            // descriptor feeds the build sink instead.
            OperatorBuilderPtr builder = std::make_shared<EmptySourceOperatorBuilder>(
                    node->child(1)->id(), node->child(1)->row_desc(), node->child(1));
            RETURN_IF_ERROR(new_pipe->add_operator(builder));
        }
        OperatorBuilderPtr join_sink =
                std::make_shared<HashJoinBuildSinkBuilder>(node->id(), join_node);
        RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
        RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
        OperatorBuilderPtr join_source =
                std::make_shared<HashJoinProbeOperatorBuilder>(node->id(), join_node);
        RETURN_IF_ERROR(cur_pipe->add_operator(join_source));
        // The probe pipeline depends on the build pipeline.
        cur_pipe->add_dependency(new_pipe);
        break;
    }
    case TPlanNodeType::CROSS_JOIN_NODE: {
        auto new_pipe = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(1), new_pipe));
        OperatorBuilderPtr join_sink =
                std::make_shared<NestLoopJoinBuildOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(new_pipe->set_sink(join_sink));
        RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
        OperatorBuilderPtr join_source =
                std::make_shared<NestLoopJoinProbeOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(join_source));
        cur_pipe->add_dependency(new_pipe);
        break;
    }
    case TPlanNodeType::INTERSECT_NODE: {
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<true>(node, cur_pipe));
        break;
    }
    case TPlanNodeType::EXCEPT_NODE: {
        RETURN_IF_ERROR(_build_operators_for_set_operation_node<false>(node, cur_pipe));
        break;
    }
    case TPlanNodeType::SELECT_NODE: {
        RETURN_IF_ERROR(_build_pipelines(node->child(0), cur_pipe));
        OperatorBuilderPtr builder = std::make_shared<SelectOperatorBuilder>(node->id(), node);
        RETURN_IF_ERROR(cur_pipe->add_operator(builder));
        break;
    }
    default:
        return Status::InternalError("Unsupported exec type in pipeline: {}",
                                     print_plan_node_type(node_type));
    }
    return Status::OK();
}
// Builds the pipelines of an INTERSECT (is_intersect == true) or EXCEPT node.
//
// child(0) gets a pipeline ending in a SetSinkOperator. Every further child gets a pipeline
// ending in a SetProbeSinkOperator that depends on the previous child's pipeline, so the sinks
// run strictly one after another. A SetSourceOperator producing the result is appended to
// cur_pipe.
template <bool is_intersect>
Status PipelineFragmentContext::_build_operators_for_set_operation_node(ExecNode* node,
                                                                        PipelinePtr cur_pipe) {
    PipelinePtr previous_pipeline = add_pipeline();
    RETURN_IF_ERROR(_build_pipelines(node->child(0), previous_pipeline));
    OperatorBuilderPtr build_sink =
            std::make_shared<SetSinkOperatorBuilder<is_intersect>>(node->id(), node);
    RETURN_IF_ERROR(previous_pipeline->set_sink(build_sink));

    for (int child_idx = 1; child_idx < node->children_count(); ++child_idx) {
        PipelinePtr probe_pipeline = add_pipeline();
        RETURN_IF_ERROR(_build_pipelines(node->child(child_idx), probe_pipeline));
        OperatorBuilderPtr probe_sink =
                std::make_shared<SetProbeSinkOperatorBuilder<is_intersect>>(node->id(), child_idx,
                                                                             node);
        RETURN_IF_ERROR(probe_pipeline->set_sink(probe_sink));
        // e.g. child(1) must wait until child(0) has finished building.
        probe_pipeline->add_dependency(previous_pipeline);
        previous_pipeline = probe_pipeline;
    }

    OperatorBuilderPtr set_source =
            std::make_shared<SetSourceOperatorBuilder<is_intersect>>(node->id(), node);
    return cur_pipe->add_operator(set_source);
}
// Schedules every task built by prepare(). Tasks go to the pipeline task scheduler, or to the
// task-group scheduler when the query belongs to a task group. May be called only once.
//
// If scheduling a task fails, the fragment is cancelled and _total_tasks is shrunk to the number
// of tasks actually scheduled. The close action then fires once those tasks have closed, or
// immediately if they already have. Returns an InternalError in that case.
Status PipelineFragmentContext::submit() {
    if (_submitted) {
        return Status::InternalError("submitted");
    }
    _submitted = true;

    int submit_tasks = 0;
    Status st;
    auto* scheduler = _exec_env->pipeline_task_scheduler();
    if (_task_group_entity) {
        scheduler = _exec_env->pipeline_task_group_scheduler();
    }
    for (auto& task : _tasks) {
        st = scheduler->schedule_task(task.get());
        if (!st.ok()) {
            // Do not hold _status_lock here: cancel() acquires it itself, and re-locking a
            // std::mutex from the owning thread deadlocks.
            cancel(PPlanFragmentCancelReason::INTERNAL_ERROR, "submit context fail");
            break;
        }
        submit_tasks++;
    }
    if (!st.ok()) {
        // Already-scheduled tasks may be closing concurrently in close_a_pipeline(), which reads
        // _total_tasks under _task_mutex; shrink and check it under the same lock.
        std::lock_guard<std::mutex> l(_task_mutex);
        _total_tasks = submit_tasks;
        if (_closed_tasks == _total_tasks) {
            std::call_once(_close_once_flag, [this] { _close_action(); });
        }
        return Status::InternalError("Submit pipeline failed. err = {}, BE: {}", st.to_string(),
                                     BackendOptions::get_localhost());
    }
    return st;
}
// Closes the fragment's DataSink (if one was created) directly, outside of any pipeline task.
// The sink is closed with Status::RuntimeError("prepare failed") when _prepared is true, and
// with Status::OK() otherwise.
// NOTE(review): this looks inverted relative to the error message. close_if_prepare_failed()
// uses the same "prepare failed" status for the not-yet-prepared path. Confirm the intended
// semantics with the callers before changing it: closing a load sink with OK may have side
// effects.
void PipelineFragmentContext::close_sink() {
if (_sink) {
if (_prepared) {
_sink->close(_runtime_state.get(), Status::RuntimeError("prepare failed"));
} else {
_sink->close(_runtime_state.get(), Status::OK());
}
}
}
// Releases whatever a failed prepare() managed to build.
// If tasks were created, each is closed (logging close errors) and counted via
// close_a_pipeline(). Otherwise the plan tree and the DataSink are closed directly, the sink
// with a "prepare failed" error.
void PipelineFragmentContext::close_if_prepare_failed() {
    if (!_tasks.empty()) {
        for (auto& pipeline_task : _tasks) {
            DCHECK(!pipeline_task->is_pending_finish());
            WARN_IF_ERROR(pipeline_task->close(), "close_if_prepare_failed failed: ");
            close_a_pipeline();
        }
        return;
    }
    if (_root_plan) {
        _root_plan->close(_runtime_state.get());
    }
    if (_sink) {
        _sink->close(_runtime_state.get(), Status::RuntimeError("prepare failed"));
    }
}
// Constructs the sink operator of the root pipeline.
//
// Wraps the fragment's DataSink (`_sink`, created in prepare()) in the operator builder matching
// thrift_sink.type and installs it as the sink of the root pipeline.
//
// For MULTI_CAST_DATA_STREAM_SINK, the root pipeline writes into a multi-cast data streamer.
// One extra pipeline is added per destination; each reads from the streamer and sends through
// its own VDataStreamSender, which is initialized and prepared here.
//
// sender_id:   sender id of this instance, passed to the multi-cast data stream senders.
// thrift_sink: the fragment's output sink description.
// state:       runtime state used to prepare the multi-cast senders.
// Returns an error for unsupported sink types or when any pipeline/sender setup step fails.
Status PipelineFragmentContext::_create_sink(int sender_id, const TDataSink& thrift_sink,
                                             RuntimeState* state) {
    OperatorBuilderPtr sink_;
    switch (thrift_sink.type) {
    case TDataSinkType::DATA_STREAM_SINK: {
        sink_ = std::make_shared<ExchangeSinkOperatorBuilder>(thrift_sink.stream_sink.dest_node_id,
                                                              _sink.get(), this);
        break;
    }
    case TDataSinkType::RESULT_SINK: {
        sink_ = std::make_shared<ResultSinkOperatorBuilder>(next_operator_builder_id(),
                                                            _sink.get());
        break;
    }
    case TDataSinkType::OLAP_TABLE_SINK: {
        sink_ = std::make_shared<OlapTableSinkOperatorBuilder>(next_operator_builder_id(),
                                                               _sink.get());
        break;
    }
    case TDataSinkType::MYSQL_TABLE_SINK:
    case TDataSinkType::JDBC_TABLE_SINK:
    case TDataSinkType::ODBC_TABLE_SINK: {
        sink_ = std::make_shared<TableSinkOperatorBuilder>(next_operator_builder_id(), _sink.get());
        break;
    }
    case TDataSinkType::RESULT_FILE_SINK: {
        sink_ = std::make_shared<ResultFileSinkOperatorBuilder>(
                thrift_sink.result_file_sink.dest_node_id, _sink.get());
        break;
    }
    case TDataSinkType::MULTI_CAST_DATA_STREAM_SINK: {
        sink_ = std::make_shared<MultiCastDataStreamSinkOperatorBuilder>(next_operator_builder_id(),
                                                                         _sink.get());
        RETURN_IF_ERROR(_root_pipeline->set_sink(sink_));
        auto& multi_cast_data_streamer =
                assert_cast<vectorized::MultiCastDataStreamSink*>(_sink.get())
                        ->get_multi_cast_data_streamer();
        DCHECK_EQ(thrift_sink.multi_cast_stream_sink.sinks.size(),
                  thrift_sink.multi_cast_stream_sink.destinations.size());
        auto sender_size = thrift_sink.multi_cast_stream_sink.sinks.size();
        _multi_cast_stream_sink_senders.resize(sender_size);
        for (int i = 0; i < sender_size; ++i) {
            auto new_pipeline = add_pipeline();
            // A sender with its own output exprs gets a row descriptor built from its output
            // tuple; otherwise it reuses the row descriptor of the multi-cast sink.
            auto row_desc =
                    !thrift_sink.multi_cast_stream_sink.sinks[i].output_exprs.empty()
                            ? RowDescriptor(
                                      _runtime_state->desc_tbl(),
                                      {thrift_sink.multi_cast_stream_sink.sinks[i].output_tuple_id},
                                      {false})
                            : sink_->row_desc();
            // 1. create the data stream sender sink
            _multi_cast_stream_sink_senders[i].reset(new vectorized::VDataStreamSender(
                    _runtime_state.get(), _runtime_state->obj_pool(), sender_id, row_desc,
                    thrift_sink.multi_cast_stream_sink.sinks[i],
                    thrift_sink.multi_cast_stream_sink.destinations[i], 16 * 1024));
            // 2. create and set the source operator of multi_cast_data_stream_source for new pipeline
            OperatorBuilderPtr source_op =
                    std::make_shared<MultiCastDataStreamerSourceOperatorBuilder>(
                            next_operator_builder_id(), i, multi_cast_data_streamer,
                            thrift_sink.multi_cast_stream_sink.sinks[i]);
            // Propagate failures like every other add_operator/set_sink call in this file.
            RETURN_IF_ERROR(new_pipeline->add_operator(source_op));
            // 3. create and set sink operator of data stream sender for new pipeline
            OperatorBuilderPtr sink_op_builder = std::make_shared<ExchangeSinkOperatorBuilder>(
                    next_operator_builder_id(), _multi_cast_stream_sink_senders[i].get(), this, i);
            RETURN_IF_ERROR(new_pipeline->set_sink(sink_op_builder));
            // 4. init and prepare the data_stream_sender of diff exchange
            TDataSink t;
            t.stream_sink = thrift_sink.multi_cast_stream_sink.sinks[i];
            RETURN_IF_ERROR(_multi_cast_stream_sink_senders[i]->init(t));
            RETURN_IF_ERROR(_multi_cast_stream_sink_senders[i]->prepare(state));
        }
        return Status::OK();
    }
    default:
        return Status::InternalError("Unsuported sink type in pipeline: {}", thrift_sink.type);
    }
    return _root_pipeline->set_sink(sink_);
}
// Final cleanup, invoked through std::call_once once every submitted task has closed.
// Records the total fragment time, sends the final report, stops the report thread, and
// unregisters this context from the fragment manager.
void PipelineFragmentContext::_close_action() {
    const auto total_elapsed = _fragment_watcher.elapsed_time();
    _runtime_profile->total_time_counter()->update(total_elapsed);
    send_report(true);
    _stop_report_thread();
    // All submitted tasks are done.
    _exec_env->fragment_mgr()->remove_pipeline_context(shared_from_this());
}
// Counts one more closed task. The call that brings the count up to _total_tasks runs
// _close_action(), at most once thanks to _close_once_flag.
void PipelineFragmentContext::close_a_pipeline() {
    std::lock_guard<std::mutex> guard(_task_mutex);
    if (++_closed_tasks == _total_tasks) {
        std::call_once(_close_once_flag, [this] { _close_action(); });
    }
}
void PipelineFragmentContext::send_report(bool done) {
Status exec_status = Status::OK();
{
std::lock_guard<std::mutex> l(_status_lock);
exec_status = _exec_status;
}
// If plan is done successfully, but _is_report_success is false,
// no need to send report.
if (!_is_report_success && done && exec_status.ok()) {
return;
}
// If both _is_report_success and _is_report_on_cancel are false,
// which means no matter query is success or failed, no report is needed.
// This may happen when the query limit reached and
// a internal cancellation being processed
if (!_is_report_success && !_is_report_on_cancel) {
return;
}
_report_status_cb(
{exec_status,
_runtime_state->enable_profile() ? _runtime_state->runtime_profile() : nullptr,
_runtime_state->enable_profile() ? _runtime_state->load_channel_profile() : nullptr,
done || !exec_status.ok(), _query_ctx->coord_addr, _query_id, _fragment_id,
_fragment_instance_id, _backend_num, _runtime_state.get(),
std::bind(&PipelineFragmentContext::update_status, this, std::placeholders::_1),
std::bind(&PipelineFragmentContext::cancel, this, std::placeholders::_1,
std::placeholders::_2)});
}
} // namespace doris::pipeline