Merge branch 'master' into query-manager
diff --git a/catalog/CatalogDatabaseCache.cpp b/catalog/CatalogDatabaseCache.cpp
index 9989703..ec8f47a 100644
--- a/catalog/CatalogDatabaseCache.cpp
+++ b/catalog/CatalogDatabaseCache.cpp
@@ -19,6 +19,7 @@
 #include <memory>
 #include <unordered_map>
 #include <utility>
+#include <vector>
 
 #include "catalog/Catalog.pb.h"
 #include "catalog/CatalogRelationSchema.hpp"
@@ -30,6 +31,7 @@
 
 using std::make_unique;
 using std::move;
+using std::vector;
 
 namespace quickstep {
 
@@ -59,18 +61,34 @@
       << "Attempted to create CatalogDatabaseCache from an invalid proto description:\n"
       << proto.DebugString();
 
-  SpinSharedMutexExclusiveLock<false> lock(relations_mutex_);
-  for (int i = 0; i < proto.relations_size(); ++i) {
-    auto relation_schema = make_unique<const CatalogRelationSchema>(proto.relations(i));
-    const relation_id rel_id = relation_schema->getID();
-
-    auto it = rel_map_.find(rel_id);
-    if (it == rel_map_.end()) {
-      rel_map_.emplace(rel_id, move(relation_schema));
-    } else {
-      it->second.reset(relation_schema.release());
+  vector<int> new_relation_schema_proto_indices;
+  {
+    SpinSharedMutexSharedLock<false> read_lock(relations_mutex_);
+    for (int i = 0; i < proto.relations_size(); ++i) {
+      const auto it = rel_map_.find(proto.relations(i).relation_id());
+      if (it == rel_map_.end()) {
+        new_relation_schema_proto_indices.push_back(i);
+      } else {
+        // TODO(quickstep-team): Support schema changes by adding the index of
+        // changed schema proto in 'changed_relation_schema_proto_indices'.
+      }
     }
   }
+
+  SpinSharedMutexExclusiveLock<false> write_lock(relations_mutex_);
+  for (const int i : new_relation_schema_proto_indices) {
+    const serialization::CatalogRelationSchema &proto_relation = proto.relations(i);
+    auto relation_schema = make_unique<const CatalogRelationSchema>(proto_relation);
+    rel_map_.emplace(proto_relation.relation_id(), move(relation_schema));
+  }
+
+  // TODO(quickstep-team): Reset the schema for the changes in the following
+  // steps for each index in 'changed_relation_schema_proto_indices':
+  // 1. Drop the blocks belonged to 'proto.relations(i).relation_id()' in the
+  //    buffer pool.
+  // 2. Reset the changed schema, while the scheduler ensures no queries will
+  //    load back the related blocks.
+  // 3. Signal the scheduler to accept new queries for the changed schema.
 }
 
 void CatalogDatabaseCache::dropRelationById(const relation_id id) {
diff --git a/catalog/CatalogDatabaseCache.hpp b/catalog/CatalogDatabaseCache.hpp
index 3826537..77afe2a 100644
--- a/catalog/CatalogDatabaseCache.hpp
+++ b/catalog/CatalogDatabaseCache.hpp
@@ -42,7 +42,14 @@
  */
 
 /**
- * @brief A database cache used in the distributed version.
+ * @brief A database cache managed by Shiftboss in the distributed version.
+ *        During the runtime, it contains all the referenced relation schemas
+ *        to execute a query. For a SELECT query, the temporary query
+ *        result relation will be dropped after the CLI finishes processing the
+ *        result and notifies Shiftboss.
+ *
+ * @note A CatalogRelationSchema should be kept unless all associated blocks
+ *       have been deleted.
  **/
 class CatalogDatabaseCache : public CatalogDatabaseLite {
  public:
@@ -88,7 +95,7 @@
 
   /**
    * @brief Update the cache from its serialized Protocol Buffer form. If the
-   *        relation schema exists, it will be overwritten.
+   *        relation schema exists, it will be ignored.
    *
    * @param proto The Protocol Buffer serialization of a catalog cache,
    *        previously produced in optimizer.
diff --git a/catalog/tests/Catalog_unittest.cpp b/catalog/tests/Catalog_unittest.cpp
index 65593dc..8ee55a6 100644
--- a/catalog/tests/Catalog_unittest.cpp
+++ b/catalog/tests/Catalog_unittest.cpp
@@ -606,6 +606,10 @@
   CatalogDatabaseCache cache(db_->getProto());
   compareCatalogDatabaseCache(cache);
 
+  // Test dropping relations in the cache.
+  cache.dropRelationById(rel->getID());
+  ASSERT_EQ(0u, cache.size());
+
   // CatalogRelactionSchema changes.
   const std::size_t str_type_length = 8;
   rel->addAttribute(
@@ -620,10 +624,6 @@
   // Update the cache after the schema changed.
   cache.update(db_->getProto());
   compareCatalogDatabaseCache(cache);
-
-  // Test dropping relations in the cache.
-  cache.dropRelationById(rel->getID());
-  ASSERT_EQ(0u, cache.size());
 }
 
 }  // namespace quickstep
diff --git a/query_execution/QueryContext.hpp b/query_execution/QueryContext.hpp
index 3502d66..0e9e21c 100644
--- a/query_execution/QueryContext.hpp
+++ b/query_execution/QueryContext.hpp
@@ -144,6 +144,17 @@
                            const CatalogDatabaseLite &database);
 
   /**
+   * @brief Whether the given AggregationOperationState id is valid.
+   *
+   * @param id The AggregationOperationState id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidAggregationStateId(const aggregation_state_id id) const {
+    return id < aggregation_states_.size();
+  }
+
+  /**
    * @brief Get the AggregationOperationState.
    *
    * @param id The AggregationOperationState id in the query.
@@ -170,6 +181,17 @@
   }
 
   /**
+   * @brief Whether the given GeneratorFunctionHandle id is valid.
+   *
+   * @param id The GeneratorFunctionHandle id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidGeneratorFunctionId(const generator_function_id id) const {
+    return id < generator_functions_.size();
+  }
+
+  /**
    * @brief Get the GeneratorFunctionHandle.
    *
    * @param id The GeneratorFunctionHandle id in the query.
@@ -183,6 +205,19 @@
   }
 
   /**
+   * @brief Whether the given InsertDestination id is valid.
+   *
+   * @param id The InsertDestination id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidInsertDestinationId(const insert_destination_id id) const {
+    return id != kInvalidInsertDestinationId
+        && id >= 0
+        && static_cast<std::size_t>(id) < insert_destinations_.size();
+  }
+
+  /**
    * @brief Get the InsertDestination.
    *
    * @param id The InsertDestination id in the query.
@@ -207,6 +242,17 @@
   }
 
   /**
+   * @brief Whether the given JoinHashTable id is valid.
+   *
+   * @param id The JoinHashTable id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidJoinHashTableId(const join_hash_table_id id) const {
+    return id < join_hash_tables_.size();
+  }
+
+  /**
    * @brief Get the JoinHashTable.
    *
    * @param id The JoinHashTable id in the query.
@@ -229,6 +275,18 @@
   }
 
   /**
+   * @brief Whether the given Predicate id is valid or no predicate.
+   *
+   * @param id The Predicate id.
+   *
+   * @return True if valid or no predicate, otherwise false.
+   **/
+  bool isValidPredicate(const predicate_id id) const {
+    return (id == kInvalidPredicateId)  // No predicate.
+        || (id >= 0 && static_cast<std::size_t>(id) < predicates_.size());
+  }
+
+  /**
    * @brief Get the const Predicate.
    *
    * @param id The Predicate id in the query.
@@ -247,6 +305,19 @@
   }
 
   /**
+   * @brief Whether the given Scalar group id is valid.
+   *
+   * @param id The Scalar group id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidScalarGroupId(const scalar_group_id id) const {
+    return id != kInvalidScalarGroupId
+        && id >= 0
+        && static_cast<std::size_t>(id) < scalar_groups_.size();
+  }
+
+  /**
    * @brief Get the group of Scalars.
    *
    * @param id The Scalar group id in the query.
@@ -260,6 +331,17 @@
     return scalar_groups_[id];
   }
 
+ /**
+   * @brief Whether the given SortConfiguration id is valid.
+   *
+   * @param id The SortConfiguration id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidSortConfigId(const sort_config_id id) const {
+    return id < sort_configs_.size();
+  }
+
   /**
    * @brief Get the SortConfiguration.
    *
@@ -273,6 +355,17 @@
   }
 
   /**
+   * @brief Whether the given Tuple id is valid.
+   *
+   * @param id The Tuple id.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidTupleId(const tuple_id id) const {
+    return id < tuples_.size();
+  }
+
+  /**
    * @brief Release the ownership of the Tuple referenced by the id.
    *
    * @note Each id should use only once.
@@ -288,6 +381,17 @@
   }
 
   /**
+   * @brief Whether the given update assignments group id is valid.
+   *
+   * @param id The group id of the update assignments.
+   *
+   * @return True if valid, otherwise false.
+   **/
+  bool isValidUpdateGroupId(const update_group_id id) const {
+    return static_cast<std::size_t>(id) < update_groups_.size();
+  }
+
+  /**
    * @brief Get the group of the update assignments for UpdateWorkOrder.
    *
    * @param id The group id of the update assignments in the query.
diff --git a/query_optimizer/CMakeLists.txt b/query_optimizer/CMakeLists.txt
index a11a994..60319d4 100644
--- a/query_optimizer/CMakeLists.txt
+++ b/query_optimizer/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015-2016 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -126,12 +128,15 @@
 target_link_libraries(quickstep_queryoptimizer_LogicalGenerator
                       glog
                       quickstep_parser_ParseStatement
+                      quickstep_queryoptimizer_OptimizerContext
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_resolver_Resolver
                       quickstep_queryoptimizer_rules_CollapseProject
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_PushDownFilter
+                      quickstep_queryoptimizer_rules_PushDownSemiAntiJoin
                       quickstep_queryoptimizer_rules_Rule
+                      quickstep_queryoptimizer_rules_UnnestSubqueries
                       quickstep_queryoptimizer_Validator
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_LogicalToPhysicalMapper
diff --git a/query_optimizer/LogicalGenerator.cpp b/query_optimizer/LogicalGenerator.cpp
index 34b372b..509a1a0 100644
--- a/query_optimizer/LogicalGenerator.cpp
+++ b/query_optimizer/LogicalGenerator.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -22,13 +24,16 @@
 
 #include "parser/ParseStatement.hpp"
 
+#include "query_optimizer/OptimizerContext.hpp"
 #include "query_optimizer/Validator.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/resolver/Resolver.hpp"
 #include "query_optimizer/rules/CollapseProject.hpp"
 #include "query_optimizer/rules/GenerateJoins.hpp"
 #include "query_optimizer/rules/PushDownFilter.hpp"
+#include "query_optimizer/rules/PushDownSemiAntiJoin.hpp"
 #include "query_optimizer/rules/Rule.hpp"
+#include "query_optimizer/rules/UnnestSubqueries.hpp"
 
 #include "glog/logging.h"
 
@@ -57,6 +62,10 @@
 
 void LogicalGenerator::optimizePlan() {
   std::vector<std::unique_ptr<Rule<L::Logical>>> rules;
+  if (optimizer_context_->has_nested_queries()) {
+    rules.emplace_back(new UnnestSubqueries(optimizer_context_));
+  }
+  rules.emplace_back(new PushDownSemiAntiJoin());
   rules.emplace_back(new PushDownFilter());
   rules.emplace_back(new GenerateJoins());
   rules.emplace_back(new PushDownFilter());
diff --git a/query_optimizer/logical/Aggregate.cpp b/query_optimizer/logical/Aggregate.cpp
index 3d9b471..27bbd1b 100644
--- a/query_optimizer/logical/Aggregate.cpp
+++ b/query_optimizer/logical/Aggregate.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -24,6 +26,7 @@
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "utility/Cast.hpp"
 
 #include "glog/logging.h"
@@ -72,6 +75,35 @@
   return referenced_attributes;
 }
 
+LogicalPtr Aggregate::copyWithNewInputExpressions(
+    const std::vector<E::ExpressionPtr> &input_expressions) const {
+  DCHECK_EQ(grouping_expressions_.size() + aggregate_expressions_.size(),
+            input_expressions.size());
+
+  std::vector<E::NamedExpressionPtr> new_grouping_expressions;
+  for (std::vector<E::ExpressionPtr>::size_type index = 0;
+       index < grouping_expressions_.size();
+       ++index) {
+    E::NamedExpressionPtr grouping_expression;
+    E::SomeNamedExpression::MatchesWithConditionalCast(input_expressions[index],
+                                                       &grouping_expression);
+    DCHECK(grouping_expression != nullptr);
+    new_grouping_expressions.emplace_back(grouping_expression);
+  }
+
+  std::vector<E::AliasPtr> new_aggregate_expressions;
+  for (std::vector<E::ExpressionPtr>::size_type index = grouping_expressions_.size();
+       index < input_expressions.size();
+       ++index) {
+    E::AliasPtr aggregate_expression;
+    E::SomeAlias::MatchesWithConditionalCast(input_expressions[index], &aggregate_expression);
+    DCHECK(aggregate_expression != nullptr);
+    new_aggregate_expressions.emplace_back(aggregate_expression);
+  }
+
+  return Create(input_, new_grouping_expressions, new_aggregate_expressions);
+}
+
 void Aggregate::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/logical/Aggregate.hpp b/query_optimizer/logical/Aggregate.hpp
index db25293..78bd9f3 100644
--- a/query_optimizer/logical/Aggregate.hpp
+++ b/query_optimizer/logical/Aggregate.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -24,6 +26,7 @@
 
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/logical/Logical.hpp"
@@ -73,6 +76,9 @@
   LogicalPtr copyWithNewChildren(
       const std::vector<LogicalPtr> &new_children) const override;
 
+  LogicalPtr copyWithNewInputExpressions(
+      const std::vector<expressions::ExpressionPtr> &input_expressions) const override;
+
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
@@ -110,6 +116,8 @@
         grouping_expressions_(grouping_expressions),
         aggregate_expressions_(aggregate_expressions) {
     addChild(input_);
+    addInputExpressions(grouping_expressions_);
+    addInputExpressions(aggregate_expressions_);
   }
 
   LogicalPtr input_;
diff --git a/query_optimizer/logical/CMakeLists.txt b/query_optimizer/logical/CMakeLists.txt
index 38cff8d..0467233 100644
--- a/query_optimizer/logical/CMakeLists.txt
+++ b/query_optimizer/logical/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015-2016 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -48,8 +50,10 @@
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_Alias
                       quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_Expression
                       quickstep_queryoptimizer_expressions_ExpressionUtil
                       quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_logical_LogicalType
                       quickstep_utility_Cast
@@ -109,7 +113,9 @@
                       glog
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_Expression
                       quickstep_queryoptimizer_expressions_LogicalAnd
+                      quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_logical_LogicalType
@@ -119,6 +125,7 @@
                       glog
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_logical_BinaryJoin
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_logical_LogicalType
@@ -146,6 +153,7 @@
 target_link_libraries(quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_Expression
                       quickstep_queryoptimizer_logical_LogicalType
                       quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_logical_MultiwayCartesianJoin
@@ -160,6 +168,9 @@
 target_link_libraries(quickstep_queryoptimizer_logical_NestedLoopsJoin
                       glog
                       quickstep_queryoptimizer_OptimizerTree
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_Expression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_expressions_Predicate
                       quickstep_queryoptimizer_expressions_PredicateLiteral
                       quickstep_queryoptimizer_logical_BinaryJoin
@@ -172,8 +183,10 @@
                       glog
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_Expression
                       quickstep_queryoptimizer_expressions_ExpressionUtil
                       quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_logical_LogicalType
                       quickstep_utility_Cast
@@ -230,6 +243,7 @@
                       glog
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_logical_LogicalType
                       quickstep_utility_Cast
diff --git a/query_optimizer/logical/Filter.cpp b/query_optimizer/logical/Filter.cpp
index 846725f..05f02ed 100644
--- a/query_optimizer/logical/Filter.cpp
+++ b/query_optimizer/logical/Filter.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -20,8 +22,12 @@
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/logical/PatternMatcher.hpp"
 
+#include "glog/logging.h"
+
 namespace quickstep {
 namespace optimizer {
 namespace logical {
@@ -43,6 +49,18 @@
     filter_predicate_ = filter_predicate;
   }
   addChild(flattened_input);
+  addInputExpression(filter_predicate_);
+}
+
+LogicalPtr Filter::copyWithNewInputExpressions(
+    const std::vector<expressions::ExpressionPtr> &input_expressions) const {
+  DCHECK_EQ(1u, input_expressions.size());
+
+  E::PredicatePtr new_filter_predicate;
+  E::SomePredicate::MatchesWithConditionalCast(input_expressions[0], &new_filter_predicate);
+  DCHECK(new_filter_predicate != nullptr);
+
+  return Create(children()[0], new_filter_predicate);
 }
 
 void Filter::getFieldStringItems(
diff --git a/query_optimizer/logical/Filter.hpp b/query_optimizer/logical/Filter.hpp
index 7101453..fd0016c 100644
--- a/query_optimizer/logical/Filter.hpp
+++ b/query_optimizer/logical/Filter.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -24,6 +26,7 @@
 
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/LogicalAnd.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/logical/Logical.hpp"
@@ -71,6 +74,9 @@
     return Filter::Create(new_children[0], filter_predicate_);
   }
 
+  LogicalPtr copyWithNewInputExpressions(
+      const std::vector<expressions::ExpressionPtr> &input_expressions) const override;
+
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
     return input()->getOutputAttributes();
   }
diff --git a/query_optimizer/logical/HashJoin.hpp b/query_optimizer/logical/HashJoin.hpp
index d5f515a..2ae30cf 100644
--- a/query_optimizer/logical/HashJoin.hpp
+++ b/query_optimizer/logical/HashJoin.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -20,10 +22,12 @@
 
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/logical/BinaryJoin.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/logical/LogicalType.hpp"
@@ -50,9 +54,28 @@
  */
 class HashJoin : public BinaryJoin {
  public:
+  enum class JoinType {
+    kInnerJoin = 0,
+    kLeftSemiJoin,
+    kLeftAntiJoin
+  };
+
+
   LogicalType getLogicalType() const override { return LogicalType::kHashJoin; }
 
-  std::string getName() const override { return "HashJoin"; }
+  std::string getName() const override {
+    switch (join_type_) {
+      case JoinType::kInnerJoin:
+        return "HashJoin";
+      case JoinType::kLeftSemiJoin:
+        return "HashLeftSemiJoin";
+      case JoinType::kLeftAntiJoin:
+        return "HashLeftAntiJoin";
+      default:
+        LOG(FATAL) << "Invalid JoinType: "
+                   << static_cast<typename std::underlying_type<JoinType>::type>(join_type_);
+    }
+  }
 
   /**
    * @brief Join attributes in the left logical 'left_'.
@@ -68,13 +91,29 @@
     return right_join_attributes_;
   }
 
+  /**
+   * @brief The filtering predicate evaluated after join.
+   */
+  const expressions::PredicatePtr& residual_predicate() const {
+    return residual_predicate_;
+  }
+
+  /**
+   * @return Join type of this hash join.
+   */
+  JoinType join_type() const {
+    return join_type_;
+  }
+
   LogicalPtr copyWithNewChildren(
       const std::vector<LogicalPtr> &new_children) const override {
     DCHECK_EQ(new_children.size(), children().size());
     return HashJoin::Create(new_children[0],
                             new_children[1],
                             left_join_attributes_,
-                            right_join_attributes_);
+                            right_join_attributes_,
+                            residual_predicate_,
+                            join_type_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override {
@@ -98,13 +137,22 @@
    * @param right The right input operator.
    * @param left_join_attributes The join attributes in the 'left'.
    * @param right_join_attributes The join attributes in the 'right'.
+   * @param Join type of this hash join.
    * @return An immutable HashJoin.
    */
   static HashJoinPtr Create(const LogicalPtr &left,
                             const LogicalPtr &right,
                             const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
-                            const std::vector<expressions::AttributeReferencePtr> &right_join_attributes) {
-    return HashJoinPtr(new HashJoin(left, right, left_join_attributes, right_join_attributes));
+                            const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+                            const expressions::PredicatePtr &residual_predicate,
+                            const JoinType join_type) {
+    return HashJoinPtr(
+        new HashJoin(left,
+                     right,
+                     left_join_attributes,
+                     right_join_attributes,
+                     residual_predicate,
+                     join_type));
   }
 
  protected:
@@ -132,16 +180,26 @@
   HashJoin(const LogicalPtr &left,
            const LogicalPtr &right,
            const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
-           const std::vector<expressions::AttributeReferencePtr> &right_join_attributes)
+           const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
+           const expressions::PredicatePtr &residual_predicate,
+           const JoinType join_type)
       : BinaryJoin(left, right),
         left_join_attributes_(left_join_attributes),
-        right_join_attributes_(right_join_attributes) {
+        right_join_attributes_(right_join_attributes),
+        residual_predicate_(residual_predicate),
+        join_type_(join_type) {
     DCHECK_EQ(left_join_attributes.size(), right_join_attributes.size());
     DCHECK(!left_join_attributes.empty());
+
+    if (residual_predicate_ != nullptr) {
+      addInputExpression(residual_predicate_);
+    }
   }
 
   std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
   std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
+  expressions::PredicatePtr residual_predicate_;
+  const JoinType join_type_;
 
   DISALLOW_COPY_AND_ASSIGN(HashJoin);
 };
diff --git a/query_optimizer/logical/Logical.hpp b/query_optimizer/logical/Logical.hpp
index f0850aa..a1e9155 100644
--- a/query_optimizer/logical/Logical.hpp
+++ b/query_optimizer/logical/Logical.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -23,9 +25,12 @@
 
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/logical/LogicalType.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+
 namespace quickstep {
 namespace optimizer {
 namespace logical {
@@ -40,7 +45,8 @@
 /**
  * @brief Base class for all logical operator/node.
  */
-class Logical : public OptimizerTree<Logical> {
+class Logical : public OptimizerTree<Logical>,
+                public std::enable_shared_from_this<const Logical> {
  public:
   /**
    * @brief Destructor.
@@ -64,12 +70,62 @@
    */
   virtual std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const = 0;
 
+  /**
+   * @brief Make a copy of this logical node with input expressions replaced by
+   *        input_expression.
+   *
+   * @return A copy of this logical node with the new input expressions.
+   */
+  virtual LogicalPtr copyWithNewInputExpressions(
+      const std::vector<expressions::ExpressionPtr> &input_expressions) const {
+    // Default implementation: return a shared reference to "this" when there is
+    // no change.
+    DCHECK(input_expressions.empty());
+    return shared_from_this();
+  }
+
+  /**
+   * @return The input expressions in this Logical.
+   */
+  const std::vector<expressions::ExpressionPtr>& input_expressions() const {
+    return input_expressions_;
+  }
+
+  /**
+   * @return The input attributes, which are the union set of the output attributes of
+   *         all children.
+   */
+  std::vector<expressions::AttributeReferencePtr> getInputAttributes() const {
+    std::vector<expressions::AttributeReferencePtr> input_attributes;
+    for (const LogicalPtr &child : children()) {
+      const std::vector<expressions::AttributeReferencePtr> child_output_attributes =
+          child->getOutputAttributes();
+      input_attributes.insert(input_attributes.end(),
+                              child_output_attributes.begin(),
+                              child_output_attributes.end());
+    }
+    return input_attributes;
+  }
+
  protected:
   /**
    * @brief Constructor.
    */
   Logical() {}
 
+  template <class ExpressionClass>
+  void addInputExpressions(const std::vector<ExpressionClass> &new_input_expressions) {
+    input_expressions_.insert(input_expressions_.end(),
+                              new_input_expressions.begin(),
+                              new_input_expressions.end());
+  }
+
+  void addInputExpression(const expressions::ExpressionPtr &new_input_expression) {
+    input_expressions_.emplace_back(new_input_expression);
+  }
+
+  std::vector<expressions::ExpressionPtr> input_expressions_;
+
  private:
   DISALLOW_COPY_AND_ASSIGN(Logical);
 };
diff --git a/query_optimizer/logical/NestedLoopsJoin.hpp b/query_optimizer/logical/NestedLoopsJoin.hpp
index 933f01c..febd845 100644
--- a/query_optimizer/logical/NestedLoopsJoin.hpp
+++ b/query_optimizer/logical/NestedLoopsJoin.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -23,6 +25,9 @@
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/expressions/PredicateLiteral.hpp"
 #include "query_optimizer/logical/BinaryJoin.hpp"
@@ -70,6 +75,18 @@
                                    join_predicate_);
   }
 
+  LogicalPtr copyWithNewInputExpressions(
+      const std::vector<expressions::ExpressionPtr> &input_expressions) const override {
+    DCHECK_EQ(1u, input_expressions.size());
+
+    expressions::PredicatePtr new_filter_predicate;
+    expressions::SomePredicate::MatchesWithConditionalCast(input_expressions[0],
+                                                           &new_filter_predicate);
+    DCHECK(new_filter_predicate != nullptr);
+
+    return Create(left(), right(), new_filter_predicate);
+  }
+
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override {
     return join_predicate_->getReferencedAttributes();
   }
@@ -119,7 +136,9 @@
                   const LogicalPtr &right_operand,
                   const expressions::PredicatePtr &join_predicate)
       : BinaryJoin(left_operand, right_operand),
-        join_predicate_(join_predicate) {}
+        join_predicate_(join_predicate) {
+    addInputExpression(join_predicate);
+  }
 
   expressions::PredicatePtr join_predicate_;
 
diff --git a/query_optimizer/logical/Project.cpp b/query_optimizer/logical/Project.cpp
index 3f06e06..fd9200e 100644
--- a/query_optimizer/logical/Project.cpp
+++ b/query_optimizer/logical/Project.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -23,6 +25,7 @@
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
 #include "utility/Cast.hpp"
 
 #include "glog/logging.h"
@@ -55,6 +58,19 @@
   return referenced_attributes;
 }
 
+LogicalPtr Project::copyWithNewInputExpressions(
+    const std::vector<E::ExpressionPtr> &input_expressions) const {
+  DCHECK_EQ(project_expressions_.size(), input_expressions.size());
+  std::vector<E::NamedExpressionPtr> new_project_expressions;
+  for (const E::ExpressionPtr &input_expression : input_expressions) {
+    E::NamedExpressionPtr project_expression;
+    E::SomeNamedExpression::MatchesWithConditionalCast(input_expression, &project_expression);
+    DCHECK(project_expression != nullptr);
+    new_project_expressions.emplace_back(project_expression);
+  }
+  return Create(input_, new_project_expressions);
+}
+
 void Project::getFieldStringItems(
     std::vector<std::string> *inline_field_names,
     std::vector<std::string> *inline_field_values,
diff --git a/query_optimizer/logical/Project.hpp b/query_optimizer/logical/Project.hpp
index b894b9a..02113fa 100644
--- a/query_optimizer/logical/Project.hpp
+++ b/query_optimizer/logical/Project.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -24,6 +26,7 @@
 
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/logical/LogicalType.hpp"
@@ -65,6 +68,9 @@
   LogicalPtr copyWithNewChildren(
       const std::vector<LogicalPtr> &new_children) const override;
 
+  LogicalPtr copyWithNewInputExpressions(
+      const std::vector<expressions::ExpressionPtr> &input_expressions) const override;
+
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override;
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
@@ -98,6 +104,7 @@
       const std::vector<expressions::NamedExpressionPtr> &project_expressions)
       : input_(input), project_expressions_(project_expressions) {
     addChild(input);
+    addInputExpressions(project_expressions_);
   }
 
   LogicalPtr input_;
diff --git a/query_optimizer/logical/TopLevelPlan.hpp b/query_optimizer/logical/TopLevelPlan.hpp
index 71d5922..747a14e 100644
--- a/query_optimizer/logical/TopLevelPlan.hpp
+++ b/query_optimizer/logical/TopLevelPlan.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -20,10 +22,12 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/logical/Logical.hpp"
 #include "query_optimizer/logical/LogicalType.hpp"
 #include "utility/Macros.hpp"
@@ -77,7 +81,15 @@
                                  ++new_children.begin(),
                                  new_children.end());
     }
-    return TopLevelPlan::Create(new_children[0], new_shared_subplans);
+    return Create(new_children[0], new_shared_subplans, uncorrelated_subquery_map_);
+  }
+
+  /**
+   * @return Map from the expression ID of an attribute reference to the
+   *         uncorrelated subquery that produces the attribute.
+   */
+  const std::unordered_map<expressions::ExprId, int>& uncorrelated_subquery_map() const {
+    return uncorrelated_subquery_map_;
   }
 
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
@@ -93,11 +105,14 @@
    *
    * @param plan The input plan.
    * @param shared_subplans The subplans referenced in the main input plan.
+   * @param uncorrelated_subquery_map Map from expression IDs to uncorrelated scalar subqueries.
    * @return An immutable TopLevelPlan.
    */
   static TopLevelPlanPtr Create(const LogicalPtr &plan,
-                                const std::vector<LogicalPtr> &shared_subplans = std::vector<LogicalPtr>()) {
-    return TopLevelPlanPtr(new TopLevelPlan(plan, shared_subplans));
+                                const std::vector<LogicalPtr> &shared_subplans = std::vector<LogicalPtr>(),
+                                const std::unordered_map<expressions::ExprId, int> &uncorrelated_subquery_map =
+                                    std::unordered_map<expressions::ExprId, int>()) {
+    return TopLevelPlanPtr(new TopLevelPlan(plan, shared_subplans, uncorrelated_subquery_map));
   }
 
  protected:
@@ -111,9 +126,11 @@
 
  private:
   TopLevelPlan(const LogicalPtr &plan,
-               const std::vector<LogicalPtr> &shared_subplans)
+               const std::vector<LogicalPtr> &shared_subplans,
+               const std::unordered_map<expressions::ExprId, int> &uncorrelated_subquery_map)
       : plan_(plan),
-        shared_subplans_(shared_subplans) {
+        shared_subplans_(shared_subplans),
+        uncorrelated_subquery_map_(uncorrelated_subquery_map) {
     addChild(plan);
     for (const LogicalPtr &shared_subplan : shared_subplans) {
       addChild(shared_subplan);
@@ -124,6 +141,8 @@
   // Stored in the topological ordering based on dependencies.
   std::vector<LogicalPtr> shared_subplans_;
 
+  std::unordered_map<expressions::ExprId, int> uncorrelated_subquery_map_;
+
   DISALLOW_COPY_AND_ASSIGN(TopLevelPlan);
 };
 
diff --git a/query_optimizer/physical/CMakeLists.txt b/query_optimizer/physical/CMakeLists.txt
index d833423..ea3752d 100644
--- a/query_optimizer/physical/CMakeLists.txt
+++ b/query_optimizer/physical/CMakeLists.txt
@@ -232,8 +232,8 @@
                       glog
                       quickstep_queryoptimizer_OptimizerTree
                       quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExprId
                       quickstep_queryoptimizer_expressions_ExpressionUtil
-                      quickstep_queryoptimizer_expressions_NamedExpression
                       quickstep_queryoptimizer_physical_Physical
                       quickstep_queryoptimizer_physical_PhysicalType
                       quickstep_utility_Cast
diff --git a/query_optimizer/physical/HashJoin.cpp b/query_optimizer/physical/HashJoin.cpp
index 2fb8b2b..71c3692 100644
--- a/query_optimizer/physical/HashJoin.cpp
+++ b/query_optimizer/physical/HashJoin.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -76,7 +78,8 @@
                      left_join_attributes_,
                      right_join_attributes_,
                      residual_predicate_,
-                     new_project_expressions);
+                     new_project_expressions,
+                     join_type_);
     return true;
   }
   return false;
diff --git a/query_optimizer/physical/HashJoin.hpp b/query_optimizer/physical/HashJoin.hpp
index 265e0d4..80e7c2f 100644
--- a/query_optimizer/physical/HashJoin.hpp
+++ b/query_optimizer/physical/HashJoin.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -20,6 +22,7 @@
 
 #include <memory>
 #include <string>
+#include <type_traits>
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
@@ -50,9 +53,27 @@
  */
 class HashJoin : public BinaryJoin {
  public:
+  enum class JoinType {
+    kInnerJoin = 0,
+    kLeftSemiJoin,
+    kLeftAntiJoin
+  };
+
   PhysicalType getPhysicalType() const override { return PhysicalType::kHashJoin; }
 
-  std::string getName() const override { return "HashJoin"; }
+  std::string getName() const override {
+    switch (join_type_) {
+      case JoinType::kInnerJoin:
+        return "HashJoin";
+      case JoinType::kLeftSemiJoin:
+        return "HashLeftSemiJoin";
+      case JoinType::kLeftAntiJoin:
+        return "HashLeftAntiJoin";
+      default:
+        LOG(FATAL) << "Invalid JoinType: "
+                   << static_cast<typename std::underlying_type<JoinType>::type>(join_type_);
+    }
+  }
 
   /**
    * @brief Join attributes in the left logical 'left_'.
@@ -75,6 +96,13 @@
     return residual_predicate_;
   }
 
+  /**
+   * @return Join type of this hash join.
+   */
+  JoinType join_type() const {
+    return join_type_;
+  }
+
   PhysicalPtr copyWithNewChildren(
       const std::vector<PhysicalPtr> &new_children) const override {
     DCHECK_EQ(children().size(), new_children.size());
@@ -83,7 +111,8 @@
                   left_join_attributes_,
                   right_join_attributes_,
                   residual_predicate_,
-                  project_expressions());
+                  project_expressions(),
+                  join_type_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getReferencedAttributes() const override;
@@ -102,6 +131,7 @@
    * @param right_join_attributes The join attributes in the 'right'.
    * @param residual_predicate Optional filtering predicate evaluated after join.
    * @param project_expressions The project expressions.
+   * @param Join type of this hash join.
    * @return An immutable physical HashJoin.
    */
   static HashJoinPtr Create(
@@ -110,14 +140,16 @@
       const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
       const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
       const expressions::PredicatePtr &residual_predicate,
-      const std::vector<expressions::NamedExpressionPtr> &project_expressions) {
+      const std::vector<expressions::NamedExpressionPtr> &project_expressions,
+      const JoinType join_type) {
     return HashJoinPtr(
         new HashJoin(left,
                      right,
                      left_join_attributes,
                      right_join_attributes,
                      residual_predicate,
-                     project_expressions));
+                     project_expressions,
+                     join_type));
   }
 
  protected:
