Determine #Partitions for Aggr State Hash Table in the optimizer.
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 564c5c8..69105e6 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -61,6 +61,7 @@
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_PartitionScheme
                       quickstep_catalog_PartitionSchemeHeader
+                      quickstep_cli_Flags
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunction_proto
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 4f6f807..88dc505 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -21,7 +21,9 @@
 
 #include <algorithm>
 #include <cstddef>
+#include <functional>
 #include <memory>
+#include <numeric>
 #include <string>
 #include <type_traits>
 #include <unordered_map>
@@ -46,6 +48,7 @@
 #include "catalog/CatalogTypedefs.hpp"
 #include "catalog/PartitionScheme.hpp"
 #include "catalog/PartitionSchemeHeader.hpp"
+#include "cli/Flags.hpp"
 #include "expressions/Expressions.pb.h"
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunction.pb.h"
@@ -167,10 +170,83 @@
 
 DEFINE_bool(parallelize_load, true, "Parallelize loading data files.");
 
+static bool ValidateNumAggregationPartitions(const char *flagname, int value) {
+  return value > 0;
+}
+DEFINE_int32(num_aggregation_partitions,
+             41,
+             "The number of partitions in PartitionedHashTablePool used for "
+             "performing the aggregation");
+static const volatile bool num_aggregation_partitions_dummy
+    = gflags::RegisterFlagValidator(&FLAGS_num_aggregation_partitions, &ValidateNumAggregationPartitions);
+
+DEFINE_uint64(partition_aggregation_num_groups_threshold,
+              100000,
+              "The threshold used for deciding whether the aggregation is done "
+              "in a partitioned way or not");
+
 namespace E = ::quickstep::optimizer::expressions;
 namespace P = ::quickstep::optimizer::physical;
 namespace S = ::quickstep::serialization;
 
+namespace {
+
+size_t CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(const size_t num_entries) {
+  // Set finalization segment size as 4096 entries.
+  constexpr size_t kFinalizeSegmentSize = 4uL * 1024L;
+
+  // At least 1 partition, at most (#workers * 2) partitions.
+  return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize,
+                                static_cast<size_t>(2 * FLAGS_num_workers)));
+}
+
+bool CheckAggregatePartitioned(const std::size_t num_aggregate_functions,
+                               const std::vector<bool> &is_distincts,
+                               const std::vector<attribute_id> &group_by_attrs,
+                               const std::size_t estimated_num_groups) {
+  // If there's no aggregation, return false.
+  if (num_aggregate_functions == 0) {
+    return false;
+  }
+  // If there is only only aggregate function, we allow distinct aggregation.
+  // Otherwise it can't be partitioned with distinct aggregation.
+  if (num_aggregate_functions > 1) {
+    for (const bool distinct : is_distincts) {
+      if (distinct) {
+        return false;
+      }
+    }
+  }
+  // There's no distinct aggregation involved, Check if there's at least one
+  // GROUP BY operation.
+  if (group_by_attrs.empty()) {
+    return false;
+  }
+
+  // Currently we require that all the group-by keys are ScalarAttributes for
+  // the convenient of implementing copy elision.
+  // TODO(jianqiao): relax this requirement.
+  for (const attribute_id group_by_attr : group_by_attrs) {
+    if (group_by_attr == kInvalidAttributeID) {
+      return false;
+    }
+  }
+
+  // Currently we always use partitioned aggregation to parallelize distinct
+  // aggregation.
+  const bool all_distinct = std::accumulate(is_distincts.begin(), is_distincts.end(),
+                                            !is_distincts.empty(), std::logical_and<bool>());
+  if (all_distinct) {
+    return true;
+  }
+
+  // There are GROUP BYs without DISTINCT. Check if the estimated number of
+  // groups is large enough to warrant a partitioned aggregation.
+  return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold;
+}
+
+}  // namespace
+
 constexpr QueryPlan::DAGNodeIndex ExecutionGenerator::CatalogRelationInfo::kInvalidOperatorIndex;
 
 void ExecutionGenerator::generatePlan(const P::PhysicalPtr &physical_plan) {
@@ -1618,8 +1694,8 @@
     const P::AggregatePtr &physical_plan) {
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());
-  const CatalogRelation *input_relation = input_relation_info->relation;
-  const PartitionScheme *input_partition_scheme = input_relation->getPartitionScheme();
+  const CatalogRelation &input_relation = *input_relation_info->relation;
+  const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme();
   const size_t num_partitions =
       input_partition_scheme
           ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
