Fixed the distributed version due to query execution engine simplification.
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index 30a1396..97b451f 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -70,8 +70,11 @@
// Collect all the workorders from all the non-blocking relational operators in the DAG.
for (const dag_node_index index : non_dependent_operators_) {
if (!fetchNormalWorkOrders(index)) {
- DCHECK(!checkRebuildRequired(index) || initiateRebuild(index));
- markOperatorFinished(index);
+ if (checkRebuildRequired(index)) {
+ initiateRebuild(index);
+ } else {
+ markOperatorFinished(index);
+ }
}
}
@@ -201,21 +204,12 @@
const std::size_t shiftboss_index) {
query_exec_state_->updateRebuildStatus(op_index, num_rebuild_work_orders, shiftboss_index);
- if (!query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
- // Wait for the rebuild work orders to finish.
- return;
+ if (query_exec_state_->hasRebuildFinished(op_index, num_shiftbosses_)) {
+ // No needs for rebuilds, or the rebuild has finished.
+ markOperatorFinished(op_index);
}
- // No needs for rebuilds, or the rebuild has finished.
- markOperatorFinished(op_index);
-
- for (const std::pair<dag_node_index, bool> &dependent_link :
- query_dag_->getDependents(op_index)) {
- const dag_node_index dependent_op_index = dependent_link.first;
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- fetchNormalWorkOrders(dependent_op_index);
- }
- }
+ // Wait for the rebuild work orders to finish.
}
bool QueryManagerDistributed::initiateRebuild(const dag_node_index index) {
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 5baa21b..25cc81a 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -741,7 +741,7 @@
proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)) &&
proto.HasExtension(serialization::DeleteWorkOrder::block_id) &&
proto.HasExtension(serialization::DeleteWorkOrder::operator_index) &&
- proto.GetExtension(serialization::DeleteWorkOrder::partition_id);
+ proto.HasExtension(serialization::DeleteWorkOrder::partition_id);
}
case serialization::DESTROY_AGGREGATION_STATE: {
return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) &&
@@ -1033,7 +1033,7 @@
proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)) &&
proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
proto.HasExtension(serialization::UpdateWorkOrder::block_id) &&
- proto.GetExtension(serialization::UpdateWorkOrder::partition_id);
+ proto.HasExtension(serialization::UpdateWorkOrder::partition_id);
}
case serialization::WINDOW_AGGREGATION: {
return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&