blob: b470c9237e84027b27e32818aaa64d62e4a68887 [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
#pragma once
#include "common/logging.h"
#include "pipeline/exec/operator.h"
#include "pipeline/pipeline_x/dependency.h"
#include "pipeline/pipeline_x/local_exchange/local_exchanger.h"
// Forward declaration of AsyncResultWriter. This header only names the type, in the
// `requires` clauses attached to AsyncWriterSink and to its friend declaration.
namespace doris::vectorized {
class AsyncResultWriter;
}
namespace doris::pipeline {
// This struct is used only for initializing local state.
// It is passed to PipelineXLocalStateBase::init() and OperatorXBase::setup_local_state().
// It is an aggregate (no constructors), so positional brace-initialization depends on
// the declaration order of the fields below; do not reorder them.
struct LocalStateInfo {
// NOTE(review): presumably the profile under which the local state's own profile is
// attached — confirm against PipelineXLocalState::init().
RuntimeProfile* parent_profile = nullptr;
// Held by value (a copy of the caller's scan ranges).
const std::vector<TScanRangeParams> scan_ranges;
// Shared state for this operator. It has no default initializer, but omitted members of
// an aggregate brace-initialization are value-initialized (nullptr).
BasicSharedState* shared_state;
// Local-exchange shared states and their dependencies, keyed by an int
// (NOTE(review): presumably an operator id — verify against the callers).
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
le_state_map;
// NOTE(review): presumably the index of the pipeline task being set up — confirm.
const int task_idx;
};
// This struct is used only for initializing local sink state.
// It is passed to PipelineXSinkLocalStateBase::init() and
// DataSinkOperatorXBase::setup_local_state(). It is an aggregate, so positional
// brace-initialization depends on the field order below; do not reorder.
struct LocalSinkStateInfo {
// NOTE(review): presumably the index of the pipeline task being set up — confirm.
const int task_idx;
RuntimeProfile* parent_profile = nullptr;
// NOTE(review): meaning of sender_id is not visible in this header — confirm with callers.
const int sender_id;
BasicSharedState* shared_state;
// Local-exchange shared states and their dependencies, keyed by an int
// (NOTE(review): presumably an operator id — verify against the callers).
std::map<int, std::pair<std::shared_ptr<LocalExchangeSharedState>, std::shared_ptr<Dependency>>>
le_state_map;
// Reference member: this struct must not outlive the referenced TDataSink.
const TDataSink& tsink;
};
class PipelineXLocalStateBase {
public:
PipelineXLocalStateBase(RuntimeState* state, OperatorXBase* parent);
virtual ~PipelineXLocalStateBase() = default;
template <class TARGET>
TARGET& cast() {
DCHECK(dynamic_cast<TARGET*>(this))
<< " Mismatch type! Current type is " << typeid(*this).name()
<< " and expect type is" << typeid(TARGET).name();
return reinterpret_cast<TARGET&>(*this);
}
template <class TARGET>
const TARGET& cast() const {
DCHECK(dynamic_cast<TARGET*>(this))
<< " Mismatch type! Current type is " << typeid(*this).name()
<< " and expect type is" << typeid(TARGET).name();
return reinterpret_cast<const TARGET&>(*this);
}
// Do initialization. This step should be executed only once and in bthread, so we can do some
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
virtual Status init(RuntimeState* state, LocalStateInfo& info) = 0;
// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state) = 0;
// If use projection, we should clear `_origin_block`.
void clear_origin_block();
[[nodiscard]] bool reached_limit() const;
void reached_limit(vectorized::Block* block, bool* eos);
RuntimeProfile* profile() { return _runtime_profile.get(); }
MemTracker* mem_tracker() { return _mem_tracker.get(); }
RuntimeProfile::Counter* rows_returned_counter() { return _rows_returned_counter; }
RuntimeProfile::Counter* blocks_returned_counter() { return _blocks_returned_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
OperatorXBase* parent() { return _parent; }
RuntimeState* state() { return _state; }
vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
vectorized::VExprContextSPtrs& projections() { return _projections; }
[[nodiscard]] int64_t num_rows_returned() const { return _num_rows_returned; }
void add_num_rows_returned(int64_t delta) { _num_rows_returned += delta; }
void set_num_rows_returned(int64_t value) { _num_rows_returned = value; }
[[nodiscard]] virtual std::string debug_string(int indentation_level = 0) const = 0;
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
// override in Scan
virtual Dependency* finishdependency() { return nullptr; }
// override in Scan MultiCastSink
virtual std::vector<Dependency*> filter_dependencies() { return {}; }
std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }
protected:
friend class OperatorXBase;
ObjectPool* _pool = nullptr;
int64_t _num_rows_returned {0};
std::unique_ptr<RuntimeProfile> _runtime_profile;
// Record this node memory size. it is expected that artificial guarantees are accurate,
// which will providea reference for operator memory.
std::unique_ptr<MemTracker> _mem_tracker;
std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
RuntimeProfile::Counter* _rows_returned_counter = nullptr;
RuntimeProfile::Counter* _blocks_returned_counter = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _projection_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
// Account for peak memory used by this node
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
RuntimeProfile::Counter* _init_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
OperatorXBase* _parent = nullptr;
RuntimeState* _state = nullptr;
vectorized::VExprContextSPtrs _conjuncts;
vectorized::VExprContextSPtrs _projections;
// Used in common subexpression elimination to compute intermediate results.
std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;
bool _closed = false;
vectorized::Block _origin_block;
};
// Shared (per plan node) description of a pipelineX operator. Per-task mutable data lives
// in the matching local state, which is created through setup_local_state().
class OperatorXBase : public OperatorBase {
public:
    // Builds the operator from its thrift plan node.
    // - If the node sets `output_tuple_id`, an output row descriptor (the layout after the
    //   final projection) is created.
    // - If the node carries intermediate output tuples (common subexpression elimination),
    //   one row descriptor is created per intermediate tuple.
    OperatorXBase(ObjectPool* pool, const TPlanNode& tnode, const int operator_id,
                  const DescriptorTbl& descs)
            : OperatorBase(nullptr),
              _operator_id(operator_id),
              _node_id(tnode.node_id),
              _type(tnode.node_type),
              _pool(pool),
              _tuple_ids(tnode.row_tuples),
              _row_descriptor(descs, tnode.row_tuples, tnode.nullable_tuples),
              _resource_profile(tnode.resource_profile),
              _limit(tnode.limit) {
        // Build the output row descriptor exactly once.
        if (tnode.__isset.output_tuple_id) {
            _output_row_descriptor = std::make_unique<RowDescriptor>(
                    descs, std::vector {tnode.output_tuple_id}, std::vector {true});
        }
        if (!tnode.intermediate_output_tuple_id_list.empty()) {
            DCHECK(tnode.__isset.output_tuple_id) << " no final output tuple id";
            // common subexpression elimination
            DCHECK_EQ(tnode.intermediate_output_tuple_id_list.size(),
                      tnode.intermediate_projections_list.size());
            _intermediate_output_row_descriptor.reserve(
                    tnode.intermediate_output_tuple_id_list.size());
            for (auto output_tuple_id : tnode.intermediate_output_tuple_id_list) {
                _intermediate_output_row_descriptor.emplace_back(
                        descs, std::vector {output_tuple_id}, std::vector {true});
            }
        }
    }

    // Constructor that takes no TPlanNode; `_limit` is set to -1 (no limit).
    // NOTE(review): `_type` is left uninitialized by this constructor; confirm that no
    // operator built this way reads it.
    OperatorXBase(ObjectPool* pool, int node_id, int operator_id)
            : OperatorBase(nullptr),
              _operator_id(operator_id),
              _node_id(node_id),
              _pool(pool),
              _limit(-1) {}

    virtual Status init(const TPlanNode& tnode, RuntimeState* state);
    Status init(const TDataSink& tsink) override {
        LOG(FATAL) << "should not reach here!";
        return Status::OK();
    }
    virtual Status init(ExchangeType type) {
        LOG(FATAL) << "should not reach here!";
        return Status::OK();
    }
    [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                               "Runtime Profile is not owned by operator");
        return nullptr;
    }
    [[noreturn]] virtual const std::vector<TRuntimeFilterDesc>& runtime_filter_descs() {
        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, _op_name);
    }
    [[nodiscard]] std::string get_name() const override { return _op_name; }

    // PASSTHROUGH when a non-source operator's child ignores data distribution, NOOP otherwise.
    [[nodiscard]] virtual DataDistribution required_data_distribution() const {
        return _child_x && _child_x->ignore_data_distribution() && !is_source()
                       ? DataDistribution(ExchangeType::PASSTHROUGH)
                       : DataDistribution(ExchangeType::NOOP);
    }
    [[nodiscard]] virtual bool need_data_from_children(RuntimeState* state) const {
        return is_source() ? true : _child_x == nullptr || _child_x->need_data_from_children(state);
    }
    // Delegates down the child chain; the leaf reports its own flag.
    [[nodiscard]] virtual bool ignore_data_distribution() const {
        return _child_x ? _child_x->ignore_data_distribution() : _ignore_data_distribution;
    }
    [[nodiscard]] bool ignore_data_hash_distribution() const {
        return _child_x ? _child_x->ignore_data_hash_distribution() : _ignore_data_distribution;
    }
    void set_ignore_data_distribution() { _ignore_data_distribution = true; }

    Status prepare(RuntimeState* state) override;

    Status open(RuntimeState* state) override;

    Status get_block(RuntimeState* state, vectorized::Block* block,
                     SourceState& source_state) override {
        LOG(FATAL) << "should not be called in pipelineX";
        return Status::OK();
    }

    [[nodiscard]] virtual Status get_block(RuntimeState* state, vectorized::Block* block,
                                           bool* eos) = 0;

    [[nodiscard]] bool can_terminate_early() override { return false; }

    [[nodiscard]] virtual bool can_terminate_early(RuntimeState* state) { return false; }

    [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

    bool can_read() override {
        LOG(FATAL) << "should not reach here!";
        return false;
    }

    bool can_write() override {
        LOG(FATAL) << "should not reach here!";
        return false;
    }

    [[nodiscard]] bool is_pending_finish() const override {
        LOG(FATAL) << "should not reach here!";
        return false;
    }

    Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override {
        LOG(FATAL) << "should not reach here!";
        return Status::OK();
    }

    bool runtime_filters_are_ready_or_timeout() override {
        LOG(FATAL) << "should not reach here!";
        return true;
    }

    Status close(RuntimeState* state) override;

    [[nodiscard]] virtual const RowDescriptor& intermediate_row_desc() const {
        return _row_descriptor;
    }

    // input expr -> intermediate_projections[0] -> intermediate_projections[1] -> intermediate_projections[2] ... -> final projections -> output expr
    // prepare _row_descriptor intermediate_row_desc[0] intermediate_row_desc[1] intermediate_row_desc.end() _output_row_descriptor
    //
    // idx == 0 returns intermediate_row_desc(); idx > 0 returns the (idx - 1)-th
    // intermediate output row descriptor. Valid idx range: [0, number of intermediate
    // descriptors].
    [[nodiscard]] const RowDescriptor& intermediate_row_desc(int idx) const {
        if (idx == 0) {
            return intermediate_row_desc();
        }
        // Check the sign first so the unsigned comparison below is well defined.
        DCHECK_GT(idx, 0);
        DCHECK_LE(static_cast<size_t>(idx), _intermediate_output_row_descriptor.size());
        return _intermediate_output_row_descriptor[idx - 1];
    }

    // Layout the final projections read from: the last intermediate descriptor if
    // there is one, otherwise intermediate_row_desc().
    [[nodiscard]] const RowDescriptor& projections_row_desc() const {
        if (_intermediate_output_row_descriptor.empty()) {
            return intermediate_row_desc();
        } else {
            return _intermediate_output_row_descriptor.back();
        }
    }

    [[nodiscard]] std::string debug_string() const override { return ""; }

    virtual std::string debug_string(int indentation_level = 0) const;

    virtual std::string debug_string(RuntimeState* state, int indentation_level = 0) const;

    virtual Status setup_local_state(RuntimeState* state, LocalStateInfo& info) = 0;

    template <class TARGET>
    TARGET& cast() {
        DCHECK(dynamic_cast<TARGET*>(this))
                << " Mismatch type! Current type is " << typeid(*this).name()
                << " and expect type is" << typeid(TARGET).name();
        return reinterpret_cast<TARGET&>(*this);
    }
    template <class TARGET>
    const TARGET& cast() const {
        DCHECK(dynamic_cast<const TARGET*>(this))
                << " Mismatch type! Current type is " << typeid(*this).name()
                << " and expect type is" << typeid(TARGET).name();
        return reinterpret_cast<const TARGET&>(*this);
    }

    [[nodiscard]] OperatorXPtr get_child() { return _child_x; }

    [[nodiscard]] vectorized::VExprContextSPtrs& conjuncts() { return _conjuncts; }
    [[nodiscard]] virtual RowDescriptor& row_descriptor() { return _row_descriptor; }

    [[nodiscard]] int id() const override { return node_id(); }
    [[nodiscard]] int operator_id() const { return _operator_id; }
    [[nodiscard]] int node_id() const { return _node_id; }

    [[nodiscard]] int64_t limit() const { return _limit; }

    // Output layout: the projected layout when one exists, otherwise the input layout.
    [[nodiscard]] const RowDescriptor& row_desc() const override {
        return _output_row_descriptor ? *_output_row_descriptor : _row_descriptor;
    }

    [[nodiscard]] const RowDescriptor* output_row_descriptor() {
        return _output_row_descriptor.get();
    }

    bool has_output_row_desc() const { return _output_row_descriptor != nullptr; }

    [[nodiscard]] bool is_source() const override { return false; }

    [[nodiscard]] virtual Status get_block_after_projects(RuntimeState* state,
                                                          vectorized::Block* block, bool* eos);

    /// Only use in vectorized exec engine try to do projections to trans _row_desc -> _output_row_desc
    Status do_projections(RuntimeState* state, vectorized::Block* origin_block,
                          vectorized::Block* output_block) const;
    void set_parallel_tasks(int parallel_tasks) { _parallel_tasks = parallel_tasks; }
    int parallel_tasks() const { return _parallel_tasks; }

protected:
    template <typename Dependency>
    friend class PipelineXLocalState;
    friend class PipelineXLocalStateBase;
    friend class VScanner;
    const int _operator_id;
    const int _node_id; // unique w/in single plan tree
    TPlanNodeType::type _type;
    ObjectPool* _pool = nullptr;
    std::vector<TupleId> _tuple_ids;

    vectorized::VExprContextSPtrs _conjuncts;

    RowDescriptor _row_descriptor;
    std::unique_ptr<RowDescriptor> _output_row_descriptor = nullptr;
    vectorized::VExprContextSPtrs _projections;

    std::vector<RowDescriptor> _intermediate_output_row_descriptor;
    // Used in common subexpression elimination to compute intermediate results.
    std::vector<vectorized::VExprContextSPtrs> _intermediate_projections;

    /// Resource information sent from the frontend.
    const TBackendResourceProfile _resource_profile;

    int64_t _limit; // -1: no limit

    std::string _op_name;
    bool _ignore_data_distribution = false;
    int _parallel_tasks = 0;

    // _keep_origin is used to avoid copying during projection;
    // it is currently set to false only in the nested-loop join.
    bool _keep_origin = true;
};
template <typename LocalStateType>
class OperatorX : public OperatorXBase {
public:
    // Both constructors simply forward to OperatorXBase.
    OperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id,
              const DescriptorTbl& descs)
            : OperatorXBase(pool, tnode, operator_id, descs) {}
    OperatorX(ObjectPool* pool, int node_id, int operator_id)
            : OperatorXBase(pool, node_id, operator_id) {}
    ~OperatorX() override = default;

    Status setup_local_state(RuntimeState* state, LocalStateInfo& info) override;

    // Concrete local-state type paired with this operator.
    using LocalState = LocalStateType;

    // Looks up this operator's local state in `state` (by operator id) and downcasts it
    // to LocalState.
    [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
        auto* base_state = state->get_local_state(operator_id());
        return base_state->template cast<LocalState>();
    }
};
template <typename SharedStateArg = FakeSharedState>
class PipelineXLocalState : public PipelineXLocalStateBase {
public:
using SharedStateType = SharedStateArg;
PipelineXLocalState(RuntimeState* state, OperatorXBase* parent)
: PipelineXLocalStateBase(state, parent) {}
~PipelineXLocalState() override = default;
Status init(RuntimeState* state, LocalStateInfo& info) override;
Status open(RuntimeState* state) override;
virtual std::string name_suffix() const {
return " (id=" + std::to_string(_parent->node_id()) + ")";
}
Status close(RuntimeState* state) override;
[[nodiscard]] std::string debug_string(int indentation_level = 0) const override;
std::vector<Dependency*> dependencies() const override {
return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
}
void inc_running_big_mem_op_num(RuntimeState* state) {
if (!_big_mem_op_num_added) {
state->get_query_ctx()->inc_running_big_mem_op_num();
_big_mem_op_num_added = true;
}
}
void dec_running_big_mem_op_num(RuntimeState* state) {
if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
state->get_query_ctx()->dec_running_big_mem_op_num();
_big_mem_op_num_deced = true;
}
}
protected:
Dependency* _dependency = nullptr;
SharedStateArg* _shared_state = nullptr;
private:
bool _big_mem_op_num_added = false;
bool _big_mem_op_num_deced = false;
};
// Local state for operators that can read spilled data back. It adds a "Spill" group of
// profile counters on top of PipelineXLocalState.
template <typename SharedStateArg>
class PipelineXSpillLocalState : public PipelineXLocalState<SharedStateArg> {
public:
    using Base = PipelineXLocalState<SharedStateArg>;
    PipelineXSpillLocalState(RuntimeState* state, OperatorXBase* parent)
            : PipelineXLocalState<SharedStateArg>(state, parent) {}
    ~PipelineXSpillLocalState() override = default;