@@ -136,15 +168,19 @@
       const std::vector<expressions::AttributeReferencePtr> &left_join_attributes,
       const std::vector<expressions::AttributeReferencePtr> &right_join_attributes,
       const expressions::PredicatePtr &residual_predicate,
-      const std::vector<expressions::NamedExpressionPtr> &project_expressions)
+      const std::vector<expressions::NamedExpressionPtr> &project_expressions,
+      const JoinType join_type)
       : BinaryJoin(left, right, project_expressions),
         left_join_attributes_(left_join_attributes),
         right_join_attributes_(right_join_attributes),
-        residual_predicate_(residual_predicate) {}
+        residual_predicate_(residual_predicate),
+        join_type_(join_type) {
+  }
 
   std::vector<expressions::AttributeReferencePtr> left_join_attributes_;
   std::vector<expressions::AttributeReferencePtr> right_join_attributes_;
   expressions::PredicatePtr residual_predicate_;
+  JoinType join_type_;
 
   DISALLOW_COPY_AND_ASSIGN(HashJoin);
 };
diff --git a/query_optimizer/physical/TopLevelPlan.hpp b/query_optimizer/physical/TopLevelPlan.hpp
index 4ee6b5b..5e655fc 100644
--- a/query_optimizer/physical/TopLevelPlan.hpp
+++ b/query_optimizer/physical/TopLevelPlan.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -20,12 +22,13 @@
 
 #include <memory>
 #include <string>
+#include <unordered_map>
 #include <vector>
 
 #include "query_optimizer/OptimizerTree.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
-#include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/PhysicalType.hpp"
 #include "utility/Macros.hpp"
@@ -69,6 +72,14 @@
   }
 
   /**
+   * @return Map from the expression ID of an attribute reference to the
+   *         uncorrelated subquery that produces the attribute.
+   */
+  const std::unordered_map<expressions::ExprId, int>& uncorrelated_subquery_map() const {
+    return uncorrelated_subquery_map_;
+  }
+
+  /**
    * @brief Gets a shared subquery given by the position.
    *
    * @param index The position of the shared subplan to be returned.
@@ -89,7 +100,9 @@
                                  ++new_children.begin(),
                                  new_children.end());
     }
-    return TopLevelPlan::Create(new_children[0], new_shared_subplans);
+    return TopLevelPlan::Create(new_children[0],
+                                new_shared_subplans,
+                                uncorrelated_subquery_map_);
   }
 
   std::vector<expressions::AttributeReferencePtr> getOutputAttributes() const override {
@@ -112,11 +125,18 @@
    *
    * @param plan The query plan.
    * @param shared_subplans The subplans referenced in the main input plan.
+   * @param Map from the expression ID of an attribute reference to the
+   *        uncorrelated subquery that produces the attribute.
    * @return An immutable TopLevelPlan.
    */
