Added Vector Aggregation support in the distributed version.
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 68d0ef4..92fc7f6 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -67,28 +67,29 @@
   return true;
 }
 
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// finalization with the distributed version.
 bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
   if (started_) {
     return true;
   }
 
   for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
-    proto->set_query_id(query_id_);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
-                        aggr_state_index_);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
-                        part_id);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
-                        0u);
-    proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
-                        output_destination_index_);
+    for (std::size_t state_part_id = 0;
+         state_part_id < aggr_state_num_partitions_;
+         ++state_part_id) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+      proto->set_query_id(query_id_);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+                          aggr_state_index_);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+                          part_id);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+                          state_part_id);
+      proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+                          output_destination_index_);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
 
   started_ = true;
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index 39a6fb4..89dfd7e 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -64,26 +64,25 @@
   return true;
 }
 
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// initialization with the distributed version.
 bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  LOG(FATAL) << "Not supported";
-
   if (started_) {
     return true;
   }
 
   for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
-    proto->set_query_id(query_id_);
+    for (std::size_t state_part_id = 0;
+         state_part_id < aggr_state_num_init_partitions_;
+         ++state_part_id) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
+      proto->set_query_id(query_id_);
 
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
-    proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, 0u);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
+      proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, state_part_id);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
   started_ = true;
   return true;
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 25cc81a..3a991bd 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -237,8 +237,6 @@
 
       LOG(INFO) << "Creating FinalizeAggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
-      // TODO(quickstep-team): Handle inner-table partitioning in the distributed
-      // setting.
       return new FinalizeAggregationWorkOrder(
           query_id,
           part_id,