Added num_partitions in RelationalOperator.
diff --git a/catalog/CMakeLists.txt b/catalog/CMakeLists.txt
index 0ca014e..1aa2b41 100644
--- a/catalog/CMakeLists.txt
+++ b/catalog/CMakeLists.txt
@@ -108,6 +108,7 @@
                       quickstep_catalog_Catalog_proto
                       quickstep_catalog_IndexScheme
                       quickstep_catalog_PartitionScheme
+                      quickstep_catalog_PartitionSchemeHeader
                       quickstep_storage_StorageBlockInfo
                       quickstep_storage_StorageBlockLayout
                       quickstep_storage_StorageBlockLayout_proto
diff --git a/catalog/CatalogRelation.cpp b/catalog/CatalogRelation.cpp
index 793fc2d..38c86f6 100644
--- a/catalog/CatalogRelation.cpp
+++ b/catalog/CatalogRelation.cpp
@@ -33,6 +33,7 @@
 #endif
 
 #include "catalog/PartitionScheme.hpp"
+#include "catalog/PartitionSchemeHeader.hpp"
 #include "storage/StorageBlockInfo.hpp"
 #include "storage/StorageBlockLayout.hpp"
 #include "storage/StorageBlockLayout.pb.h"
@@ -180,6 +181,10 @@
 void CatalogRelation::setPartitionScheme(PartitionScheme* partition_scheme) {
   DCHECK_EQ(0u, size_blocks());
   partition_scheme_.reset(partition_scheme);
+
+  if (partition_scheme_) {
+    num_partitions_ = partition_scheme_->getPartitionSchemeHeader().getNumPartitions();
+  }
 }
 
 void CatalogRelation::setDefaultStorageBlockLayout(StorageBlockLayout *default_layout) {
diff --git a/catalog/CatalogRelation.hpp b/catalog/CatalogRelation.hpp
index af7f273..e40b599 100644
--- a/catalog/CatalogRelation.hpp
+++ b/catalog/CatalogRelation.hpp
@@ -81,6 +81,7 @@
                   bool temporary = false)
       : CatalogRelationSchema(parent, name, id, temporary),
         default_layout_(nullptr),
+        num_partitions_(1u),
         statistics_(new CatalogRelationStatistics()) {
   }
 
@@ -138,6 +139,15 @@
   }
 
   /**
+   * @brief Get the number of partitions for the relation.
+   *
+   * @return The number of partitions the relation is partitioned into.
+   **/
+  std::size_t getNumPartitions() const {
+    return num_partitions_;
+  }
+
+  /**
    * @brief Check if a NUMA placement scheme is available for the relation.
    *
    * @return True if the relation has a NUMA placement scheme, false otherwise.
@@ -423,6 +433,7 @@
   // A relation may or may not have a Partition Scheme
   // assosiated with it.
   std::unique_ptr<PartitionScheme> partition_scheme_;
+  std::size_t num_partitions_;
 
   // Index scheme associated with this relation.
   // Defines a set of indices defined for this relation.
diff --git a/query_optimizer/ExecutionGenerator.cpp b/query_optimizer/ExecutionGenerator.cpp
index 9bfd136..d43432c 100644
--- a/query_optimizer/ExecutionGenerator.cpp
+++ b/query_optimizer/ExecutionGenerator.cpp
@@ -747,13 +747,6 @@
                                  insert_destination_proto);
 
   // Create and add a Select operator.
-  const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme();
-
-  const std::size_t num_partitions =
-      input_partition_scheme
-          ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
-          : 1u;
-
   // Use the "simple" form of the selection operator (a pure projection that
   // doesn't require any expression evaluation or intermediate copies) if
   // possible.
@@ -766,16 +759,14 @@
                                insert_destination_index,
                                execution_predicate_index,
                                move(attributes),
-                               input_relation_info->isStoredRelation(),
-                               num_partitions)
+                               input_relation_info->isStoredRelation())
           : new SelectOperator(query_handle_->query_id(),
                                input_relation,
                                *output_relation,
                                insert_destination_index,
                                execution_predicate_index,
                                project_expressions_group_index,
-                               input_relation_info->isStoredRelation(),
-                               num_partitions);
+                               input_relation_info->isStoredRelation());
 
   const QueryPlan::DAGNodeIndex select_index =
       execution_plan_->addRelationalOperator(op);
@@ -847,12 +838,6 @@
       findRelationInfoOutputByPhysical(build_physical);
 
   const CatalogRelation &build_relation = *build_relation_info->relation;
-  const PartitionScheme *build_partition_scheme = build_relation.getPartitionScheme();
-
-  const std::size_t build_num_partitions =
-      build_partition_scheme
-          ? build_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
-          : 1u;
 
   // Create a BuildLIPFilterOperator for the FilterJoin. This operator builds
   // LIP filters that are applied properly in downstream operators to achieve
@@ -862,7 +847,6 @@
           new BuildLIPFilterOperator(
               query_handle_->query_id(),
               build_relation,
-              build_num_partitions,
               build_side_predicate_index,
               build_relation_info->isStoredRelation()));
 
@@ -954,7 +938,7 @@
 
   const CatalogRelationInfo *build_relation_info =
       findRelationInfoOutputByPhysical(build_physical);
-  const CatalogRelationInfo *probe_operator_info =
+  const CatalogRelationInfo *probe_relation_info =
       findRelationInfoOutputByPhysical(probe_physical);
 
   // Create a vector that indicates whether each project expression is using
@@ -970,9 +954,10 @@
   }
 
   const CatalogRelation *build_relation = build_relation_info->relation;
+  const CatalogRelation *probe_relation = probe_relation_info->relation;
 
   // FIXME(quickstep-team): Add support for self-join.
-  if (build_relation == probe_operator_info->relation) {
+  if (build_relation == probe_relation) {
     THROW_SQL_ERROR() << "Self-join is not supported";
   }
 
@@ -982,9 +967,7 @@
   S::QueryContext::HashTableContext *hash_table_context_proto =
       query_context_proto_->add_join_hash_tables();
 
-  const P::PartitionSchemeHeader *probe_partition_scheme_header = probe_physical->getOutputPartitionSchemeHeader();
-  const std::size_t probe_num_partitions =
-      probe_partition_scheme_header ? probe_partition_scheme_header->num_partitions : 1u;
+  const std::size_t probe_num_partitions = probe_relation->getNumPartitions();
   hash_table_context_proto->set_num_partitions(probe_num_partitions);
 
   S::HashTable *hash_table_proto = hash_table_context_proto->mutable_join_hash_table();
@@ -1052,8 +1035,8 @@
           new HashJoinOperator(
               query_handle_->query_id(),
               *build_relation,
-              *probe_operator_info->relation,
-              probe_operator_info->isStoredRelation(),
+              *probe_relation,
+              probe_relation_info->isStoredRelation(),
               probe_attribute_ids,
               any_probe_attributes_nullable,
               probe_num_partitions,
@@ -1083,9 +1066,9 @@
                                          build_relation_info->producer_operator_index,
                                          true /* is_pipeline_breaker */);
   }
