Determine #InitPartitions for CollisionFreeVectorTable in the optimizer.
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index 69105e6..fdf8796 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -65,6 +65,7 @@
                       quickstep_expressions_Expressions_proto
                       quickstep_expressions_aggregation_AggregateFunction
                       quickstep_expressions_aggregation_AggregateFunction_proto
+                      quickstep_expressions_aggregation_AggregationID
                       quickstep_expressions_predicate_Predicate
                       quickstep_expressions_scalar_Scalar
                       quickstep_expressions_scalar_ScalarAttribute
@@ -150,12 +151,14 @@
                       quickstep_storage_InsertDestination_proto
                       quickstep_storage_StorageBlockLayout
                       quickstep_storage_StorageBlockLayout_proto
+                      quickstep_storage_StorageConstants
                       quickstep_storage_SubBlockTypeRegistry
                       quickstep_types_Type
                       quickstep_types_Type_proto
                       quickstep_types_TypedValue
                       quickstep_types_TypedValue_proto
                       quickstep_types_containers_Tuple_proto
+                      quickstep_utility_BarrieredReadWriteConcurrentBitVector
                       quickstep_utility_Macros
                       quickstep_utility_SqlError)
 if (ENABLE_DISTRIBUTED)
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 88dc505..9bfd136 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -20,6 +20,7 @@
 #include "query_optimizer/ExecutionGenerator.hpp"
 
 #include <algorithm>
+#include <atomic>
 #include <cstddef>
 #include <functional>
 #include <memory>
@@ -52,6 +53,7 @@
 #include "expressions/Expressions.pb.h"
 #include "expressions/aggregation/AggregateFunction.hpp"
 #include "expressions/aggregation/AggregateFunction.pb.h"
+#include "expressions/aggregation/AggregationID.hpp"
 #include "expressions/predicate/Predicate.hpp"
 #include "expressions/scalar/Scalar.hpp"
 #include "expressions/scalar/ScalarAttribute.hpp"
@@ -133,19 +135,23 @@
 #include "storage/InsertDestination.pb.h"
 #include "storage/StorageBlockLayout.hpp"
 #include "storage/StorageBlockLayout.pb.h"
+#include "storage/StorageConstants.hpp"
 #include "storage/SubBlockTypeRegistry.hpp"
 #include "types/Type.hpp"
 #include "types/Type.pb.h"
 #include "types/TypedValue.hpp"
 #include "types/TypedValue.pb.h"
 #include "types/containers/Tuple.pb.h"
+#include "utility/BarrieredReadWriteConcurrentBitVector.hpp"
 #include "utility/SqlError.hpp"
 
 #include "gflags/gflags.h"
 #include "glog/logging.h"
 
+using std::atomic;
 using std::make_unique;
 using std::move;
+using std::pair;
 using std::static_pointer_cast;
 using std::unique_ptr;
 using std::unordered_map;
