| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "AggregateUtil.h" |
| #include <Core/Settings.h> |
| #include <Poco/Logger.h> |
| #include <Common/AggregateUtil.h> |
| #include <Common/CHUtil.h> |
| #include <Common/Exception.h> |
| #include <Common/Stopwatch.h> |
| #include <Common/formatReadable.h> |
| #include <Common/logger_useful.h> |
| |
| namespace DB |
| { |
/// Error codes thrown from this translation unit. They are only declared here (`extern`);
/// the definitions are provided elsewhere.
namespace ErrorCodes
{
    extern const int UNKNOWN_AGGREGATED_DATA_VARIANT;
    extern const int LOGICAL_ERROR;
}
| |
| namespace Setting |
| { |
| extern const SettingsDouble max_bytes_ratio_before_external_group_by; |
| extern const SettingsUInt64 max_bytes_before_external_group_by; |
| extern const SettingsBool optimize_group_by_constant_keys; |
| extern const SettingsUInt64 min_free_disk_space_for_temporary_data; |
| extern const SettingsMaxThreads max_threads; |
| extern const SettingsBool empty_result_for_aggregation_by_empty_set; |
| extern const SettingsUInt64 group_by_two_level_threshold_bytes; |
| extern const SettingsOverflowModeGroupBy group_by_overflow_mode; |
| extern const SettingsUInt64 max_rows_to_group_by; |
| extern const SettingsBool enable_memory_bound_merging_of_aggregation_results; |
| extern const SettingsUInt64 aggregation_in_order_max_block_bytes; |
| extern const SettingsUInt64 group_by_two_level_threshold; |
| extern const SettingsFloat min_hit_rate_to_use_consecutive_keys_optimization; |
| extern const SettingsUInt64 max_block_size; |
| extern const SettingsBool compile_aggregate_expressions; |
| extern const SettingsUInt64 min_count_to_compile_aggregate_expression; |
| extern const SettingsBool enable_software_prefetch_in_aggregation; |
| } |
| |
| template <typename Method> |
| static Int32 extractMethodBucketsNum(Method & /*method*/) |
| { |
| return Method::Data::NUM_BUCKETS; |
| } |
| |
| Int32 GlutenAggregatorUtil::getBucketsNum(AggregatedDataVariants & data_variants) |
| { |
| if (!data_variants.isTwoLevel()) |
| return 0; |
| |
| Int32 buckets_num = 0; |
| #define M(NAME) \ |
| else if (data_variants.type == AggregatedDataVariants::Type::NAME) buckets_num = extractMethodBucketsNum(*data_variants.NAME); |
| |
| if (false) |
| { |
| } // NOLINT |
| APPLY_FOR_VARIANTS_TWO_LEVEL(M) |
| #undef M |
| else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant"); |
| return buckets_num; |
| } |
| |
| std::optional<Block> GlutenAggregatorUtil::safeConvertOneBucketToBlock( |
| Aggregator & aggregator, AggregatedDataVariants & variants, Arena * arena, bool final, Int32 bucket) |
| { |
| if (!variants.isTwoLevel()) |
| return {}; |
| if (bucket >= getBucketsNum(variants)) |
| return {}; |
| return aggregator.convertOneBucketToBlock(variants, arena, final, bucket); |
| } |
| |
| template <typename Method> |
| static void releaseOneBucket(Method & method, Int32 bucket) |
| { |
| method.data.impls[bucket].clearAndShrink(); |
| } |
| |
| void GlutenAggregatorUtil::safeReleaseOneBucket(AggregatedDataVariants & variants, Int32 bucket) |
| { |
| if (!variants.isTwoLevel()) |
| return; |
| if (bucket >= getBucketsNum(variants)) |
| return; |
| #define M(NAME) else if (variants.type == AggregatedDataVariants::Type::NAME) releaseOneBucket(*variants.NAME, bucket); |
| |
| if (false) |
| { |
| } // NOLINT |
| APPLY_FOR_VARIANTS_TWO_LEVEL(M) |
| #undef M |
| else throw Exception(ErrorCodes::UNKNOWN_AGGREGATED_DATA_VARIANT, "Unknown aggregated data variant"); |
| } |
| |
| } |
| |
| namespace local_engine |
| { |
| AggregateDataBlockConverter::AggregateDataBlockConverter( |
| DB::Aggregator & aggregator_, DB::AggregatedDataVariantsPtr data_variants_, bool final_) |
| : aggregator(aggregator_), data_variants(std::move(data_variants_)), final(final_) |
| { |
| if (data_variants->isTwoLevel()) |
| buckets_num = DB::GlutenAggregatorUtil::getBucketsNum(*data_variants); |
| else if (data_variants->size()) |
| buckets_num = 1; |
| else |
| buckets_num = 0; |
| } |
| |
| bool AggregateDataBlockConverter::hasNext() |
| { |
| while (current_bucket < buckets_num && output_blocks.empty()) |
| { |
| if (data_variants->isTwoLevel()) |
| { |
| Stopwatch watch; |
| auto optional_block = DB::GlutenAggregatorUtil::safeConvertOneBucketToBlock( |
| aggregator, *data_variants, data_variants->aggregates_pool, final, current_bucket); |
| if (!optional_block) |
| throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Invalid bucket number {} for two-level aggregation", current_bucket); |
| auto & block = *optional_block; |
| LOG_DEBUG( |
| &Poco::Logger::get("AggregateDataBlockConverter"), |
| "Convert bucket {} into one block, rows: {}, cols: {}, bytes:{}, total bucket: {}, total rows: {}, time: {}", |
| current_bucket, |
| block.rows(), |
| block.columns(), |
| ReadableSize(block.allocatedBytes()), |
| buckets_num, |
| data_variants->size(), |
| watch.elapsedMilliseconds()); |
| DB::GlutenAggregatorUtil::safeReleaseOneBucket(*data_variants, current_bucket); |
| if (block.rows()) |
| output_blocks.emplace_back(std::move(block)); |
| } |
| else |
| { |
| size_t keys = data_variants->size(); |
| auto blocks = aggregator.convertToBlocks(*data_variants, final); |
| size_t total_allocated_bytes = 0; |
| size_t total_bytes = 0; |
| while (!blocks.empty()) |
| { |
| if (blocks.front().rows()) |
| { |
| total_allocated_bytes += blocks.front().allocatedBytes(); |
| total_bytes += blocks.front().bytes(); |
| output_blocks.emplace_back(std::move(blocks.front())); |
| } |
| blocks.pop_front(); |
| } |
| LOG_DEBUG( |
| &Poco::Logger::get("AggregateDataBlockConverter"), |
| "Convert single level hash table into blocks. blocks: {}, total bytes: {}, total allocated bytes: {}, total rows: {}", |
| output_blocks.size(), |
| ReadableSize(total_bytes), |
| ReadableSize(total_allocated_bytes), |
| keys); |
| data_variants = nullptr; |
| } |
| ++current_bucket; |
| } |
| return !output_blocks.empty(); |
| } |
| |
| DB::Block AggregateDataBlockConverter::next() |
| { |
| auto block = output_blocks.front(); |
| output_blocks.pop_front(); |
| return block; |
| } |
| |
| DB::Aggregator::Params AggregatorParamsHelper::buildParams( |
| const DB::ContextPtr & context, |
| const DB::Names & grouping_keys, |
| const DB::AggregateDescriptions & agg_descriptions, |
| Mode mode, |
| Algorithm algorithm) |
| { |
| const auto & settings = context->getSettingsRef(); |
| size_t max_rows_to_group_by = mode == Mode::PARTIAL_TO_FINISHED ? 0 : static_cast<size_t>(settings[DB::Setting::max_rows_to_group_by]); |
| |
| size_t group_by_two_level_threshold |
| = algorithm == Algorithm::GlutenGraceAggregate ? static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold]) : 0; |
| size_t group_by_two_level_threshold_bytes = algorithm == Algorithm::GlutenGraceAggregate |
| ? 0 |
| : (mode == Mode::PARTIAL_TO_FINISHED ? 0 : static_cast<size_t>(settings[DB::Setting::group_by_two_level_threshold_bytes])); |
| double max_bytes_ratio_before_external_group_by = algorithm == Algorithm::GlutenGraceAggregate |
| ? 0.0 |
| : (mode == Mode::PARTIAL_TO_FINISHED ? 0.0 : settings[DB::Setting::max_bytes_ratio_before_external_group_by]); |
| size_t max_bytes_before_external_group_by = algorithm == Algorithm::GlutenGraceAggregate |
| ? 0 |
| : (mode == Mode::PARTIAL_TO_FINISHED ? 0 : static_cast<size_t>(settings[DB::Setting::max_bytes_before_external_group_by])); |
| bool empty_result_for_aggregation_by_empty_set = algorithm == Algorithm::GlutenGraceAggregate |
| ? false |
| : (mode == Mode::PARTIAL_TO_FINISHED ? false : static_cast<bool>(settings[DB::Setting::empty_result_for_aggregation_by_empty_set])); |
| DB::TemporaryDataOnDiskScopePtr tmp_data_scope = algorithm == Algorithm::GlutenGraceAggregate ? nullptr : context->getTempDataOnDisk(); |
| |
| size_t min_free_disk_space = algorithm == Algorithm::GlutenGraceAggregate |
| ? 0 |
| : static_cast<size_t>(settings[DB::Setting::min_free_disk_space_for_temporary_data]); |
| bool compile_aggregate_expressions = mode == Mode::PARTIAL_TO_FINISHED ? false : settings[DB::Setting::compile_aggregate_expressions]; |
| size_t min_count_to_compile_aggregate_expression |
| = mode == Mode::PARTIAL_TO_FINISHED ? 0 : settings[DB::Setting::min_count_to_compile_aggregate_expression]; |
| size_t max_block_size = PODArrayUtil::adjustMemoryEfficientSize(settings[DB::Setting::max_block_size]); |
| bool enable_prefetch = mode != Mode::PARTIAL_TO_FINISHED; |
| bool only_merge = mode == Mode::PARTIAL_TO_FINISHED; |
| bool optimize_group_by_constant_keys |
| = mode == Mode::PARTIAL_TO_FINISHED ? false : settings[DB::Setting::optimize_group_by_constant_keys]; |
| |
| DB::Settings aggregate_settings{settings}; |
| aggregate_settings[DB::Setting::max_rows_to_group_by] = max_rows_to_group_by; |
| aggregate_settings[DB::Setting::max_bytes_ratio_before_external_group_by] = max_bytes_ratio_before_external_group_by; |
| aggregate_settings[DB::Setting::max_bytes_before_external_group_by] = max_bytes_before_external_group_by; |
| aggregate_settings[DB::Setting::min_free_disk_space_for_temporary_data] = min_free_disk_space; |
| aggregate_settings[DB::Setting::compile_aggregate_expressions] = compile_aggregate_expressions; |
| aggregate_settings[DB::Setting::min_count_to_compile_aggregate_expression] = min_count_to_compile_aggregate_expression; |
| aggregate_settings[DB::Setting::max_block_size] = max_block_size; |
| aggregate_settings[DB::Setting::enable_software_prefetch_in_aggregation] = enable_prefetch; |
| aggregate_settings[DB::Setting::optimize_group_by_constant_keys] = optimize_group_by_constant_keys; |
| return DB::Aggregator::Params{ |
| grouping_keys, |
| agg_descriptions, |
| /*overflow_row*/ false, |
| aggregate_settings[DB::Setting::max_rows_to_group_by], |
| aggregate_settings[DB::Setting::group_by_overflow_mode], |
| group_by_two_level_threshold, |
| group_by_two_level_threshold_bytes, |
| DB::Aggregator::Params::getMaxBytesBeforeExternalGroupBy( |
| aggregate_settings[DB::Setting::max_bytes_before_external_group_by], |
| aggregate_settings[DB::Setting::max_bytes_ratio_before_external_group_by]), |
| empty_result_for_aggregation_by_empty_set, |
| tmp_data_scope, |
| aggregate_settings[DB::Setting::max_threads], |
| aggregate_settings[DB::Setting::min_free_disk_space_for_temporary_data], |
| aggregate_settings[DB::Setting::compile_aggregate_expressions], |
| aggregate_settings[DB::Setting::min_count_to_compile_aggregate_expression], |
| aggregate_settings[DB::Setting::max_block_size], |
| aggregate_settings[DB::Setting::enable_software_prefetch_in_aggregation], |
| only_merge, |
| aggregate_settings[DB::Setting::optimize_group_by_constant_keys], |
| aggregate_settings[DB::Setting::min_hit_rate_to_use_consecutive_keys_optimization], |
| {}}; |
| } |
| |
| |
| #define COMPARE_FIELD(field) \ |
| if (lhs.field != rhs.field) \ |
| { \ |
| LOG_ERROR(getLogger("AggregatorParamsHelper"), "Aggregator::Params field " #field " is not equal. {}/{}", lhs.field, rhs.field); \ |
| return false; \ |
| } |
| bool AggregatorParamsHelper::compare(const DB::Aggregator::Params & lhs, const DB::Aggregator::Params & rhs) |
| { |
| COMPARE_FIELD(overflow_row); |
| COMPARE_FIELD(max_rows_to_group_by); |
| COMPARE_FIELD(group_by_overflow_mode); |
| COMPARE_FIELD(group_by_two_level_threshold); |
| COMPARE_FIELD(group_by_two_level_threshold_bytes); |
| COMPARE_FIELD(max_bytes_before_external_group_by); |
| COMPARE_FIELD(empty_result_for_aggregation_by_empty_set); |
| COMPARE_FIELD(max_threads); |
| COMPARE_FIELD(min_free_disk_space); |
| COMPARE_FIELD(compile_aggregate_expressions); |
| if ((lhs.tmp_data_scope == nullptr) != (rhs.tmp_data_scope == nullptr)) |
| { |
| LOG_ERROR(getLogger("AggregatorParamsHelper"), "Aggregator::Params field tmp_data_scope is not equal."); |
| return false; |
| } |
| COMPARE_FIELD(min_count_to_compile_aggregate_expression); |
| COMPARE_FIELD(enable_prefetch); |
| COMPARE_FIELD(only_merge); |
| COMPARE_FIELD(optimize_group_by_constant_keys); |
| COMPARE_FIELD(min_hit_rate_to_use_consecutive_keys_optimization); |
| return true; |
| } |
| } |