Removed the unnecessary informAllBlockingDependenciesMet() API and the blocking_dependencies_met_ flag from RelationalOperator. Operators no longer track this state themselves: the query managers already guard work-order generation with checkAllBlockingDependenciesMet(), so the per-operator checks (and the corresponding mock-operator accessor and test assertions) are redundant and have been dropped.
diff --git a/query_execution/QueryManagerBase.cpp b/query_execution/QueryManagerBase.cpp
index f84ad4e..14c9ba5 100644
--- a/query_execution/QueryManagerBase.cpp
+++ b/query_execution/QueryManagerBase.cpp
@@ -70,14 +70,14 @@
for (const pair<dag_node_index, bool> &dependent_link :
query_dag_->getDependents(node_index)) {
const dag_node_index dependent_op_index = dependent_link.first;
- if (!query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
- // The link is not a pipeline-breaker. Streaming of blocks is possible
- // between these two operators.
- output_consumers_[node_index].push_back(dependent_op_index);
- } else {
+ if (query_dag_->getLinkMetadata(node_index, dependent_op_index)) {
// The link is a pipeline-breaker. Streaming of blocks is not possible
// between these two operators.
blocking_dependencies_[dependent_op_index].push_back(node_index);
+ } else {
+ // The link is not a pipeline-breaker. Streaming of blocks is possible
+ // between these two operators.
+ output_consumers_[node_index].push_back(dependent_op_index);
}
}
}
@@ -231,9 +231,6 @@
if (output_rel >= 0) {
dependent_op->doneFeedingInputBlocks(output_rel);
}
- if (checkAllBlockingDependenciesMet(dependent_op_index)) {
- dependent_op->informAllBlockingDependenciesMet();
- }
}
}
diff --git a/query_execution/QueryManagerDistributed.cpp b/query_execution/QueryManagerDistributed.cpp
index a248391..1144e9f 100644
--- a/query_execution/QueryManagerDistributed.cpp
+++ b/query_execution/QueryManagerDistributed.cpp
@@ -70,7 +70,6 @@
// Collect all the workorders from all the relational operators in the DAG.
for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
if (checkAllBlockingDependenciesMet(index)) {
- query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
processOperator(index, false);
}
}
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index f33a501..001faa8 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -64,7 +64,6 @@
// Collect all the workorders from all the relational operators in the DAG.
for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
if (checkAllBlockingDependenciesMet(index)) {
- query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
processOperator(index, false);
}
}
diff --git a/query_execution/tests/QueryManagerSingleNode_unittest.cpp b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
index 9183d32..c65364c 100644
--- a/query_execution/tests/QueryManagerSingleNode_unittest.cpp
+++ b/query_execution/tests/QueryManagerSingleNode_unittest.cpp
@@ -134,11 +134,6 @@
}
}
- inline bool getBlockingDependenciesMet() const {
- MOCK_OP_LOG(3) << "met.";
- return blocking_dependencies_met_;
- }
-
void setInsertDestinationID(const QueryContext::insert_destination_id insert_destination_index) {
insert_destination_index_ = insert_destination_index;
}
@@ -165,7 +160,7 @@
++num_workorders_generated_;
}
} else {
- if (blocking_dependencies_met_ && (num_workorders_generated_ < max_workorders_)) {
+ if (num_workorders_generated_ < max_workorders_) {
MOCK_OP_LOG(3) << "[static] generate WorkOrder";
container->addNormalWorkOrder(new MockWorkOrder(op_index_), op_index_);
++num_workorders_generated_;
@@ -304,9 +299,6 @@
constructQueryManager();
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
// We expect one call for op's getAllWorkOrders().
EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
@@ -322,9 +314,6 @@
constructQueryManager();
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
// We expect one call for op's getAllWorkOrders().
EXPECT_EQ(1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(0, op.getNumCalls(MockOperator::kFeedInputBlock));
@@ -372,9 +361,6 @@
constructQueryManager();
- // op doesn't have any dependencies.
- EXPECT_TRUE(op.getBlockingDependenciesMet());
-
for (int i = 0; i < 3; ++i) {
// We expect one call for op's getAllWorkOrders().
EXPECT_EQ(i + 1, op.getNumCalls(MockOperator::kGetAllWorkOrders));
@@ -437,9 +423,6 @@
constructQueryManager();
- // op1 doesn't have any dependencies
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
-
// Only op1 should receive a call to getAllWorkOrders initially.
EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
@@ -488,9 +471,6 @@
EXPECT_EQ(1, getNumWorkOrdersInExecution(id2));
- // op1 is op2's blocking dependency.
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
// op2 should get first call of getAllWorkOrders() when op1 is over.
EXPECT_EQ(1, op2.getNumCalls(MockOperator::kGetAllWorkOrders));
@@ -541,11 +521,6 @@
constructQueryManager();
- // As none of the operators have a blocking link, blocking dependencies should
- // be met.
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(1, op1.getNumWorkOrders());
EXPECT_EQ(0, op1.getNumCalls(MockOperator::kFeedInputBlock));
@@ -686,10 +661,6 @@
static_cast<BlockPoolInsertDestination *>(insert_destination)
->available_block_refs_.push_back(move(block_ref));
- // There's no blocking dependency in the DAG.
- EXPECT_TRUE(op1.getBlockingDependenciesMet());
- EXPECT_TRUE(op2.getBlockingDependenciesMet());
-
EXPECT_EQ(1, op1.getNumCalls(MockOperator::kGetAllWorkOrders));
EXPECT_EQ(1, op1.getNumWorkOrders());
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index 14cbf6f..a8723a9 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -54,22 +54,24 @@
if (relation_is_stored_) {
// If relation_ is stored, iterate over the list of blocks in relation_.
- if (!started_) {
- for (const block_id input_block_id : relation_block_ids_) {
- container->addNormalWorkOrder(
- new DeleteWorkOrder(query_id_,
- relation_,
- input_block_id,
- predicate,
- storage_manager,
- op_index_,
- scheduler_client_id,
- bus),
- op_index_);
- }
- started_ = true;
+ if (started_) {
+ return true;
}
- return started_;
+
+ for (const block_id input_block_id : relation_block_ids_) {
+ container->addNormalWorkOrder(
+ new DeleteWorkOrder(query_id_,
+ relation_,
+ input_block_id,
+ predicate,
+ storage_manager,
+ op_index_,
+ scheduler_client_id,
+ bus),
+ op_index_);
+ }
+ started_ = true;
+ return true;
} else {
while (num_workorders_generated_ < relation_block_ids_.size()) {
container->addNormalWorkOrder(
@@ -91,12 +93,14 @@
bool DeleteOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
if (relation_is_stored_) {
// If relation_ is stored, iterate over the list of blocks in relation_.
- if (!started_) {
- for (const block_id input_block_id : relation_block_ids_) {
- container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
- }
- started_ = true;
+ if (started_) {
+ return true;
}
+
+ for (const block_id input_block_id : relation_block_ids_) {
+ container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+ }
+ started_ = true;
return true;
} else {
while (num_workorders_generated_ < relation_block_ids_.size()) {
diff --git a/relational_operators/DestroyAggregationStateOperator.cpp b/relational_operators/DestroyAggregationStateOperator.cpp
index 013bf18..3d36e20 100644
--- a/relational_operators/DestroyAggregationStateOperator.cpp
+++ b/relational_operators/DestroyAggregationStateOperator.cpp
@@ -35,32 +35,35 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (blocking_dependencies_met_ && !work_generated_) {
- work_generated_ = true;
- for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- container->addNormalWorkOrder(
- new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, part_id, query_context),
- op_index_);
- }
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ container->addNormalWorkOrder(
+ new DestroyAggregationStateWorkOrder(query_id_, aggr_state_index_, part_id, query_context),
+ op_index_);
+ }
+ work_generated_ = true;
+ return true;
}
bool DestroyAggregationStateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (blocking_dependencies_met_ && !work_generated_) {
- work_generated_ = true;
-
- for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
- proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id, part_id);
-
- container->addWorkOrderProto(proto, op_index_);
- }
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::DESTROY_AGGREGATION_STATE);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index, aggr_state_index_);
+ proto->SetExtension(serialization::DestroyAggregationStateWorkOrder::partition_id, part_id);
+
+ container->addWorkOrderProto(proto, op_index_);
+ }
+ work_generated_ = true;
+ return true;
}
void DestroyAggregationStateWorkOrder::execute() {
diff --git a/relational_operators/DestroyHashOperator.cpp b/relational_operators/DestroyHashOperator.cpp
index 5b84bba..d9ea19b 100644
--- a/relational_operators/DestroyHashOperator.cpp
+++ b/relational_operators/DestroyHashOperator.cpp
@@ -34,32 +34,36 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (blocking_dependencies_met_ && !work_generated_) {
- for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
- container->addNormalWorkOrder(
- new DestroyHashWorkOrder(query_id_, hash_table_index_, part_id, query_context),
- op_index_);
- }
- work_generated_ = true;
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+
+ for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+ container->addNormalWorkOrder(
+ new DestroyHashWorkOrder(query_id_, hash_table_index_, part_id, query_context),
+ op_index_);
+ }
+ work_generated_ = true;
+ return true;
}
bool DestroyHashOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (blocking_dependencies_met_ && !work_generated_) {
- for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::DESTROY_HASH);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index, hash_table_index_);
- proto->SetExtension(serialization::DestroyHashWorkOrder::partition_id, part_id);
-
- container->addWorkOrderProto(proto, op_index_);
- }
-
- work_generated_ = true;
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+
+ for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::DESTROY_HASH);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index, hash_table_index_);
+ proto->SetExtension(serialization::DestroyHashWorkOrder::partition_id, part_id);
+
+ container->addWorkOrderProto(proto, op_index_);
+ }
+
+ work_generated_ = true;
+ return true;
}
diff --git a/relational_operators/DropTableOperator.cpp b/relational_operators/DropTableOperator.cpp
index 5cd5ebc..59f72a0 100644
--- a/relational_operators/DropTableOperator.cpp
+++ b/relational_operators/DropTableOperator.cpp
@@ -42,42 +42,44 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (blocking_dependencies_met_ && !work_generated_) {
- work_generated_ = true;
-
- std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot());
-
- // DropTableWorkOrder only drops blocks, if any.
- container->addNormalWorkOrder(
- new DropTableWorkOrder(
- query_id_, std::move(relation_blocks), storage_manager),
- op_index_);
-
- database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+ std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot());
+
+ // DropTableWorkOrder only drops blocks, if any.
+ container->addNormalWorkOrder(
+ new DropTableWorkOrder(
+ query_id_, std::move(relation_blocks), storage_manager),
+ op_index_);
+
+ database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
+
+ work_generated_ = true;
+ return true;
}
bool DropTableOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (blocking_dependencies_met_ && !work_generated_) {
- work_generated_ = true;
-
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::DROP_TABLE);
- proto->set_query_id(query_id_);
-
- std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot());
- for (const block_id relation_block : relation_blocks) {
- proto->AddExtension(serialization::DropTableWorkOrder::block_ids, relation_block);
- }
-
- container->addWorkOrderProto(proto, op_index_);
-
- database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::DROP_TABLE);
+ proto->set_query_id(query_id_);
+
+ std::vector<block_id> relation_blocks(relation_.getBlocksSnapshot());
+ for (const block_id relation_block : relation_blocks) {
+ proto->AddExtension(serialization::DropTableWorkOrder::block_ids, relation_block);
+ }
+
+ container->addWorkOrderProto(proto, op_index_);
+
+ database_->setStatus(CatalogDatabase::Status::kPendingBlockDeletions);
+
+ work_generated_ = true;
+ return true;
}
void DropTableOperator::updateCatalogOnCompletion() {
diff --git a/relational_operators/FinalizeAggregationOperator.cpp b/relational_operators/FinalizeAggregationOperator.cpp
index 8283437..efa4cba 100644
--- a/relational_operators/FinalizeAggregationOperator.cpp
+++ b/relational_operators/FinalizeAggregationOperator.cpp
@@ -40,56 +40,59 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
+ if (started_) {
+ return true;
+ }
+
DCHECK(query_context != nullptr);
-
- if (blocking_dependencies_met_ && !started_) {
- started_ = true;
-
- for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- AggregationOperationState *agg_state =
- query_context->getAggregationState(aggr_state_index_, part_id);
- DCHECK(agg_state != nullptr);
- for (std::size_t state_part_id = 0;
- state_part_id < aggr_state_num_partitions_;
- ++state_part_id) {
- container->addNormalWorkOrder(
- new FinalizeAggregationWorkOrder(
- query_id_,
- part_id,
- state_part_id,
- agg_state,
- query_context->getInsertDestination(output_destination_index_)),
- op_index_);
- }
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ AggregationOperationState *agg_state =
+ query_context->getAggregationState(aggr_state_index_, part_id);
+ DCHECK(agg_state != nullptr);
+ for (std::size_t state_part_id = 0;
+ state_part_id < aggr_state_num_partitions_;
+ ++state_part_id) {
+ container->addNormalWorkOrder(
+ new FinalizeAggregationWorkOrder(
+ query_id_,
+ part_id,
+ state_part_id,
+ agg_state,
+ query_context->getInsertDestination(output_destination_index_)),
+ op_index_);
}
}
- return started_;
+
+ started_ = true;
+ return true;
}
// TODO(quickstep-team) : Think about how the number of partitions could be
// accessed in this function. Until then, we can't use partitioned aggregation
// finalization with the distributed version.
bool FinalizeAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (blocking_dependencies_met_ && !started_) {
- started_ = true;
-
- for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
- aggr_state_index_);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
- part_id);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
- 0u);
- proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
- output_destination_index_);
-
- container->addWorkOrderProto(proto, op_index_);
- }
+ if (started_) {
+ return true;
}
- return started_;
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::FINALIZE_AGGREGATION);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index,
+ aggr_state_index_);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::partition_id,
+ part_id);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::state_partition_id,
+ 0u);
+ proto->SetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index,
+ output_destination_index_);
+
+ container->addWorkOrderProto(proto, op_index_);
+ }
+
+ started_ = true;
+ return true;
}
void FinalizeAggregationWorkOrder::execute() {
diff --git a/relational_operators/HashJoinOperator.cpp b/relational_operators/HashJoinOperator.cpp
index d3e7e08..e385e46 100644
--- a/relational_operators/HashJoinOperator.cpp
+++ b/relational_operators/HashJoinOperator.cpp
@@ -202,56 +202,53 @@
WorkOrdersContainer *container,
QueryContext *query_context,
StorageManager *storage_manager) {
- // We wait until the building of global hash table is complete.
- if (blocking_dependencies_met_) {
- DCHECK(query_context != nullptr);
+ DCHECK(query_context != nullptr);
- const Predicate *residual_predicate =
- query_context->getPredicate(residual_predicate_index_);
- const vector<unique_ptr<const Scalar>> &selection =
- query_context->getScalarGroup(selection_index_);
- InsertDestination *output_destination =
- query_context->getInsertDestination(output_destination_index_);
+ const Predicate *residual_predicate =
+ query_context->getPredicate(residual_predicate_index_);
+ const vector<unique_ptr<const Scalar>> &selection =
+ query_context->getScalarGroup(selection_index_);
+ InsertDestination *output_destination =
+ query_context->getInsertDestination(output_destination_index_);
- if (probe_relation_is_stored_) {
- if (started_) {
- return true;
- }
-
- for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
- const JoinHashTable &hash_table =
- *(query_context->getJoinHashTable(hash_table_index_, part_id));
-
- for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
- container->addNormalWorkOrder(
- new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_,
- any_join_key_attributes_nullable_, part_id, probe_block_id, residual_predicate,
- selection, hash_table, output_destination, storage_manager,
- CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
- op_index_);
- }
- }
- started_ = true;
+ if (probe_relation_is_stored_) {
+ if (started_) {
return true;
- } else {
- for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
- const JoinHashTable &hash_table =
- *(query_context->getJoinHashTable(hash_table_index_, part_id));
+ }
- while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
- container->addNormalWorkOrder(
- new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_,
- any_join_key_attributes_nullable_, part_id,
- probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
- residual_predicate, selection, hash_table, output_destination, storage_manager,
- CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
- op_index_);
- ++num_workorders_generated_[part_id];
- } // end while
- } // end for
- return done_feeding_input_relation_;
- } // end else (probe_relation_is_stored_)
- } // end if (blocking_dependencies_met_)
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+ const JoinHashTable &hash_table =
+ *(query_context->getJoinHashTable(hash_table_index_, part_id));
+
+ for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
+ container->addNormalWorkOrder(
+ new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_,
+ any_join_key_attributes_nullable_, part_id, probe_block_id, residual_predicate,
+ selection, hash_table, output_destination, storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+ op_index_);
+ }
+ }
+ started_ = true;
+ return true;
+ } else {
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+ const JoinHashTable &hash_table =
+ *(query_context->getJoinHashTable(hash_table_index_, part_id));
+
+ while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
+ container->addNormalWorkOrder(
+ new JoinWorkOrderClass(query_id_, build_relation_, probe_relation_, join_key_attributes_,
+ any_join_key_attributes_nullable_, part_id,
+ probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+ residual_predicate, selection, hash_table, output_destination, storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ } // end while
+ } // end for
+ return done_feeding_input_relation_;
+ } // end else (probe_relation_is_stored_)
return false;
}
@@ -259,56 +256,53 @@
WorkOrdersContainer *container,
QueryContext *query_context,
StorageManager *storage_manager) {
- // We wait until the building of global hash table is complete.
- if (blocking_dependencies_met_) {
- DCHECK(query_context != nullptr);
+ DCHECK(query_context != nullptr);
- const vector<unique_ptr<const Scalar>> &selection =
- query_context->getScalarGroup(selection_index_);
+ const vector<unique_ptr<const Scalar>> &selection =
+ query_context->getScalarGroup(selection_index_);
- InsertDestination *output_destination =
- query_context->getInsertDestination(output_destination_index_);
+ InsertDestination *output_destination =
+ query_context->getInsertDestination(output_destination_index_);
- if (probe_relation_is_stored_) {
- if (started_) {
- return true;
- }
-
- for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
- const JoinHashTable &hash_table =
- *(query_context->getJoinHashTable(hash_table_index_, part_id));
-
- for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
- container->addNormalWorkOrder(
- new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_,
- any_join_key_attributes_nullable_, part_id, probe_block_id, selection,
- is_selection_on_build_, hash_table, output_destination, storage_manager,
- CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
- op_index_);
- }
- }
- started_ = true;
+ if (probe_relation_is_stored_) {
+ if (started_) {
return true;
- } else {
- for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
- const JoinHashTable &hash_table =
- *(query_context->getJoinHashTable(hash_table_index_, part_id));
+ }
- while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
- container->addNormalWorkOrder(
- new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_,
- any_join_key_attributes_nullable_, part_id,
- probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
- selection, is_selection_on_build_, hash_table, output_destination,
- storage_manager,
- CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
- op_index_);
- ++num_workorders_generated_[part_id];
- }
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+ const JoinHashTable &hash_table =
+ *(query_context->getJoinHashTable(hash_table_index_, part_id));
+
+ for (const block_id probe_block_id : probe_relation_block_ids_[part_id]) {
+ container->addNormalWorkOrder(
+ new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_,
+ any_join_key_attributes_nullable_, part_id, probe_block_id, selection,
+ is_selection_on_build_, hash_table, output_destination, storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+ op_index_);
}
- return done_feeding_input_relation_;
- } // end else (probe_relation_is_stored_)
- } // end if (blocking_dependencies_met_)
+ }
+ started_ = true;
+ return true;
+ } else {
+ for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+ const JoinHashTable &hash_table =
+ *(query_context->getJoinHashTable(hash_table_index_, part_id));
+
+ while (num_workorders_generated_[part_id] < probe_relation_block_ids_[part_id].size()) {
+ container->addNormalWorkOrder(
+ new HashOuterJoinWorkOrder(query_id_, build_relation_, probe_relation_, join_key_attributes_,
+ any_join_key_attributes_nullable_, part_id,
+ probe_relation_block_ids_[part_id][num_workorders_generated_[part_id]],
+ selection, is_selection_on_build_, hash_table, output_destination,
+ storage_manager,
+ CreateLIPFilterAdaptiveProberHelper(lip_deployment_index_, query_context)),
+ op_index_);
+ ++num_workorders_generated_[part_id];
+ }
+ }
+ return done_feeding_input_relation_;
+ } // end else (probe_relation_is_stored_)
return false;
}
@@ -330,11 +324,6 @@
bool HashJoinOperator::getAllNonOuterJoinWorkOrderProtos(
WorkOrderProtosContainer *container,
const serialization::HashJoinWorkOrder::HashJoinWorkOrderType hash_join_type) {
- // We wait until the building of global hash table is complete.
- if (!blocking_dependencies_met_) {
- return false;
- }
-
if (probe_relation_is_stored_) {
if (started_) {
return true;
@@ -396,11 +385,6 @@
}
bool HashJoinOperator::getAllOuterJoinWorkOrderProtos(WorkOrderProtosContainer *container) {
- // We wait until the building of global hash table is complete.
- if (!blocking_dependencies_met_) {
- return false;
- }
-
if (probe_relation_is_stored_) {
if (started_) {
return true;
diff --git a/relational_operators/InitializeAggregationOperator.cpp b/relational_operators/InitializeAggregationOperator.cpp
index f91299d..136686b 100644
--- a/relational_operators/InitializeAggregationOperator.cpp
+++ b/relational_operators/InitializeAggregationOperator.cpp
@@ -39,24 +39,27 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (!started_) {
- for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
- AggregationOperationState *agg_state =
- query_context->getAggregationState(aggr_state_index_, part_id);
- DCHECK(agg_state != nullptr);
-
- for (std::size_t state_part_id = 0;
- state_part_id < aggr_state_num_init_partitions_;
- ++state_part_id) {
- container->addNormalWorkOrder(
- new InitializeAggregationWorkOrder(query_id_,
- state_part_id,
- agg_state),
- op_index_);
- }
- }
- started_ = true;
+ if (started_) {
+ return true;
}
+
+ for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+ AggregationOperationState *agg_state =
+ query_context->getAggregationState(aggr_state_index_, part_id);
+ DCHECK(agg_state != nullptr);
+
+ for (std::size_t state_part_id = 0;
+ state_part_id < aggr_state_num_init_partitions_;
+ ++state_part_id) {
+ container->addNormalWorkOrder(
+ new InitializeAggregationWorkOrder(query_id_,
+ state_part_id,
+ agg_state),
+ op_index_);
+ }
+ }
+
+ started_ = true;
return true;
}
diff --git a/relational_operators/InsertOperator.cpp b/relational_operators/InsertOperator.cpp
index 31c7fa8..fbd3a07 100644
--- a/relational_operators/InsertOperator.cpp
+++ b/relational_operators/InsertOperator.cpp
@@ -39,34 +39,37 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (blocking_dependencies_met_ && !work_generated_) {
- DCHECK(query_context != nullptr);
-
- work_generated_ = true;
- container->addNormalWorkOrder(
- new InsertWorkOrder(
- query_id_,
- query_context->getInsertDestination(output_destination_index_),
- query_context->releaseTuple(tuple_index_)),
- op_index_);
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+
+ DCHECK(query_context != nullptr);
+ container->addNormalWorkOrder(
+ new InsertWorkOrder(
+ query_id_,
+ query_context->getInsertDestination(output_destination_index_),
+ query_context->releaseTuple(tuple_index_)),
+ op_index_);
+
+ work_generated_ = true;
+ return true;
}
bool InsertOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (blocking_dependencies_met_ && !work_generated_) {
- work_generated_ = true;
-
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::INSERT);
- proto->set_query_id(query_id_);
- proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_);
- proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_);
-
- container->addWorkOrderProto(proto, op_index_);
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::INSERT);
+ proto->set_query_id(query_id_);
+ proto->SetExtension(serialization::InsertWorkOrder::insert_destination_index, output_destination_index_);
+ proto->SetExtension(serialization::InsertWorkOrder::tuple_index, tuple_index_);
+
+ container->addWorkOrderProto(proto, op_index_);
+
+ work_generated_ = true;
+ return true;
}
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index 425fa32..5de7eb5 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -163,19 +163,6 @@
}
/**
- * @brief Inform this RelationalOperator that ALL the dependencies which break
- * the pipeline have been met.
- *
- * @note This function is only relevant in certain operators like HashJoin
- * which have a pipeline breaking dependency on BuildHash operator.
- * Such operators can start generating WorkOrders when all the pipeline
- * breaking dependencies are met.
- **/
- inline void informAllBlockingDependenciesMet() {
- blocking_dependencies_met_ = true;
- }
-
- /**
* @brief Receive input blocks for this RelationalOperator.
*
* @param input_block_id The ID of the input block.
@@ -289,16 +276,13 @@
* @param blocking_dependencies_met If those dependencies which break the
* pipeline have been met.
**/
- explicit RelationalOperator(const std::size_t query_id,
- const bool blocking_dependencies_met = false)
+ explicit RelationalOperator(const std::size_t query_id)
: query_id_(query_id),
- blocking_dependencies_met_(blocking_dependencies_met),
done_feeding_input_relation_(false),
lip_deployment_index_(QueryContext::kInvalidLIPDeploymentId) {}
const std::size_t query_id_;
- bool blocking_dependencies_met_;
bool done_feeding_input_relation_;
std::size_t op_index_;
diff --git a/relational_operators/SampleOperator.cpp b/relational_operators/SampleOperator.cpp
index 63733bf..d4f84b0 100644
--- a/relational_operators/SampleOperator.cpp
+++ b/relational_operators/SampleOperator.cpp
@@ -53,39 +53,42 @@
std::uniform_real_distribution<> distribution(0, 1);
const double probability = static_cast<double>(percentage_) / 100;
if (input_relation_is_stored_) {
- if (!started_) {
- // If the sampling is by block choose blocks randomly
- if (is_block_sample_) {
- for (const block_id input_block_id : input_relation_block_ids_) {
- if (distribution(generator) <= probability) {
- container->addNormalWorkOrder(
- new SampleWorkOrder(query_id_,
- input_relation_,
- input_block_id,
- is_block_sample_,
- percentage_,
- output_destination,
- storage_manager),
- op_index_);
- }
- }
- } else {
- // Add all the blocks for tuple sampling which would handle
- // the sampling from each block
- for (const block_id input_block_id : input_relation_block_ids_) {
- container->addNormalWorkOrder(new SampleWorkOrder(query_id_,
- input_relation_,
- input_block_id,
- is_block_sample_,
- percentage_,
- output_destination,
- storage_manager),
- op_index_);
+ if (started_) {
+ return true;
+ }
+
+  // If the sampling is by block, choose blocks randomly.
+ if (is_block_sample_) {
+ for (const block_id input_block_id : input_relation_block_ids_) {
+ if (distribution(generator) <= probability) {
+ container->addNormalWorkOrder(
+ new SampleWorkOrder(query_id_,
+ input_relation_,
+ input_block_id,
+ is_block_sample_,
+ percentage_,
+ output_destination,
+ storage_manager),
+ op_index_);
}
}
- started_ = true;
+ } else {
+    // Add all the blocks for tuple sampling; each work order handles
+    // the sampling within its own block.
+ for (const block_id input_block_id : input_relation_block_ids_) {
+ container->addNormalWorkOrder(new SampleWorkOrder(query_id_,
+ input_relation_,
+ input_block_id,
+ is_block_sample_,
+ percentage_,
+ output_destination,
+ storage_manager),
+ op_index_);
+ }
}
- return started_;
+
+ started_ = true;
+ return true;
} else {
if (is_block_sample_) {
while (num_workorders_generated_ < input_relation_block_ids_.size()) {
diff --git a/relational_operators/TableGeneratorOperator.cpp b/relational_operators/TableGeneratorOperator.cpp
index 3f62fc9..a9e187d 100644
--- a/relational_operators/TableGeneratorOperator.cpp
+++ b/relational_operators/TableGeneratorOperator.cpp
@@ -39,35 +39,39 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (!started_) {
- DCHECK(query_context != nullptr);
-
- // Currently the generator function is not abstracted to be parallelizable,
- // so just produce one work order.
- container->addNormalWorkOrder(
- new TableGeneratorWorkOrder(
- query_id_,
- query_context->getGeneratorFunctionHandle(
- generator_function_index_),
- query_context->getInsertDestination(output_destination_index_)),
- op_index_);
- started_ = true;
+ if (started_) {
+ return true;
}
- return started_;
+
+ DCHECK(query_context != nullptr);
+
+ // Currently the generator function is not abstracted to be parallelizable,
+ // so just produce one work order.
+ container->addNormalWorkOrder(
+ new TableGeneratorWorkOrder(
+ query_id_,
+ query_context->getGeneratorFunctionHandle(
+ generator_function_index_),
+ query_context->getInsertDestination(output_destination_index_)),
+ op_index_);
+ started_ = true;
+ return true;
}
bool TableGeneratorOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (!started_) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::TABLE_GENERATOR);
- proto->set_query_id(query_id_);
-
- proto->SetExtension(serialization::TableGeneratorWorkOrder::generator_function_index, generator_function_index_);
- proto->SetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index, output_destination_index_);
-
- container->addWorkOrderProto(proto, op_index_);
- started_ = true;
+ if (started_) {
+ return true;
}
+
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::TABLE_GENERATOR);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::TableGeneratorWorkOrder::generator_function_index, generator_function_index_);
+ proto->SetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index, output_destination_index_);
+
+ container->addWorkOrderProto(proto, op_index_);
+ started_ = true;
return true;
}
diff --git a/relational_operators/TextScanOperator.cpp b/relational_operators/TextScanOperator.cpp
index a133e0c..3ca3af4 100644
--- a/relational_operators/TextScanOperator.cpp
+++ b/relational_operators/TextScanOperator.cpp
@@ -115,75 +115,81 @@
InsertDestination *output_destination =
query_context->getInsertDestination(output_destination_index_);
- if (blocking_dependencies_met_ && !work_generated_) {
- for (const std::string &file : files) {
+ if (work_generated_) {
+ return true;
+ }
+
+ for (const std::string &file : files) {
#ifdef QUICKSTEP_HAVE_UNISTD
- // Check file permissions before trying to open it.
- const int access_result = access(file.c_str(), R_OK);
- CHECK_EQ(0, access_result)
- << "File " << file << " is not readable due to permission issues.";
+ // Check file permissions before trying to open it.
+ const int access_result = access(file.c_str(), R_OK);
+ CHECK_EQ(0, access_result)
+ << "File " << file << " is not readable due to permission issues.";
#endif // QUICKSTEP_HAVE_UNISTD
- const std::size_t file_size = getFileSize(file);
+ const std::size_t file_size = getFileSize(file);
- std::size_t text_offset = 0;
- for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
- num_full_segments > 0;
- --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
- container->addNormalWorkOrder(
- new TextScanWorkOrder(query_id_,
- file,
- text_offset,
- FLAGS_textscan_text_segment_size,
- field_terminator_,
- process_escape_sequences_,
- output_destination),
- op_index_);
- }
-
- // Deal with the residual partial segment whose size is less than
- // 'FLAGS_textscan_text_segment_size'.
- if (text_offset < file_size) {
- container->addNormalWorkOrder(
- new TextScanWorkOrder(query_id_,
- file,
- text_offset,
- file_size - text_offset,
- field_terminator_,
- process_escape_sequences_,
- output_destination),
- op_index_);
- }
+ std::size_t text_offset = 0;
+ for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
+ num_full_segments > 0;
+ --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
+ container->addNormalWorkOrder(
+ new TextScanWorkOrder(query_id_,
+ file,
+ text_offset,
+ FLAGS_textscan_text_segment_size,
+ field_terminator_,
+ process_escape_sequences_,
+ output_destination),
+ op_index_);
}
- work_generated_ = true;
+
+ // Deal with the residual partial segment whose size is less than
+ // 'FLAGS_textscan_text_segment_size'.
+ if (text_offset < file_size) {
+ container->addNormalWorkOrder(
+ new TextScanWorkOrder(query_id_,
+ file,
+ text_offset,
+ file_size - text_offset,
+ field_terminator_,
+ process_escape_sequences_,
+ output_destination),
+ op_index_);
+ }
}
- return work_generated_;
+
+ work_generated_ = true;
+ return true;
}
bool TextScanOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
const std::vector<std::string> files = utility::file::GlobExpand(file_pattern_);
- if (blocking_dependencies_met_ && !work_generated_) {
- for (const string &file : files) {
- const std::size_t file_size = getFileSize(file);
-
- size_t text_offset = 0;
- for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
- num_full_segments > 0;
- --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
- container->addWorkOrderProto(createWorkOrderProto(file, text_offset, FLAGS_textscan_text_segment_size),
- op_index_);
- }
-
- // Deal with the residual partial segment whose size is less than
- // 'FLAGS_textscan_text_segment_size'.
- if (text_offset < file_size) {
- container->addWorkOrderProto(createWorkOrderProto(file, text_offset, file_size - text_offset),
- op_index_);
- }
- }
- work_generated_ = true;
+ if (work_generated_) {
+ return true;
}
- return work_generated_;
+
+ for (const string &file : files) {
+ const std::size_t file_size = getFileSize(file);
+
+ size_t text_offset = 0;
+ for (size_t num_full_segments = file_size / FLAGS_textscan_text_segment_size;
+ num_full_segments > 0;
+ --num_full_segments, text_offset += FLAGS_textscan_text_segment_size) {
+ container->addWorkOrderProto(createWorkOrderProto(file, text_offset, FLAGS_textscan_text_segment_size),
+ op_index_);
+ }
+
+ // Deal with the residual partial segment whose size is less than
+ // 'FLAGS_textscan_text_segment_size'.
+ if (text_offset < file_size) {
+ container->addWorkOrderProto(createWorkOrderProto(file, text_offset, file_size - text_offset),
+ op_index_);
+ }
+ }
+
+ work_generated_ = true;
+ return true;
}
serialization::WorkOrder* TextScanOperator::createWorkOrderProto(const string &filename,
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 40dfb22..08bfa59 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -52,49 +52,52 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (blocking_dependencies_met_ && !started_) {
- DCHECK(query_context != nullptr);
-
- for (const block_id input_block_id : input_blocks_) {
- container->addNormalWorkOrder(
- new UpdateWorkOrder(
- query_id_,
- relation_,
- input_block_id,
- query_context->getPredicate(predicate_index_),
- query_context->getUpdateGroup(update_group_index_),
- query_context->getInsertDestination(
- relocation_destination_index_),
- storage_manager,
- op_index_,
- scheduler_client_id,
- bus),
- op_index_);
- }
- started_ = true;
+ if (started_) {
+ return true;
}
- return started_;
+
+ DCHECK(query_context != nullptr);
+ for (const block_id input_block_id : input_blocks_) {
+ container->addNormalWorkOrder(
+ new UpdateWorkOrder(
+ query_id_,
+ relation_,
+ input_block_id,
+ query_context->getPredicate(predicate_index_),
+ query_context->getUpdateGroup(update_group_index_),
+ query_context->getInsertDestination(
+ relocation_destination_index_),
+ storage_manager,
+ op_index_,
+ scheduler_client_id,
+ bus),
+ op_index_);
+ }
+ started_ = true;
+ return true;
}
bool UpdateOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (blocking_dependencies_met_ && !started_) {
- for (const block_id input_block_id : input_blocks_) {
- serialization::WorkOrder *proto = new serialization::WorkOrder;
- proto->set_work_order_type(serialization::UPDATE);
- proto->set_query_id(query_id_);
-
- proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
- proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
- proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
- proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
- proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
- proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
-
- container->addWorkOrderProto(proto, op_index_);
- }
- started_ = true;
+ if (started_) {
+ return true;
}
- return started_;
+
+ for (const block_id input_block_id : input_blocks_) {
+ serialization::WorkOrder *proto = new serialization::WorkOrder;
+ proto->set_work_order_type(serialization::UPDATE);
+ proto->set_query_id(query_id_);
+
+ proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
+ proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
+ proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
+
+ container->addWorkOrderProto(proto, op_index_);
+ }
+ started_ = true;
+ return true;
}
void UpdateWorkOrder::execute() {
diff --git a/relational_operators/WindowAggregationOperator.cpp b/relational_operators/WindowAggregationOperator.cpp
index 5a1f8ec..2fd9e32 100644
--- a/relational_operators/WindowAggregationOperator.cpp
+++ b/relational_operators/WindowAggregationOperator.cpp
@@ -39,32 +39,33 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- DCHECK(query_context != nullptr);
-
- if (blocking_dependencies_met_ && !generated_) {
- std::vector<block_id> relation_blocks =
- input_relation_.getBlocksSnapshot();
-
- container->addNormalWorkOrder(
- new WindowAggregationWorkOrder(
- query_id_,
- query_context->releaseWindowAggregationState(window_aggregation_state_index_),
- std::move(relation_blocks),
- query_context->getInsertDestination(output_destination_index_)),
- op_index_);
- generated_ = true;
+ if (generated_) {
+ return true;
}
- return generated_;
+ std::vector<block_id> relation_blocks =
+ input_relation_.getBlocksSnapshot();
+
+ DCHECK(query_context != nullptr);
+ container->addNormalWorkOrder(
+ new WindowAggregationWorkOrder(
+ query_id_,
+ query_context->releaseWindowAggregationState(window_aggregation_state_index_),
+ std::move(relation_blocks),
+ query_context->getInsertDestination(output_destination_index_)),
+ op_index_);
+ generated_ = true;
+ return true;
}
bool WindowAggregationOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (blocking_dependencies_met_ && !generated_) {
- container->addWorkOrderProto(createWorkOrderProto(), op_index_);
- generated_ = true;
+ if (generated_) {
+ return true;
}
- return generated_;
+ container->addWorkOrderProto(createWorkOrderProto(), op_index_);
+ generated_ = true;
+ return true;
}
serialization::WorkOrder* WindowAggregationOperator::createWorkOrderProto() {
diff --git a/relational_operators/tests/AggregationOperator_unittest.cpp b/relational_operators/tests/AggregationOperator_unittest.cpp
index 3b4a737..4cda2d1 100644
--- a/relational_operators/tests/AggregationOperator_unittest.cpp
+++ b/relational_operators/tests/AggregationOperator_unittest.cpp
@@ -425,8 +425,6 @@
delete work_order;
}
- finalize_op_->informAllBlockingDependenciesMet();
-
WorkOrdersContainer finalize_op_container(1, 0);
const std::size_t finalize_op_index = 0;
finalize_op_->getAllWorkOrders(&finalize_op_container,
@@ -441,8 +439,6 @@
delete work_order;
}
- destroy_aggr_state_op_->informAllBlockingDependenciesMet();
-
WorkOrdersContainer destroy_aggr_state_op_container(1, 0);
const std::size_t destroy_aggr_state_op_index = 0;
destroy_aggr_state_op_->getAllWorkOrders(&destroy_aggr_state_op_container,
diff --git a/relational_operators/tests/HashJoinOperator_unittest.cpp b/relational_operators/tests/HashJoinOperator_unittest.cpp
index 8338872..89aafe5 100644
--- a/relational_operators/tests/HashJoinOperator_unittest.cpp
+++ b/relational_operators/tests/HashJoinOperator_unittest.cpp
@@ -469,8 +469,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -512,7 +510,6 @@
// Create cleaner operator.
unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
@@ -623,8 +620,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -687,7 +682,6 @@
// Create cleaner operator.
unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
@@ -784,8 +778,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -827,7 +819,6 @@
// Create cleaner operator.
unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
@@ -931,8 +922,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -999,7 +988,6 @@
// Create the cleaner operator.
unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
@@ -1112,8 +1100,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -1180,7 +1166,6 @@
// Create cleaner operator.
unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
@@ -1304,8 +1289,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -1372,7 +1355,6 @@
// Create cleaner operator.
unique_ptr<DestroyHashOperator> cleaner(new DestroyHashOperator(kQueryId, kSinglePartition, join_hash_table_index));
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
@@ -1476,8 +1458,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -1519,7 +1499,6 @@
// Create cleaner operator.
auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
@@ -1624,8 +1603,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -1692,7 +1669,6 @@
// Create cleaner operator.
auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
@@ -1807,8 +1783,6 @@
// Execute the operators.
fetchAndExecuteWorkOrders(builder.get());
-
- prober->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(prober.get());
// Check result values
@@ -1875,7 +1849,6 @@
// Create cleaner operator.
auto cleaner = make_unique<DestroyHashOperator>(kQueryId, kMultiplePartitions, join_hash_table_index);
- cleaner->informAllBlockingDependenciesMet();
fetchAndExecuteWorkOrders(cleaner.get());
db_->dropRelationById(output_relation_id);
diff --git a/relational_operators/tests/TextScanOperator_unittest.cpp b/relational_operators/tests/TextScanOperator_unittest.cpp
index 53a9124..c92a3dd 100644
--- a/relational_operators/tests/TextScanOperator_unittest.cpp
+++ b/relational_operators/tests/TextScanOperator_unittest.cpp
@@ -110,7 +110,6 @@
op->setOperatorIndex(kOpIndex);
WorkOrdersContainer container(1, 0);
const std::size_t op_index = 0;
- op->informAllBlockingDependenciesMet();
op->getAllWorkOrders(&container,
query_context_.get(),
storage_manager_.get(),