-  static TopLevelPlanPtr Create(const PhysicalPtr &plan,
-                                const std::vector<PhysicalPtr> &shared_subplans = std::vector<PhysicalPtr>()) {
-    return TopLevelPlanPtr(new TopLevelPlan(plan, shared_subplans));
+  static TopLevelPlanPtr Create(
+      const PhysicalPtr &plan,
+      const std::vector<PhysicalPtr> &shared_subplans = {},
+      const std::unordered_map<expressions::ExprId, int> &uncorrelated_subquery_map
+          = std::unordered_map<expressions::ExprId, int>()) {
+    return TopLevelPlanPtr(new TopLevelPlan(plan,
+                                            shared_subplans,
+                                            uncorrelated_subquery_map));
   }
 
  protected:
@@ -130,9 +150,11 @@
 
  private:
   TopLevelPlan(const PhysicalPtr &plan,
-               const std::vector<PhysicalPtr> &shared_subplans)
-        : plan_(plan),
-          shared_subplans_(shared_subplans) {
+               const std::vector<PhysicalPtr> &shared_subplans,
+               const std::unordered_map<expressions::ExprId, int> &uncorrelated_subquery_map)
+      : plan_(plan),
+        shared_subplans_(shared_subplans),
+        uncorrelated_subquery_map_(uncorrelated_subquery_map) {
     addChild(plan);
     for (const PhysicalPtr &shared_subplan : shared_subplans) {
       addChild(shared_subplan);
@@ -142,6 +164,7 @@
   PhysicalPtr plan_;
   // Stored in the topological ordering based on dependencies.
   std::vector<PhysicalPtr> shared_subplans_;
+  std::unordered_map<expressions::ExprId, int> uncorrelated_subquery_map_;
 
   DISALLOW_COPY_AND_ASSIGN(TopLevelPlan);
 };
diff --git a/query_optimizer/rules/CMakeLists.txt b/query_optimizer/rules/CMakeLists.txt
index bf69cc7..03dcc50 100644
--- a/query_optimizer/rules/CMakeLists.txt
+++ b/query_optimizer/rules/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -21,10 +23,13 @@
 add_library(quickstep_queryoptimizer_rules_GenerateJoins GenerateJoins.cpp GenerateJoins.hpp)
 add_library(quickstep_queryoptimizer_rules_PruneColumns PruneColumns.cpp PruneColumns.hpp)
 add_library(quickstep_queryoptimizer_rules_PushDownFilter PushDownFilter.cpp PushDownFilter.hpp)
+add_library(quickstep_queryoptimizer_rules_PushDownSemiAntiJoin PushDownSemiAntiJoin.cpp PushDownSemiAntiJoin.hpp)
 add_library(quickstep_queryoptimizer_rules_Rule ../../empty_src.cpp Rule.hpp)
 add_library(quickstep_queryoptimizer_rules_RuleHelper RuleHelper.cpp RuleHelper.hpp)
 add_library(quickstep_queryoptimizer_rules_TopDownRule ../../empty_src.cpp TopDownRule.hpp)
 add_library(quickstep_queryoptimizer_rules_UpdateExpression UpdateExpression.cpp UpdateExpression.hpp)
+add_library(quickstep_queryoptimizer_rules_UnnestSubqueries UnnestSubqueries.cpp UnnestSubqueries.hpp)
+
                       
 # Link dependencies:
 target_link_libraries(quickstep_queryoptimizer_rules_BottomUpRule
@@ -58,7 +63,6 @@
                       quickstep_queryoptimizer_rules_TopDownRule
                       quickstep_types_operations_comparisons_ComparisonFactory
                       quickstep_types_operations_comparisons_ComparisonID
-                      quickstep_utility_HashPair
                       quickstep_utility_Macros
                       quickstep_utility_VectorUtil)
 target_link_libraries(quickstep_queryoptimizer_rules_PruneColumns
@@ -81,6 +85,14 @@
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_queryoptimizer_rules_TopDownRule
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_PushDownSemiAntiJoin
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_logical_HashJoin
+                      quickstep_queryoptimizer_logical_Logical
+                      quickstep_queryoptimizer_logical_PatternMatcher
+                      quickstep_queryoptimizer_rules_TopDownRule
+                      quickstep_utility_Macros)
 target_link_libraries(quickstep_queryoptimizer_rules_Rule
                       glog
                       quickstep_utility_Macros)
@@ -97,6 +109,36 @@
 target_link_libraries(quickstep_queryoptimizer_rules_TopDownRule
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_utility_Macros)
+target_link_libraries(quickstep_queryoptimizer_rules_UnnestSubqueries
+                      quickstep_queryoptimizer_OptimizerContext
+                      quickstep_queryoptimizer_expressions_AttributeReference
+                      quickstep_queryoptimizer_expressions_ComparisonExpression
+                      quickstep_queryoptimizer_expressions_Exists
+                      quickstep_queryoptimizer_expressions_ExprId
+                      quickstep_queryoptimizer_expressions_Expression
+                      quickstep_queryoptimizer_expressions_ExpressionType
+                      quickstep_queryoptimizer_expressions_ExpressionUtil
+                      quickstep_queryoptimizer_expressions_LogicalAnd
+                      quickstep_queryoptimizer_expressions_LogicalNot
+                      quickstep_queryoptimizer_expressions_LogicalOr
+                      quickstep_queryoptimizer_expressions_NamedExpression
+                      quickstep_queryoptimizer_expressions_PatternMatcher
+                      quickstep_queryoptimizer_expressions_Predicate
+                      quickstep_queryoptimizer_expressions_SubqueryExpression
+                      quickstep_queryoptimizer_logical_Aggregate
+                      quickstep_queryoptimizer_logical_Filter
+                      quickstep_queryoptimizer_logical_HashJoin
+                      quickstep_queryoptimizer_logical_Logical
+                      quickstep_queryoptimizer_logical_LogicalType
+                      quickstep_queryoptimizer_logical_PatternMatcher
+                      quickstep_queryoptimizer_logical_Project
+                      quickstep_queryoptimizer_logical_TopLevelPlan
+                      quickstep_queryoptimizer_rules_BottomUpRule
+                      quickstep_queryoptimizer_rules_Rule
+                      quickstep_types_operations_comparisons_Comparison
+                      quickstep_types_operations_comparisons_ComparisonID
+                      quickstep_utility_Macros
+                      quickstep_utility_SqlError)
 target_link_libraries(quickstep_queryoptimizer_rules_UpdateExpression
                       glog
                       quickstep_queryoptimizer_expressions_ExprId
@@ -115,7 +157,9 @@
                       quickstep_queryoptimizer_rules_GenerateJoins
                       quickstep_queryoptimizer_rules_PruneColumns
                       quickstep_queryoptimizer_rules_PushDownFilter
+                      quickstep_queryoptimizer_rules_PushDownSemiAntiJoin
                       quickstep_queryoptimizer_rules_Rule
                       quickstep_queryoptimizer_rules_RuleHelper
                       quickstep_queryoptimizer_rules_TopDownRule
-                      quickstep_queryoptimizer_rules_UpdateExpression)
+                      quickstep_queryoptimizer_rules_UpdateExpression
+                      quickstep_queryoptimizer_rules_UnnestSubqueries)
diff --git a/query_optimizer/rules/GenerateJoins.cpp b/query_optimizer/rules/GenerateJoins.cpp
index 50879ec..05fa397 100644
--- a/query_optimizer/rules/GenerateJoins.cpp
+++ b/query_optimizer/rules/GenerateJoins.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -35,7 +37,6 @@
 #include "query_optimizer/rules/RuleHelper.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
 #include "types/operations/comparisons/ComparisonID.hpp"
-#include "utility/HashPair.hpp"
 #include "utility/VectorUtil.hpp"
 
 #include "glog/logging.h"
@@ -280,8 +281,8 @@
     // The predicates that are not used in any joins.
     std::vector<E::PredicatePtr> filter_predicates;
 
-    // First, create a HashJoin for each hash join predicate in the order as
-    // specified in the query.
+    // First, create an inner HashJoin for each hash join predicate in the order
+    // as specified in the query.
     for (const std::unique_ptr<const HashJoinPredicateInfo> &
              hash_join_predicate_info : hash_join_predicates) {
       const L::LogicalPtr left_logical =
@@ -296,7 +297,9 @@
             left_logical,
             right_logical,
             hash_join_predicate_info->left_join_attributes,
-            hash_join_predicate_info->right_join_attributes);
+            hash_join_predicate_info->right_join_attributes,
+            nullptr /* residual_predicate */,
+            L::HashJoin::JoinType::kInnerJoin);
         UpdateLogicalVectors(left_logical,
                              right_logical,
                              logical_join,
diff --git a/query_optimizer/rules/PushDownSemiAntiJoin.cpp b/query_optimizer/rules/PushDownSemiAntiJoin.cpp
new file mode 100644
index 0000000..54af764
--- /dev/null
+++ b/query_optimizer/rules/PushDownSemiAntiJoin.cpp
@@ -0,0 +1,113 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_optimizer/rules/PushDownSemiAntiJoin.hpp"
+
+#include <vector>
+
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/logical/HashJoin.hpp"
+#include "query_optimizer/logical/Logical.hpp"
+#include "query_optimizer/logical/PatternMatcher.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace L = ::quickstep::optimizer::logical;
+
+L::LogicalPtr PushDownSemiAntiJoin::applyToNode(const L::LogicalPtr &input) {
+  std::vector<L::LogicalPtr> new_children;
+  bool has_changes = false;
+
+  for (const L::LogicalPtr &child : input->children()) {
+    L::HashJoinPtr hash_join;
+    if (L::SomeHashJoin::MatchesWithConditionalCast(child, &hash_join)) {
+      if (hash_join->join_type() == L::HashJoin::JoinType::kLeftSemiJoin ||
+          hash_join->join_type() == L::HashJoin::JoinType::kLeftAntiJoin) {
+        L::LogicalPtr new_child = pushDownSemiAntiJoin(hash_join);
+        if (new_child != child) {
+          has_changes = true;
+        }
+        new_children.push_back(new_child);
+      }
+    } else {
+      new_children.push_back(child);
+    }
+  }
+
+  if (has_changes) {
+    return input->copyWithNewChildren(new_children);
+  } else {
+    return input;
+  }
+}
+
+L::LogicalPtr PushDownSemiAntiJoin::pushDownSemiAntiJoin(
+    const L::HashJoinPtr &semi_anti_join) {
+  const L::LogicalPtr &left_input = semi_anti_join->left();
+  std::vector<L::LogicalPtr> left_input_children = left_input->children();
+
+  if (!left_input_children.empty()) {
+    const std::vector<L::LogicalPtr>::size_type last_input_index = left_input_children.size();
+
+    std::vector<L::LogicalPtr>::size_type input_index = 0;
+    while (input_index < last_input_index) {
+      if (SubsetOfExpressions(semi_anti_join->left_join_attributes(),
+                              left_input_children[input_index]->getOutputAttributes())) {
+        break;
+      }
+      ++input_index;
+    }
+
+    if (input_index < last_input_index &&
+        semi_anti_join->residual_predicate() != nullptr) {
+      const std::vector<E::AttributeReferencePtr> input_child_output_attributes =
+          left_input_children[input_index]->getOutputAttributes();
+      const std::vector<E::AttributeReferencePtr> right_input_output_attributes =
+          semi_anti_join->right()->getOutputAttributes();
+      E::UnorderedAttributeSet visible_attribute_set(input_child_output_attributes.begin(),
+                                                     input_child_output_attributes.end());
+      visible_attribute_set.insert(right_input_output_attributes.begin(),
+                                   right_input_output_attributes.end());
+
+      const std::vector<E::AttributeReferencePtr> referenced_attrs_in_residual =
+          E::GetAttributeReferencesWithinScope(
+              semi_anti_join->residual_predicate()->getReferencedAttributes(),
+              E::AttributeReferenceScope::kLocal);
+      for (const E::AttributeReferencePtr &referenced_attr : referenced_attrs_in_residual) {
+        if (visible_attribute_set.find(referenced_attr) == visible_attribute_set.end()) {
+          input_index = last_input_index;
+          break;
+        }
+      }
+    }
+
+    if (input_index < last_input_index) {
+      left_input_children[input_index] =
+          semi_anti_join->copyWithNewChildren({left_input_children[input_index],
+                                               semi_anti_join->right()});
+      return left_input->copyWithNewChildren(left_input_children);
+    }
+  }
+
+  return semi_anti_join;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
diff --git a/query_optimizer/rules/PushDownSemiAntiJoin.hpp b/query_optimizer/rules/PushDownSemiAntiJoin.hpp
new file mode 100644
index 0000000..7b1ea85
--- /dev/null
+++ b/query_optimizer/rules/PushDownSemiAntiJoin.hpp
@@ -0,0 +1,49 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_PUSH_DOWN_SEMI_ANTI_JOIN_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_PUSH_DOWN_SEMI_ANTI_JOIN_HPP_
+
+#include <string>
+
+#include "query_optimizer/logical/HashJoin.hpp"
+#include "query_optimizer/logical/Logical.hpp"
+#include "query_optimizer/rules/TopDownRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+class PushDownSemiAntiJoin : public TopDownRule<logical::Logical> {
+ public:
+  PushDownSemiAntiJoin() {}
+
+  std::string getName() const override { return "PushDownSemiAntiJoin"; }
+
+ protected:
+  logical::LogicalPtr applyToNode(const logical::LogicalPtr &input) override;
+
+ private:
+  logical::LogicalPtr pushDownSemiAntiJoin(const logical::HashJoinPtr &input);
+
+  DISALLOW_COPY_AND_ASSIGN(PushDownSemiAntiJoin);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_RULES_PUSH_DOWN_SEMI_ANTI_JOIN_HPP_ */
diff --git a/query_optimizer/rules/UnnestSubqueries.cpp b/query_optimizer/rules/UnnestSubqueries.cpp
new file mode 100644
index 0000000..2dca36e
--- /dev/null
+++ b/query_optimizer/rules/UnnestSubqueries.cpp
@@ -0,0 +1,714 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "query_optimizer/rules/UnnestSubqueries.hpp"
+
+#include <algorithm>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include "query_optimizer/OptimizerContext.hpp"
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ComparisonExpression.hpp"
+#include "query_optimizer/expressions/Exists.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/ExpressionType.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/LogicalAnd.hpp"
+#include "query_optimizer/expressions/LogicalNot.hpp"
+#include "query_optimizer/expressions/LogicalOr.hpp"
+#include "query_optimizer/expressions/NamedExpression.hpp"
+#include "query_optimizer/expressions/PatternMatcher.hpp"
+#include "query_optimizer/expressions/SubqueryExpression.hpp"
+#include "query_optimizer/logical/Aggregate.hpp"
+#include "query_optimizer/logical/Filter.hpp"
+#include "query_optimizer/logical/HashJoin.hpp"
+#include "query_optimizer/logical/Logical.hpp"
+#include "query_optimizer/logical/LogicalType.hpp"
+#include "query_optimizer/logical/PatternMatcher.hpp"
+#include "query_optimizer/logical/Project.hpp"
+#include "query_optimizer/logical/TopLevelPlan.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "types/operations/comparisons/Comparison.hpp"
+#include "types/operations/comparisons/ComparisonID.hpp"
+#include "utility/SqlError.hpp"
+
+#include "glog/logging.h"
+
+namespace quickstep {
+namespace optimizer {
+
+namespace E = ::quickstep::optimizer::expressions;
+namespace L = ::quickstep::optimizer::logical;
+
+struct CorrelatedQueryInfo {
+  enum class JoinType {
+    kInnerJoin = 0,
+    kLeftSemiJoin,
+    kLeftAntiJoin
+  };
+
+  CorrelatedQueryInfo(const JoinType join_type_in,
+                      const L::LogicalPtr &correlated_query_in,
+                      std::vector<E::AttributeReferencePtr> &&probe_join_attributes_in,
+                      std::vector<E::AttributeReferencePtr> &&build_join_attributes_in,
+                      std::vector<E::PredicatePtr> &&non_hash_join_predicates_in)
+      : join_type(join_type_in),
+        correlated_query(correlated_query_in),
+        probe_join_attributes(std::move(probe_join_attributes_in)),
+        build_join_attributes(std::move(build_join_attributes_in)),
+        non_hash_join_predicates(std::move(non_hash_join_predicates_in)) {}
+
+  JoinType join_type;
+  L::LogicalPtr correlated_query;
+  std::vector<E::AttributeReferencePtr> probe_join_attributes;
+  std::vector<E::AttributeReferencePtr> build_join_attributes;
+  std::vector<E::PredicatePtr> non_hash_join_predicates;
+};
+
+L::LogicalPtr UnnestSubqueries::apply(const L::LogicalPtr &input) {
+  DCHECK(L::SomeTopLevelPlan::Matches(input))
+      << "The input logical node must be of TopLevelPlan";
+
+  const L::TopLevelPlan &top_level_plan = static_cast<const L::TopLevelPlan&>(*input);
+
+  bool has_changes = false;
+
+  std::vector<E::AttributeReferencePtr> empty_probe_join_predicates;
+  std::vector<E::AttributeReferencePtr> empty_build_join_predicates;
+  std::vector<E::PredicatePtr> empty_correlated_non_hash_join_predicates;
+  std::unordered_map<E::ExprId, L::LogicalPtr> uncorrelated_subqueries;
+  expressions::UnorderedNamedExpressionSet empty_attributes;
+  UnnestSubqueriesForNonRootLogical unnest_rule(false /* scalar query */,
+                                                empty_attributes,
+                                                context_,
+                                                &uncorrelated_subqueries,
+                                                &empty_probe_join_predicates,
+                                                &empty_build_join_predicates,
+                                                &empty_correlated_non_hash_join_predicates);
+  DCHECK(empty_probe_join_predicates.empty());
+  DCHECK(empty_build_join_predicates.empty());
+  DCHECK(empty_correlated_non_hash_join_predicates.empty());
+
+  // Unnest subqueries in each subplan and the main plan.
+  std::vector<L::LogicalPtr> new_shared_subplans;
+  for (const L::LogicalPtr &shared_subplan : top_level_plan.shared_subplans()) {
+    const L::LogicalPtr new_shared_subplan = unnest_rule.apply(shared_subplan);
+    if (new_shared_subplan != shared_subplan && !has_changes) {
+      has_changes = true;
+    }
+    new_shared_subplans.emplace_back(new_shared_subplan);
+  }
+
+  const L::LogicalPtr new_main_plan = unnest_rule.apply(top_level_plan.plan());
+  if (new_main_plan != top_level_plan.plan() && !has_changes) {
+    has_changes = true;
+  }
+
+  std::unordered_map<E::ExprId, int> uncorrelated_map;
+  std::vector<L::LogicalPtr>::size_type subplan_id = new_shared_subplans.size();
+  for (const std::pair<const E::ExprId, L::LogicalPtr> &uncorrelated_query_info : uncorrelated_subqueries) {
+    uncorrelated_map.emplace(uncorrelated_query_info.first,
+                             subplan_id);
+    new_shared_subplans.emplace_back(uncorrelated_query_info.second);
+    ++subplan_id;
+  }
+
+  if (!uncorrelated_subqueries.empty() || has_changes) {
+    const L::LogicalPtr new_top_level_plan =
+        L::TopLevelPlan::Create(new_main_plan,
+                                new_shared_subplans,
+                                uncorrelated_map);
+    LOG_APPLYING_RULE(input, new_top_level_plan);
+    return new_top_level_plan;
+  }
+
+  LOG_IGNORING_RULE(input);
+  return input;
+}
+
+L::LogicalPtr UnnestSubqueriesForNonRootLogical::applyInternal(
+    const L::LogicalPtr &input,
+    E::UnorderedAttributeSet *inner_attributes,
+    std::vector<E::AttributeReferencePtr> *probe_join_attributes,
+    std::vector<E::AttributeReferencePtr> *build_join_attributes,
+    std::vector<E::PredicatePtr> *non_hash_join_predicates) {
+  DCHECK(inner_attributes->empty());
+  DCHECK(probe_join_attributes->empty());
+  DCHECK(build_join_attributes->empty());
+  DCHECK(non_hash_join_predicates->empty());
+
+  // First, handle subquery expressions in the children.
+  bool has_change = false;
+  std::vector<L::LogicalPtr> new_children;
+  for (const L::LogicalPtr &child : input->children()) {
+    E::UnorderedAttributeSet inner_attributes_in_child;
+    std::vector<E::AttributeReferencePtr> probe_join_predicates_in_child;
+    std::vector<E::AttributeReferencePtr> build_join_predicates_in_child;
+    std::vector<E::PredicatePtr> non_hash_join_predicates_in_child;
+    const L::LogicalPtr new_child =
+        applyInternal(child,
+                      &inner_attributes_in_child,
+                      &probe_join_predicates_in_child,
+                      &build_join_predicates_in_child,
+                      &non_hash_join_predicates_in_child);
+    inner_attributes->insert(inner_attributes_in_child.begin(),
+                             inner_attributes_in_child.end());
+    probe_join_attributes->insert(probe_join_attributes->end(),
+                                  probe_join_predicates_in_child.begin(),
+                                  probe_join_predicates_in_child.end());
+    build_join_attributes->insert(build_join_attributes->end(),
+                                  build_join_predicates_in_child.begin(),
+                                  build_join_predicates_in_child.end());
+    non_hash_join_predicates->insert(non_hash_join_predicates->end(),
+                                     non_hash_join_predicates_in_child.begin(),
+                                     non_hash_join_predicates_in_child.end());
+    if (new_child != child && !has_change) {
+      has_change = true;
+    }
+    new_children.emplace_back(new_child);
+  }
+
+  L::LogicalPtr input_with_new_children = input;
+  if (has_change) {
+    input_with_new_children = input->copyWithNewChildren(new_children);
+  }
+
+  // Next, process subquery expressions in the current node.
+  const L::LogicalPtr output = applyToNode(input_with_new_children,
+                                           inner_attributes,
+                                           probe_join_attributes,
+                                           build_join_attributes,
+                                           non_hash_join_predicates);
+  return output;
+}
+
+L::LogicalPtr UnnestSubqueriesForNonRootLogical::applyToNode(
+    const L::LogicalPtr &input,
+    E::UnorderedAttributeSet *inner_attributes,
+    std::vector<E::AttributeReferencePtr> *probe_join_attributes,
+    std::vector<E::AttributeReferencePtr> *build_join_attributes,
+    std::vector<E::PredicatePtr> *non_hash_join_predicates) {
+  // First eliminate subqueries expressions in this node (that is, this node is the outer query).
+  L::LogicalPtr input_with_no_subqueries = eliminateNestedScalarQueries(input);
+  DCHECK(input_with_no_subqueries != nullptr);
+
+  // Next determine whether this node has any outer reference (that is, this node is the inner query).
+  const std::vector<E::ExpressionPtr> &input_expressions = input_with_no_subqueries->input_expressions();
+  std::vector<E::ExpressionPtr> new_input_expressions;
+  bool has_change = false;
+  for (const E::ExpressionPtr &input_expression : input_expressions) {
+    const E::ExpressionPtr new_input_expression =
+        eliminateOuterAttributeReference(input_expression,
+                                         inner_attributes,
+                                         probe_join_attributes,
+                                         build_join_attributes,
+                                         non_hash_join_predicates);
+    if (new_input_expression != nullptr) {
+      new_input_expressions.emplace_back(new_input_expression);
+    }
+    if (new_input_expression != input_expression && !has_change) {
+      has_change = true;
+    }
+  }
+
+  switch (input_with_no_subqueries->getLogicalType()) {
+    case L::LogicalType::kAggregate: {
+      if (!non_hash_join_predicates->empty()) {
+        THROW_SQL_ERROR()
+            << "Non-equality join predicate with an outer query must be above any aggregate";
+      }
+
+      const L::Aggregate &aggregate = static_cast<const L::Aggregate&>(*input_with_no_subqueries);
+      DCHECK(!has_change);
+      if (!inner_attributes->empty()) {
+        std::vector<E::NamedExpressionPtr> group_expressions = aggregate.grouping_expressions();
+        group_expressions.insert(group_expressions.end(),
+                                 inner_attributes->begin(),
+                                 inner_attributes->end());
+        return L::Aggregate::Create(aggregate.input(),
+                                    group_expressions,
+                                    aggregate.aggregate_expressions());
+      }
+      return input_with_no_subqueries;
+    }
+    case L::LogicalType::kFilter: {
+      if (new_input_expressions.empty()) {
+        const L::Filter &filter = static_cast<const L::Filter&>(*input_with_no_subqueries);
+        return filter.input();
+      }
+      if (has_change) {
+        return input_with_no_subqueries->copyWithNewInputExpressions(new_input_expressions);
+      }
+      return input_with_no_subqueries;
+    }
+    case L::LogicalType::kProject: {
+      if (has_change) {
+        input_with_no_subqueries =
+            input_with_no_subqueries->copyWithNewInputExpressions(new_input_expressions);
+      }
+      if (!inner_attributes->empty()) {
+         const L::Project &project = static_cast<const L::Project&>(*input_with_no_subqueries);
+         // Insert those inner attributes into the front of the project expression
+         // list rather than the back to help reduce unnecessary selection for
+         // reordering output attributes, since those attributes are usually also
+         // grouping attributes that proceed others in projection.
+         std::vector<E::NamedExpressionPtr> new_project_expressions;
+         const E::UnorderedNamedExpressionSet project_expression_set(project.project_expressions().begin(),
+                                                                     project.project_expressions().end());
+         for (const E::AttributeReferencePtr &inner_attribute : *inner_attributes) {
+           if (project_expression_set.find(inner_attribute) == project_expression_set.end()) {
+             new_project_expressions.emplace_back(inner_attribute);
+           }
+         }
+         new_project_expressions.insert(new_project_expressions.end(),
+                                        project_expression_set.begin(),
+                                        project_expression_set.end());
+         if (new_project_expressions.size() != project.project_expressions().size()) {
+           input_with_no_subqueries = L::Project::Create(project.input(),
+                                                         new_project_expressions);
+         }
+      }
+      return input_with_no_subqueries;
+    }
+    default:
+      if (has_change) {
+        return input_with_no_subqueries->copyWithNewInputExpressions(new_input_expressions);
+      }
+      return input_with_no_subqueries;
+  }
+}
+
+E::ExpressionPtr UnnestSubqueriesForNonRootLogical::eliminateOuterAttributeReference(
+    const E::ExpressionPtr &expression,
+    E::UnorderedAttributeSet *inner_attributes,
+    std::vector<E::AttributeReferencePtr> *probe_join_attributes,
+    std::vector<E::AttributeReferencePtr> *build_join_attributes,
+    std::vector<E::PredicatePtr> *non_hash_join_predicates) {
+  DCHECK(expression->getExpressionType() != E::ExpressionType::kExists);
+  DCHECK(expression->getExpressionType() != E::ExpressionType::kSubqueryExpression);
+
+  switch (expression->getExpressionType()) {
+    case E::ExpressionType::kPredicateLiteral:  // Fall through
+    case E::ExpressionType::kScalarLiteral:
+      return expression;
+    case E::ExpressionType::kLogicalAnd: {
+      const E::LogicalAnd &logical_and = static_cast<const E::LogicalAnd&>(*expression);
+      bool has_change = false;
+      std::vector<E::PredicatePtr> new_children;
+      for (const E::PredicatePtr &child : logical_and.operands()) {
+        const E::ExpressionPtr new_child =
+            eliminateOuterAttributeReference(child,
+                                             inner_attributes,
+                                             probe_join_attributes,
+                                             build_join_attributes,
+                                             non_hash_join_predicates);
+        if (new_child != nullptr) {
+          new_children.emplace_back(
+              std::static_pointer_cast<const E::Predicate>(new_child));
+        }
+        if (new_child != child && !has_change) {
+          has_change = true;
+        }
+      }
+      if (has_change) {
+        if (new_children.empty()) {
+          return E::ExpressionPtr();
+        }
+        if (new_children.size() == 1u) {
+          return new_children[0];
+        }
+        return E::LogicalAnd::Create(new_children);
+      }
+      return expression;
+    }
+    case E::ExpressionType::kComparisonExpression: {
+      const E::ComparisonExpressionPtr comparison_expression =
+          std::static_pointer_cast<const E::ComparisonExpression>(expression);
+      E::AttributeReferencePtr outer_attribute;
+      E::AttributeReferencePtr inner_attribute;
+      // Hash-join predicate.
+      if (E::SomeAttributeReference::MatchesWithConditionalCast(
+              comparison_expression->left(), &outer_attribute) &&
+          E::SomeAttributeReference::MatchesWithConditionalCast(
+              comparison_expression->right(), &inner_attribute) &&
+          comparison_expression->comparison().getComparisonID() == ComparisonID::kEqual) {
+        if (!isCorrelatedOuterAttribute(outer_attribute)) {
+          if (!isCorrelatedOuterAttribute(inner_attribute)) {
+            return expression;
+          }
+          std::swap(outer_attribute, inner_attribute);
+        } else if (isCorrelatedOuterAttribute(inner_attribute)) {
+          THROW_SQL_ERROR() << "Comparison of two outer attribute references is not allowed";
+        }
+        if (visiable_attrs_from_outer_query_.find(outer_attribute) == visiable_attrs_from_outer_query_.end()) {
+          THROW_SQL_ERROR()
+              << "Nested queries can only reference attributes in the outer query one level above";
+        }
+        outer_attribute =
+            E::AttributeReference::Create(outer_attribute->id(),
+                                          outer_attribute->attribute_name(),
+                                          outer_attribute->attribute_alias(),
+                                          outer_attribute->relation_name(),
+                                          outer_attribute->getValueType(),
+                                          E::AttributeReferenceScope::kLocal);
+
+        inner_attributes->emplace(inner_attribute);
+        probe_join_attributes->emplace_back(outer_attribute);
+        build_join_attributes->emplace_back(inner_attribute);
+        return E::ExpressionPtr();
+      } else {
+        DeOuterAttributeReference de_outer_reference_rule(!scalar_query_,
+                                                          *uncorrelated_subqueries_,
+                                                          visiable_attrs_from_outer_query_);
+        const E::ExpressionPtr new_expression =
+            de_outer_reference_rule.apply(expression);
+        if (new_expression != expression) {
+          // All the inner attributes should be insert into the inner_attributes,
+          // so that they will be visible from the outer query.
+          // Note that we use expression instead of new_expression, since
+          // the outer attribute reference in new_expression is de-outer referenced.
+          const std::vector<E::AttributeReferencePtr> &inner_attrs_vec =
+              E::GetAttributeReferencesWithinScope(expression->getReferencedAttributes(),
+                                                   E::AttributeReferenceScope::kLocal);
+          for (const E::AttributeReferencePtr &attr : inner_attrs_vec) {
+            inner_attributes->emplace(attr);
+          }
+
+          non_hash_join_predicates->emplace_back(
+              std::static_pointer_cast<const E::Predicate>(new_expression));
+          return E::ExpressionPtr();
+        }
+      }
+      return expression;
+    }
+    default: {
+      validateNonOuterAttributeReference(expression);
+      return expression;
+    }
+  }
+}
+
+void UnnestSubqueriesForNonRootLogical::validateNonOuterAttributeReference(
+    const E::ExpressionPtr &expression) {
+  const std::vector<E::AttributeReferencePtr> referenced_attributes =
+      expression->getReferencedAttributes();
+  for (const E::AttributeReferencePtr &referenced_attribute : referenced_attributes) {
+    if (isCorrelatedOuterAttribute(referenced_attribute)) {
+      THROW_SQL_ERROR()
+          << "Outer attribute reference can only appear in a single-attribute equality predicate";
+    }
+  }
+}
+
+bool UnnestSubqueriesForNonRootLogical::isCorrelatedOuterAttribute(
+    const E::AttributeReferencePtr &attribute) const {
+  return (attribute->scope() == E::AttributeReferenceScope::kOuter &&
+          uncorrelated_subqueries_->find(attribute->id()) == uncorrelated_subqueries_->end());
+}
+
+L::LogicalPtr UnnestSubqueriesForNonRootLogical::eliminateNestedScalarQueries(const L::LogicalPtr &node) {
+  const std::vector<E::ExpressionPtr> &input_expressions = node->input_expressions();
+  const std::vector<E::AttributeReferencePtr> input_attributes =
+      node->getInputAttributes();
+  E::UnorderedNamedExpressionSet input_attribute_set(input_attributes.begin(),
+                                                     input_attributes.end());
+
+  // Transform subquery experssions into non-subquery expressions.
+  std::vector<CorrelatedQueryInfo> correlated_query_info_vec;
+  std::vector<E::ExpressionPtr> new_input_expressions;
+  bool has_changed_expression = false;
+  for (const E::ExpressionPtr &input_expression : input_expressions) {
+    std::vector<E::PredicatePtr> correlated_predicates;
+    UnnestSubqueriesForExpession unnest_rule_for_expr(
+        input_attribute_set,
+        context_,
+        uncorrelated_subqueries_,
+        &correlated_query_info_vec);
+    const E::ExpressionPtr new_input_expression =
+        unnest_rule_for_expr.apply(input_expression);
+    if (new_input_expression != input_expression && !has_changed_expression) {
+      has_changed_expression = true;
+    }
+    new_input_expressions.emplace_back(new_input_expression);
+  }
+
+  if (has_changed_expression) {
+    // If there are correlated subquery expressions, add logical joins.
+    if (!correlated_query_info_vec.empty()) {
+      L::LogicalPtr new_child;
+
+      if (new_input_expressions[0] == nullptr &&
+          node->getLogicalType() == L::LogicalType::kNestedLoopsJoin) {
+        new_child = node;
+      } else {
+        DCHECK_EQ(1u, node->children().size());
+        new_child = node->children()[0];
+      }
+
+      for (CorrelatedQueryInfo &correlated_query_info : correlated_query_info_vec) {
+        DCHECK(!correlated_query_info.probe_join_attributes.empty());
+        if (correlated_query_info.join_type == CorrelatedQueryInfo::JoinType::kInnerJoin) {
+          DCHECK(correlated_query_info.non_hash_join_predicates.empty())
+              << correlated_query_info.non_hash_join_predicates[0]->toString();
+          new_child = L::HashJoin::Create(new_child,
+                                          correlated_query_info.correlated_query,
+                                          correlated_query_info.probe_join_attributes,
+                                          correlated_query_info.build_join_attributes,
+                                          nullptr, /* residual_predicate */
+                                          L::HashJoin::JoinType::kInnerJoin);
+        } else {
+          E::PredicatePtr filter_predicate;
+          if (correlated_query_info.non_hash_join_predicates.size() > 1u) {
+            filter_predicate = E::LogicalAnd::Create(correlated_query_info.non_hash_join_predicates);
+          } else if (!correlated_query_info.non_hash_join_predicates.empty()) {
+            filter_predicate = correlated_query_info.non_hash_join_predicates[0];
+          }
+
+          L::HashJoin::JoinType join_type =
+              (correlated_query_info.join_type == CorrelatedQueryInfo::JoinType::kLeftSemiJoin)
+                  ? L::HashJoin::JoinType::kLeftSemiJoin
+                  : L::HashJoin::JoinType::kLeftAntiJoin;
+          new_child = L::HashJoin::Create(new_child,
+                                          correlated_query_info.correlated_query,
+                                          correlated_query_info.probe_join_attributes,
+                                          correlated_query_info.build_join_attributes,
+                                          filter_predicate,
+                                          join_type);
+        }
+      }
+
+      // Special case for filter and nested loops join which may contain EXISTS and IN.
+      if (new_input_expressions[0] == nullptr) {
+        DCHECK_EQ(1u, new_input_expressions.size());
+        DCHECK(node->getLogicalType() == L::LogicalType::kFilter ||
+               node->getLogicalType() == L::LogicalType::kNestedLoopsJoin) << node->toString();
+        LOG_APPLYING_RULE(node, new_child);
+        return new_child;
+      }
+
+      L::LogicalPtr new_logical = node->copyWithNewInputExpressions(new_input_expressions);
+      new_logical = new_logical->copyWithNewChildren({new_child});
+      LOG_APPLYING_RULE(node, new_logical);
+      return new_logical;
+    }
+
+    L::LogicalPtr new_logical = node->copyWithNewInputExpressions(new_input_expressions);
+    LOG_APPLYING_RULE(node, new_logical);
+    return new_logical;
+  }
+
+  LOG_IGNORING_RULE(node);
+  return node;
+}
+
+E::ExpressionPtr UnnestSubqueriesForExpession::applyInternal(
+    const bool allow_exists_or_in,
+    const E::ExpressionPtr &node) {
+  switch (node->getExpressionType()) {
+    case E::ExpressionType::kSubqueryExpression: {
+      const E::SubqueryExpression &subquery_expression =
+          static_cast<const E::SubqueryExpression&>(*node);
+
+      std::vector<E::AttributeReferencePtr> probe_join_attributes;
+      std::vector<E::AttributeReferencePtr> build_join_attributes;
+      std::vector<E::PredicatePtr> non_hash_join_predicates;
+      UnnestSubqueriesForNonRootLogical unnest_logical_rule(true,  // scalar_query
+                                                            visible_attributes_from_outer_query_,
+                                                            context_,
+                                                            uncorrelated_subqueries_,
+                                                            &probe_join_attributes,
+                                                            &build_join_attributes,
+                                                            &non_hash_join_predicates);
+      const L::LogicalPtr subquery = subquery_expression.subquery();
+      const L::LogicalPtr new_subquery = unnest_logical_rule.apply(subquery);
+      const E::AttributeReferencePtr output_attribute = subquery->getOutputAttributes()[0];
+      DCHECK(!new_subquery->getOutputAttributes().empty());
+      if (probe_join_attributes.empty()) {
+        DCHECK(non_hash_join_predicates.empty());
+        DCHECK_EQ(1u, new_subquery->getOutputAttributes().size()) << node->toString();
+        const E::AttributeReferencePtr new_outer_attribute_reference =
+            E::AttributeReference::Create(context_->nextExprId(),
+                                          output_attribute->attribute_name(),
+                                          output_attribute->attribute_alias(),
+                                          output_attribute->relation_name(),
+                                          output_attribute->getValueType(),
+                                          E::AttributeReferenceScope::kOuter);
+        uncorrelated_subqueries_->emplace(new_outer_attribute_reference->id(),
+                                          new_subquery);
+        return new_outer_attribute_reference;
+      } else {
+        correlated_query_info_vec_->emplace_back(CorrelatedQueryInfo::JoinType::kInnerJoin,
+                                                 new_subquery,
+                                                 std::move(probe_join_attributes),
+                                                 std::move(build_join_attributes),
+                                                 std::move(non_hash_join_predicates));
+      }
+
+      return output_attribute;
+    }
+    case E::ExpressionType::kExists: {
+      if (!allow_exists_or_in) {
+        THROW_SQL_ERROR() << "EXISTS can only appear in (un-nested) NOT, AND or by itself";
+      }
+      transformExists(static_cast<const E::Exists&>(*node));
+      return E::ExpressionPtr();
+    }
+    case E::ExpressionType::kLogicalNot: {
+      const E::LogicalNot &logical_not = static_cast<const E::LogicalNot&>(*node);
+      const E::PredicatePtr &operand = logical_not.operand();
+      if (operand->getExpressionType() == E::ExpressionType::kExists) {
+        if (!allow_exists_or_in) {
+          THROW_SQL_ERROR() << "EXISTS can only appear in (un-nested) NOT, AND or by itself";
+        }
+        transformExists(static_cast<const E::Exists&>(*operand));
+        correlated_query_info_vec_->back().join_type = CorrelatedQueryInfo::JoinType::kLeftAntiJoin;
+        return E::PredicatePtr();
+      }
+      const E::ExpressionPtr new_operand =
+          applyInternal(false /* allow_exists_or_in */,
+                        operand);
+      if (new_operand != operand) {
+        return E::LogicalNot::Create(std::static_pointer_cast<const E::Predicate>(new_operand));
+      } else {
+        return node;
+      }
+    }
+    case E::ExpressionType::kLogicalAnd: {
+      const E::LogicalAnd &logical_and = static_cast<const E::LogicalAnd&>(*node);
+
+      std::vector<E::PredicatePtr> new_operands;
+      bool has_change = false;
+      for (const E::PredicatePtr &operand : logical_and.operands()) {
+        const E::ExpressionPtr new_operand =
+            applyInternal(allow_exists_or_in,
+                          operand);
+        if (new_operand != operand && !has_change) {
+          has_change = true;
+        }
+        if (new_operand != nullptr) {
+          new_operands.emplace_back(std::static_pointer_cast<const E::Predicate>(new_operand));
+        }
+      }
+
+      if (new_operands.empty()) {
+        return E::PredicatePtr();
+      } else if (new_operands.size() == 1u) {
+        return new_operands[0];
+      }
+
+      if (has_change) {
+        return E::LogicalAnd::Create(new_operands);
+      }
+      return node;
+    }
+    case E::ExpressionType::kLogicalOr: {
+      const E::LogicalOr &logical_or = static_cast<const E::LogicalOr&>(*node);
+
+      std::vector<E::PredicatePtr> new_operands;
+      bool has_change = false;
+      for (const E::PredicatePtr &operand : logical_or.operands()) {
+        const E::ExpressionPtr new_operand =
+            applyInternal(false,
+                          operand);
+        if (new_operand != operand && !has_change) {
+          has_change = true;
+        }
+        DCHECK(new_operand != nullptr);
+        new_operands.emplace_back(std::static_pointer_cast<const E::Predicate>(new_operand));
+      }
+
+      if (has_change) {
+        return E::LogicalOr::Create(new_operands);
+      }
+      return node;
+    }
+    default:
+      std::vector<E::ExpressionPtr> new_children;
+      bool has_change = false;
+      for (const E::ExpressionPtr &child : node->children()) {
+        const E::ExpressionPtr new_child =
+            applyInternal(allow_exists_or_in, child);
+        if (new_child != child && !has_change) {
+          has_change = true;
+        }
+        DCHECK(child != nullptr);
+        new_children.emplace_back(new_child);
+      }
+
+      if (has_change) {
+        return node->copyWithNewChildren(new_children);
+      }
+      return node;
+  }
+}
+
+void UnnestSubqueriesForExpession::transformExists(
+    const E::Exists &exists_predicate) {
+  std::vector<E::AttributeReferencePtr> probe_join_attributes;
+  std::vector<E::AttributeReferencePtr> build_join_attributes;
+  std::vector<E::PredicatePtr> non_hash_join_predicates;
+  UnnestSubqueriesForNonRootLogical unnest_logical_rule(false,  // scalar_query
+                                                        visible_attributes_from_outer_query_,
+                                                        context_,
+                                                        uncorrelated_subqueries_,
+                                                        &probe_join_attributes,
+                                                        &build_join_attributes,
+                                                        &non_hash_join_predicates);
+  DCHECK_EQ(probe_join_attributes.size(), build_join_attributes.size());
+  const L::LogicalPtr new_subquery =
+      unnest_logical_rule.apply(exists_predicate.exists_subquery()->subquery());
+
+  if (probe_join_attributes.empty()) {
+    if (!non_hash_join_predicates.empty()) {
+      THROW_SQL_ERROR() << "Correlated queries must have an equality join predicate with the outer query";
+    }
+    THROW_SQL_ERROR() << "EXISTS subquery cannot be un-correlated with the outer query";
+  }
+
+  correlated_query_info_vec_->emplace_back(CorrelatedQueryInfo::JoinType::kLeftSemiJoin,
+                                           new_subquery,
+                                           std::move(probe_join_attributes),
+                                           std::move(build_join_attributes),
+                                           std::move(non_hash_join_predicates));
+}
+
+E::ExpressionPtr DeOuterAttributeReference::applyToNode(const E::ExpressionPtr &input) {
+  E::AttributeReferencePtr attr;
+  if (E::SomeAttributeReference::MatchesWithConditionalCast(input, &attr) &&
+      attr->scope() == E::AttributeReferenceScope::kOuter &&
+      uncorrelated_subqueries_.find(attr->id()) == uncorrelated_subqueries_.end()) {
+    if (!allow_outer_reference_) {
+      THROW_SQL_ERROR() << "Non-equality join predicate is not allowed in scalar subqueries";
+    }
+    if (visiable_attrs_from_outer_query_.find(attr) == visiable_attrs_from_outer_query_.end()) {
+      THROW_SQL_ERROR()
+          << "Nested queries can only reference attributes in the outer query one level above";
+    }
+    return E::AttributeReference::Create(attr->id(),
+                                         attr->attribute_name(),
+                                         attr->attribute_alias(),
+                                         attr->relation_name(),
+                                         attr->getValueType(),
+                                         E::AttributeReferenceScope::kLocal);
+  }
+  return input;
+}
+
+}  // namespace optimizer
+}  // namespace quickstep
diff --git a/query_optimizer/rules/UnnestSubqueries.hpp b/query_optimizer/rules/UnnestSubqueries.hpp
new file mode 100644
index 0000000..1ab48ac
--- /dev/null
+++ b/query_optimizer/rules/UnnestSubqueries.hpp
@@ -0,0 +1,219 @@
+/**
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_QUERY_OPTIMIZER_RULES_UNNEST_SUBQUERIES_HPP_
+#define QUICKSTEP_QUERY_OPTIMIZER_RULES_UNNEST_SUBQUERIES_HPP_
+
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "query_optimizer/expressions/AttributeReference.hpp"
+#include "query_optimizer/expressions/ExprId.hpp"
+#include "query_optimizer/expressions/Expression.hpp"
+#include "query_optimizer/expressions/ExpressionUtil.hpp"
+#include "query_optimizer/expressions/Predicate.hpp"
+#include "query_optimizer/logical/Logical.hpp"
+#include "query_optimizer/rules/Rule.hpp"
+#include "query_optimizer/rules/BottomUpRule.hpp"
+#include "utility/Macros.hpp"
+
+namespace quickstep {
+namespace optimizer {
+
+class OptimizerContext;
+
+namespace expressions {
+
+class Exists;
+
+}
+
+struct CorrelatedQueryInfo;
+
+/**
+ * @brief Rule that removes subquery expressions including IN with a subquery,
+ *        Exists and scalar subquery expressions.
+ */
+class UnnestSubqueries : public Rule<logical::Logical> {
+ public:
+  explicit UnnestSubqueries(OptimizerContext *context)
+      : context_(context) {}
+
+  std::string getName() const override { return "UnnestSubqueries"; }
+
+  logical::LogicalPtr apply(const logical::LogicalPtr &input) override;
+
+ private:
+  OptimizerContext *context_;
+
+  DISALLOW_COPY_AND_ASSIGN(UnnestSubqueries);
+};
+
+class UnnestSubqueriesForNonRootLogical : public Rule<logical::Logical> {
+ public:
+  UnnestSubqueriesForNonRootLogical(
+      const bool scalar_query,
+      const expressions::UnorderedNamedExpressionSet &visiable_attrs_from_outer_query,
+      OptimizerContext *context,
+      std::unordered_map<expressions::ExprId, logical::LogicalPtr> *uncorrelated_subqueries,
+      std::vector<expressions::AttributeReferencePtr> *probe_join_attributes,
+      std::vector<expressions::AttributeReferencePtr> *build_join_attributes,
+      std::vector<expressions::PredicatePtr> *correlated_non_hash_join_predicates)
+      : scalar_query_(scalar_query),
+        context_(context),
+        visiable_attrs_from_outer_query_(visiable_attrs_from_outer_query),
+        uncorrelated_subqueries_(uncorrelated_subqueries),
+        probe_join_attributes_(probe_join_attributes),
+        build_join_attributes_(build_join_attributes),
+        correlated_non_hash_join_predicates_(correlated_non_hash_join_predicates) {}
+
+  std::string getName() const override { return "UnnestScalarSubuqeryForLogical"; }
+
+  logical::LogicalPtr apply(const logical::LogicalPtr &input) override {
+    expressions::UnorderedAttributeSet inner_attributes;
+    std::vector<expressions::AttributeReferencePtr> probe_join_attributes;
+    std::vector<expressions::AttributeReferencePtr> build_join_attributes;
+    std::vector<expressions::PredicatePtr> non_hash_join_predicates;
+
+    const logical::LogicalPtr output =
+        applyInternal(input,
+                      &inner_attributes,
+                      &probe_join_attributes,
+                      &build_join_attributes,
+                      &non_hash_join_predicates);
+
+    probe_join_attributes_->insert(probe_join_attributes_->end(),
+                                   probe_join_attributes.begin(),
+                                   probe_join_attributes.end());
+    build_join_attributes_->insert(build_join_attributes_->end(),
+                                   build_join_attributes.begin(),
+                                   build_join_attributes.end());
+    correlated_non_hash_join_predicates_->insert(correlated_non_hash_join_predicates_->end(),
+                                                 non_hash_join_predicates.begin(),
+                                                 non_hash_join_predicates.end());
+
+    return output;
+  }
+
+ private:
+  logical::LogicalPtr applyInternal(const logical::LogicalPtr &input,
+                                    expressions::UnorderedAttributeSet *inner_attributes,
+                                    std::vector<expressions::AttributeReferencePtr> *probe_join_attributes,
+                                    std::vector<expressions::AttributeReferencePtr> *build_join_attributes,
+                                    std::vector<expressions::PredicatePtr> *non_hash_join_predicates);
+
+  logical::LogicalPtr applyToNode(
+      const logical::LogicalPtr &input,
+      expressions::UnorderedAttributeSet *inner_attributes,
+      std::vector<expressions::AttributeReferencePtr> *probe_join_attributes,
+      std::vector<expressions::AttributeReferencePtr> *build_join_attributes,
+      std::vector<expressions::PredicatePtr> *non_hash_join_predicates);
+
+  // For each input expression, call UnnestSubqueriesForExpession to
+  // eliminate any subquery expressions in it and add a join if necessary.
+  logical::LogicalPtr eliminateNestedScalarQueries(const logical::LogicalPtr &node);
+
+  // Eliminate outer attribute references in this node, add predicates with
+  // outer attribute references into join attributes or non_hash_join_predicates.
+  expressions::ExpressionPtr eliminateOuterAttributeReference(
+      const expressions::ExpressionPtr &expression,
+      expressions::UnorderedAttributeSet *inner_attributes,
+      std::vector<expressions::AttributeReferencePtr> *probe_join_attributes,
+      std::vector<expressions::AttributeReferencePtr> *build_join_attributes,
+      std::vector<expressions::PredicatePtr> *non_hash_join_predicates);
+
+  void validateNonOuterAttributeReference(const expressions::ExpressionPtr &expression);
+
+  bool isCorrelatedOuterAttribute(const expressions::AttributeReferencePtr &attribute) const;
+
+  const bool scalar_query_;
+  OptimizerContext *context_;
+  const expressions::UnorderedNamedExpressionSet &visiable_attrs_from_outer_query_;
+  std::unordered_map<expressions::ExprId, logical::LogicalPtr> *uncorrelated_subqueries_;
+  std::vector<expressions::AttributeReferencePtr> *probe_join_attributes_;
+  std::vector<expressions::AttributeReferencePtr> *build_join_attributes_;
+  std::vector<expressions::PredicatePtr> *correlated_non_hash_join_predicates_;
+
+  DISALLOW_COPY_AND_ASSIGN(UnnestSubqueriesForNonRootLogical);
+};
+
+class UnnestSubqueriesForExpession : public Rule<expressions::Expression> {
+ public:
+  UnnestSubqueriesForExpession(
+      const expressions::UnorderedNamedExpressionSet &visible_attributes_from_outer_query,
+      OptimizerContext *context,
+      std::unordered_map<expressions::ExprId, logical::LogicalPtr> *uncorrelated_subqueries,
+      std::vector<CorrelatedQueryInfo> *correlated_query_info_vec)
+      : visible_attributes_from_outer_query_(visible_attributes_from_outer_query),
+        context_(context),
+        uncorrelated_subqueries_(uncorrelated_subqueries),
+        correlated_query_info_vec_(correlated_query_info_vec) {}
+
+  std::string getName() const override { return "UnnestScalarSubqueryForExpession"; }
+
+  expressions::ExpressionPtr apply(const expressions::ExpressionPtr &node) override {
+    return applyInternal(true /* allow_exists_or_in */,
+                         node);
+  }
+
+ private:
+  const expressions::UnorderedNamedExpressionSet &visible_attributes_from_outer_query_;
+
+  expressions::ExpressionPtr applyInternal(
+      const bool allow_exists_or_in,
+      const expressions::ExpressionPtr &node);
+
+  void transformExists(const expressions::Exists &exists_predicate);
+
+  OptimizerContext *context_;
+  std::unordered_map<expressions::ExprId, logical::LogicalPtr> *uncorrelated_subqueries_;
+  std::vector<CorrelatedQueryInfo> *correlated_query_info_vec_;
+
+  DISALLOW_COPY_AND_ASSIGN(UnnestSubqueriesForExpession);
+};
+
+class DeOuterAttributeReference : public BottomUpRule<expressions::Expression> {
+ public:
+  DeOuterAttributeReference(
+      const bool allow_outer_reference,
+      const std::unordered_map<expressions::ExprId, logical::LogicalPtr> &uncorrelated_subqueries,
+      const expressions::UnorderedNamedExpressionSet &visiable_attrs_from_outer_query)
+      : allow_outer_reference_(allow_outer_reference),
+        uncorrelated_subqueries_(uncorrelated_subqueries),
+        visiable_attrs_from_outer_query_(visiable_attrs_from_outer_query) {}
+
+  std::string getName() const override {
+    return "DeOuterAttributeReference";
+  }
+
+ protected:
+  expressions::ExpressionPtr applyToNode(const expressions::ExpressionPtr &input) override;
+
+ private:
+  const bool allow_outer_reference_;
+  const std::unordered_map<expressions::ExprId, logical::LogicalPtr> &uncorrelated_subqueries_;
+  const expressions::UnorderedNamedExpressionSet &visiable_attrs_from_outer_query_;
+
+  DISALLOW_COPY_AND_ASSIGN(DeOuterAttributeReference);
+};
+
+}  // namespace optimizer
+}  // namespace quickstep
+
+#endif /* QUICKSTEP_QUERY_OPTIMIZER_RULES_UNNEST_SUBQUERIES_HPP_ */
diff --git a/query_optimizer/rules/tests/GenerateJoins_unittest.cpp b/query_optimizer/rules/tests/GenerateJoins_unittest.cpp
index dc26899..b7db431 100644
--- a/query_optimizer/rules/tests/GenerateJoins_unittest.cpp
+++ b/query_optimizer/rules/tests/GenerateJoins_unittest.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -17,6 +19,8 @@
 
 #include "query_optimizer/rules/GenerateJoins.hpp"
 