    // Runs the base init first and registers the spill counters only if it succeeds.
    // Counters are created in a fixed order under the "Spill" label.
    Status init(RuntimeState* state, LocalStateInfo& info) override {
        RETURN_IF_ERROR(PipelineXLocalState<SharedStateArg>::init(state, info));
        RuntimeProfile* spill_profile = Base::profile();
        _spill_counters = ADD_LABEL_COUNTER_WITH_LEVEL(spill_profile, "Spill", 1);
        _spill_recover_time =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillRecoverTime", "Spill", 1);
        _spill_read_data_time =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillReadDataTime", "Spill", 1);
        _spill_deserialize_time =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillDeserializeTime", "Spill", 1);
        _spill_read_bytes = ADD_CHILD_COUNTER_WITH_LEVEL(spill_profile, "SpillReadDataSize",
                                                         TUnit::BYTES, "Spill", 1);
        _spill_wait_in_queue_timer =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillWaitInQueueTime", "Spill", 1);
        _spill_write_wait_io_timer =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillWriteWaitIOTime", "Spill", 1);
        _spill_read_wait_io_timer =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillReadWaitIOTime", "Spill", 1);
        return Status::OK();
    }

    // All counters start as nullptr, so they are never indeterminate before (or when)
    // init() does not complete.
    RuntimeProfile::Counter* _spill_counters = nullptr;
    RuntimeProfile::Counter* _spill_recover_time = nullptr;
    RuntimeProfile::Counter* _spill_read_data_time = nullptr;
    RuntimeProfile::Counter* _spill_deserialize_time = nullptr;
    RuntimeProfile::Counter* _spill_read_bytes = nullptr;
    RuntimeProfile::Counter* _spill_write_wait_io_timer = nullptr;
    RuntimeProfile::Counter* _spill_read_wait_io_timer = nullptr;
    RuntimeProfile::Counter* _spill_wait_in_queue_timer = nullptr;
};
class DataSinkOperatorXBase;
class PipelineXSinkLocalStateBase {
public:
PipelineXSinkLocalStateBase(DataSinkOperatorXBase* parent_, RuntimeState* state_);
virtual ~PipelineXSinkLocalStateBase() = default;
// Do initialization. This step should be executed only once and in bthread, so we can do some
// lightweight or non-idempotent operations (e.g. init profile, clone expr ctx from operatorX)
virtual Status init(RuntimeState* state, LocalSinkStateInfo& info) = 0;
// Do initialization. This step can be executed multiple times, so we should make sure it is
// idempotent (e.g. wait for runtime filters).
virtual Status open(RuntimeState* state) = 0;
virtual Status close(RuntimeState* state, Status exec_status) = 0;
[[nodiscard]] virtual std::string debug_string(int indentation_level) const = 0;
template <class TARGET>
TARGET& cast() {
DCHECK(dynamic_cast<TARGET*>(this))
<< " Mismatch type! Current type is " << typeid(*this).name()
<< " and expect type is" << typeid(TARGET).name();
return reinterpret_cast<TARGET&>(*this);
}
template <class TARGET>
const TARGET& cast() const {
DCHECK(dynamic_cast<const TARGET*>(this))
<< " Mismatch type! Current type is " << typeid(*this).name()
<< " and expect type is" << typeid(TARGET).name();
return reinterpret_cast<const TARGET&>(*this);
}
DataSinkOperatorXBase* parent() { return _parent; }
RuntimeState* state() { return _state; }
RuntimeProfile* profile() { return _profile; }
MemTracker* mem_tracker() { return _mem_tracker.get(); }
[[nodiscard]] RuntimeProfile* faker_runtime_profile() const {
return _faker_runtime_profile.get();
}
RuntimeProfile::Counter* rows_input_counter() { return _rows_input_counter; }
RuntimeProfile::Counter* exec_time_counter() { return _exec_timer; }
virtual std::vector<Dependency*> dependencies() const { return {nullptr}; }
// override in exchange sink , AsyncWriterSink
virtual Dependency* finishdependency() { return nullptr; }
std::shared_ptr<QueryStatistics> get_query_statistics_ptr() { return _query_statistics; }
protected:
DataSinkOperatorXBase* _parent = nullptr;
RuntimeState* _state = nullptr;
RuntimeProfile* _profile = nullptr;
std::unique_ptr<MemTracker> _mem_tracker;
// Set to true after close() has been called. subclasses should check and set this in
// close().
bool _closed = false;
//NOTICE: now add a faker profile, because sometimes the profile record is useless
//so we want remove some counters and timers, eg: in join node, if it's broadcast_join
//and shared hash table, some counter/timer about build hash table is useless,
//so we could add those counter/timer in faker profile, and those will not display in web profile.
std::unique_ptr<RuntimeProfile> _faker_runtime_profile =
std::make_unique<RuntimeProfile>("faker profile");
RuntimeProfile::Counter* _rows_input_counter = nullptr;
RuntimeProfile::Counter* _init_timer = nullptr;
RuntimeProfile::Counter* _open_timer = nullptr;
RuntimeProfile::Counter* _close_timer = nullptr;
RuntimeProfile::Counter* _wait_for_dependency_timer = nullptr;
RuntimeProfile::Counter* _wait_for_finish_dependency_timer = nullptr;
RuntimeProfile::Counter* _exec_timer = nullptr;
RuntimeProfile::Counter* _memory_used_counter = nullptr;
RuntimeProfile::Counter* _peak_memory_usage_counter = nullptr;
std::shared_ptr<QueryStatistics> _query_statistics = nullptr;
};
// Shared description of a pipelineX sink operator. Per-task mutable data lives in the
// matching sink local state, which is created through setup_local_state().
class DataSinkOperatorXBase : public OperatorBase {
public:
    // Single destination; `_dests_id` defaults to {1}.
    // NOTE(review): the meaning of the default destination id 1 is not visible here — confirm.
    DataSinkOperatorXBase(const int operator_id, const int node_id)
            : OperatorBase(nullptr), _operator_id(operator_id), _node_id(node_id), _dests_id({1}) {}

