| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #include "StreamingAggregatingStep.h" |
| #include <Processors/Port.h> |
| #include <Processors/Transforms/AggregatingTransform.h> |
| #include <QueryPipeline/QueryPipelineBuilder.h> |
| #include <Common/BlockTypeUtils.h> |
| #include <Common/CHUtil.h> |
| #include <Common/GlutenConfig.h> |
| #include <Common/QueryContext.h> |
| #include <Common/Stopwatch.h> |
| #include <Common/formatReadable.h> |
| #include <Common/logger_useful.h> |
| |
| namespace DB |
| { |
| namespace ErrorCodes |
| { |
| extern const int LOGICAL_ERROR; |
| } |
| } |
| |
| namespace local_engine |
| { |
| StreamingAggregatingTransform::StreamingAggregatingTransform( |
| DB::ContextPtr context_, const DB::SharedHeader & header_, DB::AggregatingTransformParamsPtr params_) |
| : DB::IProcessor({header_}, {params_->getHeader()}) |
| , context(context_) |
| , key_columns(params_->params.keys_size) |
| , aggregate_columns(params_->params.aggregates_size) |
| , params(params_) |
| { |
| auto config = StreamingAggregateConfig::loadFromContext(context); |
| aggregated_keys_before_evict = config.aggregated_keys_before_streaming_aggregating_evict; |
| aggregated_keys_before_evict = PODArrayUtil::adjustMemoryEfficientSize(aggregated_keys_before_evict); |
| max_allowed_memory_usage_ratio = config.max_memory_usage_ratio_for_streaming_aggregating; |
| high_cardinality_threshold = config.high_cardinality_threshold_for_streaming_aggregating; |
| } |
| |
| StreamingAggregatingTransform::~StreamingAggregatingTransform() |
| { |
| LOG_INFO( |
| logger, |
| "Metrics. total_input_blocks: {}, total_input_rows: {}, total_output_blocks: {}, total_output_rows: {}, " |
| "total_clear_data_variants_num: {}, total_aggregate_time: {}, total_convert_data_variants_time: {}, current mem usage: {}", |
| total_input_blocks, |
| total_input_rows, |
| total_output_blocks, |
| total_output_rows, |
| total_clear_data_variants_num, |
| total_aggregate_time, |
| total_convert_data_variants_time, |
| ReadableSize(currentThreadGroupMemoryUsage())); |
| } |
| |
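/// The prepare()/work() state machine:
/// - If an output chunk is pending, push it and report PortFull.
/// - If an input chunk is pending (or, after the input is finished, an unevicted hash
///   table or an unfinished converter remains), report Ready so that work() runs.
/// - Otherwise pull the next chunk from the input, or finish once everything is drained.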
| StreamingAggregatingTransform::Status StreamingAggregatingTransform::prepare() |
| { |
| auto & output = outputs.front(); |
| auto & input = inputs.front(); |
| if (output.isFinished() || isCancelled()) |
| { |
| input.close(); |
| return Status::Finished; |
| } |
| |
| if (has_output) |
| { |
| if (output.canPush()) |
| { |
| LOG_DEBUG( |
| logger, |
| "Output one chunk. rows: {}, bytes: {}, current memory usage: {}", |
| output_chunk.getNumRows(), |
| ReadableSize(output_chunk.bytes()), |
| ReadableSize(currentThreadGroupMemoryUsage())); |
| total_output_rows += output_chunk.getNumRows(); |
| total_output_blocks++; |
| if (!output_chunk.getNumRows()) |
| throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Invalid output chunk"); |
| output.push(std::move(output_chunk)); |
| has_output = false; |
| } |
| return Status::PortFull; |
| } |
| |
| if (has_input) |
| return Status::Ready; |
| |
| if (input.isFinished()) |
| { |
        /// Mark the input as finished so that needEvict() forces a final eviction.
| input_finished = true; |
| |
        /// There is still pending data: an unevicted hash table or a converter with remaining blocks.
| if (data_variants || (block_converter && block_converter->hasNext())) |
| { |
| has_input = true; |
| return Status::Ready; |
| } |
| else |
| { |
| output.finish(); |
| return Status::Finished; |
| } |
| } |
| |
| input.setNeeded(); |
| if (!input.hasData()) |
| { |
| return Status::NeedData; |
| } |
| input_chunk = input.pull(true); |
| LOG_DEBUG( |
| logger, |
| "Input one new chunk. rows: {}, bytes: {}, current memory usage: {}", |
| input_chunk.getNumRows(), |
| ReadableSize(input_chunk.bytes()), |
| ReadableSize(currentThreadGroupMemoryUsage())); |
| total_input_rows += input_chunk.getNumRows(); |
| total_input_blocks++; |
| has_input = true; |
| return Status::Ready; |
| } |
| |
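/// Decides whether the current hash table should be evicted, i.e. converted into blocks
/// and dropped. Eviction is forced once the input is finished. Before that, it is skipped
/// while the table holds fewer than aggregated_keys_before_evict keys, and is otherwise
/// triggered either by a high output/input row ratio (high grouping-key cardinality) or by
/// projected memory usage reaching max_allowed_memory_usage_ratio of the memory tracker's
/// soft limit. For illustration: with a 4 GiB soft limit and a ratio of 0.9, max_mem_used
/// is about 3.6 GiB.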
| bool StreamingAggregatingTransform::needEvict() |
| { |
| if (input_finished) |
| return true; |
| auto memory_soft_limit = DB::CurrentThread::getGroup()->memory_tracker.getSoftLimit(); |
| if (!memory_soft_limit) |
| return false; |
| auto max_mem_used = static_cast<size_t>(memory_soft_limit * max_allowed_memory_usage_ratio); |
| auto current_result_rows = data_variants->size(); |
    /// Avoid evicting empty or too small aggregated results.
| if (current_result_rows < aggregated_keys_before_evict) |
| return false; |
| |
    /// If the grouping keys have high cardinality, evict the data variants early to avoid
    /// building a huge hash table.
| if (static_cast<double>(total_output_rows) / total_input_rows > high_cardinality_threshold) |
| return true; |
| |
| auto current_mem_used = currentThreadGroupMemoryUsage(); |
| if (per_key_memory_usage > 0) |
| { |
        /// Once the per-key memory usage is known, a greedier strategy can be used: evict
        /// only when the projected total usage would exceed the limit.
| if (current_mem_used + per_key_memory_usage * current_result_rows >= max_mem_used) |
| { |
| LOG_INFO( |
| logger, |
| "Memory is overflow. current_mem_used: {}, max_mem_used: {}, per_key_memory_usage: {}, aggregator keys: {}, hash table type: {}", |
| ReadableSize(current_mem_used), |
| ReadableSize(max_mem_used), |
| ReadableSize(per_key_memory_usage), |
| current_result_rows, |
| data_variants->type); |
| return true; |
| } |
| } |
| else |
| { |
        /// For safety, while the per-key memory usage is still unknown, evict the data variants
        /// as soon as memory usage exceeds half of the limit: the peak memory usage while
        /// converting aggregated data variants into blocks is usually about double the hash
        /// table size.
| if (current_mem_used * 2 >= max_mem_used) |
| { |
| LOG_INFO( |
| logger, |
| "Memory is overflow on half of max usage. current_mem_used: {}, max_mem_used: {}, aggregator keys: {}, hash table type: {}", |
| ReadableSize(current_mem_used), |
| ReadableSize(max_mem_used), |
| current_result_rows, |
| data_variants->type); |
| return true; |
| } |
| } |
| return false; |
| } |
| |
| |
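/// work() first tries to drain one block from an in-flight converter. Otherwise it
/// aggregates the pending input chunk into the hash table and, once needEvict() fires,
/// hands the hash table over to a fresh AggregateDataBlockConverter and pops its first block.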
| void StreamingAggregatingTransform::work() |
| { |
| auto pop_one_pending_block = [&]() |
| { |
| bool res = false; |
| if (block_converter) |
| { |
| while (block_converter->hasNext()) |
| { |
| has_output = false; |
| Stopwatch convert_watch; |
| auto block = block_converter->next(); |
| output_chunk = DB::convertToChunk(block); |
| total_convert_data_variants_time += convert_watch.elapsedMicroseconds(); |
| if (!output_chunk.getNumRows()) |
| continue; |
| has_output = true; |
| per_key_memory_usage = output_chunk.allocatedBytes() * 1.0 / output_chunk.getNumRows(); |
| res = true; |
| break; |
| } |
            has_input = block_converter->hasNext();
            if (!has_input)
            {
                block_converter = nullptr;
            }
| } |
| else |
| has_input = false; |
| return res; |
| }; |
| |
| if (has_input) |
| { |
        /// If an AggregateDataBlockConverter is still in progress, generate one block and return.
| if (pop_one_pending_block()) |
| return; |
| if (block_converter) |
| { |
| throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "block_converter should be null"); |
| } |
| |
| if (!data_variants) |
| { |
| data_variants = std::make_shared<DB::AggregatedDataVariants>(); |
| } |
| |
| has_input = false; |
| if (input_chunk.getNumRows()) |
| { |
| auto num_rows = input_chunk.getNumRows(); |
| Stopwatch watch; |
| params->aggregator.executeOnBlock( |
| input_chunk.detachColumns(), 0, num_rows, *data_variants, key_columns, aggregate_columns, no_more_keys); |
| total_aggregate_time += watch.elapsedMicroseconds(); |
| input_chunk = {}; |
| } |
| |
| if (needEvict()) |
| { |
| block_converter = std::make_unique<AggregateDataBlockConverter>(params->aggregator, data_variants, false); |
| data_variants = nullptr; |
| total_clear_data_variants_num++; |
| pop_one_pending_block(); |
| } |
| } |
| else |
| { |
| throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Invalid state"); |
| } |
| } |
| |
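/// Aggregation collapses rows with equal keys, and the pipeline is resized to a single
/// stream below, so neither the number of rows, the number of streams, nor the sorting
/// is preserved.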
| static DB::ITransformingStep::Traits getTraits() |
| { |
| return DB::ITransformingStep::Traits{ |
| { |
| .preserves_number_of_streams = false, |
| .preserves_sorting = false, |
| }, |
| { |
| .preserves_number_of_rows = false, |
| }}; |
| } |
| |
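/// The output header is derived from the aggregator parameters with final = false:
/// aggregate functions are emitted in their intermediate state, to be merged downstream.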
static DB::Block buildOutputHeader(const DB::Block & input_header_, const DB::Aggregator::Params & params_)
| { |
| return params_.getHeader(input_header_, false); |
| } |

StreamingAggregatingStep::StreamingAggregatingStep(
| const DB::ContextPtr & context_, const DB::SharedHeader & input_header, DB::Aggregator::Params params_) |
| : DB::ITransformingStep(input_header, toShared(buildOutputHeader(*input_header, params_)), getTraits()) |
| , context(context_) |
| , params(std::move(params_)) |
| { |
| } |
| |
| void StreamingAggregatingStep::transformPipeline(DB::QueryPipelineBuilder & pipeline, const DB::BuildQueryPipelineSettings &) |
| { |
| if (params.max_bytes_before_external_group_by) |
| { |
| throw DB::Exception( |
| DB::ErrorCodes::LOGICAL_ERROR, "max_bytes_before_external_group_by is not supported in StreamingAggregatingStep"); |
| } |
| pipeline.dropTotalsAndExtremes(); |
| auto transform_params = std::make_shared<DB::AggregatingTransformParams>(pipeline.getSharedHeader(), params, false); |
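    /// Collapse the pipeline into a single stream so that one transform instance owns
    /// the whole aggregation state.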
| pipeline.resize(1); |
| auto build_transform = [&](DB::OutputPortRawPtrs outputs) |
| { |
| DB::Processors new_processors; |
| for (auto & output : outputs) |
| { |
| auto op = std::make_shared<StreamingAggregatingTransform>(context, pipeline.getSharedHeader(), transform_params); |
| new_processors.push_back(op); |
| DB::connect(*output, op->getInputs().front()); |
| } |
| return new_processors; |
| }; |
| pipeline.transform(build_transform); |
| } |
| |
| void StreamingAggregatingStep::describeActions(DB::IQueryPlanStep::FormatSettings & settings) const |
| { |
| params.explain(settings.out, settings.offset); |
| } |
| |
| void StreamingAggregatingStep::describeActions(DB::JSONBuilder::JSONMap & map) const |
| { |
| params.explain(map); |
| } |
| |
| void StreamingAggregatingStep::updateOutputHeader() |
| { |
| output_header = toShared(buildOutputHeader(*input_headers.front(), params)); |
| } |
| |
| } |