| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "operator.h" |
| |
| #include <glog/logging.h> |
| |
| #include <memory> |
| #include <string> |
| |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "exec/exec_node.h" |
| #include "pipeline/exec/aggregation_sink_operator.h" |
| #include "pipeline/exec/aggregation_source_operator.h" |
| #include "pipeline/exec/analytic_sink_operator.h" |
| #include "pipeline/exec/analytic_source_operator.h" |
| #include "pipeline/exec/assert_num_rows_operator.h" |
| #include "pipeline/exec/datagen_operator.h" |
| #include "pipeline/exec/distinct_streaming_aggregation_operator.h" |
| #include "pipeline/exec/empty_set_operator.h" |
| #include "pipeline/exec/es_scan_operator.h" |
| #include "pipeline/exec/exchange_sink_operator.h" |
| #include "pipeline/exec/exchange_source_operator.h" |
| #include "pipeline/exec/file_scan_operator.h" |
| #include "pipeline/exec/group_commit_block_sink_operator.h" |
| #include "pipeline/exec/hashjoin_build_sink.h" |
| #include "pipeline/exec/hashjoin_probe_operator.h" |
| #include "pipeline/exec/hive_table_sink_operator.h" |
| #include "pipeline/exec/jdbc_scan_operator.h" |
| #include "pipeline/exec/jdbc_table_sink_operator.h" |
| #include "pipeline/exec/meta_scan_operator.h" |
| #include "pipeline/exec/multi_cast_data_stream_sink.h" |
| #include "pipeline/exec/multi_cast_data_stream_source.h" |
| #include "pipeline/exec/nested_loop_join_build_operator.h" |
| #include "pipeline/exec/nested_loop_join_probe_operator.h" |
| #include "pipeline/exec/olap_scan_operator.h" |
| #include "pipeline/exec/olap_table_sink_operator.h" |
| #include "pipeline/exec/olap_table_sink_v2_operator.h" |
| #include "pipeline/exec/partition_sort_sink_operator.h" |
| #include "pipeline/exec/partition_sort_source_operator.h" |
| #include "pipeline/exec/partitioned_aggregation_sink_operator.h" |
| #include "pipeline/exec/partitioned_aggregation_source_operator.h" |
| #include "pipeline/exec/partitioned_hash_join_probe_operator.h" |
| #include "pipeline/exec/partitioned_hash_join_sink_operator.h" |
| #include "pipeline/exec/repeat_operator.h" |
| #include "pipeline/exec/result_file_sink_operator.h" |
| #include "pipeline/exec/result_sink_operator.h" |
| #include "pipeline/exec/schema_scan_operator.h" |
| #include "pipeline/exec/select_operator.h" |
| #include "pipeline/exec/set_probe_sink_operator.h" |
| #include "pipeline/exec/set_sink_operator.h" |
| #include "pipeline/exec/set_source_operator.h" |
| #include "pipeline/exec/sort_sink_operator.h" |
| #include "pipeline/exec/sort_source_operator.h" |
| #include "pipeline/exec/spill_sort_sink_operator.h" |
| #include "pipeline/exec/spill_sort_source_operator.h" |
| #include "pipeline/exec/streaming_aggregation_operator.h" |
| #include "pipeline/exec/streaming_aggregation_sink_operator.h" |
| #include "pipeline/exec/streaming_aggregation_source_operator.h" |
| #include "pipeline/exec/table_function_operator.h" |
| #include "pipeline/exec/union_sink_operator.h" |
| #include "pipeline/exec/union_source_operator.h" |
| #include "pipeline/pipeline_x/dependency.h" |
| #include "pipeline/pipeline_x/local_exchange/local_exchange_sink_operator.h" |
| #include "pipeline/pipeline_x/local_exchange/local_exchange_source_operator.h" |
| #include "util/debug_util.h" |
| #include "util/runtime_profile.h" |
| |
| namespace doris::pipeline { |
| |
| template <typename SharedStateArg> |
| std::string PipelineXLocalState<SharedStateArg>::debug_string(int indentation_level) const { |
| fmt::memory_buffer debug_string_buffer; |
| fmt::format_to(debug_string_buffer, "{}", _parent->debug_string(indentation_level)); |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| template <typename SharedStateArg> |
| std::string PipelineXSinkLocalState<SharedStateArg>::debug_string(int indentation_level) const { |
| fmt::memory_buffer debug_string_buffer; |
| fmt::format_to(debug_string_buffer, "{}", _parent->debug_string(indentation_level)); |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| std::string OperatorXBase::debug_string(int indentation_level) const { |
| fmt::memory_buffer debug_string_buffer; |
| fmt::format_to(debug_string_buffer, "{}{}: id={}, parallel_tasks={}", |
| std::string(indentation_level * 2, ' '), _op_name, node_id(), _parallel_tasks); |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| std::string OperatorXBase::debug_string(RuntimeState* state, int indentation_level) const { |
| return state->get_local_state(operator_id())->debug_string(indentation_level); |
| } |
| |
| Status OperatorXBase::init(const TPlanNode& tnode, RuntimeState* /*state*/) { |
| std::string node_name = print_plan_node_type(tnode.node_type); |
| auto substr = node_name.substr(0, node_name.find("_NODE")); |
| _op_name = substr + "_OPERATOR"; |
| |
| if (tnode.__isset.vconjunct) { |
| vectorized::VExprContextSPtr context; |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(tnode.vconjunct, context)); |
| _conjuncts.emplace_back(context); |
| } else if (tnode.__isset.conjuncts) { |
| for (auto& conjunct : tnode.conjuncts) { |
| vectorized::VExprContextSPtr context; |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_tree(conjunct, context)); |
| _conjuncts.emplace_back(context); |
| } |
| } |
| |
| // create the projections expr |
| |
| if (tnode.__isset.projections) { |
| DCHECK(tnode.__isset.output_tuple_id); |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode.projections, _projections)); |
| } |
| if (!tnode.intermediate_projections_list.empty()) { |
| DCHECK(tnode.__isset.projections) << "no final projections"; |
| _intermediate_projections.reserve(tnode.intermediate_projections_list.size()); |
| for (const auto& tnode_projections : tnode.intermediate_projections_list) { |
| vectorized::VExprContextSPtrs projections; |
| RETURN_IF_ERROR(vectorized::VExpr::create_expr_trees(tnode_projections, projections)); |
| _intermediate_projections.push_back(projections); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status OperatorXBase::prepare(RuntimeState* state) { |
| for (auto& conjunct : _conjuncts) { |
| RETURN_IF_ERROR(conjunct->prepare(state, intermediate_row_desc())); |
| } |
| for (int i = 0; i < _intermediate_projections.size(); i++) { |
| RETURN_IF_ERROR(vectorized::VExpr::prepare(_intermediate_projections[i], state, |
| intermediate_row_desc(i))); |
| } |
| RETURN_IF_ERROR(vectorized::VExpr::prepare(_projections, state, projections_row_desc())); |
| |
| if (has_output_row_desc()) { |
| RETURN_IF_ERROR( |
| vectorized::VExpr::check_expr_output_type(_projections, *_output_row_descriptor)); |
| } |
| |
| if (_child_x && !is_source()) { |
| RETURN_IF_ERROR(_child_x->prepare(state)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status OperatorXBase::open(RuntimeState* state) { |
| for (auto& conjunct : _conjuncts) { |
| RETURN_IF_ERROR(conjunct->open(state)); |
| } |
| RETURN_IF_ERROR(vectorized::VExpr::open(_projections, state)); |
| for (auto& projections : _intermediate_projections) { |
| RETURN_IF_ERROR(vectorized::VExpr::open(projections, state)); |
| } |
| if (_child_x && !is_source()) { |
| RETURN_IF_ERROR(_child_x->open(state)); |
| } |
| return Status::OK(); |
| } |
| |
| Status OperatorXBase::close(RuntimeState* state) { |
| if (_child_x && !is_source()) { |
| RETURN_IF_ERROR(_child_x->close(state)); |
| } |
| auto result = state->get_local_state_result(operator_id()); |
| if (!result) { |
| return result.error(); |
| } |
| return result.value()->close(state); |
| } |
| |
| void PipelineXLocalStateBase::clear_origin_block() { |
| _origin_block.clear_column_data(_parent->intermediate_row_desc().num_materialized_slots()); |
| } |
| |
| Status OperatorXBase::do_projections(RuntimeState* state, vectorized::Block* origin_block, |
| vectorized::Block* output_block) const { |
| auto* local_state = state->get_local_state(operator_id()); |
| SCOPED_TIMER(local_state->exec_time_counter()); |
| SCOPED_TIMER(local_state->_projection_timer); |
| const size_t rows = origin_block->rows(); |
| if (rows == 0) { |
| return Status::OK(); |
| } |
| vectorized::Block input_block = *origin_block; |
| |
| std::vector<int> result_column_ids; |
| for (const auto& projections : _intermediate_projections) { |
| result_column_ids.resize(projections.size()); |
| for (int i = 0; i < projections.size(); i++) { |
| RETURN_IF_ERROR(projections[i]->execute(&input_block, &result_column_ids[i])); |
| } |
| input_block.shuffle_columns(result_column_ids); |
| } |
| |
| DCHECK_EQ(rows, input_block.rows()); |
| auto insert_column_datas = [&](auto& to, vectorized::ColumnPtr& from, size_t rows) { |
| if (to->is_nullable() && !from->is_nullable()) { |
| if (_keep_origin || !from->is_exclusive()) { |
| auto& null_column = reinterpret_cast<vectorized::ColumnNullable&>(*to); |
| null_column.get_nested_column().insert_range_from(*from, 0, rows); |
| null_column.get_null_map_column().get_data().resize_fill(rows, 0); |
| } else { |
| to = make_nullable(from, false)->assume_mutable(); |
| } |
| } else { |
| if (_keep_origin || !from->is_exclusive()) { |
| to->insert_range_from(*from, 0, rows); |
| } else { |
| to = from->assume_mutable(); |
| } |
| } |
| }; |
| |
| using namespace vectorized; |
| vectorized::MutableBlock mutable_block = |
| vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, |
| *_output_row_descriptor); |
| if (rows != 0) { |
| auto& mutable_columns = mutable_block.mutable_columns(); |
| DCHECK(mutable_columns.size() == local_state->_projections.size()); |
| for (int i = 0; i < mutable_columns.size(); ++i) { |
| auto result_column_id = -1; |
| RETURN_IF_ERROR(local_state->_projections[i]->execute(&input_block, &result_column_id)); |
| auto column_ptr = input_block.get_by_position(result_column_id) |
| .column->convert_to_full_column_if_const(); |
| insert_column_datas(mutable_columns[i], column_ptr, rows); |
| } |
| DCHECK(mutable_block.rows() == rows); |
| output_block->set_columns(std::move(mutable_columns)); |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status OperatorXBase::get_block_after_projects(RuntimeState* state, vectorized::Block* block, |
| bool* eos) { |
| auto local_state = state->get_local_state(operator_id()); |
| if (_output_row_descriptor) { |
| local_state->clear_origin_block(); |
| auto status = get_block(state, &local_state->_origin_block, eos); |
| if (UNLIKELY(!status.ok())) return status; |
| return do_projections(state, &local_state->_origin_block, block); |
| } |
| local_state->_peak_memory_usage_counter->set(local_state->_mem_tracker->peak_consumption()); |
| return get_block(state, block, eos); |
| } |
| |
| bool PipelineXLocalStateBase::reached_limit() const { |
| return _parent->_limit != -1 && _num_rows_returned >= _parent->_limit; |
| } |
| |
| void PipelineXLocalStateBase::reached_limit(vectorized::Block* block, bool* eos) { |
| if (_parent->_limit != -1 and _num_rows_returned + block->rows() >= _parent->_limit) { |
| block->set_num_rows(_parent->_limit - _num_rows_returned); |
| *eos = true; |
| } |
| |
| if (auto rows = block->rows()) { |
| _num_rows_returned += rows; |
| COUNTER_UPDATE(_blocks_returned_counter, 1); |
| COUNTER_SET(_rows_returned_counter, _num_rows_returned); |
| } |
| } |
| |
| std::string DataSinkOperatorXBase::debug_string(int indentation_level) const { |
| fmt::memory_buffer debug_string_buffer; |
| |
| fmt::format_to(debug_string_buffer, "{}{}: id={}", std::string(indentation_level * 2, ' '), |
| _name, node_id()); |
| return fmt::to_string(debug_string_buffer); |
| } |
| |
| std::string DataSinkOperatorXBase::debug_string(RuntimeState* state, int indentation_level) const { |
| return state->get_sink_local_state()->debug_string(indentation_level); |
| } |
| |
| Status DataSinkOperatorXBase::init(const TDataSink& tsink) { |
| std::string op_name = "UNKNOWN_SINK"; |
| std::map<int, const char*>::const_iterator it = _TDataSinkType_VALUES_TO_NAMES.find(tsink.type); |
| |
| if (it != _TDataSinkType_VALUES_TO_NAMES.end()) { |
| op_name = it->second; |
| } |
| _name = op_name + "_OPERATOR"; |
| return Status::OK(); |
| } |
| |
| Status DataSinkOperatorXBase::init(const TPlanNode& tnode, RuntimeState* state) { |
| std::string op_name = print_plan_node_type(tnode.node_type); |
| |
| auto substr = op_name.substr(0, op_name.find("_NODE")); |
| |
| _name = substr + "_SINK_OPERATOR"; |
| return Status::OK(); |
| } |
| |
| template <typename LocalStateType> |
| Status DataSinkOperatorX<LocalStateType>::setup_local_state(RuntimeState* state, |
| LocalSinkStateInfo& info) { |
| auto local_state = LocalStateType::create_unique(this, state); |
| RETURN_IF_ERROR(local_state->init(state, info)); |
| state->emplace_sink_local_state(operator_id(), std::move(local_state)); |
| return Status::OK(); |
| } |
| |
| template <typename LocalStateType> |
| std::shared_ptr<BasicSharedState> DataSinkOperatorX<LocalStateType>::create_shared_state() const { |
| if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, |
| LocalExchangeSharedState>) { |
| return nullptr; |
| } else if constexpr (std::is_same_v<typename LocalStateType::SharedStateType, |
| MultiCastSharedState>) { |
| LOG(FATAL) << "should not reach here!"; |
| return nullptr; |
| } else { |
| std::shared_ptr<BasicSharedState> ss = nullptr; |
| ss = LocalStateType::SharedStateType::create_shared(); |
| ss->id = operator_id(); |
| for (auto& dest : dests_id()) { |
| ss->related_op_ids.insert(dest); |
| } |
| return ss; |
| } |
| } |
| |
| template <typename LocalStateType> |
| Status OperatorX<LocalStateType>::setup_local_state(RuntimeState* state, LocalStateInfo& info) { |
| auto local_state = LocalStateType::create_unique(state, this); |
| RETURN_IF_ERROR(local_state->init(state, info)); |
| state->emplace_local_state(operator_id(), std::move(local_state)); |
| return Status::OK(); |
| } |
| |
| PipelineXSinkLocalStateBase::PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent, |
| RuntimeState* state) |
| : _parent(parent), _state(state) { |
| _query_statistics = std::make_shared<QueryStatistics>(); |
| } |
| |
| PipelineXLocalStateBase::PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent) |
| : _num_rows_returned(0), |
| _rows_returned_counter(nullptr), |
| _peak_memory_usage_counter(nullptr), |
| _parent(parent), |
| _state(state) { |
| _query_statistics = std::make_shared<QueryStatistics>(); |
| } |
| |
| template <typename SharedStateArg> |
| Status PipelineXLocalState<SharedStateArg>::init(RuntimeState* state, LocalStateInfo& info) { |
| _runtime_profile.reset(new RuntimeProfile(_parent->get_name() + name_suffix())); |
| _runtime_profile->set_metadata(_parent->node_id()); |
| _runtime_profile->set_is_sink(false); |
| info.parent_profile->add_child(_runtime_profile.get(), true, nullptr); |
| constexpr auto is_fake_shared = std::is_same_v<SharedStateArg, FakeSharedState>; |
| if constexpr (!is_fake_shared) { |
| if constexpr (std::is_same_v<LocalExchangeSharedState, SharedStateArg>) { |
| _shared_state = info.le_state_map[_parent->operator_id()].first.get(); |
| |
| _dependency = _shared_state->get_dep_by_channel_id(info.task_idx); |
| _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( |
| _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); |
| } else if (info.shared_state) { |
| // For UnionSourceOperator without children, there is no shared state. |
| _shared_state = info.shared_state->template cast<SharedStateArg>(); |
| |
| _dependency = _shared_state->create_source_dependency( |
| _parent->operator_id(), _parent->node_id(), _parent->get_name(), |
| state->get_query_ctx()); |
| _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( |
| _runtime_profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); |
| } |
| } |
| |
| _rows_returned_counter = |
| ADD_COUNTER_WITH_LEVEL(_runtime_profile, "RowsProduced", TUnit::UNIT, 1); |
| _blocks_returned_counter = |
| ADD_COUNTER_WITH_LEVEL(_runtime_profile, "BlocksProduced", TUnit::UNIT, 1); |
| _projection_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ProjectionTime", 1); |
| _init_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "InitTime", 1); |
| _open_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "OpenTime", 1); |
| _close_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "CloseTime", 1); |
| _exec_timer = ADD_TIMER_WITH_LEVEL(_runtime_profile, "ExecTime", 1); |
| _mem_tracker = std::make_unique<MemTracker>("PipelineXLocalState:" + _runtime_profile->name()); |
| _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_runtime_profile, "MemoryUsage", 1); |
| _peak_memory_usage_counter = _runtime_profile->AddHighWaterMarkCounter( |
| "PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); |
| return Status::OK(); |
| } |
| |
| template <typename SharedStateArg> |
| Status PipelineXLocalState<SharedStateArg>::open(RuntimeState* state) { |
| _conjuncts.resize(_parent->_conjuncts.size()); |
| _projections.resize(_parent->_projections.size()); |
| for (size_t i = 0; i < _conjuncts.size(); i++) { |
| RETURN_IF_ERROR(_parent->_conjuncts[i]->clone(state, _conjuncts[i])); |
| } |
| for (size_t i = 0; i < _projections.size(); i++) { |
| RETURN_IF_ERROR(_parent->_projections[i]->clone(state, _projections[i])); |
| } |
| _intermediate_projections.resize(_parent->_intermediate_projections.size()); |
| for (int i = 0; i < _parent->_intermediate_projections.size(); i++) { |
| _intermediate_projections[i].resize(_parent->_intermediate_projections[i].size()); |
| for (int j = 0; j < _parent->_intermediate_projections[i].size(); j++) { |
| RETURN_IF_ERROR(_parent->_intermediate_projections[i][j]->clone( |
| state, _intermediate_projections[i][j])); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| template <typename SharedStateArg> |
| Status PipelineXLocalState<SharedStateArg>::close(RuntimeState* state) { |
| if (_closed) { |
| return Status::OK(); |
| } |
| if constexpr (!std::is_same_v<SharedStateArg, FakeSharedState>) { |
| COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); |
| } |
| if (_rows_returned_counter != nullptr) { |
| COUNTER_SET(_rows_returned_counter, _num_rows_returned); |
| } |
| if (_peak_memory_usage_counter) { |
| _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); |
| } |
| _closed = true; |
| return Status::OK(); |
| } |
| |
| template <typename SharedState> |
| Status PipelineXSinkLocalState<SharedState>::init(RuntimeState* state, LocalSinkStateInfo& info) { |
| // create profile |
| _profile = state->obj_pool()->add(new RuntimeProfile(_parent->get_name() + name_suffix())); |
| _profile->set_metadata(_parent->node_id()); |
| _profile->set_is_sink(true); |
| _wait_for_finish_dependency_timer = ADD_TIMER(_profile, "PendingFinishDependency"); |
| constexpr auto is_fake_shared = std::is_same_v<SharedState, FakeSharedState>; |
| if constexpr (!is_fake_shared) { |
| if constexpr (std::is_same_v<LocalExchangeSharedState, SharedState>) { |
| _dependency = info.le_state_map[_parent->dests_id().front()].second.get(); |
| _shared_state = (SharedState*)_dependency->shared_state(); |
| } else { |
| _shared_state = info.shared_state->template cast<SharedState>(); |
| _dependency = _shared_state->create_sink_dependency( |
| _parent->dests_id().front(), _parent->node_id(), _parent->get_name(), |
| state->get_query_ctx()); |
| } |
| _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( |
| _profile, "WaitForDependency[" + _dependency->name() + "]Time", 1); |
| } else { |
| _dependency = nullptr; |
| } |
| _rows_input_counter = ADD_COUNTER_WITH_LEVEL(_profile, "InputRows", TUnit::UNIT, 1); |
| _init_timer = ADD_TIMER_WITH_LEVEL(_profile, "InitTime", 1); |
| _open_timer = ADD_TIMER_WITH_LEVEL(_profile, "OpenTime", 1); |
| _close_timer = ADD_TIMER_WITH_LEVEL(_profile, "CloseTime", 1); |
| _exec_timer = ADD_TIMER_WITH_LEVEL(_profile, "ExecTime", 1); |
| info.parent_profile->add_child(_profile, true, nullptr); |
| _mem_tracker = std::make_unique<MemTracker>(_parent->get_name()); |
| _memory_used_counter = ADD_LABEL_COUNTER_WITH_LEVEL(_profile, "MemoryUsage", 1); |
| _peak_memory_usage_counter = |
| _profile->AddHighWaterMarkCounter("PeakMemoryUsage", TUnit::BYTES, "MemoryUsage", 1); |
| return Status::OK(); |
| } |
| |
| template <typename SharedState> |
| Status PipelineXSinkLocalState<SharedState>::close(RuntimeState* state, Status exec_status) { |
| if (_closed) { |
| return Status::OK(); |
| } |
| if constexpr (!std::is_same_v<SharedState, FakeSharedState>) { |
| COUNTER_SET(_wait_for_dependency_timer, _dependency->watcher_elapse_time()); |
| } |
| if (_peak_memory_usage_counter) { |
| _peak_memory_usage_counter->set(_mem_tracker->peak_consumption()); |
| } |
| _closed = true; |
| return Status::OK(); |
| } |
| |
| template <typename LocalStateType> |
| Status StreamingOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block, |
| bool* eos) { |
| RETURN_IF_ERROR( |
| OperatorX<LocalStateType>::_child_x->get_block_after_projects(state, block, eos)); |
| return pull(state, block, eos); |
| } |
| |
| template <typename LocalStateType> |
| Status StatefulOperatorX<LocalStateType>::get_block(RuntimeState* state, vectorized::Block* block, |
| bool* eos) { |
| auto& local_state = get_local_state(state); |
| if (need_more_input_data(state)) { |
| local_state._child_block->clear_column_data( |
| OperatorX<LocalStateType>::_child_x->row_desc().num_materialized_slots()); |
| RETURN_IF_ERROR(OperatorX<LocalStateType>::_child_x->get_block_after_projects( |
| state, local_state._child_block.get(), &local_state._child_eos)); |
| *eos = local_state._child_eos; |
| if (local_state._child_block->rows() == 0 && !local_state._child_eos) { |
| return Status::OK(); |
| } |
| { |
| SCOPED_TIMER(local_state.exec_time_counter()); |
| RETURN_IF_ERROR(push(state, local_state._child_block.get(), local_state._child_eos)); |
| } |
| } |
| |
| if (!need_more_input_data(state)) { |
| SCOPED_TIMER(local_state.exec_time_counter()); |
| bool new_eos = false; |
| RETURN_IF_ERROR(pull(state, block, &new_eos)); |
| if (new_eos) { |
| *eos = true; |
| } else if (!need_more_input_data(state)) { |
| *eos = false; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| template <typename Writer, typename Parent> |
| requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) |
| Status AsyncWriterSink<Writer, Parent>::init(RuntimeState* state, LocalSinkStateInfo& info) { |
| RETURN_IF_ERROR(Base::init(state, info)); |
| _writer.reset(new Writer(info.tsink, _output_vexpr_ctxs)); |
| _async_writer_dependency = AsyncWriterDependency::create_shared( |
| _parent->operator_id(), _parent->node_id(), state->get_query_ctx()); |
| _writer->set_dependency(_async_writer_dependency.get(), _finish_dependency.get()); |
| |
| _wait_for_dependency_timer = ADD_TIMER_WITH_LEVEL( |
| _profile, "WaitForDependency[" + _async_writer_dependency->name() + "]Time", 1); |
| return Status::OK(); |
| } |
| |
| template <typename Writer, typename Parent> |
| requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) |
| Status AsyncWriterSink<Writer, Parent>::open(RuntimeState* state) { |
| RETURN_IF_ERROR(Base::open(state)); |
| _output_vexpr_ctxs.resize(_parent->cast<Parent>()._output_vexpr_ctxs.size()); |
| for (size_t i = 0; i < _output_vexpr_ctxs.size(); i++) { |
| RETURN_IF_ERROR( |
| _parent->cast<Parent>()._output_vexpr_ctxs[i]->clone(state, _output_vexpr_ctxs[i])); |
| } |
| RETURN_IF_ERROR(_writer->start_writer(state, _profile)); |
| return Status::OK(); |
| } |
| |
| template <typename Writer, typename Parent> |
| requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) |
| Status AsyncWriterSink<Writer, Parent>::sink(RuntimeState* state, vectorized::Block* block, |
| bool eos) { |
| return _writer->sink(block, eos); |
| } |
| |
| template <typename Writer, typename Parent> |
| requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) |
| Status AsyncWriterSink<Writer, Parent>::close(RuntimeState* state, Status exec_status) { |
| if (_closed) { |
| return Status::OK(); |
| } |
| COUNTER_SET(_wait_for_dependency_timer, _async_writer_dependency->watcher_elapse_time()); |
| COUNTER_SET(_wait_for_finish_dependency_timer, _finish_dependency->watcher_elapse_time()); |
| // if the init failed, the _writer may be nullptr. so here need check |
| if (_writer) { |
| Status st = _writer->get_writer_status(); |
| if (exec_status.ok()) { |
| _writer->force_close(state->is_cancelled() ? Status::Cancelled("Cancelled") |
| : Status::Cancelled("force close")); |
| } else { |
| _writer->force_close(exec_status); |
| } |
| // If there is an error in process_block thread, then we should get the writer |
| // status before call force_close. For example, the thread may failed in commit |
| // transaction. |
| RETURN_IF_ERROR(st); |
| } |
| return Base::close(state, exec_status); |
| } |
| |
| #define DECLARE_OPERATOR_X(LOCAL_STATE) template class DataSinkOperatorX<LOCAL_STATE>; |
| DECLARE_OPERATOR_X(HashJoinBuildSinkLocalState) |
| DECLARE_OPERATOR_X(ResultSinkLocalState) |
| DECLARE_OPERATOR_X(JdbcTableSinkLocalState) |
| DECLARE_OPERATOR_X(ResultFileSinkLocalState) |
| DECLARE_OPERATOR_X(OlapTableSinkLocalState) |
| DECLARE_OPERATOR_X(OlapTableSinkV2LocalState) |
| DECLARE_OPERATOR_X(HiveTableSinkLocalState) |
| DECLARE_OPERATOR_X(AnalyticSinkLocalState) |
| DECLARE_OPERATOR_X(SortSinkLocalState) |
| DECLARE_OPERATOR_X(SpillSortSinkLocalState) |
| DECLARE_OPERATOR_X(LocalExchangeSinkLocalState) |
| DECLARE_OPERATOR_X(AggSinkLocalState) |
| DECLARE_OPERATOR_X(PartitionedAggSinkLocalState) |
| DECLARE_OPERATOR_X(ExchangeSinkLocalState) |
| DECLARE_OPERATOR_X(NestedLoopJoinBuildSinkLocalState) |
| DECLARE_OPERATOR_X(UnionSinkLocalState) |
| DECLARE_OPERATOR_X(MultiCastDataStreamSinkLocalState) |
| DECLARE_OPERATOR_X(PartitionSortSinkLocalState) |
| DECLARE_OPERATOR_X(SetProbeSinkLocalState<true>) |
| DECLARE_OPERATOR_X(SetProbeSinkLocalState<false>) |
| DECLARE_OPERATOR_X(SetSinkLocalState<true>) |
| DECLARE_OPERATOR_X(SetSinkLocalState<false>) |
| DECLARE_OPERATOR_X(PartitionedHashJoinSinkLocalState) |
| DECLARE_OPERATOR_X(GroupCommitBlockSinkLocalState) |
| |
| #undef DECLARE_OPERATOR_X |
| |
| #define DECLARE_OPERATOR_X(LOCAL_STATE) template class OperatorX<LOCAL_STATE>; |
| DECLARE_OPERATOR_X(HashJoinProbeLocalState) |
| DECLARE_OPERATOR_X(OlapScanLocalState) |
| DECLARE_OPERATOR_X(JDBCScanLocalState) |
| DECLARE_OPERATOR_X(FileScanLocalState) |
| DECLARE_OPERATOR_X(EsScanLocalState) |
| DECLARE_OPERATOR_X(AnalyticLocalState) |
| DECLARE_OPERATOR_X(SortLocalState) |
| DECLARE_OPERATOR_X(SpillSortLocalState) |
| DECLARE_OPERATOR_X(AggLocalState) |
| DECLARE_OPERATOR_X(PartitionedAggLocalState) |
| DECLARE_OPERATOR_X(TableFunctionLocalState) |
| DECLARE_OPERATOR_X(ExchangeLocalState) |
| DECLARE_OPERATOR_X(RepeatLocalState) |
| DECLARE_OPERATOR_X(NestedLoopJoinProbeLocalState) |
| DECLARE_OPERATOR_X(AssertNumRowsLocalState) |
| DECLARE_OPERATOR_X(EmptySetLocalState) |
| DECLARE_OPERATOR_X(UnionSourceLocalState) |
| DECLARE_OPERATOR_X(MultiCastDataStreamSourceLocalState) |
| DECLARE_OPERATOR_X(PartitionSortSourceLocalState) |
| DECLARE_OPERATOR_X(SetSourceLocalState<true>) |
| DECLARE_OPERATOR_X(SetSourceLocalState<false>) |
| DECLARE_OPERATOR_X(DataGenLocalState) |
| DECLARE_OPERATOR_X(SchemaScanLocalState) |
| DECLARE_OPERATOR_X(MetaScanLocalState) |
| DECLARE_OPERATOR_X(LocalExchangeSourceLocalState) |
| DECLARE_OPERATOR_X(PartitionedHashJoinProbeLocalState) |
| |
| #undef DECLARE_OPERATOR_X |
| |
| template class StreamingOperatorX<AssertNumRowsLocalState>; |
| template class StreamingOperatorX<SelectLocalState>; |
| |
| template class StatefulOperatorX<HashJoinProbeLocalState>; |
| template class StatefulOperatorX<PartitionedHashJoinProbeLocalState>; |
| template class StatefulOperatorX<RepeatLocalState>; |
| template class StatefulOperatorX<StreamingAggLocalState>; |
| template class StatefulOperatorX<DistinctStreamingAggLocalState>; |
| template class StatefulOperatorX<NestedLoopJoinProbeLocalState>; |
| template class StatefulOperatorX<TableFunctionLocalState>; |
| |
| template class PipelineXSinkLocalState<HashJoinSharedState>; |
| template class PipelineXSinkLocalState<PartitionedHashJoinSharedState>; |
| template class PipelineXSinkLocalState<SortSharedState>; |
| template class PipelineXSinkLocalState<SpillSortSharedState>; |
| template class PipelineXSinkLocalState<NestedLoopJoinSharedState>; |
| template class PipelineXSinkLocalState<AnalyticSharedState>; |
| template class PipelineXSinkLocalState<AggSharedState>; |
| template class PipelineXSinkLocalState<PartitionedAggSharedState>; |
| template class PipelineXSinkLocalState<FakeSharedState>; |
| template class PipelineXSinkLocalState<UnionSharedState>; |
| template class PipelineXSinkLocalState<PartitionSortNodeSharedState>; |
| template class PipelineXSinkLocalState<MultiCastSharedState>; |
| template class PipelineXSinkLocalState<SetSharedState>; |
| template class PipelineXSinkLocalState<LocalExchangeSharedState>; |
| template class PipelineXSinkLocalState<BasicSharedState>; |
| |
| template class PipelineXLocalState<HashJoinSharedState>; |
| template class PipelineXLocalState<PartitionedHashJoinSharedState>; |
| template class PipelineXLocalState<SortSharedState>; |
| template class PipelineXLocalState<SpillSortSharedState>; |
| template class PipelineXLocalState<NestedLoopJoinSharedState>; |
| template class PipelineXLocalState<AnalyticSharedState>; |
| template class PipelineXLocalState<AggSharedState>; |
| template class PipelineXLocalState<PartitionedAggSharedState>; |
| template class PipelineXLocalState<FakeSharedState>; |
| template class PipelineXLocalState<UnionSharedState>; |
| template class PipelineXLocalState<MultiCastSharedState>; |
| template class PipelineXLocalState<PartitionSortNodeSharedState>; |
| template class PipelineXLocalState<SetSharedState>; |
| template class PipelineXLocalState<LocalExchangeSharedState>; |
| template class PipelineXLocalState<BasicSharedState>; |
| |
| template class AsyncWriterSink<doris::vectorized::VFileResultWriter, ResultFileSinkOperatorX>; |
| template class AsyncWriterSink<doris::vectorized::VJdbcTableWriter, JdbcTableSinkOperatorX>; |
| template class AsyncWriterSink<doris::vectorized::VTabletWriter, OlapTableSinkOperatorX>; |
| template class AsyncWriterSink<doris::vectorized::VTabletWriterV2, OlapTableSinkV2OperatorX>; |
| template class AsyncWriterSink<doris::vectorized::VHiveTableWriter, HiveTableSinkOperatorX>; |
| |
| } // namespace doris::pipeline |