@@ -191,6 +197,61 @@
 
 namespace {
 
+size_t CacheLineAlignedBytes(const size_t actual_bytes) {
+  return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
+}
+
+size_t CalculateNumInitializationPartitionsForCollisionFreeVectorTable(const size_t memory_size) {
+  // At least 1 partition, at most (#workers * 2) partitions.
+  return std::max(1uL, std::min(memory_size / kCollisonFreeVectorInitBlobSize,
+                                static_cast<size_t>(2 * FLAGS_num_workers)));
+}
+
+void CalculateCollisionFreeAggregationInfo(
+    const size_t num_entries, const vector<pair<AggregationID, vector<const Type *>>> &group_by_aggrs_info,
+    S::CollisionFreeVectorInfo *collision_free_vector_info) {
+  size_t memory_size = CacheLineAlignedBytes(
+      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
+
+  for (std::size_t i = 0; i < group_by_aggrs_info.size(); ++i) {
+    const auto &group_by_aggr_info = group_by_aggrs_info[i];
+
+    size_t state_size = 0;
+    switch (group_by_aggr_info.first) {
+      case AggregationID::kCount: {
+        state_size = sizeof(atomic<size_t>);
+        break;
+      }
+      case AggregationID::kSum: {
+        const vector<const Type *> &argument_types = group_by_aggr_info.second;
+        DCHECK_EQ(1u, argument_types.size());
+        switch (argument_types.front()->getTypeID()) {
+          case TypeID::kInt:
+          case TypeID::kLong:
+            state_size = sizeof(atomic<std::int64_t>);
+            break;
+          case TypeID::kFloat:
+          case TypeID::kDouble:
+            state_size = sizeof(atomic<double>);
+            break;
+          default:
+            LOG(FATAL) << "No support by CollisionFreeVector";
+        }
+        break;
+      }
+      default:
+        LOG(FATAL) << "No support by CollisionFreeVector";
+    }
+
+    collision_free_vector_info->add_state_offsets(memory_size);
+    memory_size += CacheLineAlignedBytes(state_size * num_entries);
+  }
+
+  collision_free_vector_info->set_memory_size(memory_size);
+  collision_free_vector_info->set_num_init_partitions(
+      CalculateNumInitializationPartitionsForCollisionFreeVectorTable(memory_size));
+}
+
 size_t CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(const size_t num_entries) {
   // Set finalization segment size as 4096 entries.
   constexpr size_t kFinalizeSegmentSize = 4uL * 1024L;
@@ -1737,6 +1798,7 @@
 
   const auto &aggregate_expressions = physical_plan->aggregate_expressions();
   vector<bool> is_distincts;
+  vector<pair<AggregationID, vector<const Type *>>> group_by_aggrs_info;
   for (const E::AliasPtr &named_aggregate_expression : aggregate_expressions) {
     const E::AggregateFunctionPtr unnamed_aggregate_expression =
         std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
@@ -1745,15 +1807,22 @@
     S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
 
     // Set the AggregateFunction.
-    aggr_proto->mutable_function()->MergeFrom(
-        unnamed_aggregate_expression->getAggregate().getProto());
+    const AggregateFunction &aggr_func = unnamed_aggregate_expression->getAggregate();
+    aggr_proto->mutable_function()->MergeFrom(aggr_func.getProto());
 
     // Add each of the aggregate's arguments.
+    vector<const Type *> argument_types;
     for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
       unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+      argument_types.push_back(&concretized_argument->getType());
+
       aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto());
     }
 
+    if (!group_by_types.empty()) {
+      group_by_aggrs_info.emplace_back(aggr_func.getAggregationID(), move(argument_types));
+    }
+
     // Set whether it is a DISTINCT aggregation.
     const bool is_distinct = unnamed_aggregate_expression->is_distinct();
     aggr_proto->set_is_distinct(is_distinct);
@@ -1786,6 +1855,10 @@
       aggr_state_proto->set_estimated_num_entries(max_num_groups);
       use_parallel_initialization = true;
       aggr_state_num_partitions = CalculateNumFinalizationPartitionsForCollisionFreeVectorTable(max_num_groups);
+
+      DCHECK(!group_by_aggrs_info.empty());
+      CalculateCollisionFreeAggregationInfo(max_num_groups, group_by_aggrs_info,
+                                            aggr_state_proto->mutable_collision_free_vector_info());
     } else {
       if (cost_model_for_aggregation_->canUseTwoPhaseCompactKeyAggregation(
               physical_plan, estimated_num_groups)) {
@@ -1831,12 +1904,15 @@
   }
 
   if (use_parallel_initialization) {
+    DCHECK(aggr_state_proto->has_collision_free_vector_info());
+
     const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
         execution_plan_->addRelationalOperator(
             new InitializeAggregationOperator(
                 query_handle_->query_id(),
                 aggr_state_index,
-                num_partitions));
+                num_partitions,
+                aggr_state_proto->collision_free_vector_info().num_init_partitions()));
 
     execution_plan_->addDirectDependency(aggregation_operator_index,
                                          initialize_aggregation_operator_index,
@@ -1939,6 +2015,7 @@
     aggr_state_proto->mutable_predicate()->MergeFrom(predicate->getProto());
   }
 
+  vector<pair<AggregationID, vector<const Type *>>> group_by_aggrs_info;
   for (const E::AliasPtr &named_aggregate_expression : physical_plan->aggregate_expressions()) {
     const E::AggregateFunctionPtr unnamed_aggregate_expression =
         std::static_pointer_cast<const E::AggregateFunction>(named_aggregate_expression->expression());
@@ -1947,26 +2024,35 @@
     S::Aggregate *aggr_proto = aggr_state_proto->add_aggregates();
 
     // Set the AggregateFunction.
-    aggr_proto->mutable_function()->MergeFrom(
-        unnamed_aggregate_expression->getAggregate().getProto());
+    const AggregateFunction &aggr_func = unnamed_aggregate_expression->getAggregate();
+    aggr_proto->mutable_function()->MergeFrom(aggr_func.getProto());
 
     // Add each of the aggregate's arguments.
+    vector<const Type *> argument_types;
     for (const E::ScalarPtr &argument : unnamed_aggregate_expression->getArguments()) {
       unique_ptr<const Scalar> concretized_argument(argument->concretize(attribute_substitution_map_));
+      argument_types.push_back(&concretized_argument->getType());
+
       aggr_proto->add_argument()->MergeFrom(concretized_argument->getProto());
     }
 
+    group_by_aggrs_info.emplace_back(aggr_func.getAggregationID(), move(argument_types));
+
     // Set whether it is a DISTINCT aggregation.
     DCHECK(!unnamed_aggregate_expression->is_distinct());
     aggr_proto->set_is_distinct(false);
   }
 
+  CalculateCollisionFreeAggregationInfo(estimated_num_entries, group_by_aggrs_info,
+                                        aggr_state_proto->mutable_collision_free_vector_info());
+
   const QueryPlan::DAGNodeIndex initialize_aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new InitializeAggregationOperator(
               query_handle_->query_id(),
               aggr_state_index,
-              num_partitions));
+              num_partitions,
+              aggr_state_proto->collision_free_vector_info().num_init_partitions()));
 
   const QueryPlan::DAGNodeIndex build_aggregation_existence_map_operator_index =
       execution_plan_->addRelationalOperator(
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index e197b08..f91299d 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -46,7 +46,7 @@
       DCHECK(agg_state != nullptr);
 
       for (std::size_t state_part_id = 0;
-           state_part_id < agg_state->getNumInitializationPartitions();
+           state_part_id < aggr_state_num_init_partitions_;
            ++state_part_id) {
         container->addNormalWorkOrder(
             new InitializeAggregationWorkOrder(query_id_,
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
index 0a9d25d..b7e9aae 100644
--- a/relational_operators/InitializeAggregationOperator.hpp
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -58,13 +58,17 @@
    * @param aggr_state_index The index of the AggregationOperationState in QueryContext.
    * @param num_partitions The number of partitions in 'input_relation'. If no
    *        partitions, it is one.
+   * @param aggr_state_num_init_partitions The number of partitions to be used
+   *        for initialize the aggregation state collision free vector table.
    **/
   InitializeAggregationOperator(const std::size_t query_id,
                                 const QueryContext::aggregation_state_id aggr_state_index,
-                                const std::size_t num_partitions)
+                                const std::size_t num_partitions,
+                                const std::size_t aggr_state_num_init_partitions)
       : RelationalOperator(query_id),
         aggr_state_index_(aggr_state_index),
         num_partitions_(num_partitions),
+        aggr_state_num_init_partitions_(aggr_state_num_init_partitions),
         started_(false) {}
 
   ~InitializeAggregationOperator() override {}
@@ -87,7 +91,7 @@
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t num_partitions_;
+  const std::size_t num_partitions_, aggr_state_num_init_partitions_;
   bool started_;
 
   DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 3d1c14a..0f4795f 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -40,8 +40,9 @@
 #include "expressions/scalar/Scalar.hpp"
 #include "storage/AggregationOperationState.pb.h"
 #include "storage/CollisionFreeVectorTable.hpp"
-#include "storage/HashTableFactory.hpp"
+#include "storage/HashTable.pb.h"
 #include "storage/HashTableBase.hpp"
+#include "storage/HashTableFactory.hpp"
 #include "storage/InsertDestination.hpp"
 #include "storage/PackedPayloadHashTable.hpp"
 #include "storage/StorageBlock.hpp"
@@ -63,8 +64,13 @@
 
 #include "glog/logging.h"
 
+using std::size_t;
+using std::vector;
+
 namespace quickstep {
 
+namespace S = serialization;
+
 AggregationOperationState::AggregationOperationState(
     const CatalogRelationSchema &input_relation,
     const std::vector<const AggregateFunction *> &aggregate_functions,
@@ -77,7 +83,10 @@
     const std::size_t num_partitions,
     const HashTableImplType hash_table_impl_type,
     const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
-    StorageManager *storage_manager)
+    StorageManager *storage_manager,
+    const size_t collision_free_vector_memory_size,
+    const size_t collision_free_vector_num_init_partitions,
+    const vector<size_t> &collision_free_vector_state_offsets)
     : input_relation_(input_relation),
       is_aggregate_collision_free_(
           group_by.empty() ? false
@@ -208,7 +217,10 @@
               estimated_num_entries,
               group_by_handles,
               storage_manager,
-              num_partitions));
+              num_partitions,
+              collision_free_vector_memory_size,
+              collision_free_vector_num_init_partitions,
+              collision_free_vector_state_offsets));
     } else if (is_aggregate_partitioned_) {
       if (all_distinct_) {
         DCHECK_EQ(1u, group_by_handles.size());
@@ -288,6 +300,19 @@
         PredicateFactory::ReconstructFromProto(proto.predicate(), database));
   }
 