-  if (!probe_operator_info->isStoredRelation()) {
+  if (!probe_relation_info->isStoredRelation()) {
     execution_plan_->addDirectDependency(join_operator_index,
-                                         probe_operator_info->producer_operator_index,
+                                         probe_relation_info->producer_operator_index,
                                          false /* is_pipeline_breaker */);
   }
   execution_plan_->addDirectDependency(join_operator_index,
@@ -1140,15 +1123,11 @@
     THROW_SQL_ERROR() << "NestedLoopsJoin does not support self-join yet";
   }
 
-  const PartitionScheme *left_partition_scheme = left_relation.getPartitionScheme();
-  const std::size_t num_partitions =
-      left_partition_scheme ? left_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
-                            : 1u;
+  const std::size_t num_partitions = left_relation.getNumPartitions();
 
 #ifdef QUICKSTEP_DEBUG
-  const PartitionScheme *right_partition_scheme = right_relation.getPartitionScheme();
-  if (right_partition_scheme) {
-    DCHECK_EQ(num_partitions, right_partition_scheme->getPartitionSchemeHeader().getNumPartitions());
+  if (right_relation.hasPartitionScheme()) {
+    DCHECK_EQ(num_partitions, right_relation.getNumPartitions());
   }
 #endif
 
@@ -1549,12 +1528,6 @@
   const CatalogRelationInfo *selection_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->selection());
   const CatalogRelation &selection_relation = *selection_relation_info->relation;
-  const PartitionScheme *selection_partition_scheme = selection_relation.getPartitionScheme();
-
-  const std::size_t num_partitions =
-      selection_partition_scheme
-          ? selection_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
-          : 1u;
 
   // Prepare the attributes, which are output columns of the selection relation.
   std::vector<attribute_id> attributes;
@@ -1580,8 +1553,7 @@
                          insert_destination_index,
                          QueryContext::kInvalidPredicateId,
                          move(attributes),
-                         selection_relation_info->isStoredRelation(),
-                         num_partitions);
+                         selection_relation_info->isStoredRelation());
 
   const QueryPlan::DAGNodeIndex insert_selection_index =
       execution_plan_->addRelationalOperator(insert_selection_op);
@@ -1756,11 +1728,7 @@
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());
   const CatalogRelation &input_relation = *input_relation_info->relation;
-  const PartitionScheme *input_partition_scheme = input_relation.getPartitionScheme();
-  const size_t num_partitions =
-      input_partition_scheme
-          ? input_partition_scheme->getPartitionSchemeHeader().getNumPartitions()
-          : 1u;
+  const size_t num_partitions = input_relation.getNumPartitions();
 
   // Create aggr state proto.
   const QueryContext::aggregation_state_id aggr_state_index =
@@ -2312,7 +2280,9 @@
   // Get input.
   const CatalogRelationInfo *input_relation_info =
       findRelationInfoOutputByPhysical(physical_plan->input());
-  window_aggr_state_proto->set_input_relation_id(input_relation_info->relation->getID());
+  const CatalogRelation &input_relation = *input_relation_info->relation;
+  DCHECK_EQ(1u, input_relation.getNumPartitions());
+  window_aggr_state_proto->set_input_relation_id(input_relation.getID());
 
   // Get window aggregate function expression.
   const E::AliasPtr &named_window_aggregate_expression =
@@ -2382,7 +2352,7 @@
   const QueryPlan::DAGNodeIndex window_aggregation_operator_index =
       execution_plan_->addRelationalOperator(
           new WindowAggregationOperator(query_handle_->query_id(),
-                                        *input_relation_info->relation,
+                                        input_relation,
                                         *output_relation,
                                         window_aggr_state_index,
                                         insert_destination_index));
diff --git a/relational_operators/AggregationOperator.hpp b/relational_operators/AggregationOperator.hpp
index 2fb620b..c673ff5 100644
--- a/relational_operators/AggregationOperator.hpp
+++ b/relational_operators/AggregationOperator.hpp
@@ -76,11 +76,10 @@
                       bool input_relation_is_stored,
                       const QueryContext::aggregation_state_id aggr_state_index,
                       const std::size_t num_partitions)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, num_partitions),
         input_relation_(input_relation),
         input_relation_is_stored_(input_relation_is_stored),
         aggr_state_index_(aggr_state_index),
-        num_partitions_(num_partitions),
         input_relation_block_ids_(num_partitions),
         num_workorders_generated_(num_partitions),
         started_(false) {
@@ -136,7 +135,6 @@
   const CatalogRelation &input_relation_;
   const bool input_relation_is_stored_;
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t num_partitions_;
 
   // The index is the partition id.
   std::vector<BlocksInPartition> input_relation_block_ids_;
diff --git a/relational_operators/BuildAggregationExistenceMapOperator.hpp b/relational_operators/BuildAggregationExistenceMapOperator.hpp
index 8791a38..9cfd93d 100644
--- a/relational_operators/BuildAggregationExistenceMapOperator.hpp
+++ b/relational_operators/BuildAggregationExistenceMapOperator.hpp
@@ -79,12 +79,11 @@
                                        const bool input_relation_is_stored,
                                        const QueryContext::aggregation_state_id aggr_state_index,
                                        const std::size_t num_partitions)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, num_partitions),
         input_relation_(input_relation),
         build_attribute_(build_attribute),
         input_relation_is_stored_(input_relation_is_stored),
         aggr_state_index_(aggr_state_index),
-        num_partitions_(num_partitions),
         input_relation_block_ids_(num_partitions),
         num_workorders_generated_(num_partitions),
         started_(false) {
@@ -138,7 +137,6 @@
   const attribute_id build_attribute_;
   const bool input_relation_is_stored_;
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t num_partitions_;
 
   // The index is the partition id.
   std::vector<BlocksInPartition> input_relation_block_ids_;
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index d18d9fb..dfb6dfe 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -89,12 +89,11 @@
                     const bool any_join_key_attributes_nullable,
                     const std::size_t num_partitions,
                     const QueryContext::join_hash_table_id hash_table_index)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, num_partitions),
         input_relation_(input_relation),
         input_relation_is_stored_(input_relation_is_stored),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        num_partitions_(num_partitions),
         is_broadcast_join_(num_partitions > 1u && !input_relation.hasPartitionScheme()),
         hash_table_index_(hash_table_index),
         input_relation_block_ids_(num_partitions),
