Updates.
diff --git a/query_execution/CMakeLists.txt b/query_execution/CMakeLists.txt
index b5e07df..78b72af 100644
--- a/query_execution/CMakeLists.txt
+++ b/query_execution/CMakeLists.txt
@@ -260,6 +260,7 @@
                       quickstep_relationaloperators_RelationalOperator
                       quickstep_storage_InsertDestination
                       quickstep_storage_StorageBlock
+                      quickstep_storage_StorageManager
                       quickstep_utility_DAG
                       quickstep_utility_Macros
                       tmb)
diff --git a/query_execution/QueryContext.cpp b/query_execution/QueryContext.cpp
index 068b208..6c5776e 100644
--- a/query_execution/QueryContext.cpp
+++ b/query_execution/QueryContext.cpp
@@ -65,6 +65,10 @@
       << "Attempted to create QueryContext from an invalid proto description:\n"
       << proto.DebugString();
 
+  for (int i = 0; i < proto.build_hash_operator_ids_size(); ++i) {
+    build_hash_operator_ids_.emplace_back(proto.build_hash_operator_ids(i));
+  }
+
   for (int i = 0; i < proto.aggregation_states_size(); ++i) {
     aggregation_states_.emplace_back(
         AggregationOperationState::ReconstructFromProto(proto.aggregation_states(i),
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 07297e3..428d526 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -135,7 +135,7 @@
   /**
    * @brief A unique identifier of a relational operator in the DAG.
    **/
-  typedef std::uint32_t dag_operator_id;
+  typedef std::uint64_t dag_operator_id;
 
   /**
    * @brief Constructor.
diff --git a/query_execution/QueryContext.proto b/query_execution/QueryContext.proto
index f8831db..a1dce0a 100644
--- a/query_execution/QueryContext.proto
+++ b/query_execution/QueryContext.proto
@@ -63,5 +63,5 @@
 
   required uint64 query_id = 13;
 
-  repeated uint32 build_hash_operator_ids = 14;
+  repeated uint64 build_hash_operator_ids = 14;
 }
diff --git a/query_execution/QueryManagerBase.hpp b/query_execution/QueryManagerBase.hpp
index a274742..77150ad 100644
--- a/query_execution/QueryManagerBase.hpp
+++ b/query_execution/QueryManagerBase.hpp
@@ -173,7 +173,7 @@
    *
    * @param index The index of the given relational operator in the DAG.
    **/
-  void markOperatorFinished(const dag_node_index index);
+  virtual void markOperatorFinished(const dag_node_index index);
 
   /**
    * @brief Check if all the dependencies of the node at specified index have
diff --git a/query_execution/QueryManagerSingleNode.cpp b/query_execution/QueryManagerSingleNode.cpp
index 71097ab..deff206 100644
--- a/query_execution/QueryManagerSingleNode.cpp
+++ b/query_execution/QueryManagerSingleNode.cpp
@@ -19,6 +19,7 @@
 
 #include "query_execution/QueryManagerSingleNode.hpp"
 
+#include <algorithm>
 #include <cstddef>
 #include <memory>
 #include <utility>
@@ -31,6 +32,7 @@
 #include "relational_operators/RelationalOperator.hpp"
 #include "storage/InsertDestination.hpp"
 #include "storage/StorageBlock.hpp"
+#include "storage/StorageManager.hpp"
 #include "utility/DAG.hpp"
 
 #include "glog/logging.h"
@@ -58,17 +60,9 @@
                                       foreman_client_id_,
                                       bus_)),
       workorders_container_(
-          new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)) {
-  // Mark the active operators in the DAG.
-  const std::size_t kNumMaxInitialActiveOperators = 2;
-  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
-    if (checkAllBlockingDependenciesMet(index) && active_operators_.size() < kNumMaxInitialActiveOperators) {
-      active_operators_.push_back(index);
-      activateOperator(index);
-    } else {
-      inactive_operators_.push_back(index);
-    }
-  }
+          new WorkOrdersContainer(num_operators_in_dag_, num_numa_nodes)),
+      estimated_memory_consumption_in_bytes_(0) {
+  updateActiveOperators();
 }
 
 WorkerMessage* QueryManagerSingleNode::getNextWorkerMessage(
@@ -77,50 +71,12 @@
   if (worker_message != nullptr) {
     return worker_message;
   }
-  // Check the operators that have finished execution.
-  std::vector<dag_node_index> execution_finished_operator_indexes;
-  for (dag_node_index active_operator_index : active_operators_) {
-    if (query_exec_state_->hasExecutionFinished(active_operator_index)) {
-      execution_finished_operator_indexes.push_back(active_operator_index);
-    }
-  }
-  // Remove the "done" operators from the list of active operators.
-  for (dag_node_index index : execution_finished_operator_indexes) {
-    auto iter =
-        std::find(active_operators_.begin(), active_operators_.end(), index);
-    DCHECK(iter != active_operators_.end());
-    active_operators_.erase(iter);
-  }
-  // Collect the list of "ready to be active" operators.
-  std::vector<dag_node_index> ready_to_be_active_operators;
-  for (dag_node_index index : inactive_operators_) {
-    if (checkAllBlockingDependenciesMet(index)) {
-      ready_to_be_active_operators.push_back(index);
-    }
-  }
-  // Move as many inactive operators to active operators, as the size of
-  // execution_finished_operator_indexes.
-  std::size_t num_operators_activated = 0;
-  if (!inactive_operators_.empty() &&
-      !execution_finished_operator_indexes.empty() &&
-      !ready_to_be_active_operators.empty()) {
-    for (; num_operators_activated <
-           std::min(execution_finished_operator_indexes.size(),
-                    inactive_operators_.size());
-         ++num_operators_activated) {
-      active_operators_.push_back(ready_to_be_active_operators[num_operators_activated]);
-      activateOperator(active_operators_.back());
-    }
-  }
-  // Remove the newly activated operators.
-  for (std::size_t i = 0; i < num_operators_activated; ++i) {
-    auto iter = std::find(inactive_operators_.begin(),
-                          inactive_operators_.end(),
-                          ready_to_be_active_operators[i]);
-    DCHECK(iter != inactive_operators_.end());
-    inactive_operators_.erase(iter);
-  }
+  // We didn't find a work order. Try activating new operators.
+  updateActiveOperators();
   worker_message = getNextWorkerMessageFromActiveOperators(numa_node);
+  if (worker_message == nullptr) {
+    std::cout << "No workorder even after activating new operators\n";
+  }
   return worker_message;
 }
 
@@ -128,36 +84,38 @@
     const numa_node_id numa_node) {
   WorkOrder *work_order = nullptr;
   for (dag_node_index index : active_operators_) {
-    if (numa_node != kAnyNUMANodeID) {
-      // First try to get a normal WorkOrder from the specified NUMA node.
-      work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
+    if (checkAllBlockingDependenciesMet(index)) {
+      if (numa_node != kAnyNUMANodeID) {
+        // First try to get a normal WorkOrder from the specified NUMA node.
+        work_order = workorders_container_->getNormalWorkOrderForNUMANode(index, numa_node);
+        if (work_order != nullptr) {
+          // A WorkOrder found on the given NUMA node.
+          query_exec_state_->incrementNumQueuedWorkOrders(index);
+          return WorkerMessage::WorkOrderMessage(work_order, index);
+        } else {
+          // Normal workorder not found on this node. Look for a rebuild workorder
+          // on this NUMA node.
+          work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
+          if (work_order != nullptr) {
+            return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
+          }
+        }
+      }
+      // Either no workorder found on the given NUMA node, or numa_node is
+      // 'kAnyNUMANodeID'.
+      // Try to get a normal WorkOrder from other NUMA nodes.
+      work_order = workorders_container_->getNormalWorkOrder(index);
       if (work_order != nullptr) {
-        // A WorkOrder found on the given NUMA node.
         query_exec_state_->incrementNumQueuedWorkOrders(index);
         return WorkerMessage::WorkOrderMessage(work_order, index);
       } else {
-        // Normal workorder not found on this node. Look for a rebuild workorder
-        // on this NUMA node.
-        work_order = workorders_container_->getRebuildWorkOrderForNUMANode(index, numa_node);
+        // Normal WorkOrder not found, look for a RebuildWorkOrder.
+        work_order = workorders_container_->getRebuildWorkOrder(index);
         if (work_order != nullptr) {
           return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
         }
       }
     }
-    // Either no workorder found on the given NUMA node, or numa_node is
-    // 'kAnyNUMANodeID'.
-    // Try to get a normal WorkOrder from other NUMA nodes.
-    work_order = workorders_container_->getNormalWorkOrder(index);
-    if (work_order != nullptr) {
-      query_exec_state_->incrementNumQueuedWorkOrders(index);
-      return WorkerMessage::WorkOrderMessage(work_order, index);
-    } else {
-      // Normal WorkOrder not found, look for a RebuildWorkOrder.
-      work_order = workorders_container_->getRebuildWorkOrder(index);
-      if (work_order != nullptr) {
-        return WorkerMessage::RebuildWorkOrderMessage(work_order, index);
-      }
-    }
   }
   return nullptr;
 }
@@ -171,6 +129,11 @@
     if (!checkAllBlockingDependenciesMet(index)) {
       return false;
     }
+    if (std::find(active_operators_.begin(), active_operators_.end(), index) == active_operators_.end()) {
+      // This operator is not yet active.
+      std::cout << "Operator " << index << " not active yet\n";
+      return false;
+    }
     const size_t num_pending_workorders_before =
         workorders_container_->getNumNormalWorkOrders(index);
     const bool done_generation =
@@ -242,7 +205,7 @@
 }
 
 void QueryManagerSingleNode::activateOperator(const dag_node_index index) {
-  DCHECK(checkAllBlockingDependenciesMet(index));
+  // DCHECK(checkAllBlockingDependenciesMet(index));
   // It is okay to call the line below multiple times.
   query_dag_->getNodePayloadMutable(index)->informAllBlockingDependenciesMet();
   query_context_->activateOperator(
@@ -250,4 +213,80 @@
   processOperator(index, false);
 }
 
+void QueryManagerSingleNode::updateActiveOperators() {
+  // Mark the active operators in the DAG.
+  for (dag_node_index index = 0; index < num_operators_in_dag_; ++index) {
+    if (!query_exec_state_->hasExecutionFinished(index)) {
+      if (!isOperatorActive(index)) {
+        // This is an inactive operator.
+        const std::size_t estimated_memory_for_operator = query_context_->getHashTableSize(index);
+        // NOTE(harshad) - We might get in trouble in the following situation:
+        // If there's a build operator in the leaf of the query plan DAG, that
+        // has huge memory requirements (greater than the threshold). That
+        // operator won't be activated thereby leading the query execution in a
+        // deadlock.
+        if (canActivateOperator(estimated_memory_for_operator)) {
+          estimated_memory_consumption_in_bytes_ += estimated_memory_for_operator;
+          active_operators_.push_back(index);
+          activateOperator(index);
+          if (estimated_memory_for_operator > 0) {
+            std::cout << "Activated operator: " << index << " Total active: " << active_operators_.size() << "\n";
+          }
+        } else {
+          std::cout << "operator: " << index << " not activated Total active: " << active_operators_.size() << "\n";
+        }
+      }
+    }
+  }
+}
+
+bool QueryManagerSingleNode::isOperatorActive(const dag_node_index index) const {
+  auto iter = std::find(active_operators_.begin(), active_operators_.end(), index);
+  return iter != active_operators_.end();
+}
+
+bool QueryManagerSingleNode::canActivateOperator(const std::size_t estimated_memory_for_operator) const {
+  const std::size_t total_memory_consumption = estimated_memory_for_operator + estimated_memory_consumption_in_bytes_;
+  const std::size_t estimated_num_slots = StorageManager::SlotsNeededForBytes(total_memory_consumption);
+  const std::size_t threshold_num_slots = kAdmissionNumSlotsThreshold * storage_manager_->getMaxBufferPoolSlots();
+  if (estimated_memory_for_operator > 0) {
+    std::cout << "Memory estimate: " << estimated_memory_for_operator;
+    std::cout << " estimated slots: " << estimated_num_slots << " threshold: " << threshold_num_slots << "\n";
+  }
+  return estimated_num_slots < threshold_num_slots;
+}
+
+void QueryManagerSingleNode::markOperatorFinished(const dag_node_index index) {
+  query_exec_state_->setExecutionFinished(index);
+
+  RelationalOperator *op = query_dag_->getNodePayloadMutable(index);
+  op->updateCatalogOnCompletion();
+
+  const relation_id output_rel = op->getOutputRelationID();
+  for (const std::pair<dag_node_index, bool> &dependent_link : query_dag_->getDependents(index)) {
+    const dag_node_index dependent_op_index = dependent_link.first;
+    RelationalOperator *dependent_op = query_dag_->getNodePayloadMutable(dependent_op_index);
+    // Signal dependent operator that current operator is done feeding input blocks.
+    if (output_rel >= 0) {
+      dependent_op->doneFeedingInputBlocks(output_rel);
+    }
+    if (checkAllBlockingDependenciesMet(dependent_op_index)) {
+      dependent_op->informAllBlockingDependenciesMet();
+    }
+  }
+  // Reduce the estimate for this operator.
+  const std::size_t estimated_memory_for_operator = query_context_->getHashTableSize(index);
+  estimated_memory_consumption_in_bytes_ -= estimated_memory_for_operator;
+  std::cout << "Operator " << index << " over, decremented: " << estimated_memory_for_operator << " bytes Total active: " << active_operators_.size();
+  std::cout << " Current slots: " << StorageManager::SlotsNeededForBytes(estimated_memory_consumption_in_bytes_) << "\n";
+  // Remove this operator from the active operators list.
+  auto iter =
+      std::find(active_operators_.begin(), active_operators_.end(), index);
+  if (iter != active_operators_.end()) {
+    active_operators_.erase(iter);
+  } else {
+    DLOG(WARNING) << "Marking operator " << index << " as finished which wasn't active";
+  }
+}
+
 }  // namespace quickstep
diff --git a/query_execution/QueryManagerSingleNode.hpp b/query_execution/QueryManagerSingleNode.hpp
index 8e4a929..9ee6a63 100644
--- a/query_execution/QueryManagerSingleNode.hpp
+++ b/query_execution/QueryManagerSingleNode.hpp
@@ -95,7 +95,11 @@
     return query_context_.get();
   }
 
+  void markOperatorFinished(const dag_node_index index) override;
+
  private:
+  static constexpr float kAdmissionNumSlotsThreshold = 0.25;
+
   bool checkNormalExecutionOver(const dag_node_index index) const override {
     return (checkAllDependenciesMet(index) &&
             !workorders_container_->hasNormalWorkOrder(index) &&
@@ -129,6 +133,12 @@
   WorkerMessage *getNextWorkerMessageFromActiveOperators(
       const numa_node_id numa_node);
 
+  bool isOperatorActive(const dag_node_index index) const;
+
+  bool canActivateOperator(const std::size_t estimated_memory_for_operator) const;
+
+  void updateActiveOperators();
+
   const tmb::client_id foreman_client_id_;
 
   StorageManager *storage_manager_;
@@ -142,6 +152,8 @@
 
   std::vector<dag_node_index> inactive_operators_;
 
+  std::size_t estimated_memory_consumption_in_bytes_;
+
   DISALLOW_COPY_AND_ASSIGN(QueryManagerSingleNode);
 };
 
diff --git a/relational_operators/SortMergeRunOperator.cpp b/relational_operators/SortMergeRunOperator.cpp
index 7de17ab..0ba5115 100644
--- a/relational_operators/SortMergeRunOperator.cpp
+++ b/relational_operators/SortMergeRunOperator.cpp
@@ -50,50 +50,13 @@
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  if (input_relation_is_stored_) {
-    // Input blocks (or runs) are from base relation. Only possible when base
-    // relation is stored sorted.
-    if (!started_) {
-      // Initialize merge tree completely, since all input runs are known.
-      merge_tree_.initializeTree(input_relation_block_ids_.size());
-      started_ = true;
-      initializeInputRuns();
-    }
-  } else {
-    // Input blocks (or runs) are pipelined from the sorted run generation
-    // operator.
-    if (!started_ && !input_stream_done_) {
-      // Initialize merge tree for first pipeline mode.
-      merge_tree_.initializeForPipeline();
-      started_ = true;
-      initializeInputRuns();
-    }
-  }
+  initializeOperatorState();
   // Generate runs from merge tree.
   return generateWorkOrders(container, query_context, storage_manager, scheduler_client_id, bus);
 }
 
 bool SortMergeRunOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  if (input_relation_is_stored_) {
-    // Input blocks (or runs) are from base relation. Only possible when base
-    // relation is stored sorted.
-    if (!started_) {
-      // Initialize merge tree completely, since all input runs are known.
-      merge_tree_.initializeTree(input_relation_block_ids_.size());
-      started_ = true;
-      initializeInputRuns();
-    }
-  } else {
-    // Input blocks (or runs) are pipelined from the sorted run generation
-    // operator.
-    if (!started_ && !input_stream_done_) {
-      // Initialize merge tree for first pipeline mode.
-      merge_tree_.initializeForPipeline();
-      started_ = true;
-      initializeInputRuns();
-    }
-  }
-
+  initializeOperatorState();
   // Get merge jobs from merge tree.
   std::vector<MergeTree::MergeJob> jobs;
   const bool done_generating = merge_tree_.getMergeJobs(&jobs);
@@ -216,6 +179,8 @@
     return;
   }
 
+  initializeOperatorState();
+
   // Now we know all the input blocks; compute the merge tree.
   merge_tree_.initializeTree(input_relation_block_ids_.size());
 
@@ -227,6 +192,28 @@
   merge_tree_.checkAndFixFinalMerge();
 }
 
+void SortMergeRunOperator::initializeOperatorState() {
+  if (input_relation_is_stored_) {
+    // Input blocks (or runs) are from base relation. Only possible when base
+    // relation is stored sorted.
+    if (!started_) {
+      // Initialize merge tree completely, since all input runs are known.
+      merge_tree_.initializeTree(input_relation_block_ids_.size());
+      started_ = true;
+      initializeInputRuns();
+    }
+  } else {
+    // Input blocks (or runs) are pipelined from the sorted run generation
+    // operator.
+    if (!started_ && !input_stream_done_) {
+      // Initialize merge tree for first pipeline mode.
+      merge_tree_.initializeForPipeline();
+      started_ = true;
+      initializeInputRuns();
+    }
+  }
+}
+
 namespace {
 
 /**
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index 0839320..8674fb9 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -201,6 +201,15 @@
    **/
   serialization::WorkOrder* createWorkOrderProto(merge_run_operator::MergeTree::MergeJob *job);
 
+  /**
+   * @brief Initialize the operator's internal state.
+   *
+   * @note This function can be called a number of times, but the changes will
+   *       only be in effect because of the first call. Subsequent calls will
+   *       not do anything.
+   **/
+  void initializeOperatorState();
+
   const CatalogRelation &input_relation_;
 
   const CatalogRelation &output_relation_;
diff --git a/storage/HashTableKeyManager.hpp b/storage/HashTableKeyManager.hpp
index c902cbe..8852b00 100644
--- a/storage/HashTableKeyManager.hpp
+++ b/storage/HashTableKeyManager.hpp
@@ -78,7 +78,7 @@
       const std::size_t key_start_in_bucket,
       std::vector<std::size_t> *key_offsets) {
     DCHECK(!key_types.empty());
-    DCHECK_NOTNULL(key_offsets);
+    DCHECK(key_offsets != nullptr);
     std::size_t fixed_key_size = 0;
 
     for (const Type *subkey_type : key_types) {