Added Vector Aggregation support in the distributed version.
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 68d0ef4..92fc7f6 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -67,28 +67,29 @@
return true;
}
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// finalization with the distributed version.
bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (started_) {
return true;
}
for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
- aggr_state_index_);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
- part_id);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
- 0u);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
- output_destination_index_);
+ for (std::size_t state_part_id = 0;
+ state_part_id < aggr_state_num_partitions_;
+ ++state_part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+ aggr_state_index_);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+ part_id);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+ state_part_id);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+ output_destination_index_);
- container->addWorkOrderProto(proto, op_index_);
+ container->addWorkOrderProto(proto, op_index_);
+ }
}
started_ = true;
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index 39a6fb4..89dfd7e 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -64,26 +64,25 @@
return true;
}
-// TODO(quickstep-team) : Think about how the number of partitions could be
-// accessed in this function. Until then, we can't use partitioned aggregation
-// initialization with the distributed version.
bool InitializeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- LOG(FATAL) << "Not supported";
-
if (started_) {
return true;
}
for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
- proto->set_query_id(query_id_);
+ for (std::size_t state_part_id = 0;
+ state_part_id < aggr_state_num_init_partitions_;
+ ++state_part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::INITIALIZE_AGGREGATION);
+ proto->set_query_id(query_id_);
- proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
- proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
- proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, 0u);
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::aggr_state_index, aggr_state_index_);
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::partition_id, part_id);
+ proto->SetExtension(serialization::InitializeAggregationWorkOrder::state_partition_id, state_part_id);
- container->addWorkOrderProto(proto, op_index_);
+ container->addWorkOrderProto(proto, op_index_);
+ }
}
started_ = true;
return true;
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 25cc81a..3a991bd 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -237,8 +237,6 @@
LOG(INFO) << "Creating FinalizeAggregationWorkOrder (Partition " << part_id << ") for Query " << query_id
<< " in Shiftboss " << shiftboss_index;
- // TODO(quickstep-team): Handle inner-table partitioning in the distributed
- // setting.
return new FinalizeAggregationWorkOrder(
query_id,
part_id,