@@ -102,8 +101,9 @@
         started_(false) {
     if (input_relation_is_stored) {
       if (input_relation.hasPartitionScheme()) {
+        DCHECK_EQ(num_partitions_, input_relation.getNumPartitions());
+
         const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
-        DCHECK_EQ(part_scheme.getPartitionSchemeHeader().getNumPartitions(), num_partitions_);
         for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
           input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
         }
@@ -162,7 +162,6 @@
   const bool input_relation_is_stored_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
-  const std::size_t num_partitions_;
   const bool is_broadcast_join_;
   const QueryContext::join_hash_table_id hash_table_index_;
 
diff --git a/relational_operators/BuildLIPFilterOperator.hpp b/relational_operators/BuildLIPFilterOperator.hpp
index 9b23dd9..41c3294 100644
--- a/relational_operators/BuildLIPFilterOperator.hpp
+++ b/relational_operators/BuildLIPFilterOperator.hpp
@@ -28,7 +28,6 @@
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "catalog/PartitionScheme.hpp"
-#include "catalog/PartitionSchemeHeader.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -70,8 +69,6 @@
    *
    * @param query_id The ID of the query to which this operator belongs.
    * @param input_relation The relation to build LIP filters on.
-   * @param num_partitions The number of partitions in 'input_relation'.
-   *        If no partitions, it is one.
    * @param build_side_predicate_index The index of the predicate in QueryContext
    *        where the predicate is to be applied to the input relation before
    *        building the LIP filters (or kInvalidPredicateId if no predicate is
@@ -82,21 +79,18 @@
    **/
   BuildLIPFilterOperator(const std::size_t query_id,
                          const CatalogRelation &input_relation,
-                         const std::size_t num_partitions,
                          const QueryContext::predicate_id build_side_predicate_index,
                          const bool input_relation_is_stored)
-    : RelationalOperator(query_id),
+    : RelationalOperator(query_id, input_relation.getNumPartitions()),
       input_relation_(input_relation),
-      num_partitions_(num_partitions),
       build_side_predicate_index_(build_side_predicate_index),
       input_relation_is_stored_(input_relation_is_stored),
-      input_relation_block_ids_(num_partitions),
-      num_workorders_generated_(num_partitions),
+      input_relation_block_ids_(num_partitions_),
+      num_workorders_generated_(num_partitions_),
       started_(false) {
     if (input_relation_is_stored) {
       if (input_relation.hasPartitionScheme()) {
         const PartitionScheme &part_scheme = *input_relation.getPartitionScheme();
-        DCHECK_EQ(part_scheme.getPartitionSchemeHeader().getNumPartitions(), num_partitions_);
         for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
           input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
         }
@@ -148,7 +142,6 @@
   serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
 
   const CatalogRelation &input_relation_;
-  const std::size_t num_partitions_;
   const QueryContext::predicate_id build_side_predicate_index_;
   const bool input_relation_is_stored_;
 
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index c0def6f..57ba9f9 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -153,7 +153,6 @@
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_catalog_PartitionScheme
-                      quickstep_catalog_PartitionSchemeHeader
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
@@ -190,6 +189,7 @@
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
@@ -394,7 +394,7 @@
                       glog
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
-                      quickstep_catalog_PartitionSchemeHeader
+                      quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_WorkOrderProtosContainer
                       quickstep_queryexecution_WorkOrdersContainer
@@ -534,6 +534,7 @@
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogRelationSchema
                       quickstep_catalog_CatalogTypedefs
+                      quickstep_catalog_PartitionScheme
                       quickstep_queryexecution_QueryContext
                       quickstep_queryexecution_QueryExecutionMessages_proto
                       quickstep_queryexecution_QueryExecutionTypedefs
diff --git a/relational_operators/CreateIndexOperator.hpp b/relational_operators/CreateIndexOperator.hpp
index 09d5d81..9449f53 100644
--- a/relational_operators/CreateIndexOperator.hpp
+++ b/relational_operators/CreateIndexOperator.hpp
@@ -64,7 +64,7 @@
                       CatalogRelation *relation,
                       const std::string &index_name,
                       IndexSubBlockDescription &&index_description)  // NOLINT(whitespace/operators)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, 0u),
         relation_(DCHECK_NOTNULL(relation)),
         index_name_(index_name),
         index_description_(index_description) {}
diff --git a/relational_operators/CreateTableOperator.hpp b/relational_operators/CreateTableOperator.hpp
index 52c7846..827ed72 100644
--- a/relational_operators/CreateTableOperator.hpp
+++ b/relational_operators/CreateTableOperator.hpp
@@ -64,7 +64,7 @@
   CreateTableOperator(const std::size_t query_id,
                       CatalogRelation *relation,
                       CatalogDatabase *database)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, 0u),
         relation_(DCHECK_NOTNULL(relation)),
         database_(DCHECK_NOTNULL(database)) {}
 
diff --git a/relational_operators/DeleteOperator.cpp b/relational_operators/DeleteOperator.cpp
index a8723a9..4958024 100644
--- a/relational_operators/DeleteOperator.cpp
+++ b/relational_operators/DeleteOperator.cpp
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionUtil.hpp"
@@ -58,36 +59,42 @@
       return true;
     }
 
-    for (const block_id input_block_id : relation_block_ids_) {
-      container->addNormalWorkOrder(
-          new DeleteWorkOrder(query_id_,
-                              relation_,
-                              input_block_id,
-                              predicate,
-                              storage_manager,
-                              op_index_,
-                              scheduler_client_id,
-                              bus),
-          op_index_);
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      for (const block_id input_block_id : relation_block_ids_[part_id]) {
+        container->addNormalWorkOrder(
+            new DeleteWorkOrder(query_id_,
+                                relation_,
+                                part_id,
+                                input_block_id,
+                                predicate,
+                                storage_manager,
+                                op_index_,
+                                scheduler_client_id,
+                                bus),
+            op_index_);
+      }
     }
     started_ = true;
     return true;
-  } else {
-    while (num_workorders_generated_ < relation_block_ids_.size()) {
+  }
+
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    while (num_workorders_generated_[part_id] < relation_block_ids_[part_id].size()) {
       container->addNormalWorkOrder(
           new DeleteWorkOrder(query_id_,
                               relation_,
-                              relation_block_ids_[num_workorders_generated_],
+                              part_id,
+                              relation_block_ids_[part_id][num_workorders_generated_[part_id]],
                               predicate,
                               storage_manager,
                               op_index_,
                               scheduler_client_id,
                               bus),
           op_index_);
-      ++num_workorders_generated_;
+      ++num_workorders_generated_[part_id];
     }
-    return done_feeding_input_relation_;
   }
+  return done_feeding_input_relation_;
 }
 
 bool DeleteOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
@@ -97,23 +104,27 @@
       return true;
     }
 