+#include <cstddef>
+#include <memory>
 #include <utility>
 #include <vector>
 
@@ -30,16 +34,16 @@
 #include "query_optimizer/logical/MultiwayCartesianJoin.hpp"
 #include "query_optimizer/logical/NestedLoopsJoin.hpp"
 #include "query_optimizer/logical/TableReference.hpp"
+#include "query_optimizer/rules/Rule.hpp"
 #include "query_optimizer/rules/tests/LogicalRuleTest.hpp"
 #include "query_optimizer/rules/tests/RuleTest.hpp"
-#include "types/operations/binary_operations/BinaryOperation.hpp"
 #include "types/operations/binary_operations/BinaryOperationFactory.hpp"
 #include "types/operations/binary_operations/BinaryOperationID.hpp"
-#include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
 #include "types/operations/comparisons/ComparisonID.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
 #include "gtest/gtest.h"
 
 namespace quickstep {
@@ -102,12 +106,16 @@
         join_operands[0],
         join_operands[1],
         {join_attribute_pairs[0].first},
-        {join_attribute_pairs[0].second});
+        {join_attribute_pairs[0].second},
+        nullptr /* residual_predicate */,
+        L::HashJoin::JoinType::kInnerJoin);
     for (size_t i = 2; i < join_operands.size(); ++i) {
       hash_join = L::HashJoin::Create(hash_join,
                                       join_operands[i],
                                       {join_attribute_pairs[i - 1].first},
-                                      {join_attribute_pairs[i - 1].second});
+                                      {join_attribute_pairs[i - 1].second},
+                                      nullptr /* residual_predicate */,
+                                      L::HashJoin::JoinType::kInnerJoin);
     }
     return hash_join;
   }