@@ -1634,11 +1710,12 @@
 
   S::AggregationOperationState *aggr_state_proto =
       aggr_state_context_proto->mutable_aggregation_state();
-  aggr_state_proto->set_relation_id(input_relation->getID());
+  aggr_state_proto->set_relation_id(input_relation.getID());
 
   bool use_parallel_initialization = false;
 
   std::vector<const Type*> group_by_types;
+  std::vector<attribute_id> group_by_attrs;
   for (const E::NamedExpressionPtr &grouping_expression : physical_plan->grouping_expressions()) {
     unique_ptr<const Scalar> execution_group_by_expression;
     E::AliasPtr alias;
@@ -1653,10 +1730,47 @@
       execution_group_by_expression.reset(
           grouping_expression->concretize(attribute_substitution_map_));
     }
-    aggr_state_proto->add_group_by_expressions()->CopyFrom(execution_group_by_expression->getProto());
+    aggr_state_proto->add_group_by_expressions()->MergeFrom(execution_group_by_expression->getProto());
     group_by_types.push_back(&execution_group_by_expression->getType());
+    group_by_attrs.push_back(execution_group_by_expression->getAttributeIdForValueAccessor());
   }
 
+  const auto &aggregate_expressions = physical_plan->aggregate_expressions();
+  vector<bool> is_distincts;
+  for (const E::AliasPtr &named_aggregate_expression : aggregate_expressions) {
+    const E::AggregateFunctionPtr unnamed_aggregate_expression =
+        std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
+
+    // Add a new entry in 'aggregates'.
+    S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
+
+    // Set the AggregateFunction.
+    aggr_proto->mutable_function()->MergeFrom(
+        unnamed_aggregate_expression->getAggregate().getProto());
+
+    // Add each of the aggregate's arguments.
+    for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
+      unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+      aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto());
+    }
+
+    // Set whether it is a DISTINCT aggregation.
+    const bool is_distinct = unnamed_aggregate_expression->is_distinct();
+    aggr_proto->set_is_distinct(is_distinct);
+    is_distincts.push_back(is_distinct);
+
+    // Add distinctify hash table impl type if it is a DISTINCT aggregation.
+    if (unnamed_aggregate_expression->is_distinct()) {
+      const std::vector<E::ScalarPtr> &arguments = unnamed_aggregate_expression->getArguments();
+      DCHECK_GE(arguments.size(), 1u);
+      // Right now only SeparateChaining implementation is supported.
+      aggr_state_proto->add_distinctify_hash_table_impl_types(
+          serialization::HashTableImplType::SEPARATE_CHAINING);
+    }
+  }
+
+  bool aggr_state_is_partitioned = false;
+  std::size_t aggr_state_num_partitions = 1u;
   if (!group_by_types.empty()) {
     const std::size_t estimated_num_groups =
         cost_model_for_aggregation_->estimateNumGroupsForAggregate(physical_plan);
@@ -1671,6 +1785,7 @@
           serialization::HashTableImplType::COLLISION_FREE_VECTOR);
       aggr_state_proto->set_estimated_num_entries(max_num_groups);
       use_parallel_initialization = true;
+      aggr_state_num_partitions = CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups);
     } else {
       if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation(
               physical_plan, estimated_num_groups)) {
@@ -1681,53 +1796,30 @@
         // Otherwise, use SeparateChaining.
         aggr_state_proto->set_hash_table_impl_type(
             serialization::HashTableImplType::SEPARATE_CHAINING);
+        if (CheckAggregatePartitioned(aggregate_expressions.size(), is_distincts, group_by_attrs,
+                                      estimated_num_groups)) {
+          aggr_state_is_partitioned = true;
+          aggr_state_num_partitions = FLAGS_num_aggregation_partitions;
+        }
       }
       aggr_state_proto->set_estimated_num_entries(std::max(16uL, estimated_num_groups));
     }
   } else {
     aggr_state_proto->set_estimated_num_entries(1uL);
   }