-    for (const block_id input_block_id : relation_block_ids_) {
-      container->addWorkOrderProto(createWorkOrderProto(input_block_id), op_index_);
+    for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+      for (const block_id input_block_id : relation_block_ids_[part_id]) {
+        container->addWorkOrderProto(createWorkOrderProto(part_id, input_block_id), op_index_);
+      }
     }
     started_ = true;
     return true;
-  } else {
-    while (num_workorders_generated_ < relation_block_ids_.size()) {
-      container->addWorkOrderProto(
-          createWorkOrderProto(relation_block_ids_[num_workorders_generated_]),
-          op_index_);
-      ++num_workorders_generated_;
-    }
-    return done_feeding_input_relation_;
   }
+
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    while (num_workorders_generated_[part_id] < relation_block_ids_[part_id].size()) {
+      container->addWorkOrderProto(
+          createWorkOrderProto(part_id, relation_block_ids_[part_id][num_workorders_generated_[part_id]]),
+          op_index_);
+      ++num_workorders_generated_[part_id];
+    }
+  }
+  return done_feeding_input_relation_;
 }
 
-serialization::WorkOrder* DeleteOperator::createWorkOrderProto(const block_id block) {
+serialization::WorkOrder* DeleteOperator::createWorkOrderProto(const partition_id part_id, const block_id block) {
   serialization::WorkOrder *proto = new serialization::WorkOrder;
   proto->set_work_order_type(serialization::DELETE);
   proto->set_query_id(query_id_);
@@ -122,6 +133,7 @@
   proto->SetExtension(serialization::DeleteWorkOrder::relation_id, relation_.getID());
   proto->SetExtension(serialization::DeleteWorkOrder::predicate_index, predicate_index_);
   proto->SetExtension(serialization::DeleteWorkOrder::block_id, block);
+  proto->SetExtension(serialization::DeleteWorkOrder::partition_id, part_id);
 
   return proto;
 }
@@ -139,6 +151,7 @@
   proto.set_block_id(input_block_id_);
   proto.set_relation_id(input_relation_.getID());
   proto.set_query_id(query_id_);
+  proto.set_partition_id(partition_id_);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const std::size_t proto_length = proto.ByteSize();
diff --git a/relational_operators/DeleteOperator.hpp b/relational_operators/DeleteOperator.hpp
index aed37f6..b084560 100644
--- a/relational_operators/DeleteOperator.hpp
+++ b/relational_operators/DeleteOperator.hpp
@@ -26,6 +26,7 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "relational_operators/RelationalOperator.hpp"
@@ -73,14 +74,26 @@
                  const CatalogRelation &relation,
                  const QueryContext::predicate_id predicate_index,
                  const bool relation_is_stored)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, relation.getNumPartitions()),
         relation_(relation),
         predicate_index_(predicate_index),
         relation_is_stored_(relation_is_stored),
         started_(false),
-        relation_block_ids_(relation_is_stored ? relation.getBlocksSnapshot()
-                                               : std::vector<block_id>()),
-        num_workorders_generated_(0) {}
+        relation_block_ids_(num_partitions_),
+        num_workorders_generated_(num_partitions_) {
+    if (relation_is_stored) {
+      if (relation.hasPartitionScheme()) {
+        const PartitionScheme &part_scheme = *relation.getPartitionScheme();
+
+        for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+          relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
+        }
+      } else {
+        DCHECK_EQ(1u, num_partitions_);
+        relation_block_ids_[0] = relation.getBlocksSnapshot();
+      }
+    }
+  }
 
   ~DeleteOperator() override {}
 
@@ -107,16 +120,17 @@
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
                       const partition_id part_id) override {
     DCHECK(!relation_is_stored_);
-    relation_block_ids_.push_back(input_block_id);
+    relation_block_ids_[part_id].push_back(input_block_id);
   }
 
  private:
   /**
    * @brief Create Work Order proto.
    *
+   * @param part_id The partition id.
    * @param block The block id used in the Work Order.
    **/
-  serialization::WorkOrder* createWorkOrderProto(const block_id block);
+  serialization::WorkOrder* createWorkOrderProto(const partition_id part_id, const block_id block);
 
   const CatalogRelation &relation_;
   const QueryContext::predicate_id predicate_index_;
@@ -125,8 +139,8 @@
 
   bool started_;
 
-  std::vector<block_id> relation_block_ids_;
-  std::vector<block_id>::size_type num_workorders_generated_;
+  std::vector<std::vector<block_id>> relation_block_ids_;
+  std::vector<std::size_t> num_workorders_generated_;
 
   DISALLOW_COPY_AND_ASSIGN(DeleteOperator);
 };
@@ -141,6 +155,7 @@
    *
    * @param query_id The ID of the query to which this workorder belongs.
    * @param input_relation The relation to perform the DELETE over.
+   * @param part_id The partition id.
    * @param input_block_id The block Id.
    * @param predicate All tuples matching \c predicate will be deleted (If
    *        NULL, then all tuples will be deleted).
@@ -152,13 +167,14 @@
    **/
   DeleteWorkOrder(const std::size_t query_id,
                   const CatalogRelationSchema &input_relation,
+                  const partition_id part_id,
                   const block_id input_block_id,
                   const Predicate *predicate,
                   StorageManager *storage_manager,
                   const std::size_t delete_operator_index,
                   const tmb::client_id scheduler_client_id,
                   MessageBus *bus)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         input_relation_(input_relation),
         input_block_id_(input_block_id),
         predicate_(predicate),
diff --git a/relational_operators/DestroyAggregationStateOperator.hpp b/relational_operators/DestroyAggregationStateOperator.hpp
index c5b092f..f285957 100644
--- a/relational_operators/DestroyAggregationStateOperator.hpp
+++ b/relational_operators/DestroyAggregationStateOperator.hpp
@@ -61,9 +61,8 @@
       const std::size_t query_id,
       const QueryContext::aggregation_state_id aggr_state_index,
       const std::size_t num_partitions)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, num_partitions),
         aggr_state_index_(aggr_state_index),
-        num_partitions_(num_partitions),
         work_generated_(false) {}
 
   ~DestroyAggregationStateOperator() override {}
@@ -86,7 +85,6 @@
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t num_partitions_;
   bool work_generated_;
 
   DISALLOW_COPY_AND_ASSIGN(DestroyAggregationStateOperator);
diff --git a/relational_operators/DestroyHashOperator.cpp b/relational_operators/DestroyHashOperator.cpp
index 4b93f0c..490c316 100644
--- a/relational_operators/DestroyHashOperator.cpp
+++ b/relational_operators/DestroyHashOperator.cpp
@@ -38,7 +38,7 @@
     return true;
   }
 
-  for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+  for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
     container->addNormalWorkOrder(
         new DestroyHashWorkOrder(query_id_, hash_table_index_, part_id, query_context),
         op_index_);
