| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <fmt/format.h> |
| #include <glog/logging.h> |
| |
| #include <atomic> |
| #include <cstdint> |
| #include <functional> |
| #include <memory> |
| #include <string> |
| #include <utility> |
| #include <vector> |
| |
| #include "common/be_mock_util.h" |
| #include "common/logging.h" |
| #include "common/status.h" |
| #include "pipeline/dependency.h" |
| #include "pipeline/exec/operator.h" |
| #include "pipeline/exec/spill_utils.h" |
| #include "pipeline/local_exchange/local_exchanger.h" |
| #include "runtime/memory/mem_tracker.h" |
| #include "runtime/query_context.h" |
| #include "runtime/runtime_state.h" |
| #include "runtime/thread_context.h" |
| #include "util/runtime_profile.h" |
| #include "vec/core/block.h" |
| #include "vec/runtime/vdata_stream_recvr.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class RowDescriptor; |
| class RuntimeState; |
| class TDataSink; |
| namespace vectorized { |
| class AsyncResultWriter; |
| class AnnTopNRuntime; |
| } // namespace vectorized |
| } // namespace doris |
| |
| namespace doris::pipeline { |
| |
| class OperatorBase; |
| class OperatorXBase; |
| class DataSinkOperatorXBase; |
| |
| using OperatorPtr = std::shared_ptr<OperatorXBase>; |
| using Operators = std::vector<OperatorPtr>; |
| |
| using DataSinkOperatorPtr = std::shared_ptr<DataSinkOperatorXBase>; |
| |
| // This struct is used only for initializing local state. |
| struct LocalStateInfo { |
| RuntimeProfile* parent_profile = nullptr; |
| const std::vector<TScanRangeParams>& scan_ranges; |
| BasicSharedState* shared_state; |
| const std::map<int, std::pair<std::shared_ptr<BasicSharedState>, |
| std::vector<std::shared_ptr<Dependency>>>>& shared_state_map; |
| const int task_idx; |
| }; |
| |
| // This struct is used only for initializing local sink state. |
| struct LocalSinkStateInfo { |
| const int task_idx = 0; |
| RuntimeProfile* parent_profile = nullptr; |
| const int sender_id = 0; |
| BasicSharedState* shared_state; |
| const std::map<int, std::pair<std::shared_ptr<BasicSharedState>, |
| std::vector<std::shared_ptr<Dependency>>>>& shared_state_map; |
| const TDataSink& tsink; |
| }; |
| |
| class OperatorBase { |
| public: |
| explicit OperatorBase() : _child(nullptr), _is_closed(false) {} |
| explicit OperatorBase(bool is_serial_operator) |
| : _child(nullptr), _is_closed(false), _is_serial_operator(is_serial_operator) {} |
| virtual ~OperatorBase() = default; |
| |
| virtual bool is_sink() const { return false; } |
| |
| virtual bool is_source() const { return false; } |
| |
| [[nodiscard]] virtual const RowDescriptor& row_desc() const; |
| |
| [[nodiscard]] virtual Status init(const TDataSink& tsink) { return Status::OK(); } |
| |
| [[nodiscard]] virtual std::string get_name() const = 0; |
| [[nodiscard]] virtual Status prepare(RuntimeState* state) = 0; |
| [[nodiscard]] virtual Status terminate(RuntimeState* state) = 0; |
| [[nodiscard]] virtual Status close(RuntimeState* state); |
| [[nodiscard]] virtual int node_id() const = 0; |
| |
| [[nodiscard]] virtual Status set_child(OperatorPtr child) { |
| if (_child && child != nullptr) { |
| return Status::InternalError("Child is already set in node name={}", get_name()); |
| } |
| _child = child; |
| return Status::OK(); |
| } |
| |
| // Whether this operator must be executed serially (e.g. a finalized agg without group-by keys). |
| [[nodiscard]] virtual bool is_serial_operator() const { return _is_serial_operator; } |
| |
| [[nodiscard]] bool is_closed() const { return _is_closed; } |
| |
| virtual size_t revocable_mem_size(RuntimeState* state) const { return 0; } |
| |
| virtual Status revoke_memory(RuntimeState* state, |
| const std::shared_ptr<SpillContext>& spill_context) { |
| return Status::OK(); |
| } |
| |
| virtual void set_low_memory_mode(RuntimeState* state) {} |
| |
| [[nodiscard]] virtual bool require_data_distribution() const { return false; } |
| OperatorPtr child() { return _child; } |
| [[nodiscard]] bool followed_by_shuffled_operator() const { |
| return _followed_by_shuffled_operator; |
| } |
| void set_followed_by_shuffled_operator(bool followed_by_shuffled_operator) { |
| _followed_by_shuffled_operator = followed_by_shuffled_operator; |
| } |
| [[nodiscard]] virtual bool is_shuffled_operator() const { return false; } |
| [[nodiscard]] virtual DataDistribution required_data_distribution() const; |
| [[nodiscard]] virtual bool require_shuffled_data_distribution() const; |
| |
| protected: |
| OperatorPtr _child = nullptr; |
| |
| bool _is_closed; |
| bool _followed_by_shuffled_operator = false; |
| bool _is_serial_operator = false; |
| }; |
| |
| class PipelineXLocalStateBase { |
| public: |
| PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent); |
| virtual ~PipelineXLocalStateBase() = default; |
| |
| template <class TARGET> |
| TARGET& cast() { |
| DCHECK(dynamic_cast<TARGET*>(this)) |
| << " Mismatch type! Current type is " << typeid(*this).name() |
| << " and expected type is " << typeid(TARGET).name(); |
| return reinterpret_cast<TARGET&>(*this); |
| } |
| template <class TARGET> |
| const TARGET& cast() const { |
| DCHECK(dynamic_cast<const TARGET*>(this)) |
| << " Mismatch type! Current type is " << typeid(*this).name() |
| << " and expected type is " << typeid(TARGET).name(); |
| return reinterpret_cast<const TARGET&>(*this); |
| } |
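| |
| // Illustrative usage (a minimal sketch; `MyLocalState` and `base_state` are hypothetical): an |
| // operator typically retrieves its typed local state through cast<>(), which DCHECKs the dynamic |
| // type in debug builds and reinterprets the reference in release builds: |
| // |
| //   auto& local_state = base_state->cast<MyLocalState>(); |
| //   SCOPED_TIMER(local_state.exec_time_counter()); |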
| |
| // Do initialization. This step should be executed only once and in a bthread, so we can do some |
| // lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX). |
| virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0; |
| // Do initialization. This step can be executed multiple times, so we should make sure it is |
| // idempotent (e.g. wait for runtime filters). |
| virtual Status open(RuntimeState* state) = 0; |
| virtual Status close(RuntimeState* state) = 0; |
| virtual Status terminate(RuntimeState* state) = 0; |
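| |
| // A minimal sketch of the expected call order (simplified; error handling elided and names are |
| // illustrative). init() runs exactly once when the local state is created, open() may run more |
| // than once (so it must be idempotent), and close() runs once at the end: |
| // |
| //   RETURN_IF_ERROR(local_state->init(state, info)); |
| //   RETURN_IF_ERROR(local_state->open(state)); |
| //   // ... produce/consume blocks ... |
| //   RETURN_IF_ERROR(local_state->close(state)); |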
| |
| // If projection is used, we should clear `_origin_block`. |
| void clear_origin_block(); |
| |
| void reached_limit(vectorized::Block* block, bool* eos); |
| RuntimeProfile* profile() { return _runtime_profile.get(); } |
| |
| RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } |
| RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } |
| OperatorXBase* parent() { return _parent; } |
| RuntimeState* state() { return _state; } |
| vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } |
| vectorized::VExprContextSPtrs& projections() { return _projections; } |
| [[nodiscard]] int64_t num_rows_returned() const { return _num_rows_returned; } |
| void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; } |
| void set_num_rows_returned(int64_t value) { _num_rows_returned = value; } |
| |
| [[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0; |
| |
| virtual std::vector<Dependency*> dependencies() const { return {nullptr}; } |
| |
| // Overridden in Scan. |
| virtual Dependency* finishdependency() { return nullptr; } |
| virtual Dependency* spill_dependency() const { return nullptr; } |
| // Overridden in Scan and MultiCastSink. |
| virtual std::vector<Dependency*> filter_dependencies() { return {}; } |
| |
| Status filter_block(const vectorized::VExprContextSPtrs& expr_contexts, |
| vectorized::Block* block, size_t column_to_keep); |
| |
| int64_t& estimate_memory_usage() { return _estimate_memory_usage; } |
| |
| void reset_estimate_memory_usage() { _estimate_memory_usage = 0; } |
| |
| bool low_memory_mode() { |
| #ifdef BE_TEST |
| return false; |
| #else |
| return _state->low_memory_mode(); |
| #endif |
| } |
| |
| protected: |
| friend class OperatorXBase; |
| template <typename LocalStateType> |
| friend class ScanOperatorX; |
| |
| ObjectPool* _pool = nullptr; |
| int64_t _num_rows_returned {0}; |
| int64_t _estimate_memory_usage {0}; |
| |
| std::unique_ptr<RuntimeProfile> _runtime_profile; |
| |
| RuntimeProfile::Counter* _rows_returned_counter = nullptr; |
| RuntimeProfile::Counter* _blocks_returned_counter = nullptr; |
| RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; |
| // Account for current memory and peak memory used by this node |
| RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr; |
| RuntimeProfile::Counter* _projection_timer = nullptr; |
| RuntimeProfile::Counter* _exec_timer = nullptr; |
| RuntimeProfile::Counter* _init_timer = nullptr; |
| RuntimeProfile::Counter* _open_timer = nullptr; |
| RuntimeProfile::Counter* _close_timer = nullptr; |
| |
| OperatorXBase* _parent = nullptr; |
| RuntimeState* _state = nullptr; |
| vectorized::VExprContextSPtrs _conjuncts; |
| vectorized::VExprContextSPtrs _projections; |
| std::shared_ptr<vectorized::AnnTopNRuntime> _ann_topn_runtime; |
| // Used in common subexpression elimination to compute intermediate results. |
| std::vector<vectorized::VExprContextSPtrs> _intermediate_projections; |
| |
| bool _closed = false; |
| std::atomic<bool> _terminated = false; |
| vectorized::Block _origin_block; |
| }; |
| |
| template <typename SharedStateArg = FakeSharedState> |
| class PipelineXLocalState : public PipelineXLocalStateBase { |
| public: |
| using SharedStateType = SharedStateArg; |
| PipelineXLocalState(RuntimeState* state, OperatorXBase* parent) |
| : PipelineXLocalStateBase(state, parent) {} |
| ~PipelineXLocalState() override = default; |
| |
| Status init(RuntimeState* state, LocalStateInfo& info) override; |
| Status open(RuntimeState* state) override; |
| |
| virtual std::string name_suffix() const; |
| |
| Status close(RuntimeState* state) override; |
| Status terminate(RuntimeState* state) override; |
| |
| [[nodiscard]] std::string debug_string(int indentation_level = 0) const override; |
| |
| std::vector<Dependency*> dependencies() const override { |
| return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {}; |
| } |
| Dependency* spill_dependency() const override { return _spill_dependency.get(); } |
| |
| virtual bool must_set_shared_state() const { |
| return !std::is_same_v<SharedStateArg, FakeSharedState>; |
| } |
| |
| protected: |
| Dependency* _dependency = nullptr; |
| std::shared_ptr<Dependency> _spill_dependency; |
| SharedStateArg* _shared_state = nullptr; |
| }; |
| |
| template <typename SharedStateArg> |
| class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> { |
| public: |
| using Base = PipelineXLocalState<SharedStateArg>; |
| PipelineXSpillLocalState(RuntimeState* state, OperatorXBase* parent) |
| : PipelineXLocalState<SharedStateArg>(state, parent) {} |
| ~PipelineXSpillLocalState() override = default; |
| |
| Status init(RuntimeState* state, LocalStateInfo& info) override { |
| RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, info)); |
| |
| init_spill_read_counters(); |
| |
| return Status::OK(); |
| } |
| |
| void init_spill_write_counters() { |
| _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1); |
| |
| _spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL( |
| Base::profile(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, 1); |
| _spill_writing_task_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount", TUnit::UNIT, 1); |
| _spill_write_wait_in_queue_timer = |
| ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTaskWaitInQueueTime", 1); |
| |
| _spill_write_file_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteFileTime", 1); |
| |
| _spill_write_serialize_block_timer = |
| ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteSerializeBlockTime", 1); |
| _spill_write_block_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); |
| _spill_write_block_data_size = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockBytes", TUnit::BYTES, 1); |
| _spill_write_file_total_size = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileBytes", TUnit::BYTES, 1); |
| _spill_write_rows_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1); |
| _spill_file_total_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteFileTotalCount", TUnit::UNIT, 1); |
| } |
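| |
| // Note: init() above only sets up the read-side counters; a subclass that also writes spill |
| // files can call init_spill_write_counters() from its own init(). A minimal sketch (hypothetical |
| // subclass `MySpillLocalState` over a hypothetical shared state): |
| // |
| //   Status MySpillLocalState::init(RuntimeState* state, LocalStateInfo& info) { |
| //       RETURN_IF_ERROR(Base::init(state, info)); |
| //       init_spill_write_counters(); |
| //       return Status::OK(); |
| //   } |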
| |
| void init_spill_read_counters() { |
| _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1); |
| |
| // Spill read counters |
| _spill_recover_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillRecoverTime", 1); |
| |
| _spill_read_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL( |
| Base::profile(), "SpillReadTaskWaitInQueueCount", TUnit::UNIT, 1); |
| _spill_reading_task_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadTaskCount", TUnit::UNIT, 1); |
| _spill_read_wait_in_queue_timer = |
| ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadTaskWaitInQueueTime", 1); |
| |
| _spill_read_file_time = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadFileTime", 1); |
| _spill_read_derialize_block_timer = |
| ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillReadDerializeBlockTime", 1); |
| |
| _spill_read_block_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockCount", TUnit::UNIT, 1); |
| _spill_read_block_data_size = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadBlockBytes", TUnit::BYTES, 1); |
| _spill_read_file_size = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileBytes", TUnit::BYTES, 1); |
| _spill_read_rows_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadRows", TUnit::UNIT, 1); |
| _spill_read_file_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillReadFileCount", TUnit::UNIT, 1); |
| |
| _spill_file_current_size = ADD_COUNTER_WITH_LEVEL( |
| Base::profile(), "SpillWriteFileCurrentBytes", TUnit::BYTES, 1); |
| _spill_file_current_count = ADD_COUNTER_WITH_LEVEL( |
| Base::profile(), "SpillWriteFileCurrentCount", TUnit::UNIT, 1); |
| } |
| |
| // Initialize this operator's 'SpillWriteFileCurrentBytes' and 'SpillWriteFileCurrentCount' counters |
| // from the spill sink operator's shared 'SpillWriteFileBytes' and 'SpillWriteFileTotalCount' counters. |
| // These two counters are shared with spill source operators as their initial values; the copy happens |
| // only once (guarded by _copy_shared_spill_profile). |
| void copy_shared_spill_profile() { |
| if (_copy_shared_spill_profile) { |
| _copy_shared_spill_profile = false; |
| const auto* spill_shared_state = (const BasicSpillSharedState*)Base::_shared_state; |
| COUNTER_UPDATE(_spill_file_current_size, |
| spill_shared_state->_spill_write_file_total_size->value()); |
| COUNTER_UPDATE(_spill_file_current_count, |
| spill_shared_state->_spill_file_total_count->value()); |
| Base::_shared_state->update_spill_stream_profiles(Base::profile()); |
| } |
| } |
| |
| // Total time of spill, including spill task scheduling time, |
| // block serialization time, disk file write time, |
| // disk file read time, block deserialization time, etc. |
| RuntimeProfile::Counter* _spill_total_timer = nullptr; |
| |
| // Spill write counters |
| // Total time of spill writes, including block serialization time, disk file write time, |
| // queue wait time, etc. |
| RuntimeProfile::Counter* _spill_write_timer = nullptr; |
| |
| RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr; |
| RuntimeProfile::Counter* _spill_writing_task_count = nullptr; |
| RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr; |
| |
| // Total time of writing file |
| RuntimeProfile::Counter* _spill_write_file_timer = nullptr; |
| RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr; |
| // Original count of spilled Blocks. |
| // One big Block may be split into multiple small Blocks when actually written to the disk file. |
| RuntimeProfile::Counter* _spill_write_block_count = nullptr; |
| // Total bytes of spill data in Block format (in-memory format) |
| RuntimeProfile::Counter* _spill_write_block_data_size = nullptr; |
| // Total bytes of spill data written to the disk file (after serialization) |
| RuntimeProfile::Counter* _spill_write_file_total_size = nullptr; |
| RuntimeProfile::Counter* _spill_write_rows_count = nullptr; |
| RuntimeProfile::Counter* _spill_file_total_count = nullptr; |
| RuntimeProfile::Counter* _spill_file_current_count = nullptr; |
| // Spilled file total size |
| RuntimeProfile::Counter* _spill_file_total_size = nullptr; |
| // Current spilled file size |
| RuntimeProfile::Counter* _spill_file_current_size = nullptr; |
| |
| // Spill read counters |
| // Total time of recovering spilled data, including file read time, deserialization time, etc. |
| RuntimeProfile::Counter* _spill_recover_time = nullptr; |
| |
| RuntimeProfile::Counter* _spill_read_wait_in_queue_task_count = nullptr; |
| RuntimeProfile::Counter* _spill_reading_task_count = nullptr; |
| RuntimeProfile::Counter* _spill_read_wait_in_queue_timer = nullptr; |
| |
| RuntimeProfile::Counter* _spill_read_file_time = nullptr; |
| RuntimeProfile::Counter* _spill_read_derialize_block_timer = nullptr; |
| RuntimeProfile::Counter* _spill_read_block_count = nullptr; |
| // Total bytes of read data in Block format (in-memory format) |
| RuntimeProfile::Counter* _spill_read_block_data_size = nullptr; |
| // Total bytes of spill data read from disk file |
| RuntimeProfile::Counter* _spill_read_file_size = nullptr; |
| RuntimeProfile::Counter* _spill_read_rows_count = nullptr; |
| RuntimeProfile::Counter* _spill_read_file_count = nullptr; |
| |
| bool _copy_shared_spill_profile = true; |
| }; |
| |
| class DataSinkOperatorXBase; |
| |
| class PipelineXSinkLocalStateBase { |
| public: |
| PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent_, RuntimeState* state_); |
| virtual ~PipelineXSinkLocalStateBase() = default; |
| |
| // Do initialization. This step should be executed only once and in a bthread, so we can do some |
| // lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX). |
| virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0; |
| |
| // Do initialization. This step can be executed multiple times, so we should make sure it is |
| // idempotent (e.g. wait for runtime filters). |
| virtual Status open(RuntimeState* state) = 0; |
| virtual Status terminate(RuntimeState* state) = 0; |
| virtual Status close(RuntimeState* state, Status exec_status) = 0; |
| [[nodiscard]] virtual bool is_finished() const { return false; } |
| |
| [[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0; |
| |
| template <class TARGET> |
| TARGET& cast() { |
| DCHECK(dynamic_cast<TARGET*>(this)) |
| << " Mismatch type! Current type is " << typeid(*this).name() |
| << " and expected type is " << typeid(TARGET).name(); |
| return reinterpret_cast<TARGET&>(*this); |
| } |
| template <class TARGET> |
| const TARGET& cast() const { |
| DCHECK(dynamic_cast<const TARGET*>(this)) |
| << " Mismatch type! Current type is " << typeid(*this).name() |
| << " and expected type is " << typeid(TARGET).name(); |
| return reinterpret_cast<const TARGET&>(*this); |
| } |
| |
| DataSinkOperatorXBase* parent() { return _parent; } |
| RuntimeState* state() { return _state; } |
| RuntimeProfile* profile() { return _profile; } |
| [[nodiscard]] RuntimeProfile* faker_runtime_profile() const { |
| return _faker_runtime_profile.get(); |
| } |
| |
| RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; } |
| RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; } |
| RuntimeProfile::Counter* memory_used_counter() { return _memory_used_counter; } |
| |
| virtual std::vector<Dependency*> dependencies() const { return {nullptr}; } |
| |
| // Overridden in the exchange sink and AsyncWriterSink. |
| virtual Dependency* finishdependency() { return nullptr; } |
| virtual Dependency* spill_dependency() const { return nullptr; } |
| |
| bool low_memory_mode() { return _state->low_memory_mode(); } |
| |
| protected: |
| DataSinkOperatorXBase* _parent = nullptr; |
| RuntimeState* _state = nullptr; |
| RuntimeProfile* _profile = nullptr; |
| // Set to true after close() has been called. Subclasses should check and set this in |
| // close(). |
| bool _closed = false; |
| bool _terminated = false; |
| // NOTICE: a fake profile is added here because some profile records are useless and we want to |
| // remove certain counters and timers. For example, in a join node using a broadcast join with a |
| // shared hash table, the counters/timers for building the hash table are useless, so we register |
| // them on the fake profile and they will not be displayed in the web profile. |
| std::unique_ptr<RuntimeProfile> _faker_runtime_profile = |
| std::make_unique<RuntimeProfile>("faker profile"); |
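| |
| // Illustrative usage (hypothetical member and counter names): a timer that should not appear in |
| // the web profile can be registered on the fake profile instead of the real one: |
| // |
| //   _shared_hash_table_build_timer = ADD_TIMER(faker_runtime_profile(), "BuildHashTableTime"); |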
| |
| RuntimeProfile::Counter* _rows_input_counter = nullptr; |
| RuntimeProfile::Counter* _init_timer = nullptr; |
| RuntimeProfile::Counter* _open_timer = nullptr; |
| RuntimeProfile::Counter* _close_timer = nullptr; |
| RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr; |
| RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr; |
| RuntimeProfile::Counter* _exec_timer = nullptr; |
| RuntimeProfile::HighWaterMarkCounter* _memory_used_counter = nullptr; |
| }; |
| |
| template <typename SharedStateArg = FakeSharedState> |
| class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase { |
| public: |
| using SharedStateType = SharedStateArg; |
| PipelineXSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) |
| : PipelineXSinkLocalStateBase(parent, state) {} |
| ~PipelineXSinkLocalState() override = default; |
| |
| Status init(RuntimeState* state, LocalSinkStateInfo& info) override; |
| |
| Status open(RuntimeState* state) override { return Status::OK(); } |
| |
| Status terminate(RuntimeState* state) override; |
| Status close(RuntimeState* state, Status exec_status) override; |
| |
| [[nodiscard]] std::string debug_string(int indentation_level) const override; |
| |
| virtual std::string name_suffix(); |
| |
| std::vector<Dependency*> dependencies() const override { |
| return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {}; |
| } |
| Dependency* spill_dependency() const override { return _spill_dependency.get(); } |
| |
| virtual bool must_set_shared_state() const { |
| return !std::is_same_v<SharedStateArg, FakeSharedState>; |
| } |
| |
| protected: |
| Dependency* _dependency = nullptr; |
| std::shared_ptr<Dependency> _spill_dependency; |
| SharedStateType* _shared_state = nullptr; |
| }; |
| |
| class DataSinkOperatorXBase : public OperatorBase { |
| public: |
| DataSinkOperatorXBase(const int operator_id, const int node_id, const int dest_id) |
| : _operator_id(operator_id), _node_id(node_id), _dests_id({dest_id}) {} |
| DataSinkOperatorXBase(const int operator_id, const TPlanNode& tnode, const int dest_id) |
| : OperatorBase(tnode.__isset.is_serial_operator && tnode.is_serial_operator), |
| _operator_id(operator_id), |
| _node_id(tnode.node_id), |
| _dests_id({dest_id}) {} |
| |
| DataSinkOperatorXBase(const int operator_id, const int node_id, std::vector<int>& dests) |
| : _operator_id(operator_id), _node_id(node_id), _dests_id(dests) {} |
| |
| #ifdef BE_TEST |
| DataSinkOperatorXBase() : _operator_id(-1), _node_id(0), _dests_id({-1}) {}; |
| #endif |
| |
| ~DataSinkOperatorXBase() override = default; |
| |
| // For agg/sort/join sink. |
| virtual Status init(const TPlanNode& tnode, RuntimeState* state); |
| |
| Status init(const TDataSink& tsink) override; |
| [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets, |
| const bool use_global_hash_shuffle, |
| const std::map<int, int>& shuffle_idx_to_instance_idx) { |
| return Status::InternalError("init() is only implemented in local exchange!"); |
| } |
| |
| Status prepare(RuntimeState* state) override { return Status::OK(); } |
| Status terminate(RuntimeState* state) override; |
| [[nodiscard]] bool is_finished(RuntimeState* state) const { |
| auto result = state->get_sink_local_state_result(); |
| if (!result) { |
| return result.error(); |
| } |
| return result.value()->is_finished(); |
| } |
| |
| [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos) = 0; |
| |
| [[nodiscard]] virtual Status setup_local_state(RuntimeState* state, |
| LocalSinkStateInfo& info) = 0; |
| |
| [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state, bool eos) { |
| return state->minimum_operator_memory_required_bytes(); |
| } |
| |
| [[nodiscard]] bool is_spillable() const { return _spillable; } |
| |
| template <class TARGET> |
| TARGET& cast() { |
| DCHECK(dynamic_cast<TARGET*>(this)) |
| << " Mismatch type! Current type is " << typeid(*this).name() |
| << " and expected type is " << typeid(TARGET).name(); |
| return reinterpret_cast<TARGET&>(*this); |
| } |
| template <class TARGET> |
| const TARGET& cast() const { |
| DCHECK(dynamic_cast<const TARGET*>(this)) |
| << " Mismatch type! Current type is " << typeid(*this).name() |
| << " and expected type is " << typeid(TARGET).name(); |
| return reinterpret_cast<const TARGET&>(*this); |
| } |
| |
| [[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0; |
| |
| Status close(RuntimeState* state) override { |
| return Status::InternalError("Should not reach here!"); |
| } |
| |
| [[nodiscard]] virtual std::string debug_string(int indentation_level) const; |
| |
| [[nodiscard]] virtual std::string debug_string(RuntimeState* state, |
| int indentation_level) const; |
| |
| [[nodiscard]] bool is_sink() const override { return true; } |
| |
| static Status close(RuntimeState* state, Status exec_status) { |
| auto result = state->get_sink_local_state_result(); |
| if (!result) { |
| return result.error(); |
| } |
| return result.value()->close(state, exec_status); |
| } |
| |
| [[nodiscard]] int operator_id() const { return _operator_id; } |
| |
| [[nodiscard]] const std::vector<int>& dests_id() const { return _dests_id; } |
| |
| [[nodiscard]] int nereids_id() const { return _nereids_id; } |
| |
| [[nodiscard]] int node_id() const override { return _node_id; } |
| |
| [[nodiscard]] std::string get_name() const override { return _name; } |
| |
| virtual bool should_dry_run(RuntimeState* state) { return false; } |
| |
| [[nodiscard]] virtual bool count_down_destination() { return true; } |
| |
| protected: |
| template <typename Writer, typename Parent> |
| requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) |
| friend class AsyncWriterSink; |
| // _operator_id : the current Operator's ID, which is not visible to the user. |
| // _node_id : the plan node ID corresponding to the Operator, which is visible on the profile. |
| // _dests_id : the target _operator_id(s) of this sink; for example, a multi-sink has multiple targets. |
| const int _operator_id; |
| const int _node_id; |
| int _nereids_id = -1; |
| bool _spillable = false; |
| std::vector<int> _dests_id; |
| std::string _name; |
| }; |
| |
| template <typename LocalStateType> |
| class DataSinkOperatorX : public DataSinkOperatorXBase { |
| public: |
| DataSinkOperatorX(const int id, const int node_id, const int dest_id) |
| : DataSinkOperatorXBase(id, node_id, dest_id) {} |
| DataSinkOperatorX(const int id, const TPlanNode& tnode, const int dest_id) |
| : DataSinkOperatorXBase(id, tnode, dest_id) {} |
| |
| DataSinkOperatorX(const int id, const int node_id, std::vector<int> dest_ids) |
| : DataSinkOperatorXBase(id, node_id, dest_ids) {} |
| #ifdef BE_TEST |
| DataSinkOperatorX() = default; |
| #endif |
| ~DataSinkOperatorX() override = default; |
| |
| Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override; |
| std::shared_ptr<BasicSharedState> create_shared_state() const override; |
| |
| using LocalState = LocalStateType; |
| [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const { |
| return state->get_sink_local_state()->template cast<LocalState>(); |
| } |
| }; |
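| |
| // Illustrative usage (a minimal sketch; `MySinkOperatorX` and `MySinkLocalState` are hypothetical): |
| // a sink implementation usually starts sink() by fetching its typed local state and timing the |
| // call, then consumes the input block: |
| // |
| //   Status MySinkOperatorX::sink(RuntimeState* state, vectorized::Block* block, bool eos) { |
| //       auto& local_state = get_local_state(state); |
| //       SCOPED_TIMER(local_state.exec_time_counter()); |
| //       COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)block->rows()); |
| //       // ... consume `block`, finalize when `eos` is true ... |
| //       return Status::OK(); |
| //   } |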
| |
| template <typename SharedStateArg> |
| class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateArg> { |
| public: |
| using Base = PipelineXSinkLocalState<SharedStateArg>; |
| PipelineXSpillSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) |
| : Base(parent, state) {} |
| ~PipelineXSpillSinkLocalState() override = default; |
| |
| Status init(RuntimeState* state, LocalSinkStateInfo& info) override { |
| RETURN_IF_ERROR(Base::init(state, info)); |
| init_spill_counters(); |
| return Status::OK(); |
| } |
| |
| void init_spill_counters() { |
| _spill_total_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillTotalTime", 1); |
| |
| _spill_write_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTime", 1); |
| |
| _spill_write_wait_in_queue_task_count = ADD_COUNTER_WITH_LEVEL( |
| Base::profile(), "SpillWriteTaskWaitInQueueCount", TUnit::UNIT, 1); |
| _spill_writing_task_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteTaskCount", TUnit::UNIT, 1); |
| _spill_write_wait_in_queue_timer = |
| ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteTaskWaitInQueueTime", 1); |
| |
| _spill_write_file_timer = ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteFileTime", 1); |
| |
| _spill_write_serialize_block_timer = |
| ADD_TIMER_WITH_LEVEL(Base::profile(), "SpillWriteSerializeBlockTime", 1); |
| _spill_write_block_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockCount", TUnit::UNIT, 1); |
| _spill_write_block_data_size = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteBlockBytes", TUnit::BYTES, 1); |
| _spill_write_rows_count = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillWriteRows", TUnit::UNIT, 1); |
| |
| _spill_max_rows_of_partition = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMaxRowsOfPartition", TUnit::UNIT, 1); |
| _spill_min_rows_of_partition = |
| ADD_COUNTER_WITH_LEVEL(Base::profile(), "SpillMinRowsOfPartition", TUnit::UNIT, 1); |
| } |
| |
| std::vector<Dependency*> dependencies() const override { |
| auto dependencies = Base::dependencies(); |
| return dependencies; |
| } |
| |
| void update_max_min_rows_counter() { |
| int64_t max_rows = 0; |
| int64_t min_rows = std::numeric_limits<int64_t>::max(); |
| |
| for (auto rows : _rows_in_partitions) { |
| if (rows > max_rows) { |
| max_rows = rows; |
| } |
| if (rows < min_rows) { |
| min_rows = rows; |
| } |
| } |
| |
| COUNTER_SET(_spill_max_rows_of_partition, max_rows); |
| COUNTER_SET(_spill_min_rows_of_partition, min_rows); |
| } |
| |
| std::vector<int64_t> _rows_in_partitions; |
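| |
| // Illustrative usage (a minimal sketch; `partition_count` and `partition_idx` are hypothetical): |
| // a partitioned spilling sink typically sizes _rows_in_partitions to its partition count, |
| // accumulates the per-partition row counts while sinking, and refreshes the max/min counters |
| // when it spills: |
| // |
| //   _rows_in_partitions.resize(partition_count, 0); |
| //   _rows_in_partitions[partition_idx] += block->rows(); |
| //   update_max_min_rows_counter(); |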
| |
| // Total time of spill, including spill task scheduling time, |
| // block serialization time, disk file write time, |
| // disk file read time, block deserialization time, etc. |
| RuntimeProfile::Counter* _spill_total_timer = nullptr; |
| |
| // Spill write counters |
| // Total time of spill writes, including block serialization time, disk file write time, |
| // queue wait time, etc. |
| RuntimeProfile::Counter* _spill_write_timer = nullptr; |
| |
| RuntimeProfile::Counter* _spill_write_wait_in_queue_task_count = nullptr; |
| RuntimeProfile::Counter* _spill_writing_task_count = nullptr; |
| RuntimeProfile::Counter* _spill_write_wait_in_queue_timer = nullptr; |
| |
| // Total time of writing file |
| RuntimeProfile::Counter* _spill_write_file_timer = nullptr; |
| RuntimeProfile::Counter* _spill_write_serialize_block_timer = nullptr; |
| // Original count of spilled Blocks. |
| // One big Block may be split into multiple small Blocks when actually written to the disk file. |
| RuntimeProfile::Counter* _spill_write_block_count = nullptr; |
| // Total bytes of spill data in Block format (in-memory format) |
| RuntimeProfile::Counter* _spill_write_block_data_size = nullptr; |
| RuntimeProfile::Counter* _spill_write_rows_count = nullptr; |
| // Spilled file total size |
| RuntimeProfile::Counter* _spill_file_total_size = nullptr; |
| |
| RuntimeProfile::Counter* _spill_max_rows_of_partition = nullptr; |
| RuntimeProfile::Counter* _spill_min_rows_of_partition = nullptr; |
| }; |
| |
| class OperatorXBase : public OperatorBase { |
| public: |
| OperatorXBase(ObjectPool* pool, const TPlanNode& tnode, const int operator_id, |
| const DescriptorTbl& descs) |
| : OperatorBase(tnode.__isset.is_serial_operator && tnode.is_serial_operator), |
| _operator_id(operator_id), |
| _node_id(tnode.node_id), |
| _type(tnode.node_type), |
| _pool(pool), |
| _tuple_ids(tnode.row_tuples), |
| _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples), |
| _resource_profile(tnode.resource_profile), |
| _limit(tnode.limit) { |
| if (tnode.__isset.output_tuple_id) { |
| LOG_INFO("Operator {}, node_id {}, output_tuple_id {}", this->_op_name, tnode.node_id, |
| tnode.output_tuple_id); |
| _output_row_descriptor.reset(new RowDescriptor(descs, {tnode.output_tuple_id}, {true})); |
| } |
| if (!tnode.intermediate_output_tuple_id_list.empty()) { |
| LOG_INFO("Operator {}, node_id {}, intermediate_output_tuple_id_list: [{}]", |
| this->_op_name, tnode.node_id, |
| fmt::join(tnode.intermediate_output_tuple_id_list, ",")); |
| // common subexpression elimination |
| _intermediate_output_row_descriptor.reserve( |
| tnode.intermediate_output_tuple_id_list.size()); |
| for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) { |
| _intermediate_output_row_descriptor.push_back( |
| RowDescriptor(descs, std::vector {output_tuple_id}, std::vector {true})); |
| } |
| } |
| } |
| |
| OperatorXBase(ObjectPool* pool, int node_id, int operator_id) |
| : OperatorBase(), |
| _operator_id(operator_id), |
| _node_id(node_id), |
| _pool(pool), |
| _limit(-1) {} |
| |
| #ifdef BE_TEST |
| OperatorXBase() : _operator_id(-1), _node_id(0), _limit(-1) {}; |
| #endif |
| virtual Status init(const TPlanNode& tnode, RuntimeState* state); |
| Status init(const TDataSink& tsink) override { |
| throw Exception(Status::FatalError("should not reach here!")); |
| } |
| virtual Status init(ExchangeType type) { |
| throw Exception(Status::FatalError("should not reach here!")); |
| } |
| [[noreturn]] virtual const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() { |
| throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name); |
| } |
| [[nodiscard]] std::string get_name() const override { return _op_name; } |
| [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const { return true; } |
| |
| // Tablets should be held before the open phase. |
| [[nodiscard]] virtual Status hold_tablets(RuntimeState* state) { return Status::OK(); } |
| Status prepare(RuntimeState* state) override; |
| |
| Status terminate(RuntimeState* state) override; |
| [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block, |
| bool* eos) = 0; |
| |
| Status close(RuntimeState* state) override; |
| |
| [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const { |
| return _row_descriptor; |
| } |
| |
| [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) { |
| if (idx == 0) { |
| return intermediate_row_desc(); |
| } |
| DCHECK((idx - 1) < _intermediate_output_row_descriptor.size()); |
| return _intermediate_output_row_descriptor[idx - 1]; |
| } |
| |
| [[nodiscard]] const RowDescriptor& projections_row_desc() const { |
| if (_intermediate_output_row_descriptor.empty()) { |
| return intermediate_row_desc(); |
| } else { |
| return _intermediate_output_row_descriptor.back(); |
| } |
| } |
| |
| size_t revocable_mem_size(RuntimeState* state) const override { |
| return (_child && !is_source()) ? _child->revocable_mem_size(state) : 0; |
| } |
| |
| // If this method is not overridden by a child class, its default value is 1MB. |
| [[nodiscard]] virtual size_t get_reserve_mem_size(RuntimeState* state) { |
| return state->minimum_operator_memory_required_bytes(); |
| } |
| |
| virtual std::string debug_string(int indentation_level = 0) const; |
| |
| virtual std::string debug_string(RuntimeState* state, int indentation_level = 0) const; |
| |
| virtual Status setup_local_state(RuntimeState* state, LocalStateInfo& info) = 0; |
| |
| template <class TARGET> |
| TARGET& cast() { |
| DCHECK(dynamic_cast<TARGET*>(this)) |
| << " Mismatch type! Current type is " << typeid(*this).name() |
| << " and expected type is " << typeid(TARGET).name(); |
| return reinterpret_cast<TARGET&>(*this); |
| } |
| template <class TARGET> |
| const TARGET& cast() const { |
| DCHECK(dynamic_cast<const TARGET*>(this)) |
| << " Mismatch type! Current type is " << typeid(*this).name() |
| << " and expected type is " << typeid(TARGET).name(); |
| return reinterpret_cast<const TARGET&>(*this); |
| } |
| |
| [[nodiscard]] OperatorPtr get_child() { return _child; } |
| |
| [[nodiscard]] vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; } |
| [[nodiscard]] virtual RowDescriptor& row_descriptor() { return _row_descriptor; } |
| |
| [[nodiscard]] int operator_id() const { return _operator_id; } |
| [[nodiscard]] int node_id() const override { return _node_id; } |
| [[nodiscard]] int nereids_id() const { return _nereids_id; } |
| |
| [[nodiscard]] int64_t limit() const { return _limit; } |
| |
| [[nodiscard]] const RowDescriptor& row_desc() const override { |
| return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor; |
| } |
| |
| [[nodiscard]] const RowDescriptor* output_row_descriptor() { |
| return _output_row_descriptor.get(); |
| } |
| |
| bool has_output_row_desc() const { return _output_row_descriptor != nullptr; } |
| |
| [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state, |
| vectorized::Block* block, bool* eos); |
| |
| /// Only used in the vectorized exec engine: try to do projections to transform _row_desc -> _output_row_desc |
| Status do_projections(RuntimeState* state, vectorized::Block* origin_block, |
| vectorized::Block* output_block) const; |
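| // A rough sketch of how these two calls fit together (simplified; see the .cpp for details): |
| // when an output row descriptor is present, get_block_after_projects() first materializes the |
| // operator's own output into _origin_block and then projects it into the caller's block: |
| // |
| //   if (_output_row_descriptor) { |
| //       local_state->clear_origin_block(); |
| //       RETURN_IF_ERROR(get_block(state, &local_state->_origin_block, eos)); |
| //       return do_projections(state, &local_state->_origin_block, block); |
| //   } |
| //   return get_block(state, block, eos); |
| |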
| void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; } |
| int parallel_tasks() const { return _parallel_tasks; } |
| |
| // To keep compatibility with older FE |
| void set_serial_operator() { _is_serial_operator = true; } |
| |
| virtual void reset_reserve_mem_size(RuntimeState* state) {} |
| |
| protected: |
| template <typename Dependency> |
| friend class PipelineXLocalState; |
| friend class PipelineXLocalStateBase; |
| friend class Scanner; |
| const int _operator_id; |
| const int _node_id; // unique w/in single plan tree |
| int _nereids_id = -1; |
| TPlanNodeType::type _type; |
| ObjectPool* _pool = nullptr; |
| std::vector<TupleId> _tuple_ids; |
| |
| private: |
| // The operator's exprs are private because they cannot be executed concurrently; |
| // the local state's exprs should be used instead. |
| vectorized::VExprContextSPtrs _conjuncts; |
| vectorized::VExprContextSPtrs _projections; |
| // Used in common subexpression elimination to compute intermediate results. |
| std::vector<vectorized::VExprContextSPtrs> _intermediate_projections; |
| |
| protected: |
| RowDescriptor _row_descriptor; |
| std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr; |
| std::vector<RowDescriptor> _intermediate_output_row_descriptor; |
| |
| /// Resource information sent from the frontend. |
| const TBackendResourceProfile _resource_profile; |
| |
| int64_t _limit; // -1: no limit |
| |
| uint32_t _debug_point_count = 0; |
| std::atomic_uint32_t _bytes_per_row = 0; |
| |
| std::string _op_name; |
| int _parallel_tasks = 0; |
| |
| // _keep_origin is used to avoid copying during projection; |
| // currently it is set to false only in the nested-loop join. |
| bool _keep_origin = true; |
| }; |
| |
| template <typename LocalStateType> |
| class OperatorX : public OperatorXBase { |
| public: |
| OperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id, |
| const DescriptorTbl& descs) |
| : OperatorXBase(pool, tnode, operator_id, descs) {} |
| OperatorX(ObjectPool* pool, int node_id, int operator_id) |
| : OperatorXBase(pool, node_id, operator_id) {}; |
| |
| #ifdef BE_TEST |
| OperatorX() = default; |
| #endif |
| |
| ~OperatorX() override = default; |
| |
| Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override; |
| using LocalState = LocalStateType; |
| [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const { |
| return state->get_local_state(operator_id())->template cast<LocalState>(); |
| } |
| |
| size_t get_reserve_mem_size(RuntimeState* state) override { |
| auto& local_state = get_local_state(state); |
| auto estimated_size = local_state.estimate_memory_usage(); |
| if (estimated_size < state->minimum_operator_memory_required_bytes()) { |
| estimated_size = state->minimum_operator_memory_required_bytes(); |
| } |
| if (!is_source() && _child) { |
| auto child_reserve_size = _child->get_reserve_mem_size(state); |
| estimated_size += |
| std::max(state->minimum_operator_memory_required_bytes(), child_reserve_size); |
| } |
| return estimated_size; |
| } |
| |
| void reset_reserve_mem_size(RuntimeState* state) override { |
| auto& local_state = get_local_state(state); |
| local_state.reset_estimate_memory_usage(); |
| |
| if (!is_source() && _child) { |
| _child->reset_reserve_mem_size(state); |
| } |
| } |
| }; |
| |
| /** |
| * StreamingOperatorX indicates operators which always process blocks in a streaming way (one in, one out). |
| */ |
| template <typename LocalStateType> |
| class StreamingOperatorX : public OperatorX<LocalStateType> { |
| public: |
| StreamingOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id, |
| const DescriptorTbl& descs) |
| : OperatorX<LocalStateType>(pool, tnode, operator_id, descs) {} |
| |
| #ifdef BE_TEST |
| StreamingOperatorX() = default; |
| #endif |
| |
| virtual ~StreamingOperatorX() = default; |
| |
| Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override; |
| |
| virtual Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) = 0; |
| }; |
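| |
| // A rough sketch of the streaming contract (simplified; the real implementation lives in the |
| // .cpp): get_block() fetches one block from the child and hands it to pull() in place, so every |
| // input block produces exactly one output block: |
| // |
| //   RETURN_IF_ERROR(_child->get_block_after_projects(state, block, eos)); |
| //   return pull(state, block, eos); |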
| |
| /** |
| * StatefulOperatorX indicates the operators with some states inside. |
| * |
| * Specifically, we call an operator stateful if it can determine its output by itself. |
| * For example, the hash join probe operator is a typical StatefulOperator. When it gets a block from the probe side, it holds this block inside (e.g. _child_block). |
| * If there are still remaining rows in the probe block, we can get an output block by calling `get_block` without any new data from its child. |
| * In a nutshell, there is a one-to-many relation between input blocks and output blocks for a StatefulOperator. |
| */ |
| template <typename LocalStateType> |
| class StatefulOperatorX : public OperatorX<LocalStateType> { |
| public: |
| StatefulOperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id, |
| const DescriptorTbl& descs) |
| : OperatorX<LocalStateType>(pool, tnode, operator_id, descs) {} |
| #ifdef BE_TEST |
| StatefulOperatorX() = default; |
| #endif |
| virtual ~StatefulOperatorX() = default; |
| |
| using OperatorX<LocalStateType>::get_local_state; |
| |
| [[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* block, |
| bool* eos) override; |
| |
| [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* block, |
| bool* eos) const = 0; |
| [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block, |
| bool eos) const = 0; |
| bool need_more_input_data(RuntimeState* state) const override { return true; } |
| }; |
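| |
| // A rough sketch of the push/pull contract (simplified; the real implementation lives in the |
| // .cpp, and `child_block`/`child_eos` stand in for the local state's buffered child block and |
| // its eos flag): while the operator asks for more input, a child block is fetched and push()ed |
| // into the operator's internal state; once it has enough data, pull() drains the buffered |
| // output, so one input block may yield many output blocks: |
| // |
| //   if (need_more_input_data(state)) { |
| //       RETURN_IF_ERROR(_child->get_block_after_projects(state, child_block, &child_eos)); |
| //       RETURN_IF_ERROR(push(state, child_block, child_eos)); |
| //   } |
| //   if (!need_more_input_data(state)) { |
| //       RETURN_IF_ERROR(pull(state, block, eos)); |
| //   } |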
| |
| template <typename Writer, typename Parent> |
| requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>) |
| class AsyncWriterSink : public PipelineXSinkLocalState<BasicSharedState> { |
| public: |
| using Base = PipelineXSinkLocalState<BasicSharedState>; |
| AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state) |
| : Base(parent, state), _async_writer_dependency(nullptr) { |
| _finish_dependency = |
| std::make_shared<Dependency>(parent->operator_id(), parent->node_id(), |
| parent->get_name() + "_FINISH_DEPENDENCY", true); |
| } |
| |
| Status init(RuntimeState* state, LocalSinkStateInfo& info) override; |
| |
| Status open(RuntimeState* state) override; |
| |
| Status sink(RuntimeState* state, vectorized::Block* block, bool eos); |
| |
| std::vector<Dependency*> dependencies() const override { |
| return {_async_writer_dependency.get()}; |
| } |
| Status close(RuntimeState* state, Status exec_status) override; |
| |
| Dependency* finishdependency() override { return _finish_dependency.get(); } |
| |
| protected: |
| vectorized::VExprContextSPtrs _output_vexpr_ctxs; |
| std::unique_ptr<Writer> _writer; |
| |
| std::shared_ptr<Dependency> _async_writer_dependency; |
| std::shared_ptr<Dependency> _finish_dependency; |
| }; |
| |
| #ifdef BE_TEST |
| class DummyOperatorLocalState final : public PipelineXLocalState<FakeSharedState> { |
| public: |
| ENABLE_FACTORY_CREATOR(DummyOperatorLocalState); |
| |
| DummyOperatorLocalState(RuntimeState* state, OperatorXBase* parent) |
| : PipelineXLocalState<FakeSharedState>(state, parent) { |
| _tmp_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), |
| "DummyOperatorDependency", true); |
| _finish_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), |
| "DummyOperatorDependency", true); |
| _filter_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), |
| "DummyOperatorDependency", true); |
| _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), |
| "DummyOperatorDependency", true); |
| } |
| Dependency* finishdependency() override { return _finish_dependency.get(); } |
| ~DummyOperatorLocalState() = default; |
| |
| std::vector<Dependency*> dependencies() const override { return {_tmp_dependency.get()}; } |
| std::vector<Dependency*> filter_dependencies() override { return {_filter_dependency.get()}; } |
| Dependency* spill_dependency() const override { return _spill_dependency.get(); } |
| |
| private: |
| std::shared_ptr<Dependency> _tmp_dependency; |
| std::shared_ptr<Dependency> _finish_dependency; |
| std::shared_ptr<Dependency> _filter_dependency; |
| }; |
| |
| class DummyOperator final : public OperatorX<DummyOperatorLocalState> { |
| public: |
| DummyOperator() : OperatorX<DummyOperatorLocalState>(nullptr, 0, 0) {} |
| |
| [[nodiscard]] bool is_source() const override { return true; } |
| |
| Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override { |
| *eos = _eos; |
| return Status::OK(); |
| } |
| void set_low_memory_mode(RuntimeState* state) override { _low_memory_mode = true; } |
| Status terminate(RuntimeState* state) override { |
| _terminated = true; |
| return Status::OK(); |
| } |
| size_t revocable_mem_size(RuntimeState* state) const override { return _revocable_mem_size; } |
| size_t get_reserve_mem_size(RuntimeState* state) override { |
| return _disable_reserve_mem |
| ? 0 |
| : OperatorX<DummyOperatorLocalState>::get_reserve_mem_size(state); |
| } |
| |
| private: |
| friend class AssertNumRowsLocalState; |
| bool _eos = false; |
| bool _low_memory_mode = false; |
| bool _terminated = false; |
| size_t _revocable_mem_size = 0; |
| bool _disable_reserve_mem = false; |
| }; |
| |
| class DummySinkLocalState final : public PipelineXSinkLocalState<BasicSharedState> { |
| public: |
| using Base = PipelineXSinkLocalState<BasicSharedState>; |
| ENABLE_FACTORY_CREATOR(DummySinkLocalState); |
| DummySinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) : Base(parent, state) { |
| _tmp_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), |
| "DummyOperatorDependency", true); |
| _finish_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), |
| "DummyOperatorDependency", true); |
| _spill_dependency = Dependency::create_shared(_parent->operator_id(), _parent->node_id(), |
| "DummyOperatorDependency", true); |
| } |
| |
| std::vector<Dependency*> dependencies() const override { return {_tmp_dependency.get()}; } |
| Dependency* finishdependency() override { return _finish_dependency.get(); } |
| Dependency* spill_dependency() const override { return _spill_dependency.get(); } |
| bool is_finished() const override { return _is_finished; } |
| |
| private: |
| std::shared_ptr<Dependency> _tmp_dependency; |
| std::shared_ptr<Dependency> _finish_dependency; |
| std::atomic_bool _is_finished = false; |
| }; |
| |
| class DummySinkOperatorX final : public DataSinkOperatorX<DummySinkLocalState> { |
| public: |
| DummySinkOperatorX(int op_id, int node_id, int dest_id) |
| : DataSinkOperatorX<DummySinkLocalState>(op_id, node_id, dest_id) {} |
| Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override { |
| return _return_eof ? Status::Error<ErrorCode::END_OF_FILE>("source has closed") |
| : Status::OK(); |
| } |
| void set_low_memory_mode(RuntimeState* state) override { _low_memory_mode = true; } |
| Status terminate(RuntimeState* state) override { |
| _terminated = true; |
| return Status::OK(); |
| } |
| size_t revocable_mem_size(RuntimeState* state) const override { return _revocable_mem_size; } |
| size_t get_reserve_mem_size(RuntimeState* state, bool eos) override { |
| return _disable_reserve_mem |
| ? 0 |
| : DataSinkOperatorX<DummySinkLocalState>::get_reserve_mem_size(state, eos); |
| } |
| |
| private: |
| bool _low_memory_mode = false; |
| bool _terminated = false; |
| std::atomic_bool _return_eof = false; |
| size_t _revocable_mem_size = 0; |
| bool _disable_reserve_mem = false; |
| }; |
| #endif |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris::pipeline |