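Updates: move the aggregation dispatch helpers into storage/AggregationUtil.hpp and add type-specialized upsert/finalize paths to PackedPayloadHashTable.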
diff --git a/expressions/aggregation/AggFunc.hpp b/expressions/aggregation/AggFunc.hpp
index 31f385e..98b19ba 100644
--- a/expressions/aggregation/AggFunc.hpp
+++ b/expressions/aggregation/AggFunc.hpp
@@ -69,6 +69,11 @@
                    typename AggState<ArgType>::AtomicT> {};
 
   template <typename ArgType>
+  inline static void InitAtomic(typename AggState<ArgType>::AtomicT *state) {
+    state->store(0, std::memory_order_relaxed);  // Reset the state to zero.
+  }
+
+  template <typename ArgType>
   inline static void MergeArgAtomic(const typename ArgType::cpptype &value,
                                     typename AggState<ArgType>::AtomicT *state) {
     LOG(FATAL) << "Not implemented";
@@ -81,6 +86,11 @@
   }
 
   template <typename ArgType>
+  inline static void InitUnsafe(typename AggState<ArgType>::T *state) {
+    *state = 0;  // Plain (non-atomic) write of the zero initial value.
+  }
+
+  template <typename ArgType>
   inline static void MergeArgUnsafe(const typename ArgType::cpptype &value,
                                     typename AggState<ArgType>::T *state) {
     *state += value;
diff --git a/storage/AggregationOperationState.cpp b/storage/AggregationOperationState.cpp
index 00bb433..392e0f6 100644
--- a/storage/AggregationOperationState.cpp
+++ b/storage/AggregationOperationState.cpp
@@ -212,7 +212,7 @@
           AggregationStateHashTableFactory::CreateResizable(
               hash_table_impl_type,
               group_by_types_,
-              estimated_num_entries,
+              estimated_num_entries * 2,
               group_by_handles,
               storage_manager_));
     } else {
@@ -384,12 +384,12 @@
     }
   }
 
-  // There are GROUP BYs without DISTINCT. Check if the estimated number of
-  // groups is large enough to warrant a partitioned aggregation.
-  return estimated_num_groups >
-         static_cast<std::size_t>(
-             FLAGS_partition_aggregation_num_groups_threshold);
-  return false;
+//  // There are GROUP BYs without DISTINCT. Check if the estimated number of
+//  // groups is large enough to warrant a partitioned aggregation.
+//  return estimated_num_groups >
+//         static_cast<std::size_t>(
+//             FLAGS_partition_aggregation_num_groups_threshold);
+  return true;
 }
 
 std::size_t AggregationOperationState::getNumInitializationPartitions() const {
@@ -663,60 +663,10 @@
     InsertDestination *output_destination) {
   PackedPayloadHashTable *hash_table =
       static_cast<PackedPayloadHashTable *>(partitioned_hashtable_.get());
+//  std::cout << hash_table->numEntries() << "\n";
 
-  // Each element of 'group_by_keys' is a vector of values for a particular
-  // group (which is also the prefix of the finalized Tuple for that group).
-  std::vector<std::vector<TypedValue>> group_by_keys;
-
-  if (handles_.empty()) {
-    hash_table->forEachCompositeKeyInPartition(
-        partition_id,
-        [&](std::vector<TypedValue> &group_by_key) -> void {
-      group_by_keys.emplace_back(std::move(group_by_key));
-    });
-  }
-
-  // Collect per-aggregate finalized values.
-  std::vector<std::unique_ptr<ColumnVector>> final_values;
-  for (std::size_t agg_idx = 0; agg_idx < handles_.size(); ++agg_idx) {
-    ColumnVector *agg_result_col = handles_[agg_idx]->finalizeHashTable(
-        *hash_table, agg_idx, &group_by_keys);
-    if (agg_result_col != nullptr) {
-      final_values.emplace_back(agg_result_col);
-    }
-  }
-//  hash_table->destroyPayload();
-
-  std::vector<std::unique_ptr<ColumnVector>> group_by_cvs;
-  std::size_t group_by_element_idx = 0;
-  for (const Type *group_by_type : group_by_types_) {
-    if (NativeColumnVector::UsableForType(*group_by_type)) {
-      NativeColumnVector *element_cv =
-          new NativeColumnVector(*group_by_type, group_by_keys.size());
-      group_by_cvs.emplace_back(element_cv);
-      for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
-      }
-    } else {
-      IndirectColumnVector *element_cv =
-          new IndirectColumnVector(*group_by_type, group_by_keys.size());
-      group_by_cvs.emplace_back(element_cv);
-      for (std::vector<TypedValue> &group_key : group_by_keys) {
-        element_cv->appendTypedValue(std::move(group_key[group_by_element_idx]));
-      }
-    }
-    ++group_by_element_idx;
-  }
-
-  // Stitch together a ColumnVectorsValueAccessor combining the GROUP BY keys
-  // and the finalized aggregates.
   ColumnVectorsValueAccessor complete_result;
-  for (std::unique_ptr<ColumnVector> &group_by_cv : group_by_cvs) {
-    complete_result.addColumn(group_by_cv.release());
-  }
-  for (std::unique_ptr<ColumnVector> &final_value_cv : final_values) {
-    complete_result.addColumn(final_value_cv.release());
-  }
+  hash_table->finalize(partition_id, &complete_result);
 
   // Bulk-insert the complete result.
   output_destination->bulkInsertTuples(&complete_result);
diff --git a/storage/AggregationUtil.hpp b/storage/AggregationUtil.hpp
new file mode 100644
index 0000000..74a9095
--- /dev/null
+++ b/storage/AggregationUtil.hpp
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_
+#define QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_
+
+#include <type_traits>
+
+#include "expressions/aggregation/AggregationID.hpp"
+#include "expressions/aggregation/AggFunc.hpp"
+#include "storage/ValueAccessor.hpp"
+#include "storage/ValueAccessorMultiplexer.hpp"
+#include "storage/ValueAccessorUtil.hpp"
+#include "types/TypeID.hpp"
+#include "types/Type.hpp"
+#include "types/containers/ColumnVectorsValueAccessor.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup Storage
+ *  @{
+ */
+
+template <typename T>  // T minus its reference, then minus top-level const.
+using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>;
+
+template <typename FunctorT>
+inline auto InvokeOnKeyType(const Type &type,
+                            const FunctorT &functor) {
+  // Hand 'functor' the concrete integer Type object behind 'type'.
+  const TypeID key_type_id = type.getTypeID();
+  if (key_type_id == TypeID::kInt) {
+    return functor(static_cast<const IntType&>(type));
+  } else if (key_type_id == TypeID::kLong) {
+    return functor(static_cast<const LongType&>(type));
+  }
+  LOG(FATAL) << "Not supported";
+}
+
+template <typename FunctorT>
+inline auto InvokeOnType(const Type &type,
+                         const FunctorT &functor) {
+  // Hand 'functor' the concrete numeric Type object behind 'type'.
+  const TypeID arg_type_id = type.getTypeID();
+  if (arg_type_id == TypeID::kInt) {
+    return functor(static_cast<const IntType&>(type));
+  } else if (arg_type_id == TypeID::kLong) {
+    return functor(static_cast<const LongType&>(type));
+  } else if (arg_type_id == TypeID::kFloat) {
+    return functor(static_cast<const FloatType&>(type));
+  } else if (arg_type_id == TypeID::kDouble) {
+    return functor(static_cast<const DoubleType&>(type));
+  }
+  LOG(FATAL) << "Not supported";
+}
+
+template <typename FunctorT>
+inline auto InvokeOnBool(const bool &val,
+                         const FunctorT &functor) {
+  // Lift the runtime flag into a compile-time tag passed to 'functor'.
+  if (!val) {
+    return functor(std::false_type());
+  }
+  return functor(std::true_type());
+}
+
+template <typename FunctorT>
+inline auto InvokeOnBools(const bool &val1,
+                          const bool &val2,
+                          const FunctorT &functor) {
+  // Lift both runtime flags into std::true_type / std::false_type tags so
+  // that 'functor' can be specialized on them at compile time; the tag for
+  // 'val1' is passed first.
+  if (val1 && val2) {
+    return functor(std::true_type(), std::true_type());
+  }
+  if (val1) {
+    return functor(std::true_type(), std::false_type());
+  }
+  if (val2) {
+    return functor(std::false_type(), std::true_type());
+  }
+  return functor(std::false_type(), std::false_type());
+}
+
+template <typename FunctorT>
+inline auto InvokeOnAggFunc(const AggregationID &agg_id,
+                            const FunctorT &functor) {
+  // Hand 'functor' a default-constructed instance of the aggregate function
+  // class selected by 'agg_id'. Sum is the only one dispatched here; every
+  // other ID is fatal.
+  if (agg_id == AggregationID::kSum) {
+    return functor(Sum());
+  }
+  LOG(FATAL) << "Not supported";
+}
+
+template <typename FunctorT>  // Tag is std::true_type: run 'functor'.
+inline auto InvokeIf(const std::true_type &val,
+                     const FunctorT &functor) {
+  return functor();
+}
+
+template <typename FunctorT>  // Tag is std::false_type: do nothing.
+inline void InvokeIf(const std::false_type &val,
+                     const FunctorT &functor) {
+}
+
+//template <typename FunctorT>
+//inline void InvokeOnAggFuncIfApplicableToArgType(
+//    const AggregationID &agg_id,
+//    const Type &arg_type,
+//    const FunctorT &functor) {
+//  InvokeOnAggFunc(
+//      agg_id,
+//      [&](const auto &agg_func) -> void {
+//    InvokeOnType(
+//        arg_type,
+//        [&](const auto &arg_type) -> void {
+//      using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+//      using ArgT = remove_const_reference_t<decltype(arg_type)>;
+//
+//      InvokeIf(
+//          typename AggFuncT::template HasAtomicImpl<ArgT>(),
+//          [&]() -> void {
+//        functor(agg_func, arg_type);
+//      });
+//    });
+//  });
+//}
+
+template <typename FunctorT>
+inline void InvokeOnAggFuncWithArgType(
+    const AggregationID &agg_id,
+    const Type &arg_type,
+    const FunctorT &functor) {
+  // Two-level dispatch: resolve the aggregate function class first, then the
+  // concrete argument Type, and pass both to 'functor'.
+  const auto on_agg_func = [&](const auto &agg_func) -> void {
+    const auto on_arg_type = [&](const auto &concrete_type) -> void {
+      functor(agg_func, concrete_type);
+    };
+    InvokeOnType(arg_type, on_arg_type);
+  };
+  InvokeOnAggFunc(agg_id, on_agg_func);
+}
+
+template <typename FunctorT>
+inline auto InvokeOnTwoAccessors(
+    const ValueAccessorMultiplexer &accessor_mux,
+    const ValueAccessorSource &first_source,
+    const ValueAccessorSource &second_source,
+    const FunctorT &functor) {
+  // Calls functor(tag, first, second) with the accessors picked by the two
+  // sources; 'tag' is std::true_type iff the two sources differ. Returns the
+  // dispatcher's result instead of discarding it.
+  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
+  ColumnVectorsValueAccessor *derived_accessor =
+      static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
+
+  return InvokeOnAnyValueAccessor(
+      base_accessor,
+      [&](auto *accessor) {
+    if (first_source == ValueAccessorSource::kBase) {
+      if (second_source == ValueAccessorSource::kBase) {
+        return functor(std::false_type(), accessor, accessor);
+      }
+      return functor(std::true_type(), accessor, derived_accessor);
+    }
+    if (second_source == ValueAccessorSource::kBase) {
+      return functor(std::true_type(), derived_accessor, accessor);
+    }
+    return functor(std::false_type(), derived_accessor, derived_accessor);
+  });
+}
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_STORAGE_AGGREGATION_UTIL_HPP_
diff --git a/storage/CMakeLists.txt b/storage/CMakeLists.txt
index fcc069b..8ef3560 100644
--- a/storage/CMakeLists.txt
+++ b/storage/CMakeLists.txt
@@ -145,6 +145,7 @@
             AggregationOperationState.cpp
             AggregationOperationState.hpp)
 add_library(quickstep_storage_AggregationOperationState_proto ${storage_AggregationOperationState_proto_srcs})
+add_library(quickstep_storage_AggregationUtil ../empty_src.cpp AggregationUtil.hpp)
 add_library(quickstep_storage_BasicColumnStoreTupleStorageSubBlock
             BasicColumnStoreTupleStorageSubBlock.cpp
             BasicColumnStoreTupleStorageSubBlock.hpp)
@@ -304,6 +305,15 @@
                       quickstep_expressions_aggregation_AggregateFunction_proto
                       quickstep_storage_HashTable_proto
                       ${PROTOBUF_LIBRARY})
+target_link_libraries(quickstep_storage_AggregationUtil
+                      quickstep_expressions_aggregation_AggFunc
+                      quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_ValueAccessor
+                      quickstep_storage_ValueAccessorMultiplexer
+                      quickstep_storage_ValueAccessorUtil
+                      quickstep_types_Type
+                      quickstep_types_TypeID
+                      quickstep_types_containers_ColumnVectorsValueAccessor)
 target_link_libraries(quickstep_storage_BasicColumnStoreTupleStorageSubBlock
                       quickstep_catalog_CatalogAttribute
                       quickstep_catalog_CatalogRelationSchema
@@ -441,6 +451,7 @@
                       quickstep_expressions_aggregation_AggFunc
                       quickstep_expressions_aggregation_AggregationHandle
                       quickstep_expressions_aggregation_AggregationID
+                      quickstep_storage_AggregationUtil
                       quickstep_storage_HashTableBase
                       quickstep_storage_StorageBlob
                       quickstep_storage_StorageBlockInfo
@@ -453,7 +464,6 @@
                       quickstep_types_Type
                       quickstep_types_TypeID
                       quickstep_types_containers_ColumnVector
-                      quickstep_types_containers_ColumnVectorsValueAccessor
                       quickstep_utility_BoolVector
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_storage_ColumnStoreUtil
@@ -801,8 +811,11 @@
                       quickstep_utility_Macros
                       quickstep_utility_PrimeNumber)
 target_link_libraries(quickstep_storage_PackedPayloadHashTable
+                      ${GFLAGS_LIB_NAME}
                       quickstep_catalog_CatalogTypedefs
                       quickstep_expressions_aggregation_AggregationHandle
+                      quickstep_expressions_aggregation_AggFunc
+                      quickstep_storage_AggregationUtil
                       quickstep_storage_HashTableBase
                       quickstep_storage_HashTableKeyManager
                       quickstep_storage_StorageBlob
@@ -1111,6 +1124,7 @@
 target_link_libraries(quickstep_storage
                       quickstep_storage_AggregationOperationState
                       quickstep_storage_AggregationOperationState_proto
+                      quickstep_storage_AggregationUtil
                       quickstep_storage_BasicColumnStoreTupleStorageSubBlock
                       quickstep_storage_BasicColumnStoreValueAccessor
                       quickstep_storage_BloomFilterIndexSubBlock
diff --git a/storage/CollisionFreeVectorTable.cpp b/storage/CollisionFreeVectorTable.cpp
index c92f0ab..4c57cc9 100644
--- a/storage/CollisionFreeVectorTable.cpp
+++ b/storage/CollisionFreeVectorTable.cpp
@@ -29,14 +29,13 @@
 
 #include "expressions/aggregation/AggregationHandle.hpp"
 #include "expressions/aggregation/AggregationID.hpp"
+#include "storage/AggregationUtil.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageManager.hpp"
 #include "storage/ValueAccessor.hpp"
 #include "storage/ValueAccessorMultiplexer.hpp"
 #include "storage/ValueAccessorUtil.hpp"
 #include "threading/SpinMutex.hpp"
-#include "types/TypeID.hpp"
-#include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "utility/BoolVector.hpp"
 
 #include "glog/logging.h"
@@ -46,163 +45,6 @@
 DEFINE_uint64(vt_threadprivate_threshold, 1000000L, "");
 DEFINE_bool(use_latch, false, "");
 
-namespace {
-
-template <typename T>
-using remove_const_reference_t = std::remove_const_t<std::remove_reference_t<T>>;
-
-template <typename FunctorT>
-inline auto InvokeOnKeyType(const Type &type,
-                            const FunctorT &functor) {
-  switch (type.getTypeID()) {
-    case TypeID::kInt:
-      return functor(static_cast<const IntType&>(type));
-    case TypeID::kLong:
-      return functor(static_cast<const LongType&>(type));
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <typename FunctorT>
-inline auto InvokeOnType(const Type &type,
-                         const FunctorT &functor) {
-  switch (type.getTypeID()) {
-    case TypeID::kInt:
-      return functor(static_cast<const IntType&>(type));
-    case TypeID::kLong:
-      return functor(static_cast<const LongType&>(type));
-    case TypeID::kFloat:
-      return functor(static_cast<const FloatType&>(type));
-    case TypeID::kDouble:
-      return functor(static_cast<const DoubleType&>(type));
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <typename FunctorT>
-inline auto InvokeOnBool(const bool &val,
-                         const FunctorT &functor) {
-  if (val) {
-    return functor(std::true_type());
-  } else {
-    return functor(std::false_type());
-  }
-}
-
-template <typename FunctorT>
-inline auto InvokeOnBools(const bool &val1,
-                          const bool &val2,
-                          const FunctorT &functor) {
-  if (val1) {
-    if (val2) {
-      return functor(std::true_type(), std::true_type());
-    } else {
-      return functor(std::true_type(), std::false_type());
-    }
-  } else {
-    if (val2) {
-      return functor(std::false_type(), std::true_type());
-    } else {
-      return functor(std::false_type(), std::false_type());
-    }
-  }
-}
-
-template <typename FunctorT>
-inline auto InvokeOnAggFunc(const AggregationID &agg_id,
-                            const FunctorT &functor) {
-  switch (agg_id) {
-    case AggregationID::kSum: {
-      return functor(Sum());
-    }
-    default:
-      LOG(FATAL) << "Not supported";
-  }
-}
-
-template <typename FunctorT>
-inline auto InvokeIf(const std::true_type &val,
-                     const FunctorT &functor) {
-  return functor();
-}
-
-template <typename FunctorT>
-inline void InvokeIf(const std::false_type &val,
-                     const FunctorT &functor) {
-}
-
-//template <typename FunctorT>
-//inline void InvokeOnAggFuncIfApplicableToArgType(
-//    const AggregationID &agg_id,
-//    const Type &arg_type,
-//    const FunctorT &functor) {
-//  InvokeOnAggFunc(
-//      agg_id,
-//      [&](const auto &agg_func) -> void {
-//    InvokeOnType(
-//        arg_type,
-//        [&](const auto &arg_type) -> void {
-//      using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
-//      using ArgT = remove_const_reference_t<decltype(arg_type)>;
-//
-//      InvokeIf(
-//          typename AggFuncT::template HasAtomicImpl<ArgT>(),
-//          [&]() -> void {
-//        functor(agg_func, arg_type);
-//      });
-//    });
-//  });
-//}
-
-template <typename FunctorT>
-inline void InvokeOnAggFuncWithArgType(
-    const AggregationID &agg_id,
-    const Type &arg_type,
-    const FunctorT &functor) {
-  InvokeOnAggFunc(
-      agg_id,
-      [&](const auto &agg_func) -> void {
-    InvokeOnType(
-        arg_type,
-        [&](const auto &arg_type) -> void {
-      functor(agg_func, arg_type);
-    });
-  });
-}
-
-template <typename FunctorT>
-inline auto InvokeOnTwoAccessors(
-    const ValueAccessorMultiplexer &accessor_mux,
-    const ValueAccessorSource &first_source,
-    const ValueAccessorSource &second_source,
-    const FunctorT &functor) {
-  ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
-  ColumnVectorsValueAccessor *derived_accessor =
-      static_cast<ColumnVectorsValueAccessor *>(accessor_mux.getDerivedAccessor());
-
-  InvokeOnAnyValueAccessor(
-      base_accessor,
-      [&](auto *accessor) {
-    if (first_source == ValueAccessorSource::kBase) {
-      if (second_source == ValueAccessorSource::kBase) {
-        return functor(std::false_type(), accessor, accessor);
-      } else {
-        return functor(std::true_type(), accessor, derived_accessor);
-      }
-    } else {
-      if (second_source == ValueAccessorSource::kBase) {
-        return functor(std::true_type(), derived_accessor, accessor);
-      } else {
-        return functor(std::false_type(), derived_accessor, derived_accessor);
-      }
-    }
-  });
-}
-
-}  // namespace
-
 CollisionFreeVectorTable::CollisionFreeVectorTable(
     const Type *key_type,
     const std::size_t num_entries,
diff --git a/storage/PackedPayloadHashTable.cpp b/storage/PackedPayloadHashTable.cpp
index bd7e960..c9d956c 100644
--- a/storage/PackedPayloadHashTable.cpp
+++ b/storage/PackedPayloadHashTable.cpp
@@ -26,6 +26,8 @@
 #include <vector>
 
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggFunc.hpp"
+#include "storage/AggregationUtil.hpp"
 #include "storage/HashTableKeyManager.hpp"
 #include "storage/StorageBlob.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -53,28 +55,14 @@
     : key_types_(key_types),
       num_handles_(handles.size()),
       handles_(handles),
-      total_payload_size_(ComputeTotalPayloadSize(handles)),
+      state_sizes_(ComputeStateSizes(handles)),
+      total_payload_size_(ComputeTotalPayloadSize(state_sizes_)),
+      state_offsets_(ComputeStateOffsets(state_sizes_)),
       storage_manager_(storage_manager),
       kBucketAlignment(alignof(std::atomic<std::size_t>)),
       kValueOffset(sizeof(std::atomic<std::size_t>) + sizeof(std::size_t)),
       key_manager_(key_types_, kValueOffset + total_payload_size_),
       bucket_size_(ComputeBucketSize(key_manager_.getFixedKeySize())) {
-  std::size_t payload_offset_running_sum = sizeof(SpinMutex);
-  for (const auto *handle : handles) {
-    payload_offsets_.emplace_back(payload_offset_running_sum);
-    payload_offset_running_sum += handle->getPayloadSize();
-  }
-
-  // NOTE(jianqiao): Potential memory leak / double freeing by copying from
-  // init_payload to buckets if payload contains out of line data.
-  init_payload_ =
-      static_cast<std::uint8_t *>(calloc(this->total_payload_size_, 1));
-  DCHECK(init_payload_ != nullptr);
-
-  for (std::size_t i = 0; i < num_handles_; ++i) {
-    handles_[i]->initPayload(init_payload_ + payload_offsets_[i]);
-  }
-
   // Bucket size always rounds up to the alignment requirement of the atomic
   // size_t "next" pointer at the front or a ValueT, whichever is larger.
   //
@@ -192,7 +180,6 @@
     blob_.release();
     storage_manager_->deleteBlockOrBlobFile(blob_id);
   }
-  std::free(init_payload_);
 }
 
 void PackedPayloadHashTable::clear() {
@@ -214,17 +201,6 @@
 }
 
 void PackedPayloadHashTable::destroyPayload() {
-  const std::size_t num_buckets =
-      header_->buckets_allocated.load(std::memory_order_relaxed);
-  void *bucket_ptr = static_cast<char *>(buckets_) + kValueOffset;
-  for (std::size_t bucket_num = 0; bucket_num < num_buckets; ++bucket_num) {
-    for (std::size_t handle_id = 0; handle_id < num_handles_; ++handle_id) {
-      void *value_internal_ptr =
-          static_cast<char *>(bucket_ptr) + this->payload_offsets_[handle_id];
-      handles_[handle_id]->destroyPayload(static_cast<std::uint8_t *>(value_internal_ptr));
-    }
-    bucket_ptr = static_cast<char *>(bucket_ptr) + bucket_size_;
-  }
 }
 
 bool PackedPayloadHashTable::upsertValueAccessorCompositeKey(
@@ -232,24 +208,131 @@
     const std::vector<MultiSourceAttributeId> &key_attr_ids,
     const ValueAccessorMultiplexer &accessor_mux) {
   ValueAccessor *base_accessor = accessor_mux.getBaseAccessor();
-  ValueAccessor *derived_accessor = accessor_mux.getDerivedAccessor();
+  CHECK(accessor_mux.getDerivedAccessor() == nullptr);
 
-  base_accessor->beginIterationVirtual();
-  if (derived_accessor == nullptr) {
-    return upsertValueAccessorCompositeKeyInternal<false>(
-        argument_ids,
-        key_attr_ids,
-        base_accessor,
-        nullptr);
-  } else {
-    DCHECK(derived_accessor->getImplementationType()
-               == ValueAccessor::Implementation::kColumnVectors);
-    derived_accessor->beginIterationVirtual();
-    return upsertValueAccessorCompositeKeyInternal<true>(
-        argument_ids,
-        key_attr_ids,
-        base_accessor,
-        static_cast<ColumnVectorsValueAccessor *>(derived_accessor));
+  std::vector<attribute_id> key_ids;
+  for (const auto &key_attr_id : key_attr_ids) {
+    key_ids.emplace_back(key_attr_id.attr_id);
+  }
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    DCHECK_LE(argument_ids[i].size(), 1u);
+
+    const AggregationHandle *handle = handles_[i];
+    const auto &argument_types = handle->getArgumentTypes();
+    const auto &argument_ids_i = argument_ids[i];
+
+    attribute_id argument_id = kInvalidAttributeID;
+    const Type *argument_type = nullptr;
+
+    if (argument_ids_i.empty()) {
+      LOG(FATAL) << "Not supported";
+    } else {
+      DCHECK_EQ(1u, argument_ids_i.size());
+      argument_id = argument_ids_i.front().attr_id;
+
+      DCHECK_EQ(1u, argument_types.size());
+      argument_type = argument_types.front();
+    }
+
+    InvokeOnAggFuncWithArgType(
+        handle->getAggregationID(),
+        *argument_type,
+        [&](const auto &agg_func, const auto &arg_type) {
+      using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+      using ArgT = remove_const_reference_t<decltype(arg_type)>;
+
+      InvokeOnAnyValueAccessor(
+          base_accessor,
+          [&](auto *accessor) -> void {  // NOLINT(build/c++11)
+
+        if (key_ids.size() == 1) {
+          if (FLAGS_use_latch) {
+            this->upsertValueAccessorInternalUnaryLatch<AggFuncT, ArgT>(
+                argument_id,
+                key_ids.front(),
+                state_offsets_[i],
+                accessor);
+          } else {
+            this->upsertValueAccessorInternalUnaryAtomic<AggFuncT, ArgT>(
+                argument_id,
+                key_ids.front(),
+                state_offsets_[i],
+                accessor);
+          }
+        } else {
+          if (FLAGS_use_latch) {
+            this->upsertValueAccessorCompositeKeyInternalUnaryLatch<AggFuncT, ArgT>(
+                argument_id,
+                key_ids,
+                state_offsets_[i],
+                accessor);
+          } else {
+            this->upsertValueAccessorCompositeKeyInternalUnaryAtomic<AggFuncT, ArgT>(
+                argument_id,
+                key_ids,
+                state_offsets_[i],
+                accessor);
+          }
+        }
+      });
+    });
+  }
+  return true;
+}
+
+// Builds the output columns for partition 'partition_id' and adds them to
+// 'results': one column per key type, then one column per aggregation handle.
+void PackedPayloadHashTable::finalize(const std::size_t partition_id,
+                                      ColumnVectorsValueAccessor *results) {
+  const std::size_t start_position =
+      calculatePartitionStartPosition(partition_id);
+  const std::size_t end_position =
+      calculatePartitionEndPosition(partition_id);
+  const std::size_t num_entries = end_position - start_position;
+  for (std::size_t key_idx = 0; key_idx < key_types_.size(); ++key_idx) {
+    NativeColumnVector *key_cv =
+        new NativeColumnVector(*key_types_[key_idx], num_entries);
+    finalizeKey(start_position, end_position, key_idx, key_cv);
+    results->addColumn(key_cv);
+  }
+
+  for (std::size_t i = 0; i < num_handles_; ++i) {
+    const AggregationHandle *handle = handles_[i];
+    const auto &argument_types = handle->getArgumentTypes();
+    // A handle without argument types is fatal; otherwise the first is used.
+    const Type *argument_type = nullptr;
+    if (argument_types.empty()) {
+      LOG(FATAL) << "Not supported";
+    } else {
+      DCHECK_EQ(1u, argument_types.size());
+      argument_type = argument_types.front();
+    }
+
+    NativeColumnVector *result_cv =
+        new NativeColumnVector(*handle->getResultType(), num_entries);
+
+    InvokeOnAggFuncWithArgType(
+        handle->getAggregationID(),
+        *argument_type,
+        [&](const auto &agg_func, const auto &arg_type) {
+      using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+      using ArgT = remove_const_reference_t<decltype(arg_type)>;
+      // Pick the latch-based or the atomic finalizer based on FLAGS_use_latch.
+      if (FLAGS_use_latch) {
+        this->finalizeStateLatch<AggFuncT, ArgT>(start_position,
+                                                 end_position,
+                                                 state_offsets_[i],
+                                                 result_cv);
+      } else {
+        this->finalizeStateAtomic<AggFuncT, ArgT>(start_position,
+                                                  end_position,
+                                                  state_offsets_[i],
+                                                  result_cv);
+      }
+    });
+
+    results->addColumn(result_cv);
   }
 }
 
diff --git a/storage/PackedPayloadHashTable.hpp b/storage/PackedPayloadHashTable.hpp
index 9ba5500..1a56f94 100644
--- a/storage/PackedPayloadHashTable.hpp
+++ b/storage/PackedPayloadHashTable.hpp
@@ -20,6 +20,7 @@
 #ifndef QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
 #define QUICKSTEP_STORAGE_PACKED_PAYLOAD_HASH_TABLE_HPP_
 
+#include <algorithm>
 #include <atomic>
 #include <cstddef>
 #include <cstdint>
@@ -29,6 +30,8 @@
 
 #include "catalog/CatalogTypedefs.hpp"
 #include "expressions/aggregation/AggregationHandle.hpp"
+#include "expressions/aggregation/AggFunc.hpp"
+#include "storage/AggregationUtil.hpp"
 #include "storage/HashTableBase.hpp"
 #include "storage/HashTableKeyManager.hpp"
 #include "storage/StorageBlob.hpp"
@@ -38,14 +41,20 @@
 #include "threading/SpinMutex.hpp"
 #include "threading/SpinSharedMutex.hpp"
 #include "types/TypedValue.hpp"
+#include "types/containers/ColumnVector.hpp"
 #include "types/containers/ColumnVectorsValueAccessor.hpp"
 #include "utility/HashPair.hpp"
 #include "utility/Macros.hpp"
 
+#include "gflags/gflags.h"
+
 #include "glog/logging.h"
 
 namespace quickstep {
 
+DECLARE_int32(num_workers);
+DECLARE_bool(use_latch);
+
 class StorageManager;
 class Type;
 class ValueAccessor;
@@ -126,6 +135,9 @@
       const std::vector<MultiSourceAttributeId> &key_ids,
       const ValueAccessorMultiplexer &accessor_mux) override;
 
+  void finalize(const std::size_t partition_id,
+                ColumnVectorsValueAccessor *results);
+
   /**
    * @return The ID of the StorageBlob used to store this hash table.
    **/
@@ -364,16 +376,55 @@
                                        const std::uint8_t **value,
                                        std::size_t *entry_num) const;
 
-  inline std::uint8_t* upsertCompositeKeyInternal(
-      const std::vector<TypedValue> &key,
-      const std::size_t variable_key_size);
+  inline std::uint8_t* upsertInternal(const TypedValue &key);
 
-  template <bool use_two_accessors>
-  inline bool upsertValueAccessorCompositeKeyInternal(
-      const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-      const std::vector<MultiSourceAttributeId> &key_ids,
-      ValueAccessor *base_accessor,
-      ColumnVectorsValueAccessor *derived_accessor);
+  inline std::uint8_t* upsertCompositeKeyInternal(
+      const std::vector<TypedValue> &key);
+
+  template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+  inline bool upsertValueAccessorInternalUnaryAtomic(
+      const attribute_id argument_ids,
+      const attribute_id key_id,
+      const std::size_t state_offset,
+      ValueAccessorT *accessor);
+
+  template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+  inline bool upsertValueAccessorInternalUnaryLatch(
+      const attribute_id argument_ids,
+      const attribute_id key_id,
+      const std::size_t state_offset,
+      ValueAccessorT *accessor);
+
+  template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+  inline bool upsertValueAccessorCompositeKeyInternalUnaryAtomic(
+      const attribute_id argument_id,
+      const std::vector<attribute_id> key_ids,
+      const std::size_t state_offset,
+      ValueAccessorT *accessor);
+
+  template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+  inline bool upsertValueAccessorCompositeKeyInternalUnaryLatch(
+      const attribute_id argument_id,
+      const std::vector<attribute_id> key_ids,
+      const std::size_t state_offset,
+      ValueAccessorT *accessor);
+
+  inline void finalizeKey(const std::size_t start_position,
+                          const std::size_t end_position,
+                          const std::size_t key_idx,
+                          NativeColumnVector *output_cv);
+
+  template <typename AggFuncT, typename ArgT>
+  inline void finalizeStateAtomic(const std::size_t start_position,
+                                  const std::size_t end_position,
+                                  const std::size_t state_offset,
+                                  NativeColumnVector *results);
+
+  template <typename AggFuncT, typename ArgT>
+  inline void finalizeStateLatch(const std::size_t start_position,
+                                 const std::size_t end_position,
+                                 const std::size_t state_offset,
+                                 NativeColumnVector *results);
 
   // Generate a hash for a composite key by hashing each component of 'key' and
   // mixing their bits with CombineHashes().
@@ -387,33 +438,62 @@
     key_inline_ = key_inline;
   }
 
-  inline static std::size_t ComputeTotalPayloadSize(
+  inline static std::vector<std::size_t> ComputeStateSizes(
       const std::vector<AggregationHandle *> &handles) {
-    std::size_t total_payload_size = sizeof(SpinMutex);
-    for (const auto *handle : handles) {
-      total_payload_size += handle->getPayloadSize();
+    // Collect, in handle order, the byte size of each aggregation state.
+    std::vector<std::size_t> state_sizes;
+    for (const AggregationHandle *handle : handles) {
+      InvokeOnAggFuncWithArgType(
+          handle->getAggregationID(),
+          *handle->getArgumentTypes().front(),
+          [&](const auto &agg_func, const auto &arg_type) {
+        using AggFuncT = std::remove_reference_t<decltype(agg_func)>;
+        using ArgT = remove_const_reference_t<decltype(arg_type)>;
+        using StateT = typename AggFuncT::template AggState<ArgT>;
+
+        // With FLAGS_use_latch the state is sized as AggState::T,
+        // otherwise as AggState::AtomicT.
+        const std::size_t state_size = FLAGS_use_latch
+            ? sizeof(typename StateT::T)
+            : sizeof(typename StateT::AtomicT);
+        state_sizes.emplace_back(state_size);
+      });
     }
-    return total_payload_size;
+    return state_sizes;
+  }
+
+  // Bytes per bucket payload: the optional SpinMutex plus every state slot.
+  inline static std::size_t ComputeTotalPayloadSize(
+      const std::vector<std::size_t> &state_sizes) {
+    const std::size_t mutex_size = FLAGS_use_latch ? sizeof(SpinMutex) : 0;
+    // Seed with std::size_t: a plain 0 makes std::accumulate sum in int.
+    return std::accumulate(
+        state_sizes.begin(), state_sizes.end(), mutex_size);
+  }
+
+  inline static std::vector<std::size_t> ComputeStateOffsets(
+      const std::vector<std::size_t> &state_sizes) {
+    // Offsets are running sums of the sizes, starting past the latch if any.
+    std::size_t next_offset = FLAGS_use_latch ? sizeof(SpinMutex) : 0;
+    std::vector<std::size_t> offsets;
+    for (const std::size_t size : state_sizes) {
+      offsets.push_back(next_offset);
+      next_offset += size;
+    }
+    return offsets;
   }
 
   // Assign '*key_vector' with the attribute values specified by 'key_ids' at
   // the current position of 'accessor'. If 'check_for_null_keys' is true, stops
   // and returns true if any of the values is null, otherwise returns false.
-  template <bool use_two_accessors,
-            bool check_for_null_keys,
+  template <bool check_for_null_keys,
             typename ValueAccessorT>
   inline static bool GetCompositeKeyFromValueAccessor(
-      const std::vector<MultiSourceAttributeId> &key_ids,
+      const std::vector<attribute_id> &key_ids,
       const ValueAccessorT *accessor,
-      const ColumnVectorsValueAccessor *derived_accessor,
       std::vector<TypedValue> *key_vector) {
     for (std::size_t key_idx = 0; key_idx < key_ids.size(); ++key_idx) {
-      const MultiSourceAttributeId &key_id = key_ids[key_idx];
-      if (use_two_accessors && key_id.source == ValueAccessorSource::kDerived) {
-        (*key_vector)[key_idx] = derived_accessor->getTypedValue(key_id.attr_id);
-      } else {
-        (*key_vector)[key_idx] = accessor->getTypedValue(key_id.attr_id);
-      }
+      (*key_vector)[key_idx] = accessor->getTypedValue(key_ids[key_idx]);
       if (check_for_null_keys && (*key_vector)[key_idx].isNull()) {
         return true;
       }
@@ -441,9 +521,9 @@
   const std::size_t num_handles_;
   const std::vector<AggregationHandle *> handles_;
 
-  std::size_t total_payload_size_;
-  std::vector<std::size_t> payload_offsets_;
-  std::uint8_t *init_payload_;
+  const std::vector<std::size_t> state_sizes_;
+  const std::size_t total_payload_size_;
+  const std::vector<std::size_t> state_offsets_;
 
   StorageManager *storage_manager_;
   MutableBlobReference blob_;
@@ -471,8 +551,8 @@
     // Set finalization segment size as 4096 entries.
     constexpr std::size_t kFinalizeSegmentSize = 4uL * 1024L;
 
-    // At least 1 partition, at most 80 partitions.
-    return std::max(1uL, std::min(num_entries / kFinalizeSegmentSize, 80uL));
+    return std::max(static_cast<std::size_t>(FLAGS_num_workers),
+                    std::min(num_entries / kFinalizeSegmentSize, 80uL));
   }
 
   // Attempt to find an empty bucket to insert 'hash_code' into, starting after
@@ -488,7 +568,6 @@
   // deallocated after being allocated.
   inline bool locateBucketForInsertion(
       const std::size_t hash_code,
-      const std::size_t variable_key_allocation_required,
       void **bucket,
       std::atomic<std::size_t> **pending_chain_ptr,
       std::size_t *pending_chain_ptr_finish_value);
@@ -636,7 +715,6 @@
 
 inline bool PackedPayloadHashTable::locateBucketForInsertion(
     const std::size_t hash_code,
-    const std::size_t variable_key_allocation_required,
     void **bucket,
     std::atomic<std::size_t> **pending_chain_ptr,
     std::size_t *pending_chain_ptr_finish_value) {
@@ -652,17 +730,6 @@
                                       std::numeric_limits<std::size_t>::max(),
                                       std::memory_order_acq_rel)) {
       // Got to the end of the chain. Allocate a new bucket.
-
-      // First, allocate variable-length key storage, if needed (i.e. if this
-      // is an upsert and we didn't allocate up-front).
-      if (!key_manager_.allocateVariableLengthKeyStorage(
-              variable_key_allocation_required)) {
-        // Ran out of variable-length storage.
-        (*pending_chain_ptr)->store(0, std::memory_order_release);
-        *bucket = nullptr;
-        return false;
-      }
-
       const std::size_t allocated_bucket_num =
           header_->buckets_allocated.fetch_add(1, std::memory_order_relaxed);
       if (allocated_bucket_num >= header_->num_buckets) {
@@ -730,27 +797,7 @@
 inline const std::uint8_t* PackedPayloadHashTable::getSingleCompositeKey(
     const std::vector<TypedValue> &key,
     const std::size_t index) const {
-  DEBUG_ASSERT(this->key_types_.size() == key.size());
-
-  const std::size_t hash_code = this->hashCompositeKey(key);
-  std::size_t bucket_ref =
-      slots_[hash_code % header_->num_slots].load(std::memory_order_relaxed);
-  while (bucket_ref != 0) {
-    DEBUG_ASSERT(bucket_ref != std::numeric_limits<std::size_t>::max());
-    const char *bucket =
-        static_cast<const char *>(buckets_) + (bucket_ref - 1) * bucket_size_;
-    const std::size_t bucket_hash = *reinterpret_cast<const std::size_t *>(
-        bucket + sizeof(std::atomic<std::size_t>));
-    if ((bucket_hash == hash_code) &&
-        key_manager_.compositeKeyCollisionCheck(key, bucket)) {
-      // Match located.
-      return reinterpret_cast<const std::uint8_t *>(bucket + kValueOffset) +
-             this->payload_offsets_[index];
-    }
-    bucket_ref =
-        reinterpret_cast<const std::atomic<std::size_t> *>(bucket)->load(
-            std::memory_order_relaxed);
-  }
+  LOG(FATAL) << "Not implemented";
 
   // Reached the end of the chain and didn't find a match.
   return nullptr;
@@ -759,24 +806,9 @@
 inline bool PackedPayloadHashTable::upsertCompositeKey(
     const std::vector<TypedValue> &key,
     const std::uint8_t *source_state) {
-  const std::size_t variable_size =
-      calculateVariableLengthCompositeKeyCopySize(key);
-  for (;;) {
-    {
-      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
-      std::uint8_t *value =
-          upsertCompositeKeyInternal(key, variable_size);
-      if (value != nullptr) {
-        SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-        for (unsigned int k = 0; k < num_handles_; ++k) {
-          handles_[k]->mergeStates(source_state + payload_offsets_[k],
-                                   value + payload_offsets_[k]);
-        }
-        return true;
-      }
-    }
-    resize(0, variable_size);
-  }
+  LOG(FATAL) << "Not implemented";
+
+  return true;
 }
 
 template <typename FunctorT>
@@ -784,48 +816,56 @@
     const std::vector<TypedValue> &key,
     FunctorT *functor,
     const std::size_t index) {
-  const std::size_t variable_size =
-      calculateVariableLengthCompositeKeyCopySize(key);
-  for (;;) {
-    {
-      SpinSharedMutexSharedLock<true> resize_lock(resize_shared_mutex_);
-      std::uint8_t *value =
-          upsertCompositeKeyInternal(key, variable_size);
-      if (value != nullptr) {
-        (*functor)(value + payload_offsets_[index]);
-        return true;
-      }
-    }
-    resize(0, variable_size);
-  }
+  LOG(FATAL) << "Not implemented";
+
+  return true;
 }
 
-
-inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
-    const std::vector<TypedValue> &key,
-    const std::size_t variable_key_size) {
-  if (variable_key_size > 0) {
-    // Don't allocate yet, since the key may already be present. However, we
-    // do check if either the allocated variable storage space OR the free
-    // space is big enough to hold the key (at least one must be true: either
-    // the key is already present and allocated, or we need to be able to
-    // allocate enough space for it).
-    std::size_t allocated_bytes = header_->variable_length_bytes_allocated.load(
-        std::memory_order_relaxed);
-    if ((allocated_bytes < variable_key_size) &&
-        (allocated_bytes + variable_key_size >
-         key_manager_.getVariableLengthKeyStorageSize())) {
+// Finds or inserts 'key'; returns its payload, or nullptr if out of buckets.
+inline std::uint8_t* PackedPayloadHashTable::upsertInternal(
+    const TypedValue &key) {
+  const std::size_t hash_code = key.getHash();
+  void *bucket = nullptr;
+  std::atomic<std::size_t> *pending_chain_ptr;
+  std::size_t pending_chain_ptr_finish_value;
+  for (;;) {
+    if (locateBucketForInsertion(hash_code,
+                                 &bucket,
+                                 &pending_chain_ptr,
+                                 &pending_chain_ptr_finish_value)) {
+      // Found an empty bucket.
+      break;
+    } else if (bucket == nullptr) {
+      // Ran out of buckets.
       return nullptr;
+    } else if (key_manager_.scalarKeyCollisionCheck(key, bucket)) {
+      // Found an already-existing entry for this key.
+      return static_cast<std::uint8_t *>(bucket) + kValueOffset;
     }
   }
 
+  // We are now writing to an empty bucket.
+  // Write the key and hash.
+  writeScalarKeyToBucket(key, hash_code, bucket);
+  // Zero the new payload, matching upsertCompositeKeyInternal().
+  std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
+  std::memset(value, 0, this->total_payload_size_);
+
+  // Update the previous chain pointer to point to the new bucket.
+  pending_chain_ptr->store(pending_chain_ptr_finish_value, std::memory_order_release);
+
+  // Return the value.
+  return value;
+}
+
+inline std::uint8_t* PackedPayloadHashTable::upsertCompositeKeyInternal(
+    const std::vector<TypedValue> &key) {
   const std::size_t hash_code = this->hashCompositeKey(key);
   void *bucket = nullptr;
   std::atomic<std::size_t> *pending_chain_ptr;
   std::size_t pending_chain_ptr_finish_value;
   for (;;) {
     if (locateBucketForInsertion(hash_code,
-                                 variable_key_size,
                                  &bucket,
                                  &pending_chain_ptr,
                                  &pending_chain_ptr_finish_value)) {
@@ -846,7 +886,8 @@
   writeCompositeKeyToBucket(key, hash_code, bucket);
 
   std::uint8_t *value = static_cast<unsigned char *>(bucket) + kValueOffset;
-  std::memcpy(value, init_payload_, this->total_payload_size_);
+//  std::memcpy(value, init_payload_, this->total_payload_size_);
+  std::memset(value, 0, this->total_payload_size_);
 
   // Update the previous chaing pointer to point to the new bucket.
   pending_chain_ptr->store(pending_chain_ptr_finish_value,
@@ -856,72 +897,165 @@
   return value;
 }
 
-template <bool use_two_accessors>
-inline bool PackedPayloadHashTable::upsertValueAccessorCompositeKeyInternal(
-    const std::vector<std::vector<MultiSourceAttributeId>> &argument_ids,
-    const std::vector<MultiSourceAttributeId> &key_ids,
-    ValueAccessor *base_accessor,
-    ColumnVectorsValueAccessor *derived_accessor) {
-  std::size_t variable_size;
+// Upserts each tuple's 'key_id' value and atomically merges 'argument_id'.
+template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+inline bool PackedPayloadHashTable::upsertValueAccessorInternalUnaryAtomic(
+    const attribute_id argument_id,
+    const attribute_id key_id,
+    const std::size_t state_offset,
+    ValueAccessorT *accessor) {
+  using AtomicStateT = typename AggFuncT::template AggState<ArgT>::AtomicT;
+
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const TypedValue key = accessor->getTypedValue(key_id);
+    std::uint8_t *payload = this->upsertInternal(key);
+    // upsertInternal() returns nullptr when no bucket can be allocated.
+    CHECK(payload != nullptr) << "PackedPayloadHashTable is out of buckets";
+    const auto *argument = static_cast<const typename ArgT::cpptype *>(
+        accessor->template getUntypedValue<false>(argument_id));
+    AggFuncT::template MergeArgAtomic<ArgT>(
+        *argument, reinterpret_cast<AtomicStateT *>(payload + state_offset));
+  }
+  return true;
+}
+
+// Upserts each tuple's 'key_id' value and merges 'argument_id' under a latch.
+template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+inline bool PackedPayloadHashTable::upsertValueAccessorInternalUnaryLatch(
+    const attribute_id argument_id,
+    const attribute_id key_id,
+    const std::size_t state_offset,
+    ValueAccessorT *accessor) {
+  using PlainStateT = typename AggFuncT::template AggState<ArgT>::T;
+
+  accessor->beginIteration();
+  while (accessor->next()) {
+    const TypedValue key = accessor->getTypedValue(key_id);
+    std::uint8_t *payload = this->upsertInternal(key);
+    // upsertInternal() returns nullptr when no bucket can be allocated.
+    CHECK(payload != nullptr) << "PackedPayloadHashTable is out of buckets";
+    const auto *argument = static_cast<const typename ArgT::cpptype *>(
+        accessor->template getUntypedValue<false>(argument_id));
+    auto *state = reinterpret_cast<PlainStateT *>(payload + state_offset);
+    SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(payload)));
+    AggFuncT::template MergeArgUnsafe<ArgT>(*argument, state);
+  }
+  return true;
+}
+
+// Composite-key upsert that atomically merges 'argument_id' per tuple.
+template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+inline bool PackedPayloadHashTable::
+    upsertValueAccessorCompositeKeyInternalUnaryAtomic(
+        const attribute_id argument_id,
+        const std::vector<attribute_id> key_ids,
+        const std::size_t state_offset,
+        ValueAccessorT *accessor) {
+  using AtomicStateT = typename AggFuncT::template AggState<ArgT>::AtomicT;
+
   std::vector<TypedValue> key_vector;
   key_vector.resize(key_ids.size());
 
-  return InvokeOnAnyValueAccessor(
-      base_accessor,
-      [&](auto *accessor) -> bool {  // NOLINT(build/c++11)
-    bool continuing = true;
-    while (continuing) {
-      {
-        continuing = false;
-        SpinSharedMutexSharedLock<true> lock(resize_shared_mutex_);
-        while (accessor->next()) {
-          if (use_two_accessors) {
-            derived_accessor->next();
-          }
-          if (this->GetCompositeKeyFromValueAccessor<use_two_accessors, true>(
-                  key_ids,
-                  accessor,
-                  derived_accessor,
-                  &key_vector)) {
-            continue;
-          }
-          variable_size = this->calculateVariableLengthCompositeKeyCopySize(key_vector);
-          std::uint8_t *value = this->upsertCompositeKeyInternal(
-              key_vector, variable_size);
-          if (value == nullptr) {
-            continuing = true;
-            break;
-          } else {
-            SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(value)));
-            for (unsigned int k = 0; k < num_handles_; ++k) {
-              const auto &ids = argument_ids[k];
-              if (ids.empty()) {
-                handles_[k]->updateStateNullary(value + payload_offsets_[k]);
-              } else {
-                const MultiSourceAttributeId &arg_id = ids.front();
-                if (use_two_accessors && arg_id.source == ValueAccessorSource::kDerived) {
-                  DCHECK_NE(arg_id.attr_id, kInvalidAttributeID);
-                  handles_[k]->updateStateUnary(derived_accessor->getTypedValue(arg_id.attr_id),
-                                                value + payload_offsets_[k]);
-                } else {
-                  handles_[k]->updateStateUnary(accessor->getTypedValue(arg_id.attr_id),
-                                                value + payload_offsets_[k]);
-                }
-              }
-            }
-          }
-        }
-      }
-      if (continuing) {
-        this->resize(0, variable_size);
-        accessor->previous();
-        if (use_two_accessors) {
-          derived_accessor->previous();
-        }
-      }
-    }
-    return true;
-  });
+  accessor->beginIteration();
+  while (accessor->next()) {
+    this->GetCompositeKeyFromValueAccessor<false>(
+        key_ids, accessor, &key_vector);
+    std::uint8_t *payload = this->upsertCompositeKeyInternal(key_vector);
+    // The previous caller treated nullptr as "table full"; fail loudly here.
+    CHECK(payload != nullptr) << "PackedPayloadHashTable is out of buckets";
+    const auto *argument = static_cast<const typename ArgT::cpptype *>(
+        accessor->template getUntypedValue<false>(argument_id));
+    AggFuncT::template MergeArgAtomic<ArgT>(
+        *argument, reinterpret_cast<AtomicStateT *>(payload + state_offset));
+  }
+  return true;
+}
+
+// Composite-key upsert that merges 'argument_id' per tuple under a latch.
+template <typename AggFuncT, typename ArgT, typename ValueAccessorT>
+inline bool PackedPayloadHashTable::
+    upsertValueAccessorCompositeKeyInternalUnaryLatch(
+        const attribute_id argument_id,
+        const std::vector<attribute_id> key_ids,
+        const std::size_t state_offset,
+        ValueAccessorT *accessor) {
+  using PlainStateT = typename AggFuncT::template AggState<ArgT>::T;
+
+  std::vector<TypedValue> key_vector(key_ids.size());
+
+  accessor->beginIteration();
+  while (accessor->next()) {
+    this->GetCompositeKeyFromValueAccessor<false>(
+        key_ids, accessor, &key_vector);
+    std::uint8_t *payload = this->upsertCompositeKeyInternal(key_vector);
+    // The previous caller treated nullptr as "table full"; fail loudly here.
+    CHECK(payload != nullptr) << "PackedPayloadHashTable is out of buckets";
+    const auto *argument = static_cast<const typename ArgT::cpptype *>(
+        accessor->template getUntypedValue<false>(argument_id));
+    auto *state = reinterpret_cast<PlainStateT *>(payload + state_offset);
+
+    SpinMutexLock lock(*(reinterpret_cast<SpinMutex *>(payload)));
+    AggFuncT::template MergeArgUnsafe<ArgT>(*argument, state);
+  }
+  return true;
+}
+
+// Appends key component 'key_idx' of buckets [start, end) to 'output_cv'.
+inline void PackedPayloadHashTable::finalizeKey(
+    const std::size_t start_position,
+    const std::size_t end_position,
+    const std::size_t key_idx,
+    NativeColumnVector *output_cv) {
+  const char *bucket_base = static_cast<const char *>(buckets_);
+  for (std::size_t pos = start_position; pos < end_position; ++pos) {
+    output_cv->appendTypedValue(key_manager_.getKeyComponentTyped(
+        bucket_base + pos * bucket_size_, key_idx));
+  }
+}
+
+// Finalizes the AggState::AtomicT states of buckets [start_position,
+// end_position) into the pointers from output_cv->getPtrForDirectWrite().
+template <typename AggFuncT, typename ArgT>
+inline void PackedPayloadHashTable::finalizeStateAtomic(
+    const std::size_t start_position,
+    const std::size_t end_position,
+    const std::size_t state_offset,
+    NativeColumnVector *output_cv) {
+  using StateT = typename AggFuncT::template AggState<ArgT>;
+  using AtomicStateT = typename StateT::AtomicT;
+  using ResultT = typename StateT::ResultT;
+
+  const char *first_state =
+      static_cast<const char *>(buckets_) + kValueOffset + state_offset;
+  for (std::size_t pos = start_position; pos < end_position; ++pos) {
+    auto *result = static_cast<ResultT *>(output_cv->getPtrForDirectWrite());
+    AggFuncT::template FinalizeAtomic<ArgT>(
+        *reinterpret_cast<const AtomicStateT *>(
+            first_state + pos * bucket_size_), result);
+  }
+}
+
+// Finalizes the AggState::T states of buckets [start_position,
+// end_position) into the pointers from output_cv->getPtrForDirectWrite().
+template <typename AggFuncT, typename ArgT>
+inline void PackedPayloadHashTable::finalizeStateLatch(
+    const std::size_t start_position,
+    const std::size_t end_position,
+    const std::size_t state_offset,
+    NativeColumnVector *output_cv) {
+  using StateT = typename AggFuncT::template AggState<ArgT>;
+  using PlainStateT = typename StateT::T;
+  using ResultT = typename StateT::ResultT;
+
+  const char *first_state =
+      static_cast<const char *>(buckets_) + kValueOffset + state_offset;
+  for (std::size_t pos = start_position; pos < end_position; ++pos) {
+    auto *result = static_cast<ResultT *>(output_cv->getPtrForDirectWrite());
+    AggFuncT::template FinalizeUnsafe<ArgT>(
+        *reinterpret_cast<const PlainStateT *>(
+            first_state + pos * bucket_size_), result);
+  }
 }
 
 inline void PackedPayloadHashTable::writeScalarKeyToBucket(
@@ -990,7 +1124,7 @@
   const std::uint8_t *value_ptr;
   while (getNextEntry(&key, &value_ptr, &entry_num)) {
     ++entries_visited;
-    (*functor)(key, value_ptr + payload_offsets_[index]);
+    (*functor)(key, value_ptr + state_offsets_[index]);
     key.clear();
   }
   return entries_visited;
@@ -1044,7 +1178,7 @@
   const std::uint8_t *value_ptr;
   while (getNextEntryCompositeKey(&key, &value_ptr, &entry_num)) {
     ++entries_visited;
-    (*functor)(key, value_ptr + payload_offsets_[index]);
+    (*functor)(key, value_ptr + state_offsets_[index]);
     key.clear();
   }
   return entries_visited;