blob: 7aab3e99c743293a6067e66fa906394cf7e3aaa6 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#include "operator.h"
#include "common/status.h"
#include "pipeline/dependency.h"
#include "pipeline/exec/aggregation_sink_operator.h"
#include "pipeline/exec/aggregation_source_operator.h"
#include "pipeline/exec/analytic_sink_operator.h"
#include "pipeline/exec/analytic_source_operator.h"
#include "pipeline/exec/assert_num_rows_operator.h"
#include "pipeline/exec/cache_sink_operator.h"
#include "pipeline/exec/cache_source_operator.h"
#include "pipeline/exec/datagen_operator.h"
#include "pipeline/exec/dict_sink_operator.h"
#include "pipeline/exec/distinct_streaming_aggregation_operator.h"
#include "pipeline/exec/empty_set_operator.h"
#include "pipeline/exec/es_scan_operator.h"
#include "pipeline/exec/exchange_sink_operator.h"
#include "pipeline/exec/exchange_source_operator.h"
#include "pipeline/exec/file_scan_operator.h"
#include "pipeline/exec/group_commit_block_sink_operator.h"
#include "pipeline/exec/group_commit_scan_operator.h"
#include "pipeline/exec/hashjoin_build_sink.h"
#include "pipeline/exec/hashjoin_probe_operator.h"
#include "pipeline/exec/hive_table_sink_operator.h"
#include "pipeline/exec/iceberg_table_sink_operator.h"
#include "pipeline/exec/jdbc_scan_operator.h"
#include "pipeline/exec/jdbc_table_sink_operator.h"
#include "pipeline/exec/local_merge_sort_source_operator.h"
#include "pipeline/exec/materialization_sink_operator.h"
#include "pipeline/exec/materialization_source_operator.h"
#include "pipeline/exec/memory_scratch_sink_operator.h"
#include "pipeline/exec/meta_scan_operator.h"
#include "pipeline/exec/mock_operator.h"
#include "pipeline/exec/mock_scan_operator.h"
#include "pipeline/exec/multi_cast_data_stream_sink.h"
#include "pipeline/exec/multi_cast_data_stream_source.h"
#include "pipeline/exec/nested_loop_join_build_operator.h"
#include "pipeline/exec/nested_loop_join_probe_operator.h"
#include "pipeline/exec/olap_scan_operator.h"
#include "pipeline/exec/olap_table_sink_operator.h"
#include "pipeline/exec/olap_table_sink_v2_operator.h"
#include "pipeline/exec/partition_sort_sink_operator.h"
#include "pipeline/exec/partition_sort_source_operator.h"
#include "pipeline/exec/partitioned_aggregation_sink_operator.h"
#include "pipeline/exec/partitioned_aggregation_source_operator.h"
#include "pipeline/exec/partitioned_hash_join_probe_operator.h"
#include "pipeline/exec/partitioned_hash_join_sink_operator.h"
#include "pipeline/exec/repeat_operator.h"
#include "pipeline/exec/result_file_sink_operator.h"
#include "pipeline/exec/result_sink_operator.h"
#include "pipeline/exec/schema_scan_operator.h"
#include "pipeline/exec/select_operator.h"
#include "pipeline/exec/set_probe_sink_operator.h"
#include "pipeline/exec/set_sink_operator.h"
#include "pipeline/exec/set_source_operator.h"
#include "pipeline/exec/sort_sink_operator.h"
#include "pipeline/exec/sort_source_operator.h"
#include "pipeline/exec/spill_sort_sink_operator.h"
#include "pipeline/exec/spill_sort_source_operator.h"
#include "pipeline/exec/streaming_aggregation_operator.h"
#include "pipeline/exec/table_function_operator.h"
#include "pipeline/exec/union_sink_operator.h"
#include "pipeline/exec/union_source_operator.h"
#include "pipeline/local_exchange/local_exchange_sink_operator.h"
#include "pipeline/local_exchange/local_exchange_source_operator.h"
#include "pipeline/pipeline.h"
#include "util/debug_util.h"
#include "util/runtime_profile.h"
#include "util/string_util.h"
#include "vec/exprs/vexpr.h"
#include "vec/exprs/vexpr_context.h"
#include "vec/utils/util.hpp"
// Forward declarations of the doris types that this file only refers to by
// pointer or reference.
namespace doris {
// NOTE(review): this header is included inside the namespace and is paired with
// "common/compile_check_end.h" at the end of the file; presumably it toggles
// stricter compiler diagnostics for the code in between -- confirm.
#include "common/compile_check_begin.h"
class RowDescriptor;
class RuntimeState;
} // namespace doris
namespace doris::pipeline {
// Marks the operator as closed. Calling it again after the first call changes
// nothing. Always returns OK; `state` is unused here.
Status OperatorBase::close(RuntimeState* state) {
    if (!_is_closed) {
        _is_closed = true;
    }
    return Status::OK();
}
// Builds the suffix appended to the operator name in the runtime profile.
// The parent's node id is always formatted through `operator_name_suffix`;
// when the parent has a nereids id (anything other than -1) it is prepended
// as "(nereids_id=...)".
template <typename SharedStateArg>
std::string PipelineXLocalState<SharedStateArg>::name_suffix() const {
    const std::string node_id_text = std::to_string(_parent->node_id());
    const bool has_nereids_id = _parent->nereids_id() != -1;
    if (has_nereids_id) {
        const std::string nereids_id_text = std::to_string(_parent->nereids_id());
        return fmt::format("(nereids_id={})" + operator_name_suffix, nereids_id_text,
                           node_id_text);
    }
    return fmt::format(operator_name_suffix, node_id_text);
}
// Sink-side counterpart of the profile name suffix: formats the parent's node
// id through `operator_name_suffix`, prefixed with "(nereids_id=...)" when the
// parent's nereids id is not -1.
template <typename SharedStateArg>
std::string PipelineXSinkLocalState<SharedStateArg>::name_suffix() {
    const std::string node_id_text = std::to_string(_parent->node_id());
    const bool has_nereids_id = _parent->nereids_id() != -1;
    if (has_nereids_id) {
        const std::string nereids_id_text = std::to_string(_parent->nereids_id());
        return fmt::format("(nereids_id={})" + operator_name_suffix, nereids_id_text,
                           node_id_text);
    }
    return fmt::format(operator_name_suffix, node_id_text);
}
// Records that this sink local state has been terminated. Repeated calls are
// no-ops. Always returns OK; `state` is unused here.
template <typename SharedStateArg>
Status PipelineXSinkLocalState<SharedStateArg>::terminate(RuntimeState* state) {
    if (!_terminated) {
        _terminated = true;
    }
    return Status::OK();
}
// Default data distribution requirement of an operator: PASSTHROUGH when this
// operator is not a source and its child is a serial operator, NOOP otherwise.
DataDistribution OperatorBase::required_data_distribution() const {
    const bool fed_by_serial_child = _child && _child->is_serial_operator() && !is_source();
    if (fed_by_serial_child) {
        return DataDistribution(ExchangeType::PASSTHROUGH);
    }
    return DataDistribution(ExchangeType::NOOP);
}
// Returns whether the distribution type reported by
// required_data_distribution() is classified as a hash exchange by
// Pipeline::is_hash_exchange.
bool OperatorBase::require_shuffled_data_distribution() const {
    const auto required = required_data_distribution();
    return Pipeline::is_hash_exchange(required.distribution_type);
}
// Default row descriptor of an operator: whatever its child reports.
// Dereferences `_child` without a null check, so it must only be used on
// operators that have a child.
const RowDescriptor& OperatorBase::row_desc() const {
    const RowDescriptor& child_row_desc = _child->row_desc();
    return child_row_desc;
}
// Debug description of this local state; it is exactly the parent operator's
// debug string at the requested indentation level.
template <typename SharedStateArg>
std::string PipelineXLocalState<SharedStateArg>::debug_string(int indentation_level) const {
    return fmt::format("{}", _parent->debug_string(indentation_level));
}
// Debug description of this sink local state; it is exactly the parent sink
// operator's debug string at the requested indentation level.
template <typename SharedStateArg>
std::string PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentation_level) const {
    return fmt::format("{}", _parent->debug_string(indentation_level));
}
// One-line description of the operator: two spaces of indentation per level,
// followed by the operator name, node id, parallel task count and the
// serial-operator flag.
std::string OperatorXBase::debug_string(int indentation_level) const {
    const std::string indent(indentation_level * 2, ' ');
    return fmt::format("{}{}: id={}, parallel_tasks={}, _is_serial_operator={}", indent, _op_name,
                       node_id(), _parallel_tasks, _is_serial_operator);
}
// Debug description taken from the local state registered in `state` for this
// operator's id.
std::string OperatorXBase::debug_string(RuntimeState* state, int indentation_level) const {
    auto* local_state = state->get_local_state(operator_id());
    return local_state->debug_string(indentation_level);
}
// Initializes the operator from its thrift plan node.
//
// Steps, in order:
//  1. record the nereids id;
//  2. validate that intermediate output tuple ids come with a final output
//     tuple id and with the same number of intermediate projection lists;
//  3. derive the operator name from the plan node type name (everything before
//     "_NODE", plus "_OPERATOR");
//  4. build the conjunct expression trees (single `vconjunct` takes precedence
//     over the `conjuncts` list);
//  5. build the final and intermediate projection expression trees.
//
// Returns InternalError for the validation failures in step 2, or the first
// error reported while creating an expression tree.
Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) {
    const std::string node_name = print_plan_node_type(tnode.node_type);
    _nereids_id = tnode.nereids_id;

    const auto& intermediate_tuple_ids = tnode.intermediate_output_tuple_id_list;
    const auto& intermediate_projection_lists = tnode.intermediate_projections_list;
    if (!intermediate_tuple_ids.empty()) {
        if (!tnode.__isset.output_tuple_id) {
            return Status::InternalError("no final output tuple id");
        }
        if (intermediate_tuple_ids.size() != intermediate_projection_lists.size()) {
            return Status::InternalError(
                    "intermediate_output_tuple_id_list size:{} not match "
                    "intermediate_projections_list size:{}",
                    intermediate_tuple_ids.size(), intermediate_projection_lists.size());
        }
    }

    // If "_NODE" is absent, find() yields npos and substr keeps the whole name.
    _op_name = node_name.substr(0, node_name.find("_NODE")) + "_OPERATOR";

    if (tnode.__isset.vconjunct) {
        vectorized::VExprContextSPtr conjunct_ctx;
        RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, conjunct_ctx));
        _conjuncts.emplace_back(conjunct_ctx);
    } else if (tnode.__isset.conjuncts) {
        for (const auto& thrift_conjunct : tnode.conjuncts) {
            vectorized::VExprContextSPtr conjunct_ctx;
            RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(thrift_conjunct, conjunct_ctx));
            _conjuncts.emplace_back(conjunct_ctx);
        }
    }

    // Final projections.
    if (tnode.__isset.projections) {
        DCHECK(tnode.__isset.output_tuple_id);
        RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections));
    }

    // Intermediate projections, one expression list per level.
    if (!intermediate_projection_lists.empty()) {
        DCHECK(tnode.__isset.projections) << "no final projections";
        _intermediate_projections.reserve(intermediate_projection_lists.size());
        for (const auto& thrift_level : intermediate_projection_lists) {
            vectorized::VExprContextSPtrs level_ctxs;
            RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(thrift_level, level_ctxs));
            _intermediate_projections.push_back(level_ctxs);
        }
    }
    return Status::OK();
}
// Prepares and then opens every expression owned by this operator, and finally
// prepares the child operator (unless this operator is a source or has no
// child).
//
// Order: prepare conjuncts -> prepare intermediate projections -> prepare
// final projections -> (optional) output type check -> open conjuncts -> open
// final projections -> open intermediate projections -> child prepare.
// The first failing step aborts the sequence and its status is returned.
Status OperatorXBase::prepare(RuntimeState* state) {
    for (auto& conjunct_ctx : _conjuncts) {
        RETURN_IF_ERROR(conjunct_ctx->prepare(state, intermediate_row_desc()));
    }

    int level = 0;
    for (auto& level_projections : _intermediate_projections) {
        RETURN_IF_ERROR(vectorized::VExpr::prepare(level_projections, state,
                                                   intermediate_row_desc(level)));
        ++level;
    }
    RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc()));

    if (has_output_row_desc()) {
        RETURN_IF_ERROR(
                vectorized::VExpr::check_expr_output_type(_projections, *_output_row_descriptor));
    }

    for (auto& conjunct_ctx : _conjuncts) {
        RETURN_IF_ERROR(conjunct_ctx->open(state));
    }
    RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state));
    for (auto& level_projections : _intermediate_projections) {
        RETURN_IF_ERROR(vectorized::VExpr::open(level_projections, state));
    }

    if (!_child || is_source()) {
        return Status::OK();
    }
    return _child->prepare(state);
}
// Terminates the child first (when this operator is not a source and has a
// child), then this operator's local state. Returns the child's error, the
// local-state lookup error, or the local state's own terminate status.
Status OperatorXBase::terminate(RuntimeState* state) {
    const bool has_upstream = _child && !is_source();
    if (has_upstream) {
        RETURN_IF_ERROR(_child->terminate(state));
    }
    auto local_state = state->get_local_state_result(operator_id());
    if (local_state) {
        return local_state.value()->terminate(state);
    }
    return local_state.error();
}
// Closes the child first (when this operator is not a source and has a
// child), then this operator's local state. Returns the child's error, the
// local-state lookup error, or the local state's own close status.
Status OperatorXBase::close(RuntimeState* state) {
    const bool has_upstream = _child && !is_source();
    if (has_upstream) {
        RETURN_IF_ERROR(_child->close(state));
    }
    auto local_state = state->get_local_state_result(operator_id());
    if (local_state) {
        return local_state.value()->close(state);
    }
    return local_state.error();
}
// Clears the column data of the scratch "origin" block, passing the number of
// materialized slots of the parent's intermediate row descriptor.
void PipelineXLocalStateBase::clear_origin_block() {
    const auto materialized_slots = _parent->intermediate_row_desc().num_materialized_slots();
    _origin_block.clear_column_data(materialized_slots);
}
// Filters `block` with `expr_contexts` through VExprContext::filter_block and,
// on success, adds the expressions' reported memory usage to this local
// state's running estimate. A filter failure is returned as-is and leaves the
// estimate untouched.
Status PipelineXLocalStateBase::filter_block(const vectorized::VExprContextSPtrs& expr_contexts,
                                             vectorized::Block* block, size_t column_to_keep) {
    Status filter_status =
            vectorized::VExprContext::filter_block(expr_contexts, block, column_to_keep);
    if (!filter_status.ok()) {
        return filter_status;
    }
    _estimate_memory_usage += vectorized::VExprContext::get_memory_usage(expr_contexts);
    return Status::OK();
}
// Applies this operator's projections to `origin_block` and stores the
// projected columns in `output_block`.
//
// - An empty `origin_block` is a no-op: OK is returned and `output_block` is
//   left untouched.
// - Each level of intermediate projections is executed on a working copy of
//   the input block, which is then reshuffled to the columns that level
//   produced.
// - The final projections are executed one by one; each result column is
//   materialized (constants expanded) and either copied into or handed over to
//   the matching output column (see `insert_column_datas`).
// - The bytes accounted along the way are added to the local state's memory
//   usage estimate.
//
// Returns the first error reported by an expression, otherwise OK.
Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block,
                                     vectorized::Block* output_block) const {
    auto* local_state = state->get_local_state(operator_id());
    SCOPED_TIMER(local_state->exec_time_counter());
    SCOPED_TIMER(local_state->_projection_timer);
    const size_t rows = origin_block->rows();
    if (rows == 0) {
        return Status::OK();
    }
    vectorized::Block input_block = *origin_block;

    std::vector<int> result_column_ids;
    size_t bytes_usage = 0;
    for (const auto& projections : local_state->_intermediate_projections) {
        result_column_ids.resize(projections.size());
        for (size_t i = 0; i < projections.size(); ++i) {
            RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i]));
        }
        bytes_usage += input_block.allocated_bytes();
        input_block.shuffle_columns(result_column_ids);
    }
    DCHECK_EQ(rows, input_block.rows());

    // Moves the data of `from` into the output column `to`.
    // When the origin must be kept, or `from` is not exclusively owned, the
    // rows are copied; otherwise `to` simply takes over `from`. A non-nullable
    // `from` going into a nullable `to` gets an all-zero null map.
    auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t num_rows) {
        if (to->is_nullable() && !from->is_nullable()) {
            if (_keep_origin || !from->is_exclusive()) {
                auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to);
                null_column.get_nested_column().insert_range_from(*from, 0, num_rows);
                null_column.get_null_map_column().get_data().resize_fill(num_rows, 0);
                bytes_usage += null_column.allocated_bytes();
            } else {
                to = make_nullable(from, false)->assume_mutable();
            }
        } else {
            if (_keep_origin || !from->is_exclusive()) {
                to->insert_range_from(*from, 0, num_rows);
                bytes_usage += from->allocated_bytes();
            } else {
                to = from->assume_mutable();
            }
        }
    };

    using namespace vectorized;
    vectorized::MutableBlock mutable_block =
            vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block,
                                                                       *_output_row_descriptor);
    // `rows` is known to be non-zero here (see the early return above), so the
    // final projections are always evaluated.
    auto& mutable_columns = mutable_block.mutable_columns();
    const size_t origin_columns_count = input_block.columns();
    DCHECK_EQ(mutable_columns.size(), local_state->_projections.size()) << debug_string();
    for (size_t i = 0; i < mutable_columns.size(); ++i) {
        auto result_column_id = -1;
        RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, &result_column_id));
        auto column_ptr = input_block.get_by_position(result_column_id)
                                  .column->convert_to_full_column_if_const();
        // Only columns appended by the projection itself count as new memory.
        if (result_column_id >= origin_columns_count) {
            bytes_usage += column_ptr->allocated_bytes();
        }
        insert_column_datas(mutable_columns[i], column_ptr, rows);
    }
    DCHECK(mutable_block.rows() == rows);
    output_block->set_columns(std::move(mutable_columns));

    local_state->_estimate_memory_usage += bytes_usage;
    return Status::OK();
}
// Produces the next output block of this operator, after projections.
//
// - With an output row descriptor: the raw block is fetched into the local
//   state's origin block and then projected into `block`.
// - Without one: the block is fetched directly into `block` and, if that
//   succeeded, validated with check_type_and_column().
//
// When the final status is OK and the block is non-empty, the rows/blocks
// returned counters of the local state are bumped on the way out.
//
// The validation result is stored in `status`, so a failed validation does not
// bump the counters, and it is skipped when get_block already failed so the
// original error is the one reported.
Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::Block* block,
                                               bool* eos) {
    DBUG_EXECUTE_IF("Pipeline::return_empty_block", {
        if (this->_op_name == "AGGREGATION_OPERATOR" || this->_op_name == "HASH_JOIN_OPERATOR" ||
            this->_op_name == "PARTITIONED_AGGREGATION_OPERATOR" ||
            this->_op_name == "PARTITIONED_HASH_JOIN_OPERATOR" ||
            this->_op_name == "CROSS_JOIN_OPERATOR" || this->_op_name == "SORT_OPERATOR") {
            if (_debug_point_count++ % 2 == 0) {
                return Status::OK();
            }
        }
    });

    Status status;
    auto* local_state = state->get_local_state(operator_id());
    // Runs on every exit path below; only a successful, non-empty result counts.
    Defer defer([&]() {
        if (status.ok()) {
            if (auto rows = block->rows()) {
                COUNTER_UPDATE(local_state->_rows_returned_counter, rows);
                COUNTER_UPDATE(local_state->_blocks_returned_counter, 1);
            }
        }
    });

    if (_output_row_descriptor) {
        local_state->clear_origin_block();
        status = get_block(state, &local_state->_origin_block, eos);
        if (UNLIKELY(!status.ok())) {
            return status;
        }
        status = do_projections(state, &local_state->_origin_block, block);
        return status;
    }

    status = get_block(state, block, eos);
    if (status.ok()) {
        status = block->check_type_and_column();
    }
    return status;
}
// Applies the parent operator's row limit to `block`.
//
// If a limit is set (not -1) and returning this block would reach or exceed
// it, the block is truncated to the remaining row budget and `*eos` is set.
// The rows finally left in the block are added to the running total.
// A debug point can additionally force `*eos` for a named operator.
void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) {
    const bool has_limit = _parent->_limit != -1;
    if (has_limit && _num_rows_returned + block->rows() >= _parent->_limit) {
        block->set_num_rows(_parent->_limit - _num_rows_returned);
        *eos = true;
    }

    DBUG_EXECUTE_IF("Pipeline::reached_limit_early", {
        auto op_name = to_lower(_parent->_op_name);
        auto arg_op_name = dp->param<std::string>("op_name");
        arg_op_name = to_lower(arg_op_name);
        if (op_name == arg_op_name) {
            *eos = true;
        }
    });

    // Adding zero rows is a no-op, so no emptiness check is needed.
    _num_rows_returned += block->rows();
}
// Terminates the sink local state stored in `state`. Returns the lookup error
// when the sink local state cannot be obtained, otherwise the local state's
// own terminate status.
Status DataSinkOperatorXBase::terminate(RuntimeState* state) {
    auto sink_local_state = state->get_sink_local_state_result();
    if (sink_local_state) {
        return sink_local_state.value()->terminate(state);
    }
    return sink_local_state.error();
}
// One-line description of the sink operator: two spaces of indentation per
// level, followed by the sink name, node id and the serial-operator flag.
std::string DataSinkOperatorXBase::debug_string(int indentation_level) const {
    const std::string indent(indentation_level * 2, ' ');
    return fmt::format("{}{}: id={}, _is_serial_operator={}", indent, _name, node_id(),
                       _is_serial_operator);
}
// Debug description taken from the sink local state registered in `state`.
std::string DataSinkOperatorXBase::debug_string(RuntimeState* state, int indentation_level) const {
    auto* sink_local_state = state->get_sink_local_state();
    return sink_local_state->debug_string(indentation_level);
}
// Names the sink operator after its thrift sink type: the type's name from
// _TDataSinkType_VALUES_TO_NAMES (or "UNKNOWN_SINK" when the type is not in
// that table) plus "_OPERATOR". Always returns OK.
Status DataSinkOperatorXBase::init(const TDataSink& tsink) {
    const auto name_entry = _TDataSinkType_VALUES_TO_NAMES.find(tsink.type);
    const bool is_known_type = name_entry != _TDataSinkType_VALUES_TO_NAMES.end();
    std::string sink_type_name = "UNKNOWN_SINK";
    if (is_known_type) {
        sink_type_name = name_entry->second;
    }
    _name = sink_type_name + "_OPERATOR";
    return Status::OK();
}
// Initializes a sink operator that is derived from a plan node: records the
// nereids id and names the operator after the plan node type (everything
// before "_NODE", plus "_SINK_OPERATOR"). Always returns OK; `state` is unused.
Status DataSinkOperatorXBase::init(const TPlanNode& tnode, RuntimeState* state) {
    const std::string node_type_name = print_plan_node_type(tnode.node_type);
    _nereids_id = tnode.nereids_id;
    // If "_NODE" is absent, find() yields npos and substr keeps the whole name.
    const auto base_name = node_type_name.substr(0, node_type_name.find("_NODE"));
    _name = base_name + "_SINK_OPERATOR";
    return Status::OK();
}
// Creates the sink local state for this operator, initializes it with `info`
// and, only if initialization succeeded, registers it in `state` under this
// operator's id. Returns the initialization error otherwise.
template <typename LocalStateType>
Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state,
                                                            LocalSinkStateInfo& info) {
    auto sink_local_state = LocalStateType::create_unique(this, state);
    Status init_status = sink_local_state->init(state, info);
    if (!init_status.ok()) {
        return init_status;
    }
    state->emplace_sink_local_state(operator_id(), std::move(sink_local_state));
    return Status::OK();
}
// Creates the shared state object matching this sink's local state type.
//
// - LocalExchangeSharedState: nothing is created here, nullptr is returned.
// - MultiCastSharedState: this path must never be taken; it throws.
// - Anything else: a new shared state is created, tagged with this operator's
//   id, and every destination id of this sink is recorded as a related
//   operator.
template <typename LocalStateType>
std::shared_ptr<BasicSharedState> DataSinkOperatorX<LocalStateType>::create_shared_state() const {
    using SharedStateType = typename LocalStateType::SharedStateType;
    if constexpr (std::is_same_v<SharedStateType, LocalExchangeSharedState>) {
        return nullptr;
    } else if constexpr (std::is_same_v<SharedStateType, MultiCastSharedState>) {
        throw Exception(Status::FatalError("should not reach here!"));
    } else {
        auto shared_state = SharedStateType::create_shared();
        shared_state->id = operator_id();
        for (const auto& dest_id : dests_id()) {
            shared_state->related_op_ids.insert(dest_id);
        }
        return shared_state;
    }
}
// Creates the local state for this operator, initializes it with `info` and,
// only if initialization succeeded, registers it in `state` under this
// operator's id. Returns the initialization error otherwise.
template <typename LocalStateType>
Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalStateInfo& info) {
    auto new_local_state = LocalStateType::create_unique(state, this);
    Status init_status = new_local_state->init(state, info);
    if (!init_status.ok()) {
        return init_status;
    }
    state->emplace_local_state(operator_id(), std::move(new_local_state));
    return Status::OK();
}
// Binds the sink local state to the sink operator that owns it and to the
// runtime state it executes under. Both pointers are stored as given.
PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent,
                                                         RuntimeState* state)
        : _parent(parent),
          _state(state) {}
