| // Licensed to the Apache Software Foundation (ASF) under one |
| // or more contributor license agreements. See the NOTICE file |
| // distributed with this work for additional information |
| // regarding copyright ownership. The ASF licenses this file |
| // to you under the Apache License, Version 2.0 (the |
| // "License"); you may not use this file except in compliance |
| // with the License. You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, |
| // software distributed under the License is distributed on an |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| // KIND, either express or implied. See the License for the |
| // specific language governing permissions and limitations |
| // under the License. |
| |
| #include "aggregation_sink_operator.h" |
| |
| #include <memory> |
| #include <string> |
| |
| #include "common/cast_set.h" |
| #include "common/status.h" |
| #include "pipeline/exec/operator.h" |
| #include "runtime/primitive_type.h" |
| #include "runtime/thread_context.h" |
| #include "util/runtime_profile.h" |
| #include "vec/aggregate_functions/aggregate_function_simple_factory.h" |
| #include "vec/common/hash_table/hash.h" |
| #include "vec/exprs/vectorized_agg_fn.h" |
| |
| namespace doris::pipeline { |
| #include "common/compile_check_begin.h" |
| /// The minimum reduction factor (input rows divided by output rows) to grow hash tables |
| /// in a streaming preaggregation, given that the hash tables are currently the given |
| /// size or above. The sizes roughly correspond to hash table sizes where the bucket |
| /// arrays will fit in a cache level. Intuitively, we don't want the working set of the |
| /// aggregation to expand to the next level of cache unless we're reducing the input |
| /// enough to outweigh the increased memory latency we'll incur for each hash table |
| /// lookup. |
| /// |
| /// Note that the current reduction achieved is not always a good estimate of the |
| /// final reduction. It may be biased either way depending on the ordering of the |
| /// input. If the input order is random, we will underestimate the final reduction |
| /// factor because the probability of a row having the same key as a previous row |
| /// increases as more input is processed. If the input order is correlated with the |
| /// key, skew may bias the estimate. If high-cardinality keys appear first, we may |
| /// overestimate, and if low-cardinality keys appear first, we may underestimate. |
| /// To estimate the eventual reduction achieved, we project the final reduction from |
| /// the planner's estimated input cardinality and the assumption that the input is in |
| /// random order. This means we assume the reduction factor will increase over time. |
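| /// |
| /// Illustrative example (numbers are hypothetical): if 1,000,000 input rows have produced |
| /// 100,000 distinct groups so far, the observed reduction factor is 10x; whether that is |
| /// enough to justify growing the hash table past its current cache-resident size depends |
| /// on the thresholds described above. |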
| AggSinkLocalState::AggSinkLocalState(DataSinkOperatorXBase* parent, RuntimeState* state) |
| : Base(parent, state) {} |
| |
| Status AggSinkLocalState::init(RuntimeState* state, LocalSinkStateInfo& info) { |
| RETURN_IF_ERROR(Base::init(state, info)); |
| SCOPED_TIMER(Base::exec_time_counter()); |
| SCOPED_TIMER(Base::_init_timer); |
| _agg_data = Base::_shared_state->agg_data.get(); |
| _hash_table_size_counter = ADD_COUNTER(custom_profile(), "HashTableSize", TUnit::UNIT); |
| _hash_table_memory_usage = |
| ADD_COUNTER_WITH_LEVEL(Base::custom_profile(), "MemoryUsageHashTable", TUnit::BYTES, 1); |
| _serialize_key_arena_memory_usage = ADD_COUNTER_WITH_LEVEL( |
| Base::custom_profile(), "MemoryUsageSerializeKeyArena", TUnit::BYTES, 1); |
| |
| _build_timer = ADD_TIMER(Base::custom_profile(), "BuildTime"); |
| _merge_timer = ADD_TIMER(Base::custom_profile(), "MergeTime"); |
| _expr_timer = ADD_TIMER(Base::custom_profile(), "ExprTime"); |
| _deserialize_data_timer = ADD_TIMER(Base::custom_profile(), "DeserializeAndMergeTime"); |
| _hash_table_compute_timer = ADD_TIMER(Base::custom_profile(), "HashTableComputeTime"); |
| _hash_table_limit_compute_timer = ADD_TIMER(Base::custom_profile(), "DoLimitComputeTime"); |
| _hash_table_emplace_timer = ADD_TIMER(Base::custom_profile(), "HashTableEmplaceTime"); |
| _hash_table_input_counter = |
| ADD_COUNTER(Base::custom_profile(), "HashTableInputCount", TUnit::UNIT); |
| |
| _memory_usage_container = ADD_COUNTER(custom_profile(), "MemoryUsageContainer", TUnit::BYTES); |
| _memory_usage_arena = ADD_COUNTER(custom_profile(), "MemoryUsageArena", TUnit::BYTES); |
| |
| return Status::OK(); |
| } |
| |
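| // open() copies the aggregate-state layout computed by the operator into the shared state, |
| // clones the probe (group-by) expressions and aggregate evaluators for this local state, and |
| // picks the execution path: a single pre-allocated state when there are no group-by keys, |
| // otherwise a hash table keyed by the serialized group-by values. |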
| Status AggSinkLocalState::open(RuntimeState* state) { |
| SCOPED_TIMER(Base::exec_time_counter()); |
| SCOPED_TIMER(Base::_open_timer); |
| RETURN_IF_ERROR(Base::open(state)); |
| auto& p = Base::_parent->template cast<AggSinkOperatorX>(); |
| Base::_shared_state->align_aggregate_states = p._align_aggregate_states; |
| Base::_shared_state->total_size_of_aggregate_states = p._total_size_of_aggregate_states; |
| Base::_shared_state->offsets_of_aggregate_states = p._offsets_of_aggregate_states; |
| Base::_shared_state->make_nullable_keys = p._make_nullable_keys; |
| Base::_shared_state->probe_expr_ctxs.resize(p._probe_expr_ctxs.size()); |
| |
| Base::_shared_state->limit = p._limit; |
| Base::_shared_state->do_sort_limit = p._do_sort_limit; |
| Base::_shared_state->null_directions = p._null_directions; |
| Base::_shared_state->order_directions = p._order_directions; |
| for (size_t i = 0; i < Base::_shared_state->probe_expr_ctxs.size(); i++) { |
| RETURN_IF_ERROR( |
| p._probe_expr_ctxs[i]->clone(state, Base::_shared_state->probe_expr_ctxs[i])); |
| } |
| |
| if (Base::_shared_state->probe_expr_ctxs.empty()) { |
| _agg_data->without_key = |
| reinterpret_cast<vectorized::AggregateDataPtr>(_agg_profile_arena.aligned_alloc( |
| p._total_size_of_aggregate_states, p._align_aggregate_states)); |
| |
| if (p._is_merge) { |
| _executor = std::make_unique<Executor<true, true>>(); |
| } else { |
| _executor = std::make_unique<Executor<true, false>>(); |
| } |
| } else { |
| RETURN_IF_ERROR(_init_hash_method(Base::_shared_state->probe_expr_ctxs)); |
| |
| std::visit(vectorized::Overload {[&](std::monostate& arg) { |
| throw doris::Exception(ErrorCode::INTERNAL_ERROR, |
| "uninited hash table"); |
| }, |
| [&](auto& agg_method) { |
| using HashTableType = |
| std::decay_t<decltype(agg_method)>; |
| using KeyType = typename HashTableType::Key; |
| |
| /// Some aggregate functions (e.g. AVG for decimals) have alignment requirements, so |
| /// round the per-group state size up to a multiple of the required alignment. |
| Base::_shared_state->aggregate_data_container = |
| std::make_unique<AggregateDataContainer>( |
| sizeof(KeyType), |
| ((p._total_size_of_aggregate_states + |
| p._align_aggregate_states - 1) / |
| p._align_aggregate_states) * |
| p._align_aggregate_states); |
| }}, |
| _agg_data->method_variant); |
| if (p._is_merge) { |
| _executor = std::make_unique<Executor<false, true>>(); |
| } else { |
| _executor = std::make_unique<Executor<false, false>>(); |
| } |
| |
| _should_limit_output = p._limit != -1 && // has limit |
| (!p._have_conjuncts) && // no having conjunct |
| !Base::_shared_state->enable_spill; |
| } |
| for (auto& evaluator : p._aggregate_evaluators) { |
| Base::_shared_state->aggregate_evaluators.push_back(evaluator->clone(state, p._pool)); |
| } |
| for (auto& evaluator : Base::_shared_state->aggregate_evaluators) { |
| evaluator->set_timer(_merge_timer, _expr_timer); |
| } |
| // Call _create_agg_status in open() rather than in prepare(): prepare() and open() |
| // may run on different threads, and creating the agg status on the prepare thread |
| // could leave it unable to get the JVM. |
| if (Base::_shared_state->probe_expr_ctxs.empty()) { |
| // _create_agg_status may allocate a lot of memory and can fail when memory is scarce. |
| RETURN_IF_ERROR(_create_agg_status(_agg_data->without_key)); |
| _shared_state->agg_data_created_without_key = true; |
| } |
| return Status::OK(); |
| } |
| |
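| // Creates one aggregate state per evaluator at its offset inside `data`. If creating the |
| // i-th state throws, the states already created (0..i-1) are destroyed before rethrowing |
| // so that no partially constructed state is leaked. |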
| Status AggSinkLocalState::_create_agg_status(vectorized::AggregateDataPtr data) { |
| auto& shared_state = *Base::_shared_state; |
| for (int i = 0; i < shared_state.aggregate_evaluators.size(); ++i) { |
| try { |
| shared_state.aggregate_evaluators[i]->create( |
| data + shared_state.offsets_of_aggregate_states[i]); |
| } catch (...) { |
| for (int j = 0; j < i; ++j) { |
| shared_state.aggregate_evaluators[j]->destroy( |
| data + shared_state.offsets_of_aggregate_states[j]); |
| } |
| throw; |
| } |
| } |
| return Status::OK(); |
| } |
| |
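| // No group-by keys: every row of the block is added into the single global aggregate state. |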
| Status AggSinkLocalState::_execute_without_key(vectorized::Block* block) { |
| DCHECK(_agg_data->without_key != nullptr); |
| SCOPED_TIMER(_build_timer); |
| _memory_usage_last_executing = 0; |
| SCOPED_PEAK_MEM(&_memory_usage_last_executing); |
| for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { |
| RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add( |
| block, |
| _agg_data->without_key + Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| _agg_arena_pool)); |
| } |
| return Status::OK(); |
| } |
| |
| Status AggSinkLocalState::_merge_with_serialized_key(vectorized::Block* block) { |
| _memory_usage_last_executing = 0; |
| SCOPED_PEAK_MEM(&_memory_usage_last_executing); |
| if (_shared_state->reach_limit) { |
| return _merge_with_serialized_key_helper<true, false>(block); |
| } else { |
| return _merge_with_serialized_key_helper<false, false>(block); |
| } |
| } |
| |
| size_t AggSinkLocalState::_memory_usage() const { |
| if (0 == _get_hash_table_size()) { |
| return 0; |
| } |
| size_t usage = 0; |
| usage += _agg_arena_pool.size(); |
| |
| if (Base::_shared_state->aggregate_data_container) { |
| usage += Base::_shared_state->aggregate_data_container->memory_usage(); |
| } |
| |
| std::visit(vectorized::Overload {[&](std::monostate& arg) -> void { |
| throw doris::Exception(ErrorCode::INTERNAL_ERROR, |
| "uninited hash table"); |
| }, |
| [&](auto& agg_method) -> void { |
| auto data = agg_method.hash_table; |
| usage += data->get_buffer_size_in_bytes(); |
| }}, |
| _agg_data->method_variant); |
| |
| return usage; |
| } |
| |
| bool AggSinkLocalState::is_blockable() const { |
| return std::any_of( |
| Base::_shared_state->aggregate_evaluators.begin(), |
| Base::_shared_state->aggregate_evaluators.end(), |
| [](const vectorized::AggFnEvaluator* evaluator) { return evaluator->is_blockable(); }); |
| } |
| |
| void AggSinkLocalState::_update_memusage_with_serialized_key() { |
| std::visit(vectorized::Overload { |
| [&](std::monostate& arg) -> void { |
| throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); |
| }, |
| [&](auto& agg_method) -> void { |
| auto& data = *agg_method.hash_table; |
| int64_t memory_usage_arena = _agg_arena_pool.size(); |
| int64_t memory_usage_container = |
| _shared_state->aggregate_data_container->memory_usage(); |
| int64_t hash_table_memory_usage = data.get_buffer_size_in_bytes(); |
| |
| COUNTER_SET(_memory_usage_arena, memory_usage_arena); |
| COUNTER_SET(_memory_usage_container, memory_usage_container); |
| COUNTER_SET(_hash_table_memory_usage, hash_table_memory_usage); |
| COUNTER_SET(_serialize_key_arena_memory_usage, |
| memory_usage_arena + memory_usage_container); |
| |
| COUNTER_SET(_memory_used_counter, memory_usage_arena + |
| memory_usage_container + |
| hash_table_memory_usage); |
| }}, |
| _agg_data->method_variant); |
| } |
| |
| Status AggSinkLocalState::_destroy_agg_status(vectorized::AggregateDataPtr data) { |
| auto& shared_state = *Base::_shared_state; |
| for (int i = 0; i < shared_state.aggregate_evaluators.size(); ++i) { |
| shared_state.aggregate_evaluators[i]->function()->destroy( |
| data + shared_state.offsets_of_aggregate_states[i]); |
| } |
| return Status::OK(); |
| } |
| |
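| // Merges a block whose aggregate columns hold serialized intermediate states. |
| // `for_spill`: the key columns are already materialized at the front of the block (no probe |
| // expression evaluation), followed by the aggregate columns. |
| // `limit`: reach_limit has been set; without sorted limit, keys are only looked up (never |
| // inserted) and the *_selected variants merge only rows that hit an existing entry; with |
| // sorted limit, _emplace_into_hash_table_limit filters rows through the limit heap first. |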
| template <bool limit, bool for_spill> |
| Status AggSinkLocalState::_merge_with_serialized_key_helper(vectorized::Block* block) { |
| SCOPED_TIMER(_merge_timer); |
| |
| size_t key_size = Base::_shared_state->probe_expr_ctxs.size(); |
| vectorized::ColumnRawPtrs key_columns(key_size); |
| std::vector<int> key_locs(key_size); |
| |
| for (int i = 0; i < key_size; ++i) { |
| if constexpr (for_spill) { |
| key_columns[i] = block->get_by_position(i).column.get(); |
| key_locs[i] = i; |
| } else { |
| int& result_column_id = key_locs[i]; |
| RETURN_IF_ERROR( |
| Base::_shared_state->probe_expr_ctxs[i]->execute(block, &result_column_id)); |
| block->replace_by_position_if_const(result_column_id); |
| key_columns[i] = block->get_by_position(result_column_id).column.get(); |
| } |
| key_columns[i]->assume_mutable()->replace_float_special_values(); |
| } |
| |
| size_t rows = block->rows(); |
| if (_places.size() < rows) { |
| _places.resize(rows); |
| } |
| |
| if (limit && !_shared_state->do_sort_limit) { |
| _find_in_hash_table(_places.data(), key_columns, (uint32_t)rows); |
| |
| for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { |
| if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) { |
| int col_id = AggSharedState::get_slot_column_id( |
| Base::_shared_state->aggregate_evaluators[i]); |
| auto column = block->get_by_position(col_id).column; |
| if (column->is_nullable()) { |
| column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr(); |
| } |
| |
| size_t buffer_size = |
| Base::_shared_state->aggregate_evaluators[i]->function()->size_of_data() * |
| rows; |
| if (_deserialize_buffer.size() < buffer_size) { |
| _deserialize_buffer.resize(buffer_size); |
| } |
| |
| { |
| SCOPED_TIMER(_deserialize_data_timer); |
| Base::_shared_state->aggregate_evaluators[i] |
| ->function() |
| ->deserialize_and_merge_vec_selected( |
| _places.data(), |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| _deserialize_buffer.data(), column.get(), _agg_arena_pool, |
| rows); |
| } |
| } else { |
| RETURN_IF_ERROR( |
| Base::_shared_state->aggregate_evaluators[i]->execute_batch_add_selected( |
| block, |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| _places.data(), _agg_arena_pool)); |
| } |
| } |
| } else { |
| bool need_do_agg = true; |
| if (limit) { |
| need_do_agg = _emplace_into_hash_table_limit(_places.data(), block, key_locs, |
| key_columns, (uint32_t)rows); |
| rows = block->rows(); |
| } else { |
| _emplace_into_hash_table(_places.data(), key_columns, (uint32_t)rows); |
| } |
| |
| if (need_do_agg) { |
| for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { |
| if (Base::_shared_state->aggregate_evaluators[i]->is_merge() || for_spill) { |
| size_t col_id = 0; |
| if constexpr (for_spill) { |
| col_id = Base::_shared_state->probe_expr_ctxs.size() + i; |
| } else { |
| col_id = AggSharedState::get_slot_column_id( |
| Base::_shared_state->aggregate_evaluators[i]); |
| } |
| auto column = block->get_by_position(col_id).column; |
| if (column->is_nullable()) { |
| column = ((vectorized::ColumnNullable*)column.get()) |
| ->get_nested_column_ptr(); |
| } |
| |
| size_t buffer_size = Base::_shared_state->aggregate_evaluators[i] |
| ->function() |
| ->size_of_data() * |
| rows; |
| if (_deserialize_buffer.size() < buffer_size) { |
| _deserialize_buffer.resize(buffer_size); |
| } |
| |
| { |
| SCOPED_TIMER(_deserialize_data_timer); |
| Base::_shared_state->aggregate_evaluators[i] |
| ->function() |
| ->deserialize_and_merge_vec( |
| _places.data(), |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| _deserialize_buffer.data(), column.get(), _agg_arena_pool, |
| rows); |
| } |
| } else { |
| RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_batch_add( |
| block, |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| _places.data(), _agg_arena_pool)); |
| } |
| } |
| } |
| |
| if (!limit && _should_limit_output) { |
| const size_t hash_table_size = _get_hash_table_size(); |
| _shared_state->reach_limit = |
| hash_table_size >= Base::_parent->template cast<AggSinkOperatorX>()._limit; |
| if (_shared_state->do_sort_limit && _shared_state->reach_limit) { |
| _shared_state->build_limit_heap(hash_table_size); |
| } |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
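| // No group-by keys: merge-type evaluators deserialize intermediate states from their slot |
| // column and merge them into the single global state; the rest add the input rows directly. |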
| Status AggSinkLocalState::_merge_without_key(vectorized::Block* block) { |
| SCOPED_TIMER(_merge_timer); |
| DCHECK(_agg_data->without_key != nullptr); |
| |
| _memory_usage_last_executing = 0; |
| SCOPED_PEAK_MEM(&_memory_usage_last_executing); |
| for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { |
| if (Base::_shared_state->aggregate_evaluators[i]->is_merge()) { |
| int col_id = AggSharedState::get_slot_column_id( |
| Base::_shared_state->aggregate_evaluators[i]); |
| auto column = block->get_by_position(col_id).column; |
| if (column->is_nullable()) { |
| column = ((vectorized::ColumnNullable*)column.get())->get_nested_column_ptr(); |
| } |
| |
| SCOPED_TIMER(_deserialize_data_timer); |
| Base::_shared_state->aggregate_evaluators[i] |
| ->function() |
| ->deserialize_and_merge_from_column( |
| _agg_data->without_key + |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| *column, _agg_arena_pool); |
| } else { |
| RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_single_add( |
| block, |
| _agg_data->without_key + Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| _agg_arena_pool)); |
| } |
| } |
| return Status::OK(); |
| } |
| |
| void AggSinkLocalState::_update_memusage_without_key() { |
| int64_t arena_memory_usage = _agg_arena_pool.size(); |
| COUNTER_SET(_memory_used_counter, arena_memory_usage); |
| COUNTER_SET(_serialize_key_arena_memory_usage, arena_memory_usage); |
| } |
| |
| Status AggSinkLocalState::_execute_with_serialized_key(vectorized::Block* block) { |
| _memory_usage_last_executing = 0; |
| SCOPED_PEAK_MEM(&_memory_usage_last_executing); |
| if (_shared_state->reach_limit) { |
| return _execute_with_serialized_key_helper<true>(block); |
| } else { |
| return _execute_with_serialized_key_helper<false>(block); |
| } |
| } |
| |
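| // Path for raw (non-serialized) input rows: evaluates the group-by expressions, emplaces each |
| // key in the hash table (or only looks keys up once `limit` is set without sorted limit), and |
| // adds every row to its group's aggregate state. Afterwards it checks whether the hash table |
| // has grown past the limit (scaled by topn_agg_limit_multiplier for sorted limit) and, if so, |
| // sets reach_limit and builds the limit heap when sorted limit is enabled. |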
| template <bool limit> |
| Status AggSinkLocalState::_execute_with_serialized_key_helper(vectorized::Block* block) { |
| SCOPED_TIMER(_build_timer); |
| DCHECK(!Base::_shared_state->probe_expr_ctxs.empty()); |
| |
| size_t key_size = Base::_shared_state->probe_expr_ctxs.size(); |
| vectorized::ColumnRawPtrs key_columns(key_size); |
| std::vector<int> key_locs(key_size); |
| { |
| SCOPED_TIMER(_expr_timer); |
| for (size_t i = 0; i < key_size; ++i) { |
| int& result_column_id = key_locs[i]; |
| RETURN_IF_ERROR( |
| Base::_shared_state->probe_expr_ctxs[i]->execute(block, &result_column_id)); |
| block->get_by_position(result_column_id).column = |
| block->get_by_position(result_column_id) |
| .column->convert_to_full_column_if_const(); |
| key_columns[i] = block->get_by_position(result_column_id).column.get(); |
| key_columns[i]->assume_mutable()->replace_float_special_values(); |
| } |
| } |
| |
| auto rows = (uint32_t)block->rows(); |
| if (_places.size() < rows) { |
| _places.resize(rows); |
| } |
| |
| if (limit && !_shared_state->do_sort_limit) { |
| _find_in_hash_table(_places.data(), key_columns, rows); |
| |
| for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { |
| RETURN_IF_ERROR( |
| Base::_shared_state->aggregate_evaluators[i]->execute_batch_add_selected( |
| block, |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| _places.data(), _agg_arena_pool)); |
| } |
| } else { |
| auto do_aggregate_evaluators = [&] { |
| for (int i = 0; i < Base::_shared_state->aggregate_evaluators.size(); ++i) { |
| RETURN_IF_ERROR(Base::_shared_state->aggregate_evaluators[i]->execute_batch_add( |
| block, |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._offsets_of_aggregate_states[i], |
| _places.data(), _agg_arena_pool)); |
| } |
| return Status::OK(); |
| }; |
| |
| if constexpr (limit) { |
| if (_emplace_into_hash_table_limit(_places.data(), block, key_locs, key_columns, |
| rows)) { |
| RETURN_IF_ERROR(do_aggregate_evaluators()); |
| } |
| } else { |
| _emplace_into_hash_table(_places.data(), key_columns, rows); |
| RETURN_IF_ERROR(do_aggregate_evaluators()); |
| |
| if (_should_limit_output && !Base::_shared_state->enable_spill) { |
| const size_t hash_table_size = _get_hash_table_size(); |
| |
| _shared_state->reach_limit = |
| hash_table_size >= |
| (_shared_state->do_sort_limit |
| ? Base::_parent->template cast<AggSinkOperatorX>()._limit * |
| config::topn_agg_limit_multiplier |
| : Base::_parent->template cast<AggSinkOperatorX>()._limit); |
| if (_shared_state->reach_limit && _shared_state->do_sort_limit) { |
| _shared_state->build_limit_heap(hash_table_size); |
| } |
| } |
| } |
| } |
| return Status::OK(); |
| } |
| |
| size_t AggSinkLocalState::_get_hash_table_size() const { |
| return std::visit( |
| vectorized::Overload {[&](std::monostate& arg) -> size_t { return 0; }, |
| [&](auto& agg_method) { return agg_method.hash_table->size(); }}, |
| _agg_data->method_variant); |
| } |
| |
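| // Looks up or inserts each row's key. For a new key, the creator persists the key, appends a |
| // state slot to the aggregate_data_container (the null key uses the arena instead) and |
| // initializes the aggregate states; places[i] receives the state pointer for row i. |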
| void AggSinkLocalState::_emplace_into_hash_table(vectorized::AggregateDataPtr* places, |
| vectorized::ColumnRawPtrs& key_columns, |
| uint32_t num_rows) { |
| std::visit(vectorized::Overload { |
| [&](std::monostate& arg) -> void { |
| throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); |
| }, |
| [&](auto& agg_method) -> void { |
| SCOPED_TIMER(_hash_table_compute_timer); |
| using HashMethodType = std::decay_t<decltype(agg_method)>; |
| using AggState = typename HashMethodType::State; |
| AggState state(key_columns); |
| agg_method.init_serialized_keys(key_columns, num_rows); |
| |
| auto creator = [this](const auto& ctor, auto& key, auto& origin) { |
| HashMethodType::try_presis_key_and_origin(key, origin, |
| _agg_arena_pool); |
| auto mapped = |
| Base::_shared_state->aggregate_data_container->append_data( |
| origin); |
| auto st = _create_agg_status(mapped); |
| if (!st) { |
| throw Exception(st.code(), st.to_string()); |
| } |
| ctor(key, mapped); |
| }; |
| |
| auto creator_for_null_key = [&](auto& mapped) { |
| mapped = _agg_arena_pool.aligned_alloc( |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._total_size_of_aggregate_states, |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._align_aggregate_states); |
| auto st = _create_agg_status(mapped); |
| if (!st) { |
| throw Exception(st.code(), st.to_string()); |
| } |
| }; |
| |
| SCOPED_TIMER(_hash_table_emplace_timer); |
| for (size_t i = 0; i < num_rows; ++i) { |
| places[i] = *agg_method.lazy_emplace(state, i, creator, |
| creator_for_null_key); |
| } |
| |
| COUNTER_UPDATE(_hash_table_input_counter, num_rows); |
| }}, |
| _agg_data->method_variant); |
| } |
| |
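| // TopN-limit variant of _emplace_into_hash_table: rows that cannot enter the current limit |
| // heap are filtered out of the block first, and the heap is refreshed as new keys are |
| // inserted. Returns true if at least one row passed the filter and was aggregated. |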
| bool AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateDataPtr* places, |
| vectorized::Block* block, |
| const std::vector<int>& key_locs, |
| vectorized::ColumnRawPtrs& key_columns, |
| uint32_t num_rows) { |
| return std::visit( |
| vectorized::Overload { |
| [&](std::monostate& arg) { |
| throw doris::Exception(ErrorCode::INTERNAL_ERROR, "uninited hash table"); |
| return true; |
| }, |
| [&](auto&& agg_method) -> bool { |
| SCOPED_TIMER(_hash_table_compute_timer); |
| using HashMethodType = std::decay_t<decltype(agg_method)>; |
| using AggState = typename HashMethodType::State; |
| |
| bool need_filter = false; |
| { |
| SCOPED_TIMER(_hash_table_limit_compute_timer); |
| need_filter = |
| _shared_state->do_limit_filter(block, num_rows, &key_locs); |
| } |
| |
| auto& need_computes = _shared_state->need_computes; |
| if (auto need_agg = |
| std::find(need_computes.begin(), need_computes.end(), 1); |
| need_agg != need_computes.end()) { |
| if (need_filter) { |
| vectorized::Block::filter_block_internal(block, need_computes); |
| for (int i = 0; i < key_locs.size(); ++i) { |
| key_columns[i] = |
| block->get_by_position(key_locs[i]).column.get(); |
| } |
| num_rows = (uint32_t)block->rows(); |
| } |
| |
| AggState state(key_columns); |
| agg_method.init_serialized_keys(key_columns, num_rows); |
| size_t i = 0; |
| |
| auto creator = [&](const auto& ctor, auto& key, auto& origin) { |
| try { |
| HashMethodType::try_presis_key_and_origin(key, origin, |
| _agg_arena_pool); |
| _shared_state->refresh_top_limit(i, key_columns); |
| auto mapped = |
| _shared_state->aggregate_data_container->append_data( |
| origin); |
| auto st = _create_agg_status(mapped); |
| if (!st) { |
| throw Exception(st.code(), st.to_string()); |
| } |
| ctor(key, mapped); |
| } catch (...) { |
| // Exception safety: if memory allocation or agg status creation fails, |
| // no destructor will be called for this entry, so map the key to nullptr |
| // before rethrowing. |
| ctor(key, nullptr); |
| throw; |
| } |
| }; |
| |
| auto creator_for_null_key = [&](auto& mapped) { |
| mapped = _agg_arena_pool.aligned_alloc( |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._total_size_of_aggregate_states, |
| Base::_parent->template cast<AggSinkOperatorX>() |
| ._align_aggregate_states); |
| auto st = _create_agg_status(mapped); |
| if (!st) { |
| throw Exception(st.code(), st.to_string()); |
| } |
| _shared_state->refresh_top_limit(i, key_columns); |
| }; |
| |
| SCOPED_TIMER(_hash_table_emplace_timer); |
| for (i = 0; i < num_rows; ++i) { |
| places[i] = *agg_method.lazy_emplace(state, i, creator, |
| creator_for_null_key); |
| } |
| COUNTER_UPDATE(_hash_table_input_counter, num_rows); |
| return true; |
| } |
| return false; |
| }}, |
| _agg_data->method_variant); |
| } |
| |
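| // Lookup-only counterpart of _emplace_into_hash_table: never inserts; places[i] is the |
| // existing state for row i, or nullptr if the key is not present. |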
| void AggSinkLocalState::_find_in_hash_table(vectorized::AggregateDataPtr* places, |
| vectorized::ColumnRawPtrs& key_columns, |
| uint32_t num_rows) { |
| std::visit(vectorized::Overload {[&](std::monostate& arg) -> void { |
| throw doris::Exception(ErrorCode::INTERNAL_ERROR, |
| "uninited hash table"); |
| }, |
| [&](auto& agg_method) -> void { |
| using HashMethodType = std::decay_t<decltype(agg_method)>; |
| using AggState = typename HashMethodType::State; |
| AggState state(key_columns); |
| agg_method.init_serialized_keys(key_columns, num_rows); |
| |
| /// For all rows. |
| for (size_t i = 0; i < num_rows; ++i) { |
| auto find_result = agg_method.find(state, i); |
| |
| if (find_result.is_found()) { |
| places[i] = find_result.get_mapped(); |
| } else { |
| places[i] = nullptr; |
| } |
| } |
| }}, |
| _agg_data->method_variant); |
| } |
| |
| Status AggSinkLocalState::_init_hash_method(const vectorized::VExprContextSPtrs& probe_exprs) { |
| RETURN_IF_ERROR(init_hash_method<AggregatedDataVariants>( |
| _agg_data, get_data_types(probe_exprs), |
| Base::_parent->template cast<AggSinkOperatorX>()._is_first_phase)); |
| return Status::OK(); |
| } |
| |
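| // Memory to reserve before the next batch: the hash table's own growth estimate for |
| // state->batch_size() rows plus the peak memory observed while processing the previous block. |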
| size_t AggSinkLocalState::get_reserve_mem_size(RuntimeState* state, bool eos) const { |
| size_t size_to_reserve = std::visit( |
| [&](auto&& arg) -> size_t { |
| using HashTableCtxType = std::decay_t<decltype(arg)>; |
| if constexpr (std::is_same_v<HashTableCtxType, std::monostate>) { |
| return 0; |
| } else { |
| return arg.hash_table->estimate_memory(state->batch_size()); |
| } |
| }, |
| _agg_data->method_variant); |
| |
| size_to_reserve += _memory_usage_last_executing; |
| return size_to_reserve; |
| } |
| |
| // TODO: Tricky processing when `multi_distinct_` functions exist; they will be re-planned by the optimizer. |
| AggSinkOperatorX::AggSinkOperatorX(ObjectPool* pool, int operator_id, int dest_id, |
| const TPlanNode& tnode, const DescriptorTbl& descs, |
| bool require_bucket_distribution) |
| : DataSinkOperatorX<AggSinkLocalState>(operator_id, tnode, dest_id), |
| _intermediate_tuple_id(tnode.agg_node.intermediate_tuple_id), |
| _output_tuple_id(tnode.agg_node.output_tuple_id), |
| _needs_finalize(tnode.agg_node.need_finalize), |
| _is_merge(false), |
| _is_first_phase(tnode.agg_node.__isset.is_first_phase && tnode.agg_node.is_first_phase), |
| _pool(pool), |
| _limit(tnode.limit), |
| _have_conjuncts((tnode.__isset.vconjunct && !tnode.vconjunct.nodes.empty()) || |
| (tnode.__isset.conjuncts && !tnode.conjuncts.empty())), |
| _partition_exprs( |
| tnode.__isset.distribute_expr_lists && |
| (require_bucket_distribution || |
| std::any_of( |
| tnode.agg_node.aggregate_functions.begin(), |
| tnode.agg_node.aggregate_functions.end(), |
| [](const TExpr& texpr) -> bool { |
| return texpr.nodes[0] |
| .fn.name.function_name.starts_with( |
| vectorized:: |
| DISTINCT_FUNCTION_PREFIX); |
| })) |
| ? tnode.distribute_expr_lists[0] |
| : tnode.agg_node.grouping_exprs), |
| _is_colocate(tnode.agg_node.__isset.is_colocate && tnode.agg_node.is_colocate), |
| _require_bucket_distribution(require_bucket_distribution), |
| _agg_fn_output_row_descriptor(descs, tnode.row_tuples) {} |
| |
| Status AggSinkOperatorX::init(const TPlanNode& tnode, RuntimeState* state) { |
| RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::init(tnode, state)); |
| // ignore the return status for now, so we need to introduce ExecNode::init() |
| RETURN_IF_ERROR( |
| vectorized::VExpr::create_expr_trees(tnode.agg_node.grouping_exprs, _probe_expr_ctxs)); |
| |
| // init aggregate functions |
| _aggregate_evaluators.reserve(tnode.agg_node.aggregate_functions.size()); |
| |
| TSortInfo dummy; |
| for (int i = 0; i < tnode.agg_node.aggregate_functions.size(); ++i) { |
| vectorized::AggFnEvaluator* evaluator = nullptr; |
| RETURN_IF_ERROR(vectorized::AggFnEvaluator::create( |
| _pool, tnode.agg_node.aggregate_functions[i], |
| tnode.agg_node.__isset.agg_sort_infos ? tnode.agg_node.agg_sort_infos[i] : dummy, |
| tnode.agg_node.grouping_exprs.empty(), false, &evaluator)); |
| _aggregate_evaluators.push_back(evaluator); |
| } |
| |
| if (tnode.agg_node.__isset.agg_sort_info_by_group_key) { |
| _do_sort_limit = true; |
| const auto& agg_sort_info = tnode.agg_node.agg_sort_info_by_group_key; |
| DCHECK_EQ(agg_sort_info.nulls_first.size(), agg_sort_info.is_asc_order.size()); |
| |
| const size_t order_by_key_size = agg_sort_info.is_asc_order.size(); |
| _order_directions.resize(order_by_key_size); |
| _null_directions.resize(order_by_key_size); |
| for (int i = 0; i < order_by_key_size; ++i) { |
| _order_directions[i] = agg_sort_info.is_asc_order[i] ? 1 : -1; |
| _null_directions[i] = |
| agg_sort_info.nulls_first[i] ? -_order_directions[i] : _order_directions[i]; |
| } |
| } |
| |
| return Status::OK(); |
| } |
| |
| Status AggSinkOperatorX::prepare(RuntimeState* state) { |
| RETURN_IF_ERROR(DataSinkOperatorX<AggSinkLocalState>::prepare(state)); |
| |
| RETURN_IF_ERROR(_init_probe_expr_ctx(state)); |
| |
| RETURN_IF_ERROR(_init_aggregate_evaluators(state)); |
| |
| RETURN_IF_ERROR(_calc_aggregate_evaluators()); |
| |
| RETURN_IF_ERROR(_check_agg_fn_output()); |
| |
| return Status::OK(); |
| } |
| |
| Status AggSinkOperatorX::_init_probe_expr_ctx(RuntimeState* state) { |
| _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id); |
| _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id); |
| DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size()); |
| RETURN_IF_ERROR(vectorized::VExpr::prepare( |
| _probe_expr_ctxs, state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc())); |
| |
| RETURN_IF_ERROR(vectorized::VExpr::open(_probe_expr_ctxs, state)); |
| return Status::OK(); |
| } |
| |
| Status AggSinkOperatorX::_init_aggregate_evaluators(RuntimeState* state) { |
| size_t j = _probe_expr_ctxs.size(); |
| for (size_t i = 0; i < j; ++i) { |
| auto nullable_output = _output_tuple_desc->slots()[i]->is_nullable(); |
| auto nullable_input = _probe_expr_ctxs[i]->root()->is_nullable(); |
| if (nullable_output != nullable_input) { |
| DCHECK(nullable_output); |
| _make_nullable_keys.emplace_back(i); |
| } |
| } |
| for (size_t i = 0; i < _aggregate_evaluators.size(); ++i, ++j) { |
| SlotDescriptor* intermediate_slot_desc = _intermediate_tuple_desc->slots()[j]; |
| SlotDescriptor* output_slot_desc = _output_tuple_desc->slots()[j]; |
| RETURN_IF_ERROR(_aggregate_evaluators[i]->prepare( |
| state, DataSinkOperatorX<AggSinkLocalState>::_child->row_desc(), |
| intermediate_slot_desc, output_slot_desc)); |
| _aggregate_evaluators[i]->set_version(state->be_exec_version()); |
| } |
| |
| for (auto& _aggregate_evaluator : _aggregate_evaluators) { |
| RETURN_IF_ERROR(_aggregate_evaluator->open(state)); |
| } |
| return Status::OK(); |
| } |
| |
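| // Computes the per-group aggregate-state layout: each state is placed at the next offset that |
| // satisfies its alignment, and the whole row is aligned to the largest requirement. |
| // Illustrative example (sizes are hypothetical): two states of (size, align) = (8, 8) and |
| // (12, 4) get offsets 0 and 8, a total size of 20 bytes and an overall alignment of 8. |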
| Status AggSinkOperatorX::_calc_aggregate_evaluators() { |
| _offsets_of_aggregate_states.resize(_aggregate_evaluators.size()); |
| _is_merge = false; |
| for (size_t i = 0; i < _aggregate_evaluators.size(); ++i) { |
| _offsets_of_aggregate_states[i] = _total_size_of_aggregate_states; |
| |
| const auto& agg_function = _aggregate_evaluators[i]->function(); |
| // Aggregate states are aligned based on the maximum alignment requirement. |
| _align_aggregate_states = std::max(_align_aggregate_states, agg_function->align_of_data()); |
| _total_size_of_aggregate_states += agg_function->size_of_data(); |
| |
| // If this is not the last aggregate state, pad it so that the next aggregate state is aligned. |
| if (i + 1 < _aggregate_evaluators.size()) { |
| size_t alignment_of_next_state = |
| _aggregate_evaluators[i + 1]->function()->align_of_data(); |
| if ((alignment_of_next_state & (alignment_of_next_state - 1)) != 0) { |
| return Status::RuntimeError("Logical error: align_of_data is not 2^N"); |
| } |
| |
| /// Extend the total size to the next alignment requirement: round up |
| /// 'total_size_of_aggregate_states' to a multiple of alignment_of_next_state. |
| _total_size_of_aggregate_states = |
| (_total_size_of_aggregate_states + alignment_of_next_state - 1) / |
| alignment_of_next_state * alignment_of_next_state; |
| } |
| if (_aggregate_evaluators[i]->is_merge()) { |
| _is_merge = true; |
| } |
| } |
| return Status::OK(); |
| } |
| |
| Status AggSinkOperatorX::_check_agg_fn_output() { |
| if (_needs_finalize) { |
| RETURN_IF_ERROR(vectorized::AggFnEvaluator::check_agg_fn_output( |
| cast_set<uint32_t>(_probe_expr_ctxs.size()), _aggregate_evaluators, |
| _agg_fn_output_row_descriptor)); |
| } |
| return Status::OK(); |
| } |
| |
| Status AggSinkOperatorX::sink(doris::RuntimeState* state, vectorized::Block* in_block, bool eos) { |
| auto& local_state = get_local_state(state); |
| SCOPED_TIMER(local_state.exec_time_counter()); |
| COUNTER_UPDATE(local_state.rows_input_counter(), (int64_t)in_block->rows()); |
| local_state._shared_state->input_num_rows += in_block->rows(); |
| if (in_block->rows() > 0) { |
| RETURN_IF_ERROR(local_state._executor->execute(&local_state, in_block)); |
| local_state._executor->update_memusage(&local_state); |
| COUNTER_SET(local_state._hash_table_size_counter, |
| (int64_t)local_state._get_hash_table_size()); |
| } |
| if (eos) { |
| local_state._dependency->set_ready_to_read(); |
| } |
| return Status::OK(); |
| } |
| |
| size_t AggSinkOperatorX::get_revocable_mem_size(RuntimeState* state) const { |
| auto& local_state = get_local_state(state); |
| return local_state._memory_usage(); |
| } |
| |
| Status AggSinkOperatorX::reset_hash_table(RuntimeState* state) { |
| auto& local_state = get_local_state(state); |
| auto& ss = *local_state.Base::_shared_state; |
| RETURN_IF_ERROR(ss.reset_hash_table()); |
| local_state._serialize_key_arena_memory_usage->set((int64_t)0); |
| local_state._agg_arena_pool.clear(true); |
| return Status::OK(); |
| } |
| |
| size_t AggSinkOperatorX::get_reserve_mem_size(RuntimeState* state, bool eos) { |
| auto& local_state = get_local_state(state); |
| return local_state.get_reserve_mem_size(state, eos); |
| } |
| |
| Status AggSinkLocalState::close(RuntimeState* state, Status exec_status) { |
| SCOPED_TIMER(Base::exec_time_counter()); |
| SCOPED_TIMER(Base::_close_timer); |
| if (Base::_closed) { |
| return Status::OK(); |
| } |
| _preagg_block.clear(); |
| vectorized::PODArray<vectorized::AggregateDataPtr> tmp_places; |
| _places.swap(tmp_places); |
| |
| std::vector<char> tmp_deserialize_buffer; |
| _deserialize_buffer.swap(tmp_deserialize_buffer); |
| return Base::close(state, exec_status); |
| } |
| |
| } // namespace doris::pipeline |