@@ -52,7 +52,7 @@
     return true;
   }
 
-  for (std::size_t part_id = 0; part_id < build_num_partitions_; ++part_id) {
+  for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
     serialization::WorkOrder *proto = new serialization::WorkOrder;
     proto->set_work_order_type(serialization::DESTROY_HASH);
     proto->set_query_id(query_id_);
diff --git a/relational_operators/DestroyHashOperator.hpp b/relational_operators/DestroyHashOperator.hpp
index 3a6cf0e..6c4c997 100644
--- a/relational_operators/DestroyHashOperator.hpp
+++ b/relational_operators/DestroyHashOperator.hpp
@@ -53,14 +53,13 @@
    * @brief Constructor.
    *
    * @param query_id The ID of the query to which this operator belongs.
-   * @param build_num_partitions The number of partitions in 'build_relation'.
+   * @param num_partitions The number of partitions.
    * @param hash_table_index The index of the JoinHashTable in QueryContext.
    **/
   DestroyHashOperator(const std::size_t query_id,
-                      const std::size_t build_num_partitions,
+                      const std::size_t num_partitions,
                       const QueryContext::join_hash_table_id hash_table_index)
-      : RelationalOperator(query_id),
-        build_num_partitions_(build_num_partitions),
+      : RelationalOperator(query_id, num_partitions),
         hash_table_index_(hash_table_index),
         work_generated_(false) {}
 
@@ -83,7 +82,6 @@
   bool getAllWorkOrderProtos(WorkOrderProtosContainer *container) override;
 
  private:
-  const std::size_t build_num_partitions_;
   const QueryContext::join_hash_table_id hash_table_index_;
   bool work_generated_;
 
diff --git a/relational_operators/FinalizeAggregationOperator.hpp b/relational_operators/FinalizeAggregationOperator.hpp
index a6f1ef6..382a7da 100644
--- a/relational_operators/FinalizeAggregationOperator.hpp
+++ b/relational_operators/FinalizeAggregationOperator.hpp
@@ -72,9 +72,8 @@
       const std::size_t aggr_state_num_partitions,
       const CatalogRelation &output_relation,
       const QueryContext::insert_destination_id output_destination_index)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, num_partitions),
         aggr_state_index_(aggr_state_index),
-        num_partitions_(num_partitions),
         aggr_state_num_partitions_(aggr_state_num_partitions),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
@@ -108,7 +107,7 @@
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t num_partitions_, aggr_state_num_partitions_;
+  const std::size_t aggr_state_num_partitions_;
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
   bool started_;
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index 316be66..519718c 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -137,13 +137,12 @@
       const QueryContext::scalar_group_id selection_index,
       const std::vector<bool> *is_selection_on_build = nullptr,
       const JoinType join_type = JoinType::kInnerJoin)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, num_partitions),
         build_relation_(build_relation),
         probe_relation_(probe_relation),
         probe_relation_is_stored_(probe_relation_is_stored),
         join_key_attributes_(join_key_attributes),
         any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
-        num_partitions_(num_partitions),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         hash_table_index_(hash_table_index),
@@ -276,7 +275,6 @@
   const bool probe_relation_is_stored_;
   const std::vector<attribute_id> join_key_attributes_;
   const bool any_join_key_attributes_nullable_;
-  const std::size_t num_partitions_;
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
   const QueryContext::join_hash_table_id hash_table_index_;
diff --git a/relational_operators/InitializeAggregationOperator.hpp b/relational_operators/InitializeAggregationOperator.hpp
index 0b052c7..cf9abe5 100644
--- a/relational_operators/InitializeAggregationOperator.hpp
+++ b/relational_operators/InitializeAggregationOperator.hpp
@@ -65,9 +65,8 @@
                                 const QueryContext::aggregation_state_id aggr_state_index,
                                 const std::size_t num_partitions,
                                 const std::size_t aggr_state_num_init_partitions)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, num_partitions),
         aggr_state_index_(aggr_state_index),
-        num_partitions_(num_partitions),
         aggr_state_num_init_partitions_(aggr_state_num_init_partitions),
         started_(false) {}
 
@@ -91,7 +90,7 @@
 
  private:
   const QueryContext::aggregation_state_id aggr_state_index_;
-  const std::size_t num_partitions_, aggr_state_num_init_partitions_;
+  const std::size_t aggr_state_num_init_partitions_;
   bool started_;
 
   DISALLOW_COPY_AND_ASSIGN(InitializeAggregationOperator);
diff --git a/relational_operators/NestedLoopsJoinOperator.hpp b/relational_operators/NestedLoopsJoinOperator.hpp
index 006d496..1b14638 100644
--- a/relational_operators/NestedLoopsJoinOperator.hpp
+++ b/relational_operators/NestedLoopsJoinOperator.hpp
@@ -100,11 +100,10 @@
       const QueryContext::scalar_group_id selection_index,
       const bool left_relation_is_stored,
       const bool right_relation_is_stored)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, num_partitions),
         nested_loops_join_index_(nested_loops_join_index),
         left_input_relation_(left_input_relation),
         right_input_relation_(right_input_relation),
-        num_partitions_(num_partitions),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         join_predicate_index_(join_predicate_index),
@@ -296,8 +295,6 @@
   const CatalogRelation &left_input_relation_;
   const CatalogRelation &right_input_relation_;
 
-  const std::size_t num_partitions_;
-
   const CatalogRelation &output_relation_;
   const QueryContext::insert_destination_id output_destination_index_;
 