-
-  for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
-    const E::AggregateFunctionPtr unnamed_aggregate_expression =
-        std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
-
-    // Add a new entry in 'aggregates'.
-    S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
-
-    // Set the AggregateFunction.
-    aggr_proto->mutable_function()->CopyFrom(
-        unnamed_aggregate_expression->getAggregate().getProto());
-
-    // Add each of the aggregate's arguments.
-    for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
-      unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
-      aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
-    }
-
-    // Set whether it is a DISTINCT aggregation.
-    aggr_proto->set_is_distinct(unnamed_aggregate_expression->is_distinct());
-
-    // Add distinctify hash table impl type if it is a DISTINCT aggregation.
-    if (unnamed_aggregate_expression->is_distinct()) {
-      const std::vector<E::ScalarPtr> &arguments = unnamed_aggregate_expression->getArguments();
-      DCHECK_GE(arguments.size(), 1u);
-      // Right now only SeparateChaining implementation is supported.
-      aggr_state_proto->add_distinctify_hash_table_impl_types(
-          serialization::HashTableImplType::SEPARATE_CHAINING);
-    }
-  }
+  aggr_state_proto->set_is_partitioned(aggr_state_is_partitioned);
+  aggr_state_proto->set_num_partitions(aggr_state_num_partitions);
 
   if (physical_plan->filter_predicate() != nullptr) {
     unique_ptr<const Predicate> predicate(convertPredicate(physical_plan->filter_predicate()));
-    aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+    aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto());
   }
 
   const QueryPlan::DAGNodeIndex aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new AggregationOperator(
               query_handle_->query_id(),
-              *input_relation_info->relation,
+              input_relation,
               input_relation_info->isStoredRelation(),
               aggr_state_index,
               num_partitions));
@@ -1765,6 +1857,7 @@
           new FinalizeAggregationOperator(query_handle_->query_id(),
                                           aggr_state_index,
                                           num_partitions,
+                                          aggr_state_num_partitions,
                                           *output_relation,
                                           insert_destination_index));
 
@@ -1827,18 +1920,23 @@
   std::unique_ptr<const Scalar> execution_group_by_expression(
       physical_plan->right_join_attributes().front()->concretize(
           attribute_substitution_map_));
-  aggr_state_proto->add_group_by_expressions()->CopyFrom(
+  aggr_state_proto->add_group_by_expressions()->MergeFrom(
       execution_group_by_expression->getProto());
 
   aggr_state_proto->set_hash_table_impl_type(
       serialization::HashTableImplType::COLLISION_FREE_VECTOR);
-  aggr_state_proto->set_estimated_num_entries(
-      physical_plan->group_by_key_value_range());
+
+  const size_t estimated_num_entries = physical_plan->group_by_key_value_range();
+  aggr_state_proto->set_estimated_num_entries(estimated_num_entries);
+
+  const size_t aggr_state_num_partitions =
+      CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(estimated_num_entries);
+  aggr_state_proto->set_num_partitions(aggr_state_num_partitions);
 
   if (physical_plan->right_filter_predicate() != nullptr) {
     std::unique_ptr<const Predicate> predicate(
         convertPredicate(physical_plan->right_filter_predicate()));
-    aggr_state_proto->mutable_predicate()->CopyFrom(predicate->getProto());
+    aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto());
   }
 
   for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
@@ -1849,13 +1947,13 @@
     S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
 
     // Set the AggregateFunction.
-    aggr_proto->mutable_function()->CopyFrom(
+    aggr_proto->mutable_function()->MergeFrom(
         unnamed_aggregate_expression->getAggregate().getProto());
 
     // Add each of the aggregate's arguments.
     for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
       unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
-      aggr_proto->add_argument()->CopyFrom(concretized_argument->getProto());
+      aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto());
     }
 
     // Set whether it is a DISTINCT aggregation.