    // Single, explicit destination.
    DataSinkOperatorXBase(const int operator_id, const int node_id, const int dest_id)
            : OperatorBase(nullptr),
              _operator_id(operator_id),
              _node_id(node_id),
              _dests_id({dest_id}) {}

    // Multiple destinations. `sources` is only copied, so it is taken by const reference.
    // Existing lvalue callers are unaffected, and temporaries are now accepted too.
    DataSinkOperatorXBase(const int operator_id, const int node_id,
                          const std::vector<int>& sources)
            : OperatorBase(nullptr),
              _operator_id(operator_id),
              _node_id(node_id),
              _dests_id(sources) {}

    ~DataSinkOperatorXBase() override = default;

    // For agg/sort/join sink.
    virtual Status init(const TPlanNode& tnode, RuntimeState* state);

    Status init(const TDataSink& tsink) override;
    [[nodiscard]] virtual Status init(ExchangeType type, const int num_buckets,
                                      const bool is_shuffled_hash_join,
                                      const std::map<int, int>& shuffle_idx_to_instance_idx) {
        return Status::InternalError("init() is only implemented in local exchange!");
    }

    Status prepare(RuntimeState* state) override { return Status::OK(); }
    Status open(RuntimeState* state) override { return Status::OK(); }

    Status sink(RuntimeState* state, vectorized::Block* block, SourceState source_state) override {
        LOG(FATAL) << "should not reach here!";
        return Status::OK();
    }