@@ -216,7 +224,9 @@
                                             {relation_attribute_reference_0_0_,
                                              relation_attribute_reference_0_1_},
                                             {relation_attribute_reference_1_0_,
-                                             relation_attribute_reference_1_1_}),
+                                             relation_attribute_reference_1_1_},
+                                            nullptr /* residual_predicate */,
+                                            L::HashJoin::JoinType::kInnerJoin),
                         single_table_predicate);
   APPLY_RULE_AND_CHECK_OUTPUT();
 }
diff --git a/query_optimizer/rules/tests/PruneColumns_unittest.cpp b/query_optimizer/rules/tests/PruneColumns_unittest.cpp
index 0b0739f..ff51abf 100644
--- a/query_optimizer/rules/tests/PruneColumns_unittest.cpp
+++ b/query_optimizer/rules/tests/PruneColumns_unittest.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -20,7 +22,6 @@
 #include <memory>
 #include <vector>
 
-#include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
@@ -37,6 +38,9 @@
 #include "utility/Cast.hpp"
 #include "utility/Macros.hpp"
 
+#include "glog/logging.h"
+#include "gtest/gtest.h"
+
 namespace quickstep {
 namespace optimizer {
 
@@ -91,7 +95,8 @@
                                                        {relation_attribute_reference_0_0_},
                                                        {relation_attribute_reference_1_0_},
                                                        E::PredicatePtr(),
-                                                       project_expressions_with_redundancy);
+                                                       project_expressions_with_redundancy,
+                                                       P::HashJoin::JoinType::kInnerJoin);
 
   // alias_add_literal_0_: relation_attribute_reference_0_0_ + literal_0_.
   // filter_predicate_1_: relation_attribute_reference_1_0_ > literal_0_.
@@ -118,7 +123,8 @@
       {relation_attribute_reference_0_0_},
       {relation_attribute_reference_1_0_},
       E::PredicatePtr(),
-      pruned_project_expressions_for_join);
+      pruned_project_expressions_for_join,
+      P::HashJoin::JoinType::kInnerJoin);
   const P::SelectionPtr new_selection = std::static_pointer_cast<const P::Selection>(
       selection->copyWithNewChildren({new_hash_join} /* new_children */));
   expect_output_ = P::TopLevelPlan::Create(new_selection);
diff --git a/query_optimizer/strategy/CMakeLists.txt b/query_optimizer/strategy/CMakeLists.txt
index e4052cf..74f5a4b 100644
--- a/query_optimizer/strategy/CMakeLists.txt
+++ b/query_optimizer/strategy/CMakeLists.txt
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -43,8 +45,6 @@
 target_link_libraries(quickstep_queryoptimizer_strategy_Join
                       glog
                       quickstep_queryoptimizer_LogicalToPhysicalMapper
-                      quickstep_queryoptimizer_OptimizerContext
-                      quickstep_queryoptimizer_expressions_Alias
                       quickstep_queryoptimizer_expressions_AttributeReference
                       quickstep_queryoptimizer_expressions_ComparisonExpression
                       quickstep_queryoptimizer_expressions_Expression
@@ -81,7 +81,6 @@
                       quickstep_queryoptimizer_logical_InsertSelection
                       quickstep_queryoptimizer_logical_InsertTuple
                       quickstep_queryoptimizer_logical_Logical
-                      quickstep_queryoptimizer_logical_Logical
                       quickstep_queryoptimizer_logical_LogicalType
                       quickstep_queryoptimizer_logical_Sample
                       quickstep_queryoptimizer_logical_SharedSubplanReference
diff --git a/query_optimizer/strategy/Join.cpp b/query_optimizer/strategy/Join.cpp
index e9dd1d6..becea61 100644
--- a/query_optimizer/strategy/Join.cpp
+++ b/query_optimizer/strategy/Join.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -17,11 +19,10 @@
 
 #include "query_optimizer/strategy/Join.hpp"
 
+#include <type_traits>
 #include <vector>
 
 #include "query_optimizer/LogicalToPhysicalMapper.hpp"
-#include "query_optimizer/OptimizerContext.hpp"
-#include "query_optimizer/expressions/Alias.hpp"
 #include "query_optimizer/expressions/AttributeReference.hpp"
 #include "query_optimizer/expressions/ComparisonExpression.hpp"
 #include "query_optimizer/expressions/Expression.hpp"
@@ -66,7 +67,7 @@
     if (L::SomeHashJoin::MatchesWithConditionalCast(logical_project->input(),
                                                     &logical_hash_join)) {
       addHashJoin(logical_project,
-                  logical_filter,
+                  nullptr /* logical_filter */,
                   logical_hash_join,
                   physical_output);
       return true;
@@ -83,15 +84,18 @@
   }
 
   // Collapse project-filter-join.
+  // Note that Filter cannot be pushed into the semi-join, anti-join or outer-join.
   if (logical_project != nullptr &&
       L::SomeFilter::MatchesWithConditionalCast(logical_project->input(), &logical_filter)) {
     if (L::SomeHashJoin::MatchesWithConditionalCast(logical_filter->input(),
                                                     &logical_hash_join)) {
-      addHashJoin(logical_project,
-                  logical_filter,
-                  logical_hash_join,
-                  physical_output);
-      return true;
+      if (logical_hash_join->join_type() == L::HashJoin::JoinType::kInnerJoin) {
+        addHashJoin(logical_project,
+                    logical_filter,
+                    logical_hash_join,
+                    physical_output);
+        return true;
+      }
     }
 
     if (L::SomeNestedLoopsJoin::MatchesWithConditionalCast(logical_filter->input(),
@@ -107,14 +111,17 @@
   }
 
   // Collapse filter-join.
+  // Note that Filter cannot be pushed into the semi-join, anti-join or outer-join.
   if (L::SomeFilter::MatchesWithConditionalCast(logical_input, &logical_filter)) {
     if (L::SomeHashJoin::MatchesWithConditionalCast(logical_filter->input(),
                                                     &logical_hash_join)) {
-      addHashJoin(logical_project,
-                  logical_filter,
-                  logical_hash_join,
-                  physical_output);
-      return true;
+      if (logical_hash_join->join_type() == L::HashJoin::JoinType::kInnerJoin) {
+        addHashJoin(nullptr /* logical_project */,
+                    logical_filter,
+                    logical_hash_join,
+                    physical_output);
+        return true;
+      }
     }
 
     if (L::SomeNestedLoopsJoin::MatchesWithConditionalCast(logical_filter->input(),
@@ -132,8 +139,8 @@
 
   // Convert a single binary join.
   if (L::SomeHashJoin::MatchesWithConditionalCast(logical_input, &logical_hash_join)) {
-    addHashJoin(logical_project,
-                logical_filter,
+    addHashJoin(nullptr /* logical_project */,
+                nullptr /* logical_filter */,
                 logical_hash_join,
                 physical_output);
     return true;
@@ -169,6 +176,11 @@
   std::vector<E::AttributeReferencePtr> left_join_attributes = logical_hash_join->left_join_attributes();
   std::vector<E::AttributeReferencePtr> right_join_attributes = logical_hash_join->right_join_attributes();
   std::vector<E::ExpressionPtr> non_hash_join_predicates;
+
+  if (logical_hash_join->residual_predicate() != nullptr) {
+    non_hash_join_predicates.emplace_back(logical_hash_join->residual_predicate());
+  }
+
   if (logical_filter != nullptr) {
     std::vector<E::PredicatePtr> filter_predicates;
     const std::vector<E::AttributeReferencePtr> left_input_attributes =
@@ -291,13 +303,31 @@
     residual_predicate = E::LogicalAnd::Create(CastSharedPtrVector<E::Predicate>(non_hash_join_predicates));
   }
 
+  P::HashJoin::JoinType join_type;
+  switch (logical_hash_join->join_type()) {
+    case L::HashJoin::JoinType::kInnerJoin:
+      join_type = P::HashJoin::JoinType::kInnerJoin;
+      break;
+    case L::HashJoin::JoinType::kLeftSemiJoin:
+      join_type = P::HashJoin::JoinType::kLeftSemiJoin;
+      break;
+    case L::HashJoin::JoinType::kLeftAntiJoin:
+      join_type = P::HashJoin::JoinType::kLeftAntiJoin;
+      break;
+    default:
+      LOG(FATAL) << "Invalid logical::HashJoin::JoinType: "
+                 << static_cast<typename std::underlying_type<L::HashJoin::JoinType>::type>(
+                        logical_hash_join->join_type());
+  }
+
   *physical_output =
       P::HashJoin::Create(left,
                           right,
                           left_join_attributes,
                           right_join_attributes,
                           residual_predicate,
-                          project_expressions);
+                          project_expressions,
+                          join_type);
 }
 
 void Join::addNestedLoopsJoin(const L::NestedLoopsJoinPtr &logical_nested_loops_join,
diff --git a/query_optimizer/strategy/Join.hpp b/query_optimizer/strategy/Join.hpp
index 7a87a16..efba286 100644
--- a/query_optimizer/strategy/Join.hpp
+++ b/query_optimizer/strategy/Join.hpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -22,7 +24,6 @@
 #include <vector>
 
 #include "query_optimizer/expressions/NamedExpression.hpp"
-#include "query_optimizer/expressions/Expression.hpp"
 #include "query_optimizer/expressions/Predicate.hpp"
 #include "query_optimizer/logical/Filter.hpp"
 #include "query_optimizer/logical/HashJoin.hpp"
@@ -70,7 +71,6 @@
    * @param logical_project The project to be folded into the hash join.
    * @param logical_filter The filter to be folded into the hash join.
    * @param logical_hash_join The hash join to be transformed.
-   * @param project_expressions The project expressions for the join.
    * @param physical_output The output physical plan.
    */
   void addHashJoin(const logical::ProjectPtr &logical_project,
diff --git a/query_optimizer/strategy/tests/Join_unittest.cpp b/query_optimizer/strategy/tests/Join_unittest.cpp
index 2ed8a8f..bbfee05 100644
--- a/query_optimizer/strategy/tests/Join_unittest.cpp
+++ b/query_optimizer/strategy/tests/Join_unittest.cpp
@@ -1,6 +1,8 @@
 /**
  *   Copyright 2011-2015 Quickstep Technologies LLC.
  *   Copyright 2015 Pivotal Software, Inc.
+ *   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+ *     University of Wisconsin—Madison.
  *
  *   Licensed under the Apache License, Version 2.0 (the "License");
  *   you may not use this file except in compliance with the License.
@@ -27,7 +29,6 @@
 #include "query_optimizer/expressions/ExpressionUtil.hpp"
 #include "query_optimizer/expressions/LogicalAnd.hpp"
 #include "query_optimizer/expressions/NamedExpression.hpp"
-#include "query_optimizer/expressions/Scalar.hpp"
 #include "query_optimizer/logical/Filter.hpp"
 #include "query_optimizer/logical/HashJoin.hpp"
 #include "query_optimizer/logical/Logical.hpp"
@@ -38,15 +39,9 @@
 #include "query_optimizer/physical/NestedLoopsJoin.hpp"
 #include "query_optimizer/physical/Physical.hpp"
 #include "query_optimizer/physical/Selection.hpp"
-#include "query_optimizer/physical/TableReference.hpp"
 #include "query_optimizer/strategy/tests/StrategyTest.hpp"
-#include "types/operations/binary_operations/BinaryOperation.hpp"
-#include "types/operations/binary_operations/BinaryOperationFactory.hpp"
-#include "types/operations/binary_operations/BinaryOperationID.hpp"
-#include "types/operations/comparisons/Comparison.hpp"
 #include "types/operations/comparisons/ComparisonFactory.hpp"
 #include "types/operations/comparisons/ComparisonID.hpp"
-#include "utility/Cast.hpp"
 #include "utility/Macros.hpp"
 
 #include "glog/logging.h"
@@ -77,7 +72,10 @@
         logical_table_reference_0_,
         logical_table_reference_1_,
         {relation_attribute_reference_0_0_},
-        {relation_attribute_reference_1_0_});
+        {relation_attribute_reference_1_0_},
+        nullptr /* residual_predicate */,
+        L::HashJoin::JoinType::kInnerJoin);
+
 
     std::vector<E::NamedExpressionPtr> project_expressions(
         E::ToNamedExpressions(logical_table_reference_0_->getOutputAttributes()));
@@ -98,7 +96,8 @@
                             {relation_attribute_reference_0_0_},
                             {relation_attribute_reference_1_0_},
                             E::PredicatePtr(),
-                            project_expressions);
+                            project_expressions,
+                            P::HashJoin::JoinType::kInnerJoin);
   }
 
   void setupStrategy(std::unique_ptr<Strategy> *strategy) override {
@@ -143,7 +142,8 @@
       {relation_attribute_reference_0_0_},
       {relation_attribute_reference_1_0_},
       E::PredicatePtr(),
-      logical_project_0_->project_expressions());
+      logical_project_0_->project_expressions(),
+      P::HashJoin::JoinType::kInnerJoin);
   EXPECT_CORRECT_PHYSICAL();
 }
 
@@ -183,7 +183,8 @@
       {relation_attribute_reference_1_0_,
        relation_attribute_reference_1_1_},
       filter_predicate_0_,
-      logical_project_1_->project_expressions());
+      logical_project_1_->project_expressions(),
+      P::HashJoin::JoinType::kInnerJoin);
   EXPECT_CORRECT_PHYSICAL();
 }
 