@@ -1926,6 +2024,7 @@
           new FinalizeAggregationOperator(query_handle_->query_id(),
                                           aggr_state_index,
                                           num_partitions,
+                                          aggr_state_num_partitions,
                                           *output_relation,
                                           insert_destination_index));
 
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 14db825..8283437 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -50,7 +50,7 @@
           query_context->getAggregationState(aggr_state_index_, part_id);
       DCHECK(agg_state != nullptr);
       for (std::size_t state_part_id = 0;
-           state_part_id < agg_state->getNumFinalizationPartitions();
+           state_part_id < aggr_state_num_partitions_;
            ++state_part_id) {
         container->addNormalWorkOrder(
             new FinalizeAggregationWorkOrder(
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index 5210de2..12433b9 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -69,11 +69,13 @@
       const std::size_t query_id,
       const QueryContext::aggregation_state_id aggr_state_index,
       const std::size_t num_partitions,
+      const std::size_t aggr_state_num_partitions,
       const CatalogRelation &output_relation,
       const QueryContext::insert_destination_id output_destination_index)
       : RelationalOperator(query_id),
         aggr_state_index_(aggr_state_index),
         num_partitions_(num_partitions),
+        aggr_state_num_partitions_(aggr_state_num_partitions),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         started_(false) {}
@@ -106,7 +108,7 @@
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t num_partitions_;
+  const std::size_t num_partitions_, aggr_state_num_partitions_;
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
   bool started_;
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 0690b6b..3b4a737 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -293,6 +293,7 @@
         new FinalizeAggregationOperator(kQueryId,
                                         aggr_state_index,
                                         kNumPartitions,
+                                        kNumPartitions,
                                         *result_table_,
                                         insert_destination_index));
 
@@ -387,6 +388,7 @@
         new FinalizeAggregationOperator(kQueryId,
                                         aggr_state_index,
                                         kNumPartitions,
+                                        kNumPartitions,
                                         *result_table_,
                                         insert_destination_index));
 
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 0f4a105..3d1c14a 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -61,20 +61,10 @@
 #include "utility/ColumnVectorCache.hpp"
 #include "utility/lip_filter/LIPFilterAdaptiveProber.hpp"
 
-#include "gflags/gflags.h"
-
 #include "glog/logging.h"
 
 namespace quickstep {
 
-DEFINE_int32(num_aggregation_partitions,
-             41,
-             "The number of partitions used for performing the aggregation");
-DEFINE_uint64(partition_aggregation_num_groups_threshold,
-              100000,
-              "The threshold used for deciding whether the aggregation is done "
-              "in a partitioned way or not");
-
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
     const std::vector<const AggregateFunction *> &aggregate_functions,
@@ -83,31 +73,21 @@
     std::vector<std::unique_ptr<const Scalar>> &&group_by,
     const Predicate *predicate,
     const std::size_t estimated_num_entries,
+    const bool is_partitioned,
+    const std::size_t num_partitions,
     const HashTableImplType hash_table_impl_type,
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
     StorageManager *storage_manager)
     : input_relation_(input_relation),