    [[nodiscard]] virtual Status sink(RuntimeState* state, vectorized::Block* block, bool eos) = 0;

    [[nodiscard]] virtual Status setup_local_state(RuntimeState* state,
                                                   LocalSinkStateInfo& info) = 0;

    template <class TARGET>
    TARGET& cast() {
        DCHECK(dynamic_cast<TARGET*>(this))
                << " Mismatch type! Current type is " << typeid(*this).name()
                << " and expect type is" << typeid(TARGET).name();
        return reinterpret_cast<TARGET&>(*this);
    }
    template <class TARGET>
    const TARGET& cast() const {
        DCHECK(dynamic_cast<const TARGET*>(this))
                << " Mismatch type! Current type is " << typeid(*this).name()
                << " and expect type is" << typeid(TARGET).name();
        return reinterpret_cast<const TARGET&>(*this);
    }

    [[nodiscard]] virtual std::shared_ptr<BasicSharedState> create_shared_state() const = 0;

    // PASSTHROUGH when the child ignores data distribution, NOOP otherwise.
    [[nodiscard]] virtual DataDistribution required_data_distribution() const {
        return _child_x && _child_x->ignore_data_distribution()
                       ? DataDistribution(ExchangeType::PASSTHROUGH)
                       : DataDistribution(ExchangeType::NOOP);
    }

