Assign equal probability to all the active operators.
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index 04a0348..50fd14b 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -65,6 +65,7 @@
quickstep_threading_ThreadUtil
quickstep_utility_DAG
quickstep_utility_Macros
+ quickstep_utility_StringUtil
tmb)
target_link_libraries(quickstep_queryexecution_ForemanLite
glog
diff --git a/query_execution/Foreman.cpp b/query_execution/Foreman.cpp
index 304c429..35eceea 100644
--- a/query_execution/Foreman.cpp
+++ b/query_execution/Foreman.cpp
@@ -17,6 +17,7 @@
#include "query_execution/Foreman.hpp"
+#include <algorithm>
#include <cstddef>
#include <memory>
#include <utility>
@@ -317,6 +318,13 @@
}
}
}
+ // Loop again to populate the active_operators_ vector.
+ for (dag_node_index node_index = 0; node_index < dag_size; ++node_index) {
+ if (checkAllBlockingDependenciesMet(node_index)) {
+ // Add to active_operators_.
+ active_operators_.emplace_back(node_index);
+ }
+ }
}
// TODO(harshad) : The default policy may execute remote WorkOrders for an
@@ -326,8 +334,48 @@
WorkerMessage* Foreman::getNextWorkerMessage(
const dag_node_index start_operator_index, const int numa_node) {
// Default policy: Operator with lowest index first.
+ updateProbabilities();
+ // NOTE: Only a single attempt is made to pick an operator; the retry loop below is commented out.
WorkOrder *work_order = nullptr;
- size_t num_operators_checked = 0;
+ //while (work_order == nullptr) {
+ // std::cout << "Look for workorder active operator count: " << operator_probabilities_.size() << "\n";
+ int next_operator_index = chooseOperator();
+ if (next_operator_index == -1) {
+ return nullptr;
+ }
+ if (numa_node != -1) {
+ work_order = workorders_container_->getNormalWorkOrderForNUMANode(next_operator_index, numa_node);
+ if (work_order != nullptr) {
+ // A WorkOrder found on the given NUMA node.
+ query_exec_state_->incrementNumQueuedWorkOrders(next_operator_index);
+ return WorkerMessage::WorkOrderMessage(work_order, next_operator_index);
+ } else {
+ // Normal workorder not found on this node. Look for a rebuild workorder
+ // on this NUMA node.
+ work_order = workorders_container_->getRebuildWorkOrderForNUMANode(next_operator_index, numa_node);
+ if (work_order != nullptr) {
+ return WorkerMessage::RebuildWorkOrderMessage(work_order, next_operator_index);
+ }
+ }
+ }
+ // Either no workorder found on the given NUMA node, or numa_node is -1.
+ // Try to get a normal WorkOrder from other NUMA nodes.
+ work_order = workorders_container_->getNormalWorkOrder(next_operator_index);
+ if (work_order != nullptr) {
+ /*if (!hasOperatorStarted(next_operator_index)) {
+ operator_start_timestamp_[next_operator_index] = std::chrono::steady_clock::now();
+ }*/
+ query_exec_state_->incrementNumQueuedWorkOrders(next_operator_index);
+ return WorkerMessage::WorkOrderMessage(work_order, next_operator_index);
+ } else {
+ // Normal WorkOrder not found, look for a RebuildWorkOrder.
+ work_order = workorders_container_->getRebuildWorkOrder(next_operator_index);
+ if (work_order != nullptr) {
+ return WorkerMessage::RebuildWorkOrderMessage(work_order, next_operator_index);
+ }
+ }
+ // }
+ /*size_t num_operators_checked = 0;
for (dag_node_index index = start_operator_index;
num_operators_checked < query_dag_->size();
index = (index + 1) % query_dag_->size(), ++num_operators_checked) {
@@ -354,6 +402,9 @@
// Try to get a normal WorkOrder from other NUMA nodes.
work_order = workorders_container_->getNormalWorkOrder(index);
if (work_order != nullptr) {
+ if (!hasOperatorStarted(index)) {
+ operator_start_timestamp_[index] = std::chrono::steady_clock::now();
+ }
query_exec_state_->incrementNumQueuedWorkOrders(index);
return WorkerMessage::WorkOrderMessage(work_order, index);
} else {
@@ -363,7 +414,7 @@
return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
}
}
- }
+ }*/
// No WorkOrders available right now.
return nullptr;
}
@@ -423,6 +474,13 @@
(num_pending_workorders_before <
workorders_container_->getNumNormalWorkOrders(index));
}
+ if (generated_new_workorders) {
+ if (std::find(active_operators_.begin(), active_operators_.end(), index) == active_operators_.end()) {
+ // std::cout << "Added operator " << index << " in processOperator()\n";
+ active_operators_.emplace_back(index);
+ updateProbabilities();
+ }
+ }
return generated_new_workorders;
}
@@ -467,6 +525,14 @@
}
void Foreman::markOperatorFinished(const dag_node_index index) {
+ // std::cout << "Operator " << index << "finished\n";
+ // operator_duration_[index] = std::chrono::steady_clock::now() - operator_start_timestamp_[index];
+ // Remove this operator.
+ active_operators_.erase(std::remove(active_operators_.begin(), active_operators_.end(), index), active_operators_.end());
+ // Add to finished operators.
+ // finished_operators_.emplace_back(index);
+ updateProbabilities();
+
query_exec_state_->setExecutionFinished(index);
RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
@@ -496,6 +562,11 @@
query_exec_state_->setRebuildStatus(
index, workorders_container_->getNumRebuildWorkOrders(index), true);
+ if (query_exec_state_->getNumRebuildWorkOrders(index) > 0) {
+ if (std::find(active_operators_.begin(), active_operators_.end(), index) == active_operators_.end()) {
+ active_operators_.emplace_back(index);
+ }
+ }
return (query_exec_state_->getNumRebuildWorkOrders(index) == 0);
}
@@ -528,4 +599,87 @@
}
}
+// Rebuilds operator_probabilities_ so that every schedulable operator gets an
+// equal share of the [0, 1] interval.
+//
+// An operator is schedulable when it is listed in active_operators_ and
+// checkAllBlockingDependenciesMet() returns true for it. Each entry of
+// operator_probabilities_ is a (cumulative upper bound, operator index) pair;
+// the entries are stored in increasing order of the bound, which is what the
+// binary search in chooseOperator() relies on. When nothing is schedulable,
+// operator_probabilities_ is left empty.
+void Foreman::updateProbabilities() {
+  // Collect the active operators whose blocking dependencies are all met.
+  std::vector<dag_node_index> schedulable_operators;
+  schedulable_operators.reserve(active_operators_.size());
+  for (const dag_node_index active_op_index : active_operators_) {
+    if (checkAllBlockingDependenciesMet(active_op_index)) {
+      schedulable_operators.emplace_back(active_op_index);
+    }
+  }
+
+  // Sort and de-duplicate so that each operator occupies exactly one slot and
+  // the slot order does not depend on hash-table iteration order.
+  std::sort(schedulable_operators.begin(), schedulable_operators.end());
+  schedulable_operators.erase(
+      std::unique(schedulable_operators.begin(), schedulable_operators.end()),
+      schedulable_operators.end());
+
+  std::vector<std::pair<double, dag_node_index>> operator_probabilities;
+  operator_probabilities.reserve(schedulable_operators.size());
+
+  // The k-th of n operators (1-based) gets the cumulative bound k / n.
+  // Computing each bound directly, instead of summing 1 / n repeatedly, avoids
+  // accumulated rounding error and makes the last bound exactly 1.0 (n / n).
+  const double num_schedulable =
+      static_cast<double>(schedulable_operators.size());
+  for (std::size_t slot = 0; slot < schedulable_operators.size(); ++slot) {
+    operator_probabilities.emplace_back(
+        static_cast<double>(slot + 1) / num_schedulable,
+        schedulable_operators[slot]);
+  }
+
+  operator_probabilities_.swap(operator_probabilities);
+}
+
+// Picks the operator from which the next WorkOrder should be fetched.
+//
+// Returns the chosen operator's index, or -1 when operator_probabilities_ is
+// empty, or when its only entry currently has neither a normal nor a rebuild
+// WorkOrder. With more than one candidate the operator is drawn at random
+// according to the cumulative bounds in operator_probabilities_; in that case
+// the chosen operator is NOT checked for available WorkOrders, so the caller
+// must handle coming back empty-handed.
+int Foreman::chooseOperator() {
+  if (operator_probabilities_.empty()) {
+    return -1;
+  }
+
+  if (operator_probabilities_.size() == 1u) {
+    // Single candidate: no random draw needed, just check that it has work.
+    const int operator_index =
+        static_cast<int>(operator_probabilities_.front().second);
+    const bool has_workorder =
+        workorders_container_->hasNormalWorkOrder(operator_index) ||
+        workorders_container_->hasRebuildWorkOrder(operator_index);
+    return has_workorder ? operator_index : -1;
+  }
+
+  std::uniform_real_distribution<double> dist(0.0, 1.0);
+  const double chosen_probability = dist(mt_);
+
+  // Find the first slot whose cumulative bound is strictly greater than the
+  // drawn value. Only the bound is compared, so a draw that lands exactly on
+  // a bound is treated the same way for every operator index.
+  const auto chosen_operator_it = std::upper_bound(
+      operator_probabilities_.begin(),
+      operator_probabilities_.end(),
+      chosen_probability,
+      [](const double probability,
+         const std::pair<double, dag_node_index> &slot) {
+        return probability < slot.first;
+      });
+
+  if (chosen_operator_it == operator_probabilities_.end()) {
+    // The draw was not below any bound. This should not happen for a draw in
+    // [0, 1) against a final bound of 1.0, but some standard library
+    // implementations have been reported to occasionally return the upper
+    // limit itself. Fall back to the last slot instead of dereferencing end().
+    return static_cast<int>(operator_probabilities_.back().second);
+  }
+  return static_cast<int>(chosen_operator_it->second);
+}
+
} // namespace quickstep
diff --git a/query_execution/Foreman.hpp b/query_execution/Foreman.hpp
index 2d6e0d3..39852f8 100644
--- a/query_execution/Foreman.hpp
+++ b/query_execution/Foreman.hpp
@@ -18,8 +18,11 @@
#ifndef QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
#define QUICKSTEP_QUERY_EXECUTION_FOREMAN_HPP_
+#include <chrono>
#include <cstddef>
#include <memory>
+#include <random>
+#include <unordered_map>
#include <vector>
#include "catalog/CatalogTypedefs.hpp"
@@ -34,6 +37,7 @@
#include "storage/StorageBlockInfo.hpp"
#include "utility/DAG.hpp"
#include "utility/Macros.hpp"
+#include "utility/StringUtil.hpp"
#include "glog/logging.h"
#include "gtest/gtest_prod.h"
@@ -80,7 +84,8 @@
catalog_database_(DCHECK_NOTNULL(catalog_database)),
storage_manager_(DCHECK_NOTNULL(storage_manager)),
max_msgs_per_worker_(1),
- num_numa_nodes_(num_numa_nodes) {
+ num_numa_nodes_(num_numa_nodes),
+ mt_(std::random_device()()) {
bus_->RegisterClientAsSender(foreman_client_id_, kWorkOrderMessage);
bus_->RegisterClientAsSender(foreman_client_id_, kRebuildWorkOrderMessage);
// NOTE : Foreman thread sends poison messages in the optimizer's
@@ -271,6 +276,9 @@
void cleanUp() {
output_consumers_.clear();
blocking_dependencies_.clear();
+ /*for (auto it = operator_duration_.begin(); it != operator_duration_.end(); ++it) {
+ std::cout << "Op: " << it->first << " Time: " << DoubleToStringWithSignificantDigits(it->second.count(), 3) << "\n";
+ }*/
}
/**
@@ -429,6 +437,17 @@
**/
void getRebuildWorkOrders(const dag_node_index index, WorkOrdersContainer *container);
+ /*bool hasOperatorStarted(const dag_node_index index) const {
+ if (operator_start_timestamp_.find(index) != operator_start_timestamp_.end()) {
+ return true;
+ }
+ return false;
+ }*/
+
+ void updateProbabilities();
+
+ int chooseOperator();
+
CatalogDatabaseLite *catalog_database_;
StorageManager *storage_manager_;
@@ -454,6 +473,19 @@
WorkerDirectory *workers_;
+ // A vector of IDs of the schedulable operators.
+ // Note, for simplicity, in this list there could be operators for which no
+ // work order has been scheduled yet.
+ std::vector<dag_node_index> active_operators_;
+ // Operators which have finished the execution.
+ // std::vector<dag_node_index> finished_operators_;
+
+ std::vector<std::pair<double, dag_node_index>> operator_probabilities_;
+
+ /*std::unordered_map<dag_node_index, std::chrono::duration<double, std::milli>> operator_duration_;
+ std::unordered_map<dag_node_index, std::chrono::time_point<std::chrono::steady_clock>> operator_start_timestamp_;*/
+
+ std::mt19937_64 mt_;
friend class ForemanTest;
FRIEND_TEST(ForemanTest, TwoNodesDAGPartiallyFilledBlocksTest);