diff --git a/relational_operators/RelationalOperator.hpp b/relational_operators/RelationalOperator.hpp
index 5de7eb5..29ae194 100644
--- a/relational_operators/RelationalOperator.hpp
+++ b/relational_operators/RelationalOperator.hpp
@@ -260,6 +260,15 @@
   }
 
   /**
+   * @brief Get the number of partitions of the input relation in the operator.
+   *
+   * @return The number of partitions of the input relation.
+   */
+  std::size_t getNumPartitions() const {
+    return num_partitions_;
+  }
+
+  /**
    * @brief Deploy a group of LIPFilters to this operator.
    */
   void deployLIPFilters(const QueryContext::lip_deployment_id lip_deployment_index,
@@ -273,15 +282,18 @@
    * @brief Constructor
    *
    * @param query_id The ID of the query to which this operator belongs.
-   * @param blocking_dependencies_met If those dependencies which break the
-   *        pipeline have been met.
+   * @param num_partitions The number of partitions of the input relation.
+   *        If create table / index, return zero. If no partitions, return one.
    **/
-  explicit RelationalOperator(const std::size_t query_id)
+  explicit RelationalOperator(const std::size_t query_id,
+                              const std::size_t num_partitions = 1u)
       : query_id_(query_id),
+        num_partitions_(num_partitions),
         done_feeding_input_relation_(false),
         lip_deployment_index_(QueryContext::kInvalidLIPDeploymentId) {}
 
   const std::size_t query_id_;
+  const std::size_t num_partitions_;
 
   bool done_feeding_input_relation_;
   std::size_t op_index_;
diff --git a/relational_operators/SaveBlocksOperator.cpp b/relational_operators/SaveBlocksOperator.cpp
index 9d6c3f6..591de12 100644
--- a/relational_operators/SaveBlocksOperator.cpp
+++ b/relational_operators/SaveBlocksOperator.cpp
@@ -22,6 +22,7 @@
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/WorkOrderProtosContainer.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
 #include "relational_operators/WorkOrder.pb.h"
@@ -38,31 +39,37 @@
     StorageManager *storage_manager,
     const tmb::client_id scheduler_client_id,
     tmb::MessageBus *bus) {
-  while (num_workorders_generated_ < destination_block_ids_.size()) {
-    container->addNormalWorkOrder(
-        new SaveBlocksWorkOrder(
-            query_id_,
-            destination_block_ids_[num_workorders_generated_],
-            force_,
-            storage_manager),
-        op_index_);
-    ++num_workorders_generated_;
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    while (num_workorders_generated_[part_id] < destination_block_ids_[part_id].size()) {
+      container->addNormalWorkOrder(
+          new SaveBlocksWorkOrder(
+              query_id_,
+              part_id,
+              destination_block_ids_[part_id][num_workorders_generated_[part_id]],
+              force_,
+              storage_manager),
+          op_index_);
+      ++num_workorders_generated_[part_id];
+    }
   }
   return done_feeding_input_relation_;
 }
 
 bool SaveBlocksOperator::getAllWorkOrderProtos(WorkOrderProtosContainer *container) {
-  while (num_workorders_generated_ < destination_block_ids_.size()) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::SAVE_BLOCKS);
-    proto->set_query_id(query_id_);
-    proto->SetExtension(serialization::SaveBlocksWorkOrder::block_id,
-                        destination_block_ids_[num_workorders_generated_]);
-    proto->SetExtension(serialization::SaveBlocksWorkOrder::force, force_);
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    while (num_workorders_generated_[part_id] < destination_block_ids_[part_id].size()) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::SAVE_BLOCKS);
+      proto->set_query_id(query_id_);
+      proto->SetExtension(serialization::SaveBlocksWorkOrder::block_id,
+                          destination_block_ids_[part_id][num_workorders_generated_[part_id]]);
+      proto->SetExtension(serialization::SaveBlocksWorkOrder::force, force_);
+      proto->SetExtension(serialization::SaveBlocksWorkOrder::partition_id, part_id);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
 
-    ++num_workorders_generated_;
+      ++num_workorders_generated_[part_id];
+    }
   }
   return done_feeding_input_relation_;
 }
diff --git a/relational_operators/SaveBlocksOperator.hpp b/relational_operators/SaveBlocksOperator.hpp
index 4489299..cea2d7e 100644
--- a/relational_operators/SaveBlocksOperator.hpp
+++ b/relational_operators/SaveBlocksOperator.hpp
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -38,7 +39,6 @@
 
 namespace quickstep {
 
-class CatalogRelation;
 class QueryContext;
 class StorageManager;
 class WorkOrderProtosContainer;
@@ -64,10 +64,11 @@
   SaveBlocksOperator(const std::size_t query_id,
                      CatalogRelation *relation,
                      const bool force = false)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, relation->getNumPartitions()),
         force_(force),
         relation_(relation),
-        num_workorders_generated_(0) {}
+        destination_block_ids_(num_partitions_),
+        num_workorders_generated_(num_partitions_) {}
 
   ~SaveBlocksOperator() override {}
 
@@ -89,7 +90,7 @@
 
   void feedInputBlock(const block_id input_block_id, const relation_id input_relation_id,
                       const partition_id part_id) override {
-    destination_block_ids_.push_back(input_block_id);
+    destination_block_ids_[part_id].push_back(input_block_id);
   }
 
   void updateCatalogOnCompletion() override;
@@ -98,9 +99,9 @@
   const bool force_;
 
   CatalogRelation *relation_;
-  std::vector<block_id> destination_block_ids_;
+  std::vector<std::vector<block_id>> destination_block_ids_;
 
-  std::vector<block_id>::size_type num_workorders_generated_;
+  std::vector<std::size_t> num_workorders_generated_;
 
   DISALLOW_COPY_AND_ASSIGN(SaveBlocksOperator);
 };
@@ -114,16 +115,18 @@
    * @brief Constructor.
    *
    * @param query_id The ID of the query to which this operator belongs.
+   * @param part_id The partition id.
    * @param save_block_id The id of the block to save.
    * @param force If true, force writing of all blocks to disk, otherwise only
    *        write dirty blocks.
    * @param storage_manager The StorageManager to use.
    **/
   SaveBlocksWorkOrder(const std::size_t query_id,
+                      const partition_id part_id,
                       const block_id save_block_id,
                       const bool force,
                       StorageManager *storage_manager)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         save_block_id_(save_block_id),
         force_(force),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index f4937b3..a71f4c1 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -32,7 +32,7 @@
 #include "catalog/NUMAPlacementScheme.hpp"
 #endif
 
-#include "catalog/PartitionSchemeHeader.hpp"
+#include "catalog/PartitionScheme.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
@@ -85,8 +85,6 @@
    * @param input_relation_is_stored If input_relation is a stored relation and
    *        is fully available to the operator before it can start generating
    *        workorders.
-   * @param num_partitions The number of partitions in 'input_relation'.
-   *        If no partitions, it is one.
    **/
   SelectOperator(
       const std::size_t query_id,
@@ -95,17 +93,15 @@
       const QueryContext::insert_destination_id output_destination_index,
       const QueryContext::predicate_id predicate_index,
       const QueryContext::scalar_group_id selection_index,
-      const bool input_relation_is_stored,
-      const std::size_t num_partitions)
-      : RelationalOperator(query_id),
+      const bool input_relation_is_stored)
+      : RelationalOperator(query_id, input_relation.getNumPartitions()),
         input_relation_(input_relation),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         predicate_index_(predicate_index),
         selection_index_(selection_index),
-        num_partitions_(num_partitions),
-        input_relation_block_ids_(num_partitions),
-        num_workorders_generated_(num_partitions),
+        input_relation_block_ids_(num_partitions_),
+        num_workorders_generated_(num_partitions_),
         simple_projection_(false),
         input_relation_is_stored_(input_relation_is_stored),
         started_(false) {
@@ -120,6 +116,7 @@
           input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
         }
       } else {
+        DCHECK_EQ(1u, num_partitions_);
         input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
       }
     }