    [[nodiscard]] virtual bool is_shuffled_hash_join() const { return false; }

    Status close(RuntimeState* state) override {
        return Status::InternalError("Should not reach here!");
    }

    bool can_read() override {
        LOG(FATAL) << "should not reach here!";
        return false;
    }

    bool can_write() override {
        LOG(FATAL) << "should not reach here!";
        return false;
    }

    [[nodiscard]] bool is_pending_finish() const override {
        LOG(FATAL) << "should not reach here!";
        return false;
    }

    [[nodiscard]] std::string debug_string() const override { return ""; }

    [[nodiscard]] virtual std::string debug_string(int indentation_level) const;

    [[nodiscard]] virtual std::string debug_string(RuntimeState* state,
                                                   int indentation_level) const;

    [[nodiscard]] bool is_sink() const override { return true; }

    [[nodiscard]] bool is_source() const override { return false; }

    // Closes the sink local state stored in `state`. If that lookup fails, its error
    // is returned instead.
    static Status close(RuntimeState* state, Status exec_status) {
        auto result = state->get_sink_local_state_result();
        if (!result) {
            return result.error();
        }
        return result.value()->close(state, exec_status);
    }

    [[nodiscard]] RuntimeProfile* get_runtime_profile() const override {
        throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR,
                               "Runtime Profile is not owned by operator");
        return nullptr;
    }

    [[nodiscard]] int id() const override { return node_id(); }

    [[nodiscard]] int operator_id() const { return _operator_id; }

    [[nodiscard]] const std::vector<int>& dests_id() const { return _dests_id; }

    void set_dests_id(const std::vector<int>& dest_id) { _dests_id = dest_id; }

    [[nodiscard]] int node_id() const { return _node_id; }

    [[nodiscard]] std::string get_name() const override { return _name; }

    virtual bool should_dry_run(RuntimeState* state) { return false; }