@@ -211,7 +212,8 @@
       {relation_attribute_reference_0_0_},
       {relation_attribute_reference_1_0_},
       filter_predicate_0_,
-      physical_nested_loops_join_->project_expressions());
+      physical_nested_loops_join_->project_expressions(),
+      P::HashJoin::JoinType::kInnerJoin);
   EXPECT_CORRECT_PHYSICAL();
 }
 
@@ -222,7 +224,9 @@
       L::HashJoin::Create(logical_project_0_,
                           logical_project_on_filter_1_,
                           {relation_attribute_reference_0_0_},
-                          {relation_attribute_reference_1_0_});
+                          {relation_attribute_reference_1_0_},
+                          nullptr /* residual_predicate */,
+                          L::HashJoin::JoinType::kInnerJoin);
   // References an attribute created by the left underlying project of the hash
   // join.
   const E::AliasPtr alias_on_alias_reference = E::Alias::Create(
@@ -244,7 +248,8 @@
       {relation_attribute_reference_0_0_},
       {relation_attribute_reference_1_0_},
       E::PredicatePtr(),
-      {alias_on_alias_reference_after_pullup} /* project_expressions */);
+      {alias_on_alias_reference_after_pullup} /* project_expressions */,
+      P::HashJoin::JoinType::kInnerJoin);
   EXPECT_CORRECT_PHYSICAL();
 
   // HashJoin -- Project
@@ -269,7 +274,9 @@
   input_logical_ = L::HashJoin::Create(logical_project_0_,
                                        logical_project_on_attribute_reference,
                                        {E::ToRef(alias_add_literal_0_)},
-                                       {relation_attribute_reference_1_0_});
+                                       {relation_attribute_reference_1_0_},
+                                       nullptr /* residual_predicate */,
+                                       L::HashJoin::JoinType::kInnerJoin);
   std::vector<E::NamedExpressionPtr> project_expressions(
       E::ToNamedExpressions(physical_project_0_->getOutputAttributes()));
   project_expressions.push_back(alias_on_attribute_reference);
@@ -279,7 +286,8 @@
                           {E::ToRef(alias_add_literal_0_)},
                           {relation_attribute_reference_1_0_},
                           E::PredicatePtr(),
-                          project_expressions);
+                          project_expressions,
+                          P::HashJoin::JoinType::kInnerJoin);
   EXPECT_CORRECT_PHYSICAL();
 }
 
diff --git a/query_optimizer/tests/logical_generator/Select.test b/query_optimizer/tests/logical_generator/Select.test
index 2709ce3..a7a9cf8 100644
--- a/query_optimizer/tests/logical_generator/Select.test
+++ b/query_optimizer/tests/logical_generator/Select.test
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -653,3 +655,68 @@
   | type=Long]
   +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
     type=Double NULL]
+==
+
+SELECT i
+FROM generate_series(0, 100, 3) AS gs1(i)
+WHERE
+  EXISTS (
+    SELECT *
+    FROM generate_series(0, 100, 5) AS gs2(i)
+    WHERE gs1.i = gs2.i
+  )
+  AND NOT EXISTS (
+    SELECT *
+    FROM generate_series(0, 100, 10) AS gs3(i)
+    WHERE gs1.i = gs3.i
+  )
+  AND (i < 40 OR i > 60);
+--
+TopLevelPlan
++-plan=Project
+| +-input=HashLeftSemiJoin
+| | +-left=HashLeftAntiJoin
+| | | +-left=Filter
+| | | | +-input=Project
+| | | | | +-input=TableGenerator[function_name=generate_series,table_alias=gs1]
+| | | | | | +-AttributeReference[id=0,name=generate_series,alias=gs1,
+| | | | | |   relation=generate_series,type=Int]
+| | | | | +-project_list=
+| | | | |   +-Alias[id=1,name=i,relation=,type=Int]
+| | | | |     +-AttributeReference[id=0,name=generate_series,alias=gs1,
+| | | | |       relation=generate_series,type=Int]
+| | | | +-filter_predicate=Or
+| | | |   +-Less
+| | | |   | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | | |   | +-Literal[value=40,type=Int]
+| | | |   +-Greater
+| | | |     +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | | |     +-Literal[value=60,type=Int]
+| | | +-right=Project
+| | | | +-input=TableGenerator[function_name=generate_series,table_alias=gs3]
+| | | | | +-AttributeReference[id=4,name=generate_series,alias=gs3,
+| | | | |   relation=generate_series,type=Int]
+| | | | +-project_list=
+| | | |   +-Alias[id=5,name=i,relation=,type=Int]
+| | | |     +-AttributeReference[id=4,name=generate_series,alias=gs3,
+| | | |       relation=generate_series,type=Int]
+| | | +-left_join_attributes=
+| | | | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | | +-right_join_attributes=
+| | |   +-AttributeReference[id=5,name=i,relation=,type=Int]
+| | +-right=Project
+| | | +-input=TableGenerator[function_name=generate_series,table_alias=gs2]
+| | | | +-AttributeReference[id=2,name=generate_series,alias=gs2,
+| | | |   relation=generate_series,type=Int]
+| | | +-project_list=
+| | |   +-Alias[id=3,name=i,relation=,type=Int]
+| | |     +-AttributeReference[id=2,name=generate_series,alias=gs2,
+| | |       relation=generate_series,type=Int]
+| | +-left_join_attributes=
+| | | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | +-right_join_attributes=
+| |   +-AttributeReference[id=3,name=i,relation=,type=Int]
+| +-project_list=
+|   +-AttributeReference[id=1,name=i,relation=,type=Int]
++-output_attributes=
+  +-AttributeReference[id=1,name=i,relation=,type=Int]
diff --git a/query_optimizer/tests/physical_generator/Select.test b/query_optimizer/tests/physical_generator/Select.test
index d653a9b..61ba01c 100644
--- a/query_optimizer/tests/physical_generator/Select.test
+++ b/query_optimizer/tests/physical_generator/Select.test
@@ -1,5 +1,7 @@
 #   Copyright 2011-2015 Quickstep Technologies LLC.
 #   Copyright 2015 Pivotal Software, Inc.
+#   Copyright 2016, Quickstep Research Group, Computer Sciences Department,
+#     University of Wisconsin—Madison.
 #
 #   Licensed under the Apache License, Version 2.0 (the "License");
 #   you may not use this file except in compliance with the License.
@@ -1691,3 +1693,112 @@
   | type=Long]
   +-AttributeReference[id=7,name=,alias=SUM(float_col),relation=,
     type=Double NULL]
+==
+
+SELECT i
+FROM generate_series(0, 100, 3) AS gs1(i)
+WHERE
+  EXISTS (
+    SELECT *
+    FROM generate_series(0, 100, 5) AS gs2(i)
+    WHERE gs1.i = gs2.i
+  )
+  AND NOT EXISTS (
+    SELECT *
+    FROM generate_series(0, 100, 10) AS gs3(i)
+    WHERE gs1.i = gs3.i
+  )
+  AND (i < 40 OR i > 60);
+--
+[Optimized Logical Plan]
+TopLevelPlan
++-plan=Project
+| +-input=HashLeftSemiJoin
+| | +-left=HashLeftAntiJoin
+| | | +-left=Filter
+| | | | +-input=Project
+| | | | | +-input=TableGenerator[function_name=generate_series,table_alias=gs1]
+| | | | | | +-AttributeReference[id=0,name=generate_series,alias=gs1,
+| | | | | |   relation=generate_series,type=Int]
+| | | | | +-project_list=
+| | | | |   +-Alias[id=1,name=i,relation=,type=Int]
+| | | | |     +-AttributeReference[id=0,name=generate_series,alias=gs1,
+| | | | |       relation=generate_series,type=Int]
+| | | | +-filter_predicate=Or
+| | | |   +-Less
+| | | |   | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | | |   | +-Literal[value=40,type=Int]
+| | | |   +-Greater
+| | | |     +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | | |     +-Literal[value=60,type=Int]
+| | | +-right=Project
+| | | | +-input=TableGenerator[function_name=generate_series,table_alias=gs3]
+| | | | | +-AttributeReference[id=4,name=generate_series,alias=gs3,
+| | | | |   relation=generate_series,type=Int]
+| | | | +-project_list=
+| | | |   +-Alias[id=5,name=i,relation=,type=Int]
+| | | |     +-AttributeReference[id=4,name=generate_series,alias=gs3,
+| | | |       relation=generate_series,type=Int]
+| | | +-left_join_attributes=
+| | | | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | | +-right_join_attributes=
+| | |   +-AttributeReference[id=5,name=i,relation=,type=Int]
+| | +-right=Project
+| | | +-input=TableGenerator[function_name=generate_series,table_alias=gs2]
+| | | | +-AttributeReference[id=2,name=generate_series,alias=gs2,
+| | | |   relation=generate_series,type=Int]
+| | | +-project_list=
+| | |   +-Alias[id=3,name=i,relation=,type=Int]
+| | |     +-AttributeReference[id=2,name=generate_series,alias=gs2,
+| | |       relation=generate_series,type=Int]
+| | +-left_join_attributes=
+| | | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | +-right_join_attributes=
+| |   +-AttributeReference[id=3,name=i,relation=,type=Int]
+| +-project_list=
+|   +-AttributeReference[id=1,name=i,relation=,type=Int]
++-output_attributes=
+  +-AttributeReference[id=1,name=i,relation=,type=Int]
+[Physical Plan]
+TopLevelPlan
++-plan=HashLeftSemiJoin
+| +-left=HashLeftAntiJoin
+| | +-left=Selection
+| | | +-input=TableGenerator[function_name=generate_series,table_alias=gs1]
+| | | | +-AttributeReference[id=0,name=generate_series,alias=gs1,
+| | | |   relation=generate_series,type=Int]
+| | | +-filter_predicate=Or
+| | | | +-Less
+| | | | | +-AttributeReference[id=0,name=generate_series,alias=gs1,
+| | | | | | relation=generate_series,type=Int]
+| | | | | +-Literal[value=40,type=Int]
+| | | | +-Greater
+| | | |   +-AttributeReference[id=0,name=generate_series,alias=gs1,
+| | | |   | relation=generate_series,type=Int]
+| | | |   +-Literal[value=60,type=Int]
+| | | +-project_expressions=
+| | |   +-Alias[id=1,name=i,relation=,type=Int]
+| | |     +-AttributeReference[id=0,name=generate_series,alias=gs1,
+| | |       relation=generate_series,type=Int]
+| | +-right=TableGenerator[function_name=generate_series,table_alias=gs3]
+| | | +-AttributeReference[id=4,name=generate_series,alias=gs3,
+| | |   relation=generate_series,type=Int]
+| | +-project_expressions=
+| | | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | +-left_join_attributes=
+| | | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| | +-right_join_attributes=
+| |   +-AttributeReference[id=4,name=generate_series,alias=gs3,
+| |     relation=generate_series,type=Int]
+| +-right=TableGenerator[function_name=generate_series,table_alias=gs2]
+| | +-AttributeReference[id=2,name=generate_series,alias=gs2,
+| |   relation=generate_series,type=Int]
+| +-project_expressions=
+| | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| +-left_join_attributes=
+| | +-AttributeReference[id=1,name=i,relation=,type=Int]
+| +-right_join_attributes=
+|   +-AttributeReference[id=2,name=generate_series,alias=gs2,
+|     relation=generate_series,type=Int]
++-output_attributes=
+  +-AttributeReference[id=1,name=i,relation=,type=Int]
diff --git a/relational_operators/BuildHashOperator.hpp b/relational_operators/BuildHashOperator.hpp
index af72408..f9d830f 100644
--- a/relational_operators/BuildHashOperator.hpp
+++ b/relational_operators/BuildHashOperator.hpp
@@ -18,6 +18,7 @@
 #ifndef QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
 #define QUICKSTEP_RELATIONAL_OPERATORS_BUILD_HASH_OPERATOR_HPP_
 
+#include <utility>
 #include <vector>
 
 #include "catalog/CatalogRelation.hpp"
@@ -148,6 +149,30 @@
         hash_table_(DCHECK_NOTNULL(hash_table)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
+  /**
+   * @brief Constructor for the distributed version.
+   *
+   * @param input_relation The relation to build hash table on.
+   * @param join_key_attributes The IDs of equijoin attributes in
+   *        input_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param build_block_id The block id.
+   * @param hash_table The JoinHashTable to use.
+   * @param storage_manager The StorageManager to use.
+   **/
+  BuildHashWorkOrder(const CatalogRelationSchema &input_relation,
+                     std::vector<attribute_id> &&join_key_attributes,
+                     const bool any_join_key_attributes_nullable,
+                     const block_id build_block_id,
+                     JoinHashTable *hash_table,
+                     StorageManager *storage_manager)
+      : input_relation_(input_relation),
+        join_key_attributes_(std::move(join_key_attributes)),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        build_block_id_(build_block_id),
+        hash_table_(DCHECK_NOTNULL(hash_table)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
   ~BuildHashWorkOrder() override {}
 
   void execute() override;
diff --git a/relational_operators/CMakeLists.txt b/relational_operators/CMakeLists.txt
index 6194429..17a9a6f 100644
--- a/relational_operators/CMakeLists.txt
+++ b/relational_operators/CMakeLists.txt
@@ -19,6 +19,9 @@
 QS_PROTOBUF_GENERATE_CPP(relationaloperators_TextScanOperator_proto_srcs
                          relationaloperators_TextScanOperator_proto_hdrs
                          TextScanOperator.proto)
+QS_PROTOBUF_GENERATE_CPP(relationaloperators_WorkOrder_proto_srcs
+                         relationaloperators_WorkOrder_proto_hdrs
+                         WorkOrder.proto)
 
 # Declare micro-libs:
 add_library(quickstep_relationaloperators_AggregationOperator AggregationOperator.cpp AggregationOperator.hpp)
@@ -57,6 +60,10 @@
             ${relationaloperators_TextScanOperator_proto_hdrs})
 add_library(quickstep_relationaloperators_UpdateOperator UpdateOperator.cpp UpdateOperator.hpp)
 add_library(quickstep_relationaloperators_WorkOrder ../empty_src.cpp WorkOrder.hpp)
+add_library(quickstep_relationaloperators_WorkOrderFactory WorkOrderFactory.cpp WorkOrderFactory.hpp)
+add_library(quickstep_relationaloperators_WorkOrder_proto
+            ${relationaloperators_WorkOrder_proto_srcs}
+            ${relationaloperators_WorkOrder_proto_hdrs})
 
 # Link dependencies:
 target_link_libraries(quickstep_relationaloperators_AggregationOperator
@@ -131,6 +138,7 @@
 target_link_libraries(quickstep_relationaloperators_DropTableOperator
                       glog
                       quickstep_catalog_CatalogDatabase
+                      quickstep_catalog_CatalogDatabaseLite
                       quickstep_catalog_CatalogRelation
                       quickstep_catalog_CatalogTypedefs
                       quickstep_queryexecution_WorkOrdersContainer
@@ -379,9 +387,43 @@
                       tmb)
 target_link_libraries(quickstep_relationaloperators_WorkOrder
                       glog
-                      tmb
                       quickstep_queryexecution_QueryExecutionTypedefs
-                      quickstep_utility_Macros)
+                      quickstep_utility_Macros
+                      tmb)
+target_link_libraries(quickstep_relationaloperators_WorkOrderFactory
+                      glog
+                      quickstep_catalog_CatalogDatabaseLite
+                      quickstep_catalog_CatalogRelationSchema
+                      quickstep_catalog_CatalogTypedefs
+                      quickstep_queryexecution_QueryContext
+                      quickstep_relationaloperators_AggregationOperator
+                      quickstep_relationaloperators_BuildHashOperator
+                      quickstep_relationaloperators_DeleteOperator
+                      quickstep_relationaloperators_DestroyHashOperator
+                      quickstep_relationaloperators_DropTableOperator
+                      quickstep_relationaloperators_FinalizeAggregationOperator
+                      quickstep_relationaloperators_HashJoinOperator
+                      quickstep_relationaloperators_InsertOperator
+                      quickstep_relationaloperators_NestedLoopsJoinOperator
+                      quickstep_relationaloperators_SampleOperator
+                      quickstep_relationaloperators_SaveBlocksOperator
+                      quickstep_relationaloperators_SelectOperator
+                      quickstep_relationaloperators_SortMergeRunOperator
+                      quickstep_relationaloperators_SortMergeRunOperatorHelpers
+                      quickstep_relationaloperators_SortMergeRunOperator_proto
+                      quickstep_relationaloperators_SortRunGenerationOperator
+                      quickstep_relationaloperators_TableGeneratorOperator
+                      quickstep_relationaloperators_TextScanOperator
+                      quickstep_relationaloperators_TextScanOperator_proto
+                      quickstep_relationaloperators_UpdateOperator
+                      quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_storage_StorageBlockInfo
+                      quickstep_utility_Macros
+                      tmb)
+target_link_libraries(quickstep_relationaloperators_WorkOrder_proto
+                      quickstep_relationaloperators_SortMergeRunOperator_proto
+                      quickstep_relationaloperators_TextScanOperator_proto
+                      ${PROTOBUF_LIBRARY})
 
 # Module all-in-one library:
 add_library(quickstep_relationaloperators ../empty_src.cpp RelationalOperatorsModule.hpp)
@@ -410,7 +452,9 @@
                       quickstep_relationaloperators_TextScanOperator
                       quickstep_relationaloperators_TextScanOperator_proto
                       quickstep_relationaloperators_UpdateOperator
-                      quickstep_relationaloperators_WorkOrder)
+                      quickstep_relationaloperators_WorkOrder
+                      quickstep_relationaloperators_WorkOrderFactory
+                      quickstep_relationaloperators_WorkOrder_proto)
 
 # Tests:
 add_executable(AggregationOperator_unittest
diff --git a/relational_operators/DropTableOperator.cpp b/relational_operators/DropTableOperator.cpp
index f3df586..f3a3a2c 100644
--- a/relational_operators/DropTableOperator.cpp
+++ b/relational_operators/DropTableOperator.cpp
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "catalog/CatalogDatabase.hpp"
+#include "catalog/CatalogDatabaseLite.hpp"
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
 #include "query_execution/WorkOrdersContainer.hpp"
@@ -69,6 +70,11 @@
   for (const block_id block : blocks_) {
     storage_manager_->deleteBlockOrBlobFile(block);
   }
+
+  // Drop the relation in the cache in the distributed version, if any.
+  if (catalog_database_cache_ != nullptr && rel_id_ != kInvalidCatalogId) {
+    catalog_database_cache_->dropRelationById(rel_id_);
+  }
 }
 
 }  // namespace quickstep
diff --git a/relational_operators/DropTableOperator.hpp b/relational_operators/DropTableOperator.hpp
index df350e0..bf9b1b1 100644
--- a/relational_operators/DropTableOperator.hpp
+++ b/relational_operators/DropTableOperator.hpp
@@ -21,6 +21,7 @@
 #include <utility>
 #include <vector>
 
+#include "catalog/CatalogTypedefs.hpp"
 #include "relational_operators/RelationalOperator.hpp"
 #include "relational_operators/WorkOrder.hpp"
 #include "storage/StorageBlockInfo.hpp"
@@ -35,6 +36,7 @@
 namespace quickstep {
 
 class CatalogDatabase;
+class CatalogDatabaseLite;
 class CatalogRelation;
 class QueryContext;
 class StorageManager;
@@ -95,11 +97,18 @@
    *
    * @param blocks The blocks to drop.
    * @param storage_manager The StorageManager to use.
+   * @param rel_id The relation id to drop.
+   * @param catalog_database_cache The CatalogDatabaseCache in the distributed
+   *        version.
    **/
   DropTableWorkOrder(std::vector<block_id> &&blocks,
-                     StorageManager *storage_manager)
+                     StorageManager *storage_manager,
+                     const relation_id rel_id = kInvalidCatalogId,
+                     CatalogDatabaseLite *catalog_database_cache = nullptr)
       : blocks_(std::move(blocks)),
-        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+        storage_manager_(DCHECK_NOTNULL(storage_manager)),
+        rel_id_(rel_id),
+        catalog_database_cache_(catalog_database_cache) {}
 
   ~DropTableWorkOrder() override {}
 
@@ -107,9 +116,12 @@
 
  private:
   const std::vector<block_id> blocks_;
-
   StorageManager *storage_manager_;
 
+  // Used in the distributed version.
+  const relation_id rel_id_;
+  CatalogDatabaseLite *catalog_database_cache_;
+
   DISALLOW_COPY_AND_ASSIGN(DropTableWorkOrder);
 };
 
diff --git a/relational_operators/HashJoinOperator.hpp b/relational_operators/HashJoinOperator.hpp
index e1996d0..a00e590 100644
--- a/relational_operators/HashJoinOperator.hpp
+++ b/relational_operators/HashJoinOperator.hpp
@@ -19,8 +19,9 @@
 #define QUICKSTEP_RELATIONAL_OPERATORS_HASH_JOIN_OPERATOR_HPP_
 
 #include <cstddef>
