| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #pragma once |
| |
| #include <stdint.h> |
| |
| #include <memory> |
| |
| #include "common/status.h" |
| #include "core/block/block.h" |
| #include "exec/operator/operator.h" |
| #include "runtime/runtime_profile.h" |
| |
| namespace doris { |
| #include "common/compile_check_begin.h" |
| class RuntimeState; |
| |
| class StreamingAggOperatorX; |
| |
| class StreamingAggLocalState MOCK_REMOVE(final) : public PipelineXLocalState<FakeSharedState> { |
| public: |
| using Parent = StreamingAggOperatorX; |
| using Base = PipelineXLocalState<FakeSharedState>; |
| ENABLE_FACTORY_CREATOR(StreamingAggLocalState); |
| StreamingAggLocalState(RuntimeState* state, OperatorXBase* parent); |
| ~StreamingAggLocalState() override = default; |
| |
| Status init(RuntimeState* state, LocalStateInfo& info) override; |
| Status open(RuntimeState* state) override; |
| Status close(RuntimeState* state) override; |
| Status do_pre_agg(RuntimeState* state, Block* input_block, Block* output_block); |
| void make_nullable_output_key(Block* block); |
| void build_limit_heap(size_t hash_table_size); |
| |
| private: |
| friend class StreamingAggOperatorX; |
| template <typename LocalStateType> |
| friend class StatefulOperatorX; |
| |
| size_t _memory_usage() const; |
| void _add_limit_heap_top(ColumnRawPtrs& key_columns, size_t rows); |
| bool _do_limit_filter(size_t num_rows, ColumnRawPtrs& key_columns); |
| void _refresh_limit_heap(size_t i, ColumnRawPtrs& key_columns); |
| |
| Status _pre_agg_with_serialized_key(doris::Block* in_block, doris::Block* out_block); |
| bool _should_expand_preagg_hash_tables(); |
| |
| MOCK_FUNCTION bool _should_not_do_pre_agg(size_t rows); |
| |
| Status _execute_with_serialized_key(Block* block); |
| void _update_memusage_with_serialized_key(); |
| Status _init_hash_method(const VExprContextSPtrs& probe_exprs); |
| Status _get_results_with_serialized_key(RuntimeState* state, Block* block, bool* eos); |
| void _emplace_into_hash_table(AggregateDataPtr* places, ColumnRawPtrs& key_columns, |
| const uint32_t num_rows); |
| void _emplace_into_hash_table_inline_count(ColumnRawPtrs& key_columns, uint32_t num_rows); |
| bool _emplace_into_hash_table_limit(AggregateDataPtr* places, Block* block, |
| ColumnRawPtrs& key_columns, uint32_t num_rows); |
| Status _create_agg_status(AggregateDataPtr data); |
| size_t _get_hash_table_size(); |
| |
| RuntimeProfile::Counter* _streaming_agg_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_compute_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_limit_compute_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_emplace_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_input_counter = nullptr; |
| RuntimeProfile::Counter* _build_timer = nullptr; |
| RuntimeProfile::Counter* _expr_timer = nullptr; |
| RuntimeProfile::Counter* _merge_timer = nullptr; |
| RuntimeProfile::Counter* _insert_values_to_column_timer = nullptr; |
| RuntimeProfile::Counter* _deserialize_data_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_memory_usage = nullptr; |
| RuntimeProfile::HighWaterMarkCounter* _serialize_key_arena_memory_usage = nullptr; |
| RuntimeProfile::Counter* _hash_table_size_counter = nullptr; |
| RuntimeProfile::Counter* _get_results_timer = nullptr; |
| RuntimeProfile::Counter* _hash_table_iterate_timer = nullptr; |
| RuntimeProfile::Counter* _insert_keys_to_column_timer = nullptr; |
| |
| bool _should_expand_hash_table = true; |
| int64_t _cur_num_rows_returned = 0; |
| Arena _agg_arena_pool; |
| AggregatedDataVariantsUPtr _agg_data = nullptr; |
| std::vector<AggFnEvaluator*> _aggregate_evaluators; |
| // group by k1,k2 |
| VExprContextSPtrs _probe_expr_ctxs; |
| std::unique_ptr<AggregateDataContainer> _aggregate_data_container = nullptr; |
| bool _use_simple_count = false; |
| bool _reach_limit = false; |
| size_t _input_num_rows = 0; |
| |
| int64_t limit = -1; |
| int need_do_sort_limit = -1; |
| bool do_sort_limit = false; |
| MutableColumns limit_columns; |
| int limit_columns_min = -1; |
| PaddedPODArray<uint8_t> need_computes; |
| std::vector<uint8_t> cmp_res; |
| std::vector<int> order_directions; |
| std::vector<int> null_directions; |
| |
| struct HeapLimitCursor { |
| HeapLimitCursor(int row_id, MutableColumns& limit_columns, |
| std::vector<int>& order_directions, std::vector<int>& null_directions) |
| : _row_id(row_id), |
| _limit_columns(limit_columns), |
| _order_directions(order_directions), |
| _null_directions(null_directions) {} |
| |
| HeapLimitCursor(const HeapLimitCursor& other) = default; |
| |
| HeapLimitCursor(HeapLimitCursor&& other) noexcept |
| : _row_id(other._row_id), |
| _limit_columns(other._limit_columns), |
| _order_directions(other._order_directions), |
| _null_directions(other._null_directions) {} |
| |
| HeapLimitCursor& operator=(const HeapLimitCursor& other) noexcept { |
| _row_id = other._row_id; |
| return *this; |
| } |
| |
| HeapLimitCursor& operator=(HeapLimitCursor&& other) noexcept { |
| _row_id = other._row_id; |
| return *this; |
| } |
| |
| bool operator<(const HeapLimitCursor& rhs) const { |
| for (int i = 0; i < _limit_columns.size(); ++i) { |
| const auto& _limit_column = _limit_columns[i]; |
| auto res = _limit_column->compare_at(_row_id, rhs._row_id, *_limit_column, |
| _null_directions[i]) * |
| _order_directions[i]; |
| if (res < 0) { |
| return true; |
| } else if (res > 0) { |
| return false; |
| } |
| } |
| return false; |
| } |
| |
| int _row_id; |
| MutableColumns& _limit_columns; |
| std::vector<int>& _order_directions; |
| std::vector<int>& _null_directions; |
| }; |
| |
| std::priority_queue<HeapLimitCursor> limit_heap; |
| |
| MutableColumns _get_keys_hash_table(); |
| |
| PODArray<AggregateDataPtr> _places; |
| std::vector<char> _deserialize_buffer; |
| |
| std::unique_ptr<Block> _child_block = nullptr; |
| bool _child_eos = false; |
| std::unique_ptr<Block> _pre_aggregated_block = nullptr; |
| std::vector<AggregateDataPtr> _values; |
| bool _opened = false; |
| |
| void _destroy_agg_status(AggregateDataPtr data); |
| |
| void _close_with_serialized_key() { |
| std::visit(Overload {[&](std::monostate& arg) -> void { |
| // Do nothing |
| }, |
| [&](auto& agg_method) -> void { |
| if (_use_simple_count) { |
| // Inline count: mapped slots hold UInt64, |
| // not real agg state pointers. Skip destroy. |
| return; |
| } |
| auto& data = *agg_method.hash_table; |
| data.for_each_mapped([&](auto& mapped) { |
| if (mapped) { |
| _destroy_agg_status(mapped); |
| mapped = nullptr; |
| } |
| }); |
| if (data.has_null_key_data()) { |
| _destroy_agg_status( |
| data.template get_null_key_data<AggregateDataPtr>()); |
| } |
| }}, |
| _agg_data->method_variant); |
| } |
| |
| bool _is_single_backend = false; |
| }; |
| |
| class StreamingAggOperatorX MOCK_REMOVE(final) : public StatefulOperatorX<StreamingAggLocalState> { |
| public: |
| StreamingAggOperatorX(ObjectPool* pool, int operator_id, const TPlanNode& tnode, |
| const DescriptorTbl& descs); |
| #ifdef BE_TEST |
| StreamingAggOperatorX() : _is_first_phase {false} {} |
| #endif |
| |
| ~StreamingAggOperatorX() override = default; |
| Status init(const TPlanNode& tnode, RuntimeState* state) override; |
| void update_operator(const TPlanNode& tnode, bool followed_by_shuffled_operator, |
| bool require_bucket_distribution) override; |
| Status prepare(RuntimeState* state) override; |
| Status pull(RuntimeState* state, Block* block, bool* eos) const override; |
| Status push(RuntimeState* state, Block* input_block, bool eos) const override; |
| bool need_more_input_data(RuntimeState* state) const override; |
| void set_low_memory_mode(RuntimeState* state) override { |
| _spill_streaming_agg_mem_limit = 1024 * 1024; |
| } |
| DataDistribution required_data_distribution(RuntimeState* state) const override { |
| if (_child && _child->is_hash_join_probe() && |
| state->enable_streaming_agg_hash_join_force_passthrough()) { |
| return DataDistribution(ExchangeType::PASSTHROUGH); |
| } |
| if (!state->get_query_ctx()->should_be_shuffled_agg( |
| StatefulOperatorX<StreamingAggLocalState>::node_id())) { |
| return StatefulOperatorX<StreamingAggLocalState>::required_data_distribution(state); |
| } |
| if (_partition_exprs.empty()) { |
| return _needs_finalize |
| ? DataDistribution(ExchangeType::NOOP) |
| : StatefulOperatorX<StreamingAggLocalState>::required_data_distribution( |
| state); |
| } |
| return DataDistribution(ExchangeType::HASH_SHUFFLE, _partition_exprs); |
| } |
| |
| private: |
| friend class StreamingAggLocalState; |
| |
| MOCK_FUNCTION Status _init_probe_expr_ctx(RuntimeState* state); |
| |
| MOCK_FUNCTION Status _init_aggregate_evaluators(RuntimeState* state); |
| |
| MOCK_FUNCTION Status _calc_aggregate_evaluators(); |
| |
| // may be we don't have to know the tuple id |
| TupleId _intermediate_tuple_id; |
| TupleDescriptor* _intermediate_tuple_desc = nullptr; |
| |
| TupleId _output_tuple_id; |
| TupleDescriptor* _output_tuple_desc = nullptr; |
| bool _needs_finalize; |
| const bool _is_first_phase; |
| size_t _align_aggregate_states = 1; |
| /// The offset to the n-th aggregate function in a row of aggregate functions. |
| Sizes _offsets_of_aggregate_states; |
| /// The total size of the row from the aggregate functions. |
| size_t _total_size_of_aggregate_states = 0; |
| |
| /// When spilling is enabled, the streaming agg should not occupy too much memory. |
| size_t _spill_streaming_agg_mem_limit; |
| // group by k1,k2 |
| VExprContextSPtrs _probe_expr_ctxs; |
| std::vector<AggFnEvaluator*> _aggregate_evaluators; |
| bool _can_short_circuit = false; |
| std::vector<size_t> _make_nullable_keys; |
| RowDescriptor _agg_fn_output_row_descriptor; |
| |
| // For sort limit |
| bool _do_sort_limit = false; |
| int64_t _sort_limit = -1; |
| std::vector<int> _order_directions; |
| std::vector<int> _null_directions; |
| |
| std::vector<TExpr> _partition_exprs; |
| }; |
| |
| #include "common/compile_check_end.h" |
| } // namespace doris |