protected:
    template <typename Writer, typename Parent>
        requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
    friend class AsyncWriterSink;
    // _operator_id : the current Operator's ID, which is not visible to the user.
    // _node_id : the plan node ID corresponding to the Operator, which is visible on the profile.
    // _dests_id : the target _operator_id of the sink, for example, in the case of a multi-sink, there are multiple targets.
    const int _operator_id;
    const int _node_id;
    std::vector<int> _dests_id;
    std::string _name;

    // Maybe this will be transferred to BufferControlBlock.
    std::shared_ptr<QueryStatistics> _query_statistics;
};
template <typename LocalStateType>
class DataSinkOperatorX : public DataSinkOperatorXBase {
public:
    // All constructors forward unchanged to DataSinkOperatorXBase.
    DataSinkOperatorX(int operator_id, const int node_id)
            : DataSinkOperatorXBase(operator_id, node_id) {}

    DataSinkOperatorX(const int id, const int node_id, const int source_id)
            : DataSinkOperatorXBase(id, node_id, source_id) {}

    DataSinkOperatorX(const int id, const int node_id, std::vector<int> sources)
            : DataSinkOperatorXBase(id, node_id, sources) {}

    ~DataSinkOperatorX() override = default;

    Status setup_local_state(RuntimeState* state, LocalSinkStateInfo& info) override;
    std::shared_ptr<BasicSharedState> create_shared_state() const override;