-#include <vector>
 #include <memory>
+#include <utility>
+#include <vector>
 
 #include "catalog/CatalogRelation.hpp"
 #include "catalog/CatalogTypedefs.hpp"
@@ -230,6 +231,49 @@
         hash_table_(DCHECK_NOTNULL(hash_table)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
+  /**
+   * @brief Constructor for the distributed version.
+   *
+   * @param build_relation The relation that the hash table was originally built
+   *        on (i.e. the inner relation in the join).
+   * @param probe_relation The relation to probe the hash table with (i.e. the
+   *        outer relation in the join).
+   * @param join_key_attributes The IDs of equijoin attributes in \c
+   *        probe_relation.
+   * @param any_join_key_attributes_nullable If any attribute is nullable.
+   * @param lookup_block_id The block id of the probe_relation.
+   * @param residual_predicate If non-null, apply as an additional filter to
+   *        pairs of tuples that match the hash-join (i.e. key equality)
+   *        predicate. Effectively, this makes the join predicate the
+   *        conjunction of the key-equality predicate and residual_predicate.
+   * @param selection A list of Scalars corresponding to the relation attributes
+   *        in \c output_destination. Each Scalar is evaluated for the joined
+   *        tuples, and the resulting value is inserted into the join result.
+   * @param output_destination The InsertDestination to insert the join results.
+   * @param hash_table The JoinHashTable to use.
+   * @param storage_manager The StorageManager to use.
+   **/
+  HashJoinWorkOrder(const CatalogRelationSchema &build_relation,
+                    const CatalogRelationSchema &probe_relation,
+                    std::vector<attribute_id> &&join_key_attributes,
+                    const bool any_join_key_attributes_nullable,
+                    const block_id lookup_block_id,
+                    const Predicate *residual_predicate,
+                    const std::vector<std::unique_ptr<const Scalar>> &selection,
+                    InsertDestination *output_destination,
+                    JoinHashTable *hash_table,
+                    StorageManager *storage_manager)
+      : build_relation_(build_relation),
+        probe_relation_(probe_relation),
+        join_key_attributes_(std::move(join_key_attributes)),
+        any_join_key_attributes_nullable_(any_join_key_attributes_nullable),
+        block_id_(lookup_block_id),
+        residual_predicate_(residual_predicate),
+        selection_(selection),
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        hash_table_(DCHECK_NOTNULL(hash_table)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
   ~HashJoinWorkOrder() override {}
 
   /**
diff --git a/relational_operators/SelectOperator.hpp b/relational_operators/SelectOperator.hpp
index a6e9e11..3da496c 100644
--- a/relational_operators/SelectOperator.hpp
+++ b/relational_operators/SelectOperator.hpp
@@ -215,6 +215,42 @@
         output_destination_(DCHECK_NOTNULL(output_destination)),
         storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
 
+  /**
+   * @brief Constructor for the distributed version.
+   *
+   * @note Reference parameter selection is NOT owned by this class and must
+   *       remain valid until after execute() is called.
+   *
+   * @param input_relation The relation to perform selection over.
+   * @param input_block_id The block id.
+   * @param predicate All tuples matching \c predicate will be selected (or NULL
+   *        to select all tuples).
+   * @param simple_projection Whether the Select is simple.
+   * @param simple_selection The list of attribute ids, used if \c
+   *        simple_projection is true.
+   * @param selection A list of Scalars which will be evaluated to project
+   *        input tuples, used if \c simple_projection is false.
+   * @param output_destination The InsertDestination to insert the selection
+   *        results.
+   * @param storage_manager The StorageManager to use.
+   **/
+  SelectWorkOrder(const CatalogRelationSchema &input_relation,
+                  const block_id input_block_id,
+                  const Predicate *predicate,
+                  const bool simple_projection,
+                  std::vector<attribute_id> &&simple_selection,
+                  const std::vector<std::unique_ptr<const Scalar>> *selection,
+                  InsertDestination *output_destination,
+                  StorageManager *storage_manager)
+      : input_relation_(input_relation),
+        input_block_id_(input_block_id),
+        predicate_(predicate),
+        simple_projection_(simple_projection),
+        simple_selection_(std::move(simple_selection)),
+        selection_(selection),
+        output_destination_(DCHECK_NOTNULL(output_destination)),
+        storage_manager_(DCHECK_NOTNULL(storage_manager)) {}
+
   ~SelectWorkOrder() override {}
 
   /**
diff --git a/relational_operators/SortMergeRunOperator.hpp b/relational_operators/SortMergeRunOperator.hpp
index 45b18be..f92affe 100644
--- a/relational_operators/SortMergeRunOperator.hpp
+++ b/relational_operators/SortMergeRunOperator.hpp
@@ -208,11 +208,6 @@
  */
 class SortMergeRunWorkOrder : public WorkOrder {
  public:
-  ~SortMergeRunWorkOrder() {}
-
-  void execute() override;
-
- private:
   /**
    * @brief Constructor.
    *
@@ -252,6 +247,11 @@
     DCHECK(sort_config_.isValid());
   }
 
+  ~SortMergeRunWorkOrder() {}
+
+  void execute() override;
+
+ private:
   const SortConfiguration &sort_config_;
   const CatalogRelationSchema &run_relation_;
   std::vector<merge_run_operator::Run> input_runs_;
diff --git a/relational_operators/SortMergeRunOperator.proto b/relational_operators/SortMergeRunOperator.proto
index 0ea8b07..3b8a777 100644
--- a/relational_operators/SortMergeRunOperator.proto
+++ b/relational_operators/SortMergeRunOperator.proto
@@ -1,5 +1,5 @@
 //   Copyright 2011-2015 Quickstep Technologies LLC.
-//   Copyright 2015 Pivotal Software, Inc.
+//   Copyright 2015-2016 Pivotal Software, Inc.
 //
 //   Licensed under the Apache License, Version 2.0 (the "License");
 //   you may not use this file except in compliance with the License.
@@ -17,6 +17,10 @@
 
 package quickstep.serialization;
 
+message Run {
+  repeated fixed64 blocks = 1;
+}
+
 message SortMergeRunOutput {
   required uint64 merge_level = 1;
   repeated fixed64 blocks = 2;
diff --git a/relational_operators/TextScanOperator.hpp b/relational_operators/TextScanOperator.hpp
index cf1ddbf..a2d4ced 100644
--- a/relational_operators/TextScanOperator.hpp
+++ b/relational_operators/TextScanOperator.hpp
@@ -324,7 +324,7 @@
    * @param scheduler_client_id The TMB client ID of the scheduler thread.
    * @param bus A pointer to the TMB.
    */
-  TextSplitWorkOrder(const std::string filename,
+  TextSplitWorkOrder(const std::string &filename,
                      const bool process_escape_sequences,
                      StorageManager *storage_manager,
                      const std::size_t operator_index,
diff --git a/relational_operators/WorkOrder.proto b/relational_operators/WorkOrder.proto
new file mode 100644
index 0000000..e0ec19d
--- /dev/null
+++ b/relational_operators/WorkOrder.proto
@@ -0,0 +1,244 @@
+//   Copyright 2011-2015 Quickstep Technologies LLC.
+//   Copyright 2015-2016 Pivotal Software, Inc.
+//
+//   Licensed under the Apache License, Version 2.0 (the "License");
+//   you may not use this file except in compliance with the License.
+//   You may obtain a copy of the License at
+//
+//       http://www.apache.org/licenses/LICENSE-2.0
+//
+//   Unless required by applicable law or agreed to in writing, software
+//   distributed under the License is distributed on an "AS IS" BASIS,
+//   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+//   See the License for the specific language governing permissions and
+//   limitations under the License.
+
+syntax = "proto2";
+
+package quickstep.serialization;
+
+import "relational_operators/SortMergeRunOperator.proto";
+import "relational_operators/TextScanOperator.proto";
+
+enum WorkOrderType {
+  AGGREGATION = 1;
+  BUILD_HASH = 2;
+  CREATE_INDEX = 3;  // Placeholder.
+  CREATE_TABLE = 4;  // Placeholder.
+  DELETE = 5;
+  DESTROY_HASH = 6;
+  DROP_TABLE = 7;
+  FINALIZE_AGGREGATION = 8;
+  HASH_JOIN = 9;
+  INSERT = 10;
+  NESTED_LOOP_JOIN = 11;
+  SAMPLE = 12;
+  SAVE_BLOCKS = 13;
+  SELECT = 14;
+  SORT_MERGE_RUN = 15;
+  SORT_RUN_GENERATION = 16;
+  TABLE_GENERATOR = 17;
+  TEXT_SCAN = 18;
+  TEXT_SPLIT = 19;
+  UPDATE = 20;
+}
+
+message WorkOrder {
+  required WorkOrderType work_order_type = 1;
+
+  // The convention for extension numbering is that extensions for a particular
+  // WorkOrderID should begin from (operator_type + 1) * 16.
+  extensions 16 to max;
+}
+
+message AggregationWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint32 aggr_state_index = 16;
+    optional fixed64 block_id = 17;
+  }
+}
+
+message BuildHashWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 relation_id = 32;
+    repeated int32 join_key_attributes = 33;
+    optional bool any_join_key_attributes_nullable = 34;
+    optional uint32 join_hash_table_index = 35;
+    optional fixed64 block_id = 36;
+  }
+}
+
+message DeleteWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint64 operator_index = 96;
+    optional int32 relation_id = 97;
+    optional int32 predicate_index = 98;
+    optional fixed64 block_id = 99;
+  }
+}
+
+message DestroyHashWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint32 join_hash_table_index = 112;
+  }
+}
+
+message DropTableWorkOrder {
+  extend WorkOrder {
+    // If set, CatalogDatabaseCache will drop the relation.
+    optional int32 relation_id = 128;
+    // Optional, and maybe empty.
+    repeated fixed64 block_ids = 129;
+  }
+}
+
+message FinalizeAggregationWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint32 aggr_state_index = 144;
+    optional int32 insert_destination_index = 145;
+  }
+}
+
+message HashJoinWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 build_relation_id = 160;
+    optional int32 probe_relation_id = 161;
+    repeated int32 join_key_attributes = 162;
+    optional bool any_join_key_attributes_nullable = 163;
+    optional int32 insert_destination_index = 164;
+    optional uint32 join_hash_table_index = 165;
+    optional int32 residual_predicate_index = 166;
+    optional int32 selection_index = 167;
+    optional fixed64 block_id = 168;
+  }
+}
+
+message InsertWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 insert_destination_index = 176;
+    optional uint32 tuple_index = 177;
+  }
+}
+
+message NestedLoopsJoinWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 left_relation_id = 192;
+    optional int32 right_relation_id = 193;
+    optional fixed64 left_block_id = 194;
+    optional fixed64 right_block_id = 195;
+    optional int32 insert_destination_index = 196;
+    optional int32 join_predicate_index = 197;
+    optional int32 selection_index = 198;
+  }
+}
+
+message SampleWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 relation_id = 208;
+    optional fixed64 block_id = 209;
+    optional bool is_block_sample = 210;
+    optional int32 percentage = 211;
+    optional int32 insert_destination_index = 212;
+  }
+}
+
+message SaveBlocksWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional fixed64 block_id = 224;
+    optional bool force = 225;
+  }
+}
+
+message SelectWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 relation_id = 240;
+    optional int32 insert_destination_index = 241;
+    optional int32 predicate_index = 242;
+    optional fixed64 block_id = 243;
+    optional bool simple_projection = 244;
+
+    // When 'simple_projection' is true.
+    repeated int32 simple_selection = 245;
+
+    // Otherwise.
+    optional int32 selection_index = 246;
+  }
+}
+
+message SortMergeRunWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint64 operator_index = 256;
+    optional uint64 sort_config_index = 257;
+    repeated Run runs = 258;
+    optional uint64 top_k = 259;
+    optional uint64 merge_level = 260;
+    optional int32 relation_id = 261;
+    optional int32 insert_destination_index = 262;
+  }
+}
+
+message SortRunGenerationWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint64 sort_config_index = 272;
+    optional int32 relation_id = 273;
+    optional int32 insert_destination_index = 274;
+    optional fixed64 block_id = 275;
+  }
+}
+
+message TableGeneratorWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional int32 generator_function_index = 288;
+    optional int32 insert_destination_index = 289;
+  }
+}
+
+message TextScanWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint32 field_terminator = 304;  // For one-byte char.
+    optional bool process_escape_sequences = 305;
+    optional int32 insert_destination_index = 306;
+
+    // Either
+    optional string filename = 307;
+
+    // Or
+    optional TextBlob text_blob = 308;
+  }
+}
+
+message TextSplitWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint64 operator_index = 320;
+    optional string filename = 321;
+    optional bool process_escape_sequences = 322;
+  }
+}
+
+message UpdateWorkOrder {
+  extend WorkOrder {
+    // All required.
+    optional uint64 operator_index = 336;
+    optional int32 relation_id = 337;
+    optional int32 insert_destination_index = 338;
+    optional int32 predicate_index = 339;
+    optional uint32 update_group_index = 340;
+    optional fixed64 block_id = 341;
+  }
+}
diff --git a/relational_operators/WorkOrderFactory.cpp b/relational_operators/WorkOrderFactory.cpp
new file mode 100644
index 0000000..4713681
--- /dev/null
+++ b/relational_operators/WorkOrderFactory.cpp
@@ -0,0 +1,602 @@
+/**
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#include "relational_operators/WorkOrderFactory.hpp"
+
+#include <utility>
+#include <vector>
+
+#include "catalog/CatalogDatabaseLite.hpp"
+#include "catalog/CatalogRelationSchema.hpp"
+#include "catalog/CatalogTypedefs.hpp"
+#include "query_execution/QueryContext.hpp"
+#include "relational_operators/AggregationOperator.hpp"
+#include "relational_operators/BuildHashOperator.hpp"
+#include "relational_operators/DeleteOperator.hpp"
+#include "relational_operators/DestroyHashOperator.hpp"
+#include "relational_operators/DropTableOperator.hpp"
+#include "relational_operators/FinalizeAggregationOperator.hpp"
+#include "relational_operators/HashJoinOperator.hpp"
+#include "relational_operators/InsertOperator.hpp"
+#include "relational_operators/NestedLoopsJoinOperator.hpp"
+#include "relational_operators/SampleOperator.hpp"
+#include "relational_operators/SaveBlocksOperator.hpp"
+#include "relational_operators/SelectOperator.hpp"
+#include "relational_operators/SortMergeRunOperator.hpp"
+#include "relational_operators/SortMergeRunOperator.pb.h"
+#include "relational_operators/SortMergeRunOperatorHelpers.hpp"
+#include "relational_operators/SortRunGenerationOperator.hpp"
+#include "relational_operators/TableGeneratorOperator.hpp"
+#include "relational_operators/TextScanOperator.hpp"
+#include "relational_operators/TextScanOperator.pb.h"
+#include "relational_operators/UpdateOperator.hpp"
+#include "relational_operators/WorkOrder.pb.h"
+#include "storage/StorageBlockInfo.hpp"
+
+#include "glog/logging.h"
+
+#include "tmb/id_typedefs.h"
+
+using std::move;
+using std::vector;
+
+namespace quickstep {
+
+WorkOrder* WorkOrderFactory::ReconstructFromProto(const serialization::WorkOrder &proto,
+                                                  CatalogDatabaseLite *catalog_database,
+                                                  QueryContext *query_context,
+                                                  StorageManager *storage_manager,
+                                                  const tmb::client_id shiftboss_client_id,
+                                                  tmb::MessageBus *bus) {
+  DCHECK(query_context != nullptr);
+  DCHECK(ProtoIsValid(proto, *catalog_database, *query_context))
+      << "Attempted to create WorkOrder from an invalid proto description:\n"
+      << proto.DebugString();
+
+  switch (proto.work_order_type()) {
+    case serialization::AGGREGATION: {
+      LOG(INFO) << "Creating AggregationWorkOrder";
+      return new AggregationWorkOrder(
+          proto.GetExtension(serialization::AggregationWorkOrder::block_id),
+          query_context->getAggregationState(
+              proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index)));
+    }
+    case serialization::BUILD_HASH: {
+      LOG(INFO) << "Creating BuildHashWorkOrder";
+      vector<attribute_id> join_key_attributes;
+      for (int i = 0; i < proto.ExtensionSize(serialization::BuildHashWorkOrder::join_key_attributes); ++i) {
+        join_key_attributes.push_back(
+            proto.GetExtension(serialization::BuildHashWorkOrder::join_key_attributes, i));
+      }
+
+      return new BuildHashWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::BuildHashWorkOrder::relation_id)),
+          move(join_key_attributes),
+          proto.GetExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable),
+          proto.GetExtension(serialization::BuildHashWorkOrder::block_id),
+          query_context->getJoinHashTable(
+              proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index)),
+          storage_manager);
+    }
+    case serialization::DELETE: {
+      LOG(INFO) << "Creating DeleteWorkOrder";
+      return new DeleteWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::DeleteWorkOrder::relation_id)),
+          proto.GetExtension(serialization::DeleteWorkOrder::block_id),
+          query_context->getPredicate(
+              proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)),
+          storage_manager,
+          proto.GetExtension(serialization::DeleteWorkOrder::operator_index),
+          shiftboss_client_id,
+          bus);
+    }
+    case serialization::DESTROY_HASH: {
+      LOG(INFO) << "Creating DestroyHashWorkOrder";
+      return new DestroyHashWorkOrder(
+          proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index),
+          query_context);
+    }
+    case serialization::DROP_TABLE: {
+      LOG(INFO) << "Creating DropTableWorkOrder";
+      vector<block_id> blocks;
+      for (int i = 0; i < proto.ExtensionSize(serialization::DropTableWorkOrder::block_ids); ++i) {
+        blocks.push_back(
+            proto.GetExtension(serialization::DropTableWorkOrder::block_ids, i));
+      }
+
+      return new DropTableWorkOrder(
+          move(blocks),
+          storage_manager,
+          proto.HasExtension(serialization::DropTableWorkOrder::relation_id)
+              ? proto.GetExtension(serialization::DropTableWorkOrder::relation_id)
+              : kInvalidCatalogId,
+          catalog_database);
+    }
+    case serialization::FINALIZE_AGGREGATION: {
+      LOG(INFO) << "Creating FinalizeAggregationWorkOrder";
+      return new FinalizeAggregationWorkOrder(
+          query_context->releaseAggregationState(
+              proto.GetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index)));
+    }
+    case serialization::HASH_JOIN: {
+      LOG(INFO) << "Creating HashJoinWorkOrder";
+      vector<attribute_id> join_key_attributes;
+      for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
+        join_key_attributes.push_back(
+            proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i));
+      }
+
+      return new HashJoinWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::HashJoinWorkOrder::build_relation_id)),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::HashJoinWorkOrder::probe_relation_id)),
+          move(join_key_attributes),
+          proto.GetExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable),
+          proto.GetExtension(serialization::HashJoinWorkOrder::block_id),
+          query_context->getPredicate(
+              proto.GetExtension(serialization::HashJoinWorkOrder::residual_predicate_index)),
+          query_context->getScalarGroup(
+              proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)),
+          query_context->getJoinHashTable(
+              proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index)),
+          storage_manager);
+    }
+    case serialization::INSERT: {
+      LOG(INFO) << "Creating InsertWorkOrder";
+      return new InsertWorkOrder(
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)),
+          query_context->releaseTuple(
+              proto.GetExtension(serialization::InsertWorkOrder::tuple_index)));
+    }
+    case serialization::NESTED_LOOP_JOIN: {
+      LOG(INFO) << "Creating NestedLoopsJoinWorkOrder";
+      return new NestedLoopsJoinWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id)),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::right_relation_id)),
+          proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::left_block_id),
+          proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::right_block_id),
+          query_context->getPredicate(
+              proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::join_predicate_index)),
+          query_context->getScalarGroup(
+              proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index)),
+          storage_manager);
+    }
+    case serialization::SAMPLE: {
+      LOG(INFO) << "Creating SampleWorkOrder";
+      return new SampleWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::SampleWorkOrder::relation_id)),
+          proto.GetExtension(serialization::SampleWorkOrder::block_id),
+          proto.GetExtension(serialization::SampleWorkOrder::is_block_sample),
+          proto.GetExtension(serialization::SampleWorkOrder::percentage),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::SampleWorkOrder::insert_destination_index)),
+          storage_manager);
+    }
+    case serialization::SAVE_BLOCKS: {
+      LOG(INFO) << "Creating SaveBlocksWorkOrder";
+      return new SaveBlocksWorkOrder(
+          proto.GetExtension(serialization::SaveBlocksWorkOrder::block_id),
+          proto.GetExtension(serialization::SaveBlocksWorkOrder::force),
+          storage_manager);
+    }
+    case serialization::SELECT: {
+      LOG(INFO) << "Creating SelectWorkOrder";
+      const bool simple_projection =
+          proto.GetExtension(serialization::SelectWorkOrder::simple_projection);
+      vector<attribute_id> simple_selection;
+      for (int i = 0; i < proto.ExtensionSize(serialization::SelectWorkOrder::simple_selection); ++i) {
+        simple_selection.push_back(
+            proto.GetExtension(serialization::SelectWorkOrder::simple_selection, i));
+      }
+
+      return new SelectWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::SelectWorkOrder::relation_id)),
+          proto.GetExtension(serialization::SelectWorkOrder::block_id),
+          query_context->getPredicate(
+              proto.GetExtension(serialization::SelectWorkOrder::predicate_index)),
+          simple_projection,
+          move(simple_selection),
+          simple_projection ? nullptr
+                            : &query_context->getScalarGroup(
+                                  proto.GetExtension(serialization::SelectWorkOrder::selection_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)),
+          storage_manager);
+    }
+    case serialization::SORT_MERGE_RUN: {
+      LOG(INFO) << "Creating SortMergeRunWorkOrder";
+      vector<merge_run_operator::Run> runs;
+      for (int i = 0; i < proto.ExtensionSize(serialization::SortMergeRunWorkOrder::runs); ++i) {
+        merge_run_operator::Run run;
+        const serialization::Run &run_proto =
+            proto.GetExtension(serialization::SortMergeRunWorkOrder::runs, i);
+        for (int j = 0; j < run_proto.blocks_size(); ++j) {
+          run.push_back(run_proto.blocks(j));
+        }
+        runs.push_back(move(run));
+      }
+
+      return new SortMergeRunWorkOrder(
+          query_context->getSortConfig(
+              proto.GetExtension(serialization::SortMergeRunWorkOrder::sort_config_index)),
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::SortMergeRunWorkOrder::relation_id)),
+          move(runs),
+          proto.GetExtension(serialization::SortMergeRunWorkOrder::top_k),
+          proto.GetExtension(serialization::SortMergeRunWorkOrder::merge_level),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index)),
+          storage_manager,
+          proto.GetExtension(serialization::SortMergeRunWorkOrder::operator_index),
+          shiftboss_client_id,
+          bus);
+    }
+    case serialization::SORT_RUN_GENERATION: {
+      LOG(INFO) << "Creating SortRunGenerationWorkOrder";
+      return new SortRunGenerationWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::SortRunGenerationWorkOrder::relation_id)),
+          proto.GetExtension(serialization::SortRunGenerationWorkOrder::block_id),
+          query_context->getSortConfig(
+              proto.GetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index)),
+          storage_manager);
+    }
+    case serialization::TABLE_GENERATOR: {
+      LOG(INFO) << "Creating SortRunGenerationWorkOrder";
+      return new TableGeneratorWorkOrder(
+          query_context->getGeneratorFunctionHandle(
+              proto.GetExtension(serialization::TableGeneratorWorkOrder::generator_function_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index)));
+    }
+    case serialization::TEXT_SCAN: {
+      LOG(INFO) << "Creating TextScanWorkOrder";
+      if (proto.HasExtension(serialization::TextScanWorkOrder::filename)) {
+        return new TextScanWorkOrder(
+            proto.GetExtension(serialization::TextScanWorkOrder::filename),
+            proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
+            proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
+            query_context->getInsertDestination(
+                proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+            storage_manager);
+      } else {
+        const serialization::TextBlob &text_blob_proto =
+            proto.GetExtension(serialization::TextScanWorkOrder::text_blob);
+
+        return new TextScanWorkOrder(
+            text_blob_proto.blob_id(),
+            text_blob_proto.size(),
+            proto.GetExtension(serialization::TextScanWorkOrder::field_terminator),
+            proto.GetExtension(serialization::TextScanWorkOrder::process_escape_sequences),
+            query_context->getInsertDestination(
+                proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index)),
+            storage_manager);
+      }
+    }
+    case serialization::TEXT_SPLIT: {
+      LOG(INFO) << "Creating TextSplitWorkOrder";
+      return new TextSplitWorkOrder(
+          proto.GetExtension(serialization::TextSplitWorkOrder::filename),
+          proto.GetExtension(serialization::TextSplitWorkOrder::process_escape_sequences),
+          storage_manager,
+          proto.GetExtension(serialization::TextSplitWorkOrder::operator_index),
+          shiftboss_client_id,
+          bus);
+    }
+    case serialization::UPDATE: {
+      LOG(INFO) << "Creating UpdateWorkOrder";
+      return new UpdateWorkOrder(
+          catalog_database->getRelationSchemaById(
+              proto.GetExtension(serialization::UpdateWorkOrder::relation_id)),
+          proto.GetExtension(serialization::UpdateWorkOrder::block_id),
+          query_context->getPredicate(
+              proto.GetExtension(serialization::UpdateWorkOrder::predicate_index)),
+          query_context->getUpdateGroup(
+              proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)),
+          query_context->getInsertDestination(
+              proto.GetExtension(serialization::UpdateWorkOrder::insert_destination_index)),
+          storage_manager,
+          proto.GetExtension(serialization::UpdateWorkOrder::operator_index),
+          shiftboss_client_id,
+          bus);
+    }
+    default:
+      LOG(FATAL) << "Unknown WorkOrder Type in WorkOrderFactory::ReconstructFromProto";
+  }
+}
+
+bool WorkOrderFactory::ProtoIsValid(const serialization::WorkOrder &proto,
+                                    const CatalogDatabaseLite &catalog_database,
+                                    const QueryContext &query_context) {
+  switch (proto.work_order_type()) {
+    case serialization::AGGREGATION: {
+      return proto.HasExtension(serialization::AggregationWorkOrder::block_id) &&
+             proto.HasExtension(serialization::AggregationWorkOrder::aggr_state_index) &&
+             query_context.isValidAggregationStateId(
+                 proto.GetExtension(serialization::AggregationWorkOrder::aggr_state_index));
+    }
+    case serialization::BUILD_HASH: {
+      if (!proto.HasExtension(serialization::BuildHashWorkOrder::relation_id)) {
+        return false;
+      }
+
+      const relation_id rel_id = proto.GetExtension(serialization::BuildHashWorkOrder::relation_id);
+      if (!catalog_database.hasRelationWithId(rel_id)) {
+        return false;
+      }
+
+      const CatalogRelationSchema &relation = catalog_database.getRelationSchemaById(rel_id);
+      for (int i = 0; i < proto.ExtensionSize(serialization::BuildHashWorkOrder::join_key_attributes); ++i) {
+        if (!relation.hasAttributeWithId(
+                proto.GetExtension(serialization::BuildHashWorkOrder::join_key_attributes, i))) {
+          return false;
+        }
+      }
+
+      return proto.HasExtension(serialization::BuildHashWorkOrder::any_join_key_attributes_nullable) &&
+             proto.HasExtension(serialization::BuildHashWorkOrder::block_id) &&
+             proto.HasExtension(serialization::BuildHashWorkOrder::join_hash_table_index) &&
+             query_context.isValidJoinHashTableId(
+                 proto.GetExtension(serialization::BuildHashWorkOrder::join_hash_table_index));
+    }
+    case serialization::DELETE: {
+      return proto.HasExtension(serialization::DeleteWorkOrder::relation_id) &&
+             catalog_database.hasRelationWithId(
+                 proto.GetExtension(serialization::DeleteWorkOrder::relation_id)) &&
+             proto.HasExtension(serialization::DeleteWorkOrder::predicate_index) &&
+             query_context.isValidPredicate(
+                 proto.GetExtension(serialization::DeleteWorkOrder::predicate_index)) &&
+             proto.HasExtension(serialization::DeleteWorkOrder::block_id) &&
+             proto.HasExtension(serialization::DeleteWorkOrder::operator_index);
+    }
+    case serialization::DESTROY_HASH: {
+      return proto.HasExtension(serialization::DestroyHashWorkOrder::join_hash_table_index) &&
+             query_context.isValidJoinHashTableId(
+                 proto.GetExtension(serialization::DestroyHashWorkOrder::join_hash_table_index));
+    }
+    case serialization::DROP_TABLE: {
+      return true;
+    }
+    case serialization::FINALIZE_AGGREGATION: {
+      return proto.HasExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index) &&
+             query_context.isValidAggregationStateId(
+                 proto.GetExtension(serialization::FinalizeAggregationWorkOrder::aggr_state_index)) &&
+             proto.HasExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::FinalizeAggregationWorkOrder::insert_destination_index));
+    }
+    case serialization::HASH_JOIN: {
+      if (!proto.HasExtension(serialization::HashJoinWorkOrder::build_relation_id) ||
+          !proto.HasExtension(serialization::HashJoinWorkOrder::probe_relation_id)) {
+        return false;
+      }
+
+      const relation_id build_relation_id = proto.GetExtension(serialization::HashJoinWorkOrder::build_relation_id);
+      if (!catalog_database.hasRelationWithId(build_relation_id)) {
+        return false;
+      }
+
+      const relation_id probe_relation_id = proto.GetExtension(serialization::HashJoinWorkOrder::probe_relation_id);
+      if (!catalog_database.hasRelationWithId(probe_relation_id)) {
+        return false;
+      }
+
+      const CatalogRelationSchema &build_relation = catalog_database.getRelationSchemaById(build_relation_id);
+      const CatalogRelationSchema &probe_relation = catalog_database.getRelationSchemaById(probe_relation_id);
+      for (int i = 0; i < proto.ExtensionSize(serialization::HashJoinWorkOrder::join_key_attributes); ++i) {
+        const attribute_id attr_id = proto.GetExtension(serialization::HashJoinWorkOrder::join_key_attributes, i);
+        if (!build_relation.hasAttributeWithId(attr_id) ||
+            !probe_relation.hasAttributeWithId(attr_id)) {
+          return false;
+        }
+      }
+
+      return proto.HasExtension(serialization::HashJoinWorkOrder::any_join_key_attributes_nullable) &&
+             proto.HasExtension(serialization::HashJoinWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::HashJoinWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::HashJoinWorkOrder::join_hash_table_index) &&
+             query_context.isValidJoinHashTableId(
+                 proto.GetExtension(serialization::HashJoinWorkOrder::join_hash_table_index)) &&
+             proto.HasExtension(serialization::HashJoinWorkOrder::residual_predicate_index) &&
+             query_context.isValidPredicate(
+                 proto.GetExtension(serialization::HashJoinWorkOrder::residual_predicate_index)) &&
+             proto.HasExtension(serialization::HashJoinWorkOrder::selection_index) &&
+             query_context.isValidScalarGroupId(
+                 proto.GetExtension(serialization::HashJoinWorkOrder::selection_index)) &&
+             proto.HasExtension(serialization::HashJoinWorkOrder::block_id);
+    }
+    case serialization::INSERT: {
+      return proto.HasExtension(serialization::InsertWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::InsertWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::InsertWorkOrder::tuple_index) &&
+             query_context.isValidTupleId(
+                 proto.GetExtension(serialization::InsertWorkOrder::tuple_index));
+    }
+    case serialization::NESTED_LOOP_JOIN: {
+      if (!proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id) ||
+          !proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::right_relation_id)) {
+        return false;
+      }
+
+      const relation_id left_relation_id =
+          proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::left_relation_id);
+      if (!catalog_database.hasRelationWithId(left_relation_id)) {
+        return false;
+      }
+
+      const relation_id right_relation_id =
+          proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::right_relation_id);
+      if (!catalog_database.hasRelationWithId(right_relation_id)) {
+        return false;
+      }
+
+      return proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::left_block_id) &&
+             proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::right_block_id) &&
+             proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::join_predicate_index) &&
+             query_context.isValidPredicate(
+                 proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::join_predicate_index)) &&
+             proto.HasExtension(serialization::NestedLoopsJoinWorkOrder::selection_index) &&
+             query_context.isValidScalarGroupId(
+                 proto.GetExtension(serialization::NestedLoopsJoinWorkOrder::selection_index));
+    }
+    case serialization::SAMPLE: {
+      return catalog_database.hasRelationWithId(proto.GetExtension(serialization::SampleWorkOrder::relation_id)) &&
+             proto.HasExtension(serialization::SampleWorkOrder::block_id) &&
+             proto.HasExtension(serialization::SampleWorkOrder::is_block_sample) &&
+             proto.HasExtension(serialization::SampleWorkOrder::percentage) &&
+             proto.HasExtension(serialization::SampleWorkOrder::insert_destination_index);
+    }
+    case serialization::SAVE_BLOCKS: {
+      return proto.HasExtension(serialization::SaveBlocksWorkOrder::block_id) &&
+             proto.HasExtension(serialization::SaveBlocksWorkOrder::force);
+    }
+    case serialization::SELECT: {
+      if (!proto.HasExtension(serialization::SelectWorkOrder::relation_id) ||
+          !proto.HasExtension(serialization::SelectWorkOrder::simple_projection) ||
+          !proto.HasExtension(serialization::SelectWorkOrder::selection_index)) {
+        return false;
+      }
+
+      const relation_id rel_id = proto.GetExtension(serialization::SelectWorkOrder::relation_id);
+      if (!catalog_database.hasRelationWithId(rel_id)) {
+        return false;
+      }
+
+      const CatalogRelationSchema &relation = catalog_database.getRelationSchemaById(rel_id);
+      for (int i = 0; i < proto.ExtensionSize(serialization::SelectWorkOrder::simple_selection); ++i) {
+        if (!relation.hasAttributeWithId(
+                 proto.GetExtension(serialization::SelectWorkOrder::simple_selection, i))) {
+          return false;
+        }
+      }
+
+      if (proto.GetExtension(serialization::SelectWorkOrder::simple_projection) ==
+              query_context.isValidScalarGroupId(
+                  proto.GetExtension(serialization::SelectWorkOrder::selection_index))) {
+        return false;
+      }
+
+      return proto.HasExtension(serialization::SelectWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::SelectWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::SelectWorkOrder::predicate_index) &&
+             query_context.isValidPredicate(
+                 proto.GetExtension(serialization::SelectWorkOrder::predicate_index)) &&
+             proto.HasExtension(serialization::SelectWorkOrder::block_id);
+    }
+    case serialization::SORT_MERGE_RUN: {
+      // In Protobuf 2.6, proto.HasExtension does not work for the repeated
+      // message field, but Protobuf 3.0 beta works.
+      // TODO(zuyu): Validate serialization::SortMergeRunWorkOrder::runs.
+      return proto.HasExtension(serialization::SortMergeRunWorkOrder::sort_config_index) &&
+             query_context.isValidSortConfigId(
+                 proto.GetExtension(serialization::SortMergeRunWorkOrder::sort_config_index)) &&
+             proto.HasExtension(serialization::SortMergeRunWorkOrder::top_k) &&
+             proto.HasExtension(serialization::SortMergeRunWorkOrder::merge_level) &&
+             proto.HasExtension(serialization::SortMergeRunWorkOrder::relation_id) &&
+             catalog_database.hasRelationWithId(
+                 proto.GetExtension(serialization::SortMergeRunWorkOrder::relation_id)) &&
+             proto.HasExtension(serialization::SortMergeRunWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::SortMergeRunWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::SortMergeRunWorkOrder::operator_index);
+    }
+    case serialization::SORT_RUN_GENERATION: {
+      return proto.HasExtension(serialization::SortRunGenerationWorkOrder::relation_id) &&
+             catalog_database.hasRelationWithId(
+                 proto.GetExtension(serialization::SortRunGenerationWorkOrder::relation_id)) &&
+             proto.HasExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::SortRunGenerationWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::SortRunGenerationWorkOrder::sort_config_index) &&
+             query_context.isValidSortConfigId(
+                 proto.GetExtension(serialization::SortRunGenerationWorkOrder::sort_config_index)) &&
+             proto.HasExtension(serialization::SortRunGenerationWorkOrder::block_id);
+    }
+    case serialization::TABLE_GENERATOR: {
+      return proto.HasExtension(serialization::TableGeneratorWorkOrder::generator_function_index) &&
+             query_context.isValidGeneratorFunctionId(
+                 proto.GetExtension(serialization::TableGeneratorWorkOrder::generator_function_index)) &&
+             proto.HasExtension(serialization::TableGeneratorWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::TableGeneratorWorkOrder::insert_destination_index));
+    }
+    case serialization::TEXT_SCAN: {
+      if (!proto.HasExtension(serialization::TextScanWorkOrder::field_terminator) ||
+          !proto.HasExtension(serialization::TextScanWorkOrder::process_escape_sequences) ||
+          !proto.HasExtension(serialization::TextScanWorkOrder::insert_destination_index) ||
+          !query_context.isValidInsertDestinationId(
+              proto.GetExtension(serialization::TextScanWorkOrder::insert_destination_index))) {
+        return false;
+      }
+
+      // Two fields are exclusive.
+      if (proto.HasExtension(serialization::TextScanWorkOrder::filename) ==
+              proto.HasExtension(serialization::TextScanWorkOrder::text_blob)) {
+        return false;
+      }
+
+      return proto.HasExtension(serialization::TextScanWorkOrder::filename) ||
+             proto.GetExtension(serialization::TextScanWorkOrder::text_blob).IsInitialized();
+    }
+    case serialization::TEXT_SPLIT: {
+      return proto.HasExtension(serialization::TextSplitWorkOrder::filename) &&
+             proto.HasExtension(serialization::TextSplitWorkOrder::process_escape_sequences) &&
+             proto.HasExtension(serialization::TextSplitWorkOrder::operator_index);
+    }
+    case serialization::UPDATE: {
+      return proto.HasExtension(serialization::UpdateWorkOrder::relation_id) &&
+             catalog_database.hasRelationWithId(
+                 proto.GetExtension(serialization::UpdateWorkOrder::relation_id)) &&
+             proto.HasExtension(serialization::UpdateWorkOrder::insert_destination_index) &&
+             query_context.isValidInsertDestinationId(
+                 proto.GetExtension(serialization::UpdateWorkOrder::insert_destination_index)) &&
+             proto.HasExtension(serialization::UpdateWorkOrder::predicate_index) &&
+             query_context.isValidPredicate(
+                 proto.GetExtension(serialization::UpdateWorkOrder::predicate_index)) &&
+             proto.HasExtension(serialization::UpdateWorkOrder::update_group_index) &&
+             query_context.isValidUpdateGroupId(
+                 proto.GetExtension(serialization::UpdateWorkOrder::update_group_index)) &&
+             proto.HasExtension(serialization::UpdateWorkOrder::operator_index) &&
+             proto.HasExtension(serialization::UpdateWorkOrder::block_id);
+    }
+    default:
+      return false;
+  }
+}
+
+}  // namespace quickstep
diff --git a/relational_operators/WorkOrderFactory.hpp b/relational_operators/WorkOrderFactory.hpp
new file mode 100644
index 0000000..a870d09
--- /dev/null
+++ b/relational_operators/WorkOrderFactory.hpp
@@ -0,0 +1,93 @@
+/**
+ *   Copyright 2015-2016 Pivotal Software, Inc.
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ **/
+
+#ifndef QUICKSTEP_RELATIONAL_OPERATORS_WORK_ORDER_FACTORY_HPP_
+#define QUICKSTEP_RELATIONAL_OPERATORS_WORK_ORDER_FACTORY_HPP_
+
+#include "utility/Macros.hpp"
+
+#include "tmb/id_typedefs.h"
+
+namespace tmb { class MessageBus; }
+
+namespace quickstep {
+
+class CatalogDatabaseLite;
+class QueryContext;
+class StorageManager;
+class WorkOrder;
+
+namespace serialization { class WorkOrder; }
+
+/** \addtogroup RelationalOperators
+ *  @{
+ */
+
+/**
+ * @brief All-static factory object that provides access to WorkOrder.
+ **/
+class WorkOrderFactory {
+ public:
+  /**
+   * @brief Get a pointer to a WorkOrder from its serialized Protocol Buffer
+   *        form.
+   *
+   * @param proto The Protocol Buffer representation of a WorkOrder object,
+   *        originally generated by RelationalOperator::getAllWorkOrders.
+   * @param catalog_database The database to resolve relation and attribute
+   *        references in.
+   * @param query_context A pointer to QueryContext.
+   * @param storage_manager The StorageManager to use.
+   * @param shiftboss_client_id The TMB client id of Shiftboss.
+   * @param bus A pointer to the TMB.
+   *
+   * @return A new WorkOrder reconstructed from the supplied Protocol Buffer.
+   **/
+  static WorkOrder* ReconstructFromProto(const serialization::WorkOrder &proto,
+                                         CatalogDatabaseLite *catalog_database,
+                                         QueryContext *query_context,
+                                         StorageManager *storage_manager,
+                                         const tmb::client_id shiftboss_client_id,
+                                         tmb::MessageBus *bus);
+
+  /**
+   * @brief Check whether a serialization::WorkOrder is fully-formed and
+   *        all parts are valid.
+   *
+   * @param proto A serialized Protocol Buffer representation of a WorkOrder.
+   * @param catalog_database The database to resolve relation and attribute
+   *        references in.
+   * @param query_context The QueryContext to use.
+   *
+   * @return Whether proto is fully-formed and valid.
+   **/
+  static bool ProtoIsValid(const serialization::WorkOrder &proto,
+                           const CatalogDatabaseLite &catalog_database,
+                           const QueryContext &query_context);
+
+ private:
+  // Undefined default constructor. Class is all-static and should not be
+  // instantiated.
+  WorkOrderFactory();
+
+  DISALLOW_COPY_AND_ASSIGN(WorkOrderFactory);
+};
+
+/** @} */
+
+}  // namespace quickstep
+
+#endif  // QUICKSTEP_RELATIONAL_OPERATORS_WORK_ORDER_FACTORY_HPP_