Determine the number of partitions for the aggregation state hash table in the optimizer.
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 564c5c8..69105e6 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -61,6 +61,7 @@
quickstep_catalog_CatalogTypedefs
quickstep_catalog_PartitionScheme
quickstep_catalog_PartitionSchemeHeader
+ quickstep_cli_Flags
quickstep_expressions_Expressions_proto
quickstep_expressions_aggregation_AggregateFunction
quickstep_expressions_aggregation_AggregateFunction_proto
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 4f6f807..88dc505 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -21,7 +21,9 @@
#include <algorithm>
#include <cstddef>
+#include <functional>
#include <memory>
+#include <numeric>
#include <string>
#include <type_traits>
#include <unordered_map>
@@ -46,6 +48,7 @@
#include "catalog/CatalogTypedefs.hpp"
#include "catalog/PartitionScheme.hpp"
#include "catalog/PartitionSchemeHeader.hpp"
+#include "cli/Flags.hpp"
#include "expressions/Expressions.pb.h"
#include "expressions/aggregation/AggregateFunction.hpp"
#include "expressions/aggregation/AggregateFunction.pb.h"
@@ -167,10 +170,83 @@
DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
+static bool ValidateNumAggregationPartitions(const char *flagname, int value) {
+ return value > 0;
+}
+DEFINE_int32(num_aggregation_partitions,
+ 41,
+ "The number of partitions in PartitionedHashTablePool used for "
+ "performing the aggregation");
+static const volatile bool num_aggregation_partitions_dummy
+ = gflags::RegisterFlagValidator(&FLAGS_num_aggregation_partitions, &ValidateNumAggregationPartitions);
+
+DEFINE_uint64(partition_aggregation_num_groups_threshold,
+ 100000,
+ "The threshold used for deciding whether the aggregation is done "
+ "in a partitioned way or not");
+
namespace E = ::quickstep::optimizer::expressions;
namespace P = ::quickstep::optimizer::physical;
namespace S = ::quickstep::serialization;
+namespace {
+
+size_t CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(const size_t num_entries) {
+ // Set finalization segment size as 4096 entries.
+ constexpr size_t kFinalizeSegmentSize = 4uL * 1024L;
+
+ // At least 1 partition, at most (#workers * 2) partitions.
+ return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize,
+ static_cast<size_t>(2 * FLAGS_num_workers)));
+}
+
+bool CheckAggregatePartitioned(const std::size_t num_aggregate_functions,
+ const std::vector<bool> &is_distincts,
+ const std::vector<attribute_id> &group_by_attrs,
+ const std::size_t estimated_num_groups) {
+ // If there's no aggregation, return false.
+ if (num_aggregate_functions == 0) {
+ return false;
+ }
+ // If there is only one aggregate function, we allow distinct aggregation.
+ // Otherwise it can't be partitioned with distinct aggregation.
+ if (num_aggregate_functions > 1) {
+ for (const bool distinct : is_distincts) {
+ if (distinct) {
+ return false;
+ }
+ }
+ }
+ // There's no distinct aggregation involved. Check if there's at least one
+ // GROUP BY operation.
+ if (group_by_attrs.empty()) {
+ return false;
+ }
+
+ // Currently we require that all the group-by keys are ScalarAttributes for
+ // the convenience of implementing copy elision.
+ // TODO(jianqiao): relax this requirement.
+ for (const attribute_id group_by_attr : group_by_attrs) {
+ if (group_by_attr == kInvalidAttributeID) {
+ return false;
+ }
+ }
+
+ // Currently we always use partitioned aggregation to parallelize distinct
+ // aggregation.
+ const bool all_distinct = std::accumulate(is_distincts.begin(), is_distincts.end(),
+ !is_distincts.empty(), std::logical_and<bool>());
+ if (all_distinct) {
+ return true;
+ }
+
+ // There are GROUP BYs without DISTINCT. Check if the estimated number of
+ // groups is large enough to warrant a partitioned aggregation.
+ return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold;
+}
+
+} // namespace
+
constexpr QueryPlan::DAGNodeIndex ExecutionGenerator::CatalogRelationInfo::kInvalidOperatorIndex;
void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
@@ -1618,8 +1694,8 @@
const P::AggregatePtr &physical_plan) {
const CatalogRelationInfo *input_relation_info =
findRelationInfoOutputByPhysical(physical_plan->input());
- const CatalogRelation *input_relation = input_relation_info->relation;
- const PartitionScheme *input_partition_scheme = input_relation->getPartitionScheme();
+ const CatalogRelation &input_relation = *input_relation_info->relation;
+ const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme();
const size_t num_partitions =
input_partition_scheme
? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
@@ -1634,11 +1710,12 @@
S::AggregationOperationState *aggr_state_proto =
aggr_state_context_proto->mutable_aggregation_state();
- aggr_state_proto->set_relation_id(input_relation->getID());
+ aggr_state_proto->set_relation_id(input_relation.getID());
bool use_parallel_initialization = false;
std::vector<const Type*> group_by_types;
+ std::vector<attribute_id> group_by_attrs;
for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
unique_ptr<const Scalar> execution_group_by_expression;
E::AliasPtr alias;
@@ -1653,10 +1730,47 @@
execution_group_by_expression.reset(
grouping_expression->concretize(attribute_substitution_map_));
}
- aggr_state_proto->add_group_by_expressions()->CopyFrom(execution_group_by_expression->getProto());
+ aggr_state_proto->add_group_by_expressions()->MergeFrom(execution_group_by_expression->getProto());
group_by_types.push_back(&execution_group_by_expression->getType());
+ group_by_attrs.push_back(execution_group_by_expression->getAttributeIdForValueAccessor());
}
+ const auto &aggregate_expressions = physical_plan->aggregate_expressions();
+ vector<bool> is_distincts;
+ for (const E::AliasPtr &named_aggregate_expression : aggregate_expressions) {
+ const E::AggregateFunctionPtr unnamed_aggregate_expression =
+ std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
+
+ // Add a new entry in 'aggregates'.
+ S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
+
+ // Set the AggregateFunction.
+ aggr_proto->mutable_function()->MergeFrom(
+ unnamed_aggregate_expression->getAggregate().getProto());
+
+ // Add each of the aggregate's arguments.
+ for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
+ unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+ aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto());
+ }
+
+ // Set whether it is a DISTINCT aggregation.
+ const bool is_distinct = unnamed_aggregate_expression->is_distinct();
+ aggr_proto->set_is_distinct(is_distinct);
+ is_distincts.push_back(is_distinct);
+
+ // Add distinctify hash table impl type if it is a DISTINCT aggregation.
+ if (unnamed_aggregate_expression->is_distinct()) {
+ const std::vector<E::ScalarPtr> &arguments = unnamed_aggregate_expression->getArguments();
+ DCHECK_GE(arguments.size(), 1u);
+ // Right now only SeparateChaining implementation is supported.
+ aggr_state_proto->add_distinctify_hash_table_impl_types(
+ serialization::HashTableImplType::SEPARATE_CHAINING);
+ }
+ }
+
+ bool aggr_state_is_partitioned = false;
+ std::size_t aggr_state_num_partitions = 1u;
if (!group_by_types.empty()) {
const std::size_t estimated_num_groups =
cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
@@ -1671,6 +1785,7 @@
serialization::HashTableImplType::COLLISION_FREE_VECTOR);
aggr_state_proto->set_estimated_num_entries(max_num_groups);
use_parallel_initialization = true;
+ aggr_state_num_partitions = CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups);
} else {
if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation(
physical_plan, estimated_num_groups)) {
@@ -1681,53 +1796,30 @@
// Otherwise, use SeparateChaining.
aggr_state_proto->set_hash_table_impl_type(
serialization::HashTableImplType::SEPARATE_CHAINING);
+ if (CheckAggregatePartitioned(aggregate_expressions.size(), is_distincts, group_by_attrs,
+ estimated_num_groups)) {
+ aggr_state_is_partitioned = true;
+ aggr_state_num_partitions = FLAGS_num_aggregation_partitions;
+ }
}
aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
}
} else {
aggr_state_proto->set_estimated_num_entries(1uL);
}
-
- for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
- const E::AggregateFunctionPtr unnamed_aggregate_expression =
- std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
-
- // Add a new entry in 'aggregates'.
- S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
-
- // Set the AggregateFunction.
- aggr_proto->mutable_function()->CopyFrom(
- unnamed_aggregate_expression->getAggregate().getProto());
-
- // Add each of the aggregate's arguments.
- for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
- unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
- aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
- }
-
- // Set whether it is a DISTINCT aggregation.
- aggr_proto->set_is_distinct(unnamed_aggregate_expression->is_distinct());
-
- // Add distinctify hash table impl type if it is a DISTINCT aggregation.
- if (unnamed_aggregate_expression->is_distinct()) {
- const std::vector<E::ScalarPtr> &arguments = unnamed_aggregate_expression->getArguments();
- DCHECK_GE(arguments.size(), 1u);
- // Right now only SeparateChaining implementation is supported.
- aggr_state_proto->add_distinctify_hash_table_impl_types(
- serialization::HashTableImplType::SEPARATE_CHAINING);
- }
- }
+ aggr_state_proto->set_is_partitioned(aggr_state_is_partitioned);
+ aggr_state_proto->set_num_partitions(aggr_state_num_partitions);
if (physical_plan->filter_predicate() != nullptr) {
unique_ptr<const Predicate> predicate(convertPredicate(physical_plan->filter_predicate()));
- aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+ aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto());
}
const QueryPlan::DAGNodeIndex aggregation_operator_index =
execution_plan_->addRelationalOperator(
new AggregationOperator(
query_handle_->query_id(),
- *input_relation_info->relation,
+ input_relation,
input_relation_info->isStoredRelation(),
aggr_state_index,
num_partitions));
@@ -1765,6 +1857,7 @@
new FinalizeAggregationOperator(query_handle_->query_id(),
aggr_state_index,
num_partitions,
+ aggr_state_num_partitions,
*output_relation,
insert_destination_index));
@@ -1827,18 +1920,23 @@
std::unique_ptr<const Scalar> execution_group_by_expression(
physical_plan->right_join_attributes().front()->concretize(
attribute_substitution_map_));
- aggr_state_proto->add_group_by_expressions()->CopyFrom(
+ aggr_state_proto->add_group_by_expressions()->MergeFrom(
execution_group_by_expression->getProto());
aggr_state_proto->set_hash_table_impl_type(
serialization::HashTableImplType::COLLISION_FREE_VECTOR);
- aggr_state_proto->set_estimated_num_entries(
- physical_plan->group_by_key_value_range());
+
+ const size_t estimated_num_entries = physical_plan->group_by_key_value_range();
+ aggr_state_proto->set_estimated_num_entries(estimated_num_entries);
+
+ const size_t aggr_state_num_partitions =
+ CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(estimated_num_entries);
+ aggr_state_proto->set_num_partitions(aggr_state_num_partitions);
if (physical_plan->right_filter_predicate() != nullptr) {
std::unique_ptr<const Predicate> predicate(
convertPredicate(physical_plan->right_filter_predicate()));
- aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+ aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto());
}
for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1849,13 +1947,13 @@
S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
// Set the AggregateFunction.
- aggr_proto->mutable_function()->CopyFrom(
+ aggr_proto->mutable_function()->MergeFrom(
unnamed_aggregate_expression->getAggregate().getProto());
// Add each of the aggregate's arguments.
for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
- aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
+ aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto());
}
// Set whether it is a DISTINCT aggregation.
@@ -1926,6 +2024,7 @@
new FinalizeAggregationOperator(query_handle_->query_id(),
aggr_state_index,
num_partitions,
+ aggr_state_num_partitions,
*output_relation,
insert_destination_index));
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 14db825..8283437 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -50,7 +50,7 @@
query_context->getAggregationState(aggr_state_index_, part_id);
DCHECK(agg_state != nullptr);
for (std::size_t state_part_id = 0;
- state_part_id < agg_state->getNumFinalizationPartitions();
+ state_part_id < aggr_state_num_partitions_;
++state_part_id) {
container->addNormalWorkOrder(
new FinalizeAggregationWorkOrder(
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 5210de2..12433b9 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -69,11 +69,13 @@
const std::size_t query_id,
const QueryContext::aggregation_state_id aggr_state_index,
const std::size_t num_partitions,
+ const std::size_t aggr_state_num_partitions,
const CatalogRelation &output_relation,
const QueryContext::insert_destination_id output_destination_index)
: RelationalOperator(query_id),
aggr_state_index_(aggr_state_index),
num_partitions_(num_partitions),
+ aggr_state_num_partitions_(aggr_state_num_partitions),
output_relation_(output_relation),
output_destination_index_(output_destination_index),
started_(false) {}
@@ -106,7 +108,7 @@
private:
const QueryContext::aggregation_state_id aggr_state_index_;
- const std::size_t num_partitions_;
+ const std::size_t num_partitions_, aggr_state_num_partitions_;
const CatalogRelation &output_relation_;
const QueryContext::insert_destination_id output_destination_index_;
bool started_;
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 0690b6b..3b4a737 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -293,6 +293,7 @@
new FinalizeAggregationOperator(kQueryId,
aggr_state_index,
kNumPartitions,
+ kNumPartitions,
*result_table_,
insert_destination_index));
@@ -387,6 +388,7 @@
new FinalizeAggregationOperator(kQueryId,
aggr_state_index,
kNumPartitions,
+ kNumPartitions,
*result_table_,
insert_destination_index));
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0f4a105..3d1c14a 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -61,20 +61,10 @@
#include "utility/ColumnVectorCache.hpp"
#include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
-#include "gflags/gflags.h"
-
#include "glog/logging.h"
namespace quickstep {
-DEFINE_int32(num_aggregation_partitions,
- 41,
- "The number of partitions used for performing the aggregation");
-DEFINE_uint64(partition_aggregation_num_groups_threshold,
- 100000,
- "The threshold used for deciding whether the aggregation is done "
- "in a partitioned way or not");
-
AggregationOperationState::AggregationOperationState(
const CatalogRelationSchema &input_relation,
const std::vector<const AggregateFunction *> &aggregate_functions,
@@ -83,31 +73,21 @@
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
const std::size_t estimated_num_entries,
+ const bool is_partitioned,
+ const std::size_t num_partitions,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager)
: input_relation_(input_relation),
- is_aggregate_collision_free_(false),
- is_aggregate_partitioned_(false),
+ is_aggregate_collision_free_(
+ group_by.empty() ? false
+ : hash_table_impl_type == HashTableImplType::kCollisionFreeVector),
+ is_aggregate_partitioned_(is_partitioned),
predicate_(predicate),
is_distinct_(std::move(is_distinct)),
all_distinct_(std::accumulate(is_distinct_.begin(), is_distinct_.end(),
!is_distinct_.empty(), std::logical_and<bool>())),
storage_manager_(storage_manager) {
- if (!group_by.empty()) {
- switch (hash_table_impl_type) {
- case HashTableImplType::kCollisionFreeVector:
- is_aggregate_collision_free_ = true;
- break;
- case HashTableImplType::kThreadPrivateCompactKey:
- is_aggregate_partitioned_ = false;
- break;
- default:
- is_aggregate_partitioned_ = checkAggregatePartitioned(
- estimated_num_entries, is_distinct_, group_by, aggregate_functions);
- }
- }
-
// Sanity checks: each aggregate has a corresponding list of arguments.
DCHECK(aggregate_functions.size() == arguments.size());
@@ -195,7 +175,7 @@
DCHECK(partitioned_group_by_hashtable_pool_ == nullptr);
partitioned_group_by_hashtable_pool_.reset(
new PartitionedHashTablePool(estimated_num_entries,
- FLAGS_num_aggregation_partitions,
+ num_partitions,
*distinctify_hash_table_impl_types_it,
key_types,
{},
@@ -227,7 +207,8 @@
group_by_types_,
estimated_num_entries,
group_by_handles,
- storage_manager));
+ storage_manager,
+ num_partitions));
} else if (is_aggregate_partitioned_) {
if (all_distinct_) {
DCHECK_EQ(1u, group_by_handles.size());
@@ -241,7 +222,7 @@
} else {
partitioned_group_by_hashtable_pool_.reset(
new PartitionedHashTablePool(estimated_num_entries,
- FLAGS_num_aggregation_partitions,
+ num_partitions,
hash_table_impl_type,
group_by_types_,
group_by_handles,
@@ -315,6 +296,8 @@
std::move(group_by_expressions),
predicate.release(),
proto.estimated_num_entries(),
+ proto.is_partitioned(),
+ proto.num_partitions(),
HashTableImplTypeFromProto(proto.hash_table_impl_type()),
distinctify_hash_table_impl_types,
storage_manager);
@@ -385,50 +368,6 @@
return true;
}
-bool AggregationOperationState::checkAggregatePartitioned(
- const std::size_t estimated_num_groups,
- const std::vector<bool> &is_distinct,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const std::vector<const AggregateFunction *> &aggregate_functions) const {
- // If there's no aggregation, return false.
- if (aggregate_functions.empty()) {
- return false;
- }
- // If there is only only aggregate function, we allow distinct aggregation.
- // Otherwise it can't be partitioned with distinct aggregation.
- if (aggregate_functions.size() > 1) {
- for (auto distinct : is_distinct) {
- if (distinct) {
- return false;
- }
- }
- }
- // There's no distinct aggregation involved, Check if there's at least one
- // GROUP BY operation.
- if (group_by.empty()) {
- return false;
- }
-
- // Currently we require that all the group-by keys are ScalarAttributes for
- // the convenient of implementing copy elision.
- // TODO(jianqiao): relax this requirement.
- for (const auto &group_by_element : group_by) {
- if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
- return false;
- }
- }
-
- // Currently we always use partitioned aggregation to parallelize distinct
- // aggregation.
- if (all_distinct_) {
- return true;
- }
-
- // There are GROUP BYs without DISTINCT. Check if the estimated number of
- // groups is large enough to warrant a partitioned aggregation.
- return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold;
-}
-
std::size_t AggregationOperationState::getNumInitializationPartitions() const {
if (is_aggregate_collision_free_) {
return static_cast<CollisionFreeVectorTable *>(
@@ -438,17 +377,6 @@
}
}
-std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
- if (is_aggregate_collision_free_) {
- return static_cast<CollisionFreeVectorTable *>(
- collision_free_hashtable_.get())->getNumFinalizationPartitions();
- } else if (is_aggregate_partitioned_) {
- return partitioned_group_by_hashtable_pool_->getNumPartitions();
- } else {
- return 1u;
- }
-}
-
CollisionFreeVectorTable* AggregationOperationState
::getCollisionFreeVectorTable() const {
return static_cast<CollisionFreeVectorTable *>(
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 207c4f0..6174478 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -98,6 +98,9 @@
* @param estimated_num_entries Estimated of number of entries in the hash
* table. A good estimate would be a fraction of total number of tuples
* in the input relation.
+ * @param is_partitioned Whether this aggregation state is partitioned.
+ * @param num_partitions The number of partitions of the aggregation state
+ * hash table.
* @param hash_table_impl_type The HashTable implementation to use for
* GROUP BY. Ignored if group_by is empty.
* @param distinctify_hash_table_impl_type The HashTable implementation to use
@@ -114,6 +117,8 @@
std::vector<std::unique_ptr<const Scalar>> &&group_by,
const Predicate *predicate,
const std::size_t estimated_num_entries,
+ const bool is_partitioned,
+ const std::size_t num_partitions,
const HashTableImplType hash_table_impl_type,
const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
StorageManager *storage_manager);
@@ -161,14 +166,6 @@
std::size_t getNumInitializationPartitions() const;
/**
- * @brief Get the number of partitions to be used for finalizing the
- * aggregation.
- *
- * @return The number of partitions to be used for finalizing the aggregation.
- **/
- std::size_t getNumFinalizationPartitions() const;
-
- /**
* @brief Initialize the specified partition of this aggregation.
*
* @param partition_id ID of the partition to be initialized.
@@ -213,13 +210,6 @@
std::size_t getMemoryConsumptionBytes() const;
private:
- // Check whether partitioned aggregation can be applied.
- bool checkAggregatePartitioned(
- const std::size_t estimated_num_groups,
- const std::vector<bool> &is_distinct,
- const std::vector<std::unique_ptr<const Scalar>> &group_by,
- const std::vector<const AggregateFunction *> &aggregate_functions) const;
-
// Aggregate on input block.
void aggregateBlockSingleState(const ValueAccessorMultiplexer &accessor_mux);
@@ -271,10 +261,10 @@
const CatalogRelationSchema &input_relation_;
// Whether the aggregation is collision free or not.
- bool is_aggregate_collision_free_;
+ const bool is_aggregate_collision_free_;
// Whether the aggregation is partitioned or not.
- bool is_aggregate_partitioned_;
+ const bool is_aggregate_partitioned_;
std::unique_ptr<const Predicate> predicate_;
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index 7521d73..1a8a302 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -42,4 +42,7 @@
// Each DISTINCT aggregation has its distinctify hash table impl type.
repeated HashTableImplType distinctify_hash_table_impl_types = 7;
+
+ optional bool is_partitioned = 8;
+ optional uint64 num_partitions = 9 [default = 1];
}
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f33a4f4..3d90bc5 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -267,7 +267,6 @@
# Link dependencies:
target_link_libraries(quickstep_storage_AggregationOperationState
- ${GFLAGS_LIB_NAME}
glog
quickstep_catalog_CatalogDatabaseLite
quickstep_catalog_CatalogRelationSchema
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index d836014..679f77b 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -43,15 +43,17 @@
CollisionFreeVectorTable::CollisionFreeVectorTable(
const Type *key_type,
const std::size_t num_entries,
+ const std::size_t num_finalize_partitions,
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager)
: key_type_(key_type),
num_entries_(num_entries),
num_handles_(handles.size()),
handles_(handles),
- num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
+ num_finalize_partitions_(num_finalize_partitions),
storage_manager_(storage_manager) {
DCHECK_GT(num_entries, 0u);
+ DCHECK_GT(num_finalize_partitions_, 0u);
std::size_t required_memory = 0;
const std::size_t existence_map_offset = 0;
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 221a221..5aeb5cf 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -58,6 +58,8 @@
*
* @param key_type The group-by key type.
* @param num_entries The estimated number of entries this table will hold.
+ * @param num_finalize_partitions The number of partitions to be used for
+ * finalizing the aggregation.
* @param handles The aggregation handles.
* @param storage_manager The StorageManager to use (a StorageBlob will be
* allocated to hold this table's contents).
@@ -65,6 +67,7 @@
CollisionFreeVectorTable(
const Type *key_type,
const std::size_t num_entries,
+ const std::size_t num_finalize_partitions,
const std::vector<AggregationHandle *> &handles,
StorageManager *storage_manager);
@@ -193,17 +196,6 @@
return std::max(1uL, std::min(memory_size / kInitBlockSize, 80uL));
}
- inline static std::size_t CalculateNumFinalizationPartitions(
- const std::size_t num_entries) {
- // Set finalization segment size as 4096 entries.
- constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
-
- // At least 1 partition, at most 80 partitions.
- // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the
- // hardcoded 80.
- return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL));
- }
-
inline std::size_t calculatePartitionLength() const {
const std::size_t partition_length =
(num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index cb1f16f..d367160 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -356,6 +356,8 @@
* @param storage_manager The StorageManager to use (a StorageBlob will be
* allocated to hold the hash table's contents). Forwarded as-is to the
* hash table constructor.
+ * @param num_partitions The number of partitions of this aggregation state
+ * hash table.
* @return A new aggregation state hash table.
**/
static AggregationStateHashTableBase* CreateResizable(
@@ -363,12 +365,13 @@
const std::vector<const Type*> &key_types,
const std::size_t num_entries,
const std::vector<AggregationHandle *> &handles,
- StorageManager *storage_manager) {
+ StorageManager *storage_manager,
+ const std::size_t num_partitions = 1u) {
switch (hash_table_type) {
case HashTableImplType::kCollisionFreeVector:
DCHECK_EQ(1u, key_types.size());
return new CollisionFreeVectorTable(
- key_types.front(), num_entries, handles, storage_manager);
+ key_types.front(), num_entries, num_partitions, handles, storage_manager);
case HashTableImplType::kSeparateChaining:
return new PackedPayloadHashTable(
key_types, num_entries, handles, storage_manager);