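Add memory-estimate-based operator activation to QueryManagerSingleNode and factor out SortMergeRunOperator state initialization.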
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b5e07df..78b72af 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -260,6 +260,7 @@
quickstep_relationaloperators_RelationalOperator
quickstep_storage_InsertDestination
quickstep_storage_StorageBlock
+ quickstep_storage_StorageManager
quickstep_utility_DAG
quickstep_utility_Macros
tmb)
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 068b208..6c5776e 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -65,6 +65,10 @@
<< "Attempted to create QueryContext from an invalid proto description:\n"
<< proto.DebugString();
+ for (int i = 0; i < proto.build_hash_operator_ids_size(); ++i) {
+ build_hash_operator_ids_.emplace_back(proto.build_hash_operator_ids(i));
+ }
+
for (int i = 0; i < proto.aggregation_states_size(); ++i) {
aggregation_states_.emplace_back(
AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i),
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 07297e3..428d526 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -135,7 +135,7 @@
/**
* @brief A unique identifier of a relational operator in the DAG.
**/
- typedef std::uint32_t dag_operator_id;
+ typedef std::uint64_t dag_operator_id;
/**
* @brief Constructor.
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index f8831db..a1dce0a 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -63,5 +63,5 @@
required uint64 query_id = 13;
- repeated uint32 build_hash_operator_ids = 14;
+ repeated uint64 build_hash_operator_ids = 14;
}
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index a274742..77150ad 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -173,7 +173,7 @@
*
* @param index The index of the given relational operator in the DAG.
**/
- void markOperatorFinished(const dag_node_index index);
+ virtual void markOperatorFinished(const dag_node_index index);
/**
* @brief Check if all the dependencies of the node at specified index have
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 71097ab..deff206 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -19,6 +19,7 @@
#include "query_execution/QueryManagerSingleNode.hpp"
+#include <algorithm>
#include <cstddef>
#include <memory>
#include <utility>
@@ -31,6 +32,7 @@
#include "relational_operators/RelationalOperator.hpp"
#include "storage/InsertDestination.hpp"
#include "storage/StorageBlock.hpp"
+#include "storage/StorageManager.hpp"
#include "utility/DAG.hpp"
#include "glog/logging.h"
@@ -58,17 +60,9 @@
foreman_client_id_,
bus_)),
workorders_container_(
- new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
- // Mark the active operators in the DAG.
- const std::size_t kNumMaxInitialActiveOperators = 2;
- for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
- if (checkAllBlockingDependenciesMet(index) && active_operators_.size() < kNumMaxInitialActiveOperators) {
- active_operators_.push_back(index);
- activateOperator(index);
- } else {
- inactive_operators_.push_back(index);
- }
- }
+ new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)),
+ estimated_memory_consumption_in_bytes_(0) {
+ updateActiveOperators();
}
WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
@@ -77,50 +71,12 @@
if (worker_message != nullptr) {
return worker_message;
}
- // Check the operators that have finished execution.
- std::vector<dag_node_index> execution_finished_operator_indexes;
- for (dag_node_index active_operator_index : active_operators_) {
- if (query_exec_state_->hasExecutionFinished(active_operator_index)) {
- execution_finished_operator_indexes.push_back(active_operator_index);
- }
- }
- // Remove the "done" operators from the list of active operators.
- for (dag_node_index index : execution_finished_operator_indexes) {
- auto iter =
- std::find(active_operators_.begin(), active_operators_.end(), index);
- DCHECK(iter != active_operators_.end());
- active_operators_.erase(iter);
- }
- // Collect the list of "ready to be active" operators.
- std::vector<dag_node_index> ready_to_be_active_operators;
- for (dag_node_index index : inactive_operators_) {
- if (checkAllBlockingDependenciesMet(index)) {
- ready_to_be_active_operators.push_back(index);
- }
- }
- // Move as many inactive operators to active operators, as the size of
- // execution_finished_operator_indexes.
- std::size_t num_operators_activated = 0;
- if (!inactive_operators_.empty() &&
- !execution_finished_operator_indexes.empty() &&
- !ready_to_be_active_operators.empty()) {
- for (; num_operators_activated <
- std::min(execution_finished_operator_indexes.size(),
- inactive_operators_.size());
- ++num_operators_activated) {
- active_operators_.push_back(ready_to_be_active_operators[num_operators_activated]);
- activateOperator(active_operators_.back());
- }
- }
- // Remove the newly activated operators.
- for (std::size_t i = 0; i < num_operators_activated; ++i) {
- auto iter = std::find(inactive_operators_.begin(),
- inactive_operators_.end(),
- ready_to_be_active_operators[i]);
- DCHECK(iter != inactive_operators_.end());
- inactive_operators_.erase(iter);
- }
+ // We didn't find a work order. Try activating new operators.
+ updateActiveOperators();
worker_message = getNextWorkerMessageFromActiveOperators(numa_node);
+ if (worker_message == nullptr) {
+ std::cout << "No workorder even after activating new operators\n";
+ }
return worker_message;
}
@@ -128,36 +84,38 @@
const numa_node_id numa_node) {
WorkOrder *work_order = nullptr;
for (dag_node_index index : active_operators_) {
- if (numa_node != kAnyNUMANodeID) {
- // First try to get a normal WorkOrder from the specified NUMA node.
- work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
+ if (checkAllBlockingDependenciesMet(index)) {
+ if (numa_node != kAnyNUMANodeID) {
+ // First try to get a normal WorkOrder from the specified NUMA node.
+ work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
+ if (work_order != nullptr) {
+ // A WorkOrder found on the given NUMA node.
+ query_exec_state_->incrementNumQueuedWorkOrders(index);
+ return WorkerMessage::WorkOrderMessage(work_order, index);
+ } else {
+ // Normal workorder not found on this node. Look for a rebuild workorder
+ // on this NUMA node.
+ work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
+ if (work_order != nullptr) {
+ return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
+ }
+ }
+ }
+ // Either no workorder found on the given NUMA node, or numa_node is
+ // 'kAnyNUMANodeID'.
+ // Try to get a normal WorkOrder from other NUMA nodes.
+ work_order = workorders_container_->getNormalWorkOrder(index);
if (work_order != nullptr) {
- // A WorkOrder found on the given NUMA node.
query_exec_state_->incrementNumQueuedWorkOrders(index);
return WorkerMessage::WorkOrderMessage(work_order, index);
} else {
- // Normal workorder not found on this node. Look for a rebuild workorder
- // on this NUMA node.
- work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
+ // Normal WorkOrder not found, look for a RebuildWorkOrder.
+ work_order = workorders_container_->getRebuildWorkOrder(index);
if (work_order != nullptr) {
return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
}
}
}
- // Either no workorder found on the given NUMA node, or numa_node is
- // 'kAnyNUMANodeID'.
- // Try to get a normal WorkOrder from other NUMA nodes.
- work_order = workorders_container_->getNormalWorkOrder(index);
- if (work_order != nullptr) {
- query_exec_state_->incrementNumQueuedWorkOrders(index);
- return WorkerMessage::WorkOrderMessage(work_order, index);
- } else {
- // Normal WorkOrder not found, look for a RebuildWorkOrder.
- work_order = workorders_container_->getRebuildWorkOrder(index);
- if (work_order != nullptr) {
- return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
- }
- }
}
return nullptr;
}
@@ -171,6 +129,11 @@
if (!checkAllBlockingDependenciesMet(index)) {
return false;
}
+ if (std::find(active_operators_.begin(), active_operators_.end(), index) == active_operators_.end()) {
+ // This operator is not yet active.
+ std::cout << "Operator " << index << " not active yet\n";
+ return false;
+ }
const size_t num_pending_workorders_before =
workorders_container_->getNumNormalWorkOrders(index);
const bool done_generation =
@@ -242,7 +205,7 @@
}
void QueryManagerSingleNode::activateOperator(const dag_node_index index) {
- DCHECK(checkAllBlockingDependenciesMet(index));
+ // DCHECK(checkAllBlockingDependenciesMet(index));
// It is okay to call the line below multiple times.
query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
query_context_->activateOperator(
@@ -250,4 +213,80 @@
processOperator(index, false);
}
+// Activates every unfinished, not-yet-active operator that passes the memory admission check.
+void QueryManagerSingleNode::updateActiveOperators() {
+  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
+    if (query_exec_state_->hasExecutionFinished(index) || isOperatorActive(index)) {
+      continue;
+    }
+    const std::size_t estimated_memory_for_operator = query_context_->getHashTableSize(index);
+    const bool within_threshold = canActivateOperator(estimated_memory_for_operator);
+    // An operator whose estimate alone exceeds the threshold can never pass the
+    // check and would stall the query, so it is admitted once nothing else is
+    // charged. Operators with a zero estimate add nothing to the total and are
+    // always admitted, so an oversized operator cannot hold them back.
+    const bool must_admit =
+        estimated_memory_for_operator == 0 || estimated_memory_consumption_in_bytes_ == 0;
+    if (!within_threshold && !must_admit) {
+      DLOG(INFO) << "operator: " << index << " not activated Total active: " << active_operators_.size();
+      continue;
+    }
+    estimated_memory_consumption_in_bytes_ += estimated_memory_for_operator;
+    active_operators_.push_back(index);
+    activateOperator(index);
+    if (estimated_memory_for_operator > 0) {
+      DLOG(INFO) << "Activated operator: " << index << " Total active: " << active_operators_.size();
+    }
+  }
+}
+
+// Returns true iff |index| is currently listed in active_operators_.
+bool QueryManagerSingleNode::isOperatorActive(const dag_node_index index) const {
+  return std::find(active_operators_.begin(), active_operators_.end(), index) != active_operators_.end();
+}
+
+// Returns true if charging |estimated_memory_for_operator| on top of the current
+// estimate keeps the slot count strictly below the admission threshold.
+bool QueryManagerSingleNode::canActivateOperator(const std::size_t estimated_memory_for_operator) const {
+  const std::size_t total_memory_consumption = estimated_memory_for_operator + estimated_memory_consumption_in_bytes_;
+  const std::size_t estimated_num_slots = StorageManager::SlotsNeededForBytes(total_memory_consumption);
+  const std::size_t threshold_num_slots = kAdmissionNumSlotsThreshold * storage_manager_->getMaxBufferPoolSlots();
+  DLOG_IF(INFO, estimated_memory_for_operator > 0) << "Memory estimate: " << estimated_memory_for_operator
+      << " estimated slots: " << estimated_num_slots << " threshold: " << threshold_num_slots;
+  return estimated_num_slots < threshold_num_slots;
+}
+
+// Marks operator |index| as finished, signals its dependents, and releases its memory estimate.
+void QueryManagerSingleNode::markOperatorFinished(const dag_node_index index) {
+  query_exec_state_->setExecutionFinished(index);
+  RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
+  op->updateCatalogOnCompletion();
+  const relation_id output_rel = op->getOutputRelationID();
+  for (const std::pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
+    const dag_node_index dependent_op_index = dependent_link.first;
+    RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
+    // Signal dependent operator that current operator is done feeding input blocks.
+    if (output_rel >= 0) {
+      dependent_op->doneFeedingInputBlocks(output_rel);
+    }
+    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+      dependent_op->informAllBlockingDependenciesMet();
+    }
+  }
+  auto iter = std::find(active_operators_.begin(), active_operators_.end(), index);
+  if (iter == active_operators_.end()) {
+    // Never activated, so no memory was charged for it; there is nothing to release.
+    DLOG(WARNING) << "Marking operator " << index << " as finished which wasn't active";
+    return;
+  }
+  // Release this operator's charge, clamped so the unsigned total cannot wrap around.
+  const std::size_t estimated_memory_for_operator = query_context_->getHashTableSize(index);
+  estimated_memory_consumption_in_bytes_ -=
+      std::min(estimated_memory_for_operator, estimated_memory_consumption_in_bytes_);
+  DLOG(INFO) << "Operator " << index << " over, decremented: " << estimated_memory_for_operator
+             << " bytes Total active: " << active_operators_.size() << " Current slots: "
+             << StorageManager::SlotsNeededForBytes(estimated_memory_consumption_in_bytes_);
+  active_operators_.erase(iter);
+}
+
} // namespace quickstep
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index 8e4a929..9ee6a63 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -95,7 +95,11 @@
return query_context_.get();
}
+ void markOperatorFinished(const dag_node_index index) override;
+
private:
+ static constexpr float kAdmissionNumSlotsThreshold = 0.25;
+
bool checkNormalExecutionOver(const dag_node_index index) const override {
return (checkAllDependenciesMet(index) &&
!workorders_container_->hasNormalWorkOrder(index) &&
@@ -129,6 +133,12 @@
WorkerMessage *getNextWorkerMessageFromActiveOperators(
const numa_node_id numa_node);
+ bool isOperatorActive(const dag_node_index index) const;
+
+ bool canActivateOperator(const std::size_t estimated_memory_for_operator) const;
+
+ void updateActiveOperators();
+
const tmb::client_id foreman_client_id_;
StorageManager *storage_manager_;
@@ -142,6 +152,8 @@
std::vector<dag_node_index> inactive_operators_;
+ std::size_t estimated_memory_consumption_in_bytes_;
+
DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
};
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 7de17ab..0ba5115 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -50,50 +50,13 @@
StorageManager *storage_manager,
const tmb::client_id scheduler_client_id,
tmb::MessageBus *bus) {
- if (input_relation_is_stored_) {
- // Input blocks (or runs) are from base relation. Only possible when base
- // relation is stored sorted.
- if (!started_) {
- // Initialize merge tree completely, since all input runs are known.
- merge_tree_.initializeTree(input_relation_block_ids_.size());
- started_ = true;
- initializeInputRuns();
- }
- } else {
- // Input blocks (or runs) are pipelined from the sorted run generation
- // operator.
- if (!started_ && !input_stream_done_) {
- // Initialize merge tree for first pipeline mode.
- merge_tree_.initializeForPipeline();
- started_ = true;
- initializeInputRuns();
- }
- }
+ initializeOperatorState();
// Generate runs from merge tree.
return generateWorkOrders(container, query_context, storage_manager, scheduler_client_id, bus);
}
bool SortMergeRunOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
- if (input_relation_is_stored_) {
- // Input blocks (or runs) are from base relation. Only possible when base
- // relation is stored sorted.
- if (!started_) {
- // Initialize merge tree completely, since all input runs are known.
- merge_tree_.initializeTree(input_relation_block_ids_.size());
- started_ = true;
- initializeInputRuns();
- }
- } else {
- // Input blocks (or runs) are pipelined from the sorted run generation
- // operator.
- if (!started_ && !input_stream_done_) {
- // Initialize merge tree for first pipeline mode.
- merge_tree_.initializeForPipeline();
- started_ = true;
- initializeInputRuns();
- }
- }
-
+ initializeOperatorState();
// Get merge jobs from merge tree.
std::vector<MergeTree::MergeJob> jobs;
const bool done_generating = merge_tree_.getMergeJobs(&jobs);
@@ -216,6 +179,8 @@
return;
}
+ initializeOperatorState();
+
// Now we know all the input blocks; compute the merge tree.
merge_tree_.initializeTree(input_relation_block_ids_.size());
@@ -227,6 +192,28 @@
merge_tree_.checkAndFixFinalMerge();
}
+// One-time setup of the merge tree and the input runs; does nothing once started_ is set.
+void SortMergeRunOperator::initializeOperatorState() {
+  if (started_) {
+    // An earlier call already performed the initialization.
+    return;
+  }
+  if (input_relation_is_stored_) {
+    // Input blocks (or runs) come from a base relation that is stored sorted,
+    // so all input runs are known and the merge tree is initialized completely.
+    merge_tree_.initializeTree(input_relation_block_ids_.size());
+  } else if (!input_stream_done_) {
+    // Input blocks (or runs) are pipelined from the sorted run generation
+    // operator: initialize the merge tree for pipeline mode.
+    merge_tree_.initializeForPipeline();
+  } else {
+    // Pipelined input whose stream is already done: leave the state untouched.
+    return;
+  }
+  started_ = true;
+  initializeInputRuns();
+}
+
namespace {
/**
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index 0839320..8674fb9 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -201,6 +201,15 @@
**/
serialization::WorkOrder* createWorkOrderProto(merge_run_operator::MergeTree::MergeJob *job);
+ /**
+ * @brief Initialize the operator's internal state.
+ *
+ * @note Safe to call repeatedly: once a call has performed the
+ * initialization, subsequent calls return without changing
+ * anything.
+ **/
+ void initializeOperatorState();
+
const CatalogRelation &input_relation_;
const CatalogRelation &output_relation_;
diff --git a/storage/HashTableKeyManager.hpp b/storage/HashTableKeyManager.hpp
index c902cbe..8852b00 100644
--- a/storage/HashTableKeyManager.hpp
+++ b/storage/HashTableKeyManager.hpp
@@ -78,7 +78,7 @@
const std::size_t key_start_in_bucket,
std::vector<std::size_t> *key_offsets) {
DCHECK(!key_types.empty());
- DCHECK_NOTNULL(key_offsets);
+ DCHECK(key_offsets != nullptr);
std::size_t fixed_key_size = 0;
for (const Type *subkey_type : key_types) {