+  size_t collision_free_vector_memory_size = 0;
+  size_t collision_free_vector_num_init_partitions = 0;
+  vector<size_t> collision_free_vector_state_offsets;
+  if (proto.has_collision_free_vector_info()) {
+    const serialization::CollisionFreeVectorInfo &collision_free_vector_info =
+        proto.collision_free_vector_info();
+    collision_free_vector_memory_size = collision_free_vector_info.memory_size();
+    collision_free_vector_num_init_partitions = collision_free_vector_info.num_init_partitions();
+    for (int i = 0; i < collision_free_vector_info.state_offsets_size(); ++i) {
+      collision_free_vector_state_offsets.push_back(collision_free_vector_info.state_offsets(i));
+    }
+  }
+
   return new AggregationOperationState(
       database.getRelationSchemaById(proto.relation_id()),
       aggregate_functions,
@@ -300,7 +325,10 @@
       proto.num_partitions(),
       HashTableImplTypeFromProto(proto.hash_table_impl_type()),
       distinctify_hash_table_impl_types,
-      storage_manager);
+      storage_manager,
+      collision_free_vector_memory_size,
+      collision_free_vector_num_init_partitions,
+      collision_free_vector_state_offsets);
 }
 
 bool AggregationOperationState::ProtoIsValid(
@@ -345,18 +373,31 @@
     }
   }
 
