| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <stdint.h> |
| |
| #include <memory> |
| |
| #include "common/status.h" |
| #include "operator.h" |
| #include "vec/core/block.h" |
| #include "vec/core/column_with_type_and_name.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class RuntimeState; |
| |
| namespace pipeline { |
| class DataQueue; |
| |
| class UnionSinkOperatorX; |
| class UnionSinkLocalState final : public PipelineXSinkLocalState<UnionSharedState> { |
| public: |
| ENABLE_FACTORY_CREATOR(UnionSinkLocalState); |
| UnionSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) |
| : Base(parent, state), _child_row_idx(0) {} |
| Status init(RuntimeState* state, LocalSinkStateInfo& info) override; |
| Status open(RuntimeState* state) override; |
| friend class UnionSinkOperatorX; |
| using Base = PipelineXSinkLocalState<UnionSharedState>; |
| using Parent = UnionSinkOperatorX; |
| |
| private: |
| std::unique_ptr<vectorized::Block> _output_block; |
| |
| /// Const exprs materialized by this node. These exprs don't refer to any children. |
| /// Only materialized by the first fragment instance to avoid duplication. |
| vectorized::VExprContextSPtrs _const_expr; |
| |
| /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. |
| vectorized::VExprContextSPtrs _child_expr; |
| |
| /// Index of current row in child_row_block_. |
| int _child_row_idx; |
| RuntimeProfile::Counter* _expr_timer = nullptr; |
| }; |
| |
| class UnionSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<UnionSinkLocalState> { |
| public: |
| using Base = DataSinkOperatorX<UnionSinkLocalState>; |
| |
| friend class UnionSinkLocalState; |
| UnionSinkOperatorX(int child_id, int sink_id, int dest_id, ObjectPool* pool, |
| const TPlanNode& tnode, const DescriptorTbl& descs); |
| #ifdef BE_TEST |
| UnionSinkOperatorX(int child_size, int cur_child_id, int first_materialized_child_idx) |
| : _first_materialized_child_idx(first_materialized_child_idx), |
| _cur_child_id(cur_child_id), |
| _child_size(child_size) {} |
| #endif |
| ~UnionSinkOperatorX() override = default; |
| Status init(const TDataSink& tsink) override { |
| return Status::InternalError("{} should not init with TDataSink", |
| DataSinkOperatorX<UnionSinkLocalState>::_name); |
| } |
| |
| MOCK_FUNCTION const RowDescriptor& row_descriptor() { return _row_descriptor; } |
| |
| Status init(const TPlanNode& tnode, RuntimeState* state) override; |
| |
| Status prepare(RuntimeState* state) override; |
| |
| Status sink(RuntimeState* state, vectorized::Block* in_block, bool eos) override; |
| |
| std::shared_ptr<BasicSharedState> create_shared_state() const override { |
| if (_cur_child_id > 0) { |
| return nullptr; |
| } else { |
| std::shared_ptr<BasicSharedState> ss = std::make_shared<UnionSharedState>(_child_size); |
| ss->id = operator_id(); |
| for (auto& dest : dests_id()) { |
| ss->related_op_ids.insert(dest); |
| } |
| return ss; |
| } |
| } |
| |
| bool require_shuffled_data_distribution(RuntimeState* /*state*/) const override { |
| return _followed_by_shuffled_operator; |
| } |
| |
| DataDistribution required_data_distribution(RuntimeState* /*state*/) const override { |
| if (_child->is_serial_operator() && _followed_by_shuffled_operator) { |
| return DataDistribution(ExchangeType::HASH_SHUFFLE, _distribute_exprs); |
| } |
| if (_child->is_serial_operator()) { |
| return DataDistribution(ExchangeType::PASSTHROUGH); |
| } |
| return DataDistribution(ExchangeType::NOOP); |
| } |
| |
| void set_low_memory_mode(RuntimeState* state) override { |
| auto& local_state = get_local_state(state); |
| local_state._shared_state->data_queue.set_low_memory_mode(); |
| } |
| |
| bool is_shuffled_operator() const override { return _followed_by_shuffled_operator; } |
| |
| private: |
| int _get_first_materialized_child_idx() const { return _first_materialized_child_idx; } |
| |
| /// Const exprs materialized by this node. These exprs don't refer to any children. |
| /// Only materialized by the first fragment instance to avoid duplication. |
| vectorized::VExprContextSPtrs _const_expr; |
| |
| /// Exprs materialized by this node. The i-th result expr list refers to the i-th child. |
| vectorized::VExprContextSPtrs _child_expr; |
| /// Index of the first non-passthrough child; i.e. a child that needs materialization. |
| /// 0 when all children are materialized, '_children.size()' when no children are |
| /// materialized. |
| const int _first_materialized_child_idx; |
| |
| const RowDescriptor _row_descriptor; |
| const int _cur_child_id; |
| const int _child_size; |
| const std::vector<TExpr> _distribute_exprs; |
| int children_count() const { return _child_size; } |
| bool is_child_passthrough(int child_idx) const { |
| DCHECK_LT(child_idx, _child_size); |
| return child_idx < _first_materialized_child_idx; |
| } |
| Status materialize_child_block(RuntimeState* state, int child_id, |
| vectorized::Block* input_block, |
| vectorized::Block* output_block) { |
| DCHECK_LT(child_id, _child_size); |
| DCHECK(!is_child_passthrough(child_id)); |
| if (input_block->rows() > 0) { |
| vectorized::MutableBlock mblock = |
| vectorized::VectorizedUtils::build_mutable_mem_reuse_block(output_block, |
| row_descriptor()); |
| vectorized::Block res; |
| RETURN_IF_ERROR(materialize_block(state, input_block, child_id, &res)); |
| RETURN_IF_ERROR(mblock.merge(res)); |
| } |
| return Status::OK(); |
| } |
| |
| Status materialize_block(RuntimeState* state, vectorized::Block* src_block, int child_idx, |
| vectorized::Block* res_block) { |
| auto& local_state = get_local_state(state); |
| SCOPED_TIMER(local_state._expr_timer); |
| const auto& child_exprs = local_state._child_expr; |
| vectorized::ColumnsWithTypeAndName colunms; |
| for (size_t i = 0; i < child_exprs.size(); ++i) { |
| vectorized::ColumnWithTypeAndName result_data; |
| RETURN_IF_ERROR(child_exprs[i]->execute(src_block, result_data)); |
| colunms.emplace_back(result_data); |
| } |
| local_state._child_row_idx += src_block->rows(); |
| *res_block = {colunms}; |
| return Status::OK(); |
| } |
| }; |
| |
| } // namespace pipeline |
| #include "common/compile_check_end.h" |
| } // namespace doris |