Minor changes in QueryManager and QueryExecutionState.
diff --git a/query_execution/QueryExecutionState.hpp b/query_execution/QueryExecutionState.hpp
index a4273dc..0f98964 100644
--- a/query_execution/QueryExecutionState.hpp
+++ b/query_execution/QueryExecutionState.hpp
@@ -99,14 +99,10 @@
const bool rebuild_initiated) {
DCHECK(operator_index < num_operators_);
auto search_res = rebuild_status_.find(operator_index);
- if (search_res != rebuild_status_.end()) {
- search_res->second.has_initiated = rebuild_initiated;
- search_res->second.num_pending_workorders = num_rebuild_workorders;
- } else {
- RebuildStatus rebuild_status(rebuild_initiated, num_rebuild_workorders);
+ DCHECK(search_res != rebuild_status_.end());
- rebuild_status_.emplace(operator_index, std::move(rebuild_status));
- }
+ search_res->second.has_initiated = rebuild_initiated;
+ search_res->second.num_pending_workorders = num_rebuild_workorders;
}
#ifdef QUICKSTEP_DISTRIBUTED
@@ -272,6 +268,8 @@
inline void setRebuildRequired(const std::size_t operator_index) {
DCHECK(operator_index < num_operators_);
rebuild_required_[operator_index] = true;
+
+ rebuild_status_.emplace(operator_index, RebuildStatus());
}
/**
@@ -353,10 +351,9 @@
std::vector<bool> execution_finished_;
struct RebuildStatus {
- RebuildStatus(const bool initiated,
- const std::size_t num_workorders)
- : has_initiated(initiated),
- num_pending_workorders(num_workorders) {}
+ RebuildStatus()
+ : has_initiated(false),
+ num_pending_workorders(0) {}
// Whether rebuild for operator at index i has been initiated.
bool has_initiated;
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index 14c9ba5..f353b64 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -64,7 +64,6 @@
if (insert_destination_index != QueryContext::kInvalidInsertDestinationId) {
// Rebuild is necessary whenever InsertDestination is present.
query_exec_state_->setRebuildRequired(node_index);
- query_exec_state_->setRebuildStatus(node_index, 0, false);
}
for (const pair<dag_node_index, bool> &dependent_link :
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 001faa8..82a0de6 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -126,22 +126,20 @@
DCHECK(checkRebuildRequired(index));
DCHECK(!checkRebuildInitiated(index));
- getRebuildWorkOrders(index, workorders_container_.get());
+ const std::size_t num_rebuild_work_orders = getRebuildWorkOrders(index, workorders_container_.get());
+ DCHECK_EQ(workorders_container_->getNumRebuildWorkOrders(index), num_rebuild_work_orders);
query_exec_state_->setRebuildStatus(
- index, workorders_container_->getNumRebuildWorkOrders(index), true);
+ index, num_rebuild_work_orders, true);
- return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
+ return num_rebuild_work_orders == 0;
}
-void QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
- WorkOrdersContainer *container) {
+std::size_t QueryManagerSingleNode::getRebuildWorkOrders(const dag_node_index index,
+ WorkOrdersContainer *container) {
const RelationalOperator &op = query_dag_->getNodePayload(index);
const QueryContext::insert_destination_id insert_destination_index = op.getInsertDestinationID();
-
- if (insert_destination_index == QueryContext::kInvalidInsertDestinationId) {
- return;
- }
+ DCHECK_NE(insert_destination_index, QueryContext::kInvalidInsertDestinationId);
std::vector<MutableBlockReference> partially_filled_block_refs;
std::vector<partition_id> part_ids;
@@ -165,6 +163,8 @@
bus_),
index);
}
+
+ return partially_filled_block_refs.size();
}
std::size_t QueryManagerSingleNode::getQueryMemoryConsumptionBytes() const {
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index 6c5e38e..f9d038b 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -122,9 +122,11 @@
* @param index The index of the operator in the query plan DAG.
* @param container A pointer to a WorkOrdersContainer to be used to store the
* generated WorkOrders.
+ *
+ * @return The number of generated rebuild work orders.
**/
- void getRebuildWorkOrders(const dag_node_index index,
- WorkOrdersContainer *container);
+ std::size_t getRebuildWorkOrders(const dag_node_index index,
+ WorkOrdersContainer *container);
/**
* @brief Get the total memory (in bytes) occupied by temporary relations