// Binds the local state to the operator that owns it and to the runtime state
// it executes under. The returned-row count starts at zero and the
// rows-returned counter is left unset until init() creates it.
PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent)
        : _num_rows_returned(0),
          _rows_returned_counter(nullptr),
          _parent(parent),
          _state(state) {}
// Initializes the local state of a (non-sink) operator.
//
// 1. Creates the operator profile with its "CommonCounters" and
//    "CustomCounters" children and attaches it to the parent profile.
// 2. Unless the shared state type is FakeSharedState, resolves the shared
//    state and the dependency this operator waits on:
//      - from `info.shared_state_map` if it has an entry for this operator id;
//      - otherwise from `info.shared_state`, creating a source dependency;
//      - otherwise nothing is resolved.
// 3. Fails with InternalError if a shared state is mandatory but missing.
// 4. Registers the common counters and timers.
template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalStateInfo& info) {
    _operator_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix()));
    _common_profile.reset(new RuntimeProfile("CommonCounters"));
    _custom_profile.reset(new RuntimeProfile("CustomCounters"));
    _operator_profile->set_metadata(_parent->node_id());
    // indent is false so that source operator will have same
    // indentation_level with its parent operator.
    info.parent_profile->add_child(_operator_profile.get(), /*indent=*/false);
    _operator_profile->add_child(_common_profile.get(), true);
    _operator_profile->add_child(_custom_profile.get(), true);

    if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
        constexpr bool is_local_exchange =
                std::is_same_v<LocalExchangeSharedState, SharedStateArg>;
        const auto map_entry = info.shared_state_map.find(_parent->operator_id());
        if (map_entry != info.shared_state_map.end()) {
            _shared_state = map_entry->second.first.get()->template cast<SharedStateArg>();
            _dependency = _shared_state->get_dep_by_channel_id(info.task_idx).front().get();
            _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                    _common_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
        } else {
            // A local exchange source must always be found in the map.
            if constexpr (is_local_exchange) {
                DCHECK(false);
            }
            // For UnionSourceOperator without children, there is no shared state.
            if (info.shared_state) {
                _shared_state = info.shared_state->template cast<SharedStateArg>();
                _dependency = _shared_state->create_source_dependency(
                        _parent->operator_id(), _parent->node_id(), _parent->get_name());
                _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                        _common_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
            }
        }
    }

    if (must_set_shared_state() && _shared_state == nullptr) {
        return Status::InternalError("must set shared state, in {}", _parent->get_name());
    }

    _rows_returned_counter =
            ADD_COUNTER_WITH_LEVEL(_common_profile, "RowsProduced", TUnit::UNIT, 1);
    _blocks_returned_counter =
            ADD_COUNTER_WITH_LEVEL(_common_profile, "BlocksProduced", TUnit::UNIT, 1);
    _projection_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "ProjectionTime", 2);
    _init_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "InitTime", 2);
    _open_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "OpenTime", 2);
    _close_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "CloseTime", 2);
    _exec_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "ExecTime", 1);
    _memory_used_counter =
            _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
    return Status::OK();
}
// Gives this local state its own copies of the parent operator's expression
// contexts: every conjunct, every final projection and every intermediate
// projection is cloned. Returns the first clone error, otherwise OK.
template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::open(RuntimeState* state) {
    const auto& parent_conjuncts = _parent->_conjuncts;
    const auto& parent_projections = _parent->_projections;
    const auto& parent_intermediate = _parent->_intermediate_projections;

    _conjuncts.resize(parent_conjuncts.size());
    _projections.resize(parent_projections.size());
    for (size_t idx = 0; idx < _conjuncts.size(); ++idx) {
        RETURN_IF_ERROR(parent_conjuncts[idx]->clone(state, _conjuncts[idx]));
    }
    for (size_t idx = 0; idx < _projections.size(); ++idx) {
        RETURN_IF_ERROR(parent_projections[idx]->clone(state, _projections[idx]));
    }

    _intermediate_projections.resize(parent_intermediate.size());
    for (size_t level = 0; level < parent_intermediate.size(); ++level) {
        const auto& source_level = parent_intermediate[level];
        auto& target_level = _intermediate_projections[level];
        target_level.resize(source_level.size());
        for (size_t idx = 0; idx < source_level.size(); ++idx) {
            RETURN_IF_ERROR(source_level[idx]->clone(state, target_level[idx]));
        }
    }
    return Status::OK();
}
// Records that this local state has been terminated. Repeated calls are
// no-ops. Always returns OK; `state` is unused here.
template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::terminate(RuntimeState* state) {
    if (!_terminated) {
        _terminated = true;
    }
    return Status::OK();
}
// Closes this local state. The first call publishes the time spent waiting on
// the dependency into the profile (for every shared state type except
// FakeSharedState) and marks the state closed; later calls are no-ops.
// Always returns OK.
//
// init() can complete without assigning `_dependency` or its timer (no entry
// in the shared state map and no `info.shared_state`), so both are checked
// before use.
// NOTE(review): the guard assumes both members are null when never assigned --
// confirm their default initialization in the header.
template <typename SharedStateArg>
Status PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) {
    if (_closed) {
        return Status::OK();
    }
    if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) {
        if (_dependency != nullptr && _wait_for_dependency_timer != nullptr) {
            COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
        }
    }
    _closed = true;
    return Status::OK();
}
// Initializes the local state of a sink operator.
//
// 1. Creates the operator profile (owned by the runtime state's object pool)
//    with its "CommonCounters" and "CustomCounters" children and attaches it,
//    indented, to the parent profile.
// 2. Unless the shared state type is FakeSharedState, resolves the dependency
//    and shared state of this sink, keyed by the sink's first destination id:
//      - from `info.shared_state_map` if it has an entry (a local exchange
//        sink always uses dependency 0, other sinks use `info.task_idx`);
//      - otherwise from `info.shared_state`, creating a sink dependency.
// 3. Fails with InternalError if a shared state is mandatory but missing.
// 4. Registers the common counters and timers.
template <typename SharedState>
Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSinkStateInfo& info) {
    // create profile
    auto* pool = state->obj_pool();
    _operator_profile = pool->add(new RuntimeProfile(_parent->get_name() + name_suffix()));
    _common_profile = pool->add(new RuntimeProfile("CommonCounters"));
    _custom_profile = pool->add(new RuntimeProfile("CustomCounters"));

    // indentation is true
    // The parent profile of sink operator is usually a RuntimeProfile called PipelineTask.
    // So we should set the indentation to true.
    info.parent_profile->add_child(_operator_profile, /*indent=*/true);
    _operator_profile->add_child(_common_profile, true);
    _operator_profile->add_child(_custom_profile, true);
    _operator_profile->set_metadata(_parent->node_id());
    _wait_for_finish_dependency_timer = ADD_TIMER(_common_profile, "PendingFinishDependency");

    if constexpr (!std::is_same_v<SharedState, FakeSharedState>) {
        constexpr bool is_local_exchange = std::is_same_v<LocalExchangeSharedState, SharedState>;
        const auto map_entry = info.shared_state_map.find(_parent->dests_id().front());
        if (map_entry != info.shared_state_map.end()) {
            if constexpr (is_local_exchange) {
                DCHECK(map_entry->second.second.size() == 1);
            }
            const auto dependency_idx = is_local_exchange ? 0 : info.task_idx;
            _dependency = map_entry->second.second[dependency_idx].get();
            _shared_state = _dependency->shared_state()->template cast<SharedState>();
        } else {
            // A local exchange sink must always be found in the map.
            if constexpr (is_local_exchange) {
                DCHECK(false);
            }
            _shared_state = info.shared_state->template cast<SharedState>();
            _dependency = _shared_state->create_sink_dependency(
                    _parent->dests_id().front(), _parent->node_id(), _parent->get_name());
        }
        _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(
                _common_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1);
    }

    if (must_set_shared_state() && _shared_state == nullptr) {
        return Status::InternalError("must set shared state, in {}", _parent->get_name());
    }

    _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_common_profile, "InputRows", TUnit::UNIT, 1);
    _init_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "InitTime", 2);
    _open_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "OpenTime", 2);
    _close_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "CloseTime", 2);
    _exec_timer = ADD_TIMER_WITH_LEVEL(_common_profile, "ExecTime", 1);
    _memory_used_counter =
            _common_profile->AddHighWaterMarkCounter("MemoryUsage", TUnit::BYTES, "", 1);
    return Status::OK();
}
// Closes this sink local state. The first call publishes the time spent
// waiting on the dependency into the profile (for every shared state type
// except FakeSharedState) and marks the state closed; later calls are no-ops.
// Always returns OK; `exec_status` is not inspected here.
template <typename SharedState>
Status PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status exec_status) {
    if (!_closed) {
        if constexpr (!std::is_same_v<SharedState, FakeSharedState>) {
            COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time());
        }
        _closed = true;
    }
    return Status::OK();
}
// Streaming operators transform blocks one-to-one: fetch the next projected
// block from the child into `block`, then let pull() process it in place.
// A child error is returned before pull() is attempted.
template <typename LocalStateType>
Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
                                                     bool* eos) {
    auto& child_op = OperatorX<LocalStateType>::_child;
    RETURN_IF_ERROR(child_op->get_block_after_projects(state, block, eos));
    return pull(state, block, eos);
}
// Drives a stateful operator through one push/pull step.
//
// If the operator reports that it needs more input, one block is fetched from
// the child and pushed into the operator. Afterwards, if the operator no
// longer needs input, output is pulled into `block`.
//
// `*eos` ends up as: the child's eos after a fetch; overridden to true when
// pull() reports eos; overridden to false when pull() did not report eos and
// the operator still does not need more input.
// Returns the first error from the child, push() or pull(), otherwise OK.
template <typename LocalStateType>
Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block,
bool* eos) {
auto& local_state = get_local_state(state);
if (need_more_input_data(state)) {
// Clear the child block's column data before fetching the next child block into it.
local_state._child_block->clear_column_data(
OperatorX<LocalStateType>::_child->row_desc().num_materialized_slots());
RETURN_IF_ERROR(OperatorX<LocalStateType>::_child->get_block_after_projects(
state, local_state._child_block.get(), &local_state._child_eos));
*eos = local_state._child_eos;
// An empty block that is not the last one carries nothing to push; return
// and let the caller invoke get_block again.
if (local_state._child_block->rows() == 0 && !local_state._child_eos) {
return Status::OK();
}
{
// Only the push itself is charged to the operator's exec timer.
SCOPED_TIMER(local_state.exec_time_counter());
RETURN_IF_ERROR(push(state, local_state._child_block.get(), local_state._child_eos));
}
}
// Re-evaluated on purpose: the push above may have changed the answer.
if (!need_more_input_data(state)) {
SCOPED_TIMER(local_state.exec_time_counter());
bool new_eos = false;
RETURN_IF_ERROR(pull(state, block, &new_eos));
if (new_eos) {
*eos = true;
} else if (!need_more_input_data(state)) {
// pull() did not report eos and no more input is needed, so eos must not be
// reported yet even if the child already reached eos.
*eos = false;
}
}
return Status::OK();
}
// Initializes an async-writer sink: runs the base initialization, creates the
// "AsyncWriterDependency", constructs the writer with the sink description,
// the output expression contexts and both dependencies, and registers the
// wait-for-dependency timer under the async writer dependency's name.
// Returns the base initialization error, otherwise OK.
template <typename Writer, typename Parent>
    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) {
    RETURN_IF_ERROR(Base::init(state, info));

    _async_writer_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(),
                                                         "AsyncWriterDependency", true);
    _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs, _async_writer_dependency,
                             _finish_dependency));

    const auto wait_timer_name = "WaitForDependency[" + _async_writer_dependency->name() + "]Time";
    _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL(common_profile(), wait_timer_name, 1);
    return Status::OK();
}
// Opens an async-writer sink: runs the base open, clones every output
// expression context of the parent sink operator into this local state, and
// starts the writer with the operator profile. Returns the first error.
template <typename Writer, typename Parent>
    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) {
    RETURN_IF_ERROR(Base::open(state));

    auto&& parent_op = _parent->cast<Parent>();
    _output_vexpr_ctxs.resize(parent_op._output_vexpr_ctxs.size());
    for (size_t idx = 0; idx < _output_vexpr_ctxs.size(); ++idx) {
        RETURN_IF_ERROR(parent_op._output_vexpr_ctxs[idx]->clone(state, _output_vexpr_ctxs[idx]));
    }

    return _writer->start_writer(state, operator_profile());
}
// Hands `block` (and the end-of-stream flag) to the writer and returns the
// writer's status. `state` is unused here.
template <typename Writer, typename Parent>
    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Block* block,
                                             bool eos) {
    Status sink_status = _writer->sink(block, eos);
    return sink_status;
}
// Closes an async-writer sink.
//
// The first call publishes the dependency wait times into the profile,
// force-closes the writer and then runs the base close; later calls (once the
// state is marked closed) return OK immediately.
//
// The writer is force-closed with:
//   - `exec_status` when execution failed;
//   - the cancel reason when execution succeeded but the query was cancelled;
//   - a generic Cancelled("force close") status otherwise.
// If the writer had already recorded an error before the force close, that
// error is returned and the base close is not reached.
//
// init() may have failed before the dependencies and the writer were created,
// so every one of them is checked before use.
template <typename Writer, typename Parent>
    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) {
    if (_closed) {
        return Status::OK();
    }
    if (_async_writer_dependency != nullptr && _wait_for_dependency_timer != nullptr) {
        COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->watcher_elapse_time());
    }
    if (_finish_dependency != nullptr && _wait_for_finish_dependency_timer != nullptr) {
        COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time());
    }
    // if the init failed, the _writer may be nullptr. so here need check
    if (_writer) {
        // If there is an error in process_block thread, then we should get the writer
        // status before call force_close. For example, the thread may failed in commit
        // transaction.
        Status st = _writer->get_writer_status();
        if (exec_status.ok()) {
            _writer->force_close(state->is_cancelled() ? state->cancel_reason()
                                                       : Status::Cancelled("force close"));
        } else {
            _writer->force_close(exec_status);
        }
        RETURN_IF_ERROR(st);
    }
    return Base::close(state, exec_status);
}
// Explicit instantiations of DataSinkOperatorX for every sink local state type
// listed below. The member templates are defined in this file, so they are
// instantiated here (presumably so other translation units can link against
// them -- confirm). DECLARE_OPERATOR is a file-local helper macro and is
// undefined again at the end of the list.
#define DECLARE_OPERATOR(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>;
DECLARE_OPERATOR(HashJoinBuildSinkLocalState)
DECLARE_OPERATOR(ResultSinkLocalState)
DECLARE_OPERATOR(JdbcTableSinkLocalState)
DECLARE_OPERATOR(MemoryScratchSinkLocalState)
DECLARE_OPERATOR(ResultFileSinkLocalState)
DECLARE_OPERATOR(OlapTableSinkLocalState)
DECLARE_OPERATOR(OlapTableSinkV2LocalState)
DECLARE_OPERATOR(HiveTableSinkLocalState)
DECLARE_OPERATOR(IcebergTableSinkLocalState)
DECLARE_OPERATOR(AnalyticSinkLocalState)
DECLARE_OPERATOR(SortSinkLocalState)
DECLARE_OPERATOR(SpillSortSinkLocalState)
DECLARE_OPERATOR(LocalExchangeSinkLocalState)
DECLARE_OPERATOR(AggSinkLocalState)
DECLARE_OPERATOR(PartitionedAggSinkLocalState)
DECLARE_OPERATOR(ExchangeSinkLocalState)
DECLARE_OPERATOR(NestedLoopJoinBuildSinkLocalState)
DECLARE_OPERATOR(UnionSinkLocalState)
DECLARE_OPERATOR(MultiCastDataStreamSinkLocalState)
DECLARE_OPERATOR(PartitionSortSinkLocalState)
DECLARE_OPERATOR(SetProbeSinkLocalState<true>)
DECLARE_OPERATOR(SetProbeSinkLocalState<false>)
DECLARE_OPERATOR(SetSinkLocalState<true>)
DECLARE_OPERATOR(SetSinkLocalState<false>)
DECLARE_OPERATOR(PartitionedHashJoinSinkLocalState)
DECLARE_OPERATOR(GroupCommitBlockSinkLocalState)
DECLARE_OPERATOR(CacheSinkLocalState)
DECLARE_OPERATOR(DictSinkLocalState)
DECLARE_OPERATOR(MaterializationSinkLocalState)
#undef DECLARE_OPERATOR
// Explicit instantiations of OperatorX for every (non-sink) local state type
// listed below. DECLARE_OPERATOR is redefined for this list and undefined
// again at its end. The mock local states are only instantiated in BE_TEST
// builds.
#define DECLARE_OPERATOR(LOCAL_STATE) template class OperatorX<LOCAL_STATE>;
DECLARE_OPERATOR(HashJoinProbeLocalState)
DECLARE_OPERATOR(OlapScanLocalState)
DECLARE_OPERATOR(GroupCommitLocalState)
DECLARE_OPERATOR(JDBCScanLocalState)
DECLARE_OPERATOR(FileScanLocalState)
DECLARE_OPERATOR(EsScanLocalState)
DECLARE_OPERATOR(AnalyticLocalState)
DECLARE_OPERATOR(SortLocalState)
DECLARE_OPERATOR(SpillSortLocalState)
DECLARE_OPERATOR(LocalMergeSortLocalState)
DECLARE_OPERATOR(AggLocalState)
DECLARE_OPERATOR(PartitionedAggLocalState)
DECLARE_OPERATOR(TableFunctionLocalState)
DECLARE_OPERATOR(ExchangeLocalState)
DECLARE_OPERATOR(RepeatLocalState)
DECLARE_OPERATOR(NestedLoopJoinProbeLocalState)
DECLARE_OPERATOR(AssertNumRowsLocalState)
DECLARE_OPERATOR(EmptySetLocalState)
DECLARE_OPERATOR(UnionSourceLocalState)
DECLARE_OPERATOR(MultiCastDataStreamSourceLocalState)
DECLARE_OPERATOR(PartitionSortSourceLocalState)
DECLARE_OPERATOR(SetSourceLocalState<true>)
DECLARE_OPERATOR(SetSourceLocalState<false>)
DECLARE_OPERATOR(DataGenLocalState)
DECLARE_OPERATOR(SchemaScanLocalState)
DECLARE_OPERATOR(MetaScanLocalState)
DECLARE_OPERATOR(LocalExchangeSourceLocalState)
DECLARE_OPERATOR(PartitionedHashJoinProbeLocalState)
DECLARE_OPERATOR(CacheSourceLocalState)
DECLARE_OPERATOR(MaterializationSourceLocalState)
#ifdef BE_TEST
DECLARE_OPERATOR(MockLocalState)
DECLARE_OPERATOR(MockScanLocalState)
#endif
#undef DECLARE_OPERATOR
// Explicit instantiations of the streaming and stateful operator templates
// (their get_block implementations are defined in this file).
template class StreamingOperatorX<AssertNumRowsLocalState>;
template class StreamingOperatorX<SelectLocalState>;
template class StatefulOperatorX<HashJoinProbeLocalState>;
template class StatefulOperatorX<PartitionedHashJoinProbeLocalState>;
template class StatefulOperatorX<RepeatLocalState>;
template class StatefulOperatorX<StreamingAggLocalState>;
template class StatefulOperatorX<DistinctStreamingAggLocalState>;
template class StatefulOperatorX<NestedLoopJoinProbeLocalState>;
template class StatefulOperatorX<TableFunctionLocalState>;
// Explicit instantiations of the sink local state template, one per shared
// state type used by a sink operator.
template class PipelineXSinkLocalState<HashJoinSharedState>;
template class PipelineXSinkLocalState<PartitionedHashJoinSharedState>;
template class PipelineXSinkLocalState<SortSharedState>;
template class PipelineXSinkLocalState<SpillSortSharedState>;
template class PipelineXSinkLocalState<NestedLoopJoinSharedState>;
template class PipelineXSinkLocalState<AnalyticSharedState>;
template class PipelineXSinkLocalState<AggSharedState>;
template class PipelineXSinkLocalState<PartitionedAggSharedState>;
template class PipelineXSinkLocalState<FakeSharedState>;
template class PipelineXSinkLocalState<UnionSharedState>;
template class PipelineXSinkLocalState<PartitionSortNodeSharedState>;
template class PipelineXSinkLocalState<MultiCastSharedState>;
template class PipelineXSinkLocalState<SetSharedState>;
template class PipelineXSinkLocalState<LocalExchangeSharedState>;
template class PipelineXSinkLocalState<BasicSharedState>;
template class PipelineXSinkLocalState<DataQueueSharedState>;
// Explicit instantiations of the (non-sink) local state template, one per
// shared state type used by an operator.
template class PipelineXLocalState<HashJoinSharedState>;
template class PipelineXLocalState<PartitionedHashJoinSharedState>;
template class PipelineXLocalState<SortSharedState>;
template class PipelineXLocalState<SpillSortSharedState>;
template class PipelineXLocalState<NestedLoopJoinSharedState>;
template class PipelineXLocalState<AnalyticSharedState>;
template class PipelineXLocalState<AggSharedState>;
template class PipelineXLocalState<PartitionedAggSharedState>;
template class PipelineXLocalState<FakeSharedState>;
template class PipelineXLocalState<UnionSharedState>;
template class PipelineXLocalState<DataQueueSharedState>;
template class PipelineXLocalState<MultiCastSharedState>;
template class PipelineXLocalState<PartitionSortNodeSharedState>;
template class PipelineXLocalState<SetSharedState>;
template class PipelineXLocalState<LocalExchangeSharedState>;
template class PipelineXLocalState<BasicSharedState>;
// Explicit instantiations of AsyncWriterSink, pairing each async result writer
// with the sink operator that uses it.
template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>;
template class AsyncWriterSink<doris::vectorized::VHiveTableWriter, HiveTableSinkOperatorX>;
template class AsyncWriterSink<doris::vectorized::VIcebergTableWriter, IcebergTableSinkOperatorX>;
// Instantiations for the dummy operator and dummy sink local states; compiled
// only when BE_TEST is defined.
#ifdef BE_TEST
template class OperatorX<DummyOperatorLocalState>;
template class DataSinkOperatorX<DummySinkLocalState>;
#endif
#include "common/compile_check_end.h"
} // namespace doris::pipeline