    // Concrete sink local-state type paired with this operator.
    using LocalState = LocalStateType;

    // Fetches the sink local state stored in `state` and downcasts it to LocalState.
    [[nodiscard]] LocalState& get_local_state(RuntimeState* state) const {
        auto* sink_state = state->get_sink_local_state();
        return sink_state->template cast<LocalState>();
    }
};
template <typename SharedStateArg = FakeSharedState>
class PipelineXSinkLocalState : public PipelineXSinkLocalStateBase {
public:
using SharedStateType = SharedStateArg;
PipelineXSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
: PipelineXSinkLocalStateBase(parent, state) {}
~PipelineXSinkLocalState() override = default;
Status init(RuntimeState* state, LocalSinkStateInfo& info) override;
Status open(RuntimeState* state) override { return Status::OK(); }
Status close(RuntimeState* state, Status exec_status) override;
[[nodiscard]] std::string debug_string(int indentation_level) const override;
virtual std::string name_suffix() { return " (id=" + std::to_string(_parent->node_id()) + ")"; }
std::vector<Dependency*> dependencies() const override {
return _dependency ? std::vector<Dependency*> {_dependency} : std::vector<Dependency*> {};
}
void inc_running_big_mem_op_num(RuntimeState* state) {
if (!_big_mem_op_num_added) {
state->get_query_ctx()->inc_running_big_mem_op_num();
_big_mem_op_num_added = true;
}
}
void dec_running_big_mem_op_num(RuntimeState* state) {
if (_big_mem_op_num_added && !_big_mem_op_num_deced) {
state->get_query_ctx()->dec_running_big_mem_op_num();
_big_mem_op_num_deced = true;
}
}
protected:
Dependency* _dependency = nullptr;
SharedStateType* _shared_state = nullptr;
private:
bool _big_mem_op_num_added = false;
bool _big_mem_op_num_deced = false;
};
// Sink local state for operators that can spill. It adds a "Spill" group of profile
// counters on top of PipelineXSinkLocalState.
template <typename SharedStateArg>
class PipelineXSpillSinkLocalState : public PipelineXSinkLocalState<SharedStateArg> {
public:
    using Base = PipelineXSinkLocalState<SharedStateArg>;
    PipelineXSpillSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state)
            : Base(parent, state) {}
    ~PipelineXSpillSinkLocalState() override = default;

    // Runs the base init first; the spill counters are registered only on success,
    // in a fixed order under the "Spill" label.
    Status init(RuntimeState* state, LocalSinkStateInfo& info) override {
        RETURN_IF_ERROR(Base::init(state, info));

        RuntimeProfile* spill_profile = Base::profile();
        _spill_counters = ADD_LABEL_COUNTER_WITH_LEVEL(spill_profile, "Spill", 1);
        _spill_timer = ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillTime", "Spill", 1);
        _spill_serialize_block_timer =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillSerializeBlockTime", "Spill", 1);
        _spill_write_disk_timer =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillWriteDiskTime", "Spill", 1);
        _spill_data_size = ADD_CHILD_COUNTER_WITH_LEVEL(spill_profile, "SpillWriteDataSize",
                                                        TUnit::BYTES, "Spill", 1);
        _spill_block_count = ADD_CHILD_COUNTER_WITH_LEVEL(spill_profile, "SpillWriteBlockCount",
                                                          TUnit::UNIT, "Spill", 1);
        _spill_wait_in_queue_timer =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillWaitInQueueTime", "Spill", 1);
        _spill_write_wait_io_timer =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillWriteWaitIOTime", "Spill", 1);
        _spill_read_wait_io_timer =
                ADD_CHILD_TIMER_WITH_LEVEL(spill_profile, "SpillReadWaitIOTime", "Spill", 1);
        return Status::OK();
    }

    RuntimeProfile::Counter* _spill_counters {nullptr};
    RuntimeProfile::Counter* _spill_timer {nullptr};
    RuntimeProfile::Counter* _spill_serialize_block_timer {nullptr};
    RuntimeProfile::Counter* _spill_write_disk_timer {nullptr};
    RuntimeProfile::Counter* _spill_data_size {nullptr};
    RuntimeProfile::Counter* _spill_block_count {nullptr};
    RuntimeProfile::Counter* _spill_wait_in_queue_timer {nullptr};
    RuntimeProfile::Counter* _spill_write_wait_io_timer {nullptr};
    RuntimeProfile::Counter* _spill_read_wait_io_timer {nullptr};
};
/**
 * StreamingOperatorX is the base for operators that always process blocks in a
 * streaming, one-block-in / one-block-out fashion. Subclasses implement pull().
 */