-  for (int i = 0; i < proto.group_by_expressions_size(); ++i) {
+  const int group_by_expressions_size = proto.group_by_expressions_size();
+  for (int i = 0; i < group_by_expressions_size; ++i) {
     if (!ScalarFactory::ProtoIsValid(proto.group_by_expressions(i), database)) {
       return false;
     }
   }
 
-  if (proto.group_by_expressions_size() > 0) {
+  if (group_by_expressions_size > 0) {
     if (!proto.has_hash_table_impl_type() ||
         !serialization::HashTableImplType_IsValid(
             proto.hash_table_impl_type())) {
       return false;
     }
+
+    if (proto.hash_table_impl_type() == S::HashTableImplType::COLLISION_FREE_VECTOR) {
+      if (!proto.has_collision_free_vector_info()) {
+        return false;
+      }
+
+      const S::CollisionFreeVectorInfo &proto_collision_free_vector_info = proto.collision_free_vector_info();
+      if (!proto_collision_free_vector_info.IsInitialized() ||
+          proto_collision_free_vector_info.state_offsets_size() != group_by_expressions_size) {
+        return false;
+      }
+    }
   }
 
   if (proto.has_predicate()) {
@@ -368,15 +409,6 @@
   return true;
 }
 
-std::size_t AggregationOperationState::getNumInitializationPartitions() const {
-  if (is_aggregate_collision_free_) {
-    return static_cast<CollisionFreeVectorTable *>(
-        collision_free_hashtable_.get())->getNumInitializationPartitions();
-  } else {
-    return 0u;
-  }
-}
-
 CollisionFreeVectorTable* AggregationOperationState
     ::getCollisionFreeVectorTable() const {
   return static_cast<CollisionFreeVectorTable *>(
diff --git a/storage/AggregationOperationState.hpp b/storage/AggregationOperationState.hpp
index 6174478..c7680b0 100644
--- a/storage/AggregationOperationState.hpp
+++ b/storage/AggregationOperationState.hpp
@@ -108,6 +108,12 @@
    * @param storage_manager The StorageManager to use for allocating hash
    *        tables. Single aggregation state (when GROUP BY list is not
    *        specified) is not allocated using memory from storage manager.
+   * @param collision_free_vector_memory_size For CollisionFreeVectorTable,
+   *        the memory size.
+   * @param collision_free_vector_num_init_partitions For
+   *        CollisionFreeVectorTable, the number of partitions to initialize.
+   * @param collision_free_vector_state_offsets For CollisionFreeVectorTable,
+   *        the offsets for each state.
    */
   AggregationOperationState(
       const CatalogRelationSchema &input_relation,
@@ -121,7 +127,10 @@
       const std::size_t num_partitions,
       const HashTableImplType hash_table_impl_type,
       const std::vector<HashTableImplType> &distinctify_hash_table_impl_types,
-      StorageManager *storage_manager);
+      StorageManager *storage_manager,
+      const std::size_t collision_free_vector_memory_size = 0,
+      const std::size_t collision_free_vector_num_init_partitions = 0,
+      const std::vector<std::size_t> &collision_free_vector_state_offsets = std::vector<std::size_t>());
 
   ~AggregationOperationState() {}
 
diff --git a/storage/AggregationOperationState.proto b/storage/AggregationOperationState.proto
index 1a8a302..d2305f1 100644
--- a/storage/AggregationOperationState.proto
+++ b/storage/AggregationOperationState.proto
@@ -45,4 +45,7 @@
 
   optional bool is_partitioned = 8;
   optional uint64 num_partitions = 9 [default = 1];
+
+  // Required if 'hash_table_impl_type' is 'COLLISION_FREE_VECTOR'.
+  optional CollisionFreeVectorInfo collision_free_vector_info = 10;
 }
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index 3d90bc5..6446a83 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -283,6 +283,7 @@
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableFactory
                       quickstep_storage_HashTablePool
+                      quickstep_storage_HashTable_proto
                       quickstep_storage_InsertDestination
                       quickstep_storage_PartitionedHashTablePool
                       quickstep_storage_PackedPayloadHashTable
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index 679f77b..e803954 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -43,68 +43,33 @@
 CollisionFreeVectorTable::CollisionFreeVectorTable(
     const Type *key_type,
     const std::size_t num_entries,
+    const std::size_t memory_size,
+    const std::size_t num_init_partitions,
     const std::size_t num_finalize_partitions,
+    const std::vector<std::size_t> &state_offsets,
     const std::vector<AggregationHandle *> &handles,
     StorageManager *storage_manager)
     : key_type_(key_type),
       num_entries_(num_entries),
       num_handles_(handles.size()),
       handles_(handles),
+      memory_size_(memory_size),
+      num_init_partitions_(num_init_partitions),
       num_finalize_partitions_(num_finalize_partitions),
       storage_manager_(storage_manager) {
   DCHECK_GT(num_entries, 0u);
   DCHECK_GT(num_finalize_partitions_, 0u);
-
-  std::size_t required_memory = 0;
-  const std::size_t existence_map_offset = 0;
-  std::vector<std::size_t> state_offsets;
-
-  required_memory += CacheLineAlignedBytes(
-      BarrieredReadWriteConcurrentBitVector::BytesNeeded(num_entries));
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    const AggregationHandle *handle = handles_[i];
-    const std::vector<const Type *> argument_types = handle->getArgumentTypes();
-
-    std::size_t state_size = 0;
-    switch (handle->getAggregationID()) {
-      case AggregationID::kCount: {
-        state_size = sizeof(std::atomic<std::size_t>);
-        break;
-      }
-      case AggregationID::kSum: {
-        DCHECK_EQ(1u, argument_types.size());
-        switch (argument_types.front()->getTypeID()) {
-          case TypeID::kInt:  // Fall through
-          case TypeID::kLong:
-            state_size = sizeof(std::atomic<std::int64_t>);
-            break;
-          case TypeID::kFloat:  // Fall through
-          case TypeID::kDouble:
-            state_size = sizeof(std::atomic<double>);
-            break;
-          default:
-            LOG(FATAL) << "Not implemented";
-        }
-        break;
-      }
-      default:
-        LOG(FATAL) << "Not implemented";
-    }
-
-    state_offsets.emplace_back(required_memory);
-    required_memory += CacheLineAlignedBytes(state_size * num_entries);
-  }
+  DCHECK_EQ(num_handles_, state_offsets.size());
 
   const std::size_t num_storage_slots =
-      storage_manager_->SlotsNeededForBytes(required_memory);
+      storage_manager_->SlotsNeededForBytes(memory_size_);
 
   const block_id blob_id = storage_manager_->createBlob(num_storage_slots);
   blob_ = storage_manager_->getBlobMutable(blob_id);
 
   void *memory_start = blob_->getMemoryMutable();
   existence_map_.reset(new BarrieredReadWriteConcurrentBitVector(
-      reinterpret_cast<char *>(memory_start) + existence_map_offset,
+      reinterpret_cast<char *>(memory_start),
       num_entries,
       false /* initialize */));
 
@@ -113,9 +78,6 @@
     vec_tables_.emplace_back(
         reinterpret_cast<char *>(memory_start) + state_offsets.at(i));
   }
-
-  memory_size_ = required_memory;
-  num_init_partitions_ = CalculateNumInitializationPartitions(memory_size_);
 }
 
 CollisionFreeVectorTable::~CollisionFreeVectorTable() {
diff --git a/storage/CollisionFreeVectorTable.hpp b/storage/CollisionFreeVectorTable.hpp
index 5aeb5cf..8e1342b 100644
--- a/storage/CollisionFreeVectorTable.hpp
+++ b/storage/CollisionFreeVectorTable.hpp
@@ -58,8 +58,12 @@
    *
    * @param key_type The group-by key type.
    * @param num_entries The estimated number of entries this table will hold.
+   * @param memory_size The memory size for this table.
+   * @param num_init_partitions The number of partitions to be used for
+   *        initializing the aggregation.
    * @param num_finalize_partitions The number of partitions to be used for
    *        finalizing the aggregation.
+   * @param state_offsets The offsets for each state in the table.
    * @param handles The aggregation handles.
    * @param storage_manager The StorageManager to use (a StorageBlob will be
    *        allocated to hold this table's contents).
@@ -67,7 +71,10 @@
   CollisionFreeVectorTable(
       const Type *key_type,
       const std::size_t num_entries,
+      const std::size_t memory_size,
+      const std::size_t num_init_partitions,
       const std::size_t num_finalize_partitions,
+      const std::vector<std::size_t> &state_offsets,
       const std::vector<AggregationHandle *> &handles,
       StorageManager *storage_manager);
 
@@ -181,21 +188,6 @@
   }
 
  private:
-  inline static std::size_t CacheLineAlignedBytes(const std::size_t actual_bytes) {
-    return (actual_bytes + kCacheLineBytes - 1) / kCacheLineBytes * kCacheLineBytes;
-  }
-
-  inline static std::size_t CalculateNumInitializationPartitions(
-      const std::size_t memory_size) {
-    // Set initialization memory block size as 4MB.
-    constexpr std::size_t kInitBlockSize = 4uL * 1024u * 1024u;
-
-    // At least 1 partition, at most 80 partitions.
-    // TODO(jianqiao): set the upbound as (# of workers * 2) instead of the
-    // hardcoded 80.
-    return std::max(1uL, std::min(memory_size / kInitBlockSize, 80uL));
-  }
-
   inline std::size_t calculatePartitionLength() const {
     const std::size_t partition_length =
         (num_entries_ + num_finalize_partitions_ - 1) / num_finalize_partitions_;
@@ -333,14 +325,13 @@
   std::unique_ptr<BarrieredReadWriteConcurrentBitVector> existence_map_;
   std::vector<void *> vec_tables_;
 
+  const std::size_t memory_size_;
+  const std::size_t num_init_partitions_;
   const std::size_t num_finalize_partitions_;
 
   StorageManager *storage_manager_;
   MutableBlobReference blob_;
 
-  std::size_t memory_size_;
-  std::size_t num_init_partitions_;
-
   DISALLOW_COPY_AND_ASSIGN(CollisionFreeVectorTable);
 };
 
diff --git a/storage/HashTable.proto b/storage/HashTable.proto
index ed383df..d489b9f 100644
--- a/storage/HashTable.proto
+++ b/storage/HashTable.proto
@@ -29,6 +29,12 @@
   THREAD_PRIVATE_COMPACT_KEY = 4;
 }
 