-      is_aggregate_collision_free_(false),
-      is_aggregate_partitioned_(false),
+      is_aggregate_collision_free_(
+          group_by.empty() ? false
+                           : hash_table_impl_type == HashTableImplType::kCollisionFreeVector),
+      is_aggregate_partitioned_(is_partitioned),
       predicate_(predicate),
       is_distinct_(std::move(is_distinct)),
       all_distinct_(std::accumulate(is_distinct_.begin(), is_distinct_.end(),
                                     !is_distinct_.empty(), std::logical_and<bool>())),
       storage_manager_(storage_manager) {
-  if (!group_by.empty()) {
-    switch (hash_table_impl_type) {
-      case HashTableImplType::kCollisionFreeVector:
-        is_aggregate_collision_free_ = true;
-        break;
-      case HashTableImplType::kThreadPrivateCompactKey:
-        is_aggregate_partitioned_ = false;
-        break;
-      default:
-        is_aggregate_partitioned_ = checkAggregatePartitioned(
-            estimated_num_entries, is_distinct_, group_by, aggregate_functions);
-    }
-  }
-
   // Sanity checks: each aggregate has a corresponding list of arguments.
   DCHECK(aggregate_functions.size() == arguments.size());
 
@@ -195,7 +175,7 @@
         DCHECK(partitioned_group_by_hashtable_pool_ == nullptr);
         partitioned_group_by_hashtable_pool_.reset(
             new PartitionedHashTablePool(estimated_num_entries,
-                                         FLAGS_num_aggregation_partitions,
+                                         num_partitions,
                                          *distinctify_hash_table_impl_types_it,
                                          key_types,
                                          {},
@@ -227,7 +207,8 @@
               group_by_types_,
               estimated_num_entries,
               group_by_handles,
-              storage_manager));
+              storage_manager,
+              num_partitions));
     } else if (is_aggregate_partitioned_) {
       if (all_distinct_) {
         DCHECK_EQ(1u, group_by_handles.size());
@@ -241,7 +222,7 @@
       } else {
         partitioned_group_by_hashtable_pool_.reset(
             new PartitionedHashTablePool(estimated_num_entries,
-                                         FLAGS_num_aggregation_partitions,
+                                         num_partitions,
                                          hash_table_impl_type,
                                          group_by_types_,
                                          group_by_handles,
@@ -315,6 +296,8 @@
       std::move(group_by_expressions),
       predicate.release(),
       proto.estimated_num_entries(),
+      proto.is_partitioned(),
+      proto.num_partitions(),
       HashTableImplTypeFromProto(proto.hash_table_impl_type()),
       distinctify_hash_table_impl_types,
       storage_manager);
@@ -385,50 +368,6 @@
   return true;
 }
 
-bool AggregationOperationState::checkAggregatePartitioned(
-    const std::size_t estimated_num_groups,
-    const std::vector<bool> &is_distinct,
-    const std::vector<std::unique_ptr<const Scalar>> &group_by,
-    const std::vector<const AggregateFunction *> &aggregate_functions) const {
-  // If there's no aggregation, return false.
-  if (aggregate_functions.empty()) {
-    return false;
-  }
-  // If there is only only aggregate function, we allow distinct aggregation.
-  // Otherwise it can't be partitioned with distinct aggregation.
-  if (aggregate_functions.size() > 1) {
-    for (auto distinct : is_distinct) {
-      if (distinct) {
-        return false;
-      }
-    }
-  }
-  // There's no distinct aggregation involved, Check if there's at least one
-  // GROUP BY operation.
-  if (group_by.empty()) {
-    return false;
-  }
-
-  // Currently we require that all the group-by keys are ScalarAttributes for
-  // the convenient of implementing copy elision.
-  // TODO(jianqiao): relax this requirement.
-  for (const auto &group_by_element : group_by) {
-    if (group_by_element->getAttributeIdForValueAccessor() == kInvalidAttributeID) {
-      return false;
-    }
-  }
-
-  // Currently we always use partitioned aggregation to parallelize distinct
-  // aggregation.
-  if (all_distinct_) {
-    return true;
-  }
-
-  // There are GROUP BYs without DISTINCT. Check if the estimated number of
-  // groups is large enough to warrant a partitioned aggregation.
-  return estimated_num_groups >= FLAGS_partition_aggregation_num_groups_threshold;
-}
-
 std::size_t AggregationOperationState::getNumInitializationPartitions() const {
   if (is_aggregate_collision_free_) {
     return static_cast<CollisionFreeVectorTable *>(
@@ -438,17 +377,6 @@
   }
 }
 
-std::size_t AggregationOperationState::getNumFinalizationPartitions() const {
-  if (is_aggregate_collision_free_) {
-    return static_cast<CollisionFreeVectorTable *>(
-        collision_free_hashtable_.get())->getNumFinalizationPartitions();
-  } else if (is_aggregate_partitioned_) {
-    return partitioned_group_by_hashtable_pool_->getNumPartitions();
-  } else  {
-    return 1u;
-  }
-}
-
 CollisionFreeVectorTable* AggregationOperationState
     ::getCollisionFreeVectorTable() const {
   return static_cast<CollisionFreeVectorTable *>(
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 207c4f0..6174478 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -98,6 +98,9 @@
    * @param estimated_num_entries Estimated of number of entries in the hash
    *        table. A good estimate would be a fraction of total number of tuples
    *        in the input relation.
+   * @param is_partitioned Whether this aggregation state is partitioned.
+   * @param num_partitions The number of partitions of the aggregation state
+   *        hash table.
    * @param hash_table_impl_type The HashTable implementation to use for
    *        GROUP BY. Ignored if group_by is empty.
    * @param distinctify_hash_table_impl_type The HashTable implementation to use
@@ -114,6 +117,8 @@
       std::vector<std::unique_ptr<const Scalar>> &&group_by,
       const Predicate *predicate,
       const std::size_t estimated_num_entries,
+      const bool is_partitioned,
+      const std::size_t num_partitions,
       const HashTableImplType hash_table_impl_type,
       const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
       StorageManager *storage_manager);
@@ -161,14 +166,6 @@
   std::size_t getNumInitializationPartitions() const;
 
   /**
-   * @brief Get the number of partitions to be used for finalizing the
-   *        aggregation.
-   *
-   * @return The number of partitions to be used for finalizing the aggregation.
-   **/
-  std::size_t getNumFinalizationPartitions() const;
-
-  /**
    * @brief Initialize the specified partition of this aggregation.
    *
    * @param partition_id ID of the partition to be initialized.
@@ -213,13 +210,6 @@
   std::size_t getMemoryConsumptionBytes() const;
 
  private:
-  // Check whether partitioned aggregation can be applied.
-  bool checkAggregatePartitioned(
-      const std::size_t estimated_num_groups,
-      const std::vector<bool> &is_distinct,
-      const std::vector<std::unique_ptr<const Scalar>> &group_by,
-      const std::vector<const AggregateFunction *> &aggregate_functions) const;
-
   // Aggregate on input block.
   void aggregateBlockSingleState(const ValueAccessorMultiplexer &accessor_mux);
 
@@ -271,10 +261,10 @@
   const CatalogRelationSchema &input_relation_;
 
   // Whether the aggregation is collision free or not.
-  bool is_aggregate_collision_free_;
+  const bool is_aggregate_collision_free_;
 
   // Whether the aggregation is partitioned or not.
-  bool is_aggregate_partitioned_;
+  const bool is_aggregate_partitioned_;
 
   std::unique_ptr<const Predicate> predicate_;
 
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index 7521d73..1a8a302 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -42,4 +42,7 @@
 
   // Each DISTINCT aggregation has its distinctify hash table impl type.
   repeated HashTableImplType distinctify_hash_table_impl_types = 7;
+
+  optional bool is_partitioned = 8;
+  optional uint64 num_partitions = 9 [default = 1];
 }
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index f33a4f4..3d90bc5 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -267,7 +267,6 @@
 
 # Link dependencies:
 target_link_libraries(quickstep_storage_AggregationOperationState
-                      ${GFLAGS_LIB_NAME}
                       glog
                       quickstep_catalog_CatalogDatabaseLite
                       quickstep_catalog_CatalogRelationSchema
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index d836014..679f77b 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -43,15 +43,17 @@
 CollisionFreeVectorTable::CollisionFreeVectorTable(
     const Type *key_type,
     const std::size_t num_entries,
+    const std::size_t num_finalize_partitions,
     const std::vector<AggregationHandle *> &handles,
     StorageManager *storage_manager)
     : key_type_(key_type),
       num_entries_(num_entries),
       num_handles_(handles.size()),
       handles_(handles),
-      num_finalize_partitions_(CalculateNumFinalizationPartitions(num_entries_)),
+      num_finalize_partitions_(num_finalize_partitions),
       storage_manager_(storage_manager) {
   DCHECK_GT(num_entries, 0u);
+  DCHECK_GT(num_finalize_partitions_, 0u);
 
   std::size_t required_memory = 0;
   const std::size_t existence_map_offset = 0;
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 221a221..5aeb5cf 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -58,6 +58,8 @@
    *
    * @param key_type The group-by key type.
    * @param num_entries The estimated number of entries this table will hold.
+   * @param num_finalize_partitions The number of partitions to be used for
+   *        finalizing the aggregation.
    * @param handles The aggregation handles.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold this table's contents).
@@ -65,6 +67,7 @@
   CollisionFreeVectorTable(
       const Type *key_type,
       const std::size_t num_entries,
+      const std::size_t num_finalize_partitions,
       const std::vector<AggregationHandle *> &handles,
       StorageManager *storage_manager);
 
@@ -193,17 +196,6 @@
     return std::max(1uL, std::min(memory_size / kInitBlockSize, 80uL));
   }
 
-  inline static std::size_t CalculateNumFinalizationPartitions(
-      const std::size_t num_entries) {
-    // Set finalization segment size as 4096 entries.
-    constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
-
-    // At least 1 partition, at most 80 partitions.
-    // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the
-    // hardcoded 80.
-    return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL));
-  }
-
   inline std::size_t calculatePartitionLength() const {
     const std::size_t partition_length =
         (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index cb1f16f..d367160 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -356,6 +356,8 @@
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold the hash table's contents). Forwarded as-is to the
    *        hash table constructor.
+   * @param num_partitions The number of partitions of this aggregation state
+   *        hash table.
    * @return A new aggregation state hash table.
    **/
   static AggregationStateHashTableBase* CreateResizable(
@@ -363,12 +365,13 @@
       const std::vector<const Type*> &key_types,
       const std::size_t num_entries,
       const std::vector<AggregationHandle *> &handles,
-      StorageManager *storage_manager) {
+      StorageManager *storage_manager,
+      const std::size_t num_partitions = 1u) {
     switch (hash_table_type) {
       case HashTableImplType::kCollisionFreeVector:
         DCHECK_EQ(1u, key_types.size());
         return new CollisionFreeVectorTable(
-            key_types.front(), num_entries, handles, storage_manager);
+            key_types.front(), num_entries, num_partitions, handles, storage_manager);
       case HashTableImplType::kSeparateChaining:
         return new PackedPayloadHashTable(
             key_types, num_entries, handles, storage_manager);