template <typename LocalStateType>
class StreamingOperatorX : public OperatorX<LocalStateType> {
public:
    StreamingOperatorX(ObjectPool* pool, const TPlanNode& tnode, int operator_id,
                       const DescriptorTbl& descs)
            : OperatorX<LocalStateType>(pool, tnode, operator_id, descs) {}
    ~StreamingOperatorX() override = default;

    Status get_block(RuntimeState* state, vectorized::Block* block, bool* eos) override;

    virtual Status pull(RuntimeState* state, vectorized::Block* block, bool* eos) = 0;
};
/**
 * StatefulOperatorX is the base for operators that keep state between calls.
 *
 * An operator is called stateful when it can produce output on its own, without new
 * input. For example, the hash join probe operator keeps the probe block it received
 * from its child (e.g. _child_block) and, while rows remain in it, can emit output
 * blocks from `get_block` without pulling more data from the child. In short, input
 * blocks map to output blocks one-to-many.
 */
template <typename LocalStateType>
class StatefulOperatorX : public OperatorX<LocalStateType> {
public:
    StatefulOperatorX(ObjectPool* pool, const TPlanNode& tnode, const int operator_id,
                      const DescriptorTbl& descs)
            : OperatorX<LocalStateType>(pool, tnode, operator_id, descs) {}
    virtual ~StatefulOperatorX() = default;

    using OperatorX<LocalStateType>::get_local_state;

    [[nodiscard]] Status get_block(RuntimeState* state, vectorized::Block* block,
                                   bool* eos) override;

    // Emits output from the operator's internal state.
    [[nodiscard]] virtual Status pull(RuntimeState* state, vectorized::Block* block,
                                      bool* eos) const = 0;
    // Feeds one input block (plus the eos flag) into the operator.
    [[nodiscard]] virtual Status push(RuntimeState* state, vectorized::Block* input_block,
                                      bool eos) const = 0;
    [[nodiscard]] virtual bool need_more_input_data(RuntimeState* state) const = 0;

    // Data is needed from the children only when this operator wants more input AND
    // its child needs data from its own children.
    bool need_data_from_children(RuntimeState* state) const override {
        return need_more_input_data(state) &&
               OperatorX<LocalStateType>::_child_x->need_data_from_children(state);
    }
};
// Sink local state built around a Writer derived from vectorized::AsyncResultWriter.
// It exposes two dependencies: the writer dependency through dependencies(), and a
// finish dependency through finishdependency().
template <typename Writer, typename Parent>
    requires(std::is_base_of_v<vectorized::AsyncResultWriter, Writer>)
class AsyncWriterSink : public PipelineXSinkLocalState<FakeSharedState> {
public:
    using Base = PipelineXSinkLocalState<FakeSharedState>;

    // The finish dependency is named "<sink name>_FINISH_DEPENDENCY". The writer
    // dependency starts out null.
    AsyncWriterSink(DataSinkOperatorXBase* parent, RuntimeState* state)
            : Base(parent, state),
              _finish_dependency(std::make_shared<FinishDependency>(
                      parent->operator_id(), parent->node_id(),
                      parent->get_name() + "_FINISH_DEPENDENCY", state->get_query_ctx())) {}

    Status init(RuntimeState* state, LocalSinkStateInfo& info) override;

    Status open(RuntimeState* state) override;

    Status sink(RuntimeState* state, vectorized::Block* block, bool eos);

    // Exactly one entry: the writer dependency (which may be null).
    std::vector<Dependency*> dependencies() const override {
        return {_async_writer_dependency.get()};
    }

    Status close(RuntimeState* state, Status exec_status) override;

    Dependency* finishdependency() override { return _finish_dependency.get(); }

protected:
    vectorized::VExprContextSPtrs _output_vexpr_ctxs;
    std::unique_ptr<Writer> _writer;

    std::shared_ptr<AsyncWriterDependency> _async_writer_dependency;

    std::shared_ptr<Dependency> _finish_dependency;
};
} // namespace doris::pipeline