| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <stdint.h> |
| |
| #include "exec/operator/operator.h" |
| #include "runtime/exec_env.h" |
| #include "runtime/runtime_profile.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class AggSinkOperatorX; |
| |
| class AggSinkLocalState : public PipelineXSinkLocalState<AggSharedState> { |
| public: |
| ENABLE_FACTORY_CREATOR(AggSinkLocalState); |
| using Base = PipelineXSinkLocalState<AggSharedState>; |
| AggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state); |
| ~AggSinkLocalState() override = default; |
| |
| Status init(RuntimeState* state, LocalSinkStateInfo& info) override; |
| Status open(RuntimeState* state) override; |
| Status close(RuntimeState* state, Status exec_status) override; |
| bool is_blockable() const override; |
| size_t get_hash_table_size() const; |
| |
| protected: |
| friend class AggSinkOperatorX; |
| |
| struct ExecutorBase { |
| virtual Status execute(AggSinkLocalState* local_state, Block* block) = 0; |
| virtual void update_memusage(AggSinkLocalState* local_state) = 0; |
| virtual ~ExecutorBase() = default; |
| }; |
| template <bool WithoutKey, bool NeedToMerge> |
| struct Executor final : public ExecutorBase { |
| Status execute(AggSinkLocalState* local_state, Block* block) override { |
| if constexpr (WithoutKey) { |
| if constexpr (NeedToMerge) { |
| return local_state->_merge_without_key(block); |
| } else { |
| return local_state->_execute_without_key(block); |
| } |
| } else { |
| if constexpr (NeedToMerge) { |
| return local_state->_merge_with_serialized_key(block); |
| } else { |
| return local_state->_execute_with_serialized_key(block); |
| } |
| } |
| } |
| void update_memusage(AggSinkLocalState* local_state) override { |
| if constexpr (WithoutKey) { |
| local_state->_update_memusage_without_key(); |
| } else { |
| local_state->_update_memusage_with_serialized_key(); |
| } |
| } |
| }; |
| |
| Status _execute_without_key(Block* block); |
| Status _merge_without_key(Block* block); |
| void _update_memusage_without_key(); |
| Status _init_hash_method(const VExprContextSPtrs& probe_exprs); |
| Status _execute_with_serialized_key(Block* block); |
| Status _merge_with_serialized_key(Block* block); |
| void _update_memusage_with_serialized_key(); |
| template <bool limit> |
| |
| Status _execute_with_serialized_key_helper(Block* block); |
| void _find_in_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, |
| uint32_t num_rows); |
| void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, |
| uint32_t num_rows); |
| |
| void _emplace_into_hash_table_inline_count(ColumnRawPtrs& key_columns, uint32_t num_rows); |
| void _merge_into_hash_table_inline_count(ColumnRawPtrs& key_columns, |
| const IColumn* merge_column, uint32_t num_rows); |
| bool _emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block, |
| const std::vector<int>& key_locs, |
| ColumnRawPtrs& key_columns, uint32_t num_rows); |
| |
| template <bool limit, bool for_spill = false> |
| Status _merge_with_serialized_key_helper(Block* block); |
| |
| Status _destroy_agg_status(AggregateDataPtr data); |
| Status _create_agg_status(AggregateDataPtr data); |
| size_t _memory_usage() const; |
| |
| size_t get_reserve_mem_size(RuntimeState* state, bool eos) const; |
| |
| RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_limit_compute_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_input_counter = nullptr; |
| RuntimeProfile::Counter* _build_timer = nullptr; |
| RuntimeProfile::Counter* _expr_timer = nullptr; |
| RuntimeProfile::Counter* _merge_timer = nullptr; |
| RuntimeProfile::Counter* _deserialize_data_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; |
| RuntimeProfile::Counter* _hash_table_size_counter = nullptr; |
| RuntimeProfile::Counter* _serialize_key_arena_memory_usage = nullptr; |
| RuntimeProfile::Counter* _memory_usage_container = nullptr; |
| RuntimeProfile::Counter* _memory_usage_arena = nullptr; |
| |
| bool _should_limit_output = false; |
| |
| PODArray<AggregateDataPtr> _places; |
| std::vector<char> _deserialize_buffer; |
| |
| Block _preagg_block; |
| |
| AggregatedDataVariants* _agg_data = nullptr; |
| |
| std::unique_ptr<ExecutorBase> _executor = nullptr; |
| |
| int64_t _memory_usage_last_executing = 0; |
| }; |
| |
| class AggSinkOperatorX MOCK_REMOVE(final) : public DataSinkOperatorX<AggSinkLocalState> { |
| public: |
| AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, const TPlanNode& tnode, |
| const DescriptorTbl& descs); |
| |
| #ifdef BE_TEST |
| AggSinkOperatorX() |
| : DataSinkOperatorX<AggSinkLocalState>(1, 0, 2), _is_first_phase(), _is_colocate() {} |
| #endif |
| |
| ~AggSinkOperatorX() override = default; |
| Status init(const TDataSink& tsink) override { |
| return Status::InternalError("{} should not init with TPlanNode", |
| DataSinkOperatorX<AggSinkLocalState>::_name); |
| } |
| |
| Status init(const TPlanNode& tnode, RuntimeState* state) override; |
| void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator, |
| bool require_bucket_distribution) override; |
| |
| Status prepare(RuntimeState* state) override; |
| |
| Status sink(RuntimeState* state, Block* in_block, bool eos) override; |
| |
| DataDistribution required_data_distribution(RuntimeState* state) const override { |
| if (_partition_exprs.empty()) { |
| return _needs_finalize |
| ? DataDistribution(ExchangeType::NOOP) |
| : DataSinkOperatorX<AggSinkLocalState>::required_data_distribution( |
| state); |
| } |
| return _is_colocate && _require_bucket_distribution |
| ? DataDistribution(ExchangeType::BUCKET_HASH_SHUFFLE, _partition_exprs) |
| : DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); |
| } |
| bool is_colocated_operator() const override { return _is_colocate; } |
| bool is_shuffled_operator() const override { |
| return !_partition_exprs.empty() && _needs_finalize; |
| } |
| size_t get_revocable_mem_size(RuntimeState* state) const; |
| |
| AggregatedDataVariants* get_agg_data(RuntimeState* state) { |
| auto& local_state = get_local_state(state); |
| return local_state._agg_data; |
| } |
| |
| Status reset_hash_table(RuntimeState* state); |
| |
| size_t get_reserve_mem_size(RuntimeState* state, bool eos) override; |
| |
| size_t get_hash_table_size(RuntimeState* state) const; |
| |
| using DataSinkOperatorX<AggSinkLocalState>::node_id; |
| using DataSinkOperatorX<AggSinkLocalState>::operator_id; |
| using DataSinkOperatorX<AggSinkLocalState>::get_local_state; |
| |
| protected: |
| MOCK_FUNCTION Status _init_probe_expr_ctx(RuntimeState* state); |
| |
| MOCK_FUNCTION Status _init_aggregate_evaluators(RuntimeState* state); |
| |
| MOCK_FUNCTION Status _calc_aggregate_evaluators(); |
| |
| MOCK_FUNCTION Status _check_agg_fn_output(); |
| |
| using LocalState = AggSinkLocalState; |
| friend class AggSinkLocalState; |
| std::vector<AggFnEvaluator*> _aggregate_evaluators; |
| |
| // may be we don't have to know the tuple id |
| TupleId _intermediate_tuple_id; |
| TupleDescriptor* _intermediate_tuple_desc = nullptr; |
| |
| TupleId _output_tuple_id; |
| TupleDescriptor* _output_tuple_desc = nullptr; |
| |
| bool _needs_finalize; |
| bool _is_merge; |
| const bool _is_first_phase; |
| |
| size_t _align_aggregate_states = 1; |
| /// The offset to the n-th aggregate function in a row of aggregate functions. |
| Sizes _offsets_of_aggregate_states; |
| /// The total size of the row from the aggregate functions. |
| size_t _total_size_of_aggregate_states = 0; |
| |
| // group by k1,k2 |
| VExprContextSPtrs _probe_expr_ctxs; |
| ObjectPool* _pool = nullptr; |
| std::vector<size_t> _make_nullable_keys; |
| int64_t _limit; // -1: no limit |
| // do sort limit and directions |
| bool _do_sort_limit = false; |
| std::vector<int> _order_directions; |
| std::vector<int> _null_directions; |
| |
| bool _have_conjuncts; |
| std::vector<TExpr> _partition_exprs; |
| const bool _is_colocate; |
| RowDescriptor _agg_fn_output_row_descriptor; |
| }; |
| |
| } // namespace doris |
| #include "common/compile_check_end.h" |