@@ -143,8 +140,6 @@
    * @param input_relation_is_stored If input_relation is a stored relation and
    *        is fully available to the operator before it can start generating
    *        workorders.
-   * @param num_partitions The number of partitions in 'input_relation'.
-   *        If no partitions, it is one.
    **/
   SelectOperator(
       const std::size_t query_id,
@@ -153,18 +148,16 @@
       const QueryContext::insert_destination_id output_destination_index,
       const QueryContext::predicate_id predicate_index,
       std::vector<attribute_id> &&selection,
-      const bool input_relation_is_stored,
-      const std::size_t num_partitions)
-      : RelationalOperator(query_id),
+      const bool input_relation_is_stored)
+      : RelationalOperator(query_id, input_relation.getNumPartitions()),
         input_relation_(input_relation),
         output_relation_(output_relation),
         output_destination_index_(output_destination_index),
         predicate_index_(predicate_index),
         selection_index_(QueryContext::kInvalidScalarGroupId),
         simple_selection_(std::move(selection)),
-        num_partitions_(num_partitions),
-        input_relation_block_ids_(num_partitions),
-        num_workorders_generated_(num_partitions),
+        input_relation_block_ids_(num_partitions_),
+        num_workorders_generated_(num_partitions_),
         simple_projection_(true),
         input_relation_is_stored_(input_relation_is_stored),
         started_(false) {
@@ -179,6 +172,7 @@
           input_relation_block_ids_[part_id] = part_scheme.getBlocksInPartition(part_id);
         }
       } else {
+        DCHECK_EQ(1u, num_partitions_);
         input_relation_block_ids_[0] = input_relation.getBlocksSnapshot();
       }
     }
@@ -239,7 +233,6 @@
   const QueryContext::scalar_group_id selection_index_;
   const std::vector<attribute_id> simple_selection_;
 
-  const std::size_t num_partitions_;
   // A vector of vectors V where V[i] indicates the list of block IDs of the
   // input relation that belong to the partition i.
   std::vector<std::vector<block_id>> input_relation_block_ids_;
diff --git a/relational_operators/UpdateOperator.cpp b/relational_operators/UpdateOperator.cpp
index 08bfa59..0f7f999 100644
--- a/relational_operators/UpdateOperator.cpp
+++ b/relational_operators/UpdateOperator.cpp
@@ -25,6 +25,7 @@
 #include <utility>
 
 #include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionMessages.pb.h"
 #include "query_execution/QueryExecutionUtil.hpp"
@@ -57,21 +58,24 @@
   }
 
   DCHECK(query_context != nullptr);
-  for (const block_id input_block_id : input_blocks_) {
-    container->addNormalWorkOrder(
-        new UpdateWorkOrder(
-            query_id_,
-            relation_,
-            input_block_id,
-            query_context->getPredicate(predicate_index_),
-            query_context->getUpdateGroup(update_group_index_),
-            query_context->getInsertDestination(
-                relocation_destination_index_),
-            storage_manager,
-            op_index_,
-            scheduler_client_id,
-            bus),
-        op_index_);
+  for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+    for (const block_id input_block_id : input_blocks_[part_id]) {
+      container->addNormalWorkOrder(
+          new UpdateWorkOrder(
+              query_id_,
+              relation_,
+              part_id,
+              input_block_id,
+              query_context->getPredicate(predicate_index_),
+              query_context->getUpdateGroup(update_group_index_),
+              query_context->getInsertDestination(
+                  relocation_destination_index_),
+              storage_manager,
+              op_index_,
+              scheduler_client_id,
+              bus),
+          op_index_);
+    }
   }
   started_ = true;
   return true;
@@ -82,19 +86,22 @@
     return true;
   }
 
-  for (const block_id input_block_id : input_blocks_) {
-    serialization::WorkOrder *proto = new serialization::WorkOrder;
-    proto->set_work_order_type(serialization::UPDATE);
-    proto->set_query_id(query_id_);
+  for (std::size_t part_id = 0; part_id < num_partitions_; ++part_id) {
+    for (const block_id input_block_id : input_blocks_[part_id]) {
+      serialization::WorkOrder *proto = new serialization::WorkOrder;
+      proto->set_work_order_type(serialization::UPDATE);
+      proto->set_query_id(query_id_);
 
-    proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
-    proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
-    proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
-    proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
-    proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
-    proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
+      proto->SetExtension(serialization::UpdateWorkOrder::operator_index, op_index_);
+      proto->SetExtension(serialization::UpdateWorkOrder::relation_id, relation_.getID());
+      proto->SetExtension(serialization::UpdateWorkOrder::insert_destination_index, relocation_destination_index_);
+      proto->SetExtension(serialization::UpdateWorkOrder::predicate_index, predicate_index_);
+      proto->SetExtension(serialization::UpdateWorkOrder::update_group_index, update_group_index_);
+      proto->SetExtension(serialization::UpdateWorkOrder::block_id, input_block_id);
+      proto->SetExtension(serialization::UpdateWorkOrder::partition_id, part_id);
 
-    container->addWorkOrderProto(proto, op_index_);
+      container->addWorkOrderProto(proto, op_index_);
+    }
   }
   started_ = true;
   return true;
@@ -120,6 +127,7 @@
   proto.set_block_id(input_block_id_);
   proto.set_relation_id(relation_.getID());
   proto.set_query_id(query_id_);
+  proto.set_partition_id(partition_id_);
 
   // NOTE(zuyu): Using the heap memory to serialize proto as a c-like string.
   const std::size_t proto_length = proto.ByteSize();
diff --git a/relational_operators/UpdateOperator.hpp b/relational_operators/UpdateOperator.hpp
index 8e020d8..fd2096b 100644
--- a/relational_operators/UpdateOperator.hpp
+++ b/relational_operators/UpdateOperator.hpp
@@ -28,6 +28,7 @@
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
+#include "catalog/PartitionScheme.hpp"
 #include "query_execution/QueryContext.hpp"
 #include "query_execution/QueryExecutionTypedefs.hpp"
 #include "relational_operators/RelationalOperator.hpp"
@@ -87,13 +88,24 @@
       const QueryContext::insert_destination_id relocation_destination_index,
       const QueryContext::predicate_id predicate_index,
       const QueryContext::update_group_id update_group_index)
-      : RelationalOperator(query_id),
+      : RelationalOperator(query_id, relation.getNumPartitions()),
         relation_(relation),
         relocation_destination_index_(relocation_destination_index),
         predicate_index_(predicate_index),
         update_group_index_(update_group_index),