+message CollisionFreeVectorInfo {
+  required uint64 memory_size = 1;
+  required uint64 num_init_partitions = 2;
+  repeated uint64 state_offsets = 3;
+}
+
 // NOTE(chasseur): This proto describes the run-time parameters for a resizable
 // HashTable. It does not describe any template parameters of the HashTable
 // class, which are different in different contexts (e.g. join vs. grouping).
diff --git a/storage/HashTableFactory.hpp b/storage/HashTableFactory.hpp
index d367160..732920f 100644
--- a/storage/HashTableFactory.hpp
+++ b/storage/HashTableFactory.hpp
@@ -358,6 +358,12 @@
    *        hash table constructor.
    * @param num_partitions The number of partitions of this aggregation state
    *        hash table.
+   * @param collision_free_vector_memory_size For CollisionFreeVectorTable,
+   *        the memory size.
+   * @param collision_free_vector_num_init_partitions For
+   *        CollisionFreeVectorTable, the number of partitions to initialize.
+   * @param collision_free_vector_state_offsets For CollisionFreeVectorTable,
+   *        the offsets for each state.
    * @return A new aggregation state hash table.
    **/
   static AggregationStateHashTableBase* CreateResizable(
@@ -366,12 +372,17 @@
       const std::size_t num_entries,
       const std::vector<AggregationHandle *> &handles,
       StorageManager *storage_manager,
-      const std::size_t num_partitions = 1u) {
+      const std::size_t num_partitions = 1u,
+      const std::size_t collision_free_vector_memory_size = 0,
+      const std::size_t collision_free_vector_num_init_partitions = 0,
+      const std::vector<std::size_t> &collision_free_vector_state_offsets = std::vector<std::size_t>()) {
     switch (hash_table_type) {
       case HashTableImplType::kCollisionFreeVector:
         DCHECK_EQ(1u, key_types.size());
         return new CollisionFreeVectorTable(
-            key_types.front(), num_entries, num_partitions, handles, storage_manager);
+            key_types.front(), num_entries, collision_free_vector_memory_size,
+            collision_free_vector_num_init_partitions, num_partitions,
+            collision_free_vector_state_offsets, handles, storage_manager);
       case HashTableImplType::kSeparateChaining:
         return new PackedPayloadHashTable(
             key_types, num_entries, handles, storage_manager);
diff --git a/storage/StorageConstants.hpp b/storage/StorageConstants.hpp
index 037a8a9..c4debe0 100644
--- a/storage/StorageConstants.hpp
+++ b/storage/StorageConstants.hpp
@@ -122,6 +122,9 @@
 // to indicate zero-sized blocks or sub-blocks.
 const std::size_t kZeroSize = 0;
 
+// Set initialization memory blob size for CollisonFreeVector as 4MB.
+constexpr std::size_t kCollisonFreeVectorInitBlobSize = 4uL * 1024u * 1024u;
+
 /** @} */
 
 }  // namespace quickstep