Applied WorkOrderSelectionPolicy.
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 4c3b52a..5c750f0 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -63,6 +63,7 @@
add_library(quickstep_queryexecution_Worker Worker.cpp Worker.hpp)
add_library(quickstep_queryexecution_WorkerDirectory ../empty_src.cpp WorkerDirectory.hpp)
add_library(quickstep_queryexecution_WorkerMessage ../empty_src.cpp WorkerMessage.hpp)
+add_library(quickstep_queryexecution_WorkOrderSelectionPolicy ../empty_src.cpp WorkOrderSelectionPolicy.hpp)
add_library(quickstep_queryexecution_WorkerSelectionPolicy ../empty_src.cpp WorkerSelectionPolicy.hpp)
# Link dependencies:
@@ -334,6 +335,7 @@
quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_WorkOrdersContainer
glog
+ quickstep_queryexecution_WorkOrderSelectionPolicy
quickstep_relationaloperators_WorkOrder
quickstep_utility_Macros
quickstep_utility_PtrVector)
@@ -352,6 +354,9 @@
target_link_libraries(quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_QueryExecutionTypedefs
quickstep_utility_Macros)
+target_link_libraries(quickstep_queryexecution_WorkOrderSelectionPolicy
+ glog
+ quickstep_utility_Macros)
target_link_libraries(quickstep_queryexecution_WorkerSelectionPolicy
quickstep_queryexecution_WorkerDirectory
quickstep_utility_Macros)
@@ -377,6 +382,7 @@
quickstep_queryexecution_Worker
quickstep_queryexecution_WorkerDirectory
quickstep_queryexecution_WorkerMessage
+ quickstep_queryexecution_WorkOrderSelectionPolicy
quickstep_queryexecution_WorkerSelectionPolicy)
if (ENABLE_DISTRIBUTED)
target_link_libraries(quickstep_queryexecution
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 79c4026..f33a501 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -72,48 +72,19 @@
WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
const dag_node_index start_operator_index, const numa_node_id numa_node) {
- // Default policy: Operator with lowest index first.
- WorkOrder *work_order = nullptr;
- size_t num_operators_checked = 0;
- for (dag_node_index index = start_operator_index;
- num_operators_checked < num_operators_in_dag_;
- index = (index + 1) % num_operators_in_dag_, ++num_operators_checked) {
- if (query_exec_state_->hasExecutionFinished(index)) {
- continue;
- }
- if (numa_node != kAnyNUMANodeID) {
- // First try to get a normal WorkOrder from the specified NUMA node.
- work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
- if (work_order != nullptr) {
- // A WorkOrder found on the given NUMA node.
- query_exec_state_->incrementNumQueuedWorkOrders(index);
- return WorkerMessage::WorkOrderMessage(work_order, index);
- } else {
- // Normal workorder not found on this node. Look for a rebuild workorder
- // on this NUMA node.
- work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
- if (work_order != nullptr) {
- return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
- }
- }
- }
- // Either no workorder found on the given NUMA node, or numa_node is
- // 'kAnyNUMANodeID'.
- // Try to get a normal WorkOrder from other NUMA nodes.
- work_order = workorders_container_->getNormalWorkOrder(index);
- if (work_order != nullptr) {
- query_exec_state_->incrementNumQueuedWorkOrders(index);
- return WorkerMessage::WorkOrderMessage(work_order, index);
- } else {
- // Normal WorkOrder not found, look for a RebuildWorkOrder.
- work_order = workorders_container_->getRebuildWorkOrder(index);
- if (work_order != nullptr) {
- return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
- }
- }
+ std::size_t operator_index;
+ bool is_rebuild;
+ WorkOrder *work_order = workorders_container_->getNextWorkOrder(&operator_index, &is_rebuild);
+ if (!work_order) {
+ return nullptr;
}
- // No WorkOrders available right now.
- return nullptr;
+
+ if (is_rebuild) {
+ return WorkerMessage::RebuildWorkOrderMessage(work_order, operator_index);
+ }
+
+ query_exec_state_->incrementNumQueuedWorkOrders(operator_index);
+ return WorkerMessage::WorkOrderMessage(work_order, operator_index);
}
bool QueryManagerSingleNode::fetchNormalWorkOrders(const dag_node_index index) {
diff --git a/query_execution/WorkOrderSelectionPolicy.hpp b/query_execution/WorkOrderSelectionPolicy.hpp
new file mode 100644
index 0000000..6fa6c7d
--- /dev/null
+++ b/query_execution/WorkOrderSelectionPolicy.hpp
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_EXECUTION_WORK_ORDER_SELECTION_POLICY_HPP_
+#define QUICKSTEP_QUERY_EXECUTION_WORK_ORDER_SELECTION_POLICY_HPP_
+
+#include <cstddef>
+#include <stack>
+#include <queue>
+
+#include "utility/Macros.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+
+/** \addtogroup QueryExecution
+ * @{
+ */
+
+/**
+ * @brief Base class for a policy to select work orders for query execution.
+ **/
+class WorkOrderSelectionPolicy {
+ public:
+ /**
+ * @brief Whether there is an available work order for execution.
+ *
+ * @return True if a work order is available. Otherwise, false.
+ **/
+ virtual bool hasWorkOrder() const = 0;
+
+ /**
+ * @brief Add work order.
+ *
+ * @param operator_index The operator index for added work order.
+ **/
+ virtual void addWorkOrder(const std::size_t operator_index) = 0;
+
+ /**
+ * @brief Choose the operator index for next workorder execution based on the policy.
+ *
+ * @return The operator index chosen for next workorder execution.
+ **/
+ virtual std::size_t getOperatorIndexForNextWorkOrder() = 0;
+
+ protected:
+ /**
+ * @brief Constructor.
+ **/
+ WorkOrderSelectionPolicy() {}
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(WorkOrderSelectionPolicy);
+};
+
+/**
+ * @brief Choose the next work order in a first-in-first-out manner.
+ **/
+class FifoWorkOrderSelectionPolicy final : public WorkOrderSelectionPolicy {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ FifoWorkOrderSelectionPolicy() = default;
+
+ bool hasWorkOrder() const override {
+ return !work_orders_.empty();
+ }
+
+ void addWorkOrder(const std::size_t operator_index) override {
+ work_orders_.push(operator_index);
+ }
+
+ std::size_t getOperatorIndexForNextWorkOrder() override {
+ DCHECK(hasWorkOrder());
+ const std::size_t operator_index = work_orders_.front();
+ work_orders_.pop();
+
+ return operator_index;
+ }
+
+ private:
+ std::queue<std::size_t> work_orders_;
+
+ DISALLOW_COPY_AND_ASSIGN(FifoWorkOrderSelectionPolicy);
+};
+
+/**
+ * @brief Choose the next work order in a last-in-first-out manner.
+ **/
+class LifoWorkOrderSelectionPolicy final : public WorkOrderSelectionPolicy {
+ public:
+ /**
+ * @brief Constructor.
+ **/
+ LifoWorkOrderSelectionPolicy() = default;
+
+ bool hasWorkOrder() const override {
+ return !work_orders_.empty();
+ }
+
+ void addWorkOrder(const std::size_t operator_index) override {
+ work_orders_.push(operator_index);
+ }
+
+ std::size_t getOperatorIndexForNextWorkOrder() override {
+ DCHECK(hasWorkOrder());
+ const std::size_t operator_index = work_orders_.top();
+ work_orders_.pop();
+
+ return operator_index;
+ }
+
+ private:
+ std::stack<std::size_t> work_orders_;
+
+ DISALLOW_COPY_AND_ASSIGN(LifoWorkOrderSelectionPolicy);
+};
+
+/** @} */
+
+} // namespace quickstep
+
+#endif // QUICKSTEP_QUERY_EXECUTION_WORK_ORDER_SELECTION_POLICY_HPP_
diff --git a/query_execution/WorkOrdersContainer.hpp b/query_execution/WorkOrdersContainer.hpp
index 1fb3ca6..e8d5ff8 100644
--- a/query_execution/WorkOrdersContainer.hpp
+++ b/query_execution/WorkOrdersContainer.hpp
@@ -26,6 +26,7 @@
#include <queue>
#include <vector>
+#include "query_execution/WorkOrderSelectionPolicy.hpp"
#include "relational_operators/WorkOrder.hpp"
#include "utility/Macros.hpp"
#include "utility/PtrVector.hpp"
@@ -173,6 +174,22 @@
prefer_single_NUMA_node);
}
+ WorkOrder* getNextWorkOrder(std::size_t *operator_index, bool *is_rebuild) {
+ if (rebuild_work_orders_policy_.hasWorkOrder()) {
+ *operator_index = rebuild_work_orders_policy_.getOperatorIndexForNextWorkOrder();
+ *is_rebuild = true;
+ return rebuild_workorders_[*operator_index].getWorkOrder();
+ }
+
+ if (normal_work_orders_policy_.hasWorkOrder()) {
+ *operator_index = normal_work_orders_policy_.getOperatorIndexForNextWorkOrder();
+ *is_rebuild = false;
+ return normal_workorders_[*operator_index].getWorkOrder();
+ }
+
+ return nullptr;
+ }
+
/**
* @brief Get a rebuild WorkOrder for a given operator whch prefer the
* specified NUMA node.
@@ -227,6 +244,7 @@
DCHECK(workorder != nullptr);
DCHECK_LT(operator_index, num_operators_);
normal_workorders_[operator_index].addWorkOrder(workorder);
+ normal_work_orders_policy_.addWorkOrder(operator_index);
}
/**
@@ -245,6 +263,7 @@
DCHECK(workorder != nullptr);
DCHECK_LT(operator_index, num_operators_);
rebuild_workorders_[operator_index].addWorkOrder(workorder);
+ rebuild_work_orders_policy_.addWorkOrder(operator_index);
}
/**
@@ -518,6 +537,9 @@
PtrVector<OperatorWorkOrdersContainer> normal_workorders_;
PtrVector<OperatorWorkOrdersContainer> rebuild_workorders_;
+ LifoWorkOrderSelectionPolicy normal_work_orders_policy_;
+ FifoWorkOrderSelectionPolicy rebuild_work_orders_policy_;
+
DISALLOW_COPY_AND_ASSIGN(WorkOrdersContainer);
};
/** @} */
diff --git a/query_optimizer/tests/execution_generator/Union.test b/query_optimizer/tests/execution_generator/Union.test
index 833e734..6fbe97b 100644
--- a/query_optimizer/tests/execution_generator/Union.test
+++ b/query_optimizer/tests/execution_generator/Union.test
@@ -24,20 +24,7 @@
+-----------+
|result |
+-----------+
-| -1|
| 2|
-| -3|
-| 4|
-| -5|
-| -7|
-| -9|
-| -11|
-| -13|
-| -15|
-| -17|
-| -19|
-| -21|
-| -23|
| 5|
| 0|
| 7|
@@ -51,6 +38,19 @@
| -16|
| -18|
| -20|
+| -1|
+| -3|
+| 4|
+| -5|
+| -7|
+| -9|
+| -11|
+| -13|
+| -15|
+| -17|
+| -19|
+| -21|
+| -23|
+-----------+
==
@@ -87,14 +87,14 @@
| 1|
| 1.73205078|
| 2.23606801|
-| 2|
-| 1.41421354|
-| 1|
-| 1.73205078|
-| 2.23606801|
| 2.64575124|
| 3|
| 3.31662488|
| 3.60555124|
| 3.87298346|
+| 2|
+| 1.41421354|
+| 1|
+| 1.73205078|
+| 2.23606801|
+---------------+