-        input_blocks_(relation.getBlocksSnapshot()),
-        started_(false) {}
+        input_blocks_(num_partitions_),
+        started_(false) {
+    if (relation.hasPartitionScheme()) {
+      const PartitionScheme &part_scheme = *relation.getPartitionScheme();
+
+      for (partition_id part_id = 0; part_id < num_partitions_; ++part_id) {
+        input_blocks_[part_id] = part_scheme.getBlocksInPartition(part_id);
+      }
+    } else {
+      DCHECK_EQ(1u, num_partitions_);
+      input_blocks_[0] = relation.getBlocksSnapshot();
+    }
+  }
 
   ~UpdateOperator() override {}
 
@@ -127,7 +139,7 @@
   const QueryContext::predicate_id predicate_index_;
   const QueryContext::update_group_id update_group_index_;
 
-  const std::vector<block_id> input_blocks_;
+  std::vector<std::vector<block_id>> input_blocks_;
 
   bool started_;
 
@@ -144,6 +156,7 @@
    *
    * @param query_id The ID of the query to which this WorkOrder belongs.
    * @param relation The relation to perform the UPDATE over.
+   * @param part_id The partition id.
    * @param predicate All tuples matching \c predicate will be updated (or NULL
    *        to update all tuples).
    * @param assignments The assignments (the map of attribute_ids to Scalars)
@@ -161,6 +174,7 @@
   UpdateWorkOrder(
       const std::size_t query_id,
       const CatalogRelationSchema &relation,
+      const partition_id part_id,
       const block_id input_block_id,
       const Predicate *predicate,
       const std::unordered_map<attribute_id, std::unique_ptr<const Scalar>>
@@ -170,7 +184,7 @@
       const std::size_t update_operator_index,
       const tmb::client_id scheduler_client_id,
       MessageBus *bus)
-      : WorkOrder(query_id),
+      : WorkOrder(query_id, part_id),
         relation_(relation),
         input_block_id_(input_block_id),
         predicate_(predicate),
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
index 42a0e7d..6dafbe0 100644
--- a/relational_operators/WorkOrder.proto
+++ b/relational_operators/WorkOrder.proto
@@ -116,6 +116,7 @@
     optional int32 relation_id = 97;
     optional int32 predicate_index = 98;
     optional fixed64 block_id = 99;
+    optional uint64 partition_id = 100;
   }
 }
 
@@ -235,6 +236,7 @@
     // All required.
     optional fixed64 block_id = 224;
     optional bool force = 225;
+    optional uint64 partition_id = 226;
   }
 }
 
@@ -320,6 +322,7 @@
     optional int32 predicate_index = 323;
     optional uint32 update_group_index = 324;
     optional fixed64 block_id = 325;
+    optional uint64 partition_id = 326;
   }
 }
 
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
index 68f7f55..5baa21b 100644
--- a/relational_operators/WorkOrderFactory.cpp
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -170,11 +170,16 @@
               proto.GetExtension(serialization::BuildHashWorkOrder::lip_deployment_index), query_context));
     }
     case serialization::DELETE: {
-      LOG(INFO) << "Creating DeleteWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+      const partition_id part_id =
+          proto.GetExtension(serialization::DeleteWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating DeleteWorkOrder (Partition " << part_id << ") for Query " << query_id
+                << " in Shiftboss " << shiftboss_index;
       return new DeleteWorkOrder(
           query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::DeleteWorkOrder::relation_id)),
+          part_id,
           proto.GetExtension(serialization::DeleteWorkOrder::block_id),
           query_context->getPredicate(
               proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)),
@@ -436,10 +441,14 @@
           storage_manager);
     }
     case serialization::SAVE_BLOCKS: {
-      LOG(INFO) << "Creating SaveBlocksWorkOrder for Query " << query_id
+      const partition_id part_id =
+          proto.GetExtension(serialization::SaveBlocksWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating SaveBlocksWorkOrder (Partition " << part_id << ") for Query " << query_id
                 << " in Shiftboss " << shiftboss_index;
       return new SaveBlocksWorkOrder(
           query_id,
+          part_id,
           proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id),
           proto.GetExtension(serialization::SaveBlocksWorkOrder::force),
           storage_manager);
@@ -563,11 +572,16 @@
           storage_manager);
     }
     case serialization::UPDATE: {
-      LOG(INFO) << "Creating UpdateWorkOrder for Query " << query_id << " in Shiftboss " << shiftboss_index;
+      const partition_id part_id =
+          proto.GetExtension(serialization::UpdateWorkOrder::partition_id);
+
+      LOG(INFO) << "Creating UpdateWorkOrder (Partition " << part_id << ") for Query " << query_id
+                << " in Shiftboss " << shiftboss_index;
       return new UpdateWorkOrder(
           query_id,
           catalog_database->getRelationSchemaById(
               proto.GetExtension(serialization::UpdateWorkOrder::relation_id)),
+          part_id,
           proto.GetExtension(serialization::UpdateWorkOrder::block_id),
           query_context->getPredicate(
               proto.GetExtension(serialization::UpdateWorkOrder::predicate_index)),
@@ -600,7 +614,7 @@
     default:
       LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto in Shiftboss" << shiftboss_index;
   }
-}
+}  // NOLINT(readability/fn_size)
 
 bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
                                     const CatalogDatabaseLite &catalog_database,
@@ -726,7 +740,8 @@
              query_context.isValidPredicate(
                  proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)) &&
              proto.HasExtension(serialization::DeleteWorkOrder::block_id) &&
-             proto.HasExtension(serialization::DeleteWorkOrder::operator_index);
+             proto.HasExtension(serialization::DeleteWorkOrder::operator_index) &&
+             proto.GetExtension(serialization::DeleteWorkOrder::partition_id);
     }
     case serialization::DESTROY_AGGREGATION_STATE: {
       return proto.HasExtension(serialization::DestroyAggregationStateWorkOrder::aggr_state_index) &&
@@ -885,7 +900,8 @@
     }
     case serialization::SAVE_BLOCKS: {
       return proto.HasExtension(serialization::SaveBlocksWorkOrder::block_id) &&
-             proto.HasExtension(serialization::SaveBlocksWorkOrder::force);
+             proto.HasExtension(serialization::SaveBlocksWorkOrder::force) &&
+             proto.HasExtension(serialization::SaveBlocksWorkOrder::partition_id);
     }
     case serialization::SELECT: {
       if (!proto.HasExtension(serialization::SelectWorkOrder::relation_id) ||
@@ -1016,7 +1032,8 @@
              query_context.isValidUpdateGroupId(
                  proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)) &&
              proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
-             proto.HasExtension(serialization::UpdateWorkOrder::block_id);
+             proto.HasExtension(serialization::UpdateWorkOrder::block_id) &&
+             proto.GetExtension(serialization::UpdateWorkOrder::partition_id);
     }
     case serialization::WINDOW_AGGREGATION: {
       return proto.HasExtension(serialization::WindowAggregationWorkOrder::window_aggr_state_index) &&