| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| #pragma once |
| #include <Core/Block.h> |
| #include <Interpreters/Aggregator.h> |
| #include <Interpreters/Context.h> |
| #include <Interpreters/JoinUtils.h> |
| #include <Processors/Chunk.h> |
| #include <Processors/IProcessor.h> |
| #include <Processors/Transforms/AggregatingTransform.h> |
| #include <Poco/Logger.h> |
| #include <Common/AggregateUtil.h> |
| #include <Common/MemorySpillScheduler.h> |
| |
| |
| namespace local_engine |
| { |
| /** |
| * GraceAggregatingTransform is use to aggregate original data or merging intermediate aggregating result. It support spilling data into disk |
| * when the memory usage is over limit. |
| * If the input data is original data, no_pre_aggregated is true. If it's intermediate aggregating result, no_pre_aggregated is false. |
| * IF the output data is final aggregating result, final is true, otherwise it false. |
| */ |
| class GraceAggregatingTransform : public DB::IProcessor |
| { |
| public: |
| using Status = DB::IProcessor::Status; |
| explicit GraceAggregatingTransform( |
| const DB::SharedHeader & header_, |
| DB::AggregatingTransformParamsPtr params_, |
| DB::ContextPtr context_, |
| bool no_pre_aggregated_, |
| bool final_output_); |
| ~GraceAggregatingTransform() override; |
| |
| Status prepare() override; |
| void work() override; |
| String getName() const override { return "GraceAggregatingTransform"; } |
| |
| private: |
| bool no_pre_aggregated; |
| bool final_output; |
| DB::SharedHeader header; |
| DB::Block output_header; |
| DB::ColumnRawPtrs key_columns; |
| DB::Aggregator::AggregateColumns aggregate_columns; |
| DB::AggregatingTransformParamsPtr params; |
| DB::ContextPtr context; |
| DB::TemporaryDataOnDiskScopePtr tmp_data_disk; |
| DB::AggregatedDataVariantsPtr current_data_variants = nullptr; |
| size_t current_bucket_index = 0; |
| |
| /// Followings are configurations defined in context config. |
| // max buckets number, default is 32 |
| size_t max_buckets = 0; |
| // If the buckets number is overflow the max_buckets, throw exception or not. |
| bool throw_on_overflow_buckets = false; |
| // Even the memory usage has reached the limit, we still allow to aggregate some more keys before |
| // extend the buckets. |
| size_t aggregated_keys_before_extend_buckets = 8196; |
| // The ratio of memory usage to the total memory usage of the whole query. |
| double max_allowed_memory_usage_ratio = 0.9; |
| // configured by max_pending_flush_blocks_per_grace_merging_bucket |
| size_t max_pending_flush_blocks_per_bucket = 0; |
| |
| struct BufferFileStream |
| { |
| /// store the intermediate result blocks. |
| std::list<DB::Block> intermediate_blocks; |
| /// Only be used when there is no pre-aggregated step, store the original input blocks. |
| std::list<DB::Block> original_blocks; |
| /// store the intermediate result blocks. |
| std::optional<DB::TemporaryBlockStreamHolder> intermediate_file_stream; |
| /// Only be used when there is no pre-aggregated step |
| std::optional<DB::TemporaryBlockStreamHolder> original_file_stream; |
| size_t pending_bytes = 0; |
| }; |
| std::unordered_map<size_t, BufferFileStream> buckets; |
| |
| size_t getBucketsNum() const { return buckets.size(); } |
| bool extendBuckets(); |
| void rehashDataVariants(); |
| DB::Blocks scatterBlock(const DB::Block & block); |
| /// Add a block into a bucket, if the pending bytes reaches limit, flush it into disk. |
| void addBlockIntoFileBucket(size_t bucket_index, const DB::Block & block, bool is_original_block); |
| void flushBuckets(); |
| size_t flushBucket(size_t bucket_index); |
| /// Load blocks from disk and merge them into a new hash table, make a new AggregateDataBlockConverter |
| /// to generate output blocks. |
| std::unique_ptr<AggregateDataBlockConverter> prepareBucketOutputBlocks(size_t bucket); |
| /// Pass current_final_blocks into a new AggregateDataBlockConverter to generate output blocks. |
| std::unique_ptr<AggregateDataBlockConverter> currentDataVariantToBlockConverter(bool final); |
| void checkAndSetupCurrentDataVariants(); |
| /// Merge one block into current_data_variants. |
| void mergeOneBlock(const DB::Block & block, bool is_original_block); |
| |
| // spill control |
| bool isMemoryOverflow(); |
| DB::ProcessorMemoryStats getMemoryStats() override; |
| bool spillOnSize(size_t bytes) override; |
| bool force_spill = false; // a force flag to trigger spill |
| bool force_spill_on_bytes = 0; |
| |
| bool input_finished = false; |
| bool has_input = false; |
| DB::Chunk input_chunk; |
| bool has_output = false; |
| DB::Chunk output_chunk; |
| DB::BlocksList current_final_blocks; |
| std::unique_ptr<AggregateDataBlockConverter> block_converter = nullptr; |
| bool no_more_keys = false; |
| bool enable_spill_test = false; |
| |
| double per_key_memory_usage = 0; |
| |
| // metrics |
| size_t total_input_blocks = 0; |
| size_t total_input_rows = 0; |
| size_t total_output_blocks = 0; |
| size_t total_output_rows = 0; |
| size_t total_spill_disk_bytes = 0; |
| size_t total_spill_disk_time = 0; |
| size_t total_read_disk_time = 0; |
| size_t total_scatter_time = 0; |
| |
| Poco::Logger * logger = &Poco::Logger::get("GraceMergingAggregatedTransform"); |
| }; |
| } |