| /** |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, |
| * software distributed under the License is distributed on an |
| * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY |
| * KIND, either express or implied. See the License for the |
| * specific language governing permissions and limitations |
| * under the License. |
| **/ |
| |
| #ifndef QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_ |
| #define QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_ |
| |
| #include <cstddef> |
| #include <memory> |
| #include <vector> |
| |
| #include "catalog/CatalogTypedefs.hpp" |
| #include "expressions/aggregation/AggregationHandle.hpp" |
| #include "expressions/aggregation/AggregationID.hpp" |
| #include "expressions/predicate/Predicate.hpp" |
| #include "expressions/scalar/Scalar.hpp" |
| #include "storage/AggregationOperationState.pb.h" |
| #include "storage/HashTableBase.hpp" |
| #include "storage/HashTablePool.hpp" |
| #include "storage/PartitionedHashTablePool.hpp" |
| #include "storage/StorageBlockInfo.hpp" |
| #include "utility/Macros.hpp" |
| |
| #include "gflags/gflags.h" |
| |
| namespace quickstep { |
| |
| class AggregateFunction; |
| class CatalogDatabaseLite; |
| class CatalogRelationSchema; |
| class InsertDestination; |
| class LIPFilterAdaptiveProber; |
| class StorageManager; |
| |
| DECLARE_int32(num_aggregation_partitions); |
| DECLARE_int32(partition_aggregation_num_groups_threshold); |
| |
| /** \addtogroup Storage |
| * @{ |
| */ |
| |
| /** |
| * @brief Helper class for maintaining state during aggregation operation. |
| * If a GROUP BY list was provided, this class maintains a hash table |
| * for each aggregate computed where the key is the GROUP BY expression |
| * values and payload is each group's corresponding running aggregation |
| * state. Without GROUP BY, this class maintains a single aggregation |
| * state for each aggregate computed. |
| * @note See also AggregationHandle, which encapsulates logic for actually |
| * computing aggregates, and AggregateFunction, which represents a |
| * particular SQL aggregate in the abstract sense. |
| * |
| * This class represents the common state for an instance of |
| * AggregationOperator, and also encapsulates the high-level logic for |
| * aggregating over blocks and generating final results. |
| * AggregationWorkOrder::execute() is mainly just a call to aggregateBlock(), |
| * while FinalizeAggregationWorkOrder::execute() is mainly just a call to |
| * finalizeAggregate(). |
| **/ |
| class AggregationOperationState { |
|  public: |
|   /** |
|    * @brief Constructor for aggregation operation state. |
|    * @note The order of some of the parameters to this constructor (or the |
|    *       corresponding fields when reconstructing from a protobuf) determines |
|    *       the schema of tuples written out by finalizeAggregate(). If group_by |
|    *       is nonempty, the first attribute(s) will be the group-by values, in |
|    *       order. Following that will be the values for each aggregate |
|    *       specified by aggregate_functions (with arguments specified by |
|    *       arguments), in order. |
|    * |
|    * @param input_relation Input relation on which the aggregates are computed. |
|    * @param aggregate_functions A list of the aggregate functions to be |
|    *        computed. |
|    * @param arguments For each entry in aggregate_functions, a corresponding |
|    *        list of argument expressions to that aggregate. This is moved-from, |
|    *        with AggregationOperationState taking ownership. |
|    * @param is_distinct For each entry in aggregate_functions, whether DISTINCT |
|    *        should be applied to the entry's arguments. |
|    * @param group_by A list of expressions to compute the GROUP BY values. If |
|    *        empty, no grouping is used. This is moved-from, with |
|    *        AggregationOperationState taking ownership. |
|    * @param predicate The predicate to be applied prior to aggregation. nullptr |
|    *        indicates no predicate to be applied. This object takes ownership |
|    *        of predicate. |
|    * @param estimated_num_entries Estimate of the number of entries in the hash |
|    *        table. A good estimate would be a fraction of total number of tuples |
|    *        in the input relation. |
|    * @param hash_table_impl_type The HashTable implementation to use for |
|    *        GROUP BY. Ignored if group_by is empty. |
|    * @param distinctify_hash_table_impl_types The HashTable implementations to |
|    *        use for the distinctify phase of each DISTINCT aggregation. |
|    * @param storage_manager The StorageManager to use for allocating hash |
|    *        tables. Single aggregation state (when GROUP BY list is not |
|    *        specified) is not allocated using memory from storage manager. |
|    */ |
|   AggregationOperationState( |
|       const CatalogRelationSchema &input_relation, |
|       const std::vector<const AggregateFunction *> &aggregate_functions, |
|       std::vector<std::vector<std::unique_ptr<const Scalar>>> &&arguments, |
|       std::vector<bool> &&is_distinct, |
|       std::vector<std::unique_ptr<const Scalar>> &&group_by, |
|       const Predicate *predicate, |
|       const std::size_t estimated_num_entries, |
|       const HashTableImplType hash_table_impl_type, |
|       const std::vector<HashTableImplType> &distinctify_hash_table_impl_types, |
|       StorageManager *storage_manager); |
| |
|   /** |
|    * @brief Destructor. The body is empty: members held through |
|    *        std::unique_ptr and the standard containers clean up themselves. |
|    **/ |
|   ~AggregationOperationState() {} |
| |
|   /** |
|    * @brief Generate the aggregation operation state from the serialized |
|    *        Protocol Buffer representation. |
|    * @note The order of some repeated fields in the proto representation |
|    *       determines the schema of tuples written out by finalizeAggregate(). |
|    *       See the note for the constructor for details. |
|    * |
|    * @param proto A serialized Protocol Buffer representation of an |
|    *        AggregationOperationState, originally generated by the optimizer. |
|    * @param database The Database to resolve relation and attribute references |
|    *        in. |
|    * @param storage_manager The StorageManager to use. |
|    * @return A pointer to the reconstructed AggregationOperationState. |
|    **/ |
|   static AggregationOperationState* ReconstructFromProto( |
|       const serialization::AggregationOperationState &proto, |
|       const CatalogDatabaseLite &database, |
|       StorageManager *storage_manager); |
| |
|   /** |
|    * @brief Check whether a serialization::AggregationOperationState is |
|    *        fully-formed and all parts are valid. |
|    * |
|    * @param proto A serialized Protocol Buffer representation of an |
|    *        AggregationOperationState, originally generated by the optimizer. |
|    * @param database The Database to resolve relation and attribute references |
|    *        in. |
|    * @return Whether proto is fully-formed and valid. |
|    **/ |
|   static bool ProtoIsValid( |
|       const serialization::AggregationOperationState &proto, |
|       const CatalogDatabaseLite &database); |
| |
|   /** |
|    * @brief Compute aggregates on the tuples of the given storage block, |
|    *        updating the running state maintained by this |
|    *        AggregationOperationState. |
|    * |
|    * @param input_block The block ID of the storage block where the aggregates |
|    *        are going to be computed. |
|    * @param lip_filter_adaptive_prober The LIPFilter prober for pre-filtering |
|    *        the block. |
|    **/ |
|   void aggregateBlock(const block_id input_block, |
|                       LIPFilterAdaptiveProber *lip_filter_adaptive_prober); |
| |
|   /** |
|    * @brief Generate the final results for the aggregates managed by this |
|    *        AggregationOperationState and write them out to StorageBlock(s). |
|    * |
|    * @param output_destination An InsertDestination where the finalized output |
|    *        tuple(s) from this aggregate are to be written. |
|    **/ |
|   void finalizeAggregate(InsertDestination *output_destination); |
| |
|   /** |
|    * @brief Destroy the payloads in the aggregation hash tables. |
|    **/ |
|   void destroyAggregationHashTablePayload(); |
| |
|   /** |
|    * @brief Generate the final results for the aggregates managed by this |
|    *        AggregationOperationState and write them out to StorageBlock(s). |
|    *        In this implementation, each thread picks a hash table belonging to |
|    *        a partition and writes its values to StorageBlock(s). There is no |
|    *        need to merge multiple hash tables in one, because there is no |
|    *        overlap in the keys across two hash tables. |
|    * |
|    * @param partition_id The ID of the partition for which finalize is being |
|    *        performed. |
|    * @param output_destination An InsertDestination where the finalized output |
|    *        tuple(s) from this aggregate are to be written. |
|    **/ |
|   void finalizeAggregatePartitioned( |
|       const std::size_t partition_id, InsertDestination *output_destination); |
| |
|   /** |
|    * @brief Merge one GROUP BY hash table with another. |
|    * @note NOTE(review): judging by the parameter names, the entries of src |
|    *       are folded into dst; confirm against the definition. |
|    * |
|    * @param src The source hash table. |
|    * @param dst The destination hash table. |
|    **/ |
|   static void mergeGroupByHashTables(AggregationStateHashTableBase *src, |
|                                      AggregationStateHashTableBase *dst); |
| |
|   /** |
|    * @brief Whether this aggregation is partitioned or not. |
|    **/ |
|   bool isAggregatePartitioned() const { |
|     return is_aggregate_partitioned_; |
|   } |
| |
|   /** |
|    * @brief Get the number of partitions to be used for the aggregation. |
|    *        For non-partitioned aggregations, we return 1. |
|    **/ |
|   std::size_t getNumPartitions() const { |
|     return is_aggregate_partitioned_ |
|                ? partitioned_group_by_hashtable_pool_->getNumPartitions() |
|                : 1; |
|   } |
| |
|   // NOTE(review): the purpose of this public field is not evident from this |
|   // header; it looks like a debugging flag. It is given a default value so |
|   // that a read can never observe an indeterminate value, even if no |
|   // constructor assigns it. |
|   int dflag = 0; |
| |
|  private: |
|   // Merge locally (per storage block) aggregated states with global aggregation |
|   // states. |
|   void mergeSingleState( |
|       const std::vector<std::unique_ptr<AggregationState>> &local_state); |
| |
|   // Aggregate on input block. |
|   void aggregateBlockSingleState(const block_id input_block); |
|   void aggregateBlockHashTable(const block_id input_block, |
|                                LIPFilterAdaptiveProber *lip_filter_adaptive_prober); |
| |
|   // Write out the final results. |
|   void finalizeSingleState(InsertDestination *output_destination); |
|   void finalizeHashTable(InsertDestination *output_destination); |
| |
|   /** |
|    * @brief Decide whether the aggregation should be partitioned. |
|    * |
|    * @param estimated_num_groups The estimated number of groups. |
|    * @param is_distinct For each aggregate, whether DISTINCT is applied. |
|    * @param group_by The GROUP BY expressions. |
|    * @param aggregate_functions The aggregate functions to be computed. |
|    * @return true only if there is at least one aggregate function, no |
|    *         aggregate is DISTINCT, there is at least one GROUP BY expression, |
|    *         and estimated_num_groups is strictly greater than |
|    *         FLAGS_partition_aggregation_num_groups_threshold. |
|    **/ |
|   bool checkAggregatePartitioned( |
|       const std::size_t estimated_num_groups, |
|       const std::vector<bool> &is_distinct, |
|       const std::vector<std::unique_ptr<const Scalar>> &group_by, |
|       const std::vector<const AggregateFunction *> &aggregate_functions) const { |
|     // If there's no aggregation, return false. |
|     if (aggregate_functions.empty()) { |
|       return false; |
|     } |
|     // Check if there's a distinct operation involved in any aggregate, if so |
|     // the aggregate can't be partitioned. |
|     for (const bool distinct : is_distinct) { |
|       if (distinct) { |
|         return false; |
|       } |
|     } |
|     // There's no distinct aggregation involved. Check if there's at least one |
|     // GROUP BY operation. |
|     if (group_by.empty()) { |
|       return false; |
|     } |
|     // There are GROUP BYs without DISTINCT. Check if the estimated number of |
|     // groups is large enough to warrant a partitioned aggregation. |
|     // NOTE(review): a negative flag value wraps to a very large std::size_t in |
|     // the cast below, in which case this comparison is never true. |
|     return estimated_num_groups > |
|            static_cast<std::size_t>( |
|                FLAGS_partition_aggregation_num_groups_threshold); |
|   } |
| |
|   // Common state for all aggregates in this operation: the input relation, the |
|   // filter predicate (if any), and the list of GROUP BY expressions (if any). |
|   const CatalogRelationSchema &input_relation_; |
| |
|   // Whether the aggregation is partitioned or not. |
|   const bool is_aggregate_partitioned_; |
| |
|   std::unique_ptr<const Predicate> predicate_; |
|   std::vector<std::unique_ptr<const Scalar>> group_by_list_; |
| |
|   // Each individual aggregate in this operation has an AggregationHandle and |
|   // some number of Scalar arguments. |
|   // NOTE(review): the handles are raw pointers and the inline destructor does |
|   // not delete them; who owns them is not evident from this header. |
|   std::vector<AggregationHandle *> handles_; |
|   std::vector<std::vector<std::unique_ptr<const Scalar>>> arguments_; |
| |
|   // For each aggregate, whether DISTINCT should be applied to the aggregate's |
|   // arguments. |
|   std::vector<bool> is_distinct_; |
| |
|   // Hash table for obtaining distinct (i.e. unique) arguments. |
|   std::vector<std::unique_ptr<AggregationStateHashTableBase>> |
|       distinctify_hashtables_; |
| |
| #ifdef QUICKSTEP_ENABLE_VECTOR_COPY_ELISION_SELECTION |
|   // If all an aggregate's argument expressions are simply attributes in |
|   // 'input_relation_', then this caches the attribute IDs of those arguments. |
|   std::vector<std::vector<attribute_id>> arguments_as_attributes_; |
| #endif |
| |
|   // Per-aggregate global states for aggregation without GROUP BY. |
|   std::vector<std::unique_ptr<AggregationState>> single_states_; |
| |
|   // Per-aggregate HashTables for aggregation with GROUP BY. |
|   // |
|   // TODO(shoban): We should ideally store the aggregation state together in one |
|   // hash table to prevent multiple lookups. |
|   std::vector<std::unique_ptr<AggregationStateHashTableBase>> |
|       group_by_hashtables_; |
| |
|   // A pool of GROUP BY hash tables. |
|   std::unique_ptr<HashTablePool> group_by_hashtable_pool_; |
| |
|   // A pool of partitioned GROUP BY hash tables. getNumPartitions() queries it |
|   // when the aggregation is partitioned. |
|   std::unique_ptr<PartitionedHashTablePool> partitioned_group_by_hashtable_pool_; |
| |
|   StorageManager *storage_manager_; |
| |
|   DISALLOW_COPY_AND_ASSIGN(AggregationOperationState); |
| }; |
| |
| /** @} */ |
| |
| } // namespace quickstep |
| |
| #endif // QUICKSTEP_RELATIONAL_OPERATORS_AGGREGATION_OPERATION_HPP_ |